发布于 

Spring Cloud Stream RocketMQ 接入:实现普通消息、延时消息、定时消息的发送和接收

引入依赖

我这里使用的是 0.4.0 版本

1
2
3
4
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>

添加配置

重点!!!重点!!!重点!!!

配置字符千万不能错,错一点就接收不到消息了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
spring:
  cloud:
    # Spring Cloud Stream settings, bound to the BindingServiceProperties class
    stream:
      # Spring Cloud Stream RocketMQ settings
      rocketmq:
        # RocketMQ binder settings, bound to the RocketMQBinderConfigurationProperties class
        binder:
          # RocketMQ name server address
          name-server: xxxxxxx
          # Secret key of the account used to authenticate
          secret-key: xxxxx
          # Access key of the account used to authenticate
          access-key: xxxxxx
          # Default consumer group applied globally
          group: base-consumer-group
      function:
        # Bean names of the consumer classes; each one pairs with a "<beanName>-in-0" binding below
        definition: testConsumer;testDelayedConsumer;testTimedConsumer
      # Binding settings, bound to a map of BindingProperties
      bindings:
        # Producer binding; the name must follow the fixed "<name>-out-<index>" pattern
        test-out-0:
          destination: topic-test
        # Consumer binding; the name must follow the fixed "<consumer bean name>-in-<index>" pattern
        testConsumer-in-0:
          # Topic name
          destination: topic-test
          # Consumer group; must be unique, a duplicate is rejected as already created
          group: test-consumer-group
        test-delayed-out-0:
          destination: test-delayed
        testDelayedConsumer-in-0:
          destination: test-delayed
          group: test-delayed-consumer-group
        test-timed-out-0:
          destination: test-timed
        testTimedConsumer-in-0:
          destination: test-timed
          group: test-timed-consumer-group

设置配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import com.alibaba.cloud.stream.binder.rocketmq.convert.RocketMQMessageConverter;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.converter.*;

import java.util.ArrayList;
import java.util.List;


/**
 * Auto-configuration for the message-queue integration.
 */
@AutoConfiguration
public class MQAutoConfiguration {

    /**
     * Supplies the bean named {@link RocketMQMessageConverter#DEFAULT_NAME}, replacing the stock
     * {@link RocketMQMessageConverter} setup with one that has no fastjson converter, to work
     * around an incompatibility.
     *
     * <p>The bean is only registered when no other bean with that name exists.
     *
     * @return a composite converter that tries, in order, a byte-array converter (with its
     *         content-type resolver cleared), a string converter and a Jackson JSON converter
     */
    @Bean(RocketMQMessageConverter.DEFAULT_NAME)
    @ConditionalOnMissingBean(name = { RocketMQMessageConverter.DEFAULT_NAME })
    public CompositeMessageConverter rocketMQMessageConverter() {
        // Prepare the raw-bytes converter first; its content-type resolver is explicitly removed.
        ByteArrayMessageConverter rawBytes = new ByteArrayMessageConverter();
        rawBytes.setContentTypeResolver(null);

        // Order matters: the composite consults its delegates in list order.
        List<MessageConverter> delegates = new ArrayList<>();
        delegates.add(rawBytes);
        delegates.add(new StringMessageConverter());
        delegates.add(new MappingJackson2MessageConverter());

        return new CompositeMessageConverter(delegates);
    }

}

创建消息实体类

实体字段按照自己需求进行封装

1
2
3
4
5
6
7
8
9
10
11
12
13
import lombok.Data;

/**
 * Payload of a normal (immediately delivered) test message.
 *
 * @author cjb
 * @date 2023/8/2 11:10
 */
@Data
public class TestMessage {

    /** Text content carried by the message. */
    private String message;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
import lombok.Data;

/**
 * Payload of a delayed test message.
 *
 * @author cjb
 * @date 2023/8/2 16:22
 */
@Data
public class TestDelayedMessages {

    /** Text content carried by the message. */
    private String message;
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14

/**
 * Payload of a timed (scheduled) test message.
 *
 * @author cjb
 * @date 2023/8/2 17:36
 */
@Data
public class TestTimedMessage {

    /** Text content carried by the message. */
    private String message;

    /** Timestamp in milliseconds attached to the message. */
    private Long time;

}

创建发送者类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageConst;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Date;

/**
 * Publishes the demo messages (normal, delayed and timed) through {@link StreamBridge}.
 *
 * @author cjb
 * @date 2023/8/2 11:02
 */
@Slf4j
@Component
public class TestProducer {

    /** Lowest delay level accepted by {@link #sendDelayedMessages(String, Integer)}. */
    private static final int MIN_DELAY_LEVEL = 1;

    /** Highest delay level accepted by {@link #sendDelayedMessages(String, Integer)}. */
    private static final int MAX_DELAY_LEVEL = 18;

    /** Name of the header that carries the requested delivery time of a timed message. */
    private static final String START_DELIVER_TIME_HEADER = "__STARTDELIVERTIME";

    @Resource
    private StreamBridge streamBridge;

