【发布时间】:2016-01-17 21:39:33
【问题描述】:
我在 Python 3 中使用psutil 编写了一个程序来获取当前正在运行的进程的内存详细信息。问题是我得到的值与 Windows 任务管理器中的值不同。具体来说,如何在 Python 中获取进程的私有工作集大小?
【问题讨论】:
标签: python windows python-3.x memory
我在 Python 3 中使用psutil 编写了一个程序来获取当前正在运行的进程的内存详细信息。问题是我得到的值与 Windows 任务管理器中的值不同。具体来说,如何在 Python 中获取进程的私有工作集大小?
【问题讨论】:
标签: python windows python-3.x memory
psutil 调用GetProcessMemoryInfo,这不会破坏私有与共享内存的工作集。要获取此信息,您可以使用 Windows performance counter API。我在下面演示的另一种方法是直接计算共享页面的数量。 QueryWorkingSet 返回一个包含PSAPI_WORKING_SET_BLOCK 条目的数组(工作集中每页一个),您可以为这些条目统计具有Shared 字段集的条目。您需要一个进程句柄,您可以通过调用GetCurrentProcess 或OpenProcess 来获得它。要将页面转换为字节,请通过调用GetPerformanceInfo 或GetSystemInfo 获取系统页面大小。
这种方法的缺点是您需要 PROCESS_VM_READ 和 PROCESS_QUERY_INFORMATION 访问该进程。如果当前用户是高级管理员,通常启用SeDebugPrivilege 可以绕过访问检查,“受保护”进程除外。
from ctypes import *
from ctypes.wintypes import *
from collections import namedtuple
__all__ = ['query_working_set', 'working_set_size']
kernel32 = WinDLL('kernel32', use_last_error=True)
psapi = WinDLL('psapi', use_last_error=True)
PROCESS_VM_READ = 0x0010
PROCESS_QUERY_INFORMATION = 0x0400
ERROR_ACCESS_DENIED = 0x0005
ERROR_BAD_LENGTH = 0x0018
ULONG_PTR = WPARAM
SIZE_T = c_size_t
class PSAPI_WORKING_SET_BLOCK(Union):
class _FLAGS(Structure):
_fields_ = (('Protection', ULONG_PTR, 5),
('ShareCount', ULONG_PTR, 3),
('Shared', ULONG_PTR, 1),
('Reserved', ULONG_PTR, 3),
('VirtualPage', ULONG_PTR, 20))
_anonymous_ = '_flags',
_fields_ = (('Flags', ULONG_PTR),
('_flags', _FLAGS))
class PSAPI_WORKING_SET_INFORMATION(Structure):
_fields_ = (('NumberOfEntries', ULONG_PTR),
('_WorkingSetInfo', PSAPI_WORKING_SET_BLOCK * 1))
@property
def WorkingSetInfo(self):
array_t = PSAPI_WORKING_SET_BLOCK * self.NumberOfEntries
offset = PSAPI_WORKING_SET_INFORMATION._WorkingSetInfo.offset
return array_t.from_buffer(self, offset)
PPSAPI_WORKING_SET_INFORMATION = POINTER(PSAPI_WORKING_SET_INFORMATION)
def errcheck_bool(result, func, args):
if not result:
raise WinError(get_last_error())
return args
psapi.QueryWorkingSet.errcheck = errcheck_bool
psapi.QueryWorkingSet.argtypes = (
HANDLE, # _In_ hProcess
PPSAPI_WORKING_SET_INFORMATION, # _Out_ pv
DWORD) # _In_ cb
kernel32.GetCurrentProcess.restype = HANDLE
kernel32.OpenProcess.errcheck = errcheck_bool
kernel32.OpenProcess.restype = HANDLE
kernel32.OpenProcess.argtypes = (
DWORD, # _In_ dwDesiredAccess
BOOL, # _In_ bInheritHandle
DWORD) # _In_ dwProcessId
def query_working_set(pid=None):
"""Return the PSAPI_WORKING_SET_BLOCK array for the target process."""
if pid is None:
hprocess = kernel32.GetCurrentProcess()
else:
access = PROCESS_VM_READ | PROCESS_QUERY_INFORMATION
hprocess = kernel32.OpenProcess(access, False, pid)
info = PSAPI_WORKING_SET_INFORMATION()
base_size = sizeof(info)
item_size = sizeof(PSAPI_WORKING_SET_BLOCK)
overshoot = 0
while True:
overshoot += 4096
n = info.NumberOfEntries + overshoot
resize(info, base_size + n * item_size)
try:
psapi.QueryWorkingSet(hprocess, byref(info), sizeof(info))
break
except OSError as e:
if e.winerror != ERROR_BAD_LENGTH:
raise
return info.WorkingSetInfo
class PERFORMANCE_INFORMATION(Structure):
_fields_ = (('cb', DWORD),
('CommitTotal', SIZE_T),
('CommitLimit', SIZE_T),
('CommitPeak', SIZE_T),
('PhysicalTotal', SIZE_T),
('PhysicalAvailable', SIZE_T),
('SystemCache', SIZE_T),
('KernelTotal', SIZE_T),
('KernelPaged', SIZE_T),
('KernelNonpaged', SIZE_T),
('PageSize', SIZE_T),
('HandleCount', DWORD),
('ProcessCount', DWORD),
('ThreadCount', DWORD))
def __init__(self, *args, **kwds):
super(PERFORMANCE_INFORMATION, self).__init__(*args, **kwds)
self.cb = sizeof(self)
PPERFORMANCE_INFORMATION = POINTER(PERFORMANCE_INFORMATION)
psapi.GetPerformanceInfo.errcheck = errcheck_bool
psapi.GetPerformanceInfo.argtypes = (
PPERFORMANCE_INFORMATION, # _Out_ pPerformanceInformation
DWORD) # _In_ cb
WorkingSetSize = namedtuple('WorkingSetSize', 'total shared private')
def working_set_size(pid=None):
"""Return the total, shared, and private working set sizes
for the target process.
"""
wset = query_working_set(pid)
pinfo = PERFORMANCE_INFORMATION()
psapi.GetPerformanceInfo(byref(pinfo), sizeof(pinfo))
pagesize = pinfo.PageSize
total = len(wset) * pagesize
shared = sum(b.Shared for b in wset) * pagesize
private = total - shared
return WorkingSetSize(total, shared, private)
if __name__ == '__main__':
import sys
pid = int(sys.argv[1]) if len(sys.argv) > 1 else None
try:
total, shared, private = working_set_size(pid)
except OSError as e:
if e.winerror == ERROR_ACCESS_DENIED:
sys.exit('Access Denied')
raise
width = len(str(total))
print('Working Set: %*d' % (width, total))
print(' Shared: %*d' % (width, shared))
print(' Private: %*d' % (width, private))
例如:
C:\>tasklist /fi "imagename eq explorer.exe"
Image Name PID Session Name Session# Mem Usage
========================= ======== ================ =========== ============
explorer.exe 2688 Console 1 66,048 K
C:\>workingset.py 2688
Working Set: 67465216
Shared: 59142144
Private: 8323072
以下演示了即使以管理员身份也被拒绝访问系统进程。通常启用SeDebugPrivilege 可以解决这个问题(注意,特权必须存在于进程令牌中才能启用它;您不能只向令牌添加特权)。展示如何在访问令牌中启用和禁用权限超出了此答案的范围,但在下面我证明它确实有效,至少对于不受保护的进程。
C:\>tasklist /fi "imagename eq winlogon.exe"
Image Name PID Session Name Session# Mem Usage
========================= ======== ================ =========== ============
winlogon.exe 496 Console 1 8,528 K
C:\>workingset.py 496
Access Denied
C:\>python
>>> from workingset import *
>>> from privilege import enable_privilege
>>> enable_privilege('SeDebugPrivilege')
>>> working_set_size(496)
WorkingSetSize(total=8732672, shared=8716288, private=16384)
【讨论】: