一、引入依赖
通过使用RabbitMQ初步实现了:
- 消息的可持久化机制
- 使用lazy队列,将消息直接存储到磁盘中,直到消费者开始消费时才将消息加载到内存
- 采用prefetch机制,实现多消费者抢占式争抢消息,能者多劳的效果
- RabbitMQ默认的监听机制 ,提高性能
- 开启RabbitMQ的消费确认机制和失败重试机制,保证消息的可靠性
在pom.xml文件中引入springamqp的依赖- <dependency>
- <groupId>org.springframework.boot</groupId>
- spring-boot-starter-amqp</artifactId>
- </dependency>
复制代码 二、配置 application.yml- spring:
- rabbitmq:
- host: 192.168.15.143 # 主机名
- port: 5672 # 端口
- virtual-host: / # 虚拟主机
- username: itcast # 用户名
- password: 123321 # 密码
复制代码 三、配置消息转换器
默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:
- 数据体积过大
- 有安全漏洞
- 可读性差
- <dependency>
- <groupId>com.fasterxml.jackson.dataformat</groupId>
- jackson-dataformat-xml</artifactId>
- <version>2.9.10</version>
- </dependency>
复制代码 配置消息转换器。
在启动类中添加一个Bean即可:- @Bean
- public MessageConverter jsonMessageConverter(){
- return new Jackson2JsonMessageConverter();
- }
复制代码 四、声明Mq的交换机、队列、Routingkey的常量并绑定
在utils包下,创建工具类MqConstants,用于声明Mq所需的常量- package com.hmdp.utils;
- public class MqConstants {
- /**
- * 交换机
- */
- public final static String ORDER_EXCHANGE = "order.topic";
- /**
- * 监听秒杀优惠券下单的队列
- */
- public final static String SECKILLVOCHER_ORDER_QUEUE = "seckillvocher.order.queue";
- /**
- * 监听普通优惠券的下单队列
- */
- public final static String VOCHER_ORDER_QUEUE = "vocher.order.queue";
- /**
- * 下单秒杀优惠券的Routingkey
- */
- public final static String SECKILLVOCHER_ORDER_KEY = "order.seckillvocher";
- /**
- * 下单普通优惠券的Routingkey
- */
- public final static String VOCHER_ORDER_KEY = "order.vocher";
- }
复制代码 声明绑定队列交换机- @Configuration
- @RequiredArgsConstructor
- public class MqConfig {
- /**
- * 声明交换机,注册为Bean
- * @return
- */
- @Bean
- public TopicExchange topicExchange() {
- return new TopicExchange(MqConstants.ORDER_EXCHANGE, true, false);
- }
- /**
- * 声明秒杀优惠券订单队列,注册为Bean
- * @return
- */
- @Bean
- public Queue seckillQueue() {
- return QueueBuilder.durable(MqConstants.SECKILLVOCHER_ORDER_QUEUE)
- .lazy() // 设置为lazy模式 消息直接存储到磁盘,减少内存占用
- .build();
- }
- /**
- * 声明普通优惠券订单队列,注册为Bean
- */
- @Bean
- public Queue vocherQueue() {
- return QueueBuilder.durable(MqConstants.VOCHER_ORDER_QUEUE)
- .lazy()
- .build();
- }
- @Bean
- public Binding seckillQueueBinding() {
- return BindingBuilder.bind(seckillQueue()).to(topicExchange()).with(MqConstants.SECKILLVOCHER_ORDER_KEY);
- }
- @Bean
- public Binding vocherQueueBinding() {
- return BindingBuilder.bind(vocherQueue()).to(topicExchange()).with(MqConstants.VOCHER_ORDER_KEY);
- }
- //失败处理策略和消息转换器的实现:
- @Bean
- public MessageConverter messageConverter() {
- // 1.使用Jackson2JsonMessageConverter注入MessageConverter作为消息转换器
- Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
- // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
- jjmc.setCreateMessageIds(true);
- return jjmc;
- }
- // 定义错误队列,交换机 和队列 绑定关系
- @Bean
- public DirectExchange directExchange(){
- return new DirectExchange("error.direct");
- }
- @Bean
- public Queue errorQueue(){
- return QueueBuilder.durable("error.queue")
- .lazy()
- .build();
- }
- @Bean
- public Binding binding(DirectExchange directExchange, Queue errorQueue) {
- return BindingBuilder.bind(errorQueue).to(directExchange).with("error");// 关键字RouteKey为error
- }
- /**
- * - RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
- * - ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
- * - RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
- * @param rabbitTemplate
- * @return
- */
- // 代替原来的失败处理策略
- @Bean
- public MessageRecoverer messageConverter(RabbitTemplate rabbitTemplate) {
- return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
- }
- }
复制代码 五、声明订单消息监听者- @Component
- @Slf4j
- @RequiredArgsConstructor
- public class SpringRabbitListener {
- @Resource
- private RabbitTemplate rabbitTemplate;
- @Resource
- private VoucherOrderServiceMqImpl voucherOrderServiceMq;
- /**
- * 二、
- * 监听器监听秒杀订单的懒惰队列
- * 接收到消息 执行优惠券订单
- * @param voucherOrder
- * @param message
- */
- @RabbitListener(queues = MqConstants.SECKILLVOCHER_ORDER_QUEUE, concurrency = "1-10")
- public void receiveMessage(VoucherOrder voucherOrder, Message message) {
- log.debug("接收到的消息 ID:{} ", message.getMessageProperties().getMessageId());
- log.debug("线程: {} - \n收到优惠券订单消息:{}",Thread.currentThread().getName(), voucherOrder);
- voucherOrderServiceMq.handleVoucherOrder(voucherOrder);
- }
- }
复制代码 六、编写VoucherOrderServiceMqImpl类- @Slf4j
- @Service
- public class VoucherOrderServiceMqImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderServiceMq {
- @Resource
- private ISeckillVoucherService seckillVoucherService;
- @Resource
- private RedisIdWorker redisIdWorker;
- @Resource
- private RedissonClient redissonClient;
- @Resource
- private StringRedisTemplate stringRedisTemplate;
- @Resource
- private RabbitTemplate rabbitTemplate;
- @Resource
- private IVoucherOrderServiceMq proxy;
- // Lua脚本
- private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
- static {
- SECKILL_SCRIPT = new DefaultRedisScript<>();
- SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
- SECKILL_SCRIPT.setResultType(Long.class);
- }
- /**
- * 一、
- * 先执行Lua脚本,保证redis的原子性 并判断用户的秒杀资格
- * 将订单写入消息队列 等待异步下单操作
- * @param voucherId
- * @return
- */
- @Override
- public Result seckillVocher(Long voucherId) {
- // 获取用户
- Long userId = UserHolder.getUser().getId();
- long orderId = redisIdWorker.nextId("order");
- // 1.执行Lua脚本 保证redis的原子性
- Long result = stringRedisTemplate.execute(
- SECKILL_SCRIPT,
- Collections.emptyList(),
- voucherId.toString(), userId.toString(), String.valueOf(orderId)
- );
- int r = result.intValue();
- // 2.判断结果是否为0
- if (r != 0) {
- // 2.1不为0,代表没有购买资格
- return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
- }
- // 3.创建优惠券订单并写入消息队列
- VoucherOrder voucherOrder = new VoucherOrder();
- voucherOrder.setId(orderId);
- voucherOrder.setUserId(userId);
- voucherOrder.setVoucherId(voucherId);
- //TODO 使用RabbitMq实现秒杀优惠券的异步下单
- rabbitTemplate.convertAndSend(MqConstants.ORDER_EXCHANGE, MqConstants.SECKILLVOCHER_ORDER_KEY, voucherOrder);
- // 5.返回订单id
- return Result.ok(orderId);
- }
- /**
- * 三、
- * 监听器执行该方法
- * 使用Redisson获取分布式锁
- * 对数据库保存订单的方法加锁 (注意:spring的事务是放在threadLocal中,多线程的话,事务会失效;同一个类中调用事务的方法也会失效,要通过代理对象来使事务生效)
- * @param voucherOrder
- */
- public void handleVoucherOrder(VoucherOrder voucherOrder) {
- // 1.获取用户
- Long userId = voucherOrder.getUserId();
- // 2.创建锁对象 分布式锁
- RLock redisLock = redissonClient.getLock("lock:order:" + userId);
- // 3.尝试获取锁
- boolean isLock = redisLock.tryLock();
- // 4.判断是否获得锁成功
- if (!isLock) {
- // 获取锁失败
- log.error("不允许重复下单!");
- return;
- }
- try {
- // 获取代理对象
- proxy = (IVoucherOrderServiceMq) AopContext.currentProxy();
- // 创建订单 更新库存 保存订单到数据库 通过代理对象 来保证调用同一个类中的事务方法的事务生效
- proxy.createVoucherOrder(voucherOrder);
- } finally {
- // 释放锁
- redisLock.unlock();
- }
- }
- /**
- * 四、
- * 操作数据库扣减库存、保存订单`
- * @param voucherOrder
- * @return
- */
- @Transactional
- public Result createVoucherOrder(VoucherOrder voucherOrder) {
- // 1.查询订单
- Long userId = voucherOrder.getUserId();
- Integer count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();
- // 1.1判断是否存在
- if (count > 0) {
- // 用户已经购买过了
- return Result.fail("用户已经购买过一次!");
- }
- //2.扣减库存
- // 使用乐观锁解决超卖问题 主要在于,修改的时候判断一下是否有人修改过
- boolean success = seckillVoucherService.update()
- .setSql("stock= stock -1") // set stock = stock - 1
- .eq("voucher_id", voucherOrder.getVoucherId())
- .gt("stock", 0)
- .update(); // where id = ? and stock > 0 这样成功率更高
- if (!success) {
- //扣减库存
- return Result.fail("库存不足!");
- }
- save(voucherOrder);
- return Result.ok();
- }
- }
复制代码 来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |