【问题标题】:RabbitMQ , .NET and multithreadingRabbitMQ、.NET 和多线程
【发布时间】:2016-09-30 01:04:18
【问题描述】:

我一直在玩 RabbitMQ 及其 .NET 连接器。 经过一段时间的测试,我使用 RabbitMQ 作为网络应用程序和 API 网络中的代理的新系统投入生产。 所有应用都在几分钟内卡住了。 我想我达到了一些依赖于操作系统的阈值或搞砸了一些 TCP 堆栈(在块之后,甚至我的客户端的 TCP/IP 连接都不再访问网络服务器了)。

我还没有找到关于如何处理通过数十个进程和数千个线程传播的密集流量的好材料。

我的系统每秒生成 10k+ 个线程,每个线程都必须通过连接丢弃一条消息,然后终止。 对于所有这些消息,我只有几个“捕手”。

已经采取了一些对策。 不要为每个新线程使用新的连接 -> 声明一个连接工厂并使用静态连接(连接和通道上的函数已经说是线程安全的) -> 已解决

这是工厂的代码

    public static class Factory
    {
        private static IConnection sharedConnection_cl;
        public static IConnection SharedConnection_cl
        {
            get
            {
                if (sharedConnection_cl == null)
                {
                    sharedConnection_cl = GetRabbitMqConnection();
                }
               return sharedConnection_cl;
         }

    private static IConnection GetRabbitMqConnection()
            {
                ConnectionFactory connectionFactory = new             ConnectionFactory();
                connectionFactory.HostName = "x.y.z.w";
                connectionFactory.UserName = "notTheGuestUser";
                connectionFactory.Password = "abcdef";
                connectionFactory.VirtualHost = "/dedicatedHost";
                return connectionFactory.CreateConnection();
          }

不使用所有可用的 Erlang 进程。 Erlang 进程阈值在不到 10 分钟内达到(同一连接上的关闭通道不会触发服务器上相应 Erlang 进程的死亡)。-> 为任何给定连接添加了最大通道数的阈值,并使用信号量保护访问。每隔一段时间,连接就会关闭并重新创建(相应的 Erlang 进程在连接关闭时终止)-> 已解决

这是管理通道阈值的代码

public static class Factory
{
    private static IConnection sharedConnection_cl;
    private static int channelCount_int { get; set; }
    static Semaphore connectionAccessSemaphore_sem = new Semaphore(1, 1);
    public static IConnection SharedConnection_cl
    {
        get
        {
            if (sharedConnection_cl == null)
            {
                sharedConnection_cl = GetRabbitMqConnection();
                channelCount_int = 0;
            }
            connectionAccessSemaphore_sem.WaitOne();
            if (channelCount_int > 10000)
            {

                sharedConnection_cl.Close();
                sharedConnection_cl = GetRabbitMqConnection();
                channelCount_int = 0;
            }
            else
            {
                channelCount_int++;
            }
            connectionAccessSemaphore_sem.Release();
            return sharedConnection_cl;
        }
    }

现在......除了增加操作系统标准阈值(这只会将不可避免的阻塞的痛苦从几分钟......延长到几个小时)......

有没有什么好的做法来管理连接和通道,以避免遇到操作系统阈值和出现饱和趋势?

感谢您的支持。

【问题讨论】:

标签: .net multithreading erlang rabbitmq threshold


【解决方案1】:

好的,解决方案已经存在。只是向上移动信号量就可以了。我没有考虑到在系统重新启动时,当所有 appPools 结束时,我在连接实例分配上遇到了并发问题。 -> 已解决

public static class Factory{
private static IConnection sharedConnection_cl;
private static int channelCount_int { get; set; }
static Semaphore connectionAccessSemaphore_sem = new Semaphore(1, 1);
public static IConnection SharedConnection_cl
{
    get
    {
        connectionAccessSemaphore_sem.WaitOne();

        if (sharedConnection_cl == null)
        {
            sharedConnection_cl = GetRabbitMqConnection();
            channelCount_int = 0;
        }

        if (channelCount_int > 10000)
        {

            sharedConnection_cl.Close();
            sharedConnection_cl = GetRabbitMqConnection();
            channelCount_int = 0;
        }
        else
        {
            channelCount_int++;
        }
        connectionAccessSemaphore_sem.Release();
        return sharedConnection_cl;
    }
}

不知道为什么 AppPools 的锁会锁定服务器上的所有 TCP 连接。

【讨论】:

    猜你喜欢
    • 2016-12-19
    • 2021-04-21
    • 1970-01-01
    • 2013-09-03
    • 1970-01-01
    • 2011-08-13
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多