【问题标题】:How to create a KTable from an Object received through StreamListener?如何从通过 StreamListener 接收的对象创建 KTable?
【发布时间】:2019-08-09 15:31:31
【问题描述】:

我想创建一个 KTable(未来将是一个 GlobalKTable)来跟踪使用 Spring Cloud Streams 通过我的 Kafka 代理的条目。

我现在拥有的代码在侦听器的角度上可以正常工作,因为它接收通过 Kafka 传递的数据并提取我需要创建注册表的两个 Id。

最终的目标(简而言之)是在系统中创建一个位置,以便我们可以检查是否正在针对该组合(userId 和 appId)处理数据


// Peek on myBindings interface
public interface myBindings {

    String INPUT = "input";

    String OUTPUT = "output";

    @Input(INPUT)
    MessageChannel input();

    @Output(OUTPUT)
    MessageChannel output();

}

//Peek on the service

    @StreamListener(target = myBindings.INPUT)
    public void listenforMessage(@Payload String jsonAsString) throws IOException {
        ObjectNode node = new ObjectMapper().readValue(jsonAsString, ObjectNode.class);
        KeyValue kvPair = new KeyValue<>(node.get(Info.USER_ID_KEY), node.get(Info.APP_ID_KEY));

        // Here comes the question...
    }

在检查(或打印)这对之后,我们看到数据是正确的。那么问题是:

如何用它构建一个 KTable,因为我没有使用 KStreams 并且我的输入对象不同?

我在网上找到的所有示例都使用 KStream/KTables 作为方法的入口点/参数,但是没有使用其他类型的入口的示例。

Kafka/Spring Cloud Streams 的新手。感谢您的帮助!

【问题讨论】:

  • 您确实需要KStream。实际上,您可以声明一个单独的 kstream 类型的绑定,直接从主题创建 ktable。您甚至可以将其具体化为键值存储,以便通过配置进行查找。

标签: java apache-kafka-streams spring-cloud-stream


【解决方案1】:

Kafka Streams 的模型(以及在 Spring Cloud Stream binder 中使用的模型)是您需要从 Kafka 主题接收记录并将其作为 KStreamKTableGlobalKTable 使用。目前,常规 Kafka binder(使用 vanilla Kafka 生产者/消费者)和 Kafka Streams binder 之间没有互操作性,因为这两个 binder 的目标类型完全不同。您可以使用两种单独的StreamListener 方法,其中一种是普通的 Kafka 消费者(使用 Kafka 活页夹),另一种是基于 Kafka Streams 活页夹的消费者。至于您所描述的具体用例,我认为您可以将其作为普通的 Kafka Streams 应用程序来处理。

// Peek on myBindings interface
public interface MyBindings {

    @Input("input)
    KStream<?,? input();

}
@StreamListener
public void listenforMessage(@Input("input") KStream<?, String> input) throws IOException {
       input.... //operations on input KStream here. 

    }

您可以使用KStream 上提供的功能操作将数据具体化为KTable

【讨论】:

  • 感谢您的回复。现在我收到如下错误。 Invocation of init method failed; nested exception is java.lang.IllegalStateException: No factory found for binding target type: org.apache.kafka.streams.kstream.KStream among registered factories: channelFactory,messageSourceFactory 想知道我是否缺少 bean/config,因为我的应用程序与这些 MessageChannel 输入非常吻合。
  • 是否有一个小的独立应用程序可以重现该问题?最好在github上。
  • 抱歉,我在这件事上花费了我的时间——当我解决上述问题时,我正在 github 中创建一个条目。上面的错误是由库版本引起的,与我使用的其他库不兼容。使用 'org.springframework.cloud:spring-cloud-stream-binder-kafka-streams' 和版本:'2.2.0.RELEASE'(而不是 2.0.0)解决了它。将答案标记为正确。谢谢。
猜你喜欢
  • 2011-04-27
  • 1970-01-01
  • 1970-01-01
  • 2013-07-13
  • 2020-08-29
  • 1970-01-01
  • 2012-02-27
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多