作者 [wgf]

分页API优化

  1 +<?xml version="1.0" encoding="UTF-8"?>
  2 +<project version="4">
  3 + <component name="MavenProjectsManager">
  4 + <option name="originalFiles">
  5 + <list>
  6 + <option value="$PROJECT_DIR$/pom.xml" />
  7 + </list>
  8 + </option>
  9 + </component>
  10 + <component name="ProjectRootManager" version="2" languageLevel="JDK_1_8" project-jdk-name="1.8" project-jdk-type="JavaSDK">
  11 + <output url="file://$PROJECT_DIR$/classes" />
  12 + </component>
  13 +</project>
@@ -11,7 +11,7 @@ @@ -11,7 +11,7 @@
11 提供三种数据请求方案 11 提供三种数据请求方案
12 --- 12 ---
13 13
14 -- 全量请求:一次http请求获取所有数据,有数量限制,最大1000条。适合小批量数据同步。 14 +- 全量请求:一次http请求获取所有数据,有数量限制,最大3000条。适合小批量数据同步。
15 15
16 - 分页请求:多次http请求获取所有数据,分页参数`pageNumber`, `pageSize(取值范围[1,3000])`. 适合中批量数据同步。 16 - 分页请求:多次http请求获取所有数据,分页参数`pageNumber`, `pageSize(取值范围[1,3000])`. 适合中批量数据同步。
17 17
1 -package com.aukey.example.conf;  
2 -  
3 -import com.alibaba.fastjson.JSON;  
4 -import com.alibaba.fastjson.JSONException;  
5 -import com.alibaba.fastjson.JSONReader;  
6 -import okhttp3.OkHttpClient;  
7 -import okhttp3.Request;  
8 -import okhttp3.Response;  
9 -import org.slf4j.Logger;  
10 -import org.slf4j.LoggerFactory;  
11 -  
12 -import java.io.InputStream;  
13 -import java.io.InputStreamReader;  
14 -import java.io.Reader;  
15 -import java.net.URLEncoder;  
16 -import java.util.ArrayList;  
17 -import java.util.List;  
18 -import java.util.Map;  
19 -import java.util.Objects;  
20 -import java.util.concurrent.TimeUnit;  
21 -import java.util.function.Consumer;  
22 -import java.util.function.Supplier;  
23 -  
24 -/**  
25 - * 描述:流式读取帮助类  
26 - * 创建者: wgf  
27 - * 创建时间:2020年5月20日 10:03:15  
28 - **/  
29 -public class StreamReaderHandler {  
30 -  
31 - private static Logger log = LoggerFactory.getLogger(StreamReaderHandler.class);  
32 -  
33 - private StreamReaderHandler() {  
34 - }  
35 -  
36 - /**  
37 - * @param dwDataApi dw数据请求接口  
38 - * @param api 平台申请的api  
39 - * @param paramMap 参数  
40 - * @param callback 业务回调  
41 - * @param batchSize 业务回调数据量大小,参考值 [500,5000]  
42 - * @param constructor 实体的构造函数  
43 - * @throws Exception  
44 - */  
45 - public static <T> void reader(String dwDataApi, String api,  
46 - Map<String, Object> paramMap,  
47 - Consumer<List<T>> callback,  
48 - int batchSize,  
49 - Supplier<T> constructor) throws Exception {  
50 -  
51 - dwDataApi = dwDataApi + api;  
52 - Response response = null;  
53 - InputStream is = null;  
54 - Reader reader = null;  
55 - JSONReader jsonReader = null;  
56 -  
57 - try {  
58 - response = executeHttpPostRequest(dwDataApi, paramMap);  
59 -  
60 - if (response.code() == 200) {  
61 - List<T> container = new ArrayList<>();  
62 - int total = 0;  
63 -  
64 - // 从响应中获取流  
65 - is = response.body().byteStream();  
66 - reader = new InputStreamReader(is);  
67 - jsonReader = new JSONReader(reader);  
68 - // 开始读取  
69 - jsonReader.startArray();  
70 -  
71 - while (jsonReader.hasNext()) {  
72 - try {  
73 - T t = constructor.get();  
74 - jsonReader.readObject(t);  
75 - container.add(t);  
76 - } catch (JSONException ex) {  
77 - Object o = jsonReader.readObject();  
78 - log.info("json数据转换异常:{}", o.toString());  
79 - }  
80 -  
81 - if (container.size() == batchSize) {  
82 - callback.accept(container);  
83 - total += batchSize;  
84 - log.info("{} 流式读取已读条数:{}", dwDataApi, total);  
85 - container.clear();  
86 - }  
87 - }  
88 -  
89 - // 处理最后一批不足 batchSize 的数据  
90 - if (container.size() > 0) {  
91 - callback.accept(container);  
92 - total += container.size();  
93 - log.info("{} 流式读取已读条数:{}", dwDataApi, total);  
94 - container.clear();  
95 - }  
96 -  
97 - //结束读取  
98 - jsonReader.endArray();  
99 - } else {  
100 - log.info("流式读取异常 [ url:{} ] [ code:{} ]", dwDataApi, response.code());  
101 - }  
102 -  
103 - } finally {  
104 - if (Objects.nonNull(jsonReader)) {  
105 - jsonReader.close();  
106 - }  
107 -  
108 - if (Objects.nonNull(reader)) {  
109 - reader.close();  
110 - }  
111 -  
112 - if (Objects.nonNull(is)) {  
113 - is.close();  
114 - }  
115 -  
116 - if (Objects.nonNull(response)) {  
117 - response.close();  
118 - }  
119 - }  
120 - }  
121 -  
122 - protected static Response executeHttpPostRequest(String url, Map<String, Object> paramMap) throws Exception {  
123 - OkHttpClient client = new OkHttpClient.Builder()  
124 - .connectTimeout(60 * 30 * 2, TimeUnit.SECONDS)  
125 - .readTimeout(60 * 1, TimeUnit.SECONDS)  
126 - .build();  
127 -  
128 - StringBuilder sb = new StringBuilder(url);  
129 - if (paramMap != null && !paramMap.isEmpty()) {  
130 - boolean isFirst = true;  
131 -  
132 - for (Map.Entry<String, Object> entry : paramMap.entrySet()) {  
133 -  
134 - if (Objects.isNull(entry.getValue())) {  
135 - continue;  
136 - }  
137 -  
138 - if (isFirst) {  
139 - sb.append("?");  
140 - isFirst = false;  
141 - } else {  
142 - sb.append("&");  
143 - }  
144 -  
145 - // 兼容参数特殊符号  
146 - String value = String.valueOf(entry.getValue());  
147 - value = URLEncoder.encode(value, "UTF-8");  
148 - value = value.replaceAll("\\+", "%20");  
149 -  
150 - sb.append(entry.getKey())  
151 - .append("=")  
152 - .append(value);  
153 - }  
154 - url = sb.toString();  
155 - }  
156 -  
157 - Request request = new Request.Builder().url(url).build();  
158 - Response response = client.newCall(request).execute();  
159 - return response;  
160 - }  
161 -}  
1 -package com.aukey.example.conf;  
2 -  
3 -import org.springframework.context.annotation.Bean;  
4 -import org.springframework.context.annotation.Configuration;  
5 -import org.springframework.web.context.request.async.TimeoutCallableProcessingInterceptor;  
6 -import org.springframework.web.servlet.config.annotation.AsyncSupportConfigurer;  
7 -import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;  
8 -  
9 -/**  
10 - * @author: wgf  
11 - * @create: 2020-05-19 12:56  
12 - * @description:  
13 - **/  
14 -//@Configuration  
15 -public class WebMvcConfig implements WebMvcConfigurer {  
16 - @Override  
17 - public void configureAsyncSupport(final AsyncSupportConfigurer configurer) {  
18 - configurer.setDefaultTimeout(30 * 60 * 1000);  
19 - configurer.registerCallableInterceptors(timeoutInterceptor());  
20 - }  
21 -  
22 - @Bean  
23 - public TimeoutCallableProcessingInterceptor timeoutInterceptor() {  
24 - return new TimeoutCallableProcessingInterceptor();  
25 - }  
26 -}  
  1 +package com.aukey.example.constant;
  2 +
  3 +/**
  4 + * @author: wgf
  5 + * @create: 2020-05-21 10:36
  6 + * @description: 这里面配置的API一定要是授权过的,否则获取不了数据
  7 + **/
  8 +public interface DwApi {
  9 +
  10 + /**
  11 + * 汇率API
  12 + */
  13 + String CURRENCY_SET_API = "/base/currency_set";
  14 +
  15 + /**
  16 + * FBA-库存快照
  17 + */
  18 + String FBA_FULFILLMENT_CURRENT_INVENTORY_VIEW_API = "/stock/fba_fulfillment_current_inventory_view";
  19 +}
1 package com.aukey.example.job; 1 package com.aukey.example.job;
2 2
3 -import com.aukey.example.util.DwUtil; 3 +import com.aukey.example.util.DwHelperUtil;
4 import org.slf4j.Logger; 4 import org.slf4j.Logger;
5 import org.slf4j.LoggerFactory; 5 import org.slf4j.LoggerFactory;
6 import org.springframework.beans.factory.annotation.Value; 6 import org.springframework.beans.factory.annotation.Value;
@@ -45,6 +45,6 @@ public class TokenJob { @@ -45,6 +45,6 @@ public class TokenJob {
45 45
46 Map<String, Object> params = new HashMap<>(); 46 Map<String, Object> params = new HashMap<>();
47 params.put("appSecret", this.appSecret); 47 params.put("appSecret", this.appSecret);
48 - DwUtil.doGet(dwTokenApi, params); 48 + DwHelperUtil.doGet(dwTokenApi, params);
49 } 49 }
50 } 50 }
1 package com.aukey.example.util; 1 package com.aukey.example.util;
2 2
  3 +import com.alibaba.fastjson.JSON;
  4 +import com.alibaba.fastjson.JSONException;
  5 +import com.alibaba.fastjson.JSONReader;
  6 +import com.alibaba.fastjson.TypeReference;
  7 +import com.aukey.example.vo.DwParamVo;
  8 +import com.aukey.example.web.TokenController;
  9 +import okhttp3.OkHttpClient;
  10 +import okhttp3.Request;
  11 +import okhttp3.Response;
  12 +import org.slf4j.Logger;
  13 +import org.slf4j.LoggerFactory;
3 import org.springframework.util.StringUtils; 14 import org.springframework.util.StringUtils;
4 15
5 -import java.io.BufferedReader;  
6 -import java.io.IOException;  
7 -import java.io.InputStream;  
8 -import java.io.InputStreamReader; 16 +import java.io.*;
9 import java.net.HttpURLConnection; 17 import java.net.HttpURLConnection;
10 import java.net.URL; 18 import java.net.URL;
11 import java.net.URLEncoder; 19 import java.net.URLEncoder;
12 import java.nio.charset.StandardCharsets; 20 import java.nio.charset.StandardCharsets;
  21 +import java.util.ArrayList;
  22 +import java.util.List;
13 import java.util.Map; 23 import java.util.Map;
14 import java.util.Objects; 24 import java.util.Objects;
  25 +import java.util.concurrent.TimeUnit;
  26 +import java.util.function.Consumer;
  27 +import java.util.function.Function;
  28 +import java.util.function.Supplier;
15 29
16 /** 30 /**
17 * @author: wgf 31 * @author: wgf
18 * @create: 2020-05-13 11:21 32 * @create: 2020-05-13 11:21
19 - * @description: 33 + * @description: dw帮助工具类
  34 + *
20 * 关于 java url 编码问题 35 * 关于 java url 编码问题
21 * Java官方的URLEncoder.encode 实际上是为了post请求的content-type为x-www-form-urlencoded来设计的 36 * Java官方的URLEncoder.encode 实际上是为了post请求的content-type为x-www-form-urlencoded来设计的
22 * 在进行特殊参数转义的时候会将空格转为 + 37 * 在进行特殊参数转义的时候会将空格转为 +
23 * 但是在 RFC1738、RFC2396协议中规定,GET请求的空格转为为 %20 38 * 但是在 RFC1738、RFC2396协议中规定,GET请求的空格转为为 %20
24 - * 所以在发送GET请求的特殊参数中存在空格必须 先URLEncoder.encode 然后再用 %20替换掉所有+ 39 + * 所以在发送GET请求的特殊参数中存在空格必须 先URLEncoder.encode 然后再用 %20 替换掉所有 + 号
25 **/ 40 **/
26 -public class DwUtil {  
27 - private DwUtil() { 41 +public class DwHelperUtil {
  42 + private static Logger log = LoggerFactory.getLogger(DwHelperUtil.class);
  43 +
  44 + private DwHelperUtil() {
28 } 45 }
29 46
30 /** 47 /**
@@ -46,36 +63,7 @@ public class DwUtil { @@ -46,36 +63,7 @@ public class DwUtil {
46 String result = null; 63 String result = null;
47 64
48 try { 65 try {
49 - StringBuilder sb = new StringBuilder(urlStr);  
50 - if (params != null && !params.isEmpty()) {  
51 - boolean isFirst = true;  
52 -  
53 - for (Map.Entry<String, Object> entry : params.entrySet()) {  
54 -  
55 - if (Objects.isNull(entry.getValue())) {  
56 - continue;  
57 - }  
58 -  
59 - if (isFirst) {  
60 - sb.append("?");  
61 - isFirst = false;  
62 - } else {  
63 - sb.append("&");  
64 - }  
65 -  
66 - // 兼容参数特殊符号  
67 - String value = String.valueOf(entry.getValue());  
68 - value = URLEncoder.encode(value, "UTF-8");  
69 - value = value.replaceAll("\\+", "%20");  
70 -  
71 - sb.append(entry.getKey())  
72 - .append("=")  
73 - .append(value);  
74 - }  
75 -  
76 - urlStr = sb.toString();  
77 - }  
78 - 66 + urlStr = jointUrl(urlStr, params);
79 URL url = new URL(urlStr); 67 URL url = new URL(urlStr);
80 connection = (HttpURLConnection) url.openConnection(); 68 connection = (HttpURLConnection) url.openConnection();
81 connection.setRequestMethod("GET"); 69 connection.setRequestMethod("GET");
@@ -131,4 +119,216 @@ public class DwUtil { @@ -131,4 +119,216 @@ public class DwUtil {
131 public static String doGet(String dwDoMain, String api, Map<String, Object> params) { 119 public static String doGet(String dwDoMain, String api, Map<String, Object> params) {
132 return doGet(dwDoMain + api, params); 120 return doGet(dwDoMain + api, params);
133 } 121 }
  122 +
  123 +
  124 + /**
  125 + * url参数拼接
  126 + *
  127 + * @param url
  128 + * @param params
  129 + * @return
  130 + */
  131 + public static String jointUrl(String url, Map<String, Object> params) throws UnsupportedEncodingException {
  132 + StringBuilder sb = new StringBuilder(url);
  133 + if (params != null && !params.isEmpty()) {
  134 + boolean isFirst = true;
  135 +
  136 + for (Map.Entry<String, Object> entry : params.entrySet()) {
  137 +
  138 + if (Objects.isNull(entry.getValue())) {
  139 + continue;
  140 + }
  141 +
  142 + if (isFirst) {
  143 + sb.append("?");
  144 + isFirst = false;
  145 + } else {
  146 + sb.append("&");
  147 + }
  148 +
  149 + // 兼容参数特殊符号
  150 + String value = String.valueOf(entry.getValue());
  151 + value = URLEncoder.encode(value, "UTF-8");
  152 + value = value.replaceAll("\\+", "%20");
  153 +
  154 + sb.append(entry.getKey())
  155 + .append("=")
  156 + .append(value);
  157 + }
  158 +
  159 + url = sb.toString();
  160 + }
  161 +
  162 + return url;
  163 + }
  164 +
  165 +
  166 + /**
  167 + * 分页查询
  168 + *
  169 + * @param callBack 回调方法,需要返回总页数
  170 + * @param dwDataApi dw获取数据接口
  171 + * @param api 申请的api
  172 + * @param paramVo 请求参数
  173 + * @param typeReference json字符串转实体引用类型
  174 + * @param <T>
  175 + */
  176 + public static <T> void pageReader(Function<T, Integer> callBack,
  177 + String dwDataApi,
  178 + String api,
  179 + DwParamVo paramVo,
  180 + TypeReference<T> typeReference) {
  181 +
  182 + // 当前页
  183 + int currentPageNumber = 0;
  184 + // 总页数
  185 + int totalPageNum = 1;
  186 +
  187 + // 分页请求
  188 + do {
  189 + currentPageNumber++;
  190 + // 每次请求获取最新token
  191 + paramVo.setToken(TokenController.getCurrentToken());
  192 + paramVo.setPageNumber(currentPageNumber);
  193 +
  194 + String jsonStr = DwHelperUtil.doGet(dwDataApi, api, paramVo.toMap());
  195 + if (StringUtils.isEmpty(jsonStr)) {
  196 + throw new RuntimeException(String.format("API %s 调用失败", dwDataApi + api));
  197 + }
  198 +
  199 + // json解析为对象
  200 + T t = JSON.parseObject(jsonStr, typeReference);
  201 +
  202 + // 回调函数获取总页数
  203 + int total = callBack.apply(t);
  204 +
  205 + if (currentPageNumber == 1) {
  206 + totalPageNum = calculatePage(total, paramVo.getPageSize());
  207 + }
  208 +
  209 + log.info("========== 获取API:{} 第:{}页数据 ==========", dwDataApi + api, currentPageNumber);
  210 + } while (currentPageNumber < totalPageNum);
  211 + }
  212 +
  213 +
  214 + /**
  215 + * 总页数计算
  216 + *
  217 + * @param total
  218 + * @param pageSize
  219 + * @return
  220 + */
  221 + private static int calculatePage(int total, int pageSize) {
  222 + int result = 1;
  223 + if (total % pageSize == 0) {
  224 + result = total / pageSize;
  225 + } else {
  226 + result = total / pageSize + 1;
  227 + }
  228 +
  229 + return result;
  230 + }
  231 +
  232 +
  233 + /**
  234 + * 流式读取
  235 + *
  236 + * @param dwDataApi dw数据请求接口
  237 + * @param api 平台申请的api
  238 + * @param paramMap 参数
  239 + * @param callback 业务回调
  240 + * @param batchSize 业务回调数据量大小,参考值 [500,5000]
  241 + * @param constructor 实体的构造函数
  242 + * @throws Exception
  243 + */
  244 + public static <T> void streamReader(String dwDataApi, String api,
  245 + Map<String, Object> paramMap,
  246 + Consumer<List<T>> callback,
  247 + int batchSize,
  248 + Supplier<T> constructor) throws Exception {
  249 +
  250 + dwDataApi = dwDataApi + api;
  251 + Response response = null;
  252 + InputStream is = null;
  253 + Reader reader = null;
  254 + JSONReader jsonReader = null;
  255 +
  256 + try {
  257 + response = executeHttpPostRequest(dwDataApi, paramMap);
  258 +
  259 + if (response.code() == 200) {
  260 + List<T> container = new ArrayList<>();
  261 + int total = 0;
  262 +
  263 + // 从响应中获取流
  264 + is = response.body().byteStream();
  265 + reader = new InputStreamReader(is);
  266 + jsonReader = new JSONReader(reader);
  267 + // 开始读取
  268 + jsonReader.startArray();
  269 +
  270 + while (jsonReader.hasNext()) {
  271 + try {
  272 + T t = constructor.get();
  273 + jsonReader.readObject(t);
  274 + container.add(t);
  275 + } catch (JSONException ex) {
  276 + Object o = jsonReader.readObject();
  277 + log.info("json数据转换异常:{}", o.toString());
  278 + }
  279 +
  280 + if (container.size() == batchSize) {
  281 + callback.accept(container);
  282 + total += batchSize;
  283 + log.info("{} 流式读取已读条数:{}", dwDataApi, total);
  284 + container.clear();
  285 + }
  286 + }
  287 +
  288 + // 处理最后一批不足 batchSize 的数据
  289 + if (container.size() > 0) {
  290 + callback.accept(container);
  291 + total += container.size();
  292 + log.info("{} 流式读取已读条数:{}", dwDataApi, total);
  293 + container.clear();
  294 + }
  295 +
  296 + //结束读取
  297 + jsonReader.endArray();
  298 + } else {
  299 + log.info("流式读取异常 [ url:{} ] [ code:{} ]", dwDataApi, response.code());
  300 + }
  301 +
  302 + } finally {
  303 + if (Objects.nonNull(jsonReader)) {
  304 + jsonReader.close();
  305 + }
  306 +
  307 + if (Objects.nonNull(reader)) {
  308 + reader.close();
  309 + }
  310 +
  311 + if (Objects.nonNull(is)) {
  312 + is.close();
  313 + }
  314 +
  315 + if (Objects.nonNull(response)) {
  316 + response.close();
  317 + }
  318 + }
  319 + }
  320 +
  321 +
  322 + protected static Response executeHttpPostRequest(String url, Map<String, Object> paramMap) throws Exception {
  323 + OkHttpClient client = new OkHttpClient.Builder()
  324 + .connectTimeout(60 * 30 * 2, TimeUnit.SECONDS)
  325 + .readTimeout(60 * 1, TimeUnit.SECONDS)
  326 + .build();
  327 +
  328 + url = jointUrl(url, paramMap);
  329 +
  330 + Request request = new Request.Builder().url(url).build();
  331 + Response response = client.newCall(request).execute();
  332 + return response;
  333 + }
134 } 334 }
@@ -7,9 +7,14 @@ import java.util.Map; @@ -7,9 +7,14 @@ import java.util.Map;
7 /** 7 /**
8 * @author: wgf 8 * @author: wgf
9 * @create: 2020-05-13 12:01 9 * @create: 2020-05-13 12:01
10 - * @description: 10 + * @description: API查询实体
11 **/ 11 **/
12 public class DwParamVo { 12 public class DwParamVo {
  13 +
  14 + // TODO 返回数据类型,待扩展
  15 + public static final String OBJ = "OBJ";
  16 + public static final String SL = "SL";
  17 +
13 public DwParamVo() { 18 public DwParamVo() {
14 } 19 }
15 20
@@ -39,20 +44,27 @@ public class DwParamVo { @@ -39,20 +44,27 @@ public class DwParamVo {
39 private String multiFields; 44 private String multiFields;
40 45
41 /** 46 /**
42 - * 偏移量(可不传 如果传递offset则必传limit) 47 + * 页码,启始页为1(可不传 如果传递offset则必传limit)
43 */ 48 */
44 - private Integer offset; 49 + private Integer pageNumber;
45 50
46 /** 51 /**
47 * 限制条数(可不传 如果传递limit则必传offset) 52 * 限制条数(可不传 如果传递limit则必传offset)
48 */ 53 */
49 - private Integer limit; 54 + private Integer pageSize;
50 55
51 /** 56 /**
52 - * 是否流式读写(Y/N) 57 + * 是否流式读写(非流式可不传 Y/N)
53 */ 58 */
54 private String stream; 59 private String stream;
55 60
  61 + /**
  62 + * 可不传,默认OBJ
  63 + * OBJ: 使用PageVo对象
  64 + * SL: 使用
  65 + */
  66 + private String dataStructure;
  67 +
56 public String getAppId() { 68 public String getAppId() {
57 return appId; 69 return appId;
58 } 70 }
@@ -85,20 +97,20 @@ public class DwParamVo { @@ -85,20 +97,20 @@ public class DwParamVo {
85 this.multiFields = multiFields; 97 this.multiFields = multiFields;
86 } 98 }
87 99
88 - public Integer getOffset() {  
89 - return offset; 100 + public Integer getPageNumber() {
  101 + return pageNumber;
90 } 102 }
91 103
92 - public void setOffset(Integer offset) {  
93 - this.offset = offset; 104 + public void setPageNumber(Integer pageNumber) {
  105 + this.pageNumber = pageNumber;
94 } 106 }
95 107
96 - public Integer getLimit() {  
97 - return limit; 108 + public Integer getPageSize() {
  109 + return pageSize;
98 } 110 }
99 111
100 - public void setLimit(Integer limit) {  
101 - this.limit = limit; 112 + public void setPageSize(Integer pageSize) {
  113 + this.pageSize = pageSize;
102 } 114 }
103 115
104 public String getStream() { 116 public String getStream() {
@@ -112,4 +124,12 @@ public class DwParamVo { @@ -112,4 +124,12 @@ public class DwParamVo {
112 public Map<String, Object> toMap() { 124 public Map<String, Object> toMap() {
113 return BeanMap.create(this); 125 return BeanMap.create(this);
114 } 126 }
  127 +
  128 + public String getDataStructure() {
  129 + return dataStructure;
  130 + }
  131 +
  132 + public void setDataStructure(String dataStructure) {
  133 + this.dataStructure = dataStructure;
  134 + }
115 } 135 }
@@ -27,7 +27,7 @@ public class DwResultVo<T> { @@ -27,7 +27,7 @@ public class DwResultVo<T> {
27 /** 27 /**
28 * 返回数据 28 * 返回数据
29 */ 29 */
30 - private List<T> data; 30 + private T data;
31 31
32 public boolean isSuccess() { 32 public boolean isSuccess() {
33 return success; 33 return success;
@@ -53,11 +53,11 @@ public class DwResultVo<T> { @@ -53,11 +53,11 @@ public class DwResultVo<T> {
53 this.code = code; 53 this.code = code;
54 } 54 }
55 55
56 - public List<T> getData() { 56 + public T getData() {
57 return data; 57 return data;
58 } 58 }
59 59
60 - public void setData(List<T> data) { 60 + public void setData(T data) {
61 this.data = data; 61 this.data = data;
62 } 62 }
63 } 63 }
  1 +package com.aukey.example.vo;
  2 +
  3 +
  4 +import io.swagger.annotations.ApiModelProperty;
  5 +
  6 +import java.io.Serializable;
  7 +import java.util.List;
  8 +
  9 +/**
  10 + * 分页对象
  11 + *
  12 + * @author 吴耿锋
  13 + * @version 2018年5月11日
  14 + */
  15 +
  16 +public class PageVo<T> implements Serializable {
  17 +
  18 +
  19 + @ApiModelProperty("页数")
  20 + private int pageNumber;
  21 +
  22 + @ApiModelProperty("每页条数")
  23 + private int pageSize;
  24 +
  25 + @ApiModelProperty("总条数")
  26 + private int total;
  27 +
  28 + @ApiModelProperty("每页数据")
  29 + private List<T> data;
  30 +
  31 + public int getPageNumber() {
  32 + return pageNumber;
  33 + }
  34 +
  35 + public void setPageNumber(int pageNumber) {
  36 + this.pageNumber = pageNumber;
  37 + }
  38 +
  39 + public int getPageSize() {
  40 + return pageSize;
  41 + }
  42 +
  43 + public void setPageSize(int pageSize) {
  44 + this.pageSize = pageSize;
  45 + }
  46 +
  47 + public int getTotal() {
  48 + return total;
  49 + }
  50 +
  51 + public void setTotal(int total) {
  52 + this.total = total;
  53 + }
  54 +
  55 + public List<T> getData() {
  56 + return data;
  57 + }
  58 +
  59 + public void setData(List<T> data) {
  60 + this.data = data;
  61 + }
  62 +}
@@ -2,24 +2,25 @@ package com.aukey.example.web; @@ -2,24 +2,25 @@ package com.aukey.example.web;
2 2
3 import com.alibaba.fastjson.JSON; 3 import com.alibaba.fastjson.JSON;
4 import com.alibaba.fastjson.TypeReference; 4 import com.alibaba.fastjson.TypeReference;
5 -import com.aukey.example.conf.StreamReaderHandler; 5 +import com.aukey.example.constant.DwApi;
6 import com.aukey.example.entity.CurrencySet; 6 import com.aukey.example.entity.CurrencySet;
7 import com.aukey.example.entity.FbaFulfillmentCurrentInventoryView; 7 import com.aukey.example.entity.FbaFulfillmentCurrentInventoryView;
8 -import com.aukey.example.util.DwUtil; 8 +import com.aukey.example.util.DwHelperUtil;
9 import com.aukey.example.vo.DwParamVo; 9 import com.aukey.example.vo.DwParamVo;
10 import com.aukey.example.vo.DwResultVo; 10 import com.aukey.example.vo.DwResultVo;
  11 +import com.aukey.example.vo.PageVo;
11 import io.swagger.annotations.Api; 12 import io.swagger.annotations.Api;
12 import io.swagger.annotations.ApiOperation; 13 import io.swagger.annotations.ApiOperation;
13 import org.slf4j.Logger; 14 import org.slf4j.Logger;
14 import org.slf4j.LoggerFactory; 15 import org.slf4j.LoggerFactory;
15 import org.springframework.beans.factory.annotation.Value; 16 import org.springframework.beans.factory.annotation.Value;
16 -import org.springframework.util.CollectionUtils;  
17 import org.springframework.util.StringUtils; 17 import org.springframework.util.StringUtils;
18 import org.springframework.web.bind.annotation.GetMapping; 18 import org.springframework.web.bind.annotation.GetMapping;
19 import org.springframework.web.bind.annotation.RestController; 19 import org.springframework.web.bind.annotation.RestController;
20 20
21 import java.util.List; 21 import java.util.List;
22 import java.util.function.Consumer; 22 import java.util.function.Consumer;
  23 +import java.util.function.Function;
23 import java.util.function.Supplier; 24 import java.util.function.Supplier;
24 25
25 /** 26 /**
@@ -34,7 +35,7 @@ import java.util.function.Supplier; @@ -34,7 +35,7 @@ import java.util.function.Supplier;
34 * **************************************************************************************** 35 * ****************************************************************************************
35 * <p> 36 * <p>
36 * Demo 提供三种数据拉取方式, 37 * Demo 提供三种数据拉取方式,
37 - * 第一种:直接调用API适合[1, 100000]数据读取 38 + * 第一种:直接调用API适合[1, 3000]数据读取
38 * 第二种:分页适合[1, 1000000]数据读取,数据基数过大深层分页会影响查询性能,并且客户端需要不断发起请求 39 * 第二种:分页适合[1, 1000000]数据读取,数据基数过大深层分页会影响查询性能,并且客户端需要不断发起请求
39 * 第三种:流式读取适合[1, 10000000]数据读取,不存在深层分页性能影响,并且客户端只需要发起一次请求 40 * 第三种:流式读取适合[1, 10000000]数据读取,不存在深层分页性能影响,并且客户端只需要发起一次请求
40 **/ 41 **/
@@ -43,17 +44,14 @@ import java.util.function.Supplier; @@ -43,17 +44,14 @@ import java.util.function.Supplier;
43 public class TestApiController { 44 public class TestApiController {
44 private Logger log = LoggerFactory.getLogger(TestApiController.class); 45 private Logger log = LoggerFactory.getLogger(TestApiController.class);
45 46
46 - // API需要在傲基数仓申请授权  
47 - public static final String CURRENCY_SET_API = "/base/currency_set";  
48 - public static final String FBA_FULFILLMENT_CURRENT_INVENTORY_VIEW_API = "/stock/fba_fulfillment_current_inventory_view";  
49 -  
50 @Value("${dwDataApi:null}") 47 @Value("${dwDataApi:null}")
51 private String dwDataApi; 48 private String dwDataApi;
52 49
53 @Value("${dwAppId:null}") 50 @Value("${dwAppId:null}")
54 private String appId; 51 private String appId;
55 52
56 - @ApiOperation(value = "获取前100条汇率 DEMO") 53 +
  54 + @ApiOperation(value = "小批量读取 DEMO")
57 @GetMapping("/query_all") 55 @GetMapping("/query_all")
58 public void queryAll() { 56 public void queryAll() {
59 57
@@ -62,70 +60,68 @@ public class TestApiController { @@ -62,70 +60,68 @@ public class TestApiController {
62 60
63 // 添加查询条件 61 // 添加查询条件
64 paramVo.setQueryCondition("WHERE currency_code = 'USD'"); 62 paramVo.setQueryCondition("WHERE currency_code = 'USD'");
65 -  
66 - // 指定获取条数  
67 - paramVo.setOffset(0);  
68 - paramVo.setLimit(100); 63 + // 每页数据条数 dw服务端有限制 取值[1, 3000]
  64 + paramVo.setPageSize(3000);
69 65
70 // 调用API 66 // 调用API
71 - String result = DwUtil.doGet(dwDataApi, CURRENCY_SET_API, paramVo.toMap()); 67 + String result = DwHelperUtil.doGet(dwDataApi, DwApi.CURRENCY_SET_API, paramVo.toMap());
72 68
73 if (StringUtils.isEmpty(result)) { 69 if (StringUtils.isEmpty(result)) {
74 - throw new RuntimeException(String.format("API %s 调用失败", CURRENCY_SET_API)); 70 + throw new RuntimeException(String.format("API %s 调用失败", DwApi.CURRENCY_SET_API));
75 } 71 }
76 // json解析为对象 72 // json解析为对象
77 - DwResultVo<CurrencySet> resultVo = JSON.parseObject(result, new TypeReference<DwResultVo<CurrencySet>>() { 73 + DwResultVo<PageVo<CurrencySet>> resultVo = JSON.parseObject(result, new TypeReference<DwResultVo<PageVo<CurrencySet>>>() {
78 }); 74 });
79 75
80 log.info(""); 76 log.info("");
81 - log.info("↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓");  
82 log.info("API请求状态:{}", resultVo.getMessage()); 77 log.info("API请求状态:{}", resultVo.getMessage());
83 - log.info("API TOP100 数据:{}", JSON.toJSONString(resultVo.getData()));  
84 - log.info("↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑"); 78 + log.info("API返回数据:{}", JSON.toJSONString(resultVo.getData().getData()));
  79 + log.info("API返回数据两:{}", resultVo.getData().getData().size());
85 log.info(""); 80 log.info("");
86 } 81 }
87 82
88 83
89 - @ApiOperation(value = "分页查询 DEMO") 84 + /**
  85 + * 适合获取 百万级别的表,大表用分页读取会非常慢
  86 + * 大表建议使用流式读取
  87 + */
  88 + @ApiOperation(value = "分页读取 DEMO")
90 @GetMapping("/query_page") 89 @GetMapping("/query_page")
91 public void queryPage() { 90 public void queryPage() {
92 91
93 // 构造请求参数 92 // 构造请求参数
94 DwParamVo paramVo = new DwParamVo(this.appId, TokenController.getCurrentToken()); 93 DwParamVo paramVo = new DwParamVo(this.appId, TokenController.getCurrentToken());
95 -  
96 - // 页数  
97 - int pageNum = 0;  
98 - // 每页数据大小  
99 - final int pageSize = 1000;  
100 - // 当前页数据  
101 - int currentPageSize;  
102 -  
103 - // 分页请求  
104 - do {  
105 - paramVo.setToken(TokenController.getCurrentToken());  
106 - paramVo.setOffset(pageNum * pageSize);  
107 - paramVo.setLimit(pageSize); 94 + // 每页数据条数 dw服务端有限制 取值[1, 3000]
  95 + paramVo.setPageSize(1000);
108 // TODO 自定义查询条件 96 // TODO 自定义查询条件
109 97
110 - String result = DwUtil.doGet(dwDataApi, CURRENCY_SET_API, paramVo.toMap());  
111 - if (StringUtils.isEmpty(result)) {  
112 - throw new RuntimeException(String.format("API %s 调用失败", CURRENCY_SET_API));  
113 - }  
114 - // json解析为对象  
115 - DwResultVo<CurrencySet> resultVo = JSON.parseObject(result, new TypeReference<DwResultVo<CurrencySet>>() {  
116 - }); 98 + // json字符串转实体引用类型
  99 + TypeReference typeReference = new TypeReference<DwResultVo<PageVo<CurrencySet>>>() {
  100 + };
117 101
118 - currentPageSize = CollectionUtils.isEmpty(resultVo.getData()) ? 0 : resultVo.getData().size();  
119 - pageNum++;  
120 - log.info("========== 获取API:{} 第:{}页数据 数据条数:{} ==========", CURRENCY_SET_API, pageNum + 1, currentPageSize);  
121 - // TODO 自定义实现持久化逻辑 102 + // 回调函数
  103 + Function<DwResultVo<PageVo<CurrencySet>>, Integer> callback = (DwResultVo<PageVo<CurrencySet>> resultVo) -> {
  104 + List<CurrencySet> dataList = resultVo.getData().getData();
  105 + System.out.println(dataList.size());
  106 + // TODO 业务逻辑在这里实现
  107 + // ... more
  108 + // mapper.insertList(dataList)
122 109
123 - } while (currentPageSize == pageSize); 110 + // 需要返回总页数
  111 + return resultVo.getData().getTotal();
  112 + };
  113 +
  114 + // 使用分页API
  115 + DwHelperUtil.pageReader(callback,
  116 + dwDataApi,
  117 + DwApi.CURRENCY_SET_API,
  118 + paramVo,
  119 + typeReference);
124 } 120 }
125 121
126 122
127 /** 123 /**
128 - * 流式读取适合百/千万级别的内网数据同步,本机测试 124 + * 流式读取适合百/千万级别的内网数据同步,本机测试
129 * <p> 125 * <p>
130 * 网络环境:局域网 126 * 网络环境:局域网
131 * 数据源 :华为dws 127 * 数据源 :华为dws
@@ -138,10 +134,10 @@ public class TestApiController { @@ -138,10 +134,10 @@ public class TestApiController {
138 * 测试报表 134 * 测试报表
139 * ************************************** 135 * **************************************
140 * * 136 * *
141 - * * QPS :1800 137 + * * 每秒获取条数 :1700
142 * * 耗时 :100W / 5分钟 138 * * 耗时 :100W / 5分钟
143 * * CPU毛刺 :1% - 5% 波动 139 * * CPU毛刺 :1% - 5% 波动
144 - * * 内存毛刺:50Mb - 150Mb 波动 140 + * * 内存毛刺 :50Mb - 150Mb 波动
145 * * 141 * *
146 * ************************************** 142 * **************************************
147 */ 143 */
@@ -156,14 +152,14 @@ public class TestApiController { @@ -156,14 +152,14 @@ public class TestApiController {
156 // 设置为流式读取 152 // 设置为流式读取
157 paramVo.setStream("Y"); 153 paramVo.setStream("Y");
158 // TODO 自定义查询条件 154 // TODO 自定义查询条件
159 - paramVo.setOffset(3000000);  
160 155
161 // 定义实体构造函数 156 // 定义实体构造函数
162 Supplier<FbaFulfillmentCurrentInventoryView> constructor = FbaFulfillmentCurrentInventoryView::new; 157 Supplier<FbaFulfillmentCurrentInventoryView> constructor = FbaFulfillmentCurrentInventoryView::new;
163 158
164 // 流式读取回调函数 159 // 流式读取回调函数
165 Consumer<List<FbaFulfillmentCurrentInventoryView>> callBack = (List<FbaFulfillmentCurrentInventoryView> list) -> { 160 Consumer<List<FbaFulfillmentCurrentInventoryView>> callBack = (List<FbaFulfillmentCurrentInventoryView> list) -> {
166 - // TODO 自定义实现持久化逻辑 161 + System.out.println(list.size());
  162 + // TODO 业务逻辑在这里实现
167 // ... more 163 // ... more
168 // mapper.insertList(list) 164 // mapper.insertList(list)
169 }; 165 };
@@ -173,9 +169,9 @@ public class TestApiController { @@ -173,9 +169,9 @@ public class TestApiController {
173 int batchSize = 5000; 169 int batchSize = 5000;
174 170
175 // 使用 StreamAPI 171 // 使用 StreamAPI
176 - StreamReaderHandler.reader( 172 + DwHelperUtil.streamReader(
177 dwDataApi, 173 dwDataApi,
178 - FBA_FULFILLMENT_CURRENT_INVENTORY_VIEW_API, 174 + DwApi.FBA_FULFILLMENT_CURRENT_INVENTORY_VIEW_API,
179 paramVo.toMap(), 175 paramVo.toMap(),
180 callBack, 176 callBack,
181 batchSize, 177 batchSize,
@@ -28,7 +28,7 @@ public class TokenController { @@ -28,7 +28,7 @@ public class TokenController {
28 /** 28 /**
29 * 当前token 29 * 当前token
30 */ 30 */
31 - private static String currentToken = ""; 31 + private static String currentToken = null;
32 32
33 33
34 /********************************************************************* 34 /*********************************************************************
@@ -36,7 +36,7 @@ public class TokenController { @@ -36,7 +36,7 @@ public class TokenController {
36 * 例如APP里回调地址为 http://localhost:8099/token/receive * 36 * 例如APP里回调地址为 http://localhost:8099/token/receive *
37 * TokenJob定时任务定时请求数据仓库获取授权API后 * 37 * TokenJob定时任务定时请求数据仓库获取授权API后 *
38 * 数据仓库回调 http://localhost:8099/token/receive,并且传递token参数 * 38 * 数据仓库回调 http://localhost:8099/token/receive,并且传递token参数 *
39 - * 获取token后,可以放在 mysql,redis等。这里DEMO取巧放在内存中。 * 39 + * 获取token后,可以放在 mysql,redis等。DEMO就直接放在内存中。 *
40 ********************************************************************/ 40 ********************************************************************/
41 @PostMapping("/receive") 41 @PostMapping("/receive")
42 public void receive(String token) { 42 public void receive(String token) {
@@ -45,10 +45,10 @@ public class TokenController { @@ -45,10 +45,10 @@ public class TokenController {
45 } 45 }
46 46
47 public static String getCurrentToken() { 47 public static String getCurrentToken() {
48 - for (int i = 0; i < 5; i++) { 48 + for (int i = 0; i < 3; i++) {
49 if (StringUtils.isEmpty(currentToken)) { 49 if (StringUtils.isEmpty(currentToken)) {
50 try { 50 try {
51 - TimeUnit.SECONDS.sleep(2); 51 + TimeUnit.SECONDS.sleep(1);
52 } catch (InterruptedException e) { 52 } catch (InterruptedException e) {
53 e.printStackTrace(); 53 e.printStackTrace();
54 } 54 }