【问题标题】:Emit values on the thread that chain is subscribed to在链订阅的线程上发出值
【发布时间】:2018-07-11 14:58:17
【问题描述】:

问题

尝试在观察者中使用广播接收器时遇到问题。

BroadcastReceiver 发出的项目在主线程上发出,而不是在subscribeOn 上传递的线程上发出。

代码 - 实际场景

例如,给定下面的代码,它与 link 中的 RxBroadcastReceiver 类几乎相同:

public static final class RxBroadcastReceiver implements ObservableOnSubscribe<String> {
    public static Observable<String> create(Context context, IntentFilter intentFilter) {
        return Observable.create(new RxBroadcastReceiver(context, intentFilter));
    }

    private final Context context;
    private final IntentFilter intentFilter;

    private RxBroadcastReceiver(Context context, IntentFilter intentFilter) {
        this.context = context;
        this.intentFilter = intentFilter;
    }

    @Override
    public void subscribe(final ObservableEmitter<String> emitter) throws Exception {
        final BroadcastReceiver broadcastReceiver = new BroadcastReceiver() {
            @Override
            public void onReceive(Context context, Intent intent) {
                String threadName = Thread.currentThread().getName();
                emitter.onNext(threadName);
            }
        };
        context.registerReceiver(broadcastReceiver, intentFilter);
        emitter.setDisposable(Disposables.fromRunnable(new Runnable() {
            @Override
            public void run() {
                context.unregisterReceiver(broadcastReceiver);
            }
        }));
    }
}

当以下代码运行时:

RxBroadcastReceiver.create(context, filter)
        .subscribeOn(Schedulers.io())
        .subscribe(new Consumer<String>() {
        @Override
        public void accept(final String executedThread) throws Exception {
            String acceptedThread = Thread.currentThread().getName();

            System.out.println("executedThread: " + executedThread);
            System.out.println("acceptedThread: " + acceptedThread);
        }
    });

输出显示执行的线程是主线程,而不是来自 Schedulers.io 的线程:

executedThread: main
acceptedThread: main

代码 - 预期结果

如果使用下面的代码:

public static final class RxJavaBroadcaster implements ObservableOnSubscribe<String> {
    public static Observable<String> create() {
        return Observable.create(new RxJavaBroadcaster());
    }

    @Override
    public void subscribe(final ObservableEmitter<String> emitter) throws Exception {
        String threadName = Thread.currentThread().getName();
        emitter.onNext(threadName);
    }
}

当以下代码运行时:

RxJavaBroadcaster.create()
        .subscribeOn(Schedulers.io())
        .subscribe(new Consumer<String>() {
        @Override
        public void accept(final String executedThread) throws Exception {
            String acceptedThread = Thread.currentThread().getName();

            System.out.println("executedThread: " + executedThread);
            System.out.println("acceptedThread: " + acceptedThread);
        }
    });

正如预期的那样,输出不在主线程中:

executedThread: RxCachedThreadScheduler-1
acceptedThread: RxCachedThreadScheduler-1

研究

我认为项目在主线程上发出的原因是因为 BroadcastReceiver.onReceive 总是在其进程的主线程中被调用。

鉴于上述正确,我能想到的另一种解决方案是注册 BroadcastReceiver 并使用此method 传递自定义处理程序。

那么我的问题是:

  1. 如何从 RxBroadcastReceiver 中检索传递给 subscribeOn 的正确 Scheduler
  2. 如何创建绑定到特定调度程序的Handler

有没有人看到不同的选择?

我的问题与question 中描述的完全一样,事实上我已经在使用那里提出的解决方案(调用第二个 subscribeOn),但我更喜欢不同的选择。

【问题讨论】:

  • 您对onReceive 的调用线程的基本原理是正确的 - 您是否需要 RxScheduler 上的提供者和消费者?如果您只需要消费者,那么只需在原始示例中将 .subscribeOn(Schedulers.io()) 更改为 .observeOn(Schedulers.io())

标签: android broadcastreceiver rx-java2


【解决方案1】:

如果我理解正确,您想在特定调度程序上发出并在主线程上接受。我认为您的解决方案将使用 RxAndroid https://github.com/ReactiveX/RxAndroid

将代码从预期结果更改为使用observeOn(AndroidSchedulers.mainThread())

RxJavaBroadcaster.create()
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<String>() {
        @Override
        public void accept(final String executedThread) throws Exception {
            String acceptedThread = Thread.currentThread().getName();

            System.out.println("executedThread: " + executedThread);
            System.out.println("acceptedThread: " + acceptedThread);
        }
    });

【讨论】:

  • 不,我的问题不是观察主线程,其实我已经在用RxAndroid做。问题是 RxBroadcastReceiver 在行 emitter.onNext(threadName); 发出的值是在主线程而不是 IO 线程上完成的,在显式调用 .subscribeOn(Schedulers.io())
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2022-01-08
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多