作者 [wgf]

提交demo项目

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.7.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.aukey.example</groupId>
<artifactId>dw-example</artifactId>
<version>0.0.1</version>
<name>dw-example</name>
<description>dw客户端demo项目</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- 在线文档,实际使用不需要依赖 !-->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.7.0</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.7.0</version>
</dependency>
<!-- 在线文档end !-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.68</version>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>3.14.1</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.4</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
... ...
### dw example 使用手册
修改配置文件
---
修改 application.yml 的 `dwAppId`、 `dwAppSecret` 配置
- `dwAppId`:傲基数仓 APP 模块的 AppId。
- `dwAppSecret`:傲基数仓 APP 模块的 应用秘钥。
提供三种数据请求方案
---
- 全量请求:一次http请求获取所有数据,有数量限制,最大1000条。适合小批量数据同步。
- 分页请求:多次http请求获取所有数据,分页参数`pageNumber`, `pageSize(取值范围[1,3000])`. 适合中批量数据同步。
- 流式请求:一次http请求获取所有数据,没有数量限制,要求网络稳定,最好是内网环境。适合大批量数据同步。
token更新
---
`TokenJob` 为tokne更新定时任务
`TokenController` token更新回调地址
定时任务定点调用dw服务刷新token api,然后dw将最新token作为参数调用数仓平台上对应的APP配置
的回调地址,将token传递给客户端。
如何使用
---
1. 申请API
2. 启动项目
3. 访问 http://localhost:8099/swagger-ui.html
4. 源码 demo 在 `TestApiController`
... ...
package com.aukey.example;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableScheduling;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
@EnableSwagger2
@EnableScheduling
@SpringBootApplication
public class DwExampleApplication {
public static void main(String[] args) {
SpringApplication.run(DwExampleApplication.class, args);
}
@Bean
public Docket createRestApi() {
Docket docket = new Docket(DocumentationType.SWAGGER_2)
.apiInfo(apiInfo())
.select()
//controller所在包
.apis(RequestHandlerSelectors.basePackage("com.aukey.example.web"))
.paths(PathSelectors.any())
.build();
return docket;
}
//构建 api文档的详细信息函数
private ApiInfo apiInfo() {
return new ApiInfoBuilder()
.title("dw example")
.description("")
.termsOfServiceUrl("")
.version("1.0.0")
.build();
}
}
... ...
package com.aukey.example.conf;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONReader;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
/**
* 描述:流式读取帮助类
* 创建者: wgf
* 创建时间:2020年5月20日 10:03:15
**/
public class StreamReaderHandler {
private static Logger log = LoggerFactory.getLogger(StreamReaderHandler.class);
private StreamReaderHandler() {
}
/**
* @param dwDataApi dw数据请求接口
* @param api 平台申请的api
* @param paramMap 参数
* @param callback 业务回调
* @param batchSize 业务回调数据量大小,参考值 [500,5000]
* @param constructor 实体的构造函数
* @throws Exception
*/
public static <T> void reader(String dwDataApi, String api,
Map<String, Object> paramMap,
Consumer<List<T>> callback,
int batchSize,
Supplier<T> constructor) throws Exception {
dwDataApi = dwDataApi + api;
Response response = null;
InputStream is = null;
Reader reader = null;
JSONReader jsonReader = null;
try {
response = executeHttpPostRequest(dwDataApi, paramMap);
if (response.code() == 200) {
List<T> container = new ArrayList<>();
int total = 0;
// 从响应中获取流
is = response.body().byteStream();
reader = new InputStreamReader(is);
jsonReader = new JSONReader(reader);
// 开始读取
jsonReader.startArray();
while (jsonReader.hasNext()) {
try {
T t = constructor.get();
jsonReader.readObject(t);
container.add(t);
} catch (JSONException ex) {
Object o = jsonReader.readObject();
log.info("json数据转换异常:{}", o.toString());
}
if (container.size() == batchSize) {
callback.accept(container);
total += batchSize;
log.info("{} 流式读取已读条数:{}", dwDataApi, total);
container.clear();
}
}
// 处理最后一批不足 batchSize 的数据
if (container.size() > 0) {
callback.accept(container);
total += container.size();
log.info("{} 流式读取已读条数:{}", dwDataApi, total);
container.clear();
}
//结束读取
jsonReader.endArray();
} else {
log.info("流式读取异常 [ url:{} ] [ code:{} ]", dwDataApi, response.code());
}
} finally {
if (Objects.nonNull(jsonReader)) {
jsonReader.close();
}
if (Objects.nonNull(reader)) {
reader.close();
}
if (Objects.nonNull(is)) {
is.close();
}
if (Objects.nonNull(response)) {
response.close();
}
}
}
protected static Response executeHttpPostRequest(String url, Map<String, Object> paramMap) throws Exception {
OkHttpClient client = new OkHttpClient.Builder()
.connectTimeout(60 * 30 * 2, TimeUnit.SECONDS)
.readTimeout(60 * 1, TimeUnit.SECONDS)
.build();
StringBuilder sb = new StringBuilder(url);
if (paramMap != null && !paramMap.isEmpty()) {
boolean isFirst = true;
for (Map.Entry<String, Object> entry : paramMap.entrySet()) {
if (Objects.isNull(entry.getValue())) {
continue;
}
if (isFirst) {
sb.append("?");
isFirst = false;
} else {
sb.append("&");
}
// 兼容参数特殊符号
String value = String.valueOf(entry.getValue());
value = URLEncoder.encode(value, "UTF-8");
value = value.replaceAll("\\+", "%20");
sb.append(entry.getKey())
.append("=")
.append(value);
}
url = sb.toString();
}
Request request = new Request.Builder().url(url).build();
Response response = client.newCall(request).execute();
return response;
}
}
... ...
package com.aukey.example.conf;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.context.request.async.TimeoutCallableProcessingInterceptor;
import org.springframework.web.servlet.config.annotation.AsyncSupportConfigurer;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
/**
* @author: wgf
* @create: 2020-05-19 12:56
* @description:
**/
//@Configuration
public class WebMvcConfig implements WebMvcConfigurer {
@Override
public void configureAsyncSupport(final AsyncSupportConfigurer configurer) {
configurer.setDefaultTimeout(30 * 60 * 1000);
configurer.registerCallableInterceptors(timeoutInterceptor());
}
@Bean
public TimeoutCallableProcessingInterceptor timeoutInterceptor() {
return new TimeoutCallableProcessingInterceptor();
}
}
... ...
package com.aukey.example.entity;
import java.math.BigDecimal;
import java.util.Date;
/**
* @author: wgf
* @create: 2020-05-13 14:38
* @description: 汇率实体
**/
public class CurrencySet {
private Integer currencyId;
private String currencyCode;
private String currencyName;
private BigDecimal currencyRate;
private String baseCurrency;
private Integer createBy;
private Date createTime;
private String updateBy;
private Date updateTime;
private String dataStatus;
private Integer auditBy;
private String auditStatus;
private Date auditTime;
private String currencySymbol;
public Integer getCurrencyId() {
return currencyId;
}
public void setCurrencyId(Integer currencyId) {
this.currencyId = currencyId;
}
public String getCurrencyCode() {
return currencyCode;
}
public void setCurrencyCode(String currencyCode) {
this.currencyCode = currencyCode;
}
public String getCurrencyName() {
return currencyName;
}
public void setCurrencyName(String currencyName) {
this.currencyName = currencyName;
}
public BigDecimal getCurrencyRate() {
return currencyRate;
}
public void setCurrencyRate(BigDecimal currencyRate) {
this.currencyRate = currencyRate;
}
public String getBaseCurrency() {
return baseCurrency;
}
public void setBaseCurrency(String baseCurrency) {
this.baseCurrency = baseCurrency;
}
public Integer getCreateBy() {
return createBy;
}
public void setCreateBy(Integer createBy) {
this.createBy = createBy;
}
public Date getCreateTime() {
return createTime;
}
public void setCreateTime(Date createTime) {
this.createTime = createTime;
}
public String getUpdateBy() {
return updateBy;
}
public void setUpdateBy(String updateBy) {
this.updateBy = updateBy;
}
public Date getUpdateTime() {
return updateTime;
}
public void setUpdateTime(Date updateTime) {
this.updateTime = updateTime;
}
public String getDataStatus() {
return dataStatus;
}
public void setDataStatus(String dataStatus) {
this.dataStatus = dataStatus;
}
public Integer getAuditBy() {
return auditBy;
}
public void setAuditBy(Integer auditBy) {
this.auditBy = auditBy;
}
public String getAuditStatus() {
return auditStatus;
}
public void setAuditStatus(String auditStatus) {
this.auditStatus = auditStatus;
}
public Date getAuditTime() {
return auditTime;
}
public void setAuditTime(Date auditTime) {
this.auditTime = auditTime;
}
public String getCurrencySymbol() {
return currencySymbol;
}
public void setCurrencySymbol(String currencySymbol) {
this.currencySymbol = currencySymbol;
}
}
... ...
package com.aukey.example.entity;
/**
* @author: wgf
* @create: 2020-05-19 17:18
* @description:
**/
public class FbaFulfillmentCurrentInventoryView {
private String corporationName;
private String groupCode;
private String groupName;
private String deptCode;
private String deptName;
private String companySku;
private String amazonSku;
private String fnsku;
/**
* ......
* more field
*/
public String getCorporationName() {
return corporationName;
}
public void setCorporationName(String corporationName) {
this.corporationName = corporationName;
}
public String getGroupCode() {
return groupCode;
}
public void setGroupCode(String groupCode) {
this.groupCode = groupCode;
}
public String getGroupName() {
return groupName;
}
public void setGroupName(String groupName) {
this.groupName = groupName;
}
public String getDeptCode() {
return deptCode;
}
public void setDeptCode(String deptCode) {
this.deptCode = deptCode;
}
public String getDeptName() {
return deptName;
}
public void setDeptName(String deptName) {
this.deptName = deptName;
}
public String getCompanySku() {
return companySku;
}
public void setCompanySku(String companySku) {
this.companySku = companySku;
}
public String getAmazonSku() {
return amazonSku;
}
public void setAmazonSku(String amazonSku) {
this.amazonSku = amazonSku;
}
public String getFnsku() {
return fnsku;
}
public void setFnsku(String fnsku) {
this.fnsku = fnsku;
}
}
... ...
package com.aukey.example.job;
import com.aukey.example.util.DwUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/**
* @author: wgf
* @create: 2020-05-13 10:54
* @description: 获取token定时任务
**/
@Component
public class TokenJob {
private Logger log = LoggerFactory.getLogger(TokenJob.class);
@Value("${dwTokenApi:null}")
private String dwTokenApi;/* DW获取token的接口 */
@Value("${dwAppId:null}")/* 数据仓库AppId */
private String appId;
@Value("${dwAppSecret:null}")/* 数据仓库应用秘钥 */
private String appSecret;
/**
* 3分钟刷新获取一次token
*/
@Scheduled(fixedDelay = 180 * 1000l)
public synchronized void fetchToken() {
if ("null".equals(this.appSecret)) {
log.info("未配置appSecret,不访问远程DW服务!");
return;
}
if ("null".equals(this.appId)) {
log.info("未配置appId,不访问远程DW服务!");
return;
}
Map<String, Object> params = new HashMap<>();
params.put("appSecret", this.appSecret);
DwUtil.doGet(dwTokenApi, params);
}
}
... ...
package com.aukey.example.util;
import org.springframework.util.StringUtils;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;
/**
* @author: wgf
* @create: 2020-05-13 11:21
* @description:
* 关于 java url 编码问题
* Java官方的URLEncoder.encode 实际上是为了post请求的content-type为x-www-form-urlencoded来设计的
* 在进行特殊参数转义的时候会将空格转为 +
* 但是在 RFC1738、RFC2396协议中规定,GET请求的空格转为为 %20
* 所以在发送GET请求的特殊参数中存在空格必须 先URLEncoder.encode 然后再用 %20替换掉所有+
**/
public class DwUtil {
private DwUtil() {
}
/**
* GET请求
*
* @param urlStr 请求url
* @param params 请求参数
* @return
*/
public static String doGet(String urlStr, Map<String, Object> params) {
if (StringUtils.isEmpty(urlStr)) {
return null;
}
HttpURLConnection connection = null;
InputStream is = null;
BufferedReader br = null;
String result = null;
try {
StringBuilder sb = new StringBuilder(urlStr);
if (params != null && !params.isEmpty()) {
boolean isFirst = true;
for (Map.Entry<String, Object> entry : params.entrySet()) {
if (Objects.isNull(entry.getValue())) {
continue;
}
if (isFirst) {
sb.append("?");
isFirst = false;
} else {
sb.append("&");
}
// 兼容参数特殊符号
String value = String.valueOf(entry.getValue());
value = URLEncoder.encode(value, "UTF-8");
value = value.replaceAll("\\+", "%20");
sb.append(entry.getKey())
.append("=")
.append(value);
}
urlStr = sb.toString();
}
URL url = new URL(urlStr);
connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod("GET");
connection.setConnectTimeout(15000);
connection.setReadTimeout(60000);
connection.connect();
int responseCode = connection.getResponseCode();
if (responseCode == 200) {
is = connection.getInputStream();
br = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8));
StringBuilder sbf = new StringBuilder();
String temp;
while ((temp = br.readLine()) != null) {
sbf.append(temp);
sbf.append("\r\n");
}
result = sbf.toString();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (null != br) {
try {
br.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (null != is) {
try {
is.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (connection != null) {
connection.disconnect();// 关闭远程连接
}
}
return result;
}
/**
* @param dwDoMain 数据仓库域名
* @param api 数据仓库申请的API
* @param params 参数
* @return
*/
public static String doGet(String dwDoMain, String api, Map<String, Object> params) {
return doGet(dwDoMain + api, params);
}
}
... ...
package com.aukey.example.vo;
import org.springframework.cglib.beans.BeanMap;
import java.util.Map;
/**
* @author: wgf
* @create: 2020-05-13 12:01
* @description:
**/
public class DwParamVo {
public DwParamVo() {
}
public DwParamVo(String appId, String token) {
this.appId = appId;
this.token = token;
}
/**
* 应用id
*/
private String appId;
/**
* token
*/
private String token;
/**
* 查询条件(可不传)
*/
private String queryCondition;
/**
* 字段选择(可不传)
*/
private String multiFields;
/**
* 偏移量(可不传 如果传递offset则必传limit)
*/
private Integer offset;
/**
* 限制条数(可不传 如果传递limit则必传offset)
*/
private Integer limit;
/**
* 是否流式读写(Y/N)
*/
private String stream;
public String getAppId() {
return appId;
}
public void setAppId(String appId) {
this.appId = appId;
}
public String getToken() {
return token;
}
public void setToken(String token) {
this.token = token;
}
public String getQueryCondition() {
return queryCondition;
}
public void setQueryCondition(String queryCondition) {
this.queryCondition = queryCondition;
}
public String getMultiFields() {
return multiFields;
}
public void setMultiFields(String multiFields) {
this.multiFields = multiFields;
}
public Integer getOffset() {
return offset;
}
public void setOffset(Integer offset) {
this.offset = offset;
}
public Integer getLimit() {
return limit;
}
public void setLimit(Integer limit) {
this.limit = limit;
}
public String getStream() {
return stream;
}
public void setStream(String stream) {
this.stream = stream;
}
public Map<String, Object> toMap() {
return BeanMap.create(this);
}
}
... ...
package com.aukey.example.vo;
import java.util.List;
/**
* @author: wgf
* @create: 2020-05-13 14:22
* @description: 数据仓库结果集返回vo
**/
public class DwResultVo<T> {
/**
* API是否调用成功
*/
private boolean success;
/**
* API返回的异常消息
*/
private String message;
/**
* 错误编码 200为正常
*/
private Integer code;
/**
* 返回数据
*/
private List<T> data;
public boolean isSuccess() {
return success;
}
public void setSuccess(boolean success) {
this.success = success;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public Integer getCode() {
return code;
}
public void setCode(Integer code) {
this.code = code;
}
public List<T> getData() {
return data;
}
public void setData(List<T> data) {
this.data = data;
}
}
... ...
package com.aukey.example.web;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.aukey.example.conf.StreamReaderHandler;
import com.aukey.example.entity.CurrencySet;
import com.aukey.example.entity.FbaFulfillmentCurrentInventoryView;
import com.aukey.example.util.DwUtil;
import com.aukey.example.vo.DwParamVo;
import com.aukey.example.vo.DwResultVo;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;
/**
* @author: wgf
* @create: 2020-05-13 16:34
* @description: API调用测试
* <p>
* ****************************************************************************************
* * *
* * 这里的TEST代码写在controller是方便使用swagger2本地调试,正常业务应该是在项目的定时任务里 *
* * *
* ****************************************************************************************
* <p>
* Demo 提供三种数据拉取方式,
* 第一种:直接调用API适合[1, 100000]数据读取
* 第二种:分页适合[1, 1000000]数据读取,数据基数过大深层分页会影响查询性能,并且客户端需要不断发起请求
* 第三种:流式读取适合[1, 10000000]数据读取,不存在深层分页性能影响,并且客户端只需要发起一次请求
**/
@RestController
@Api(tags = "API调用DEMO")
public class TestApiController {
private Logger log = LoggerFactory.getLogger(TestApiController.class);
// API需要在傲基数仓申请授权
public static final String CURRENCY_SET_API = "/base/currency_set";
public static final String FBA_FULFILLMENT_CURRENT_INVENTORY_VIEW_API = "/stock/fba_fulfillment_current_inventory_view";
@Value("${dwDataApi:null}")
private String dwDataApi;
@Value("${dwAppId:null}")
private String appId;
@ApiOperation(value = "获取前100条汇率 DEMO")
@GetMapping("/query_all")
public void queryAll() {
// 构造请求参数
DwParamVo paramVo = new DwParamVo(this.appId, TokenController.getCurrentToken());
// 添加查询条件
paramVo.setQueryCondition("WHERE currency_code = 'USD'");
// 指定获取条数
paramVo.setOffset(0);
paramVo.setLimit(100);
// 调用API
String result = DwUtil.doGet(dwDataApi, CURRENCY_SET_API, paramVo.toMap());
if (StringUtils.isEmpty(result)) {
throw new RuntimeException(String.format("API %s 调用失败", CURRENCY_SET_API));
}
// json解析为对象
DwResultVo<CurrencySet> resultVo = JSON.parseObject(result, new TypeReference<DwResultVo<CurrencySet>>() {
});
log.info("");
log.info("↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓");
log.info("API请求状态:{}", resultVo.getMessage());
log.info("API TOP100 数据:{}", JSON.toJSONString(resultVo.getData()));
log.info("↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑");
log.info("");
}
@ApiOperation(value = "分页查询 DEMO")
@GetMapping("/query_page")
public void queryPage() {
// 构造请求参数
DwParamVo paramVo = new DwParamVo(this.appId, TokenController.getCurrentToken());
// 页数
int pageNum = 0;
// 每页数据大小
final int pageSize = 1000;
// 当前页数据
int currentPageSize;
// 分页请求
do {
paramVo.setToken(TokenController.getCurrentToken());
paramVo.setOffset(pageNum * pageSize);
paramVo.setLimit(pageSize);
// TODO 自定义查询条件
String result = DwUtil.doGet(dwDataApi, CURRENCY_SET_API, paramVo.toMap());
if (StringUtils.isEmpty(result)) {
throw new RuntimeException(String.format("API %s 调用失败", CURRENCY_SET_API));
}
// json解析为对象
DwResultVo<CurrencySet> resultVo = JSON.parseObject(result, new TypeReference<DwResultVo<CurrencySet>>() {
});
currentPageSize = CollectionUtils.isEmpty(resultVo.getData()) ? 0 : resultVo.getData().size();
pageNum++;
log.info("========== 获取API:{} 第:{}页数据 数据条数:{} ==========", CURRENCY_SET_API, pageNum + 1, currentPageSize);
// TODO 自定义实现持久化逻辑
} while (currentPageSize == pageSize);
}
/**
* 流式读取适合百/千万级别的内网数据同步,本机测试
* <p>
* 网络环境:局域网
* 数据源 :华为dws
* cpu : 4核3.2GHz
* 内存 :16GB DDR3
* 测试数据:1000W条
* 回调函数不做持久化操作
* <p>
* <p>
* 测试报表
* **************************************
* *
* * QPS :1800
* * 耗时 :100W / 5分钟
* * CPU毛刺 :1% - 5% 波动
* * 内存毛刺:50Mb - 150Mb 波动
* *
* **************************************
*/
@ApiOperation(value = "流式读取 DEMO")
@GetMapping("/query_stream")
public void queryStream() {
long stratTime = System.currentTimeMillis();
try {
// 构造请求参数
DwParamVo paramVo = new DwParamVo(this.appId, TokenController.getCurrentToken());
// 设置为流式读取
paramVo.setStream("Y");
// TODO 自定义查询条件
paramVo.setOffset(3000000);
// 定义实体构造函数
Supplier<FbaFulfillmentCurrentInventoryView> constructor = FbaFulfillmentCurrentInventoryView::new;
// 流式读取回调函数
Consumer<List<FbaFulfillmentCurrentInventoryView>> callBack = (List<FbaFulfillmentCurrentInventoryView> list) -> {
// TODO 自定义实现持久化逻辑
// ... more
// mapper.insertList(list)
};
// 指定回调函数数据大小,取值[500, 5000].
// 取值越大,数据读取占用的堆内存越高
int batchSize = 5000;
// 使用 StreamAPI
StreamReaderHandler.reader(
dwDataApi,
FBA_FULFILLMENT_CURRENT_INVENTORY_VIEW_API,
paramVo.toMap(),
callBack,
batchSize,
constructor);
} catch (Exception e) {
log.info("流式读取异常", e);
} finally {
log.info("流式读取使用时间 {} 秒", (System.currentTimeMillis() - stratTime) / 1000.0f);
}
}
}
... ...
package com.aukey.example.web;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import springfox.documentation.annotations.ApiIgnore;
import java.util.concurrent.TimeUnit;
/**
* @author: wgf
* @create: 2020-05-13 10:11
* @description: 获取token回调controller
* 当向数据仓库平台发起 http://dw.aukeyit.com/api/authorize?appSecret=应用秘钥 请求时,
* 数据仓库验证应用秘钥后会生成token,并通过调用APP回调地址将token作为参数返回到客户端
**/
@RestController
@RequestMapping("/token")
@ApiIgnore
public class TokenController {
private Logger log = LoggerFactory.getLogger(TokenController.class);
/**
* 当前token
*/
private static String currentToken = "";
/*********************************************************************
* 这里的controller path 需要和数据仓库中APP设置的回调地址保持一致。 *
* 例如APP里回调地址为 http://localhost:8099/token/receive *
* TokenJob定时任务定时请求数据仓库获取授权API后 *
* 数据仓库回调 http://localhost:8099/token/receive,并且传递token参数 *
* 获取token后,可以放在 mysql,redis等。这里DEMO取巧放在内存中。 *
********************************************************************/
@PostMapping("/receive")
public void receive(String token) {
log.info("获取到远程DW服务回调请求,token:{}", token);
currentToken = token;
}
public static String getCurrentToken() {
for (int i = 0; i < 5; i++) {
if (StringUtils.isEmpty(currentToken)) {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
return currentToken;
}
}
return null;
}
}
... ...
server:
port: 8099
# appid
dwAppId: 709f9bd417db4117a8665bb0f1a3346b
# 应用秘钥
dwAppSecret: d219db9b6c3c43a381c4a6312a1c3724
dwDoMain: http://dw.aukeyit.com
dwTokenApi: ${dwDoMain}/api/authorize
dwDataApi: ${dwDoMain}/api/data
\ No newline at end of file
... ...