【问题标题】:Spark Streaming tuning number of records per batch size not working?Spark Streaming 调整每批大小的记录数不起作用?
【发布时间】:2019-11-18 03:28:43
【问题描述】:

我的 spark 流应用程序正在使用 DStream 方法从 kafka 读取数据,我正在尝试使批处理大小在 10 秒内处理 60,000 条消息。

我做了什么,

  • 创建了一个包含 3 个分区的主题
  • spark.streaming.kafka.maxRatePerPartition = 60000
  • spark.streaming.backpressure.enabled = true
  • 在我创建批处理时将批处理持续时间设置为 10 秒 StreamingContext
  • 以 2 个执行器在纱线模式下运行(3 个共 4 个核心) 分区)

现在我如何测试它是否有效。

我有一个生产者一次向该主题发送 60,000 条消息。当我检查 spark UI 时,我得到以下信息:

 batch time | Input size | processing time
 10:54:30   | 17610      | 5s
 10:54:20   | 32790      | 8s
 10:54:10   | 9600       | 3s

所以每批时间间隔 10 秒。我期望的是 1 批有 60,000 条记录。还有其他一些我没有设置的参数吗?从我读到的关于我当前设置的内容来看,我应该在一个批次中获得 10 * 60,000 * 3 = 1800000。

spark.app.id  = application_1551747423133_0677

spark.app.name = KafkaCallDEV

spark.driver.cores = 2

spark.driver.extraJavaOptions   = -XX:+UseG1GC -XX:ConcGCThreads=2
-XX:InitiatingHeapOccupancyPercent=35 -Dlog4j.configuration=log4j.properties -verbose:gc

spark.driver.memory = 3g

spark.driver.port   = 33917

spark.executor.cores = 2

spark.executor.extraJavaOptions = -XX:+UseG1GC -XX:ConcGCThreads=2
-XX:InitiatingHeapOccupancyPercent=35 -Dlog4j.configuration=log4j.properties -verbose:gc

spark.executor.id   = driver

spark.executor.instances    = 2

spark.executor.memory   = 2g

spark.master    = yarn

spark.scheduler.mode    = FIFO

spark.streaming.backpressure.enabled    = true

spark.streaming.kafka.maxRatePerPartition = 60000

spark.submit.deployMode = cluster

spark.ui.filters    = org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter

spark.ui.port = 0

spark.yarn.app.container.log.dir = /data0/yarn/container-logs/application_1551747423133_0677/container_1551747423133_0677_01_000002 

下面是我用

打印出来的
logger.info(sparkSession.sparkContext.getConf.getAll.mkString("\n"))

我删除了一些不必要的日志,例如服务器地址、应用程序名称等。

(spark.executor.extraJavaOptions,-XX:+UseG1GC -XX:ConcGCThreads=2

-XX:InitiatingHeapOccupancyPercent=35 -Dlog4j.configuration=log4j.properties -verbose:gc) (spark.yarn.app.id,application_1551747423133_0681)

(spark.submit.deployMode,cluster)

(spark.streaming.backpressure.enabled,true)

(spark.yarn.credentials.renewalTime,1562764821939ms)

(spark.ui.filters,org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter)

(spark.executor.memory,2g) 

(spark.yarn.credentials.updateTime,1562769141873ms)

(spark.driver.cores,2) 

(spark.executor.id,driver)

(spark.executor.cores,2)

(spark.master,yarn)

(spark.driver.memory,3g)

(spark.sql.warehouse.dir,/user/hive/warehouse) 

(spark.ui.port,0)

(spark.driver.extraJavaOptions,-XX:+UseG1GC -XX:ConcGCThreads=2
-XX:InitiatingHeapOccupancyPercent=35 -Dlog4j.configuration=log4j.properties -verbose:gc) 

(spark.executor.instances,2)

(spark.driver.port,37375)

我还有一些正在打印的 Kafka 配置,所以我也会在下面发布。

    org.apache.kafka.clients.consumer.ConsumerConfig:178 - ConsumerConfig values: 
        metric.reporters = []
        metadata.max.age.ms = 300000
        partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
        reconnect.backoff.ms = 50
        sasl.kerberos.ticket.renew.window.factor = 0.8
        max.partition.fetch.bytes = 1048576
        ssl.keystore.type = JKS
        enable.auto.commit = false
        sasl.mechanism = GSSAPI
        interceptor.classes = null
        exclude.internal.topics = true
        ssl.truststore.password = null
        client.id = 
        ssl.endpoint.identification.algorithm = null
        max.poll.records = 60000
        check.crcs = true
        request.timeout.ms = 40000
        heartbeat.interval.ms = 3000
        auto.commit.interval.ms = 5000
        receive.buffer.bytes = 65536
        ssl.truststore.type = JKS
        ssl.truststore.location = null
        ssl.keystore.password = null
        fetch.min.bytes = 1
        send.buffer.bytes = 131072
        value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        retry.backoff.ms = 100
        ssl.secure.random.implementation = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        ssl.trustmanager.algorithm = PKIX
        ssl.key.password = null
        fetch.max.wait.ms = 500
        sasl.kerberos.min.time.before.relogin = 60000
        connections.max.idle.ms = 540000
        session.timeout.ms = 30000
        metrics.num.samples = 2
        key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        ssl.protocol = TLS
        ssl.provider = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.keystore.location = null
        ssl.cipher.suites = null
        security.protocol = PLAINTEXT
        ssl.keymanager.algorithm = SunX509
        metrics.sample.window.ms = 30000
        auto.offset.reset = latest

【问题讨论】:

    标签: apache-spark spark-streaming dstream


    【解决方案1】:

    spark.streaming.kafka.maxRatePerPartition = 60000 意味着

    每个 Kafka 的最大速率(以每秒消息数为单位) 分区将被此直接 API 读取,该 API 将由属性 spark.streaming.backpressure.enabled = true 启用

    17610 + 32790 + 9600 = 60000您的批量大小已达到。


    this

    您的 3 个 kafka 分区(有 60k 条消息)由 spark 以块/spark 分区读取,在您的情况下,是 spark 的 3 个分区。但 3 kafka 分区中的原始消息数为 60000(17610 + 32790 + 9600)。即使是高消息率输入流的回压也将使用RateLimiterPIDRateEstimator 保持消息的统一率

    所以你到这里就完成了......

    进一步转发我的post -Short note on Spark Streaming Back Pressure for better understanding

    结论: 如果您启用背压,则无论您发送消息的速率如何。它将允许消息的恒定速率

    就像这个说明性的一般示例......其中背压属性就像流入控制 - 压力调节螺钉以保持消息流的均匀速率。

    【讨论】:

    • 嗨拉姆。我的主题有 3 个分区。我一次向该主题发送 6 万条消息。我在原始帖子中向您展示了一些统计数据。处理所有 60K 消息需要 3 批,每批 10 秒,每批需要不同数量的数据。
    • logger.info(sparkContext.getConf.getAll.mkString("\n")) 只需打印这些并发布!
    • 嗨,拉姆,我已经发布了配置信息
    【解决方案2】:

    所以我找到了 Spark 将我发送的一批记录分成多批的原因。我有spark.streaming.backpressure.enabled = true。这使用来自先前批次的反馈循环来控制接收速率,该接收速率的上限是我在spark.streaming.kafka.maxRatePerPartition 中设置的每个分区的最大速率。所以 spark 正在为我调整接收率。

    【讨论】:

    • " spark 正在为我调整接收率" 这就是我在第一行回答中所说的。见my post here
    • “这就是我在第一行回答中所说的” - 实际上,我在上面发布了关于背压信息的答案,您在我发布后的第二天的编辑中提到了它。跨度>
    猜你喜欢
    • 2015-09-14
    • 1970-01-01
    • 2017-02-20
    • 2019-03-28
    • 2015-10-22
    • 1970-01-01
    • 1970-01-01
    • 2012-07-21
    • 1970-01-01
    相关资源
    最近更新 更多