Redis消息队列详解
简介
本文的重点在于Stream数据类型实现消息队列,Stream 是 Redis 5.0 引入的一种新数据类型,可以实现一个功能比较完善的消息队列。在Redis 5.0 之前想使用Reids充当消息队列,还有其他两种类型可以使用,分别是List和PubSub,但是这两种类型都有着很大的缺陷,接下来简单介绍一下三种结构如何实现消息队列以及他们的优缺点。
List
Redis 1.0发布时就具备了list数据结构,生产者A可以通过lpush写入消息,消费者B通过rpop(或brpop)从队列中读取消息,每个消息只会被读取一次,而且是按照lpush写入的顺序读到。同时Redis的接口是并发安全的,可以同时有多个生产者向一个list中生产消息,多个消费者从list中读取消息。
同时list数据结构是会被持久化到rdb/aof文件的,消息投递可靠性是有些许保障的,详细会在后文中讨论
。
直接使用单个list是无法实现确认机制(ack机制)的,redis还提供了rpoplpush(或brpoplpush)命令,能够原子的从一个list的右端取数据的同时将该数据再插入到另一个list的左侧。
1 2 3 4 5 6 7 8 9
|
rpoplpush listA listB "msg1"
lrem listB 1 msg1 (integer) 1
|
Redis LREM 命令用于从列表中删除指定数量的匹配元素。以下是该命令的基本用法:
- 语法: LREM key count value
key: 列表的键名。
count: 要删除的元素个数。如果 count 大于 0,则从表头开始向表尾搜索并删除;如果 count 小于 0,则从表尾开始向表头搜索并删除;如果 count 等于 0,则删除所有匹配的元素。
value: 要删除的元素值。
返回值: 返回被删除元素的数量。
由于每个消息只会被读取一次,所以使用list实现消息队列的主要缺点为消息只能被消费一次,缺乏广播机制
。
PubSub
Redis 2.0中引入了一个新的数据结构pubsub,pubsub引入一个概念叫channel,生产者通过publish接口投递消息时会指定channel,消费者通过subscribe接口订阅它关心的channel,当有消息投递到这个channel时Redis服务端会立刻通过该连接将消息推送到消费者
。这里一个channel可以被多个应用订阅,消息会同时投递到每个订阅者,做到了消息的广播。并且消费者订阅channel时可以指定通配符,实现订阅一批channel(类似于RabbitMQ的Topic)。
pubsub既能单播又能广播,还支持channel的通配符匹配,功能上已经能满足大部分业务的需求,但是在了解pubsub的原理后,我建议你不要使用它
。
pubsub的消息数据是瞬时的,它在Redis服务端不做保存(不会持久化到rdb/aof文件),publish发送到Redis的消息会立刻推送到所有当时subscribe连接的客户端,如果当时客户端因为网络问题断连,那么就会错过这条消息
,当客户端重连后,它没法重新获取之前那条消息,甚至无法判断是否有消息丢失。其次,pubsub中消费者获取消息是一个推送模型,Redis会按消息生产的速度给所有的消费者推送消息,不管消费者处理能力如何,如果消费者应用处理能力不足,消息就会在Redis的client buf中堆积,当堆积数据超过一个阈值(可配置)后会断开这条连接,然后就回到上面了(嗨嗨嗨,又丢消息辣)。
基于此,pubsub的案例我也就不给出了,真的别用它。
Stream
Redis 作者在开发 Redis 期间,还另外开发了一个开源项目 disque。这个项目的定位,就是一个基于内存的分布式消息队列中间件。但由于种种原因,这个项目一直不温不火。在 Redis 5.0 版本,作者把 disque 功能移植到了 Redis 中,并给它定义了一个新的数据类型:Stream,基于 Stream 可以实现一个功能比较完善的消息队列。
首先Stream的数据会持久化到rdb/aof文件,其次Stream引入了消费者组的概念,每个消费者组维护一个消息位点(last_delivered_id),同一个消费者组之间共享消息位点,对于一个消费者组,每条消息只会被组内的其中一个消费者获取和处理,提供了消息广播的能力。然后stream中消费者采用拉取的方式,消费速率和消费者自身吞吐相匹配。最后stream引入了ack机制保证消息至少被处理一次。
- 生产者:Stream为每个消息都设置了一个唯一递增的id,通过参数可以让Redis自动生成id或者应用自己指定id,应用可以根据业务逻辑为每个消息生成id,当xadd超时后应用并不能确定消息是否投递成功,可以通过xread查询该id的消息是否存在,存在就说明已经投递成功,不存在则重新投递,而且stream限制了id必须递增,这意味着已经存在的消息重复投递会被拒绝。这套机制保证了每个消息可以仅被投递一次。
- 消费者:消费者读取消息后业务处理完毕,但还没来得及ack就发生了异常,应用恢复后对于这条没有ack的消息会进行重复消费。需要对业务进行改造,保证消息处理的幂等性,
详细会在后文中讨论
。
发送消息的命令:

例如:

创建消费者组:
1 2
| XGROUP CREATE key group <id | $> [MKSTREAM] [ENTRIESREAD entries-read]
|
- key:队列名称
group:消费者组名称
ID:起始ID标示,$代表队列中最后一个消息,0则代表队列中第一个消息
MKSTREAM:队列不存在时自动创建队列
从消费者组读取消息:
1 2
| XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...]
|
- group:消费组名称
- consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
- count:本次查询的最大数量
- BLOCK milliseconds:当没有消息时最长等待时间
- NOACK:无需手动ACK,获取到消息后自动确认
- STREAMS key:指定队列名称
- ID:获取消息的起始ID
示例代码
初始化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| @Component public class MyRedisStreamInitialize implements InitializingBean { @Resource private StringRedisTemplate stringRedisTemplate;
@Override public void afterPropertiesSet() throws Exception { try { stringRedisTemplate.opsForStream().createGroup("my_stream", "my_group"); } catch (Throwable ignore) { } } }
|
生产者
1 2 3 4 5 6 7 8 9 10
| @Component public class MyRedisStreamProducer { @Resource private StringRedisTemplate stringRedisTemplate;
public void send(Map<String, String> contentMap) { stringRedisTemplate.opsForStream().add("my_stream", contentMap); } }
|
配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| @Configuration public class MyRedisStreamConfiguration { @Resource private RedisConnectionFactory redisConnectionFactory; @Resource private MyRedisStreamConsumer myRedisStreamConsumer;
@Bean public ExecutorService pollRedisStreamMessageExecutor() { AtomicInteger index = new AtomicInteger(); return new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, new SynchronousQueue<>(), runnable -> { Thread thread = new Thread(runnable); thread.setName("stream_consumer_" + index.incrementAndGet()); thread.setDaemon(true); return thread; }, new ThreadPoolExecutor.DiscardOldestPolicy() ); }
@Bean public Subscription myRedisStreamConsumerSubscription(ExecutorService pollRedisStreamMessageExecutor) { StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions .builder() .batchSize(10) .executor(pollRedisStreamMessageExecutor) .pollTimeout(Duration.ofSeconds(3)) .build(); StreamMessageListenerContainer.StreamReadRequest<String> streamReadRequest = StreamMessageListenerContainer.StreamReadRequest .builder(StreamOffset.create("my_stream", ReadOffset.lastConsumed())) .cancelOnError(throwable -> false) .consumer(Consumer.from("my_group", "my_consumer")) .autoAcknowledge(true) .build(); StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options); Subscription subscription = listenerContainer.register(streamReadRequest, myRedisStreamConsumer); listenerContainer.start(); return subscription; } }
|
消费者
1 2 3 4 5 6 7 8 9 10 11
| @Component public class MyRedisStreamConsumer implements StreamListener<String, MapRecord<String, String, String>> {
@Override public void onMessage(MapRecord<String, String, String> message) { String stream = message.getStream(); String id = message.getId().toString(); Map<String, String> contentMap = message.getValue();
} }
|
问题讨论
List和Stream的数据都会持久化到rdb/aof文件,消息就不会丢失了吗?
Redis数据的持久化依赖rdb和aof文件,对于rdb文件来说,可能丢失的数据量较大,想要让消息尽量不丢失,得使用aof,而aof落盘方式有几种,通过配置appendfsync决定,通常不会配置为always来让每条命令执行完后都做一次fsync,线上配置一般为everysec,每秒做一次fsync,这意味着appendfsync不为always时有可能会丢失数据
。
另一方面线上生产环境的Redis都是高可用架构,当主节点宕机后通常不会走恢复逻辑,而是直接切换到备节点继续提供服务,而Redis的同步方式是异步同步,这意味着主节点上新写入的数据可能还没同步到备节点,在切换后这部分数据就丢失了。所以在故障恢复中Redis中的数据可能会丢失一部分
。
如何通过Redis保障消息消费的幂等性
对于有确认机制的消息队列,消费者读取消息后业务处理完毕,但还没来得及ack就发生了异常,应用恢复后对于这条没有ack的消息会进行重复消费,所以解决办法是,在处理消息时在Redis中记录消息处理的状态
,这需要满足一个前提条件,每个消息需要有唯一的消息id。
消息消费幂等工具类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| @Component public class RedisMessageIdempotentHandler { @Resource private StringRedisTemplate stringRedisTemplate;
private static final String MESSAGE_IDEMPOTENT_KEY_PREFIX = "message:idempotent:";
private static String getRedisKey(String messageId){ return MESSAGE_IDEMPOTENT_KEY_PREFIX + messageId; }
public boolean isMessageBeingConsumed(String messageId) { String key = getRedisKey(messageId); return Boolean.FALSE.equals( stringRedisTemplate.opsForValue().setIfAbsent(key, "0", 2, TimeUnit.MINUTES) ); }
public boolean isComplete(String messageId) { String key = getRedisKey(messageId); return "1".equals(stringRedisTemplate.opsForValue().get(key)); }
public void complete(String messageId) { String key = getRedisKey(messageId); stringRedisTemplate.opsForValue().set(key, "1", 2, TimeUnit.MINUTES); }
public void delIdempotentFlag(String messageId) { String key = getRedisKey(messageId); stringRedisTemplate.delete(key); } }
|
使用方法示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| @Resource private RedisMessageIdempotentHandler idempotentHandler;
public void processMessage(String messageID, String message) { if (idempotentHandler.isMessageBeingConsumed(messageID)) { if (idempotentHandler.isComplete(messageID)) { return; } throw new RuntimeException("消息正在消费中"); } try { } catch (Throwable ex) { idempotentHandler.delIdempotentFlag(messageID); throw ex; } idempotentHandler.complete(messageID); }
|