大纲
1.商品C端系统监听商品变更及刷新缓存
2.自研缓存框架的数据表缓存组件
3.自研缓存框架的通用缓存读写组件与DB操作组件
1.商品C端系统监听商品变更及刷新缓存
FlushRedisCache的flushRedisStringData()方法刷新缓存的逻辑是:首先从DB查询最新的数据 -> 然后删除旧缓存 -> 最后更新缓存。
- @Configuration
- public class ConsumerBeanConfig {
- //配置内容对象
- @Autowired
- private RocketMQProperties rocketMQProperties;
-
- //监听商品修改的MQ消息
- @Bean("productUpdateTopic")
- public DefaultMQPushConsumer productUpdateTopic(ProductUpdateListener productUpdateListener) throws MQClientException {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketMqConstant.PRODUCT_UPDATE_CONSUMER_GROUP);
- consumer.setNamesrvAddr(rocketMQProperties.getNameServer());
- consumer.subscribe(RocketMqConstant.PRODUCT_UPDATE_TOPIC, "*");
- consumer.registerMessageListener(productUpdateListener);
- consumer.start();
- return consumer;
- }
- }
- //商品变更时的缓存处理
- @Component
- public class ProductUpdateListener implements MessageListenerConcurrently {
- @DubboReference(version = "1.0.0")
- private TableDataUpdateApi tableDataUpdateApi;
-
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
- try {
- for (MessageExt messageExt : list) {
- //消息处理这里,涉及到sku的缓存更新以及对应的整个商品明细的缓存更新
- String msg = new String(messageExt.getBody());
- log.info("执行商品缓存数据更新逻辑,消息内容:{}", msg);
-
- TableDataChangeDTO tableDataChangeMessage = JsonUtil.json2Object(msg, TableDataChangeDTO.class);
- //更新sku对应的商品缓存信息
- tableDataUpdateApi.tableDataChange(tableDataChangeMessage);
- //发送回调消息通知
- tableDataUpdateApi.sendCallbackMessage(tableDataChangeMessage);
- }
- } catch (Exception e) {
- log.error("consume error, 商品缓存更新失败", e);
- //本次消费失败,下次重新消费
- return ConsumeConcurrentlyStatus.RECONSUME_LATER;
- }
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- }
- //商品信息变更服务
- @DubboService(version = "1.0.0", interfaceClass = TableDataUpdateApi.class, retries = 0)
- public class TableDataUpdateApiImpl implements TableDataUpdateApi {
- @Resource
- private FlushRedisCache flushRedisCache;
-
- private ExecutorService executorService = Executors.newFixedThreadPool(10);
-
- //商品表数据变更逆向更新缓存
- @Override
- public JsonResult tableDataChange(TableDataChangeDTO tableDataChangeDTO) {
- executorService.execute(() -> {
- try {
- //刷新缓存数据
- flushRedisCache.flushRedisStringData(false, tableDataChangeDTO.getTableName(), Sets.newHashSet(tableDataChangeDTO.getKeyId()));
- } catch (Exception e) {
- log.error("刷新string类型缓存异常,入参为tableDataChangeDTO={}", tableDataChangeDTO, e);
- }
- try {
- flushRedisCache.flushRedisSetData(false, tableDataChangeDTO.getTableName(), Sets.newHashSet(tableDataChangeDTO.getKeyId()));
- } catch (Exception e) {
- log.error("刷新set类型缓存异常,入参为tableDataChangeDTO={}", tableDataChangeDTO, e);
- }
- try {
- flushRedisCache.flushRedisSortedSetData(false, tableDataChangeDTO.getTableName(), Sets.newHashSet(tableDataChangeDTO.getKeyId()));
- } catch (Exception e) {
- log.error("刷新sortedset类型缓存异常,入参为tableDataChangeDTO={}", tableDataChangeDTO, e);
- }
- });
- return JsonResult.buildSuccess();
- }
- ...
- }
- //数据变更—刷新缓存
- @Component
- public class FlushRedisCache {
- //继承了AbstractRedisStringCache的缓存实例会被注入到abstractRedisStringCacheMap这个map中
- //例如CategoryBaseCache、FrontCategoryCache、ItemCollectCache、ProductDetailCache、SkuCollectCache等
- @Autowired
- private Map<String, AbstractRedisStringCache> abstractRedisStringCacheMap;
-
- //更新string类型缓存
- //@param flushAll 是否全量刷新
- //@param tableName 表名
- //@param idSet 主键ID集合
- public void flushRedisStringData(boolean flushAll, String tableName, Set<Long> idSet) {
- for (Map.Entry<String, AbstractRedisStringCache> entry : abstractRedisStringCacheMap.entrySet()) {
- AbstractRedisStringCache stringCache = entry.getValue();
- if (flushAll) {
- stringCache.flushRedisStringDataByTableUpdateData();
- continue;
- }
- //继承AbstractRedisStringCache的每个缓存实例都指定来表名,如下用于匹配出对应的缓存实例
- if (stringCache.getTableName().contains(tableName)) {
- stringCache.flushRedisStringDataByTableAndIdSet(tableName, idSet);
- }
- }
- }
- ...
- }
- //Redis(string)缓存抽象类
- public abstract class AbstractRedisStringCache<DO, BO> {
- @Resource
- private RedisReadWriteManager redisReadWriteManager;
- ...
-
- //刷新缓存—根据主表ID集合(关联表变更需要查询主表)
- public void flushRedisStringDataByTableAndIdSet(String tableName, Set<Long> idSet) {
- Optional<Set<Long>> idSetOpt = getStringDatabase().getTableIdSetByRelationTableIdSet(tableName, idSet);
- if (!idSetOpt.isPresent()) {
- return;
- }
- flushRedisStringDataByIdSet(idSetOpt.get());
- }
-
- //刷新缓存—根据主键ID集合
- //@param idSet 数据表主键ID
- private void flushRedisStringDataByIdSet(Set<Long> idSet) {
- //根据id集合从数据库中查询出数据
- Optional<RedisStringCache<DO>> stringSourceOpt = getStringDatabase().listTableDataByIdSet(idSet, queryType());
- if (!stringSourceOpt.isPresent()) {
- return;
- }
- RedisStringCache<DO> redisStringCache = stringSourceOpt.get();
- if (!CollectionUtils.isEmpty(redisStringCache.getDeleteSet())) {
- //通过缓存读写组件删除缓存
- redisReadWriteManager.delete(redisStringCache.getDeleteSet().stream().map(this::getRedisKey).collect(toSet()).toArray(new String[]{}));
- }
- if (CollectionUtils.isEmpty(redisStringCache.getAddList())) {
- return;
- }
- List<BO> boList = convertDO2BO(redisStringCache.getAddList());
- Map<String, BO> redisMap = convertBO2Map(boList);
- //通过缓存读写组件写入缓存
- redisReadWriteManager.batchWriteRedisString(redisMap);
- }
- ...
- }
- //缓存读写管理
- @Service
- public class RedisReadWriteManager {
- @Resource
- private RedisCache redisCache;
- ...
-
- //删除指定的key
- public void delete(String... keys) {
- Arrays.asList(keys).stream().forEach(key -> redisCache.delete(key));
- }
-
- //批量添加string缓存
- public <T> void batchWriteRedisString(Map<String, T> redisMap) {
- List<Map.Entry<String, T>> list = Lists.newArrayList(redisMap.entrySet());
- try {
- for (List<Map.Entry<String, T>> entries : Lists.partition(list, PAGE_SIZE_100)) {
- for (Map.Entry<String, T> entry : entries) {
- redisCache.setex(true, entry.getKey(), JSON.toJSONString(entry.getValue()), RedisKeyUtils.redisKeyRandomTime(INT_EXPIRED_SEVEN_DAYS));
- }
- try {
- Thread.sleep(SLEEP_100);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- } catch (Exception e) {
- log.error("批量添加string缓存异常 redisMap={}", redisMap, e);
- }
- }
- ...
- }
复制代码
2.自研缓存框架的数据表缓存组件
(1)自研缓存框架的目录结构
(2)通过注解实现继承同一个抽象类的数据表缓存组件实例自动注入Map
(1)自研缓存框架的目录结构
(2)通过注解实现继承同一个抽象类的数据表缓存组件实例自动注入Map
继承AbstractRedisStringCache的数据表缓存组件实例会被注入到Map中,之后通过Map便可以根据表名来获取对应的数据表缓存组件实例了。数据表缓存组件实例中会封装一些获取DB数据表数据、获取缓存key等方法,而其继承的抽象类会提供一些根据DB获取缓存、刷新缓存等模版方法。
- @Component
- public class FlushRedisCache {
- //继承了AbstractRedisStringCache的缓存实例会被注入到abstractRedisStringCacheMap这个map中
- //例如CategoryBaseCache、FrontCategoryCache、ItemCollectCache、ProductDetailCache、SkuCollectCache等
- @Autowired
- private Map<String, AbstractRedisStringCache> abstractRedisStringCacheMap;
- //继承了AbstractRedisSortedSetCache的缓存实例会被注入到abstractRedisSortedSetCacheMap这个map中
- //例如SkuSellerCache
- @Autowired
- private Map<String, AbstractRedisSortedSetCache> abstractRedisSortedSetCacheMap;
- ...
- }
- @Service("productDetailCache")
- public class ProductDetailCache extends AbstractRedisStringCache<ProductDetailDO, ProductDetailBO> {
- //DB操作组件
- @Resource
- private ProductDetailStringDatabase productDetailStringDatabase;
-
- @Resource
- private ProductDetailConverter productDetailConverter;
-
- @Override
- protected RedisStringDatabase<ProductDetailDO> getStringDatabase() {
- return productDetailStringDatabase;
- }
-
- @Override
- protected String getPendingRedisKey() {
- return AbstractRedisKeyConstants.PRODUCT_SKU_INFO_STRING;
- }
-
- @Override
- protected List<ProductDetailBO> convertDO2BO(Collection<ProductDetailDO> doList) {
- return productDetailConverter.converterDetailList(doList);
- }
-
- @Override
- protected Map<String, Object> getTableFieldsMap(String key) {
- Map<String, Object> dbInputMap = Maps.newHashMap();
- dbInputMap.put(SkuCollectStringDatabase.SKU_ID, key);
- return dbInputMap;
- }
-
- @Override
- protected Class<ProductDetailBO> getBOClass() {
- return ProductDetailBO.class;
- }
- ...
- }
- //Redis(string)缓存抽象类
- //@param <DO> 数据对象
- //@param <BO> 缓存对象
- public abstract class AbstractRedisStringCache<DO, BO> {
- @Resource
- private RedisReadWriteManager redisReadWriteManager;
-
- //获取redis key
- //@param key 需要替换的关键字
- protected String getRedisKey(String key) {
- return String.format(getPendingRedisKey(), key);
- }
-
- //单个获取数据库表名
- public String getTableName() {
- return getStringDatabase().getTableName();
- }
-
- //获取DB读取对象
- protected abstract RedisStringDatabase<DO> getStringDatabase();
-
- //获取待处理的Redis key
- protected abstract String getPendingRedisKey();
-
- //DO转BO
- protected abstract List<BO> convertDO2BO(Collection<DO> doList);
-
- //关联表字段值
- protected abstract Map<String, Object> getTableFieldsMap(String key);
-
- //获取BO对象的class
- protected abstract Class<BO> getBOClass();
- ...
-
- //模版方法:根据关键字批量获取数据
- //@param useLocalCache 是否使用本地缓存
- //@param keyList 入参关键字
- public Optional<List<BO>> listRedisStringData(Boolean useLocalCache, List<String> keyList) {
- if (CollectionUtils.isEmpty(keyList)) {
- return Optional.empty();
- }
- Optional<List<BO>> boListOpt = redisReadWriteManager.listRedisStringDataByCache(useLocalCache, keyList,
- getBloomKey(), getStringDatabase()::getTableSingleFiled, getStringDatabase().getBloomField(),
- getBOClass(), this::getRedisKey, (key) -> {
- Map<String, Object> tableFieldsMap = getTableFieldsMap(key);
- Optional<DO> doOpt;
- try {
- doOpt = getStringDatabase().getTableData(tableFieldsMap, queryType());
- } catch (Exception e) {
- log.error("根据关键字批量获取数据异常 key={},paramMap={}", key, tableFieldsMap, e);
- return Optional.empty();
- }
- if (!doOpt.isPresent()) {
- return Optional.empty();
- }
- List<BO> boList = convertDO2BO(Arrays.asList(doOpt.get()));
- if (CollectionUtils.isEmpty(boList)) {
- return Optional.empty();
- }
- return Optional.of(boList.get(0));
- }
- );
- return boListOpt;
- }
-
- //模版方法:根据多关键字批量获取集合数据
- //@param keyList 入参关键字
- //@param requestMap key 数据库表字段,value 字段值,该map中的字段不要与getTableFieldsMap(key)获取的字段重复
- public Optional<List<BO>> listRedisStringData(List<String> keyList, Map<String, Object> requestMap) {
- if (CollectionUtils.isEmpty(keyList)) {
- return Optional.empty();
- }
- Optional<List<BO>> boListOpt = redisReadWriteManager.listRedisStringDataByBatchCache(keyList, getBOClass(), this::getRedisKey, (key) -> {
- Map<String, Object> tableFieldsMap = getTableFieldsMap(key);
- if (MapUtils.isNotEmpty(requestMap)) {
- tableFieldsMap.putAll(requestMap);
- }
- Optional<List<DO>> doOpt;
- try {
- doOpt = getStringDatabase().listTableData(tableFieldsMap, queryType());
- } catch (Exception e) {
- log.error("根据关键字批量获取数据异常 key={},paramMap={}", key, tableFieldsMap, e);
- return Optional.empty();
- }
- if (!doOpt.isPresent()) {
- return Optional.empty();
- }
- List<BO> boList = convertDO2BO(doOpt.get());
- if (CollectionUtils.isEmpty(boList)) {
- return Optional.empty();
- }
- return Optional.of(boList);
- });
- return boListOpt;
- }
-
- //模版方法:刷新缓存—根据主表ID集合(关联表变更需要查询主表)
- //@param tableName 关联表名
- //@param idSet 关联表主键集合
- public void flushRedisStringDataByTableAndIdSet(String tableName, Set<Long> idSet) {
- Optional<Set<Long>> idSetOpt = getStringDatabase().getTableIdSetByRelationTableIdSet(tableName, idSet);
- if (!idSetOpt.isPresent()) {
- return;
- }
- flushRedisStringDataByIdSet(idSetOpt.get());
- }
-
- //模版方法:刷新缓存—根据表变更数据ID集合
- public void flushRedisStringDataByTableUpdateData() {
- Optional<Set<Long>> updateIdSetOpt = getStringDatabase().getTableUpdateIdSet();
- if (!updateIdSetOpt.isPresent()) {
- return;
- }
- flushRedisStringDataByIdSet(updateIdSetOpt.get());
- }
-
- //模版方法:刷新缓存—根据主键ID集合
- //@param idSet 数据表主键ID
- private void flushRedisStringDataByIdSet(Set<Long> idSet) {
- //根据id集合从数据库中查询出数据
- Optional<RedisStringCache<DO>> stringSourceOpt = getStringDatabase().listTableDataByIdSet(idSet, queryType());
- if (!stringSourceOpt.isPresent()) {
- return;
- }
- RedisStringCache<DO> redisStringCache = stringSourceOpt.get();
- if (!CollectionUtils.isEmpty(redisStringCache.getDeleteSet())) {
- //通过缓存读写组件删除缓存
- redisReadWriteManager.delete(redisStringCache.getDeleteSet().stream().map(this::getRedisKey).collect(toSet()).toArray(new String[]{}));
- }
- if (CollectionUtils.isEmpty(redisStringCache.getAddList())) {
- return;
- }
- List<BO> boList = convertDO2BO(redisStringCache.getAddList());
- Map<String, BO> redisMap = convertBO2Map(boList);
- //通过缓存读写组件写入缓存
- redisReadWriteManager.batchWriteRedisString(redisMap);
- }
- ...
- }
复制代码
3.自研缓存框架的通用缓存读写组件与DB操作组件
(1)通用缓存读写组件
(2)数据表的DB操作组件
每个数据表缓存组件都会有至少两个必备组件:一个通用缓存读写组件 + 一个数据表的DB操作组件。
(1)通用缓存读写组件
通用缓存读写组件也包含两个必备组件:一个操作缓存的RedisCache组件 + 一个操作分布式锁的RedisLock组件。通用缓存读写组件封装了大量基础的缓存读写操作,这些基础的缓存读写操作会结合DB读库 + 缓存问题等解决方案来进行实现。
- //缓存读写管理
- @Service
- public class RedisReadWriteManager {
- @Resource
- private RedisCache redisCache;
-
- @Resource
- private RedisLock redisLock;
- ...
-
- //删除指定的key
- public void delete(String... keys) {
- Arrays.asList(keys).stream().forEach(key -> redisCache.delete(key));
- }
-
- //批量添加string缓存
- public <T> void batchWriteRedisString(Map<String, T> redisMap) {
- List<Map.Entry<String, T>> list = Lists.newArrayList(redisMap.entrySet());
- try {
- for (List<Map.Entry<String, T>> entries : Lists.partition(list, PAGE_SIZE_100)) {
- for (Map.Entry<String, T> entry : entries) {
- redisCache.setex(true, entry.getKey(), JSON.toJSONString(entry.getValue()), RedisKeyUtils.redisKeyRandomTime(INT_EXPIRED_SEVEN_DAYS));
- }
- try {
- Thread.sleep(SLEEP_100);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- } catch (Exception e) {
- log.error("批量添加string缓存异常 redisMap={}", redisMap, e);
- }
- }
-
- //批量获取多缓存数据
- public <T> Optional<List<T>> listRedisStringDataByBatchCache(List<String> keyList, Class<T> clazz, Function<String, String> getRedisKeyFunction, Function<String, Optional<List<T>>> getDbFuction) {
- try {
- List<T> list = Lists.newArrayList();
- List<String> pendingKeyList = keyList.stream().distinct().collect(toList());
- List<String> redisKeyList = pendingKeyList.stream().map(getRedisKeyFunction).distinct().collect(toList());
- List<String> cacheList = redisCache.mget(redisKeyList);
- for (int i = 0; i < cacheList.size(); i++) {
- String cache = cacheList.get(i);
- //过滤无效缓存
- if (EMPTY_OBJECT_STRING.equals(cache)) {
- continue;
- }
- if (StringUtils.isNotBlank(cache)) {
- List<T> tList = JSON.parseArray(cache, clazz);
- list.addAll(tList);
- continue;
- }
- //缓存没有则读库
- Optional<List<T>> optional = listRedisStringDataByDb(pendingKeyList.get(i), getRedisKeyFunction, getDbFuction);
- if (optional.isPresent()) {
- list.addAll(optional.get());
- }
- }
- return CollectionUtils.isEmpty(list) ? Optional.empty() : Optional.of(list);
- } catch (Exception e) {
- log.error("批量获取缓存数据异常 keyList={},clazz={}", keyList, clazz, e);
- throw e;
- }
- }
-
- //读取数据库表多数据赋值到Redis
- public <T> Optional<List<T>> listRedisStringDataByDb(String key, Function<String, String> getRedisKeyFunction, Function<String, Optional<List<T>>> getDbFuction) {
- if (StringUtils.isEmpty(key) || Objects.isNull(getDbFuction)) {
- return Optional.empty();
- }
- try {
- if (!redisLock.lock(key)) {
- return Optional.empty();
- }
- String redisKey = getRedisKeyFunction.apply(key);
- Optional<List<T>> optional = getDbFuction.apply(key);
- if (!optional.isPresent()) {
- //把空对象暂存到redis
- redisCache.setex(true, redisKey, EMPTY_OBJECT_STRING, RedisKeyUtils.redisKeyRandomTime(INT_EXPIRED_ONE_DAY, TimeUnit.HOURS, NUMBER_24));
- log.warn("发生缓存穿透 redisKey={}", redisKey);
- return optional;
- }
- //把表数据对象存到redis
- redisCache.setex(true, redisKey, JSON.toJSONString(optional.get()), RedisKeyUtils.redisKeyRandomTime(INT_EXPIRED_SEVEN_DAYS));
- log.info("表数据对象存到Redis redisKey={}, data={}", redisKey, optional.get());
- return optional;
- } finally {
- redisLock.unlock(key);
- }
- }
- ...
- }
复制代码 (2)数据表的DB操作组件
该组件主要提供对数据表数据的查询方法。
- @Service("productDetailStringDatabase")
- public class ProductDetailStringDatabase extends AbstractRedisStringDatabase<ProductDetailDO> {
- public static final String SKU_ID = "skuId";
- private static final String TABLE_NAME = "sku_info";
-
- @Resource
- private ProductRepository productRepository;
-
- @Override
- public String getTableName() {
- return TABLE_NAME;
- }
-
- @Override
- public Set<String> getTableNameSet() {
- return Sets.newHashSet(getTableName());
- }
-
- @Override
- public Optional<ProductDetailDO> getTableData(Map<String, Object> tableFieldsMap, String queryType) {
- if (tableFieldsMap.containsKey(SKU_ID)) {
- String skuId = (String) tableFieldsMap.get(SKU_ID);
- ProductDetailDO productDetailDO = productRepository.queryProductDetail(skuId);
- if (!Objects.isNull(productDetailDO) && DelFlagEnum.EFFECTIVE.getCode().equals(productDetailDO.getDelFlag())) {
- return Optional.of(productDetailDO);
- }
- return Optional.empty();
- }
- return Optional.empty();
- }
-
- @Override
- public Optional<List<ProductDetailDO>> listTableData(Map<String, Object> tableFieldsMap, String queryType) {
- return Optional.empty();
- }
-
- @Override
- public Optional<RedisStringCache<ProductDetailDO>> listTableDataByIdSet(Set<Long> idSet, String queryType) {
- RedisStringCache redisStringCache = new RedisStringCache();
- List<ProductDetailDO> addList = new ArrayList<>();
- for (Long skuId : idSet) {
- ProductDetailDO productDetailDO = productRepository.queryProductDetail(String.valueOf(skuId));
- if (!Objects.isNull(productDetailDO)) {
- addList.add(productDetailDO);
- }
- }
- redisStringCache.setAddList(addList);
- return Optional.of(redisStringCache);
- }
-
- @Override
- public Optional<RedisStringCache<ProductDetailDO>> listTableDataByIdSet(List<Long> idSet, String queryType) {
- Long skuId = idSet.get(0);
- ProductDetailDO productDetailDO = productRepository.queryProductDetail(String.valueOf(skuId));
- RedisStringCache redisStringCache = new RedisStringCache();
- redisStringCache.setAddList(Arrays.asList(productDetailDO));
- return Optional.of(redisStringCache);
- }
- }
复制代码
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |