【问题标题】:Getting same process details as task manager in Windows获取与 Windows 中的任务管理器相同的进程详细信息
【发布时间】:2016-01-17 21:39:33
【问题描述】:

我在 Python 3 中使用psutil 编写了一个程序来获取当前正在运行的进程的内存详细信息。问题是我得到的值与 Windows 任务管理器中的值不同。具体来说,如何在 Python 中获取进程的私有工作集大小?

【问题讨论】:

    标签: python windows python-3.x memory


    【解决方案1】:

    psutil 调用GetProcessMemoryInfo,这不会破坏私有与共享内存的工作集。要获取此信息,您可以使用 Windows performance counter API。我在下面演示的另一种方法是直接计算共享页面的数量。 QueryWorkingSet 返回一个包含PSAPI_WORKING_SET_BLOCK 条目的数组(工作集中每页一个),您可以为这些条目统计具有Shared 字段集的条目。您需要一个进程句柄,您可以通过调用GetCurrentProcessOpenProcess 来获得它。要将页面转换为字节,请通过调用GetPerformanceInfoGetSystemInfo 获取系统页面大小。

    这种方法的缺点是您需要 PROCESS_VM_READPROCESS_QUERY_INFORMATION 访问该进程。如果当前用户是高级管理员,通常启用SeDebugPrivilege 可以绕过访问检查,“受保护”进程除外。

    from ctypes import *
    from ctypes.wintypes import *
    from collections import namedtuple
    
    __all__ = ['query_working_set', 'working_set_size']
    
    kernel32 = WinDLL('kernel32', use_last_error=True)
    psapi = WinDLL('psapi', use_last_error=True)
    
    PROCESS_VM_READ           = 0x0010
    PROCESS_QUERY_INFORMATION = 0x0400
    
    ERROR_ACCESS_DENIED = 0x0005
    ERROR_BAD_LENGTH    = 0x0018
    
    ULONG_PTR = WPARAM
    SIZE_T = c_size_t
    
    class PSAPI_WORKING_SET_BLOCK(Union):
        class _FLAGS(Structure):
            _fields_ = (('Protection',  ULONG_PTR,  5),
                        ('ShareCount',  ULONG_PTR,  3),
                        ('Shared',      ULONG_PTR,  1),
                        ('Reserved',    ULONG_PTR,  3),
                        ('VirtualPage', ULONG_PTR, 20))
        _anonymous_ = '_flags',
        _fields_ = (('Flags', ULONG_PTR),
                    ('_flags', _FLAGS))
    
    class PSAPI_WORKING_SET_INFORMATION(Structure):
        _fields_ = (('NumberOfEntries',  ULONG_PTR),
                    ('_WorkingSetInfo', PSAPI_WORKING_SET_BLOCK * 1))
        @property
        def WorkingSetInfo(self):
            array_t = PSAPI_WORKING_SET_BLOCK * self.NumberOfEntries
            offset = PSAPI_WORKING_SET_INFORMATION._WorkingSetInfo.offset
            return array_t.from_buffer(self, offset)
    
    PPSAPI_WORKING_SET_INFORMATION = POINTER(PSAPI_WORKING_SET_INFORMATION)
    
    def errcheck_bool(result, func, args):
        if not result:
            raise WinError(get_last_error())
        return args
    
    psapi.QueryWorkingSet.errcheck = errcheck_bool
    psapi.QueryWorkingSet.argtypes = (
        HANDLE,                         # _In_  hProcess
        PPSAPI_WORKING_SET_INFORMATION, # _Out_ pv
        DWORD)                          # _In_  cb
    
    kernel32.GetCurrentProcess.restype = HANDLE
    
    kernel32.OpenProcess.errcheck = errcheck_bool
    kernel32.OpenProcess.restype = HANDLE
    kernel32.OpenProcess.argtypes = (
        DWORD, # _In_ dwDesiredAccess
        BOOL,  # _In_ bInheritHandle
        DWORD) # _In_ dwProcessId
    
    def query_working_set(pid=None):
        """Return the PSAPI_WORKING_SET_BLOCK array for the target process."""
        if pid is None:
            hprocess = kernel32.GetCurrentProcess()
        else:
            access = PROCESS_VM_READ | PROCESS_QUERY_INFORMATION
            hprocess = kernel32.OpenProcess(access, False, pid)
        info = PSAPI_WORKING_SET_INFORMATION()
        base_size = sizeof(info)
        item_size = sizeof(PSAPI_WORKING_SET_BLOCK)
        overshoot = 0
        while True:
            overshoot += 4096
            n = info.NumberOfEntries + overshoot
            resize(info, base_size + n * item_size)
            try:
                psapi.QueryWorkingSet(hprocess, byref(info), sizeof(info))
                break
            except OSError as e:
                if e.winerror != ERROR_BAD_LENGTH:
                    raise
        return info.WorkingSetInfo
    
    class PERFORMANCE_INFORMATION(Structure):
        _fields_ = (('cb',                DWORD),
                    ('CommitTotal',       SIZE_T),
                    ('CommitLimit',       SIZE_T),
                    ('CommitPeak',        SIZE_T),
                    ('PhysicalTotal',     SIZE_T),
                    ('PhysicalAvailable', SIZE_T),
                    ('SystemCache',       SIZE_T),
                    ('KernelTotal',       SIZE_T),
                    ('KernelPaged',       SIZE_T),
                    ('KernelNonpaged',    SIZE_T),
                    ('PageSize',          SIZE_T),
                    ('HandleCount',       DWORD),
                    ('ProcessCount',      DWORD),
                    ('ThreadCount',       DWORD))
        def __init__(self, *args, **kwds):
            super(PERFORMANCE_INFORMATION, self).__init__(*args, **kwds)
            self.cb = sizeof(self)
    
    PPERFORMANCE_INFORMATION = POINTER(PERFORMANCE_INFORMATION)
    
    psapi.GetPerformanceInfo.errcheck = errcheck_bool
    psapi.GetPerformanceInfo.argtypes = (
        PPERFORMANCE_INFORMATION, # _Out_ pPerformanceInformation
        DWORD)                    # _In_  cb
    
    WorkingSetSize = namedtuple('WorkingSetSize', 'total shared private')
    
    def working_set_size(pid=None):
        """Return the total, shared, and private working set sizes
           for the target process.
        """
        wset = query_working_set(pid)
        pinfo = PERFORMANCE_INFORMATION()
        psapi.GetPerformanceInfo(byref(pinfo), sizeof(pinfo))
        pagesize = pinfo.PageSize        
        total = len(wset) * pagesize
        shared = sum(b.Shared for b in wset) * pagesize
        private = total - shared
        return WorkingSetSize(total, shared, private)
    
    if __name__ == '__main__':
        import sys    
        pid = int(sys.argv[1]) if len(sys.argv) > 1 else None
        try:
            total, shared, private = working_set_size(pid)
        except OSError as e:
            if e.winerror == ERROR_ACCESS_DENIED:
                sys.exit('Access Denied')
            raise
        width = len(str(total))
        print('Working Set: %*d' % (width, total))
        print('     Shared: %*d' % (width, shared))
        print('    Private: %*d' % (width, private))
    

    例如:

    C:\>tasklist /fi "imagename eq explorer.exe"
    
    Image Name                     PID Session Name        Session#    Mem Usage
    ========================= ======== ================ =========== ============
    explorer.exe                  2688 Console                    1     66,048 K
    
    C:\>workingset.py 2688
    Working Set: 67465216
         Shared: 59142144
        Private:  8323072
    

    以下演示了即使以管理员身份也被拒绝访问系统进程。通常启用SeDebugPrivilege 可以解决这个问题(注意,特权必须存在于进程令牌中才能启用它;您不能只向令牌添加特权)。展示如何在访问令牌中启用和禁用权限超出了此答案的范围,但在下面我证明它确实有效,至少对于不受保护的进程。

    C:\>tasklist /fi "imagename eq winlogon.exe"
    
    Image Name                     PID Session Name        Session#    Mem Usage
    ========================= ======== ================ =========== ============
    winlogon.exe                   496 Console                    1      8,528 K
    
    C:\>workingset.py 496
    Access Denied
    
    C:\>python
    >>> from workingset import *
    >>> from privilege import enable_privilege
    >>> enable_privilege('SeDebugPrivilege')
    >>> working_set_size(496)
    WorkingSetSize(total=8732672, shared=8716288, private=16384)
    

    【讨论】:

    • 感谢您的出色回答!
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-11-21
    • 1970-01-01
    • 2015-12-20
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多