找回密码
 立即注册
首页 业界区 业界 商品中心—5.商品消息处理系统的技术文档 ...

商品中心—5.商品消息处理系统的技术文档

都硎唷 前天 08:13
 
大纲
1.商品消息处理系统
(1)商品消息处理系统架构设计
(2)商品中心内部需消费的消息
(3)商品中心外部需消费的消息
(4)消息处理相关表
(5)消息处理流程设计
2.消费和处理binlog消息
(1)消费Canal发送到MQ的binlog消息
(2)解析MQ消息字符串成binlog对象
(3)处理binlog消息
(4)自研缓存组件Redis + DB读写实现
(5)将binlog对象封装为数据变更对象
(6)根据数据变更对象组装成内部消息对象并发送
(7)如果存在外部消息配置则保存数据变更对象详情
(8)消息编号的监听处理 + 发送外部消息
 
1.商品消息处理系统
(1)商品消息处理系统架构设计
(2)商品中心内部需要消费的消息
(3)商品中心外部需要消费的消息
(4)消息处理相关表
(5)消息处理流程设计
 
(1)商品消息处理系统架构设计
1.png
(2)商品系统内部需要消费的消息
一.item_info表变更
topic:data_change_topic
MQ消息体:
  1. {
  2.     "action": "UPDATE",//执⾏发送消息的触发动作:INSERT、UPDATE、DELETE
  3.     "table": "item_info",//更新表的表名
  4.     "updateColumn": [
  5.         "item_status"//item_info表item_status变更
  6.     ],
  7.     "data": [{
  8.         "column": "id",//item_info表主键id
  9.         "value": 1
  10.     }, {
  11.         "column": "itemId",//item_info表item_id
  12.         "value": "100000476748"
  13.     }]
  14. }
复制代码
二.sku_info表变更
topic:data_change_topic
MQ消息体:
  1. {
  2.     "action": "UPDATE",//执⾏发送消息的触发动作:INSERT、UPDATE、DELETE
  3.     "table": "sku_info",//更新表的表名
  4.     "updateColumn": [
  5.         "sku_name"//sku_info表sku_name变更
  6.     ],
  7.     "data": [{
  8.         "column": "id",//sku_info表主键id
  9.         "value": 1
  10.     }, {
  11.         "column": "itemId",//sku_info表item_id
  12.         "value": "100000476748"
  13.     }, {
  14.         "column": "skuId",//sku_info表sku_id
  15.         "value": "8000476872"
  16.     }]
  17. }
复制代码
三.attribute_extend表变更
topic:data_change_topic
MQ消息体:
  1. {
  2.     "action": "INSERT",//执⾏发送消息的触发动作:INSERT、UPDATE、DELETE
  3.     "table": "attribute_extend",//更新表的表名
  4.     "data": [{
  5.         "column": "id",//attribute_extend表主键id
  6.         "value": 1
  7.     }]
  8. }
复制代码
四.front_category_relation表变更
topic:data_change_topic
MQ消息体:
  1. {
  2.     "action": "INSERT",//执⾏发送消息的触发动作:INSERT、UPDATE、DELETE
  3.     "table": "front_category_relation",//更新表的表名
  4.     "data": [{
  5.         "column": "id",//front_category_relation表主键id
  6.         "value": 1
  7.     }]
  8. }
复制代码
五.sku_seller_relation表变更
topic:data_change_topic
MQ消息体:
  1. {
  2.     "action": "INSERT",//执⾏发送消息的触发动作:INSERT、UPDATE、DELETE
  3.     "table": "sku_seller_relation",//更新表的表名
  4.     "data": [{
  5.         "column": "id",//sku_seller_relation表主键id
  6.         "value": 1
  7.     }]
  8. }
复制代码
六.item_period_stage表变更
topic:interior_item_expri_result_topic
MQ消息体:
  1. {
  2.     "action": "update",//执⾏发送消息的触发动作:INSERT、UPDATE、DELETE
  3.     "table": "item_period_stage",//更新表的表名
  4.     "data": [{
  5.         "column": "period_stage",//sku_seller_relation表主键id
  6.         "value": 2
  7.     }]
  8. }
复制代码
(3)商品系统外部需要消费的消息
新建或编辑商品发送MQ消息
topic:open_product_topic
MQ消息体:
  1. {
  2.     "action": "INSERT", //执⾏发送消息的触发动作:INSERT、UPDATE、DELETE
  3.     "data": {
  4.         "itemId": "100000476748",
  5.         "itemStatus": 3,
  6.         "skuId": "8000476872"
  7.     }
  8. }
复制代码
(4)消息处理相关表
一.数据变更监听表
数据变更监听表中配置哪些表需要监听。
  1. create table data_change_listen_config (
  2.     id int unsigned auto_increment comment '主键' primary key,
  3.     table_name varchar(40) null comment '数据表名称',
  4.     key_column varchar(40) default 'id' null comment '数据表对应的主键或业务id列名',
  5.     filter_flag tinyint(1) null comment '是否过滤',
  6.     del_flag tinyint(1) null comment '是否删除',
  7.     create_user int null comment '创建⼈',
  8.     create_time datetime null comment '创建时间',
  9.     update_user int null comment '更新⼈',
  10.     update_time datetime null comment '更新时间'
  11. ) comment '数据变更监听表';//一个表只有一条记录