    /**
     * Sends a normal message to the {@code test-out-0} binding.
     *
     * @param message message content
     */
    public void sendTestMessage(String message) {
        TestMessage testMessage = new TestMessage();
        testMessage.setMessage(message);
        boolean send = streamBridge.send("test-out-0", testMessage);
        log.info("sendTestMessage result:{}", send);
    }

    /**
     * Sends a delayed message to the {@code test-delayed-out-0} binding.
     *
     * @param message    message content
     * @param delayLevel delay level from 1 to 18, mapping in order to
     *                   1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
     *                   (for example 1 = 1s, 2 = 5s, 3 = 10s)
     * @throws IllegalArgumentException if {@code delayLevel} is {@code null} or outside 1..18
     */
    public void sendDelayedMessages(String message, Integer delayLevel) {
        // Check for null explicitly: unboxing a null Integer would otherwise fail with a bare NPE.
        if (delayLevel == null || delayLevel < MIN_DELAY_LEVEL || delayLevel > MAX_DELAY_LEVEL) {
            throw new IllegalArgumentException("delayLevel must be between 1 and 18");
        }
        TestDelayedMessages testDelayedMessages = new TestDelayedMessages();
        testDelayedMessages.setMessage(message);
        boolean send = streamBridge.send("test-delayed-out-0", MessageBuilder.withPayload(testDelayedMessages)
                .setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, delayLevel)
                .build());
        log.info("sendDelayedMessages result:{},time:{}", send, System.currentTimeMillis());
    }

    /**
     * Sends a timed message to the {@code test-timed-out-0} binding.
     *
     * <p>The payload records the current time in milliseconds; the requested delivery time is
     * passed as {@code date.getTime()} in the {@code __STARTDELIVERTIME} header.
     *
     * @param message message content
     * @param date    point in time at which the message should be delivered
     * @throws IllegalArgumentException if {@code date} is {@code null}
     */
    public void sendTimedMessages(String message, Date date) {
        if (date == null) {
            throw new IllegalArgumentException("date must not be null");
        }
        TestTimedMessage testTimedMessage = new TestTimedMessage();
        testTimedMessage.setMessage(message);
        testTimedMessage.setTime(System.currentTimeMillis());
        boolean send = streamBridge.send("test-timed-out-0", MessageBuilder.withPayload(testTimedMessage)
                .setHeader(START_DELIVER_TIME_HEADER, date.getTime())
                .build());
        log.info("sendTimedMessages result:{},time:{}", send, System.currentTimeMillis());
    }
}

创建消费者类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.function.Consumer;

/**
 * Consumer for normal test messages.
 *
 * @author cjb
 * @date 2023/8/2 11:10
 */
@Slf4j
@Component
public class TestConsumer implements Consumer<TestMessage> {

    /**
     * Logs the received message serialized as JSON.
     *
     * @param testMessage the received message payload
     */
    @Override
    public void accept(TestMessage testMessage) {
        String body = JSONUtil.toJsonStr(testMessage);
        log.info("接收到消息,消息内容:{}", body);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import java.util.function.Consumer;

/**
 * Consumer for delayed test messages.
 *
 * @author cjb
 * @date 2023/8/2 16:23
 */
@Slf4j
@Component
public class TestDelayedConsumer implements Consumer<Message<TestDelayedMessages>> {

    /**
     * Logs the payload (as JSON) together with the current time in milliseconds, then logs the
     * message headers (as JSON).
     *
     * @param testDelayedMessagesMessage the received message, including headers
     */
    @Override
    public void accept(Message<TestDelayedMessages> testDelayedMessagesMessage) {
        String payloadJson = JSONUtil.toJsonStr(testDelayedMessagesMessage.getPayload());
        long receivedAt = System.currentTimeMillis();
        log.info("收到延时消息体:{},时间戳:{}", payloadJson, receivedAt);

        String headersJson = JSONUtil.toJsonStr(testDelayedMessagesMessage.getHeaders());
        log.info("收到延时消息头:{}", headersJson);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
 * Consumer for timed (scheduled) test messages.
 *
 * @author cjb
 * @date 2023/8/2 17:42
 */
@Slf4j
@Component
public class TestTimedConsumer implements Consumer<Message<TestTimedMessage>> {

    /**
     * Logs the payload (as JSON) together with the current time in milliseconds, then logs the
     * message headers (as JSON).
     *
     * @param testTimedMessageMessage the received message, including headers
     */
    @Override
    public void accept(Message<TestTimedMessage> testTimedMessageMessage) {
        String payloadJson = JSONUtil.toJsonStr(testTimedMessageMessage.getPayload());
        long receivedAt = System.currentTimeMillis();
        log.info("收到定时消息体:{},时间戳:{}", payloadJson, receivedAt);

        String headersJson = JSONUtil.toJsonStr(testTimedMessageMessage.getHeaders());
        log.info("收到定时消息头:{}", headersJson);
    }
}

调用结果

阿里云MQ Group和Topic配置

Topic

Group


本站由 @binvv 使用 Stellar 主题创建。
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议,转载请注明出处。