【问题标题】:Dart : How to create a stream that aggregates events from another stream?Dart:如何创建一个聚合来自另一个流的事件的流?
【发布时间】:2020-04-19 16:38:18
【问题描述】:

创建需要从另一个 Stream 聚合多个事件的流的最佳方法是什么?

我的目标是创建一个流来聚合来自另一个流的事件,直到它有足够的事件来构建消息。在我的情况下,我正在从 Socket 流中读取数据,因此消息可能分布在不同的事件中,并且事件可能包含各种消息的数据,因此我不能只对每个元素应用映射操作。

似乎正确的方法是使用 Stream Transformer,但我无法找到有关如何正确实现它且没有太多样板代码的信息。

在阅读了有关如何创建流的信息后,我想出了一个解决方案,但我不确定这是否可以接受,也不是最好的方法。

这是我的解决方案示例:

Stream<String> joinWordsIfStartWithC(Stream<String> a) async* {
  var prevWord= '';
  await for (var i in a) {
    prevWord += i;
    if(i.startsWith('C')){
      yield prevWord;
      prevWord = '';
    }
  }
}

Stream<String> periodicStream(Duration interval) async* {
  while (true) {
    await Future.delayed(interval);
    yield 'C';
    yield 'A';
    yield 'B';
    yield 'C';
    yield 'C';
    yield 'B';
    yield 'C';
  }
}

void main(List<String> arguments) async {
  var intStream = periodicStream(Duration(seconds: 2));

  var sStream = joinWordsIfStartWithC(intStream);

  sStream.listen((s) => print(s));
}

【问题讨论】:

    标签: dart stream dart-async dart-stream


    【解决方案1】:

    我会说你的解决方案似乎很好,但是如果你想制作一个流转换器,通过从 StreamTransformerBase 扩展是相当容易的:

    import 'dart:async';
    
    class JoinWordsIfStartWithCTransformer extends StreamTransformerBase<String, String> {
      Stream<String> bind(Stream<String> a) async* {
        var prevWord = '';
        await for (var i in a) {
          prevWord += i;
          if (i.startsWith('C')) {
            yield prevWord;
            prevWord = '';
          }
        }
      }
    }
    
    Stream<String> periodicStream(Duration interval) async* {
      while (true) {
        await Future.delayed(interval);
        yield 'C';
        yield 'A';
        yield 'B';
        yield 'C';
        yield 'C';
        yield 'B';
        yield 'C';
      }
    }
    
    void main(List<String> arguments) async {
      var intStream = periodicStream(Duration(seconds: 2));
    
      var sStream = intStream.transform(JoinWordsIfStartWithCTransformer());
    
      sStream.listen((s) => print(s));
    }
    

    【讨论】:

    • 谢谢你,你的例子虽然简单,但帮助我最终理解了我应该在 bind 方法中做什么。我没有意识到我可以只返回流。
    猜你喜欢
    • 2019-09-25
    • 1970-01-01
    • 2019-04-07
    • 2022-01-05
    • 2016-07-06
    • 2022-11-03
    • 1970-01-01
    • 2015-10-16
    • 1970-01-01
    相关资源
    最近更新 更多