【问题标题】:How is spark.streaming.kafka.maxRatePerPartition related to spark.streaming.backpressure.enabled incase of spark streaming with Kafka?spark.streaming.kafka.maxRatePerPartition 与 spark.streaming.backpressure.enabled 有什么关系?
【发布时间】:2021-11-08 17:45:35
【问题描述】:

在读取如下配置单元表后,我正在尝试将数据写入 Kafka 主题。

write_kafka_data.py:
read_df = spark.sql("select * from db.table where some_column in ('ASIA', 'Europe')")
final_df = read_df.select(F.to_json(F.struct(F.col("*"))).alias("value"))

final_df.write.format("kafka")\
        .option("kafka.bootstrap.servers", kafka_broker)\
        .option("kafka.batch.size", 51200)\
        .option("retries", 3)\
        .option("kafka.max.request.size", 500000)\
        .option("kafka.max.block.ms", 120000)\
        .option("kafka.metadata.max.age.ms", 120000)\
        .option("kafka.request.timeout.ms", 120000)\
        .option("kafka.linger.ms", 0)\
        .option("kafka.delivery.timeout.ms", 130000)\
        .option("acks", "1")\
        .option("kafka.compression.type", "snappy")\
        .option("kafka.security.protocol", "SASL_SSL")\
        .option("kafka.sasl.jaas.config", oauth_config)\
        .option("kafka.sasl.login.callback.handler.class", "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler")\
        .option("kafka.sasl.mechanism", "OAUTHBEARER")\
        .option("topic", 'topic_name')\
        .save()

成功写入后(记录数为 29000),我正在另一个文件中读取与以下相同主题的数据: read_kafka_data.py:

    # SCHEMA
    schema = StructType([StructField("col1", StringType()),
            StructField("col2", IntegerType())
    ])

    # READ FROM TOPIC
    jass_config = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required" \
                          + " oauth.token.endpoint.uri=" + '"' + "uri" + '"' \
                          + " oauth.client.id=" + '"' + "client_id" + '"' \
                          + " oauth.client.secret=" + '"' + "secret_key" + '" ;'

    stream_df = spark.readStream \
            .format('kafka') \
            .option('kafka.bootstrap.servers', kafka_broker) \
            .option('subscribe', 'topic_name') \
            .option('kafka.security.protocol', 'SASL_SSL') \
            .option('kafka.sasl.mechanism', 'OAUTHBEARER') \
            .option('kafka.sasl.jaas.config', jass_config) \
            .option('kafka.sasl.login.callback.handler.class', "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler") \
            .option('startingOffsets', 'latest') \
            .option('group.id', 'group_id') \
            .option('maxOffsetsPerTrigger', 200) \
            .option('fetchOffset.retryIntervalMs', 200) \
            .option('fetchOffset.numRetries', 3) \
            .load()\
            .select(from_json(col('value').cast('string'), schema).alias("json_dta")).selectExpr('json_dta.*')

    stream_df.writeStream.outputMode('append')
    .format(HiveWarehouseSession.STREAM_TO_STREAM)
      .option("database", "database_name")
      .option("table", "table_name")
      .option("metastoreUri", spark.conf.get("spark.datasource.hive.warehouse.metastoreUri"))
      .option("checkpointLocation", "/path/to/checkpoint/dir")
      .start().awaitTermination()

我是 Kafka 的初学者,一直在阅读 Kafka 性能优化技术,遇到了这两个。

spark.streaming.backpressure.enabledspark.streaming.kafka.maxRatePerPartition

启用第一个参数:

sparkConf.set("spark.streaming.backpressure.enabled",”true”)

官方文档中对上述参数的解释为:

启用或禁用 Spark Streaming 的内部背压机制 (从 1.5 开始)。这使 Spark Streaming 能够控制接收 基于当前批处理调度延迟和处理时间的速率 以便系统仅以系统可以处理的速度接收。 在内部,这会动态设置最大接收速率 接收器。这个比率的上限是值 spark.streaming.receiver.maxRatespark.streaming.kafka.maxRatePerPartition

现在我是第一次运行应用程序并且没有以前的微批处理,我应该为spark.streaming.backpressure.initialRate指定一些值

