【Question Title】: Batching of a Dataset in Spark Scala
【Posted】: 2018-04-30 04:07:04
【Question Description】:

I am trying to create batches of rows of a Dataset in Spark. In order to limit the number of records sent to a service, I want to batch the items so that I can control the rate at which the data is sent. Given:

case class Person(name:String, address: String)
case class PersonBatch(personBatch: List[Person])

For a given Dataset[Person], I want to create a Dataset[PersonBatch].

For example, if the input Dataset[Person] has 100 records, the output should be a Dataset[PersonBatch] where each PersonBatch is a list of n records (Person).

I have tried the following, but it does not work:

import org.apache.spark.sql.Dataset
import scala.collection.mutable.ListBuffer

object DataBatcher extends Logger {

  var batchList: ListBuffer[PersonBatch] = ListBuffer[PersonBatch]()
  var batchSize: Long = 500  // default batch size

  def addToBatchList(batch: PersonBatch): Unit = {
    batchList += batch
  }

  def clearBatchList(): Unit = {
    batchList.clear()
  }

  def createBatches(ds: Dataset[Person]): Dataset[PersonBatch] = {

    val dsCount = ds.count()
    logger.info(s"Count of dataset passed for creating batches : ${dsCount}")
    val batchElement = ListBuffer[Person]()
    // accumulate rows into the buffer and emit a PersonBatch every batchSize rows
    ds.foreach(x => {
      batchElement += x
      if (batchElement.length == batchSize) {
        addToBatchList(PersonBatch(batchElement.toList))
        batchElement.clear()
      }
    })
    // emit the final, partially filled batch
    if (batchElement.length > 0) {
      addToBatchList(PersonBatch(batchElement.toList))
      batchElement.clear()
    }
    sparkSession.createDataset(batchList)  // assumes a sparkSession and its implicits are in scope
  }
}

I want to run this job on a Hadoop cluster. Can someone help me with this?

【Question Discussion】:

    Tags: scala apache-spark spark-dataframe apache-spark-dataset


    【Solution 1】:

    rdd.iterator has a grouped function that may be useful for you.

    For example:

    iter.grouped(batchSize)
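
    Applied to the question's Dataset[Person], a minimal sketch might look like the following (it assumes the Person and PersonBatch case classes above and a SparkSession in scope; the helper name toBatches is illustrative):

        import org.apache.spark.sql.{Dataset, SparkSession}

        // Minimal sketch: Person and PersonBatch come from the question;
        // the SparkSession is assumed to be available implicitly.
        def toBatches(ds: Dataset[Person], batchSize: Int)(implicit spark: SparkSession): Dataset[PersonBatch] = {
          import spark.implicits._
          ds.mapPartitions { iter =>
            iter.grouped(batchSize).map(group => PersonBatch(group.toList))
          }
        }

    Note that grouped works within each partition, so the last batch of every partition may hold fewer than batchSize records.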

    Sample code snippet using iter.grouped(batchSize) for batch inserts, here with a batch size of 1000, where I was trying to insert into a database:

    import java.sql.{Connection, DriverManager}
    import org.apache.spark.sql.DataFrame

    // Repartition first: numPartitions ~ number of simultaneous DB connections
    // you are planning to allow, e.g. df.repartition(numOfPartitionsYouWant)
    def insertToTable(dataFrame: DataFrame,
                      sqlDatabaseConnectionString: String,
                      sqlTableName: String): Unit = {

      val tableHeader: String = dataFrame.columns.mkString(",")
      dataFrame.foreachPartition { partition =>
        // NOTE: one connection per partition (a better way is to use a connection pool)
        val sqlExecutorConnection: Connection =
          DriverManager.getConnection(sqlDatabaseConnectionString)
        // A batch size of 1000 is used since some databases can't accept batches
        // larger than 1000, e.g. Azure SQL
        partition.grouped(1000).foreach { group =>
          val insertString: scala.collection.mutable.StringBuilder =
            new scala.collection.mutable.StringBuilder()

          group.foreach { record =>
            insertString.append("('" + record.mkString(",") + "'),")
          }

          sqlExecutorConnection
            .createStatement()
            .executeUpdate(f"INSERT INTO [$sqlTableName] ($tableHeader) VALUES "
              + insertString.stripSuffix(","))
        }

        sqlExecutorConnection.close() // close the connection so that connections won't be exhausted
      }
    }
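
    A hypothetical invocation (the DataFrame, partition count, connection string, and table name below are illustrative, not from the original answer):

        // Illustrative values only; substitute your own DataFrame and JDBC settings.
        val connStr = "jdbc:sqlserver://<host>:1433;databaseName=<db>;user=<user>;password=<password>"
        insertToTable(df.repartition(8), connStr, "dbo.TargetTable")  // ~8 concurrent DB connections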
    

    【Discussion】:

      【Solution 2】:
      import java.sql.{DriverManager, SQLException, SQLIntegrityConstraintViolationException}
      import org.apache.spark.sql.Row

      val tableHeader: String = dataFrame.columns.mkString(",")
      dataFrame.foreachPartition((it: Iterator[Row]) => {
        val url = "jdbc:..." + "user=;password=;"  // connection string elided in the original
        val conn = DriverManager.getConnection(url)
        conn.setAutoCommit(false)  // commit manually after each batch below
        val stmt = conn.createStatement()
        val batchSize = 10
        var i = 0
        while (it.hasNext) {
          val row = it.next()
          try {
            stmt.addBatch(" UPDATE TABLE SET STATUS = 0 , " +
              " DATE ='" + new java.sql.Timestamp(System.currentTimeMillis()) + "'" +
              " where id = " + row.getAs("IDNUM"))
            i += 1
            if (i % batchSize == 0) {
              stmt.executeBatch()
              conn.commit()
            }
          } catch {
            case e: SQLIntegrityConstraintViolationException => // ignore constraint violations
            case e: SQLException =>
              e.printStackTrace()
          }
        }
        // flush the final, partially filled batch
        val ret = stmt.executeBatch()
        println("Ret val: " + java.util.Arrays.toString(ret))
        println("Update count: " + stmt.getUpdateCount)
        conn.commit()
        stmt.close()
        conn.close()
      })
      

      【Discussion】:
