【问题标题】:How can I merge two Linq IEnumerable<T> queries without running them?如何合并两个 Linq IEnumerable<T> 查询而不运行它们?
【发布时间】:2012-12-01 20:32:03
【问题描述】:

如何合并List&lt;T&gt; 的基于 TPL 的任务以供以后执行?

 public async IEnumerable<Task<string>> CreateTasks(){ /* stuff*/ }

我的假设是.Concat() ...

     void MainTestApp()  // Full sample available upon request.
     {
        List<string> nothingList = new List<string>();
        nothingList.Add("whatever");
        cts = new CancellationTokenSource();

         delayedExecution =
            from str in nothingList
            select AccessTheWebAsync("", cts.Token);
         delayedExecution2 =
          from str in nothingList
          select AccessTheWebAsync("1", cts.Token);

         delayedExecution = delayedExecution.Concat(delayedExecution2);
     }


    /// SNIP

    async Task AccessTheWebAsync(string nothing, CancellationToken ct)
    {
        // return a Task
    }

我想确保这不会产生任何任务或评估任何东西。事实上,我想我是在问“什么逻辑上执行 IQueryable 到返回数据的东西”?

背景

由于我正在执行递归并且我不想在正确的时间之前执行此操作,如果多次调用合并结果的正确方法是什么?

如果这很重要,我正在考虑运行这个命令来启动所有任务var AllRunningDataTasks = results.ToList();,然后是这个代码:

while (AllRunningDataTasks.Count > 0)
{
    // Identify the first task that completes.
    Task<TableResult> firstFinishedTask = await Task.WhenAny(AllRunningDataTasks);

    // ***Remove the selected task from the list so that you don't
    // process it more than once.
    AllRunningDataTasks.Remove(firstFinishedTask);

    // TODO: Await the completed task.
    var taskOfTableResult = await firstFinishedTask;

    // Todo: (doen't work)
    TrustState thisState = (TrustState)firstFinishedTask.AsyncState;

    // TODO: Update the concurrent dictionary with data
    // thisState.QueryStartPoint + thisState.ThingToSearchFor 

    Interlocked.Decrement(ref thisState.RunningDirectQueries);
    Interlocked.Increment(ref thisState.CompletedDirectQueries);

    if (thisState.RunningDirectQueries == 0)
    {
        thisState.TimeCompleted = DateTime.UtcNow;
    }
}

【问题讨论】:

  • 为什么 concat 不工作,它应该工作?另外,你不想运行任务,但运行查询没问题,对吧?
  • @Tilak 我的重点是任务,这是我第一次在任务或查询上这样做。我从来没有在查询中这样做过,但我记得读过 Concat 就是这样做的。
  • @Tilak 也许我在我的代码中发现了一个错误......很快就会更新

标签: linq task-parallel-library ienumerable


【解决方案1】:

要回答具体问题“什么逻辑上执行 IQueryable 到返回数据的东西”?这将是任何强制产生至少一个值,或强制发现一个值是否可用的东西。

例如,ToListToArrayFirstSingleSingleOrDefaultCount 都将强制评估。 (尽管First 不会评估整个集合 - 它会检索第一个项目然后停止。)这些都必须至少开始检索值,因为它们中的任何一个都无法在不这样做的情况下返回它们返回的值.在ToListToArray 的情况下,它们返回完全填充的非惰性集合,这就是它们必须评估所有内容的原因。返回单个项目的方法至少需要询问第一个项目,然后Single 的方法将继续检查如果继续评估没有其他结果(如果结果更多则抛出异常) .

使用foreach 遍历查询也会强制执行评估。 (同样,出于同样的原因:您向它询问集合中的实际值,因此它必须提供它们。)

Concat 不会立即评估,因为它不需要 - 只有当您向连接序列询问值时,它才需要向其输入询问值。

顺便说一句,尽管您询问了IQueryable,但您并没有在此处的示例中使用它。这可能很重要,因为与您实际获得的 LINQ to Objects 实现(您获得普通的IEnumerable&lt;T&gt;)相比,它的工作方式存在一些差异。我认为这在这个示例中没有什么不同,但这让我想知道您的原始代码与您在此处发布的用于说明的版本之间是否可能发生了变化?这很重要,因为不同的 LINQ 提供者可以以不同的方式做事。 IEnumerable&lt;T&gt; 风格的 Concat 肯定使用延迟评估,虽然我希望大多数其他 LINQ 实现都是如此,但这并不是绝对的。

如果您需要多次使用结果,并且您想确保只评估一次,但在真正需要它们之前不评估它们,那么通常的方法是调用ToList at您肯定需要评估的点,然后保留生成的List&lt;T&gt;,以便您可以再次使用它。一旦您获得了List&lt;T&gt;(或数组)形式的数据,您就可以根据需要多次使用该列表。

顺便说一句,你的第一个问题有问题:

“如何合并基于 TPL 的任务列表以供以后执行?”

一般来说,如果您已经有一个 TPL 任务,那么您无法阻止它执行。 (有一个例外。如果您直接构造 Task 而不是使用更正常的创建方式之一,它实际上不会运行,直到您告诉它。但通常,返回任务的 API 返回活的,也就是说,当你拿到它们时,它们很可能已经在运行,甚至已经完成。)

您示例中的“稍后执行”是因为您实际上根本没有任务列表来开始。 (如果您确实有一个List&lt;T&gt; 的任务,那么“稍后执行”将不是一个选项。)您拥有的是几个可枚举,如果您要评估它们,它们将创建任务。创建任务的行为与在任何返回 Task 的 TAP 样式 API 中启动它的行为是不可分割的。

根据你写的其余部分,我认为你真正要问的是:

“我如何将多个 IEnumerable&lt;Task&lt;T&gt;&gt; 对象合并到一个 IEnumerable&lt;Task&lt;T&gt;&gt; 中,从而推迟对基础枚举的求值,直到组合的枚举本身被求值?”

Concat 应该可以解决这个问题。

【讨论】:

  • 有趣...我正在使用 Azure 存储来获取按需加载树,并且在搜索到相同深度的所有分支之前,我不想执行子节点。 Azure 存储使用 Begin..End 语义。我将它们包装在 TPL 中,如此处所述stackoverflow.com/q/13216475/328397
【解决方案2】:

以下是一种合并数据的 hacky 方法...我不喜欢在 Main 或此示例的其他一些方面使用“nothingList”的方式,但它似乎完成了工作并允许我来合并待处理的任务。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows;
using System.Windows.Controls;
using System.Windows.Data;
using System.Windows.Documents;
using System.Windows.Input;
using System.Windows.Media;
using System.Windows.Media.Imaging;
using System.Windows.Navigation;
using System.Windows.Shapes;

// Add a using directive and a reference for System.Net.Http.
using System.Net.Http;

// Add the following using directive.
using System.Threading;


namespace ProcessTasksAsTheyFinish
{
    public partial class MainWindow : Window
    {
        // Declare a System.Threading.CancellationTokenSource.
        CancellationTokenSource cts;
        List<IEnumerable<Task>> launchList = new List<IEnumerable<Task>>();

        public MainWindow()
        {
            InitializeComponent();

            List<string> nothingList = new List<string>();
            nothingList.Add("whatever");

            cts = new CancellationTokenSource();

             delayedExecution =
                from str in nothingList
                select AccessTheWebAsync("", cts.Token);


             List<string> nothingList2 = new List<string>();
             nothingList2.Add("whatever");

             delayedExecution2 =
              from str in nothingList2
              select AccessTheWebAsync("1", cts.Token);


             launchList.Add(delayedExecution);
             launchList.Add(delayedExecution2);

             delayedExecution = delayedExecution.Concat(delayedExecution2);
        }
        IEnumerable<Task> delayedExecution = null;
        IEnumerable<Task> delayedExecution2 = null;

        private async void startButton_Click(object sender, RoutedEventArgs e)
        {
            resultsTextBox.Clear();

            // Instantiate the CancellationTokenSource.

            try
            {
                // ***Set up the CancellationTokenSource to cancel after 25 seconds.
                //cts.CancelAfter(250000);

                var test  =  delayedExecution;// AccessTheWebAsync("", cts.Token);

                var testList = test.ToList();

                while (testList.Count() > 0)
                {
                    var firstFinishedTask = await Task.WhenAny(testList);
                    testList.Remove(firstFinishedTask);

                      await firstFinishedTask;
                }

                resultsTextBox.Text += "\r\nDownloads complete.";
            }
            catch (OperationCanceledException tee)
            {
                resultsTextBox.Text += "\r\nDownloads canceled.\r\n";
            }
            catch (Exception)
            {
                resultsTextBox.Text += "\r\nDownloads failed.\r\n";
            }

            cts = null;
        }


        private void cancelButton_Click(object sender, RoutedEventArgs e)
        {
            if (cts != null)
            {
                cts.Cancel();
            }
        }


