【问题标题】:One Cycle Fourier Window optimization. My code is inefficient一周期傅立叶窗优化。我的代码效率低下
【发布时间】:2019-11-20 11:42:12
【问题描述】:

美好的一天

编辑: 我想要什么: 从电力系统 (PS) 上的任何电流/电压波形中,我想要过滤后的 50Hz(基本)RMS 值幅度(以及它们的角度)。测量的电流包含从 100Hz 到 1250Hz 的所有谐波,具体取决于设备。无法使用具有这些谐波的波形进行分析和计算,您的误差会变得如此之大(取决于设备),以至于 PS 保护设备计算出不正确的数量。附加的信号还涉及许多其他频率分量。

我的目标:PS保护继电器很特别,在很短的时间内计算出20ms的窗口。我不是想得到这个。我正在使用外部记录技术并测试继电器看到的内容是否真实并且它们是否正常运行。因此,我需要做他们所做的事情,只保持 50Hz 的值,没有任何谐波和 DC。

重要的预期结果:给定信号中可能存在的任何频率分量,我想查看任何给定谐波的幅度(150,250 - 基波的 3 次谐波幅度和 5 次谐波)以及直流的幅度。这将告诉我哪种类型的 PS 设备可能注入这些频率。重要的是我提供了一个频率,答案是该频率的向量,只有所有其他值被过滤掉。 RMS-of-the-fundal 与 RMS 的差异因数为 4000A(仅 50Hz)和 4500A(包括其他频率)

此代码计算给定频率的单周期傅立叶值 (RMS)。我认为有时称为傅立叶滤波器?我将它用于电力系统 50Hz/0Hz/150Hz 类似物分析。 (答案已经过测试,是正确的基本 RMS 值。(https://users.wpi.edu/~goulet/Matlab/overlap/trigfs.html)

对于大样本,代码非常慢。对于 55000 个数据点,需要 12 秒。对于 3 个电压和 3 个电流,这变得非常缓慢。我每天看 100 条记录。

如何增强它?有哪些 Python 提示和技巧/库可以附加我的列表/数组。 (也可以随意重写或使用代码)。我使用代码在给定时间从信号中挑选出某些值。 (就像从专门的电力系统分析程序中读取值一样) 已编辑:根据我加载文件和使用它们的方式,代码可以粘贴它:

import matplotlib.pyplot as plt
import csv
import math
import numpy as np
import cmath

# FILES ATTACHED TO POST
filenamecfg = r"E:/Python_Practise/2019-10-21 13-54-38-482.CFG"
filename = r"E:/Python_Practise/2019-10-21 13-54-38-482.DAT"

t = []
IR = []
newIR=[]
with open(filenamecfg,'r') as csvfile1:
    cfgfile = [row for row in csv.reader(csvfile1, delimiter=',')]
    numberofchannels=int(np.array(cfgfile)[1][0])
    scaleval = float(np.array(cfgfile)[3][5])
    scalevalI = float(np.array(cfgfile)[8][5])
    samplingfreq = float(np.array(cfgfile)[numberofchannels+4][0])
    numsamples = int(np.array(cfgfile)[numberofchannels+4][1])
    freq = float(np.array(cfgfile)[numberofchannels+2][0])
    intsample = int(samplingfreq/freq)
    #TODO neeeed to get number of samples and frequency and detect 
#automatically
    #scaleval = np.array(cfgfile)[3]
    print('multiplier:',scaleval)
    print('SampFrq:',samplingfreq)
    print('NumSamples:',numsamples)
    print('Freq:',freq)


with open(filename,'r') as csvfile:
    plots = csv.reader(csvfile, delimiter=',')
    for row in plots:
        t.append(float(row[1])/1000000) #get time from us to s
        IR.append(float(row[6]))

newIR = np.array(IR) * scalevalI
t = np.array(t)


def mag_and_theta_for_given_freq(f,IVsignal,Tsignal,samples): #samples are the sample window size you want to caclulate for (256 in my case)
    # f in hertz, IVsignal, Tsignal in numpy.array
    timegap = Tsignal[2]-Tsignal[3]
    pi = math.pi
    w = 2*pi*f
    Xr = []
    Xc = []
    Cplx = []
    mag = []
    theta = []
    #print("Calculating for frequency:",f)
    for i in range(len(IVsignal)-samples): 
        newspan = range(i,i+samples)
        timewindow = Tsignal[newspan]
        #print("this is my time: ",timewindow)
        Sig20ms = IVsignal[newspan]
        N = len(Sig20ms) #get number of samples of my current Freq
        RealI = np.multiply(Sig20ms, np.cos(w*timewindow)) #Get Real and Imaginary part of any signal for given frequency
        ImagI = -1*np.multiply(Sig20ms, np.sin(w*timewindow)) #Filters and calculates 1 WINDOW RMS (root mean square value).
        #calculate for whole signal and create a new vector. This is the RMS vector (used everywhere in power system analysis)
        Xr.append((math.sqrt(2)/N)*sum(RealI)) ### TAKES SO MUCH TIME
        Xc.append((math.sqrt(2)/N)*sum(ImagI)) ## these steps make RMS
        Cplx.append(complex(Xr[i],Xc[i]))
        mag.append(abs(Cplx[i]))
        theta.append(np.angle(Cplx[i]))#th*180/pi # this can be used to get Degrees if necessary
        #also for freq 0 (DC) id the offset is negative how do I return a negative to indicate this when i'm using MAGnitude or Absolute value
    return Cplx,mag,theta #mag[:,1]#,theta # BUT THE MAGNITUDE WILL NEVER BE zero

myZ,magn,th = mag_and_theta_for_given_freq(freq,newIR,t,intsample)

plt.plot(newIR[0:30000],'b',linewidth=0.4)#, label='CFG has been loaded!')
plt.plot(magn[0:30000],'r',linewidth=1)

plt.show()

鉴于附加的文件,粘贴的代码运行顺利 问候

编辑:请在此处找到测试 csvfile 和 COMTRADE TEST 文件: CSV: https://drive.google.com/open?id=18zc4Ms_MtYAeTBm7tNQTcQkTnFWQ4LUu

COMTRADE https://drive.google.com/file/d/1j3mcBrljgerqIeJo7eiwWo9eDu_ocv9x/view?usp=sharing https://drive.google.com/file/d/1pwYm2yj2x8sKYQUcw3dPy_a9GrqAgFtD/view?usp=sharing

【问题讨论】:

  • 欢迎来到 SO,您能否发布一个合成数据集(或生成它的函数)以及有关输入和预期输出的详细信息。您还可以描述一下您打算执行的操作吗?使用数学或发布参考来确定目标的操作定义。干杯,
  • 感谢您更新您的帖子。不幸的是,您提供的其他信息并不是很有用。有输入是一个好点,但这还不够。想想我们将需要重现您的问题并清楚地了解您的目标。我们缺少的是:如何加载数据,如何将它们提供给函数,最重要的是:预期输出是什么。如果没有这些信息,就很难调查您的问题。请考虑阅读minimal reproducible example 以使您的帖子符合标准。
  • 话虽如此,您的代码主要依赖于具有大量索引和标量操作的 for 循环。您已经导入了numpy,因此您应该利用矢量化。这将是改进的良好开端。当您明确定义要应用到您的输入的流程以及预期的输出时,我相信我们会找到提高性能的方法
  • 感谢您的好评。我已经给出了我缩短但有效的代码。为准确的操作。它应该可以工作。
  • 感谢您提供此更新,我们已接近 MCVE。我假设一个以 50 Hz 为中心的带通滤波器就足够了。

标签: python signal-processing simpowersystems


【解决方案1】:

前言

正如我在之前的评论中所说:

您的代码主要依赖于带有大量索引的for 循环和 标量运算。你已经导入了numpy,所以你应该采取 矢量化的优势。

此答案是您解决方案的开始。

轻量级 MCVE

首先我们为 MCVE 创建一个试验信号:

import numpy as np

# Synthetic signal sampler: 5s sampled as 400 Hz
fs = 400 # Hz
t = 5    # s
t = np.linspace(0, t, fs*t+1)

# Synthetic Signal: Amplitude is about 325V @50Hz
A = 325 # V
f = 50  # Hz
y = A*np.sin(2*f*np.pi*t) # V

然后我们可以使用通常的公式计算这个信号的RMS

# Actual definition of RMS:
yrms = np.sqrt(np.mean(y**2)) # 229.75 V

或者我们也可以使用DFT 计算它(在numpy.fft 中实现为rfft):

# RMS using FFT:
Y = np.fft.rfft(y)/y.size
Yrms = np.sqrt(np.real(Y[0]**2 + np.sum(Y[1:]*np.conj(Y[1:]))/2)) # 229.64 V

可以在here 找到最后一个公式为何有效的演示。这是有效的,因为Parseval Theorem 暗示傅里叶变换确实节约能量。

两个版本都使用了向量化函数,不需要将实部和虚部分开进行计算,然后重新组合成一个复数。

MCVE:窗口化

我怀疑您想将此函数用作长期时间序列的窗口,其中 RMS 值即将发生变化。然后我们可以使用提供时间序列商品的pandas 库来解决这个问题。

import pandas as pd

我们封装了RMS函数:

def rms(y):
    Y = 2*np.fft.rfft(y)/y.size
    return np.sqrt(np.real(Y[0]**2 + np.sum(Y[1:]*np.conj(Y[1:]))/2))

我们生成一个阻尼信号(可变 RMS)

y = np.exp(-0.1*t)*A*np.sin(2*f*np.pi*t) 

我们将试验信号包装到数据帧中以利用 rollingresample 方法:

df = pd.DataFrame(y, index=t*pd.Timedelta('1s'), columns=['signal'])

信号的滚动 RMS 是:

df['rms'] = df.rolling(int(fs/f)).agg(rms)

周期性采样的 RMS 是:

df['signal'].resample('1s').agg(rms)

后面的返回:

00:00:00    2.187840e+02
00:00:01    1.979639e+02
00:00:02    1.791252e+02
00:00:03    1.620792e+02
00:00:04    1.466553e+02

信号调理

为了满足您只保留基波 (50 Hz) 的需求,一个简单的解决方案可能是线性 detrend(以消除恒定和线性趋势),然后是 Butterworth filter(带通滤波器)。

我们生成具有其他频率和线性趋势的合成信号:

y = np.exp(-0.1*t)*A*(np.sin(2*f*np.pi*t) \
     + 0.2*np.sin(8*f*np.pi*t) + 0.1*np.sin(16*f*np.pi*t)) \
     + A/20*t + A/100

然后我们调节信号:

from scipy import signal
yd = signal.detrend(y, type='linear')
filt = signal.butter(5, [40,60], btype='band', fs=fs, output='sos', analog=False)
yfilt = signal.sosfilt(filt, yd)

图形化地导致:

它会在 RMS 计算之前继续应用信号调理。

【讨论】:

  • 我喜欢你的建议!我只需要把头绕过去。使用 rfft...我非常非常喜欢这个想法。其次,我必须开始了解“df.rolling”和.resample 的Python 实现。我什至从来没有遇到过这些函数
  • 另外,这部分不只过滤掉 50 Hz。 yrms = np.sqrt(np.mean(y**2)) # 229.75 V -- 这给出了 RMS 是的,但不是基本的 RMS。 rfft 如何仅过滤 50hz。因此我要求的任何其他频率为 150Hz 或 0Hz 或任何谐波特定频率都可以得到吗?
  • @apemanx 我很乐意为您提供进一步的帮助并向您展示如何过滤,但我真的需要您更准确、更具体地了解过滤器的含义以及您打算如何进行过滤。您介意更新您的帖子并在数学上或至少在操作上说明您想要的吗?是低通滤波器吗?哪个顺序?什么样的?
  • 没问题。会做!非常感谢你看这个。也许带通滤波器会起作用,但它必须适用于 50 或 150 Hz 而不是 49-51Hz 频带。就像只从树上摘下好苹果,把坏苹果留在后面。最终只有好苹果。 (也只有当我想要坏苹果时,我才能把它们也留下好的)
  • 非常感谢。我有很多东西可以玩,考虑以及通过它运行一些其他系统录音。很多小代码sn-ps,我觉得很有趣,但还不明白!但是,嘿!这就是我在这里的原因。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2012-11-10
  • 1970-01-01
  • 2012-04-26
  • 1970-01-01
相关资源
最近更新 更多