【Question title】: Flink Streaming Event Time Window
【Posted】: 2018-06-02 02:10:53
【Question description】:

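I am running a simple example to test EventTime-based windows. I can produce output with processing time, but when I use EventTime no output appears. Please help me understand what I am doing wrong.

I am creating a SlidingWindow of size 10 seconds that slides every 5 seconds; at the end of each window, the system emits the number of messages received during that window.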

Input:
a,1513695853 (generated at 13th second, received at 13th second)
a,1513695853 (generated at 13th second, received at 13th second)
a,1513695856 (generated at 16th second, received at 19th second)
a,1513695859 (generated at 19th second, received at 19th second)

The second field is the event timestamp; the values correspond to the 13th, 13th, 16th, and 19th second of a minute.
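To make the expected event-time grouping concrete, here is a minimal plain-Java sketch (no Flink dependency) that lists which 10-second windows, sliding every 5 seconds, each input timestamp falls into. It assumes a window offset of 0 and that the second field is epoch seconds; Flink's event timestamps are in milliseconds, so the sketch multiplies by 1000. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class SlidingWindowSketch {

    /**
     * Start times (ms) of every sliding window [start, start + size) that
     * contains the timestamp, newest window first.
     * Assumes a non-negative timestamp and a window offset of 0.
     */
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is <= the timestamp.
        long lastStart = timestampMs - (timestampMs % slideMs);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Timestamps from the question, assumed to be epoch seconds.
        long[] epochSeconds = {1513695853L, 1513695853L, 1513695856L, 1513695859L};
        for (long s : epochSeconds) {
            long ts = s * 1000L; // convert to milliseconds
            for (long start : windowStarts(ts, 10_000L, 5_000L)) {
                System.out.printf("event %d -> window [%d, %d)%n", ts, start, start + 10_000L);
            }
        }
    }
}
```

With a 10-second size and a 5-second slide, every event belongs to exactly two overlapping windows, so each event contributes to two counts.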

If I use a processing-time window:

Output:
(a,1)
(a,3)
(a,2)

But when I use event time, no output is printed. Please help me understand what is going wrong.

package org.apache.flink.window.training;

import java.io.InputStream;
import java.util.Properties;

import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import com.fasterxml.jackson.databind.ObjectMapper;

public class SocketStream {


  private static Properties properties = new Properties();

  public static void main(String args[]) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    InputStream inputStream =
        SocketStream.class.getClassLoader().getResourceAsStream("local-kafka-server.properties");

    properties.load(inputStream);

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    FlinkKafkaConsumer010<String> consumer =
        new FlinkKafkaConsumer010<>("test-topic", new SimpleStringSchema(), properties);

    DataStream<Element> socketStockStream =
        env.addSource(consumer).map(new MapFunction<String, Element>() {
          @Override
          public Element map(String value) throws Exception {

            String split[] = value.split(",");
            Element element = new Element(split[0], Long.parseLong(split[1]));

            return element;
          }
        }).assignTimestampsAndWatermarks(new TimestampExtractor());

    socketStockStream.map(new MapFunction<Element, Tuple2<String, Integer>>() {

      @Override
      public Tuple2<String, Integer> map(Element value) throws Exception {

        return new Tuple2<String, Integer>(value.getId(), 1);
      }
    }).keyBy(0)
        .timeWindow(Time.seconds(10), Time.seconds(5))
        .sum(1)
        .print();

    env.execute();
  }

  public static class TimestampExtractor implements AssignerWithPunctuatedWatermarks<Element> {

    private static final long serialVersionUID = 1L;

    @Override
    public long extractTimestamp(Element element, long previousElementTimestamp) {

      return element.getTimestamp();
    }

    @Override
    public Watermark checkAndGetNextWatermark(Element lastElement, long extractedTimestamp) {
      // TODO Auto-generated method stub
      return null;
    }
  }
}

【Question discussion】:

    Tags: apache-kafka apache-flink flink-streaming


    【Solution 1】:

    Event-time processing requires properly generated timestamps and watermarks.

    The TimestampExtractor in the code does not generate any watermarks; it always returns null.
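One possible fix is to return a watermark derived from each element's timestamp instead of null. The sketch below keeps that logic in plain Java so it runs without Flink on the classpath; in the question's TimestampExtractor, the value computed by `nextWatermark` would be wrapped as `new Watermark(...)` and returned from `checkAndGetNextWatermark`. The one-second out-of-orderness allowance, the seconds-to-milliseconds conversion, and all names here are assumptions for illustration, not part of the original answer.

```java
final class PunctuatedWatermarkSketch {

    // Hypothetical tolerance for late, out-of-order events (assumption, not from the question).
    static final long MAX_OUT_OF_ORDERNESS_MS = 1_000L;

    /** Mirrors extractTimestamp: the input looks like epoch seconds, Flink expects milliseconds. */
    static long extractTimestamp(long epochSeconds) {
        return epochSeconds * 1000L;
    }

    /** Mirrors checkAndGetNextWatermark: propose a watermark slightly behind the element. */
    static long nextWatermark(long extractedTimestampMs) {
        return extractedTimestampMs - MAX_OUT_OF_ORDERNESS_MS;
    }

    /** Models the operator side: a watermark only ever moves forward. */
    static long advance(long currentWatermark, long candidate) {
        return Math.max(currentWatermark, candidate);
    }

    public static void main(String[] args) {
        long watermark = Long.MIN_VALUE; // initial watermark before any element is seen
        long[] epochSeconds = {1513695853L, 1513695853L, 1513695856L, 1513695859L};
        for (long s : epochSeconds) {
            long ts = extractTimestamp(s);
            watermark = advance(watermark, nextWatermark(ts));
            System.out.println("timestamp=" + ts + " watermark=" + watermark);
        }
    }
}
```

Emitting a watermark per element is simple but chatty; for high-volume streams a periodic assigner is a common alternative.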

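    【Discussion】:

    • Thanks Fabian, it worked fine once I assigned watermarks. I am still curious, though: what happens when I assign the watermark as null, given that a watermark (t) only signifies that there should be no more elements with timestamp t' <= t?
    • Returning null means the watermark is not updated. The watermark therefore stays at Long.MIN_VALUE and never advances, which means no window can ever be evaluated.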
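The effect described in the last comment can be sketched in plain Java. The check below models the usual event-time firing rule, under which a window [start, end) is evaluated once the watermark reaches the window's last timestamp, end - 1. This is a simplified model with hypothetical names, not Flink's actual trigger code.

```java
final class WindowFiringSketch {

    /** A window [start, end) may fire once the watermark has reached end - 1. */
    static boolean canFire(long windowEndMs, long watermarkMs) {
        return windowEndMs - 1 <= watermarkMs;
    }

    public static void main(String[] args) {
        long windowEnd = 1513695855000L; // end of the window [..45000, ..55000) from the example input

        // The assigner returned null, so the watermark never left its initial value.
        System.out.println("stuck watermark fires: " + canFire(windowEnd, Long.MIN_VALUE));

        // The watermark has advanced past the end of the window.
        System.out.println("advanced watermark fires: " + canFire(windowEnd, 1513695858000L));
    }
}
```

Because Long.MIN_VALUE is smaller than every window's last timestamp, the condition is never met while the watermark is stuck, which matches the missing output in the question.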