【问题标题】:What is the difference between "return <Stream>" and "yield* <Stream>" in flutter?Flutter中的“return <Stream>”和“yield* <Stream>”有什么区别?
【发布时间】:2021-09-12 19:54:17
【问题描述】:

在 Flutter 中使用 Streams 时,我注意到流的一些奇怪行为。

设置

EventChannel 提供事件流。 在我的Cubit 中,我监听该流并取消Cubitclose 方法中的StreamSubscription。 下一个屏幕也使用相同的EventChannel 来监听事件。 当我进入第二个屏幕时,Android 中的onCancel 方法被调用了两次,因此没有事件传递到第二个CubitScanner 是一个单例,所以两个 Cubits 使用相同的实例。

流的功能

class Scanner {
  final eventChannel = EventChannel("events");

  Stream<ScanEvent> getScanEvent() {
    return _scanEvents.receiveBroadcastStream().map((event) => ScanEvent.fromJson(jsonDecode(event)));
  }
}

尺码

Scanner scanner = get<Scanner>();

Future<void> listenForScan() async {
  _streamSubscription = _scanner.getScanEvent().listen((event) => submitSerialText(event.scanData));
}

@override
Future<void> close() {
  _streamSubscription?.cancel();
  return super.close();
}

修复

当我将async*yield* 一起使用时,它可以正常工作:

流的固定函数

class Scanner {
  final eventChannel = EventChannel("events");

  Stream<ScanEvent> getScanEvent() async* {
    yield* _scanEvents.receiveBroadcastStream().map((event) => ScanEvent.fromJson(jsonDecode(event)));
  }
}

问题

为什么第一种方法的流被取消了两次?

【问题讨论】:

  • This 可能会有所帮助。另请阅读this 之一。
  • 这是对流的一个很好的概述,但没有回答问题,为什么第一个“返回”的流被取消了两次。
  • 这可能是由于肘部的代码吗?我没有使用肘的经验。但我觉得蒸汽部分很好。

标签: flutter dart stream


【解决方案1】:

async* 函数本质上将结果包装到另一个流中。请参阅下面的简短演示:

import 'dart:async';

class StreamDemo {
  final controller = StreamController<int>.broadcast();
  Stream<int> getYield() async* {
    yield* controller.stream;
  }
  Stream<int> getReturn() {
    return controller.stream;
  }  
}

Future<void> main() async {
  final demo = StreamDemo();
  print(demo.getReturn().runtimeType); // _BroadcastStream<int>
  print(demo.getYield().runtimeType); // _ControllerStream<int>
  print(demo.getReturn() == demo.getReturn()); // true
  print(demo.getYield() == demo.getYield()); // false
}

我认为正在发生的事情是,当您返回值时,您正在对流的同一个实例进行操作,因此您取消了在两个地方引用的单个流。而使用 async* 函数,您将在两个完全不同的流实例上进行操作。


根据您下面的评论,我做了更多的挖掘。

当您在流上调用.map 时,将返回_MapStream 的实例。

/// A stream pipe that converts data events before passing them on.
class _MapStream<S, T> extends _ForwardingStream<S, T> {
  final _Transformation<S, T> _transform;

  _MapStream(Stream<S> source, T transform(S event))
      : this._transform = transform,
        super(source);

  void _handleData(S inputEvent, _EventSink<T> sink) {
    T outputEvent;
    try {
      outputEvent = _transform(inputEvent);
    } catch (e, s) {
      _addErrorWithReplacement(sink, e, s);
      return;
    }
    sink._add(outputEvent);
  }
}

这里的关键点是_MapStream 扩展了_ForwardingStream

/// A [Stream] that forwards subscriptions to another stream.
///
/// This stream implements [Stream], but forwards all subscriptions
/// to an underlying stream, and wraps the returned subscription to
/// modify the events on the way.
///
/// This class is intended for internal use only.
abstract class _ForwardingStream<S, T> extends Stream<T> {
  final Stream<S> _source;

