【问题描述】:

我有两个流从两个不同的 api 获取。

Stream<Month> get monthOutStream => monthOutController.stream;
Stream<MySchedule> get resultOutStream => resultController.stream;

我在应用程序的两种不同状态下获取这些数据,开始时的结果和来自用户的某些事件后的几个月。

MyScheduleBloc(){
  initialData();
}

Future initialData() async {
  MySchedule mySchedule = await myScheduleViewModel.importMySchedule(now.id);
  resultController.add(mySchedule);
}

我的屏幕有一个流构建器

Widget build(BuildContext context) {
final webCalenderBloc = WebCalenderBloc();
return StreamBuilder(
  stream: webCalenderBloc.resultOutStream,
  builder: (BuildContext context , snapdata){
    if(!snapdata.hasData){
      return Center(
        child: CircularProgressIndicator(),
      );
    }
    return body(snapdata.data);
   },
 );
}

由于主要的小部件构建方法将带有 resultoutstream 的 StreamBuilder 小部件作为流。我在哪里获取另一个流monthoutStream。 我可以在流中获取流吗?在处理两个流时我是否遗漏了任何东西。我不想从monthoutstream构建任何小部件,但想检查其中的数据。

【问题讨论】:

    标签: stream dart flutter


    【解答1】:

    在我的情况下,我倾向于将多个流合并为一个流,如果它们来自同一类型,那么您可以使用:

    import 'package:async/async.dart' show StreamGroup;
    ...
    StreamGroup.merge([stream1,stream2]);
    

    【问题讨论】:

    【解答2】:

    我正在使用一种 BLoC,我在其中广播了 Stream,当某些事情发生变化时,它只会通知听众。有点像 Qt 的信号和插槽。无论如何,对于我想听不止一个流的情况,我做了这门课。它基本上是 StreamBuilder 但您可以收听多个流,它会丢弃流中的所有数据。

    import 'dart:async';
    
    import 'package:flutter/widgets.dart';
    
    typedef MultiStreamWidgetBuilder<T> = Widget Function(BuildContext context);
    
    // A widget that basically re-calls its builder whenever any of the streams
    // has an event.
    class MultiStreamBuilder extends StatefulWidget {
      const MultiStreamBuilder({
        required this.streams,
        required this.builder,
        Key? key,
      }) : super(key: key);
    
      final List<Stream<dynamic>> streams;
      final MultiStreamWidgetBuilder builder;
    
      Widget build(BuildContext context) => builder(context);
    
      @override
      State<MultiStreamBuilder> createState() => _MultiStreamBuilderState();
    }
    
    class _MultiStreamBuilderState extends State<MultiStreamBuilder> {
      final List<StreamSubscription<dynamic>> _subscriptions = [];
    
      @override
      void initState() {
        super.initState();
        _subscribe();
      }
    
      @override
      void didUpdateWidget(MultiStreamBuilder oldWidget) {
        super.didUpdateWidget(oldWidget);
        if (oldWidget.streams != widget.streams) {
          // Unsubscribe from all the removed streams and subscribe to all the added ones.
          // Just unsubscribe all and then resubscribe. In theory we could only
          // unsubscribe from the removed streams and subscribe from the added streams
          // but then we'd have to keep the set of streams we're subscribed to too.
          // This should happen infrequently enough that I don't think it matters.
          _unsubscribe();
          _subscribe();
        }
      }
    
      @override
      Widget build(BuildContext context) => widget.build(context);
    
      @override
      void dispose() {
        _unsubscribe();
        super.dispose();
      }
    
      void _subscribe() {
        for (final s in widget.streams) {
          final subscription = s.listen(
            (dynamic data) {
              setState(() {});
            },
            onError: (Object error, StackTrace stackTrace) {
              setState(() {});
            },
            onDone: () {
              setState(() {});
            },
          );
          _subscriptions.add(subscription);
        }
      }
    
      void _unsubscribe() {
        for (final s in _subscriptions) {
          s.cancel();
        }
        _subscriptions.clear();
      }
    }
    

    使用示例:

    class AppWidget extends StatelessWidget {
      @override
      Widget build(BuildContext context) {
        return MultiStreamBuilder(
          streams: [appState.repoListChanged, appState.selectedRepoChanged],
          builder: _buildMain,
        );
      }
    
      Widget _buildMain(BuildContext context) {
        return Scaffold(
          body: Row(
    ...
    

    我只是写了它,所以我没有做太多测试。我认为理论上你可以制作一个给你状态的系统,虽然我不确定 Dart 的类型系统是否足够先进,可以让你在不使用 dynamic 的情况下做到这一点。

    【问题讨论】:

      【解答3】:
      Observable.combineLatest2(
              aStream,
              bStream,
              (a, b, c) =>
              a != '' && b != '');
      

      combineLatestN 返回一个组合流

      【问题讨论】:

      • 我试过这个,但没有用。 StreamBuilder 正在返回错误消息,并显示“状态不佳:Stream 已被收听。”
      • @NihadDelic 您可能在接收数据后关闭了流,或者您没有使用 streamController.broadcast() 。
      【解答4】:

      【问题讨论】:

      • 感谢您提供有关合并两个流或一个接一个使用的信息。在某些时候,我会遇到第一个流具有数据“真”的情况,然后它必须弹出导航,如果“假”然后转到另一个流。问题出现在小部件正在构建并且必须弹出导航虽然弹出工作但有几秒钟存在错误说小部件正在构建返回一个空值。如何在一个接一个地使用流时解决这个问题。
      • 弹出导航应该在监听器中,而不是在streamBuilder中。