【问题标题】:How to improve performance of coincidence filtering of a time-series?如何提高时间序列的符合过滤性能?
【发布时间】:2019-10-25 22:31:09
【问题描述】:

我正在研究流体动力学的固定实验数据。我们在三个通道上测量了数据,因此样本不直接重合(同时测量)。我想用窗口方案过滤它们以获得重合样本并忽略所有其他样本。

很遗憾,由于公司的限制,我无法上传原始数据集。但我试图建立一个最小的例子,它生成一个类似(更小)的数据集。原始数据集由每个通道 500000 个值组成,每个值都标有到达时间。使用这些时间戳检查巧合。

刚才,我循环遍历第一个通道的每个样本,并查看与其他通道的时间差异。如果它小于指定的窗口宽度,则保存索引。如果我指定一个间隔来检查差异(例如附近的 100 或 1000 个样本),可能会快一点。但是通道之间的数据速率可能会有很大差异,因此尚未实现。我更喜欢摆脱对每个样本的循环 - 如果可能的话。

def filterCoincidence(df, window = 50e-6):
        '''
        Filters the dataset with arbitrary different data rates on different channels to coincident samples.
        The coincidence is checked with regard to a time window specified as argument.
        '''
        AT_cols = [col for col in df.columns if 'AT' in col]
        if len(AT_cols) == 1:
            print('only one group available')
            return
        used_ix = np.zeros( (df.shape[0], len(AT_cols)))
        used_ix.fill(np.nan)
        for ix, sample in enumerate(df[AT_cols[0]]):
            used_ix[ix, 0] = ix
            test_ix = np.zeros(2)
            for ii, AT_col in enumerate(AT_cols[1:]):
                diff = np.abs(df[AT_col] - sample)
                index = diff[diff <= window].sort_values().index.values
                if len(index) == 0:
                    test_ix[ii] = None
                    continue
                test_ix[ii] = [ix_use if (ix_use not in used_ix[:, ii+1] or ix == 0) else None for ix_use in index][0]
            if not np.any(np.isnan(test_ix)):
                used_ix[ix, 1:] = test_ix
            else:
                used_ix[ix, 1:] = [None, None]
        used_ix = used_ix[~np.isnan(used_ix).any(axis=1)]
        print(used_ix.shape)
        return

no_points = 10000
no_groups = 3
meas_duration = 60
df = pd.DataFrame(np.transpose([np.sort(np.random.rand(no_points)*meas_duration) for _ in range(no_groups)]), columns=['AT {}'.format(i) for i in range(no_groups)])
filterCoincidence(df, window=1e-3)

是否已经实现了可以进行这种过滤的模块?但是,如果您能给我一些提示以提高代码的性能,那就太棒了。

【问题讨论】:

  • dftest 数据吗?它有三列,是时间标记吗?实际的 DataFrame 是否包含更多列,其中包含每个 时间戳 的测量数据?
  • 样本是周期性的,但三个通道之间是否存在偏移?或者样本是非周期性的?对于AT 0 中的每个样本,其他列中是否只有一个样本被视为巧合
  • 是的。数据存储在 pandas 数据框df 中。每组至少存在 3 列,但只有粒子的到达时间 (AT) 才重要。到达时间以秒为单位测量,并相对于第一个样本发生偏移,因此 ist 从 0 开始,然后在触发测量时始终具有时间戳(以秒为单位)。
  • 测量程序不能保证等距采样。所以每个组或多或少相互独立(物理上当然不是,我们测量 3 个速度分量但想同时知道速度 --> 需要巧合)。因此,没有恒定的偏移量,也不知道有多少样本或什至样本彼此重合。但我只需要找到彼此匹配的样本。所以从 10000 个样本中我会得到 1000 个样本,这些样本是重合的,可以被认为是同时测量的。
  • 您的函数是否产生正确的结果?结果是used_ix吗?

标签: python pandas performance numpy time-series


【解决方案1】:

如果其他人有类似的问题,只是为了更新这个帖子。我认为经过几次代码修订后,我找到了一个合适的解决方案。

