【问题标题】:Thread Safe Pipe Termination螺纹安全管道端接
【发布时间】:2019-01-31 13:32:10
【问题描述】:

(开始前注意:虽然我的问题很笼统,但我的代码需要使用旧版Visual Studio 2008 MFC应用程序编译,并且必须使用MFC或win32同步,请避免使用ie boost或c++ 11回答)

我正在尝试实现 线程安全管道(具有单个读取器和单个写入器的队列),我执行了以下操作:

template<class T>
class CMultiThreadPipe { 

private:
    HANDLE hSemaphore, hTerminateEvent1, hTerminateEvent2;
    CRITICAL_SECTION listMutex; 
    CList<T*, T*> list;

public:
    CMultiThreadPipe() { 
        InitializeCriticalSection(&listMutex);
        hSemaphore = CreateSemaphore(NULL, 0, LONG_MAX, NULL);
        hTerminateEvent1 = ::CreateEvent(NULL, TRUE, FALSE, NULL); 
        hTerminateEvent2 = ::CreateEvent(NULL, TRUE, FALSE, NULL);
    }

    // pdata must be allocated with new. The dequeueing thread will delete it
    void Enqueue(T* pdata) { 
        EnterCriticalSection(&listMutex);
        list.AddHead(pdata);
        LeaveCriticalSection(&listMutex);
        ReleaseSemaphore(hSemaphore, 1, NULL);
    }

    // if Dequeue returns null it means the pipe was destroyed and no further queue method calls are legal
    // Dequeue caller is responsible to delete the returned instance
    T* Dequeue()
    {
        HANDLE handles[] = { hTerminateEvent1, hSemaphore };
        DWORD waitRes = WaitForMultipleObjects(2, handles, FALSE, INFINITE);
        if (waitRes==WAIT_OBJECT_0) {
            SetEvent(hTerminateEvent2);
            return NULL; // terminated
        }
        EnterCriticalSection(&listMutex);
        T* elem = list.RemoveTail(); 
        LeaveCriticalSection(&listMutex);
        return elem; // handler must delete item
    }

    void Destroy() {
        SetEvent(hTerminateEvent1);
        WaitForSingleObject(hTerminateEvent2, INFINITE);
        EnterCriticalSection(&listMutex);
        POSITION pos = list.GetHeadPosition(); 
        for (int i = 0; i < list.GetCount(); i++) delete list.GetNext(pos); 
        LeaveCriticalSection(&listMutex);
        DeleteCriticalSection(&listMutex);
        CloseHandle(hSemaphore);
    }

    ~CMultiThreadPipe() { 
        Destroy();
    }
};

代码是这样使用的:

class QueueData {
    public:
        QueueData(int i) : m_data(i) {};
        int m_data;
};

UINT DequeueThreadProc(LPVOID dummy);

CMultiThreadedPipe<QueueData>* pPipe = NULL;

void main() {
    pPipe = new CMultiThreadedPipe<QueueData>();
    start new thread running DequeueThreadProc

    int counter=0;
    for (int counter=0; counter<10; counter++)
    {
        pPipe->Enqueue(new QueueData(counter));
        Sleep(300);
    }
    delete pPipe;
}

UINT DequeueThreadProc(LPVOID ignore)
{
    QueueData* queueData;
    while ((queueData = pPipe->Dequeue()) != NULL) {
        delete queueData;
        Sleep(1000);
    };
    return 0;
}

我遇到的问题是终止,在上述实现中,当管道被销毁(总是由入队线程)时,它正在等待出队线程知道它在删除队列之前终止。它必须这样做以防止出队线程在管道被销毁后尝试出队的情况。

如果出队线程没有继续调用出队,第一个线程将挂在析构函数中,同样如果出队线程在调用出队之间等待很长时间,第一个线程的析构函数将相应地卡在那里。

我阅读了各种关于它的帖子,没有提到安全销毁。任何帮助表示赞赏!

【问题讨论】:

  • 首先你需要使用iocp 来实现——这个对象是专门为你的任务而设计的,并且使用这个代码变得更少和高效。在第二次 - 对多个线程使用的对象使用引用计数 - 这样你就不需要等待删除 - 谁释放了最后一个引用 - 称为析构函数。并且不需要与其他线程同步
  • 为什么有这么多“结构”?我会采用更简单的实现,没有信号量、列表或终止事件,只有两个关键部分,一个用于读取器,一个用于写入器。

标签: c++ multithreading visual-c++ mfc win32-process


【解决方案1】:

对于从多个线程访问的安全销毁对象,您需要对其使用引用计数。在将对象指针传递给新线程之前 - 您增加对对象的引用。当线程不再使用对象时,或者如果创建线程失败,则减少引用计数。当对象的最后一个引用被释放时——你可以安全地调用对象的析构函数。你不需要在这里等待任何线程。

也用于实现这样的队列 - 在 Windows 中存在特殊对象 - 在用户空间中命名为 I/O Completion Ports(在内核空间中称为 KQUEUE)。使用这个对象 - 实现将更加高效和简单 - 您不需要管理自我列表(代码中的CList),同步访问它 - 所有这些都将在内核空间中为您完成(PostQueuedCompletionStatus -> @987654323 @、GetQueuedCompletionStatus -> KeRemoveQueue)。您只需要创建 iocp, (kqueue) 对象。

