【问题标题】:How to return stream from WCF service?如何从 WCF 服务返回流?
【发布时间】:2012-07-20 15:29:51
【问题描述】:

我正在使用 protobuf-net 和 WCF。这是我创建的代码:

public class MobileServiceV2
{
    [WebGet(UriTemplate = "/some-data")]
    [Description("returns test data")]
    public Stream GetSomeData()
    {
        WebOperationContext.Current.OutgoingResponse.ContentType = "application/x-protobuf";

        var ms = new MemoryStream();
        ProtoBuf.Serializer.Serialize(ms, new MyResponse { SomeData = "Test data here" });
        return ms;
    }
}

[DataContract]
public class MyResponse
{
    [DataMember(Order = 1)] 
    public string SomeData { get; set; }
}

当我查看 Fiddler 时 - 我可以看到正确的传出内容类型并且一切看起来都不错,但我得到的响应是空的。 IE 提示下载文件,此文件为空。序列化器不工作吗?还是我做的不对?

编辑:

我在方法中添加了以下代码,是的,它可以正确序列化。我从 WCF 返回流的方式有问题..

using (var file = File.Create("C:\\test.bin"))
        {
            Serializer.Serialize(file, new MyResponse { SomeData = "Test data here" });
        }

【问题讨论】:

    标签: .net wcf stream protobuf-net


    【解决方案1】:

    只需写入 MemoryStream,然后倒带即可。在这种情况下不要 Dispose() 它:

    var ms = new MemoryStream();
    Serializer.Serialize(ms, obj);
    ms.Position = 0;
    return ms;
    

    但是,这确实意味着它在内存中缓冲。我可以尝试并想出一些巫术来避免这种情况,但这会非常复杂。

    【讨论】:

    • 那么包装它一个using 语句是否会导致它在Stream 被传输之前处理?我收到“底层连接已关闭:连接已意外关闭”异常。
    【解决方案2】:

    如果我正确理解了这个问题,那么您正在尝试使用流式 WCF 绑定。在这种情况下,您可以尝试将数据拆分为单独序列化的块,并在客户端以相同的方式反序列化。唯一需要注意的是接收端 WCF 提供的 Stream 实现 - 您需要自己包装它并管理读取。以下是我用来促进这一点的一个类:

        public static class StreamingUtility
    {
    
        public static IEnumerable<T> FromStream<T>(this Stream value, Action<T> perItemCallback = null)
        {
    
            List<T> result = new List<T>();
            StreamProxy sp = new StreamProxy(value);
            try
            {
                while (sp.CanRead)
                {
    
                    T v = ProtoBuf.Serializer.DeserializeWithLengthPrefix<T>((Stream)sp, ProtoBuf.PrefixStyle.Base128);
                    if (perItemCallback != null)
                        perItemCallback(v);
    
                    result.Add(v);
                }
            }
            catch { }
    
            return result;
        }
    
        public static StreamingContent<T> SingleToStream<T>(this T value)
        {
            return new StreamingContent<T>(new T[] { value });
        }
    
        public static StreamingContent<T> ToStream<T>(this IEnumerable<T> value)
        {
            return new StreamingContent<T>(value);
        }
    
        public class StreamingContent<T> : Stream
        {
            private bool _canRead = true;
            private ManualResetEventSlim _dataIsReady = new ManualResetEventSlim(false);
            private bool _noMoreAdditions = false;
            private long _readingOffset = 0;
    
            //private IFormatter _serializer = new BinaryFormatter(null, new StreamingContext(StreamingContextStates.CrossMachine));
            private IEnumerable<T> _source = null;
    
            private MemoryStream _stream = new MemoryStream();
    
            public static StreamingContent<T> Clone(Stream origin)
            {
                return new StreamingContent<T>(origin);
            }
    
            private StreamingContent(Stream origin)
            {
                byte[] b = new byte[65536];
    
                while (true)
                {
                    int count = origin.Read(b, 0, b.Length);
    
                    if (count > 0)
                    {
                        _stream.Write(b, 0, count);
                    }
                    else
                        break;
                }
                _noMoreAdditions = true;
            }
    
            public StreamingContent(IEnumerable<T> source)
            {
                if (!s_initialized)
                {
                    StreamingUtility.Initialize();
    
                    StreamingUtility.s_initialized = true;
                }
    
                _source = source.ToList();
                if (source.Count() > 0)
                {
                    new Thread(new ParameterizedThreadStart(obj =>
                    {
                        StreamingContent<T> _this = obj as StreamingContent<T>;
                        foreach (T item in _this._source)
                        {
                            lock (_this._stream)
                            {
                                if (_this._noMoreAdditions) break;
                                _stream.Seek(0, SeekOrigin.End);
    
                                ProtoBuf.Serializer.SerializeWithLengthPrefix<T>(_this._stream, item, ProtoBuf.PrefixStyle.Base128);
    
                                //_serializer.Serialize(_this._stream, item);
                                _dataIsReady.Set();
                            }
                        }
    
                        lock (_this._stream)
                        {
                            _this._noMoreAdditions = true;
                            _dataIsReady.Set();
                        }
                    })) { IsBackground = true }.Start(this);
                }
                else
                {
                    _canRead = false;
                }
            }
    
            public override bool CanRead
            {
                get { return _canRead; }
            }
    
            public override bool CanSeek
            {
                get { return false; }
            }
    
            public override bool CanWrite
            {
                get { return false; }
            }
    
            public override long Length
            {
                get
                {
                    while (!_noMoreAdditions) Thread.Sleep(20);
                    return _stream.Length;
                }
            }
    
            public override long Position
            {
                get
                {
                    throw new Exception("This stream does not support getting the Position property.");
                }
                set
                {
                    throw new Exception("This stream does not support setting the Position property.");
                }
            }
    
            public override void Close()
            {
                lock (_stream)
                {
                    _noMoreAdditions = true;
                    _stream.Close();
                }
            }
    
            public override void Flush()
            {
            }
    
            public override int Read(byte[] buffer, int offset, int count)
            {
                if (!CanRead) return 0;
    
                bool wait = false;
    
                lock (_stream)
                {
                    wait = !_dataIsReady.IsSet && !_noMoreAdditions;
                }
    
                if (wait)
                {
                    _dataIsReady.Wait();
                }
    
                lock (_stream)
                {
                    if (!_noMoreAdditions)
                        _dataIsReady.Reset();
    
                    if (_stream.Length > _readingOffset)
                    {
                        _stream.Seek(_readingOffset, SeekOrigin.Begin);
                        int res = _stream.Read(buffer, 0, count);
    
                        if (_noMoreAdditions && count + _readingOffset >= _stream.Length)
                            _canRead = false;
    
                        _readingOffset += res;
    
                        return res;
                    }
                }
    
                return 0;
            }
    
            public override long Seek(long offset, SeekOrigin origin)
            {
                throw new Exception("This stream does not support seeking.");
            }
    
            public override void SetLength(long value)
            {
                throw new Exception("This stream does not support setting the Length.");
            }
    
            public override void Write(byte[] buffer, int offset, int count)
            {
                throw new Exception("This stream does not support writing.");
            }
    
            protected override void Dispose(bool disposing)
            {
                try
                {
                    lock (_stream)
                    {
                        _noMoreAdditions = true;
                        _stream.Close();
                    }
                }
                catch { }
            }
        }
    
        private class StreamProxy : Stream
        {
            private bool _canRead = true;
            private bool _endOfMessage = false;
            private Stream _internalStream;
            private int _readPosition = 0;
            private MemoryStream _storage = new MemoryStream();
            private int _writePosition = 0;
    
            public StreamProxy(Stream internalStream)
            {
                _internalStream = internalStream;
                byte[] initialRequest = new byte[1000];
    
                int length = _internalStream.Read(initialRequest, 0, 1000);
    
                if (length != 0)
                    _storage.Write(initialRequest, 0, length);
                else
                    _canRead = false;
    
                _writePosition = length;
            }
    
            public override bool CanRead
            {
                get { return _canRead; }
            }
    
            public override bool CanSeek
            {
                get { return false; }
            }
    
            public override bool CanWrite
            {
                get { return false; }
            }
    
            public override long Length
            {
                get { throw new NotImplementedException(); }
            }
    
            public override long Position
            {
                get
                {
                    return _readPosition;
                }
                set
                {
                    throw new NotImplementedException();
                }
            }
    
            public override void Flush()
            {
            }
    
            public override int ReadByte()
            {
                byte[] res = new byte[1];
                int g = Read(res, 0, 1);
                return res[0];
            }
    
            public override int Read(byte[] buffer, int offset, int count)
            {
    
    
                int res = 0;
                if (_readPosition + count > _writePosition)
                {
                    /// add extra bytes to see if more data is available and we need to allow next read
                    int readSize = _readPosition + count - _writePosition;
    
                    if (readSize < 1024)
                        readSize = 1024;
    
                    byte[] read = new byte[readSize];
                    res = _internalStream.Read(read, 0, readSize);
                    if (res > 0)
                    {
                        _storage.Seek(_writePosition, SeekOrigin.Begin);
                        _writePosition += res;
                        _storage.Write(read, 0, res);
    
                    }
                    else if (res == 0)/// If the read returned 0, we're at the end
                    {
                        _endOfMessage = true;
    
                    }
    
                    if (res > 0 && res < readSize)
                    {
                        read = new byte[1024];
                        res = _internalStream.Read(read, 0, 1024);
                        if (res > 0)
                        {
                            _storage.Seek(_writePosition, SeekOrigin.Begin);
                            _writePosition += res;
                            _storage.Write(read, 0, res);
    
                        }
                        else if (res == 0)/// If the read returned 0, we're at the end
                        {
                            _endOfMessage = true;
    
                        }
                    }
                }
    
                _storage.Seek(_readPosition, SeekOrigin.Begin);
                res = _storage.Read(buffer, offset, count);
                _readPosition += res;
    
    
                /// If end of message was reached and all the data was read from the
                /// internal storage, mark CanRead as false
                if (_readPosition >= _writePosition && _endOfMessage)
                    _canRead = false;
    
                return res;
            }
    
            public override long Seek(long offset, SeekOrigin origin)
            {
                throw new NotImplementedException();
            }
    
            public override void SetLength(long value)
            {
                throw new NotImplementedException();
            }
    
            public override void Write(byte[] buffer, int offset, int count)
            {
                throw new NotImplementedException();
            }
        }
    }
    

    要使用它,只需像这样调用 WCF 接口方法:

    IEnumerable<SomeType> collection = ...
    clannel.Method(collection.ToStream());
    

    并像这样在接收端读取它:

    public void Method(Stream stream){
       IEnumerable<SomeType> coll = stream.FromStream<SomeType>();
    }
    

    此实现仍在测试中,因此我将不胜感激。

    【讨论】:

      【解决方案3】:

      请试试这个方法

      var ms = new MemoryStream();
      using (var file = File.Create("C:\\test.bin"))
      {
      Serializer.Serialize(file, new MyResponse { SomeData = "Test data here" });
      file.CopyTo(ms); 
      }
      
      return ms;//stream
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2016-06-17
        • 2012-12-30
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多