【问题标题】:Waiting on multiple semaphores or multiple message queues等待多个信号量或多个消息队列
【发布时间】:2023-10-21 00:36:01
【问题描述】:

在大多数 UNIX 实现中,进程可以阻塞多个事件。也就是说,进程可以等待多个信号量或多个消息队列,而不是等待单个信号量或从单个消息队列接收。这种能力有什么优势?你将如何实现它?

现在,在大家开始问这是否是我的学校作业之前,它不是。这是我正在参加的课程的推荐考试题。

我对此的看法 - 类似这样:

typedef struct QueueElement {
    int Sender;
    int Receiver;
    char Message[MAX_MSG_SIZE];
    int MessageSize;
} QueueElement;

QueueElement MessageQueue[NUM_OF_PROCESSES];



/* Send Message to receiving process queue number */
int SendMsg(int SenderId, int ReceiverId, char* Msg, int Size)
{
    /* Create Queue Element to Enqueue */
    QueueElement El = {
        .Sender = SenderId,
        .Receiver = ...
        .
        .
    };

    /* Enqueue element in queue specified by Receiver's Id */
    Enqueue(&(MessageQueue[ReceiverId]), El);
}



/* Get messages for process defined by ReceiverId, from any queue */
int RecvMsg(int* SenderId, int ReceiverId, char* Msg, int* size)
{
    int i;
    for (i=NUM_OF_PROCESSES; i>=0; i--)
    {
        /* If a message is available for receiving process, Dequeue element from queue */
        if (MessageQueue[i].Receiver = ReceiverId)
        {
            QueueElement El = Dequeue(&(MessageQueue[i]));
            *SenderId = El.Sender;
            strcpy(Msg, El.Message);
            .
            .
            .

            return TRUE;
        }
    }

    return FALSE;
}

现在,考虑 4 个并行运行的进程。他们不断地向消息队列 1、2、3、4 发送消息。现在,假设所有消息都发送到进程 2。这意味着进程 2 必须检查所有 4 个队列(1、2、3、4)中的消息。但如果不断添加新消息,则只会处理队列 4 中的消息。如何避免饿死其他消息队列?

有没有更好的方法来处理这个问题?现代建筑如何处理这个问题?此解决方案的问题在于,如果消息不断被排入高优先级队列 (NUM_OF_PROCESSES),则永远不会处理低优先级队列中的消息。

【问题讨论】:

  • re:您的编辑:unix / POSIX select(2) 返回有活动的 FD 的数量,因此您可以在查找更多消息之前遍历所有有消息的 FD(管道或套接字或其他) . poll(2)epoll 也允许同样的事情。

标签: c unix cpu-architecture blocking epoll


【解决方案1】:

有没有更好的方法来处理这个问题?

是的。主要问题是您的代码不断浪费 CPU 时间轮询,因为它根本不等待。

更好的方法是把它放在内核中,这样:

  • 当一个任务调用RecvMsg()时,内核可以原子地执行if no messages in queue { tell scheduler this task should block until a message is received }(没有竞争条件)

  • 当任何东西调用SendMsg()时,内核可以原子地执行if receiving task is blocked waiting for a message { tell scheduler the receiving task should be unblocked }(没有竞争条件)

下一个问题是它只等待消息。如果您想等待(异步)打开文件,或者等待信号,或者等待时间过去,或者等待获取互斥锁怎么办?对于这个问题有两种可能的解决方案:

  • 对于不同类型的事情(仍然不能用于某些事情 - 例如等待互斥锁)有一个可怕的混乱(例如epoll_pwait())。

  • 在消息之上实现所有内容,这样您只需等待消息。

对于第二种解决方案;大多数情况下,您最终会用 Actor 模型取代传统的过程式编程。

【讨论】:

  • 感谢您的评论。是的,当然,实际的排队和进程阻塞/解除阻塞机制将由内核处理。该进程将不得不执行类似于主管调用的操作以将控制权传递给内核。这段代码的编写更多是为了解释我想要理解的内容。我发布的代码的主要问题是,如果更高优先级的队列总是收到新消息,则可能永远不会处理低优先级的消息队列元素。这是代码中descending for loop 的结果。有没有办法解决这个问题?
  • @ImranNiloy:在正常情况下,您不想解决这个问题(当您可以/应该做更重要的工作时,您不想浪费时间为较低优先级的消息做不太重要的工作处理更高优先级的消息)。如果总是有更高优先级的消息,则表明存在恶意攻击者(例如“拒绝服务”消息泛滥)、流量控制错误(发送方发送消息过快)或可扩展性不足(没有足够的接收方并行工作来处理负载) ;解决方案是修复发送者(而不是接收者)。
  • @ImranNiloy:请注意(无论消息优先级或消息处理顺序如何)如果接收方无法跟上,那么消息队列将继续增长,直到您用完 RAM 并不得不采取激烈的行动操作(例如,终止进程以释放消息队列中的许多 GiB 内存)。或者,作为最后的手段(防止恶意攻击者和发送方和接收方的设计缺陷),当接收方的队列太满时,内核可能会限制/延迟发送方。当然问题也可能是接收方(例如 Windows 中的“任务没有响应”对话框)。
【解决方案2】:

我认为您的代码没有按照提问者的意图回答问题。扩展问题的含义:

通常你可以调用read()之类的东西来从文件描述符中读取数据,它会阻塞直到有数据进来。但是,如果你有多个文件描述符,并且你想同时阻塞所有的文件描述符,直到数据来自其中任何一个?你不能用一个简单的read() 来做到这一点,它只需要一个文件描述符。

  • 您需要什么样的 API?展示一个函数或一组函数,让人们可以同时从多个来源读取数据。

提示: select() 正是这样做的。 google 的好关键字包括“select”、“multiplexing”和“non-blocking I/O”。

【讨论】:

  • 感谢您的评论。你能推荐一个从头开始构建的简单实现吗?在我正在学习的课程中,我们更关心处理来自具有不同优先级的组件的硬件中断消息。
【解决方案3】:

当使用不同优先级的消息队列时,阅读器将继续服务高优先级队列,当耗尽时,它将移动到低优先级队列。在不使用优先级的情况下,可以使用线程并行服务队列,但不能保证消息在不同队列中出现的顺序。

【讨论】: