找回密码
 立即注册
首页 业界区 安全 Redis实战-基于RabbitMq消息队列实现异步的下单操作 ...

Redis实战-基于RabbitMq消息队列实现异步的下单操作

迫蔺 2025-7-19 21:00:48
一、引入依赖
通过使用RabbitMQ初步实现了:

  • 消息的可持久化机制
  • 使用lazy队列,将消息直接存储到磁盘中,直到消费者开始消费时才将消息加载到内存
  • 采用prefetch机制,实现多消费者抢占式争抢消息,能者多劳的效果
  • RabbitMQ默认的监听机制 ,提高性能
  • 开启RabbitMQ的消费确认机制和失败重试机制,保证消息的可靠性
在pom.xml文件中引入springamqp的依赖
  1. <dependency>
  2.     <groupId>org.springframework.boot</groupId>
  3.     spring-boot-starter-amqp</artifactId>
  4. </dependency>
复制代码
二、配置 application.yml
  1. spring:
  2.   rabbitmq:
  3.     host: 192.168.15.143 # 主机名
  4.     port: 5672 # 端口
  5.     virtual-host: / # 虚拟主机
  6.     username: itcast # 用户名
  7.     password: 123321 # 密码
复制代码
三、配置消息转换器
默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:

  • 数据体积过大
  • 有安全漏洞
  • 可读性差
    1. <dependency>
    2.     <groupId>com.fasterxml.jackson.dataformat</groupId>
    3.     jackson-dataformat-xml</artifactId>
    4.     <version>2.9.10</version>
    5. </dependency>
    复制代码
    配置消息转换器。
    在启动类中添加一个Bean即可:
    1. @Bean
    2. public MessageConverter jsonMessageConverter(){
    3.     return new Jackson2JsonMessageConverter();
    4. }
    复制代码
四、声明Mq的交换机、队列、Routingkey的常量并绑定
在utils包下,创建工具类MqConstants,用于声明Mq所需的常量
  1. package com.hmdp.utils;
  2. public class MqConstants {
  3.     /**
  4.      * 交换机
  5.      */
  6.     public final static String ORDER_EXCHANGE = "order.topic";
  7.     /**
  8.      * 监听秒杀优惠券下单的队列
  9.      */
  10.     public final static String SECKILLVOCHER_ORDER_QUEUE = "seckillvocher.order.queue";
  11.     /**
  12.      * 监听普通优惠券的下单队列
  13.      */
  14.     public final static String VOCHER_ORDER_QUEUE = "vocher.order.queue";
  15.     /**
  16.      * 下单秒杀优惠券的Routingkey
  17.      */
  18.     public final static String SECKILLVOCHER_ORDER_KEY = "order.seckillvocher";
  19.     /**
  20.      * 下单普通优惠券的Routingkey
  21.      */
  22.     public final static String VOCHER_ORDER_KEY = "order.vocher";
  23. }
复制代码
声明绑定队列交换机
  1. @Configuration
  2. @RequiredArgsConstructor
  3. public class MqConfig {
  4.     /**
  5.      * 声明交换机,注册为Bean
  6.      * @return
  7.      */
  8.     @Bean
  9.     public TopicExchange topicExchange() {
  10.         return new TopicExchange(MqConstants.ORDER_EXCHANGE, true, false);
  11.     }
  12.     /**
  13.      * 声明秒杀优惠券订单队列,注册为Bean
  14.      * @return
  15.      */
  16.     @Bean
  17.     public Queue seckillQueue() {
  18.         return QueueBuilder.durable(MqConstants.SECKILLVOCHER_ORDER_QUEUE)
  19.                 .lazy()  // 设置为lazy模式  消息直接存储到磁盘,减少内存占用
  20.                 .build();
  21.     }
  22.     /**
  23.      * 声明普通优惠券订单队列,注册为Bean
  24.      */
  25.     @Bean
  26.     public Queue vocherQueue() {
  27.         return QueueBuilder.durable(MqConstants.VOCHER_ORDER_QUEUE)
  28.                 .lazy()
  29.                 .build();
  30.     }
  31.     @Bean
  32.     public Binding seckillQueueBinding() {
  33.         return BindingBuilder.bind(seckillQueue()).to(topicExchange()).with(MqConstants.SECKILLVOCHER_ORDER_KEY);
  34.     }
  35.     @Bean
  36.     public Binding vocherQueueBinding() {
  37.         return BindingBuilder.bind(vocherQueue()).to(topicExchange()).with(MqConstants.VOCHER_ORDER_KEY);
  38.     }
  39.     //失败处理策略和消息转换器的实现:
  40.     @Bean
  41.     public MessageConverter messageConverter() {
  42.         // 1.使用Jackson2JsonMessageConverter注入MessageConverter作为消息转换器
  43.         Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
  44.         // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
  45.         jjmc.setCreateMessageIds(true);
  46.         return jjmc;
  47.     }
  48.     // 定义错误队列,交换机 和队列 绑定关系
  49.     @Bean
  50.     public DirectExchange directExchange(){
  51.         return new DirectExchange("error.direct");
  52.     }
  53.     @Bean
  54.     public Queue errorQueue(){
  55.         return QueueBuilder.durable("error.queue")
  56.                 .lazy()
  57.                 .build();
  58.     }
  59.     @Bean
  60.     public Binding binding(DirectExchange directExchange, Queue errorQueue) {
  61.         return BindingBuilder.bind(errorQueue).to(directExchange).with("error");// 关键字RouteKey为error
  62.     }
  63.     /**
  64.      * -  RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
  65.      * -  ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  66.      * -  RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
  67.      * @param rabbitTemplate
  68.      * @return
  69.      */
  70.     // 代替原来的失败处理策略
  71.     @Bean
  72.     public MessageRecoverer messageConverter(RabbitTemplate rabbitTemplate) {
  73.         return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
  74.     }
  75. }
复制代码
五、声明订单消息监听者
  1. @Component
  2. @Slf4j
  3. @RequiredArgsConstructor
  4. public class SpringRabbitListener {
  5.     @Resource
  6.     private RabbitTemplate rabbitTemplate;
  7.     @Resource
  8.     private VoucherOrderServiceMqImpl voucherOrderServiceMq;
  9.     /**
  10.      * 二、
  11.      *  监听器监听秒杀订单的懒惰队列
  12.      *  接收到消息 执行优惠券订单
  13.      * @param voucherOrder
  14.      * @param message
  15.      */
  16.     @RabbitListener(queues = MqConstants.SECKILLVOCHER_ORDER_QUEUE, concurrency = "1-10")
  17.     public void receiveMessage(VoucherOrder voucherOrder, Message message) {
  18.         log.debug("接收到的消息 ID:{} ", message.getMessageProperties().getMessageId());
  19.         log.debug("线程: {} - \n收到优惠券订单消息:{}",Thread.currentThread().getName(), voucherOrder);
  20.         voucherOrderServiceMq.handleVoucherOrder(voucherOrder);
  21.     }
  22. }