class CMultiThreadPipe {

public:

    class __declspec(novtable) QueueData {
    public:

        virtual void ProcessItem() = 0;

        virtual ~QueueData()
        {
            DbgPrint("%x: %s<%p>\n", GetCurrentThreadId(), __FUNCTION__, this);
        }

        QueueData()
        {
            DbgPrint("%x: %s<%p>\n", GetCurrentThreadId(), __FUNCTION__, this);
        }
    };

private:
    HANDLE _hIOCP;
    LONG _dwRef;
    ULONG _nThreads;

    void DequeueThreadProc()
    {
        ULONG NumberOfBytesTransferred;
        QueueData* pData;
        OVERLAPPED* pOverlapped;

        while (GetQueuedCompletionStatus(_hIOCP, 
            &NumberOfBytesTransferred, 
            (ULONG_PTR*)&pData, 
            &pOverlapped, INFINITE))
        {
            if (pData)
            {
                pData->ProcessItem();
            }
            else
            {
                break;
            }
        }

        Release();
    }

    __declspec(noreturn) static DWORD CALLBACK _DequeueThreadProc(PVOID pThis)
    {
        reinterpret_cast<CMultiThreadPipe*>(pThis)->DequeueThreadProc();
        FreeLibraryAndExitThread((HMODULE)&__ImageBase, 0);
    }

    ~CMultiThreadPipe()
    {
        if (_hIOCP)
        {
            CloseHandle(_hIOCP);
        }
    }

public:

    CMultiThreadPipe() : _dwRef(1), _hIOCP(0)
    {
    }

    void AddRef()
    {
        InterlockedIncrement(&_dwRef);
    }

    void Release()
    {
        if (!InterlockedDecrement(&_dwRef))
        {
            delete this;
        }
    }

    ULONG Create(DWORD NumberOfDequeueThreads)
    {
        if (_hIOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, NumberOfDequeueThreads))
        {
            ULONG n = 0;
            do 
            {
                HMODULE hModule;
                if (GetModuleHandleExW(GET_MODULE_HANDLE_EX_FLAG_FROM_ADDRESS, (PCWSTR)_DequeueThreadProc, &hModule))
                {
                    AddRef();

                    if (HANDLE hThread = CreateThread(0, 0, _DequeueThreadProc, this, 0, 0))
                    {
                        CloseHandle(hThread);
                        n++;
                    }
                    else
                    {
                        Release();
                        FreeLibrary(hModule);
                    }
                }

            } while (--NumberOfDequeueThreads);

            _nThreads = n;

            return n ? NOERROR : ERROR_GEN_FAILURE;
        }

        return GetLastError();
    }

    ULONG Enqueue(QueueData* pData)
    {
        return PostQueuedCompletionStatus(_hIOCP, 0, (ULONG_PTR)pData, 0) ? NOERROR : GetLastError();
    }

    void Destroy()
    {
        if (ULONG n = _nThreads)
        {
            do 
            {
                PostQueuedCompletionStatus(_hIOCP, 0, 0, 0);
            } while (--n);
        }
    }
};

及用法:

class QueueData : public CMultiThreadPipe::QueueData
{
    int m_data; 

    virtual void ProcessItem()
    {
        DbgPrint("%x: %s<%p>(%u)\n", GetCurrentThreadId(), __FUNCTION__, this, m_data);
        delete this;
    }
public:
    QueueData(int i) : m_data(i) {};
};

void testQueue()
{
    if (CMultiThreadPipe* pPipe = new CMultiThreadPipe)
    {
        if (pPipe->Create(8) == NOERROR)
        {
            int n = 64;

            do 
            {
                if (QueueData* pData = new QueueData(n))
                {
                    if (pPipe->Enqueue(pData))
                    {
                        delete pData;
                    }
                }
            } while (--n);

            pPipe->Destroy();
        }
        pPipe->Release();
    }
}

注意这样的CMultiThreadPipe 实现——工作线程退出时不需要等待。即使您在 dll 中的代码并且您卸载了 dll - 您也不需要等待。每个线程都有自己的对象和模块引用。并在退出时释放它

【讨论】:

  • CMultiThreadPipe 的引用计数不是线程安全的。 _dwRef 需要防范并发访问。 Interlocked* API 调用只是解决方案的一半。没有针对调用AddRef 的线程的保护,而另一个线程已减少其引用计数,并在执行delete this; 之前被抢占。
  • @IInspectable - 当另一个线程减少了它的引用计数 - 你仍然无法理解AddRef 的调用者有自己的引用计数并且独立于另一个线程。跨度>
  • @IInspectable 1.) 对于访问对象,我们需要对对象 2.) 调用对象上的任何方法,包括 AddRef,这是对象访问 3.) 如果我们有对对象的引用 - 这个保证它不会被删除,直到我们不释放它 没有针对调用 AddRef 的线程的保护 - 该代码中的保护具有对对象的引用,当他调用对象方法时更好地重读 - docs.microsoft.com/en-us/windows/desktop/com/…
  • 当我在调用 WindowProc 之前使用人工引用计数时,我还没有忘记与您讨论我的窗口类实现的同一主题
  • CMultiThreadPipe 不是 COM 对象。 COM 规则不适用。
猜你喜欢
  • 1970-01-01
  • 2016-02-20
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2013-10-26
  • 1970-01-01
相关资源
最近更新 更多