【发布时间】:2017-03-06 02:31:57
【问题描述】:
我正在尝试实现 Apache kafka 和 spark 流集成 这是我的python代码:
from __future__ import print_function
import sys
from pyspark.streaming import StreamingContext
from pyspark import SparkContext,SparkConf
from pyspark.streaming.kafka import KafkaUtils
if __name__ == "__main__":
#conf = SparkConf().setAppName("Kafka-Spark").setMaster("spark://127.0.0.1:7077")
conf = SparkConf().setAppName("Kafka-Spark")
#sc = SparkContext(appName="KafkaSpark")
sc = SparkContext(conf=conf)
stream=StreamingContext(sc,1)
map1={'demo':1}
kafkaStream = KafkaUtils.createStream(stream, 'localhost:2181', "test-consumer-group", map1)
# kafkaStream = KafkaUtils.createStream(stream, 'localhost:2181', "name", map1) #tried with localhost:2181 too
lines = kafkaStream.map(lambda x: x[1])
counts = lines.flatMap(lambda line: line.split(" "))
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a+b)
counts.pprint()
stream.start()
stream.awaitTermination()
当我运行上面的程序时,它会在终端上显示输出:
16/10/24 15:27:20 错误执行程序:阶段 0.0 (TID 0) 中任务 0.0 中的异常 java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class 在 kafka.utils.Pool.(Pool.scala:28) 在 kafka.consumer.ZookeeperConsumerConnector.(ZookeeperConsumerConnector.scala:91) 在 kafka.consumer.ZookeeperConsumerConnector.(ZookeeperConsumerConnector.scala:143) 在 kafka.consumer.Consumer$.create(ConsumerConnector.scala:94) 在 org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:100) 在 org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149) 在 org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:131) 在 org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:597) 在 org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:587) 在 org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1993) 在 org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1993) 在 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 在 org.apache.spark.scheduler.Task.run(Task.scala:86) 在 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 在 java.lang.Thread.run(Thread.java:745) 引起:java.lang.ClassNotFoundException:scala.collection.GenTraversableOnce$class 在 java.net.URLClassLoader.findClass(URLClassLoader.java:381) 在 java.lang.ClassLoader.loadClass(ClassLoader.java:424) 在 sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) 在 java.lang.ClassLoader.loadClass(ClassLoader.java:357) ... 17 更多 24 年 16 月 10 日 15:27:20 错误 SparkUncaughtExceptionHandler:线程线程中未捕获的异常 [执行器任务启动 worker-0,5,main] java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class 在 kafka.utils.Pool.(Pool.scala:28) 在 kafka.consumer.ZookeeperConsumerConnector.(ZookeeperConsumerConnector.scala:91) 在 kafka.consumer.ZookeeperConsumerConnector.(ZookeeperConsumerConnector.scala:143) 在 kafka.consumer.Consumer$.create(ConsumerConnector.scala:94) 在 org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:100) 在 org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149) 在 org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:131) 在 org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:597) 在 org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:587) 在 org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1993) 在 org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1993) 在 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 在 org.apache.spark.scheduler.Task.run(Task.scala:86) 在 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 在 java.lang.Thread.run(Thread.java:745) 引起:java.lang.ClassNotFoundException:scala.collection.GenTraversableOnce$class 在 java.net.URLClassLoader.findClass(URLClassLoader.java:381) 在 java.lang.ClassLoader.loadClass(ClassLoader.java:424) 在 sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) 在 java.lang.ClassLoader.loadClass(ClassLoader.java:357) ... 17 更多 16/10/24 15:27:20 INFO StreamingContext:从关闭挂钩调用停止(stopGracefully=false) 16/10/24 15:27:20 WARN TaskSetManager: 在阶段 0.0 (TID 0, localhost) 丢失任务 0.0: java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class 在 kafka.utils.Pool.(Pool.scala:28) 在 kafka.consumer.ZookeeperConsumerConnector.(ZookeeperConsumerConnector.scala:91) 在 kafka.consumer.ZookeeperConsumerConnector.(ZookeeperConsumerConnector.scala:143) 在 kafka.consumer.Consumer$.create(ConsumerConnector.scala:94) 在 org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:100) 在 org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149) 在 org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:131) 在 org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:597) 在 org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:587) 在 org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1993) 在 org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1993) 在 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 在 org.apache.spark.scheduler.Task.run(Task.scala:86) 在 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 在 java.lang.Thread.run(Thread.java:745) 引起:java.lang.ClassNotFoundException:scala.collection.GenTraversableOnce$class 在 java.net.URLClassLoader.findClass(URLClassLoader.java:381) 在 java.lang.ClassLoader.loadClass(ClassLoader.java:424) 在 sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) 在 java.lang.ClassLoader.loadClass(ClassLoader.java:357) ... 17 更多
16/10/24 15:27:20 ERROR TaskSetManager: 阶段 0.0 中的任务 0 失败 1 次;
【问题讨论】:
标签: python-2.7 apache-spark apache-kafka pyspark