【问题标题】:How to access the computation result of an Akka Stream?如何访问 Akka Stream 的计算结果?
【发布时间】:2021-04-12 14:03:10
【问题描述】:

我正在尝试返回流操作的结果,在这种情况下是:

  1. 总结一个列表
  2. 平方值
  3. 平方值

表示为:

        .fold(0, (aggr, next) -> aggr + next)
        .map(x -> x * x)
        .map(x -> x * x)

访问我使用的值

final AtomicInteger returnValue = new AtomicInteger();

后跟:

        .to(Sink.foreach(x -> {
            returnValue.set(x);
            System.out.println("got: " + x);
        }))

这需要阻塞调用以允许流完成,这是不可接受的:

Thread.sleep(2000);

如果我使用:

    CompletableFuture<Object> futureValue =
            ask(actorRef, Done.done(), Duration.ofMillis(5000)).toCompletableFuture();
    System.out.println(futureValue.toCompletableFuture().get().toString());

返回错误:

Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://StreamsExamples/system/Materializers/StreamSupervisor-0/$$a-actorRefSource#1663100910]] after [5000 ms]. Message of type [akka.Done$]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.

在这种情况下,接收参与者是 Source,并在 Done.done 消息中返回以下内容:

return Optional.of(CompletionStrategy.immediately());

可以使用 Akka 流从流中返回计算值吗?唯一的选择是将计算的值存储在数据库中,或者在计算值时将其发送到 Kafka 主题:

.to(Sink.foreach(x -> {

?

完整的源代码:

import akka.Done;
import akka.actor.ActorRef;
import akka.stream.CompletionStrategy;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

public class GetStreamValue {

    final static akka.actor.ActorSystem system = akka.actor.ActorSystem.create("StreamsExamples");

    public static void main(String args[]) throws InterruptedException, ExecutionException {


        int bufferSize = 100;
        final Source<Integer, ActorRef> source =
                Source.actorRef(
                        elem -> {
                            // complete stream immediately if we send it Done
                            if (elem == Done.done()) {
                                return Optional.of(CompletionStrategy.immediately());
                            }
                            else {
                                return Optional.empty();
                            }
                        },
                        // never fail the stream because of a message
                        elem -> Optional.empty(),
                        bufferSize,
                        OverflowStrategy.dropHead());

        final AtomicInteger returnValue = new AtomicInteger();

        final ActorRef actorRef = source
                .fold(0, (aggr, next) -> aggr + next)
                .map(x -> x * x)
                .map(x -> x * x)
                .to(Sink.foreach(x -> {
                    returnValue.set(x);
                    System.out.println("got: " + x);
                }))
                .run(system);

        Arrays.asList(1, 2, 3).forEach(i -> actorRef.tell(i, ActorRef.noSender()));
        Arrays.asList(1,2).forEach(i -> actorRef.tell(i, ActorRef.noSender()));
        actorRef.tell(Done.done(), ActorRef.noSender());

        Thread.sleep(2000);

        System.out.println("returnValue is "+returnValue);

    }
}

【问题讨论】:

    标签: java akka akka-stream


    【解决方案1】:

    我认为您可能缺少的是了解 Akka Streams 中的物化价值的概念。浏览文档的this part,尤其是在结合具体化值方面。我也尝试过解释这个概念here(搜索物化值)。如果你理解物化价值,那么我在这里写的也许会更有意义。

    Source.actorRef(..) 的调用返回Source&lt;T, ActorRef&gt;,其中T 是流经流的元素的数据类型(在您的情况下为Integer),ActorRef 是其物化值 Source。当您在 RunnableGraph 上调用 run 时,您会同步获取具体化值,这是 to(...) 调用返回的内容。

    ActorRef 是您可以按照Source.actorRef(...) 语义“驱动”流的方式。

    现在的问题是如何获得通过流的数据。在您的情况下,您将所有Integers 减少为一个,因此您可以使用Sink.head,而不是使用有利于副作用的Sink.foreach(...)。你看,Sinks 也可以产生物化值,如果是Sink.head,它会具体化为流中第一个元素的CompletionStage,在你的情况下这是唯一的元素。所以让我们试试吧:

    final ActorRef actorRef = source
                                    .fold(0, (aggr, next) -> aggr + next)
                                    .map(x -> x * x)
                                    .map(x -> x * x)
                                    .to(Sink.head())
                                    .run(system);
    

    好吧,这并没有太大帮助。您仍然得到Source 的物化值。要获得 Sink 的具体化值,我们需要明确要求它:

    final Pair<ActorRef, CompletionStage<Integer>> matVals =
          source
            .fold(0, (aggr, next) -> aggr + next)
            .map(x -> x * x)
            .map(x -> x * x)
            .toMat(Sink.head(), Keep.both())
            .run(system);
    

    现在我们得到SourceSink 物化值。您可以像以前一样通过ActorRef 驱动您的流:

    final ActorRef actorRef = matVals.first();
    
    Arrays.asList(1, 2, 3).forEach(i -> actorRef.tell(i, ActorRef.noSender()));
    Arrays.asList(1,2).forEach(i -> actorRef.tell(i, ActorRef.noSender()));
    actorRef.tell(Done.done(), ActorRef.noSender());
    

    您还可以使用CompletableStage API 从流中获取您的价值。就像这样说:

    Integer folded = matVals.second().toCompletableFuture().join(); 
    

    是的,这是阻塞的,但是您需要在流运行完成之前以某种方式阻止主线程完成。

    【讨论】:

    • 谢谢,使用 Keep.both() 是否意味着“我们同时获得 Source 和 Sink 物化值”?
    • 获取值也可以使用:CompletionStage future = matVals.second(); final Integer folded = future.toCompletableFuture().get(1, TimeUnit.SECONDS);
    • 是的,使用 Keep.both() 就是这个意思
    • future.toCompletableFuture().get(1, TimeUnit.SECONDS) 也可以。尽管超时,但它仍然处于阻塞状态。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-02-06
    • 1970-01-01
    • 2020-08-14
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多