// StreamReaderHandler.java (5.2 KB)
package com.aukey.example.conf;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONReader;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
/**
 * Helper for reading an HTTP response that contains one large JSON array as a stream,
 * handing the decoded elements to a business callback in batches.
 *
 * <p>Author: wgf
 * <p>Created: 2020-05-20 10:03:15
 */
public class StreamReaderHandler {

    private static final Logger log = LoggerFactory.getLogger(StreamReaderHandler.class);

    /** Charset name used both for URL-encoding query parameters and for decoding the response body. */
    private static final String UTF_8 = "UTF-8";

    /**
     * Single shared HTTP client. Each {@link OkHttpClient} owns its own connection pool and
     * threads, so building one per request wastes resources; the client is reused instead.
     *
     * <p>The timeout values are carried over unchanged: connect = 60 * 30 * 2 seconds,
     * read = 60 seconds.
     * NOTE(review): a one-hour connect timeout with a one-minute read timeout looks like the
     * two values may have been swapped - confirm before changing.
     */
    private static final OkHttpClient HTTP_CLIENT = new OkHttpClient.Builder()
            .connectTimeout(60 * 30 * 2, TimeUnit.SECONDS)
            .readTimeout(60 * 1, TimeUnit.SECONDS)
            .build();

    private StreamReaderHandler() {
    }

    /**
     * Requests {@code dwDataApi + api} and streams the JSON array in the response body,
     * invoking {@code callback} once for every {@code batchSize} decoded elements and once
     * more for the trailing partial batch, if any.
     *
     * <p>The same list instance is passed to every callback invocation and is cleared right
     * after the callback returns, so the callback must not keep a reference to it.
     *
     * <p>If the response status is not 200 the problem is only logged and the method returns
     * without invoking the callback.
     *
     * @param dwDataApi   base URL of the DW data request interface
     * @param api         API path applied for on the platform; appended to {@code dwDataApi}
     * @param paramMap    query parameters; entries with a {@code null} value are skipped
     * @param callback    business callback that receives each batch
     * @param batchSize   number of elements per callback, suggested range [500, 5000]; a
     *                    non-positive value never matches the batch check, so everything is
     *                    delivered in one final callback
     * @param constructor supplier of a fresh entity instance for each array element
     * @throws Exception if the request fails or the response cannot be read or parsed
     */
    public static <T> void reader(String dwDataApi, String api,
                                  Map<String, Object> paramMap,
                                  Consumer<List<T>> callback,
                                  int batchSize,
                                  Supplier<T> constructor) throws Exception {
        final String requestUrl = dwDataApi + api;
        final Response response = executeHttpPostRequest(requestUrl, paramMap);
        try {
            if (response.code() != 200) {
                log.warn("流式读取异常 [ url:{} ] [ code:{} ]", requestUrl, response.code());
                return;
            }
            // try-with-resources closes the JSON reader and the character reader even when
            // one of the close() calls throws, and never masks the primary exception.
            // The body is decoded explicitly as UTF-8 instead of the platform default charset.
            try (Reader reader = new InputStreamReader(response.body().byteStream(), UTF_8);
                 JSONReader jsonReader = new JSONReader(reader)) {
                List<T> container = new ArrayList<>();
                int total = 0;

                jsonReader.startArray();
                while (jsonReader.hasNext()) {
                    try {
                        T item = constructor.get();
                        jsonReader.readObject(item);
                        container.add(item);
                    } catch (JSONException ex) {
                        // Best-effort recovery kept from the original implementation: read the
                        // next value generically so it can be logged.
                        // NOTE(review): the parser position after a failed readObject(T) is not
                        // evident from this file - confirm this really skips the bad element.
                        Object skipped = jsonReader.readObject();
                        // Passing the object itself (not skipped.toString()) avoids an NPE
                        // when the parser returns null.
                        log.warn("json数据转换异常:{}", skipped, ex);
                    }
                    if (container.size() == batchSize) {
                        total = flush(container, callback, requestUrl, total);
                    }
                }
                // Deliver the last batch, which holds fewer than batchSize elements.
                if (!container.isEmpty()) {
                    total = flush(container, callback, requestUrl, total);
                }
                jsonReader.endArray();
            }
        } finally {
            // Always release the HTTP connection, whatever happened above.
            response.close();
        }
    }

    /**
     * Hands the current batch to the callback, logs the running total and empties the batch.
     *
     * @return the running total including this batch
     */
    private static <T> int flush(List<T> batch, Consumer<List<T>> callback, String url, int total) {
        callback.accept(batch);
        int newTotal = total + batch.size();
        log.info("{} 流式读取已读条数:{}", url, newTotal);
        batch.clear();
        return newTotal;
    }

    /**
     * Executes an HTTP request against {@code url} with {@code paramMap} appended as a
     * URL-encoded query string and returns the raw response. The caller must close it.
     *
     * <p>Despite the method name, the request is built without a method or body, i.e. with
     * the {@link Request.Builder} default method; the name is kept for compatibility.
     *
     * @param url      target URL without a query string
     * @param paramMap query parameters; may be {@code null} or empty; entries with a
     *                 {@code null} value are skipped
     * @return the executed response
     * @throws Exception if the URL cannot be built or the call fails
     */
    protected static Response executeHttpPostRequest(String url, Map<String, Object> paramMap) throws Exception {
        Request request = new Request.Builder().url(buildUrl(url, paramMap)).build();
        return HTTP_CLIENT.newCall(request).execute();
    }

    /** Appends every non-null entry of {@code paramMap} to {@code url} as an encoded query parameter. */
    private static String buildUrl(String url, Map<String, Object> paramMap)
            throws java.io.UnsupportedEncodingException {
        StringBuilder sb = new StringBuilder(url);
        if (paramMap == null || paramMap.isEmpty()) {
            return sb.toString();
        }
        char separator = '?';
        for (Map.Entry<String, Object> entry : paramMap.entrySet()) {
            if (entry.getValue() == null) {
                continue;
            }
            // Names are encoded as well as values so that reserved characters in a name
            // cannot corrupt the query string.
            sb.append(separator)
                    .append(encode(String.valueOf(entry.getKey())))
                    .append('=')
                    .append(encode(String.valueOf(entry.getValue())));
            separator = '&';
        }
        return sb.toString();
    }

    /** URL-encodes {@code raw} as UTF-8, representing spaces as {@code %20} rather than {@code +}. */
    private static String encode(String raw) throws java.io.UnsupportedEncodingException {
        // URLEncoder emits '+' only for spaces (a literal '+' becomes %2B), so a plain
        // literal replacement is sufficient here.
        return URLEncoder.encode(raw, UTF_8).replace("+", "%20");
    }
}