DwHelperUtil.java 10.2 KB
package com.aukey.example.util;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONReader;
import com.alibaba.fastjson.TypeReference;
import com.aukey.example.vo.DwParamVo;
import com.aukey.example.web.TokenController;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;

import java.io.*;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

/**
 * @author: wgf
 * @create: 2020-05-13 11:21
 * @description: dw帮助工具类
 * <p>
 * 关于 java url 编码问题
 * Java官方的URLEncoder.encode 实际上是为了post请求的content-type为x-www-form-urlencoded来设计的
 * 在进行特殊参数转义的时候会将空格转为 +
 * 但是在 RFC1738、RFC2396协议中规定,GET请求的空格转为为 %20
 * 所以在发送GET请求的特殊参数中存在空格必须 先URLEncoder.encode 然后再用 %20 替换掉所有 + 号
 **/
public class DwHelperUtil {
    private static Logger log = LoggerFactory.getLogger(DwHelperUtil.class);

    private DwHelperUtil() {
    }

    /**
     * GET请求
     *
     * @param urlStr 请求url
     * @param params 请求参数
     * @return
     */
    public static String doGet(String urlStr, Map<String, Object> params) {

        if (StringUtils.isEmpty(urlStr)) {
            return null;
        }

        HttpURLConnection connection = null;
        InputStream       is         = null;
        BufferedReader    br         = null;
        String            result     = null;

        try {
            urlStr = jointUrl(urlStr, params);
            URL url = new URL(urlStr);
            connection = (HttpURLConnection) url.openConnection();
            connection.setRequestMethod("GET");
            connection.setConnectTimeout(15000);
            connection.setReadTimeout(60000);
            connection.connect();
            int responseCode = connection.getResponseCode();
            if (responseCode == 200) {
                is = connection.getInputStream();
                br = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8));
                StringBuilder sbf = new StringBuilder();
                String        temp;
                while ((temp = br.readLine()) != null) {
                    sbf.append(temp);
                    sbf.append("\r\n");
                }
                result = sbf.toString();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (null != br) {
                try {
                    br.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

            if (null != is) {
                try {
                    is.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

            if (connection != null) {
                connection.disconnect();// 关闭远程连接
            }
        }

        return result;
    }


    /**
     * @param dwDoMain 数据仓库域名
     * @param api      数据仓库申请的API
     * @param params   参数
     * @return
     */
    public static String doGet(String dwDoMain, String api, Map<String, Object> params) {
        return doGet(dwDoMain + api, params);
    }


    /**
     * url参数拼接
     *
     * @param url
     * @param params
     * @return
     */
    public static String jointUrl(String url, Map<String, Object> params) throws UnsupportedEncodingException {
        StringBuilder sb = new StringBuilder(url);
        if (params != null && !params.isEmpty()) {
            boolean isFirst = true;

            for (Map.Entry<String, Object> entry : params.entrySet()) {

                if (Objects.isNull(entry.getValue())) {
                    continue;
                }

                if (isFirst) {
                    sb.append("?");
                    isFirst = false;
                } else {
                    sb.append("&");
                }

                // 兼容参数特殊符号
                String value = String.valueOf(entry.getValue());
                value = URLEncoder.encode(value, "UTF-8");
                value = value.replaceAll("\\+", "%20");

                sb.append(entry.getKey())
                        .append("=")
                        .append(value);
            }

            url = sb.toString();
        }

        return url;
    }


    /**
     * 分页查询
     *
     * @param callBack      回调方法,需要返回总页数
     * @param dwDataApi     dw获取数据接口
     * @param api           申请的api
     * @param paramVo       请求参数
     * @param typeReference json字符串转实体引用类型
     * @param <T>
     */
    public static <T> void pageReader(Function<T, Integer> callBack,
                                      String dwDataApi,
                                      String api,
                                      DwParamVo paramVo,
                                      TypeReference<T> typeReference) {

        // 当前页
        int currentPageNumber = 0;
        // 总页数
        int totalPageNum = 1;

        // 分页请求
        do {
            currentPageNumber++;
            // 每次请求获取最新token
            paramVo.setToken(TokenController.getCurrentToken());
            paramVo.setPageNumber(currentPageNumber);

            String jsonStr = DwHelperUtil.doGet(dwDataApi, api, paramVo.toMap());
            if (StringUtils.isEmpty(jsonStr)) {
                throw new RuntimeException(String.format("API %s 调用失败", dwDataApi + api));
            }

            // json解析为对象
            T t = JSON.parseObject(jsonStr, typeReference);

            // 回调函数获取总页数
            int total = callBack.apply(t);

            if (currentPageNumber == 1) {
                totalPageNum = calculatePage(total, paramVo.getPageSize());
            }

            log.info("========== 获取API:{} 第:{}页数据 ==========", dwDataApi + api, currentPageNumber);
        } while (currentPageNumber < totalPageNum);
    }


    /**
     * 总页数计算
     *
     * @param total
     * @param pageSize
     * @return
     */
    private static int calculatePage(int total, int pageSize) {
        int result = 1;
        if (total % pageSize == 0) {
            result = total / pageSize;
        } else {
            result = total / pageSize + 1;
        }

        return result;
    }


    /**
     * 流式读取
     *
     * @param dwDataApi   dw数据请求接口
     * @param api         平台申请的api
     * @param paramMap    参数
     * @param callback    业务回调
     * @param batchSize   业务回调数据量大小,参考值 [500,5000]
     * @param constructor 实体的构造函数
     * @throws Exception
     */
    public static <T> void streamReader(String dwDataApi, String api,
                                        Map<String, Object> paramMap,
                                        Consumer<List<T>> callback,
                                        int batchSize,
                                        Supplier<T> constructor) throws Exception {

        dwDataApi = dwDataApi + api;
        Response    response   = null;
        InputStream is         = null;
        Reader      reader     = null;
        JSONReader  jsonReader = null;

        try {
            response = executeHttpPostRequest(dwDataApi, paramMap);

            if (response.code() == 200) {
                List<T> container = new ArrayList<>();
                int     total     = 0;

                // 从响应中获取流
                is = response.body().byteStream();
                reader = new InputStreamReader(is);
                jsonReader = new JSONReader(reader);
                // 开始读取
                jsonReader.startArray();

                while (jsonReader.hasNext()) {
                    T t = constructor.get();
                    jsonReader.readObject(t);
                    container.add(t);

                    if (container.size() == batchSize) {
                        callback.accept(container);
                        total += batchSize;
                        log.info("{} 流式读取已读条数:{}", dwDataApi, total);
                        container.clear();
                    }
                }

                // 处理最后一批不足 batchSize 的数据
                if (container.size() > 0) {
                    callback.accept(container);
                    total += container.size();
                    log.info("{} 流式读取已读条数:{}", dwDataApi, total);
                    container.clear();
                }
            } else {
                log.info("流式读取异常 [ url:{} ] [ code:{} ]", dwDataApi, response.code());
            }
        } catch (Exception e) {
            log.error(e.getMessage(), e);
            throw e;
        } finally {
            //结束读取
            jsonReader.endArray();

            if (Objects.nonNull(jsonReader)) {
                jsonReader.close();
            }

            if (Objects.nonNull(reader)) {
                reader.close();
            }

            if (Objects.nonNull(is)) {
                is.close();
            }

            if (Objects.nonNull(response)) {
                response.close();
            }
        }
    }


    protected static Response executeHttpPostRequest(String url, Map<String, Object> paramMap) throws Exception {
        OkHttpClient client = new OkHttpClient.Builder()
                .connectTimeout(60 * 60 * 2, TimeUnit.SECONDS)
                .writeTimeout(60 * 2, TimeUnit.SECONDS)
                .readTimeout(60 * 2, TimeUnit.SECONDS)
                .build();

        url = jointUrl(url, paramMap);

        Request  request  = new Request.Builder().url(url).build();
        Response response = client.newCall(request).execute();
        return response;
    }
}