【问题标题】:Akka stream - limiting Flow rate without introducing delayAkka 流 - 在不引入延迟的情况下限制流量
【发布时间】:2017-04-20 08:17:15
【问题描述】:

我正在使用 Akka(版本 2.4.17)在 Java 中构建观察流(假设类型为 <T> 的元素保持通用)。

我的要求是该流程应该是可定制的,以便在每单位时间一旦到达就提供最大数量的观察。例如,它应该能够每分钟最多提供 2 个观察值(第一个到达,其余的可以丢弃)。

我非常仔细地查看了 Akka 文档,尤其是 this page,其中详细介绍了内置阶段及其语义。

到目前为止,我尝试了以下方法。

  • 使用throttleshaping() 模式(超过限制时不关闭流):

      Flow.of(T.class)
           .throttle(2, 
                     new FiniteDuration(1, TimeUnit.MINUTES), 
                     0, 
                     ThrottleMode.shaping())
    
  • 使用groupedWith 和一个中间自定义方法:

    final int nbObsMax = 2;
    
    Flow.of(T.class)
        .groupedWithin(Integer.MAX_VALUE, new FiniteDuration(1, TimeUnit.MINUTES))
        .map(list -> {
             List<T> listToTransfer = new ArrayList<>();
             for (int i = list.size()-nbObsMax ; i>0 && i<list.size() ; i++) {
                 listToTransfer.add(new T(list.get(i)));
             }
             return listToTransfer;
        })
        .mapConcat(elem -> elem)  // Splitting List<T> in a Flow of T objects
    

以前的方法为我提供了每单位时间的正确观察次数,但这些观察被保留并且仅在时间窗口结束时提供(因此会有额外的延迟)。

举一个更具体的例子,如果以下观察结果进入我的流程:

[Obs1 t=0s] [Obs2 t=45s] [Obs3 t=47s] [Obs4 t=121s] [Obs5 t=122s]

它应该只在它们到达后立即输出(这里可以忽略处理时间):

窗口 1:[Obs1 t~0s] [Obs2 t~45s] 窗口 2:[Obs4 t~121s] [Obs5 t~122s]

任何帮助将不胜感激,感谢您阅读我的第一篇 StackOverflow 帖子 ;)

【问题讨论】:

    标签: java stream akka


    【解决方案1】:

    我想不出一个开箱即用的解决方案来满足您的需求。由于它是如何使用桶模型实现的,而不是在每个时间段开始时都有一个允许的租约,因此 Throttle 将稳定地发出。

    要获得确切的行为,您必须创建自己的自定义速率限制阶段(这可能并不难)。您可以在此处找到有关如何创建自定义阶段的文档:http://doc.akka.io/docs/akka/2.5.0/java/stream/stream-customize.html#custom-linear-processing-stages-using-graphstage

    一种可行的设计是有一个限额计数器,说明可以发射多少元素,您在每个间隔重置,对于每个传入的元素,您从计数器中减去一个并发射,当限额用完时,您继续拉上游,但丢弃元素而不是发射它们。将TimerGraphStageLogic 用于GraphStageLogic 允许您设置可以重置限额的定时回调。

    【讨论】:

    • 我完全错过了TimerGraphStageLogic 上的部分!我将开始实现我自己的模块。谢谢你的回答:)
    • 添加了实现您建议的解决方案的代码。再次感谢。
    【解决方案2】:

    【讨论】:

    • 感谢您的回答。我已经看过这个解决方案,但它并不令人满意,因为它使用了专用的 Actor 而不是 GraphStage(我需要在我的应用程序的其他部分重用这个模块)。
    【解决方案3】:

    感谢@johanandren 的回答,我已经成功实现了一个自定义的基于时间的GraphStage 满足我的要求。

    我把代码贴在下面,如果有人感兴趣的话:

    import akka.stream.Attributes;
    import akka.stream.FlowShape;
    import akka.stream.Inlet;
    import akka.stream.Outlet;
    import akka.stream.stage.*;
    import scala.concurrent.duration.FiniteDuration;
    
    public class CustomThrottleGraphStage<A> extends GraphStage<FlowShape<A, A>> {
    
        private final FiniteDuration silencePeriod;
        private int nbElemsMax;
    
        public CustomThrottleGraphStage(int nbElemsMax, FiniteDuration silencePeriod) {
            this.silencePeriod = silencePeriod;
            this.nbElemsMax = nbElemsMax;
        }
    
        public final Inlet<A> in = Inlet.create("TimedGate.in");
        public final Outlet<A> out = Outlet.create("TimedGate.out");
    
        private final FlowShape<A, A> shape = FlowShape.of(in, out);
        @Override
        public FlowShape<A, A> shape() {
            return shape;
        }
    
        @Override
        public GraphStageLogic createLogic(Attributes inheritedAttributes) {
            return new TimerGraphStageLogic(shape) {
    
                private boolean open = false;
                private int countElements = 0;
    
                {
                    setHandler(in, new AbstractInHandler() {
                        @Override
                        public void onPush() throws Exception {
                            A elem = grab(in);
                            if (open || countElements >= nbElemsMax) {
                                pull(in);  // we drop all incoming observations since the rate limit has been reached
                            }
                            else {
                                if (countElements == 0) { // we schedule the next instant to reset the observation counter
                                    scheduleOnce("resetCounter", silencePeriod);
                                }
                                push(out, elem); // we forward the incoming observation
                                countElements += 1; // we increment the counter
                            }
                        }
                    });
                    setHandler(out, new AbstractOutHandler() {
                        @Override
                        public void onPull() throws Exception {
                            pull(in);
                        }
                    });
                }
    
                @Override
                public void onTimer(Object key) {
                    if (key.equals("resetCounter")) {
                        open = false;
                        countElements = 0;
                    }
                }
            };
        }
    }
    

    【讨论】:

      猜你喜欢
      • 2023-04-04
      • 2022-10-24
      • 1970-01-01
      • 2015-09-12
      • 1970-01-01
      • 1970-01-01
      • 2021-10-11
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多