async* 函数本质上将结果包装到另一个流中。请参阅下面的简短演示:
import 'dart:async';
/// Demonstrates the difference between handing out a controller's stream
/// directly and re-emitting it from an `async*` generator.
class StreamDemo {
  /// Broadcast controller whose stream both methods expose.
  final StreamController<int> controller = StreamController<int>.broadcast();

  /// Re-emits the events of [controller]'s stream through an `async*`
  /// generator, so every call produces a separate wrapping stream.
  Stream<int> getYield() async* {
    yield* controller.stream;
  }

  /// Returns [controller]'s stream itself, with no generator in between.
  Stream<int> getReturn() => controller.stream;
}
/// Prints the runtime type and the equality of the streams returned by
/// `StreamDemo.getReturn` and `StreamDemo.getYield`.
Future<void> main() async {
  final streams = StreamDemo();

  // Runtime type of each kind of stream.
  final returnedType = streams.getReturn().runtimeType;
  print(returnedType); // _BroadcastStream<int>
  final yieldedType = streams.getYield().runtimeType;
  print(yieldedType); // _ControllerStream<int>

  // Equality of the streams produced by two separate calls.
  final returnedAreEqual = streams.getReturn() == streams.getReturn();
  print(returnedAreEqual); // true
  final yieldedAreEqual = streams.getYield() == streams.getYield();
  print(yieldedAreEqual); // false
}
我认为原因是：直接返回 controller.stream 时，两次调用操作的是同一个流，因此取消订阅时，取消的是被两处引用的那一个流；而使用 async* 函数时，每次调用得到的都是一个全新的流实例，两处操作的是两个完全不同的流。
根据您下面的评论,我做了更多的挖掘。
当您在流上调用 .map 时，返回的是一个 _MapStream 实例。
/// A stream pipe that applies a conversion function to each data event
/// before passing the result on.
class _MapStream<S, T> extends _ForwardingStream<S, T> {
  /// Conversion applied to every incoming data event.
  final _Transformation<S, T> _transform;

  _MapStream(Stream<S> source, T transform(S event))
      : _transform = transform,
        super(source);

  /// Converts [inputEvent] with [_transform] and adds the result to [sink].
  ///
  /// If the conversion throws, the error and stack trace are handed to
  /// `_addErrorWithReplacement` together with [sink], and no data event is
  /// added.
  @override
  void _handleData(S inputEvent, _EventSink<T> sink) {
    final T converted;
    try {
      converted = _transform(inputEvent);
    } catch (error, stackTrace) {
      _addErrorWithReplacement(sink, error, stackTrace);
      return;
    }
    // Kept outside the try block so failures while adding are not caught.
    sink._add(converted);
  }
}
这里的关键点是，_MapStream 继承自 _ForwardingStream。
/// A [Stream] whose subscriptions are forwarded to a source stream.
///
/// Listening does not subscribe to this object's own events; instead it
/// creates a wrapping subscription on the underlying stream, which lets
/// subclasses modify the events on their way to the listener.
///
/// This class is intended for internal use only.
abstract class _ForwardingStream<S, T> extends Stream<T> {
  /// The underlying stream that subscriptions are forwarded to.
  final Stream<S> _source;

  _ForwardingStream(this._source);

  /// Whether [_source] is a broadcast stream.
  @override
  bool get isBroadcast => _source.isBroadcast;

  /// Subscribes by delegating to [_createSubscription].
  ///
  /// A `null` [cancelOnError] is treated as `false`.
  @override
  StreamSubscription<T> listen(void onData(T value)?,
      {Function? onError, void onDone()?, bool? cancelOnError}) {
    final stopOnError = cancelOnError ?? false;
    return _createSubscription(onData, onError, onDone, stopOnError);
  }

  /// Creates the [_ForwardingStreamSubscription] bound to this stream.
  StreamSubscription<T> _createSubscription(void onData(T data)?,
      Function? onError, void onDone()?, bool cancelOnError) {
    return new _ForwardingStreamSubscription<S, T>(
        this, onData, onError, onDone, cancelOnError);
  }

  // Hooks for subclasses: override the methods below to change the behavior.

  /// Handles a data event from the source; subclasses must implement this.
  void _handleData(S data, _EventSink<T> sink);

  /// Passes [error] and [stackTrace] on to [sink] unchanged.
  void _handleError(Object error, StackTrace stackTrace, _EventSink<T> sink) =>
      sink._addError(error, stackTrace);

  /// Closes [sink].
  void _handleDone(_EventSink<T> sink) => sink._close();
}
在这里，当您调用 .listen 时，会通过 _createSubscription 实例化一个 _ForwardingStreamSubscription。
/// A subscription that listens to the source of a [_ForwardingStream] and
/// relays the source's events through that stream's handler methods.
class _ForwardingStreamSubscription<S, T>
    extends _BufferingStreamSubscription<T> {
  /// The forwarding stream whose handlers process incoming events.
  final _ForwardingStream<S, T> _stream;

  /// Subscription on the source stream; reset to `null` by [_onCancel].
  StreamSubscription<S>? _subscription;

  /// Creates the subscription and immediately listens on the source of
  /// [_stream].
  _ForwardingStreamSubscription(this._stream, void onData(T data)?,
      Function? onError, void onDone()?, bool cancelOnError)
      : super(onData, onError, onDone, cancelOnError) {
    final source = _stream._source;
    _subscription = source.listen(_handleData,
        onError: _handleError, onDone: _handleDone);
  }

  // _StreamSink interface.
  //
  // A transformer that sends more than one event has no way to know whether
  // the stream was canceled or closed after the first one, so any remaining
  // events are simply ignored.

  /// Adds [data] unless this subscription is already closed.
  @override
  void _add(T data) {
    if (!_isClosed) super._add(data);
  }

  /// Adds [error] unless this subscription is already closed.
  @override
  void _addError(Object error, StackTrace stackTrace) {
    if (!_isClosed) super._addError(error, stackTrace);
  }

  // StreamSubscription callbacks.

  /// Pauses the source subscription, if there still is one.
  @override
  void _onPause() => _subscription?.pause();

  /// Resumes the source subscription, if there still is one.
  @override
  void _onResume() => _subscription?.resume();

  /// Cancels the source subscription at most once.
  ///
  /// Returns the future from the source's `cancel`, or `null` when there is
  /// no source subscription left.
  @override
  Future<void>? _onCancel() {
    final active = _subscription;
    if (active == null) return null;
    // Clear the field before cancelling so the source is cancelled only once.
    _subscription = null;
    return active.cancel();
  }

  // Callbacks registered as listeners on the source subscription.

  /// Routes a source data event to the stream's data handler.
  void _handleData(S data) => _stream._handleData(data, this);

  /// Routes a source error to the stream's error handler.
  void _handleError(error, StackTrace stackTrace) =>
      _stream._handleError(error, stackTrace, this);

  /// Routes the source's done event to the stream's done handler.
  void _handleDone() => _stream._handleDone(this);
}
请注意，这里的订阅最终是在构造函数中通过调用 _stream._source.listen 创建的。因此，即使您有两个不同的 _MapStream 实例，它们最终也都会在同一个源流实例上调用 listen。
相比之下，_ControllerStream 并没有继承 _ForwardingStream，而是继承自 _StreamImpl。