【Title】: How to extract records from a DStream and write them to Cassandra (Spark Streaming)
【Posted】: 2017-03-29 12:51:19
【Question】:

I am consuming data from Kafka, processing it in Spark Streaming, and writing the results to Cassandra.

I am trying to filter the DStream records, but the filter is not applied and the complete set of records is written to Cassandra.

Any suggestions or sample code for filtering records on multiple columns would be greatly appreciated. I have researched this but could not find a solution.

    class SparkKafkaConsumer1(val recordStream: org.apache.spark.streaming.dstream.DStream[String], val streaming: StreamingContext) {

      val internationalAddress = recordStream.map(line => line.split("\\|")(10).toUpperCase)

      def timeToStr(epochMillis: Long): String =
        DateTimeFormat.forPattern("YYYYMMddHHmmss").print(epochMillis)

      if (internationalAddress == "INDIA") {
        print("-----------------------------------------------")
        recordStream.print()
        val riskScore = "1"
        val timestamp: Long = System.currentTimeMillis
        val formatedTimeStamp = timeToStr(timestamp)
        var wc1 = recordStream.map(_.split("\\|")).map(r => Row(r(0), r(1), r(2), r(3), r(4).toInt, r(5).toInt, r(6).toInt, r(7), r(8), r(9), r(10), r(11), r(12), r(13), r(14), r(15), r(16), riskScore.toInt, 0, 0, 0, formatedTimeStamp))
        implicit val rowWriter = SqlRowWriter.Factory
        wc1.saveToCassandra("fraud", "fraudrating", SomeColumns("purchasetimestamp", "sessionid", "productdetails", "emailid", "productprice", "itemcount", "totalprice", "itemtype", "luxaryitem", "shippingaddress", "country", "bank", "typeofcard", "creditordebitcardnumber", "contactdetails", "multipleitem", "ipaddress", "consumer1score", "consumer2score", "consumer3score", "consumer4score", "recordedtimestamp"))
      }
    }

(Note: I do have records with internationalAddress = INDIA in Kafka, and I am very new to Scala.)
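The question asks for sample code that filters on multiple columns. One way to keep such logic testable is to put the predicate in a plain function and pass it to `DStream.filter`. The following is a minimal sketch under assumptions: the column positions (country at index 10, total price at index 6) are taken from the `SomeColumns` list above, and the `minTotalPrice` threshold is purely illustrative.

```scala
import scala.util.Try

// Sketch: a multi-column predicate for the pipe-delimited records.
// Assumed layout (from the question's column list): index 6 = totalprice,
// index 10 = country. minTotalPrice is a hypothetical extra condition.
object RecordFilter {
  def matches(line: String, minTotalPrice: Int = 100): Boolean = {
    val fields = line.split("\\|")
    fields.length > 10 &&
      fields(10).trim.toUpperCase == "INDIA" &&
      Try(fields(6).trim.toInt).toOption.exists(_ > minTotalPrice)
  }
}
```

In the streaming job this would be used as `recordStream.filter(RecordFilter.matches(_))`, keeping only matching records before the map/save steps; malformed lines fail the predicate instead of throwing inside a stage.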

【Comments】:

    Tags: scala apache-spark apache-kafka spark-streaming


    【Solution 1】:

    I'm not entirely sure what you are trying to do, but if you simply want to filter the records relating to India, you can do this:

    implicit val rowWriter = SqlRowWriter.Factory
    recordStream
       .filter(_.split("\\|")(10).toUpperCase == "INDIA")
       .map(_.split("\\|"))
       .map(r => Row(...))
       .saveToCassandra(...)
    

    Incidentally, I think case classes would really help you here.
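    Expanding on that suggestion: the Spark Cassandra Connector can save case classes directly, mapping field names to column names. A minimal sketch that parses only three of the columns (the field names and indices are assumptions based on the question's column list; a real version would cover all 22 columns):

```scala
import scala.util.Try

// Hypothetical case class covering three of the question's columns;
// field names match the lowercase Cassandra column names.
case class FraudRecord(purchasetimestamp: String, country: String, totalprice: Int)

object FraudRecord {
  // Parse a pipe-delimited line; None if the line is malformed.
  def parse(line: String): Option[FraudRecord] = {
    val f = line.split("\\|")
    if (f.length > 10)
      Try(FraudRecord(f(0), f(10).trim.toUpperCase, f(6).trim.toInt)).toOption
    else None
  }
}
```

    With this in place the pipeline could (untested sketch) become `recordStream.flatMap(FraudRecord.parse(_)).filter(_.country == "INDIA").saveToCassandra("fraud", "fraudrating")`, and malformed lines are dropped via `None` instead of crashing a stage.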

    【Discussion】:
