【问题标题】:Writing data to cassandra using spark使用 spark 将数据写入 cassandra
【发布时间】:2016-05-14 16:31:42
【问题描述】:

我有一个用 Scala 编写的 spark 作业,我只是想写一行,用逗号分隔,从 Kafka 生产者到 Cassandra 数据库。 但我无法调用 saveToCassandra。 我看到几个 wordcount 的例子,他们正在用两列将地图结构写入 Cassandra 表,看起来工作正常。但是我有很多列,我发现数据结构需要并行化。 这是我的代码示例:

object TestPushToCassandra extends SparkStreamingJob {
def validate(ssc: StreamingContext, config: Config): SparkJobValidation = SparkJobValid

def runJob(ssc: StreamingContext, config: Config): Any = {

val bp_conf=BpHooksUtils.getSparkConf()
val brokers=bp_conf.get("bp_kafka_brokers","unknown_default")


val input_topics = config.getString("topics.in").split(",").toSet


val output_topic = config.getString("topic.out")


val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, input_topics)


val lines = messages.map(_._2)
val words = lines.flatMap(_.split(","))

val li = words.par

li.saveToCassandra("testspark","table1", SomeColumns("col1","col2","col3"))
li.print()



words.foreachRDD(rdd =>
  rdd.foreachPartition(partition =>
    partition.foreach{
      case x:String=>{

        val props = new HashMap[String, Object]()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")

        val outMsg=x+" from spark"
        val producer = new KafkaProducer[String,String](props)
        val message=new ProducerRecord[String, String](output_topic,null,outMsg)
        producer.send(message)
      }
    }


  )
)


ssc.start()
ssc.awaitTermination()
}
}

我认为这是 Scala 的语法不正确。 提前致谢。

【问题讨论】:

  • 调用 words.par 几乎肯定不是正确的做法。 Dstream“词”已经是一个 DStream,它本质上已经分布和并行化。没有那个你有什么问题?
  • 它可以在没有“.par”的情况下工作,但现在我想知道如何拆分值以提取 col1、col2、col3 的值?例如,如果在 kafka 生产者内部我写了“val1,val2,val3”,那么如何提取这些值并分别存储在 col1、col2 和 col3 中?
  • 你是说你不能 .split(",") 字符串?
  • 是的。所以基本上如果我从我的生产者那里传递“val1,val2,val3”,我的变量“lines”和“words”在上面的代码中应该是什么代码,以便我可以在 words 变量上调用 saveToCassandra?

标签: scala apache-spark cassandra apache-kafka


【解决方案1】:

您需要将您的话 DStream 更改为连接器可以处理的内容。

像一个元组

val words = lines
  .map(_.split(","))
  .map( wordArr => (wordArr(0), wordArr(1), wordArr(2)) 

或案例类

case class YourRow(col1: String, col2: String, col3: String)
val words = lines
  .map(_.split(","))
  .map( wordArr => YourRow(wordArr(0), wordArr(1), wordArr(2)))

或 CassandraRow

这是因为如果您在其中单独放置一个数组,它可能是 C* 中的一个数组,您尝试插入而不是 3 列。

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/5_saving.md

【讨论】:

  • 感谢您的回答。当我尝试使用您的解决方案时,它在数据库中的存储方式有所不同。也许我错过了一些小语法。如果我通过 abc,def,ghi 来自 Kafka 生产者,这是我的代码: val lines = messages.map(._2) val words =lines.flatMap(.split(" ")) val innerWords = words.flatMap(_.split(",")) val wordCounts = innerWords.map(wordArr = (wordArr(0),wordArr(1),wordArr(2))) wordCounts.saveToCassandra("keyspace01","table1" , SomeColumns("col1","col2","col3")) 这段代码在数据库中产生三个条目,即第一个:a,b,c 第二个:d,e,f 第三个:g,h,i
  • 哎呀我不应该复制你的线,应该是地图而不是平面地图
  • 如果我首先使用带有单词的映射进行拆分,那么它会在“case x:String”行的 words.foreachRDD 函数中给出编译器错误。它说scrutinee与pattern type不兼容;发现:需要字符串:数组[字符串]
  • 啊,它成功了,我只需要将这种情况从 x:String 更改为 x:Array[String]。非常感谢!!!
  • 我的下一个任务是将 csv 文件发送给 kafka 生产者,以便 spark 作业将设法将它们添加到数据库中。你有我可以遵循的资源来实现这样的目标吗?
猜你喜欢
  • 2019-10-15
  • 2018-10-06
  • 2020-07-17
  • 2015-05-24
  • 1970-01-01
  • 2017-05-06
  • 2018-12-10
  • 2017-01-07
  • 2019-11-12
相关资源
最近更新 更多