【问题标题】:Spark Streaming - Read Data using the metadata received from KafkaSpark Streaming - 使用从 Kafka 接收的元数据读取数据
【发布时间】:2021-04-10 06:05:57
【问题描述】:

我对 Spark 世界非常陌生。我正在尝试为以下用例编写优化的解决方案:

  1. 需要从 Kafka 读取流数据,主要是一些压缩文件的 S3 文件路径。
  2. 从上面收到的文件路径中读取压缩文件并对其进行处理并将其存储回某个 S3 存储桶。

我能够阅读 Kafka 主题并获取文件路径,但不确定现在如何阅读此文件路径? spark.read.binaryFile(filePath) 之类的东西。

任何帮助或指导将不胜感激。

【问题讨论】:

标签: apache-kafka spark-streaming


【解决方案1】:

这有很多examples

读取压缩文件

rdd = sc.textFile("s3://bucket/lahs/blahblah.*.gz")

没有你的代码很难,这里有一个关于读写的大纲

其余的从这个答案......

 val spark = SparkSession.builder()
    .appName("myKfconsumer")
    .master("local[*]")
    .getOrCreate()

  //... create your schema
  // you path

  val filePath = "file:///tmp/spark/Blah/blah"
  // create it as your batch data
  // someBatchData

  // now read it, you need your schema and write it back in the process section below
  import spark.implicits._
  val kafkaStream = spark.readStream
    .format("kafka")
    .option("kafka.xyz.servers", "localhost:0001")
    .option("subscribe", "blah")
    .option("startingOffsets", "latest")
    .option("failOnDataLoss", "true") // stop and debug it
    .load()
    .as[String]

      kafkaStream.writeStream.foreachBatch((someBatchData:Dataset[String], batchId:Long) => {

    val records = someBatchData.collect()
    // go through all the records    
    records.foreach((path: String) => {
      val yourData = spark.read.schema(.. youfileSchema).json(..youPath)
      // write it back as you wanted..
      //
    })
  }).start()

  spark.streams.awaitAnyTermination()

【讨论】:

  • someBatchData.collect() 通常应避免使用。那不应该是分区上的循环吗?
  • 你好 Transformer,如果我理解正确的话,在批处理上调用 collect 会将驱动程序上的数据作为 Array [String] 调用,然后对此调用 foreach 会导致顺序处理,不是吗?是否可以并行处理记录?对不起,如果这个问题没有任何意义。
  • 嗨@cody 我给出了部分答案/样本来帮助您解决当前的问题。这对我有用,如果它适用于您标记为答案;并提出一个新问题,我也会尝试回答。谢谢
  • @OneCricketeer 我不知道 collect() 不好......你能分享更多关于这个的信息吗?如果您在下面看到我的代码注释,它清楚地表明遍历所有记录...循环很好或委托等...
  • 您应该使用 forEachPartition 而不是 collect 来保持数据与执行程序隔离,而不是发送给驱动程序。此解决方案可能有效,但不是最佳的。抄送@cody
猜你喜欢
  • 1970-01-01
  • 2017-01-26
  • 2014-10-19
  • 1970-01-01
  • 2018-04-04
  • 2015-08-07
  • 1970-01-01
  • 1970-01-01
  • 2018-09-06
相关资源
最近更新 更多