【发布时间】:2021-03-02 13:14:23
【问题描述】:
需求:在本地机器上用 Scala 编写 Spark 程序,读取 AWS 云端(而非本地)DynamoDB 中的数据。
我的理解:在 EMR 集群上运行时,可以借助 emr-dynamodb-hadoop.jar 读取 DynamoDB 数据。
问题:
- 我们能否使用 emr-dynamodb-hadoop.jar 从 DynamoDB(在云端而非本地)读取数据?
- 不使用 EMR 集群:我想直接在本地机器上运行 Scala 代码,通过 Spark 访问 DynamoDB。
build.sbt
// sbt build definition for the local (non-EMR) Spark job that reads DynamoDB via emr-dynamodb-hadoop.
version := "0.1"
// Scala 2.11: `%%` below resolves the _2.11 build of spark-core 2.4.1.
scalaVersion := "2.11.12"
// Emit Java 8 bytecode.
// NOTE(review): `:=` replaces any scalacOptions contributed elsewhere (plugins, other settings);
// `++=` would append instead — confirm the override is intended.
scalacOptions := Seq("-target:jvm-1.8")
libraryDependencies ++= Seq(
// AWS SDK for Java v2 (software.amazon.awssdk).
// NOTE(review): nothing in readDataFromDDB.scala imports this package; the connector below
// presumably brings its own (v1, com.amazonaws) SDK — verify whether this dependency is needed.
"software.amazon.awssdk" % "dynamodb" % "2.15.1",
"org.apache.spark" %% "spark-core" % "2.4.1",
// Provides org.apache.hadoop.dynamodb.* (DynamoDBInputFormat, DynamoDBItemWritable, ...).
"com.amazon.emr" % "emr-dynamodb-hadoop" % "4.2.0",
// Explicit Apache HttpClient pin.
// NOTE(review): the runtime failure "Socket not created by this factory" is raised inside the
// HTTP client stack; a mismatch between the resolved httpclient/httpcore versions and what the
// AWS SDK expects is a likely suspect. Inspect `sbt evicted` / the dependency tree before
// changing this pin.
"org.apache.httpcomponents" % "httpclient" % "4.5"
)
// Force the Jackson modules to a single 2.6.7 line.
// NOTE(review): presumably chosen to match what Spark 2.4.1 expects so that newer transitive
// Jackson versions do not win resolution — confirm against Spark's own pom.
dependencyOverrides ++= {
Seq(
"com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.6.7.1",
"com.fasterxml.jackson.core" % "jackson-databind" % "2.6.7",
"com.fasterxml.jackson.core" % "jackson-core" % "2.6.7"
)
}
readDataFromDDB.scala
import org.apache.hadoop.dynamodb.DynamoDBItemWritable
import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Local-mode Spark job that scans a DynamoDB table through the emr-dynamodb-hadoop
 * `DynamoDBInputFormat` and prints the number of items read.
 */
object readDataFromDDB {

  /**
   * Entry point: starts a local SparkContext, loads table "Music" as a Hadoop RDD and
   * prints its count.
   *
   * Any non-fatal failure while running the job is printed with its full stack trace;
   * the SparkContext is always stopped afterwards. A failure while creating the
   * SparkContext itself propagates unchanged.
   */
  def main(args: Array[String]): Unit = {
    import scala.util.control.NonFatal

    val conf = new SparkConf().setAppName("DynamoDBApplication").setMaster("local")
    // Created outside the `try` so `finally` never sees an uninitialised context. Previously a
    // failing constructor left `sc` null and `sc.stop()` threw an NPE that hid the real cause.
    val sc = new SparkContext(conf)
    try {
      val jobConf = getDynamoDbJobConf(sc, "Music", "TableNameForWrite")
      val tableData = sc.hadoopRDD(
        jobConf,
        classOf[DynamoDBInputFormat],
        classOf[Text],
        classOf[DynamoDBItemWritable]
      )
      // count() forces the scan. NOTE(review): the reported failure surfaces here, in the
      // connector's describeTable call made while computing read throughput.
      println(tableData.count())
    } catch {
      // printStackTrace emits the actual trace; `println(e.getStackTrace)` only printed the
      // array's identity string. NonFatal lets fatal errors (OOM, interrupts) propagate.
      case NonFatal(e) => e.printStackTrace()
    } finally {
      sc.stop()
    }
  }

