【问题标题】:Apache Nifi/Cassandra - how to load CSV into Cassandra tableApache Nifi/Cassandra - 如何将 CSV 加载到 Cassandra 表中
【发布时间】:2017-01-05 05:02:33
【问题描述】:

我每天有几次收到各种 CSV 文件,存储来自传感器的时间序列数据,这些传感器是传感器站的一部分。每个 CSV 都以其来源的传感器站和传感器 ID 命名,例如“station1_sensor2.csv”。目前,数据是这样存储的:

> cat station1_sensor2.csv
2016-05-04 03:02:01.001000+0000;0;
2016-05-04 03:02:01.002000+0000;0.1234;
2016-05-04 03:02:01.003000+0000;0.2345;

我创建了一个 Cassandra 表来存储它们并能够查询它们以执行各种已识别的任务。 Cassandra 表如下所示:

cqlsh > CREATE KEYSPACE data with replication = {'class' : 'SimpleStrategy', 'replication_factor' : 3};

        CREATE TABLE sensor_data (
        station_id text, // id of the station
        sensor_id text,  // id of the sensor
        tps timestamp,   // timestamp of the measure
        val float,       // measured value
        PRIMARY KEY ((station_id, sensor_id), tps)
        );

我想使用 Apache Nifi 将 CSV 中的数据自动存储到这个 Cassandra 表中,但我找不到正确的示例或方案。我曾尝试使用“PutCassandraQL”处理器,但我在没有任何明确的例子的情况下苦苦挣扎。因此,对于如何使用 Apache Nifi 执行 Cassandra put 查询以将数据插入表中的任何帮助将不胜感激!

【问题讨论】:

    标签: cassandra cql data-integration apache-nifi


    【解决方案1】:

    TL;DR 我有一个 NiFi 1.0 模板可以在 GistNiFi Wiki 上完成此操作。

    NiFi 鼓励非常模块化的设计,因此让我们将其分解为更小的任务,我将描述一个可能的流程并根据您的用例解释每个处理器的用途:

    1. 读入 CSV 文件。这可以通过 GetFile 完成,或者最好是 ListFile -> FetchFile。在我的示例中,我使用脚本处理器在线创建一个流文件,其中包含您上面的示例数据。这使我的模板便于其他人使用。

    2. 解析文件名以获取站点和传感器字段。这使用NiFi Expression Language 获取下划线之前(用于站)和下划线之后(减去 CSV 扩展名)用于传感器的文件名部分。

    3. 将单个 CSV 流文件拆分为每行一个流文件。这样做是为了以后我们可以创建单独的 CQL INSERT 语句。

    4. 从每一行中提取列值。我为此使用了 ExtractText 和正则表达式,如果您有非常复杂的逻辑,您可能需要检查脚本处理器,例如 ExecuteScript

    5. 更改时间戳。 IIRC,CQL 不接受时间戳文字的微秒。您可以尝试解析微秒(最好在 ExecuteScript 处理器中完成)或重新格式化时间戳。请注意,由于无法解析微秒,因此“重新格式化”会导致在我的示例中截断所有小数秒。

    6. 构建 CQL INSERT 语句。此时数据(无论如何在我的模板中)都在流文件属性中,原始内容可以替换为 CQL INSERT 语句(这是PutCassandraQL 期望的方式)。您可以将数据保存在属性中(使用 UpdateAttribute 正确命名它们,请参阅 PutCassandraQL 文档)并使用准备好的语句,但恕我直言,编写显式 CQL 语句更简单。在撰写本文时,PutCassandraQL 没有缓存 PreparedStatements,因此现在以这种方式执行操作实际上性能较低。

    7. 使用 PutCassandraQL 执行 CQL 语句。

    我没有详细说明我的属性名称等,但是当流程到达 ReplaceText 时,我有以下属性:

    • station.name:包含从文件名解析的站名
    • sensor.name:包含从文件名解析的传感器名称
    • tps:包含更新的时间戳值
    • columns.2:包含(大概)传感器读数的值

    ReplaceText 将内容设置为以下内容(使用表达式语言填写值):

    insert into sensor_data (station_id, sensor_id, tps, val) values ('${station.name}', '${sensor.name}', '${tps}', ${column.2})
    

    希望对您有所帮助,如果您有任何疑问或问题,请告诉我。干杯!

    【讨论】:

    • 非常感谢,在修改 Cassandra 参数以连接到集群后,它对我有用。你帮了大忙!
    猜你喜欢
    • 2021-02-16
    • 1970-01-01
    • 2018-09-13
    • 2018-07-25
    • 2018-08-04
    • 2014-09-25
    • 2014-02-02
    • 1970-01-01
    • 2019-11-09
    相关资源
    最近更新 更多