【问题标题】:Parse record (PCF) from Kafka using Kafka Kusto Sink使用 Kafka Kusto Sink 从 Kafka 解析记录 (PCF)
【发布时间】:2019-05-31 00:39:35
【问题描述】:

我已经使用基于 this guide 的 docker 设置了我的环境。

在 kafka-console-producer 上,我将发送此行:

Hazriq|27|Undegrad|UNITEN

我希望像这样将这些数据提取到 Kusto:

+--------+-----+----------------+------------+
| Name   | Age | EducationLevel | University |
+--------+-----+----------------+------------+
| Hazriq | 27  | Undegrad       | UNITEN     |
+--------+-----+----------------+------------+

这可以由 Kusto 使用映射来处理(我仍在努力理解)还是应该由 Kafka 来处理?


尝试了@daniel 的建议:

.create table ParsedTable (name: string, age: int, educationLevel: string, univ:string)

.create table ParsedTable ingestion csv mapping 'ParsedTableMapping' '[{ "Name" : "name", "Ordinal" : 0},{ "Name" : "age", "Ordinal" : 1 },{ "Name" : "educationLevel", "Ordinal" : 2},{ "Name" : "univ", "Ordinal" : 3}]'

kusto.tables.topics_mapping=[{'topic': 'kafkatopiclugiaparser','db': 'kusto-test', 'table': 'ParsedTable','format': 'psv', 'mapping':'ParsedTableMapping'}]
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter=org.apache.kafka.connect.storage.StringConverter

但得到这个:

+----------------------------+-----+----------------+------+
| Name                       | Age | EducationLevel | Univ |
+----------------------------+-----+----------------+------+
| Hazriq|27|Undergrad|UNITEN |     |                |      |
+----------------------------+-----+----------------+------+

【问题讨论】:

    标签: apache-kafka azure-data-explorer


    【解决方案1】:

    目前,连接器在数据到来时传递数据(在客户端没有对其进行操作),任何解析都留给 Kusto。

    因此,kusto 支持psv 格式,应该可以通过将格式设置为psv 并提供映射参考。

    将插件添加为described 时,您应该可以这样设置:

    kusto.tables.topics_mapping=[{'topic': 'testing1','db': 'testDB', 'table': 'KafkaTest','format': 'psv', 'mapping':'KafkaMapping'}]
    

    映射可以在 Kusto 中定义,如 Kusto 文档defined like so中所述

    【讨论】:

    • 你好丹尼尔。我已经用psvcsv 尝试过你的建议,但我得到:``` +------------------------- ---+-----+----------------+------+ |姓名 |年龄 |教育程度 |大学 | +----------------------------+-----+-------------- --+--------+ | Hazriq|27|本科|UNITEN | | | | +----------------------------+-----+-------------- --+--------+ ```
    • 您可以尝试使用控制台生产者发送一个 csv 格式的值吗? Hazriq,27,Undegrad,UNITEN ? (它确实需要重新启动 Kafka 才能获取新设置)
    • csv 似乎工作正常。设法正确摄取它。但不适用于 psv。
    【解决方案2】:

    支持您使用psv 格式显示的数据摄取(见下文) - 这可能只是调试为什么您的客户端调用底层命令没有产生预期结果的问题。如果您可以分享完整的流程和代码,包括参数,它可能会有所帮助。

    .create table ParsedTable (name: string, age: int, educationLevel: string, univ:string)
    
    .ingest inline into table ParsedTable with(format=psv) <| Hazriq|27|Undegrad|UNITEN
    
    
    ParsedTable:
    
    | name   | age | educationLevel | univ   |
    |--------|-----|----------------|--------|
    | Hazriq | 27  | Undegrad       | UNITEN |
    

    【讨论】:

      猜你喜欢
      • 2019-07-09
      • 1970-01-01
      • 2019-03-07
      • 2017-11-17
      • 2020-05-28
      • 2020-09-18
      • 2021-01-17
      • 1970-01-01
      • 2023-03-12
      相关资源
      最近更新 更多