[Posted]: 2017-11-20 18:55:26
[Question]:
The problem: this program writes to Kafka more than once per window (it creates 2-3 or more output lines per window), when it should create exactly one line per window, since the reduce function leaves only a single element. I wrote the same code in Spark and it works fine. I have been trying to find information about this problem without success, and I have also tried changing the parallelism of some of the functions and a few other things, but nothing worked and I cannot figure out where the problem is.
I am testing Flink's latency. Here is the environment for my problem:
Cluster: Flink 1.2.0 with OpenJDK 8 on 3 machines: 1 JobManager and 2 TaskManagers (4 cores, 2 GB RAM, 4 task slots per TaskManager).
Input data: lines produced by a Java producer into a 24-partition Kafka topic, each containing two elements: an incrementing value and a creation timestamp:
- 1 1497790546981
- 2 1497790546982
- 3 1497790546983
- 4 1497790546984
- .................................
My Java class:
- It reads from a Kafka topic with 24 partitions (Kafka runs on the same machine as the JobManager).
- The filter functions and the union are not really useful; I use them only to check their latency.
- Basically, it appends a "1" to each line. Every 2 seconds a tumbling window fires, and its reduce function sums all of those 1s and all of the timestamps. In a later map function, the summed timestamp is divided by the sum of the 1s, which gives me the average; a final map function then appends the current timestamp to each reduced line, together with the difference between that timestamp and the average timestamp.
- This line is written to Kafka (to a topic with 2 partitions).
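Independently of Flink, the reduce-plus-average arithmetic described above can be checked in plain Java (the class and method names here are illustrative, not from the original program):

```java
// Illustrative re-implementation of the Reducer + AverageCalculator logic:
// each element is "value timestamp"; the reduce sums both fields, and the
// average timestamp is the summed timestamp divided by the element count.
public class WindowMath {

    // Combine two "sum sumTS" strings, as the window's reduce function does.
    public static String reduce(String a, String b) {
        long sum = Long.parseLong(a.split(" ")[0]) + Long.parseLong(b.split(" ")[0]);
        long sumTS = Long.parseLong(a.split(" ")[1]) + Long.parseLong(b.split(" ")[1]);
        return sum + " " + sumTS;
    }

    // Produce "count averageTimestamp", as the average-calculating map does.
    public static String average(String reduced, int count) {
        long avg = Long.parseLong(reduced.split(" ")[1]) / count;
        return count + " " + avg;
    }

    public static void main(String[] args) {
        // Three sample input lines from the question, reduced pairwise.
        String r = reduce("1 1497790546981", "2 1497790546982");
        r = reduce(r, "3 1497790546983");
        System.out.println(average(r, 3)); // prints "3 1497790546982"
    }
}
```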
// Imports needed by the snippet (Flink 1.2 / Kafka 0.10 connector):
import java.util.Properties;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.FlinkKafkaProducer010Configuration;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class TimestampLongKafka {

    public static void main(String[] args) throws Exception {
        // FLINK CONFIGURATION
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        // KAFKA CONSUMER CONFIGURATION
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.0.155:9092");
        FlinkKafkaConsumer010<String> myConsumer =
                new FlinkKafkaConsumer010<>(args[0], new SimpleStringSchema(), properties);

        // KAFKA PRODUCER
        Properties producerConfig = new Properties();
        producerConfig.setProperty("bootstrap.servers", "192.168.0.155:9092");
        producerConfig.setProperty("acks", "0");
        producerConfig.setProperty("linger.ms", "0");

        // MAIN PROGRAM
        // Read from Kafka
        DataStream<String> line = env.addSource(myConsumer);

        // Add 1 to each line
        DataStream<Tuple2<String, Integer>> line_Num = line.map(new NumberAdder());

        // Filter odd numbers
        DataStream<Tuple2<String, Integer>> line_Num_Odd = line_Num.filter(new FilterOdd());

        // Filter even numbers
        DataStream<Tuple2<String, Integer>> line_Num_Even = line_Num.filter(new FilterEven());

        // Join even and odd
        DataStream<Tuple2<String, Integer>> line_Num_U = line_Num_Odd.union(line_Num_Even);

        // Tumbling windows every 2 seconds
        AllWindowedStream<Tuple2<String, Integer>, TimeWindow> windowedLine_Num_U = line_Num_U
                .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(2)));

        // Reduce to one line with the sum
        DataStream<Tuple2<String, Integer>> wL_Num_U_Reduced = windowedLine_Num_U.reduce(new Reducer());

        // Calculate the average of the summed elements
        DataStream<String> wL_Average = wL_Num_U_Reduced.map(new AverageCalculator());

        // Add a timestamp and calculate the difference from the average
        DataStream<String> averageTS = wL_Average.map(new TimestampAdder());

        // Send the result to Kafka
        FlinkKafkaProducer010Configuration<String> myProducerConfig =
                (FlinkKafkaProducer010Configuration<String>) FlinkKafkaProducer010
                        .writeToKafkaWithTimestamps(averageTS, "testRes", new SimpleStringSchema(), producerConfig);
        myProducerConfig.setWriteTimestampToKafka(true);

        env.execute("TimestampLongKafka");
    }

    // Functions used in the program implementation:

    public static class FilterOdd implements FilterFunction<Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;

        public boolean filter(Tuple2<String, Integer> line) throws Exception {
            return (Long.valueOf(line.f0.split(" ")[0]) % 2) != 0;
        }
    }

    public static class FilterEven implements FilterFunction<Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;

        public boolean filter(Tuple2<String, Integer> line) throws Exception {
            return (Long.valueOf(line.f0.split(" ")[0]) % 2) == 0;
        }
    }

    public static class NumberAdder implements MapFunction<String, Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;

        public Tuple2<String, Integer> map(String line) {
            return new Tuple2<String, Integer>(line, 1);
        }
    }

    public static class Reducer implements ReduceFunction<Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;

        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> line1,
                Tuple2<String, Integer> line2) throws Exception {
            Long sum = Long.valueOf(line1.f0.split(" ")[0]) + Long.valueOf(line2.f0.split(" ")[0]);
            Long sumTS = Long.valueOf(line1.f0.split(" ")[1]) + Long.valueOf(line2.f0.split(" ")[1]);
            return new Tuple2<String, Integer>(sum + " " + sumTS, line1.f1 + line2.f1);
        }
    }

    public static class AverageCalculator implements MapFunction<Tuple2<String, Integer>, String> {
        private static final long serialVersionUID = 1L;

        public String map(Tuple2<String, Integer> line) throws Exception {
            Long average = Long.valueOf(line.f0.split(" ")[1]) / line.f1;
            return line.f1 + " " + average;
        }
    }

    public static final class TimestampAdder implements MapFunction<String, String> {
        private static final long serialVersionUID = 1L;

        public String map(String line) throws Exception {
            Long currentTime = System.currentTimeMillis();
            String totalTime = String.valueOf(currentTime - Long.valueOf(line.split(" ")[1]));
            return line.concat(" " + currentTime + " " + totalTime);
        }
    }
}
Some output data: this output was written to the 2-partition topic, with a production rate below 1000 records/second (in this case, 3 output lines are created per window):
- 1969 1497791240910 1497791241999 1089 1497791242001 1091
- 1973 1497791240971 1497791241999 1028 1497791242002 1031
- 1970 1497791240937 1497791242094 1157 1497791242198 1261
- 1917 1497791242912 1497791243999 1087 1497791244051 1139
- 1905 1497791242971 1497791243999 1028 1497791244051 1080
- 1916 1497791242939 1497791244096 1157 1497791244199 1260
- 1994 1497791244915 1497791245999 1084 1497791246002 1087
- 1993 1497791244966 1497791245999 1033 1497791246004 1038
- 1990 1497791244939 1497791246097 1158 1497791246201 1262
Thanks in advance!
[Comments]:
- I'm not sure what's going on, but a couple of things seem a bit odd: why do you set the time characteristic to event time, never set a timestamp extractor and watermark generator, and then use processing-time windows? And why build an infrastructure capable of parallel processing and then use windowAll, which collects all records into a single task?
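For reference, if event time were actually intended, a timestamp extractor and watermark generator would be assigned roughly like this (a sketch against the Flink 1.2 API, using the second field of each line as the event timestamp; the 1-second out-of-orderness bound is an assumption):

```java
// Sketch: extract the creation timestamp (second field of each line) as the
// event timestamp, using Flink's built-in
// org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor.
// "line" is the DataStream<String> read from Kafka in the question's code.
DataStream<String> withTimestamps = line.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(1)) {
            private static final long serialVersionUID = 1L;

            @Override
            public long extractTimestamp(String element) {
                return Long.valueOf(element.split(" ")[1]);
            }
        });
```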
- TimeCharacteristic: I later use a consumer to copy the results into a file, and I call record.timestamp(), so I need setWriteTimestampToKafka(true) in my program. windowAll: I am not finished yet; I want to add a key after the two filters so the windows can run with parallelism = 2.
- I forgot to mention: I changed EventTime to ProcessingTime! Thanks!
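The keyed-window variant mentioned in the comments could look roughly like this (a sketch, not the author's code: keying by odd/even lets the 2-second window run with parallelism 2, producing one reduced line per key per window; `line_Num` and `Reducer` refer to the question's code):

```java
// Sketch: key by odd/even so the 2-second tumbling window runs in parallel.
DataStream<Tuple2<String, Integer>> reducedPerKey = line_Num
        .keyBy(new KeySelector<Tuple2<String, Integer>, Long>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Long getKey(Tuple2<String, Integer> value) throws Exception {
                return Long.valueOf(value.f0.split(" ")[0]) % 2; // 0 = even, 1 = odd
            }
        })
        .window(TumblingProcessingTimeWindows.of(Time.seconds(2)))
        .reduce(new Reducer());
```

Note that this emits one line per key per window (two lines here), so a second, global aggregation would still be needed to get a single line per window.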
Tags: streaming apache-kafka apache-flink windowing