【问题标题】:How to keep redis connection open when reading from reactive API从反应式 API 读取时如何保持 redis 连接打开
【发布时间】:2020-11-25 13:33:53
【问题描述】:

我一直在使用 spring 反应式 api(使用 lettuce 驱动程序)监听 redis 流。我正在使用独立连接。似乎反应器的事件循环每次读取消息时都会打开一个新连接,而不是保持连接打开。当我运行我的程序时,我在我的机器上看到了很多 TIME_WAIT 端口。这是正常的吗?有没有办法让 lettuce 知道重复使用连接而不是每次都重新连接?

这是我的代码:

StreamReceiver<String, MapRecord<String, String, String>> receiver = StreamReceiver.create(factory);
return receiver
    .receive(Consumer.from(keyCacheStreamsConfig.getConsumerGroup(), keyCacheStreamsConfig.getConsumer()),
        StreamOffset.create(keyCacheStreamsConfig.getStreamName(), ReadOffset.lastConsumed()))//
    // flatMap reads 256 messages by default and processes them in the given scheduler
    .flatMap(record -> Mono.fromCallable(() -> consumer.consume(record)).subscribeOn(Schedulers.boundedElastic()))//
    .doOnError(t -> {
      log.error("Error processing.", t);
      streamConnections.get(nodeName).setDirty(true);
    })//
    .onErrorContinue((err, elem) -> log.error("Error processing message. Continue listening."))//
    .subscribe();

【问题讨论】:

    标签: redis reactive-programming spring-data-redis lettuce


    【解决方案1】:

    看起来 spring-data-redis 库仅在流接收器选项中将轮询超时设置为“0”并将其作为StreamReceiver.create(factory, options) 中的第二个参数传递时才重新使用连接。通过查看 spring-data-redis 的源代码来计算。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2014-06-27
      • 2016-01-05
      • 1970-01-01
      • 1970-01-01
      • 2020-07-05
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多