【发布时间】:2021-11-16 22:05:39
【问题描述】:
我正在尝试将来自 Kafka 主题的数据实时提取到 Cassandra 表中。为此,我使用 DataStax Cassandra Sink 连接器。主题中事件的格式将是 JSON。我可以将主题中事件的 JSON 字段直接映射到表中,但这并不是我所需要的。
我的场景是这样的: 有一个主题需要映射到多个 Cassandra 表。主题中的事件应该可以根据某些条件进入表格。假设有具有不同类型标头 A、B 和 C 的事件。具有标头 A 的事件需要转到 Cassandra 表 A,标头 B 事件需要转到表 B,标头 C 到表 C。在连接器配置 JSON 文件中,我我能做到吗?
我对单个主题和单个表进行了一些尝试,并尝试在“ID”字段中提取具有特定条件的事件。
我当前的连接器配置文件:
{
"name": "cassandra-json-sink",
"config": {
"connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
"tasks.max": "1",
"topics": "json_test_topic",
"contactPoints": "cassandra",
"loadBalancing.localDc": "datacenter1",
"port": 9042,
"auth.username": "cassandra",
"auth.password": "cassandra",
"topic.json_test_topic.kconnect_json.customer.mapping": "id=key, name=value.name, lname=value.lname, adress=value.adress",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable": false,
"value.converter.schemas.enable": false
}
}
Cassandra 表的详细信息:
USE kconnect_json;
CREATE TABLE customer (id TEXT PRIMARY KEY, name TEXT, lname TEXT, adress TEXT);
我发布到我的主题“json_test_topic”的示例消息:
abc:{"name":"john", "lname":"doe", "adress":"WY"}
efg:{"name":"wanda", "lname":"hill", "adress":"CA"}
在这个简单的尝试中,我想将带有“name”字段 =“john”的事件提取到我的 Cassandra 表中。我已将连接器的先前配置文件更改为:
{
"name": "cassandra-json-sink",
"config": {
"connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
"tasks.max": "1",
"topics": "json_test_topic",
"contactPoints": "cassandra",
"loadBalancing.localDc": "datacenter1",
"port": 9042,
"auth.username": "cassandra",
"auth.password": "cassandra",
"topic.json_test_topic.kconnect_json.customer.mapping": "id=key, name=value.name, lname=value.lname, adress=value.adress",
"topic.json_test_topic.kconnect_json.customer.query": "INSERT INTO kconnect_json.customer(id, name, lname, adress) SELECT :id, :name, :lname, :adress FROM topic.json_test_topic WHERE :name = 'john';",
"topic.json_test_topic.kconnect_json.musteri.deletesEnabled": false,
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable": false,
"value.converter.schemas.enable": false
}
}
在执行此操作时,我尝试使用此文档并稍微自定义示例:https://docs.datastax.com/en/kafka/doc/kafka/kafkaCqlQuery.html
但是这种自定义不起作用,因为 Cassandra 端的查询无效。我无法在 Cassandra 上找到带有内部 SELECT 语句的 INSERT 语句,所以这可能是个问题,但我不知道替代方案。
如果有人可以提供帮助,我会很高兴。
【问题讨论】:
标签: apache-kafka cassandra apache-kafka-connect