【问题标题】:Dart Stream firstWhere does not immediately resolveDart Stream firstWhere 没有立即解决
【发布时间】:2021-12-20 17:36:02
【问题描述】:

例如,我有这个简单的代码:

//create stream with numbers from 1 to 100, delayed by 10sec duration
Stream<int> countStream() async* {
  for (int i = 1; i <= 100; i++) {
    yield i;      
    sleep(Duration(seconds: 10));
  }
}

void main() async {
  var x = await countStream().firstWhere((element) => element == 1); //here Im waiting for number 1
  print(x);
}

问题是 firstWhere 不是在yield 1之后立即退出,而是在yield 2之后, 并保持打印 10 秒。

为什么?在我现实生活中的应用程序中,我有转换为消息流的 websocket 流,并等待特定消息。但是因为websocket流没有产生另一个消息,firstWhere挂了。

这是我的原始代码:

  Stream<Message> lines() async* {
    var partial = '';

    await for (String chunk in ws!) { //ws is WebSocket
      var lines = chunk.split('\n');
      lines[0] = partial + lines[0];
      partial = lines.removeLast();
      for (final line in lines) {
        var msg = Message.parse(line); //Message.parse returns CodeMessage object
        if (msg != null) yield msg;
      }
    }
  }

//at some place in code this hangs because last arrived message is CodeMessage
var msg = await lines().firstWhere((obj) => obj is CodeMessage);
print(msg);

还有其他方法可以做到这一点还是我错了?

【问题讨论】:

标签: flutter dart websocket async-await stream


【解决方案1】:

这里有两个问题。

  1. 首先,在你的例子中:

    这里使用sleep 是不合适的。 考虑这个函数的文档:

    请谨慎使用,因为无法处理异步操作 在一个隔离中,而它在睡眠调用中被阻塞。

    您的 countStream 函数尽管是异步的,但仍会阻止 睡眠时整个隔离。

    试试这个:

    await Future<void>.delayed(const Duration(seconds: 10));
    
  2. 现在,谈谈firstWhere 没有立即解决的真正原因:

    让我们尝试一个简单的实验:

    Stream<int> testStream() async* {
      for (var i = 0; i < 100; ++i) {
        print(i);
        yield i;
      }
    }
    
    void main() {
      late final StreamSubscription streamSubscription;
      streamSubscription = testStream().listen((value) {
        if (value == 1) streamSubscription.cancel();
      }); 
    }
    

    输出:

    0
    1
    2
    

    那么这里发生了什么?订阅在i == 1 取消 - 为什么循环一直持续到 2?

    答案很简单。在到达另一个 yield 语句之前,异步生成器函数不会停止(因此流不会关闭)。这是由于event loop works的方式:

    一旦 Dart 函数开始执行,它会继续执行直到退出。也就是说,Dart 函数不能被其他 Dart 代码中断。

    函数没有机会停止,直到它再次产生,因为取消流订阅不能立即停止它。

    firstWhere 使用内部 Dart _cancelAndValue 函数来完成一个值。在流关闭之前它不会完成,并且在到达下一个 yield 之前不会关闭流 - 在您的情况下,这可能会延迟甚至永远不会发生。

    在使用异步生成器函数时解决此问题的唯一方法是在下一次延迟之前添加另一个 yieldreturn 语句。

【讨论】:

  • 代码中使用的sleep 调用是一个示例,并不是问题的相关细节。它可以是任何具有可观察到的副作用的东西(例如,print() 调用)。
  • 问题是关于示例代码的;原始代码作为上下文提供。因此,我回答了被问到的问题。话虽如此,我忽略了影响两个代码示例的同一问题的另一个原因。我的问题已被编辑以解决这个问题。
猜你喜欢
  • 2016-07-06
  • 2019-01-11
  • 2015-02-10
  • 2015-02-09
  • 2019-12-21
  • 1970-01-01
  • 2014-08-06
  • 2018-03-21
  • 1970-01-01
相关资源
最近更新 更多