1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
| import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.common.message.MessageConst; import org.springframework.cloud.stream.function.StreamBridge; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component;
import javax.annotation.Resource; import java.util.Date;
/**
 * Publishes test messages through Spring Cloud Stream's {@link StreamBridge}.
 *
 * <p>Three flavours are supported, each sent to its own output binding: a plain
 * message, a message carrying a delay-level header, and a message carrying a
 * start-deliver-time header.
 */
@Slf4j
@Component
public class TestProducer {

    /** Output binding used by {@link #sendTestMessage(String)}. */
    private static final String TEST_BINDING = "test-out-0";

    /** Output binding used by {@link #sendDelayedMessages(String, Integer)}. */
    private static final String DELAYED_BINDING = "test-delayed-out-0";

    /** Output binding used by {@link #sendTimedMessages(String, Date)}. */
    private static final String TIMED_BINDING = "test-timed-out-0";

    /**
     * Header carrying the requested delivery timestamp.
     * NOTE(review): presumably interpreted by the broker/binder as an absolute
     * delivery time in epoch milliseconds - confirm against the binder in use.
     */
    private static final String START_DELIVER_TIME_HEADER = "__STARTDELIVERTIME";

    /** Smallest delay level accepted by {@link #sendDelayedMessages(String, Integer)}. */
    private static final int MIN_DELAY_LEVEL = 1;

    /** Largest delay level accepted by {@link #sendDelayedMessages(String, Integer)}. */
    private static final int MAX_DELAY_LEVEL = 18;

    @Resource
    private StreamBridge streamBridge;

    /**
     * Wraps {@code message} in a {@code TestMessage} and sends it to the
     * {@code test-out-0} binding, logging the boolean result of the send.
     *
     * @param message text to place in the payload
     */
    public void sendTestMessage(String message) {
        TestMessage testMessage = new TestMessage();
        testMessage.setMessage(message);

        boolean send = streamBridge.send(TEST_BINDING, testMessage);
        log.info("sendTestMessage result:{}", send);
    }

    /**
     * Wraps {@code message} in a {@code TestDelayedMessages} payload and sends it
     * to the {@code test-delayed-out-0} binding with the
     * {@link MessageConst#PROPERTY_DELAY_TIME_LEVEL} header set to {@code delayLevel}.
     *
     * @param message    text to place in the payload
     * @param delayLevel delay level, must be in the range 1..18 inclusive
     * @throws NullPointerException     if {@code delayLevel} is {@code null}
     * @throws IllegalArgumentException if {@code delayLevel} is outside 1..18
     */
    public void sendDelayedMessages(String message, Integer delayLevel) {
        // Check for null explicitly: unboxing a null Integer in the range check
        // below would otherwise fail with a message-less NullPointerException.
        if (delayLevel == null) {
            throw new NullPointerException("delayLevel must not be null");
        }
        if (delayLevel < MIN_DELAY_LEVEL || delayLevel > MAX_DELAY_LEVEL) {
            throw new IllegalArgumentException("delayLevel must be between 1 and 18");
        }

        TestDelayedMessages testDelayedMessages = new TestDelayedMessages();
        testDelayedMessages.setMessage(message);

        boolean send = streamBridge.send(DELAYED_BINDING,
                MessageBuilder.withPayload(testDelayedMessages)
                        .setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, delayLevel)
                        .build());
        log.info("sendDelayedMessages result:{},time:{}", send, System.currentTimeMillis());
    }

    /**
     * Wraps {@code message} in a {@code TestTimedMessage} payload, stamps the
     * payload with the current system time, and sends it to the
     * {@code test-timed-out-0} binding with the {@code __STARTDELIVERTIME} header
     * set to {@code date.getTime()}.
     *
     * @param message text to place in the payload
     * @param date    value whose epoch-millisecond timestamp is written to the header
     * @throws NullPointerException if {@code date} is {@code null}
     */
    public void sendTimedMessages(String message, Date date) {
        // Fail fast with a descriptive message instead of an anonymous NPE
        // from date.getTime() further down.
        if (date == null) {
            throw new NullPointerException("date must not be null");
        }

        TestTimedMessage testTimedMessage = new TestTimedMessage();
        testTimedMessage.setMessage(message);
        // The payload records when it was created, independent of the header value.
        testTimedMessage.setTime(System.currentTimeMillis());

        boolean send = streamBridge.send(TIMED_BINDING,
                MessageBuilder.withPayload(testTimedMessage)
                        .setHeader(START_DELIVER_TIME_HEADER, date.getTime())
                        .build());
        log.info("sendTimedMessages result:{},time:{}", send, System.currentTimeMillis());
    }
}
|