【问题标题】:Implementing a turnstile-like operator with RxJava使用 RxJava 实现类似旋转门的操作符
【发布时间】:2015-01-06 11:23:35
【问题描述】:

我需要帮助在 RxJava (RxScala) 中实现类似旋转门的运算符。我花了很长时间思考它,但我似乎被困住了。

函数的类型应该如下:

def turnstile[T](queue: Observable[T], turnstile: Observable[Boolean]): Observable[T]

这个想法是操作员的行为应该与真正的旋转门非常相似。有人来了(queue),有一个turnstile要么准备好接受新的single人(闸机里有一个true元素,你可以把它想象成一个令牌插入旋转门),或关闭(false 在旋转门中,取消之前的令牌)。对于旋转栅门中的每个true 元素,只有一个人可以通过。

此外,在没有人通过的情况下连续插入多个标记(旋转门中的几个true 项目)与仅插入一个标记相同,旋转门不计算标记。

换句话说,旋转门最初是关闭的。当true 元素出现在其中时,它会为一个人打开。如果一个人出现,它会通过(到输出)并且旋转门再次关闭。如果旋转门中出现false 元素,则旋转门也会关闭。

queue       ----A---B-------------C--D--
turnstile   --T--------T--T-T-T-T------T
            ============================
output      ----A------B----------C----D

一个大理石图显示打开的旋转门等待 A 人,然后 B 人等待旋转门打开,然后几个令牌表现得像一个人 - C 人通过,但 D 人必须再次等待新的令牌

----A----B--
--T---T-F-T-
============
----A-----B-

显示旋转门中的false 元素如何再次关闭旋转门的大理石图。

感谢任何帮助。我认为在不编写自定义运算符的情况下实现这一点的唯一方法是以某种方式使用zip 运算符,因为它可能是唯一使一个序列中的元素等待另一个序列中的元素的运算符(或者还有其他我'我不知道?)。但是我需要压缩一些旋转栅门元素,具体取决于它们是否与人配对......

我认为这是一个有趣的问题,我很好奇有什么好的解决方案。

【问题讨论】:

  • 你看过and-then-when运营商,也许结合distinct
  • @AdamS 文档很少,但在我看来,这只是zip 的另一种语法,它也允许zip 的arity > 2(RxJava 已经允许)。还是我弄错了?请您详细说明这将如何帮助解决我的问题?
  • 我想也许我在粗略浏览这些函数时误解了它们的含义 - 你似乎很正确。不过,有趣的问题。我会继续考虑的。
  • this Rx.NET GIST。它依赖于 Rx 的 Create 的异步迭代器重载,它使用 .NET 的 Task<T> 来允许用户定义一个充当状态机的协程。在以前的 Rx.NET (1.1) 实验版本中,在 C# 5.0 中引入协程之前,使用 IEnumerable<T> 而不是 Task<T> 的替代 Create 重载。或者,您也可以只使用Scan 运算符来做类似的事情,这本质上就像一个运行聚合。也许知道 RxJava 的人可以转换它。
  • @Dave 感谢您提供所有链接。顺便说一句,我刚开始读你的博客,这是很棒的东西。我一直怀疑 Hot 和 Cold observables 的定义有些烂,而你对它的看法(对订阅有/没有副作用)更有意义:)

标签: reactive-programming rx-java


【解决方案1】:

所以我认为我有一个更清洁、完全 Rx 的解决方案。这实际上是一个非常有趣的问题。如果它可以满足您的需求,我认为它最终会非常优雅,尽管它需要很长时间才能实现。

遗憾的是,我不了解 Scala,因此您将不得不处理我的 Java8 lambda。 :D

整个实现:

public static Observable<String> getTurnstile(final Observable<String> queue, final Observable<Boolean> tokens) {
    return queue.publish(sharedQueue ->
            tokens.switchMap(token -> token ? sharedQueue.limit(1) : Observable.empty()));
}

所以,这里发生的事情是我们使用publish 来创建可以多次订阅的人员队列的共享 observable。在其中,我们在令牌流上使用switchMap,这意味着任何时候从 switchMap 发出新的 Observable 时,它​​都会丢弃最后一个并订阅新的 Observable。每当令牌为真时,它都会对人员队列进行新的订阅(并且连续多个为真很好,因为它正在取消旧的订阅)。当它为 false 时,它​​只是转储一个空的 Observable,以免浪费时间。

还有一些(通过)测试用例:

@RunWith(JUnit4.class)
public class TurnstileTest {
    private final TestScheduler scheduler = new TestScheduler();
    private final TestSubscriber<String> output = new TestSubscriber<>();

    private final TestSubject<Boolean> tokens = TestSubject.create(scheduler);
    private final TestSubject<String> queue = TestSubject.create(scheduler);

    @Before
    public void setup() {
        Turnstile.getTurnstile(queue, tokens).subscribe(output);
    }

    @Test
    public void allowsOneWithTokenBefore() {
        tokens.onNext(true, 0);
        queue.onNext("Bill", 1);
        queue.onNext("Bob", 2);

        assertPassedThrough("Bill");
    }

    @Test
    public void tokenBeforeIsCancelable() {
        tokens.onNext(true, 0);
        tokens.onNext(false, 1);
        queue.onNext("Bill", 2);

        assertNonePassed();
    }

    @Test
    public void tokensBeforeAreCancelable() {
        tokens.onNext(true, 0);
        tokens.onNext(true, 1);
        tokens.onNext(true, 2);
        tokens.onNext(false, 3);
        queue.onNext("Bill", 4);

        assertNonePassed();
    }

