【问题标题】:Kafka Streams TopologyTestDriver fails on Windows(Kafka Streams TopologyTestDriver 在 Windows 上失败)
【发布时间】:2019-07-20 02:11:52
【问题描述】:

我们使用 kafka-streams-test-utils 2.3.0 来测试我们的 Kafka Streams 作业。但是,如果拓扑使用 KTable,则无法在 Windows 上关闭 TopologyTestDriver。

下面是一个最小示例,其中包含一个可以通过的测试和一个会失败的测试:

package com.bakdata;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.util.Properties;

import static org.assertj.core.api.Assertions.assertThat;

/**
 * Minimal example exercising {@link TopologyTestDriver} with two topologies: a stateless one
 * ({@link #test()}) and one that aggregates per key via {@code groupByKey().reduce(...)}
 * ({@link #testGrouping()}).
 *
 * <p>Each driver is configured with its own freshly created temporary state directory so that
 * state written by one test (or by an earlier run) is never visible to another.
 */
class StreamsTest {

    private static final Serde<Integer> INPUT_KEY_SERDE = Serdes.Integer();
    private static final Serde<Integer> INPUT_VALUE_SERDE = Serdes.Integer();
    private static final Serde<Integer> OUTPUT_KEY_SERDE = Serdes.Integer();
    private static final Serde<Integer> OUTPUT_VALUE_SERDE = Serdes.Integer();
    private static final String INPUT_TOPIC = "input-topic";
    private static final String OUTPUT_TOPIC = "output-topic";
    private final ConsumerRecordFactory<Integer, Integer> recordFactory = createRecordFactory();
    private TopologyTestDriver testDriver;

    /** Creates the factory that serializes integer key/value pairs for {@link #INPUT_TOPIC}. */
    private static ConsumerRecordFactory<Integer, Integer> createRecordFactory() {
        return new ConsumerRecordFactory<>(INPUT_TOPIC, INPUT_KEY_SERDE.serializer(), INPUT_VALUE_SERDE.serializer());
    }

    /**
     * Builds the Streams configuration handed to the test driver.
     *
     * @return properties with exactly-once processing, application id {@code "my-app"}, an empty
     *     bootstrap server list and a unique temporary state directory
     */
    private static Properties getKafkaProperties() {
        Properties properties = new Properties();
        properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
        properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "");
        // Without an explicit state dir every test shares the default location for "my-app",
        // so leftovers from one test or run can leak into the next.
        properties.put(StreamsConfig.STATE_DIR_CONFIG, createStateDir());
        return properties;
    }

    /**
     * Creates a new, empty temporary directory to be used as the Streams state directory.
     *
     * @return the absolute path of the created directory
     * @throws UncheckedIOException if the directory cannot be created
     */
    private static String createStateDir() {
        try {
            return Files.createTempDirectory("kafka-streams-test").toAbsolutePath().toString();
        } catch (IOException e) {
            throw new UncheckedIOException("Could not create a temporary state directory", e);
        }
    }

    /**
     * Closes the driver created by the test, if any.
     *
     * <p>The null check keeps a test that failed before assigning {@code testDriver} from
     * additionally failing here with a {@link NullPointerException}.
     */
    @AfterEach
    void tearDown() {
        if (testDriver != null) {
            testDriver.close();
            testDriver = null;
        }
    }

    /** Stateless topology: every input value is incremented by one and forwarded. */
    @Test
    void test() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream(INPUT_TOPIC, Consumed.with(INPUT_KEY_SERDE, INPUT_VALUE_SERDE))
            .mapValues(v -> v + 1)
            .to(OUTPUT_TOPIC, Produced.with(OUTPUT_KEY_SERDE, OUTPUT_VALUE_SERDE));
        Topology topology = builder.build();
        Properties properties = getKafkaProperties();
        testDriver = new TopologyTestDriver(topology, properties);
        addInput(1, 1, 0L);
        addInput(2, 4, 1L);
        addInput(1, 2, 2L);
        assertNextOutput(1, 2);
        assertNextOutput(2, 5);
        assertNextOutput(1, 3);
    }

    // fails on Windows with
    // org.apache.kafka.streams.errors.StreamsException: java.nio.file.DirectoryNotEmptyException: \tmp\kafka-streams\my-app\0_0
    // when testDriver is closed
    // NOTE(review): the per-test temporary state directory isolates state between tests; whether
    // it also avoids the Windows failure on close is not established here - confirm on Windows.
    /** Stateful topology: values are summed per key, emitting the running total on each input. */
    @Test
    void testGrouping() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream(INPUT_TOPIC, Consumed.with(INPUT_KEY_SERDE, INPUT_VALUE_SERDE))
            .groupByKey()
            .reduce(Integer::sum)
            .toStream()
            .to(OUTPUT_TOPIC, Produced.with(OUTPUT_KEY_SERDE, OUTPUT_VALUE_SERDE));
        Topology topology = builder.build();
        Properties properties = getKafkaProperties();
        testDriver = new TopologyTestDriver(topology, properties);
        addInput(1, 1, 0L);
        addInput(2, 4, 1L);
        addInput(1, 2, 2L);
        assertNextOutput(1, 1);
        assertNextOutput(2, 4);
        assertNextOutput(1, 3);
    }

    /**
     * Pipes a single record into {@link #INPUT_TOPIC}.
     *
     * @param key record key
     * @param value record value
     * @param timestampMs record timestamp in milliseconds
     */
    private void addInput(int key, int value, long timestampMs) {
        ConsumerRecord<byte[], byte[]> record = recordFactory.create(key, value, timestampMs);
        testDriver.pipeInput(record);
    }

    /** Reads the next record from {@link #OUTPUT_TOPIC} using the output serdes. */
    private ProducerRecord<Integer, Integer> readRecord() {
        return testDriver.readOutput(OUTPUT_TOPIC, OUTPUT_KEY_SERDE.deserializer(), OUTPUT_VALUE_SERDE.deserializer());
    }

    /**
     * Asserts that the next output record exists and carries the given key and value.
     *
     * <p>The explicit non-null assertion turns a missing output record into an assertion
     * failure rather than a {@link NullPointerException} when the record is dereferenced.
     *
     * @param expectedKey key the next output record must have
     * @param expectedValue value the next output record must have
     */
    private void assertNextOutput(int expectedKey, int expectedValue) {
        ProducerRecord<Integer, Integer> record = readRecord();
        assertThat(record).isNotNull();
        assertThat(record.key()).isEqualTo(expectedKey);
        assertThat(record.value()).isEqualTo(expectedValue);
    }

}

生成的堆栈跟踪如下所示:

[Test worker] INFO org.apache.kafka.streams.processor.internals.StateDirectory - stream-thread [Test worker] Deleting state directory 0_0 for task 0_0 as     user calling cleanup.
[Test worker] INFO org.apache.kafka.streams.state.internals.RocksDBTimestampedStore - Opening store KSTREAM-REDUCE-STATE-STORE-0000000001 in upgrade mode
[Test worker] INFO org.apache.kafka.streams.processor.internals.StateDirectory - stream-thread [Test worker] Deleting state directory 0_0 for task 0_0 as     user calling cleanup.
[Test worker] ERROR org.apache.kafka.streams.processor.internals.StateDirectory - stream-thread [Test worker] Failed to delete the state directory.
java.nio.file.DirectoryNotEmptyException: \tmp\kafka-streams\my-app\0_0
    at java.base/sun.nio.fs.WindowsFileSystemProvider.implDelete(WindowsFileSystemProvider.java:267)
    at java.base/sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:105)
    at java.base/java.nio.file.Files.delete(Files.java:1141)
    at org.apache.kafka.common.utils.Utils$2.postVisitDirectory(Utils.java:769)
    at org.apache.kafka.common.utils.Utils$2.postVisitDirectory(Utils.java:752)
    at java.base/java.nio.file.Files.walkFileTree(Files.java:2742)
    at java.base/java.nio.file.Files.walkFileTree(Files.java:2796)
    at org.apache.kafka.common.utils.Utils.delete(Utils.java:752)
    at org.apache.kafka.streams.processor.internals.StateDirectory.cleanRemovedTasks(StateDirectory.java:301)
    at org.apache.kafka.streams.processor.internals.StateDirectory.clean(StateDirectory.java:240)
    at org.apache.kafka.streams.TopologyTestDriver.close(TopologyTestDriver.java:821)
    at com.bakdata.StreamsTest.tearDown(StreamsTest.java:47)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:628)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:117)
    at org.junit.jupiter.engine.descriptor.ClassTestDescriptor.invokeMethodInExtensionContext(ClassTestDescriptor.java:439)
    at org.junit.jupiter.engine.descriptor.ClassTestDescriptor.lambda$synthesizeAfterEachMethodAdapter$16(ClassTestDescriptor.java:431)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeAfterEachMethods$10(TestMethodTestDescriptor.java:227)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeAllAfterMethodsOrCallbacks$14(TestMethodTestDescriptor.java:245)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1540)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeAllAfterMethodsOrCallbacks(TestMethodTestDescriptor.java:243)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeAfterEachMethods(TestMethodTestDescriptor.java:226)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:68)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:135)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1540)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(    SameThreadHierarchicalTestExecutorService.java:38)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1540)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(    SameThreadHierarchicalTestExecutorService.java:38)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:170)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:154)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:90)
at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.processAllTestClasses(    JUnitPlatformTestClassProcessor.java:92)
at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.access$100(    JUnitPlatformTestClassProcessor.java:77)
    at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor.stop(JUnitPlatformTestClassProcessor.java:73)
    at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.stop(SuiteTestClassProcessor.java:61)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
    at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
    at com.sun.proxy.$Proxy2.stop(Unknown Source)
    at org.gradle.api.internal.tasks.testing.worker.TestWorker.stop(TestWorker.java:131)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:155)
    at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:137)
    at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:404)
    at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:63)
    at org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:46)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:55)
    at java.base/java.lang.Thread.run(Thread.java:834)

