【Question Title】: No output to Kafka topic: Spark Structured Streaming and Kafka Integration
【Posted】: 2019-02-19 05:12:25
【Question】:

I am trying to send streaming output from Apache Spark 2.3.1 to Apache Kafka using the Kafka sink:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions.udf
import org.apache.kafka.clients
import org.apache.spark.streaming
import java.sql.Timestamp
import java.util.Properties

object CQ3D {    
  def main(args: Array[String]) {

val spark = SparkSession
      .builder
      .appName("test")
      .getOrCreate()

val predictionStreamSchema = new StructType()
  .add("production_id", "long")
  .add("type", "string")

val lines = spark
      .readStream
      .option("sep", ",")
      .schema(predictionStreamSchema)
      .csv("/path/to/directory/")

val query = lines.selectExpr("CAST(production_id AS STRING) AS key", "type AS value").writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "test")
      .option("checkpointLocation", "/local/directory")
      .outputMode("complete")
      .start()

    query.awaitTermination()
  }
}

My build.sbt file looks like this:

name := "CQ3D"
version := "0.1"
scalaVersion := "2.11.8"
val sparkVersion = "2.3.1"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-sql" % sparkVersion,
"org.apache.spark" %% "spark-streaming" % sparkVersion,
"org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion
)

libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion
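
As an aside for later readers: Structured Streaming only needs `spark-sql` and `spark-sql-kafka-0-10`; `spark-streaming` and `spark-streaming-kafka-0-10` belong to the older DStream API and are unused here. A trimmed sketch, assuming Structured Streaming is the only API in use and that Spark itself is supplied by spark-submit:

```scala
name := "CQ3D"
version := "0.1"
scalaVersion := "2.11.8"

val sparkVersion = "2.3.1"

libraryDependencies ++= Seq(
  // Core Dataset/Structured Streaming APIs; "provided" because spark-submit ships them
  "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
  // Kafka source/sink for Structured Streaming (also resolvable via --packages at submit time)
  "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion
)
```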

My code produces the correct output with the console sink, but with the Kafka sink nothing is produced or sent to the Kafka topic. My Kafka ZooKeeper and Kafka server are running on the same machine. The console messages are as follows:

./spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1 --class CQ3D --master local[4] /home/salman/Development/SparkStreaming/Scala/target/scala-2.11/cq3d_2.11-0.1.jar
Ivy Default Cache set to: /home/salman/.ivy2/cache
The jars for the packages stored in: /home/salman/.ivy2/jars
:: loading settings :: url = jar:file:/home/salman/spark-2.3.1-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-sql-kafka-0-10_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-18e5a4df-cae8-4cf2-92bb-e02af7673888;1.0
    confs: [default]
    found org.apache.spark#spark-sql-kafka-0-10_2.11;2.3.1 in spark-list
    found org.apache.kafka#kafka-clients;0.10.0.1 in spark-list
    found net.jpountz.lz4#lz4;1.3.0 in spark-list
    found org.xerial.snappy#snappy-java;1.1.2.6 in spark-list
    found org.slf4j#slf4j-api;1.7.21 in central
    found org.spark-project.spark#unused;1.0.0 in spark-list
:: resolution report :: resolve 247ms :: artifacts dl 4ms
    :: modules in use:
    net.jpountz.lz4#lz4;1.3.0 from spark-list in [default]
    org.apache.kafka#kafka-clients;0.10.0.1 from spark-list in [default]
    org.apache.spark#spark-sql-kafka-0-10_2.11;2.3.1 from spark-list in [default]
    org.slf4j#slf4j-api;1.7.21 from central in [default]
    org.spark-project.spark#unused;1.0.0 from spark-list in [default]
    org.xerial.snappy#snappy-java;1.1.2.6 from spark-list in [default]
    ---------------------------------------------------------------------
    |                  |            modules            ||   artifacts   |
    |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    |      default     |   6   |   0   |   0   |   0   ||   6   |   0   |
    ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-18e5a4df-cae8-4cf2-92bb-e02af7673888
    confs: [default]
    0 artifacts copied, 6 already retrieved (0kB/5ms)
2018-09-14 20:14:58 WARN  Utils:66 - Your hostname, salman-ubuntu-desktop resolves to a loopback address: 127.0.1.1; using 150.82.219.122 instead (on interface enp4s0)
2018-09-14 20:14:58 WARN  Utils:66 - Set SPARK_LOCAL_IP if you need to bind to another address
2018-09-14 20:14:59 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2018-09-14 20:14:59 INFO  SparkContext:54 - Running Spark version 2.3.1
2018-09-14 20:14:59 INFO  SparkContext:54 - Submitted application: CQ3D
2018-09-14 20:14:59 INFO  SecurityManager:54 - Changing view acls to: salman
2018-09-14 20:14:59 INFO  SecurityManager:54 - Changing modify acls to: salman
2018-09-14 20:14:59 INFO  SecurityManager:54 - Changing view acls groups to: 
2018-09-14 20:14:59 INFO  SecurityManager:54 - Changing modify acls groups to: 
2018-09-14 20:14:59 INFO  SecurityManager:54 - SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(salman); groups with view permissions: Set(); users  with modify permissions: Set(salman); groups with modify permissions: Set()
2018-09-14 20:14:59 INFO  Utils:54 - Successfully started service 'sparkDriver' on port 36805.

Am I using the correct imports and/or libraryDependencies?

Sometimes at compile time I get the following warning:

[warn] There may be incompatibilities among your library dependencies.
[warn] Run 'evicted' to see detailed eviction warnings

However, the code still compiles with "sbt package". When I execute it with the following command, I get no output in the Kafka topic:

./spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1 --class testClass --master local[4] /home/user/Dev/Scala/target/scala-2.11/testClass_2.11-0.1.jar

【Question Comments】:

  • Try adding query.awaitTermination() at the end
  • I have used query.awaitTermination(); I forgot to mention it here.
  • Remove .outputMode("complete"), since it is not required: spark.apache.org/docs/2.3.1/…
  • Note: Kafka Connect is a framework built into Kafka that handles this. You don't need to write your own CSV parsing code. github.com/jcustenborder/kafka-connect-spooldir
  • Are you getting a specific error? CSV files that already exist in the directory are not read. Files must be moved into the directory after Spark Streaming has started.
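
For readers skimming the comments: Spark's Kafka sink writes whatever lands in columns named `key` and `value` (the CAST to STRING in the question's selectExpr exists to satisfy that contract). A Spark-free sketch of that row shape, using a hypothetical helper name:

```scala
// Hypothetical helper mirroring the question's
// selectExpr("CAST(production_id AS STRING) AS key", "type AS value").
// The Kafka sink requires a string/binary "value" column; "key" is optional.
case class KafkaRecord(key: String, value: String)

def toKafkaRecord(productionId: Long, recordType: String): KafkaRecord =
  KafkaRecord(productionId.toString, recordType)

// Example: a CSV row "7,assembly" becomes key "7", value "assembly".
val record = toKafkaRecord(7L, "assembly")
```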

Tags: apache-spark apache-kafka kafka-producer-api


【Solution 1】:

The Spark documentation mentions that for Spark Streaming over a local filesystem, files must be atomically moved into the source folder. There may be a configuration option to read pre-existing files, but I don't remember it.

In the comments I mentioned Kafka Connect, a framework built into Kafka for moving data into Kafka; you only need to build the linked project and run Kafka Connect.

Otherwise, if you are already using Hadoop, the tool I recommend to others is Flume; or Filebeat / Fluentd, if you are using Elasticsearch, to get files into Kafka. Basically, Spark is too much overhead for such a simple program reading from a local filesystem, and reading each file needs no parallelism.
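
To make the Kafka Connect suggestion concrete, a sketch of a spooldir CSV source configuration (property names are taken from the kafka-connect-spooldir README; treat the paths and the schema-generation setting as placeholders and verify each key against your connector version):

```properties
name=csv-to-kafka
connector.class=com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector
topic=test
input.path=/path/to/directory
finished.path=/path/to/finished
error.path=/path/to/error
input.file.pattern=^.*\.csv$
csv.first.row.as.header=true
schema.generation.enabled=true
```

Run it with the connect-standalone script that ships with Kafka, pointing at this file.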

【Discussion】:
