【问题标题】:kafka to sparkstreaming to HDFSkafka 触发流式传输到 HDFS
【发布时间】:2016-04-27 07:43:40
【问题描述】:

我正在使用 creatDirectStream 来集成 SparkStreaming 和 Kafka。这是我使用的代码:

val ssc = new StreamingContext(new SparkConf, Seconds(10))
    val kafkaParams = Map("metadata.broker.list" -> "sandbox:6667")
    val topics = Set("topic1")

    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

现在我想将消息存储到 HDFS 中。这样做对吗?

messages.saveAsTextFiles("/tmp/spark/messages")

【问题讨论】:

    标签: apache-spark hdfs apache-kafka


    【解决方案1】:

    saveAsTextFiles("/tmp/spark/messages") - 这会将您的数据保存在本地文件系统中,如果提供的文件夹结构(“/tmp/spark/messages”)是您本地 HDFS 的一部分,那么它也会显示在 HDFS 目录中,因为 @987654322 @ 利用相同的 MapeReduce API 来编写输出。

    上述方法适用于 Spark Executors 和 HDFS 在同一台物理机器上的情况,但如果您的 HDFS 目录或 URL 不同,而不是在运行 executors 的同一台机器上,那么这将不起作用。

    如果您需要确保您的数据持久保存在 HDFS 中,那么作为一种良好做法,您应该始终提供完整的 HDFS URL。像这样 - saveAsTextFiles("http://<HOST-NAME>:9000/tmp/spark/messages")

    或者您也可以利用以下任一方法:-

    1. DStream.saveAsNewAPIHadoopFiles(<HDFS URL with Location>)
    2. DStream.saveAsHadoopFiles(<HDFS URL with Location>)

    【讨论】:

    • 我不知道这是否在所有情况下都是正确的,我使用的是普通路径,默认情况下它会转到 hdfs
    • 这也不是真的,它会创建他们不存在的文件夹,或者你可以保存到你想要的根目录
    • @SebastianPiu - 我从来没有说过它不会创建文件夹。您可能需要再次阅读我的答案。仅针对文件夹路径提供完整的 HDFS URL 时会有所不同。同样,最好始终提供完整的 URL+路径
    • 对不起,我不明白你在说什么,我只是这样做,而不是保存在本地系统中,它直接进入 HDFS,你是这个意思吗?
    • 在您的情况下,执行程序在您拥有 HDFS 的同一台机器上运行,并且您拥有专用的整个磁盘作为 HDFS 的一部分,因此它可以工作。现在想想你的执行器在不同的机器上运行并且 HDFS 在不同的节点上的场景,或者你已经定义了磁盘的特定文件夹作为 HDFS 的一部分,不一定从系统的根目录开始。在这些情况下,您需要指定完整的 HDFS URL,这就是始终建议指定完整的 HDFS URL 的原因。
    猜你喜欢
    • 2016-11-16
    • 2018-10-08
    • 2018-03-19
    • 2019-05-19
    • 1970-01-01
    • 2021-08-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多