【问题标题】:Error handling for zipped observables压缩 observables 的错误处理
【发布时间】:2015-06-14 19:11:31
【问题描述】:

我的用例是:我得到一个永久链接列表,并且需要为每个永久链接发出两个 REST 请求以获取部分数据。当两个请求都返回时,我想将他们的信息合并在一起并用它做一些事情(在这里 - 打印出来)。我想使用 zip 运算符的代码来做到这一点。这是我当前的代码(以及我正在使用的库的模拟):

public class Main {

    public static void main(String[] args) {
        ContentManager cm = new ContentManager();

        Observable
                .from(cm.getPermalinks(10))
                .flatMap(permalink -> Observable.zip(
                        Observable.<Content>create(subscriber -> cm.getDataByPermalink(permalink, new SubscribingRestCallback(subscriber))),
                        Observable.<Content>create(subscriber -> cm.getStreamByPermalink(permalink, new SubscribingRestCallback(subscriber))),
                        (dataContent, streamUrlContent) -> {
                            if (dataContent == null || streamUrlContent == null) {
                                System.err.println("not zipping " + dataContent + " and " + streamUrlContent);
                                return Observable.empty();
                            }

                            return new Content(dataContent.permalink, dataContent.logoUrl, streamUrlContent.streamUrl);
                        }))
                .subscribe(System.out::println);
    }
}

class SubscribingRestCallback implements RestCallback {

    private final Subscriber<? super Content> subscriber;

    public SubscribingRestCallback(Subscriber<? super Content> subscriber) {
        this.subscriber = subscriber;
    }

    @Override
    public void onSuccess(Content content) {
        subscriber.onNext(content);
        subscriber.onCompleted();
    }

    @Override
    public void onFailure(int code, String message) {
        System.err.println(message);
        subscriber.onNext(null);
        subscriber.onCompleted();
    }
}

public class Content {

    public final String permalink;

    public final String logoUrl;

    public final String streamUrl;

    public Content(String permalink, String logoUrl, String streamUrl) {
        this.permalink = permalink;
        this.logoUrl = logoUrl;
        this.streamUrl = streamUrl;
    }

    @Override
    public String toString() {
        return String.format("Content [%s, %s, %s]", permalink, logoUrl, streamUrl);
    }
}

public interface RestCallback {

    void onSuccess(Content content);

    void onFailure(int code, String message);
}

class ContentManager {

    private final Random random = new Random();

    public List<String> getPermalinks(int n) {
        List<String> permalinks = new ArrayList<>(n);
        for (int i = 1; i <= n; ++i) {
            permalinks.add("perma_" + i);
        }

        return permalinks;
    }

    public void getDataByPermalink(String permalink, RestCallback callback) {
        getByPermalink(permalink, callback, false);
    }

    public void getStreamByPermalink(String permalink, RestCallback callback) {
        getByPermalink(permalink, callback, true);
    }