        async Task<string> AccessTheWebAsync(string nothing, CancellationToken ct)
        {
            // CHANGE THIS VALUE TO CONTROL THE TESTING
            bool delayConversionOfQueryToList = false;

            HttpClient client = new HttpClient();

            // Make a list of web addresses.
            List<string> urlList = null;

            if (nothing == "1")
            {
                urlList = SetUpURLList2();
            }
            else urlList = SetUpURLList();

            // ***Create a query that, when executed, returns a collection of tasks.
            IEnumerable<Task<int>> downloadTasksQuery =
                from url in urlList select ProcessURL(url, client, ct);

            // DEBUG!!!
            if (delayConversionOfQueryToList == true)
            {
                await Task.Delay(10000);
                resultsTextBox.Text += String.Format("\r\nDelay of IQueryable complete.  Tip: Did you see any IsRunning messages?");
            }

            // ***Use ToList to execute the query and start the tasks. 
            List<Task<int>> downloadTasks = downloadTasksQuery.ToList();

            // DEBUG!!!
            if (delayConversionOfQueryToList == false)
            {
                await Task.Delay(10000);
                resultsTextBox.Text += String.Format("\r\nDelay of .ToList() complete.   Tip: Did you see any IsRunning messages?");
            }

            // ***Add a loop to process the tasks one at a time until none remain.
            while (downloadTasks.Count() > 0)
            {
                // Identify the first task that completes.
                Task<int> firstFinishedTask = await Task.WhenAny(downloadTasks);

                resultsTextBox.Text += String.Format("\r\nID  {0}", firstFinishedTask.Id);

                // ***Remove the selected task from the list so that you don't
                // process it more than once.
                downloadTasks.Remove(firstFinishedTask);

                // Await the completed task.
                int length = await firstFinishedTask;
                resultsTextBox.Text += String.Format("\r\nLength of the download:  {0}", length);
            }

            return nothing;
        }


        private List<string> SetUpURLList()
        {
            List<string> urls = new List<string> 
            { 
                "http://msdn.microsoft.com",
                "http://msdn.microsoft.com/library/windows/apps/br211380.aspx",
                "http://msdn.microsoft.com/en-us/library/hh290136.aspx",
                "http://msdn.microsoft.com/en-us/library/dd470362.aspx",
                "http://msdn.microsoft.com/en-us/library/aa578028.aspx",
                "http://msdn.microsoft.com/en-us/library/ms404677.aspx",
                "http://msdn.microsoft.com/en-us/library/ff730837.aspx"
            };
            return urls;
        }
        private List<string> SetUpURLList2()
        {
            List<string> urls = new List<string> 
            { 
                "http://www.google.com",

            };
            return urls;
        }

        async Task<int> ProcessURL(string url, HttpClient client, CancellationToken ct)
        {
            resultsTextBox.Text += String.Format("\r\nIS RUNNING {0}", url);

            // GetAsync returns a Task<HttpResponseMessage>. 
            HttpResponseMessage response = await client.GetAsync(url, ct);
            // Retrieve the website contents from the HttpResponseMessage.
            byte[] urlContents = await response.Content.ReadAsByteArrayAsync();

           // Thread.Sleep(3000);
           // await Task.Delay(1000, ct);
           return urlContents.Length;
        }
    }
}

// Sample Output:

IS RUNNING http://msdn.microsoft.com
IS RUNNING http://msdn.microsoft.com/library/windows/apps/br211380.aspx
IS RUNNING http://msdn.microsoft.com/en-us/library/hh290136.aspx
IS RUNNING http://msdn.microsoft.com/en-us/library/dd470362.aspx
IS RUNNING http://msdn.microsoft.com/en-us/library/aa578028.aspx
IS RUNNING http://msdn.microsoft.com/en-us/library/ms404677.aspx
IS RUNNING http://msdn.microsoft.com/en-us/library/ff730837.aspx
IS RUNNING http://www.google.com
Delay of .ToList() complete.   Tip: Did you see any IsRunning messages?
ID  1
Length of the download:  48933
ID  2
Length of the download:  375328
ID  3
Length of the download:  220428
ID  4
Length of the download:  222256
ID  5
Length of the download:  229330
ID  6
Length of the download:  136544
ID  7
Length of the download:  207171
Delay of .ToList() complete.   Tip: Did you see any IsRunning messages?
ID  8
Length of the download:  43945
Downloads complete.

【讨论】:

    猜你喜欢
    • 2010-10-10
    • 2020-09-23
    • 1970-01-01
    • 1970-01-01
    • 2016-05-21
    • 1970-01-01
    • 2019-10-11
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多