  _ForwardingStream(this._source);

  bool get isBroadcast => _source.isBroadcast;

  StreamSubscription<T> listen(void onData(T value)?,
      {Function? onError, void onDone()?, bool? cancelOnError}) {
    return _createSubscription(onData, onError, onDone, cancelOnError ?? false);
  }

  StreamSubscription<T> _createSubscription(void onData(T data)?,
      Function? onError, void onDone()?, bool cancelOnError) {
    return new _ForwardingStreamSubscription<S, T>(
        this, onData, onError, onDone, cancelOnError);
  }

  // Override the following methods in subclasses to change the behavior.

  void _handleData(S data, _EventSink<T> sink);

  void _handleError(Object error, StackTrace stackTrace, _EventSink<T> sink) {
    sink._addError(error, stackTrace);
  }

  void _handleDone(_EventSink<T> sink) {
    sink._close();
  }
}

在这里,当您调用 .listen 时,会调用 _createSubscription 实例化 _ForwardingStreamSubscription

/// Abstract superclass for subscriptions that forward to other subscriptions.
class _ForwardingStreamSubscription<S, T>
    extends _BufferingStreamSubscription<T> {
  final _ForwardingStream<S, T> _stream;

  StreamSubscription<S>? _subscription;

  _ForwardingStreamSubscription(this._stream, void onData(T data)?,
      Function? onError, void onDone()?, bool cancelOnError)
      : super(onData, onError, onDone, cancelOnError) {
    _subscription = _stream._source
        .listen(_handleData, onError: _handleError, onDone: _handleDone);
  }

  // _StreamSink interface.
  // Transformers sending more than one event have no way to know if the stream
  // is canceled or closed after the first, so we just ignore remaining events.

  void _add(T data) {
    if (_isClosed) return;
    super._add(data);
  }

  void _addError(Object error, StackTrace stackTrace) {
    if (_isClosed) return;
    super._addError(error, stackTrace);
  }

  // StreamSubscription callbacks.

  void _onPause() {
    _subscription?.pause();
  }

  void _onResume() {
    _subscription?.resume();
  }

  Future<void>? _onCancel() {
    var subscription = _subscription;
    if (subscription != null) {
      _subscription = null;
      return subscription.cancel();
    }
    return null;
  }

  // Methods used as listener on source subscription.

  void _handleData(S data) {
    _stream._handleData(data, this);
  }

  void _handleError(error, StackTrace stackTrace) {
    _stream._handleError(error, stackTrace, this);
  }

  void _handleDone() {
    _stream._handleDone(this);
  }
}

请注意,这里的订阅最终是通过在构造函数中调用_stream._source.listen 创建的。因此,即使您有两个不同的 _MapStream 实例,它们最终都会在同一个源实例上调用 listen

相比之下,_ControllerStream 没有扩展 _ForwardingStream,而是从 _StreamImpl 扩展。

【讨论】:

  • 但我实际上并没有返回controller.stream,而是controller.stream.map()。这样,returnyield* 的相等性检查结果为 false。而且我在转换到第二个屏幕/肘部时只调用了一次subscription.cancel()。但是,流被取消了两次,一次是在第二个 cubit 订阅之前和一次在订阅之后(但是第二个 cubit 还没有调用取消)。
  • 我试图为这个问题创建一个最小的工作示例,但至少对于flutter_bloc 的最新版本,我无法重现它。感谢您对流的深入回答。仍然不确定为什么返回的流被关闭了两次,而产生的流只关闭了一次。可能是我们使用的flutter_bloc 版本中的一个错误。
猜你喜欢
  • 2018-10-25
  • 2017-01-09
  • 2014-01-10
  • 2020-05-24
  • 2015-05-19
  • 1970-01-01
  • 2020-05-30
  • 2018-10-22
  • 1970-01-01
相关资源
最近更新 更多