【问题标题】:Flink Scala join between two Streams doesn't seem to work两个流之间的 Flink Scala 连接似乎不起作用
【发布时间】:2018-01-10 18:40:28
【问题描述】:

我想加入来自 Kafka 生产者的两个流 (json)。 如果我过滤数据,该代码将起作用。但是当我加入他们时,它似乎不起作用。我想将加入的流打印到控制台,但什么也没有出现。 这是我的代码

import java.util.Properties 
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.json4s._
import org.json4s.native.JsonMethods
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object App {

def main(args : Array[String]) {

case class Data(location: String, timestamp: Long, measurement: Int, unit: String, accuracy: Double)
case class Sensor(sensor_name: String, start_date: String, end_date: String, data_schema: Array[String], data: Data, stt: Stt)


case class Datas(location: String, timestamp: Long, measurement: Int, unit: String, accuracy: Double)
case class Sensor2(sensor_name: String, start_date: String, end_date: String, data_schema: Array[String], data: Datas, stt: Stt)


val properties = new Properties();
    properties.setProperty("bootstrap.servers", "0.0.0.0:9092");
    properties.setProperty("group.id", "test");

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val consumer1 = new FlinkKafkaConsumer010[String]("topics1", new SimpleStringSchema(), properties)
   val stream1 = env
   .addSource(consumer1)

   val consumer2 = new FlinkKafkaConsumer010[String]("topics2", new SimpleStringSchema(), properties)
   val stream2 = env
   .addSource(consumer2)

   val s1 = stream1.map { x => {
     implicit val formats = DefaultFormats
     JsonMethods.parse(x).extract[Sensor]
     }
   }
   val s2 = stream2.map { x => {
     implicit val formats = DefaultFormats
     JsonMethods.parse(x).extract[Sensor2]
     }
   }

  val s1t = s1.assignAscendingTimestamps { x => x.data.timestamp }
  val s2t = s2.assignAscendingTimestamps { x => x.data.timestamp }

  val j1pre = s1t.join(s2t)
              .where(_.data.unit)
              .equalTo(_.data.unit)
              .window(TumblingEventTimeWindows.of(Time.seconds(2L)))
              .apply((g, s) => (s.sensor_name, g.sensor_name, s.data.measurement))
   env.execute()

}

}

我认为问题出在时间戳的分配上。我认为这两个来源上的assignAscendingTimestamp 不是正确的功能。

kafka 生产者生成的 json 有一个字段data.timestamp,应该分配为时间戳。但我不知道如何管理。

我还认为我应该为传入的元组提供一个时间窗口批处理(如在 spark 中)。但我不确定这是正确的解决方案。

【问题讨论】:

    标签: scala apache-kafka apache-flink flink-streaming


    【解决方案1】:

    我认为您的代码只需要一些小的调整。首先,当你想在EventTime工作时,你应该设置适当的TimeCharacteristic

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    

    您粘贴的代码也缺少流的接收器。如果你想打印到控制台,你应该:

    j1pre.print
    

    您的代码的其余部分似乎没问题。

    【讨论】:

    • 谢谢。现在它起作用了!!!!我添加了TimeCharacteristic,现在一切正常!
    猜你喜欢
    • 2020-07-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-08-04
    相关资源
    最近更新 更多