【发布时间】:2020-06-17 06:39:00
【问题描述】:
我有一个 kafka 主题,其中向其发布了多种类型的消息(都是 JSON),其标头属性为:- eventType = "abc" 或 eventType = "xyz" 或 eventType = "def" 等。
我的要求是根据在 kafka 中发送的 eventType 标头字段将这些消息从 Kafka 主题发送到不同的 elasticSearch 索引。 例如如果 eventType = "abc",将数据推送到 elasticsearch index = "abc123" if eventType = "xyz",推送数据到elasticsearch index = "xyz123"
我打算使用 kafka connect 进行此操作。我需要帮助如何使用 kafka-connect 实现这一目标?
我正在使用 kafka 1.1.0 并计划使用 confluentinc-kafka-connect-elasticsearch--plugin。
这是我的弹性搜索接收器连接配置文件:-
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=test-elasticsearch-sink
topic.index.map=test-1:elastic_purchase_index1
key.ignore=true
connection.url=<url>
type.name=purchasev1
key.ignore=true
schema.ignore=true
【问题讨论】:
-
你研究过RegexRouter吗?或者,使用 ksqlDB 或 Kafka Streams 将事件过滤到新主题
-
正则表达式路由器将用于过滤事件并应用转换,例如替换字段并放入新主题。这将导致从一个我不想要的源主题放入多个主题。我认为 Kafka 流可以做到这一点(消耗事件然后发送到相应的弹性搜索索引)但想通过 kafka-connect 检查是否可能?
-
您不会使用 Kafka Streams 将数据发送到 Elastic... 参考 -
topic.index.mapdocs.confluent.io/current/connect/kafka-connect-elasticsearch/…
标签: elasticsearch apache-kafka apache-kafka-connect