【问题标题】:Dump series back into InfluxDB after querying with replaced field value使用替换字段值查询后将系列转储回 InfluxDB
【发布时间】:2019-02-10 19:03:16
【问题描述】:

场景

我想通过从 InfluxDB 查询测量值来将数据发送到 MQTT 代理(云)。

我在名为status 的架构中有一个字段。它可以是10status=0 表示该系列尚未发送到云端。如果我得到 MQTT 代理的确认,那么我希望使用 status=1 将查询重写回数据库。

FAQs for InfluxDB regarding Duplicate data 中所述,如果信息与之前的查询具有相同的时间戳但具有不同的字段值 => 则将显示更新字段。

为了测试这一点,我创建了以下内容:

CREATE DATABASE dummy
USE dummy
INSERT meas_1, type=t1, status=0,value=123 1536157064275338300

查询:

SELECT * FROM meas_1

提供

time                status type value         
1536157064275338300 0      t1   234      

现在,如果我想覆盖该系列,我会执行以下操作:

INSERT meas_1, type=t1, status=1,value=123 1536157064275338300                                                                       

这将覆盖系列

 time                status type value         
 1536157064275338300 1      t1   234     

(注意:目前 InfluxDB 中的 标签 无法做到这一点)

用法

  1. 使用"status"=0客户端查询一些信息。
  2. 重组 JSON 以发送到云端
  3. 将信息发送到云端
  4. 如果成功,则将步骤 1. 的输出写回 DB,但使用 status=1

我正在使用InfluxDBClient Python3 创建应用程序(MQTT + InfluxDB)

write_points API 中有一个参数提到batch_size,需要int 作为输入。

我不确定如何将它与我想要的应用程序一起使用。有人可以指导我使用这个或数据库的架构,以便我可以将实际和非冗余信息上传到云端吗?

【问题讨论】:

    标签: python-3.x influxdb influxdb-python


    【解决方案1】:

    batch_size 实际上是需要传递给write_points 的测量列表的长度。

    步骤

    1. 创建客户端并从测量中查询(这里我们查询gps信息)

      client = InfluxDBClient(database='dummy')
      
      op = client.query('SELECT * FROM gps WHERE "status"=0', epoch='ns')
      
    2. ResultSet 放入列表中:

       batch = list(op.get_points('gps'))
      
    3. 为更新创建一个空列表

       updated_batch = []
      
    4. 解析每个测量并将status 标志更改为1。注意,InfluxDB 中的默认值是浮点数

         for each in batch:
      new_mes = {
      'measurement': 'gps',
      'tags': {
      'type': 'gps'
      },
      'time': each['time'],
      'fields': {
        'lat': float(each['lat']),
        'lon': float(each['lon']),
        'alt': float(each['alt']),
        'status': float(1)
      }
      }
      updated_batch.append(new_mes)
      
    5. 最后通过客户端以batch_size作为updated_batch的长度转储积分

      client.write_points(updated_batch, batch_size=len(updated_batch))
      

    这会覆盖系列,因为它包含相同的时间戳,status 字段设置为 1

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2016-05-11
      • 2021-10-29
      • 1970-01-01
      • 1970-01-01
      • 2021-04-18
      • 2021-06-03
      • 2021-06-26
      相关资源
      最近更新 更多