【发布时间】:2017-04-20 08:17:15
【问题描述】:
我正在使用 Akka(版本 2.4.17)在 Java 中构建观察流(假设类型为 <T> 的元素保持通用)。
我的要求是该流程应该是可定制的,以便在每单位时间一旦到达就提供最大数量的观察。例如,它应该能够每分钟最多提供 2 个观察值(第一个到达,其余的可以丢弃)。
我非常仔细地查看了 Akka 文档,尤其是 this page,其中详细介绍了内置阶段及其语义。
到目前为止,我尝试了以下方法。
-
使用
throttle和shaping()模式(超过限制时不关闭流):Flow.of(T.class) .throttle(2, new FiniteDuration(1, TimeUnit.MINUTES), 0, ThrottleMode.shaping()) -
使用
groupedWith和一个中间自定义方法:final int nbObsMax = 2; Flow.of(T.class) .groupedWithin(Integer.MAX_VALUE, new FiniteDuration(1, TimeUnit.MINUTES)) .map(list -> { List<T> listToTransfer = new ArrayList<>(); for (int i = list.size()-nbObsMax ; i>0 && i<list.size() ; i++) { listToTransfer.add(new T(list.get(i))); } return listToTransfer; }) .mapConcat(elem -> elem) // Splitting List<T> in a Flow of T objects
以前的方法为我提供了每单位时间的正确观察次数,但这些观察被保留并且仅在时间窗口结束时提供(因此会有额外的延迟)。
举一个更具体的例子,如果以下观察结果进入我的流程:
[Obs1 t=0s] [Obs2 t=45s] [Obs3 t=47s] [Obs4 t=121s] [Obs5 t=122s]
它应该只在它们到达后立即输出(这里可以忽略处理时间):
窗口 1:[Obs1 t~0s] [Obs2 t~45s] 窗口 2:[Obs4 t~121s] [Obs5 t~122s]
任何帮助将不胜感激,感谢您阅读我的第一篇 StackOverflow 帖子 ;)
【问题讨论】: