【问题标题】:How to clear the outgoing message buffers at the Server?如何清除服务器上的传出消息缓冲区?
【发布时间】:2012-06-07 12:36:46
【问题描述】:

我使用PollingDuplexHttpBinding 编写了一个服务,它有一个使用它的 Silverllight 客户端。我的服务基本上有给定数量的客户端向服务发送他们的数据(经常是每秒,并且数据非常大,每次调用大约 5KB),并监听其他客户端向服务发送的新数据被路由到他们,非常类似于聊天室架构。

我注意到的问题是,当客户端通过 Internet 连接到服务时,几分钟后服务的响应变慢并且回复变得滞后。我得出的结论是,当服务主机的上传容量达到时(互联网上传速度,服务器上只有大约 15KB/s),其他客户端发送的消息会在有可用带宽时进行相应的缓冲和处理。我想知道如何才能限制服务用来存储从客户端接收到的消息的缓冲区的占用?我的客户获得所有数据并不是那么重要,而是他们获得其他人发送的最新数据,所以实时连接是我正在寻找的,以保证交付为代价。

简而言之,我希望能够在服务填满时清理我的队列/缓冲区,或者达到某个上限,然后开始用收到的呼叫再次填充它以消除延迟。我该怎么做呢? MaxBufferSize 属性是我需要在服务端和客户端减少的吗?或者我是否需要在我的服务中编写此功能?有什么想法吗?

谢谢。

编辑:

这是我的服务架构:

//the service
[ServiceContract(Namespace = "", CallbackContract = typeof(INewsNotification))]
[AspNetCompatibilityRequirements(RequirementsMode = AspNetCompatibilityRequirementsMode.Allowed)]
[ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Multiple, InstanceContextMode = InstanceContextMode.Single)]
public class NewsService
{


private static Dictionary<IChatNotification, string> clients = new Dictionary<IChatNotification, string>();
private ReaderWriterLockSlim subscribersLock = new ReaderWriterLockSlim();

[OperationContract(IsOneWay = true)]
public void PublishNotifications(byte[] data)
{
            try
            {
                subscribersLock.EnterReadLock();
                List<INewsNotification> removeList = new List<INewsNotification>();
                lock (clients)
                {
                    foreach (var subscriber in clients)
                    {
                        if (OperationContext.Current.GetCallbackChannel<IChatNotification>() == subscriber.Key)
                        {
                            continue;
                        }
                        try
                        {
                            subscriber.Key.BeginOnNotificationSend(data, GetCurrentUser(), onNotifyCompletedNotificationSend, subscriber.Key);

                        }
                        catch (CommunicationObjectAbortedException)
                        {
                            removeList.Add(subscriber.Key);
                        }
                        catch (CommunicationException)
                        {
                            removeList.Add(subscriber.Key);
                        }
                        catch (ObjectDisposedException)
                        {
                            removeList.Add(subscriber.Key);
                        }

                    }
                }

                foreach (var item in removeList)
                {
                    clients.Remove(item);
                }
            }
            finally
            {
                subscribersLock.ExitReadLock();
            }
        }

}


//the callback contract
[ServiceContract]
public interface INewsNotification
{
       [OperationContract(IsOneWay = true, AsyncPattern = true)]
       IAsyncResult BeginOnNotificationSend(byte[] data, string username, AsyncCallback callback, object asyncState);
       void EndOnNotificationSend(IAsyncResult result);
}

服务配置:

  <system.serviceModel>
    <extensions>
      <bindingExtensions>
        <add name="pollingDuplex" type="System.ServiceModel.Configuration.PollingDuplexHttpBindingCollectionElement, System.ServiceModel.PollingDuplex, Version=4.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35" />
      </bindingExtensions>
    </extensions>
    <behaviors>
      <serviceBehaviors>
        <behavior name="">

          <serviceMetadata httpGetEnabled="true" />
          <serviceThrottling maxConcurrentSessions="2147483647" />
          <serviceDebug includeExceptionDetailInFaults="true" />
        </behavior>
      </serviceBehaviors>
    </behaviors>
    <bindings>
      <pollingDuplex>

        <binding name="myPollingDuplex" duplexMode="SingleMessagePerPoll" 
                 maxOutputDelay="00:00:00" inactivityTimeout="02:00:00" 
                 serverPollTimeout="00:55:00" sendTimeout="02:00:00"  openTimeout="02:00:00" 
                  maxBufferSize="10000"  maxReceivedMessageSize="10000" maxBufferPoolSize="1000"/>

      </pollingDuplex>
    </bindings>
    <serviceHostingEnvironment aspNetCompatibilityEnabled="true" multipleSiteBindingsEnabled="true" />
    <services>
      <service name="NewsNotificationService.Web.NewsService">
        <endpoint address="" binding="pollingDuplex" bindingConfiguration="myPollingDuplex" contract="NewsNotificationService.Web.NewsService" />
        <endpoint address="mex" binding="mexHttpBinding" contract="IMetadataExchange" />
      </service>
    </services>
  </system.serviceModel>
    <system.webServer>
        <directoryBrowse enabled="true" />
    </system.webServer>
