【发布时间】:2016-05-14 16:31:42
【问题描述】:
我有一个用 Scala 编写的 spark 作业,我只是想写一行,用逗号分隔,从 Kafka 生产者到 Cassandra 数据库。 但我无法调用 saveToCassandra。 我看到几个 wordcount 的例子,他们正在用两列将地图结构写入 Cassandra 表,看起来工作正常。但是我有很多列,我发现数据结构需要并行化。 这是我的代码示例:
object TestPushToCassandra extends SparkStreamingJob {
def validate(ssc: StreamingContext, config: Config): SparkJobValidation = SparkJobValid
def runJob(ssc: StreamingContext, config: Config): Any = {
val bp_conf=BpHooksUtils.getSparkConf()
val brokers=bp_conf.get("bp_kafka_brokers","unknown_default")
val input_topics = config.getString("topics.in").split(",").toSet
val output_topic = config.getString("topic.out")
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, input_topics)
val lines = messages.map(_._2)
val words = lines.flatMap(_.split(","))
val li = words.par
li.saveToCassandra("testspark","table1", SomeColumns("col1","col2","col3"))
li.print()
words.foreachRDD(rdd =>
rdd.foreachPartition(partition =>
partition.foreach{
case x:String=>{
val props = new HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
val outMsg=x+" from spark"
val producer = new KafkaProducer[String,String](props)
val message=new ProducerRecord[String, String](output_topic,null,outMsg)
producer.send(message)
}
}
)
)
ssc.start()
ssc.awaitTermination()
}
}
我认为这是 Scala 的语法不正确。 提前致谢。
【问题讨论】:
-
调用 words.par 几乎肯定不是正确的做法。 Dstream“词”已经是一个 DStream,它本质上已经分布和并行化。没有那个你有什么问题?
-
它可以在没有“.par”的情况下工作,但现在我想知道如何拆分值以提取 col1、col2、col3 的值?例如,如果在 kafka 生产者内部我写了“val1,val2,val3”,那么如何提取这些值并分别存储在 col1、col2 和 col3 中?
-
你是说你不能 .split(",") 字符串?
-
是的。所以基本上如果我从我的生产者那里传递“val1,val2,val3”,我的变量“lines”和“words”在上面的代码中应该是什么代码,以便我可以在 words 变量上调用 saveToCassandra?
标签: scala apache-spark cassandra apache-kafka