【问题标题】:WCF service, stream exception thrown due to "Unexpected end of file" for incoming streamWCF 服务,由于传入流的“文件意外结束”而引发流异常
【发布时间】:2011-10-04 13:34:14
【问题描述】:

我编写了一个交换传输代理,它将电子邮件正文上传到 WCF 服务。该服务与 Exchange 位于同一个盒子上,侦听 localhost:1530 以接收传入的 TCP 连接。传输代理使用 .NET 3.5 框架实现,服务自托管在 .NET 4.0 框架上实现的 Windows 服务中。

我的问题:为什么流在读取完成之前就终止了,我该如何解决?

服务合同定义如下:

[ServiceContract]
public interface IReceiveService
{
    [OperationContract]
    Guid ImportMessage(DateTime dateTimeReceived, string from, IList<string> to, string subject, int attachmentCount, bool bodyEmpty, Guid clientID);

    [OperationContract]
    void ImportMessageBody(IMessageBody body);

    [OperationContract]
    void ImportMessageAttachment(IMessageAttachment attachment);
}

更新:我重新订购了这个,以便人们更容易快速阅读问题,而不必阅读我的其余描述,这很长。第一点显示我如何启动Task 来处理请求似乎是问题所在。事实上,如果我注释掉 Task.Factory.StartNew() 部分,Stream.CopyTo 就可以了。

在我的服务实现中,我尝试使用Stream.CopyTo 将传入流复制到临时文件,如下所示:

public void ImportMessageBody(IMessageBody body)
{
    Task.Factory.StartNew(() =>
        {
            string fileName = GetFileNameFromMagicalPlace();
            using (FileStream fs = new FileStream(fileName, FileMode.Append))
            {
                body.Data.CopyTo(fs); // <---- throws System.IOException right here
            }
        });
}

异常错误为:“错误:读取流时抛出异常。”堆栈跟踪:

   at System.ServiceModel.Dispatcher.StreamFormatter.MessageBodyStream.Read(Byte[] buffer, Int32 offset, Int32 count) 
   at System.IO.Stream.CopyTo...

有一个内部异常:

System.Xml.XmlException: Unexpected end of file. Following elements are not closed: Address, IMessageBody, Body, Envelope. 
    at System.Xml.XmlExceptionHelper.ThrowXmlException(XmlDictionaryReader reader, String res, String arg1, String arg2, String arg3) 
    at System.Xml.XmlBufferReader.GetByteHard() 
    at System.Xml.XmlBufferReader.ReadMultiByteUInt31() 
    at System.Xml.XmlBinaryReader.ReadName(StringHandle handle) 
    at System.Xml.XmlBinaryReader.ReadNode() 
    at System.ServiceModel.Dispatcher.StreamFormatter.MessageBodyStream.Read(Byte[] buffer, Int32 offset, Int32 count) 

其他细节如下。


IMessageBody 是这样定义的:

[MessageContract]
public class IMessageBody
{
    [MessageHeader]
    public Guid MessageID { get; set; }

    [MessageHeader]
    public string Format { get; set; }

    [MessageBodyMember]
    public System.IO.Stream Data { get; set; }
}

显示相关位的传输代理的缩写版本(我希望):

public class Agent : RoutingAgent
{
    public delegate void PostingDelegate(MailItem item);
    IReceiveService service;    

    public Agent()
    {
        string tcpServiceUri = "net.tcp://localhost:1530";

        NetTcpBinding endpointBinding = new NetTcpBinding();
        endpointBinding.TransferMode = TransferMode.Streamed;

        ServiceEndpoint serviceEndpoint = new ServiceEndpoint(
            ContractDescription.GetContract(typeof(IReceiveService)),
            endpointBinding,
            new EndpointAddress(tcpServiceUri));
        ChannelFactory<IReceiveService> factory = new ChannelFactory<IReceiveService>(serviceEndpoint);
        service = factory.CreateChannel();

        this.OnSubmittedMessage += new SubmittedMessageEventHandler(Agent_OnSubmittedMessage);
    }

    void Agent_OnSubmittedMessage(SubmittedMessageEventSource source, QueuedMessageEventArgs e)
    {
        if (TheseAreTheDroidsImLookingFor(e))
        {
            PostingDelegate del = PostData;
            del.BeginInvoke(e.MailItem, CompletePost, GetAgentAsyncContext());
        }
    }

    void PostData(MailItem item)
    {
        // Body class is basically direct implementation of IMessageBody 
        // with a constructor to set up the public properties from MailItem.
        var body = new Body(item); 
        service.ImportMessageBody(body);
    }

    void CompletePost(IAsyncResult ar)
    {
        var context = ar.AsyncState as AgentAsyncContext;
        context.Complete();
    }
}

最后,服务实现是这样托管的:

string queueUri = String.Format("net.tcp://localhost:{0}/{1}", port, serviceName);
            try
            {
                host = new ServiceHost(typeof(ReceiveService), new Uri(queueUri));
                host.AddDefaultEndpoints();
                var endpoint = host.Description.Endpoints.First();
                ((NetTcpBinding)endpoint.Binding).TransferMode = TransferMode.Streamed;                 

                trace.Log(Level.Debug,String.Format("Created service host: {0}", host));                

                host.Open();                
                trace.Log(Level.Debug,"Opened service host.");
            }
            catch (Exception e)
            {
                string message = String.Format("Threw exception while attempting to create ServiceHost for {0}:{1}\n{2}", queueUri, e.Message, e.StackTrace);
                trace.Log(Level.Debug,message);
                trace.Log(Level.Error, message);
            }

【问题讨论】:

    标签: c# wcf stream


    【解决方案1】:

    好吧,代码太多了。您应该尝试将代码最小化为最小的可重现示例。我想问题可能在这里:

    public void ImportMessageBody(IMessageBody body)
    {
        Task.Factory.StartNew(() =>
            {
                string fileName = GetFileNameFromMagicalPlace();
                using (FileStream fs = new FileStream(fileName, FileMode.Append))
                {
                    body.Data.CopyTo(fs); // <---- throws System.IOException right here
                }
            });
    }
    

    默认应用OperationBehavior,此行为包含AutoDisposeParameter,默认设置为true,当操作结束时disposes all disposable parameters。因此,我希望在您的 Task 能够处理整个流之前,操作结束。

    【讨论】:

    • Ladislav,感谢您的回答(以及指向您其他答案的链接!)。我发布了这么多代码,因为不知道问题出在哪里,就我而言,这最小的可重现示例。
    • 这毕竟不是问题。我现在已经用[OperationBehavior(AutoDisposeparameters=false)] 装饰了我的ImportMessageBody 操作并实现了OperationCompleted 事件处理程序,但Stream.CopyTo 仍然以与以前相同的方式失败。
    • 看起来你走在了正确的轨道上,因为当我在ImportMessageBody 实现中同步执行Stream.CopyTo 时(即没有Task),它可以工作。
    • ....这很有意义,因为不需要像这样手动实现并发。 WCF 为您执行并发操作。我会将其标记为答案,因为它为我指明了正确的方向,即使它最终不是答案。
    【解决方案2】:

    问题本质上是我通过调用Task.Factory.StartNew 进行了不必要的并发,而不是通过ServiceBehaviorAttribute 确保我的服务正确配置为并发。感谢 Ladislav 为我指明了正确的方向。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2011-10-09
      • 2011-03-16
      • 2018-03-11
      • 2018-06-13
      • 2014-03-19
      • 1970-01-01
      • 2011-11-03
      相关资源
      最近更新 更多