【问题标题】:How to publish and subscribe to Message class messages using WCF如何使用 WCF 发布和订阅 Message 类消息
【发布时间】:2011-06-03 07:47:53
【问题描述】:

我有一个小型 WCF pub/sup 服务正在运行,远程客户端订阅和发送消息(尝试过各种复杂的对象)并且工作正常。所有接口都反映(ed)正在使用的对象的类型。切换到另一种对象类型需要调整接口以适应该对象类型。所有订阅者都会获得该消息的副本。

现在我正在尝试做同样的事情,但使用 Message 类消息。客户端创建一条新消息并将其对象封装在消息中,并将其发送到(远程)服务,在那里正确接收(检查对象)。但是,当服务器通过将消息重新发送(回调)回原始客户端进行回复时,客户端会收到以下消息:

“服务器没有提供有意义的回复;这可能是由于合同不匹配、会话过早关闭或内部服务器错误造成的。”

事件顺序(客户端):

客户端创建消息, (DuplexChannelFactory)AddMessage, - 捕捉上面的错误

事件序列(服务器):

服务主机收到消息, 检查消息(复制和重新创建), 执行回调, 没有错误。

切换回基本或用户定义的类型,所有问题都会消失。我已经为此苦苦挣扎了一个星期,并且没有接近任何解决方案。尝试操作标头、重新创建消息、切换到消息合同,并尝试解释跟踪日志的内容等。希望我能在这里找到一些答案。

使用的主要代码(去除了大部分错误处理):

客户端接口:

namespace WCFSQL
{
    public class ClientInterfaces
    {

        [ServiceContract(Namespace = "WCFServer", Name = "CallBacks")]
        public interface IMessageCallback
        {
            [OperationContract(Name = "OnMessageAdded", Action = "WCFServer/IMessageCallback/OnMessageAdded", IsOneWay = true)] 
            void OnMessageAdded(Message SQLMessage, DateTime timestamp);
        }

        [ServiceContract(Namespace = "WCFServer", CallbackContract = typeof(IMessageCallback))] 
        public interface IMessage
        {
            [OperationContract(Name = "AddMessage", Action = "WCFServer/IMessage/AddMessage")]
            void AddMessage(Message SQLMessage);

            [OperationContract(Name = "Subscribe", Action = "WCFServer/IMessage/Subscribe")]
            bool Subscribe();

            [OperationContract(Name = "Unsubscribe", Action = "WCFServer/IMessage/Unsubscribe")]
            bool Unsubscribe();
        }
    }
}

服务器接口:

namespace WCFSQL
{
    public class ServerInterfaces
    {
        [ServiceContract(Namespace = "WCFServer")]
        public interface IMessageCallback
        {
            [OperationContract(Name = "OnMessageAdded", Action = "WCFServer/IMessageCallback/OnMessageAdded", IsOneWay = true)]
            void OnMessageAdded(Message SQLMessage, DateTime timestamp); 
        }

        [ServiceContract(Namespace = "WCFServer", CallbackContract = typeof(IMessageCallback), SessionMode = SessionMode.Required)]
        public interface IMessage
        {
            [OperationContract(Name = "AddMessage", Action = "WCFServer/IMessage/AddMessage")]
            void AddMessage(Message SQLMessage);

            [OperationContract(Name = "Subscribe", Action = "WCFServer/IMessage/Subscribe")]
            bool Subscribe();

            [OperationContract(Name = "Unsubscribe", Action = "WCFServer/IMessage/Unsubscribe")]
            bool Unsubscribe();
        }
    }
}

消息创建:

// client proxy instance created and opened before

    public static bool WCFSqlLogger(string Program, WCFSQLErrorLogMessage SQLErrorMessage, WCFSqlClientProxy client)
    {
        MessageVersion ver = MessageVersion.CreateVersion(EnvelopeVersion.Soap12, AddressingVersion.WSAddressing10);

        Message Out = Message.CreateMessage(ver, "WCFServer/IMessage/AddMessage", SQLErrorMessage); 

        if (!client.SendMessage(Out))
        {
            Console.WriteLine("Client Main: Unable to send");
            return false;
        }
        return true;
    }

客户端代理:

[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single, IncludeExceptionDetailInFaults = true)]
[CallbackBehavior(IncludeExceptionDetailInFaults = true, ConcurrencyMode = ConcurrencyMode.Single, UseSynchronizationContext = false)] 
public class WCFSqlClientProxy : ClientInterfaces.IMessageCallback, IDisposable
{
    public ClientInterfaces.IMessage pipeProxy = null;
    DuplexChannelFactory<ClientInterfaces.IMessage> pipeFactory;

    public bool Connect()
    {
        NetTcpBinding newBinding = new NetTcpBinding(SecurityMode.TransportWithMessageCredential);// NetTcpBinding newBinding = new NetTcpBinding(SecurityMode.Transport)
        newBinding.Security.Message.ClientCredentialType = MessageCredentialType.Certificate;
        EndpointAddress newEndpoint = new EndpointAddress(new Uri("net.tcp://host:8000/ISubscribe"), EndpointIdentity.CreateDnsIdentity("Domain")); 

        pipeFactory = new DuplexChannelFactory<ClientInterfaces.IMessage>(new InstanceContext(this), newBinding, newEndpoint); 

        pipeFactory.Credentials.Peer.PeerAuthentication.CertificateValidationMode = X509CertificateValidationMode.PeerOrChainTrust;
        pipeFactory.Credentials.ServiceCertificate.Authentication.RevocationMode = X509RevocationMode.NoCheck; 
        pipeFactory.Credentials.ClientCertificate.SetCertificate(StoreLocation.LocalMachine, StoreName.TrustedPeople, X509FindType.FindByThumbprint, "somestring");

        try
        {
            pipeProxy = pipeFactory.CreateChannel();
            pipeProxy.Subscribe();
            return true;
        }
        catch (Exception e)
        {
            Console.WriteLine("Error opening: {0}", e.Message);
            return false;
        }
    }
    public void Close()
    {
        pipeProxy.Unsubscribe();
    }

    public bool SendMessage(Message SQLMessage)
    {
        try
        {
            Console.WriteLine("Proxy Sending:");
            pipeProxy.AddMessage(SQLMessage); // This is where the eror occurs !!!!!!!!!!!!!!!!!!
            return true;
        }
        catch (Exception e)
        {
            Console.WriteLine("Client Proxy: Error sending: {0}", e.Message); 
        }
        return false;
    }

    public void OnMessageAdded(Message SQLMessage, DateTime timestamp) 
    {
        WCFSQLErrorLogMessage message = SQLMessage.GetBody<WCFSQLErrorLogMessage>();
        Console.WriteLine(message.LogProgram + ": " + timestamp.ToString("hh:mm:ss"));
    }

    public void Dispose()
    {
        Console.WriteLine("Dispose: Unsubscribe");
        pipeProxy.Unsubscribe();
    }
}

服务:

namespace WCFSQL
{

[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single, IncludeExceptionDetailInFaults = true)]
[CallbackBehavior(IncludeExceptionDetailInFaults = true, ConcurrencyMode = ConcurrencyMode.Single, UseSynchronizationContext = false)] // or ConcurrencyMode.Reentrant
public class WCFSqlServerProxy : ServerInterfaces.IMessage
{
    private static List<ServerInterfaces.IMessageCallback> subscribers = new List<ServerInterfaces.IMessageCallback>(); 
    private static Uri target;
    private static ServiceHost serviceHost;

    public WCFSqlServerProxy(Uri Target) // Singleton
    {
        target = Target;
    }

    public bool Connect()
    {
        serviceHost = new ServiceHost(typeof(WCFSqlServerProxy), target);
        NetTcpBinding newBinding = new NetTcpBinding(SecurityMode.TransportWithMessageCredential);
        newBinding.Security.Transport.ClientCredentialType = TcpClientCredentialType.Certificate; 
        newBinding.Security.Message.ClientCredentialType = MessageCredentialType.Certificate; 
        serviceHost.Credentials.ClientCertificate.Authentication.RevocationMode = X509RevocationMode.NoCheck; // Non-domain members cannot follow the chain?
        serviceHost.Credentials.ServiceCertificate.SetCertificate(StoreLocation.LocalMachine, StoreName.TrustedPeople, X509FindType.FindByThumbprint, "somestring");
        serviceHost.Credentials.ClientCertificate.Authentication.CertificateValidationMode = X509CertificateValidationMode.PeerOrChainTrust;
        serviceHost.AddServiceEndpoint(typeof(ServerInterfaces.IMessage), newBinding, "ISubscribe");

        return true;
    }

    public bool Open()
    {
        serviceHost.Open();
    return true;
    }

    public bool Close()
    {
        serviceHost.Close();
        return true;
    }

    public bool Subscribe()
    {
        try
        {
            ServerInterfaces.IMessageCallback callback = OperationContext.Current.GetCallbackChannel<ServerInterfaces.IMessageCallback>();
            if (!subscribers.Contains(callback))
            {
                subscribers.Add(callback);
                return true;
            }
            else
            {
                return false;
            }
        }
        catch (Exception e)
        {
            return false;
        }
    }

