发布于 

基于 Spring Boot 和 Redis Stream 实现一个消息队列

向消息队列添加消息、确认消费以及删除消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
 * Appends a message to the Redis Stream that serves as the message queue.
 *
 * @param message  payload to store in the stream
 * @param queueKey key of the stream the payload is appended to
 */
public void addMessageBlockingQueue(String message, String queueKey) {
    // Wrap the payload in an object-backed record bound to the target stream, then append it.
    redisTemplate.opsForStream().add(StreamRecords.objectBacked(message).withStreamKey(queueKey));
}

/**
 * Acknowledges a consumed record for a consumer group.
 *
 * @param queueKey key of the stream acting as the message queue
 * @param group    name of the consumer group
 * @param recordId id of the record to acknowledge
 * @return {@code true} only when the value returned by the acknowledge call equals
 *         {@code SUCCESS}; {@code false} otherwise, including when any exception is thrown
 */
public boolean messageQueueConsumptionAck(String queueKey, String group, RecordId recordId) {
    try {
        Long acknowledged = redisTemplate.opsForStream().acknowledge(queueKey, group, recordId);
        // SUCCESS.equals(...) also yields false for a null result.
        return SUCCESS.equals(acknowledged);
    } catch (Exception e) {
        // Failures are reported to the caller as false; the stack trace goes to stderr.
        e.printStackTrace();
        return false;
    }
}

/**
 * Deletes a record from the stream acting as the message queue.
 *
 * @param queueKey key of the stream acting as the message queue
 * @param recordId id of the record to delete
 * @return {@code true} only when the value returned by the delete call equals
 *         {@code SUCCESS}; {@code false} otherwise, including when any exception is thrown
 */
public boolean messageQueueConsumptionDelField(String queueKey, RecordId recordId) {
    try {
        Long removed = redisTemplate.opsForStream().delete(queueKey, recordId);
        // SUCCESS.equals(...) also yields false for a null result.
        return SUCCESS.equals(removed);
    } catch (Exception e) {
        // Failures are reported to the caller as false; the stack trace goes to stderr.
        e.printStackTrace();
        return false;
    }
}

创建消费者监听处理类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;

/**
 * Stream listener that logs each received record, acknowledges it for the consumer group and
 * then deletes it from the stream.
 */
@Slf4j
@Component()
public class RedisStreamListener implements StreamListener<String, ObjectRecord<String, String>> {

    /** Consumer group the record is acknowledged for; it has to be the group the subscription reads through. */
    private static final String GROUP = "group";

    // NOTE(review): the type name RedisService is assumed from the `redisService` variable that the
    // original code used without declaring it. Confirm it is the class that defines
    // messageQueueConsumptionAck / messageQueueConsumptionDelField.
    private final RedisService redisService;

    /**
     * @param redisService service exposing the ack and delete helpers for the stream
     */
    public RedisStreamListener(RedisService redisService) {
        this.redisService = redisService;
    }

    /**
     * Handles one record delivered by the listener container.
     *
     * @param message the record that was read from the stream
     */
    @Override
    public void onMessage(ObjectRecord<String, String> message) {
        log.info(message.toString());
        // Acknowledge the record that was actually received: its own stream key and RecordId,
        // not placeholder strings (the helper takes a RecordId, so a String would not compile).
        redisService.messageQueueConsumptionAck(message.getStream(), GROUP, message.getId());
        // Once consumed, remove the record from the stream.
        redisService.messageQueueConsumptionDelField(message.getStream(), message.getId());
    }
}

创建消费者监听类的订阅配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import io.lettuce.core.RedisException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.data.redis.RedisSystemException;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/**
 * Configuration that subscribes the injected {@link StreamListener} to a Redis Stream through a
 * consumer group, creating the group beforehand when it is missing.
 */
@Slf4j
@RequiredArgsConstructor
@Configuration
public class Config {

    /** Stream key used for the group check, the group creation and the subscription alike. */
    private static final String STREAM_KEY = "key";

    /** Consumer group the listener reads through. */
    private static final String GROUP_NAME = "group";

    /** Name of this consumer inside {@link #GROUP_NAME}. */
    private static final String CONSUMER_NAME = "consumer-1";

    private final StringRedisTemplate redisTemplate;

    private final StreamListener<String, ObjectRecord<String, String>> streamListener;

    /**
     * Builds a stream listener container, registers the listener on {@link #STREAM_KEY} and
     * starts the container.
     *
     * @param factory connection factory the container uses to poll Redis
     * @return the subscription created for the listener
     */
    @Bean
    public Subscription subscription(RedisConnectionFactory factory) {
        // The consumer group has to exist before the container starts reading through it.
        checkGroup();

        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> options =
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                        .builder()
                        // How long a single poll blocks waiting for new records.
                        .pollTimeout(Duration.ofSeconds(1))
                        // Record payloads are delivered as String.
                        .targetType(String.class)
                        .build();

        StreamMessageListenerContainer<String, ObjectRecord<String, String>> listenerContainer =
                StreamMessageListenerContainer.create(factory, options);

        // receive(...) rather than receiveAutoAck(...): the listener acknowledges records itself.
        Subscription subscription = listenerContainer.receive(
                Consumer.from(GROUP_NAME, CONSUMER_NAME),
                StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed()),
                streamListener);

        listenerContainer.start();
        return subscription;
    }

    /**
     * Ensures every required consumer group exists on {@link #STREAM_KEY}, creating the ones
     * that are missing. The lookup and the creation deliberately use the same key, so the
     * existence check always inspects the stream that is subscribed to.
     */
    private void checkGroup() {
        List<String> groupNames = new ArrayList<>();
        groupNames.add(GROUP_NAME);

        StreamInfo.XInfoGroups infoGroups = null;
        try {
            // Fetch the groups currently defined on the stream.
            infoGroups = redisTemplate.opsForStream().groups(STREAM_KEY);
        } catch (RedisSystemException | RedisException | InvalidDataAccessApiUsageException ex) {
            // Lookup failed; fall through with no group info so the groups get created below.
            log.error("group key not exist or commend error", ex);
        }

        for (String groupName : groupNames) {
            boolean groupExists = Objects.nonNull(infoGroups)
                    && infoGroups.stream().anyMatch(info -> Objects.equals(groupName, info.groupName()));
            if (!groupExists) {
                redisTemplate.opsForStream().createGroup(STREAM_KEY, groupName);
            }
        }
    }

}

本站由 @binvv 使用 Stellar 主题创建。
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议,转载请注明出处。