迫蔺 发表于 2025-7-19 21:00:48

Redis实战-基于RabbitMq消息队列实现异步的下单操作

一、引入依赖
通过使用RabbitMQ初步实现了:

[*]消息的可持久化机制
[*]使用lazy队列,将消息直接存储到磁盘中,直到消费者开始消费时才将消息加载到内存
[*]采用prefetch机制,实现多消费者抢占式争抢消息,能者多劳的效果
[*]RabbitMQ默认的监听机制 ,提高性能
[*]开启RabbitMQ的消费确认机制和失败重试机制,保证消息的可靠性
在pom.xml文件中引入springamqp的依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    spring-boot-starter-amqp</artifactId>
</dependency>二、配置 application.yml
spring:
rabbitmq:
    host: 192.168.15.143 # 主机名
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: itcast # 用户名
    password: 123321 # 密码三、配置消息转换器
默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:

[*]数据体积过大
[*]有安全漏洞
[*]可读性差
<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>配置消息转换器。
在启动类中添加一个Bean即可:
@Bean
public MessageConverter jsonMessageConverter(){
    return new Jackson2JsonMessageConverter();
}
四、声明Mq的交换机、队列、Routingkey的常量并绑定
在utils包下,创建工具类MqConstants,用于声明Mq所需的常量
package com.hmdp.utils;

public class MqConstants {
    /**
   * 交换机
   */
    public final static String ORDER_EXCHANGE = "order.topic";
    /**
   * 监听秒杀优惠券下单的队列
   */
    public final static String SECKILLVOCHER_ORDER_QUEUE = "seckillvocher.order.queue";
    /**
   * 监听普通优惠券的下单队列
   */
    public final static String VOCHER_ORDER_QUEUE = "vocher.order.queue";
    /**
   * 下单秒杀优惠券的Routingkey
   */
    public final static String SECKILLVOCHER_ORDER_KEY = "order.seckillvocher";
    /**
   * 下单普通优惠券的Routingkey
   */
    public final static String VOCHER_ORDER_KEY = "order.vocher";

}声明绑定队列交换机
@Configuration
@RequiredArgsConstructor
public class MqConfig {
    /**
   * 声明交换机,注册为Bean
   * @return
   */
    @Bean
    public TopicExchange topicExchange() {
      return new TopicExchange(MqConstants.ORDER_EXCHANGE, true, false);
    }

    /**
   * 声明秒杀优惠券订单队列,注册为Bean
   * @return
   */
    @Bean
    public Queue seckillQueue() {
      return QueueBuilder.durable(MqConstants.SECKILLVOCHER_ORDER_QUEUE)
                .lazy()// 设置为lazy模式消息直接存储到磁盘,减少内存占用
                .build();
    }
    /**
   * 声明普通优惠券订单队列,注册为Bean
   */
    @Bean
    public Queue vocherQueue() {
      return QueueBuilder.durable(MqConstants.VOCHER_ORDER_QUEUE)
                .lazy()
                .build();
    }

    @Bean
    public Binding seckillQueueBinding() {
      return BindingBuilder.bind(seckillQueue()).to(topicExchange()).with(MqConstants.SECKILLVOCHER_ORDER_KEY);
    }

    @Bean
    public Binding vocherQueueBinding() {
      return BindingBuilder.bind(vocherQueue()).to(topicExchange()).with(MqConstants.VOCHER_ORDER_KEY);
    }

    //失败处理策略和消息转换器的实现:
    @Bean
    public MessageConverter messageConverter() {
      // 1.使用Jackson2JsonMessageConverter注入MessageConverter作为消息转换器
      Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
      // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
      jjmc.setCreateMessageIds(true);
      return jjmc;
    }
    // 定义错误队列,交换机 和队列 绑定关系
    @Bean
    public DirectExchange directExchange(){
      return new DirectExchange("error.direct");
    }
    @Bean
    public Queue errorQueue(){
      return QueueBuilder.durable("error.queue")
                .lazy()
                .build();
    }
    @Bean
    public Binding binding(DirectExchange directExchange, Queue errorQueue) {
      return BindingBuilder.bind(errorQueue).to(directExchange).with("error");// 关键字RouteKey为error
    }

    /**
   * -RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
   * -ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
   * -RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
   * @param rabbitTemplate
   * @return
   */
    // 代替原来的失败处理策略
    @Bean
    public MessageRecoverer messageConverter(RabbitTemplate rabbitTemplate) {
      return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}五、声明订单消息监听者
@Component
@Slf4j
@RequiredArgsConstructor
public class SpringRabbitListener {
    @Resource
    private RabbitTemplate rabbitTemplate;
    @Resource
    private VoucherOrderServiceMqImpl voucherOrderServiceMq;


