【Question title】: Flink to NiFi connector
【Posted】: 2017-07-21 17:54:30
【Question】:

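I need some help using Scala code to transfer data from a NiFi output port to Flink.

I am stuck on the .addSource() function. It asks for an additional type parameter ([OUT]), but when I supply it I keep getting errors. The Scala code and the error messages are below.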

package flinkTest

import java.nio.charset.{Charset, StandardCharsets}

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.connectors.nifi.NiFiSource
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket

import org.apache.nifi.remote.client.{SiteToSiteClient, SiteToSiteClientConfig}

object NifiFlow {
  def main(): Unit = {

    // get the execution environment
    val env: StreamExecutionEnvironment = 
    StreamExecutionEnvironment.getExecutionEnvironment

    // get input data by connecting to NiFi
    val clientConfig: SiteToSiteClientConfig = new SiteToSiteClient.Builder()
      .url("http://localhost:8080/nifi")
      .portName("Data to flink")
      .requestBatchCount(2)
      .buildConfig()

    val nifiSource: SourceFunction[NiFiDataPacket] = new NiFiSource(clientConfig)

This is the part I am stuck on:

    val streamSource: DataStream[NiFiDataPacket] = 
    env.addSource(nifiSource).setParallelism(2)

And the rest of the code:

    val dataStream = streamSource.map(dataPacket => new String(dataPacket.getContent, StandardCharsets.UTF_8))

    dataStream.print()

    env.execute()
  }
}

1) With [OUT]

Error:(28, 76) value nifiSource of type org.apache.flink.streaming.api.functions.source.SourceFunction[org.apache.flink.streaming.connectors.nifi.NiFiDataPacket] does not take type parameters.
    val streamSource: DataStream[NiFiDataPacket] = env.addSource(nifiSource[NiFiDataPacket]).setParallelism(2)

2) Without [OUT]

Error:(28, 66) type mismatch;
 found   : org.apache.flink.streaming.api.functions.source.SourceFunction[org.apache.flink.streaming.connectors.nifi.NiFiDataPacket]
 required: org.apache.flink.streaming.api.function.source.SourceFunction[?]
    val streamSource: DataStream[NiFiDataPacket] = env.addSource(nifiSource).setParallelism(2)

The example was taken from here and rewritten in Scala.

I would appreciate any help.

UPD2

package flinkTest

import org.apache.nifi.remote.client.{SiteToSiteClient, SiteToSiteClientConfig}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.nifi._

object NifiFlow {
  def main(): Unit = {

    // get the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // get input data by connecting to NiFi
    val clientConfig: SiteToSiteClientConfig = new SiteToSiteClient.Builder()
      .url("http://localhost:8080/nifi")
      .portName("Data to flink")
      .requestBatchCount(2)
      .buildConfig()

    val nifiSource = new NiFiSource(clientConfig)

    val streamSource: DataStream[String] = env
      .addSource(nifiSource)
      .map(x => x.getAttributes().toString)

    env.execute()
  }
}

Error

Connected to the target VM, address: '127.0.0.1:41218', transport: 'socket'
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: Interfaces and abstract classes are not valid types: interface org.apache.flink.streaming.connectors.nifi.NiFiDataPacket
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:871)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:863)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:406)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:197)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:184)
    at flinkTest.NifiFlow$.main(NiFiFlow.scala:23)

【Question discussion】:

    Tags: scala apache-flink apache-nifi


    【Solution 1】:

    Scala has its own dedicated implementation of the execution environment:

    org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    Use it instead of org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
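What Solution 1 suggests can be sketched roughly as follows. This is only a minimal sketch, not a confirmed fix: the object name NifiFlowScalaEnv, the decode helper and the job name are made up for illustration, and the NiFi URL and port name are simply copied from the question. It also declares main(args: Array[String]), since the JVM does not treat a parameterless main() as an entry point. Note that UPD2 in the question reports that code in this style still raised InvalidTypesException at runtime on the asker's setup, so the import change alone may not be enough on every Flink version.

```scala
// The wildcard import brings in the Scala StreamExecutionEnvironment and DataStream,
// together with the implicit TypeInformation support that the Scala addSource expects.
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.nifi.{NiFiDataPacket, NiFiSource}
import org.apache.nifi.remote.client.{SiteToSiteClient, SiteToSiteClientConfig}

import java.nio.charset.StandardCharsets

object NifiFlowScalaEnv {
  // Hypothetical helper: decodes the raw content of a NiFi data packet as UTF-8 text.
  def decode(content: Array[Byte]): String =
    new String(content, StandardCharsets.UTF_8)

  def main(args: Array[String]): Unit = {
    // Scala execution environment, not the Java one from ...api.environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Same Site-to-Site settings as in the question
    val clientConfig: SiteToSiteClientConfig = new SiteToSiteClient.Builder()
      .url("http://localhost:8080/nifi")
      .portName("Data to flink")
      .requestBatchCount(2)
      .buildConfig()

    // No explicit [OUT] is needed here; the element type is inferred from the source
    val packets: DataStream[NiFiDataPacket] = env.addSource(new NiFiSource(clientConfig))

    packets.map(p => decode(p.getContent)).print()

    env.execute("NiFi to Flink")
  }
}
```

Running it requires the Flink streaming Scala API, the Flink NiFi connector and a reachable NiFi instance with an output port named "Data to flink".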

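    【Discussion】:

    • Didn't help. Somehow it fails to convert the type class. Error:(37, 38) type mismatch; found: org.apache.flink.streaming.api.functions.source.SourceFunction[org.apache.flink.streaming.connectors.nifi.NiFiDataPacket] required: org.apache.flink.streaming.api.function.source.SourceFunction[?] val streamSource = env.addSource(nifiSource)
    • Could you attach the complete file with the modified code and the full error?
    • Updated the original post. The code is too long to paste into the comments.
    • What is your Flink version?
    • I see a problem with the package name of the SourceFunction class. StreamExecutionEnvironment requires org.apache.flink.streaming.api.function.source.SourceFunction, which I can only find in Flink v0.8.1, while you are passing org.apache.flink.streaming.api.functions.source.SourceFunction, which has existed since Flink v0.9. Note .function. vs .functions. I guess you have different library versions on your classpath.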
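The classpath suspicion raised in the last comment can be checked with a small diagnostic that prints which jar (if any) each variant of the class is loaded from. This is a sketch using only the JDK and the Scala standard library; the object name WhichJar and the helper locationOf are made up for illustration, and the two class names are the ones quoted in the comment.

```scala
import scala.util.Try

object WhichJar {
  // Returns the location (jar or directory) a class was loaded from, if it can be determined.
  // Yields None when the class is not on the classpath or exposes no code source.
  def locationOf(className: String): Option[String] =
    for {
      // initialize = false: only locate the class, do not run its static initializers
      cls    <- Try(Class.forName(className, false, getClass.getClassLoader)).toOption
      domain <- Option(cls.getProtectionDomain)
      source <- Option(domain.getCodeSource)
      url    <- Option(source.getLocation)
    } yield url.toString

  def main(args: Array[String]): Unit = {
    val names = Seq(
      // package the comment says exists since Flink v0.9
      "org.apache.flink.streaming.api.functions.source.SourceFunction",
      // package the comment could only find in Flink v0.8.1
      "org.apache.flink.streaming.api.function.source.SourceFunction"
    )
    names.foreach { n =>
      println(s"$n -> ${locationOf(n).getOrElse("not found / no code source")}")
    }
  }
}
```

If both names resolve, and to different jars, that would support the guess that mixed Flink versions are on the classpath.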
    【Solution 2】:

    env.addSource(nifiSource) only works if env.getJavaEnv.getConfig.disableClosureCleaner() has been called beforehand.

    Perhaps the Scala source in this open-source project should be updated (it is located in flink-scala_2.11...jar).
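The call order Solution 2 describes could look roughly like this. It is a sketch under assumptions: the object and method names are hypothetical, and disableClosureCleaner() is taken from the answer as it existed in Flink releases of that period, so it may be deprecated or missing in newer versions.

```scala
import org.apache.flink.streaming.api.scala._

object ClosureCleanerWorkaround {
  // Builds a Scala execution environment with the closure cleaner switched off,
  // which Solution 2 says must happen before env.addSource(nifiSource) is called.
  def newEnvWithoutClosureCleaner(): StreamExecutionEnvironment = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.getJavaEnv.getConfig.disableClosureCleaner()
    env
  }
}
```

The returned environment would then be used in place of the plain getExecutionEnvironment result, with addSource called only afterwards.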

    【Discussion】:
