【问题标题】:Detecting Pattern in Real Time Data array in Python在 Python 中检测实时数据数组中的模式
【发布时间】:2026-01-11 08:35:01
【问题描述】:

我正在尝试检测实时数据(时间序列)中的特定模式。对于可视化,我将在这里分两部分展示数据。

Pattern:我正在尝试按时间序列搜索,

DataWindow: 数据缓冲区(窗口)我实时滑动以跟踪历史。

这是我记录的数据(红色框显示我想要检测的模式),但这可能会有所不同,因为它是实时的:

上述数据没有太多噪音(至少对于这个集合而言) - 就我的分辨率而言,峰值(也许我会说正弦峰值)乍一看是可以区分的。这就是为什么应用移动平均滤波器对我没有帮助的原因。

下图显示了一些来自实时数据的样本,但在保存的数据中,绘图仪应用外推法绘制连续图。一般来说,数据样本看起来像下面的图像,或者可能比这个图像具有更高的分辨率。

最初,我尝试了Spike Detection in a Time-Series使用移动平均线,但没有按预期工作。 我也从这个线程Detecting patterns from two arrays of data in Python 尝试了一些解决方案,结果不足以让我在运行时在模式中提出一个标志(有很多误报)

此外,您可能从保存的实时数据中了解到,模式可以有不同的规模,最重要的是可以有不同的偏移量。这就是我想我将上述解决方案应用于我的问题以获得可区分结果的问题。

举个例子来试试,这些可以用于PatternDataWindow Pattern = [5.9, 5.6, 4.08, 2.57, 2.78, 4.78, 7.3, 7.98, 4.81, 5.57, 4.7]

SampleTarget = [4.74, 4.693, 4.599, 4.444, 3.448, 2.631, 1.845, 2.032, 2.415, 3.714, 5.184, 5.82, 5.61, 4.841, 3.802, 3.11]

SampleTarget2 = [5.898, 5.91, 5.62, 5.25, 4.72, 4.09, 3.445, 2.91, 2.7, 2.44, 2.515, 2.79, 3.25, 3.915,4.72, 5.65, 6.28, 7.15, 7.81, 8.2, 7.9, 7.71, 7.32, 6.88, 6.44, 6.0,5.58, 5.185, 4.88, 4.72, 4.69, 4.82]

我正在尝试在 Python for PoC 上解决这个问题。 更新:添加了数据集,包括前两个红色框和稍宽的边,显示在保存的实时数据中。dataset

【问题讨论】:

  • 这个问题可能适合dsp.stackexchange.com
  • 不清楚提供的三个系列是什么。您能否提供第一张图中显示的系列的更大样本?相同的数据会很好,因为您已经指出要检测的预期峰。
  • @mozway 在第二张图中,我绘制了 3 个示例模式,在第一个数据中标记为红色框。但是它们的分辨率有点低,因为在原始图中它们是在绘图中推断出来的。但这个想法又是一个正弦尖峰,我想检测一下。
  • @asevindik 我要求更大的数据集,因为设置一个只有正匹配样本的检测方案很棘手。最好有更多数据来了解算法在误报方面的表现
  • @mozway 我已经更新了。请看一下

标签: python python-3.x pandas numpy signal-processing


【解决方案1】:

您可以计算数据的梯度并使用阈值来识别特征。在这里,我使用三重蒙版来获得向下/向上/向下功能。

我将代码注释给你主要步骤,所以我希望它是全面的。

import pandas as pd
import matplotlib.pyplot as plt

# read data
s = pd.read_csv('sin_peaks.txt', header=None)[0]
# 0    5.574537
# 1    5.736071
# 2    5.965132
# 3    6.164344
# 4    6.172413

thresh = 0.5 # threshold of derivative
span = 10    # max span of the feature (in number of points)

# calculate gradient
# if the points are not evenly spaced
# you should also divide by the spacing
s2 = s.diff()

# get points outside of threshold
m1 = s2.lt(-thresh)
m2 = s2.gt(thresh)

# extend masks
m1_fw = m1.where(m1).ffill(limit=span)
m1_bw = m1.where(m1).bfill(limit=span)
m2_fbw = m2.where(m2).ffill(limit=span).bfill(limit=span)

# slice data where all conditions are met
# up peak & down peak in the "span" before and down peak in the "span" after
peaks = s[m1_fw & m1_bw & m2_fbw]

# group peaks
groups = peaks.index.to_series().diff().ne(1).cumsum()

# plot identified features
ax = s.plot(label='data')
s.diff().plot(ax=ax, label='gradient')
ax.legend()

ax.axhline(thresh, ls=':', c='k')
ax.axhline(-thresh, ls=':', c='k')

for _, group in peaks.groupby(groups):
    start = group.index[0]
    stop = group.index[-1]
    ax.axvspan(start, stop, color='k', alpha=0.1)

【讨论】:

  • 感谢您抽出宝贵时间@mozway!,我现在有机会尝试一下您的解决方案,但我遇到了错误:&: 'float' 和 ' 的操作数类型不受支持在peaks = s[m1_fw & m1_bw & m2_fbw] 行浮动'。我尝试了 python2.7 和 python3.6。你能仔细检查一下吗?
  • @asevindik 你有哪个熊猫版本?你真的不应该再使用 python2 了。我认为布尔值在fill 操作期间被转换回数字。尝试使用 m1_fw = m1.where(m1).ffill(limit=span).astype(bool) 强制 bool,其他的也一样。
  • 非常有用。我给它加了书签! +1
  • 谢谢@Corralien ;) 我希望 OP 设法让它工作!
  • 感谢@mozway 的建议。抱歉回复晚了。我已经设法让它运行并像你上面那样获取图表。我也在运行时尝试过,但无法获得成功的结果。我创建了大小为 15 的 numpy 数组,并将每个数据放入这个数组(循环),然后输入你的算法。self.s[self.counter%len(self.s)] = each_data_repection self.counter += 1 self.df = pd.DataFrame(self.s)我做错了什么让它在运行时工作吗?
最近更新 更多