</configuration>

客户端通常会在 500 毫秒到 1000 毫秒之间调用服务,如下所示:

_client.PublishNotificationAsync(byte[] data);

并且回调将通知客户端其他客户端发送的通知:

void client_NotifyNewsReceived(object sender, NewsServiceProxy.OnNewsSendReceivedEventArgs e)
        {
                e.Usernamer//WHich client published the data
                e.data//contents of the notification
        }

所以回顾一下,当客户端数量增加,并且服务主机通过互联网上传速度受到限制时,服务发送给订阅者的消息会在某个地方缓冲并在队列中处理,这就是导致问题, 我不知道这些消息在哪里缓冲。在 LAN 中,该服务运行良好,因为服务器的上传速度等于其下载速度(对于 100KB/s 的传入呼叫,它发出 100KB/s 的通知)。这些消息在哪里缓冲?我怎样才能清除这个缓冲区?

我做了一些实验来尝试查看消息是否在服务中缓冲,我尝试在客户端上调用此方法,但它总是返回 0,即使一个客户端仍在接收其他人的通知4-5 分钟前发送:

[OperationContract(IsOneWay = false)]
public int GetQueuedMessages()
{

            return OperationContext.Current.OutgoingMessageHeaders.Count();
}

【问题讨论】:

  • 我认为我可能遇到了问题。我想我正在寻找错误问题的解决方案。我有一种预感,这是客户端的线程问题。当客户端进行服务调用时,调用是在 UI 线程上进行的,这会导致延迟,这就是我认为的问题所在。 “队列”实际上发生在客户端计算机上,并且该队列是发送到服务的内容。我现在正在处理这个,我会告诉你的。感谢大家的努力。

标签: wcf performance silverlight buffer pollingduplexhttpbinding


【解决方案1】:

我对你的情况做了一些计算。

  • 消息大小 = 100KB
  • 上传通道 = 15KB/s
  • 下载通道 = 100KB/s
  • 客户端每秒调用服务 1-2 次​​li>

客户端通常会在 500 毫秒到 1000 毫秒之间调用服务

这是正确的吗?

对于一个客户端,仅消息的下载流量将为 100-200KB/s,这只是消息正文。将有更多的标题和更多的启用安全性。

消息将被合并以进行异步调用。因此,如果我们有 3 个客户端,并且每个发送的消息回调包含每个客户端的 2 条消息。 4 个客户端 - 每个回调中有 3 条消息。

对于 3 个客户端,下载通道为 200-400KB/s。

对我来说,对于您声明的带宽而言,消息似乎太大了。

检查是否可以:

  1. 减小消息大小。我不了解您的业务性质,因此无法在此提供建议。

  2. 对消息或流量使用压缩。

  3. 增加网络带宽。没有那个即使是理想的解决方案 有非常高的延迟。您可以花费数天和数周的时间来优化您的 代码,即使您正确使用您的网络解决方案仍然可以 慢慢来。

我知道这听上去很像显而易见的船长,但有些问题没有正确答案,除非你改变问题。

执行上述步骤后,将ServiceThrottlingBehavior 与将管理回调队列的自定义代码结合使用。

ServiceThrottlingBehavior 在达到边界时拒绝请求。

http://msdn.microsoft.com/en-us/library/ms735114(v=vs.100).aspx

这是一个简单的示例。实数应针对您的环境专门定义。

<serviceThrottling 
 maxConcurrentCalls="1" 
 maxConcurrentSessions="1" 
 maxConcurrentInstances="1"
/>

更新:

我在这个问题上犯了很大的错误,每次调用是 5KB/s,

即使是 5KB 的消息,15KB/s 也是不够的。我们只计算 3 个用户的流量。 3 每秒传入消息。系统现在应该使用双工来通知用户其他人发送了什么。每个用户都应该从他的合作伙伴那里得到消息。我们总共有 3 条消息。一个(属于发件人的)可能会被跳过,因此每个用户都应该收到 2 条消息。 3 个用户将获得 10KB(5KB + 5KB = 一条消息 10KB)= 30KB。 每秒一条消息将在上传通道中为 3 个用户产生 30KB/秒的速度。

这些消息在哪里缓冲?我怎样才能清除这个 缓冲区?

这取决于您如何托管您的服务。如果它是自托管服务,则根本没有缓冲。您的代码试图向接收者发送消息,但由于通道被淹没,它变得非常慢。使用 IIS 可能会有一些缓冲,但从外部触摸它不是一个好习惯。

正确的解决方案是节流。由于带宽限制,您不应尝试将所有消息发送给所有客户端。相反,您应该例如限制发送消息的线程数量。因此,从上面的 3 个用户而不是并行发送 3 条消息,您可以按顺序发送它们,或者决定只有一个用户将在该轮中获得更新,下一个用户将在下一轮获得更新。

