【问题标题】:Check file on csv file streaming with scala使用 scala 检查 csv 文件流式传输的文件
【发布时间】:2019-07-19 02:38:32
【问题描述】:

我正在使用 spark 流式传输,并且不想在新的流式传输文件每 10 分钟出现一次时处理旧文件:

val val1= spark  
.read //  
.option("header", "true")    
.option("schema", "true")    
.option("sep", ",")    
.csv(path_to_file).toDF().cache()  
val1.registerTempTable("test")

创建数据框后,我进行了一些转换和处理 检查点可以帮助我以及我在我的情况下如何使用

【问题讨论】:

  • 没关系我做

标签: scala apache-spark bigdata cloudera hortonworks-sandbox


【解决方案1】:

*****************解决方案*******************

val spark = SparkSession .builder .appName("测试") .config("spark.local", "local[*]") .getOrCreate() spark.sparkContext.setCheckpointDir(path_checkpoint) 在我调用数据帧上的检查点函数之后 我指定了一个触发器来执行作业

   .writeStream
    .format("csv") 
    .option("codec", "org.apache.hadoop.io.compress.GzipCodec") 
    .option("checkpointLocation",CheckPoint)   
 .trigger(Trigger.ProcessingTime("180 seconds")) 
    .option("Path",Path )  
    .option("header", true)  
    .outputMode("Append")
    .queryName("test")
    .start()

【讨论】:

    猜你喜欢
    • 2011-07-06
    • 2017-04-28
    • 2013-05-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-11-24
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多