复制代码
二.监听表变化字段配置表
监听表变化字段配置表中配置哪些字段变更需要监听,只要满⾜其中配置的有⼀个字段值变更,就需要发送内部消息通知订阅⽅。
  1. create table data_change_column_config (
  2.     id int unsigned auto_increment primary key,
  3.     listen_id int null comment '监听表id',
  4.     listen_column varchar(40) null comment '监听字段',
  5.     del_flag tinyint(1) null comment '删除标记',
  6.     create_user int null comment '创建⼈',
  7.     create_time datetime null comment '创建时间',
  8.     update_user int null comment '更新⼈',
  9.     update_time datetime null comment '更新时间'
  10. ) comment '监听表变化字段配置表';
复制代码
三.监听表消息模型表
监听表消息模型表中配置的是数据变更后,消息是内部消息还是外部消息,消息发送的topic,消息延迟等级,消息需要通知的字段。
  1. create table data_change_message_config (
  2.     id int unsigned null comment '主键',
  3.     listen_id int null comment '监听表id',
  4.     notify_column varchar(2000) null comment '变更通知字段,逗号分隔',
  5.     message_topic varchar(256) null comment '变更通知消息主题',
  6.     delay_level int null comment '延迟等级',
  7.     message_type tinyint(3) null comment '消息类型',
  8.     del_flag tinyint(1) null comment '删除标记',
  9.     create_user int null comment '创建⼈',
  10.     create_time datetime null comment '创建时间',
  11.     update_user int null comment '更新⼈',
  12.     update_time datetime null comment '更新时间'
  13. ) comment '监听表消息模型表';
复制代码
四.外部消息记录表
外部消息记录表中记录的是内部消息发送后⽣成的消息编号,通过回调消息查找外部消息的消息内容。
  1. create table data_message_detail (
  2.     id int unsigned auto_increment comment '主键' primary key,
  3.     message_no varchar(64) null comment '消息编号',
  4.     table_data_json text null comment '变化的表信息内容',
  5.     diff_data_arr varchar(2000) null comment '消息变化字段数组,多个,分割',
  6.     table_name varchar(64) null comment '更新表的表名 ',
  7.     action varchar(64) null comment '执⾏发送消息的触发动作:INSERT、UPDATE、DELETE ',
  8.     del_flag tinyint(1) null comment '删除标记',
  9.     create_user int null comment '创建⼈',
  10.     create_time datetime null comment '创建时间',
  11.     update_user int null comment '更新⼈',
  12.     update_time datetime null comment '更新时间'
  13. ) comment '外部消息记录表';
复制代码
(5)消息处理流程设计
2.png
 
2.消费和处理binlog消息
(1)消费Canal发送到MQ的binlog消息
(2)解析MQ消息字符串成binlog对象
(3)处理binlog消息
(4)自研缓存组件Redis + DB读写实现
(5)将binlog对象封装为数据变更对象
(6)根据数据变更对象组装成内部消息对象并发送
(7)如果存在外部消息配置则保存数据变更对象详情
(8)消息编号的监听处理 + 发送外部消息
 
(1)消费Canal发送到MQ的binlog消息
  1. @Configuration
  2. public class ConsumerBeanConfig {
  3.     //配置内容对象
  4.     @Autowired
  5.     private RocketMQProperties rocketMQProperties;
  6.    
  7.     //消费系统内部消息——处理binlog消息
  8.     @Bean("dataChangeTopic")
  9.     public DefaultMQPushConsumer createItemStageConsumer(DataChangeListener dataChangeListener) throws MQClientException {
  10.         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketMqConstant.DATA_CHANGE_CONSUMER_GROUP);
  11.         consumer.setNamesrvAddr(rocketMQProperties.getNameServer());
  12.         consumer.subscribe(RocketMqConstant.DATA_CHANGE_TOPIC, "*");
  13.         consumer.registerMessageListener(dataChangeListener);
  14.         consumer.start();
  15.         return consumer;
  16.     }
  17.     ...
  18. }
  19. @Component
  20. public class DataChangeListener implements MessageListenerConcurrently {
  21.     @Autowired
  22.     private MessageService messageService;
  23.    
  24.     @Override
  25.     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
  26.         try {
  27.             for (MessageExt messageExt : list) {
  28.                 String msg = new String(messageExt.getBody());
  29.                 log.info("数据变更消息通知,消息内容:{}", msg);
  30.   
  31.                 //获取binlog对象
  32.                 BinlogData binlogData = BinlogUtils.getBinlogData(msg);
  33.                 if (Objects.isNull(binlogData) || Objects.isNull(binlogData.getDataMap())) {
  34.                     return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  35.                 }
  36.   
  37.                 //操作类型不是insert,delete,update的,不作处理
  38.                 String operateType = binlogData.getOperateType();
  39.                 if (!BinlogType.INSERT.getValue().equals(operateType)
  40.                         && !BinlogType.DELETE.getValue().equals(operateType)
  41.                         && !BinlogType.UPDATE.getValue().equals(operateType)) {
  42.                     return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  43.                 }
  44.                 //处理binlog消息
  45.                 messageService.processBinlogMessage(binlogData);
  46.             }
  47.         } catch (Exception e) {
  48.             log.error("consume error, 消费数据变更消息失败", e);
  49.             //本次消费失败,下次重新消费
  50.             return ConsumeConcurrentlyStatus.RECONSUME_LATER;
  51.         }
  52.         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  53.     }
  54. }
复制代码
(2)解析MQ消息字符串成binlog对象
  1. //MySQL的binlog对象
  2. @Data
  3. public class BinlogData implements Serializable {
  4.     //binlog对应的表名
  5.     private String tableName;
  6.     //操作时间
  7.     private Long operateTime;
  8.     //操作类型
  9.     private String operateType;
  10.     //data节点转换成的Map,key对应的是bean里的属性名,value一律为字符串
  11.     private List<Map<String, Object>> dataMap;
  12.     //data节点转换成的Map,key对应的是bean里的属性名,value一律为字符串
  13.     private List<Map<String, Object>> oldMap;
  14. }
  15. //MySQL binlog解析工具类
  16. public abstract class BinlogUtils {
  17.     //解析binlog json字符串
  18.     public static BinlogData getBinlogData(String binlogStr) {
  19.         //isJson方法里面会判断字符串是不是为空,所以这里不需要重复判断
  20.         if (JSONUtil.isJson(binlogStr)) {
  21.             JSONObject binlogJson = JSONUtil.parseObj(binlogStr);
  22.             //不处理DDL的binlog,只处理数据变更
  23.             if (binlogJson.getBool("isDdl")) {
  24.                 return null;
  25.             }
  26.             
  27.             BinlogData binlogData = new BinlogData();
  28.             //表名
  29.             String tableName = binlogJson.getStr("table");
  30.             binlogData.setTableName(tableName);
  31.             //操作类型
  32.             String operateType = binlogJson.getStr("type");
  33.             binlogData.setOperateType(operateType);
  34.             //操作时间
  35.             Long operateTime = binlogJson.getLong("ts");
  36.             binlogData.setOperateTime(operateTime);
  37.   
  38.             //data数据
  39.             JSONArray dataArray = binlogJson.getJSONArray("data");
  40.             List<Map<String, Object>> dataMap = jsonArrayToMapList(dataArray);
  41.             binlogData.setDataMap(dataMap);
  42.             if (!binlogJson.isNull("old")) {
  43.                 //old数据
  44.                 JSONArray oldArray = binlogJson.getJSONArray("old");
  45.                 List<Map<String, Object>> oldMap = jsonArrayToMapList(oldArray);
  46.                 binlogData.setOldMap(oldMap);
  47.             }
  48.             return binlogData;
  49.         }
  50.         return null;
  51.     }
  52.    
  53.     private static List<Map<String, Object>> jsonArrayToMapList(JSONArray jsonArray) {
  54.         if (null != jsonArray) {
  55.             Iterable<JSONObject> arrayIterator = jsonArray.jsonIter();
  56.             //遍历data节点或old节点并返回Map
  57.             if (null != arrayIterator) {
  58.                 //binlog的data数组或old数组里数据的类型为Map
  59.                 List<Map<String, Object>> dataMap = new ArrayList<>();
  60.                 while (arrayIterator.iterator().hasNext()) {
  61.                     JSONObject jsonObject = arrayIterator.iterator().next();
  62.                     Map<String, Object> data = new HashMap<>(jsonObject.size());
  63.                     jsonObject.keySet().forEach(key -> {
  64.                         data.put(key, jsonObject.get(key));
  65.                     });
  66.                     dataMap.add(data);
  67.                 }
  68.                 return dataMap;
  69.             }
  70.         }
  71.         return null;
  72.     }
  73. }
复制代码
(3)处理binlog消息
步骤一:通过缓存组件获取当前表的配置监听信息
步骤二:将Binlog对象封装为数据变更对象
步骤三:通过缓存组件获取配置的消息模型对象
步骤四:组装需要发送的数据变更消息对象
步骤五:发送内部消息
  1. @Service
  2. public class MessageServiceImpl implements MessageService {
  3.     ...
  4.     //处理binlog消息
  5.     @Override
  6.     public void processBinlogMessage(BinlogData binlogData) {
  7.         //1.通过缓存组件获取当前表的配置监听信息
  8.         DataChangeListenConfigDO listenConfigDO = dataChangeRepository.getListenConfigByTable(binlogData.getTableName());
  9.         //未配置监听信息的表,不作处理
  10.         if (Objects.isNull(listenConfigDO)) {
  11.             return;
  12.         }
  13.         
  14.         //2.将binlog对象封装为数据变更对象
  15.         List<DataChangeMessage> dataChangeMessages = getDataChangeMessage(binlogData, listenConfigDO);
  16.         //不需要监听,或者要监听的字段值未变动
  17.         if (CollectionUtils.isEmpty(dataChangeMessages)) {
  18.             return;
  19.         }
  20.         
  21.         //3.通过缓存组件获取配置的消息模型对象
  22.         List<DataChangeMessageConfigBO> messageConfigBOS = dataChangeRepository.getMessageConfigBOByListenId(listenConfigDO.getId());
  23.         //不需要发送消息
  24.         if (CollectionUtils.isEmpty(messageConfigBOS)) {
  25.             return;
  26.         }
  27.         
  28.         //4.组装需要发送的数据变更消息对象
  29.         List<DataSendMessageBO> sendDataMessageList = getInternalSendDataMessage(dataChangeMessages, binlogData.getDataMap(), messageConfigBOS);
  30.         //待发送的消息为空,无需处理
  31.         if (CollectionUtils.isEmpty(sendDataMessageList)) {
  32.             return;
  33.         }
  34.         
  35.         //5.发送内部消息
  36.         sendDataMessage(sendDataMessageList);
  37.         
  38.         //6.如果存在外部消息配置则保存数据变更对象详情
  39.         if (messageConfigBOS.stream().anyMatch(messageConfigBO -> MessageTypeEnum.EXTERNAL_MESSAGE.getCode().equals(messageConfigBO.getMessageType()))) {
  40.             //保存外部消息详细信息
  41.             saveDataMessageDetail(dataChangeMessages, binlogData);
  42.         }
  43.     }
  44.     ...
  45. }
复制代码
(4)商品系统自研缓存组件Redis + DB读写实现
一.通过缓存组件获取监听的配置信息
二.缓存组件RedisReadWriteManager的实现
 
一.通过缓存组件获取监听的配置信息
  1. @Repository
  2. public class DataChangeRepository {
  3.     @Resource
  4.     private RedisReadWriteManager redisReadWriteManager;
  5.     ...
  6.    
  7.     //根据表名获取监听配置信息
  8.     public DataChangeListenConfigDO getListenConfigByTable(String tableName) {
  9.         Optional<DataChangeListenConfigDO> optional = redisReadWriteManager.getRedisStringDataByCache(
  10.             tableName,
  11.             DataChangeListenConfigDO.class,
  12.             AbstractRedisKeyConstants::getListenConfigStringKey,
  13.             this::getListenConfigByTableFromDB
  14.         );
  15.         return optional.orElse(null);
  16.     }
  17.    
  18.     //获取监听变更字段配置表信息
  19.     public List<DataChangeColumnConfigDO> getColumnConfigByListenId(Long id) {
  20.         Optional<List<DataChangeColumnConfigDO>> optional = redisReadWriteManager.listRedisStringDataByCache(
  21.             id,
  22.             DataChangeColumnConfigDO.class,
  23.             AbstractRedisKeyConstants::getColumnConfigStringKey,
  24.             this::getColumnConfigByListenIdFromDB
  25.         );
  26.         return optional.orElse(null);
  27.     }
  28.    
  29.     //获取监听表消息模型配置
  30.     public List<DataChangeMessageConfigDO> getMessageConfigByListenId(Long id, MessageTypeEnum messageTypeEnum) {
  31.         //获取监听表消息模型配置
  32.         Optional<List<DataChangeMessageConfigDO>> optional = redisReadWriteManager.listRedisStringDataByCache(
  33.             id,
  34.             DataChangeMessageConfigDO.class,
  35.             AbstractRedisKeyConstants::getMessageConfigStringKey,
  36.             this::getMessageConfigByListenIdFromDB
  37.         );
  38.         
  39.         //如果未指定是内部消息还是外部消息,则不需要过滤
  40.         if (Objects.isNull(messageTypeEnum)) {
  41.             return optional.orElse(null);
  42.         }
  43.         return optional.map(dataChangeMessageConfigDOS -> dataChangeMessageConfigDOS.stream()
  44.             .filter(messageConfigBO -> messageTypeEnum.getCode().equals(messageConfigBO.getMessageType()))
  45.             .collect(Collectors.toList())).orElse(null);
  46.     }
  47.    
  48.     //根据表名获取监听配置信息
  49.     public Optional<DataChangeListenConfigDO> getListenConfigByTableFromDB(String tableName) {
  50.         LambdaQueryWrapper<DataChangeListenConfigDO> queryWrapper = Wrappers.lambdaQuery();
  51.         queryWrapper.eq(DataChangeListenConfigDO::getTableName, tableName).eq(BaseEntity::getDelFlag, DelFlagEnum.EFFECTIVE.getCode());
  52.         DataChangeListenConfigDO listenConfigDO = dataChangeListenConfigMapper.selectOne(queryWrapper);
  53.         return Objects.isNull(listenConfigDO) ? Optional.empty() : Optional.of(listenConfigDO);
  54.     }
  55.    
  56.     //获取监听变更字段配置表信息
  57.     public Optional<List<DataChangeColumnConfigDO>> getColumnConfigByListenIdFromDB(Long id) {
  58.         LambdaQueryWrapper<DataChangeColumnConfigDO> queryWrapper = Wrappers.lambdaQuery();
  59.         queryWrapper.eq(DataChangeColumnConfigDO::getListenId, id).eq(BaseEntity::getDelFlag, DelFlagEnum.EFFECTIVE.getCode());
  60.         List<DataChangeColumnConfigDO> columnConfigDOS = dataChangeColumnConfigMapper.selectList(queryWrapper);
  61.         return CollectionUtils.isEmpty(columnConfigDOS) ? Optional.empty() : Optional.of(columnConfigDOS);
  62.     }
  63.    
  64.     //查询数据变更对象列表
  65.     public Optional<List<DataChangeMessageConfigDO>> getMessageConfigByListenIdFromDB(Long id) {
  66.         LambdaQueryWrapper<DataChangeMessageConfigDO> queryWrapper = Wrappers.lambdaQuery();
  67.         queryWrapper.eq(DataChangeMessageConfigDO::getListenId, id).eq(BaseEntity::getDelFlag, DelFlagEnum.EFFECTIVE.getCode());
  68.         List<DataChangeMessageConfigDO> messageConfigDOS = dataChangeMessageConfigMapper.selectList(queryWrapper);
  69.         return CollectionUtils.isEmpty(messageConfigDOS) ? Optional.empty() : Optional.of(messageConfigDOS);
  70.     }
  71.     ...
  72. }
