【问题标题】:RxJava - How to Kick off Async Network call inside lambda function of subscribe? ObservableRxJava - 如何在订阅的 lambda 函数中启动异步网络调用?可观察的
【发布时间】:2016-02-26 15:12:59
【问题描述】:

以下是我的确切代码。请注意,在 subscribe lambda 函数中调用了 foo(o)。这会在 Android 中产生严格模式错误。现在我当然可以在这里分拆一个线程,但那是 Rxjava 的做事方式吗?我现在是否必须再次考虑线程?想知道我在这里的最佳选择。我怎样才能做到这种 RxJava 方式,以便代码的其他部分可以订阅将返回网络调用响应的新事件?所以本质上我是通过异步响应将一种类型的事件 MyObj 转换为 NetObj 响应事件。

 public static final PublishSubject<MyObj> myObjSubject = PublishSubject.create();
  public static final Observable<MyObj> observable = myObjSubject.asObservable();


protected CompositeSubscription myCompositeSubs = new CompositeSubscription();

myCompositeSubs.add(
            observable.subscribe((MyObj o) -> {
                 // `. Update UI with MyObject and 
                 // 2. Kick off network call which will return JsonResponse type object which I would like to possibly process here and also publish to let others respond to it.
                 foo(o);  // $$$$$ million dollar question what if I am kicking off something long running here?
            }));


private void foo(MyObj o){

// long running request.  How to kick off long running from here? 
  JsonRespons resp = RestAdapter.makeQuest(o.url);
  // I want this JsonResponse object to be an even others can listen for. I don't want to just process it right here.

}

更新:只是为了澄清我想要做的是在类(Android 片段)中侦听数据事件 MyObject。一旦我得到这个,我想做两件事,用 MyObject 同步更新 UI,这是我想要消费的东西,而不仅仅是转换。但我也想启动一个长期运行的请求。返回 JsonObject 响应的网络调用。当它返回时,我可能会在同一个片段中处理它,或者我可能希望其他类能够监听这个事件。所以我不只是观察 MyObject 然后进行网络调用并获取响应。我需要同时执行这两项操作,一项同步另一项异步。

【问题讨论】:

标签: android rx-java


【解决方案1】:

这是使用像flatMap 这样的 RxJava 运算符的好机会。如果您需要更多文档,请查看 here,但 flatMap 允许您获取一个类型的 Observable(您的 Observable&lt;MyObj&gt;)并返回另一种类型的 Observable(在这种情况下,它可以是 Observable&lt;Void&gt; 或者可能Observable&lt;Boolean&gt; 如果您想知道您的长期运行请求是否完成)。

看看这段代码:

Subscription sub = observable.flatMap((MyObj o) -> {
      // this will be run on your subscribeOn thread -- in background
      return Observable.just(foo(o));
    }).subscribe(aVoid -> {
      // we have completed both operations here, emissions will be 
      // on our observeOn() thread
    });

foo(MyObj) 函数现在的结构如下:

private Void foo(MyObj o) {
    // long running stuff
    return null;
}

将返回类型设为Void 而不是void 很重要,这样我们才能使用flatMap 运算符并返回Observable&lt;Void&gt;

您所有长时间运行的foo() 操作将在您的subscribeOn() 线程上运行,以及您的初始可观察对象,一旦操作完成,它将在您的@ 上向您的subscribe() 发出一个Void 项目987654338@ 线程(如果您想通知 UI,这可以是您在 Android 中的主线程)。

已更新以允许 UI 在 flatMap 调用中工作:

注意:这假定第一个 observable 已在后台线程上定义了 subscribeOn() 并且将 observeOn() 定义为 AndroidSchedulers.mainThread()

Subscription sub = observable.flatMap((MyObj o) -> {
      // << do stuff with MyObj to UI HERE >> 
      return Observable.defer(() -> Observable.just(foo(o)))
          .subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
    }).subscribe(aVoid -> {
      // foo() has completed, we're back on the UI thread
    });

如果您想使用 JsonResponse 做某事,您可以更改 foo() 函数以返回它而不是 Void,然后您将能够在 UI 线程上使用 foo() 的结果。

更新 2:Observable.create() 更改为 Observable.defer() 以使其更干净。

【讨论】:

  • 只是为了澄清我在一个类(Android Fragment)中,我想以这样一种方式处理它,以便我可以在接收到 MyObject 时采取一些同步操作,但同时也想使异步调用,并在完成或错误时采取其他操作。
  • 我有各种观察者和主题的单例,但让我们看看我是否返回 Objservable.just(foo) 可以在这一类中工作,但如果我希望响应在片段之外可见怎么办?例如,我想要映射到 JSONResponse。因此,我希望他们能够在外部课程中使用侦听 JSonResponse 。外部侦听器如何接入 JSonResponse 对象?
  • 所以我希望 Fragment 能够对 MyObj 采取一些操作,并启动此异步操作,该操作将返回一个 JsonResponse。它是我希望能够以流的形式收听或观察的响应。我可能想在启动它的片段或其他地方处理它。所以我要说的是,单个类将使用 MyObj 类来更新 UI,它会启动异步。一旦返回,我希望它被发布,或者有一种方法可以监听流的 JsonResponse 类型。
  • 我已经更新了我的问题,以澄清我试图在 Fragment 中执行的同步和异步这两个操作。
  • 查看我的更新回复。将 foo() 的返回更改为 JsonResponse 将允许 PublishSubject 的任何订阅者对您的 subscribe() 调用中发出的值进行操作。因此,您可以有多个Subscriptions 来收听主题,或者您可以设置一个回调机制,通知您subscribe()中的任何JsonResponse 侦听器
猜你喜欢
  • 2016-07-10
  • 2015-06-04
  • 2017-11-14
  • 1970-01-01
  • 2022-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-06-21
  • 1970-01-01
相关资源
最近更新 更多