【问题标题】:KafkaStreams multiple streams in same applicationKafkaStreams 同一应用程序中的多个流
【发布时间】:2018-03-23 19:15:19
【问题描述】:

我正在尝试根据 KafkaStreams 的约定和合理性做出实用的设计决策。

假设我有两个不同的事件要放入KTables。我有一个生产者将这些消息发送到正在收听该主题的KStream

据我所知,我不能对使用KafkaStreams 的消息使用条件转发,因此如果流订阅了多个主题(例如,上述每个消息一个主题),我只能调用stream.to单个接收器主题 - 否则,我将不得不在流上调用 foreach 并将带有 KProducer 的消息发送到接收器主题。

以上建议使用单个流。我以为我可以在同一个应用程序中设置多个流,每个流都监听一个主题,映射并转发到一个表接收器,但是每次我尝试创建 KafkaStreams 的两个实例时,只有第一个初始化订阅它的主题 - other 收到来自客户端的警告,提示其主题没有订阅。

我可以在同一个应用中设置多个流吗?如果有,有什么特殊要求吗?

    class Stream(topic: String) {
      val props: Option[Map[String, String]] = Some(TopicProps.get(topic))
      val streamsBuilder = new StreamsBuilder
      val topics = new util.ArrayList[String]
      topics.add(props.get("topic"))

      val stream: KStream[String, String] = configureStream(streamsBuilder, topics, props.get("sink"))

      def configureStream(builder: StreamsBuilder, topics: java.util.List[String], sink: String): KStream[String, String] = {
        builder.stream[String, String](
          topics,
          Consumed.`with`(String(), String())
        )
      }

      def init(): KafkaStreams = {
        val streams = new KafkaStreams(streamsBuilder.build(), KafkaConfig.streamConfig)

        streams.start()

        streams
      }
    }

    class Streams() {

      val eventStream = new Stream("first_event") //looking good!
      val eventStream2 = new Stream("second_event") // no subscribers
      //if I switch the other of these, eventStream2 is subscribed to and eventStream is dead in the water
      val streams: KafkaStreams = eventStream.init()
      val streams2: KafkaStreams = eventStream2.init()

    }

流配置

    val streamConfig: Properties = {
        val properties = new Properties()
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-application")
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BrokerHost)
        properties
    }

我也喜欢任何建议的替代品

【问题讨论】:

    标签: apache-kafka apache-kafka-streams


    【解决方案1】:

    当您创建 KafkaStreams 时,您需要传递具有不同 application.id 的属性,例如:

        props.put(StreamsConfig.APPLICATION_ID_CONFIG,"APP1");
        StreamsBuilder builder = new SteamsBuilder();
        KStream stream1 = builder.stream("topic1");
        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
    

    然后你应该创建另一个流:

        props.put(StreamsConfig.APPLICATION_ID_CONFIG,"APP2");
        StreamsBuilder builder = new SteamsBuilder();
        KStream stream2 = builder.stream("topic2");
        KafkaStreams streams2 = new KafkaStreams(builder, props);
        streams2.start();
    

    【讨论】:

      【解决方案2】:

      据我所知,我不能对消息使用条件转发

      您知道KStream#split()(订购版本为KStream#branch())吗?与条件转发基本相同。

      我想我可以在同一个应用程序中设置多个流,每个流都监听一个主题,映射并转发到一个表接收器,

      这应该如下工作:

      StreamsBuilder builder = new SteamsBuilder();
      KStream stream1 = builder.stream("topic1");
      KStream stream2 = builder.stream("topic2");
      
      stream1.to("table1-topic");
      stream2.to("table2-topic");
      

      但每次我尝试创建 KafkaStreams 的两个实例时,只有第一个初始化订阅其主题 - 另一个从客户端收到警告,称其主题没有订阅。

      不确定。这应该有效。也许你可以分享你的代码?

      【讨论】:

      • 代码在上面,真的很简单——当我初始化第二个流时它什么也不做——然而,kafka 知道,没有分配给它
      • fwiw 这是在播放网络服务内部运行的,但我看不出这会如何影响这一点 - 我正在创建具有所有唯一依赖项实例的流的新实例。但是,我很想检查分支
      • 为什么要使用两个StreamsBuilder 并创建两个KafakStreams 实例?如果它应该是一个应用程序,您应该每个只使用一个。
      • 这是一个很好的问题 - 将流附加到单个构建器就可以了。回想起来,是的,它当然做到了——我在想什么。
      猜你喜欢
      • 1970-01-01
      • 2018-05-19
      • 2018-07-13
      • 2014-03-05
      • 1970-01-01
      • 2015-09-08
      • 2019-06-07
      • 2016-11-26
      • 1970-01-01
      相关资源
      最近更新 更多