【Question title】: Kafka Streams: State Store partition error
【Posted】: 2018-10-24 20:22:00
【Question】:

I defined a custom state store for use in a custom Transformer (based on the example linked below).

https://github.com/apache/kafka/blob/trunk/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java

public class KafkaStream {
    public static void main(String[] args) {

        StateStoreSupplier houseStore = Stores.create("HOUSE").withKeys(Serdes.String()).withValues(houseSerde).persistent().build();
        KStreamBuilder kstreamBuilder = new KStreamBuilder();
        kstreamBuilder.addStateStore(houseStore);
        .
        .
        .

        KStream<String, String> testStream = kstreamBuilder.stream(Serdes.String(), Serdes.String(), "test");
        testStream.transform(HouseDetail::new, houseStore.name());
        .
        .
        .
    }
}

class HouseDetail implements Transformer<String, String, KeyValue<String, House>> {
    // Handle to the "HOUSE" store registered on the builder in main().
    private KeyValueStore<String, House> usageStore;

    @SuppressWarnings("unchecked")
    @Override
    public void init(ProcessorContext context) {
        this.usageStore = (KeyValueStore<String, House>) context.getStateStore("HOUSE");
    }
    .
    .
    .
}


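I get the following exception. I am not sure why the internal topic "test_01-HOUSE-changelog" is created with a single partition and a replication factor of 1, rather than with 2 partitions like the source topic "test". What am I missing here?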

[2018-05-14 23:38:09,391] ERROR stream-thread [StreamThread-1] Failed to create an active task 0_1:  (org.apache.kafka.streams.processor.internals.StreamThread:666)
org.apache.kafka.streams.errors.StreamsException: task [0_1] Store HOUSE's change log (test_01-HOUSE-changelog) does not contain partition 1
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:185)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123)
        at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:169)
        at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:85)
        at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
        at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:119)
        at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633)
        at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660)
        at org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69)
        at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)


$ ./kafka-topics.sh --zookeeper localhost:2181 --topic test --describe                                                      
Topic:test  PartitionCount:2        ReplicationFactor:3     Configs:
   Topic: test Partition: 0    Leader: 1001    Replicas: 1001,1002,1003        Isr: 1002,1001,1003
   Topic: test Partition: 1    Leader: 1002    Replicas: 1002,1003,1001        Isr: 1002,1001,1003

$ ./kafka-topics.sh --zookeeper localhost:2181 --topic test_01-HOUSE-changelog --describe                                    
Topic:test_01-HOUSE-changelog        PartitionCount:1        ReplicationFactor:1     Configs:
   Topic: test_01-HOUSE-changelog       Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001

Exception after disabling automatic topic creation:

[2018-05-17 14:25:41,114] ERROR stream-thread [StreamThread-1] Failed to create an active task 0_0:  (org.apache.kafka.streams.processor.internals.StreamThread:666)
org.apache.kafka.streams.errors.StreamsException: task [0_0] Could not find partition info for topic: test_01-HOUSE-changelog
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:174)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123)
        at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:169)
        at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:85)
        at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
        at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:119)
        at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633)
        at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660)
        at org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69)
        at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)

【Comments】:

    Tags: apache-kafka apache-kafka-streams


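    【Solution 1】:

    If the topic already exists with one partition, Kafka Streams does not change its partition count automatically. From the information you provided, it is unclear why the topic was created with one partition. One possibility is that your input topic had one partition when you first started the application, and you added a second partition to the input topic later.

    You need to clean up the application with the application reset tool, as described in the documentation (note that this is a two-step process): https://docs.confluent.io/current/streams/developer-guide/app-reset-tool.html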
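
To make the mismatch concrete, here is a minimal sketch in plain Java (no Kafka dependency) that mirrors the check behind the "does not contain partition 1" error. It relies on the convention that a store's changelog topic is named `<application.id>-<store name>-changelog`, and on task `0_1` needing partition 1 of that changelog. The class `ChangelogCheck` and its methods are hypothetical helpers, and the `application.id` value `test_01` is an assumption inferred from the topic name in the question. The partition counts are copied by hand from the `kafka-topics.sh --describe` output above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Kafka: works on partition counts read
// manually from `kafka-topics.sh --describe`.
final class ChangelogCheck {

    // Changelog topics follow "<application.id>-<store name>-changelog".
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    // Each source partition p is handled by its own task, which needs
    // changelog partition p. Returns the partitions the changelog lacks.
    static List<Integer> missingPartitions(int sourcePartitions, int changelogPartitions) {
        List<Integer> missing = new ArrayList<>();
        for (int p = changelogPartitions; p < sourcePartitions; p++) {
            missing.add(p);
        }
        return missing;
    }

    public static void main(String[] args) {
        // Assumed application.id "test_01"; store name "HOUSE" is from the question.
        String topic = changelogTopic("test_01", "HOUSE");
        // Source topic "test" has 2 partitions, the changelog has 1.
        List<Integer> missing = missingPartitions(2, 1);
        System.out.println(topic + " is missing partitions " + missing);
    }
}
```

An empty result means the changelog has at least as many partitions as the source topic. A non-empty result matches the situation in the question, where the stale or wrongly created changelog has to be removed (for example with the reset tool) before Kafka Streams can recreate it.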

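    【Discussion】:

    • The input topic was created with 2 partitions from the start. However, I deleted the internal topic "test_01-HOUSE-changelog" and made sure it no longer appeared when I listed the topics again. With the input topic at 2 partitions, as in the listing above, I re-ran the streams application, but it again created the topic with a single partition. Is there anything else that could help determine why it is created with a single partition?
    • Creating the internal topic manually lets the streams application run, but that is not an option, because the naming convention can change with functionality.
    • Hmm... This is weird behavior. You might want to set a breakpoint in the InternalTopicManager#makeReady() method to get more information. Another idea concerns auto topic creation: is it enabled on the broker side? If so, the broker may be creating the topic incorrectly through that mechanism, in which case you should disable auto topic creation (disabling it is recommended anyway).
    • Auto topic creation was enabled, but disabling it results in the streams application not creating the (internal) topic.
    • Hmm... If the topic cannot be created, the application should fail with an error. Did you check the logs?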