【问题标题】:Generator usage for log analysing用于日志分析的生成器使用情况
【发布时间】:2021-11-04 03:00:17
【问题描述】:

我有一个工作 python 代码来分析日志。日志的大小至少为 10 MB,有时会达到 250-300 MB,具体取决于失败、重试次数。

我使用了生成器,它可以将大文件生成为块,并且可以配置,我通常使用 1 或 2 MB 的日志来生成。因此,我将日志分析为 1Mb 块以验证某些测试。

我的问题是,当我使用生成器时,它可能会引发一些极端情况。在日志分析中,我检查随后出现的一些模式,如下所示,所以如果只看到这 4 个列表,那么我将它们保留用于代码的下一个验证部分。以下 4 种模式可以在日志中看到一次或两次,而不是更多。

listA
listB
listC
listD

如果这些都是随后发生的,那么我将它们全部保留以在下一步进行评估,否则忽略..

但是现在可能会发生以下小变化,一些模式(列表,因为我使用正则表达式 findall 方法来查找模式)可以在下一个块中完成检查。所以在下面我有 3 个匹配的 case 块 3-4 和 5-6 和 7-8 创建了要考虑的条件。

---- chunk 1 -----

listA

listB

----- chunk 2 -----


nothing

----- chunk 3 -----
listA
 
listB

----- chunk 4 -----

listC
listD

----- chunk 5 -----

listA


----- chunk 6 -----

listB

listC
listD

---- chunk 7 ------
listA
listB
listC
----- chunk 8 ------

listD
---------------------

通常不会这样发生,一些模式 (B,C,D) 大多在日志中随后出现,但 listA 可以比其他模式早 20 行或最多 30 行出现。但上述任何情况都可能发生。

请指教一个好的方法,我不知道用什么,我知道有next()函数可以用来检查下一个块,在这种情况下

我应该使用 any([listA, listB, listC, listD]) 方法吗?如果出现任何模式,那么我是否需要像下面这样逐个检查下一个块中的其余部分?:

if any([listA, listB, listC, listD]):

Then here check which of the patterns not seen and keep them in a notSeen list then check them one by one in next chunk?
next_chunk = next(gen_func(chunksize))
isListA = re.findall(pattern, next_chunk)

或者我可能完全错过了这个小项目的更简单的方法。请让我知道您的想法,因为您以前可能会遇到这种情况。

【问题讨论】:

    标签: generator


    【解决方案1】:

    我使用了 next_chunk = next(gen_func(chunksize)) 并在下面添加了必要的 if 语句以仅检查 1 个下一个日志片段,因为我会使用生成器适当地安排日志块:

    我只分享了一部分代码,其余部分保密

    import re, os
    def __init__(self, logfile):
        self.read = self.ReadLog(logfile)
        self.search = self.SearchData(logfile)
        self.file = os.path.abspath(logfile)
        self.data = self.read.read_in_chunks
        
    r_search_combined, scan_result, r_complete, r_failed = [], [], [], []
    
    def test123(self, r_reason: str, cyc: int, b_r):
        ''' Usage : verify_log.py
                --log ${LOGS_FOLDER}/log --reason r_low 1 <True | False>'''
        ret = False
        r_count = 2*int(cyc) if b_r.lower() == "true" else int(cyc)
        r_search_combined, scan_result, r_complete, r_failed = [], [], [], []
        result_pattern = self.search.r_scan_pattern()
        def check_patterns(chunk):
            search_cached = re.findall(self.search.r_search_cached, chunk)
            search_full = re.findall(self.search.r_search_full, chunk)
            scan_complete = re.findall(self.search.r_scan_complete, chunk)
            scan_result = re.findall(result_pattern, chunk)
            r_complete = re.findall(self.search.r_auth_complete, chunk)
            return search_cached, search_full, scan_complete, scan_result, r_complete 
    
        with open(self.file) as rf:
            for idx, piece in enumerate(self.data(rf), start=1):
                is_failed = re.findall(self.search.r_failure, piece)
                if is_failed:
                    print(f'general failure received : {is_failed}')
                    r_failed.extend(is_failed)
    
                is_r_search_cached, is_r_search_full, is_scan_complete, is_scan, is_r_complete = check_patterns(piece)
    
                if (is_r_search_cached or is_r_search_full) and all([is_scan_complete, is_scan, is_r_complete]):
                    if is_r_search_cached:
                        r_search_combined.extend(is_r_search_cached)
                    if is_r_search_full:
                        r_search_combined.extend(is_r_search_full)
                    scan_result.extend(is_scan)
                    r_complete.extend(is_r_complete)
    
                elif (is_r_search_cached or is_r_search_full) and not any([is_scan, is_r_complete]): 
                    next_piece = next(self.data(rf))
                    _, _, _, is_scan_next, is_r_complete_next = check_patterns(next_piece) 
                    if (is_r_search_cached or is_r_search_full) and all([is_scan_next, is_r_complete_next]):
                        r_search_combined.extend(is_r_search_cached)
                        r_search_combined.extend(is_r_search_full)
                        scan_result.extend(is_scan_next)
                        r_complete.extend(is_r_complete_next)
    
                elif (is_r_search_cached or is_r_search_full) and is_scan and not is_r_complete:
                    next_piece = next(self.data(rf))
                    _, _, _, _, is_r_complete_next = check_patterns(next_piece)
                    if (is_r_search_cached or is_r_search_full) and all([is_scan, is_r_complete_next]):
                        r_search_combined.extend(is_r_search_cached)
                        r_search_combined.extend(is_r_search_full)
                        scan_result.extend(is_scan)
                        r_complete.extend(is_r_complete_next)
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2013-07-07
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2010-11-25
      • 1970-01-01
      • 2018-10-17
      相关资源
      最近更新 更多