【问题标题】:Spark Streaming + Kinesis : Receiver MaxRate is violatedSpark Streaming + Kinesis:接收器 MaxRate 被违反
【发布时间】:2017-04-13 00:35:47
【问题描述】:

我正在通过 maxRate 调用 spark-submit,我有一个 kinesis 接收器和 1s 批次

spark-submit --conf spark.streaming.receiver.maxRate=10 ....

但是,单个批次可能会大大超过已建立的 maxRate。即:我得到 300 条记录。

我是否缺少任何设置?

【问题讨论】:

    标签: apache-spark spark-streaming amazon-kinesis


    【解决方案1】:

    这对我来说似乎是一个错误。从代码中查看,Kinesis 似乎完全忽略了spark.streaming.receiver.maxRate 配置。

    如果您查看KinesisReceiver.onStart 内部,您会看到:

    val kinesisClientLibConfiguration =
      new KinesisClientLibConfiguration(checkpointAppName, streamName, awsCredProvider, workerId)
      .withKinesisEndpoint(endpointUrl)
      .withInitialPositionInStream(initialPositionInStream)
      .withTaskBackoffTimeMillis(500)
      .withRegionName(regionName)
    

    这个构造函数最终会调用另一个构造函数,该构造函数有很多配置的默认值:

    public KinesisClientLibConfiguration(String applicationName,
            String streamName,
            AWSCredentialsProvider kinesisCredentialsProvider,
            AWSCredentialsProvider dynamoDBCredentialsProvider,
            AWSCredentialsProvider cloudWatchCredentialsProvider,
            String workerId) {
        this(applicationName, streamName, null, DEFAULT_INITIAL_POSITION_IN_STREAM, kinesisCredentialsProvider,
                dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, DEFAULT_FAILOVER_TIME_MILLIS, workerId,
                DEFAULT_MAX_RECORDS, DEFAULT_IDLETIME_BETWEEN_READS_MILLIS,
                DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST, DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS,
                DEFAULT_SHARD_SYNC_INTERVAL_MILLIS, DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION,
                new ClientConfiguration(), new ClientConfiguration(), new ClientConfiguration(),
                DEFAULT_TASK_BACKOFF_TIME_MILLIS, DEFAULT_METRICS_BUFFER_TIME_MILLIS, DEFAULT_METRICS_MAX_QUEUE_SIZE,
                DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING, null);
    }
    

    您关心的是DEFAULT_MAX_RECORDS,它不断设置为10,000 条记录。 KinesisClientLibConfiguration 上有一个名为 withMaxRecords 的方法,您可以调用它来设置实际记录数。这应该很容易解决。

    但目前看来,Kinesis 接收器似乎不遵守该参数。

    【讨论】:

      【解决方案2】:

      供日后参考。

      这是在Spark 2.2.0 版本中修复的已知bug

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2015-09-20
        • 1970-01-01
        • 1970-01-01
        • 2021-03-23
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多