【问题标题】:Flink: How to write DataSet to a variable instead of to a fileFlink:如何将 DataSet 写入变量而不是文件
【发布时间】:2019-12-24 01:26:14
【问题描述】:

我有一个使用 DataSet API 在 scala 中编写的 flink 批处理程序,它产生了我感兴趣的最终数据集。我想将该数据集作为变量或值(例如字符串列表或序列)在我的程序,而无需将其写入任何文件。有可能吗?

我已经看到 flink 允许收集数据接收器以进行调试(他们的文档中唯一的示例是 Java)。但是,这只允许在本地执行中,无论如何我不知道它在 Scala 中的等价物。我想要的是在对程序值或变量进行整个flink并行执行之后编写最终结果数据集。

【问题讨论】:

    标签: scala apache-flink


    【解决方案1】:

    首先,为收集数据接收器的 scala 版本尝试这个: 导入 org.apache.flink.api.scala._ 导入 org.apache.flink.api.java.io.LocalCollectionOutputFormat;

     .
     .
    val env = ExecutionEnvironment.getExecutionEnvironment
    
    // Create a DataSet from a list of elements
    val words = env.fromElements("w1","w2", "w3")
    
    var outData:java.util.List[String]= new java.util.ArrayList[String]()
    words.output(new LocalCollectionOutputFormat(outData))
    
    // execute program
    env.execute("Flink Batch Scala")
    println(outData)
    

    其次,如果您的数据集适合单机内存,为什么需要使用分布式处理框架?我认为您应该更多地考虑您的用例!并尝试在您的数据集上使用正确的transformations

    【讨论】:

    • 感谢您的回答!很抱歉造成误解,初始数据集不适合内存,但是在应用所有转换后作为答案的数据集确实可以(这就像从一个非常大的项目列表中找到给定某些条件的组的最大值)。
    • 但有时在流环境中它不会发出预期的结果,不知道为什么
    • 得到了答案,导致它异步,也许,解决方案就是我在下面发布的解决方案
    【解决方案2】:

    我使用 flink 1.72 和 scala 2.12。这是使用 SVM 进行的 流式 预测,我将其封装在 Model 类中。我认为最正确的答案是使用collect()。它将返回 Seq。搜索了几个小时后我得到了这个答案。我从Flink Git - Line 95得到这个想法

    var temp_jaringan : DataSet[(Vector,Double)] = model.predict_jaringan(value)
    temp_jaringan.print()
    
    var temp_produk : DataSet[(Vector,Double)] = model.predict_produk(value)
    temp_produk.print()
    
    var result_jaringan : Seq[(Vector,Double)] = temp_jaringan.collect()
    var result_produk : Seq[(Vector,Double)] = temp_produk.collect()
    
    if(result_jaringan(0)._2  == 1.0  && result_produk(0)._2 == 1.0 ){
      println("Keduanya")
    }else if(result_jaringan(0)._2  == 1.0  && result_produk(0)._2 == -1.0){
      println("Jaringan")
    }else if(result_jaringan(0)._2  == -1.0  && result_produk(0)._2 == 1.0){
      println("Produk")
    }else{
      println("Bukan Keduanya")
    }
    

    可能会因其他版本而异。因为在我作为毕业要求的最后一个项目中像疯狗一样使用和搜索了几周甚至几个月的 flink 材料后,我知道这个 flink 开发项目需要更多的文档和教程,尤其是对于像我这样的初学者。

    无论如何,如果我错了,请纠正我。谢谢!

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2014-12-19
      • 1970-01-01
      • 1970-01-01
      • 2011-08-12
      • 2015-06-06
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多