作者 [wgf]

优化流式读取,将普通流修改为压缩流

@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit; @@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
25 import java.util.function.Consumer; 25 import java.util.function.Consumer;
26 import java.util.function.Function; 26 import java.util.function.Function;
27 import java.util.function.Supplier; 27 import java.util.function.Supplier;
  28 +import java.util.zip.GZIPInputStream;
28 29
29 /** 30 /**
30 * @author: wgf 31 * @author: wgf
@@ -40,6 +41,28 @@ import java.util.function.Supplier; @@ -40,6 +41,28 @@ import java.util.function.Supplier;
40 public class DwHelperUtil { 41 public class DwHelperUtil {
41 private static Logger log = LoggerFactory.getLogger(DwHelperUtil.class); 42 private static Logger log = LoggerFactory.getLogger(DwHelperUtil.class);
42 43
  44 + private static OkHttpClient client;
  45 +
  46 + /**
  47 + * http请求整体超时(callTimeout)默认为7200秒,读/写超时各为120秒
  48 + * 可根据同步数据大小自行设置超时时间
  49 + * @return
  50 + */
  51 + public static OkHttpClient getClient() {
  52 + if (client == null) {
  53 + synchronized (DwHelperUtil.class) {
  54 + if (client == null) {
  55 + client = new OkHttpClient.Builder()
  56 + .callTimeout(60 * 60 * 2, TimeUnit.SECONDS)
  57 + .writeTimeout(60 * 2, TimeUnit.SECONDS)
  58 + .readTimeout(60 * 2, TimeUnit.SECONDS)
  59 + .build();
  60 + }
  61 + }
  62 + }
  63 + return client;
  64 + }
  65 +
43 private DwHelperUtil() { 66 private DwHelperUtil() {
44 } 67 }
45 68
@@ -247,10 +270,11 @@ public class DwHelperUtil { @@ -247,10 +270,11 @@ public class DwHelperUtil {
247 Supplier<T> constructor) throws Exception { 270 Supplier<T> constructor) throws Exception {
248 271
249 dwDataApi = dwDataApi + api; 272 dwDataApi = dwDataApi + api;
250 - Response response = null;  
251 - InputStream is = null;  
252 - Reader reader = null;  
253 - JSONReader jsonReader = null; 273 + Response response = null;
  274 + InputStream is = null;
  275 + GZIPInputStream gzipInputStream = null;
  276 + Reader reader = null;
  277 + JSONReader jsonReader = null;
254 278
255 try { 279 try {
256 response = executeHttpGetRequest(dwDataApi, paramMap); 280 response = executeHttpGetRequest(dwDataApi, paramMap);
@@ -258,10 +282,12 @@ public class DwHelperUtil { @@ -258,10 +282,12 @@ public class DwHelperUtil {
258 if (response.code() == 200) { 282 if (response.code() == 200) {
259 List<T> container = new ArrayList<>(); 283 List<T> container = new ArrayList<>();
260 int total = 0; 284 int total = 0;
  285 + long start = System.currentTimeMillis();
261 286
262 // 从响应中获取流 287 // 从响应中获取流
263 is = response.body().byteStream(); 288 is = response.body().byteStream();
264 - reader = new InputStreamReader(is); 289 + gzipInputStream = new GZIPInputStream(is);
  290 + reader = new InputStreamReader(gzipInputStream);
265 jsonReader = new JSONReader(reader); 291 jsonReader = new JSONReader(reader);
266 // 开始读取 292 // 开始读取
267 jsonReader.startArray(); 293 jsonReader.startArray();
@@ -286,6 +312,9 @@ public class DwHelperUtil { @@ -286,6 +312,9 @@ public class DwHelperUtil {
286 log.info("{} 流式读取已读条数:{}", dwDataApi, total); 312 log.info("{} 流式读取已读条数:{}", dwDataApi, total);
287 container.clear(); 313 container.clear();
288 } 314 }
  315 +
  316 + float consuming = (System.currentTimeMillis() - start) / 1000.f;
  317 + log.info("流式平均每秒读取条数:{}", total / consuming);
289 } else { 318 } else {
290 log.info("流式读取异常 [ url:{} ] [ code:{} ]", dwDataApi, response.code()); 319 log.info("流式读取异常 [ url:{} ] [ code:{} ]", dwDataApi, response.code());
291 } 320 }
@@ -300,6 +329,10 @@ public class DwHelperUtil { @@ -300,6 +329,10 @@ public class DwHelperUtil {
300 jsonReader.close(); 329 jsonReader.close();
301 } 330 }
302 331
  332 + if (Objects.nonNull(gzipInputStream)) {
  333 + gzipInputStream.close();
  334 + }
  335 +
303 if (Objects.nonNull(reader)) { 336 if (Objects.nonNull(reader)) {
304 reader.close(); 337 reader.close();
305 } 338 }
@@ -316,14 +349,8 @@ public class DwHelperUtil { @@ -316,14 +349,8 @@ public class DwHelperUtil {
316 349
317 350
318 protected static Response executeHttpGetRequest(String url, Map<String, Object> paramMap) throws Exception { 351 protected static Response executeHttpGetRequest(String url, Map<String, Object> paramMap) throws Exception {
319 - OkHttpClient client = new OkHttpClient.Builder()  
320 - .connectTimeout(60 * 60 * 2, TimeUnit.SECONDS)  
321 - .writeTimeout(60 * 2, TimeUnit.SECONDS)  
322 - .readTimeout(60 * 2, TimeUnit.SECONDS)  
323 - .build();  
324 - 352 + OkHttpClient client = getClient();
325 url = jointUrl(url, paramMap); 353 url = jointUrl(url, paramMap);
326 -  
327 Request request = new Request.Builder().url(url).build(); 354 Request request = new Request.Builder().url(url).build();
328 Response response = client.newCall(request).execute(); 355 Response response = client.newCall(request).execute();
329 return response; 356 return response;
@@ -197,7 +197,8 @@ public class TestApiController { @@ -197,7 +197,8 @@ public class TestApiController {
197 197
198 198
199 /** 199 /**
200 - * 流式读取适合百级别的表内网数据同步,每次数据读取建议在[100000, 1000000],本机测试 200 + * 流式读取适合百级别的表内网数据同步,每次数据读取建议在1千万以内,连接2小时会超时。
  201 + * 几千万级别的建议按条件分批
201 * <p> 202 * <p>
202 * 网络环境:局域网 203 * 网络环境:局域网
203 * 数据源 :华为dws 204 * 数据源 :华为dws
@@ -210,8 +211,8 @@ public class TestApiController { @@ -210,8 +211,8 @@ public class TestApiController {
210 * 测试报表 211 * 测试报表
211 * ************************************** 212 * **************************************
212 * * 213 * *
213 - * * 每秒获取条数 :1700  
214 - * * 耗时 :100W / 5分钟 214 + * * 每秒获取条数 :3100+
  215 + * * 数据总量 :1000W+
215 * * CPU毛刺 :1% - 5% 波动 216 * * CPU毛刺 :1% - 5% 波动
216 * * 内存毛刺 :50Mb - 150Mb 波动 217 * * 内存毛刺 :50Mb - 150Mb 波动
217 * * 218 * *