【问题标题】:Create an asynchronous LinQ query创建异步 LinQ 查询
【发布时间】:2015-01-20 10:37:13
【问题描述】:

我检查并尝试了这个例子:How to write Asynchronous LINQ query?

这很好用,它异步执行这两个过程,但我的目标是创建一个查询,当您获得查询的第一个结果时,您已经可以开始使用该值,同时查询仍在执行并寻找更多值。

我在为 android 编程时这样做了,结果非常棒而且非常快,但我不知道如何在 C# 中使用 linQ 来制作它

【问题讨论】:

  • 你也许可以用 PLinq 和聚合函数做你想做的事...msdn.microsoft.com/en-us/library/dd460697(v=vs.110).aspx
  • 可能有一个解决方法,您可以使用 FirstOrDefault() 结果开始您的工作,并再次进行 Where 查询以完成所有其余操作(转义第一个操作)。这可能不是您想要的,但这可能会让您产生想法。
  • @JakobOlsen,来自MSDN 的“当源集合中的每个元素的处理是独立的且各个委托之间不涉及共享状态时,PLINQ 的性能最佳”。此外,您不能保证您的第一个项目将始终使用 PLINQ 进行处理。
  • @RohitPrakash 如果我理解得很好,您的建议是进行第一个 linQ 查询以仅获取一个值,我是否很好地关注您?问题是我不想只使用第一个值,而是使用所有值,而 linq 查询正在查找它们。
  • @IbanArriola 我的答案会这样做,因为每个答案都可用,可以执行您想要执行的处理。您可以选择跳过 AsParallel 调用来一次处理一个,这与说“获取一个并执行逻辑,然后获取另一个并重复”相同。

标签: c# linq asynchronous


【解决方案1】:

异步序列在 .NET 中建模为 IObservables。 Reactive Extensions 是 LINQ 的异步部分。

您可以通过多种方式生成可观察序列,例如通过调用Generate

var sequence = Observable.Generate(0,
    i => i < 10,
    i => i + 1,
    i => SomeExpensiveGeneratorFunction());

然后使用所有常规 LINQ 函数(因此允许使用查询语法)以及一些专门针对异步序列有意义的附加操作(以及许多不同的方法创建可观察序列)例如:

var query = from item in sequence
            where ConditionIsTrue(item)
            select item.ToString();

这里发生的事情的简短描述就是说它完全按照您的意愿行事。生成器函数只需在成功生成值(或完成时)通知其订阅者并继续生成值而不等待订阅者完成,Where 方法将订阅sequence,并通知 its 订阅者 每当它观察到一个通过条件的值时,Select 将订阅由Where 返回的序列,并在它获得一个值时(异步)执行其转换,然后将其推送给它的所有订阅者。

