【问题标题】:Flink map tuple to stringFlink 将元组映射到字符串
【发布时间】:2017-11-02 23:45:13
【问题描述】:

您好,我有一些元组 Tuple2<String, Integer>,我想将其转换为字符串,然后将其发送到 KAFKA。

我试图找出一种方法来迭代元组并从中创建一个字符串,所以如果我的元组中有 N 个元素,我想创建一个包含它们的字符串。

我尝试了平面映射,但我为元组中的每个元素获取了新字符串。

  SingleOutputStreamOperator<String> s = t.flatMap(new FlatMapFunction<Tuple2<String, Integer>, String>() {
            @Override
            public void flatMap(Tuple2<String, Integer> stringIntegerTuple2, Collector<String> collector) throws Exception {

            collector.collect(stringIntegerTuple2.f0 + stringIntegerTuple2.f1);
            }
        });

从元组中创建字符串的正确方法是什么。

【问题讨论】:

  • 为了确保我正确理解您的目标,假设您有 Tuple3 myTupleObject。假设字符串值是红色、绿色和蓝色。您的理想目标是说“String myKafkaString = convertTupleToString(myTupleObject)”,并让“red、green、blue”成为 myKafkaString 的值,对吧?我问的原因,除了确保我理解正确之外,我想知道你为什么不只是序列化元组并将其发送到 kafka。
  • 我想通过kafka将消息发送到influx db的主要原因,因此我需要将数据作为influx line协议发送,映射的理想结果将是这样的。 " 测量颜色=red,color=green,color=blue"

标签: apache-flink flink-streaming


【解决方案1】:

您可以使用自定义类覆盖元组的.toString() 方法并使用它。像这样:

import org.apache.flink.api.java.tuple.Tuple3;

public class CustomTuple3 extends Tuple3 {

    @Override
    public String toString(){
        return "measurment color=" + this.f0.toString() + " color=" + this.f1.toString() + " color=" + this.f2.toString();
    }
}

所以现在只需使用 CustomTuple3 对象而不是 Tuple3,当您填充它并在其上调用 .toString() 时,它将输出该格式化字符串。

【讨论】:

    猜你喜欢
    • 2012-01-23
    • 2021-11-15
    • 1970-01-01
    • 1970-01-01
    • 2013-11-19
    • 2012-08-21
    • 2011-01-18
    • 2017-01-31
    • 1970-01-01
    相关资源
    最近更新 更多