【发布时间】:2017-06-19 22:52:05
【问题描述】:
我们有一个 spark 流应用程序,我们从 kafka 接收 dstream 并需要存储到 dynamoDB ....我正在尝试两种方法来做到这一点,如下面的代码所述
requestsWithState 是一个 Dstream
带有 foreachRDD 的代码片段 1:
requestsWithState.foreachRDD { rdd =>
println("Data being populated to Pulsar")
rdd.foreach { case (id, eventStream) =>
println("id is " + id + " Event is " + eventStream)
DBUtils.putItem(dynamoConnection, id, eventStream.toString())
}
}
带地图的代码片段 2:
requestsWithState.map (rdd => { rdd match {
case (id, eventStream) => {
println("id is " + id + " Event is " + eventStream)
val dynamoConnection = setupDynamoClientConnection()
DBUtils.putItem(dynamoConnection, id, eventStream.toString())
}
}
})
requestsWithState.print(1)
代码片段 1 工作正常并填充数据库......第二个代码 sn-p 不起作用......我们是新手,很想知道它背后的原因和获取方法它工作吗? ........我们正在试验的原因(我们知道这是一个转换,foreachRdd 是一个动作)对于我们的集群负载很重的用例来说 foreachRdd 非常慢,我们发现如果我们能让它工作,那张地图会快得多.....请帮助我们让地图代码工作
【问题讨论】:
-
你应该(几乎)永远不要在地图或平面地图中产生副作用!
标签: apache-spark spark-streaming