【问题标题】:Kafka streams - KSQL - Split messages and publish to another topicKafka 流 - KSQL - 拆分消息并发布到另一个主题
【发布时间】:2020-10-03 13:46:09
【问题描述】:

有没有办法使用 KSQL 将一条消息拆分为多条消息并发布到一个新主题。为了清楚起见,我不是在寻找基于 Java 的侦听器并将其迭代/流式传输到新主题;相反,我正在寻找可以为我执行此操作的 KSQL。

例如:

假设,我需要将 invoice 主题中的消息拆分为 item_inventory_delta 消息

发票主题

:saleschecknumber

消息示例:

{
    "total": 12.33,
    "salecounter": 1,
    "items": [
        {
            "itemId": 123,
            "quantity": 1
        },
        {
            "itemId": 345,
            "quantity": 5
        }
    ]
}

item_inventory_delta 主题

:saleschecknumber_itemID

消息示例

1.

{
    "itemId": 123,
    "quantity": 1
}

2.

{
    "itemId": 345,
    "quantity": 5
}

【问题讨论】:

    标签: apache-kafka ksqldb


    【解决方案1】:

    从 ksqlDB 0.6 开始,您现在可以这样做了,这要感谢 EXPLODE table function 中的 the addition

    根据您的示例,给定一个带有 JSON 有效负载的主题 invoice,首先使用 PRINT 检查该主题以转储其内容:

    ksql> PRINT invoice FROM BEGINNING;
    Format:JSON
    {"ROWTIME":1575366231505,"ROWKEY":"null","total":12.33,"salecounter":1,"items":[{"itemId":123,"quantity":1},{"itemId":345,"quantity":5}]}
    

    然后在topic的topic上声明一个schema,这给了我们一个ksqlDB stream

    CREATE STREAM INVOICE (total DOUBLE, 
                           salecounter INT, 
                           items ARRAY<STRUCT<itemId INT, 
                                              quantity INT>>) 
                    WITH (KAFKA_TOPIC='invoice', 
                          VALUE_FORMAT='JSON');
    

    这只是简单地“注册”现有主题以供 ksqlDB 使用。在下一步之前,不会编写新的 Kafka 主题。

    创建一个新的 Kafka 主题,从源流中到达的消息不断填充:

    CREATE STREAM INVENTORY WITH (KAFKA_TOPIC='item_inventory_delta') AS 
      SELECT EXPLODE(ITEMS)->ITEMID AS ITEMID, 
             EXPLODE(ITEMS)->QUANTITY AS QUANTITY 
        FROM INVOICE;
    

    已创建新主题:

    ksql> SHOW TOPICS;
    
     Kafka Topic                     | Partitions | Partition Replicas
    -------------------------------------------------------------------
     invoice                         | 1          | 1
     item_inventory_delta            | 1          | 1
    

    主题有请求的增量消息:)

    ksql> PRINT item_inventory_delta;
    Format:JSON
    {"ROWTIME":1575366231505,"ROWKEY":"null","ITEMID":123,"QUANTITY":1}
    {"ROWTIME":1575366231505,"ROWKEY":"null","ITEMID":345,"QUANTITY":5}
    

    【讨论】:

      【解决方案2】:

      对于KStream 应用程序,您可以使用flatMap,它接受一个函数,该函数接受一个记录并返回一个包含零个或多个记录的可迭代对象:

      case class Record(total: Double, salecounter: Int, items: List[Item])
      case class Item(itemId: Int, quantity: Int)
      
      // initialize the stream 
      val inputStream: KStream[String, Record] = ??? 
      
      // split the message
      inputStream.flatMap { case (key, record) => 
        record.items.map(item => (key, item) )
      }
      

      【讨论】:

      • 感谢@Brandon。我正在寻找更多的非样板方法来实现这一目标。
      【解决方案3】:

      据我了解,有很多处理方法与我们如何处理传入消息而不是聚合消息有关。使用 Kafka 流处理器 API 的简单方法,允许您自定义处理逻辑。

      Kafka Stream Processor API

      处理器 API 允许开发人员定义和连接自定义 处理器并与状态存储进行交互。使用处理器 API, 您可以定义处理接收到的任意流处理器 一次记录,并将这些处理器与其相关联的 状态存储组成处理器拓扑,表示 自定义处理逻辑

      注意:您不必定义输出值,所以我只是发布相同的键和值,但您可以选择定义输出键和值

      您可以如下定义 Kafka 流处理器 API

      Topology builder = new Topology();
      builder.addSource("Source", "invoice")
                      .addProcessor("sourceProcessor", () -> new InvoiceProcessor(), "Source")
                      .addSink("sinkDeltaInvoice", "item_inventory_delta", Serdes.String().serializer(), Serdes.String().serializer(),
                              "sourceProcessor")
      

      以下是自定义处理器方法,请注意它只是一种方法,而不是完整的实现

      class InvoiceProcessor implements Processor<String, String> {
              private Gson gson = new Gson();
      
              //constructor
              .......
              private ProcessorContext context;
      
              @Override
              public void init(ProcessorContext context) {
                  this.context = context;
      
              }
      
              @Override
              public void close() {
                  // Any code for clean up would go here. This processor instance will not be used
                  // again after this call.
              }
      
              @Override
              public void process(String key, String value) {
                  try {
      
                      //Create custom inventory to map JSON object  
                      //List[Item] items is member object of Inventory class
                      Inventory inventory = gson.fromJson(key, Inventory.class);
                      
                      
                      //itertae item of items List[Items]
                      for(Item item: inventory.getItems()){
                      context.forward(gson.toJson(item), gson.toJson(item), To.child("sinkDeltaInvoice"));
                      
                      }
                      //
                      
                      
                      }
      
      
              }
      
          }  
      

      【讨论】:

      • 谢谢。赞成。我应该更清楚我在寻找什么。对不起。更新了我的问题
      猜你喜欢
      • 2019-10-09
      • 1970-01-01
      • 2020-05-09
      • 1970-01-01
      • 2020-01-18
      • 2016-01-17
      • 1970-01-01
      • 2021-06-17
      • 2018-10-27
      相关资源
      最近更新 更多