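【Question title】: Service Fabric Stateless Server Custom UDP Listener
【Posted】: 2016-10-02 15:17:26
【Question】:

We are trying to define a Service Fabric stateless service that listens for UDP data.

We are working with Microsoft, who say this is supported and that I should configure the endpoint for TCP. Below is the snippet from the ServiceManifest.xml file: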

<Resources>
    <Endpoints>
      <!-- This endpoint is used by the communication listener to obtain the port on which to 
           listen. Please note that if your service is partitioned, this port is shared with 
           replicas of different partitions that are placed in your code. -->
      <Endpoint Name="ServiceEndpoint" Protocol="tcp" Port="12345" Type="Input" />
    </Endpoints>
</Resources>

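The service starts fine, but I cannot get it to receive any UDP data, and if I run netstat -a I don't see anything listening on the port, either TCP or UDP.

I have done a lot of research online but found very little on creating a custom ICommunicationListener, so I am hoping someone else can verify whether this should be possible with Service Fabric.

Here is the ICommunicationListener implementation: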

public class UdpCommunicationListener : ICommunicationListener
{
    private readonly Action<UdpReceiveResult> _connector;
    private readonly string _ipAddress;
    private readonly int _port;
    private readonly UdpServer _server;
    private IDisposable _listener;

    public UdpCommunicationListener(string serviceEndPoint,
        ServiceInitializationParameters serviceInitializationParameters, Action<UdpReceiveResult> connector)
    {
        if (serviceInitializationParameters == null)
        {
            throw new ArgumentNullException(nameof(serviceInitializationParameters));
        }

        var endPoint = serviceInitializationParameters
            .CodePackageActivationContext
            .GetEndpoint(serviceEndPoint ?? "ServiceEndPoint");

        _connector = connector;

        _ipAddress = FabricRuntime.GetNodeContext().IPAddressOrFQDN;
        _port = endPoint.Port;

        _server = new UdpServer(_ipAddress, _port);

        _server.Open();
    }

    public Task<string> OpenAsync(CancellationToken cancellationToken)
    {
        _listener = _server.Listen(_connector);

        return Task.FromResult($"udp::{_ipAddress}:{_port}");
    }

    public Task CloseAsync(CancellationToken cancellationToken)
    {
        this.Abort();

        return Task.FromResult(true);
    }

    public void Abort()
    {
        _listener.Dispose();
        _server?.Close();
    }
}

public class UdpServer
{
    private readonly UdpClient _udpClient;
    private IObservable<UdpReceiveResult> _receiveStream;

    public UdpServer(string ipAddress, int port)
    {
        Id = Guid.NewGuid();

        _udpClient = new UdpClient(ipAddress, port);
    }

    public Guid Id { get; }

    public void Open()
    {
        _receiveStream = _udpClient.ReceiveStream().Publish().RefCount();
    }

    public void Close()
    {
        //TODO: Not sure how to stop the process
    }

    public IDisposable Listen(Action<UdpReceiveResult> process)
    {
        return _receiveStream.Subscribe(async r =>
        {
                process(r);
        });
    }
}

【Comments】:

  • Could you please post the ICommunicationListener code?

Tags: c# udp udpclient azure-service-fabric


【Solution 1】:

I have UDP working as a stateless service. Here is the code:

UdpService.cs

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Fabric;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.ServiceFabric.Services.Communication.Runtime;
using Microsoft.ServiceFabric.Services.Runtime;

namespace UdpService
{
    /// <summary>
    /// An instance of this class is created for each service instance by the Service Fabric runtime.
    /// </summary>
    internal sealed class UdpService : StatelessService
    {
        private UdpCommunicationListener listener;

        public UdpService(StatelessServiceContext context)
            : base(context)
        { }

        protected override IEnumerable<ServiceInstanceListener> CreateServiceInstanceListeners()
        {
            yield return new ServiceInstanceListener(initParams =>
            {
                this.listener = new UdpCommunicationListener();
                this.listener.Initialize(initParams.CodePackageActivationContext);

                return this.listener;
            });
        }
    }
}

UdpCommunicationListener

using System;
using System.Diagnostics;
using System.Fabric;
using System.Fabric.Description;
using System.Globalization;
using System.Net;
using System.Net.Sockets;
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

using Microsoft.ServiceFabric.Services.Communication.Runtime;

namespace UdpService
{
    public class UdpCommunicationListener : ICommunicationListener, IDisposable
    {
        private readonly CancellationTokenSource processRequestsCancellation = new CancellationTokenSource();

        public int Port { get; set; }

        private UdpClient server;

        /// <summary>
        /// Stops the Server Ungracefully
        /// </summary>
        public void Abort()
        {
            this.StopWebServer();
        }

        /// <summary>
        /// Stops the Server Gracefully
        /// </summary>
        /// <param name="cancellationToken">Cancellation Token</param>
        /// <returns>Task for Asynchron usage</returns>
        public Task CloseAsync(CancellationToken cancellationToken)
        {
            this.StopWebServer();

            return Task.FromResult(true);
        }

        /// <summary>
        /// Free Resources
        /// </summary>
        public void Dispose()
        {
            this.Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Initializes Configuration
        /// </summary>
        /// <param name="context">Code Package Activation Context</param>
        public void Initialize(ICodePackageActivationContext context)
        {
            EndpointResourceDescription serviceEndpoint = context.GetEndpoint("ServiceEndpoint");
            this.Port = serviceEndpoint.Port;
        }

        /// <summary>
        /// Starts the Server
        /// </summary>
        /// <param name="cancellationToken">Cancellation Token</param>
        /// <returns>Task for Asynchron usage</returns>
        public Task<string> OpenAsync(CancellationToken cancellationToken)
        {
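            try
            {
                this.server = new UdpClient(this.Port);
            }
            catch (Exception ex)
            {
                // Report bind failures instead of swallowing them; otherwise
                // the receive loop below would run against a null client.
                ServiceEventSource.Current.Message(ex.Message);
                throw;
            }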

            ThreadPool.QueueUserWorkItem((state) =>
            {
                this.MessageHandling(this.processRequestsCancellation.Token);
            });

            return Task.FromResult("udp://" + FabricRuntime.GetNodeContext().IPAddressOrFQDN + ":" + this.Port);
        }

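        protected void MessageHandling(CancellationToken cancellationToken)
        {
            UdpClient client = this.server; // Dispose() sets the field to null during shutdown
            try
            {
                while (!cancellationToken.IsCancellationRequested)
                {
                    IPEndPoint ipEndPoint = new IPEndPoint(IPAddress.Any, this.Port);
                    byte[] receivedBytes = client.Receive(ref ipEndPoint);
                    client.Send(receivedBytes, receivedBytes.Length, ipEndPoint); // echo back to the sender
                    Debug.WriteLine("Received bytes: " + receivedBytes.Length.ToString());
                }
            }
            catch (Exception ex) when (ex is SocketException || ex is ObjectDisposedException)
            {
                // A socket error ends the loop. This includes the error raised when
                // Dispose() closes the socket to unblock the pending Receive call.
            }
        }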

        /// <summary>
        /// Receives the specified endpoint.
        /// </summary>
        /// <param name="endpoint">The endpoint.</param>
        /// <returns></returns>
        public Task<byte[]> Receive(ref IPEndPoint endpoint)
        {
            return Task.FromResult(this.server.Receive(ref endpoint));
        }

        /// <summary>
        /// Free Resources and Stop Server
        /// </summary>
        /// <param name="disposing">Disposing .NET Resources?</param>
        protected virtual void Dispose(bool disposing)
        {
            if (disposing)
            {
                if (this.server != null)
                {
                    try
                    {
                        this.server.Close();
                        this.server = null;
                    }
                    catch (Exception ex)
                    {
                        ServiceEventSource.Current.Message(ex.Message);
                    }
                }
            }
        }

        /// <summary>
        /// Stops Server and Free Handles
        /// </summary>
        private void StopWebServer()
        {
            this.processRequestsCancellation.Cancel();
            this.Dispose();
        }
    }
}

Last but not least, the ServiceManifest.xml:

<?xml version="1.0" encoding="utf-8"?>
<ServiceManifest Name="UdpServicePkg"
                 Version="1.0.0"
                 xmlns="http://schemas.microsoft.com/2011/01/fabric"
                 xmlns:xsd="http://www.w3.org/2001/XMLSchema"
                 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
  <ServiceTypes>
    <!-- This is the name of your ServiceType. 
         This name must match the string used in RegisterServiceType call in Program.cs. -->
    <StatelessServiceType ServiceTypeName="UdpServiceType" />
  </ServiceTypes>

  <!-- Code package is your service executable. -->
  <CodePackage Name="Code" Version="1.0.0">
    <EntryPoint>
      <ExeHost>
        <Program>UdpService.exe</Program>
      </ExeHost>
    </EntryPoint>
  </CodePackage>

  <!-- Config package is the contents of the Config directory under PackageRoot that contains an 
       independently-updateable and versioned set of custom configuration settings for your service. -->
  <ConfigPackage Name="Config" Version="1.0.0" />

  <Resources>
    <Endpoints>
      <!-- This endpoint is used by the communication listener to obtain the port on which to 
           listen. Please note that if your service is partitioned, this port is shared with 
           replicas of different partitions that are placed in your code. -->
      <Endpoint Name="ServiceEndpoint" Port="5555" />
    </Endpoints>
  </Resources>
</ServiceManifest>
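
To check that the echo listener above is reachable, a small console client along the following lines could be used. This is only a sketch and not part of the original answer: the `UdpEchoProbe` class, its `SendAndReceive` helper, the default host `127.0.0.1` and the 2-second timeout are all assumptions made for illustration. Only port 5555 and the echo behaviour come from the code and manifest above.

```csharp
using System;
using System.Net;
using System.Net.Sockets;
using System.Text;

// Hypothetical test client; not part of the service itself.
public static class UdpEchoProbe
{
    // Sends one datagram and waits for the echoed reply.
    // Returns null if no reply arrives (timeout or socket error).
    public static string SendAndReceive(string host, int port, string message, int timeoutMs)
    {
        // UdpClient(string, int) sets the default remote host, which is
        // what a sending client wants.
        using (var client = new UdpClient(host, port))
        {
            client.Client.ReceiveTimeout = timeoutMs;
            byte[] payload = Encoding.UTF8.GetBytes(message);
            client.Send(payload, payload.Length);
            try
            {
                var remote = new IPEndPoint(IPAddress.Any, 0);
                return Encoding.UTF8.GetString(client.Receive(ref remote));
            }
            catch (SocketException)
            {
                return null;
            }
        }
    }

    public static void Main(string[] args)
    {
        // Assumed default: the service runs on the local machine.
        string host = args.Length > 0 ? args[0] : "127.0.0.1";
        string reply = SendAndReceive(host, 5555, "hello", 2000);
        Console.WriteLine(reply ?? "no reply");
    }
}
```

If the probe reports no reply against a cluster node, the firewall on that node is a likely place to look before suspecting the listener code.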

【Comments】:

    【Solution 2】:

    I fixed a defect in the UdpServer component, and it can now be hosted in a Service Fabric service.

    The problem was this line of code:

    _udpClient = new UdpClient(ipAddress, port);
    

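    This is the wrong overload for listening: UdpClient(string, int) connects the client to a default remote host instead of binding it to a local port. It has to be: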

    _udpClient = new UdpClient(port);
    

    I also tried:

    _udpClient = new UdpClient(new IPAddress(IPAddress.Parse(_ipAddress)),port)
    

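    But that does not work, because the line in the communication listener that retrieves the host (as the node describes itself) returns the host name rather than an IP address. I think this behaviour could be changed with some changes to the manifest, but for now the port alone is sufficient.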
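
The difference between the two overloads can be seen in a small loopback sketch. It is not taken from the original UdpServer component: the `UdpBindDemo` class and its `RoundTrip` helper are hypothetical, and port 0 is used only so that the operating system picks a free port. `UdpClient(int)` binds to a local port and can receive, while `UdpClient(string, int)` only sets the default remote host and is suited to sending.

```csharp
using System;
using System.Net;
using System.Net.Sockets;
using System.Text;

// Hypothetical demo class, for illustration only.
public static class UdpBindDemo
{
    public static string RoundTrip(string message)
    {
        // UdpClient(int) binds the socket to a local port; this is the
        // overload a listener needs. Port 0 lets the OS choose a free port.
        using (var listener = new UdpClient(0))
        {
            int port = ((IPEndPoint)listener.Client.LocalEndPoint).Port;
            listener.Client.ReceiveTimeout = 5000; // do not block forever

            // UdpClient(string, int) does not bind to that port; it records
            // the remote host and port as the default destination for Send.
            using (var sender = new UdpClient("127.0.0.1", port))
            {
                byte[] payload = Encoding.UTF8.GetBytes(message);
                sender.Send(payload, payload.Length);
            }

            var remote = new IPEndPoint(IPAddress.Any, 0);
            byte[] received = listener.Receive(ref remote);
            return Encoding.UTF8.GetString(received);
        }
    }

    public static void Main()
    {
        Console.WriteLine(RoundTrip("ping"));
    }
}
```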

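    【Comments】:

    • So the problem was that your UdpServer was not actually binding/listening?
    • Please also provide the solution, not just the facts you found. Otherwise nobody else will be able to use this question/answer.
    • Note that as of SDK 2.1.150 UDP is not fully supported; you need to add some firewall rules via a batch file and trigger it in the service manifest.
    【Solution 3】:

    Only the http/https and tcp protocols are supported, so I guess you cannot do something like UDP. UDP is not reliable. We could use SignalR, but I guess UDP will not work.

    Edit: you can see in my other post that UDP is now working.

    【Comments】: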
