作者 [wgf]

添加入库实现

... ... @@ -32,7 +32,7 @@
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.68</version>
<version>1.2.73</version>
</dependency>
<dependency>
... ... @@ -46,6 +46,24 @@
<artifactId>lombok</artifactId>
<version>1.18.10</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.13</version>
</dependency>
<dependency>
<groupId>tk.mybatis</groupId>
<artifactId>mapper-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>1.3.2</version>
</dependency>
</dependencies>
<build>
... ... @@ -54,6 +72,30 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
<!-- mybatis自动生成插件 -->
<plugin>
<groupId>org.mybatis.generator</groupId>
<artifactId>mybatis-generator-maven-plugin</artifactId>
<version>1.3.5</version>
<configuration>
<configurationFile>src/main/resources/generator/generatorConfig.xml</configurationFile>
<overwrite>true</overwrite>
<verbose>true</verbose>
</configuration>
<dependencies>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.13</version>
</dependency>
<dependency>
<groupId>tk.mybatis</groupId>
<artifactId>mapper</artifactId>
<version>3.4.4</version>
</dependency>
</dependencies>
</plugin>
</plugins>
</build>
... ...
package com.aukey.example.commom.mapper;
import tk.mybatis.mapper.common.BaseMapper;
import tk.mybatis.mapper.common.ConditionMapper;
import tk.mybatis.mapper.common.ExampleMapper;
import tk.mybatis.mapper.common.IdsMapper;
import tk.mybatis.mapper.common.special.InsertListMapper;
/**
* @author wgf
* 定制版MyBatis Mapper插件接口,如需其他接口参考官方文档自行添加。
*/
public interface Mapper<T> extends BaseMapper<T>, ConditionMapper<T>, IdsMapper<T>, InsertListMapper<T>, ExampleMapper<T> {
}
... ...
package com.aukey.example.conf;
import com.aukey.example.commom.mapper.Mapper;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import tk.mybatis.mapper.entity.Config;
import tk.mybatis.mapper.mapperhelper.MapperHelper;
import tk.mybatis.spring.annotation.MapperScan;
import javax.sql.DataSource;
import java.util.Properties;
@Configuration
@MapperScan(basePackages = FbaStockDataSourceConf.SCAN_PACKAGE, sqlSessionFactoryRef = "fbaStockSqlSessionFactory")
public class FbaStockDataSourceConf {
public static final String SCAN_PACKAGE = "com.aukey.example.mapper.fbaStock";
public static final String MAPPER_LOCATION = "classpath*:mapper/fbaStock/*.xml";
@Bean
@ConfigurationProperties(prefix = "spring.datasource.fba-stock")
public DataSource fbaStockDataSource() {
return DataSourceBuilder.create().build();
}
@Bean
public DataSourceTransactionManager fbaStockTransactionManager(@Qualifier("fbaStockDataSource") DataSource dataSource) {
return new DataSourceTransactionManager(dataSource);
}
@Bean
public SqlSessionFactory fbaStockSqlSessionFactory(
@Qualifier("fbaStockDataSource") DataSource dataSource
) throws Exception {
final SqlSessionFactoryBean sessionFactory = new SqlSessionFactoryBean();
sessionFactory.setDataSource(dataSource);
//更多详细配置见: https://pagehelper.github.io/docs/howtouse/
Properties properties = new Properties();
properties.setProperty("helperDialect", "mysql"); //方言
properties.setProperty("rowBoundsWithCount", "true"); //使用 RowBounds 分页会进行 count 查询
properties.setProperty("reasonable", "true"); //pageNum<=0 时会查询第一页, pageNum>pages(超过总数时),会查询最后一页
properties.setProperty("pageSizeZero", "true"); //如果 pageSize=0 或者 RowBounds.limit = 0 就会查询出全部的结果
properties.setProperty("supportMethodsArguments", "true"); //支持通过 Mapper 接口参数来传递分页参数
properties.setProperty("offsetAsPageNum", "true"); //将 RowBounds 中的 offset 参数当成 pageNum 使用
//设置mapper location
sessionFactory.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(MAPPER_LOCATION));
return sessionFactory.getObject();
}
@Bean
@Primary
public SqlSessionTemplate fbaStockTestSqlSessionTemplate(@Qualifier("fbaStockSqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception {
return new SqlSessionTemplate(sqlSessionFactory);
}
/**
* Mybatis 通用Mapper配置
*
* @param sqlSessionFactory
* @return
*/
@Bean
public MapperHelper fbaStockMapperHelper(@Qualifier("fbaStockSqlSessionFactory") SqlSessionFactory sqlSessionFactory) {
MapperHelper mapperHelper = new MapperHelper();
//特殊配置
Config config = new Config();
config.setNotEmpty(false);
config.setIDENTITY("MYSQL");
//更多详细配置: http://git.oschina.net/free/Mapper/blob/master/wiki/mapper3/2.Integration.md
mapperHelper.setConfig(config);
mapperHelper.registerMapper(Mapper.class);
mapperHelper.processConfiguration(sqlSessionFactory.getConfiguration());
return mapperHelper;
}
}
... ...
package com.aukey.example.entity.fbaStock;
import com.alibaba.fastjson.annotation.JSONField;
import lombok.Data;
import org.apache.commons.lang3.StringUtils;
import javax.persistence.Column;
import javax.persistence.Id;
import javax.persistence.Table;
import javax.persistence.Transient;
import java.io.UnsupportedEncodingException;
import java.util.Date;
@Data
@Table(name = "amazon_order")
public class AmazonOrder {
/**
* 自增主键
*/
@Id
private Integer aid;
/**
* 亚马逊订单ID
*/
@Column(name = "amazon_order_id")
private String amazonOrderId;
/**
* 卖家订单ID
*/
@Column(name = "seller_order_id")
private String sellerOrderId;
/**
* 购买日期
*/
@Column(name = "purchase_date")
private Date purchaseDate;
/**
* 付款日期
*/
@Column(name = "payments_date")
private Date paymentsDate;
/**
* 最后更新日期
*/
@Column(name = "last_update_date")
private Date lastUpdateDate;
/**
* 订单状态:1、Pending Availability等待订单生效;2、Pending待定;3、Unshipped未发货;4、Partially Shipped部分发货;5、Shipped已发货;6、Invoice Unconfirmed发票未确认;7、Canceled已取消;8、Unfulfillable无法发货
*/
@Column(name = "order_status")
private String orderStatus;
/**
* 发货渠道(AFN、MFN)
*/
@Column(name = "fulfillment_channel")
private String fulfillmentChannel;
/**
* 销售渠道(站点)
*/
@Column(name = "sales_channel")
private String salesChannel;
/**
* 运输服务等级
*/
@Column(name = "ship_service_level")
private String shipServiceLevel;
/**
* 收货地址-城市
*/
@Column(name = "shipping_address_city")
private String shippingAddressCity;
/**
* 收货地址-县
*/
@Column(name = "shipping_address_county")
private String shippingAddressCounty;
/**
* 收货地址-地区
*/
@Column(name = "shipping_address_district")
private String shippingAddressDistrict;
/**
* 收货地址-州
*/
@Column(name = "shipping_address_stateOrRegion")
private String shippingAddressStateorregion;
/**
* 收货地址-邮政编码
*/
@Column(name = "shipping_address_postalCode")
private String shippingAddressPostalcode;
/**
* 收货地址-国家编码
*/
@Column(name = "shipping_address_countryCode")
private String shippingAddressCountrycode;
@Column(name = "shipping_address_phone")
private String shippingAddressPhone;
/**
* 订单总额(币种)
*/
@Column(name = "order_total_currency_code")
private String orderTotalCurrencyCode;
/**
* 订单总额
*/
@Column(name = "order_total_amount")
private Double orderTotalAmount;
/**
* 邮费总额
*/
@Column(name = "postage_total")
private Double postageTotal;
/**
* 折扣总额
*/
@Column(name = "discount_total")
private Double discountTotal;
/**
* 发货数量
*/
@Column(name = "number_of_items_shipped")
private Integer numberOfItemsShipped;
/**
* 未发货数量
*/
@Column(name = "number_of_items_unshipped")
private Integer numberOfItemsUnshipped;
/**
* 付款方式
*/
@Column(name = "payment_method")
private String paymentMethod;
/**
* 市场ID
*/
@Column(name = "marketplace_id")
private String marketplaceId;
/**
* 买家邮箱
*/
@Column(name = "buyer_email")
private String buyerEmail;
/**
* 买家名称
*/
@Column(name = "buyer_name")
private String buyerName;
/**
* 出货服务等级类别
*/
@Column(name = "shipment_service_level_category")
private String shipmentServiceLevelCategory;
@Column(name = "shipped_by_amazon_tfm")
private String shippedByAmazonTfm;
@Column(name = "tfm_shipment_status")
private String tfmShipmentStatus;
@Column(name = "cba_displayable_shipping_label")
private String cbaDisplayableShippingLabel;
/**
* 订单类型
*/
@Column(name = "order_type")
private String orderType;
/**
* 最早发货日期
*/
@Column(name = "earliest_ship_date")
private Date earliestShipDate;
/**
* 最晚发货日期
*/
@Column(name = "latest_ship_date")
private Date latestShipDate;
/**
* 最早交货日期
*/
@Column(name = "earliest_delivery_date")
private Date earliestDeliveryDate;
/**
* 最晚交货日期
*/
@Column(name = "latest_delivery_date")
private Date latestDeliveryDate;
@Column(name = "is_business_order")
private String isBusinessOrder;
/**
* 买家采购订单编号
*/
@Column(name = "purchase_order_number")
private String purchaseOrderNumber;
@Column(name = "is_prime")
private String isPrime;
@Column(name = "is_premium_order")
private String isPremiumOrder;
/**
* 是否导入E登:0未导入;1已导入
*/
@Column(name = "is_import_ed")
private String isImportEd;
/**
* 导入E登时间
*/
@Column(name = "import_ed_date")
private Date importEdDate;
/**
* 抓单时间
*/
@Column(name = "import_sys_date")
private Date importSysDate;
/**
* 店铺ID
*/
@Column(name = "account_id")
private Integer accountId;
/**
* 店铺简码
*/
@Column(name = "account_code")
private String accountCode;
/**
* 区域ID
*/
@Column(name = "area_id")
private Integer areaId;
/**
* 区域
*/
private String area;
/**
* 记录创建时间
*/
@Column(name = "create_date")
private Date createDate;
/**
* 记录更新时间
*/
@Column(name = "update_date")
private Date updateDate;
/**
* item表处理状态
*/
@Column(name = "order_item_status")
private String orderItemStatus;
/**
* 收件人姓名
*/
@Column(name = "shipping_address_name")
private byte[] shippingAddressName;
/**
* 收货地址-街道1
*/
@Column(name = "shipping_address_line1")
private byte[] shippingAddressLine1;
/**
* 收货地址-街道2
*/
@Column(name = "shipping_address_line2")
private byte[] shippingAddressLine2;
/**
* 收货地址-街道3
*/
@Column(name = "shipping_address_line3")
private byte[] shippingAddressLine3;
@Transient
@JSONField(name = "shipping_address_name")
private String shippingAddressNameStr;
@Transient
@JSONField(name = "shipping_address_line1")
private String shippingAddressLine1Str;
@Transient
@JSONField(name = "shipping_address_line2")
private String shippingAddressLine2Str;
@Transient
@JSONField(name = "shipping_address_line3")
private String shippingAddressLine3Str;
public byte[] getShippingAddressName() {
if (StringUtils.isNotBlank(shippingAddressNameStr)) {
try {
return shippingAddressNameStr.getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
return null;
}
public byte[] getShippingAddressLine1() {
if (StringUtils.isNotBlank(shippingAddressLine1Str)) {
try {
return shippingAddressLine1Str.getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
return null;
}
public byte[] getShippingAddressLine2() {
if (StringUtils.isNotBlank(shippingAddressLine2Str)) {
try {
return shippingAddressLine2Str.getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
return null;
}
public byte[] getShippingAddressLine3() {
if (StringUtils.isNotBlank(shippingAddressLine3Str)) {
try {
return shippingAddressLine3Str.getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
return null;
}
}
\ No newline at end of file
... ...
package com.aukey.example.entity.fbaStock;
import com.alibaba.fastjson.annotation.JSONField;
import lombok.Data;
import org.apache.commons.lang3.StringUtils;
import javax.persistence.Column;
import javax.persistence.Id;
import javax.persistence.Table;
import javax.persistence.Transient;
import java.io.UnsupportedEncodingException;
import java.util.Date;
@Data
@Table(name = "amazon_order_item")
public class AmazonOrderItem {
/**
* 自增主键
*/
@Id
private Integer aid;
/**
* 亚马逊订单ID
*/
@Column(name = "amazon_order_id")
private String amazonOrderId;
/**
* 亚马逊SKU
*/
private String asin;
/**
* 卖家SKU
*/
@Column(name = "seller_sku")
private String sellerSku;
/**
* 订单项ID
*/
@Column(name = "order_item_id")
private String orderItemId;
/**
* 订购数量
*/
@Column(name = "quantity_ordered")
private Integer quantityOrdered;
/**
* 发货数量
*/
@Column(name = "quantity_shipped")
private Integer quantityShipped;
@Column(name = "points_number")
private Integer pointsNumber;
@Column(name = "points_currency_code")
private String pointsCurrencyCode;
@Column(name = "points_amount")
private Double pointsAmount;
/**
* sku币种
*/
@Column(name = "item_price_currency_code")
private String itemPriceCurrencyCode;
/**
* sku价格
*/
@Column(name = "item_price_amount")
private Double itemPriceAmount;
/**
* 邮费币种
*/
@Column(name = "shipping_price_currency_code")
private String shippingPriceCurrencyCode;
/**
* 邮费
*/
@Column(name = "shipping_price_amount")
private Double shippingPriceAmount;
/**
* 礼品包装币种
*/
@Column(name = "gift_wrap_price_currency_code")
private String giftWrapPriceCurrencyCode;
/**
* 礼品包装价格
*/
@Column(name = "gift_wrap_price_amount")
private Double giftWrapPriceAmount;
/**
* 价格税币种
*/
@Column(name = "item_tax_currency_code")
private String itemTaxCurrencyCode;
/**
* 价格税
*/
@Column(name = "item_tax_amount")
private Double itemTaxAmount;
/**
* 邮费税币种
*/
@Column(name = "shipping_tax_currency_code")
private String shippingTaxCurrencyCode;
/**
* 邮费税
*/
@Column(name = "shipping_tax_amount")
private Double shippingTaxAmount;
/**
* 礼品包装税币种
*/
@Column(name = "gift_wrap_tax_currency_code")
private String giftWrapTaxCurrencyCode;
/**
* 礼品包装价格税
*/
@Column(name = "gift_wrap_tax_amount")
private Double giftWrapTaxAmount;
/**
* 邮费折扣币种
*/
@Column(name = "shipping_discount_currency_code")
private String shippingDiscountCurrencyCode;
/**
* 邮费折扣
*/
@Column(name = "shipping_discount_amount")
private Double shippingDiscountAmount;
/**
* 促销折扣币种
*/
@Column(name = "promotion_discount_currency_code")
private String promotionDiscountCurrencyCode;
/**
* 促销折扣
*/
@Column(name = "promotion_discount_amount")
private Double promotionDiscountAmount;
/**
* 促销IDS
*/
@Column(name = "promotion_ids")
private String promotionIds;
/**
* COD费币种
*/
@Column(name = "cod_fee_currency_code")
private String codFeeCurrencyCode;
/**
* COD费
*/
@Column(name = "cod_fee_amount")
private Double codFeeAmount;
/**
* COD折扣币种
*/
@Column(name = "cod_fee_discount_currency_code")
private String codFeeDiscountCurrencyCode;
/**
* COD折扣
*/
@Column(name = "cod_fee_discount_amount")
private Double codFeeDiscountAmount;
/**
* 礼品包装级别
*/
@Column(name = "gift_wrap_level")
private String giftWrapLevel;
/**
* 发票要求
*/
@Column(name = "invoice_requirement")
private String invoiceRequirement;
/**
* 买家选择发票类别
*/
@Column(name = "buyer_selected_invoice_category")
private String buyerSelectedInvoiceCategory;
/**
* 发票抬头
*/
@Column(name = "invoice_title")
private String invoiceTitle;
/**
* 发票信息
*/
@Column(name = "invoice_information")
private String invoiceInformation;
/**
* 条件备注
*/
@Column(name = "condition_note")
private String conditionNote;
/**
* 条件Id
*/
@Column(name = "condition_id")
private String conditionId;
/**
* 子条件Id
*/
@Column(name = "condition_subtype_id")
private String conditionSubtypeId;
/**
* 预定交货开始日期
*/
@Column(name = "scheduled_delivery_start_date")
private Date scheduledDeliveryStartDate;
/**
* 预定交货结束日期
*/
@Column(name = "scheduled_delivery_end_date")
private Date scheduledDeliveryEndDate;
/**
* 价格标示
*/
@Column(name = "price_designation")
private String priceDesignation;
/**
* 定制URL
*/
@Column(name = "customized_url")
private String customizedUrl;
/**
* 发货渠道(AFN、MFN)
*/
@Column(name = "fulfillment_channel")
private String fulfillmentChannel;
/**
* 店铺ID
*/
@Column(name = "account_id")
private Integer accountId;
/**
* 店铺简码
*/
@Column(name = "account_code")
private String accountCode;
/**
* 区域ID
*/
@Column(name = "area_id")
private Integer areaId;
/**
* 区域
*/
private String area;
/**
* 记录创建时间
*/
@Column(name = "create_date")
private Date createDate;
/**
* 记录更新时间
*/
@Column(name = "update_date")
private Date updateDate;
@Column(name = "order_header_id")
private Integer orderHeaderId;
/**
* 产品标题
*/
@Column(name = "title")
private byte[] titleAlias;
/**
* 礼品消息文本
*/
@Column(name = "gift_message_text")
private byte[] giftMessageText;
@Transient
@JSONField(name = "title")
private String titleStr;
@Transient
@JSONField(name = "gift_message_text")
private String giftMessageTextStr;
public byte[] getTitleAlias() {
if (StringUtils.isNotBlank(titleStr)) {
try {
return titleStr.getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
return null;
}
public byte[] getGiftMessageText() {
if (StringUtils.isNotBlank(giftMessageTextStr)) {
try {
return giftMessageTextStr.getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
return null;
}
}
\ No newline at end of file
... ...
package com.aukey.example.listener;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.aukey.example.constant.EventType;
import com.aukey.example.constant.MQConst;
import com.aukey.example.entity.fbaStock.AmazonOrderItem;
import com.aukey.example.service.fbaStock.AmazonOrderItemService;
import com.aukey.example.vo.MessageVo;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author: wgf
* @create: 2020-06-09 14:30
* @description: CanalUser监听器
**/
@Slf4j
@Component
public class AmazonOrderItemListener {
@Autowired
AmazonOrderItemService amazonOrderItemService;
@RabbitListener(queues = "polaris_order_center.amazon_order_item")
public void receive(String message, Channel channel, Message messageEntity) throws IOException {
try {
log.info("接收到队列: {} 消息:{}", MQConst.TEST, message);
MessageVo<AmazonOrderItem> messageVo = JSON.parseObject(message, new TypeReference<MessageVo<AmazonOrderItem>>() {
});
EventType eventType = EventType.valueOf(messageVo.getType());
log.info("当前监听binlog 数据库:{}, 数据表:{}", messageVo.getDatabase(), messageVo.getTable());
// 数据同步只关注这三种事件
switch (eventType) {
case INSERT:
log.info("触发 INSERT 事件");
this.amazonOrderItemService.insertOrUpdate(messageVo.getData());
break;
case UPDATE:
log.info("触发 UPDATE 事件");
this.amazonOrderItemService.insertOrUpdate(messageVo.getData());
break;
case DELETE:
log.info("触发 DELETE 事件");
this.amazonOrderItemService.delete(messageVo.getData());
break;
default:
log.info("其他事件类型:{}, 过滤不处理", eventType);
}
log.info("消息长度:{}", messageVo.getData().size());
/**
* 消息消费确认
* 如果客户端在线没有签收没有签收这条Message,则此消息进入Unacked状态,此时监听器阻塞等待消息确认,不推送新Message
* 如果待消息确认并且客户端下线,下次客户端上线重新推送上次Unacked状态Message
*/
channel.basicAck(messageEntity.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
/**
* 第一个参数deliveryTag:发布的每一条消息都会获得一个唯一的deliveryTag,deliveryTag在channel范围内是唯一的
* 第二个参数multiple:批量确认标志。如果值为true,包含本条消息在内的、所有比该消息deliveryTag值小的 消息都被拒绝了(除了已经被 ack 的以外);如果值为false,只拒绝三本条消息
* 第三个参数requeue:表示如何处理这条消息,如果值为true,则重新放入RabbitMQ的发送队列,如果值为false,则通知RabbitMQ销毁这条消息
*/
//channel.basicNack(messageEntity.getMessageProperties().getDeliveryTag(), false,true);
e.printStackTrace();
}
}
}
... ...
package com.aukey.example.listener;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.aukey.example.constant.EventType;
import com.aukey.example.constant.MQConst;
import com.aukey.example.entity.fbaStock.AmazonOrder;
import com.aukey.example.service.fbaStock.AmazonOrderService;
import com.aukey.example.vo.MessageVo;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author: wgf
* @create: 2020-06-09 14:30
* @description: CanalUser监听器
**/
@Slf4j
@Component
public class AmazonOrderListener {
@Autowired
AmazonOrderService amazonOrderService;
@RabbitListener(queues = "polaris_order_center.amazon_order")
public void receive(String message, Channel channel, Message messageEntity) throws IOException {
try {
log.info("接收到队列: {} 消息:{}", MQConst.TEST, message);
MessageVo<AmazonOrder> messageVo = JSON.parseObject(message, new TypeReference<MessageVo<AmazonOrder>>() {
});
EventType eventType = EventType.valueOf(messageVo.getType());
log.info("当前监听binlog 数据库:{}, 数据表:{}", messageVo.getDatabase(), messageVo.getTable());
// 数据同步只关注这三种事件
switch (eventType) {
case INSERT:
log.info("触发 INSERT 事件");
this.amazonOrderService.insertOrUpdate(messageVo.getData());
break;
case UPDATE:
log.info("触发 UPDATE 事件");
this.amazonOrderService.insertOrUpdate(messageVo.getData());
break;
case DELETE:
log.info("触发 DELETE 事件");
this.amazonOrderService.delete(messageVo.getData());
break;
default:
log.info("其他事件类型:{}, 过滤不处理", eventType);
}
log.info("消息长度:{}", messageVo.getData().size());
/**
* 消息消费确认
* 如果客户端在线没有签收没有签收这条Message,则此消息进入Unacked状态,此时监听器阻塞等待消息确认,不推送新Message
* 如果待消息确认并且客户端下线,下次客户端上线重新推送上次Unacked状态Message
*/
channel.basicAck(messageEntity.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
/**
* 第一个参数deliveryTag:发布的每一条消息都会获得一个唯一的deliveryTag,deliveryTag在channel范围内是唯一的
* 第二个参数multiple:批量确认标志。如果值为true,包含本条消息在内的、所有比该消息deliveryTag值小的 消息都被拒绝了(除了已经被 ack 的以外);如果值为false,只拒绝三本条消息
* 第三个参数requeue:表示如何处理这条消息,如果值为true,则重新放入RabbitMQ的发送队列,如果值为false,则通知RabbitMQ销毁这条消息
*/
//channel.basicNack(messageEntity.getMessageProperties().getDeliveryTag(), false,true);
e.printStackTrace();
}
}
}
... ...
package com.aukey.example.mapper.fbaStock;
import com.aukey.example.commom.mapper.Mapper;
import com.aukey.example.entity.fbaStock.AmazonOrderItem;
public interface AmazonOrderItemMapper extends Mapper<AmazonOrderItem> {
}
\ No newline at end of file
... ...
package com.aukey.example.mapper.fbaStock;
import com.aukey.example.commom.mapper.Mapper;
import com.aukey.example.entity.fbaStock.AmazonOrder;
public interface AmazonOrderMapper extends Mapper<AmazonOrder> {
}
\ No newline at end of file
... ...
package com.aukey.example.service.fbaStock;
import com.aukey.example.entity.fbaStock.AmazonOrderItem;
import java.util.List;
/**
* @author: wgf
* @create: 2020-08-25 14:36
* @description:
**/
public interface AmazonOrderItemService {
/**
* demo 具体实现自己优化
*
* @param list
*/
void insertOrUpdate(List<AmazonOrderItem> list);
/**
* demo 具体实现自己优化
*
* @param list
*/
void delete(List<AmazonOrderItem> list);
}
... ...
package com.aukey.example.service.fbaStock;
import com.aukey.example.entity.fbaStock.AmazonOrder;
import java.util.List;
/**
* @author: wgf
* @create: 2020-08-25 14:36
* @description:
**/
public interface AmazonOrderService {
/**
* demo 具体实现自己优化
* @param list
*/
void insertOrUpdate(List<AmazonOrder> list);
/**
* demo 具体实现自己优化
* @param list
*/
void delete(List<AmazonOrder> list);
}
... ...
package com.aukey.example.service.fbaStock.impl;
import com.aukey.example.entity.fbaStock.AmazonOrderItem;
import com.aukey.example.mapper.fbaStock.AmazonOrderItemMapper;
import com.aukey.example.service.fbaStock.AmazonOrderItemService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
import java.util.Objects;
/**
* @author: wgf
* @create: 2020-08-25 14:36
* @description:
**/
@Service
public class AmazonOrderItemServiceImpl implements AmazonOrderItemService {
@Autowired
AmazonOrderItemMapper amazonOrderItemMapper;
@Override
@Transactional(value = "fbaStockTransactionManager")
public void insertOrUpdate(List<AmazonOrderItem> list) {
for (AmazonOrderItem amazonOrderItem : list) {
AmazonOrderItem query = new AmazonOrderItem();
query.setAccountId(amazonOrderItem.getAccountId());
query.setAreaId(amazonOrderItem.getAreaId());
query.setAmazonOrderId(amazonOrderItem.getAmazonOrderId());
AmazonOrderItem existOrder = this.amazonOrderItemMapper.selectOne(query);
// 主键和 headId不能使用
amazonOrderItem.setOrderHeaderId(null);
if (Objects.isNull(existOrder)) {
amazonOrderItem.setAid(null);
this.amazonOrderItemMapper.insertSelective(amazonOrderItem);
} else {
amazonOrderItem.setAid(existOrder.getAid());
this.amazonOrderItemMapper.updateByPrimaryKeySelective(amazonOrderItem);
}
}
}
@Override
@Transactional(value = "fbaStockTransactionManager")
public void delete(List<AmazonOrderItem> list) {
AmazonOrderItem condition = new AmazonOrderItem();
for (AmazonOrderItem amazonOrderItem : list) {
condition.setAccountId(amazonOrderItem.getAccountId());
condition.setAreaId(amazonOrderItem.getAreaId());
condition.setAmazonOrderId(amazonOrderItem.getAmazonOrderId());
this.amazonOrderItemMapper.delete(condition);
}
}
}
... ...
package com.aukey.example.service.fbaStock.impl;
import com.aukey.example.entity.fbaStock.AmazonOrder;
import com.aukey.example.mapper.fbaStock.AmazonOrderMapper;
import com.aukey.example.service.fbaStock.AmazonOrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
import java.util.Objects;
/**
* @author: wgf
* @create: 2020-08-25 14:36
* @description:
**/
@Service
public class AmazonOrderServiceImpl implements AmazonOrderService {
@Autowired
AmazonOrderMapper amazonOrderMapper;
@Override
@Transactional(value = "fbaStockTransactionManager")
public void insertOrUpdate(List<AmazonOrder> list) {
for (AmazonOrder amazonOrder : list) {
AmazonOrder query = new AmazonOrder();
query.setAccountId(amazonOrder.getAccountId());
query.setAreaId(amazonOrder.getAreaId());
query.setAmazonOrderId(amazonOrder.getAmazonOrderId());
AmazonOrder existOrder = this.amazonOrderMapper.selectOne(query);
// 主键不能使用
if (Objects.isNull(existOrder)) {
amazonOrder.setAid(null);
this.amazonOrderMapper.insertSelective(amazonOrder);
} else {
amazonOrder.setAid(existOrder.getAid());
this.amazonOrderMapper.updateByPrimaryKeySelective(amazonOrder);
}
}
}
@Override
@Transactional(value = "fbaStockTransactionManager")
public void delete(List<AmazonOrder> list) {
AmazonOrder condition = new AmazonOrder();
for (AmazonOrder amazonOrder : list) {
condition.setAccountId(amazonOrder.getAccountId());
condition.setAreaId(amazonOrder.getAreaId());
condition.setAmazonOrderId(amazonOrder.getAmazonOrderId());
this.amazonOrderMapper.delete(condition);
}
}
}
... ...
... ... @@ -12,3 +12,19 @@ spring:
acknowledge-mode: manual
simple:
acknowledge-mode: manual
datasource:
fba-stock:
driver-class-name: com.mysql.jdbc.Driver
jdbc-url: jdbc:mysql://10.1.1.86:3306/fba_stock?useUnicode=true&characterEncoding=utf8&useSSL=true&allowMultiQueries=true
username: xmdo
password: xmdao123#li
max-idle: 20
min-idle: 20
initial-size: 20
max-wait: 10000
validation-query: SELECT 1 FROM DUAL
test-on-borrow: false
test-while-idle: true
time-between-eviction-runs-millis: 18800
\ No newline at end of file
... ...
jdbc.driverClassName=com.mysql.jdbc.Driver
jdbc.url=jdbc:mysql://10.1.1.86:3306/fba_stock?useUnicode=true&characterEncoding=utf8&useSSL=true&allowMultiQueries=true
jdbc.username=xmdo
jdbc.password=xmdao123#li
targetModelPackage =com.aukey.example.entity.fbaStock
targetJavaProject = src/main/java
targetMapperPackage=com.aukey.example.mapper.fbaStock
targetXMLPackage = mapper/fbaStock
targetResourcesProject = src/main/resources
tableName = amazon_order_item
... ...
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE generatorConfiguration
PUBLIC "-//mybatis.org//DTD MyBatis Generator Configuration 1.0//EN"
"http://mybatis.org/dtd/mybatis-generator-config_1_0.dtd">
<generatorConfiguration>
<properties resource="generator/config.properties"/>
<context id="Mysql" targetRuntime="MyBatis3Simple" defaultModelType="flat">
<plugin type="tk.mybatis.mapper.generator.MapperPlugin">
<property name="mappers" value="com.aukey.example.commom.mapper.Mapper"/>
<!-- caseSensitive默认false,当数据库表名区分大小写时,可以将该属性设置为true -->
<property name="caseSensitive" value="false"/>
</plugin>
<jdbcConnection driverClass="${jdbc.driverClassName}"
connectionURL="${jdbc.url}"
userId="${jdbc.username}"
password="${jdbc.password}">
</jdbcConnection>
<!-- model生成配置 -->
<javaModelGenerator targetPackage="${targetModelPackage}" targetProject="${targetJavaProject}"/>
<!-- xml映射文件生成配置 -->
<sqlMapGenerator targetPackage="${targetXMLPackage}" targetProject="${targetResourcesProject}"/>
<!-- mapper生成配置 -->
<javaClientGenerator targetPackage="${targetMapperPackage}" targetProject="${targetJavaProject}" type="XMLMAPPER" />
<table tableName="${tableName}" >
<generatedKey column="id" sqlStatement="Mysql" identity="true"/>
</table>
</context>
</generatorConfiguration>
\ No newline at end of file
... ...
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.aukey.example.mapper.fbaStock.AmazonOrderItemMapper">
<resultMap id="BaseResultMap" type="com.aukey.example.entity.fbaStock.AmazonOrderItem">
<!--
WARNING - @mbg.generated
-->
<id column="aid" jdbcType="INTEGER" property="aid" />
<result column="amazon_order_id" jdbcType="VARCHAR" property="amazonOrderId" />
<result column="asin" jdbcType="VARCHAR" property="asin" />
<result column="seller_sku" jdbcType="VARCHAR" property="sellerSku" />
<result column="order_item_id" jdbcType="VARCHAR" property="orderItemId" />
<result column="quantity_ordered" jdbcType="INTEGER" property="quantityOrdered" />
<result column="quantity_shipped" jdbcType="INTEGER" property="quantityShipped" />
<result column="points_number" jdbcType="INTEGER" property="pointsNumber" />
<result column="points_currency_code" jdbcType="VARCHAR" property="pointsCurrencyCode" />
<result column="points_amount" jdbcType="DOUBLE" property="pointsAmount" />
<result column="item_price_currency_code" jdbcType="VARCHAR" property="itemPriceCurrencyCode" />
<result column="item_price_amount" jdbcType="DOUBLE" property="itemPriceAmount" />
<result column="shipping_price_currency_code" jdbcType="VARCHAR" property="shippingPriceCurrencyCode" />
<result column="shipping_price_amount" jdbcType="DOUBLE" property="shippingPriceAmount" />
<result column="gift_wrap_price_currency_code" jdbcType="VARCHAR" property="giftWrapPriceCurrencyCode" />
<result column="gift_wrap_price_amount" jdbcType="DOUBLE" property="giftWrapPriceAmount" />
<result column="item_tax_currency_code" jdbcType="VARCHAR" property="itemTaxCurrencyCode" />
<result column="item_tax_amount" jdbcType="DOUBLE" property="itemTaxAmount" />
<result column="shipping_tax_currency_code" jdbcType="VARCHAR" property="shippingTaxCurrencyCode" />
<result column="shipping_tax_amount" jdbcType="DOUBLE" property="shippingTaxAmount" />
<result column="gift_wrap_tax_currency_code" jdbcType="VARCHAR" property="giftWrapTaxCurrencyCode" />
<result column="gift_wrap_tax_amount" jdbcType="DOUBLE" property="giftWrapTaxAmount" />
<result column="shipping_discount_currency_code" jdbcType="VARCHAR" property="shippingDiscountCurrencyCode" />
<result column="shipping_discount_amount" jdbcType="DOUBLE" property="shippingDiscountAmount" />
<result column="promotion_discount_currency_code" jdbcType="VARCHAR" property="promotionDiscountCurrencyCode" />
<result column="promotion_discount_amount" jdbcType="DOUBLE" property="promotionDiscountAmount" />
<result column="promotion_ids" jdbcType="VARCHAR" property="promotionIds" />
<result column="cod_fee_currency_code" jdbcType="VARCHAR" property="codFeeCurrencyCode" />
<result column="cod_fee_amount" jdbcType="DOUBLE" property="codFeeAmount" />
<result column="cod_fee_discount_currency_code" jdbcType="VARCHAR" property="codFeeDiscountCurrencyCode" />
<result column="cod_fee_discount_amount" jdbcType="DOUBLE" property="codFeeDiscountAmount" />
<result column="gift_wrap_level" jdbcType="VARCHAR" property="giftWrapLevel" />
<result column="invoice_requirement" jdbcType="VARCHAR" property="invoiceRequirement" />
<result column="buyer_selected_invoice_category" jdbcType="VARCHAR" property="buyerSelectedInvoiceCategory" />
<result column="invoice_title" jdbcType="VARCHAR" property="invoiceTitle" />
<result column="invoice_information" jdbcType="VARCHAR" property="invoiceInformation" />
<result column="condition_note" jdbcType="VARCHAR" property="conditionNote" />
<result column="condition_id" jdbcType="VARCHAR" property="conditionId" />
<result column="condition_subtype_id" jdbcType="VARCHAR" property="conditionSubtypeId" />
<result column="scheduled_delivery_start_date" jdbcType="TIMESTAMP" property="scheduledDeliveryStartDate" />
<result column="scheduled_delivery_end_date" jdbcType="TIMESTAMP" property="scheduledDeliveryEndDate" />
<result column="price_designation" jdbcType="VARCHAR" property="priceDesignation" />
<result column="customized_url" jdbcType="VARCHAR" property="customizedUrl" />
<result column="fulfillment_channel" jdbcType="VARCHAR" property="fulfillmentChannel" />
<result column="account_id" jdbcType="INTEGER" property="accountId" />
<result column="account_code" jdbcType="VARCHAR" property="accountCode" />
<result column="area_id" jdbcType="INTEGER" property="areaId" />
<result column="area" jdbcType="VARCHAR" property="area" />
<result column="create_date" jdbcType="TIMESTAMP" property="createDate" />
<result column="update_date" jdbcType="TIMESTAMP" property="updateDate" />
<result column="order_header_id" jdbcType="INTEGER" property="orderHeaderId" />
<result column="title" jdbcType="LONGVARBINARY" property="titleAlias" />
<result column="gift_message_text" jdbcType="LONGVARBINARY" property="giftMessageText" />
</resultMap>
</mapper>
\ No newline at end of file
... ...
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.aukey.example.mapper.fbaStock.AmazonOrderMapper">
<resultMap id="BaseResultMap" type="com.aukey.example.entity.fbaStock.AmazonOrder">
<!--
WARNING - @mbg.generated
-->
<id column="aid" jdbcType="INTEGER" property="aid" />
<result column="amazon_order_id" jdbcType="VARCHAR" property="amazonOrderId" />
<result column="seller_order_id" jdbcType="VARCHAR" property="sellerOrderId" />
<result column="purchase_date" jdbcType="TIMESTAMP" property="purchaseDate" />
<result column="payments_date" jdbcType="TIMESTAMP" property="paymentsDate" />
<result column="last_update_date" jdbcType="TIMESTAMP" property="lastUpdateDate" />
<result column="order_status" jdbcType="VARCHAR" property="orderStatus" />
<result column="fulfillment_channel" jdbcType="VARCHAR" property="fulfillmentChannel" />
<result column="sales_channel" jdbcType="VARCHAR" property="salesChannel" />
<result column="ship_service_level" jdbcType="VARCHAR" property="shipServiceLevel" />
<result column="shipping_address_city" jdbcType="VARCHAR" property="shippingAddressCity" />
<result column="shipping_address_county" jdbcType="VARCHAR" property="shippingAddressCounty" />
<result column="shipping_address_district" jdbcType="VARCHAR" property="shippingAddressDistrict" />
<result column="shipping_address_stateOrRegion" jdbcType="VARCHAR" property="shippingAddressStateorregion" />
<result column="shipping_address_postalCode" jdbcType="VARCHAR" property="shippingAddressPostalcode" />
<result column="shipping_address_countryCode" jdbcType="VARCHAR" property="shippingAddressCountrycode" />
<result column="shipping_address_phone" jdbcType="VARCHAR" property="shippingAddressPhone" />
<result column="order_total_currency_code" jdbcType="VARCHAR" property="orderTotalCurrencyCode" />
<result column="order_total_amount" jdbcType="DOUBLE" property="orderTotalAmount" />
<result column="postage_total" jdbcType="DOUBLE" property="postageTotal" />
<result column="discount_total" jdbcType="DOUBLE" property="discountTotal" />
<result column="number_of_items_shipped" jdbcType="INTEGER" property="numberOfItemsShipped" />
<result column="number_of_items_unshipped" jdbcType="INTEGER" property="numberOfItemsUnshipped" />
<result column="payment_method" jdbcType="VARCHAR" property="paymentMethod" />
<result column="marketplace_id" jdbcType="VARCHAR" property="marketplaceId" />
<result column="buyer_email" jdbcType="VARCHAR" property="buyerEmail" />
<result column="buyer_name" jdbcType="VARCHAR" property="buyerName" />
<result column="shipment_service_level_category" jdbcType="VARCHAR" property="shipmentServiceLevelCategory" />
<result column="shipped_by_amazon_tfm" jdbcType="CHAR" property="shippedByAmazonTfm" />
<result column="tfm_shipment_status" jdbcType="VARCHAR" property="tfmShipmentStatus" />
<result column="cba_displayable_shipping_label" jdbcType="VARCHAR" property="cbaDisplayableShippingLabel" />
<result column="order_type" jdbcType="VARCHAR" property="orderType" />
<result column="earliest_ship_date" jdbcType="TIMESTAMP" property="earliestShipDate" />
<result column="latest_ship_date" jdbcType="TIMESTAMP" property="latestShipDate" />
<result column="earliest_delivery_date" jdbcType="TIMESTAMP" property="earliestDeliveryDate" />
<result column="latest_delivery_date" jdbcType="TIMESTAMP" property="latestDeliveryDate" />
<result column="is_business_order" jdbcType="CHAR" property="isBusinessOrder" />
<result column="purchase_order_number" jdbcType="VARCHAR" property="purchaseOrderNumber" />
<result column="is_prime" jdbcType="CHAR" property="isPrime" />
<result column="is_premium_order" jdbcType="CHAR" property="isPremiumOrder" />
<result column="is_import_ed" jdbcType="CHAR" property="isImportEd" />
<result column="import_ed_date" jdbcType="TIMESTAMP" property="importEdDate" />
<result column="import_sys_date" jdbcType="TIMESTAMP" property="importSysDate" />
<result column="account_id" jdbcType="INTEGER" property="accountId" />
<result column="account_code" jdbcType="VARCHAR" property="accountCode" />
<result column="area_id" jdbcType="INTEGER" property="areaId" />
<result column="area" jdbcType="VARCHAR" property="area" />
<result column="create_date" jdbcType="TIMESTAMP" property="createDate" />
<result column="update_date" jdbcType="TIMESTAMP" property="updateDate" />
<result column="order_item_status" jdbcType="VARCHAR" property="orderItemStatus" />
<result column="shipping_address_name" jdbcType="BLOB" property="shippingAddressName" />
<result column="shipping_address_line1" jdbcType="BLOB" property="shippingAddressLine1" />
<result column="shipping_address_line2" jdbcType="BLOB" property="shippingAddressLine2" />
<result column="shipping_address_line3" jdbcType="BLOB" property="shippingAddressLine3" />
</resultMap>
</mapper>
\ No newline at end of file
... ...