java.nio.file.DirectoryNotEmptyException: \tmp\kafka-streams\my-app\0_0
org.apache.kafka.streams.errors.StreamsException: java.nio.file.DirectoryNotEmptyException: \tmp\kafka-streams\my-app\0_0
    at org.apache.kafka.streams.processor.internals.StateDirectory.clean(StateDirectory.java:243)
    at org.apache.kafka.streams.TopologyTestDriver.close(TopologyTestDriver.java:821)
    at com.bakdata.StreamsTest.tearDown(StreamsTest.java:47)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:628)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:117)
    at org.junit.jupiter.engine.descriptor.ClassTestDescriptor.invokeMethodInExtensionContext(ClassTestDescriptor.java:439)
    at org.junit.jupiter.engine.descriptor.ClassTestDescriptor.lambda$synthesizeAfterEachMethodAdapter$16(ClassTestDescriptor.java:431)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeAfterEachMethods$10(TestMethodTestDescriptor.java:227)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeAllAfterMethodsOrCallbacks$14(TestMethodTestDescriptor.java:245)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1540)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeAllAfterMethodsOrCallbacks(TestMethodTestDescriptor.java:243)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeAfterEachMethods(TestMethodTestDescriptor.java:226)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:68)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:135)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1540)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(    SameThreadHierarchicalTestExecutorService.java:38)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1540)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(    SameThreadHierarchicalTestExecutorService.java:38)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:170)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:154)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:90)
at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.processAllTestClasses(    JUnitPlatformTestClassProcessor.java:92)
at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.access$100(    JUnitPlatformTestClassProcessor.java:77)
    at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor.stop(JUnitPlatformTestClassProcessor.java:73)
    at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.stop(SuiteTestClassProcessor.java:61)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
    at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
    at com.sun.proxy.$Proxy2.stop(Unknown Source)
    at org.gradle.api.internal.tasks.testing.worker.TestWorker.stop(TestWorker.java:131)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:155)
    at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:137)
    at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:404)
    at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:63)
    at org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:46)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:55)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.nio.file.DirectoryNotEmptyException: \tmp\kafka-streams\my-app\0_0
    at java.base/sun.nio.fs.WindowsFileSystemProvider.implDelete(WindowsFileSystemProvider.java:267)
    at java.base/sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:105)
    at java.base/java.nio.file.Files.delete(Files.java:1141)
    at org.apache.kafka.common.utils.Utils$2.postVisitDirectory(Utils.java:769)
    at org.apache.kafka.common.utils.Utils$2.postVisitDirectory(Utils.java:752)
    at java.base/java.nio.file.Files.walkFileTree(Files.java:2742)
    at java.base/java.nio.file.Files.walkFileTree(Files.java:2796)
    at org.apache.kafka.common.utils.Utils.delete(Utils.java:752)
    at org.apache.kafka.streams.processor.internals.StateDirectory.cleanRemovedTasks(StateDirectory.java:301)
    at org.apache.kafka.streams.processor.internals.StateDirectory.clean(StateDirectory.java:240)
    ... 81 more

我也试过在关闭之前调用

testDriver.getAllStateStores().forEach((name, stateStore) -> stateStore.close());

在关闭 TopologyTestDriver 之前。

有人知道如何在 Windows 上正确关闭 TopologyTestDriver 吗?我认为这与打开的文件句柄有关,尽管存在打开的文件句柄,但 Unix 操作系统会释放资源。

【问题讨论】:

  • 嗨。我也遇到过这个问题。这显然是由于本地状态存储在 Windows 上无法删除所致。stackoverflow.com/questions/56282751/… 中讨论了一个类似的问题(不过是在 Linux 上)——我感觉您的问题和那个问题有相同的根本原因。遗憾的是,那里没有给出明确的答案或解决方法。Matthias J. Sax 怀疑这是一个 bug……
  • 这两个问题可能与您的情况更相似:stackoverflow.com/questions/52505306/… 和 stackoverflow.com/questions/50602512/…,其中 Or Biran 的答案值得一试。
  • 手动清理状态目录无济于事,因为同样的文件锁仍然存在,会阻止删除。我担心这是一个真正的 bug,只能由 Kafka Streams 团队自己解决

标签: junit apache-kafka apache-kafka-streams


【解决方案1】:

我怀疑测试曾经成功运行过一次,随后在一个持久化的位置留下了状态存储目录,而测试并不知道如何清理它。

您能否尝试使用 JUnit 规则(在测试之间删除)为状态存储添加一个临时文件夹?

@Rule
public TemporaryFolder folder = new TemporaryFolder();

在属性中

properties.put(StreamsConfig.STATE_DIR_CONFIG, folder.newFolder("functionUnderTest").getAbsolutePath());

另外也可以使用 org.apache.kafka.test.TestUtils.tempDirectory().getAbsolutePath() 作为状态目录。

【讨论】:

  • 不幸的是,这并不能解决问题。关闭测试驱动后仍然报错目录不为空
  • TestUtils.tempDirectory().getAbsolutePath() 这个技巧拯救了我的下午。非常感谢。
猜你喜欢
  • 1970-01-01
  • 2020-12-12
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-12-17
  • 2019-07-20
  • 1970-01-01
  • 2019-03-30
相关资源
最近更新 更多