【发布时间】:2021-01-05 09:13:08
【问题描述】:
我有 node-red-S7PLC 链接以 1.5 秒的周期将以下数据推送到 InfluxDB。
msg.payload = {
name: 'PLCTEST',
level1_m: msg.payload.a90, "value payload from PLC passed to influx"
power1: msg.payload.a93,
valvepos_%: msg.payload.a107,
temp1: msg.payload.a111,
washer_acidity: msg.payload.a113,
etc.
}
return msg;
总共 130 个单独的数据点,由警报和按钮按下等二进制状态和测量值(温度、压力、流量...)组成
这已经运行了一周,作为数据库写入的压力测试。写作似乎很好,但我注意到,如果我在 Grafana 仪表板中从 30 分钟查询窗口的 10 个温度测量值切换到 3 小时查询,加载时间开始变得非常长。 12小时窗口是不行的。我假设这是因为我所有的东西都是作为字段键和字段值推送的。如果没有索引,这会给数据库带来压力。
Grafana 查询检查器为每个测量查询提供 1081 行,因此 x10 = 10810 行/dasboard_query。但是整个池的涌入必须经过 130 次测量 x 1081 = 140530 行 / 3 小时窗口。
我想获得一些关于如何优化架构的建议。我有以下想法。
数据库:应用程序名称X
测量:Process_metrics,
标签:温度、压力、流量、%、液位、酸度、功率
标签值:CT-xx1...CT-xxn、CP-xx1...CP-xxn、CF-xx1...CF-xxn、....
字段键=值,字段值=值
测量:Alarms_On,
字段键=状态,字段值=“真”,“假”
测量:Binary_ON
字段键:状态,fieldvalue="trues", "false"
这将是节点红色的几个临时工(我认为):
msg.payload = [{
Value: msg.payload.xxx, "value payload from PLC passed to influx"
Value: msg.payload.xxx,
Value: msg.payload.xxx
},
{
Temp:"CT_xx1",
Temp:"CT_xx2",
Temp:"CT_xx2"
}];
return msg;
编辑:关注 Roberts cmets。
在写到这里之前,我在网上阅读了一周的潮人手册和其他示例。在某些方面,influx 与正常的 SQL 思维方式完全不同和独特,我确实觉得这非常困难。但我确实在周末有一些清晰的时刻。
我认为以下会更合适。
DB: Station_name
measurements: Process_metrics,Alarms, Binary.
Tags: "SI_metric"
Values= "Temperature", "Pressure" etc.
Fieldkey: "proces_position"= CT/P/F_xxx.
values= process_values
这应该可以防止基数与我最初的想法发生冲突。
我认为警报和二进制文件只能保留为字段键/字段值,并将它们分开到自己的测量值应该提供足够的过滤。这些也仅在状态更改时记录,因此与 1s 周期的类似物相比,对数据库的输入要少得多。
按照我原来的节点红色流代码,这将转化为批量输出功能:
msg.payload = [
{
measurement: "Process_metrics",
fields: {
CT_xx1: msg.payload.xxx,
CT_xx2: msg.payload.xxx,
CT_xx3: msg.payload.xxx
},
tags:{
metric:"temperature"
},
{
measurement: "Process_metrics",
fields: {
CP_xx1: msg.payload.xxx,
CP_xx2: msg.payload.xxx,
CP_xx3: msg.payload.xxx
},
tags:{
metric:"pressure"
},
{
measurement: "Process_metrics",
fields: {
CF_xx1: msg.payload.xxx,
CF_xx2: msg.payload.xxx,
CF_xx3: msg.payload.xxx
},
tags:{
metric:"flow"
},
{
measurement: "Process_metrics",
fields: {
AP_xx1: msg.payload.xxx,
AP_xx2: msg.payload.xxx,
AP_xx3: msg.payload.xxx
},
tags:{
metric:"Pumps"
},
{
measurement: "Binary_states",
fields: {
Binary1: msg.payload.xxx,
Binary2: msg.payload.xxx,
Binary3: msg.payload.xxx
},
{
measurement: "Alarms",
fields: {
Alarm1: msg.payload.xxx,
Alarm2: msg.payload.xxx,
Alarm3: msg.payload.xxx
}
];
return msg;
编辑 2:
在测试我的上述想法并进一步完善后的最终想法。
我的第二个想法没有按预期工作。 Grafana 变量的最后一步不起作用,因为流程数据在字段中具有所需的信息,而不是标签。这使得 Grafana 方面对 rexec 查询感到厌烦,以从字段中获取 plc 标签名称信息以链接到 grafana 变量下拉列表。因此再次运行资源密集型字段查询。
我偶然发现了一篇关于如何用 TSDB 理清思路的博客文章,上面的想法仍然过于类似于 SQL 的方法来使用 TSDB 处理数据。我进一步改进了数据库结构,似乎在不同步骤(PLC->NodeRed->influxDB->Grafana)和数据库上的查询负载中找到了编码时间的折衷方案。从写入和查询压力时的 1gb 内存使用量到正常使用测试中的 100-300MB。
目前正在测试中:
Python 脚本将 PLC 端标签和描述从 csv 压缩为 Node-Red 的可复制粘贴格式。从 csv 中提取温度测量值并格式化为 nodered 的示例。
import pandas as pd
from pathlib import Path
file1 = r'C:\\Users\\....pandastestcsv.csv
df1 = pd.read_csv(file1, sep=';')
dfCT= df1[df1['POS'].str.contains('CT', regex=False, na=False)]
def my_functionCT(x,y):
print( "{measurement:"+'"temperature",'+"fields:{value:msg.payload."+ x +",},tags:{CT:\"" + y +'\",},},' )
result = [my_functionCT(x, y) for x, y in zip(dfCT['ID'], dfCT['POS'])]
输出是来自 CSV 的所有温度测量 CT。 {measurement:"temperature",fields:{value:msg.payload.a1,},tags:{CT:"tag description with process position CT_310",},},
此列表可以复制粘贴到 Node-Red 数据链路有效负载到 influxDB。
涌入数据库:
数据库:PLCTEST
测量:温度、压力、流量、泵、阀门、警报、on_off....
标签键:CT、CP、CF、misc_mes....
tag-field:“变量的 PLC 描述”
字段键:值
field-value: "来自 PLC 有效负载的过程测量值"
这可以在合理范围内检查每次测量的基数,并且可以更好地针对相关数据进行查询,而无需遍历整个数据库。 Ram 和 CPU 负载现在很小,并且在 Grafana 负载中从 1 小时跳到 12 小时的查询在几秒钟内没有锁定。
【问题讨论】:
-
尝试将原始 Influx 行存储在 VictoriaMetrics 中。它应该在大时间范围内提供更好的查询延迟。详情请参阅these docs 和these benchmarks。