【问题标题】:Apache Flink Stochastic Outlier Selection on Data StreamApache Flink 对数据流的随机异常值选择
【发布时间】:2018-08-20 16:14:51
【问题描述】:

我正在尝试使用 Apache Flink ML 包的 StochasticOutlierSelection 模型。

我无法弄清楚如何将它与 Kafka 作为数据源一起使用,我知道它需要一个 DataSet 而不是 DataStream,但我似乎无法将我的 Kafka DataStream 窗口化为一个 DataSet。

有没有一种方法可以将我的流视为一系列小型数据集。例如,有没有办法说流中匹配模式的每 10 个元素(按元素唯一 ID 滑动窗口)将它们视为固定大小的 DataSet 并检测此固定大小数据集中的任何异常值?

我要创建的场景是:

数据源 -> Kafka Topic 1 -> Flink 预处理 -> Kafka Topic 2 -> Flink Groups By ID -> 组上的异常值检测

我已经有一个可以工作的实现到预处理,我希望 Flink 能够满足我的要求?

【问题讨论】:

    标签: machine-learning apache-kafka apache-flink flink-streaming flinkml


    【解决方案1】:

    我猜你可以创建一个基于计数的全局窗口并使用 ExecutionEnvironment 来获取一个数据集。像下面这样的东西可能会起作用(getResult 会返回一个 DataSet):

    
          stream.
          keyBy(...).
          window(GlobalWindows.create).
          trigger(CountTrigger.of(10)).
          aggregate(new MyAggregator()).
          ...
    
        class MyAggregator extends AggregateFunction[..., ..., ...] {  
    
          var valueList: List[LabeledVector] = List[LabeledVector]()    
    
          override def createAccumulator(): MyAggregator = new MyAggregator()
          override def add(value: .., accumulator: MyAggregator): ... = ...
          override def merge(agg1: MyAggregator, agg2: MyAggregator): ... = ...
          override def getResult(accumulator: MyAggregator): ... = {
            ExecutionEnvironment.getExecutionEnvironment.fromCollection(valueList)
          }
        }
    

    【讨论】:

      猜你喜欢
      • 2019-06-21
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-07-30
      • 1970-01-01
      • 2011-10-24
      • 2017-01-05
      • 1970-01-01
      相关资源
      最近更新 更多