【问题标题】:Batching Results from ReactiveX ObservableReactiveX Observable 的批处理结果
【发布时间】:2017-08-26 22:03:40
【问题描述】:

假设我有一个看起来像这样的 observable(这是 Python,但应该对所有语言都通用):

rx.Observable.from_iterable([[1],[],[2],[3],[],[],[4],[5,6],[7],[8,9],[10]])

我希望最终能够将整数批处理成长度为 5 的列表,并能够将它们传递给一个函数,所以像这样:

batch_function([1,2,3,4,5])
batch_function([6,7,8,9,10])

实际上,传入的数据将是(可能为空的)列表的无限流。我只是想确保在累积 5 个实际值之前,我不会对batch_function 进行后续调用。感谢您的帮助。

【问题讨论】:

  • 最后是否需要一个 5 元素列表流?订阅流的观察者并在观察者中手动缓冲就足够了,每当缓冲区超过 5 个元素时调用batch_function
  • @concat 我刚刚使用buffer_with_count 发布了对我有用的内容。我需要该列表,因为将批次发送到下游功能更有效。

标签: reactivex rx-py


【解决方案1】:

以下 sn-p 使用 buffer_with_count 为我工作。不过,我不确定是否有更简洁的方法,即没有flat_map

BUFFER_COUNT=5
rx.Observable.from_iterable(iter(get_items())) \
  .flat_map(lambda it: it) \
  .buffer_with_count(BUFFER_COUNT) \
  .map(lambda my_array: do_something_with(my_array)) \
  .subscribe(lambda it: print(it))

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-06-19
    • 2019-07-17
    • 2012-01-05
    • 1970-01-01
    相关资源
    最近更新 更多