【Question title】: InvalidStateStoreException on KStream join using GlobalKTables
【Posted】: 2019-09-24 05:13:15
【Question】:

I have a Kafka Streams application in which I join a KStream that reads from "topic1" with a GlobalKTable that reads from "topic2", and then with another GlobalKTable that reads from "topic3".

When I push messages to all 3 topics at the same time, I get the following exception:

org.apache.kafka.streams.errors.InvalidStateStoreException

If I push messages to these topics one at a time, i.e. first to topic2, then to topic3, then to topic1, I do not get this exception.

I have also added a StateListener before starting KafkaStreams:

KafkaStreams.StateListener stateListener = new KafkaStreams.StateListener() {
    @Override
    public void onChange(KafkaStreams.State newState, KafkaStreams.State oldState) {
        // Pause briefly whenever the application enters REBALANCING.
        if (newState == KafkaStreams.State.REBALANCING) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
};
streams.setStateListener(stateListener);
streams.start();

In addition, once the streams have started, I wait until the store is queryable by calling the following method:

public static  <T> T waitUntilStoreIsQueryable(final String storeName,
                                                   final QueryableStoreType<T> queryableStoreType,
                                                   final KafkaStreams streams) throws InterruptedException {
        while (true) {
            try {
                return streams.store(storeName, queryableStoreType);
            } catch (final InvalidStateStoreException ignored) {
                // store not yet ready for querying
                Thread.sleep(100);
            }
        }
    }
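
The helper above polls forever if the store never opens. A bounded variant could look like the sketch below. It is written generically so that it does not depend on Kafka classes; the class name `BoundedRetry`, the method `retryUntil`, and its parameters are hypothetical and not part of the Kafka Streams API. Note also that this kind of polling only guards interactive queries made through `streams.store(...)`; it does not change how the join processor itself accesses the store.

```java
import java.time.Duration;
import java.util.function.Supplier;

public final class BoundedRetry {
    private BoundedRetry() {}

    /**
     * Calls attempt until it returns normally, retrying only on exceptions of
     * type retryOn, and rethrows the last exception once timeout has elapsed.
     */
    public static <T> T retryUntil(Supplier<T> attempt,
                                   Class<? extends RuntimeException> retryOn,
                                   Duration timeout,
                                   Duration pause) throws InterruptedException {
        final long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            try {
                return attempt.get();
            } catch (RuntimeException e) {
                // Rethrow anything that is not the expected "not ready yet" signal,
                // and give up once the deadline has passed.
                if (!retryOn.isInstance(e) || System.nanoTime() - deadline >= 0) {
                    throw e;
                }
                Thread.sleep(pause.toMillis());
            }
        }
    }
}
```

With Kafka Streams on the classpath, the call would presumably be `BoundedRetry.retryUntil(() -> streams.store(storeName, queryableStoreType), InvalidStateStoreException.class, Duration.ofSeconds(30), Duration.ofMillis(100))`, where the 30-second limit is an arbitrary choice.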

Here is the code that joins the KStream with the GlobalKTables:

KStream<String, GenericRecord> topic1KStream =
           builder.stream(
               "topic1",
               Consumed.with(Serdes.String(), genericRecordSerde)
           );
GlobalKTable<String, GenericRecord> topic2KTable =
           builder.globalTable(
               "topic2",
               Consumed.with(Serdes.String(), genericRecordSerde),
               Materialized.<String, GenericRecord, KeyValueStore<Bytes, byte[]>>as("topic2-global-store")
                   .withKeySerde(Serdes.String())
                   .withValueSerde(genericRecordSerde)
           );
  GlobalKTable<String, GenericRecord> topic3KTable =
           builder.globalTable(
               "topic3",
               Consumed.with(Serdes.String(), genericRecordSerde),
               Materialized.<String, GenericRecord, KeyValueStore<Bytes, byte[]>>as("topic3-global-store")
                   .withKeySerde(Serdes.String())
                   .withValueSerde(genericRecordSerde)
           );

KStream<String, MergedObj> stream_topic1_topic2 = topic1KStream.join(
           topic2KTable,
           // The key mapper receives the stream record's key and value and returns the table key.
           (topic1Key, topic1Obj) -> topic1Obj.get("id").toString(),
           (topic1Obj, topic2Obj) -> new MergedObj(topic1Obj, topic2Obj)
       );
final KStream<String, GenericRecord> enrichedStream =
       stream_topic1_topic2.join(
           topic3KTable,
           (topic1Key, mergedObj) -> mergedObj.topic3Id(),
           (mergedObj, topic3Obj) -> new Enriched(
               mergedObj.topic1Obj,
               mergedObj.topic2Obj,
               topic3Obj
           ).enrich()
       );
enrichedStream.to("enrichedStreamTopic", Produced.with(Serdes.String(), getGenericRecordSerde()));
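
To make the join semantics concrete, here is a simplified plain-Java model of an inner KStream-GlobalKTable join. It is a sketch, not Kafka's implementation: the class `LookupJoinModel` and its `join` method are hypothetical, the table is an ordinary `Map`, and only the joined values are returned (the real join keeps the stream record's key). It shows that the key mapper is applied to the stream record's key and value, that each stream record triggers one table lookup, and that records without a match are dropped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

public final class LookupJoinModel {
    private LookupJoinModel() {}

    /** Inner lookup join: one table read per stream record; misses are dropped. */
    public static <K, V, GK, GV, R> List<R> join(List<Map.Entry<K, V>> stream,
                                                 Map<GK, GV> table,
                                                 BiFunction<K, V, GK> keySelector,
                                                 BiFunction<V, GV, R> joiner) {
        List<R> out = new ArrayList<>();
        for (Map.Entry<K, V> rec : stream) {
            // Derive the table key from the stream record's key and value.
            GK tableKey = keySelector.apply(rec.getKey(), rec.getValue());
            GV tableValue = tableKey == null ? null : table.get(tableKey);
            if (tableValue != null) {
                out.add(joiner.apply(rec.getValue(), tableValue));
            }
        }
        return out;
    }
}
```

Because the lookup happens at the moment a stream record is processed, the result depends on what the table contains at that time.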

The code above is very similar to this.

When I push messages to all 3 topics at the same time, I get the following exception:

org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_1, processor=KSTREAM-SOURCE-0000000000, topic=topic1, partition=1, offset=61465, stacktrace=org.apache.kafka.streams.errors.InvalidStateStoreException: Store topic2-global-store is currently closed.
    at org.apache.kafka.streams.state.internals.WrappedStateStore.validateStoreOpen(WrappedStateStore.java:66)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:150)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:37)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:135)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadOnlyDecorator.get(ProcessorContextImpl.java:245)
    at org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.get(KTableSourceValueGetterSupplier.java:49)
    at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:71)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:183)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:162)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:122)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:364)
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:199)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:420)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:890)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)

【Comments】:

    Tags: apache-kafka bigdata apache-kafka-streams confluent-platform


    【Solution 1】:

    I solved this problem. In my code I had auto.register.schemas=false, because I had already registered the schemas for all of my topics manually.

    After I set auto.register.schemas=true and re-ran the streams application, it worked fine. I think this flag is needed for the application's internal topics.
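
    For reference, a minimal sketch of how such a setting might be assembled in Java. The question does not show how `genericRecordSerde` is configured, so this assumes it is a Confluent Avro serde configured from a property map; the class `SerdeConfigSketch`, the method `serdeConfig`, and the registry URL passed in are placeholders, while `schema.registry.url` and `auto.register.schemas` are the Confluent serializer property names.

```java
import java.util.HashMap;
import java.util.Map;

public final class SerdeConfigSketch {
    private SerdeConfigSketch() {}

    /** Builds serde settings with automatic schema registration switched on. */
    public static Map<String, Object> serdeConfig(String schemaRegistryUrl) {
        Map<String, Object> config = new HashMap<>();
        // Where the serde looks up and registers schemas (placeholder value supplied by the caller).
        config.put("schema.registry.url", schemaRegistryUrl);
        // Lets the serializer register schemas for subjects that were not registered by hand.
        config.put("auto.register.schemas", true);
        return config;
    }
}
```

    Under that assumption, the map would be passed to the serde with something like `genericRecordSerde.configure(SerdeConfigSketch.serdeConfig(url), false)`, where `false` marks it as a value serde.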

    【Comments】:
