【Question title】: Python - Count A Structured Logfile
【Posted】: 2021-09-22 02:12:34
【Question】:

Given an array of log strings:

log = [
    '[WARNING] 403 Forbidden: No token in request parameters',
    '[ERROR] 500 Server  Error: int is not subscription',
    '[INFO] 200 OK: Login Successful',
    '[INFO] 200 OK: User sent a message',
    '[ERROR] 500 Server Error: int is not subscription'
]

I'm trying to get better at using dictionaries in Python, and I want to iterate over this array and print something like this:

{'WARNING': {'403': {'Forbidden': {'No token in request parameters': 1}}},
'ERROR': {'500': {'Server Error': {'int is not subscription': 2}}},
'INFO': {'200': {'OK': {'Login Successful': 1, 'User sent a message': 1}}}}

Essentially, I want to return a dictionary containing the log statistics in the format above. I started writing my method and got this far:

def logInfo(logs):
    dct = {}

    for log in logs:
        log = log.strip().split()
        if log[2] == "Server":
            log[2] = "Server Error:"
            log.remove(log[3])
        #print(log)
        joined = " ".join(log[3:])
        if log[0] not in dct:
            log[0] = log[0].strip('[').strip(']')
            dct[log[0]] = {}
            if log[1] not in dct[log[0]]:
                dct[log[0]][log[1]] = {}
                if log[2] not in dct[log[0]][log[1]]:
                    dct[log[0]][log[1]][log[2]] = {}
                    if joined not in dct:
                        dct[log[0]][log[1]][log[2]][joined] = 1
                    else:
                        dct[log[0]][log[1]][log[2]][joined] += 1
                else:
                    dct[joined].append(joined)
    print(dct)

Instead, it prints:

{'WARNING': {'403': {'Forbidden:': {'No token in request parameters': 1}}}, 'ERROR': {'500': {'Server Error:': {'int is not subscription': 1}}}, 'INFO': {'200': {'OK:': {'User sent a message': 1}}}}

The method itself is also quite long. Can anyone help, or hint at a cleaner way to handle this?

【Comments】:

    Tags: python dictionary multidimensional-array


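    【Solution 1】:

    I went through your code, found and fixed a few bugs, and now it works.

    • First, the nested ifs are unnecessary, so I put them all at the same level. When you test whether the dict has a key and it is missing, you store an empty dict under that key, so the next if works correctly whether or not the parent key already existed.
    • You test log[0] not in dct before calling strip('[').strip(']'), so the test is always true and you reset the previously stored data every time. I fixed this and marked it in the code below.
    • I don't know why you test joined not in dct; you should test it against dct[log[0]][log[1]][log[2]]. I fixed this and marked it in the code below.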
    def logInfo(logs):
        dct = {}
    
        for log in logs:
            log = log.strip().split()
            if log[2] == "Server":
                log[2] = "Server Error:"
                log.remove(log[3])
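            #print(log)
            joined = " ".join(log[3:])
            # Drop the trailing colon so keys match the desired output ('OK', not 'OK:')
            log[2] = log[2].rstrip(':')
    
            # Strip the brackets before the membership test; the original
            # stripped them only after testing '[LEVEL]' against dct
            log[0] = log[0].strip('[').strip(']')
            if log[0] not in dct:
                dct[log[0]] = {}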
            if log[1] not in dct[log[0]]:
                dct[log[0]][log[1]] = {}
            if log[2] not in dct[log[0]][log[1]]:
                dct[log[0]][log[1]][log[2]] = {}
            # The original tested `joined not in dct` (the root dict);
            # the test belongs on the innermost dict instead
            if joined not in dct[log[0]][log[1]][log[2]]:
                dct[log[0]][log[1]][log[2]][joined] = 1
            else:
                dct[log[0]][log[1]][log[2]][joined] += 1
        
        print(dct)
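
The same-level key checks described above can also be written with dict.setdefault, which returns the existing inner dict or inserts a new empty one. The following is only a sketch, not the answerer's code: the function name count_logs is made up for illustration, and it assumes every line has the shape "[LEVEL] CODE Reason: message". It splits each line at the first colon, so the reason key carries no trailing colon and "Server Error" needs no special case.

```python
def count_logs(logs):
    """Count log lines grouped by level, status code, reason and message.

    Hypothetical helper; assumes lines look like '[LEVEL] CODE Reason: message'.
    """
    stats = {}
    for line in logs:
        # Split at the first colon: '[ERROR] 500 Server Error' / ' int is not ...'
        head, _, message = line.partition(":")
        level, code, *reason_words = head.split()
        level = level.strip("[]")
        reason = " ".join(reason_words)  # also collapses repeated spaces
        message = message.strip()
        # setdefault returns the existing inner dict or inserts a new empty one
        bucket = stats.setdefault(level, {}).setdefault(code, {}).setdefault(reason, {})
        bucket[message] = bucket.get(message, 0) + 1
    return stats


log = [
    '[WARNING] 403 Forbidden: No token in request parameters',
    '[ERROR] 500 Server  Error: int is not subscription',
    '[INFO] 200 OK: Login Successful',
    '[INFO] 200 OK: User sent a message',
    '[ERROR] 500 Server Error: int is not subscription',
]
print(count_logs(log))
```

Returning the dictionary instead of printing it inside the function keeps the counting logic reusable; the caller decides whether to print.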
    

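    【Discussion】:

    • Oh wow, thanks. I had basically tried checking joined against dct[log[0]][log[1]][log[2]], but it didn't work, probably because of the nested if statements. I understand now where my logic went wrong :)
    【Solution 2】:

    You can use re.findall together with collections.defaultdict: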

    import re, collections
    r = collections.defaultdict(dict)
    log = [
        '[WARNING] 403 Forbidden: No token in request parameters',
        '[ERROR] 500 Server Error: int is not subscription',
        '[INFO] 200 OK: Login Successful',
        '[INFO] 200 OK: User sent a message',
        '[ERROR] 500 Server Error: int is not subscription',
    ]
    for i in log:
        # Raw string so that \[ \w \s \d reach the regex engine unchanged.
        # The four alternatives match, in order: level, status code, reason, message.
        a, b, c, d = map(str.strip, re.findall(r'(?<=\[)\w+(?=\])|(?<=\]\s)\d+|(?<=\d\s)[\w\s]+(?=:)|(?<=:)[\w+\s]+$', i))
        if b not in r[a]:
            r[a][b] = collections.defaultdict(dict)
        if c not in r[a][b]:
            r[a][b][c] = collections.defaultdict(int)
        r[a][b][c][d] += 1  # defaultdict(int) starts missing keys at 0
    

    Output:

    defaultdict(<class 'dict'>, {'WARNING': {'403': defaultdict(<class 'dict'>, {'Forbidden': defaultdict(<class 'int'>, {'No token in request parameters': 1})})}, 'ERROR': {'500': defaultdict(<class 'dict'>, {'Server Error': defaultdict(<class 'int'>, {'int is not subscription': 2})})}, 'INFO': {'200': defaultdict(<class 'dict'>, {'OK': defaultdict(<class 'int'>, {'Login Successful': 1, 'User sent a message': 1})})}})
    

    The result is a collections.defaultdict of collections.defaultdicts. If you want plain dictionaries only, you can convert r recursively:

    def to_dict(d):
       return {a:to_dict(b) if not isinstance(b, int) else b for a, b in d.items()}
    
    print(to_dict(r))
    

    Output:

    {'WARNING': {'403': {'Forbidden': {'No token in request parameters': 1}}}, 
    'ERROR': {'500': {'Server Error': {'int is not subscription': 2}}}, 
    'INFO': {'200': {'OK': {'Login Successful': 1, 'User sent a message': 1}}}}
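
As a possibly more readable alternative to the four-way alternation passed to re.findall, each line can be matched once against a single anchored pattern with four capture groups. This is a sketch under assumptions: the names LINE_RE and parse_line are invented here, and the pattern assumes the reason never contains a colon.

```python
import re

# Hypothetical pattern for lines shaped like "[LEVEL] CODE Reason: message"
LINE_RE = re.compile(r"\[(\w+)\]\s+(\d+)\s+([^:]+):\s*(.*)")


def parse_line(line):
    """Return (level, code, reason, message), or None if the line does not match."""
    m = LINE_RE.fullmatch(line.strip())
    if m is None:
        return None
    level, code, reason, message = m.groups()
    # Collapse repeated spaces in the reason, e.g. 'Server  Error' -> 'Server Error'
    return level, code, " ".join(reason.split()), message.strip()


print(parse_line('[ERROR] 500 Server  Error: int is not subscription'))
# ('ERROR', '500', 'Server Error', 'int is not subscription')
```

Unlike unpacking the result of re.findall, a failed match here yields None instead of raising a ValueError, so malformed lines can be skipped explicitly.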
    

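    【Discussion】:

    • Thanks for the solution. Would you mind pointing me to a page where I can learn exactly what these functions do? Or, if you don't mind, explaining them yourself?
    • @FireAssassin For a deeper look at collections.defaultdict, see here. Rather than parsing the different log components by hand, this solution uses regular expressions. Finally, to_dict works recursively: it loops over the input dictionary and, if a value is itself a dictionary, calls to_dict on it again; if the value is a numeric count, it is stored as-is.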
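
To make that explanation concrete, here is a small illustration of how defaultdict fills in missing keys and how the recursive conversion walks the nesting. The sample words and keys are invented for the example, and this to_dict variant checks isinstance(v, dict) rather than the answer's "not an int" test; both behave the same on this data.

```python
from collections import defaultdict

counts = defaultdict(int)      # a missing key starts at int() == 0
for word in ["a", "b", "a"]:
    counts[word] += 1          # no "if word not in counts" check needed

nested = defaultdict(dict)     # a missing key starts as an empty dict
nested["INFO"]["200"] = counts


def to_dict(d):
    # Recurse into mappings (defaultdict is a dict subclass); keep counts as-is
    return {k: to_dict(v) if isinstance(v, dict) else v for k, v in d.items()}


plain = to_dict(nested)
print(plain)  # {'INFO': {'200': {'a': 2, 'b': 1}}}
```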