【讨论】:

    【解决方案2】:

    我已经修改了您给定链接中的TheSoftwareJedi 答案。 您可以从 Asynchronous 类引发第一个启动事件,并使用它来启动您的工作。 这是课程,

    public static class AsynchronousQueryExecutor
        {
            private static Action<object> m_OnFirstItemProcessed;
    
            public static void Call<T>(IEnumerable<T> query, Action<IEnumerable<T>> callback, Action<Exception> errorCallback, Action<object> OnFirstItemProcessed)
            {
                m_OnFirstItemProcessed = OnFirstItemProcessed;
                Func<IEnumerable<T>, IEnumerable<T>> func =
                    new Func<IEnumerable<T>, IEnumerable<T>>(InnerEnumerate<T>);
                IEnumerable<T> result = null;
                IAsyncResult ar = func.BeginInvoke(
                                    query,
                                    new AsyncCallback(delegate(IAsyncResult arr)
                                    {
                                        try
                                        {
                                            result = ((Func<IEnumerable<T>, IEnumerable<T>>)((AsyncResult)arr).AsyncDelegate).EndInvoke(arr);
                                        }
                                        catch (Exception ex)
                                        {
                                            if (errorCallback != null)
                                            {
                                                errorCallback(ex);
                                            }
                                            return;
                                        }
                                        //errors from inside here are the callbacks problem
                                        //I think it would be confusing to report them
                                        callback(result);
                                    }),
                                    null);
            }
            private static IEnumerable<T> InnerEnumerate<T>(IEnumerable<T> query)
            {
                int iCount = 0;
                foreach (var item in query) //the method hangs here while the query executes
                {
                    if (iCount == 0)
                    {
                        iCount++;
                        m_OnFirstItemProcessed(item);
    
                    }
                    yield return item;
                }
            }
        }
    

    这是关联,

    private void OnFirstItem(object value) // Your first items is proecessed here.
        {
            //You can start your work here.
        }
    
        public void HandleResults(IEnumerable<int> results)
        {
            foreach (var item in results)
            {
            }
        }
    
        public void HandleError(Exception ex)
        {
        }
    

    下面是你应该如何调用函数。

    private void buttonclick(object sender, EventArgs e)
        {
            IEnumerable<int> range = Enumerable.Range(1,10000);
    
            var qry = TestSlowLoadingEnumerable(range);
    
            //We begin the call and give it our callback delegate
            //and a delegate to an error handler
            AsynchronousQueryExecutor.Call(qry, HandleResults, HandleError, OnFirstItem);
       }
    

    如果这符合您的期望,您可以使用它从处理的第一个项目开始工作。

    【讨论】:

    • 这不允许您在查询查找下一个值的同时处理查询返回的值。 “get first item”、“handle first item”、“get second item”、“handle second item”等工作仍然是同步发生的;这只是添加了一个额外的“处理第一项”调用,该调用也同步发生。
    • @Rawling,假设我在主线程中有一个整数列表。我想添加异步处理的列表中的第一项,private void OnFirstItem(object value) // 你的第一项在这里被处理。 { listofInts.Add((int)value); //这添加成功。 //你可以在这里开始你的工作。 } List listofInts = new List();
    【解决方案3】:

    再试一次...

    如果我理解你,你想要的逻辑是......

    var query = getData.Where( ... );
    query.AsParallel().ForEach(r => {
       //other stuff
    });
    

    这里会发生什么...

    简而言之,编译器会将其评估为:在并行遍历查询结果的同时,执行注释所在区域的逻辑。

    这是异步的,并利用 .net 管理的最佳线程池来确保尽快获得结果。

    这是一个自动管理的异步并行操作。

    同样值得注意的是,如果我这样做......

    var query = getData.Where( ... );
    

    ...在我开始迭代 IQueryable 并通过声明操作为并行操作之前,不会运行任何实际代码,框架可以通过为您线程化代码在任何时间点对多个结果进行操作。

    ForEach 本质上只是一个普通的 foreach 循环,其中每次迭代都是异步处理的。

    如果你愿意,你放在那里的逻辑可以调用某种回调,但这取决于你如何包装这段代码......

    我可以建议这样的事情吗:

    void DoAsync<T>(IQueryable<T> items, Func<T> operation, Func<T> callback)
    {
       items.AsParallel().ForEach(x => {
          operation(x);
          callback(x);
       });
    }
    

    【讨论】:

      【解决方案4】:

      使用 TPL 非常简单。

      这是一个虚拟的“慢”枚举器,它必须在获取项目之间做一些工作:

      static IEnumerable<int> SlowEnumerator()
      {
          for (int i = 0; i < 10; i++)
          {
              Thread.Sleep(1000);
              yield return i;
          }
      }
      

      以下是序列中每个项目的虚拟工作:

      private static void DoWork(int i)
      {
          Thread.Sleep(1000);
          Console.WriteLine("{0} at {1}", i, DateTime.Now);
      }
      

      下面是您如何同时对枚举器返回的一个项目运行“工作量”向枚举器询问下一个项目:

      foreach (var i in SlowEnumerator())
      {
          Task.Run(() => DoWork(i));
      }
      

      您应该每秒钟完成一次工作,而不是像您期望的那样每 2 秒完成一次工作,如果您必须交错执行两种类型的工作:

      0 at 20/01/2015 10:56:52
      1 at 20/01/2015 10:56:53
      2 at 20/01/2015 10:56:54
      3 at 20/01/2015 10:56:55
      4 at 20/01/2015 10:56:56
      5 at 20/01/2015 10:56:57
      6 at 20/01/2015 10:56:58
      7 at 20/01/2015 10:56:59
      8 at 20/01/2015 10:57:00
      9 at 20/01/2015 10:57:01
      

      【讨论】:

      • 但是如何将这个示例与 linq 一起使用?我如何告诉 linq 使用他找到的第一个值执行函数,然后使用第二个值,依此类推?
      • foreach 为您提供LINQ 找到的每个值,Task.Run 在您找到每个值时执行一个函数。 SlowEnumerator 只是一个示例 - 这应该是您的 LINQ 查询,它返回一个项目,然后“仍在执行并寻找更多值”。
      • 我正在尝试这样做,但我找不到路。问题是,当我进行查询时,我做了类似这样的事情 var query = from elem in contex.Clientes select elem.Nombre;但是,当所有查询完成时,我得到的是一个列表......我怎样才能让它一一给我结果,因为他正在寻找它?
      • @Iban 您的问题需要更多关于来源的信息。如果是例如一个数据库查询,那么“当查询仍在执行并寻找更多值时”你就不能做任何事情,因为查询在完成之前不会从数据库中返回。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-11-06
      • 1970-01-01
      • 1970-01-01
      • 2013-07-31
      • 2015-10-01
      • 2011-02-18
      相关资源
      最近更新 更多