【问题标题】:subscribeOn main thread and observerOn caller thread RxJava2subscribeOn 主线程和observerOn 调用者线程RxJava2
【发布时间】:2018-08-19 15:04:29
【问题描述】:

我正在为一个非线程安全的库包装遗留代码。 当库的客户端从主线程调用 API 时,我需要切换到主线程,然后切换回调用者线程以返回结果

我认为我可以使用(这是针对 Android,但问题更笼统)

internal object TransformCompletableTemporarilySwitchToMainThread : CompletableTransformer {
    override fun apply(upstream: Completable): CompletableSource {
        return upstream
                .observeOn(Schedulers.trampoline())
                .subscribeOn(AndroidSchedulers.mainThread())
    }
}

有没有像 RxJava1 的 Schedulers.immediate() 这样的东西?我知道对于测试,您可以将Schedulers.immediate() 替换为Schedulers.trampoline(),但是从文档和我运行的测试来看,Schedulers.trampoline()Schedulers.immediate() 没有太大关系 有其他方法吗?

添加

   /**
 * Returns a default, shared {@link Scheduler} instance whose {@link io.reactivex.Scheduler.Worker}
 * instances queue work and execute them in a FIFO manner on one of the participating threads.
 * <p>
 * The default implementation's {@link Scheduler#scheduleDirect(Runnable)} methods execute the tasks on the current thread
 * without any queueing and the timed overloads use blocking sleep as well.
 * <p>
 * Note that this scheduler can't be reliably used to return the execution of
 * tasks to the "main" thread. Such behavior requires a blocking-queueing scheduler currently not provided
 * by RxJava itself but may be found in external libraries.
 * <p>
 * This scheduler can't be overridden via an {@link RxJavaPlugins} method.
 * @return a {@link Scheduler} that queues work on the current thread
 */

这两部分是什么意思?

  • 返回一个默认的共享 {@link Scheduler} 实例,其 {@link io.reactivex.Scheduler.Worker} * 实例排队工作并执行 它们在其中一个参与线程上以 FIFO 方式进行。

  • @return 一个 {@link Scheduler} 将当前线程上的工作排入队列

【问题讨论】:

    标签: android rx-java2


    【解决方案1】:

    immediatetrampoline 调度程序不适合返回特定线程。您需要一个单线程调度程序,您可以从 ExecutorService 创建它:

    ExecutorService exec = Executors.newSingleThreadedExecutor();
    Scheduler singleThreaded = Schedulers.from(exec);
    
    Observable.fromCallable(() -> api.init())
    .subscribeOn(singleThreaded)
    .observeOn(AndroidSchedulers.mainThread())
    .doOnNext(v -> System.out.println("Initialized"))
    .observeOn(singleThreaded)
    .map(v -> api.getData(v))
    .observeOn(AndroidSchedulers.mainThread())
    .doOnNext(v -> System.out.println("Some data: "))
    .observeOn(singleThreaded)
    .doOnNext(v -> api.close())
    .observeOn(AndroidSchedulers.mainThread())
    .doOnNext(v -> System.out.println("Done"))
    ;
    
    // later
    exec.shutdown();
    

    编辑:

    不可能返回到任意线程。但是,如果一个线程有一个循环器/处理程序,那么您可以使用 AndroidScheduler 将其转换为调度程序,并通过observeOn 定位它。

    【讨论】:

    • 我的问题不同。我需要切换到单个线程(在我的示例中,我使用了主线程,但它可以是另一个线程),然后在调用者线程上的观察者可以是调用者决定使用的任何线程
    • 如果那个线程有一个looper/handler,那么你可以用AndroidSchedulers把它变成一个调度器。
    • 好吧好吧。如果您将此添加为答案,我会批准它。谢谢
    猜你喜欢
    • 2018-06-28
    • 1970-01-01
    • 2016-06-03
    • 2018-03-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-05-26
    相关资源
    最近更新 更多