1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
| import io.lettuce.core.RedisException; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.dao.InvalidDataAccessApiUsageException; import org.springframework.data.redis.RedisSystemException; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.stream.*; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.stream.StreamListener; import org.springframework.data.redis.stream.StreamMessageListenerContainer; import org.springframework.data.redis.stream.Subscription;
import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Objects;
@Slf4j @RequiredArgsConstructor @Configuration public class Config {
private final StringRedisTemplate redisTemplate;
private final StreamListener<String, ObjectRecord<String, String>> streamListener;
@Bean public Subscription subscription(RedisConnectionFactory factory) { checkGroup(); StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> options = StreamMessageListenerContainer .StreamMessageListenerContainerOptions .builder() .pollTimeout(Duration.ofSeconds(1)) .targetType(String.class) .build(); StreamMessageListenerContainer<String, ObjectRecord<String, String>> listenerContainer = StreamMessageListenerContainer.create(factory, options); Subscription subscription = listenerContainer.receive( Consumer.from("group", "consumer-1"), StreamOffset.create("key", ReadOffset.lastConsumed()), streamListener); listenerContainer.start(); return subscription; }
private void checkGroup() { List<String> consumers = new ArrayList<>(); consumers.add("group"); StreamInfo.XInfoGroups infoGroups = null; try { infoGroups = redisTemplate.opsForStream().groups(BaseConstant.SEND_MESSAGE_QUEUE_KEY); } catch (RedisSystemException | RedisException | InvalidDataAccessApiUsageException ex) { log.error("group key not exist or commend error", ex); }
for (String consumer : consumers) { boolean consumerExist = false; if (Objects.nonNull(infoGroups)) { if (infoGroups.stream().anyMatch(t -> Objects.equals(consumer, t.groupName()))) { consumerExist = true; } } if (!consumerExist) { redisTemplate.opsForStream().createGroup("key", consumer); } }
}
}
|