    /**
   * 二、
   *监听器监听秒杀订单的懒惰队列
   *接收到消息 执行优惠券订单
   * @param voucherOrder
   * @param message
   */
    @RabbitListener(queues = MqConstants.SECKILLVOCHER_ORDER_QUEUE, concurrency = "1-10")
    public void receiveMessage(VoucherOrder voucherOrder, Message message) {
      log.debug("接收到的消息 ID:{} ", message.getMessageProperties().getMessageId());
      log.debug("线程: {} - \n收到优惠券订单消息:{}",Thread.currentThread().getName(), voucherOrder);
      voucherOrderServiceMq.handleVoucherOrder(voucherOrder);
    }

}六、编写VoucherOrderServiceMqImpl类
@Slf4j
@Service
public class VoucherOrderServiceMqImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderServiceMq {

    @Resource
    private ISeckillVoucherService seckillVoucherService;
    @Resource
    private RedisIdWorker redisIdWorker;
    @Resource
    private RedissonClient redissonClient;
    @Resource
    private StringRedisTemplate stringRedisTemplate;
    @Resource
    private RabbitTemplate rabbitTemplate;
    @Resource
    private IVoucherOrderServiceMq proxy;


    // Lua脚本
    private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
    static {
      SECKILL_SCRIPT = new DefaultRedisScript<>();
      SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
      SECKILL_SCRIPT.setResultType(Long.class);
    }

    /**
   * 一、
   *先执行Lua脚本,保证redis的原子性 并判断用户的秒杀资格
   *将订单写入消息队列 等待异步下单操作
   * @param voucherId
   * @return
   */
    @Override
    public Result seckillVocher(Long voucherId) {
      // 获取用户
      Long userId = UserHolder.getUser().getId();
      long orderId = redisIdWorker.nextId("order");
      // 1.执行Lua脚本保证redis的原子性
      Long result = stringRedisTemplate.execute(
                SECKILL_SCRIPT,
                Collections.emptyList(),
                voucherId.toString(), userId.toString(), String.valueOf(orderId)
      );
      int r = result.intValue();
      // 2.判断结果是否为0
      if (r != 0) {
            // 2.1不为0,代表没有购买资格
            return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
      }
      // 3.创建优惠券订单并写入消息队列
      VoucherOrder voucherOrder = new VoucherOrder();
      voucherOrder.setId(orderId);
      voucherOrder.setUserId(userId);
      voucherOrder.setVoucherId(voucherId);
      //TODO 使用RabbitMq实现秒杀优惠券的异步下单
      rabbitTemplate.convertAndSend(MqConstants.ORDER_EXCHANGE, MqConstants.SECKILLVOCHER_ORDER_KEY, voucherOrder);
      // 5.返回订单id
      return Result.ok(orderId);
    }

    /**
   * 三、
   *监听器执行该方法
   *使用Redisson获取分布式锁
   *对数据库保存订单的方法加锁(注意:spring的事务是放在threadLocal中,多线程的话,事务会失效;同一个类中调用事务的方法也会失效,要通过代理对象来使事务生效)
   * @param voucherOrder
   */
    public void handleVoucherOrder(VoucherOrder voucherOrder) {
      // 1.获取用户
      Long userId = voucherOrder.getUserId();
      // 2.创建锁对象 分布式锁
      RLock redisLock = redissonClient.getLock("lock:order:" + userId);
      // 3.尝试获取锁
      boolean isLock = redisLock.tryLock();
      // 4.判断是否获得锁成功
      if (!isLock) {
            // 获取锁失败
            log.error("不允许重复下单!");
            return;
      }
      try {
            // 获取代理对象
            proxy = (IVoucherOrderServiceMq) AopContext.currentProxy();
            // 创建订单 更新库存 保存订单到数据库通过代理对象 来保证调用同一个类中的事务方法的事务生效
            proxy.createVoucherOrder(voucherOrder);
      } finally {
            // 释放锁
            redisLock.unlock();
      }
    }

    /**
   * 四、
   *操作数据库扣减库存、保存订单`
   * @param voucherOrder
   * @return
   */
    @Transactional
    public Result createVoucherOrder(VoucherOrder voucherOrder) {
      // 1.查询订单
      Long userId = voucherOrder.getUserId();
      Integer count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();
      // 1.1判断是否存在
      if (count > 0) {
            // 用户已经购买过了
            return Result.fail("用户已经购买过一次!");
      }
      //2.扣减库存
      // 使用乐观锁解决超卖问题 主要在于,修改的时候判断一下是否有人修改过
      boolean success = seckillVoucherService.update()
                .setSql("stock= stock -1")// set stock = stock - 1
                .eq("voucher_id", voucherOrder.getVoucherId())
                .gt("stock", 0)
                .update();// where id = ? and stock > 0这样成功率更高
      if (!success) {
            //扣减库存
            return Result.fail("库存不足!");
      }
      save(voucherOrder);
      return Result.ok();
    }
}
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页: [1]
查看完整版本: Redis实战-基于RabbitMq消息队列实现异步的下单操作