|
|
package com.aukey.example.listener;
|
|
|
|
|
|
import com.alibaba.fastjson.JSON;
|
|
|
import com.alibaba.fastjson.TypeReference;
|
|
|
import com.aukey.example.constant.EventType;
|
|
|
import com.aukey.example.constant.MQConst;
|
|
|
import com.aukey.example.entity.AmazonOrder;
|
|
|
import com.aukey.example.vo.MessageVo;
|
|
|
import com.rabbitmq.client.Channel;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.springframework.amqp.core.Message;
|
|
|
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
|
|
|
/**
|
|
|
* @author: wgf
|
|
|
* @create: 2020-06-09 14:30
|
|
|
* @description: CanalUser监听器
|
|
|
**/
|
|
|
@Slf4j
|
|
|
@Component
|
|
|
public class CanalUserListener {
|
|
|
|
|
|
@RabbitListener(queues = "polaris_order_center.amazon_order")
|
|
|
public void receive(String message, Channel channel, Message messageEntity) throws IOException {
|
|
|
|
|
|
try {
|
|
|
|
|
|
log.info("接收到队列: {} 消息:{}", MQConst.TEST, message);
|
|
|
|
|
|
MessageVo<AmazonOrder> messageVo = JSON.parseObject(message, new TypeReference<MessageVo<AmazonOrder>>() {
|
|
|
});
|
|
|
|
|
|
EventType eventType = EventType.valueOf(messageVo.getType());
|
|
|
|
|
|
log.info("当前监听binlog 数据库:{}, 数据表:{}", messageVo.getDatabase(), messageVo.getTable());
|
|
|
|
|
|
// 数据同步只关注这三种事件
|
|
|
switch (eventType) {
|
|
|
case INSERT:
|
|
|
log.info("触发 INSERT 事件");
|
|
|
// messageVo.getData(); 获取数据变更
|
|
|
// TODO 自定义实现
|
|
|
break;
|
|
|
case UPDATE:
|
|
|
log.info("触发 UPDATE 事件");
|
|
|
// TODO 自定义实现
|
|
|
break;
|
|
|
case DELETE:
|
|
|
log.info("触发 DELETE 事件");
|
|
|
// TODO 自定义实现
|
|
|
break;
|
|
|
default:
|
|
|
log.info("其他事件类型:{}, 过滤不处理", eventType);
|
|
|
}
|
|
|
|
|
|
log.info("消息长度:{}", messageVo.getData().size());
|
|
|
|
|
|
/**
|
|
|
* 消息消费确认
|
|
|
* 如果客户端在线没有签收没有签收这条Message,则此消息进入Unacked状态,此时监听器阻塞等待消息确认,不推送新Message
|
|
|
* 如果待消息确认并且客户端下线,下次客户端上线重新推送上次Unacked状态Message
|
|
|
*/
|
|
|
channel.basicAck(messageEntity.getMessageProperties().getDeliveryTag(), false);
|
|
|
} catch (Exception e) {
|
|
|
/**
|
|
|
* 第一个参数deliveryTag:发布的每一条消息都会获得一个唯一的deliveryTag,deliveryTag在channel范围内是唯一的
|
|
|
* 第二个参数multiple:批量确认标志。如果值为true,包含本条消息在内的、所有比该消息deliveryTag值小的 消息都被拒绝了(除了已经被 ack 的以外);如果值为false,只拒绝三本条消息
|
|
|
* 第三个参数requeue:表示如何处理这条消息,如果值为true,则重新放入RabbitMQ的发送队列,如果值为false,则通知RabbitMQ销毁这条消息
|
|
|
*/
|
|
|
//channel.basicNack(messageEntity.getMessageProperties().getDeliveryTag(), false,true);
|
|
|
e.printStackTrace();
|
|
|
}
|
|
|
}
|
|
|
} |