def filterCoincidence(self, AT1, AT2, AT3, window = 0.05e-3):
    '''
    Filters the dataset with arbitrary different data rates on different channels to coincident samples.
    The coincidence is checked with regard to a time window specified as argument.

    - arguments:
        - three times series AT1, AT2 and AT3 (arrival times of particles in my case)
        - window size (50 microseconds as default setting)

    - output: indices of combined samples
    '''

    start_time = datetime.datetime.now()
    AT_list = [AT1, AT2, AT3]

    # take the shortest period of time
    min_EndArrival = np.max(AT_list)
    max_BeginArrival = np.min(AT_list)
    for i, col in enumerate(AT_list):
        min_EndArrival = min(min_EndArrival, np.max(col))
        max_BeginArrival = max(max_BeginArrival, np.min(col))
    for i, col in enumerate(AT_list):
        AT_list[i] = np.delete(AT_list[i], np.where((col < max_BeginArrival - window) | (col > min_EndArrival + window)))

    # get channel with lowest datarate
    num_points = np.zeros(len(AT_list))
    datarate = np.zeros(len(AT_list))

    for i, AT in enumerate(AT_list):
        num_points[i] = AT.shape[0]
        datarate[i] = num_points[i] / (AT[-1]-AT[0])
    used_ref = np.argmin(datarate)

    # process coincidence
    AT_ref_val = AT_list[used_ref]
    AT_list = list(np.delete(AT_list, used_ref))
    overview = np.zeros( (AT_ref_val.shape[0], 3), dtype=int)
    overview[:,0] = np.arange(AT_ref_val.shape[0], dtype=int)
    borders = np.empty(2, dtype=object)
    max_diff = np.zeros(2, dtype=int)
    for i, AT in enumerate(AT_list):
        neighbors_lower = np.searchsorted(AT, AT_ref_val - window, side='left')
        neighbors_upper = np.searchsorted(AT, AT_ref_val + window, side='left')
        borders[i] = np.transpose([neighbors_lower, neighbors_upper])
        coinc_ix = np.where(np.diff(borders[i], axis=1).flatten() != 0)[0]
        max_diff[i] = np.max(np.diff(borders[i], axis=1))
        overview[coinc_ix, i+1] = 1
    use_ix = np.where(~np.any(overview==0, axis=1))
    borders[0] = borders[0][use_ix]
    borders[1] = borders[1][use_ix]
    overview = overview[use_ix]

    # create all possible combinations refer to the reference
    combinations = np.prod(max_diff)
    test = np.empty((overview.shape[0]*combinations, 3), dtype=object)
    for i, [ref_ix, at1, at2] in enumerate(zip(overview[:, 0], borders[0], borders[1])):
        test[i * combinations:i * combinations + combinations, 0] = ref_ix
        at1 = np.arange(at1[0], at1[1])
        at2 = np.arange(at2[0], at2[1])
        test[i*combinations:i*combinations+at1.shape[0]*at2.shape[0],1:] = np.asarray(list(itertools.product(at1, at2)))
    test = test[~np.any(pd.isnull(test), axis=1)]

    # check distances
    ix_ref = test[:,0]
    test = test[:,1:]
    test = np.insert(test, used_ref, ix_ref, axis=1)
    test = test.astype(int)

    AT_list.insert(used_ref, AT_ref_val)
    AT_mat = np.zeros(test.shape)
    for i, AT in enumerate(AT_list):
        AT_mat[:,i] = AT[test[:,i]]

    distances = np.zeros( (test.shape[0], len(list(itertools.combinations(range(3), 2)))))
    for i, AT in enumerate(itertools.combinations(range(3), 2)):
        distances[:,i] = np.abs(AT_mat[:,AT[0]]-AT_mat[:,AT[1]])
    ix = np.where(np.all(distances <= window, axis=1))[0]
    test = test[ix,:]
    distances = distances[ix,:]

    # check duplicates
    # use sum of differences as similarity factor
    dist_sum = np.max(distances, axis=1)
    unique_sorted = np.argsort([np.unique(test[:,i]).shape[0] for i in range(test.shape[1])])[::-1]
    test = np.hstack([test, dist_sum.reshape(-1, 1)])
    test = test[test[:,-1].argsort()]
    for j in unique_sorted:
        _, ix = np.unique(test[:,j], return_index=True)
        test = test[ix, :]
    test = test[:,:3]
    test = test[test[:,used_ref].argsort()]
    # check that all values are after each other
    ix = np.where(np.any(np.diff(test, axis=0) > 0, axis=1))[0]
    ix = np.append(ix, test.shape[0]-1)
    test = test[ix,:]
    print('{} coincident samples obtained in {}.'.format(test.shape[0], datetime.datetime.now()-start_time))
    return test

我确信有更好的解决方案,但对我来说它现在有效。而且我知道,绝对应该更清楚地选择变量名称(例如test),但我会在我的硕士论文结束时清理我的代码......也许:-)

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-10-06
    • 2020-06-30
    • 1970-01-01
    • 1970-01-01
    • 2017-01-28
    • 2021-01-13
    相关资源
    最近更新 更多