【问题标题】:Spark Streaming Kafka TimeoutSpark Streaming Kafka 超时
【发布时间】:2020-10-29 09:54:47
【问题描述】:

我正在尝试使用 spark-shell 的简单示例在 Amazon EMR 上运行 Spark + Kafka 集成,但我不断收到超时错误。但是,当我使用org.apache.kafka 和以下相同的设置发布时,它可以正常工作。

超时错误:

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

我将 client.truststore.jksclient.keystore.p12 移动到 hdfs 并运行以下内容

$ spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0
import org.apache.spark.sql.functions.col

val kafkaOptions = Map("kafka.bootstrap.servers" -> s"$host:$port",
        "kafka.security.protocol" -> "SSL",
        "kafka.ssl.endpoint.identification.algorithm" -> "",
        "kafka.ssl.truststore.location" -> "/home/hadoop/client.truststore.jks",
        "kafka.ssl.truststore.password" -> "password",
        "kafka.ssl.keystore.type" -> "PKCS12",
        "kafka.ssl.key.password" -> "password",
        "kafka.ssl.keystore.location" -> "/home/hadoop/client.keystore.p12",
        "kafka.ssl.keystore.password" -> "password")
    )

 val df = spark
        .read
        .option("header", true)
        .option("escape", "\"")
        .csv("s3://bucket/file.csv")

 val publishToKafkaDf = df.withColumn("value", col("body"))

 publishToKafkaDf
      .selectExpr( "CAST(value AS STRING)")
      .write
      .format("kafka")
      .option("topic", "test-topic")
      .options(kafkaOptions)
      .save()

【问题讨论】:

    标签: scala apache-spark apache-kafka amazon-emr spark-structured-streaming


    【解决方案1】:

    已解决,这是工作节点的 AWS 安全组出站问题

    【讨论】:

      猜你喜欢
      • 2017-01-11
      • 2019-08-08
      • 2016-03-12
      • 2018-05-17
      • 1970-01-01
      • 1970-01-01
      • 2017-12-28
      • 2019-01-15
      • 2015-10-15
      相关资源
      最近更新 更多