TestApiController.java 9.6 KB
package com.aukey.example.web;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.aukey.example.constant.DwApi;
import com.aukey.example.entity.CurrencySet;
import com.aukey.example.entity.FulfillmentCurrentInventory;
import com.aukey.example.util.DwHelperUtil;
import com.aukey.example.vo.DwParamVo;
import com.aukey.example.vo.DwResultVo;
import com.aukey.example.vo.ObjVo;
import com.aukey.example.vo.SlVo;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

/**
 * @author: wgf
 * @create: 2020-05-13 16:34
 * @description: API调用测试
 * <p>
 * ****************************************************************************************
 * *                                                                                      *
 * * 这里的TEST代码写在controller是方便使用swagger2本地调试,正常业务应该是在项目的定时任务里 *
 * *                                                                                      *
 * ****************************************************************************************
 * <p>
 * Demo 提供三种数据拉取方式,
 * 第一种:直接调用API适合[1, 3000]数据读取
 * 第二种:分页适合[1, 1000000]数据读取,数据基数过大深层分页会影响查询性能,并且客户端需要不断发起请求
 * 第三种:流式读取适合[1, 10000000]数据读取,不存在深层分页性能影响,并且客户端只需要发起一次请求
 **/
@RestController
@Api(tags = "API调用DEMO")
public class TestApiController {
    private Logger log = LoggerFactory.getLogger(TestApiController.class);

    @Value("${dwDataApi:null}")
    private String dwDataApi;

    @Value("${dwAppId:null}")
    private String appId;


    @ApiOperation(value = "小批量读取 OBJ 方式")
    @GetMapping("/query_all_obj")
    public void queryAllObj() {

        // 构造请求参数
        DwParamVo paramVo = new DwParamVo(this.appId, TokenController.getCurrentToken());

        // 添加查询条件
        paramVo.setQueryCondition("WHERE currency_code = 'USD'");
        // 每页数据条数 dw服务端有限制 取值[1, 3000]
        paramVo.setPageSize(3000);
        paramVo.setDataStructure(DwParamVo.OBJ);

        // 调用API
        String result = DwHelperUtil.doGet(dwDataApi, DwApi.CURRENCY_SET_API, paramVo.toMap());

        if (StringUtils.isEmpty(result)) {
            throw new RuntimeException(String.format("API %s 调用失败", DwApi.CURRENCY_SET_API));
        }
        // json解析为对象
        DwResultVo<ObjVo<CurrencySet>> resultVo = JSON.parseObject(result, new TypeReference<DwResultVo<ObjVo<CurrencySet>>>() {
        });

        log.info("");
        log.info("API请求状态:{}", resultVo.getMessage());
        log.info("API返回数据:{}", JSON.toJSONString(resultVo.getData().getData()));
        log.info("API返回数据量:{}", resultVo.getData().getTotal());
        log.info("");
    }


    @ApiOperation(value = "小批量读取 SL 方式")
    @GetMapping("/query_all_sl")
    public void queryAllSl() {

        // 构造请求参数
        DwParamVo paramVo = new DwParamVo(this.appId, TokenController.getCurrentToken());

        // 添加查询条件
        paramVo.setQueryCondition("WHERE currency_code = 'USD'");
        // 每页数据条数 dw服务端有限制 取值[1, 3000]
        paramVo.setPageSize(3000);
        paramVo.setDataStructure(DwParamVo.SL);

        // 调用API
        String result = DwHelperUtil.doGet(dwDataApi, DwApi.CURRENCY_SET_API, paramVo.toMap());

        if (StringUtils.isEmpty(result)) {
            throw new RuntimeException(String.format("API %s 调用失败", DwApi.CURRENCY_SET_API));
        }

        // json解析为对象
        DwResultVo<SlVo> resultVo = JSON.parseObject(result, new TypeReference<DwResultVo<SlVo>>() {
        });

        log.info("");
        log.info("API请求状态:{}", resultVo.getMessage());
        log.info("API返回数据字段:{}", JSON.toJSONString(resultVo.getData().getData().getHeadData()));
        log.info("API返回数据数组:{}", JSON.toJSONString(resultVo.getData().getData().getColumnDataList()));
        log.info("API返回数据量:{}", resultVo.getData().getTotal());
        log.info("");
    }


