【问题标题】:Java 8 lambda apiJava 8 拉姆达 API
【发布时间】:2015-02-21 18:41:39
【问题描述】:

我正在努力从 Rx Java 迁移到 Java 8 lambdas。我找不到的一个例子是一种缓冲请求的方法。例如,在 Rx Java 中,我可以这样说。

Observable.create(getIterator()).buffer(20, 1000, TimeUnit. MILLISECONDS).doOnNext(list -> doWrite(list));

我们将 20 个元素缓冲到一个列表中,或者在 1000 毫秒时超时,这是最先发生的。

RX 中的 Observable 是一种“推送”风格的 observable,而 Streams 使用 java pull。这是否有可能在流中实现我自己的映射操作,或者由于doOnNext 必须轮询前一个元素,因此无法发出导致问题?

【问题讨论】:

标签: java lambda java-8 java-stream


【解决方案1】:

一种方法是使用 BlockingQueue 和 Guava。使用Queues.drain,您可以创建一个Collection,然后您可以调用stream() 并进行转换。这是一个链接:Guava Queues.drain

这是一个简单的例子:

public void transform(BlockingQueue<Something> input) 
{
    List<Something> buffer = new ArrayList<>(20);
    Queues.drain(input, buffer, 20, 1000, TimeUnit.MILLISECONDS);
    doWrite(buffer);
}

【讨论】:

    【解决方案2】:

    simple-react 有类似的运算符,但不是这个确切的运算符。虽然它是相当可扩展的,所以应该可以编写自己的。需要注意的是,我没有在 IDE 中编写或测试它,使用 timeout 运算符的简单反应的大致缓冲区看起来像这样

      import com.aol.simple.react.async.Queue;
      import com.aol.simple.react.stream.traits.LazyFutureStream;
      import com.aol.simple.react.async.Queue.ClosedQueueException;
      import com.aol.simple.react.util.SimpleTimer;
      import java.util.concurrent.TimeUnit;
    
      static LazyFutureStream batchBySizeAndTime(LazyFutureStream stream,int size,long time, TimeUnit unit) { 
        Queue queue = stream.toQueue();
        Function<Supplier<U>, Supplier<Collection<U>>> fn = s -> {
            return () -> {
                SimpleTimer timer = new SimpleTimer();
                List<U> list = new ArrayList<>();
                try {
                    do {
                        if(list.size()==size())
                            return list;
                        list.add(s.get());
                    } while (timer.getElapsedNanoseconds()<unit.toNanos(time));
                } catch (ClosedQueueException e) {
    
                    throw new ClosedQueueException(list);
                }
                return list;
            };
        };
        return stream.fromStream(queue.streamBatch(stream.getSubscription(), fn));
    }
    

    【讨论】:

      猜你喜欢
      • 2020-05-20
      • 1970-01-01
      • 2019-09-05
      • 1970-01-01
      • 1970-01-01
      • 2017-08-28
      • 2011-08-28
      • 2022-10-04
      • 1970-01-01
      相关资源
      最近更新 更多