【Posted】: 2021-04-12 14:03:10
【Question】:
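I am trying to return the result of a stream operation, which in this case is:
- sum a list
- square the value
- square the value again
expressed as: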
.fold(0, (aggr, next) -> aggr + next)
.map(x -> x * x)
.map(x -> x * x)
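For reference, the arithmetic these three stages perform can be checked without Akka. The following is a minimal plain-Java sketch, not the Akka pipeline itself; the class name `SumThenSquareTwice` and the method `compute` are hypothetical, and the input values are the ones sent to the actor in the full source further down.

```java
import java.util.stream.IntStream;

public class SumThenSquareTwice {

    // Mirrors fold(0, (aggr, next) -> aggr + next) followed by two map(x -> x * x) stages.
    static int compute(int... values) {
        int sum = IntStream.of(values).reduce(0, (aggr, next) -> aggr + next);
        int squared = sum * sum;   // first map
        return squared * squared;  // second map
    }

    public static void main(String[] args) {
        // 1 + 2 + 3 + 1 + 2 = 9, 9 * 9 = 81, 81 * 81 = 6561
        System.out.println(compute(1, 2, 3, 1, 2)); // prints 6561
    }
}
```

Because `fold` emits only once its upstream completes, the stream produces a single element, which is why the value appears only after the `Done` message is sent.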
To access the value I use:
final AtomicInteger returnValue = new AtomicInteger();
followed by:
.to(Sink.foreach(x -> {
returnValue.set(x);
System.out.println("got: " + x);
}))
This requires a blocking call to give the stream time to complete, which is not acceptable:
Thread.sleep(2000);
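The general alternative to an `AtomicInteger` plus a sleep is to have the callback complete a future and let the caller attach a continuation to it. The sketch below shows that pattern with the JDK's `CompletableFuture` only. It is an illustration under assumptions, not Akka code: `FutureInsteadOfSleep` and `computeAsync` are hypothetical names, and the background task merely stands in for the running stream.

```java
import java.util.concurrent.CompletableFuture;

public class FutureInsteadOfSleep {

    // Hypothetical stand-in for the running stream: the work happens on another
    // thread and the final value is handed to a future instead of an AtomicInteger.
    static CompletableFuture<Integer> computeAsync(int... values) {
        CompletableFuture<Integer> result = new CompletableFuture<>();
        CompletableFuture.runAsync(() -> {
            try {
                int sum = 0;
                for (int v : values) {
                    sum += v;
                }
                int squared = sum * sum;
                // Plays the role of returnValue.set(x) inside the foreach callback.
                result.complete(squared * squared);
            } catch (RuntimeException e) {
                result.completeExceptionally(e);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> result = computeAsync(1, 2, 3, 1, 2);

        // The continuation runs once the value exists; no fixed sleep is needed.
        CompletableFuture<Void> printed =
                result.thenAccept(x -> System.out.println("returnValue is " + x));

        // Only keeps this demo JVM alive until the continuation has run.
        printed.join();
    }
}
```

With this shape the caller never guesses how long the computation takes: it either chains further stages onto the future or waits on it with an explicit timeout.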
If I instead use:
CompletableFuture<Object> futureValue =
ask(actorRef, Done.done(), Duration.ofMillis(5000)).toCompletableFuture();
System.out.println(futureValue.toCompletableFuture().get().toString());
it fails with the error:
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://StreamsExamples/system/Materializers/StreamSupervisor-0/$$a-actorRefSource#1663100910]] after [5000 ms]. Message of type [akka.Done$]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
In this case the receiving actor is the Source, which returns the following when it receives the Done.done() message:
return Optional.of(CompletionStrategy.immediately());
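Is it possible to return a computed value from a stream using Akka Streams? Or is the only option to store the computed value in a database, or to send it to a Kafka topic once it has been computed, from within: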
.to(Sink.foreach(x -> {
?
Full source code:
import akka.Done;
import akka.actor.ActorRef;
import akka.stream.CompletionStrategy;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
public class GetStreamValue {

    final static akka.actor.ActorSystem system = akka.actor.ActorSystem.create("StreamsExamples");

    public static void main(String args[]) throws InterruptedException, ExecutionException {
        int bufferSize = 100;
        final Source<Integer, ActorRef> source =
            Source.actorRef(
                elem -> {
                    // complete stream immediately if we send it Done
                    if (elem == Done.done()) {
                        return Optional.of(CompletionStrategy.immediately());
                    }
                    else {
                        return Optional.empty();
                    }
                },
                // never fail the stream because of a message
                elem -> Optional.empty(),
                bufferSize,
                OverflowStrategy.dropHead());

        final AtomicInteger returnValue = new AtomicInteger();

        final ActorRef actorRef = source
            .fold(0, (aggr, next) -> aggr + next)
            .map(x -> x * x)
            .map(x -> x * x)
            .to(Sink.foreach(x -> {
                returnValue.set(x);
                System.out.println("got: " + x);
            }))
            .run(system);

        Arrays.asList(1, 2, 3).forEach(i -> actorRef.tell(i, ActorRef.noSender()));
        Arrays.asList(1,2).forEach(i -> actorRef.tell(i, ActorRef.noSender()));
        actorRef.tell(Done.done(), ActorRef.noSender());

        Thread.sleep(2000);
        System.out.println("returnValue is "+returnValue);
    }
}
【Discussion】:
Tags: java akka akka-stream