【问题标题】:Storm analog for spark.streaming.kafka.maxRatePerPartition in SparkSpark 中 spark.streaming.kafka.maxRatePerPartition 的风暴模拟
【发布时间】:2016-10-11 09:42:38
【问题描述】:

Spark Streaming 中有 spark.streaming.kafka.maxRatePerPartition 属性,它限制每秒从 Apache Kafka 读取的消息数。 Storm 有类似的属性吗?

【问题讨论】:

  • 你有什么版本的storm?
  • @fhuz,版本 1.0.0

标签: apache-spark apache-kafka apache-storm


【解决方案1】:

有一种解决方法,它有助于在 Storm 中做到这一点。您可以简单地为 KafkaSpout 编写以下包装器,它计算每秒 spout 发出的消息数。当它达到所需的数字 (Config.RATE) 时,它什么也不返回。

public class MyKafkaSpout extends KafkaSpout {
    private int counter = 0;
    private int currentSecond = 0;
    private final int tuplesPerSecond = Config.RATE;

    public MyKafkaSpout(SpoutConfig spoutConf) {
        super(spoutConf);
    }

    @Override
    public void nextTuple() {
        if (counter == tuplesPerSecond) {
            int newSecond = (int) TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
            if (newSecond <= currentSecond) {
                return;
            }
            counter = 0;
            currentSecond = newSecond;
        }

        ++counter;
        super.nextTuple();
    }
}

【讨论】:

    【解决方案2】:

    我认为没有属性可以限制每秒的消息数。

    如果你使用 new kafka client (kafka 0.9) spout,你可以设置 'MaxUncommittedOffsets' 这将限制未提交偏移的数量(即飞行消息的数量)。

    但是,如果您仍在使用旧的 kafka spout(0.9 之前的 kafka),您可以使用风暴属性 'topology.max.spout.pending',它可以限制每个 spout 任务的未确认消息总数。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-01-22
      • 1970-01-01
      • 2016-11-11
      相关资源
      最近更新 更多