【发布时间】:2018-04-30 04:07:04
【问题描述】:
我正在尝试在 Spark 中创建成批的 Dataset 行。
为了保持发送到服务的记录数量,我想对这些项目进行批处理,以便我可以保持发送数据的速率。
对于,
case class Person(name:String, address: String)
case class PersonBatch(personBatch: List[Person])
对于给定的Dataset[Person],我想创建Dataset[PersonBatch]
例如,如果输入 Dataset[Person] 有 100 条记录,则输出 Dataset 应该类似于 Dataset[PersonBatch],其中每个 PersonBatch 应该是 n 记录(人)的列表。
我已经试过了,但它不起作用。
object DataBatcher extends Logger {
var batchList: ListBuffer[PersonBatch] = ListBuffer[PersonBatch]()
var batchSize: Long = 500 //default batch size
def addToBatchList(batch: PersonBatch): Unit = {
batchList += batch
}
def clearBatchList(): Unit = {
batchList.clear()
}
def createBatches(ds: Dataset[Person]): Dataset[PersonBatch] = {
val dsCount = ds.count()
logger.info(s"Count of dataset passed for creating batches : ${dsCount}")
val batchElement = ListBuffer[Person]()
val batch = PersonBatch(batchElement)
ds.foreach(x => {
batch.personBatch += x
if(batch.personBatch.length == batchSize) {
addToBatchList(batch)
batch.requestBatch.clear()
}
})
if(batch.personBatch.length > 0) {
addToBatchList(batch)
batch.personBatch.clear()
}
sparkSession.createDataset(batchList)
}
}
我想在 Hadoop 集群上运行此作业。 有人可以帮我解决这个问题吗?
【问题讨论】:
标签: scala apache-spark spark-dataframe apache-spark-dataset