[Question Title]: Spark DynamoDB Connectivity Issue
[Posted]: 2021-03-02 13:14:23
[Question Description]:

Requirement: read data from DynamoDB (hosted on AWS, not a local instance) via Spark, using Scala on a local machine.

Understanding: when we use an EMR cluster, the data can be read using emr-dynamodb-hadoop.jar.

Questions

  1. Can we use emr-dynamodb-hadoop.jar to read data from DynamoDB (in the cloud, not local)?
  2. Without using an EMR cluster — I want to access DynamoDB from Spark directly on my local machine using Scala code.

build.sbt

version := "0.1"

scalaVersion := "2.11.12"
scalacOptions := Seq("-target:jvm-1.8")

libraryDependencies ++= Seq(
  "software.amazon.awssdk" % "dynamodb" % "2.15.1",
  "org.apache.spark" %% "spark-core" % "2.4.1",
  "com.amazon.emr" % "emr-dynamodb-hadoop" % "4.2.0",
  "org.apache.httpcomponents" % "httpclient" % "4.5"
)

dependencyOverrides ++= {
  Seq(
    "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.6.7.1",
    "com.fasterxml.jackson.core" % "jackson-databind" % "2.6.7",
    "com.fasterxml.jackson.core" % "jackson-core" % "2.6.7"
  )
}

readDataFromDDB.scala

import org.apache.hadoop.dynamodb.DynamoDBItemWritable
import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.{SparkConf, SparkContext}

object readDataFromDDB {
  def main(args: Array[String]): Unit = {
    var sc: SparkContext = null
    try {
      val conf = new SparkConf().setAppName("DynamoDBApplication").setMaster("local")
      sc = new SparkContext(conf)
      val jobConf = getDynamoDbJobConf(sc, "Music", "TableNameForWrite")
      val tableData = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable])
      println(tableData.count())

    } catch {
      case e: Exception =>
        // e.getStackTrace returns an array reference; print the full trace instead
        e.printStackTrace()
    } finally {
      // sc is still null if SparkContext construction failed
      if (sc != null) sc.stop()
    }
  }

  private def getDynamoDbJobConf(sc: SparkContext, tableNameForRead: String, tableNameForWrite: String) = {
    val jobConf = new JobConf(sc.hadoopConfiguration)
    jobConf.set("dynamodb.servicename", "dynamodb")
    jobConf.set("dynamodb.input.tableName", tableNameForRead)
    jobConf.set("dynamodb.output.tableName", tableNameForWrite)
    jobConf.set("dynamodb.awsAccessKeyId", "*****************")
    jobConf.set("dynamodb.awsSecretAccessKey", "*********************")
    jobConf.set("dynamodb.endpoint", "dynamodb.us-east-1.amazonaws.com")
    jobConf.set("dynamodb.regionid", "us-east-1")
    jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
    jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")
    jobConf
  }
}
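Once the count above succeeds, the records themselves can be unpacked. A minimal sketch of what that might look like, continuing from the `tableData` RDD built in `main()` above (this assumes the job configuration works; `DynamoDBItemWritable.getItem` exposes the attribute map for each row):

```scala
// Sketch only: each record in tableData is a (Text, DynamoDBItemWritable) pair.
// getItem returns the attribute-name -> AttributeValue map for that row.
val items = tableData.map { case (_, writable) => writable.getItem }

// Print a few attribute maps to verify connectivity and inspect the data shape.
items.take(5).foreach(println)
```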

Error

java.lang.RuntimeException: Could not lookup table Music in DynamoDB.
	at org.apache.hadoop.dynamodb.DynamoDBClient.describeTable(DynamoDBClient.java:116)
	at org.apache.hadoop.dynamodb.read.ReadIopsCalculator.getThroughput(ReadIopsCalculator.java:67)
	at org.apache.hadoop.dynamodb.read.ReadIopsCalculator.calculateTargetIops(ReadIopsCalculator.java:57)
	at org.apache.hadoop.dynamodb.read.AbstractDynamoDBRecordReader.initReadManager(AbstractDynamoDBRecordReader.java:153)
	at org.apache.hadoop.dynamodb.read.AbstractDynamoDBRecordReader.<init>(AbstractDynamoDBRecordReader.java:84)
	at org.apache.hadoop.dynamodb.read.DefaultDynamoDBRecordReader.<init>(DefaultDynamoDBRecordReader.java:24)
	at org.apache.hadoop.dynamodb.read.DynamoDBInputFormat.getRecordReader(DynamoDBInputFormat.java:32)
	at org.apache.spark.rdd.HadoopRDD$$anon$1.liftedTree1$1(HadoopRDD.scala:267)
	at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:266)
	at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:224)
	at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:95)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:403)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:409)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: java.lang.IllegalStateException: Socket not created by this factory
	at org.apache.hadoop.dynamodb.DynamoDBFibonacciRetryer.handleException(DynamoDBFibonacciRetryer.java:120)
	at org.apache.hadoop.dynamodb.DynamoDBFibonacciRetryer.runWithRetry(DynamoDBFibonacciRetryer.java:83)
	at org.apache.hadoop.dynamodb.DynamoDBClient.describeTable(DynamoDBClient.java:105)
	... 20 more

Links already reviewed

https://aws.amazon.com/blogs/big-data/analyze-your-data-on-amazon-dynamodb-with-apache-spark/

read/write dynamo db from apache spark

Spark 2.2.0 - How to write/read DataFrame to DynamoDB

https://github.com/awslabs/emr-dynamodb-connector

[Question Comments]:

    Tags: apache-spark amazon-dynamodb


    [Solution 1]:

    Resolved this issue by updating the following dependency versions:

    "software.amazon.awssdk" % "dynamodb" % "2.15.31",
    "com.amazon.emr" % "emr-dynamodb-hadoop" % "4.14.0"
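
    For context, the full dependency block after the upgrade would look roughly like this (a sketch; the Spark and httpclient versions are carried over unchanged from the question's build.sbt):

```scala
libraryDependencies ++= Seq(
  "software.amazon.awssdk" % "dynamodb" % "2.15.31",   // upgraded
  "org.apache.spark" %% "spark-core" % "2.4.1",
  "com.amazon.emr" % "emr-dynamodb-hadoop" % "4.14.0", // upgraded
  "org.apache.httpcomponents" % "httpclient" % "4.5"
)
```

    The "Socket not created by this factory" cause in the stack trace is typically a symptom of conflicting Apache HttpClient classes on the classpath, so aligning the AWS SDK and connector versions plausibly removes that conflict.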
    

    [Discussion]:
