【发布时间】:2016-02-27 13:14:12
【问题描述】:
我正在研究一些每秒生成约 30K 消息的 kafka 主题。我有一个 flink 拓扑设置来读取其中一个,聚合一点(5 秒窗口),然后(最终)写入数据库。
当我运行拓扑并删除除读取 -> 聚合步骤之外的所有内容时,每分钟我只能收到约 30K 条消息。没有任何地方可以发生背压。
我做错了什么?
编辑:
- 我无法更改有关主题空间的任何内容。每个主题都有一个分区,并且有数百个。
- 每条消息都是一个压缩的 thrift 对象,平均大小为 2-3Kb
看来我只能获得 ~1.5 MB/s。不接近提到的 100MB/s。
当前代码路径:
DataStream<byte[]> dataStream4 = env.addSource(new FlinkKafkaConsumer081<>("data_4", new RawSchema(), parameterTool.getProperties())).setParallelism(1);
DataStream<Tuple4<Long, Long, Integer, String>> ds4 = dataStream4.rebalance().flatMap(new mapper2("data_4")).setParallelism(4);
public class mapper2 implements FlatMapFunction<byte[], Tuple4<Long, Long, Integer, String>> {
private String mapId;
public mapper2(String mapId) {
this.mapId = mapId;
}
@Override
public void flatMap(byte[] bytes, Collector<Tuple4<Long, Long, Integer, String>> collector) throws Exception {
TimeData timeData = (TimeData)ts_thriftDecoder.fromBytes(bytes);
Tuple4 tuple4 = new Tuple4<Long, Long, Integer, String>();
tuple4.f0 = timeData.getId();
tuple4.f1 = timeData.getOtherId();
tuple4.f2 = timeData.getSections().size();
tuple4.f3 = mapId;
collector.collect(tuple4);
}
}
【问题讨论】:
-
您从哪里获得约 30K msg/sec 的数据?来自你自己的测试?如果你在某处读到过,能否提供链接?
-
我有生产者代码,可以从那里读取吞吐量。
-
所以你的生产者以 ~30K msg/sec 的速度生产,但你的消费者只以 ~30K msg/min 的速度消费?
-
是的。这就是我所看到的。我知道一旦数据进入 flink,它就会非常迅速。我有另一个将> 6M记录/分钟推入cassandra的拓扑。我想知道 kafka 消费者 FlinkKafkaConsumer081 是否存在一些固有的瓶颈。请注意,此主题只有一个分区。这会是个问题吗?
-
好的。这就说得通了。您可能想要编辑您的问题,以更清楚地说明这些事情,和/或您根据我的问题发现的任何其他事实您的调试。
标签: java apache-kafka apache-flink flink-streaming