【问题标题】:Semaphore in shared memory with Python on Windows?在 Windows 上与 Python 共享内存中的信号量?
【发布时间】:2017-08-21 04:52:28
【问题描述】:

这是场景。我有一个 Python 脚本,它将在运行其主代码之前启动另一个程序(我们称之为P)。可以同时运行多个 Python 脚本,但如果程序P 已经启动,则不应再次启动:

pid 100 starts up
pid 100 starts program P with pid n and detaches it
pid 100 runs main
pid 101 starts up
pid 101 does not start program P because it's already started
pid 101 runs main
pid 100 main finishes
pid 100 does not terminate program P because pid 101 needs it
pid 100 terminates
pid 101 main finishes
pid 101 terminates program P with pid n
pid 101 terminates

在 C 中,我可以通过 mmaping 一个文件并在其中放置一个信号量来跟踪它来创建共享内存。一旦信号量达到 0,我也可以终止程序 P。但我不知道如何在 Windows 上的 Python 中做到这一点。

我应该如何解决这个问题,即是否有已经建立的解决方法?

【问题讨论】:

  • 您可以将mmap 模块与tagname 一起使用。 Windows 中的共享内存是一个 Section 对象,与其他几种内核对象类型(例如设备、事件)一样,它可以在对象命名空间中命名。诸如“share1234”之类的名称是当前 Windows 会话的本地名称。要使所有会话都可以访问该名称,请使用“Global\share1234”。后者必须使用反斜杠作为分隔符,因为正斜杠只是内核中的一个名称字符。
  • 创建一个全局名称使用路径分隔符,因为这是如何实现的。 Windows API 使用\BaseNamedObjects 作为其全局名称(所有会话),使用\Sessions\[session number]\BaseNamedObjects 作为会话本地名称。本地 BNO 中有一个“全局”符号链接,因此它确实将其解析为,例如 \Sessions\1\BaseNamedObjects\Global\share1234 => \BaseNamedObjects\share1234

标签: python windows process


【解决方案1】:

我创建了一个示例,说明如何在 Windows 上使用套接字而不是共享内存信号量来实现此行为。这达到了同样的目的;只要至少有一个 Python 脚本在运行,c++ 程序就会一直运行。一旦所有的脚本都完成了,只要在一定的超时时间内没有更多的 Python 脚本被启动,c++ 程序就会结束。

这里的大部分代码都进入了 c++ 程序,该程序运行一个线程来监视来自 Python 脚本的 TCP 连接。

Python 脚本只是检查/启动 Windows 程序,然后打开一个套接字,该套接字一直保持打开状态,直到脚本结束。

Windows 程序检测套接字连接和断开连接,从而跟踪 Python 脚本的运行时间。

在这些示例中,Windows 程序恰好被称为“ConsoleApplication11.exe”。我使用了 1234 端口和 15 秒的超时,您可以在 c++ 程序的第 19-21 行更改这些。另外,如果您想让 c++ 程序的终止更迅速,请在 client_monitor_thread() 结束时调用 exit() 而不是 return。

希望这可能有用。

Python 脚本:

import socket
import time
import psutil
import os

# check if the C++ program is running, start it if not
cppProgramName = "ConsoleApplication11.exe"
progRunning = False
for pid in psutil.pids():
    p = psutil.Process(pid)
    if (cppProgramName in p.name()):
        progRunning = True

if not progRunning:
    os.startfile(cppProgramName)
    time.sleep(5) # wait for c++ program to start

# open a socket to the C++ program to tell it we need it to keep running
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(("127.0.0.1", 1234))

# (MAIN PROGRAM)
time.sleep(3)
# (END OF MAIN PROGRAM)

# close the socket to the C++ program
s.close()

C++ 程序:

// ConsoleApplication11.cpp : Defines the entry point for the console application.
//

#include "stdafx.h"

#include <iostream>
#include <set>
#include <chrono>
#include <thread>

#include <winsock2.h>
#include <Ws2tcpip.h>

#pragma comment(lib, "ws2_32.lib")              // link with Ws2_32.lib


namespace
{
    const unsigned int commonPort            = 1234;   // must match Python app
    const unsigned int noClientsTimeoutLimit = 15;     // quit when no clients connected for this many seconds
    bool               clientMonitorRunning  = true;   // flag to show client monitor is running


    void client_monitor_thread()
    {
        // start up winsock service version 2.2
        WSADATA wsaData;
        int iResult = WSAStartup(MAKEWORD(2, 2), &wsaData);
        if (iResult != NO_ERROR)
        {
            std::cout << "WSAStartup() failed with error: " << iResult << std::endl;
            clientMonitorRunning = false;
            return;
        }

        // create a socket for listening for incoming connection requests.
        SOCKET listenSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
        if (listenSocket == INVALID_SOCKET)
        {
            std::cout << "socket() function failed with error: " << WSAGetLastError() << std::endl;
            closesocket(listenSocket);
            clientMonitorRunning = false;
            return;
        }

        // sockaddr_in structure specifies the address family, IP address, and port for the socket 
        sockaddr_in service;
        service.sin_family = AF_INET;
        inet_pton(AF_INET, (PCSTR)"127.0.0.1", &(service.sin_addr));
        service.sin_port = htons(commonPort);
        if (SOCKET_ERROR == bind(listenSocket, (SOCKADDR *)& service, sizeof(service)))
        {
            std::cout << "bind function failed with error " << WSAGetLastError() << std::endl;
            closesocket(listenSocket);
            clientMonitorRunning = false;
            return;
        }

        // Listen for incoming connection requests on the created socket
        if (SOCKET_ERROR == listen(listenSocket, SOMAXCONN))
        {
            wprintf(L"listen function failed with error: %d\n", WSAGetLastError());
            closesocket(listenSocket);
            clientMonitorRunning = false;
            return;
        }

        std::cout << "Listening on port " << commonPort << std::endl;

        // mow monitor client connections
        std::set<unsigned int> activefds;
        int timeoutCounter = 0;

        while (clientMonitorRunning)
        {
            // check for existing clients disconnected
            if (0 != activefds.size())
            {
                std::set<unsigned int> disconnectedfds;
                for (auto fd : activefds)
                {
                    int flags = 0;
                    char buf[10];
                    int rv = recv(fd, buf, 10, flags);
                    if (0 == rv)
                    {
                        disconnectedfds.insert(fd);
                    }
                }
                for (auto fd : disconnectedfds)
                {
                    activefds.erase(fd);
                }
            }

            // are any clients connected? do we need to quit?
            if (0 == activefds.size())
            {
                std::cout << "No clients - will exit in " << noClientsTimeoutLimit - timeoutCounter << " seconds" << std::endl;
                ++timeoutCounter;
                if (timeoutCounter == noClientsTimeoutLimit)
                {
                    for (auto fd : activefds)
                    {
                        closesocket(fd);
                    }
                    closesocket(listenSocket);
                    clientMonitorRunning = false;
                    return;
                }
            }
            else
            {
                timeoutCounter = 0;
            }

            // check for activity on the listening socket
            fd_set readfds;
            struct timeval timeout;
            timeout.tv_sec = 1;
            timeout.tv_usec = 0;
            FD_ZERO(&readfds);
            FD_SET(listenSocket, &readfds);
            switch (select(sizeof(readfds), &readfds, NULL, NULL, &timeout))
            {
            case 0: // timeout
            {
                break;
            }
            case SOCKET_ERROR:
            {
                std::cout << "listen failed with error: " << WSAGetLastError() << std::endl;
                closesocket(listenSocket);
                clientMonitorRunning = false;
                return;
            }
            default:
            {
                if (FD_ISSET(listenSocket, &readfds))
                {
                    // accept the connection.
                    SOCKET fd = accept(listenSocket, NULL, NULL);
                    if (fd == INVALID_SOCKET)
                    {
                        std::cout << "accept failed with error: " << WSAGetLastError() << std::endl;
                        closesocket(listenSocket);
                        clientMonitorRunning = false;
                        return;
                    }
                    else
                    {
                        unsigned long nonBlock = 1;
                        ioctlsocket(fd, FIONBIO, &nonBlock);
                        activefds.insert(fd);
                    }
                }
                break;
            }
            }
        }

        return;
    }
}


int main()
{
    // start the client monitor thread, which will run until no clients are connected
    std::thread clientMonitor(client_monitor_thread);

    // placeholder for doing the actual work in this program
    // loop until the client monitor thread exits
    while (clientMonitorRunning)
    {
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }

    // clean up
    clientMonitor.join();
    WSACleanup();
    return 0;
}

【讨论】:

  • 这是一个有趣的想法,但并行编程并不安全。 (另一个实例可能在if not progRunning 之前启动,它会启动程序两次。)实际上,我通过创建一个"mutex" 解决了这个问题,该"mutex" 可以使用Windows 文件锁在不同的进程中使用。
  • 我认为,并行编程是安全的。按照设计,c++ 程序的 2 个实例不能运行,因为它们不能在同一个端口上侦听,一个会失败并退出。但是,您的互斥锁解决方案做得很好。
猜你喜欢
  • 2010-11-27
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2010-10-26
相关资源
最近更新 更多