【问题标题】:Saving values from spark to Cassandra将值从 spark 保存到 Cassandra
【发布时间】:2016-05-02 02:51:58
【问题描述】:

我需要存储来自 kafka->spark streaming->cassandra 的值。

现在,我正在接收来自 kafka->spark 的值,并且我有一个 spark 作业来将值保存到 cassandra db 中。但是,我遇到了数据类型 dstream 的问题。

在下面的 sn-p 中,您可以看到我如何尝试将 DStream 转换为 python 友好的列表对象,以便我可以使用它,但它给出了错误。

kafka 生产者的输入:

伯恩 24 圣地亚哥 robbyrne@email.com 罗伯

火花工作:

map1={'spark-kafka':1}
kafkaStream = KafkaUtils.createStream(stream, 'localhost:2181', "name", map1)
lines = kafkaStream.map(lambda x: x[1])
words = lines.flatMap(lambda line: line.split(" "))

words.pprint() # outputs-> Byrne 24 SanDiego robbyrne@email.com Rob

list=[lambda word for word in words]
#gives an error -> TypeError: 'TransformedDStream' object is not iterable

这就是我从 spark->cassandra

保存值的方式
rdd2=sc.parallelize([{
... "lastname":'Byrne',
... "age":24,
... "city":"SanDiego",
... "email":"robbyrne@email.com",
... "firstname":"Rob"}])
rdd2.saveToCassandra("keyspace2","users")

将 DStream 对象转换为字典的最佳方式是什么,或者我在这里尝试做的最佳方式是什么?

我只需要将从 kafka 收到的值(以 DStream 的形式)保存在 Cassandra 中。

谢谢,任何帮助都会很好!

版本:

Cassandra v2.1.12
Spark v1.4.1
Scala 2.10

【问题讨论】:

  • 因为是我的错,与 $JAVA_HOME 的问题有关,而不是 mesos。
  • 好吧,放松一下。没有理由在这里变得超级。我们现在不要在这篇文章中发送垃圾邮件。

标签: apache-spark cassandra apache-kafka pyspark datastax


【解决方案1】:

就像所有“火花”一样,我认为应该做一个简短的解释,因为即使您熟悉 RDD,DStream 也是一个更高的概念:
离散化流(DStream)是相同类型的 RDD 的连续序列,表示连续的数据流。在您的情况下,DStreams 是从实时 Kafka 数据创建的。
当 Spark Streaming 程序运行时,每个 DStream 会定期从实时 Kafka 数据生成一个 RDD

现在,要迭代接收到的 RDD,您需要使用 DStream#foreachRDD(正如其名称所暗示的,它的用途与 foreach 相似,但这次是迭代 RDD)。
拥有 RDD 后,您可以调用 rdd.collect()rdd.take() 或任何其他用于 RDD 的标准 API。

现在,作为结束说明,为了让事情变得更有趣,Spark 引入了一种新的无接收器“直接”方法,以确保更强大的端到端保证。
KafkaUtils.createDirectStream 需要 Spark 1.3+)
这种方法不是使用接收器来接收数据,而是定期向 Kafka 查询每个主题+分区中的最新偏移量,并相应地定义要在每批中处理的偏移量范围。当处理数据的作业启动时,Kafka 的简单消费者 API 用于从 Kafka 读取定义的偏移范围。
(这是一种很好的方式来说明您将不得不自己“弄乱”偏移量)

更多详情请参阅Direct Streams Approach
Scala 代码示例见here

【讨论】:

    【解决方案2】:

    根据 spark-cassandra 连接器的官方文档:https://github.com/datastax/spark-cassandra-connector/blob/master/doc/8_streaming.md

    import com.datastax.spark.connector.streaming._
    
    val ssc = new StreamingContext(conf, Seconds(n))
    
    val stream = ...
    
    val wc = stream
            .map(...)
            .filter(...)   
            .saveToCassandra("streaming_test", "words", SomeColumns("word", "count")) 
    
    ssc.start()
    

    【讨论】:

      【解决方案3】:

      其实我在本教程http://katychuang.me/blog/2015-09-30-kafka_spark.html中找到了答案。

      【讨论】:

      • @HackCode- 我也在尝试执行相同的示例,但我在 saveToCassandra('keyspace','table') 行遇到错误。错误 - py4j.protocol.Py4JJavaError: 发生错误在调用 o38.newInstance 时。我缺少什么你能建议我吗?
      猜你喜欢
      • 2016-01-30
      • 1970-01-01
      • 2015-05-10
      • 1970-01-01
      • 2018-03-16
      • 2015-03-06
      • 2016-05-01
      • 2020-08-24
      • 1970-01-01
      相关资源
      最近更新 更多