Redis实战-基于RabbitMq消息队列实现异步的下单操作
一、引入依赖通过使用RabbitMQ初步实现了:
[*]消息的可持久化机制
[*]使用lazy队列,将消息直接存储到磁盘中,直到消费者开始消费时才将消息加载到内存
[*]采用prefetch机制,实现多消费者抢占式争抢消息,能者多劳的效果
[*]RabbitMQ默认的监听机制 ,提高性能
[*]开启RabbitMQ的消费确认机制和失败重试机制,保证消息的可靠性
在pom.xml文件中引入springamqp的依赖
<dependency>
<groupId>org.springframework.boot</groupId>
spring-boot-starter-amqp</artifactId>
</dependency>二、配置 application.yml
spring:
rabbitmq:
host: 192.168.15.143 # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: itcast # 用户名
password: 123321 # 密码三、配置消息转换器
默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:
[*]数据体积过大
[*]有安全漏洞
[*]可读性差
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>配置消息转换器。
在启动类中添加一个Bean即可:
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
四、声明Mq的交换机、队列、Routingkey的常量并绑定
在utils包下,创建工具类MqConstants,用于声明Mq所需的常量
package com.hmdp.utils;
public class MqConstants {
/**
* 交换机
*/
public final static String ORDER_EXCHANGE = "order.topic";
/**
* 监听秒杀优惠券下单的队列
*/
public final static String SECKILLVOCHER_ORDER_QUEUE = "seckillvocher.order.queue";
/**
* 监听普通优惠券的下单队列
*/
public final static String VOCHER_ORDER_QUEUE = "vocher.order.queue";
/**
* 下单秒杀优惠券的Routingkey
*/
public final static String SECKILLVOCHER_ORDER_KEY = "order.seckillvocher";
/**
* 下单普通优惠券的Routingkey
*/
public final static String VOCHER_ORDER_KEY = "order.vocher";
}声明绑定队列交换机
@Configuration
@RequiredArgsConstructor
public class MqConfig {
/**
* 声明交换机,注册为Bean
* @return
*/
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(MqConstants.ORDER_EXCHANGE, true, false);
}
/**
* 声明秒杀优惠券订单队列,注册为Bean
* @return
*/
@Bean
public Queue seckillQueue() {
return QueueBuilder.durable(MqConstants.SECKILLVOCHER_ORDER_QUEUE)
.lazy()// 设置为lazy模式消息直接存储到磁盘,减少内存占用
.build();
}
/**
* 声明普通优惠券订单队列,注册为Bean
*/
@Bean
public Queue vocherQueue() {
return QueueBuilder.durable(MqConstants.VOCHER_ORDER_QUEUE)
.lazy()
.build();
}
@Bean
public Binding seckillQueueBinding() {
return BindingBuilder.bind(seckillQueue()).to(topicExchange()).with(MqConstants.SECKILLVOCHER_ORDER_KEY);
}
@Bean
public Binding vocherQueueBinding() {
return BindingBuilder.bind(vocherQueue()).to(topicExchange()).with(MqConstants.VOCHER_ORDER_KEY);
}
//失败处理策略和消息转换器的实现:
@Bean
public MessageConverter messageConverter() {
// 1.使用Jackson2JsonMessageConverter注入MessageConverter作为消息转换器
Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
jjmc.setCreateMessageIds(true);
return jjmc;
}
// 定义错误队列,交换机 和队列 绑定关系
@Bean
public DirectExchange directExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return QueueBuilder.durable("error.queue")
.lazy()
.build();
}
@Bean
public Binding binding(DirectExchange directExchange, Queue errorQueue) {
return BindingBuilder.bind(errorQueue).to(directExchange).with("error");// 关键字RouteKey为error
}
/**
* -RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
* -ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
* -RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
* @param rabbitTemplate
* @return
*/
// 代替原来的失败处理策略
@Bean
public MessageRecoverer messageConverter(RabbitTemplate rabbitTemplate) {
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
}五、声明订单消息监听者
@Component
@Slf4j
@RequiredArgsConstructor
public class SpringRabbitListener {
@Resource
private RabbitTemplate rabbitTemplate;
@Resource
private VoucherOrderServiceMqImpl voucherOrderServiceMq;
/**
* 二、
*监听器监听秒杀订单的懒惰队列
*接收到消息 执行优惠券订单
* @param voucherOrder
* @param message
*/
@RabbitListener(queues = MqConstants.SECKILLVOCHER_ORDER_QUEUE, concurrency = "1-10")
public void receiveMessage(VoucherOrder voucherOrder, Message message) {
log.debug("接收到的消息 ID:{} ", message.getMessageProperties().getMessageId());
log.debug("线程: {} - \n收到优惠券订单消息:{}",Thread.currentThread().getName(), voucherOrder);
voucherOrderServiceMq.handleVoucherOrder(voucherOrder);
}
}六、编写VoucherOrderServiceMqImpl类
@Slf4j
@Service
public class VoucherOrderServiceMqImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderServiceMq {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;
@Resource
private RedissonClient redissonClient;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private RabbitTemplate rabbitTemplate;
@Resource
private IVoucherOrderServiceMq proxy;
// Lua脚本
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}
/**
* 一、
*先执行Lua脚本,保证redis的原子性 并判断用户的秒杀资格
*将订单写入消息队列 等待异步下单操作
* @param voucherId
* @return
*/
@Override
public Result seckillVocher(Long voucherId) {
// 获取用户
Long userId = UserHolder.getUser().getId();
long orderId = redisIdWorker.nextId("order");
// 1.执行Lua脚本保证redis的原子性
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), userId.toString(), String.valueOf(orderId)
);
int r = result.intValue();
// 2.判断结果是否为0
if (r != 0) {
// 2.1不为0,代表没有购买资格
return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
}
// 3.创建优惠券订单并写入消息队列
VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setId(orderId);
voucherOrder.setUserId(userId);
voucherOrder.setVoucherId(voucherId);
//TODO 使用RabbitMq实现秒杀优惠券的异步下单
rabbitTemplate.convertAndSend(MqConstants.ORDER_EXCHANGE, MqConstants.SECKILLVOCHER_ORDER_KEY, voucherOrder);
// 5.返回订单id
return Result.ok(orderId);
}
/**
* 三、
*监听器执行该方法
*使用Redisson获取分布式锁
*对数据库保存订单的方法加锁(注意:spring的事务是放在threadLocal中,多线程的话,事务会失效;同一个类中调用事务的方法也会失效,要通过代理对象来使事务生效)
* @param voucherOrder
*/
public void handleVoucherOrder(VoucherOrder voucherOrder) {
// 1.获取用户
Long userId = voucherOrder.getUserId();
// 2.创建锁对象 分布式锁
RLock redisLock = redissonClient.getLock("lock:order:" + userId);
// 3.尝试获取锁
boolean isLock = redisLock.tryLock();
// 4.判断是否获得锁成功
if (!isLock) {
// 获取锁失败
log.error("不允许重复下单!");
return;
}
try {
// 获取代理对象
proxy = (IVoucherOrderServiceMq) AopContext.currentProxy();
// 创建订单 更新库存 保存订单到数据库通过代理对象 来保证调用同一个类中的事务方法的事务生效
proxy.createVoucherOrder(voucherOrder);
} finally {
// 释放锁
redisLock.unlock();
}
}
/**
* 四、
*操作数据库扣减库存、保存订单`
* @param voucherOrder
* @return
*/
@Transactional
public Result createVoucherOrder(VoucherOrder voucherOrder) {
// 1.查询订单
Long userId = voucherOrder.getUserId();
Integer count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();
// 1.1判断是否存在
if (count > 0) {
// 用户已经购买过了
return Result.fail("用户已经购买过一次!");
}
//2.扣减库存
// 使用乐观锁解决超卖问题 主要在于,修改的时候判断一下是否有人修改过
boolean success = seckillVoucherService.update()
.setSql("stock= stock -1")// set stock = stock - 1
.eq("voucher_id", voucherOrder.getVoucherId())
.gt("stock", 0)
.update();// where id = ? and stock > 0这样成功率更高
if (!success) {
//扣减库存
return Result.fail("库存不足!");
}
save(voucherOrder);
return Result.ok();
}
}
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页:
[1]