【问题标题】:Kafka Connect Filter TransformerKafka Connect 过滤器转换器
【发布时间】:2021-12-31 15:43:58
【问题描述】:

我正在使用 Kafka-connect,我想过滤掉一些消息。

这就是 Kafka 中单个消息的样子。

{
  "code": 2001,
  "time": 11111111,
  "requestId": "123456789",
  "info": [
    {
      "name": "dan",
      "value": 21
    }
  ]
}

这就是我的变压器的样子

transforms:
  transforms: FilterByCode
  transforms.FilterByCode.type: io.confluent.connect.transforms.Filter$Value
  transforms.FilterByCode.filter.condition: $[?(@.code == 2000)]
  transforms.FilterByCode.filter.type: include

我想过滤掉代码值不是 2000 的消息。 我尝试了几种不同的条件语法,但找不到任何有效的语法。 所有消息都被转换,没有一个被过滤。

关于我应该如何使用过滤条件的语法有什么想法吗?

【问题讨论】:

  • 您展示的这种 YAML 格式是什么? Connect API 不使用它
  • @OneCricketeer 这是一个内部基础设施。这是有效的(我写了自己的转换器,但我想添加 Kafka-connect 过滤条件,这就是我所缺少的。

标签: apache-kafka apache-kafka-connect


【解决方案1】:

如果您想过滤掉所有非 2000 代码,请尝试[?(@.code != 2000)] 你可以在这里测试它 - http://jsonpath.herokuapp.com/

【讨论】:

    猜你喜欢
    • 2018-03-24
    • 2019-06-15
    • 2016-08-10
    • 1970-01-01
    • 2021-01-13
    • 1970-01-01
    • 2020-05-29
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多