Redis消息队列详解

Redis消息队列详解

简介

​ 本文的重点在于Stream数据类型实现消息队列,Stream 是 Redis 5.0 引入的一种新数据类型,可以实现一个功能比较完善的消息队列。在Redis 5.0 之前想使用Reids充当消息队列,还有其他两种类型可以使用,分别是List和PubSub,但是这两种类型都有着很大的缺陷,接下来简单介绍一下三种结构如何实现消息队列以及他们的优缺点。

List

​ Redis 1.0发布时就具备了list数据结构,生产者A可以通过lpush写入消息,消费者B通过rpop(或brpop)从队列中读取消息,每个消息只会被读取一次,而且是按照lpush写入的顺序读到。同时Redis的接口是并发安全的,可以同时有多个生产者向一个list中生产消息,多个消费者从list中读取消息。

​ 同时list数据结构是会被持久化到rdb/aof文件的,消息投递可靠性是有些许保障的,详细会在后文中讨论

​ 直接使用单个list是无法实现确认机制(ack机制)的,redis还提供了rpoplpush(或brpoplpush)命令,能够原子的从一个list的右端取数据的同时将该数据再插入到另一个list的左侧。

1
2
3
4
5
6
7
8
9
#基于2个list完成消息消费和确认

#从listA右侧读取消息并写入listB左侧
rpoplpush listA listB
"msg1"

#业务逻辑处理msg1完毕后,从listB中删除msg1,完成消息的确认,listB中遗留的消息就是未确认的消息,可以进行重新消费
lrem listB 1 msg1
(integer) 1

​ Redis LREM 命令用于从列表中删除指定数量的匹配元素。以下是该命令的基本用法:

  • 语法: LREM key count value
    key: 列表的键名。
    count: 要删除的元素个数。如果 count 大于 0,则从表头开始向表尾搜索并删除;如果 count 小于 0,则从表尾开始向表头搜索并删除;如果 count 等于 0,则删除所有匹配的元素。
    value: 要删除的元素值。
    返回值: 返回被删除元素的数量。

​ 由于每个消息只会被读取一次,所以使用list实现消息队列的主要缺点为消息只能被消费一次,缺乏广播机制

PubSub

​ Redis 2.0中引入了一个新的数据结构pubsub,pubsub引入一个概念叫channel,生产者通过publish接口投递消息时会指定channel,消费者通过subscribe接口订阅它关心的channel,当有消息投递到这个channel时Redis服务端会立刻通过该连接将消息推送到消费者。这里一个channel可以被多个应用订阅,消息会同时投递到每个订阅者,做到了消息的广播。并且消费者订阅channel时可以指定通配符,实现订阅一批channel(类似于RabbitMQ的Topic)。

​ pubsub既能单播又能广播,还支持channel的通配符匹配,功能上已经能满足大部分业务的需求,但是在了解pubsub的原理后,我建议你不要使用它

​ pubsub的消息数据是瞬时的,它在Redis服务端不做保存(不会持久化到rdb/aof文件),publish发送到Redis的消息会立刻推送到所有当时subscribe连接的客户端,如果当时客户端因为网络问题断连,那么就会错过这条消息,当客户端重连后,它没法重新获取之前那条消息,甚至无法判断是否有消息丢失。其次,pubsub中消费者获取消息是一个推送模型,Redis会按消息生产的速度给所有的消费者推送消息,不管消费者处理能力如何,如果消费者应用处理能力不足,消息就会在Redis的client buf中堆积,当堆积数据超过一个阈值(可配置)后会断开这条连接,然后就回到上面了(嗨嗨嗨,又丢消息辣)。

​ 基于此,pubsub的案例我也就不给出了,真的别用它。

Stream

​ Redis 作者在开发 Redis 期间,还另外开发了一个开源项目 disque。这个项目的定位,就是一个基于内存的分布式消息队列中间件。但由于种种原因,这个项目一直不温不火。在 Redis 5.0 版本,作者把 disque 功能移植到了 Redis 中,并给它定义了一个新的数据类型:Stream,基于 Stream 可以实现一个功能比较完善的消息队列。

​ 首先Stream的数据会持久化到rdb/aof文件,其次Stream引入了消费者组的概念,每个消费者组维护一个消息位点(last_delivered_id),同一个消费者组之间共享消息位点,对于一个消费者组,每条消息只会被组内的其中一个消费者获取和处理,提供了消息广播的能力。然后stream中消费者采用拉取的方式,消费速率和消费者自身吞吐相匹配。最后stream引入了ack机制保证消息至少被处理一次。

  • 生产者:Stream为每个消息都设置了一个唯一递增的id,通过参数可以让Redis自动生成id或者应用自己指定id,应用可以根据业务逻辑为每个消息生成id,当xadd超时后应用并不能确定消息是否投递成功,可以通过xread查询该id的消息是否存在,存在就说明已经投递成功,不存在则重新投递,而且stream限制了id必须递增,这意味着已经存在的消息重复投递会被拒绝。这套机制保证了每个消息可以仅被投递一次。
  • 消费者:消费者读取消息后业务处理完毕,但还没来得及ack就发生了异常,应用恢复后对于这条没有ack的消息会进行重复消费。需要对业务进行改造,保证消息处理的幂等性,详细会在后文中讨论

​ 发送消息的命令:

image-20241207162915352

​ 例如:

image-20241207162943391

​ 创建消费者组:

1
2
XGROUP CREATE key group <id | $> [MKSTREAM]
[ENTRIESREAD entries-read]
  • key:队列名称
    group:消费者组名称
    ID:起始ID标示,$代表队列中最后一个消息,0则代表队列中第一个消息
    MKSTREAM:队列不存在时自动创建队列

​ 从消费者组读取消息:

1
2
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds]
[NOACK] STREAMS key [key ...] id [id ...]
  • group:消费组名称
  • consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
  • count:本次查询的最大数量
  • BLOCK milliseconds:当没有消息时最长等待时间
  • NOACK:无需手动ACK,获取到消息后自动确认
  • STREAMS key:指定队列名称
  • ID:获取消息的起始ID

示例代码

​ 初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Component
public class MyRedisStreamInitialize implements InitializingBean {

@Resource
private StringRedisTemplate stringRedisTemplate;

@Override
public void afterPropertiesSet() throws Exception {
try {
// 创建一个消费者组my_group,当my_stream队列不存在时会自动创建
// 如果队列存在且消费者组也存在,会报错,可以改成分布式锁先判断队列是否存在再创建,我比较懒,直接try-catch了
stringRedisTemplate.opsForStream().createGroup("my_stream", "my_group");
} catch (Throwable ignore) {
}
}
}

​ 生产者

1
2
3
4
5
6
7
8
9
10
@Component
public class MyRedisStreamProducer {

@Resource
private StringRedisTemplate stringRedisTemplate;

public void send(Map<String, String> contentMap) {
stringRedisTemplate.opsForStream().add("my_stream", contentMap);
}
}

​ 配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@Configuration
public class MyRedisStreamConfiguration {

@Resource
private RedisConnectionFactory redisConnectionFactory;
@Resource
private MyRedisStreamConsumer myRedisStreamConsumer;

@Bean
public ExecutorService pollRedisStreamMessageExecutor() {
AtomicInteger index = new AtomicInteger();
return new ThreadPoolExecutor(1,
1,
60,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
runnable -> {
Thread thread = new Thread(runnable);
thread.setName("stream_consumer_" + index.incrementAndGet());
thread.setDaemon(true);
return thread;
},
new ThreadPoolExecutor.DiscardOldestPolicy()
);
}

@Bean
public Subscription myRedisStreamConsumerSubscription(ExecutorService pollRedisStreamMessageExecutor) {

StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
StreamMessageListenerContainer.StreamMessageListenerContainerOptions
.builder()
// 一次最多获取多少条消息
.batchSize(10)
// 执行从 Stream 拉取消息的线程池
.executor(pollRedisStreamMessageExecutor)
// 如果没有拉取到消息,需要阻塞的时间。不能大于${spring.data.redis.timeout},否则会超时
.pollTimeout(Duration.ofSeconds(3))
.build();
StreamMessageListenerContainer.StreamReadRequest<String> streamReadRequest =
StreamMessageListenerContainer.StreamReadRequest
.builder(StreamOffset.create("my_stream", ReadOffset.lastConsumed()))
.cancelOnError(throwable -> false)
.consumer(Consumer.from("my_group", "my_consumer"))
.autoAcknowledge(true)
.build();
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options);
Subscription subscription = listenerContainer.register(streamReadRequest, myRedisStreamConsumer);
listenerContainer.start();
return subscription;
}
}

​ 消费者

1
2
3
4
5
6
7
8
9
10
11
@Component
public class MyRedisStreamConsumer implements StreamListener<String, MapRecord<String, String, String>> {

@Override
public void onMessage(MapRecord<String, String, String> message) {
String stream = message.getStream();
String id = message.getId().toString();
Map<String, String> contentMap = message.getValue();

}
}

问题讨论

List和Stream的数据都会持久化到rdb/aof文件,消息就不会丢失了吗?

​ Redis数据的持久化依赖rdb和aof文件,对于rdb文件来说,可能丢失的数据量较大,想要让消息尽量不丢失,得使用aof,而aof落盘方式有几种,通过配置appendfsync决定,通常不会配置为always来让每条命令执行完后都做一次fsync,线上配置一般为everysec,每秒做一次fsync,这意味着appendfsync不为always时有可能会丢失数据

​ 另一方面线上生产环境的Redis都是高可用架构,当主节点宕机后通常不会走恢复逻辑,而是直接切换到备节点继续提供服务,而Redis的同步方式是异步同步,这意味着主节点上新写入的数据可能还没同步到备节点,在切换后这部分数据就丢失了。所以在故障恢复中Redis中的数据可能会丢失一部分

如何通过Redis保障消息消费的幂等性

​ 对于有确认机制的消息队列,消费者读取消息后业务处理完毕,但还没来得及ack就发生了异常,应用恢复后对于这条没有ack的消息会进行重复消费,所以解决办法是,在处理消息时在Redis中记录消息处理的状态,这需要满足一个前提条件,每个消息需要有唯一的消息id

​ 消息消费幂等工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@Component
public class RedisMessageIdempotentHandler {
@Resource
private StringRedisTemplate stringRedisTemplate;

private static final String MESSAGE_IDEMPOTENT_KEY_PREFIX = "message:idempotent:";

/**
* 根据消息id获取Redis的key
* @return Redis的key
*/
private static String getRedisKey(String messageId){
return MESSAGE_IDEMPOTENT_KEY_PREFIX + messageId;
}

/**
* 判断当前消息是否消费过
* @return 消息是否消费过
*/
public boolean isMessageBeingConsumed(String messageId) {
String key = getRedisKey(messageId);
return Boolean.FALSE.equals(
stringRedisTemplate.opsForValue().setIfAbsent(key, "0", 2, TimeUnit.MINUTES)
);
}

/**
* 判断消息消费流程是否执行完成
* @return 消息是否执行完成
*/
public boolean isComplete(String messageId) {
String key = getRedisKey(messageId);
return "1".equals(stringRedisTemplate.opsForValue().get(key));
}

/**
* 设置消息流程执行完成
*/
public void complete(String messageId) {
String key = getRedisKey(messageId);
stringRedisTemplate.opsForValue().set(key, "1", 2, TimeUnit.MINUTES);
}

/**
* 如果消息处理遇到异常情况,删除幂等标识
*/
public void delIdempotentFlag(String messageId) {
String key = getRedisKey(messageId);
stringRedisTemplate.delete(key);
}
}

​ 使用方法示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Resource
private RedisMessageIdempotentHandler idempotentHandler;

public void processMessage(String messageID, String message) {
// 判断当前的这个消息是否在消费中
if (idempotentHandler.isMessageBeingConsumed(messageID)) {
// 判断当前的这个消息流程是否执行完成
if (idempotentHandler.isComplete(messageID)) {
return;
}
throw new RuntimeException("消息正在消费中");
}
try {
// 业务代码
// 消息ack
} catch (Throwable ex) {
// 删除幂等标记,并回滚业务(需要业务支持)
idempotentHandler.delIdempotentFlag(messageID);
throw ex;
}
idempotentHandler.complete(messageID);
}