【发布时间】:2021-11-08 17:45:35
【问题描述】:
在读取如下配置单元表后,我正在尝试将数据写入 Kafka 主题。
write_kafka_data.py:
read_df = spark.sql("select * from db.table where some_column in ('ASIA', 'Europe')")
final_df = read_df.select(F.to_json(F.struct(F.col("*"))).alias("value"))
final_df.write.format("kafka")\
.option("kafka.bootstrap.servers", kafka_broker)\
.option("kafka.batch.size", 51200)\
.option("retries", 3)\
.option("kafka.max.request.size", 500000)\
.option("kafka.max.block.ms", 120000)\
.option("kafka.metadata.max.age.ms", 120000)\
.option("kafka.request.timeout.ms", 120000)\
.option("kafka.linger.ms", 0)\
.option("kafka.delivery.timeout.ms", 130000)\
.option("acks", "1")\
.option("kafka.compression.type", "snappy")\
.option("kafka.security.protocol", "SASL_SSL")\
.option("kafka.sasl.jaas.config", oauth_config)\
.option("kafka.sasl.login.callback.handler.class", "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler")\
.option("kafka.sasl.mechanism", "OAUTHBEARER")\
.option("topic", 'topic_name')\
.save()
成功写入后(记录数为 29000),我正在另一个文件中读取与以下相同主题的数据: read_kafka_data.py:
# SCHEMA
schema = StructType([StructField("col1", StringType()),
StructField("col2", IntegerType())
])
# READ FROM TOPIC
jass_config = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required" \
+ " oauth.token.endpoint.uri=" + '"' + "uri" + '"' \
+ " oauth.client.id=" + '"' + "client_id" + '"' \
+ " oauth.client.secret=" + '"' + "secret_key" + '" ;'
stream_df = spark.readStream \
.format('kafka') \
.option('kafka.bootstrap.servers', kafka_broker) \
.option('subscribe', 'topic_name') \
.option('kafka.security.protocol', 'SASL_SSL') \
.option('kafka.sasl.mechanism', 'OAUTHBEARER') \
.option('kafka.sasl.jaas.config', jass_config) \
.option('kafka.sasl.login.callback.handler.class', "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler") \
.option('startingOffsets', 'latest') \
.option('group.id', 'group_id') \
.option('maxOffsetsPerTrigger', 200) \
.option('fetchOffset.retryIntervalMs', 200) \
.option('fetchOffset.numRetries', 3) \
.load()\
.select(from_json(col('value').cast('string'), schema).alias("json_dta")).selectExpr('json_dta.*')
stream_df.writeStream.outputMode('append')
.format(HiveWarehouseSession.STREAM_TO_STREAM)
.option("database", "database_name")
.option("table", "table_name")
.option("metastoreUri", spark.conf.get("spark.datasource.hive.warehouse.metastoreUri"))
.option("checkpointLocation", "/path/to/checkpoint/dir")
.start().awaitTermination()
我是 Kafka 的初学者,一直在阅读 Kafka 性能优化技术,遇到了这两个。
spark.streaming.backpressure.enabled和spark.streaming.kafka.maxRatePerPartition
启用第一个参数:
sparkConf.set("spark.streaming.backpressure.enabled",”true”)
官方文档中对上述参数的解释为:
启用或禁用 Spark Streaming 的内部背压机制 (从 1.5 开始)。这使 Spark Streaming 能够控制接收 基于当前批处理调度延迟和处理时间的速率 以便系统仅以系统可以处理的速度接收。 在内部,这会动态设置最大接收速率 接收器。这个比率的上限是值
spark.streaming.receiver.maxRate和spark.streaming.kafka.maxRatePerPartition
现在我是第一次运行应用程序并且没有以前的微批处理,我应该为spark.streaming.backpressure.initialRate指定一些值
如果是这样,我应该如何确定spark.streaming.backpressure.initialRate的值。
该文档还说,如果将spark.streaming.backpressure.enabled 设置为true,则最大接收速率是动态设置的。
如果是这样,我们是否还需要配置:
spark.streaming.receiver.maxRate 和 spark.streaming.kafka.maxRatePerPartition
如果spark.streaming.backpressure.enabled 设置为true?
这个link 表示在施加背压时使用spark.streaming.backpressure.initialRate 没有影响。
任何有助于消除混乱的帮助将不胜感激。
【问题讨论】:
-
您使用的是 sparks 结构化流而不是 spark 流。结构化流不支持 Afaik 背压。请查看此post 并复制您的帖子,以防此帖子回答您的问题。
标签: apache-spark pyspark apache-kafka spark-streaming spark-structured-streaming