Posted: 2021-01-23 04:01:26
Question:
I have a Python program that parses a data stream, like this:
tail -F /path1/restapi.log -F /path2/restapi.log | parse.py
parse.py parses the data read from sys.stdin.readline:
import re
import sys
import json
from functools import reduce  # reduce is a builtin on Python 2, but must be imported on Python 3

def deep_get(dictionary, keys, default=None):
    # Walk a nested dict by a dotted key path, e.g. "context.uri"
    return reduce(lambda d, key: d.get(key, default) if isinstance(d, dict) else default,
                  keys.split("."), dictionary)

regexp_date_status = re.compile(r'(\d+-\d+-\d+ \d+:\d+:\d+.\d+\+\d+) (\w+)')

while True:
    line = sys.stdin.readline()
    if not line:
        break
    if re.search(r'Request #\d+: {', line):
        date_status = regexp_date_status.match(line)
        json_str = '{\n'
        while True:
            json_str += sys.stdin.readline()
            try:
                d = json.loads(json_str)  # we have our dictionary, perhaps
            except Exception:
                pass
            else:
                username = deep_get(d, "context.authorization.authUserName", default="Username not found")
                hostname = deep_get(d, "context.headers.X-Forwarded-For")
                uri = deep_get(d, "context.uri")
                verb = deep_get(d, "context.verb")
                print("State->{} : Date->{} : User->{} : Host->{} : URI->{} : Verb->{}".format(
                    date_status.group(2), date_status.group(1), username, hostname, uri, verb))
                break
I want to make this multithreaded, since the number of files can grow to as many as 30:
tail -F /path1/restapi.log -F /path2/restapi.log -F /path3/restapi.log -F /path4/restapi.log .... | parse.py
In this case, how do I distribute the work among threads, given that the data is streamed and keeps being parsed until I get a valid dictionary in the try block? Do I also need a queue here?
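One common layout for this kind of problem is a single reader thread plus a pool of parser workers fed through a bounded queue. Below is a minimal sketch of that pattern, not the author's code: `start_workers`, `feed`, and `handle` are hypothetical names, and the `handle` callback stands in for the json.loads/deep_get logic above. It assumes Python 3 (on Python 2.7 the import would be "from Queue import Queue").

```python
import threading
from queue import Queue  # on Python 2.7: from Queue import Queue

def start_workers(q, handle, n=4):
    """Spawn n worker threads that pull complete request blocks from q."""
    def worker():
        while True:
            block = q.get()
            if block is None:      # sentinel: time to shut down
                q.task_done()
                break
            handle(block)          # e.g. the json.loads/deep_get logic
            q.task_done()
    threads = [threading.Thread(target=worker) for _ in range(n)]
    for t in threads:
        t.start()
    return threads

def feed(q, blocks, threads):
    """Single reader: push blocks, then one sentinel per worker, then join."""
    for b in blocks:
        q.put(b)                   # blocks if the queue is full (backpressure)
    for _ in threads:
        q.put(None)
    for t in threads:
        t.join()
```

The key design point is that only one thread reads stdin and accumulates lines into a complete request block; whole blocks (not individual lines) go onto the queue, so each worker can run json.loads independently without coordinating partial state. A bounded queue (e.g. `Queue(maxsize=1000)`) keeps memory in check if the workers fall behind the tail stream.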
Tags: python multithreading python-2.7 python-multiprocessing python-multithreading