TestApiController.java 7.4 KB
package com.aukey.example.web;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.aukey.example.conf.StreamReaderHandler;
import com.aukey.example.entity.CurrencySet;
import com.aukey.example.entity.FbaFulfillmentCurrentInventoryView;
import com.aukey.example.util.DwUtil;
import com.aukey.example.vo.DwParamVo;
import com.aukey.example.vo.DwResultVo;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
 * @author: wgf
 * @create: 2020-05-13 16:34
 * @description: API调用测试
 * <p>
 * ****************************************************************************************
 * *                                                                                      *
 * * 这里的TEST代码写在controller是方便使用swagger2本地调试,正常业务应该是在项目的定时任务里 *
 * *                                                                                      *
 * ****************************************************************************************
 * <p>
 * Demo 提供三种数据拉取方式,
 * 第一种:直接调用API适合[1, 100000]数据读取
 * 第二种:分页适合[1, 1000000]数据读取,数据基数过大深层分页会影响查询性能,并且客户端需要不断发起请求
 * 第三种:流式读取适合[1, 10000000]数据读取,不存在深层分页性能影响,并且客户端只需要发起一次请求
 **/
@RestController
@Api(tags = "API调用DEMO")
public class TestApiController {
    private Logger log = LoggerFactory.getLogger(TestApiController.class);

    // API需要在傲基数仓申请授权
    public static final String CURRENCY_SET_API                           = "/base/currency_set";
    public static final String FBA_FULFILLMENT_CURRENT_INVENTORY_VIEW_API = "/stock/fba_fulfillment_current_inventory_view";

    @Value("${dwDataApi:null}")
    private String dwDataApi;

    @Value("${dwAppId:null}")
    private String appId;

    @ApiOperation(value = "获取前100条汇率 DEMO")
    @GetMapping("/query_all")
    public void queryAll() {

        // 构造请求参数
        DwParamVo paramVo = new DwParamVo(this.appId, TokenController.getCurrentToken());

        // 添加查询条件
        paramVo.setQueryCondition("WHERE currency_code = 'USD'");

        // 指定获取条数
        paramVo.setOffset(0);
        paramVo.setLimit(100);

        // 调用API
        String result = DwUtil.doGet(dwDataApi, CURRENCY_SET_API, paramVo.toMap());

        if (StringUtils.isEmpty(result)) {
            throw new RuntimeException(String.format("API %s 调用失败", CURRENCY_SET_API));
        }
        // json解析为对象
        DwResultVo<CurrencySet> resultVo = JSON.parseObject(result, new TypeReference<DwResultVo<CurrencySet>>() {
        });

        log.info("");
        log.info("↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓");
        log.info("API请求状态:{}", resultVo.getMessage());
        log.info("API TOP100 数据:{}", JSON.toJSONString(resultVo.getData()));
        log.info("↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑");
        log.info("");
    }


    @ApiOperation(value = "分页查询 DEMO")
    @GetMapping("/query_page")
    public void queryPage() {

        // 构造请求参数
        DwParamVo paramVo = new DwParamVo(this.appId, TokenController.getCurrentToken());

        // 页数
        int pageNum = 0;
        // 每页数据大小
        final int pageSize = 1000;
        // 当前页数据
        int currentPageSize;

        // 分页请求
        do {
            paramVo.setToken(TokenController.getCurrentToken());
            paramVo.setOffset(pageNum * pageSize);
            paramVo.setLimit(pageSize);
            // TODO 自定义查询条件

            String result = DwUtil.doGet(dwDataApi, CURRENCY_SET_API, paramVo.toMap());
            if (StringUtils.isEmpty(result)) {
                throw new RuntimeException(String.format("API %s 调用失败", CURRENCY_SET_API));
            }
            // json解析为对象
            DwResultVo<CurrencySet> resultVo = JSON.parseObject(result, new TypeReference<DwResultVo<CurrencySet>>() {
            });

            currentPageSize = CollectionUtils.isEmpty(resultVo.getData()) ? 0 : resultVo.getData().size();
            pageNum++;
            log.info("========== 获取API:{} 第:{}页数据 数据条数:{} ==========", CURRENCY_SET_API, pageNum + 1, currentPageSize);
            // TODO 自定义实现持久化逻辑

        } while (currentPageSize == pageSize);
    }


    /**
     * 流式读取适合百/千万级别的内网数据同步,本机测试
     * <p>
     * 网络环境:局域网
     * 数据源  :华为dws
     * cpu     : 4核3.2GHz
     * 内存    :16GB DDR3
     * 测试数据:1000W条
     * 回调函数不做持久化操作
     * <p>
     * <p>
     * 测试报表
     * **************************************
     * *
     * *        QPS     :1800
     * *        耗时    :100W / 5分钟
     * *        CPU毛刺 :1% - 5% 波动
     * *        内存毛刺:50Mb - 150Mb 波动
     * *
     * **************************************
     */
    @ApiOperation(value = "流式读取 DEMO")
    @GetMapping("/query_stream")
    public void queryStream() {
        long stratTime = System.currentTimeMillis();

        try {
            // 构造请求参数
            DwParamVo paramVo = new DwParamVo(this.appId, TokenController.getCurrentToken());
            // 设置为流式读取
            paramVo.setStream("Y");
            // TODO 自定义查询条件
            paramVo.setOffset(3000000);

            // 定义实体构造函数
            Supplier<FbaFulfillmentCurrentInventoryView> constructor = FbaFulfillmentCurrentInventoryView::new;

            // 流式读取回调函数
            Consumer<List<FbaFulfillmentCurrentInventoryView>> callBack = (List<FbaFulfillmentCurrentInventoryView> list) -> {
                // TODO 自定义实现持久化逻辑
                // ... more
                // mapper.insertList(list)
            };

            // 指定回调函数数据大小,取值[500, 5000].
            // 取值越大,数据读取占用的堆内存越高
            int batchSize = 5000;

            // 使用 StreamAPI
            StreamReaderHandler.reader(
                    dwDataApi,
                    FBA_FULFILLMENT_CURRENT_INVENTORY_VIEW_API,
                    paramVo.toMap(),
                    callBack,
                    batchSize,
                    constructor);

        } catch (Exception e) {
            log.info("流式读取异常", e);
        } finally {
            log.info("流式读取使用时间 {} 秒", (System.currentTimeMillis() - stratTime) / 1000.0f);
        }
    }
}