【发布时间】:2016-05-11 13:34:51
【问题描述】:
我想设置 Flink,以便它将数据流从 Apache Kafka 转换并重定向到 MongoDB。出于测试目的,我在 flink-streaming-connectors.kafka 示例 (https://github.com/apache/flink) 之上构建。
Kafka 流被 Flink 正确设置为红色,我可以映射它们等,但是当我想将每个接收和转换的消息保存到 MongoDB 时就会出现问题。我发现的关于 MongoDB 集成的唯一示例是来自 github 的 flink-mongodb-test。不幸的是,它使用静态数据源(数据库),而不是数据流。
我相信 MongoDB 应该有一些 DataStream.addSink 实现,但显然没有。
实现它的最佳方法是什么?我是否需要编写自定义接收器功能,或者我可能遗漏了什么?也许应该以不同的方式完成?
我不依赖任何解决方案,所以任何建议都将不胜感激。
下面有一个例子,我得到了什么作为输入以及我需要存储什么作为输出。
Apache Kafka Broker <-------------- "AAABBBCCCDDD" (String)
Apache Kafka Broker --------------> Flink: DataStream<String>
Flink: DataStream.map({
return ("AAABBBCCCDDD").convertTo("A: AAA; B: BBB; C: CCC; D: DDD")
})
.rebalance()
.addSink(MongoDBSinkFunction); // store the row in MongoDB collection
正如你在这个例子中看到的,我使用 Flink 主要是为了 Kafka 的消息流缓冲和一些基本的解析。
【问题讨论】:
标签: mongodb hadoop apache-kafka apache-flink