【问题标题】:Save data from Kafka topic to Cassandra将数据从 Kafka 主题保存到 Cassandra
【发布时间】:2016-05-27 22:45:45
【问题描述】:

我正在学习 Spark 流式传输,并尝试使用 spark-streaming 和 Cassandra Spark 连接器将从 Kafka 主题收到的示例 Stock 数据(只是字符串,如“MSFT:28.29”)保存到 Cassandra。

如果不保存到 Cassandra,我的代码可以正常工作(从 Kafka 获取数据并进行一些简单的统计计算)。 Cassandra 已配置,连接也已建立。

但如果我尝试添加以下行以在处理之前将原始数据保存到 Cassandra 表:

 stockParsed.saveToCassandra("dashboard","raw_tick")

在 Spark 流式 UI 中,我看到 1 个批次处于“处理中”状态,其余批次处于“已排队”状态,Cassandra 中没有任何数据。

在 Spark 控制台中,我只看到以下行:

16/02/16 10:18:40 INFO JobScheduler: Added jobs for time 1455635920000 ms
16/02/16 10:18:50 INFO JobScheduler: Added jobs for time 1455635930000 ms
16/02/16 10:19:00 INFO JobScheduler: Added jobs for time 1455635940000 ms

这是我的代码:

case class Stock(ticker: String, price: Double)
// ....

val conf = new SparkConf().setAppName("KafkaStream").setMaster("local[*]")
  .set("spark.cassandra.connection.host", "localhost")
  .set("spark.cassandra.auth.username", "cassandra")
  .set("spark.cassandra.auth.password", "cassandra")
  .set("spark.cassandra.connection.keep_alive_ms","60000")
  .set("spark.cassandra.input.split.size_in_mb","1")

val ssc = new StreamingContext(conf, Seconds(10))

val topicMap = Map("test" -> 1)

val lines = KafkaUtils.createStream(ssc, "localhost:2181", "test-group", topicMap).map(_._2)

val stockParsed = lines.map(line => line.split(':')).map(s => Stock(s(0).toString, s(1).toDouble))

//Problem here
stockParsed.saveToCassandra("dashboard","raw_tick",SomeColumns("ticker", "price"))

//Some processing below

我的 build.sbt:

import sbt.Keys._

name := "KafkaStreamSbt"

version := "1.0"

scalaVersion := "2.10.6"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"  % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-assembly" % "1.6.0"
libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector-java" % "1.5.0-RC1"
libraryDependencies += "org.slf4j" % "slf4j-api" % "1.7.16"

有什么解决办法吗?

【问题讨论】:

  • 您是否至少分配了 2 个核心?
  • 2 核用于什么?我使用“local[*]”选项在本地启动 Spark
  • 这应该将机器上的所有核心设置为可用。你需要至少一个执行器核心来运行接收器(如果你在接收器模式下运行)如果只有一个核心可用,你只能运行接收器而不实际处理数据。
  • 如何配置?我在这个虚拟机上有 4 个核心
  • 那么 * 应该设置为 4。你也总是可以明确地做 local[4] 。如果这不能解决问题,我会检查您的执行程序日志

标签: cassandra apache-kafka spark-streaming spark-cassandra-connector


【解决方案1】:

问题已解决:我在 Cassandra 密钥空间配置中出错。使用此脚本重新创建键空间后:

CREATE KEYSPACE tutorial WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

代码运行良好。

【讨论】:

    猜你喜欢
    • 2022-01-11
    • 2021-10-23
    • 1970-01-01
    • 1970-01-01
    • 2022-09-25
    • 2020-07-17
    • 2021-11-16
    • 1970-01-01
    • 2016-05-01
    相关资源
    最近更新 更多