【问题标题】:Getting unspecified value parameter scala Kafka获取未指定的值参数scala Kafka
【发布时间】:2018-09-08 09:13:59
【问题描述】:

您好,我收到此编译错误 -

Unspecified Value parameters: aggregator: (String, String, NotInferedVR) => NotInferedVR

但我显然已经有一个聚合器。有谁知道怎么回事?

 val stream = builder
    .stream(inputTopic)(Consumed.`with`(Serdes.String(), Serdes.ByteArray()))
    .map{ (key: String, value: Array[Byte]) =>
      println(s"key = ${key}")
      val lv = GroupByAction.convertByteArrayToJsonObject(value)
      val lst = List.empty[String]
      val newKey = GroupByAction
        .reKey(lv
          , groupByColumnList
            .asScala
            .toList
          ,lst)
      val newValue = getValFromJSONMessage(lv, aggregateColumnList.asScala.toList.head)
      println(s"newKey = ${newKey}")
      (newKey, newValue)}
    .groupByKey(Serialized.`with`(Serdes.String(), Serdes.String()))
    .aggregate{ () => 0.toString, (k,v,vr: Long) => (v.toString.toLong + vr.toString.toLong).toString }

【问题讨论】:

    标签: scala apache-kafka dsl apache-kafka-streams kafka-streams-scala


    【解决方案1】:

    您只能使用{} 形式的方法调用来传递单个参数,该参数被视为一个块。您需要传递两个参数,因此请改用()

    aggregate( () => 0.toString, (k,v,vr: Long) => (v.toString.toLong + vr.toString.toLong).toString )
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-11-05
      • 2013-07-19
      • 2013-10-04
      • 1970-01-01
      • 2014-05-19
      • 2017-01-29
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多