所以一般的想法是不要在你有数据后立即将所有内容发送给每个人,而是只发送你能负担得起的数据量,并使用线程数或响应速度来控制它。

【讨论】:

  • 我经常使用这些设置。我会再看看,让你知道情况如何,谢谢。
  • +1 我认为这是一个非常好的解决方案。如果服务器太忙,拒绝客户端,让客户端处理。
  • 我试过了,我将这些值全部设置为 1,但即使是现在,过了一段时间(大约 5 分钟),当我有 2 个以上的客户使用该服务时,呼叫开始滞后并迟到。另外,我注意到许多人说您应该将这些值设置为最大值以提高服务性能。无论如何,我仍然不知道这些消息在哪里排队,也许是在互联网上的“某个地方”。
  • @ClunkyCoder 请阅读我的回答。如果您有 15KB/s 的通道,则无法发送 >100 KB/s。那是你的队列。这在物理上是不可能的。你的路上遇到了巨大的交通堵塞,你问“哪辆车应该先离开车库?”。实际上没关系。所有汽车都会堵车。
  • @dharnitski,你是对的。我在这个问题上犯了一个大错误,每次调用都是 5KB/s,我测量的 100KB/s 不正确,我不知道我从哪里得到的 :@ 正如你提到的,即使是 1 个客户端也无法发送数据如果每个调用的大小为 100KB,则需要几秒钟(服务器上需要 100-200KB/s 的上传速度而不是 15!)这也发生在 LAN 上,但不那么频繁。我刚刚用 3 台计算机再次测试了我的服务,一段时间后消息排队很严重,但它比互联网上的明显少得多。我确信这不完全是带宽问题。
【解决方案2】:

MaxBufferSize 不会帮你解决这个问题。您将不得不自己编写代码,我不知道任何现有的解决方案/框架。然而,这听起来确实是一个有趣的问题。您可以首先为每个连接的客户端维护一个 Queue&lt;Message&gt;,然后在推送到此队列时(或当客户端调用 dequeue 时),您可以重新评估应该发送什么 Message

更新:首先,我会忘记尝试从客户端和配置中执行此操作,您将不得不自己编写代码

在这里我可以看到您发送给客户的位置:

 subscriber.Key.BeginOnNotificationSend(data, GetCurrentUser(), onNotifyCompletedNotificationSend, subscriber.Key);

因此,您应该将它们推送到Queue&lt;byte[]&gt;,而不是将这些通知异步推送到每个客户端。每个客户端连接都有自己的队列,您可能应该构造一个专用于每个客户端连接的类:

请注意,此代码不会开箱即用,并且可能存在一些逻辑错误,仅供参考

public class ClientConnection
{
    private INewsNotification _callback;
    private Queue<byte> _queue = new Queue<byte>();
    private object _lock = new object();
    private bool _isSending
    public ClientConnection(INewsNotification callBack)
    {
           _callback=callback;
    }
    public void Enqueue(byte[] message)
    { 
       lock(_lock)
       {               
           //what happens here depends on what you want to do. 
           //Do you want to only send the latest message? 
           //if yes, you can just clear out the queue and put the new message in there,                         
           //or you could keep the most recent 5 messages.                
           if(_queue.Count > 0)
               _queue.Clear();
          _queue.Enqueue(message);
           if(!_isSending)
               BeginSendMessage();
       }
    }

    private void BeginSendMessage()
    {
       _isSending=true;
        _callback.BeginOnNotificationSend(_queue.Dequeue(), GetCurrentUser(), EndCallbackClient, subscriber.Key);

    }

    private void EndCallbackClient(IAsyncResult ar)
    {
        _callback.EndReceive(ar);
        lock(_lock)
        {
           _isSending=false;
           if(_queue.Count > 0)
              BeginSendMessage();//more messages to send
        }
    }
}

想象一个场景,其中一条消息被推送到客户端,同时又发送 9 条消息调用ClientConnection.Enqueue。当第一条消息完成后,它会检查应该只包含最后一条(第 9 条消息)的队列

【讨论】:

  • 谢谢,我开始做类似this suggestion的事情,在服务上写一个方法返回OperationContext.Current.OutgoingMessageProperties.Count(),然后我在客户端调用它,它一直返回0,即使有在发送方停止发送一段时间后,仍有 100 条消息从缓冲区转发到客户端。知道这个属性实际上是什么吗?
  • 为什么需要这个属性?你如何从服务器向客户端发送消息(发布一些代码)
猜你喜欢
  • 1970-01-01
  • 2021-06-05
  • 1970-01-01
  • 2023-03-22
  • 1970-01-01
  • 1970-01-01
  • 2020-05-16
  • 2013-03-02
  • 2013-05-28
相关资源
最近更新 更多