【问题标题】:Unable to return a specific output from multiprocessing pool无法从多处理池返回特定输出
【发布时间】:2020-06-07 07:57:03
【问题描述】:

我正在尝试使用 python 多处理库来调用一个函数 (calc_indicator),该函数采用 ta-lib 中技术指标的字符串名称数组,然后调用另一个函数 (technical_indicators) 来计算值字符串名称列表传递给第一个函数 (cal_indicator)。这就是我希望输出的样子:

当我运行以下代码时:

import multiprocessing as mp
import pandas as pd
import numpy as np
from talib import abstract

dataset = pd.read_csv('Data/Currencies/COST.csv')
working_frame = dataset.drop(['Date', 'Adj Close'],axis=1)

def technical_indicators(currency_dataframe, indicator):
    nothing_found = 'Indicator Not Found'

    inputs = {
            'open':currency_dataframe['Open'],
            'high':currency_dataframe['High'],
            'low':currency_dataframe['Low'],
            'close':currency_dataframe['Close'],
            'volume':currency_dataframe['Volume']
    }

    DEMA = abstract.DEMA(inputs, timeperiod=20)
    EMA = abstract.EMA(inputs, timeperiod=20)
    KAMA = abstract.KAMA(inputs, timeperiod=20)
    MA = abstract.MA(inputs, timeperiod=20, matype=0)

    ATR = abstract.ATR(inputs, timeperiod=20)
    NATR = abstract.NATR(inputs, timeperiod=20)
    TRANGE = abstract.TRANGE(inputs)

    if(indicator == 'DEMA'):
       return DEMA
    elif(indicator == 'EMA'):
        return EMA
    elif(indicator == 'KAMA'):
        return KAMA
    elif(indicator == 'MA'):
        return MA
    elif(indicator == 'ATR'):
        return ATR
    elif(indicator == 'NATR'):
        return NATR
    elif(indicator == 'TRANGE'):
        return TRANGE
    else:
        return nothing_found

list0 = ['DEMA', 'EMA', 'KAMA', 'MA']
list1 = ['ATR', 'NATR', 'TRANGE']

calc_frame = pd.DataFrame()

def calc_indicator(data_list):
    for i in range(len(data_list)):
        tindicator = technical_indicators(working_frame, data_list[i])
        calc_frame[data_list[i]] = tindicator

    return calc_frame

cal_ = calc_indicator(list0)

pool = mp.Pool(mp.cpu_count())
res0 = pool.map(calc_indicator, list0)
res1 = pool.map(calc_indicator, list1)

我得到这个输出:

D
E
K
M
M
A
E
A
M
A
M
A
A
A
T
N
T
R
A
A
N
R
G
T
E
R

我正在使用的数据的链接:daily prices

【问题讨论】:

  • 您有关于您的代码的具体问题吗?您是否尝试找到问题的原因?
  • 是的,代码没有给我我需要的输出。
  • 是什么阻止了您编写确实为您提供所需输出的不同代码?
  • 当我首先弄清楚第一个代码有什么问题时,我可以编写另一个代码来给我想要的输出
  • @mkrieger1 你明白我发布的问题还是你理解它有问题?

标签: python python-3.x python-multiprocessing ta-lib


【解决方案1】:

第一个问题是您的 calc_indicator 函数需要一个字符串列表。 但是 pool.map() api 会使用列表,并且 calc_indicator() 是使用单个字符串(例如calc_indicator('DEMA'))调用的,因此 calc_indicator 正在索引字符串的字符,而不是索引到列表中。

第二个问题是您试图从多个子进程更新单个对象 calc_frame。但是每个子进程都有自己的内存空间,所以主进程中的calc_frame不会受到子进程的影响。

相反,让子进程通过 pool.map() 返回 Technical_indicator() 结果,并迭代 pool.map() 以依次更新每个结果的 calc_frame:

def one_calc_indicator(indicator):
    return indicator, technical_indicators(working_frame, indicator)

pool = mp.Pool(mp.cpu_count())
for indicator, result in pool.map(one_calc_indicator, list0):
    calc_frame[indicator] = result

【讨论】:

    猜你喜欢
    • 2021-07-19
    • 1970-01-01
    • 1970-01-01
    • 2017-10-22
    • 1970-01-01
    • 2014-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-12-05
    相关资源
    最近更新 更多