【问题标题】:Reading from multiple WebSockets with async/await使用 async/await 从多个 WebSocket 读取
【发布时间】:2018-03-29 13:10:15
【问题描述】:

我正在编写一个需要从多个 WebSocket 连续读取数据的 .NET Core 控制台应用程序。我目前的方法是为每个 WebSocket 创建一个新任务(通过 Task.Run),该任务运行一个无限的 while 循环并阻塞,直到它从套接字读取数据。然而,由于数据推送的频率相当低,线程大部分时间只是阻塞,这看起来效率很低。

根据我的理解,异步/等待模式应该是阻塞 I/O 操作的理想选择。但是,我不确定如何将它应用于我的情况,或者即使 async/await 可以以任何方式改善这一点 - 特别是因为它是一个控制台应用程序。

我已经整理了一个概念证明(为了简单起见,使用 HTTP GET 而不是从 WebSocket 读取)。我能够做到这一点的唯一方法是没有真正等待。代码:

static void Main(string[] args)
{
    Console.WriteLine($"ThreadId={ThreadId}: Main");

    Task task = Task.Run(() => Process("https://duckduckgo.com", "https://stackoverflow.com/"));

    // Do main work.

    task.Wait();
}

private static void Process(params string[] urls)
{
    Dictionary<string, Task<string>> tasks = urls.ToDictionary(x => x, x => (Task<string>)null);
    HttpClient client = new HttpClient();

    while (true)
    {
        foreach (string url in urls)
        {
            Task<string> task = tasks[url];
            if (task == null || task.IsCompleted)
            {
                if (task != null)
                {
                    string result = task.Result;
                    Console.WriteLine($"ThreadId={ThreadId}: Length={result.Length}");
                }
                tasks[url] = ReadString(client, url);
            }
        }
        Thread.Yield();
    }
}

private static async Task<string> ReadString(HttpClient client, string url)
{
    var response = await client.GetAsync(url);
    Console.WriteLine($"ThreadId={ThreadId}: Url={url}");
    return await response.Content.ReadAsStringAsync();
}

private static int ThreadId => Thread.CurrentThread.ManagedThreadId;

这似乎在 ThreadPool 上的各种工作线程上工作和执行。但是,这绝对不是任何典型的 async/await 代码,这让我认为必须有更好的方法。

有没有更合适/更优雅的方式来做到这一点?

【问题讨论】:

  • 我认为对于您的用例,可观察流非常适合。尝试:reactivex.io
  • 所以基本上......你有一个 URL 列表,你想向每个 URL 发起一个请求,然后当每个请求完成时你想处理它,然后发送另一个请求?
  • @Rawling - 非常相似,只是使用 WebSockets。我正在等待数据被推送,然后读取它,隐藏它,存储它,然后再次读取。推送频率范围为 1 分钟 - 1 天。

标签: c# async-await .net-core console-application


【解决方案1】:

您基本上已经编写了 Task.WhenAny 的一个版本,它使用 CPU 循环来检查已完成的任务,而不是......框架方法在幕后使用的任何魔法。

更惯用的版本可能如下所示。 (虽然它可能不会——我觉得应该有一种比我在这里使用的反向字典更简单的“重新运行已完成的任务”的方法。)

static void Main(string[] args)
{
    Console.WriteLine($"ThreadId={ThreadId}: Main");

    // No need for Task.Run here.
    var task = Process("https://duckduckgo.com", "https://stackoverflow.com/");
    task.Wait();
}

private static async Task Process(params string[] urls)
{
    // Set up initial dictionary mapping task (per URL) to the URL used.
    HttpClient client = new HttpClient();
    var tasks = urls.ToDictionary(u => client.GetAsync(u), u => u);

    while (true)
    {
        // Wait for any task to complete, get its URL and remove it from the current tasks.
        var firstCompletedTask = await Task.WhenAny(tasks.Keys);
        var firstCompletedUrl = tasks[firstCompletedTask];
        tasks.Remove(firstCompletedTask);

        // Do work with completed task.
        try
        {
            Console.WriteLine($"ThreadId={ThreadId}: URL={firstCompletedUrl}");
            using (var response = await firstCompletedTask)
            {
                var content = await response.Content.ReadAsStringAsync();
                Console.WriteLine($"ThreadId={ThreadId}: Length={content.Length}");
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine($"ThreadId={ThreadId}: Ex={ex}");
        }

        // Queue the task again.
        tasks.Add(client.GetAsync(firstCompletedUrl), firstCompletedUrl);
    }
}

private static int ThreadId => Thread.CurrentThread.ManagedThreadId;

【讨论】:

  • 啊,Task.WhenAny 返回已完成的任务是我缺少的一点。这个解决方案与我写的非常相似,但如果我可以避免它,我宁愿依赖你所说的框架魔法而不是 CPU 循环。谢谢!
【解决方案2】:

我已接受 Rawling 的回答 - 我相信这对于我描述的确切场景是正确的。然而,通过一些颠倒的逻辑,我最终得到了一些更简单的东西——留下它以防万一有人需要这样的东西:

static void Main(string[] args)
{
    string[] urls = { "https://duckduckgo.com", "https://stackoverflow.com/" };
    HttpClient client = new HttpClient();

    var tasks = urls.Select(async url =>
    {
        while (true) await ReadString(client, url);
    });
    Task.WhenAll(tasks).Wait();
}

private static async Task<string> ReadString(HttpClient client, string url)
{
    var response = await client.GetAsync(url);
    string data = await response.Content.ReadAsStringAsync();
    Console.WriteLine($"Fetched data from url={url}. Length={data.Length}");
    return data;
}

【讨论】:

    【解决方案3】:

    也许更好的问题是:在这种情况下,您真的需要每个套接字线程吗?您应该将线程视为系统范围的资源,并且在生成它们时应该考虑到这一点,尤其是如果您并不真正知道应用程序将使用的线程数。这是一本好书:What's the maximum number of threads in Windows Server 2003?

    几年前,.NET 团队引入了异步套接字。

    ...客户端是用异步套接字构建的,所以执行 当服务器返回一个 回复。应用程序向服务器发送一个字符串,然后 在控制台上显示服务器返回的字符串。

    Asynchronous Client Socket Example

    有更多的例子展示了这种方法。虽然它有点复杂和“低级别”,但它让你可以控制。

    【讨论】:

      猜你喜欢
      • 2019-05-30
      • 2020-12-02
      • 2013-09-19
      • 2023-03-10
      • 2018-09-14
      • 2019-02-04
      • 1970-01-01
      • 2017-10-04
      • 1970-01-01
      相关资源
      最近更新 更多