  /**
   * Builds the JobConf consumed by `DynamoDBInputFormat` / `DynamoDBOutputFormat`.
   *
   * Credentials are taken from the `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment
   * variables instead of being hard-coded in source; a variable that is absent leaves the
   * corresponding `dynamodb.*` key unset.
   * NOTE(review): with the keys unset the connector presumably falls back to its default
   * credential lookup — confirm for the emr-dynamodb-hadoop version in use.
   *
   * @param sc                context whose Hadoop configuration is copied into the JobConf
   * @param tableNameForRead  table scanned by the input format
   * @param tableNameForWrite table configured for the output format (only relevant when writing)
   * @return a JobConf with the DynamoDB table, endpoint, region, credential and format keys set
   */
  private def getDynamoDbJobConf(sc: SparkContext, tableNameForRead: String, tableNameForWrite: String): JobConf = {
    val jobConf = new JobConf(sc.hadoopConfiguration)
    jobConf.set("dynamodb.servicename", "dynamodb")
    jobConf.set("dynamodb.input.tableName", tableNameForRead)
    jobConf.set("dynamodb.output.tableName", tableNameForWrite)
    sys.env.get("AWS_ACCESS_KEY_ID").foreach(key => jobConf.set("dynamodb.awsAccessKeyId", key))
    sys.env.get("AWS_SECRET_ACCESS_KEY").foreach(secret => jobConf.set("dynamodb.awsSecretAccessKey", secret))
    jobConf.set("dynamodb.endpoint", "dynamodb.us-east-1.amazonaws.com")
    jobConf.set("dynamodb.regionid", "us-east-1")
    jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
    jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")
    jobConf
  }
}
错误
java.lang.RuntimeException:无法在 DynamoDB 中查找表 Music。
    在 org.apache.hadoop.dynamodb.DynamoDBClient.describeTable(DynamoDBClient.java:116)
    在 org.apache.hadoop.dynamodb.read.ReadIopsCalculator.getThroughput(ReadIopsCalculator.java:67)
    在 org.apache.hadoop.dynamodb.read.ReadIopsCalculator.calculateTargetIops(ReadIopsCalculator.java:57)
    在 org.apache.hadoop.dynamodb.read.AbstractDynamoDBRecordReader.initReadManager(AbstractDynamoDBRecordReader.java:153)
    在 org.apache.hadoop.dynamodb.read.AbstractDynamoDBRecordReader.<init>(AbstractDynamoDBRecordReader.java:84)
    在 org.apache.hadoop.dynamodb.read.DefaultDynamoDBRecordReader.<init>(DefaultDynamoDBRecordReader.java:24)
    在 org.apache.hadoop.dynamodb.read.DynamoDBInputFormat.getRecordReader(DynamoDBInputFormat.java:32)
    在 org.apache.spark.rdd.HadoopRDD$$anon$1.liftedTree1$1(HadoopRDD.scala:267)
    在 org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:266)
    在 org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:224)
    在 org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:95)
    在 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    在 org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    在 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    在 org.apache.spark.scheduler.Task.run(Task.scala:121)
    在 org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:403)
    在 org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    在 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:409)
    在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    在 java.lang.Thread.run(Thread.java:748)
原因:java.lang.RuntimeException:java.lang.IllegalStateException:套接字不是由这个工厂创建的
    在 org.apache.hadoop.dynamodb.DynamoDBFibonacciRetryer.handleException(DynamoDBFibonacciRetryer.java:120)
    在 org.apache.hadoop.dynamodb.DynamoDBFibonacciRetryer.runWithRetry(DynamoDBFibonacciRetryer.java:83)
    在 org.apache.hadoop.dynamodb.DynamoDBClient.describeTable(DynamoDBClient.java:105)
    ... 20 更多
已参考的链接
https://aws.amazon.com/blogs/big-data/analyze-your-data-on-amazon-dynamodb-with-apache-spark/
read/write dynamo db from apache spark
【问题讨论】:
标签: apache-spark amazon-dynamodb