复制代码
二.缓存组件RedisReadWriteManager的实现
  1. //缓存读写管理
  2. @Service
  3. public class RedisReadWriteManager {
  4.     @Resource
  5.     private RedisCache redisCache;
  6.    
  7.     @Resource
  8.     private RedisLock redisLock;
  9.    
  10.     //批量获取缓存数据
  11.     //@param key                 关键字列表
  12.     //@param clazz               需要将缓存JSON转换的对象
  13.     //@param getRedisKeyFunction 获取Redis key的方法
  14.     //@param getDbFunction       获取数据源对象的方法
  15.     //@return java.util.Optional<java.util.List<T>>
  16.     public <T> Optional<List<T>> listRedisStringDataByCache(Long key, Class<T> clazz, Function<Long, String> getRedisKeyFunction, Function<Long, Optional<List<T>>> getDbFunction) {
  17.         try {
  18.             String redisKey = getRedisKeyFunction.apply(key);
  19.             //过滤无效缓存
  20.             String cache = redisCache.get(redisKey);
  21.             if (EMPTY_OBJECT_STRING.equals(cache)) {
  22.                 return Optional.empty();
  23.             }
  24.             if (StringUtils.isNotBlank(cache)) {
  25.                 List<T> list = JSON.parseArray(cache, clazz);
  26.                 return Optional.of(list);
  27.             }
  28.             //缓存没有则读库
  29.             return listRedisStringDataByDb(key, getRedisKeyFunction, getDbFunction);
  30.         } catch (Exception e) {
  31.             log.error("获取缓存数据异常 key={},clazz={}", key, clazz, e);
  32.             throw e;
  33.         }
  34.     }
  35.    
  36.     //读取数据库表数据赋值到Redis
  37.     public <T> Optional<List<T>> listRedisStringDataByDb(Long key, Function<Long, String> getRedisKeyFunction, Function<Long, Optional<List<T>>> getDbFunction) {
  38.         if (Objects.isNull(key) || Objects.isNull(getDbFunction)) {
  39.             return Optional.empty();
  40.         }
  41.         try {
  42.             if (!redisLock.lock(String.valueOf(key))) {
  43.                 return Optional.empty();
  44.             }
  45.             String redisKey = getRedisKeyFunction.apply(key);
  46.             Optional<List<T>> optional = getDbFunction.apply(key);
  47.             putCacheString(redisKey, optional);
  48.             return optional;
  49.         } finally {
  50.             redisLock.unlock(String.valueOf(key));
  51.         }
  52.     }
  53.    
  54.     private void putCacheString(String redisKey, Optional optional) {
  55.         if (!optional.isPresent()) {
  56.             //把空对象暂存到Redis
  57.             redisCache.setex(redisKey, EMPTY_OBJECT_STRING, RedisKeyUtils.redisKeyRandomTime(INT_EXPIRED_ONE_DAY, TimeUnit.HOURS, NUMBER_24));
  58.             log.warn("发生缓存穿透 redisKey={}", redisKey);
  59.             return;
  60.         }
  61.         //把表数据对象存到Redis
  62.         redisCache.setex(redisKey, JSON.toJSONString(optional.get()), RedisKeyUtils.redisKeyRandomTime(INT_EXPIRED_SEVEN_DAYS));
  63.         log.info("表数据对象存到Redis redisKey={}, data={}", redisKey, JSON.toJSONString(optional.get()));
  64.     }
  65.    
  66.     //批量获取缓存数据
  67.     //@param key                 关键字列表
  68.     //@param clazz               需要将缓存JSON转换的对象
  69.     //@param getRedisKeyFunction 获取redis key的方法
  70.     //@param getDbFunction       获取数据源对象的方法
  71.     //@return java.util.Optional<java.util.List < T>>
  72.     public <T> Optional<T> getRedisStringDataByCache(String key, Class<T> clazz, Function<String, String> getRedisKeyFunction, Function<String, Optional<T>> getDbFunction) {
  73.         try {
  74.             String redisKey = getRedisKeyFunction.apply(key);
  75.             String cache = redisCache.get(redisKey);
  76.             //过滤无效缓存
  77.             if (EMPTY_OBJECT_STRING.equals(cache)) {
  78.                 return Optional.empty();
  79.             }
  80.             if (StringUtils.isNotBlank(cache)) {
  81.                 T t = JSON.parseObject(cache, clazz);
  82.                 return Optional.of(t);
  83.             }
  84.             //缓存没有则读库
  85.             return getRedisStringDataByDb(key, getRedisKeyFunction, getDbFunction);
  86.         } catch (Exception e) {
  87.             log.error("获取缓存数据异常 key={},clazz={}", key, clazz, e);
  88.             throw e;
  89.         }
  90.     }
  91.    
  92.     //读取数据库表数据赋值到Redis
  93.     public <T> Optional<T> getRedisStringDataByDb(String key, Function<String, String> getRedisKeyFunction, Function<String, Optional<T>> getDbFunction) {
  94.         if (StringUtils.isBlank(key) || Objects.isNull(getDbFunction)) {
  95.             return Optional.empty();
  96.         }
  97.         try {
  98.             if (!redisLock.lock(key)) {
  99.                 return Optional.empty();
  100.             }
  101.             String redisKey = getRedisKeyFunction.apply(key);
  102.             Optional<T> optional = getDbFunction.apply(key);
  103.             putCacheString(redisKey, optional);
  104.             return optional;
  105.         } finally {
  106.             redisLock.unlock(key);
  107.         }
  108.     }
  109. }
  110. @Component
  111. public class RedisCache {
  112.     private RedisTemplate redisTemplate;
  113.    
  114.     public RedisCache(RedisTemplate redisTemplate) {
  115.         this.redisTemplate = redisTemplate;
  116.     }
  117.     ...
  118.    
  119.     //缓存获取
  120.     public String get(String key) {
  121.         ValueOperations<String, String> vo = redisTemplate.opsForValue();
  122.         return vo.get(key);
  123.     }
  124.    
  125.     //缓存存储并设置过期时间
  126.     public void setex(String key, String value, long time) {
  127.         redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
  128.     }
  129.     ...
  130. }
