【问题标题】:Spark structured streaming foreachpartition / connection pooling issueSpark结构化流foreachpartition /连接池问题
【发布时间】:2025-12-04 11:50:01
【问题描述】:

我正在使用结构化流从 Kafka 读取数据,我需要保存 数据到 InfluxDB。在常规的基于 Dstreams 的方法中,我这样做了 如下:

val messages:DStream[(String, String)] =  kafkaStream.map(record => 
(record.topic, record.value)) 
messages.foreachRDD { rdd => 
  rdd.foreachPartition { partitionOfRecords => 
    val influxService = new InfluxService() 
    val connection = influxService.createInfluxDBConnectionWithParams( 
        host, 
        port, 
        username, 
        password, 
        database 
        ) 
    partitionOfRecords.foreach(record => { 
      ABCService.handleData(connection, record._1, record._2) 
    } 
    ) 
  } 
} 
ssc.start() 
logger.info("Started Spark-Kafka streaming session") 
ssc.awaitTermination() 

注意

我在foreachpartition 中创建连接对象。我该怎么做呢 在结构化流中?

我尝试了连接池方法(我 在主节点上创建一个连接池并将其传递给工作节点 ) 这里 Spark connection pooling - Is this the right approach
并且工作人员无法获取连接池对象。任何明显的东西 我在这里失踪了吗?

【问题讨论】:

    标签: apache-spark spark-structured-streaming


    【解决方案1】:

    Structured Streaming 的设计完全不同,旧的基于 RDD 的模式在这里并不适用。

    相反,您应该实现自己的ForeachWriter 。它需要三种方法:

    • open

      abstract def open(partitionId: Long, version: Long): Boolean 在执行器中开始处理新数据的一个分区时调用。

      这是您初始化连接对象的地方。一般来说,它不应该依赖于通过闭包传递的对象(你在第二个问题中犯的错误)。

      如果你想限制连接数,你可以使用单例对象,只要所有组件都是线程安全的。

    • process

      abstract def process(value: T): Unit
      

      在执行者端调用处理数据。

      这相当于foreach

    • close

      abstract def close(errorOrNull: Throwable): Unit

      在执行器端停止处理新数据的一个分区时调用。

      您可以在此处关闭连接并处理其他临时对象。

    【讨论】:

    • 感谢您回答我的问题。我确实尝试过 (*.com/questions/50205650/…),但没有成功。
    • 即使我的 Kafka 主题有 3 个分区,在数据处理过程中也会多次调用 open。因此,如果我在 open() 块中创建连接对象,它会被多次调用导致内存不足异常
    • foreachPartition 也会发生同样的事情。连接对象并非设计为可序列化并通过线路发送。你可以尝试使用单例对象来存储池,只要所有组件都是线程安全的
    • 再次感谢您的回复。你介意看看这种方法吗? *.com/questions/50205650/…
    • 嗨 /u/user9613318 我尝试了在 open 方法中创建连接的相同逻辑并且它有效。不知道为什么它不能更早地工作。