【发布时间】:2020-12-03 03:28:42
【问题描述】:
我想在这里添加一些代码并从 Flink 中标准输出 protobuf 数据。
我正在使用 Flink 的 Apache Kafka 连接器来将 Flink 连接到 Kafka。
这是我的 Flink 的代码。
val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092").
val producer = FlinkKafkaProducer011(topic, new myProtobufSchema, props)
env.addSink(producer)
env.execute("To Kafka")
这是我的 Kafka 代码。
val props: Properties = {
val p = new Properties()
p.put(StreamsConfig.APPLICATION_ID_CONFIG, "protobuf-application")
p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
p
}
val builder: StreamsBuilder = new StreamsBuilder
// TODO: implement here to stdout
val streams: KafkaStreams = new KafkaStreams(builder.build(), props)
streams.start()
sys.ShutdownHookThread {
streams.close(Duration.ofSeconds(10))
}
【问题讨论】:
标签: scala apache-kafka apache-flink apache-kafka-streams