作者 [wgf]

消息队列数据同步demo

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.1.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.aukey.example</groupId>
<artifactId>canal-mq-client</artifactId>
<version>0.0.1</version>
<name>canal-mq-client</name>
<description>客户端demo项目</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.68</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.4</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.10</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
... ...
package com.aukey.example;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class MqExampleApplication {
public static void main(String[] args) {
SpringApplication.run(MqExampleApplication.class, args);
}
}
... ...
package com.aukey.example.conf;
import com.aukey.example.converter.CharArrayToStringConverter;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author: wgf
* @create: 2020-06-10 18:52
* @description:
**/
@Configuration
public class RabbitConf {
/**
* spring boot 在2.2.7.RELEASE 及以上版本不用配置,因为新版本amqp兼容content-type为空的消息
* @param connectionFactory
* @return
*/
@Bean
public RabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory listenerContainerFactory = new SimpleRabbitListenerContainerFactory();
listenerContainerFactory.setConnectionFactory(connectionFactory);
//--加上这句
listenerContainerFactory.setMessageConverter(new CharArrayToStringConverter());
listenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return listenerContainerFactory;
}
}
... ...
package com.aukey.example.constant;
/**
* @author: wgf
* @create: 2020-06-09 15:03
* @description: Msql数据库事件类型枚举
* mysql binlog启用的是row模式,只有C,U,D操作
**/
public enum EventType {
INSERT,
UPDATE,
DELETE,
CREATE,
ALTER,
ERASE,
QUERY,
TRUNCATE,
RENAME,
CINDEX,
DINDEX,
GTID,
XACOMMIT,
XAROLLBACK,
MHEARTBEAT;
}
... ...
package com.aukey.example.constant;
/**
* @author: wgf
* @create: 2020-06-09 14:35
* @description: 消息队列配置
**/
public interface MQConst {
/**
* 测试队列
*/
String TEST = "";
}
... ...
package com.aukey.example.converter;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import java.io.UnsupportedEncodingException;
/**
* @author: wgf
* @create: 2020-06-10 22:39
* @description: 消息类型转换器
* 如果 spring boot 用的是 2.2.7.RELEASE以上的版本则不需要使用此转换器
**/
public class CharArrayToStringConverter implements MessageConverter {
private String defaultCharset = "utf-8";
@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
// TODO 不发消息不实现转换
return null;
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
Object content = null;
MessageProperties properties = message.getMessageProperties();
if (properties != null) {
String contentType = properties.getContentType();
if (StringUtils.isBlank(contentType) || contentType.startsWith("text")) {
String encoding = properties.getContentEncoding();
if (encoding == null) {
encoding = this.defaultCharset;
}
try {
content = new String(message.getBody(), encoding);
} catch (UnsupportedEncodingException e) {
throw new MessageConversionException(
"failed to convert text-based Message content", e);
}
}
} else {
content = message.getBody();
}
return content;
}
}
... ...
package com.aukey.example.entity;
import lombok.Data;
import java.util.Date;
/**
* @author: wgf
* @create: 2020-06-09 11:30
* @description:
**/
@Data
public class AmazonOrder {
/**
* 自增主键
*/
private Integer aid;
/**
* 亚马逊订单ID
*/
private String amazonOrderId;
/**
* 卖家订单ID
*/
private String sellerOrderId;
/**
* 购买日期
*/
private Date purchaseDate;
/**
* 付款日期
*/
private Date paymentsDate;
/**
* 最后更新日期
*/
private Date lastUpdateDate;
/**
* 订单状态:1、Pending Availability等待订单生效;2、Pending待定;3、Unshipped未发货;4、Partially Shipped部分发货;5、Shipped已发货;6、Invoice Unconfirmed发票未确认;7、Canceled已取消;8、Unfulfillable无法发货
*/
private String orderStatus;
/**
* 发货渠道(AFN、MFN)
*/
private String fulfillmentChannel;
/**
* 销售渠道(站点)
*/
private String salesChannel;
/**
* 运输服务等级
*/
private String shipServiceLevel;
/**
* 收货地址-城市
*/
private String shippingAddressCity;
/**
* 收货地址-县
*/
private String shippingAddressCounty;
/**
* 收货地址-地区
*/
private String shippingAddressDistrict;
/**
* 收货地址-州
*/
private String shippingAddressStateOrRegion;
/**
* 收货地址-邮政编码
*/
private String shippingAddressPostalCode;
/**
* 收货地址-国家编码
*/
private String shippingAddressCountryCode;
private String shippingAddressPhone;
/**
* 订单总额(币种)
*/
private String orderTotalCurrencyCode;
/**
* 订单总额
*/
private Double orderTotalAmount;
/**
* 邮费总额
*/
private Double postageTotal;
/**
* 折扣总额
*/
private Double discountTotal;
/**
* 发货数量
*/
private Integer numberOfItemsShipped;
/**
* 未发货数量
*/
private Integer numberOfItemsUnshipped;
/**
* 付款方式
*/
private String paymentMethod;
/**
* 市场ID
*/
private String marketplaceId;
/**
* 买家邮箱
*/
private String buyerEmail;
/**
* 买家名称
*/
private String buyerName;
/**
* 出货服务等级类别
*/
private String shipmentServiceLevelCategory;
private String shippedByAmazonTfm;
private String tfmShipmentStatus;
private String cbaDisplayableShippingLabel;
/**
* 订单类型
*/
private String orderType;
/**
* 最早发货日期
*/
private Date earliestShipDate;
/**
* 最晚发货日期
*/
private Date latestShipDate;
/**
* 最早交货日期
*/
private Date earliestDeliveryDate;
/**
* 最晚交货日期
*/
private Date latestDeliveryDate;
private String isBusinessOrder;
/**
* 买家采购订单编号
*/
private String purchaseOrderNumber;
private String isPrime;
private String isPremiumOrder;
/**
* 抓单时间
*/
private Date importSysDate;
/**
* 店铺ID
*/
private Integer accountId;
/**
* 店铺简码
*/
private String accountCode;
/**
* 区域ID
*/
private Integer areaId;
/**
* 区域
*/
private String area;
/**
* 记录创建时间
*/
private Date createDate;
/**
* 记录更新时间
*/
private Date updateDate;
/**
* item表处理状态
*/
private String orderItemStatus;
private Integer siteId;
private String site;
/**
* 店铺站点ID
*/
private String authId;
private String dataDigest;
/**
* 是否补发订单
*/
private String reissue;
/**
* 收件人姓名
*/
private String shippingAddressName;
/**
* 收货地址-街道1
*/
private String shippingAddressLine1;
/**
* 收货地址-街道2
*/
private String shippingAddressLine2;
/**
* 收货地址-街道3
*/
private String shippingAddressLine3;
private String skus;
private String asins;
}
... ...
package com.aukey.example.listener;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.aukey.example.constant.EventType;
import com.aukey.example.constant.MQConst;
import com.aukey.example.entity.AmazonOrder;
import com.aukey.example.vo.MessageVo;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author: wgf
* @create: 2020-06-09 14:30
* @description: CanalUser监听器
**/
@Slf4j
@Component
public class CanalUserListener {
@RabbitListener(queues = "polaris_order_center.amazon_order")
public void receive(String message, Channel channel, Message messageEntity) throws IOException {
try {
log.info("接收到队列: {} 消息:{}", MQConst.TEST, message);
MessageVo<AmazonOrder> messageVo = JSON.parseObject(message, new TypeReference<MessageVo<AmazonOrder>>() {
});
EventType eventType = EventType.valueOf(messageVo.getType());
log.info("当前监听binlog 数据库:{}, 数据表:{}", messageVo.getDatabase(), messageVo.getTable());
// 数据同步只关注这三种事件
switch (eventType) {
case INSERT:
log.info("触发 INSERT 事件");
// messageVo.getData(); 获取数据变更
// TODO 自定义实现
break;
case UPDATE:
log.info("触发 UPDATE 事件");
// TODO 自定义实现
break;
case DELETE:
log.info("触发 DELETE 事件");
// TODO 自定义实现
break;
default:
log.info("其他事件类型:{}, 过滤不处理", eventType);
}
log.info("消息长度:{}", messageVo.getData().size());
/**
* 消息消费确认
* 如果客户端在线没有签收没有签收这条Message,则此消息进入Unacked状态,此时监听器阻塞等待消息确认,不推送新Message
* 如果待消息确认并且客户端下线,下次客户端上线重新推送上次Unacked状态Message
*/
channel.basicAck(messageEntity.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
/**
* 第一个参数deliveryTag:发布的每一条消息都会获得一个唯一的deliveryTag,deliveryTag在channel范围内是唯一的
* 第二个参数multiple:批量确认标志。如果值为true,包含本条消息在内的、所有比该消息deliveryTag值小的 消息都被拒绝了(除了已经被 ack 的以外);如果值为false,只拒绝三本条消息
* 第三个参数requeue:表示如何处理这条消息,如果值为true,则重新放入RabbitMQ的发送队列,如果值为false,则通知RabbitMQ销毁这条消息
*/
//channel.basicNack(messageEntity.getMessageProperties().getDeliveryTag(), false,true);
e.printStackTrace();
}
}
}
... ...
package com.aukey.example.vo;
import lombok.Data;
import lombok.ToString;
import java.util.List;
import java.util.Map;
/**
* @author: wgf
* @create: 2020-06-09 10:44
* @description: 消息实体
**/
@Data
@ToString
public class MessageVo<T> {
// 数据库
private String database;
// 数据表
private String table;
// INSERT,DELETE,UPDATE
private String type;
// 最新版本binlog数据
private List<T> data;
// 旧版本binlog数据,只有UPDATE时才有值
private List<T> old;
// 主键字段
private String[] pkNames;
/**
* value 每个字段对应的sql规范数据枚举类型
* 参考 {@link java.sql.Types}
*/
private Map<String, Integer> sqlType;
// 每个字段对应的Mysql数据类型
private Map<String, String> mysqlType;
}
... ...
spring:
rabbitmq:
host: 121.37.17.48
port: 5666
virtual-host: canal
username: canal_read
password: uC4235OY@4
publisher-confirm-type: correlated # 开启发送确认
publisher-returns: true # 开启发送失败退回
listener: # 开启ack消费确认
direct:
acknowledge-mode: manual
simple:
acknowledge-mode: manual
... ...