【发布时间】:2020-11-16 07:43:42
【问题描述】:
我正在尝试侦听 redis 流并在消息到达时对其进行处理。我正在使用异步命令,我希望消息被推送而不是被拉取。所以我认为不需要while循环。但是下面的代码好像不行。
public static void main(String[] args) throws InterruptedException {
RedisClient redisClient = RedisClient
.create("redis://localhost:6379/");
StatefulRedisConnection<String, String> connection
= redisClient.connect();
RedisAsyncCommands commands = connection.async();
commands.xgroupCreate(StreamOffset.latest("my-stream"), "G1", new XGroupCreateArgs());
commands
.xreadgroup(Consumer.from("G1", "c1"), StreamOffset.lastConsumed("my-stream"))
.thenAccept(System.out::println);
Thread.currentThread().join();
}
它只打印程序启动时流的任何内容,而不打印程序运行时添加的消息。不是应该为每条新添加到流中的消息调用回调吗?
【问题讨论】:
标签: java spring-data-redis lettuce redis-streams spring-data-redis-reactive