【发布时间】:2019-04-19 09:03:35
【问题描述】:
您好,我正在与akka streams 和akka-stream-kafka 一起工作。我正在使用以下设置设置流:
Source (Kafka) --> | Akka Actor Flow | --> Sink (MongoDB)
Actor Flow 基本上由将处理数据的Actors,下面是层次结构:
System
|
Master Actor
/ \
URLTypeHandler SerializedTypeHandler
/ \ |
Type1Handler Type2Handler SomeOtherHandler
所以Kafka有消息,我写了消费者并在atMostOnceSource配置中运行它并使用
Consumer.Control control =
Consumer.atMostOnceSource(consumerSettings, Subscriptions.topics(TOPIC))
.mapAsyncUnordered(10, record -> processAccessLog(rootHandler, record.value()))
.to(Sink.foreach(it -> System.out.println("FinalReturnedString--> " + it)))
.run(materializer);
我最初使用打印作为接收器,只是为了让流程运行。
而processAccessLog 定义为:
private static CompletionStage<String> processAccessLog(ActorRef handler, byte[] value) {
handler.tell(value, ActorRef.noSender());
return CompletableFuture.completedFuture("");
}
现在,从定义ask 必须在演员期待响应时使用,在这种情况下是有意义的,因为我想返回要写入接收器的值。
但是每个人(包括文档),提到避免ask 而使用tell 和forward,一个惊人的博客写在上面Don't Ask, Tell。
在他提到的博客中,在嵌套actors的情况下,使用tell作为第一条消息,然后使用forward作为消息到达目的地,然后在处理后直接将消息发送回根actor。
现在问题来了,
- 如何将消息从 D 发送回 A,这样我仍然可以使用接收器。
- 开放式流是一种好的做法吗?例如Sink 无关紧要的流,因为演员已经完成了这项工作。 (我认为不建议这样做,似乎有缺陷)。
【问题讨论】:
标签: java akka akka-stream