【问题标题】:Storm analog for spark.streaming.kafka.maxRatePerPartition in SparkSpark 中 spark.streaming.kafka.maxRatePerPartition 的风暴模拟
【发布时间】:2016-10-11 09:42:38
【问题描述】:
Spark Streaming 中有 spark.streaming.kafka.maxRatePerPartition 属性,它限制每秒从 Apache Kafka 读取的消息数。 Storm 有类似的属性吗?
【问题讨论】:
标签:
apache-spark
apache-kafka
apache-storm
【解决方案1】:
有一种解决方法,它有助于在 Storm 中做到这一点。您可以简单地为 KafkaSpout 编写以下包装器,它计算每秒 spout 发出的消息数。当它达到所需的数字 (Config.RATE) 时,它什么也不返回。
public class MyKafkaSpout extends KafkaSpout {
private int counter = 0;
private int currentSecond = 0;
private final int tuplesPerSecond = Config.RATE;
public MyKafkaSpout(SpoutConfig spoutConf) {
super(spoutConf);
}
@Override
public void nextTuple() {
if (counter == tuplesPerSecond) {
int newSecond = (int) TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
if (newSecond <= currentSecond) {
return;
}
counter = 0;
currentSecond = newSecond;
}
++counter;
super.nextTuple();
}
}
【解决方案2】:
我认为没有属性可以限制每秒的消息数。
如果你使用 new kafka client (kafka 0.9) spout,你可以设置 'MaxUncommittedOffsets' 这将限制未提交偏移的数量(即飞行消息的数量)。
但是,如果您仍在使用旧的 kafka spout(0.9 之前的 kafka),您可以使用风暴属性 'topology.max.spout.pending',它可以限制每个 spout 任务的未确认消息总数。