显然,不可能通过多个Threads很好地控制coverage。
一旦启动了不同的线程,停止Coverage 对象将停止所有覆盖,start 只会在“开始”线程中重新启动它。
因此,您的代码基本上会在 2 秒后停止对除 CoverageThread 之外的所有 Thread 的覆盖。
我对 API 进行了一些尝试,可以在不停止 Coverage 对象的情况下访问测量值。
因此,您可以使用 API 启动一个定期保存覆盖数据的线程。
第一个实现是这样的
import threading
from time import sleep
from coverage import Coverage
from coverage.data import CoverageData, CoverageDataFiles
from coverage.files import abs_file
cov = Coverage(config_file=True)
cov.start()
def get_data_dict(d):
"""Return a dict like d, but with keys modified by `abs_file` and
remove the copied elements from d.
"""
res = {}
keys = list(d.keys())
for k in keys:
a = {}
lines = list(d[k].keys())
for l in lines:
v = d[k].pop(l)
a[l] = v
res[abs_file(k)] = a
return res
class CoverageLoggerThread(threading.Thread):
_kill_now = False
_delay = 2
def __init__(self, main=True):
self.main = main
self._data = CoverageData()
self._fname = cov.config.data_file
self._suffix = None
self._data_files = CoverageDataFiles(basename=self._fname,
warn=cov._warn)
self._pid = os.getpid()
super(CoverageLoggerThread, self).__init__()
def shutdown(self):
self._kill_now = True
def combine(self):
aliases = None
if cov.config.paths:
from coverage.aliases import PathAliases
aliases = PathAliases()
for paths in self.config.paths.values():
result = paths[0]
for pattern in paths[1:]:
aliases.add(pattern, result)
self._data_files.combine_parallel_data(self._data, aliases=aliases)
def export(self, new=True):
cov_report = cov
if new:
cov_report = Coverage(config_file=True)
cov_report.load()
self.combine()
self._data_files.write(self._data)
cov_report.data.update(self._data)
cov_report.html_report(directory="coverage_report_data.html")
cov_report.report(show_missing=True)
def _collect_and_export(self):
new_data = get_data_dict(cov.collector.data)
if cov.collector.branch:
self._data.add_arcs(new_data)
else:
self._data.add_lines(new_data)
self._data.add_file_tracers(get_data_dict(cov.collector.file_tracers))
self._data_files.write(self._data, self._suffix)
if self.main:
self.export()
def run(self):
while True:
sleep(CoverageLoggerThread._delay)
if self._kill_now:
break
self._collect_and_export()
cov.stop()
if not self.main:
self._collect_and_export()
return
self.export(new=False)
print("End of the program. I was killed gracefully :)")
可以在GIST 中找到更稳定的版本。
这段代码基本上是在不停止收集器的情况下获取收集器收集的信息。
get_data_dict 函数获取Coverage.collector 中的字典并弹出可用数据。这应该足够安全,以免丢失任何测量值。
报告文件每_delay 秒更新一次。
但是如果你有多个进程在运行,你需要付出额外的努力来确保所有的进程都运行CoverageLoggerThread。这是patch_multiprocessing 函数,从coverage 猴子补丁修补的猴子...
代码在GIST 中。它基本上用自定义进程替换了原始进程,该进程在运行run 方法之前启动CoverageLoggerThread,并在进程结束时加入线程。
脚本main.py 允许使用线程和进程启动不同的测试。
这段代码有 2/3 的缺点需要你小心:
同时使用combine 函数是个坏主意,因为它对.coverage.* 文件执行并发读/写/删除访问。这意味着函数export 不是超级安全的。应该没问题,因为数据被多次复制,但我会在生产中使用它之前做一些测试。
数据一旦导出,就会保留在内存中。因此,如果代码库很大,它可能会占用一些资源。可以转储所有数据并重新加载它,但我假设如果你想每 2 秒记录一次,你不想每次都重新加载所有数据。如果您延迟几分钟,我将每次创建一个新的_data,使用CoverageData.read_file 重新加载此过程的先前覆盖状态。
自定义进程将在完成之前等待_delay,因为我们在进程结束时加入CoverageThreadLogger,因此如果您有很多快速进程,您希望将睡眠的粒度增加到能够更快地检测到进程的结束。它只需要一个在_kill_now 上中断的自定义睡眠循环。
让我知道这是否对您有所帮助,或者是否可以改进此要点。
编辑:
看来您不需要对多处理模块进行修补即可自动启动记录器。在你的 python 安装中使用.pth,你可以使用环境变量在新进程上自动启动你的记录器:
# Content of coverage.pth in your site-package folder
import os
if "COVERAGE_LOGGER_START" in os.environ:
import atexit
from coverage_logger import CoverageLoggerThread
thread_cov = CoverageLoggerThread(main=False)
thread_cov.start()
def close_cov()
thread_cov.shutdown()
thread_cov.join()
atexit.register(close_cov)
然后您可以使用COVERAGE_LOGGER_START=1 python main.y 启动您的覆盖记录器