【问题标题】:Java Collector with Closeable resource as accumulator具有可关闭资源作为累加器的 Java 收集器
【发布时间】:2018-03-15 10:49:39
【问题描述】:

假设我正在尝试创建一个收集器,将数据聚合到使用后必须关闭的资源中。有没有办法在Collector 中实现类似于finally 块的东西?在成功的情况下,这可以在finisher 方法中完成,但似乎没有在异常情况下调用任何方法。

目标是以干净的方式实现如下操作,而无需先将流收集到内存列表中。

stream.collect(groupingBy(this::extractFileName, collectToFile()));

【问题讨论】:

  • 所以你在想Collector#onError之类的东西?如果是这样,没有这样的事情,你必须自己做......
  • @Eugene 是的,或者可能是一种干净地将onClose 方法附加到正在收集的流的方法。
  • @FedericoPeraltaSchaffner 我希望关闭每个组并传播异常。在我的示例中,Collector.supplier() 提供的累加器可以实现 Closeable 并且应该像在 try-with-resources 块中调用它一样。
  • @FedericoPeraltaSchaffner 任何已检查的异常都必须包装到未检查的异常中,例如 UncheckedIOException。但我认为这个问题比文件 io.无论是成功完成还是出现任何异常,我都想close 一个资源。再看一些,我可能可以包装所有收集器函数来执行此操作,但是流处理本身内部仍然存在任何异常的边缘情况。我不认为有任何保证这些永远不会抛出。
  • @FedericoPeraltaSchaffner 这实际上是我评论中的第一个想法(删除它,因为它看起来太愚蠢了 :| )。您确实可以包装收集器中的每个方法,但我的疑问是,虽然一些线程捕获了异常,而其他一些线程正在/将计划写入同一个文件。这些线程需要以某种方式进行通信,我认为 OP 将提供他希望如何处理此类情况的详细信息

标签: java java-stream collectors


【解决方案1】:

我认为您可以满足您的要求的唯一方法是通过提供给Stream.onClose 方法的关闭处理程序。假设你有以下类:

class CloseHandler implements Runnable {
    List<Runnable> children = new ArrayList<>();

    void add(Runnable ch) { children.add(ch); }

    @Override
    public void run() { children.forEach(Runnable::run); }
}

现在,您需要按如下方式使用您的流:

CloseHandler closeAll = new CloseHandler();
try (Stream<Something> stream = list.stream().onClose(closeAll)) {
    // Now collect
    stream.collect(Collectors.groupingBy(
        this::extractFileName, 
        toFile(closeAll)));
}

这使用try-with-resources 构造,因此流在使用时或发生错误时会自动关闭。请注意,我们将 closeAll 关闭处理程序传递给 Stream.onClose 方法。

这是您的下游收集器的草图,它将收集/写入/发送元素到 Closeable 资源(请注意,我们还将 closeAll 关闭处理程序传递给它):

static Collector<Something, ?, Void> toFile(CloseHandler closeAll) {

    class Acc {

        SomeResource resource; // this is your closeable resource

        Acc() {
            try {
                resource = new SomeResource(...); // create closeable resource
                closeAll.add(this::close);        // this::close is a Runnable
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        void add(Something elem) {
            try {
                // TODO write/send to closeable resource here
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        Acc merge(Acc another) {
            // TODO left as an exercise
        }

        // This is the close handler for this particular closeable resource
        private void close() {
            try {
                // Here we close our closeable resource
                if (resource != null) resource.close();
            } catch (IOException ignored) {
            }
        }
    }
    return Collector.of(Acc::new, Acc::add, Acc::merge, a -> null);
}

因此,这使用了一个本地类(名为Acc)来包装可关闭资源,并将方法声明到add 流的元素到可关闭资源,以及merge 两个Acc 实例如果流是并行的(留作练习,以防值得付出努力)。

Collector.of 用于基于Acc 类的方法创建一个收集器,并带有一个返回null 的终结器,因为我们不想在Collectors.groupingBy 创建的地图中放置任何东西。

最后,还有close 方法,它会关闭包装好的可关闭资源,以防它被创建。

当流通过try-with-resources 构造隐式关闭时,CloseHandler.run 方法将自动执行,这将依次执行之前在每个Acc 实例创建时添加的所有子关闭处理程序。

【讨论】:

  • merge 主要是阻止我发布这样的内容...除非 OP 提供有关 ThreadA 抛出异常(在 merge 中)和 ThreadB 时究竟发生了什么的详细信息想要写入同一个文件,甚至不确定这是要求的一部分
  • ... 无需先将流收集到内存列表中,因此似乎有一些 IO/DB/JMS。如果 OP 确认,我很乐意顺便投票
  • 谢谢,可变的onClose 回调的组合,每个累加器都被注册,很好地解决了这个问题。目前我并不关心并行处理,合并操作是否可能(并且比顺序处理更快)在很大程度上取决于资源。实际上,后续问题是是否保证顺序流永远不会调用收集器的组合器方法。
  • @JörnHorstmann 由规范保证,请查看Collector javadocs
【解决方案2】:

好的,我查看了Collectors 的实现,您需要CollectorImpl 来创建自定义收集器,但它不公开。所以我使用它的副本来实现一个新的(你可能感兴趣的最后两种方法):

public class CollectorUtils<T, A, R> implements Collector<T, A, R> {

    static final Set<Collector.Characteristics> CH_ID = Collections
            .unmodifiableSet(EnumSet.of(Collector.Characteristics.IDENTITY_FINISH));

    private final Supplier<A> supplier;
    private final BiConsumer<A, T> accumulator;
    private final BinaryOperator<A> combiner;
    private final Function<A, R> finisher;
    private final Set<Characteristics> characteristics;

    CollectorUtils(Supplier<A> supplier, BiConsumer<A, T> accumulator, BinaryOperator<A> combiner,
            Function<A, R> finisher, Set<Characteristics> characteristics) {
        this.supplier = supplier;
        this.accumulator = accumulator;
        this.combiner = combiner;
        this.finisher = finisher;
        this.characteristics = characteristics;
    }

    CollectorUtils(Supplier<A> supplier, BiConsumer<A, T> accumulator, BinaryOperator<A> combiner,
            Set<Characteristics> characteristics) {
        this(supplier, accumulator, combiner, castingIdentity(), characteristics);
    }

    @Override
    public BiConsumer<A, T> accumulator() {
        return accumulator;
    }

    @Override
    public Supplier<A> supplier() {
        return supplier;
    }

    @Override
    public BinaryOperator<A> combiner() {
        return combiner;
    }

    @Override
    public Function<A, R> finisher() {
        return finisher;
    }

    @Override
    public Set<Characteristics> characteristics() {
        return characteristics;
    }

    @SuppressWarnings("unchecked")
    private static <I, R> Function<I, R> castingIdentity() {
        return i -> (R) i;
    }

    public static <C extends Collection<File>> Collector<String, ?, C> toFile() {
        return new CollectorUtils<>((Supplier<List<File>>) ArrayList::new, (c, t) -> {
            c.add(toFile(t));
        }, (r1, r2) -> {
            r1.addAll(r2);
            return r1;
        }, CH_ID);
    }

    private static File toFile(String fileName) {
        try (Closeable type = () -> System.out.println("Complete! closing file " + fileName);) {
            // stuff
            System.out.println("Converting " + fileName);

            return new File(fileName);
        } catch (FileNotFoundException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        throw new RuntimeException("Failed to create file");

    }

}

然后我如下调用流:

public static void main(String[] args) {
        Stream.of("x.txt", "y.txt","z.txt").collect(CollectorUtils.toFile());
    }

输出:

Convertingx.txt
closing filex.txt
Convertingy.txt
closing filey.txt
Convertingz.txt
closing filez.txt

【讨论】:

  • 这与我想要实现的目标不同,目标是将流的内容写入不同的文件,具体取决于某些分组键。这需要FileOutputStream 在某个时候关闭,即使出现异常也是如此。顺便说一句,您可以使用Collector.of 作为工厂方法,而不是自己实现接口。
  • @JörnHorstmann 感谢 Collector.of 真的很酷不知道
猜你喜欢
  • 2022-11-12
  • 1970-01-01
  • 1970-01-01
  • 2016-12-30
  • 1970-01-01
  • 1970-01-01
  • 2018-07-31
  • 1970-01-01
相关资源
最近更新 更多