【问题标题】:DataStax Cassandra Sink Connector - Ingesting data from a Kafka topic according to conditionsDataStax Cassandra Sink Connector - 根据条件从 Kafka 主题中摄取数据
【发布时间】:2021-11-16 22:05:39
【问题描述】:

我正在尝试将来自 Kafka 主题的数据实时提取到 Cassandra 表中。为此,我使用 DataStax Cassandra Sink 连接器。主题中事件的格式将是 JSON。我可以将主题中事件的 JSON 字段直接映射到表中,但这并不是我所需要的。

我的场景是这样的: 有一个主题需要映射到多个 Cassandra 表。主题中的事件应该可以根据某些条件进入表格。假设有具有不同类型标头 A、B 和 C 的事件。具有标头 A 的事件需要转到 Cassandra 表 A,标头 B 事件需要转到表 B,标头 C 到表 C。在连接器配置 JSON 文件中,我我能做到吗?

我对单个主题和单个表进行了一些尝试,并尝试在“ID”字段中提取具有特定条件的事件。

我当前的连接器配置文件:

{
  "name": "cassandra-json-sink",
  "config": {
    "connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
    "tasks.max": "1",
    "topics": "json_test_topic",
    "contactPoints": "cassandra",
    "loadBalancing.localDc": "datacenter1",
    "port": 9042,
    "auth.username": "cassandra",
    "auth.password": "cassandra",
    "topic.json_test_topic.kconnect_json.customer.mapping": "id=key, name=value.name, lname=value.lname, adress=value.adress",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": false,
    "value.converter.schemas.enable": false
  }
}

Cassandra 表的详细信息:

USE kconnect_json;
CREATE TABLE customer (id TEXT PRIMARY KEY, name TEXT, lname TEXT, adress TEXT);

我发布到我的主题“json_test_topic”的示例消息:

abc:{"name":"john", "lname":"doe", "adress":"WY"}
efg:{"name":"wanda", "lname":"hill", "adress":"CA"}

在这个简单的尝试中,我想将带有“name”字段 =“john”的事件提取到我的 Cassandra 表中。我已将连接器的先前配置文件更改为:

{
  "name": "cassandra-json-sink",
  "config": {
    "connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
    "tasks.max": "1",
    "topics": "json_test_topic",
    "contactPoints": "cassandra",
    "loadBalancing.localDc": "datacenter1",
    "port": 9042,
    "auth.username": "cassandra",
    "auth.password": "cassandra",
    "topic.json_test_topic.kconnect_json.customer.mapping": "id=key, name=value.name, lname=value.lname, adress=value.adress",
    "topic.json_test_topic.kconnect_json.customer.query": "INSERT INTO kconnect_json.customer(id, name, lname, adress) SELECT :id, :name, :lname, :adress FROM topic.json_test_topic WHERE :name = 'john';",
    "topic.json_test_topic.kconnect_json.musteri.deletesEnabled": false,
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": false,
    "value.converter.schemas.enable": false
  }
}

在执行此操作时,我尝试使用此文档并稍微自定义示例:https://docs.datastax.com/en/kafka/doc/kafka/kafkaCqlQuery.html

但是这种自定义不起作用,因为 Cassandra 端的查询无效。我无法在 Cassandra 上找到带有内部 SELECT 语句的 INSERT 语句,所以这可能是个问题,但我不知道替代方案。

如果有人可以提供帮助,我会很高兴。

【问题讨论】:

    标签: apache-kafka cassandra apache-kafka-connect


    【解决方案1】:

    Cassandra 的 kafka-sink 连接器不支持条件映射。

    正如我们之前向您建议的那样,连接器可以map a topic to multiple CQL tables,但不可能对消息的映射方式应用条件。干杯!

    【讨论】:

      猜你喜欢
      • 2021-11-20
      • 2021-06-03
      • 2019-04-16
      • 2016-05-27
      • 2018-11-02
      • 2021-10-23
      • 2021-07-26
      • 2018-03-01
      • 2021-10-12
      相关资源
      最近更新 更多