【问题标题】:Kafka streams with ScalaKafka 使用 Scala 流式传输
【发布时间】:2018-02-13 02:22:38
【问题描述】:

我正在尝试将 kafka 流与 scala 一起使用 下面是我的 Java 代码,它运行良好:

KStreamBuilder builder = new KStreamBuilder();
    KStream<String, String> textLines = builder.stream("TextLinesTopic");
    textLines.foreach((key,values) -> {
        System.out.println(values);
    });

    KafkaStreams streams = new KafkaStreams(builder, config);
    streams.start();

我的scala代码如下:

  val builder = new KStreamBuilder
  val textLines:KStream[String, String]  = builder.stream("TextLinesTopic")
  textLines.foreach((key,value)-> {
   println(key)
  })

  val streams = new KafkaStreams(builder, config)
  streams.start()

scala 代码抛出编译错误。预期类型不匹配:ForEachAction[>String,>String],Actual((any,any), Unit) 未找到:值键 未找到:值值

有谁知道如何在 scala 中使用流 API

【问题讨论】:

    标签: scala apache-kafka kafka-consumer-api apache-kafka-streams


    【解决方案1】:

    您可以直接使用打印方法打印kafkastream。

    textlines.print

    它将打印 kafka 流。您甚至可以通过将参数传递给打印函数来打印键或值。

    【讨论】:

      【解决方案2】:

      您的语法错误:)。 -&gt; 只是创建对的运算符,所以表达式

      (key,value)-> {
        println(key)
      }
      

      具有类型 ((Any, Any), Unit),因为编译器无法推断任何类型信息(并且缺少 keyvalue

      如果您使用 scala 2.12 将 -&gt; 替换为 =&gt; 应该可以解决问题,但如果您使用旧版本的 scala,则必须显式实现 java bifunction:

       textLines.foreach(new BiFunction[T1, T2] { ... })
      

      【讨论】:

        猜你喜欢
        • 2019-05-13
        • 2020-07-21
        • 1970-01-01
        • 1970-01-01
        • 2018-06-25
        • 2016-06-04
        • 2021-07-28
        • 2018-10-06
        • 2018-08-09
        相关资源
        最近更新 更多