    /**
     * 适合获取 百万级别的表,大表用分页读取会非常慢
     * 大表建议使用流式读取
     */
    @ApiOperation(value = "分页读取 OBJ 方式")
    @GetMapping("/query_page_obj")
    public void queryPageObj() {

        // 构造请求参数
        DwParamVo paramVo = new DwParamVo(this.appId, TokenController.getCurrentToken());
        // 每页数据条数 dw服务端有限制 取值[1, 3000]
        paramVo.setPageSize(1000);
        paramVo.setDataStructure(DwParamVo.OBJ);
        // TODO 自定义查询条件

        // json字符串转实体引用类型
        TypeReference typeReference = new TypeReference<DwResultVo<ObjVo<CurrencySet>>>() {
        };

        // 回调函数
        Function<DwResultVo<ObjVo<CurrencySet>>, Integer> callback = (DwResultVo<ObjVo<CurrencySet>> resultVo) -> {
            List<CurrencySet> dataList = resultVo.getData().getData();
            System.out.println(dataList.size());
            // TODO 业务逻辑在这里实现
            // ... more
            // mapper.insertList(dataList)

            // 需要返回总页数
            return resultVo.getData().getTotal();
        };

        // 使用分页API
        DwHelperUtil.pageReader(callback,
                dwDataApi,
                DwApi.CURRENCY_SET_API,
                paramVo,
                typeReference);
    }


    /**
     * 适合获取 百万级别的表,大表用分页读取会非常慢
     * 大表建议使用流式读取
     */
    @ApiOperation(value = "分页读取 SL 方式")
    @GetMapping("/query_page_sl")
    public void queryPageSl() {

        // 构造请求参数
        DwParamVo paramVo = new DwParamVo(this.appId, TokenController.getCurrentToken());
        // 每页数据条数 dw服务端有限制 取值[1, 3000]
        paramVo.setPageSize(1000);
        paramVo.setDataStructure(DwParamVo.SL);
        // TODO 自定义查询条件

        // json字符串转实体引用类型
        TypeReference typeReference = new TypeReference<DwResultVo<SlVo>>() {
        };

        // 回调函数
        Function<DwResultVo<SlVo>, Integer> callback = (DwResultVo<SlVo> resultVo) -> {
            log.info("字段字段:{}", resultVo.getData().getData().getHeadData());
            log.info("数据数组:{}", resultVo.getData().getData().getColumnDataList());
            // TODO 业务逻辑在这里实现
            // ... more
            // mapper.insertList(dataList)

            // 需要返回总页数
            return resultVo.getData().getTotal();
        };

        // 使用分页API
        DwHelperUtil.pageReader(callback,
                dwDataApi,
                DwApi.CURRENCY_SET_API,
                paramVo,
                typeReference);
    }


    /**
     * 流式读取适合百级别的表内网数据同步,每次数据读取建议在[100000, 1000000],本机测试
     * <p>
     * 网络环境:局域网
     * 数据源  :华为dws
     * cpu     : 4核3.2GHz
     * 内存    :16GB DDR3
     * 测试数据:100W条
     * 回调函数不做持久化操作
     * <p>
     * <p>
     * 测试报表
     * **************************************
     * *
     * *        每秒获取条数     :1700
     * *        耗时             :100W / 5分钟
     * *        CPU毛刺          :1% - 5% 波动
     * *        内存毛刺         :50Mb - 150Mb 波动
     * *
     * **************************************
     */
    @ApiOperation(value = "流式读取 DEMO")
    @GetMapping("/query_stream")
    public void queryStream() {
        long stratTime = System.currentTimeMillis();

        try {
            // 构造请求参数
            DwParamVo paramVo = new DwParamVo(this.appId, TokenController.getCurrentToken());
            // 设置为流式读取
            paramVo.setStream("Y");
            // TODO 自定义查询条件

            // 定义实体构造函数
            Supplier<FulfillmentCurrentInventory> constructor = FulfillmentCurrentInventory::new;

            // 流式读取回调函数
            Consumer<List<FulfillmentCurrentInventory>> callBack = (List<FulfillmentCurrentInventory> list) -> {
                System.out.println(list.size());
                // TODO 业务逻辑在这里实现
                // ... more
                // mapper.insertList(list)
            };

            // 指定回调函数数据大小,取值[1000, 3000]. 建议1000最为稳定
            // 取值越大,数据读取占用的堆内存越高
            int batchSize = 1000;

            // 使用 StreamAPI
            DwHelperUtil.streamReader(
                    dwDataApi,
                    DwApi.FULFILLMENT_CURRENT_INVENTORY,
                    paramVo.toMap(),
                    callBack,
                    batchSize,
                    constructor);

        } catch (Exception e) {
            log.info("流式读取异常", e);
        } finally {
            log.info("流式读取使用时间 {} 秒", (System.currentTimeMillis() - stratTime) / 1000.0f);
        }
    }
}