【发布时间】:2019-11-04 20:23:03
【问题描述】:
我用spark就是这样写数据到kafka的。
df.write()。格式(“卡夫卡”)。保存()
我可以控制写入kafka的速度以避免对kafka造成压力吗? 是否有一些选项有助于降低速度?
【问题讨论】:
标签: apache-spark apache-kafka spark-structured-streaming
我用spark就是这样写数据到kafka的。
df.write()。格式(“卡夫卡”)。保存()
我可以控制写入kafka的速度以避免对kafka造成压力吗? 是否有一些选项有助于降低速度?
【问题讨论】:
标签: apache-spark apache-kafka spark-structured-streaming
我认为将linger.ms 设置为非零值会有所帮助。因为它控制在发送当前批次之前等待附加消息的时间量。代码如下所示
df.write.format("kafka").option("linger.ms", "100").save()
但这真的取决于很多事情。如果您的 Kafka 足够“大”并且配置正确,那么我不会太担心速度。毕竟,kafka 就是为了应对这种情况(流量高峰)而设计的。
【讨论】:
一般情况下,结构化流式处理会在默认情况下尝试尽可能快地处理数据。每个源中都有允许控制处理速率的选项,例如 File 源中的 maxFilesPerTrigger 和 Kafka 源中的 maxOffsetsPerTrigger。
val streamingETLQuery = cloudtrailEvents
.withColumn("date", $"timestamp".cast("date") // derive the date
.writeStream
.trigger(ProcessingTime("10 seconds")) // check for files every 10s
.format("parquet") // write as Parquet partitioned by date
.partitionBy("date")
.option("path", "/cloudtrail")
.option("checkpointLocation", "/cloudtrail.checkpoint/")
.start()
val df = spark.readStream
.format("text")
.option("maxFilesPerTrigger", 1)
.load("text-logs")
阅读以下链接了解更多详情:
https://jaceklaskowski.gitbooks.io/spark-structured-streaming/spark-sql-streaming-KafkaSource.html https://jaceklaskowski.gitbooks.io/spark-structured-streaming/spark-sql-streaming-FileStreamSource.html https://databricks.com/blog/2017/01/19/real-time-streaming-etl-structured-streaming-apache-spark-2-1.html http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#input-sources http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
【讨论】:
kafka producer。