【问题标题】:Flink map stream csv file into TupleFlink 映射流 csv 文件到 Tuple
【发布时间】:2019-01-31 14:50:27
【问题描述】:

我正在尝试将已由 Flink 使用并由 Kafka 生成的 CSV 文件映射到 Tuple4。我的 CSV 文件有 4 列,我想将每一行映射到一个 Tuple4。问题是我不知道如何实现 map() 和 csv2Tuple 函数。

这是我卡住的地方:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

ParameterTool parameterTool = ParameterTool.fromArgs(ARGS);

DataStreamSource<String> myConsumer = env.addSource(new FlinkKafkaConsumer082<>(parameterTool.getRequired("topic"),
            new SimpleStringSchema(), parameterTool.getProperties()));

DataStream<Tuple4<Integer, Integer, Integer, Integer>> streamTuple = myConsumer.map(new csv2Tuple());
public static class csv2Tuple implements MapFunction<...> {public void map(){...}}

我还想将元组中的项目从 String 解析为 Integer。

【问题讨论】:

  • 抱歉出现错误,现已编辑

标签: java apache-kafka apache-flink map-function data-stream


【解决方案1】:

假设您将每一行 csv 文件作为 Kafka 消息生成并使用 Flink Kafka 连接器使用它,您只需使用 , 拆分每条使用的消息(因为它是一个 csv 文件)。

DataStream<Tuple4<Integer, Integer, Integer, Integer,>> streamTuple = myConsumer.map(new MapFunction<String, Tuple4<Integer, Integer, Integer, Integer>>() {
            @Override
            public Tuple4<Integer, Integer, Integer, Integer> map(String str) throws Exception {
                String[] temp = str.split(",");
                return new Tuple4<>(
                        Integer.parseInt(temp[0]),
                        Integer.parseInt(temp[1]),
                        Integer.parseInt(temp[2]),
                        Integer.parseInt(temp[3])
                );

            }
        });

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-08-16
    • 2023-04-01
    • 2020-04-01
    • 2017-02-28
    • 1970-01-01
    • 1970-01-01
    • 2015-03-31
    相关资源
    最近更新 更多