【问题标题】:how to read file names under directory(local file system/hdfs) using flink java api如何使用 flink java api 读取目录(本地文件系统/hdfs)下的文件名
【发布时间】:2017-04-05 13:39:51
【问题描述】:

我是 Flink 新手。其实我正在尝试通过flink java api读取文件和csv转换。

根据我们的要求。 a) 需要将文件夹作为输入参数,输出参数作为 csv 文件名 b) 需要从本地文件系统/HDFS 读取文件 c) 将相同的数据写入csv

我的代码:

public class WriteToCSV {

    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);
        List<String> paths = new ArrayList<String>();
        File dir = new File("C://");
        for (File f : dir.listFiles()) {
              paths.add(f.getName());
        }
        DataSet<String> data = env.fromCollection(paths).rebalance();

        DataSet<Tuple2<String, Integer>> counts = 
                    // split up the lines in pairs (2-tuples) containing: (word,1)
                    data.flatMap(new MySplitter()).groupBy(0).sum(1);

        System.out.println(" data -:"+data);
        data.print();
        counts.writeAsCsv("C://new.csv");
    }
}


class MySplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        // normalize and split the line into words
        String[] tokens = value.split("\\W+");

        // emit the pairs
        for (String token : tokens) {
            if (token.length() > 0) {
                out.collect(new Tuple2<String, Integer>(token, 1));
            }
        }
    }
}

我能够获取文件名 (data.print()) 。但是 csv 没有创建,在服务器控制台中也不例外。

【问题讨论】:

    标签: filesystems export-to-csv apache-flink


    【解决方案1】:

    你的代码中没有写入csv的原因是你没有在counts.writeAsCsv("C://new.csv");之后调用env.execute()

    为了进一步改进您的代码,您可以使用env.readTextFile(path),它接受目录的路径并读取该目录中的所有文件,为每一行生成记录。

    【讨论】:

    • DataSet 和 DataStream 程序的 print() 行为不同。 DataSet 程序在调用print() 时触发执行,并将结果写入提交程序的客户端的标准输出。 DataStream 程序不会启动程序(这需要execute())并打印到工作的标准输出。
    • 是的,我知道,但是在提供的示例中,print()writeAsCsv 之前被调用,所以我相信它打印得很好,但输出没有写入 csv。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-09-16
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多