【发布时间】:2025-12-04 11:50:01
【问题描述】:
我正在使用结构化流从 Kafka 读取数据,我需要保存 数据到 InfluxDB。在常规的基于 Dstreams 的方法中,我这样做了 如下:
val messages:DStream[(String, String)] = kafkaStream.map(record =>
(record.topic, record.value))
messages.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
val influxService = new InfluxService()
val connection = influxService.createInfluxDBConnectionWithParams(
host,
port,
username,
password,
database
)
partitionOfRecords.foreach(record => {
ABCService.handleData(connection, record._1, record._2)
}
)
}
}
ssc.start()
logger.info("Started Spark-Kafka streaming session")
ssc.awaitTermination()
注意:
我在foreachpartition 中创建连接对象。我该怎么做呢
在结构化流中?
我尝试了连接池方法(我
在主节点上创建一个连接池并将其传递给工作节点
) 这里
Spark connection pooling - Is this the right approach
并且工作人员无法获取连接池对象。任何明显的东西
我在这里失踪了吗?
【问题讨论】:
标签: apache-spark spark-structured-streaming