【问题标题】:Kafka to Spark streaming using Pyspark errorKafka 到 Spark 流使用 Pyspark 错误
【发布时间】:2019-10-26 20:25:35
【问题描述】:

我正在尝试从 kafka 主题中获取数据,但我无法做到这一点。 我已经尝试了共享链接的教程,但最后我得到了错误。 我也添加了所有必需的 jar 文件(位置:-​​usr/local/spark/jars)。 请让我知道可能出了什么问题。 我也想知道如何用 scala 编程来做到这一点。

https://spark.apache.org/docs/2.1.0/streaming-programming-guide.html#deploying-applications

https://medium.com/@kass09/spark-streaming-kafka-in-python-a-test-on-local-machine-edd47814746

尝试这个火花流命令我得到了错误。

" bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.1 examples/src/main/python/streaming/direct_kafka_wordcount.py"

我遇到了一个 jupyter 错误,所以我尝试了以下命令来解决它,但错误仍然相同 "pip3 install --upgrade --force-reinstall --no-cache-dir jupyter"

【问题讨论】:

  • 你想提交一个 jupyter notebook 吗?那是行不通的(据我所知)。您可以在 jupyter notebook 中提交 .py 文件或创建 spark 上下文。
  • 请分享 sudo 代码和有关如何运行程序的步骤以更好地提供帮助,这似乎是由于缺少库或类路径错误而导致的错误。
  • 我分享的 medium.com 的第二个链接,我已经完成了完全相同的步骤,但是在运行 python 代码时出现错误。我还遵循了链接 1(spark.apache.org) @SureshChaganti 中的简单字数示例
  • @SureshChaganti 请检查我在问题中添加的图片。
  • @SureshChaganti 嗨,我在尝试了几个解决方案后更新了我的问题,所以现在我收到了这个错误。请检查它并帮助我解决这个问题。

标签: apache-spark pyspark apache-kafka jupyter-notebook


【解决方案1】:

Scala 中的 Spark 和 Kafka 集成

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent


object sparkStreaming_Kafka {

  @transient lazy val log = org.apache.log4j.LogManager.getLogger("sparkStreaming_Kafka")

  def main(args: Array[String]): Unit = {

    log.debug("added the messages ****** ---->>>")

    val spark = SparkSession
      .builder()
      .appName("my_App" )
      .getOrCreate()

    val ssc = new StreamingContext(spark.sparkContext, Seconds(2)) // the polling frequency is 2 seconds, can be modified based on the BM requirements.
    log.debug("Before starting the Stream -->>")
    val stream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String]
      (Array.apply("my_topic"), getKafkaParams())).map(record => record.value)

    stream.foreachRDD { rdd =>
      try {
        if (!rdd.isEmpty()) {
          rdd.foreach(x => postData(x))
        }
      } catch {
        case t: Throwable =>
          t.printStackTrace() // TODO: handle error
          log.error("Exception occured while processing the data exception is {}", t.getCause)
      }
    }

    ssc.start()
    log.debug("started now-->> " + compat.Platform.currentTime)
    ssc.awaitTermination()
  }

  def getKafkaParams(): Map[String, Object] = {
    Map[String, Object]("bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "group_id",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (true: java.lang.Boolean)) 
  }


  def postData(event: String): Unit = {
    log.info("before KinesisSink.process call ==>>"+event)

    print(event)  // use the event as per the requirement

  }
}

【讨论】:

  • 我在哪里运行这个 scala 代码?在 spark-shell scala 终端中?
  • 嗨@NishadNazar,请围绕它创建一个maven项目。
  • 嗨,我想我这次做的有点对。在这里,我们有代码,但我猜有些依赖错误。 [错误] 19/06/27 11:24:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... 在适用的情况下使用内置 java 类这是我在 SBT RUN 项目后立即得到的。
  • 嗨@SureshChaganti?
  • 嗨@Suresh Chaganti 只是一个小问题,有没有办法将数据从kafka 发送到除spark 之外的任何数据库。请让我知道这件事。
猜你喜欢
  • 2016-06-11
  • 2021-05-08
  • 1970-01-01
  • 2018-01-09
  • 1970-01-01
  • 1970-01-01
  • 2017-06-27
  • 2018-08-10
  • 2021-05-08
相关资源
最近更新 更多