【问题标题】:Spark map vs foreachRdd火花地图与 foreachRdd
【发布时间】:2017-06-19 22:52:05
【问题描述】:

我们有一个 spark 流应用程序,我们从 kafka 接收 dstream 并需要存储到 dynamoDB ....我正在尝试两种方法来做到这一点,如下面的代码所述

requestsWithState 是一个 Dstream

带有 foreachRDD 的代码片段 1:

requestsWithState.foreachRDD { rdd =>
   println("Data being populated to Pulsar")
   rdd.foreach { case (id, eventStream) =>
     println("id is " + id + " Event is " + eventStream)
     DBUtils.putItem(dynamoConnection, id, eventStream.toString())
   }
}

带地图的代码片段 2:

   requestsWithState.map (rdd => { rdd match {
         case (id, eventStream) => {
           println("id is " + id + " Event is " + eventStream)
           val dynamoConnection = setupDynamoClientConnection()
           DBUtils.putItem(dynamoConnection, id, eventStream.toString())
         }
       }
     })

 requestsWithState.print(1)

代码片段 1 工作正常并填充数据库......第二个代码 sn-p 不起作用......我们是新手,很想知道它背后的原因和获取方法它工作吗? ........我们正在试验的原因(我们知道这是一个转换,foreachRdd 是一个动作)对于我们的集群负载很重的用例来说 foreachRdd 非常慢,我们发现如果我们能让它工作,那张地图会快得多.....请帮助我们让地图代码工作

【问题讨论】:

  • 你应该(几乎)永远不要在地图或平面地图中产生副作用!

标签: apache-spark spark-streaming


【解决方案1】:

Map 是 Spark 中的一种转换(惰性转换),除非您在 this 之后调用 spark 操作,否则它不会执行。 对于 Spark 转换和操作,请参阅以下链接 http://spark.apache.org/docs/latest/programming-guide.html#transformations

【讨论】:

  • .....我在 map requestsWithState.print(1) 之后确实有一个动作,但它仍然不起作用我已经相应地更新了问题,请看一下
  • RDD 是不可变的,所以 map 会返回一个新的 rdd。所以试试code requestsWithState = requestsWithState.map (rdd => { rdd match { case (id, eventStream) => { println("id is " + id + " Event is " + eventStream) val dynamoConnection = setupDynamoClientConnection() DBUtils.putItem(dynamoConnection, id, eventStream.toString()) } } }) requestsWithState.print(1) code
【解决方案2】:

DStream.map 返回另一个流。您应该在该流而不是原始流上调用 print。

所以在 Scala 中:

val transformedStream = requestsWithState.map (rdd => { rdd match {
         case (id, eventStream) => {
           println("id is " + id + " Event is " + eventStream)
           val dynamoConnection = setupDynamoClientConnection()
           DBUtils.putItem(dynamoConnection, id, eventStream.toString())
         }
       }
     })

transformedStream.print(1)

【讨论】:

    【解决方案3】:

    map 的版本没有任何动作,.map 不是动作,而是变换。

    没有动作就不会执行转换。

    参见例如http://training.databricks.com/visualapi.pdfhttp://spark.apache.org/docs/latest/programming-guide.html#transformations

    【讨论】:

    • 我在 map requestsWithState.print(1) 之后确实有一个动作,但它仍然不起作用我已经相应地更新了问题,请看一下
    • 嗨,自从您添加操作 requestsWithState.print(1) 后,您找到解决问题的方法了吗?我知道这是一个老问题,但我对你的用例很感兴趣。谢谢
    猜你喜欢
    • 1970-01-01
    • 2017-01-29
    • 2017-03-09
    • 1970-01-01
    • 2022-01-15
    • 1970-01-01
    • 2017-02-13
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多