复制代码
(5)将binlog对象封装为数据变更对象
  1. @Service
  2. public class MessageServiceImpl implements MessageService {
  3.     ...
  4.     //处理binlog消息
  5.     @Override
  6.     public void processBinlogMessage(BinlogData binlogData) {
  7.         //1.通过缓存组件获取当前表的配置监听信息
  8.         DataChangeListenConfigDO listenConfigDO = dataChangeRepository.getListenConfigByTable(binlogData.getTableName());
  9.         //未配置监听信息的表,不作处理
  10.         if (Objects.isNull(listenConfigDO)) {
  11.             return;
  12.         }
  13.         
  14.         //2.将binlog对象封装为数据变更对象
  15.         List<DataChangeMessage> dataChangeMessages = getDataChangeMessage(binlogData, listenConfigDO);
  16.         //不需要监听,或者要监听的字段值未变动
  17.         if (CollectionUtils.isEmpty(dataChangeMessages)) {
  18.             return;
  19.         }
  20.         
  21.         //3.通过缓存组件获取配置的消息模型对象
  22.         List<DataChangeMessageConfigBO> messageConfigBOS = dataChangeRepository.getMessageConfigBOByListenId(listenConfigDO.getId());
  23.         //不需要发送消息
  24.         if (CollectionUtils.isEmpty(messageConfigBOS)) {
  25.             return;
  26.         }
  27.         
  28.         //4.组装需要发送的数据变更消息对象
  29.         List<DataSendMessageBO> sendDataMessageList = getInternalSendDataMessage(dataChangeMessages, binlogData.getDataMap(), messageConfigBOS);
  30.         //待发送的消息为空,无需处理
  31.         if (CollectionUtils.isEmpty(sendDataMessageList)) {
  32.             return;
  33.         }
  34.         
  35.         //5.发送内部消息
  36.         sendDataMessage(sendDataMessageList);
  37.         
  38.         //6.如果存在外部消息配置则保存数据变更消息对象详情
  39.         if (messageConfigBOS.stream().anyMatch(messageConfigBO -> MessageTypeEnum.EXTERNAL_MESSAGE.getCode().equals(messageConfigBO.getMessageType()))) {
  40.             //保存外部消息详细信息
  41.             saveDataMessageDetail(dataChangeMessages, binlogData);
  42.         }
  43.     }
  44.    
  45.     //将binlog对象封装为数据变更对象
  46.     public List<DataChangeMessage> getDataChangeMessage(BinlogData binlogData, DataChangeListenConfigDO listenConfigDO) {
  47.         //获取监听变更字段配置表信息
  48.         List<DataChangeColumnConfigDO> columnConfigDOS = dataChangeRepository.getColumnConfigByListenId(listenConfigDO.getId());
  49.         //要监听的字段为空,不作处理
  50.         if (CollectionUtils.isEmpty(columnConfigDOS)) {
  51.             return null;
  52.         }
  53.         //封装数据变更对象
  54.         return buildChangeColumn(binlogData, columnConfigDOS, listenConfigDO);
  55.     }
  56.    
  57.     //封装数据变更对象
  58.     private List<DataChangeMessage> buildChangeColumn(BinlogData binlogData, List<DataChangeColumnConfigDO> columnConfigDOS, DataChangeListenConfigDO listenConfigDO) {
  59.         List<DataChangeMessage> dataChangeMessages = new ArrayList<>();
  60.         //操作类型
  61.         String operateType = binlogData.getOperateType();
  62.         for (int i = 0; i < binlogData.getDataMap().size(); i++) {
  63.             Map<String, Object> data = binlogData.getDataMap().get(i);//新值
  64.             if (BinlogType.INSERT.getValue().equals(operateType) || BinlogType.DELETE.getValue().equals(operateType)) {
  65.                 //如果是新增或者删除,则所有监听字段都变更
  66.                 List<String> updateColumns = columnConfigDOS.stream().map(DataChangeColumnConfigDO::getListenColumn).collect(Collectors.toList());
  67.                 DataChangeMessage dataChangeMessage = buildDataChangeMessage(binlogData, updateColumns, data.get(listenConfigDO.getKeyColumn()));
  68.                 dataChangeMessages.add(dataChangeMessage);
  69.             } else {
  70.                 Map<String, Object> old = binlogData.getOldMap().get(i);//旧值
  71.                 List<String> updateColumns = new ArrayList<>();
  72.                 for (DataChangeColumnConfigDO columnConfigDO : columnConfigDOS) {
  73.                     String column = columnConfigDO.getListenColumn();
  74.                     Object columnOldValue = old.get(column);
  75.                     //旧的字段值有数据,就表示该字段变更了,添加至修改的字段集合
  76.                     if (!Objects.isNull(columnOldValue)) {
  77.                         updateColumns.add(column);
  78.                     }
  79.                 }
  80.                 //监听的字段有数据变更
  81.                 if (!CollectionUtils.isEmpty(updateColumns)) {
  82.                     DataChangeMessage dataChangeMessage = buildDataChangeMessage(binlogData, updateColumns, data.get(listenConfigDO.getKeyColumn()));
  83.                     dataChangeMessages.add(dataChangeMessage);
  84.                 }
  85.             }
  86.         }
  87.         return dataChangeMessages;
  88.     }
  89.    
  90.     //构建数据变更对象
  91.     private DataChangeMessage buildDataChangeMessage(BinlogData binlogData, List<String> updateColumns, Object keyId) {
  92.         DataChangeMessage dataChangeMessage = new DataChangeMessage(binlogData.getOperateType(), binlogData.getTableName(), updateColumns);
  93.         dataChangeMessage.setMessageNo(SnowflakeIdWorker.getCode());//雪花算法设置消息编号
  94.         dataChangeMessage.setKeyId(keyId);
  95.         return dataChangeMessage;
  96.     }
  97.     ...
  98. }
  99. //数据变更对象
  100. @Data
  101. public class DataChangeMessage implements Serializable {
  102.     //内部消息编号
  103.     private String messageNo;
  104.     //操作行为,INSERT、UPDATE、DELETE
  105.     private String action;
  106.     //表名
  107.     private String tableName;
  108.     //主键或业务id
  109.     private Object keyId;
  110.     //变更的列
  111.     private List<String> updateColumns;
  112.     //唯一确定当前数据的字段以及字段值
  113.     private List<ColumnValue> columnValues;
  114.     //消息处理成功之后的回调topic
  115.     private String callbackTopic = RocketMqConstant.DATA_EXTERNAL_CHANGE_TOPIC;
  116.    
  117.     @Data
  118.     @NoArgsConstructor
  119.     @AllArgsConstructor
  120.     public static class ColumnValue {
  121.         private String column;//列
  122.         private Object value;//值
  123.     }
  124.    
  125.     public DataChangeMessage(String operateType, String tableName, List<String> updateColumns) {
  126.         this.action = operateType;
  127.         this.tableName = tableName;
  128.         this.updateColumns = updateColumns;
  129.     }
  130. }
复制代码
(6)根据数据变更对象组装成内部消息对象并发送
  1. @Service
  2. public class MessageServiceImpl implements MessageService {
  3.     ...
  4.     //组装需要发送的数据变更消息对象
  5.     public List<DataSendMessageBO> getInternalSendDataMessage(List<DataChangeMessage> dataChangeMessages, List<Map<String, Object>> dataMap, List<DataChangeMessageConfigBO> dataChangeMessageConfigBOS) {
  6.         List<DataSendMessageBO> dataSendMessageBOS = new ArrayList<>();
  7.         for (DataChangeMessageConfigBO messageConfigBO : dataChangeMessageConfigBOS) {
  8.             //不是内部消息的不处理
  9.             if (!MessageTypeEnum.INTERNAL_MESSAGE.getCode().equals(messageConfigBO.getMessageType())) {
  10.                 continue;
  11.             }
  12.             String notifyColumn = messageConfigBO.getNotifyColumn();
  13.             String[] columns = notifyColumn.split(CoreConstant.COMMA);
  14.             for (int i = 0; i < dataChangeMessages.size(); i++) {
  15.                 DataChangeMessage dataChangeMessage = dataChangeMessages.get(i);
  16.                 List<DataChangeMessage.ColumnValue> columnValues = new ArrayList<>();
  17.                 dataChangeMessage.setColumnValues(columnValues);
  18.                 Map<String, Object> data = dataMap.get(i);
  19.                 for (String column : columns) {
  20.                     columnValues.add(new DataChangeMessage.ColumnValue(column, data.get(column)));
  21.                 }
  22.                 dataSendMessageBOS.add(new DataSendMessageBO(messageConfigBO, dataChangeMessage));
  23.             }
  24.         }
  25.         return dataSendMessageBOS;
  26.     }
  27.    
  28.     //发送内部消息
  29.     private void sendDataMessage(List<DataSendMessageBO> sendDataMessageList) {
  30.         for (DataSendMessageBO dataChangeMessage : sendDataMessageList) {
  31.             DataChangeMessage dataMessage = dataChangeMessage.getDataChangeMessage();
  32.             DataChangeMessageConfigBO dataChangeMessageConfigBO = dataChangeMessage.getDataChangeMessageConfigBO();
  33.             //发送一个延迟队列的消息出去
  34.             dataMessageProducer.send(dataMessage, dataChangeMessageConfigBO.getMessageTopic(), dataChangeMessageConfigBO.getDelayLevel());
  35.         }
  36.     }
  37.     ...
  38. }
复制代码
(7)如果存在外部消息配置则保存数据变更对象详情
  1. //消息业务实现类
  2. @Service
  3. public class MessageServiceImpl implements MessageService {
  4.     @Resource
  5.     private DataChangeRepository dataChangeRepository;
  6.     ...
  7.    
  8.     //处理binlog消息
  9.     @Override
  10.     public void processBinlogMessage(BinlogData binlogData) {
  11.         //获取当前表的监听信息
  12.         DataChangeListenConfigDO listenConfigDO = dataChangeRepository.getListenConfigByTable(binlogData.getTableName());
  13.         //未配置监听信息的表,不作处理
  14.         if (Objects.isNull(listenConfigDO)) {
  15.             return;
  16.         }
  17.   
  18.         //获取数据变更对象列表,也就是将一条binlog数据转换成可能多个的数据变更对象
  19.         List<DataChangeMessage> dataChangeMessages = getDataChangeMessage(binlogData, listenConfigDO);
  20.         //不需要监听,或者要监听的字段值未变动
  21.         if (CollectionUtils.isEmpty(dataChangeMessages)) {
  22.             return;
  23.         }
  24.   
  25.         //获取配置的消息对象
  26.         //对这个表的多条数据变更对象会封装成配置的消息对象,然后发送到RocketMQ的topic里
  27.         List<DataChangeMessageConfigBO> messageConfigBOS = dataChangeRepository.getMessageConfigBOByListenId(listenConfigDO.getId());
  28.         //不需要发送消息
  29.         if (CollectionUtils.isEmpty(messageConfigBOS)) {
  30.             return;
  31.         }
  32.   
  33.         //封装成需要发送的消息对象
  34.         //对这个表的多条数据变更对象会封装成配置的消息对象,然后发送到RocketMQ的topic里
  35.         List<DataSendMessageBO> sendDataMessageList = getInternalSendDataMessage(dataChangeMessages, binlogData.getDataMap(), messageConfigBOS);
  36.         //待发送的消息为空,无需处理
  37.         if (CollectionUtils.isEmpty(sendDataMessageList)) {
  38.             return;
  39.         }
  40.   
  41.         //发送消息
  42.         sendDataMessage(sendDataMessageList);
  43.   
  44.         //配置的消息对象列表中,如果包含外部消息类型的消息对象,就需要保存
  45.         if (messageConfigBOS.stream().anyMatch(messageConfigBO -> MessageTypeEnum.EXTERNAL_MESSAGE.getCode().equals(messageConfigBO.getMessageType()))) {
  46.             //保存外部消息详细信息
  47.             saveDataMessageDetail(dataChangeMessages, binlogData);
  48.         }
  49.     }
  50.    
  51.     //保存消息详细信息
  52.     public void saveDataMessageDetail(List<DataChangeMessage> dataChangeMessages, BinlogData binlogData) {
  53.         List<DataMessageBO> dataMessageBOS = converterDataMessageBOList(dataChangeMessages, binlogData);
  54.         dataChangeRepository.saveDataMessageDetail(dataMessageBOS);
  55.     }
  56.    
  57.     //转换消息详细信息
  58.     private List<DataMessageBO> converterDataMessageBOList(List<DataChangeMessage> dataChangeMessages, BinlogData binlogData) {
  59.         List<DataMessageBO> dataMessageBOS = new ArrayList<>(dataChangeMessages.size());
  60.         for (int i = 0; i < dataChangeMessages.size(); i++) {
  61.             DataChangeMessage dataChangeMessage = dataChangeMessages.get(i);
  62.             DataMessageBO dataMessageBO = dataMessageConverter.converterBO(dataChangeMessage);
  63.             dataMessageBO.setDiffDataArr(String.join(CoreConstant.COMMA, dataChangeMessage.getUpdateColumns()));
  64.             dataMessageBO.setTableDataJson(JSON.toJSONString(binlogData.getDataMap().get(i)));
  65.             dataMessageBOS.add(dataMessageBO);
  66.         }
  67.         return dataMessageBOS;
  68.     }
  69.     ...
  70. }
  71. @Repository
  72. public class DataChangeRepository {
  73.     ...
  74.     //存储外部消息的数据信息
  75.     public void saveDataMessageDetail(List<DataMessageBO> dataMessageBOS) {
  76.         List<DataMessageDetailDO> dataMessageDetailDOS = dataMessageConverter.converterDOList(dataMessageBOS);
  77.         int count = dataMessageDetailMapper.insertBatch(dataMessageDetailDOS);
  78.         if (count <= 0) {
  79.             throw new BaseBizException(CommonErrorCodeEnum.SQL_ERROR);
  80.         }
  81.     }
  82. }
  83. //外部消息处理对象
  84. @Data
  85. public class DataMessageBO implements Serializable {
  86.     //内部消息编号
  87.     private String messageNo;
  88.     //变化的表信息内容
  89.     private String tableDataJson;
  90.     //消息变化字段数组
  91.     private String diffDataArr;
  92.     //表名
  93.     private String tableName;
  94.     //操作类型
  95.     private String action;
  96. }
复制代码
 

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册