【问题标题】:Is there a way to use System.Diagnostics.Process in an IAsyncEnumerator?有没有办法在 IAsyncEnumerator 中使用 System.Diagnostics.Process?
【发布时间】:2021-01-05 01:02:31
【问题描述】:

首先,我的目标是 .Net Core 3.1 和 C#8。

我想要这样的东西。

public static async Task<MyDataObj> GetData()
{
  var dataObj = new MyDataObj();
  var args = ArgsHelperFunction(...);
  
  await foreach(string result in RunProcessAsync(args))
  {
     // Process result and store it in dataObj
  }

  return dataObj;
}

private static async IAsyncEnumerable<string> RunProcessAsync(string args)
{
  await using (var myProcess = new Process())
  {
    myProcess.StartInfo.FileName = @"path\to\file.exe"
    myProcess.StartInfo.Arguments = args;
    myProcess.StartInfo.UseShellExecute = false;
    myProcess.StartInfo.RedirectStandardError = true;
    myProcess.StartInfo.CreateNoWindow = true;

    myProcess.ErrorDataReceived += (s, e) =>
    {
      yield return e.Data;
    }

    myProcess.Start();
    myProcess.BeginErrorReadLine();
    process.WaitforExit();
  }
}

当我尝试此设置时,我收到来自 await foreach(string result in RunProcessAsync(args)) 的错误

CS8417 'Process':异步 using 语句中使用的类型必须隐式转换为 'System.IAsyncDisposable' 或实现合适的 'DisposeAsync' 方法。

这个错误来自yield return e.Data;

CS1621 不能在匿名方法或 lambda 表达式中使用 yield 语句

目标是这样的。我有一个执行一些操作并将信息写入错误输出流的 exe(不确定这是否是它的真实名称)。我想在写入时接受这些写入,解析它们以获取我想要的信息并将它们存储在一个对象中以供以后使用。

我是一个相当新手的编码器,对异步编码非常陌生。我以同步方式测试了RunProcessAsync 的功能;它被调用的地方,只是将所有原始数据写入输出窗口,而不将任何数据返回给调用方法。那工作得很好。另外,我得到了一个使用IAsyncEnumerable 的测试异步流,但它只使用了Task.Delay 并返回了一些整数。现在我正在尝试将这些东西结合起来,而我缺乏经验正在阻碍我。

感谢大家提供的任何帮助以及帮助提高 C# 技能和知识。

【问题讨论】:

标签: c# process iasyncenumerable


【解决方案1】:

没有minimal, reproducible example,就不可能完全解决您的问题。但我们可以处理您提出的两个具体问题。

首先,如果您的对象(例如Process)不支持IAsyncDisposable,那么就不要使用它。请改用同步的using 语句。

就方法中​​的yield return 而言,如果您花一点时间,您可能会发现您尝试编写的内容没有任何意义。事件处理程序,这是一个完全不同的方法,如何能够使当前方法产生一个新值?当事件发生时,您需要事件处理程序向当前方法发出信号。您可以通过多种方式执行此操作,但SemaphoreSlim 是更直接的方式之一。

把它们放在一起,你可能会得到这样的结果:

private static async IAsyncEnumerable<string> RunProcessAsync(string args)
{
  using (var myProcess = new Process())
  {
    myProcess.StartInfo.FileName = @"path\to\file.exe";
    myProcess.StartInfo.Arguments = args;
    myProcess.StartInfo.UseShellExecute = false;
    myProcess.StartInfo.RedirectStandardError = true;
    myProcess.StartInfo.CreateNoWindow = true;

    ConcurrentQueue<string> dataQueue = new ConcurrentQueue<string>();
    SemaphoreSlim dataSemaphore = new SemaphoreSlim(0);

    myProcess.ErrorDataReceived += (s, e) =>
    {
      dataQueue.Enqueue(e.Data);
      dataSemaphore.Release();
    }

    myProcess.Start();
    myProcess.BeginErrorReadLine();

    while (true)
    {
      await dataSemaphore.WaitAsync();

      // Only one consumer, so this will always succeed
      dataQueue.TryDequeue(out string data);

      if (data == null) break;

      yield return data;
    }
  }
}

由于您没有提供实际的 MCVE,因此我无法尝试从头开始重建您的场景。所以上面没有编译,没关系测试。但它应该显示要点。

也就是说,您需要保持迭代器方法异步(这意味着您不能阻塞对 WaitForExit() 的调用),并且您需要以某种方式将 ErrorDataReceived 事件处理程序接收到的数据移回迭代器方法。在上面,我将线程安全队列对象与信号量结合使用。

每次接收到一行数据时,事件处理程序中的信号量计数都会增加(通过Release()),然后迭代器方法通过减少信号量计数(通过WaitAsync())并返回该行来消耗这些数据收到了。

这里还有很多其他机制可以用于生产者/消费者方面。 a well-received Q&A here 讨论了异步兼容机制,包括支持异步操作的 BlockingCollection&lt;T&gt; 的自定义版本,以及来自 TPL Dataflow 的 the BufferBlock&lt;T&gt; class 的提及。

这是一个使用BufferBlock&lt;T&gt; 的示例(其语义与BlockingCollection&lt;T&gt; 非常相似,但包括对消费代码的异步处理):

static async IAsyncEnumerable<string> RunProcessAsync(string args)
{
    using (var process = new Process())
    {
        myProcess.StartInfo.FileName = @"path\to\file.exe";
        process.StartInfo.Arguments = args;
        process.StartInfo.UseShellExecute = false;
        process.StartInfo.RedirectStandardError = true;
        process.StartInfo.CreateNoWindow = true;

        BufferBlock<string> dataBuffer = new BufferBlock<string>();

        process.ErrorDataReceived += (s, e) =>
        {
            if (e.Data != null)
            {
                dataBuffer.Post(e.Data);
            }
            else
            {
                dataBuffer.Complete();
            }
        };

        process.Start();
        process.BeginErrorReadLine();

        while (await dataBuffer.OutputAvailableAsync())
        {
            yield return dataBuffer.Receive();
        }
    }
}

【讨论】:

    【解决方案2】:

    这样的?

    using CliWrap;
    using CliWrap.EventStream;
    
    var cmd = Cli.Wrap("foo").WithArguments("bar");
    
    await foreach (var cmdEvent in cmd.ListenAsync())
    {
        switch (cmdEvent)
        {
            case StartedCommandEvent started:
                _output.WriteLine($"Process started; ID: {started.ProcessId}");
                break;
            case StandardOutputCommandEvent stdOut:
                _output.WriteLine($"Out> {stdOut.Text}");
                break;
            case StandardErrorCommandEvent stdErr:
                _output.WriteLine($"Err> {stdErr.Text}");
                break;
            case ExitedCommandEvent exited:
                _output.WriteLine($"Process exited; Code: {exited.ExitCode}");
                break;
        }
    }
    

    这在CliWrap 中可用。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2014-06-13
      • 2021-12-21
      • 2012-10-28
      • 2021-05-29
      • 2016-09-10
      • 2021-01-07
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多