【发布时间】:2018-10-27 22:39:15
【问题描述】:
我正在开发火花流作业(使用结构化流而不使用 DStream)。我从 kafka 收到一条消息,其中将包含许多带有逗号分隔值的字段,其中第一列将是文件名。现在基于该文件名,我将不得不从 HDFS 读取文件并创建一个数据帧并在此基础上进一步操作。这似乎很简单,但 spark 不允许我在调用 start 之前运行任何操作。 Spark 文档也引用了相同的内容。
此外,还有一些 Dataset 方法不适用 流数据集。它们是会立即运行查询的操作 并返回结果,这在流数据集上没有意义。
以下是我尝试过的。
object StructuredStreamingExample {
case class filenameonly(value:String)
def main(args:Array[String])
{
val spark = SparkSession.builder.appName("StructuredNetworkWordCount").master("local[*]").getOrCreate()
spark.sqlContext.setConf("spark.sql.shuffle.partitions", "5")
import spark.implicits._
val lines = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "strtest")
.load()
val values=lines.selectExpr("CAST(value AS STRING)").as[String]
val filename = values.map(x => x.split(",")(0)).toDF().select($"value")
//Here how do i convert the filename which is a Dataframe to string and apply that to spark.readtextfile(filename)
datareadfromhdfs
.writeStream
.trigger(ProcessingTime("10 seconds"))
.outputMode("append")
.format("console")
.start()
.awaitTermination()
现在在上面的代码中,在我获得作为 Dataframe 的文件名之后,我如何将其转换为字符串,以便我可以执行 spark.readtextfile(filename) 来读取 HDFS 中的文件。
【问题讨论】:
-
鉴于我们的讨论,我认为标题可能会产生误导。
-
是的,一开始我期待使用 Streaming Dataframe 的解决方案,但经过我们的讨论,这似乎不是流本身的用例。所以让主题相同,我将等待与流数据帧相关的答案。
标签: scala apache-spark spark-structured-streaming