复制代码
六、编写VoucherOrderServiceMqImpl类
  1. @Slf4j
  2. @Service
  3. public class VoucherOrderServiceMqImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderServiceMq {
  4.     @Resource
  5.     private ISeckillVoucherService seckillVoucherService;
  6.     @Resource
  7.     private RedisIdWorker redisIdWorker;
  8.     @Resource
  9.     private RedissonClient redissonClient;
  10.     @Resource
  11.     private StringRedisTemplate stringRedisTemplate;
  12.     @Resource
  13.     private RabbitTemplate rabbitTemplate;
  14.     @Resource
  15.     private IVoucherOrderServiceMq proxy;
  16.     // Lua脚本
  17.     private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
  18.     static {
  19.         SECKILL_SCRIPT = new DefaultRedisScript<>();
  20.         SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
  21.         SECKILL_SCRIPT.setResultType(Long.class);
  22.     }
  23.     /**
  24.      * 一、
  25.      *  先执行Lua脚本,保证redis的原子性 并判断用户的秒杀资格
  26.      *  将订单写入消息队列 等待异步下单操作
  27.      * @param voucherId
  28.      * @return
  29.      */
  30.     @Override
  31.     public Result seckillVocher(Long voucherId) {
  32.         // 获取用户
  33.         Long userId = UserHolder.getUser().getId();
  34.         long orderId = redisIdWorker.nextId("order");
  35.         // 1.执行Lua脚本  保证redis的原子性
  36.         Long result = stringRedisTemplate.execute(
  37.                 SECKILL_SCRIPT,
  38.                 Collections.emptyList(),
  39.                 voucherId.toString(), userId.toString(), String.valueOf(orderId)
  40.         );
  41.         int r = result.intValue();
  42.         // 2.判断结果是否为0
  43.         if (r != 0) {
  44.             // 2.1不为0,代表没有购买资格
  45.             return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
  46.         }
  47.         // 3.创建优惠券订单并写入消息队列
  48.         VoucherOrder voucherOrder = new VoucherOrder();
  49.         voucherOrder.setId(orderId);
  50.         voucherOrder.setUserId(userId);
  51.         voucherOrder.setVoucherId(voucherId);
  52.         //TODO 使用RabbitMq实现秒杀优惠券的异步下单
  53.         rabbitTemplate.convertAndSend(MqConstants.ORDER_EXCHANGE, MqConstants.SECKILLVOCHER_ORDER_KEY, voucherOrder);
  54.         // 5.返回订单id
  55.         return Result.ok(orderId);
  56.     }
  57.     /**
  58.      * 三、
  59.      *  监听器执行该方法
  60.      *  使用Redisson获取分布式锁
  61.      *  对数据库保存订单的方法加锁  (注意:spring的事务是放在threadLocal中,多线程的话,事务会失效;同一个类中调用事务的方法也会失效,要通过代理对象来使事务生效)
  62.      * @param voucherOrder
  63.      */
  64.     public void handleVoucherOrder(VoucherOrder voucherOrder) {
  65.         // 1.获取用户
  66.         Long userId = voucherOrder.getUserId();
  67.         // 2.创建锁对象 分布式锁
  68.         RLock redisLock = redissonClient.getLock("lock:order:" + userId);
  69.         // 3.尝试获取锁
  70.         boolean isLock = redisLock.tryLock();
  71.         // 4.判断是否获得锁成功
  72.         if (!isLock) {
  73.             // 获取锁失败
  74.             log.error("不允许重复下单!");
  75.             return;
  76.         }
  77.         try {
  78.             // 获取代理对象
  79.             proxy = (IVoucherOrderServiceMq) AopContext.currentProxy();
  80.             // 创建订单 更新库存 保存订单到数据库  通过代理对象 来保证调用同一个类中的事务方法的事务生效
  81.             proxy.createVoucherOrder(voucherOrder);
  82.         } finally {
  83.             // 释放锁
  84.             redisLock.unlock();
  85.         }
  86.     }
  87.     /**
  88.      * 四、
  89.      *  操作数据库扣减库存、保存订单`
  90.      * @param voucherOrder
  91.      * @return
  92.      */
  93.     @Transactional
  94.     public Result createVoucherOrder(VoucherOrder voucherOrder) {
  95.         // 1.查询订单
  96.         Long userId = voucherOrder.getUserId();
  97.         Integer count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();
  98.         // 1.1判断是否存在
  99.         if (count > 0) {
  100.             // 用户已经购买过了
  101.             return Result.fail("用户已经购买过一次!");
  102.         }
  103.         //2.扣减库存
  104.         // 使用乐观锁解决超卖问题 主要在于,修改的时候判断一下是否有人修改过
  105.         boolean success = seckillVoucherService.update()
  106.                 .setSql("stock= stock -1")  // set stock = stock - 1
  107.                 .eq("voucher_id", voucherOrder.getVoucherId())
  108.                 .gt("stock", 0)
  109.                 .update();  // where id = ? and stock > 0  这样成功率更高
  110.         if (!success) {
  111.             //扣减库存
  112.             return Result.fail("库存不足!");
  113.         }
  114.         save(voucherOrder);
  115.         return Result.ok();
  116.     }
  117. }
复制代码
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册