作者 [wgf]

添加 SL数据格式 DEMO

### dw example 使用手册
数据格式
---
- `OBJ`:传统的JSON格式,返回数组包含严格的 key:value
- `SL`:修剪过的JSON格式,字段名称和字段值分别分开存储在不同数组种
```
{
"headData": [
"currency_id",
"currency_code",
"currency_name"
]
},
"columnDataList": [
[
14186,
"USD",
"美元"
],
[
14187,
"USD",
"美元"
],
...
]
}
```
因为SL格式每次数据交互只序列化一次key,和传统的JSON结构相比。生成的JSON字符串长度会缩小20% ~ 40%,
但相对的比较难解析,可以在`DwParamVo`中指定格式。
修改配置文件
---
修改 application.yml 的 `dwAppId`、 `dwAppSecret` 配置
... ...
... ... @@ -15,5 +15,5 @@ public interface DwApi {
/**
* FBA-库存快照
*/
String FBA_FULFILLMENT_CURRENT_INVENTORY_VIEW_API = "/stock/fba_fulfillment_current_inventory_view";
String FULFILLMENT_CURRENT_INVENTORY = "/stock/fulfillment_current_inventory";
}
... ...
package com.aukey.example.entity;
/**
* @author: wgf
* @create: 2020-05-19 17:18
* @description:
**/
public class FbaFulfillmentCurrentInventoryView {
private String corporationName;
private String groupCode;
private String groupName;
private String deptCode;
private String deptName;
private String companySku;
private String amazonSku;
private String fnsku;
/**
* ......
* more field
*/
public String getCorporationName() {
return corporationName;
}
public void setCorporationName(String corporationName) {
this.corporationName = corporationName;
}
public String getGroupCode() {
return groupCode;
}
public void setGroupCode(String groupCode) {
this.groupCode = groupCode;
}
public String getGroupName() {
return groupName;
}
public void setGroupName(String groupName) {
this.groupName = groupName;
}
public String getDeptCode() {
return deptCode;
}
public void setDeptCode(String deptCode) {
this.deptCode = deptCode;
}
public String getDeptName() {
return deptName;
}
public void setDeptName(String deptName) {
this.deptName = deptName;
}
public String getCompanySku() {
return companySku;
}
public void setCompanySku(String companySku) {
this.companySku = companySku;
}
public String getAmazonSku() {
return amazonSku;
}
public void setAmazonSku(String amazonSku) {
this.amazonSku = amazonSku;
}
public String getFnsku() {
return fnsku;
}
public void setFnsku(String fnsku) {
this.fnsku = fnsku;
}
}
package com.aukey.example.entity;
import java.util.Date;
/**
* @author: wgf
* @create: 2020-05-19 17:18
* @description:
**/
public class FulfillmentCurrentInventory {
private Integer id;
private Integer accountId;
private String accountName;
private String siteName;
private Integer areaId;
private String area;
private Date snapshotDate;
private Date createDate;
private String fnSku;
private String amazonSku;
private String productName;
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public Integer getAccountId() {
return accountId;
}
public void setAccountId(Integer accountId) {
this.accountId = accountId;
}
public String getAccountName() {
return accountName;
}
public void setAccountName(String accountName) {
this.accountName = accountName;
}
public String getSiteName() {
return siteName;
}
public void setSiteName(String siteName) {
this.siteName = siteName;
}
public Integer getAreaId() {
return areaId;
}
public void setAreaId(Integer areaId) {
this.areaId = areaId;
}
public String getArea() {
return area;
}
public void setArea(String area) {
this.area = area;
}
public Date getSnapshotDate() {
return snapshotDate;
}
public void setSnapshotDate(Date snapshotDate) {
this.snapshotDate = snapshotDate;
}
public Date getCreateDate() {
return createDate;
}
public void setCreateDate(Date createDate) {
this.createDate = createDate;
}
public String getFnSku() {
return fnSku;
}
public void setFnSku(String fnSku) {
this.fnSku = fnSku;
}
public String getAmazonSku() {
return amazonSku;
}
public void setAmazonSku(String amazonSku) {
this.amazonSku = amazonSku;
}
public String getProductName() {
return productName;
}
public void setProductName(String productName) {
this.productName = productName;
}
}
... ...
package com.aukey.example.util;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONReader;
import com.alibaba.fastjson.TypeReference;
import com.aukey.example.vo.DwParamVo;
... ... @@ -31,7 +30,7 @@ import java.util.function.Supplier;
* @author: wgf
* @create: 2020-05-13 11:21
* @description: dw帮助工具类
*
* <p>
* 关于 java url 编码问题
* Java官方的URLEncoder.encode 实际上是为了post请求的content-type为x-www-form-urlencoded来设计的
* 在进行特殊参数转义的时候会将空格转为 +
... ... @@ -268,14 +267,9 @@ public class DwHelperUtil {
jsonReader.startArray();
while (jsonReader.hasNext()) {
try {
T t = constructor.get();
jsonReader.readObject(t);
container.add(t);
} catch (JSONException ex) {
Object o = jsonReader.readObject();
log.info("json数据转换异常:{}", o.toString());
}
T t = constructor.get();
jsonReader.readObject(t);
container.add(t);
if (container.size() == batchSize) {
callback.accept(container);
... ... @@ -292,14 +286,16 @@ public class DwHelperUtil {
log.info("{} 流式读取已读条数:{}", dwDataApi, total);
container.clear();
}
//结束读取
jsonReader.endArray();
} else {
log.info("流式读取异常 [ url:{} ] [ code:{} ]", dwDataApi, response.code());
}
} catch (Exception e) {
log.error(e.getMessage(), e);
throw e;
} finally {
//结束读取
jsonReader.endArray();
if (Objects.nonNull(jsonReader)) {
jsonReader.close();
}
... ... @@ -321,8 +317,9 @@ public class DwHelperUtil {
protected static Response executeHttpPostRequest(String url, Map<String, Object> paramMap) throws Exception {
OkHttpClient client = new OkHttpClient.Builder()
.connectTimeout(60 * 30 * 2, TimeUnit.SECONDS)
.readTimeout(60 * 1, TimeUnit.SECONDS)
.connectTimeout(60 * 60 * 2, TimeUnit.SECONDS)
.writeTimeout(60 * 2, TimeUnit.SECONDS)
.readTimeout(60 * 2, TimeUnit.SECONDS)
.build();
url = jointUrl(url, paramMap);
... ...
... ... @@ -11,9 +11,10 @@ import java.util.List;
*
* @author 吴耿锋
* @version 2018年5月11日
* 对应 OBJ 返回数据类型
*/
public class PageVo<T> implements Serializable {
public class ObjVo<T> implements Serializable {
@ApiModelProperty("页数")
... ...
package com.aukey.example.vo;
import java.util.List;
/**
* @author: wgf
* @create: 2020-05-22 15:42
* @description:
**/
public class SlNodeVo {
/**
* 字段名称
*/
private List<String> headData;
/**
* 数据列表
*/
private List<List<Object>> columnDataList;
public List<List<Object>> getColumnDataList() {
return columnDataList;
}
public void setColumnDataList(List<List<Object>> columnDataList) {
this.columnDataList = columnDataList;
}
public List<String> getHeadData() {
return headData;
}
public void setHeadData(List<String> headData) {
this.headData = headData;
}
}
... ...
package com.aukey.example.vo;
/**
* @author: wgf
* @create: 2020-05-22 14:55
* @description: 对应 SL 返回数据类型
* 返回数据和字段在 SlNodeVo
* SL这种方式需要对数据进行格式化
**/
public class SlVo {
private Integer total;
private Integer pageNumber;
private SlNodeVo data;
public Integer getTotal() {
return total;
}
public void setTotal(Integer total) {
this.total = total;
}
public Integer getPageNumber() {
return pageNumber;
}
public void setPageNumber(Integer pageNumber) {
this.pageNumber = pageNumber;
}
public SlNodeVo getData() {
return data;
}
public void setData(SlNodeVo data) {
this.data = data;
}
}
... ...
... ... @@ -4,11 +4,12 @@ import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.aukey.example.constant.DwApi;
import com.aukey.example.entity.CurrencySet;
import com.aukey.example.entity.FbaFulfillmentCurrentInventoryView;
import com.aukey.example.entity.FulfillmentCurrentInventory;
import com.aukey.example.util.DwHelperUtil;
import com.aukey.example.vo.DwParamVo;
import com.aukey.example.vo.DwResultVo;
import com.aukey.example.vo.PageVo;
import com.aukey.example.vo.ObjVo;
import com.aukey.example.vo.SlVo;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.slf4j.Logger;
... ... @@ -51,9 +52,9 @@ public class TestApiController {
private String appId;
@ApiOperation(value = "小批量读取 DEMO")
@GetMapping("/query_all")
public void queryAll() {
@ApiOperation(value = "小批量读取 OBJ 方式")
@GetMapping("/query_all_obj")
public void queryAllObj() {
// 构造请求参数
DwParamVo paramVo = new DwParamVo(this.appId, TokenController.getCurrentToken());
... ... @@ -62,6 +63,7 @@ public class TestApiController {
paramVo.setQueryCondition("WHERE currency_code = 'USD'");
// 每页数据条数 dw服务端有限制 取值[1, 3000]
paramVo.setPageSize(3000);
paramVo.setDataStructure(DwParamVo.OBJ);
// 调用API
String result = DwHelperUtil.doGet(dwDataApi, DwApi.CURRENCY_SET_API, paramVo.toMap());
... ... @@ -70,13 +72,46 @@ public class TestApiController {
throw new RuntimeException(String.format("API %s 调用失败", DwApi.CURRENCY_SET_API));
}
// json解析为对象
DwResultVo<PageVo<CurrencySet>> resultVo = JSON.parseObject(result, new TypeReference<DwResultVo<PageVo<CurrencySet>>>() {
DwResultVo<ObjVo<CurrencySet>> resultVo = JSON.parseObject(result, new TypeReference<DwResultVo<ObjVo<CurrencySet>>>() {
});
log.info("");
log.info("API请求状态:{}", resultVo.getMessage());
log.info("API返回数据:{}", JSON.toJSONString(resultVo.getData().getData()));
log.info("API返回数据两:{}", resultVo.getData().getData().size());
log.info("API返回数据量:{}", resultVo.getData().getTotal());
log.info("");
}
@ApiOperation(value = "小批量读取 SL 方式")
@GetMapping("/query_all_sl")
public void queryAllSl() {
// 构造请求参数
DwParamVo paramVo = new DwParamVo(this.appId, TokenController.getCurrentToken());
// 添加查询条件
paramVo.setQueryCondition("WHERE currency_code = 'USD'");
// 每页数据条数 dw服务端有限制 取值[1, 3000]
paramVo.setPageSize(3000);
paramVo.setDataStructure(DwParamVo.SL);
// 调用API
String result = DwHelperUtil.doGet(dwDataApi, DwApi.CURRENCY_SET_API, paramVo.toMap());
if (StringUtils.isEmpty(result)) {
throw new RuntimeException(String.format("API %s 调用失败", DwApi.CURRENCY_SET_API));
}
// json解析为对象
DwResultVo<SlVo> resultVo = JSON.parseObject(result, new TypeReference<DwResultVo<SlVo>>() {
});
log.info("");
log.info("API请求状态:{}", resultVo.getMessage());
log.info("API返回数据字段:{}", JSON.toJSONString(resultVo.getData().getData().getHeadData()));
log.info("API返回数据数组:{}", JSON.toJSONString(resultVo.getData().getData().getColumnDataList()));
log.info("API返回数据量:{}", resultVo.getData().getTotal());
log.info("");
}
... ... @@ -85,22 +120,23 @@ public class TestApiController {
* 适合获取 百万级别的表,大表用分页读取会非常慢
* 大表建议使用流式读取
*/
@ApiOperation(value = "分页读取 DEMO")
@GetMapping("/query_page")
public void queryPage() {
@ApiOperation(value = "分页读取 OBJ 方式")
@GetMapping("/query_page_obj")
public void queryPageObj() {
// 构造请求参数
DwParamVo paramVo = new DwParamVo(this.appId, TokenController.getCurrentToken());
// 每页数据条数 dw服务端有限制 取值[1, 3000]
paramVo.setPageSize(1000);
paramVo.setDataStructure(DwParamVo.OBJ);
// TODO 自定义查询条件
// json字符串转实体引用类型
TypeReference typeReference = new TypeReference<DwResultVo<PageVo<CurrencySet>>>() {
TypeReference typeReference = new TypeReference<DwResultVo<ObjVo<CurrencySet>>>() {
};
// 回调函数
Function<DwResultVo<PageVo<CurrencySet>>, Integer> callback = (DwResultVo<PageVo<CurrencySet>> resultVo) -> {
Function<DwResultVo<ObjVo<CurrencySet>>, Integer> callback = (DwResultVo<ObjVo<CurrencySet>> resultVo) -> {
List<CurrencySet> dataList = resultVo.getData().getData();
System.out.println(dataList.size());
// TODO 业务逻辑在这里实现
... ... @@ -121,13 +157,53 @@ public class TestApiController {
/**
* 流式读取适合百/千万级别的表内网数据同步,本机测试
* 适合获取 百万级别的表,大表用分页读取会非常慢
* 大表建议使用流式读取
*/
@ApiOperation(value = "分页读取 SL 方式")
@GetMapping("/query_page_sl")
public void queryPageSl() {
// 构造请求参数
DwParamVo paramVo = new DwParamVo(this.appId, TokenController.getCurrentToken());
// 每页数据条数 dw服务端有限制 取值[1, 3000]
paramVo.setPageSize(1000);
paramVo.setDataStructure(DwParamVo.SL);
// TODO 自定义查询条件
// json字符串转实体引用类型
TypeReference typeReference = new TypeReference<DwResultVo<SlVo>>() {
};
// 回调函数
Function<DwResultVo<SlVo>, Integer> callback = (DwResultVo<SlVo> resultVo) -> {
log.info("字段字段:{}", resultVo.getData().getData().getHeadData());
log.info("数据数组:{}", resultVo.getData().getData().getColumnDataList());
// TODO 业务逻辑在这里实现
// ... more
// mapper.insertList(dataList)
// 需要返回总页数
return resultVo.getData().getTotal();
};
// 使用分页API
DwHelperUtil.pageReader(callback,
dwDataApi,
DwApi.CURRENCY_SET_API,
paramVo,
typeReference);
}
/**
* 流式读取适合百级别的表内网数据同步,每次数据读取建议在[100000, 1000000],本机测试
* <p>
* 网络环境:局域网
* 数据源 :华为dws
* cpu : 4核3.2GHz
* 内存 :16GB DDR3
* 测试数据:1000W条
* 测试数据:100W条
* 回调函数不做持久化操作
* <p>
* <p>
... ... @@ -154,24 +230,24 @@ public class TestApiController {
// TODO 自定义查询条件
// 定义实体构造函数
Supplier<FbaFulfillmentCurrentInventoryView> constructor = FbaFulfillmentCurrentInventoryView::new;
Supplier<FulfillmentCurrentInventory> constructor = FulfillmentCurrentInventory::new;
// 流式读取回调函数
Consumer<List<FbaFulfillmentCurrentInventoryView>> callBack = (List<FbaFulfillmentCurrentInventoryView> list) -> {
Consumer<List<FulfillmentCurrentInventory>> callBack = (List<FulfillmentCurrentInventory> list) -> {
System.out.println(list.size());
// TODO 业务逻辑在这里实现
// ... more
// mapper.insertList(list)
};
// 指定回调函数数据大小,取值[500, 5000].
// 指定回调函数数据大小,取值[1000, 5000]. 建议2000
// 取值越大,数据读取占用的堆内存越高
int batchSize = 5000;
int batchSize = 1000;
// 使用 StreamAPI
DwHelperUtil.streamReader(
dwDataApi,
DwApi.FBA_FULFILLMENT_CURRENT_INVENTORY_VIEW_API,
DwApi.FULFILLMENT_CURRENT_INVENTORY,
paramVo.toMap(),
callBack,
batchSize,
... ...