【问题标题】:Properly handling empty Observable in RxJava在 RxJava 中正确处理空的 Observable
【发布时间】:2015-03-16 20:35:25
【问题描述】:

我有一种情况,我正在创建一个包含数据库结果的 Observable。然后我对它们应用一系列过滤器。然后我有一个正在记录结果的订阅者。可能没有元素通过过滤器。我的业务逻辑表明这不是错误。但是,当发生这种情况时,我的 onError 被调用并包含以下异常:java.util.NoSuchElementException: Sequence contains no elements

仅检测该类型的异常并忽略它是公认的做法吗?或者有没有更好的方法来处理这个问题?

版本是1.0.0。

这是一个简单的测试用例,它揭示了我所看到的。它似乎与在到达 map 和 reduce 之前过滤所有事件有关。

  @Test
public void test()
{

    Integer values[] = new Integer[]{1, 2, 3, 4, 5};

    Observable.from(values).filter(new Func1<Integer, Boolean>()
    {
        @Override
        public Boolean call(Integer integer)
        {
            if (integer < 0)
                return true;
            else
                return false;
        }
    }).map(new Func1<Integer, String>()
    {
        @Override
        public String call(Integer integer)
        {
            return String.valueOf(integer);
        }
    }).reduce(new Func2<String, String, String>()
    {
        @Override
        public String call(String s, String s2)
        {
            return s + "," + s2;
        }
    })

            .subscribe(new Action1<String>()
            {
                @Override
                public void call(String s)
                {
                    System.out.println(s);
                }
            });
}

因为我使用的是安全订阅者,所以它最初会抛出一个 OnErrorNotImplementedException,其中包含以下异常:

java.util.NoSuchElementException: Sequence contains no elements
    at rx.internal.operators.OperatorSingle$1.onCompleted(OperatorSingle.java:82)
    at rx.internal.operators.NotificationLite.accept(NotificationLite.java:140)
    at rx.internal.operators.TakeLastQueueProducer.emit(TakeLastQueueProducer.java:73)
    at rx.internal.operators.TakeLastQueueProducer.startEmitting(TakeLastQueueProducer.java:45)
    at rx.internal.operators.OperatorTakeLast$1.onCompleted(OperatorTakeLast.java:59)
    at rx.internal.operators.OperatorScan$2.onCompleted(OperatorScan.java:121)
    at rx.internal.operators.OperatorMap$1.onCompleted(OperatorMap.java:43)
    at rx.internal.operators.OperatorFilter$1.onCompleted(OperatorFilter.java:42)
    at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:79)
    at rx.internal.operators.OperatorScan$2$1.request(OperatorScan.java:147)
    at rx.Subscriber.setProducer(Subscriber.java:139)
    at rx.internal.operators.OperatorScan$2.setProducer(OperatorScan.java:139)
    at rx.Subscriber.setProducer(Subscriber.java:133)
    at rx.Subscriber.setProducer(Subscriber.java:133)
    at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:47)
    at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:33)
    at rx.Observable$1.call(Observable.java:144)
    at rx.Observable$1.call(Observable.java:136)
    at rx.Observable$1.call(Observable.java:144)
    at rx.Observable$1.call(Observable.java:136)
    at rx.Observable$1.call(Observable.java:144)
    at rx.Observable$1.call(Observable.java:136)
    at rx.Observable$1.call(Observable.java:144)
    at rx.Observable$1.call(Observable.java:136)
    at rx.Observable$1.call(Observable.java:144)
    at rx.Observable$1.call(Observable.java:136)
    at rx.Observable.subscribe(Observable.java:7284)

根据下面@davem 的回答,我创建了一个新的测试用例:

@Test
public void testFromBlockingAndSingle()
{

    Integer values[] = new Integer[]{-2, -1, 0, 1, 2, 3, 4, 5};

    List<String> results = Observable.from(values).filter(new Func1<Integer, Boolean>()
    {
        @Override
        public Boolean call(Integer integer)
        {
            if (integer < 0)
                return true;
            else
                return false;
        }
    }).map(new Func1<Integer, String>()
    {
        @Override
        public String call(Integer integer)
        {
            return String.valueOf(integer);
        }
    }).reduce(new Func2<String, String, String>()
    {
        @Override
        public String call(String s, String s2)
        {
            return s + "," + s2;
        }
    }).toList().toBlocking().single();

    System.out.println("Test: " + results + " Size: " + results.size());

}

并且此测试导致以下行为:

当输入为:

Integer values[] = new Integer[]{-2, -1, 0, 1, 2, 3, 4, 5};

那么结果(如预期)是:

Test: [-2,-1] Size: 1

当输入是:

Integer values[] = new Integer[]{0, 1, 2, 3, 4, 5};

那么结果就是下面的堆栈跟踪:

java.util.NoSuchElementException: Sequence contains no elements
at rx.internal.operators.OperatorSingle$1.onCompleted(OperatorSingle.java:82)
at rx.internal.operators.NotificationLite.accept(NotificationLite.java:140)
at rx.internal.operators.TakeLastQueueProducer.emit(TakeLastQueueProducer.java:73)
at rx.internal.operators.TakeLastQueueProducer.startEmitting(TakeLastQueueProducer.java:45)
at rx.internal.operators.OperatorTakeLast$1.onCompleted(OperatorTakeLast.java:59)
at rx.internal.operators.OperatorScan$2.onCompleted(OperatorScan.java:121)
at rx.internal.operators.OperatorMap$1.onCompleted(OperatorMap.java:43)
at rx.internal.operators.OperatorFilter$1.onCompleted(OperatorFilter.java:42)
at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:79)
at rx.internal.operators.OperatorScan$2$1.request(OperatorScan.java:147)
at rx.Subscriber.setProducer(Subscriber.java:139)
at rx.internal.operators.OperatorScan$2.setProducer(OperatorScan.java:139)
at rx.Subscriber.setProducer(Subscriber.java:133)
at rx.Subscriber.setProducer(Subscriber.java:133)
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:47)
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:33)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable$1.call(Observable.java:144)
at rx.Observable$1.call(Observable.java:136)
at rx.Observable.subscribe(Observable.java:7284)
at rx.observables.BlockingObservable.blockForSingle(BlockingObservable.java:441)
at rx.observables.BlockingObservable.single(BlockingObservable.java:340)
at EmptyTest2.test(EmptyTest2.java:19)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:74)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:211)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:67)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)

所以看来问题肯定出在reduce函数的使用上。请参阅以下处理这两种情况的测试用例:

@Test
public void testNoReduce()
{

    Integer values[] = new Integer[]{-2, -1, 0, 1, 2, 3, 4, 5};

    List<String> results = Observable.from(values).filter(new Func1<Integer, Boolean>()
    {
        @Override
        public Boolean call(Integer integer)
        {
            if (integer < 0)
                return true;
            else
                return false;
        }
    }).map(new Func1<Integer, String>()
    {
        @Override
        public String call(Integer integer)
        {
            return String.valueOf(integer);
        }
    }).toList().toBlocking().first();

    Iterator<String> itr = results.iterator();
    StringBuilder b = new StringBuilder();

    while (itr.hasNext())
    {
        b.append(itr.next());

        if (itr.hasNext())
            b.append(",");
    }

    System.out.println("Test NoReduce: " + b);

}

使用以下输入:

Integer values[] = new Integer[]{-2, -1, 0, 1, 2, 3, 4, 5};

我得到以下预期的结果:

Test NoReduce: -2,-1

然后输入以下内容:

Integer values[] = new Integer[]{0, 1, 2, 3, 4, 5};

我得到以下预期的输出:

Test NoReduce: 

因此,除非我完全误解了某些东西,否则真正处理由过滤器产生的零元素 Observable 的唯一方法是在 Observable 链之外实现 reduce 逻辑。大家同意这个说法吗?


最终解决方案

这是我在实施 Tomáš Dvořák 和 David Motten 建议后的最终解决方案。我认为这个解决方案是合理的。

@Test
public void testWithToList()
{

    Integer values[] = new Integer[]{-2, -1, 0, 1, 2, 3, 4, 5};

     Observable.from(values).filter(new Func1<Integer, Boolean>()
     {
         @Override
         public Boolean call(Integer integer)
         {
             if (integer < 0)
                 return true;
             else
                 return false;
         }
     }).toList().map(new Func1<List<Integer>, String>()
     {
         @Override
         public String call(List<Integer> integers)
         {
             Iterator<Integer> intItr = integers.iterator();
             StringBuilder b = new StringBuilder();

             while (intItr.hasNext())
             {
                 b.append(intItr.next());

                 if (intItr.hasNext())
                 {
                     b.append(",");
                 }
             }

             return b.toString();
         }
     }).subscribe(new Action1<String>()
     {
         @Override
         public void call(String s)
         {
             System.out.println("With a toList: " + s);
         }
     });

}

这是在给定以下输入时此测试的行为方式。

当给定的流将通过过滤器传递一些值时:

Integer values[] = new Integer[]{-2, -1, 0, 1, 2, 3, 4, 5};

结果是:

With a toList: -2,-1

当给定一个没有任何值的流通过过滤器时:

Integer values[] = new Integer[]{0, 1, 2, 3, 4, 5};

结果是:

With a toList: <empty string>

【问题讨论】:

  • 请发布一些代码,这样我们就不必从堆栈跟踪中推断它了。
  • 你说得对,我没有添加测试用例。我添加了一个简单的。

标签: java rx-java


【解决方案1】:

现在更新后,错误非常明显。 RxJava 中的 Reduce 将失败并返回 IllegalArgumentException 如果它正在减少的 observable 为空,完全符合规范 (http://reactivex.io/documentation/operators/reduce.html)。

在函数式编程中,通常有两个通用运算符将​​集合聚合为单个值,foldreduce。在公认的术语中,fold 采用初始累加器值,以及一个函数,该函数采用正在运行的累加器和集合中的值并产生另一个累加器值。伪代码示例:

[1, 2, 3, 4].fold(0, (accumulator, value) =&gt; accumulator + value)

将从 0 开始,最终将 1、2、3、4 添加到正在运行的累加器中,最后产生 10,即值的总和。

Reduce 非常相似,只是它不显式地取累加器的初始值,它使用第一个值作为初始累加器,然后累加所有剩余的值。如果您是例如,这是有道理的。寻找最小值或最大值。

[1, 2, 3, 4].reduce((accumulator, value) =&gt; min(accumulator, value))

查看 fold 和 reduce 不同的方式,您可能会使用 fold,只要聚合值即使在空集合上也有意义(例如,在 sum 中,0 有意义),否则 reduce(@987654332 @ 对空集合没有意义,reduce 将无法对此类集合进行操作,在您的情况下会抛出异常)。

您正在执行类似的聚合,用逗号散布字符串集合以生成单个字符串。这是一个更困难的情况。这可能对空集合有意义(您可能期望一个空字符串),但另一方面,如果您从一个空累加器开始,结果中的逗号将比您预期的多一个。对此的正确解决方案是首先检查集合是否为空,然后为空集合返回后备字符串,或者对非空集合执行reduce。您可能会观察到,通常您实际上并不希望在空集合案例中使用空字符串,但“集合为空”之类的内容可能更合适,从而进一步向您保证此解决方案是干净的。

顺便说一句,我在这里使用 collection 这个词而不是 observable,只是为了教育目的。另外,在 RxJava 中,foldreduce 的调用方式相同,reduce,只是该方法有两个版本,一个只带一个参数,另一个带两个参数。

至于你的最后一个问题:你不必离开 Observable 链。正如 David Motten 建议的那样,只需使用 toList()。

.filter(...)
.toList()
.map(listOfValues => listOfValues.intersperse(", "))

intersperse 可以根据 reduce 实现,如果还不是库函数(这很常见)。

collection.intersperse(separator) = 
    if (collection.isEmpty()) 
      ""
    else
      collection.reduce(accumulator, element => accumulator + separator + element)

【讨论】:

  • 感谢您的详细解释。这有很大帮助。我用我的最终解决方案更新了问题。
【解决方案2】:

发生这种情况的原因是您在空流上使用toBlocking().single()。如果您希望流中有 0 或 1 个值,您可以执行 toList().toBlocking().single() 然后检查列表中的值(可能为空,但不会引发您得到的异常)。

【讨论】:

  • 干得好,仅从堆栈跟踪推断,绝对没有代码:) +1 为您,-1 为问题。只是,你不是说检查toBlocking.toList 的结果而不是toList.toBlocking.single 吗?
  • 谢谢。 toBlocking.toList 不是 1.0.8 API 的一部分,但可以使用 toBlocking.toIterabletoBlocking.singleOrDefault 是另一种可能性,但没有详细信息,toList 版本涵盖了它。
  • 我明白了,我没有阅读文档并认为原来的 toList 是一个错误,我什至不知道它的存在。如果我只是在第一个元素之后节省资源,我可能会使用.take(1).toBlocking.singleOrDefault,但正如你所说,谁知道什么适合这种特殊情况。
  • 根据我发布的示例代码,这个答案是否仍然有意义?
【解决方案3】:

您可以将 reduce 与初始值版本 .reduce(0, (x,y) -&gt; x+y ) 一起使用,它应该像 Tomáš Dvořák 解释的折叠运算符一样工作

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-03-12
    • 2016-02-06
    • 2016-06-24
    • 1970-01-01
    相关资源
    最近更新 更多