【问题标题】:How can I forward Protobuf data from Flink to Kafka and stdout?如何将 Protobuf 数据从 Flink 转发到 Kafka 和 stdout?
【发布时间】:2020-12-03 03:28:42
【问题描述】:

我想在这里添加一些代码并从 Flink 中标准输出 protobuf 数据。

我正在使用 Flink 的 Apache Kafka 连接器来将 Flink 连接到 Kafka。

这是我的 Flink 的代码。

val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092").
val producer = FlinkKafkaProducer011(topic, new myProtobufSchema, props)
env.addSink(producer)
env.execute("To Kafka")

这是我的 Kafka 代码。

  val props: Properties = {
    val p = new Properties()
    p.put(StreamsConfig.APPLICATION_ID_CONFIG, "protobuf-application")
    p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    p
  }

  val builder: StreamsBuilder = new StreamsBuilder
  // TODO: implement here to stdout 

  val streams: KafkaStreams = new KafkaStreams(builder.build(), props)
  streams.start()

  sys.ShutdownHookThread {
     streams.close(Duration.ofSeconds(10))
  }

【问题讨论】:

    标签: scala apache-kafka apache-flink apache-kafka-streams


    【解决方案1】:

    您需要设置 StreamsBuilder 从主题消费

    val builder: StreamsBuilder = new StreamsBuilder()
        .stream(topic)
        .print(Printed.toSysOut());
    

    【讨论】:

    猜你喜欢
    • 2021-01-05
    • 2016-11-08
    • 2017-12-06
    • 2019-08-19
    • 1970-01-01
    • 2017-04-07
    • 2017-04-13
    • 2020-07-31
    • 1970-01-01
    相关资源
    最近更新 更多