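【Question title】: How to continuously listen on a Redis stream using the Lettuce Java library
【Posted】: 2020-11-16 07:43:42
【Question】:

I am trying to listen on a Redis stream and process messages as they arrive. I am using the async commands, and I expect messages to be pushed rather than pulled, so I don't think a while loop should be needed. But the code below doesn't seem to work.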

public static void main(String[] args) throws InterruptedException {

    RedisClient redisClient = RedisClient
        .create("redis://localhost:6379/");
    StatefulRedisConnection<String, String> connection
        = redisClient.connect();
    RedisAsyncCommands<String, String> commands = connection.async();
    commands.xgroupCreate(StreamOffset.latest("my-stream"), "G1", new XGroupCreateArgs());
    commands
        .xreadgroup(Consumer.from("G1", "c1"), StreamOffset.lastConsumed("my-stream"))
        .thenAccept(System.out::println);

    Thread.currentThread().join();
}

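It only prints whatever is in the stream when the program starts, and not the messages added while the program is running. Shouldn't the callback be invoked for every message newly added to the stream?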
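
The behaviour described in the question is consistent with how futures work: `xreadgroup` is a single request, and the `RedisFuture` it returns (a `CompletionStage`) completes once, with that one reply. The following minimal sketch uses a plain `CompletableFuture` as a stand-in for the Redis reply; the class and method names are illustrative only and are not part of Lettuce.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class FutureCompletesOnce {

    /** Completes one future twice and records how often the callback ran. */
    public static List<String> run() {
        List<String> seen = new ArrayList<>();
        CompletableFuture<String> reply = new CompletableFuture<>();

        // The callback is attached to this one future, not to the stream.
        reply.thenAccept(seen::add);

        reply.complete("first batch");   // the callback runs here
        reply.complete("second batch");  // ignored: the future is already completed
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [first batch]
    }
}
```

To receive later messages, a new read has to be issued after each reply, either in an explicit loop or from the completion callback.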

【Question comments】:

    Tags: java spring-data-redis lettuce redis-streams spring-data-redis-reactive


    【Answer 1】:

    I think you need to call xgroupCreate to create the consumer group on the stream first; otherwise you get this error:

    Exception in thread "main" java.util.concurrent.ExecutionException: io.lettuce.core.RedisCommandExecutionException: NOGROUP No such key 'my-stream1' or consumer group 'group1' in XREADGROUP with GROUP option
        at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
        at com.test.TestList.main(TestList.java:57)
    Caused by: io.lettuce.core.RedisCommandExecutionException: NOGROUP No such key 'my-stream1' or consumer group 'group1' in XREADGROUP with GROUP option
        at io.lettuce.core.ExceptionFactory.createExecutionException(ExceptionFactory.java:135)
        at io.lettuce.core.ExceptionFactory.createExecutionException(ExceptionFactory.java:108)
        at io.lettuce.core.protocol.AsyncCommand.completeResult(AsyncCommand.java:120)
        at io.lettuce.core.protocol.AsyncCommand.complete(AsyncCommand.java:111)
        at io.lettuce.core.protocol.CommandHandler.complete(CommandHandler.java:654)
        at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:614)
        at io.lettuce.core.protocol.CommandHandler.channelRead(CommandHandler.java:565)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)
        at io.netty.channel.kqueue.AbstractKQueueStreamChannel$KQueueStreamUnsafe.readReady(AbstractKQueueStreamChannel.java:544)
        at io.netty.channel.kqueue.AbstractKQueueChannel$AbstractKQueueUnsafe.readReady(AbstractKQueueChannel.java:381)
        at io.netty.channel.kqueue.KQueueEventLoop.processReady(KQueueEventLoop.java:211)
        at io.netty.channel.kqueue.KQueueEventLoop.run(KQueueEventLoop.java:289)
        at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1050)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)
    

    Sample code:

    package com.test;

    import io.lettuce.core.Consumer;
    import io.lettuce.core.RedisClient;
    import io.lettuce.core.RedisFuture;
    import io.lettuce.core.StreamMessage;
    import io.lettuce.core.XGroupCreateArgs;
    import io.lettuce.core.XReadArgs.StreamOffset;
    import io.lettuce.core.api.StatefulRedisConnection;
    import io.lettuce.core.api.async.RedisAsyncCommands;

    import java.util.List;

    public class TestList {
        public static void main(String[] args) throws Exception {
            RedisClient redisClient = RedisClient.create("redis://localhost:6379/");
            StatefulRedisConnection<String, String> connection = redisClient.connect();
            RedisAsyncCommands<String, String> commands = connection.async();

            // Add one entry so that the stream exists before the group is created.
            RedisFuture<String> redisFuture = commands.xadd("my-stream1", "test", "1234");
            String redisFutureGet = redisFuture.get();
            System.out.println(redisFutureGet);

            // Add a group pointing to the stream.
            commands.xgroupCreate(StreamOffset.latest("my-stream1"), "group1", new XGroupCreateArgs());

            // Consumer.from(group, consumerName): a single, non-blocking read for this consumer.
            RedisFuture<List<StreamMessage<String, String>>> messages = commands.xreadgroup(
                    Consumer.from("group1", "my-stream1"),
                    StreamOffset.lastConsumed("my-stream1"));
            List<StreamMessage<String, String>> res = messages.get();
            System.out.println(res);

            // Release the connection and the client resources before exiting.
            connection.close();
            redisClient.shutdown();
        }
    }
    

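    【Comments】:

    • I had already created the group with redis-cli, but my question is not about that. I don't want to read whatever is in the stream and exit right away (as your code here does). I want to keep the program running and keep consuming messages whenever someone adds data to the stream, somewhat like a queue, but without an infinite loop (that would defeat the purpose of an async push model).
    • I don't think you can use a RedisFuture to consume messages as they are added to the stream. You could try the reactive commands.
    • I actually have the same problem; it doesn't seem possible to consume continuously from a Redis stream in an async fashion. The Lettuce documentation isn't very clear about this. A bit disappointing.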
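
One way to approximate the behaviour asked for in these comments, without a hand-written while loop, is to issue a blocking read and re-issue it from the completion callback each time a reply arrives. The sketch below is only an illustration of that pattern: `RearmingReader` is a hypothetical helper, not a Lettuce class, and it is written against `CompletionStage` so it compiles without Lettuce on the classpath. The Lettuce wiring shown in its Javadoc is an assumption and is not compiled here; it presumes a connection dedicated to this reader, because a blocking XREADGROUP holds up other commands on the same connection.

```java
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
 * Hypothetical helper: re-issues an asynchronous read every time the previous one completes.
 *
 * Assumed Lettuce wiring (illustrative, on a dedicated connection):
 *   new RearmingReader<>(
 *       () -> commands.xreadgroup(io.lettuce.core.Consumer.from("G1", "c1"),
 *                                 XReadArgs.Builder.block(5000),
 *                                 StreamOffset.lastConsumed("my-stream")),
 *       System.out::println).start();
 */
public final class RearmingReader<T> {
    private final Supplier<CompletionStage<List<T>>> read;
    private final Consumer<T> handler;
    private final AtomicBoolean running = new AtomicBoolean(false);

    public RearmingReader(Supplier<CompletionStage<List<T>>> read, Consumer<T> handler) {
        this.read = read;
        this.handler = handler;
    }

    /** Issues the first read; later reads are issued from the completion callback. */
    public void start() {
        if (running.compareAndSet(false, true)) {
            next();
        }
    }

    /** No further reads are issued after the one currently in flight. */
    public void stop() {
        running.set(false);
    }

    private void next() {
        if (!running.get()) {
            return;
        }
        // The async variant runs the callback on the default executor rather than on
        // the thread that completed the read, so each round starts on a fresh stack.
        read.get().whenCompleteAsync((batch, error) -> {
            if (error != null) {
                error.printStackTrace();
                running.set(false); // stop on failure; a real consumer might retry instead
                return;
            }
            if (batch != null) {
                batch.forEach(handler); // an empty batch means the blocking read timed out
            }
            next(); // re-arm: issue the next read
        });
    }
}
```

This is still a pull model underneath: each reply corresponds to one XREADGROUP call, and the blocking timeout only controls how long Redis waits before answering with nothing. Entries read through a consumer group stay pending until they are acknowledged with XACK, so a real handler would normally acknowledge each entry after processing it.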
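    【Answer 2】:

    I think Lettuce is only responsible for communicating with Redis, whether synchronously, asynchronously, or in a streaming fashion. It is a low-level library. So if you want a high-level feature like this, use something like spring-data: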

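    import java.time.Duration;

    import org.springframework.data.redis.connection.stream.MapRecord;
    import org.springframework.data.redis.connection.stream.StreamOffset;
    import org.springframework.data.redis.stream.StreamListener;
    import org.springframework.data.redis.stream.StreamMessageListenerContainer;
    import org.springframework.data.redis.stream.StreamMessageListenerContainer.StreamMessageListenerContainerOptions;
    import org.springframework.data.redis.stream.Subscription;

    // connectionFactory is assumed to be an existing RedisConnectionFactory.
    StreamListener<String, MapRecord<String, String, String>> streamListener = new ExampleStreamListener();

    StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> containerOptions =
            StreamMessageListenerContainerOptions.builder()
                    .pollTimeout(Duration.ofMillis(100))
                    .build();

    StreamMessageListenerContainer<String, MapRecord<String, String, String>> container =
            StreamMessageListenerContainer.create(connectionFactory, containerOptions);
    Subscription subscription = container.receive(StreamOffset.fromStart("key2"), streamListener);
    container.start();
    //----------------------------------------------------------------

    public class ExampleStreamListener implements StreamListener<String, MapRecord<String, String, String>> {

        // Called by the container for every record it reads from the stream.
        @Override
        public void onMessage(MapRecord<String, String, String> message) {
            System.out.println("MessageId: " + message.getId());
            System.out.println("Stream: " + message.getStream());
            System.out.println("Body: " + message.getValue());
        }
    }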

    【Comments】:

      【Answer 3】:

      You can use the Redis reactive commands to achieve this:

      RedisReactiveCommands<String, String> commands = connection.reactive();
      // Reactive commands are lazy: nothing is sent to Redis until subscribe() is called.
      commands.xgroupCreate(StreamOffset.latest("my-stream"), "G1", new XGroupCreateArgs())
          .subscribe(System.out::println, Throwable::printStackTrace);
      commands
          .xreadgroup(Consumer.from("G1", "c1"), StreamOffset.lastConsumed("my-stream"))
          .subscribe(System.out::println, Throwable::printStackTrace);
      

      【Comments】:

      • This does not keep the thread running... it exits immediately, so it is not an answer.