【Question Title】: Python Multiprocessing - Writing to a JSON file
【Posted】: 2020-10-14 17:52:38
【Question】:

I need to read ordered dictionaries from a Rest-API; they look like this:

OrderedDict([('_bkt', 'ii~1809~C76785F7-95DF-4D1D-A5E7-A2202947734C'), ('_cd', '1809:718727061')])
OrderedDict([('_bkt', 'ii~1809~C76785F7-95DF-4D1D-A5E7-A2202947735C'), ('_cd', '1809:718727063')])
OrderedDict([('_bkt', 'ii~1809~C76785F7-95DF-4D1D-A5E7-A2202947736C'), ('_cd', '1809:718727065')])
OrderedDict([('_bkt', 'ii~1809~C76785F7-95DF-4D1D-A5E7-A2202947737C'), ('_cd', '1809:718727067')])

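My requirement is to read these ordered dictionaries and write the data, in JSON format, to a JSON file using multiprocessing. However, my code does not work: it does not write any JSON data to my target file. Please advise.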
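For reference, a minimal single-process sketch of the requirement, assuming the records arrive as an iterable of `OrderedDict` objects like the ones above and that one JSON object per line (JSON Lines) is the desired output. It uses only the standard-library `json` module; `write_records` and the file name `records.jsonl` are hypothetical.

```python
import json
from collections import OrderedDict


def write_records(records, path):
    """Write each record as one JSON object per line (JSON Lines)."""
    count = 0
    with open(path, "w") as f:
        for record in records:
            # json.dumps keeps the insertion order of an OrderedDict's keys
            f.write(json.dumps(record) + "\n")
            count += 1
    return count


if __name__ == "__main__":
    sample = [
        OrderedDict([('_bkt', 'ii~1809~C76785F7-95DF-4D1D-A5E7-A2202947734C'), ('_cd', '1809:718727061')]),
        OrderedDict([('_bkt', 'ii~1809~C76785F7-95DF-4D1D-A5E7-A2202947735C'), ('_cd', '1809:718727063')]),
    ]
    print(write_records(sample, "records.jsonl"))  # prints 2
```

Timing this baseline first shows how much there is to gain before adding processes.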

Here is the code:


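from multiprocessing import Pool
from collections import OrderedDict
import simplejson as json

rr = OrderedDict([('_bkt', 'ii~1809~C76785F7-95DF-4D1D-A5E7-A2202947734C'), ('_cd', '1809:718727061')])

# Module-level handle: each worker process ends up with its own buffered copy,
# so writes made inside the workers may never be flushed to the file
f = open('iitp222.json', "a")

def write_data(args):
    f.write(args + '\n')

###Get the results and display them using the ResultsReader.
if __name__ == "__main__":
    # Iterating over an OrderedDict yields its keys ('_bkt', '_cd'), not the values
    for result in rr:
        print(result)
        # A new pool is created for every single key
        p = Pool()
        # json.dumps() returns a str, so map() calls write_data once per character
        result = p.map(write_data, json.dumps(result))
        p.close()
        p.join()
    f.close()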
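A hedged sketch of one way to restructure this, not the accepted answer's method: create the pool once, let the workers do only the CPU-side `json.dumps`, and let the parent process be the only one that touches the file. `Pool.imap` returns results in input order. The names `to_json_line` and `write_records_parallel` are hypothetical, and the records are assumed to be a list of `OrderedDict` objects rather than a single dictionary.

```python
import json
from collections import OrderedDict
from multiprocessing import Pool


def to_json_line(record):
    """Runs in a worker process: serialize one record to a JSON string."""
    return json.dumps(record)


def write_records_parallel(records, path, processes=None, chunksize=100):
    """Serialize records in a process pool; only the parent writes the file."""
    count = 0
    # One pool for the whole run, instead of one pool per record
    with Pool(processes=processes) as pool, open(path, "w") as f:
        # imap yields the serialized lines in the same order as `records`
        for line in pool.imap(to_json_line, records, chunksize=chunksize):
            f.write(line + "\n")
            count += 1
    return count


if __name__ == "__main__":
    records = [
        OrderedDict([('_bkt', 'ii~1809~C76785F7-95DF-4D1D-A5E7-A2202947734C'), ('_cd', '1809:718727061')]),
        OrderedDict([('_bkt', 'ii~1809~C76785F7-95DF-4D1D-A5E7-A2202947735C'), ('_cd', '1809:718727063')]),
    ]
    print(write_records_parallel(records, "iitp222.json"))  # prints 2
```

Because a single process owns the file handle, there is no buffered copy in a worker that can be lost, and no interleaving of partial lines from several writers.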

【Comments】:

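  • The code doesn't make much sense: for each item in the dictionary you create a new pool and try to apply multiprocessing to a *single* item. Iterating over a dictionary also yields only the keys, not the values.
  • This code is just an example. As I said at the start, I am pulling data from a Rest-API, and the data I receive comes as ordered dictionaries. My requirement is to read from the Rest-API and write to a JSON file using multiprocessing. Because I currently write the data on a single core, processing takes quite a long time. I need to shorten the whole read-and-write step.
  • It sounds like you need multithreading or async to make several requests at once, since that is I/O-bound code, and then multiprocessing to perform several writes at once, since writing to disk is CPU-bound.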
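The request side of that suggestion can be sketched with a thread pool from the standard library. This is a minimal sketch under stated assumptions: `fetch_page` is a hypothetical stand-in that sleeps to simulate network latency, not a real Rest-API or Splunk client call, and the page numbers are invented for illustration.

```python
import json
import time
from concurrent.futures import ThreadPoolExecutor


def fetch_page(page):
    """Hypothetical stand-in for one Rest-API request (I/O-bound)."""
    time.sleep(0.1)  # simulate network latency
    return [{"page": page, "item": i} for i in range(3)]


def fetch_all(pages, max_workers=8):
    """Issue requests concurrently; results come back in the order of `pages`."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # executor.map runs fetch_page in threads and preserves input order
        return [rec for batch in executor.map(fetch_page, pages) for rec in batch]


if __name__ == "__main__":
    start = time.perf_counter()
    records = fetch_all(range(8))
    elapsed = time.perf_counter() - start
    # The 8 simulated 0.1 s waits overlap in threads, so this takes roughly 0.1 s rather than 0.8 s
    print(len(records), f"{elapsed:.2f}s")
    with open("records.jsonl", "w") as f:
        for rec in records:
            f.write(json.dumps(rec) + "\n")
```

Threads help here because the time is spent waiting on the network, during which the GIL is released. Whether a process pool is also worth it for the serialization step depends on the record volume; measuring both is the safest way to decide.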

Tags: python json python-3.x multiprocessing pool


【Solution 1】:

I was able to solve my problem with the following code:

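#------------Import Lib-----------------------#
# NOTE: this script assumes the 'fork' start method. Under other start methods
# such as 'spawn' (the default on Windows and macOS), each worker re-imports this
# module and re-runs the module-level code below, including the Splunk login
# and the truncation of the output file.
import os
import sys
import multiprocessing as mp
from collections import OrderedDict
from datetime import datetime

import simplejson as json
import splunklib.client as client
import splunklib.results as results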

fn=sys.argv[1]
HOST = "restapi.xxxx.com"
PORT = 443

#----Capturing Current Min--------------#
# Floor division: the value changes every 2 minutes
Cur_min = int(datetime.now().strftime('%M')) // 2

#----Evaluating time to flip different users --- #
if Cur_min % 4 == 0:
    USERNAME = "xxxxxx"
    PASSWORD = "yyyyyy"
elif Cur_min % 4 == 1:
    USERNAME = "xxxxxx"
    PASSWORD = "yyyyyy"
elif Cur_min % 4 == 2:
    USERNAME = "kuuuu1"
    PASSWORD = "yyyyyy"
else:
    USERNAME = "xxxx"
    PASSWORD = "yyyyyy"

# Create a Service instance and log in
try:
    service = client.connect(
        host=HOST,
        port=PORT,
        username=USERNAME,
        password=PASSWORD)
    rr = results.ResultsReader(service.jobs.export("search index=xxx host=yyyyyy* sourcetype=xxxx splunk_server=idx* earliest=-2m@m"))
    f1=open(fn, 'w')
    f1.close()
except Exception:
    # `os` must be imported for this call; exit with a non-zero status on failure
    os.system("python /home/xxx/MS_TeamsNotification.py 'Unable to connect Splunk Rest-API'")
    sys.exit(1)

###Get the results and display them using the ResultsReader.

def worker(arg, q):
    res = json.dumps(arg)
    q.put(res)
    return res

def listener(q):
    '''listens for messages on the q, writes to file. '''
    with open(fn, 'w') as f:
        while True:
            m = q.get()
            if m == 'kill':
                break
            f.write(str(m) + '\n')
            f.flush()

def main():
    # must use a Manager queue here: a plain mp.Queue cannot be passed to Pool workers as an argument
    manager = mp.Manager()
    q = manager.Queue()
    pool = mp.Pool(mp.cpu_count() + 2)

    #put listener to work first
    watcher = pool.apply_async(listener, (q,))

    #fire off workers
    jobs = []
    for result in rr:
        if isinstance(result, dict):
            job = pool.apply_async(worker, (result, q))
            jobs.append(job)
    assert not rr.is_preview

    # collect results from the workers through the pool result queue
    for job in jobs:
        job.get()

    #now we are done, kill the listener
    q.put('kill')
    pool.close()
    pool.join()

if __name__ == "__main__":
    main()
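The queue/listener pattern in this answer can be tried without a Splunk connection. The sketch below is a self-contained reduction of it, under these assumptions: the records are a plain list of `OrderedDict` objects, the output path is passed as an argument instead of read from a global `fn`, and `run` is a hypothetical helper name. Because workers race to put their lines on the queue, the order of lines in the file is not guaranteed.

```python
import json
import multiprocessing as mp
from collections import OrderedDict

SENTINEL = "kill"  # same stop marker as the answer's listener


def worker(record, q):
    """Serialize one record and hand the line to the listener via the queue."""
    line = json.dumps(record)
    q.put(line)
    return line


def listener(q, path):
    """Single writer: drain the queue into the file until the sentinel arrives."""
    count = 0
    with open(path, "w") as f:
        while True:
            m = q.get()
            if m == SENTINEL:
                break
            f.write(m + "\n")
            count += 1
    return count


def run(records, path, processes=4):
    # The listener permanently occupies one pool process, so at least one
    # more is needed for the workers, otherwise job.get() blocks forever
    if processes < 2:
        raise ValueError("processes must be at least 2")
    # A Manager queue can be passed as an argument to Pool tasks
    with mp.Manager() as manager:
        q = manager.Queue()
        with mp.Pool(processes) as pool:
            watcher = pool.apply_async(listener, (q, path))
            jobs = [pool.apply_async(worker, (r, q)) for r in records]
            for job in jobs:
                job.get()  # re-raises any worker exception here
            q.put(SENTINEL)  # every worker has already put its line
            return watcher.get()  # number of lines written


if __name__ == "__main__":
    sample = [
        OrderedDict([('_bkt', 'ii~1809~C76785F7-95DF-4D1D-A5E7-A2202947734C'), ('_cd', '1809:718727061')]),
        OrderedDict([('_bkt', 'ii~1809~C76785F7-95DF-4D1D-A5E7-A2202947735C'), ('_cd', '1809:718727063')]),
    ]
    print(run(sample, "out.json"))  # prints 2
```

Sending the sentinel only after every `job.get()` has returned ensures that all serialized lines are already on the queue, so the listener cannot stop early.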

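The credential rotation at the top of the answer is integer arithmetic on the current minute: `minute // 2` changes every 2 minutes and `% 4` cycles through four accounts. A minimal sketch of the same selection as a list lookup follows; the account names and passwords are hypothetical placeholders, and `pick_account` is an illustrative helper, not part of splunklib.

```python
from datetime import datetime

# Hypothetical placeholder accounts; the answer hard-codes four username/password pairs
ACCOUNTS = [
    ("user_a", "pass_a"),
    ("user_b", "pass_b"),
    ("user_c", "pass_c"),
    ("user_d", "pass_d"),
]


def pick_account(minute, accounts=ACCOUNTS):
    """Return the account for a given minute (0-59).

    minute // 2 changes every 2 minutes, and % len(accounts) cycles through
    the list, so the accounts take turns in 2-minute slots (the cycle
    restarts at the top of each hour).
    """
    return accounts[(minute // 2) % len(accounts)]


if __name__ == "__main__":
    username, password = pick_account(datetime.now().minute)
    print(username)
```

Keeping the accounts in a list means adding a fifth account changes only the data, not the branching logic.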