    private void getByPermalink(String permalink, RestCallback callback, boolean stream) {
        // simulate network latency and unordered results
        new Thread(() -> {
            try {
                Thread.sleep(random.nextInt(1000) + 200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (random.nextInt(100) < 95) {
                String logoUrl;
                String streamUrl;
                if (stream) {
                    logoUrl = null;
                    streamUrl = "http://" + permalink + "/stream";
                } else {
                    logoUrl = "http://" + permalink + "/logo.png";
                    streamUrl = null;
                }
                callback.onSuccess(new Content(permalink, logoUrl, streamUrl));
            } else {
                callback.onFailure(-1, permalink + " data failure");
            }
        }).start();
    }
}

一般来说,它可以工作,但我不喜欢这个实现中的错误处理。基本上,REST 请求可能会失败,在这种情况下,onFailure 方法会调用subscriber.onNext(null),因此zip 方法总是可以使用(一个请求可能失败,但另一个请求可能没有,我不知道哪个失败)。然后,在zip 函数中,我需要一个if 来检查两者都不是null(如果Contents 中的任何一个是null,我的代码就会崩溃)。

如果可能的话,我希望能够在某处使用filter 运算符过滤掉null。或者也许有比为失败情况发出 null 值更好的方法,但它仍然适用于 zip 函数?

【问题讨论】:

  • 为什么从onFailure方法调用subscriber.onNext(null)?在这种情况下,您应该致电 subscriber.onError(throwable)
  • 因为调用onError() 会使整个流失败,我不希望这样。我想收集尽可能多的结果,基本上忽略/过滤掉失败。还是我错了?
  • 即使您不希望整个流失败,您仍然需要调用subscriber.onError() 方法。还有一些其他方法可以减少错误。其中之一是onErrorResumeNext 运算符。这是一个如何使用它的示例 - gist.github.com/nsk-mironov/3270b103fc3326a325e2.
  • 感谢您的 sn-p。因此,当压缩的任何Observables 失败时,zip 也会失败;当它失败时,它只是说它不想爆炸或任何东西,而是用一个空的替换失败的(1-element)Observable,然后这个空的返回给flatMap - 这是正确的?
  • 是的,它就像你描述的那样工作。

标签: rx-java


【解决方案1】:

首先,通知Subscriber 错误的正确方法是调用subscriber.onError 方法:

class SubscribingRestCallback implements RestCallback {
    private final Subscriber<? super Content> subscriber;

    public SubscribingRestCallback(Subscriber<? super Content> subscriber) {
        this.subscriber = subscriber;
    }

    @Override
    public void onSuccess(Content content) {
        subscriber.onNext(content);
        subscriber.onCompleted();
    }

    @Override
    public void onFailure(int code, String message) {
        subscriber.onError(new Exception(message));
    }
}

即使您不希望整个流失败,您仍然需要调用subscriber.onError() 方法。还有一些其他方法可以减少错误。其中之一是onErrorResumeNext 运算符:

Observable
        .from(cm.getPermalinks(10))
        .flatMap(permalink -> Observable.zip(
                Observable.<Content>create(subscriber -> cm.getDataByPermalink(permalink, new SubscribingRestCallback(subscriber))),
                Observable.<Content>create(subscriber -> cm.getStreamByPermalink(permalink, new SubscribingRestCallback(subscriber))),
                (dataContent, streamUrlContent) -> {
                    return new Content(dataContent.permalink, dataContent.logoUrl, streamUrlContent.streamUrl);
                }).onErrorResumeNext(Observable.empty()))
        .subscribe(System.out::println);

编辑

我有最后一个问题:如果你注意到我的拉链功能,我会回来 Observable.empty() 如果两个对象不能被压缩,并且一旦我 返回内容。这似乎是错误的。我应该如何处理这样的错误 拉链函数中的条件?

是的,返回Observable.empty() 是完全错误的。从zip 函数中抛出异常似乎是最好的解决方案:

Observable
        .from(cm.getPermalinks(10))
        .flatMap(permalink -> Observable.zip(
                Observable.<Content>create(subscriber -> cm.getDataByPermalink(permalink, new SubscribingRestCallback(subscriber))),
                Observable.<Content>create(subscriber -> cm.getStreamByPermalink(permalink, new SubscribingRestCallback(subscriber))),
                (dataContent, streamUrlContent) -> {
                    if (!isDataValid(dataContent, streamUrlContent)) {
                        throw new RuntimeException("Something went wrong.");
                    }
                    return new Content(dataContent.permalink, dataContent.logoUrl, streamUrlContent.streamUrl);
                }).onErrorResumeNext(Observable.empty()))
        .subscribe(System.out::println);

【讨论】:

  • 我最终做的是:RestCallback#onFailure() 方法调用subscriber.onError(),在RestCallback#onSuccess() 中,如果内容有效,我要么调用onNext()onComplete(),要么调用onError()如果不。然后,将onResumeNext() 应用于压缩后的Observable 也可以处理此类内容错误,而我的压缩功能就像(dataContent, streamUrlContent) -&gt; new Content(dataContent.permalink, dataContent.logoUrl, streamUrlContent.streamUrl)) 一样简单。我添加了doOnError() 调用以额外记录。我喜欢现在的代码。感谢您的帮助。
猜你喜欢
  • 1970-01-01
  • 2018-09-30
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-12-22
  • 1970-01-01
  • 2011-06-02
  • 2014-01-06
相关资源
最近更新 更多