【发布时间】:2015-01-06 11:23:35
【问题描述】:
我需要帮助在 RxJava (RxScala) 中实现类似旋转门的运算符。我花了很长时间思考它,但我似乎被困住了。
函数的类型应该如下:
def turnstile[T](queue: Observable[T], turnstile: Observable[Boolean]): Observable[T]
这个想法是操作员的行为应该与真正的旋转门非常相似。有人来了(queue),有一个turnstile要么准备好接受新的single人(闸机里有一个true元素,你可以把它想象成一个令牌插入旋转门),或关闭(false 在旋转门中,取消之前的令牌)。对于旋转栅门中的每个true 元素,只有一个人可以通过。
此外,在没有人通过的情况下连续插入多个标记(旋转门中的几个true 项目)与仅插入一个标记相同,旋转门不计算标记。
换句话说,旋转门最初是关闭的。当true 元素出现在其中时,它会为一个人打开。如果一个人出现,它会通过(到输出)并且旋转门再次关闭。如果旋转门中出现false 元素,则旋转门也会关闭。
queue ----A---B-------------C--D--
turnstile --T--------T--T-T-T-T------T
============================
output ----A------B----------C----D
一个大理石图显示打开的旋转门等待 A 人,然后 B 人等待旋转门打开,然后几个令牌表现得像一个人 - C 人通过,但 D 人必须再次等待新的令牌
----A----B--
--T---T-F-T-
============
----A-----B-
显示旋转门中的false 元素如何再次关闭旋转门的大理石图。
感谢任何帮助。我认为在不编写自定义运算符的情况下实现这一点的唯一方法是以某种方式使用zip 运算符,因为它可能是唯一使一个序列中的元素等待另一个序列中的元素的运算符(或者还有其他我'我不知道?)。但是我需要不压缩一些旋转栅门元素,具体取决于它们是否与人配对......
我认为这是一个有趣的问题,我很好奇有什么好的解决方案。
【问题讨论】:
-
你看过and-then-when运营商,也许结合distinct?
-
@AdamS 文档很少,但在我看来,这只是
zip的另一种语法,它也允许zip的arity > 2(RxJava 已经允许)。还是我弄错了?请您详细说明这将如何帮助解决我的问题? -
我想也许我在粗略浏览这些函数时误解了它们的含义 - 你似乎很正确。不过,有趣的问题。我会继续考虑的。
-
见this Rx.NET GIST。它依赖于 Rx 的
Create的异步迭代器重载,它使用 .NET 的Task<T>来允许用户定义一个充当状态机的协程。在以前的 Rx.NET (1.1) 实验版本中,在 C# 5.0 中引入协程之前,使用IEnumerable<T>而不是Task<T>的替代Create重载。或者,您也可以只使用Scan运算符来做类似的事情,这本质上就像一个运行聚合。也许知道 RxJava 的人可以转换它。 -
@Dave 感谢您提供所有链接。顺便说一句,我刚开始读你的博客,这是很棒的东西。我一直怀疑 Hot 和 Cold observables 的定义有些烂,而你对它的看法(对订阅有/没有副作用)更有意义:)
标签: reactive-programming rx-java