【问题标题】:Influxdb bulk insert using influxdb-python使用 influxdb-python 的 Influxdb 批量插入
【发布时间】:2020-07-07 05:39:33
【问题描述】:

我用influxDB-Python插入了大量从Redis-Stream读取的数据。因为 Redis-stream 并设置 maxlen=600 并且数据以100ms 的速度插入,我需要保留它的所有数据。所以我读取并将其传输到 influxDB(我不知道什么是更好的数据库),但只使用批量插入 ⌈count/batch_size⌉ 条数据,两者都在每个 batch_size 的末尾,似乎被覆盖。以下代码

import redis
from apscheduler.schedulers.blocking import BlockingScheduler
import time
import datetime

import os
import struct
from influxdb import InfluxDBClient

def parse(datas):
    ts,data = datas
    w_json = {
    "measurement": 'sensor1',
    "fields": {
        "Value":data[b'Value'].decode('utf-8')
        "Count":data[b'Count'].decode('utf-8')
        }
    }
    return w_json

def archived_data(rs,client):
    results= rs.xreadgroup('group1', 'test', {'test1': ">"}, count=600)
    if(len(results)!=0):
        print("len(results[0][1]) = ",len(results[0][1]))
        datas = list(map(parse,results[0][1]))
        client.write_points(datas,batch_size=300)
        print('insert success')
    else:
        print("No new data is generated")

if __name__=="__main__":
    try:
        rs = redis.Redis(host="localhost", port=6379, db=0)
        rs.xgroup_destroy("test1", "group1")
        rs.xgroup_create('test1','group1','0-0')
    except Exception as e:
        print("error = ",e)
    try:
        client = InfluxDBClient(host="localhost", port=8086,database='test')
    except Exception as e:
        print("error = ", e)
    try:
        sched = BlockingScheduler()
        sched.add_job(test1, 'interval', seconds=60,args=[rs,client])
        sched.start()
    except Exception as e:
        print(e)

influxDB 的数据变化如下

> select count(*) from sensor1;
name: sensor1
time count_Count count_Value
---- ----------- -----------
0    6           6
> select count(*) from sensor1;
name: sensor1
time count_Count count_Value
---- ----------- -----------
0    8           8

> select Count from sensor1;
name: sensor1
time                Count
----                -----
1594099736722564482 00000310
1594099737463373188 00000610
1594099795941527728 00000910
1594099796752396784 00001193
1594099854366369551 00001493
1594099855120826270 00001777
1594099913596094653 00002077
1594099914196135122 00002361

为什么数据似乎被覆盖了,我该如何解决它以一次插入所有数据?

如果您能告诉我如何解决,我将不胜感激?

【问题讨论】:

    标签: python-3.x redis influxdb influxdb-python redis-streams


    【解决方案1】:

    您能否提供有关您希望存储在流入数据库中的数据结构的更多详细信息? 不过,我希望以下信息对您有所帮助。

    在 Influxdb 中,timestamp + tags 是唯一的(即标签值和时间戳相同的两个数据点不能存在)。与 SQL influxdb 不同,它不会引发唯一约束冲突,它会用传入的数据覆盖现有数据。您的数据似乎没有标签,因此如果一些传入数据的时间戳已经存在于 influxdb 中,则会覆盖现有数据

    【讨论】:

    • 感谢您的帮助。我没有打标签,但是我删除了我的标签,因为我遇到了以下错误"error":"partial write: max-values-per-tag limit exceeded (100128/100000)
    • 标签被索引所以标签的基数不应该很高。但是为了使数据保持不同,应该有一些其他参数,对吧?您需要将其添加为标签。如果您可以发布一些示例数据,我可以帮助您选择标签,以免数据被覆盖
    • 将 Redis 流数据传输到 influxDB。 Ts 是一个字符串,表示将插入 redis 的时间。 Chans是一个int类型的数组,代表通道序号; Type 是表示数据类型的字符串; Shape是一个长度为2的整数列表,表示矩阵的Shape信息; Units 是表示频道单元信息的字符串数组; Names 是表示频道名称的字符串列表; Value是一个长度为3072的浮点数列表,代表传感器采集到的数据。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2023-03-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-18
    • 2017-09-26
    相关资源
    最近更新 更多