如果是这样,我应该如何确定spark.streaming.backpressure.initialRate的值。 该文档还说,如果将spark.streaming.backpressure.enabled 设置为true,则最大接收速率是动态设置的。 如果是这样,我们是否还需要配置: spark.streaming.receiver.maxRatespark.streaming.kafka.maxRatePerPartition 如果spark.streaming.backpressure.enabled 设置为true

这个link 表示在施加背压时使用spark.streaming.backpressure.initialRate 没有影响。

任何有助于消除混乱的帮助将不胜感激。

【问题讨论】:

  • 您使用的是 sparks 结构化流而不是 spark 流。结构化流不支持 Afaik 背压。请查看此post 并复制您的帖子,以防此帖子回答您的问题。

标签: apache-spark pyspark apache-kafka spark-streaming spark-structured-streaming


【解决方案1】:

您所指的配置spark.streaming.[...] 属于Direct Streaming(又名Spark Streaming),属于Structured Streaming

如果您不知道其中的区别,我建议您查看单独的编程指南:

结构化流不提供背压机制。当您从 Kafka 消费时,您可以使用(就像您已经在做的那样)选项 maxOffsetsPerTrigger 来设置每个触发器上读取消息的限制。此选项在Structured Streaming and Kafka Integration Guide 中记录为:

“每个触发间隔处理的最大偏移量的速率限制。指定的总偏移量将按比例分配到不同卷的主题分区中。”


如果你仍然对标题问题感兴趣

如果使用 Kafka 进行火花流式传输,spark.streaming.kafka.maxRatePerPartitionspark.streaming.backpressure.enabled 有什么关系?

Spark's Configuration 上的文档中解释了这种关系:

"启用或禁用 Spark Streaming 的内部背压机制(从 1.5 开始)。这使 Spark Streaming 能够根据当前的批处理调度延迟和处理时间来控制接收速率,以便系统仅以系统可以处理的速度接收. 在内部,这会动态设置接收器的最大接收速率。如果设置了值spark.streaming.receiver.maxRatespark.streaming.kafka.maxRatePerPartition,则该速率的上限为(见下文)。"

关于 Spark Streaming(DStream,不是 Structured Streaming)中可用的背压机制的所有详细信息都在您已链接 Enable Back Pressure To Make Your Spark Streaming Application Production Ready 的博客中进行了解释。

通常,如果您启用背压,您会将 spark.streaming.kafka.maxRatePerPartition 设置为最佳估计速率的 150% ~ 200%。

PID 控制器的精确计算可以在 PIDRateEstimator 类中的代码中找到。

使用 Spark Streaming 的背压示例

正如您所要求的示例,这是我在我的一个生产应用程序中完成的示例:

设置

  • Kafka 主题有 16 个分区
  • Spark 使用 16 个工作核心运行,因此每个分区都可以并行使用
  • 使用 Spark 流式处理(非结构化流式处理)
  • 批处理间隔为 10 秒
  • spark.streaming.backpressure.enabled 设置为真
  • spark.streaming.kafka.maxRatePerPartition 设置为 10000
  • spark.streaming.backpressure.pid.minRate 保持默认值 100
  • 该作业每秒每个分区可以处理大约 5000 条消息
  • Kafka 主题在开始流式作业之前在每个分区中包含数百万条消息

观察

  • 在第一批中,流作业获取 16000(= 10 秒 * 16 个分区 * 100 pid.minRate)消息。
  • 该作业处理这 16000 条消息的速度非常快,因此 PID 控制器估计出一个大于 10000 的 masRatePerPartition 的最佳速率。
  • 因此,在第二批中,流式作业获取 16000(= 10 秒 * 16 个分区 * 10000 maxRatePerPartition)消息。
  • 现在,第二批完成大约需要 22 秒
  • 因为我们的批处理间隔设置为 10 秒,所以 10 秒后流式作业已经安排了第三个微批处理,再次为 1600000。原因是 PID 控制器只能使用 finished中的性能信息> 微批次。
  • 只有在第六个或第七个微批处理中,PID 控制器才能找到每个分区每秒大约 5000 条消息的最佳处理速率。

【讨论】:

    猜你喜欢
    • 2019-09-14
    • 2011-08-19
    • 2011-11-17
    • 2014-07-27
    • 2016-04-29
    • 2017-06-25
    • 2019-01-09
    • 1970-01-01
    • 2012-05-24
    相关资源
    最近更新 更多