[Question Title]: Spark Streaming - Error when reading from Kinesis
[Posted]: 2017-01-19 22:02:19
[Question]:

I am new to Apache Spark Streaming. I am trying to set up Spark to read values from a Kinesis stream. Here is my Python script:

import settings
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
spark_context = SparkContext(master="local[2]", appName=settings.KINESIS_APP_NAME)

streaming_context = StreamingContext(sparkContext=spark_context, batchDuration=settings.BATCH_DURATION)

kinesis_good_stream = KinesisUtils.createStream(
ssc=streaming_context, kinesisAppName=settings.KINESIS_APP_NAME,
streamName=settings.KINESIS_GOOD_STREAM, endpointUrl=settings.KINESIS_ENDPOINT,
awsAccessKeyId=settings.AWS_ACCESS_KEY, awsSecretKey=settings.AWS_SECRET_KEY,
checkpointInterval=settings.KINESIS_CHECKPOINT_INTERVAL, regionName=settings.KINESIS_REGION,
initialPositionInStream=InitialPositionInStream.LATEST)

counts = kinesis_good_stream.flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a+b)
counts.pprint()

streaming_context.start()
streaming_context.awaitTermination()
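The `flatMap` / `map` / `reduceByKey` chain above is a standard streaming word count. As a sketch of what it computes for each micro-batch, here is the same logic in plain Python (no Spark required; function name is hypothetical):

```python
from collections import defaultdict

def word_count(lines):
    # flatMap: split each record into individual words
    words = [w for line in lines for w in line.split(" ")]
    # map + reduceByKey: emit (word, 1) pairs and sum counts per word
    counts = defaultdict(int)
    for w in words:
        counts[w] += 1
    return dict(counts)

print(word_count(["foo bar", "bar baz"]))  # → {'foo': 1, 'bar': 2, 'baz': 1}
```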

Settings file:

# Kinesis Configuration
KINESIS_REGION = 'ap-southeast-1'
KINESIS_ENDPOINT = 'kinesis.ap-southeast-1.amazonaws.com'
KINESIS_GOOD_STREAM = 'GoodStream'
KINESIS_BAD_STREAM = 'BadStream'
KINESIS_CHECKPOINT_INTERVAL = 2000
KINESIS_APP_NAME = 'test-spark'

# Spark context
BATCH_DURATION = 2

# AWS Credential
AWS_ACCESS_KEY = ''
AWS_SECRET_KEY = ''

I run the script with this command:

spark-submit --jars spark-streaming-kinesis-asl-assembly.jar kinesis.py  

From my Django project:

INFO:snowplow_tracker.emitters:GET request finished with status code: 200
INFO:snowplow_tracker.emitters:POST request finished with status code: 200

From my collector, I can see that writes to Kinesis succeed:

08:00:19.720 [pool-1-thread-9] INFO  c.s.s.c.s.sinks.KinesisSink - Successfully wrote 2 out of 2 records

And from my Spark Streaming job:

-------------------------------------------
Time: 2016-11-25 07:59:25
-------------------------------------------

16/11/25 07:59:30 ERROR Executor: Exception in task 0.0 in stage 345.0 (TID 173)
java.lang.NoSuchMethodError: org.apache.spark.storage.BlockManager.get(Lorg/apache/spark/storage/BlockId;)Lscala/Option;
at org.apache.spark.streaming.kinesis.KinesisBackedBlockRDD.getBlockFromBlockManager$1(KinesisBackedBlockRDD.scala:104)
at org.apache.spark.streaming.kinesis.KinesisBackedBlockRDD.compute(KinesisBackedBlockRDD.scala:117)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:390)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

My Kinesis stream uses 1 shard, and the Spark context is set up with 2 cores.

[Comments]:

  • Could you post your sbt / maven build file so we can see which versions you are using? In particular the AWS libraries and the Spark version.
  • Sorry, just noticed you are using pyspark, my bad.
  • I am using pyspark with Spark 2.0.2.

标签: apache-spark spark-streaming amazon-kinesis


[Answer 1]:

Managed to resolve the error. I was running Spark 2.0.2, but I was using spark-streaming-kinesis-asl-assembly_2.10-2.0.0.jar, which caused the java.lang.NoSuchMethodError.
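In other words, the assembly jar must be built against the same Spark version (and a compatible Scala version) as the installed Spark. A version-matched submit would look like the sketch below (jar name assumed here; the key point is that the version suffix in the jar matches the running Spark, e.g. Scala 2.11 / Spark 2.0.2):

```shell
# Sketch: the kinesis-asl assembly's Scala and Spark versions must match
# the Spark installation (assumed here: Spark 2.0.2 built on Scala 2.11).
spark-submit \
  --jars spark-streaming-kinesis-asl-assembly_2.11-2.0.2.jar \
  kinesis.py
```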

[Discussion]:
