【问题标题】:Determine if PipeStream has data判断 PipeStream 是否有数据
【发布时间】:2015-04-03 17:58:25
【问题描述】:

我正在使用命名管道在应用程序之间传递一些消息。

我的管道客户端使用PipeStream.ReadByte() 读取从服务器发送的消息。这会阻塞它正在运行的线程。

这种行为通常很好,但是,我想不出在线程被阻塞时处理该线程的好方法。当客户端退出时,该线程保持活动状态,直到流获得一些数据并允许再次检查循环变量。在服务器静默的情况下,线程只是活着。

Thread.Abort() 无效。

在转移到ReadByte() 方法之前,有没有办法从数据中检查流? PipeStream.Length 抛出UnsupportedException

【问题讨论】:

  • 为什么不使用取消令牌从流中异步读取 (Stream.ReadAsync)。当客户端退出时,在关联的CancelationTokenSource 上调用Cancel 以取消挂起的IO。另外,为什么不为您的 IO 绑定操作使用异步任务而不是专用线程。参见例如我昨天在这里遇到的一个问题的类似问题/解决方案:stackoverflow.com/a/29423042/2573395
  • 请注意,优化管道的引用解决方案可能是有意义的(即等待连​​接等)。
  • 我刚才留下了一个答案。您是否愿意对此发表评论,或者我应该删除它,因为您已经找到了不同的解决方案?如果是这样,您能否将其发布为答案。

标签: c# multithreading named-pipes


【解决方案1】:

一种解决方案可能是使用异步 I/O 和任务而不是专用线程,并使用 CancelationToken 中止挂起的 IO。下面的代码 sn-p 显示了一个简化的示例,它基于此答案 https://stackoverflow.com/a/29423042/2573395,但针对管道操作稍作修改(在这种情况下异步等待每个消息的连接)。请注意,我不是命名管道方面的专家,因此可能有更优化的方法。

public class MyPipeReader
{
    private string _pipeName;

    public MyPipeReader(string pipeName)
    {
        _pipeName = pipeName;
    }

    public async Task ReceiveAndProcessStuffUntilCancelled(CancellationToken token)
    {
        var received = new byte[4096];
        while (!token.IsCancellationRequested)
        {
            using (var stream = CreateStream())
            {
                try
                {
                    if (await WaitForConnection(stream, token))
                    {
                        var bytesRead = 0;
                        do {
                            bytesRead = await stream.ReadAsync(received, 0, 4096, token).ConfigureAwait(false);
                        } while (bytesRead > 0 && DoMessageProcessing(received, bytesRead));

                        if (stream.IsConnected)
                            stream.Disconnect();
                    }
                }
                catch (OperationCanceledException)
                {
                    break; // operation was canceled.
                }
                catch (Exception e)
                {
                    // report error & decide if you want to give up or retry.
                }
            }
        }
    }

    private NamedPipeServerStream CreateStream()
    {
        return new NamedPipeServerStream(_pipeName, PipeDirection.In, 1, PipeTransmissionMode.Byte, PipeOptions.Asynchronous);
    }

    private async Task<bool> WaitForConnection(NamedPipeServerStream stream, CancellationToken token)
    {
        // We can't simply use 'Task.Factory.FromAsync', as that is not cancelable.
        var tcs = new TaskCompletionSource<bool>();
        var cancelRegistration = token.Register(() => tcs.SetCanceled());
        var iar = stream.BeginWaitForConnection(null, null);
        var rwh = ThreadPool.RegisterWaitForSingleObject(iar.AsyncWaitHandle, delegate { tcs.TrySetResult(true); }, null, -1, true);
        try
        {
            await tcs.Task.ConfigureAwait(false);
            if (iar.IsCompleted) {
                stream.EndWaitForConnection(iar);
                return true;
            }
        }
        finally
        {
            cancelRegistration.Dispose();
            rwh.Unregister(null);
        }
        return false;
    }

    private bool DoMessageProcessing(byte[] buffer, int nBytes)
    {
        try
        {
            // Your processing code.
            // You could also make this async in case it does any I/O.
            return true;
        }
        catch (Exception e)
        {
            // report error, and decide what to do.
            // return false if the task should not
            // continue.
            return false;
        }
    }
}

class Program
{
    public static void Main(params string[] args)
    {
        using (var cancelSource = new CancellationTokenSource())
        {
            var receive = new MyPipeReader("_your_pipe_name_here_").ReceiveAndProcessStuffUntilCancelled(cancelSource.Token);
            Console.WriteLine("Press <ENTER> to stop");
            Console.ReadLine();
            cancelSource.Cancel();
            receive.Wait();
        }
    }
}

【讨论】:

  • 请注意,任何来到此线程的人,您都不能使用CancellationToken 取消ReadAsync 操作。
猜你喜欢
  • 1970-01-01
  • 2020-06-01
  • 2014-12-06
  • 1970-01-01
  • 2013-09-09
  • 2011-06-03
  • 1970-01-01
  • 2011-04-22
  • 2020-01-04
相关资源
最近更新 更多