// StreamReaderHandler.java 5.2 KB
package com.aukey.example.conf;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONReader;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
 * Helper for streaming a JSON array from an HTTP endpoint and handing the
 * decoded records to a business callback in batches, so the full payload
 * never has to be held in memory at once.
 *
 * <p>Author: wgf<br>
 * Created: 2020-05-20 10:03:15
 **/
public class StreamReaderHandler {

    private static final Logger log = LoggerFactory.getLogger(StreamReaderHandler.class);

    // NOTE(review): a one-hour connect timeout combined with a 60-second read
    // timeout looks swapped for a long-running streaming download. The values
    // are kept exactly as they were; confirm the intent before changing them.
    private static final long CONNECT_TIMEOUT_SECONDS = 60L * 30 * 2;
    private static final long READ_TIMEOUT_SECONDS = 60L;

    /**
     * Single shared client. Building a new OkHttpClient per request gives each
     * request its own connection pool and dispatcher, which wastes resources.
     */
    private static final OkHttpClient HTTP_CLIENT = new OkHttpClient.Builder()
            .connectTimeout(CONNECT_TIMEOUT_SECONDS, TimeUnit.SECONDS)
            .readTimeout(READ_TIMEOUT_SECONDS, TimeUnit.SECONDS)
            .build();

    private StreamReaderHandler() {
    }

    /**
     * Requests {@code dwDataApi + api}, reads the response body as a JSON array
     * and delivers the decoded elements to {@code callback} in batches.
     *
     * <p>Each element is decoded into a fresh instance obtained from
     * {@code constructor}. Every batch is delivered in its own list, so the
     * callback may keep a reference to it. A final, smaller batch is delivered
     * for any remaining elements. If the response status is not 200 the
     * failure is logged and the method returns without invoking the callback.
     *
     * @param dwDataApi   base URL of the data-warehouse request endpoint
     * @param api         API path issued by the platform, appended to {@code dwDataApi}
     * @param paramMap    query parameters; may be {@code null}, entries with a
     *                    {@code null} value are skipped
     * @param callback    business callback invoked once per batch
     * @param batchSize   number of records per callback invocation, must be
     *                    positive; suggested range [500, 5000]
     * @param constructor supplier of empty entity instances to populate
     * @param <T>         entity type
     * @throws IllegalArgumentException if {@code batchSize} is not positive
     * @throws NullPointerException     if {@code callback} or {@code constructor} is {@code null}
     * @throws Exception                if the HTTP call or the JSON parsing fails
     */
    public static <T> void reader(String dwDataApi, String api,
                                  Map<String, Object> paramMap,
                                  Consumer<List<T>> callback,
                                  int batchSize,
                                  Supplier<T> constructor) throws Exception {

        // Fail fast, before any network traffic is generated.
        Objects.requireNonNull(callback, "callback");
        Objects.requireNonNull(constructor, "constructor");
        if (batchSize <= 0) {
            // A non-positive size would never trigger a flush and would buffer
            // the whole stream in memory, defeating the purpose of this class.
            throw new IllegalArgumentException("batchSize must be positive, got " + batchSize);
        }

        String url = dwDataApi + api;

        // try-with-resources closes the response even if parsing or the callback throws.
        try (Response response = executeHttpPostRequest(url, paramMap)) {
            if (response.code() != 200) {
                log.warn("流式读取异常 [ url:{} ] [ code:{} ]", url, response.code());
                return;
            }

            // charStream() decodes with the charset declared by the response
            // (UTF-8 when none is declared) instead of the platform default.
            try (JSONReader jsonReader = new JSONReader(response.body().charStream())) {
                List<T> batch = new ArrayList<>();
                long total = 0;

                jsonReader.startArray();

                while (jsonReader.hasNext()) {
                    try {
                        T record = constructor.get();
                        jsonReader.readObject(record);
                        batch.add(record);
                    } catch (JSONException ex) {
                        // Best-effort recovery kept from the original: read the next
                        // raw value and log it. NOTE(review): whether the reader is
                        // still positioned sensibly after a failed decode depends on
                        // fastjson internals; confirm.
                        // The value is passed as-is so a null does not cause an NPE.
                        Object skipped = jsonReader.readObject();
                        log.warn("json数据转换异常:{}", skipped, ex);
                    }

                    if (batch.size() >= batchSize) {
                        callback.accept(batch);
                        total += batch.size();
                        log.info("{} 流式读取已读条数:{}", url, total);
                        // Fresh list instead of clear(): the callback may have kept the old one.
                        batch = new ArrayList<>();
                    }
                }

                // Deliver the last batch, which holds fewer than batchSize records.
                if (!batch.isEmpty()) {
                    callback.accept(batch);
                    total += batch.size();
                    log.info("{} 流式读取已读条数:{}", url, total);
                }

                jsonReader.endArray();
            }
        }
    }

    /**
     * Executes an HTTP request against {@code url} with {@code paramMap}
     * appended as a URL-encoded query string, and returns the open response.
     *
     * <p>Despite the method name, the request is built without a body or an
     * explicit method, so OkHttp's {@code Request.Builder} default (GET) applies.
     * The caller is responsible for closing the returned {@link Response}.
     *
     * @param url      target URL; may already contain a query string
     * @param paramMap query parameters; may be {@code null}, entries with a
     *                 {@code null} value are skipped
     * @return the response, not yet consumed
     * @throws Exception if the URL is invalid or the call fails
     */
    protected static Response executeHttpPostRequest(String url, Map<String, Object> paramMap) throws Exception {
        StringBuilder target = new StringBuilder(url);

        if (paramMap != null && !paramMap.isEmpty()) {
            // Continue an existing query string rather than emitting a second '?'.
            char separator = url.indexOf('?') >= 0 ? '&' : '?';

            for (Map.Entry<String, Object> entry : paramMap.entrySet()) {
                if (entry.getValue() == null) {
                    continue;
                }

                target.append(separator)
                        .append(encodeQueryComponent(String.valueOf(entry.getKey())))
                        .append('=')
                        .append(encodeQueryComponent(String.valueOf(entry.getValue())));
                separator = '&';
            }
        }

        Request request = new Request.Builder().url(target.toString()).build();
        return HTTP_CLIENT.newCall(request).execute();
    }

    /**
     * URL-encodes one query key or value as UTF-8, writing spaces as
     * {@code %20} rather than the {@code +} produced by {@link URLEncoder}.
     */
    private static String encodeQueryComponent(String raw) throws java.io.UnsupportedEncodingException {
        return URLEncoder.encode(raw, "UTF-8").replace("+", "%20");
    }
}