【发布时间】:2016-05-02 02:51:58
【问题描述】:
我需要存储来自 kafka->spark streaming->cassandra 的值。
现在,我正在接收来自 kafka->spark 的值,并且我有一个 spark 作业来将值保存到 cassandra db 中。但是,我遇到了数据类型 dstream 的问题。
在下面的 sn-p 中,您可以看到我如何尝试将 DStream 转换为 python 友好的列表对象,以便我可以使用它,但它给出了错误。
kafka 生产者的输入:
伯恩 24 圣地亚哥 robbyrne@email.com 罗伯
火花工作:
map1={'spark-kafka':1}
kafkaStream = KafkaUtils.createStream(stream, 'localhost:2181', "name", map1)
lines = kafkaStream.map(lambda x: x[1])
words = lines.flatMap(lambda line: line.split(" "))
words.pprint() # outputs-> Byrne 24 SanDiego robbyrne@email.com Rob
list=[lambda word for word in words]
#gives an error -> TypeError: 'TransformedDStream' object is not iterable
这就是我从 spark->cassandra
保存值的方式rdd2=sc.parallelize([{
... "lastname":'Byrne',
... "age":24,
... "city":"SanDiego",
... "email":"robbyrne@email.com",
... "firstname":"Rob"}])
rdd2.saveToCassandra("keyspace2","users")
将 DStream 对象转换为字典的最佳方式是什么,或者我在这里尝试做的最佳方式是什么?
我只需要将从 kafka 收到的值(以 DStream 的形式)保存在 Cassandra 中。
谢谢,任何帮助都会很好!
版本:
Cassandra v2.1.12
Spark v1.4.1
Scala 2.10
【问题讨论】:
-
因为是我的错,与 $JAVA_HOME 的问题有关,而不是 mesos。
-
好吧,放松一下。没有理由在这里变得超级。我们现在不要在这篇文章中发送垃圾邮件。
标签: apache-spark cassandra apache-kafka pyspark datastax