【问题标题】:Junit cannot delete @TempDir with file created by Spark Structured StreamingJunit 无法使用 Spark Structured Streaming 创建的文件删除 @TempDir
【发布时间】:2019-05-24 10:00:47
【问题描述】:

我为我的管道创建了一个集成测试来检查是否生成了正确的 CSV 文件:

class CsvBatchSinkTest {

    @RegisterExtension
    static SparkExtension spark = new SparkExtension();

    @TempDir
    static Path directory;

    //this checks if the file is already available
    static boolean isFileWithSuffixAvailable(File directory, String suffix) throws IOException {
        return Files.walk(directory.toPath()).anyMatch(f -> f.toString().endsWith(suffix));
    }

    //this gets content of file
    static List<String> extractFileWithSuffixContent(File file, String suffix) throws IOException {
        return Files.readAllLines(
                Files.walk(file.toPath())
                        .filter(f -> f.toString().endsWith(suffix))
                        .findFirst()
                        .orElseThrow(AssertionException::new));
    }

    @Test
    @DisplayName("When correct dataset is sent to sink, then correct csv file should be generated.")
    void testWrite() throws IOException, InterruptedException {

        File file = new File(directory.toFile(), "output");


        List<Row> data =
                asList(RowFactory.create("value1", "value2"), RowFactory.create("value3", "value4"));

        Dataset<Row> dataset =
                spark.session().createDataFrame(data, CommonTestSchemas.SCHEMA_2_STRING_FIELDS);

         dataset.coalesce(1)
                .write()
                .option("header", "true")
                .option("delimiter", ";")
                .csv(file.getAbsolutePath());

        Awaitility.await()
                .atMost(10, TimeUnit.SECONDS)
                .until(() -> isFileWithSuffixAvailable(file, ".csv"));

        Awaitility.await()
                .atMost(10, TimeUnit.SECONDS)
                .untilAsserted(
                        () ->
                                assertThat(extractFileWithSuffixContent(file, ".csv"))
                                        .containsExactlyInAnyOrder("field1;field2", "value1;value2", "value3;value4"));
    }
}

真实的代码看起来有点不同,这只是一个可重现的例子。

Spark 扩展只是在每次测试之前启动本地 Spark,然后在之后关闭。

测试通过了,但是当junit尝试清理@TempDir时抛出以下异常:

删除临时目录 C:\Users\RK03GJ\AppData\Local\Temp\junit596680345801656194 失败。以下路径无法删除

我能以某种方式修复这个错误吗?我尝试等待 spark 停止使用 awaility,但我并没有真正提供帮助。

也许我可以忽略这个错误?

【问题讨论】:

  • @jannis 我将它添加到try 中,现在可以使用了,谢谢。您能添加一个答案,以便我接受吗?

标签: java junit temporary-directory tempdir


【解决方案1】:

快速猜测:您需要关闭Files.walk 返回的流。引用文档:

如果需要及时处置文件系统资源,则应使用try-with-resources构造确保流操作完成后调用流的close方法。

-- https://docs.oracle.com/javase/8/docs/api/java/nio/file/Files.html#walk-java.nio.file.Path-java.nio.file.FileVisitOption...-

要解决此问题,请在 isFileWithSuffixAvailable 方法中添加 try-with-resources:

static boolean isFileWithSuffixAvailable(File directory, String suffix) throws IOException {
    try (Stream<Path> walk = Files.walk(directory.toPath())) {
        return walk.anyMatch(f -> f.toString().endsWith(suffix));
    }
}

【讨论】:

    猜你喜欢
    • 2020-07-01
    • 2019-10-18
    • 2017-07-10
    • 1970-01-01
    • 2020-03-19
    • 1970-01-01
    • 1970-01-01
    • 2020-09-12
    相关资源
    最近更新 更多