【问题标题】:How to use Kafka Connect for Cassandra without Confluent如何在没有 Confluent 的情况下使用 Kafka Connect for Cassandra
【发布时间】:2017-07-23 08:03:04
【问题描述】:

我们如何在不使用 Confluent 框架的情况下将 Kafka Connect 与 Cassandra 结合使用。

【问题讨论】:

    标签: cassandra apache-kafka apache-kafka-connect


    【解决方案1】:

    Kafka Connect 框架。 Confluent 仅提供连接器。如果您不想使用 Confluent Open Source(但为什么不呢?),您也可以将所有这些连接器与 vanilla Apache Kafka 一起使用。

    有多个 Casandra 连接器可用:https://www.confluent.io/product/connectors/

    顺便说一句:列出的 Casandra 连接器都不是由 Confluent 维护的。

    当然,您也可以编写自己的连接器或使用任何其他第三方连接器。

    【讨论】:

    • 在我的例子中,访问数据库的基本概念是使用 SQL/CQL 查询。 connect 还对数据库执行查询以存储数据或获取数据。如果我建立一个消费者组,一个用于处理,另一个用于将其存储到 DB,那么一个用于存储到 DB,例如 DB-Consumer,它的工作是只将数据存储到我可以使用 ORM 轻松完成的数据库中,我也会完全透明并对其进行控制。所以我担心的是它在性能和速度方面与这种类型的(DB-consumer)消费者实际上有何不同。提前感谢您在这方面的帮助和帮助。
    • Connect as a framework 负责故障转移,您还可以在分布式模式下运行它以扩展您的数据导入/导出“作业”。因此,Connect 确实是一种“一劳永逸”的体验。此外,对于 Connect,您无需编写任何代码——您只需配置连接器。
    • Confluent cp-kafka-connect 没有 cassandra 连接器
    • 您可以在 Confluent Hub 上找到 Cassandra 连接器:confluent.io/connector/kafka-connect-cassandra
    【解决方案2】:

    DataMountaineer Stream Reactor 具有可与 Kafka Connect 一起使用的 Cassandra Source 和 Sink 解决方案。

    将 jar 文件 (download) 放入 Kafka libs 文件夹,然后按如下方式指定您的连接器:

    {
    "name": "cassandra-NAME",
    "config": {
        "tasks.max": "1",
        "connector.class": "com.datamountaineer.streamreactor.connect.cassandra.source.CassandraSourceConnector",
        "connect.cassandra.key.space": "KEYSPACE",
        "connect.cassandra.source.kcql": "INSERT INTO KAFKA_TOPIC SELECT column1, timestamp_col FROM CASSANDRA_TABLE PK timestamp_col",
        "connect.cassandra.import.mode": "incremental",
        "connect.cassandra.contact.points": "localhost",
        "connect.cassandra.port": 9042,
        "connect.cassandra.import.poll.interval": 10000
    }}
    

    启动 Kafka Connect

    bin/connect-distributed.sh config/connect-distributed.properties
    

    并通过上面提到的 JSON 属性文件将 Cassandra 连接器加载到 Kafka Connect(假设它的名称为 connect-cassandra-source.json)

    curl -X POST -H "Content-Type: application/json" -d @config/connect-cassandra-source.json localhost:8083/connectors
    

    您将需要创建一个具有 timeuuid 列作为集群键的表。这被描述为here

    【讨论】:

      猜你喜欢
      • 2020-11-23
      • 2019-06-16
      • 2019-11-19
      • 1970-01-01
      • 2016-09-14
      • 1970-01-01
      • 2020-11-04
      • 2021-08-07
      • 2019-11-14
      相关资源
      最近更新 更多