    public bool Unsubscribe()
    {
        try
        {
            ServerInterfaces.IMessageCallback callback = OperationContext.Current.GetCallbackChannel<ServerInterfaces.IMessageCallback>(); 
            if (subscribers.Contains(callback))
            {
                subscribers.Remove(callback);
                return true;
            }
            return false;
        }
        catch (Exception e)
        {
            Console.WriteLine("WCFSqlServerProxy: Unsubscribe - Unsubscribe error {0}", e);
            return false;
        }
    }

    private string GetData()
    {
        MessageProperties messageProperties = ((OperationContext)OperationContext.Current).IncomingMessageProperties;
        RemoteEndpointMessageProperty endpointProperty = messageProperties[RemoteEndpointMessageProperty.Name] as RemoteEndpointMessageProperty;
        string computerName = null;
        try
        {
            string[] computer_name = Dns.GetHostEntry(endpointProperty.Address).HostName.Split(new Char[] { '.' });
            computerName = computer_name[0].ToString();
        }
        catch (Exception e)
        {
            computerName = "NOTFOUND";
            Console.WriteLine("WCFSqlServerProxy: Hostname error:  {0}", e);
        }
        return string.Format("{0} - {1}:{2}", computerName, endpointProperty.Address, endpointProperty.Port);
    }


    public void AddMessage(Message SQLMessage) //Go through the list of connections and call their callback funciton
    {
        subscribers.ForEach(delegate(ServerInterfaces.IMessageCallback callback)
        {
            if (((ICommunicationObject)callback).State == CommunicationState.Opened)
            {
                MessageVersion ver = MessageVersion.CreateVersion(EnvelopeVersion.Soap12, AddressingVersion.WSAddressing10);
                MessageBuffer buffer = SQLMessage.CreateBufferedCopy(4096);

                Message msgCopy = buffer.CreateMessage();
                //System.Xml.XmlDictionaryReader xrdr = msgCopy.GetReaderAtBodyContents();

                WCFSQLErrorLogMessage p = msgCopy.GetBody<WCFSQLErrorLogMessage>();

                SQLMessage = buffer.CreateMessage();
                buffer.Close();

                Message In = Message.CreateMessage(ver, "WCFServer/IMessage/AddMessage", p); // Tried recreating messsage, with same results

                //Console.WriteLine("Message: Header To:      {0}", In.Headers.To);
                //Console.WriteLine("Message: Header From:    {0}", In.Headers.From);
                //Console.WriteLine("Message: Header Action:  {0}", In.Headers.Action);
                //Console.WriteLine("Message: Header ReplyTo: {0}", In.Headers.ReplyTo);
                //Console.WriteLine("Message: IsFault:        {0}", In.IsFault);
                //Console.WriteLine("Message: Properties      {0}", In.Properties);
                //Console.WriteLine("Message: State           {0}", In.State);
                //Console.WriteLine("Message: Type            {0}", In.GetType());
                //Console.WriteLine("Proxy Sending: Copy created");

                //Console.WriteLine("Remote:  {0}, Hash: {1}", GetData(), callback.GetHashCode());

                callback.OnMessageAdded(SQLMessage, DateTime.Now); // This should echo the message back with a timeslot.
            }
            else
            {
                Console.WriteLine("WCFSqlServerProxy:addmessage connected state: {0}", ((ICommunicationObject)callback).State == CommunicationState.Opened);
                subscribers.Remove(callback);
            }
        });
    }
}

【问题讨论】:

    标签: wcf class message publish-subscribe


    【解决方案1】:

    我刚刚在 Microsoft WCF 论坛上从 Tanvir Huda 那里得到了我的问题的答案。

    “在操作中使用消息类

    您可以将 Message 类用作操作的输入参数、操作的返回值或两者兼而有之。如果 Message 在操作中的任何位置使用,则适用以下限制:

    •该操作不能有任何outorref参数。

    •输入参数不能超过一个。如果参数存在,则它必须是 Message 或消息协定类型。

    •返回类型必须是 void、Message 或消息协定类型。"

    我不敢相信我错过了;必须至少阅读了三遍,但从未将这些规则应用于回调。我描述的接口中的回调确实有一个返回类型 void,但它有一个 Message 和一个 DateTime 参数。

    删除 DateTime 参数后,回调确实(尝试)重新序列化原始消息,但由于无效操作而失败(仍然为 AddMessage 设置操作,而现在应该是 OnMessageAdded)。将回调上的操作更改为 Action="*" 后,它工作得很好。一个(可能很烦人)详细说明我并不真正需要回调的 Message 类型,但我很沮丧我没有让它工作

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-03-01
      • 2011-11-02
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-09-18
      相关资源
      最近更新 更多