    @Test
    public void eventualPassThroughAfterFalseTokens() {
        tokens.onNext(false, 0);
        queue.onNext("Bill", 1);
        tokens.onNext(false, 2);
        tokens.onNext(false, 3);
        queue.onNext("Jane", 4);
        queue.onNext("Bob", 5);
        tokens.onNext(true, 6);
        tokens.onNext(true, 7);
        tokens.onNext(false, 8);
        tokens.onNext(false, 9);
        queue.onNext("Phil", 10);
        tokens.onNext(false, 11);
        tokens.onNext(false, 12);
        tokens.onNext(true, 13);

        assertPassedThrough("Bill", "Jane", "Bob");
    }

    @Test
    public void allowsOneWithTokenAfter() {
        queue.onNext("Bill", 0);
        tokens.onNext(true, 1);
        queue.onNext("Bob", 2);

        assertPassedThrough("Bill");
    }

    @Test
    public void multipleTokenEntriesBeforeOnlyAllowsOneAtATime() {
        tokens.onNext(true, 0);
        tokens.onNext(true, 1);
        tokens.onNext(true, 2);
        queue.onNext("Bill", 3);
        tokens.onNext(true, 4);
        tokens.onNext(true, 5);
        queue.onNext("Jane", 6);
        queue.onNext("John", 7);

        assertPassedThrough("Bill", "Jane");
    }

    @Test
    public void noneShallPassWithoutToken() {
        queue.onNext("Jane", 0);
        queue.onNext("John", 1);

        assertNonePassed();
    }

    private void closeSubjects() {
        scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
        scheduler.triggerActions();
        tokens.onCompleted();
        queue.onCompleted();
        scheduler.triggerActions();
    }

    private void assertNonePassed() {
        closeSubjects();
        output.assertReceivedOnNext(Lists.newArrayList());
    }

    private void assertPassedThrough(final String... names) {
        closeSubjects();
        output.assertReceivedOnNext(Lists.newArrayList(names));
    }
}

如果您发现任何不适用于此功能的边缘情况,请告诉我,尤其是在实时存在问题时,因为测试显然是在受控环境中。

【讨论】:

  • 是的,它确实有效,确实通过了我的测试,并且是一个不错的解决方案(回想起来很简单:)。它的 Scala 版本是queue.publish(sharedQueue ⇒ tokens.switchMap(token ⇒ if (token) sharedQueue.take(1) else Observable.empty))。谢谢,我很高兴你在解决这个问题时玩得开心。
  • 实际上,我越想,解决方案对我来说就越不明显。似乎主要技巧是使用publish 在内部对元素进行排队。我将不得不考虑一段时间..
  • 所以,我对publish 运算符进行了一些探讨。它有几种工作方式。 “正常”方式返回一个ConnectableObservable,当您连接到它时开始流式传输。这里与选择器一起使用的publish 有点不同。它提供了一个(常规的Observable 版本的Observable,它可以被多次订阅。它还管理从订阅中请求项目,这解释了为什么我们可以在订阅之间切换而不会丢失项目。一个需要注意的是,如果 queue 生成项目太快,您可能需要 onBackpressureBuffer
【解决方案2】:

好的,我找到了一个解决方案,灵感来自 Dave Sexton 的评论。最后我没有使用zip,因为我无法用它找到解决方案。

我基本上将闸机实现为具有三个状态变量的状态机:是否锁定,等待通过闸机的元素队列,以及通过闸机的最后一个元素(这些都收集在最后产生实际的输出)。

状态机的输入是一个转换请求流,它由两个输入流合并而成:锁定/解锁请求流和要通过闸机的元素流。我只是用scan 处理转换,然后collect 处理来自结果状态的传递元素。

/** sample elements from queue through turnstile, one at a time
*
* @param queue source of elements to pass through the turnstile.
* @param turnstile For every `true` in the turnstile pass one element through from the queue
* @tparam T type of the elements
* @return the source of queue elements passing through the turnstile
*/
def queueThroughTurnstile[T](queue: Observable[T], turnstile: Observable[Boolean]): Observable[T] = {
  import scala.collection.immutable.Queue

  case class State(isOpen: Boolean, elementsInQueue: Queue[T], maybeLastEmittedElement: Option[T])
  sealed abstract class Transition
  case object Lock extends Transition
  case object Unlock extends Transition
  case class Element(element: T) extends Transition

  val initialState = State(isOpen = false, Queue.empty, None)

  queue.map(element ⇒ Element(element))
    .merge(turnstile map (unlock ⇒ if (unlock) Unlock else Lock))
    .scan(initialState) { case (State(isOpen, elementsInQueue, _), transition) ⇒ transition match {
    case Lock ⇒ State(isOpen = false, elementsInQueue, None)
    case Unlock ⇒ {
      if (elementsInQueue.isEmpty)
        State(isOpen = true, elementsInQueue, None)
      else {
        val (firstElement, newQueue) = elementsInQueue.dequeue
        State(isOpen = false, newQueue, Some(firstElement))
      }
    }
    case Element(newElement) ⇒ {
      if (isOpen) {
        if (elementsInQueue.isEmpty)
          State(isOpen = false, Queue.empty, Some(newElement))
        else {
          val (firstElement, newQueue) = elementsInQueue.dequeue
          State(isOpen = false, newQueue enqueue newElement, Some(firstElement))
        }  
      } else {
        State(isOpen = false, elementsInQueue enqueue newElement, None)
      }
    }
  }
  }.collect { case State(_, _, Some(lastEmittedElement)) ⇒ lastEmittedElement}
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多