大纲
1.商品消息处理系统
(1)商品消息处理系统架构设计
(2)商品中心内部需消费的消息
(3)商品中心外部需消费的消息
(4)消息处理相关表
(5)消息处理流程设计
2.消费和处理binlog消息
(1)消费Canal发送到MQ的binlog消息
(2)解析MQ消息字符串成binlog对象
(3)处理binlog消息
(4)自研缓存组件Redis + DB读写实现
(5)将binlog对象封装为数据变更对象
(6)根据数据变更对象组装成内部消息对象并发送
(7)如果存在外部消息配置则保存数据变更对象详情
(8)消息编号的监听处理 + 发送外部消息
1.商品消息处理系统
(1)商品消息处理系统架构设计
(2)商品中心内部需要消费的消息
(3)商品中心外部需要消费的消息
(4)消息处理相关表
(5)消息处理流程设计
(1)商品消息处理系统架构设计
(2)商品系统内部需要消费的消息
一.item_info表变更
topic:data_change_topic
MQ消息体:- {
- "action": "UPDATE",//执⾏发送消息的触发动作:INSERT、UPDATE、DELETE
- "table": "item_info",//更新表的表名
- "updateColumn": [
- "item_status"//item_info表item_status变更
- ],
- "data": [{
- "column": "id",//item_info表主键id
- "value": 1
- }, {
- "column": "itemId",//item_info表item_id
- "value": "100000476748"
- }]
- }
复制代码 二.sku_info表变更
topic:data_change_topic
MQ消息体:- {
- "action": "UPDATE",//执⾏发送消息的触发动作:INSERT、UPDATE、DELETE
- "table": "sku_info",//更新表的表名
- "updateColumn": [
- "sku_name"//sku_info表sku_name变更
- ],
- "data": [{
- "column": "id",//sku_info表主键id
- "value": 1
- }, {
- "column": "itemId",//sku_info表item_id
- "value": "100000476748"
- }, {
- "column": "skuId",//sku_info表sku_id
- "value": "8000476872"
- }]
- }
复制代码 三.attribute_extend表变更
topic:data_change_topic
MQ消息体:- {
- "action": "INSERT",//执⾏发送消息的触发动作:INSERT、UPDATE、DELETE
- "table": "attribute_extend",//更新表的表名
- "data": [{
- "column": "id",//attribute_extend表主键id
- "value": 1
- }]
- }
复制代码 四.front_category_relation表变更
topic:data_change_topic
MQ消息体:- {
- "action": "INSERT",//执⾏发送消息的触发动作:INSERT、UPDATE、DELETE
- "table": "front_category_relation",//更新表的表名
- "data": [{
- "column": "id",//front_category_relation表主键id
- "value": 1
- }]
- }
复制代码 五.sku_seller_relation表变更
topic:data_change_topic
MQ消息体:- {
- "action": "INSERT",//执⾏发送消息的触发动作:INSERT、UPDATE、DELETE
- "table": "sku_seller_relation",//更新表的表名
- "data": [{
- "column": "id",//sku_seller_relation表主键id
- "value": 1
- }]
- }
复制代码 六.item_period_stage表变更
topic:interior_item_expri_result_topic
MQ消息体:- {
- "action": "update",//执⾏发送消息的触发动作:INSERT、UPDATE、DELETE
- "table": "item_period_stage",//更新表的表名
- "data": [{
- "column": "period_stage",//sku_seller_relation表主键id
- "value": 2
- }]
- }
复制代码 (3)商品系统外部需要消费的消息
新建或编辑商品发送MQ消息
topic:open_product_topic
MQ消息体:- {
- "action": "INSERT", //执⾏发送消息的触发动作:INSERT、UPDATE、DELETE
- "data": {
- "itemId": "100000476748",
- "itemStatus": 3,
- "skuId": "8000476872"
- }
- }
复制代码 (4)消息处理相关表
一.数据变更监听表
数据变更监听表中配置哪些表需要监听。- create table data_change_listen_config (
- id int unsigned auto_increment comment '主键' primary key,
- table_name varchar(40) null comment '数据表名称',
- key_column varchar(40) default 'id' null comment '数据表对应的主键或业务id列名',
- filter_flag tinyint(1) null comment '是否过滤',
- del_flag tinyint(1) null comment '是否删除',
- create_user int null comment '创建⼈',
- create_time datetime null comment '创建时间',
- update_user int null comment '更新⼈',
- update_time datetime null comment '更新时间'
- ) comment '数据变更监听表';//一个表只有一条记录
复制代码 二.监听表变化字段配置表
监听表变化字段配置表中配置哪些字段变更需要监听,只要满⾜其中配置的有⼀个字段值变更,就需要发送内部消息通知订阅⽅。- create table data_change_column_config (
- id int unsigned auto_increment primary key,
- listen_id int null comment '监听表id',
- listen_column varchar(40) null comment '监听字段',
- del_flag tinyint(1) null comment '删除标记',
- create_user int null comment '创建⼈',
- create_time datetime null comment '创建时间',
- update_user int null comment '更新⼈',
- update_time datetime null comment '更新时间'
- ) comment '监听表变化字段配置表';
复制代码 三.监听表消息模型表
监听表消息模型表中配置的是数据变更后,消息是内部消息还是外部消息,消息发送的topic,消息延迟等级,消息需要通知的字段。- create table data_change_message_config (
- id int unsigned null comment '主键',
- listen_id int null comment '监听表id',
- notify_column varchar(2000) null comment '变更通知字段,逗号分隔',
- message_topic varchar(256) null comment '变更通知消息主题',
- delay_level int null comment '延迟等级',
- message_type tinyint(3) null comment '消息类型',
- del_flag tinyint(1) null comment '删除标记',
- create_user int null comment '创建⼈',
- create_time datetime null comment '创建时间',
- update_user int null comment '更新⼈',
- update_time datetime null comment '更新时间'
- ) comment '监听表消息模型表';
复制代码 四.外部消息记录表
外部消息记录表中记录的是内部消息发送后⽣成的消息编号,通过回调消息查找外部消息的消息内容。- create table data_message_detail (
- id int unsigned auto_increment comment '主键' primary key,
- message_no varchar(64) null comment '消息编号',
- table_data_json text null comment '变化的表信息内容',
- diff_data_arr varchar(2000) null comment '消息变化字段数组,多个,分割',
- table_name varchar(64) null comment '更新表的表名 ',
- action varchar(64) null comment '执⾏发送消息的触发动作:INSERT、UPDATE、DELETE ',
- del_flag tinyint(1) null comment '删除标记',
- create_user int null comment '创建⼈',
- create_time datetime null comment '创建时间',
- update_user int null comment '更新⼈',
- update_time datetime null comment '更新时间'
- ) comment '外部消息记录表';
复制代码 (5)消息处理流程设计
2.消费和处理binlog消息
(1)消费Canal发送到MQ的binlog消息
(2)解析MQ消息字符串成binlog对象
(3)处理binlog消息
(4)自研缓存组件Redis + DB读写实现
(5)将binlog对象封装为数据变更对象
(6)根据数据变更对象组装成内部消息对象并发送
(7)如果存在外部消息配置则保存数据变更对象详情
(8)消息编号的监听处理 + 发送外部消息
(1)消费Canal发送到MQ的binlog消息- @Configuration
- public class ConsumerBeanConfig {
- //配置内容对象
- @Autowired
- private RocketMQProperties rocketMQProperties;
-
- //消费系统内部消息——处理binlog消息
- @Bean("dataChangeTopic")
- public DefaultMQPushConsumer createItemStageConsumer(DataChangeListener dataChangeListener) throws MQClientException {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketMqConstant.DATA_CHANGE_CONSUMER_GROUP);
- consumer.setNamesrvAddr(rocketMQProperties.getNameServer());
- consumer.subscribe(RocketMqConstant.DATA_CHANGE_TOPIC, "*");
- consumer.registerMessageListener(dataChangeListener);
- consumer.start();
- return consumer;
- }
- ...
- }
- @Component
- public class DataChangeListener implements MessageListenerConcurrently {
- @Autowired
- private MessageService messageService;
-
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
- try {
- for (MessageExt messageExt : list) {
- String msg = new String(messageExt.getBody());
- log.info("数据变更消息通知,消息内容:{}", msg);
-
- //获取binlog对象
- BinlogData binlogData = BinlogUtils.getBinlogData(msg);
- if (Objects.isNull(binlogData) || Objects.isNull(binlogData.getDataMap())) {
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
-
- //操作类型不是insert,delete,update的,不作处理
- String operateType = binlogData.getOperateType();
- if (!BinlogType.INSERT.getValue().equals(operateType)
- && !BinlogType.DELETE.getValue().equals(operateType)
- && !BinlogType.UPDATE.getValue().equals(operateType)) {
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- //处理binlog消息
- messageService.processBinlogMessage(binlogData);
- }
- } catch (Exception e) {
- log.error("consume error, 消费数据变更消息失败", e);
- //本次消费失败,下次重新消费
- return ConsumeConcurrentlyStatus.RECONSUME_LATER;
- }
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- }
复制代码 (2)解析MQ消息字符串成binlog对象- //MySQL的binlog对象
- @Data
- public class BinlogData implements Serializable {
- //binlog对应的表名
- private String tableName;
- //操作时间
- private Long operateTime;
- //操作类型
- private String operateType;
- //data节点转换成的Map,key对应的是bean里的属性名,value一律为字符串
- private List<Map<String, Object>> dataMap;
- //data节点转换成的Map,key对应的是bean里的属性名,value一律为字符串
- private List<Map<String, Object>> oldMap;
- }
- //MySQL binlog解析工具类
- public abstract class BinlogUtils {
- //解析binlog json字符串
- public static BinlogData getBinlogData(String binlogStr) {
- //isJson方法里面会判断字符串是不是为空,所以这里不需要重复判断
- if (JSONUtil.isJson(binlogStr)) {
- JSONObject binlogJson = JSONUtil.parseObj(binlogStr);
- //不处理DDL的binlog,只处理数据变更
- if (binlogJson.getBool("isDdl")) {
- return null;
- }
-
- BinlogData binlogData = new BinlogData();
- //表名
- String tableName = binlogJson.getStr("table");
- binlogData.setTableName(tableName);
- //操作类型
- String operateType = binlogJson.getStr("type");
- binlogData.setOperateType(operateType);
- //操作时间
- Long operateTime = binlogJson.getLong("ts");
- binlogData.setOperateTime(operateTime);
-
- //data数据
- JSONArray dataArray = binlogJson.getJSONArray("data");
- List<Map<String, Object>> dataMap = jsonArrayToMapList(dataArray);
- binlogData.setDataMap(dataMap);
- if (!binlogJson.isNull("old")) {
- //old数据
- JSONArray oldArray = binlogJson.getJSONArray("old");
- List<Map<String, Object>> oldMap = jsonArrayToMapList(oldArray);
- binlogData.setOldMap(oldMap);
- }
- return binlogData;
- }
- return null;
- }
-
- private static List<Map<String, Object>> jsonArrayToMapList(JSONArray jsonArray) {
- if (null != jsonArray) {
- Iterable<JSONObject> arrayIterator = jsonArray.jsonIter();
- //遍历data节点或old节点并返回Map
- if (null != arrayIterator) {
- //binlog的data数组或old数组里数据的类型为Map
- List<Map<String, Object>> dataMap = new ArrayList<>();
- while (arrayIterator.iterator().hasNext()) {
- JSONObject jsonObject = arrayIterator.iterator().next();
- Map<String, Object> data = new HashMap<>(jsonObject.size());
- jsonObject.keySet().forEach(key -> {
- data.put(key, jsonObject.get(key));
- });
- dataMap.add(data);
- }
- return dataMap;
- }
- }
- return null;
- }
- }
复制代码 (3)处理binlog消息
步骤一:通过缓存组件获取当前表的配置监听信息
步骤二:将Binlog对象封装为数据变更对象
步骤三:通过缓存组件获取配置的消息模型对象
步骤四:组装需要发送的数据变更消息对象
步骤五:发送内部消息- @Service
- public class MessageServiceImpl implements MessageService {
- ...
- //处理binlog消息
- @Override
- public void processBinlogMessage(BinlogData binlogData) {
- //1.通过缓存组件获取当前表的配置监听信息
- DataChangeListenConfigDO listenConfigDO = dataChangeRepository.getListenConfigByTable(binlogData.getTableName());
- //未配置监听信息的表,不作处理
- if (Objects.isNull(listenConfigDO)) {
- return;
- }
-
- //2.将binlog对象封装为数据变更对象
- List<DataChangeMessage> dataChangeMessages = getDataChangeMessage(binlogData, listenConfigDO);
- //不需要监听,或者要监听的字段值未变动
- if (CollectionUtils.isEmpty(dataChangeMessages)) {
- return;
- }
-
- //3.通过缓存组件获取配置的消息模型对象
- List<DataChangeMessageConfigBO> messageConfigBOS = dataChangeRepository.getMessageConfigBOByListenId(listenConfigDO.getId());
- //不需要发送消息
- if (CollectionUtils.isEmpty(messageConfigBOS)) {
- return;
- }
-
- //4.组装需要发送的数据变更消息对象
- List<DataSendMessageBO> sendDataMessageList = getInternalSendDataMessage(dataChangeMessages, binlogData.getDataMap(), messageConfigBOS);
- //待发送的消息为空,无需处理
- if (CollectionUtils.isEmpty(sendDataMessageList)) {
- return;
- }
-
- //5.发送内部消息
- sendDataMessage(sendDataMessageList);
-
- //6.如果存在外部消息配置则保存数据变更对象详情
- if (messageConfigBOS.stream().anyMatch(messageConfigBO -> MessageTypeEnum.EXTERNAL_MESSAGE.getCode().equals(messageConfigBO.getMessageType()))) {
- //保存外部消息详细信息
- saveDataMessageDetail(dataChangeMessages, binlogData);
- }
- }
- ...
- }
复制代码 (4)商品系统自研缓存组件Redis + DB读写实现
一.通过缓存组件获取监听的配置信息
二.缓存组件RedisReadWriteManager的实现
一.通过缓存组件获取监听的配置信息- @Repository
- public class DataChangeRepository {
- @Resource
- private RedisReadWriteManager redisReadWriteManager;
- ...
-
- //根据表名获取监听配置信息
- public DataChangeListenConfigDO getListenConfigByTable(String tableName) {
- Optional<DataChangeListenConfigDO> optional = redisReadWriteManager.getRedisStringDataByCache(
- tableName,
- DataChangeListenConfigDO.class,
- AbstractRedisKeyConstants::getListenConfigStringKey,
- this::getListenConfigByTableFromDB
- );
- return optional.orElse(null);
- }
-
- //获取监听变更字段配置表信息
- public List<DataChangeColumnConfigDO> getColumnConfigByListenId(Long id) {
- Optional<List<DataChangeColumnConfigDO>> optional = redisReadWriteManager.listRedisStringDataByCache(
- id,
- DataChangeColumnConfigDO.class,
- AbstractRedisKeyConstants::getColumnConfigStringKey,
- this::getColumnConfigByListenIdFromDB
- );
- return optional.orElse(null);
- }
-
- //获取监听表消息模型配置
- public List<DataChangeMessageConfigDO> getMessageConfigByListenId(Long id, MessageTypeEnum messageTypeEnum) {
- //获取监听表消息模型配置
- Optional<List<DataChangeMessageConfigDO>> optional = redisReadWriteManager.listRedisStringDataByCache(
- id,
- DataChangeMessageConfigDO.class,
- AbstractRedisKeyConstants::getMessageConfigStringKey,
- this::getMessageConfigByListenIdFromDB
- );
-
- //如果未指定是内部消息还是外部消息,则不需要过滤
- if (Objects.isNull(messageTypeEnum)) {
- return optional.orElse(null);
- }
- return optional.map(dataChangeMessageConfigDOS -> dataChangeMessageConfigDOS.stream()
- .filter(messageConfigBO -> messageTypeEnum.getCode().equals(messageConfigBO.getMessageType()))
- .collect(Collectors.toList())).orElse(null);
- }
-
- //根据表名获取监听配置信息
- public Optional<DataChangeListenConfigDO> getListenConfigByTableFromDB(String tableName) {
- LambdaQueryWrapper<DataChangeListenConfigDO> queryWrapper = Wrappers.lambdaQuery();
- queryWrapper.eq(DataChangeListenConfigDO::getTableName, tableName).eq(BaseEntity::getDelFlag, DelFlagEnum.EFFECTIVE.getCode());
- DataChangeListenConfigDO listenConfigDO = dataChangeListenConfigMapper.selectOne(queryWrapper);
- return Objects.isNull(listenConfigDO) ? Optional.empty() : Optional.of(listenConfigDO);
- }
-
- //获取监听变更字段配置表信息
- public Optional<List<DataChangeColumnConfigDO>> getColumnConfigByListenIdFromDB(Long id) {
- LambdaQueryWrapper<DataChangeColumnConfigDO> queryWrapper = Wrappers.lambdaQuery();
- queryWrapper.eq(DataChangeColumnConfigDO::getListenId, id).eq(BaseEntity::getDelFlag, DelFlagEnum.EFFECTIVE.getCode());
- List<DataChangeColumnConfigDO> columnConfigDOS = dataChangeColumnConfigMapper.selectList(queryWrapper);
- return CollectionUtils.isEmpty(columnConfigDOS) ? Optional.empty() : Optional.of(columnConfigDOS);
- }
-
- //查询数据变更对象列表
- public Optional<List<DataChangeMessageConfigDO>> getMessageConfigByListenIdFromDB(Long id) {
- LambdaQueryWrapper<DataChangeMessageConfigDO> queryWrapper = Wrappers.lambdaQuery();
- queryWrapper.eq(DataChangeMessageConfigDO::getListenId, id).eq(BaseEntity::getDelFlag, DelFlagEnum.EFFECTIVE.getCode());
- List<DataChangeMessageConfigDO> messageConfigDOS = dataChangeMessageConfigMapper.selectList(queryWrapper);
- return CollectionUtils.isEmpty(messageConfigDOS) ? Optional.empty() : Optional.of(messageConfigDOS);
- }
- ...
- }
复制代码 二.缓存组件RedisReadWriteManager的实现- //缓存读写管理
- @Service
- public class RedisReadWriteManager {
- @Resource
- private RedisCache redisCache;
-
- @Resource
- private RedisLock redisLock;
-
- //批量获取缓存数据
- //@param key 关键字列表
- //@param clazz 需要将缓存JSON转换的对象
- //@param getRedisKeyFunction 获取Redis key的方法
- //@param getDbFunction 获取数据源对象的方法
- //@return java.util.Optional<java.util.List<T>>
- public <T> Optional<List<T>> listRedisStringDataByCache(Long key, Class<T> clazz, Function<Long, String> getRedisKeyFunction, Function<Long, Optional<List<T>>> getDbFunction) {
- try {
- String redisKey = getRedisKeyFunction.apply(key);
- //过滤无效缓存
- String cache = redisCache.get(redisKey);
- if (EMPTY_OBJECT_STRING.equals(cache)) {
- return Optional.empty();
- }
- if (StringUtils.isNotBlank(cache)) {
- List<T> list = JSON.parseArray(cache, clazz);
- return Optional.of(list);
- }
- //缓存没有则读库
- return listRedisStringDataByDb(key, getRedisKeyFunction, getDbFunction);
- } catch (Exception e) {
- log.error("获取缓存数据异常 key={},clazz={}", key, clazz, e);
- throw e;
- }
- }
-
- //读取数据库表数据赋值到Redis
- public <T> Optional<List<T>> listRedisStringDataByDb(Long key, Function<Long, String> getRedisKeyFunction, Function<Long, Optional<List<T>>> getDbFunction) {
- if (Objects.isNull(key) || Objects.isNull(getDbFunction)) {
- return Optional.empty();
- }
- try {
- if (!redisLock.lock(String.valueOf(key))) {
- return Optional.empty();
- }
- String redisKey = getRedisKeyFunction.apply(key);
- Optional<List<T>> optional = getDbFunction.apply(key);
- putCacheString(redisKey, optional);
- return optional;
- } finally {
- redisLock.unlock(String.valueOf(key));
- }
- }
-
- private void putCacheString(String redisKey, Optional optional) {
- if (!optional.isPresent()) {
- //把空对象暂存到Redis
- redisCache.setex(redisKey, EMPTY_OBJECT_STRING, RedisKeyUtils.redisKeyRandomTime(INT_EXPIRED_ONE_DAY, TimeUnit.HOURS, NUMBER_24));
- log.warn("发生缓存穿透 redisKey={}", redisKey);
- return;
- }
- //把表数据对象存到Redis
- redisCache.setex(redisKey, JSON.toJSONString(optional.get()), RedisKeyUtils.redisKeyRandomTime(INT_EXPIRED_SEVEN_DAYS));
- log.info("表数据对象存到Redis redisKey={}, data={}", redisKey, JSON.toJSONString(optional.get()));
- }
-
- //批量获取缓存数据
- //@param key 关键字列表
- //@param clazz 需要将缓存JSON转换的对象
- //@param getRedisKeyFunction 获取redis key的方法
- //@param getDbFunction 获取数据源对象的方法
- //@return java.util.Optional<java.util.List < T>>
- public <T> Optional<T> getRedisStringDataByCache(String key, Class<T> clazz, Function<String, String> getRedisKeyFunction, Function<String, Optional<T>> getDbFunction) {
- try {
- String redisKey = getRedisKeyFunction.apply(key);
- String cache = redisCache.get(redisKey);
- //过滤无效缓存
- if (EMPTY_OBJECT_STRING.equals(cache)) {
- return Optional.empty();
- }
- if (StringUtils.isNotBlank(cache)) {
- T t = JSON.parseObject(cache, clazz);
- return Optional.of(t);
- }
- //缓存没有则读库
- return getRedisStringDataByDb(key, getRedisKeyFunction, getDbFunction);
- } catch (Exception e) {
- log.error("获取缓存数据异常 key={},clazz={}", key, clazz, e);
- throw e;
- }
- }
-
- //读取数据库表数据赋值到Redis
- public <T> Optional<T> getRedisStringDataByDb(String key, Function<String, String> getRedisKeyFunction, Function<String, Optional<T>> getDbFunction) {
- if (StringUtils.isBlank(key) || Objects.isNull(getDbFunction)) {
- return Optional.empty();
- }
- try {
- if (!redisLock.lock(key)) {
- return Optional.empty();
- }
- String redisKey = getRedisKeyFunction.apply(key);
- Optional<T> optional = getDbFunction.apply(key);
- putCacheString(redisKey, optional);
- return optional;
- } finally {
- redisLock.unlock(key);
- }
- }
- }
- @Component
- public class RedisCache {
- private RedisTemplate redisTemplate;
-
- public RedisCache(RedisTemplate redisTemplate) {
- this.redisTemplate = redisTemplate;
- }
- ...
-
- //缓存获取
- public String get(String key) {
- ValueOperations<String, String> vo = redisTemplate.opsForValue();
- return vo.get(key);
- }
-
- //缓存存储并设置过期时间
- public void setex(String key, String value, long time) {
- redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
- }
- ...
- }
复制代码 (5)将binlog对象封装为数据变更对象- @Service
- public class MessageServiceImpl implements MessageService {
- ...
- //处理binlog消息
- @Override
- public void processBinlogMessage(BinlogData binlogData) {
- //1.通过缓存组件获取当前表的配置监听信息
- DataChangeListenConfigDO listenConfigDO = dataChangeRepository.getListenConfigByTable(binlogData.getTableName());
- //未配置监听信息的表,不作处理
- if (Objects.isNull(listenConfigDO)) {
- return;
- }
-
- //2.将binlog对象封装为数据变更对象
- List<DataChangeMessage> dataChangeMessages = getDataChangeMessage(binlogData, listenConfigDO);
- //不需要监听,或者要监听的字段值未变动
- if (CollectionUtils.isEmpty(dataChangeMessages)) {
- return;
- }
-
- //3.通过缓存组件获取配置的消息模型对象
- List<DataChangeMessageConfigBO> messageConfigBOS = dataChangeRepository.getMessageConfigBOByListenId(listenConfigDO.getId());
- //不需要发送消息
- if (CollectionUtils.isEmpty(messageConfigBOS)) {
- return;
- }
-
- //4.组装需要发送的数据变更消息对象
- List<DataSendMessageBO> sendDataMessageList = getInternalSendDataMessage(dataChangeMessages, binlogData.getDataMap(), messageConfigBOS);
- //待发送的消息为空,无需处理
- if (CollectionUtils.isEmpty(sendDataMessageList)) {
- return;
- }
-
- //5.发送内部消息
- sendDataMessage(sendDataMessageList);
-
- //6.如果存在外部消息配置则保存数据变更消息对象详情
- if (messageConfigBOS.stream().anyMatch(messageConfigBO -> MessageTypeEnum.EXTERNAL_MESSAGE.getCode().equals(messageConfigBO.getMessageType()))) {
- //保存外部消息详细信息
- saveDataMessageDetail(dataChangeMessages, binlogData);
- }
- }
-
- //将binlog对象封装为数据变更对象
- public List<DataChangeMessage> getDataChangeMessage(BinlogData binlogData, DataChangeListenConfigDO listenConfigDO) {
- //获取监听变更字段配置表信息
- List<DataChangeColumnConfigDO> columnConfigDOS = dataChangeRepository.getColumnConfigByListenId(listenConfigDO.getId());
- //要监听的字段为空,不作处理
- if (CollectionUtils.isEmpty(columnConfigDOS)) {
- return null;
- }
- //封装数据变更对象
- return buildChangeColumn(binlogData, columnConfigDOS, listenConfigDO);
- }
-
- //封装数据变更对象
- private List<DataChangeMessage> buildChangeColumn(BinlogData binlogData, List<DataChangeColumnConfigDO> columnConfigDOS, DataChangeListenConfigDO listenConfigDO) {
- List<DataChangeMessage> dataChangeMessages = new ArrayList<>();
- //操作类型
- String operateType = binlogData.getOperateType();
- for (int i = 0; i < binlogData.getDataMap().size(); i++) {
- Map<String, Object> data = binlogData.getDataMap().get(i);//新值
- if (BinlogType.INSERT.getValue().equals(operateType) || BinlogType.DELETE.getValue().equals(operateType)) {
- //如果是新增或者删除,则所有监听字段都变更
- List<String> updateColumns = columnConfigDOS.stream().map(DataChangeColumnConfigDO::getListenColumn).collect(Collectors.toList());
- DataChangeMessage dataChangeMessage = buildDataChangeMessage(binlogData, updateColumns, data.get(listenConfigDO.getKeyColumn()));
- dataChangeMessages.add(dataChangeMessage);
- } else {
- Map<String, Object> old = binlogData.getOldMap().get(i);//旧值
- List<String> updateColumns = new ArrayList<>();
- for (DataChangeColumnConfigDO columnConfigDO : columnConfigDOS) {
- String column = columnConfigDO.getListenColumn();
- Object columnOldValue = old.get(column);
- //旧的字段值有数据,就表示该字段变更了,添加至修改的字段集合
- if (!Objects.isNull(columnOldValue)) {
- updateColumns.add(column);
- }
- }
- //监听的字段有数据变更
- if (!CollectionUtils.isEmpty(updateColumns)) {
- DataChangeMessage dataChangeMessage = buildDataChangeMessage(binlogData, updateColumns, data.get(listenConfigDO.getKeyColumn()));
- dataChangeMessages.add(dataChangeMessage);
- }
- }
- }
- return dataChangeMessages;
- }
-
- //构建数据变更对象
- private DataChangeMessage buildDataChangeMessage(BinlogData binlogData, List<String> updateColumns, Object keyId) {
- DataChangeMessage dataChangeMessage = new DataChangeMessage(binlogData.getOperateType(), binlogData.getTableName(), updateColumns);
- dataChangeMessage.setMessageNo(SnowflakeIdWorker.getCode());//雪花算法设置消息编号
- dataChangeMessage.setKeyId(keyId);
- return dataChangeMessage;
- }
- ...
- }
- //数据变更对象
- @Data
- public class DataChangeMessage implements Serializable {
- //内部消息编号
- private String messageNo;
- //操作行为,INSERT、UPDATE、DELETE
- private String action;
- //表名
- private String tableName;
- //主键或业务id
- private Object keyId;
- //变更的列
- private List<String> updateColumns;
- //唯一确定当前数据的字段以及字段值
- private List<ColumnValue> columnValues;
- //消息处理成功之后的回调topic
- private String callbackTopic = RocketMqConstant.DATA_EXTERNAL_CHANGE_TOPIC;
-
- @Data
- @NoArgsConstructor
- @AllArgsConstructor
- public static class ColumnValue {
- private String column;//列
- private Object value;//值
- }
-
- public DataChangeMessage(String operateType, String tableName, List<String> updateColumns) {
- this.action = operateType;
- this.tableName = tableName;
- this.updateColumns = updateColumns;
- }
- }
复制代码 (6)根据数据变更对象组装成内部消息对象并发送- @Service
- public class MessageServiceImpl implements MessageService {
- ...
- //组装需要发送的数据变更消息对象
- public List<DataSendMessageBO> getInternalSendDataMessage(List<DataChangeMessage> dataChangeMessages, List<Map<String, Object>> dataMap, List<DataChangeMessageConfigBO> dataChangeMessageConfigBOS) {
- List<DataSendMessageBO> dataSendMessageBOS = new ArrayList<>();
- for (DataChangeMessageConfigBO messageConfigBO : dataChangeMessageConfigBOS) {
- //不是内部消息的不处理
- if (!MessageTypeEnum.INTERNAL_MESSAGE.getCode().equals(messageConfigBO.getMessageType())) {
- continue;
- }
- String notifyColumn = messageConfigBO.getNotifyColumn();
- String[] columns = notifyColumn.split(CoreConstant.COMMA);
- for (int i = 0; i < dataChangeMessages.size(); i++) {
- DataChangeMessage dataChangeMessage = dataChangeMessages.get(i);
- List<DataChangeMessage.ColumnValue> columnValues = new ArrayList<>();
- dataChangeMessage.setColumnValues(columnValues);
- Map<String, Object> data = dataMap.get(i);
- for (String column : columns) {
- columnValues.add(new DataChangeMessage.ColumnValue(column, data.get(column)));
- }
- dataSendMessageBOS.add(new DataSendMessageBO(messageConfigBO, dataChangeMessage));
- }
- }
- return dataSendMessageBOS;
- }
-
- //发送内部消息
- private void sendDataMessage(List<DataSendMessageBO> sendDataMessageList) {
- for (DataSendMessageBO dataChangeMessage : sendDataMessageList) {
- DataChangeMessage dataMessage = dataChangeMessage.getDataChangeMessage();
- DataChangeMessageConfigBO dataChangeMessageConfigBO = dataChangeMessage.getDataChangeMessageConfigBO();
- //发送一个延迟队列的消息出去
- dataMessageProducer.send(dataMessage, dataChangeMessageConfigBO.getMessageTopic(), dataChangeMessageConfigBO.getDelayLevel());
- }
- }
- ...
- }
复制代码 (7)如果存在外部消息配置则保存数据变更对象详情- //消息业务实现类
- @Service
- public class MessageServiceImpl implements MessageService {
- @Resource
- private DataChangeRepository dataChangeRepository;
- ...
-
- //处理binlog消息
- @Override
- public void processBinlogMessage(BinlogData binlogData) {
- //获取当前表的监听信息
- DataChangeListenConfigDO listenConfigDO = dataChangeRepository.getListenConfigByTable(binlogData.getTableName());
- //未配置监听信息的表,不作处理
- if (Objects.isNull(listenConfigDO)) {
- return;
- }
-
- //获取数据变更对象列表,也就是将一条binlog数据转换成可能多个的数据变更对象
- List<DataChangeMessage> dataChangeMessages = getDataChangeMessage(binlogData, listenConfigDO);
- //不需要监听,或者要监听的字段值未变动
- if (CollectionUtils.isEmpty(dataChangeMessages)) {
- return;
- }
-
- //获取配置的消息对象
- //对这个表的多条数据变更对象会封装成配置的消息对象,然后发送到RocketMQ的topic里
- List<DataChangeMessageConfigBO> messageConfigBOS = dataChangeRepository.getMessageConfigBOByListenId(listenConfigDO.getId());
- //不需要发送消息
- if (CollectionUtils.isEmpty(messageConfigBOS)) {
- return;
- }
-
- //封装成需要发送的消息对象
- //对这个表的多条数据变更对象会封装成配置的消息对象,然后发送到RocketMQ的topic里
- List<DataSendMessageBO> sendDataMessageList = getInternalSendDataMessage(dataChangeMessages, binlogData.getDataMap(), messageConfigBOS);
- //待发送的消息为空,无需处理
- if (CollectionUtils.isEmpty(sendDataMessageList)) {
- return;
- }
-
- //发送消息
- sendDataMessage(sendDataMessageList);
-
- //配置的消息对象列表中,如果包含外部消息类型的消息对象,就需要保存
- if (messageConfigBOS.stream().anyMatch(messageConfigBO -> MessageTypeEnum.EXTERNAL_MESSAGE.getCode().equals(messageConfigBO.getMessageType()))) {
- //保存外部消息详细信息
- saveDataMessageDetail(dataChangeMessages, binlogData);
- }
- }
-
- //保存消息详细信息
- public void saveDataMessageDetail(List<DataChangeMessage> dataChangeMessages, BinlogData binlogData) {
- List<DataMessageBO> dataMessageBOS = converterDataMessageBOList(dataChangeMessages, binlogData);
- dataChangeRepository.saveDataMessageDetail(dataMessageBOS);
- }
-
- //转换消息详细信息
- private List<DataMessageBO> converterDataMessageBOList(List<DataChangeMessage> dataChangeMessages, BinlogData binlogData) {
- List<DataMessageBO> dataMessageBOS = new ArrayList<>(dataChangeMessages.size());
- for (int i = 0; i < dataChangeMessages.size(); i++) {
- DataChangeMessage dataChangeMessage = dataChangeMessages.get(i);
- DataMessageBO dataMessageBO = dataMessageConverter.converterBO(dataChangeMessage);
- dataMessageBO.setDiffDataArr(String.join(CoreConstant.COMMA, dataChangeMessage.getUpdateColumns()));
- dataMessageBO.setTableDataJson(JSON.toJSONString(binlogData.getDataMap().get(i)));
- dataMessageBOS.add(dataMessageBO);
- }
- return dataMessageBOS;
- }
- ...
- }
- @Repository
- public class DataChangeRepository {
- ...
- //存储外部消息的数据信息
- public void saveDataMessageDetail(List<DataMessageBO> dataMessageBOS) {
- List<DataMessageDetailDO> dataMessageDetailDOS = dataMessageConverter.converterDOList(dataMessageBOS);
- int count = dataMessageDetailMapper.insertBatch(dataMessageDetailDOS);
- if (count <= 0) {
- throw new BaseBizException(CommonErrorCodeEnum.SQL_ERROR);
- }
- }
- }
- //外部消息处理对象
- @Data
- public class DataMessageBO implements Serializable {
- //内部消息编号
- private String messageNo;
- //变化的表信息内容
- private String tableDataJson;
- //消息变化字段数组
- private String diffDataArr;
- //表名
- private String tableName;
- //操作类型
- private String action;
- }
复制代码
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |