【发布时间】:2015-06-14 19:11:31
【问题描述】:
我的用例是:我得到一个永久链接列表,并且需要为每个永久链接发出两个 REST 请求以获取部分数据。当两个请求都返回时,我想将他们的信息合并在一起并用它做一些事情(在这里 - 打印出来)。我想使用 zip 运算符的代码来做到这一点。这是我当前的代码(以及我正在使用的库的模拟):
public class Main {
public static void main(String[] args) {
ContentManager cm = new ContentManager();
Observable
.from(cm.getPermalinks(10))
.flatMap(permalink -> Observable.zip(
Observable.<Content>create(subscriber -> cm.getDataByPermalink(permalink, new SubscribingRestCallback(subscriber))),
Observable.<Content>create(subscriber -> cm.getStreamByPermalink(permalink, new SubscribingRestCallback(subscriber))),
(dataContent, streamUrlContent) -> {
if (dataContent == null || streamUrlContent == null) {
System.err.println("not zipping " + dataContent + " and " + streamUrlContent);
return Observable.empty();
}
return new Content(dataContent.permalink, dataContent.logoUrl, streamUrlContent.streamUrl);
}))
.subscribe(System.out::println);
}
}
class SubscribingRestCallback implements RestCallback {
private final Subscriber<? super Content> subscriber;
public SubscribingRestCallback(Subscriber<? super Content> subscriber) {
this.subscriber = subscriber;
}
@Override
public void onSuccess(Content content) {
subscriber.onNext(content);
subscriber.onCompleted();
}
@Override
public void onFailure(int code, String message) {
System.err.println(message);
subscriber.onNext(null);
subscriber.onCompleted();
}
}
public class Content {
public final String permalink;
public final String logoUrl;
public final String streamUrl;
public Content(String permalink, String logoUrl, String streamUrl) {
this.permalink = permalink;
this.logoUrl = logoUrl;
this.streamUrl = streamUrl;
}
@Override
public String toString() {
return String.format("Content [%s, %s, %s]", permalink, logoUrl, streamUrl);
}
}
public interface RestCallback {
void onSuccess(Content content);
void onFailure(int code, String message);
}
class ContentManager {
private final Random random = new Random();
public List<String> getPermalinks(int n) {
List<String> permalinks = new ArrayList<>(n);
for (int i = 1; i <= n; ++i) {
permalinks.add("perma_" + i);
}
return permalinks;
}
public void getDataByPermalink(String permalink, RestCallback callback) {
getByPermalink(permalink, callback, false);
}
public void getStreamByPermalink(String permalink, RestCallback callback) {
getByPermalink(permalink, callback, true);
}
private void getByPermalink(String permalink, RestCallback callback, boolean stream) {
// simulate network latency and unordered results
new Thread(() -> {
try {
Thread.sleep(random.nextInt(1000) + 200);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (random.nextInt(100) < 95) {
String logoUrl;
String streamUrl;
if (stream) {
logoUrl = null;
streamUrl = "http://" + permalink + "/stream";
} else {
logoUrl = "http://" + permalink + "/logo.png";
streamUrl = null;
}
callback.onSuccess(new Content(permalink, logoUrl, streamUrl));
} else {
callback.onFailure(-1, permalink + " data failure");
}
}).start();
}
}
一般来说,它可以工作,但我不喜欢这个实现中的错误处理。基本上,REST 请求可能会失败,在这种情况下,onFailure 方法会调用subscriber.onNext(null),因此zip 方法总是可以使用(一个请求可能失败,但另一个请求可能没有,我不知道哪个失败)。然后,在zip 函数中,我需要一个if 来检查两者都不是null(如果Contents 中的任何一个是null,我的代码就会崩溃)。
如果可能的话,我希望能够在某处使用filter 运算符过滤掉null。或者也许有比为失败情况发出 null 值更好的方法,但它仍然适用于 zip 函数?
【问题讨论】:
-
为什么从
onFailure方法调用subscriber.onNext(null)?在这种情况下,您应该致电subscriber.onError(throwable)。 -
因为调用
onError()会使整个流失败,我不希望这样。我想收集尽可能多的结果,基本上忽略/过滤掉失败。还是我错了? -
即使您不希望整个流失败,您仍然需要调用
subscriber.onError()方法。还有一些其他方法可以减少错误。其中之一是onErrorResumeNext运算符。这是一个如何使用它的示例 - gist.github.com/nsk-mironov/3270b103fc3326a325e2. -
感谢您的 sn-p。因此,当压缩的任何
Observables失败时,zip也会失败;当它失败时,它只是说它不想爆炸或任何东西,而是用一个空的替换失败的(1-element)Observable,然后这个空的返回给flatMap- 这是正确的? -
是的,它就像你描述的那样工作。
标签: rx-java