【发布时间】:2016-10-23 01:41:46
【问题描述】:
使用 RxJS 5.0.0-rc.1,我试图通过使用 yield 和 .next() 交换数据,以类似于 how generators/iterators work 的方式传达我的 Observer 和 Observable。其目的是掌握对.subscribe 的调用返回的内容,并据此修改/更新我的可观察流中的以下值。
我不完全确定这是否可能。不过,我发现您可以捕获.subscribe 回调中抛出的异常。下面的 sn-ps 打印出"Boom!":
var source = Observable.create((observer) => {
try {
observer.next(42);
} catch (e) {
// This will catch the Error
// thrown on the subscriber
console.log(e.message);
}
observer.complete();
});
source.subscribe(() => {
throw new Error('Boom!');
});
那么,如果订阅者不是抛出,而是返回一个值呢? Observable 有没有办法检索它?也许我以错误的方式接近这个。如果是这样,在这种情况下,什么是“被动”的做事方式?
非常感谢。
编辑
我想出的一种可能方法是为流中的每个项目提供一个回调函数。比如:
var source = Observable.create((observer) => {
// This will print "{ success: true }"
observer.next({ value: 42, reply: console.log });
observer.complete();
});
source.subscribe(({ value, reply }) => {
console.log('Got', value);
return reply({ success: true });
});
还有其他想法吗?
编辑 2
由于我最初的问题对我想要实现的目标带来了一些困惑,我将描述我的真实世界场景。我正在编写一个模块的 API,用于通过队列管理消息(很像简化的内存中 AMQP-RPC 机制),我认为 RxJS 会很合适。
它的工作方式与您预期的一样:Publisher 将消息推送到队列,然后将其传递到 Consumer。在术语中,Consumer 可以回复Publisher,如果有兴趣可以收听该回复。
在理想情况下,API 应如下所示:
Consumer().consume('some.pattern')
.subscribe(function(msg) {
// Do something with `msg`
console.log(msg.foo);
return { ok: true };
});
Publisher().publish('some.pattern', { foo: 42 })
// (optional) `.subscribe()` to get reply from Consumer
该示例将打印42。
回复Publisher 的逻辑位于Consumer 函数中。但实际响应来自 .subscribe() 回调。这引出了我最初的问题:我应该如何从流的创建者那里获取返回值?
把Consumer#consume()想象成:
/**
* Returns an async handler that gets invoked every time
* a new message matching the pattern of this consumer
* arrives.
*/
function waitOnMessage(observer) {
return function(msg) {
observer.next(msg);
// Conceptually, I'd like the returned
// object from `.subscribe()` to be available
// in this scope, somehow.
// That would allow me to go like:
// `sendToQueue(pubQueue, response);`
}
}
return Observable.create((observer) => {
queue.consume(waitOnMessage(observer));
});
还有意义吗?
【问题讨论】:
标签: javascript reactive-programming rxjs observer-pattern rxjs5