admin 管理员组

文章数量: 887021

实战篇

优惠券秒杀

全局唯一ID

当用户抢购时,就会生成订单并保存到tb_voucher_order这张表中,而订单表如果使用数据库自增ID就存在一些问题:

  • id的规律性太明显
  • 受单表数据量限制

全局ID生成器,是一种在分布式系统下用来生成全局唯一ID的工具,一般要满足以下列特性:

  • 唯一性
  • 高可用
  • 高性能
  • 递增性
  • 安全性

ID的组成部分:

  • 符号位:1bit,永远为0
  • 时间戳:31bit,以秒为单位,可以使用69年
  • 序列号:32bit,秒内的计数器,支持每秒产生2^32个不同ID

工具类编写代码实现

package com.hmdp.utils;import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;/*** @author xc* @date 2023/4/26 14:59*/
@Component
public class RedisIdWorker {/*** 开始时间戳*/private static final long BEGIN_TIMESTAMP = 1640995200L;/*** 左移位数,防止以后需要修改*/private static final int COUNT_BITS = 32;private StringRedisTemplate stringRedisTemplate;public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {this.stringRedisTemplate = stringRedisTemplate;}public long nextId(String keyPrefix) {// 1.生成时间戳LocalDateTime now = LocalDateTime.now();long end = now.toEpochSecond(ZoneOffset.UTC);long timestamp = end - BEGIN_TIMESTAMP;// 2.生成序列号// 2.1获取当前日期,精确到天String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));Long increment = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);// 3.拼接返回return timestamp << COUNT_BITS | increment;}
}

实现优惠券秒杀下单

下单时需要判断两点:

  • 秒杀是否开始或者结束,如果尚未开始或已经结束则无法下单
  • 库存是否充足

流程:

根据流程实现具体业务

    @Resourceprivate ISeckillVoucherService iSeckillVoucherService;@Resourceprivate IVoucherService iVoucherService;@Resourceprivate RedisIdWorker redisIdWorker;@Override@Transactionalpublic Result seckillVoucher(Long voucherId) {if (voucherId == null || voucherId < 0) {return Result.fail("请求id错误");}Voucher voucher = iVoucherService.getById(voucherId);if (voucher == null) {return Result.fail("当前优惠券不存在");}SeckillVoucher seckillVoucher = iSeckillVoucherService.getById(voucherId);if (seckillVoucher == null) {return Result.fail("秒杀优惠券不存在");}LocalDateTime beginTime = seckillVoucher.getBeginTime();if (LocalDateTime.now().isBefore(beginTime)) {return Result.fail("秒杀未开始");}LocalDateTime endTime = seckillVoucher.getEndTime();if (LocalDateTime.now().isAfter(endTime)) {return Result.fail("秒杀已结束");}int leftStock = seckillVoucher.getStock();if (leftStock <= 0) {return Result.fail("优惠券已被抢空");}boolean update = iSeckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).update();if (!update) {return Result.fail("服务器内部错误");}VoucherOrder voucherOrder = new VoucherOrder();long orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);voucherOrder.setUserId(UserHolder.getUser().getId());voucherOrder.setVoucherId(voucherId);voucherOrder.setPayType(voucher.getType());boolean save = this.save(voucherOrder);if (!save) {return Result.fail("服务器内部错误");}return Result.ok(orderId);}

超卖问题

乐观锁

  • 版本号法

使用CAS方法:

        int leftStock = seckillVoucher.getStock();if (leftStock <= 0) {return Result.fail("优惠券已被抢空");}boolean update = iSeckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId)// 更新的时候判断当前剩余库存量是否跟开始查询的时候相等.eq("stock",leftStock).update();

弊端:

成功率很低

库存改为大于0

        int leftStock = seckillVoucher.getStock();if (leftStock <= 0) {return Result.fail("优惠券已被抢空");}boolean update = iSeckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId)// 更新的时候判断当前剩余库存量是否大于0.gt("stock",0).update();

一人一单

在抢购前判断数据库是否存在已经的订单

// 查询秒杀优惠券,该用户是否已经抢到int count = query().eq("user_id", userId).eq("voucher_id",voucherId).count();if (count > 0) {return Result.fail("您已经抢过了");}boolean update = iSeckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).eq("stock",leftStock).update();if (!update) {return Result.fail("服务器内部错误");}

问题:

在多线程上会出现都走到代码第6行,然后再一起执行更新操作,就会出现一人多单情况

解决:

对操作进行加锁

    @Overridepublic Result seckillVoucher(Long voucherId) {if (voucherId == null || voucherId < 0) {return Result.fail("请求id错误");}Voucher voucher = iVoucherService.getById(voucherId);if (voucher == null) {return Result.fail("当前优惠券不存在");}SeckillVoucher seckillVoucher = iSeckillVoucherService.getById(voucherId);if (seckillVoucher == null) {return Result.fail("秒杀优惠券不存在");}LocalDateTime beginTime = seckillVoucher.getBeginTime();if (LocalDateTime.now().isBefore(beginTime)) {return Result.fail("秒杀未开始");}LocalDateTime endTime = seckillVoucher.getEndTime();if (LocalDateTime.now().isAfter(endTime)) {return Result.fail("秒杀已结束");}int leftStock = seckillVoucher.getStock();if (leftStock <= 0) {return Result.fail("优惠券已被抢空");}return getResult(voucherId, voucher, leftStock);}@Transactionalpublic Result getResult(Long voucherId, Voucher voucher, int leftStock) {Long userId = UserHolder.getUser().getId();// 查询秒杀优惠券,该用户是否已经抢到long orderId;// 对相同用户并发请求加锁    new// String类型也可能出现不同对象     new // intern()的作用是返回字符串常量池中对象的地址   new synchronized (userId.toString().intern()) { int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();if (count > 0) {return Result.fail("您已经抢过了");}boolean update = iSeckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).eq("stock", leftStock).update();if (!update) {return Result.fail("服务器内部错误");}VoucherOrder voucherOrder = new VoucherOrder();orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);voucherOrder.setUserId(userId);voucherOrder.setVoucherId(voucherId);voucherOrder.setPayType(voucher.getType());boolean save = this.save(voucherOrder);if (!save) {return Result.fail("服务器内部错误");}}return Result.ok(orderId);}
}

此时还会出现一个问题:

  • 当一个线程释放锁后,但是事务还没提交,那么还是会出现一人多单的情况,所以需要对整个方法调用进行加锁
@Overridepublic Result seckillVoucher(Long voucherId) {if (voucherId == null || voucherId < 0) {return Result.fail("请求id错误");}Voucher voucher = iVoucherService.getById(voucherId);if (voucher == null) {return Result.fail("当前优惠券不存在");}SeckillVoucher seckillVoucher = iSeckillVoucherService.getById(voucherId);if (seckillVoucher == null) {return Result.fail("秒杀优惠券不存在");}LocalDateTime beginTime = seckillVoucher.getBeginTime();if (LocalDateTime.now().isBefore(beginTime)) {return Result.fail("秒杀未开始");}LocalDateTime endTime = seckillVoucher.getEndTime();if (LocalDateTime.now().isAfter(endTime)) {return Result.fail("秒杀已结束");}int leftStock = seckillVoucher.getStock();if (leftStock <= 0) {return Result.fail("优惠券已被抢空");}Long userId = UserHolder.getUser().getId();// new  synchronized (userId.toString().intern()) {return getResult(voucherId, voucher, leftStock);}}@Transactionalpublic Result getResult(Long voucherId, Voucher voucher, int leftStock) {Long userId = UserHolder.getUser().getId();// 查询秒杀优惠券,该用户是否已经抢到long orderId;// 对相同用户并发请求加锁// String类型也可能出现不同对象// intern()的作用是返回字符串常量池中对象的地址int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();if (count > 0) {return Result.fail("您已经抢过了");}boolean update = iSeckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).eq("stock", leftStock).update();if (!update) {return Result.fail("服务器内部错误");}VoucherOrder voucherOrder = new VoucherOrder();orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);voucherOrder.setUserId(userId);voucherOrder.setVoucherId(voucherId);voucherOrder.setPayType(voucher.getType());boolean save = this.save(voucherOrder);if (!save) {return Result.fail("服务器内部错误");}return Result.ok(orderId);}

对于spring事务管理熟悉的话,在seckillVoucher方法中调用有事务的getResult这个方法,会出现事务失效。因为相当于this.getResult,用的不是代理类。

解决方法:

@Overridepublic Result seckillVoucher(Long voucherId) {if (voucherId == null || voucherId < 0) {return Result.fail("请求id错误");}Voucher voucher = iVoucherService.getById(voucherId);if (voucher == null) {return Result.fail("当前优惠券不存在");}SeckillVoucher seckillVoucher = iSeckillVoucherService.getById(voucherId);if (seckillVoucher == null) {return Result.fail("秒杀优惠券不存在");}LocalDateTime beginTime = seckillVoucher.getBeginTime();if (LocalDateTime.now().isBefore(beginTime)) {return Result.fail("秒杀未开始");}LocalDateTime endTime = seckillVoucher.getEndTime();if (LocalDateTime.now().isAfter(endTime)) {return Result.fail("秒杀已结束");}int leftStock = seckillVoucher.getStock();if (leftStock <= 0) {return Result.fail("优惠券已被抢空");}Long userId = UserHolder.getUser().getId();synchronized (userId.toString().intern()) {// 获取当前对象的代理对象   newIVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.getResult(voucherId, voucher, leftStock);}}/*** xc* @param voucherId* @param voucher* @param leftStock* @return*/@Override@Transactionalpublic Result getResult(Long voucherId, Voucher voucher, int leftStock) {Long userId = UserHolder.getUser().getId();// 查询秒杀优惠券,该用户是否已经抢到long orderId;// 对相同用户并发请求加锁// String类型也可能出现不同对象// intern()的作用是返回字符串常量池中对象的地址int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();if (count > 0) {return Result.fail("您已经抢过了");}boolean update = iSeckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).eq("stock", leftStock).update();if (!update) {return Result.fail("服务器内部错误");}VoucherOrder voucherOrder = new VoucherOrder();orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);voucherOrder.setUserId(userId);voucherOrder.setVoucherId(voucherId);voucherOrder.setPayType(voucher.getType());boolean save = this.save(voucherOrder);if (!save) {return Result.fail("服务器内部错误");}return Result.ok(orderId);}

需要导入aspectj的依赖

        <dependency><groupId>org.aspectj</groupId><artifactId>aspectjweaver</artifactId></dependency>

然后在主启动类上暴露代理对象

@MapperScan("com.hmdp.mapper")
@SpringBootApplication
@EnableTransactionManagement
@EnableAspectJAutoProxy(exposeProxy = true)
public class HmDianPingApplication {public static void main(String[] args) {SpringApplication.run(HmDianPingApplication.class, args);}}

分布式锁

通过加锁可以解决在单机情况下的一人一单安全问题,但是在集群模式下就不行了

模拟集群效果:

怎么添加idea的Serivces

在Service中添加SpringBoot项目,通过不同的端口启动

# 在VM options中添加此段代码,以指定端口启动
-Dserver.port=8082

前端nginx通过负载均衡访问后端接口

在集群模式下:

有多个JVM实例的存在,所以又会出现超卖问题

使用分布式锁解决

流程:

基于Redis实现分布式锁初级版本:

需求:定义一个类,实现下面接口,利用Redis实现分布式锁功能。

public interface Ilock {/*** 尝试获取锁* @param timeoutSec 过期时间* @return 获取锁是否成功*/boolean tryLock(long timeoutSec);void unlock();
}

简单锁实现类

package com.hmdp.utils;
import org.springframework.data.redis.core.StringRedisTemplate;
import java.util.concurrent.TimeUnit;/*** @author: xc* @date: 2023/4/27 20:46*/public class SimpleRedisLock implements ILock {/*** 锁的统一前缀*/private static final String KEY_PREFIX = "lock:";/*** 锁的名称*/private String name;// 因为不是spring管理的bean所以需要构造方法private StringRedisTemplate stringRedisTemplate;public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}@Overridepublic boolean tryLock(long timeoutSec) {// 当前线程idLong threadId = Thread.currentThread().getId();// setIfAbsent:如果不存在就设置Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId+"", timeoutSec, TimeUnit.SECONDS);return Boolean.TRUE.equals(success);}@Overridepublic void unlock() {// 删除keystringRedisTemplate.delete(KEY_PREFIX + name);}
}

实现初级redis分布式锁版本

    	Long userId = UserHolder.getUser().getId();// 因为只对同一个用户加锁,所以用 order:+userId 作为锁的keySimpleRedisLock lock = new SimpleRedisLock("order:"+userId, stringRedisTemplate);// 获取锁boolean tryLock = lock.tryLock(LOCK_TIMEOUT);if (!tryLock) {// 获取锁失败return Result.fail("不允许重复下单");}try {IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.getResult(voucherId, voucher, leftStock);} finally {// 拿到锁的释放锁lock.unlock();}

会出现的问题:

会释放别人的锁

解决方案:释放锁的时候先看一下是不是自己的锁

流程:

改进Redis的分布式锁:

  • 在获取锁时存入线程表示(可以用UUID表示)

  • 在释放锁时获取线程ID,判断是否与当前线程标示一致

    • 如果一致则释放锁
    • 如果不一致则不释放锁

修改获取锁和释放锁的逻辑

package com.hmdp.utils;
import org.springframework.data.redis.core.StringRedisTemplate;
import java.util.concurrent.TimeUnit;
import java.util.UUID;/*** @author: xc* @date: 2023/4/27 20:46*/public class SimpleRedisLock implements ILock {/*** 锁的统一前缀*/private static final String KEY_PREFIX = "lock:";/*** 随机生成线程uuid的前缀*/private static final String ID_PREFIX = UUID.randomUUID() +"-";/*** 锁的名称*/private String name;private StringRedisTemplate stringRedisTemplate;public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}@Overridepublic boolean tryLock(long timeoutSec) {long threadId = Thread.currentThread().getId();// 标识位 ID_PREFIX+threadIdBoolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, ID_PREFIX+threadId, timeoutSec, TimeUnit.SECONDS);return Boolean.TRUE.equals(success);}@Overridepublic void unlock() {long threadId = Thread.currentThread().getId();// 判断标识是否一致if ((ID_PREFIX+threadId).equals(stringRedisTemplate.opsForValue().get(KEY_PREFIX + name))) {stringRedisTemplate.delete(KEY_PREFIX + name);}}
}

出现问题:

  • 线程1如果判断完是自己的锁后,出现gc阻塞线程知道锁过期,此时线程2过来获取到锁执行自己的业务,然后线程1又阻塞完毕回到删除锁,就会将线程2的锁删除。然而又有线程3来过来获取锁没获取到,就会出现线程2和线程3同时执行代码。

解决办法: 保证判断锁和释放锁的原子性

使用Redis的Lua脚本:

关于redis的基本语法

执行Lua脚本

再次改进Redis的分布式锁

总结

基于Redis的分布式锁实现思路:

  • 利用set nx ex获取锁,并设置过期时间,保存线程标示
  • 释放锁时先判断线程标示是否与自己一致,一致则删除锁

特性:

  • 利用set nx满足互斥性
  • 利用set ex保证故障时锁依然能释放,避免死锁,提高安全性
  • 利用Redis集群保证高可用和高并发特性

基于Redis的分布式锁的优化:

基于setnx实现的分布式锁存在下面的问题:

Redisson入门

引入依赖

        <dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.13.6</version></dependency>

配置Redisson客户端

/*** @author xc* @date 2023/4/28 9:16*/
@Configuration
public class RedissonConfig {public RedissonClient redissonClient() {// 配置Config config = new Config();config.setTransportMode(TransportMode.EPOLL);config.useSingleServer().setAddress("redis://127.0.0.1:6379");return Redisson.create(config);}}

使用Redisson的分布式锁

Redisson可重入锁的原理

流程图:

获取锁Lua脚本:

释放锁Lua脚本:

Redisson底层源码讲解(P66、P67)

Redisson分布式锁原理:

  • 可重入:利用hash结构记录线程id和重入次数
  • 可重试:利用信号量和PubSub功能实现等待、唤醒、获取锁失败的重试机制
  • 超时续约:利用watchDog,每个一段时间(releaseTime/3),重置超时时间

解决主从一致(P68)

Redis优化秒杀

改进秒杀业务,提高并发性能

需求:

  • 新增秒杀优惠券的同时,将优惠券信息保存到Redis中
	// 引入redis@Resourceprivate StringRedisTemplate stringRedisTemplate;// 在保存秒杀优惠券的时候,也将优惠券的id和库存保存到redis中stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY+voucher.getId(),voucher.getStock().toString());
  • 基于Lua脚本,判断秒杀库存、一人一单、决定用户是否抢购成功

lua脚本

---
--- Generated by Luanalysis
--- Created by xc.
--- DateTime: 2023/4/28 11:48
---
local voucherId = ARGV[1]
local userId = ARGV[2]local stockKey = 'seckill:stock:' .. voucherId
local orderKey = 'seckill:order:' .. voucherIdif(tonumber(redis.call('get',stockKey)) <= 0) thenreturn 1
endif(redis.call('sismember',orderKey,userId) == 1) thenreturn 2
end
redis.call('incrby',stockKey,-1)
redis.call('sadd',orderKey,userId)
return 0

java代码

private static final DefaultRedisScript<Long> SECKILL_SCIPT;static {SECKILL_SCIPT = new DefaultRedisScript<>();ClassPathResource pathResource = new ClassPathResource("seckill.lua");SECKILL_SCIPT.setLocation(pathResource);SECKILL_SCIPT.setResultType(Long.class);}@Overridepublic Result seckillVoucher(Long voucherId) {// 1.执行lua脚本Long userId = UserHolder.getUser().getId();long execute = stringRedisTemplate.execute(SECKILL_SCIPT,Collections.EMPTY_LIST,voucherId.toString(), userId.toString());// 2.判断结果是否为0if (execute != 0) {// 2.1 不为0,代表没有购买资格// 为1时库存不足,2时重复下单return Result.fail(execute == 1 ? "库存不足" : "重复下单");}// 2.2 为0 ,有购买资格,把下单信息保存到阻塞队列long orderId = redisIdWorker.nextId("order");
//        new ArrayBlockingQueue<>()// 3.返回订单idreturn Result.ok(orderId);}
  • 如果抢购成功,将优惠券id和用户id封装后存入阻塞队列
    // 阻塞队列private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);@Overridepublic Result seckillVoucher(Long voucherId) {// 1.执行lua脚本Long userId = UserHolder.getUser().getId();long execute = stringRedisTemplate.execute(SECKILL_SCIPT,Collections.EMPTY_LIST,voucherId.toString(), userId.toString());// 2.判断结果是否为0if (execute != 0) {// 2.1 不为0,代表没有购买资格// 为1时库存不足,2时重复下单return Result.fail(execute == 1 ? "库存不足" : "重复下单");}VoucherOrder voucherOrder = new VoucherOrder();// 2.2 为0 ,有购买资格,把下单信息保存到阻塞队列long orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);voucherOrder.setUserId(userId);voucherOrder.setVoucherId(voucherId);// 加入到阻塞队列 orderTasks.add(voucherOrder);proxy = (IVoucherOrderService) AopContext.currentProxy();// 3.返回订单idreturn Result.ok(orderId);}
  • 开启线程任务,不断从阻塞队列中回去信息,实现异步下单功能
    private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();/*** 在类初始化完后会执行该方法*/@PostConstructprivate void init(){SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());}private class VoucherOrderHandler implements Runnable{@Overridepublic void run() {while (true){// 获取队列中的队列信息try {VoucherOrder voucherOrder = orderTasks.take();handleVoucherOrder(voucherOrder);} catch (Exception e) {log.error("处理订单异常",e);}}}private void handleVoucherOrder(VoucherOrder voucherOrder) {// 因为是全新开启一个线程,所以需要在订单中拿到用户idLong userId = voucherOrder.getUserId();// 因为只对同一个用户加锁,所以用 order:+userId 作为锁的keyRLock lock = redissonClient.getLock("lock:order:" + userId);boolean tryLock = lock.tryLock();if (!tryLock) {// 获取锁失败log.error("不允许重复下单");return;}try {proxy.getResult(voucherOrder);} finally {// 拿到锁的释放锁lock.unlock();}}}

总结

秒杀业务的优化思路是什么?

  • 先利用Redis完成库存余量、一人一单判断,完成抢单业务
  • 再将下单业务放入阻塞队列,利用独立线程异步下单

基于阻塞队列的异步秒杀存在哪些问题?

  • 内存限制问题
  • 数据安全问题

Redis消息队列实现异步秒杀

消息队列,字面意思就是存放消息队列。最简单的消息队列模型包括3个角色

  • 消息队列:存储和管理消息,也被称为消息代理
  • 生产者:发送消息到消息队列
  • 消费者:从消息队列获取消息并处理消息

基于List结构模拟消息队列

基于List的消息队列由哪些优缺点?

优点:

  • 利用Redis存储,不受限于JVM内存上限
  • 基于Redis的持久化机制,数据安全性有保证
  • 可以满足消息有序性

缺点:

  • 无法避免消息丢失
  • 只支持单消费者

基于PubSub的消息队列

基于PubSub的消息队列由哪些优缺点?

优点:

  • 采用发布订阅模型,支持多生产、多消费

缺点:

  • 不支持数据持久化
  • 无法避免消息丢失
  • 消息堆积有上限,超出时数据丢失

基于Stream的消息队列

基于STREAM的消息队列由哪些优缺点?

优点:

  • 消息可回溯
  • 一个消息可以被多个消费者读取
  • 可以阻塞读取

缺点:

  • 有消息漏读的风险

基于Stream的消息队列-消费者组

创建消费者组:

XGROUP CREATE key groupName ID [MKSTREAM]
  • key:队列名称
  • groupName:消费者组名称
  • ID:起始ID标示,$代表队列中最后一个消息,0则代表队列中第一个消息
  • MKSTREAM:队列不存在时自动创建队列

其它常见命令:

# 删除指定的消费者组
XGROUP DESTORY key groupName# 给指定的消费者组添加消费者
XGROUP CREATECONSUMER key groupname consumername# 删除消费者组中的指定消费者
XGROUP DELCONSUMER key groupname consumername

Stream类型消息队列的XREADGROUP命令特点:

  • 消息可回溯
  • 可以多消费者争抢消息,加快消费速度
  • 可以阻塞读取
  • 没有消息漏读风险
  • 有消息确认机制,保证消息至少被消费一次

本文标签: 实战篇