找回密码
 立即注册
首页 业界区 业界 商品中心—17.缓存与DB一致性的技术文档 ...

商品中心—17.缓存与DB一致性的技术文档

豌笆 昨天 17:18
大纲
1.缓存与数据库一致性服务的设计
2.缓存与数据库一致性服务的注解
3.缓存与数据库一致性服务的处理入口
4.缓存与数据库一致性服务的消费缓存消息
5.缓存与数据库一致性服务的消费检查
6.缓存与数据库一致性服务的实现总结
一.缓存 + DB双写的注解与AOP切面实现
二.先执行DB写入再基于AOP异步写缓存
三.缓存数据双写之缓存消息写MQ + 缓存数据写内存队列再延迟写DB
四.缓存消息基于内存双队列异步批量写DB
五.内存双队列定时交换与Batch切分
六.基于双内存队列实现定时批量写入DB
七.基于缓存key的Hash值实现内存队列分发
八.消息基于内存队列分发给线程后写入缓存
九.基于定时任务查询DB中缓存消息实现补偿
 
1.缓存与数据库一致性服务的设计
(1)缓存消息DB记录表
(2)整体流程图
 
(1)缓存消息DB记录表
  1. CREATE TABLE `data_refresh_detail` (
  2.     `id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
  3.     `cache_key` varchar(128) NOT NULL DEFAULT '' COMMENT '缓存key',
  4.     `operation_type` tinyint(1) NOT NULL DEFAULT '1' COMMENT '1新增/修改,2删除',
  5.     `cache_json` text NOT NULL COMMENT '缓存内容',
  6.     `cache_type` tinyint(1) DEFAULT NULL COMMENT '缓存类型,1Redis,2Tair',
  7.     `cache_status` tinyint(1) DEFAULT '0' COMMENT '缓存的处理状态,默认为0未处理,1为已处理',
  8.     `version` varchar(32) NOT NULL COMMENT '消息版本号',
  9.     `message_type` tinyint(1) NOT NULL COMMENT '是否热点消息,0普通,1热点',
  10.     `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间',
  11.     `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  12.     PRIMARY KEY (`id`) USING BTREE
  13. ) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8 COMMENT='缓存消息DB记录表';
复制代码
(2)整体流程
1.png
一.为什么获取缓存数据后不直接更新缓存而发送消息到MQ
最简单的实现方案,应该就是AOP切面获取到缓存数据后直接更新缓存。由于MQ可以提升性能、削峰、解耦,以及随着业务的迭代,更新缓存的环节可能会越来越复杂。所以AOP切面获取到缓存数据后,应该先将缓存数据发送到MQ,通过消费MQ的缓存消息来更新缓存,尽量不影响添加了注解的方法。
 
二.为什么要往DB写入缓存数据的记录
但是将缓存数据以消息的形式发送到MQ后,消费消息时可能会出现故障。所以为了保证缓存数据消息能够最终被消费到并更新缓存,需要当AOP切面获取到缓存数据后,就将缓存数据写入DB。这时为了不影响添加了注解的方法的性能,可以使用异步线程去写入DB。所以DB中的缓存数据主要用来检查消费是否异常,因此允许部分丢失。
 
三.为什么没有使用最简单的线程池,而添加了多个内存队列
检查消费是否异常时,内存队列可以方便对异常的数据添加处理,消费缓存消息时通过多个内存队列 + 多线程的方式来提升处理速度。在应用启动时会创建一个线程池 + 多个内存队列 + 多个任务线程,每个任务线程都会负责处理其中一个内存队列中的缓存消息,这些任务线程都会被添加到这个线程池中执行。在消费缓存消息时,缓存消息就会不断被添加到对应的内存队列中,这样就实现了多线程处理消费到的缓存消息。
 
四.为了避免消费缓存消息出现问题启会动定时任务检查消费是否异常
定时任务会每分钟执行一次,检查缓存消息的消费是否出现问题。如果出现问题,则从DB中获取具体的缓存数据来更新缓存。
 
2.缓存与数据库一致性服务的注解
(1)注解的定义
(2)注解的使用
 
(1)注解的定义
需要实现缓存与DB一致性的方法在使用该注解时需要注意:
 
一.指定具体的缓存名称
对应于注解中的cacheKey字段。
 
二.指定第几个入参参数作为缓存的内容
对应于注解中的index字段。
 
三.指定操作类型
对应于注解中的operationType字段,1是新增或修改,2是删除。
 
四.指定发送哪种缓存数据的消息
对应于注解中的messageType,0是普通缓存的消息,1是热点缓存的消息。用来保证DB和缓存的一致性时,消息类型为普通缓存的消息。用来保证本地缓存和分布式缓存的一致性时,消息类型为热点缓存的消息。
 
五.指定使用的缓存组件类型
对应于注解中的cacheType字段,1是Redis,2是Tair。
 
六.指定发送的缓存数据消息所属的MQ分组
对应于注解中的mqCacheKey字段,指定了消息分组key。同⼀分组的消息会路由到同⼀Patition,保证消息不会被多个消费者消费。
  1. @Retention(RetentionPolicy.RUNTIME)
  2. @Target(ElementType.METHOD)
  3. public @interface CacheRefresh {
  4.     //缓存消息key
  5.     String mqCacheKey();
  6.     //缓存的key
  7.     String cacheKey();
  8.     //需要缓存的值在方法参数中的坐标偏移量,默认不传取第一个参数
  9.     String index() default "0";
  10.     //是否热点缓存消息:0普通缓存消息,1热点缓存消息
  11.     String messageType() default "0";
  12.     //缓存的操作类型:1新增/修改,2删除
  13.     String operationType() default "1";
  14.     //缓存的组件类别:1是Redis,2是Tair
  15.     String cacheType() default "1";
  16. }
复制代码
(2)注解的使用
一.使用示例
  1. @Component
  2. @Data
  3. public class InventoryBucketCache {
  4.     ...
  5.     //本地存储关于分桶信息
  6.     @CacheRefresh(cacheKey = "bucketKey", mqCacheKey = CacheConstant.INVENTORY_SKU_KEY, index = "1", messageType = CacheConstant.MESSAGE_TYPE_HOT, cacheType = CacheConstant.TAIR_CACHE_TYPE)
  7.     public void setBucketLocalCache(String bucketKey, BucketLocalCache bucketLocalCache) {
  8.         ...
  9.     }
  10.     ...
  11. }
复制代码
二.使用场景
场景一:对于需要操作DB和缓存的⽅法,为了保证数据⼀致性,可以通过注解实现DB和缓存的数据⼀致性。
 
场景二:对于需要操作本地缓存和分布式缓存的方法,为了保证数据一致性,可以通过注解实现本地缓存和分布式缓存的数据一致性。
 
3.缓存与数据库一致性服务的处理入口
(1)通过自定义的注解 + AOP切面来处理缓存数据
(2)对读写队列中的缓存数据进⾏持久化
 
(1)通过自定义的注解 + AOP切面来处理缓存数据
在执行完被注解修饰的方法后,例如该方法向数据库更新了数据。那么AOP切面就会先将缓存数据写入到读写队列,然后发一条缓存数据消息到MQ由消息系统消费进行缓存更新处理。
 
其中,读写队列中的缓存数据会被定时每秒批量写入到DB,而进行定时每秒批量写入是因为直接单条写入DB,可能会对DB造成压力,以及如果将缓存数据同步写入DB会影响添加了注解的方法的性能。而DB中的缓存数据主要用来进行兜底检查,所以允许部分丢失。
  1. //刷新缓存的自定义注解
  2. @Aspect
  3. @Component
  4. public class CacheRefreshAspect {
  5.     @Autowired
  6.     private DataRefreshProducer producer;
  7.    
  8.     @Autowired
  9.     private CacheRefreshConverter cacheRefreshConverter;
  10.    
  11.     @Autowired
  12.     private CacheQueue cacheQueue;
  13.     //切入点,@CacheRefresh注解标注的
  14.     @Pointcut("@annotation(com.demo.eshop.cache.annotation.CacheRefresh)")
  15.         public void pointcut() {
  16.     }
  17.     //环绕通知,在方法执行前后
  18.     //@param point 切入点
  19.     //@return 结果
  20.     @Around("pointcut() && @annotation(cacheRefresh)")
  21.     public Object around(ProceedingJoinPoint point, CacheRefresh cacheRefresh) throws Throwable {
  22.         //签名信息
  23.         Signature signature = point.getSignature();
  24.         //强转为方法信息
  25.         MethodSignature methodSignature = (MethodSignature) signature;
  26.         //参数名称
  27.         String[] parameterNames = methodSignature.getParameterNames();
  28.         //参数值
  29.         Object[] parameterValues = point.getArgs();
  30.         Object response;
  31.         try {
  32.             //先执行本地方法再执行异步的操作
  33.             response = point.proceed();
  34.         } catch (Throwable throwable) {
  35.             log.error("执行方法: {}失败,异常信息: {}", methodSignature.getMethod().getName(), throwable);
  36.             throw throwable;
  37.         }
  38.   
  39.         try {
  40.             MessageCache messageCache = new MessageCache();
  41.             for (int i = 0; i < parameterValues.length; i++) {
  42.                 if (parameterNames[i].equals(cacheRefresh.cacheKey())) {
  43.                     messageCache.setCacheKey(String.valueOf(parameterValues[i]));
  44.                 }
  45.                 if (Integer.valueOf(cacheRefresh.index()) == i) {
  46.                     messageCache.setCacheJSON(JSONObject.toJSONString(parameterValues[i]));
  47.                 }
  48.             }
  49.             messageCache.setOperationType(Integer.valueOf(cacheRefresh.operationType()));
  50.             //给定一个有序的版本号(默认统一的工作ID和数据中心ID)
  51.             messageCache.setVersion(SnowflakeIdWorker.getVersion());
  52.             messageCache.setMessageType(Integer.valueOf(cacheRefresh.messageType()));
  53.             messageCache.setCacheType(Integer.valueOf(cacheRefresh.cacheType()));
  54.             messageCache.setCreateDate(new Date());
  55.             
  56.             //将缓存数据写入读写队列
  57.             //缓存数据写入读写队列后,会定时每秒批量写入数据库(缓存数据写入DB只用于兜底,所以偶尔出现丢失并不影响)
  58.             DataRefreshDetailDO dataRefreshDetailDO = cacheRefreshConverter.converter(messageCache);
  59.             cacheQueue.submit(dataRefreshDetailDO);
  60.             
  61.             //发送MQ消息去处理缓存数据,比如将缓存数据更新到缓存上
  62.             //一般来说,热点缓存会比普通缓存少很多,所以普通缓存的更新会比较多,热点缓存的更新会比较少
  63.             //此外,热点缓存的更新会对时效性要求比较高,通过消息去异步处理本来就已存在一定的延迟
  64.             //所以这里将普通缓存和热点缓存的更新进行分开处理,减少时效性高的消息的处理延迟
  65.             if (CacheConstant.MESSAGE_TYPE_HOT.equals(cacheRefresh.messageType())) {
  66.                 producer.sendMessage(RocketMqConstant.DATA_HOT_RADIO_TOPIC, JSONObject.toJSONString(messageCache), "热点缓存消息发送");
  67.             } else {
  68.                 producer.sendMessage(RocketMqConstant.DATA_MESSAGE_CACHE_TOPIC, JSONObject.toJSONString(messageCache), cacheRefresh.mqCacheKey(), "通用缓存消息发送");
  69.             }
  70.         } catch (Exception e) {
  71.             log.error("处理缓存同步:{}失败,异常信息:{}", methodSignature.getMethod().getName(), e);
  72.         }
  73.         return response;
  74.     }
  75. }
  76. //消息缓存处理对象
  77. @Data
  78. public class MessageCache implements Serializable {
  79.     //缓存的key
  80.     private String cacheKey;
  81.     //缓存的操作类型
  82.     private Integer operationType;
  83.     //使用的缓存类型,1为Redis,2为Tair
  84.     private Integer cacheType;
  85.     //缓存的消息内容
  86.     private String cacheJSON;
  87.     //消息的版本号(默认用时间戳来标记先后顺序)
  88.     private String version;
  89.     //缓存的数据状态是否还有效(0默认有效,1无效)
  90.     private Integer cacheStatus = 0;
  91.     //消息的创建时间
  92.     private Date createDate;
  93.     //是否热点消息,0普通消息,1热点消息
  94.     private Integer messageType = 0;
  95. }
  96. //外部消息处理对象
  97. @Data
  98. @TableName("data_refresh_detail")
  99. public class DataRefreshDetailDO extends BaseEntity {
  100.     //缓存的key
  101.     private String cacheKey;
  102.     //缓存的操作类型
  103.     private Integer operationType = 1;
  104.     //使用的缓存类型,1为Redis,2为Tair
  105.     private Integer cacheType;
  106.     //缓存的消息内容
  107.     private String cacheJSON;
  108.     //消息的版本号(默认用时间戳来标记先后顺序)
  109.     private String version;
  110.     //是否热点消息,0普通消息,1热点消息
  111.     private Integer messageType = 0;
  112. }
  113. //缓存数据的读写队列
  114. @Component
  115. public class CacheQueue {
  116.     //提供锁的实例对象
  117.     private final PutDataLock lock = new PutDataLock();
  118.     //缓存数据的写队列
  119.     private volatile List<DataRefreshDetailDO> writeQueue = new LinkedList<>();
  120.     //缓存数据的读队列
  121.     private volatile List<DataRefreshDetailDO> readQueue = new LinkedList<>();
  122.     //是否正在写入数据
  123.     private volatile boolean isWrite = false;
  124.     ...
  125.    
  126.     //缓存数据写入写队列
  127.     //@param dataRefreshDetailDO db存储对象
  128.     public void submit(DataRefreshDetailDO dataRefreshDetailDO) {
  129.         lock.lock();
  130.         try {
  131.             writeQueue.add(dataRefreshDetailDO);
  132.         } finally {
  133.             lock.unlock();
  134.         }
  135.     }
  136.     ...
  137. }
  138. //锁竞争类对象
  139. //由于使用这个自旋锁是用于处理内存操作的,所以会很快处理完,可以忽略CPU的消耗
  140. public class PutDataLock {
  141.     private final AtomicBoolean putMessageSpinLock = new AtomicBoolean(true);
  142.    
  143.     //上锁
  144.     public void lock() {
  145.         boolean flag;
  146.         do {
  147.             flag = this.putMessageSpinLock.compareAndSet(true, false);
  148.         }
  149.         while (!flag);
  150.     }
  151.    
  152.     //解锁
  153.     public void unlock() {
  154.         this.putMessageSpinLock.compareAndSet(false, true);
  155.     }
  156. }
复制代码
(2)对读写队列中的缓存数据进⾏持久化
项⽬启动在初始化数据源时,会同时启动⼀个定时调度任务,这个定时任务就会负责每隔1秒把读写队列中的缓存数据批量写⼊DB。DB中的这些数据主要用来兜底,允许部分丢失。
  1. //数据源配置
  2. @Component
  3. public class DataSourceConfig extends AbstractDataSourceConfig {
  4.     private SqlSessionTemplate sqlSessionTemplate;//存储数据源对象
  5.    
  6.     @Autowired
  7.     private CacheQueue cacheQueue;
  8.    
  9.     //初始化加载目前需要进行数据源的相关配置
  10.     @PostConstruct
  11.     public void initMigrateDateSource() throws Exception {
  12.         //加载数据源
  13.         DataSource dataSource = buildDataSource();
  14.         SqlSessionFactoryBean sqlSessionFactory = new SqlSessionFactoryBean();
  15.         sqlSessionFactory.setDataSource(dataSource);
  16.         sqlSessionTemplate = new SqlSessionTemplate(sqlSessionFactory.getObject());
  17.   
  18.         //启动一个定时任务触发写入缓存数据到DB,每隔1秒触发一次,避免每次有缓存请求都执行DB操作
  19.         ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
  20.             new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build()
  21.         );
  22.         executorService.scheduleAtFixedRate(new Runnable() {
  23.             @Override
  24.             public void run() {
  25.                 //判断当前是否正在将读写队列中的缓存数据写入DB
  26.                 if (!cacheQueue.getIsWrite()) {
  27.                     //提交写队列中的缓存数据,然后写入DB
  28.                     cacheQueue.doCommit();
  29.                 }
  30.             }
  31.         }, 1000, 1000, TimeUnit.MILLISECONDS);
  32.     }
  33.    
  34.     //获取数据源
  35.     public SqlSession getSqlSession() {
  36.         try {
  37.             return SqlSessionUtils.getSqlSession(sqlSessionTemplate.getSqlSessionFactory(), sqlSessionTemplate.getExecutorType(), sqlSessionTemplate.getPersistenceExceptionTranslator());
  38.         } catch (Exception e) {
  39.             log.error("加载数据源对应连接池失败", e);
  40.         }
  41.         return null;
  42.     }
  43.    
  44.     //关闭sqlSession
  45.     public void closeSqlSession(SqlSession session) {
  46.         SqlSessionUtils.closeSqlSession(session, sqlSessionTemplate.getSqlSessionFactory());
  47.     }
  48. }
  49. //数据源配置
  50. public abstract class AbstractDataSourceConfig {
  51.     //构建数据源
  52.     public DruidDataSource buildDataSource() throws IOException {
  53.         Properties prop = new Properties();
  54.         InputStream inputStream = this.getClass().getClassLoader().getResourceAsStream("cache.properties");
  55.         prop.load(inputStream);
  56.         inputStream.close();
  57.         DruidDataSource druidDataSource = DataSourceBuilder.create()
  58.             .type(DruidDataSource.class)
  59.             .driverClassName(prop.getProperty("datasource.driver-class-name"))
  60.             .url(prop.getProperty("datasource.url"))
  61.             .username(prop.getProperty("datasource.username"))
  62.             .password(prop.getProperty("datasource.password"))
  63.             .build();
  64.         druidDataSource.setTestOnBorrow(true);
  65.         druidDataSource.setTestWhileIdle(true);
  66.         return druidDataSource;
  67.     }
  68. }
  69. //缓存数据的读写队列
  70. @Component
  71. public class CacheQueue {
  72.     //提供锁的实例对象
  73.     private final PutDataLock lock = new PutDataLock();
  74.     //缓存数据的写队列
  75.     private volatile List<DataRefreshDetailDO> writeQueue = new LinkedList<>();
  76.     //缓存数据的读队列
  77.     private volatile List<DataRefreshDetailDO> readQueue = new LinkedList<>();
  78.     //是否正在写入数据
  79.     private volatile boolean isWrite = false;
  80.     @Autowired
  81.     private DataRefreshService dataRefreshService;
  82.     ...
  83.    
  84.     //交换读写队列
  85.     private void swapRequests() {
  86.         lock.lock();
  87.         try {
  88.             List<DataRefreshDetailDO> tmp = writeQueue;
  89.             writeQueue = readQueue;
  90.             readQueue = tmp;
  91.         } finally {
  92.             lock.unlock();
  93.         }
  94.     }
  95.    
  96.     //提交写队列中的缓存数据,然后写入DB
  97.     public void doCommit() {
  98.         this.isWrite = true;
  99.         //交互读写队列后,再将读队列中的缓存数据写入DB
  100.         swapRequests();
  101.         if (!readQueue.isEmpty()) {
  102.             //先进行数据切割,每次写入DB的记录为500条
  103.             List<List<DataRefreshDetailDO>> dataRefreshDetailList = DataCuttingUtil.dataCuttingString(readQueue, CollectionSize.WRITE_SIZE);
  104.             for (List<DataRefreshDetailDO> dataRefreshDetailDOS : dataRefreshDetailList) {
  105.                 dataRefreshService.saveDataRefreshDetailList(dataRefreshDetailDOS);
  106.             }
  107.         }
  108.         readQueue.clear();
  109.         this.isWrite = false;
  110.     }
  111.    
  112.     //每隔1秒执行的定时任务会调用这个方法,判断当前是否正在将读写队列中的缓存数据写入DB
  113.     //@return 是否正在读取
  114.     public Boolean getIsWrite() {
  115.         return this.isWrite;
  116.     }
  117.     ...
  118. }
  119. //将缓存数据写入DB
  120. @Service
  121. public class DataRefreshServiceImpl implements DataRefreshService {
  122.     @Resource
  123.     private DataSourceConfig dataSourceConfig;
  124.    
  125.     @Override
  126.     public void saveDataRefreshDetailList(List<DataRefreshDetailDO> dataRefreshDetailDOList) {
  127.         SqlSession session = null;
  128.         PreparedStatement pst = null;
  129.         try {
  130.             StringBuffer sql = new StringBuffer();
  131.             sql.append("INSERT INTO data_refresh_detail(cache_key, operation_type, cache_json, version, message_type, cache_type, create_time, update_time) values (?,?,?,?,?,?,now(),now())");
  132.             session = dataSourceConfig.getSqlSession();
  133.             pst = session.getConnection().prepareStatement(sql.toString());
  134.             for (DataRefreshDetailDO dataRefreshDetailDO : dataRefreshDetailDOList) {
  135.                 pst.setString(1, dataRefreshDetailDO.getCacheKey());
  136.                 pst.setInt(2, dataRefreshDetailDO.getOperationType());
  137.                 pst.setString(3, dataRefreshDetailDO.getCacheJSON());
  138.                 pst.setString(4, dataRefreshDetailDO.getVersion());
  139.                 pst.setInt(5, dataRefreshDetailDO.getMessageType());
  140.                 pst.setInt(6, dataRefreshDetailDO.getCacheType());
  141.                 pst.addBatch();
  142.             }
  143.             pst.executeBatch();
  144.         } catch (Exception e) {
  145.             log.error("sql执行失败:{}", e);
  146.         } finally {
  147.             closeSqlSession(session, pst);
  148.         }
  149.     }
  150.    
  151.     //关闭连接
  152.     private void closeSqlSession(SqlSession session, PreparedStatement pst) {
  153.         if (pst != null) {
  154.             try {
  155.                 pst.close();
  156.             } catch (SQLException e) {
  157.                 e.printStackTrace();
  158.             }
  159.         }
  160.         dataSourceConfig.closeSqlSession(session);
  161.     }
  162. }
复制代码
 
4.缓存与数据库一致性服务的消费缓存消息
(1)消费缓存消息时会将缓存消息添加到内存队列
(2)应用启动时会初始化多个内存队列并创建对应的任务线程处理每个内存队列
(3)当缓存的操作类型为新增或者修改时的处理逻辑
(4)当缓存的操作类型为删除时的处理逻辑
 
(1)消费缓存消息时会将缓存消息添加到内存队列
通过消费缓存消息来执行缓存的处理逻辑。首先根据缓存消息里的缓存key,通过Hash定位获取对应的内存队列。然后将消息添加到该内存队列中,从而实现多线程处理消费到的缓存消息。
 
为了对缓存消息进行兜底处理,每秒会记录⼀次缓存消息的最新消费时间。如果最新消费时间,⽐写入DB的最新数据的创建时间晚了超过1分钟,则会认为⽬前的消息消费出现问题,此时会有另外⼀个定时任务进⾏处理。
  1. //缓存消息处理
  2. @Component
  3. public class DataMessageCacheListener implements MessageListenerConcurrently {
  4.     @Autowired
  5.     private CacheQueue cacheQueue;
  6.    
  7.     @Autowired
  8.     private RedisCache redisCache;
  9.    
  10.     //上次记录消费的时间
  11.     private long lastTimestamp = -1L;
  12.    
  13.     @Override
  14.     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
  15.         for (MessageExt messageExt : list) {
  16.             try {
  17.                 String messageData = new String(messageExt.getBody());
  18.                 log.info("DataMessageCacheListener缓存数据变更刷新,消息内容:{}", messageData);
  19.                 MessageCache messageCache = JsonUtil.json2Object(messageData, MessageCache.class);
  20.   
  21.                 //根据消息的缓存key,获取到对应的内存队列,分散队列提高处理效率,并保证单key的执行不会并发
  22.                 BlockingQueue blockingQueue = cacheQueue.getBlockingQueue(messageCache.getCacheKey());
  23.                 //将缓存消息添加到对应的内存队列中
  24.                 blockingQueue.offer(messageCache);
  25.   
  26.                 //记录最新的消费数据时间
  27.                 setCacheRefreshTime();
  28.             } catch (Exception e) {
  29.                 log.error("consume error, 缓存消息写入队列失败", e);
  30.                 //本次消费失败,下次重新消费
  31.                 return ConsumeConcurrentlyStatus.RECONSUME_LATER;
  32.             }
  33.         }
  34.         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  35.     }
  36.    
  37.     //记录最新的消费时间,为避免无效的set操作,这里控制每秒最多执行一次set
  38.     private synchronized void setCacheRefreshTime() {
  39.         //获取到当前的时间,精确到秒
  40.         long timestamp = System.currentTimeMillis() / 1000;
  41.         //同一个时间则默认不处理
  42.         if (lastTimestamp == timestamp) {
  43.             return;
  44.         }
  45.         //标记最新MQ消息的接收时间
  46.         //如果写入DB的最新数据的创建时间,⽐消息的最新消费时间早1分钟以上
  47.         //则认为⽬前的消息消费出现问题,此时会有另外⼀个线程去查询出DB的数据来刷新缓存(兜底处理)
  48.         redisCache.set(CacheConstant.CACHE_ROCKET_TIME_KEY, DateFormatUtils.format(new Date(), DateConstant.DATE_TIME_FORMAT_PATTERN), 0);
  49.         lastTimestamp = timestamp;
  50.     }
  51. }
复制代码
(2)应用启动时会初始化多个内存队列并创建对应的任务线程处理每个内存队列
每个内存队列会对应线程池中的一个任务线程,该任务线程会处理添加到其负责处理的内存队列中的缓存消息。缓存消息按操作类型可分为2类:⼀是新增或修改,⼀是删除。
 
线程池在应用启动时就会开始运行,并创建多个任务线程,每个任务线程都会处理其负责的内存队列中的缓存消息。而在消费缓存消息时,这些缓存消息会不断被添加到对应的内存队列中,这样就实现了多线程处理消费到的缓存消息。
  1. //处理缓存消息时使用的内存队列
  2. @Component
  3. public class CacheQueue {
  4.     //处理缓存消息的内存队列
  5.     private final List<BlockingQueue> messageQueue = new ArrayList<>();
  6.     //配置的消息内存队列数量
  7.     @Value("${message.queue-num}")
  8.     private Integer messageQueueNum;
  9.     @PostConstruct
  10.     public void init() {
  11.         ExecutorService executors = Executors.newFixedThreadPool(messageQueueNum);
  12.         for (int i = 0; i < messageQueueNum; i++) {
  13.             //设置一个队列最大容纳数量
  14.             BlockingQueue blockingQueue = new ArrayBlockingQueue(150000);
  15.             messageQueue.add(blockingQueue);
  16.             //每个内存队列都对应线程池executors里的一个任务线程任务,该任务线程会处理添加到该内存队列中的缓存消息
  17.             executors.execute(new CacheConsistencyRunner(blockingQueue));
  18.         }
  19.     }
  20.     //对消息的key进行hash处理,从而定位到具体的内存队列上
  21.     public BlockingQueue getBlockingQueue(String key) {
  22.         //先获取到传入的消息key对应的hash值
  23.         long hash = HashUtil.murMurHash(key.getBytes());
  24.         //计算出对应的内存队列
  25.         int c = (int) (hash %= messageQueue.size());
  26.         BlockingQueue blockingQueue = messageQueue.get(c);
  27.         return blockingQueue;
  28.     }
  29. }
  30. //处理内存队列中缓存消息的线程
  31. public class CacheConsistencyRunner implements Runnable {
  32.     //缓存消息的失效时间,超过这个失效时间的历史消息不处理新增修改
  33.     private final static Integer failureTime = 60;
  34.    
  35.     //存放缓存消息的内存队列
  36.     private BlockingQueue blockingQueue;
  37.     private RedisCache redisCache;
  38.     private TairCache tairCache;
  39.     private DefaultProducer defaultProducer;
  40.    
  41.     public CacheConsistencyRunner(BlockingQueue blockingQueue) {
  42.         this.blockingQueue = blockingQueue;
  43.         this.redisCache = ApplicationContextUtil.getBean(RedisCache.class);
  44.         this.tairCache = ApplicationContextUtil.getBean(TairCache.class);
  45.         this.defaultProducer = ApplicationContextUtil.getBean(DefaultProducer.class);
  46.     }
  47.    
  48.     //处理一个内存队列中的缓存消息的线程
  49.     @Override
  50.     public void run() {
  51.         try {
  52.             //TODO 对blockingQueue使用wait+notify实现内存队列为空时线程挂起
  53.             while (true) {
  54.                 MessageCache cache = (MessageCache) blockingQueue.take();
  55.                 //先判断是不是删除类型的缓存操作
  56.                 if (MessageOperationEnum.DELETE.getCode().equals(cache.getOperationType())) {
  57.                     deleteCache(cache);
  58.                 } else {
  59.                     //其它类型操作都是修改或者新增
  60.                     refreshCache(cache);
  61.                 }
  62.             }
  63.         } catch (Exception e) {
  64.             e.printStackTrace();
  65.             log.error("处理缓存消息异常", e);
  66.         }
  67.     }
  68.     ...
  69. }
复制代码
(3)当缓存的操作类型为新增或者修改时的处理逻辑
⾸先判断这个消息是否为僵⼫消息,⽐如1⼩时前的消息就没必要处理了。然后根据缓存消息中的缓存组件类型 + 缓存key,从缓存⾥获取缓存数据。如果缓存不存在则直接覆盖,如果缓存存在就需要判断⼀下各⾃的版本号,以版本号最新的为准。
 
同时要注意:第一.消息缓存组件是什么,根据缓存组件去使用对应的缓存⼯具类。第二.消息是否是热点缓存,热点数据还需要进⾏⼀次⼴播,从而让其它对这个本地缓存有需要的服务也刷新其本地缓存。
  1. //处理内存队列中缓存消息的线程
  2. public class CacheConsistencyRunner implements Runnable {
  3.     //缓存消息的失效时间,超过这个失效时间的历史消息不处理新增修改
  4.     private final static Integer failureTime = 60;
  5.     //存放缓存消息的内存队列
  6.     private BlockingQueue blockingQueue;
  7.     private RedisCache redisCache;
  8.     private TairCache tairCache;
  9.     private DefaultProducer defaultProducer;
  10.    
  11.     public CacheConsistencyRunner(BlockingQueue blockingQueue) {
  12.         this.blockingQueue = blockingQueue;
  13.         this.redisCache = ApplicationContextUtil.getBean(RedisCache.class);
  14.         this.tairCache = ApplicationContextUtil.getBean(TairCache.class);
  15.         this.defaultProducer = ApplicationContextUtil.getBean(DefaultProducer.class);
  16.     }
  17.     ...
  18.    
  19.     //刷新缓存,或者新增缓存
  20.     private void refreshCache(MessageCache cache) {
  21.         log.info("开始处理新增或者修改缓存{}", JSONObject.toJSONString(cache));
  22.         //处理缓存之前,先看看这个消息的时间是多少,避免复活一些僵尸数据
  23.         Boolean isCache = DateFormatUtil.compareTo(cache.getCreateDate(), failureTime);
  24.         //如果这条消息的创建时间已经超过的有效期,那么视为无效消息不处理
  25.         if (!isCache) {
  26.             return;
  27.         }
  28.         //获取缓存
  29.         String cacheStr = getCache(cache.getCacheType(), cache.getCacheKey());
  30.         //当前没有缓存记录,直接覆盖一条
  31.         if (StringUtils.isEmpty(cacheStr)) {
  32.             setCache(cache.getCacheType(), cache.getCacheKey(), JSONObject.toJSONString(cache), 0);
  33.             send(cache);
  34.             return;
  35.         }
  36.         MessageCache messageCache = JsonUtil.json2Object(cacheStr, MessageCache.class);
  37.         //判断版本和缓存版本的记录谁更新,记录以最新的为准,避免低版本覆盖高版本
  38.         if (cache.getVersion().compareTo(messageCache.getVersion()) > 0) {
  39.             setCache(cache.getCacheType(), cache.getCacheKey(), JSONObject.toJSONString(cache), 0);
  40.             send(cache);
  41.         }
  42.     }
  43.    
  44.     //获取缓存key
  45.     private String getCache(Integer cacheType, String cacheKey) {
  46.         if (CaCheTypeEnum.REDIS.getCode().equals(cacheType)) {
  47.             return redisCache.get(cacheKey);
  48.         }
  49.         return tairCache.get(cacheKey);
  50.     }
  51.    
  52.     //设置缓存
  53.     private void setCache(Integer cacheType, String cacheKey, String cacheValue, Integer seconds) {
  54.         if (CaCheTypeEnum.REDIS.getCode().equals(cacheType)) {
  55.             redisCache.set(cacheKey, cacheValue, seconds);
  56.         } else {
  57.             tairCache.set(cacheKey, cacheValue, seconds);
  58.         }
  59.     }
  60.    
  61.     //发送消息
  62.     public void send(MessageCache cache) {
  63.         //热点数据才处理发送广播消息
  64.         if (cache.getMessageType().equals(1)) {
  65.             defaultProducer.sendMessage(RocketMqConstant.DATA_HOT_RADIO_TOPIC, JSONObject.toJSONString(cache), 0, "广播消息发送");
  66.         }
  67.     }
  68.     ...
  69. }
复制代码
(4)当缓存的操作类型为删除时的处理逻辑
进行缓存删除时需要注意的是缓存穿透,也就是说删除缓存操作其实只是将这个记录标记为已删除的状态。如果本来就没有这个缓存则需要设置它的默认版本号为-1,后续对比的时候默认将⼤于这个版本号的删除缓存标记为已删除,也和正常的增改缓存操作对应的版本号对比起来,从而避免错误的顺序影响实际的缓存结果。
  1. //处理内存队列中缓存消息的线程
  2. public class CacheConsistencyRunner implements Runnable {
  3.     //缓存消息的失效时间,超过这个失效时间的历史消息不处理新增修改
  4.     private final static Integer failureTime = 60;
  5.     //存放缓存消息的内存队列
  6.     private BlockingQueue blockingQueue;
  7.     private RedisCache redisCache;
  8.     private TairCache tairCache;
  9.     private DefaultProducer defaultProducer;
  10.    
  11.     public CacheConsistencyRunner(BlockingQueue blockingQueue) {
  12.         this.blockingQueue = blockingQueue;
  13.         this.redisCache = ApplicationContextUtil.getBean(RedisCache.class);
  14.         this.tairCache = ApplicationContextUtil.getBean(TairCache.class);
  15.         this.defaultProducer = ApplicationContextUtil.getBean(DefaultProducer.class);
  16.     }
  17.     ...
  18.    
  19.     //删除缓存
  20.     private void deleteCache(MessageCache cache) {
  21.         //获取缓存
  22.         String cacheStr = getCache(cache.getCacheType(), cache.getCacheKey());
  23.         MessageCache messageCache = null;
  24.         if (StringUtils.isEmpty(cacheStr)) {
  25.             messageCache = new MessageCache();
  26.             messageCache.setVersion("-1");
  27.         } else {
  28.             messageCache = JsonUtil.json2Object(cacheStr, MessageCache.class);
  29.         }
  30.   
  31.         //判断一下新的请求版本是否超过或者等于缓存的版本
  32.         //如果是则标记为已删除,同时缓存60分钟,避免缓存穿透以及可能的新增无效请求无版本可比对
  33.         if (cache.getVersion().compareTo(messageCache.getVersion()) >= 0) {
  34.             cache.setCacheStatus(1);
  35.             setCache(cache.getCacheType(), cache.getCacheKey(), JSONObject.toJSONString(messageCache), failureTime * 60);
  36.             send(cache);
  37.         }
  38.     }
  39.    
  40.     //设置缓存
  41.     private void setCache(Integer cacheType, String cacheKey, String cacheValue, Integer seconds) {
  42.         if (CaCheTypeEnum.REDIS.getCode().equals(cacheType)) {
  43.             redisCache.set(cacheKey, cacheValue, seconds);
  44.         } else {
  45.             tairCache.set(cacheKey, cacheValue, seconds);
  46.         }
  47.     }
  48.    
  49.     //发送消息
  50.     public void send(MessageCache cache) {
  51.         //热点数据才处理发送广播消息
  52.         if (cache.getMessageType().equals(1)) {
  53.             defaultProducer.sendMessage(RocketMqConstant.DATA_HOT_RADIO_TOPIC, JSONObject.toJSONString(cache), 0, "广播消息发送");
  54.         }
  55.     }
  56.     ...
  57. }
  58. //消息缓存处理对象
  59. @Data
  60. public class MessageCache implements Serializable {
  61.     //缓存的key
  62.     private String cacheKey;
  63.     //缓存的操作类型
  64.     private Integer operationType;
  65.     //使用的缓存类型,1为Redis,2为Tair
  66.     private Integer cacheType;
  67.     //缓存的消息内容
  68.     private String cacheJSON;
  69.     //消息的版本号(默认用时间戳来标记先后顺序)
  70.     private String version;
  71.     //缓存的数据状态是否还有效(0默认有效,1无效)
  72.     private Integer cacheStatus = 0;
  73.     //消息的创建时间
  74.     private Date createDate;
  75.     //是否热点消息,0普通消息,1热点消息
  76.     private Integer messageType = 0;
  77. }
复制代码
 
5.缓存与数据库一致性服务的消费检查
(1)定时任务检查消费是否发生异常
(2)从缓存中获取MQ缓存消息的最新消费时间
(3)按最新消费时间来分⻚查询DB的缓存数据
 
(1)定时任务检查消费是否发生异常
为了避免从DB查询出的缓存数据量过⼤,导致处理时间超过1分钟,从而出现同时有多个定时检查任务在执行,所以需要加⼀个分布式锁。
  1. //负责定时检查消费是否发生异常而需要从DB查询缓存数据并刷新缓存
  2. @Component
  3. public class DataRefreshTask {
  4.     @Autowired
  5.     private RedisLock redisLock;
  6.    
  7.     @Autowired
  8.     private MessageService messageService;
  9.    
  10.     //每分钟验证下是否触发缓存DB兜底
  11.     @Scheduled(fixedDelay = 60000)
  12.     void DataRefreshTask() {
  13.         boolean lock = redisLock.lock(CacheConstant.CACHE_LOCK_KEY);
  14.         try {
  15.             if (lock) {
  16.                 messageService.outDataCacheRefresh();
  17.             }
  18.         } finally {
  19.             redisLock.unlock(CacheConstant.CACHE_LOCK_KEY);
  20.         }
  21.     }
  22. }
复制代码
(2)从缓存中获取MQ缓存消息的最新消费时间
如果没有最新消费时间,则默认取前1个⼩时的时间。如果有最新消费时间,则减去⼀分钟,避免和消息处理出现重复。
 
(3)按最新消费时间来分⻚查询DB的缓存数据
将查询到的数据按缓存key定位hash写⼊到具体的内存队列中,复⽤消息消费缓存的定时线程任务处理对应的缓存逻辑。
  1. @Service
  2. public class MessageServiceImpl implements MessageService {
  3.     @Autowired
  4.     private RedisCache redisCache;
  5.    
  6.     @Autowired
  7.     private DataRefreshRepository dataRefreshRepository;
  8.    
  9.     @Autowired
  10.     private CacheQueue cacheQueue;
  11.     ...
  12.    
  13.     //从DB查询缓存数据并刷新缓存
  14.     @Override
  15.     public void outDataCacheRefresh() {
  16.         //1.先获取缓存中最新的消息消费时间,这里先减去1分钟,避免和MQ的处理直接重复
  17.         String createDate = redisCache.get(CacheConstant.CACHE_ROCKET_TIME_KEY);
  18.         if (StringUtils.isEmpty(createDate)) {
  19.             //如果缓存都不存在最新消费时间,默认处理1个小时内的DB数据,再超过前的缓存没有处理必要
  20.             createDate = DateFormatUtil.getHoursDate(-1);
  21.         } else {
  22.             //缓存时间存在,对时间减去一分钟进行处理
  23.             createDate = DateFormatUtil.getMinuteDate(createDate, -1);
  24.         }
  25.         //每次处理DB兜底的查询之前,先删除掉查询时间范围外的数据,避免数据一直写入导致数据量过大影响性能
  26.         dataRefreshRepository.deleteDataRefresh(DateFormatUtil.getHoursDate(-1));
  27.   
  28.         //2.获取是否有超过的最新消费时间的数据落入DB(默认查询超过1分钟还未消费的数据)
  29.         int pageNum = 1;
  30.         //设置每次查询的数据量,最大为500
  31.         int pageSize = CollectionSize.WRITE_SIZE;
  32.         Page<DataRefreshDetailDO> page = new Page<>(pageNum, pageSize);
  33.         Page<DataRefreshDetailDO> pageResult = dataRefreshRepository.queryDataRefreshDetailDOList(page, createDate);
  34.         List<DataRefreshDetailDO> dataRefreshDetailList = pageResult.getRecords();
  35.         //将缓存数据写入内存队列进行处理
  36.         dataRefreshQueue(dataRefreshDetailList);
  37.         try {
  38.             while (pageNum <= pageResult.getTotal()) {
  39.                 pageNum += 1;
  40.                 page.setCurrent(pageNum);
  41.                 pageResult = dataRefreshRepository.queryDataRefreshDetailDOList(page, createDate);
  42.                 //将缓存数据写入内存队列进行处理
  43.                 dataRefreshQueue(pageResult.getRecords());
  44.                 //每次循环获取数据后,休眠20ms,避免对数据库造成太大压力
  45.                 Thread.sleep(20);
  46.             }
  47.         } catch (InterruptedException e) {
  48.             throw new BaseBizException(ProductExceptionCode.PRODUCT_SQL);
  49.         }
  50.     }
  51.    
  52.     //将数据写入队列进行处理
  53.     private void dataRefreshQueue(List<DataRefreshDetailDO> dataRefreshDetailList) {
  54.         if (!CollectionUtils.isEmpty(dataRefreshDetailList)) {
  55.             for (DataRefreshDetailDO dataRefreshDetailDO : dataRefreshDetailList) {
  56.                 MessageCache messageCache = dataMessageConverter.converter(dataRefreshDetailDO);
  57.                 BlockingQueue blockingQueue = cacheQueue.getBlockingQueue(messageCache.getCacheKey());
  58.                 blockingQueue.offer(messageCache);
  59.             }
  60.         }
  61.     }
  62.     ...
  63. }
  64. @Repository
  65. public class DataRefreshRepository {
  66.     @Resource
  67.     private DataRefreshMapper dataRefreshMapper;
  68.    
  69.     //获取得到消息最近消费时间的记录(默认查询大于这个时间点大于1分钟的数据)
  70.     public Page<DataRefreshDetailDO> queryDataRefreshDetailDOList(Page<DataRefreshDetailDO> page, String createDate) {
  71.         LambdaQueryWrapper<DataRefreshDetailDO> queryWrapper = Wrappers.lambdaQuery();
  72.         //查询创建时间大于这个版本号且未处理的数据
  73.         queryWrapper.gt(DataRefreshDetailDO::getVersion, createDate);
  74.         queryWrapper.eq(DataRefreshDetailDO::getCacheStatus, 0);
  75.         return dataRefreshMapper.selectPage(page, queryWrapper);
  76.     }
  77.    
  78.     //删除超过一定时间区间的缓存DB数据
  79.     public void deleteDataRefresh(String createDate) {
  80.         LambdaUpdateWrapper<DataRefreshDetailDO> updateWrapper = Wrappers.lambdaUpdate();
  81.         updateWrapper.lt(DataRefreshDetailDO::getVersion, createDate);
  82.         dataRefreshMapper.delete(updateWrapper);
  83.     }
  84. }
复制代码
 

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册