【问题标题】:Rx.NET: Combine observables in orderRx.NET:按顺序组合可观察对象
【发布时间】:2018-05-11 18:51:06
【问题描述】:

我有 2 个IConnectableObservables,其中一个正在重放旧的历史消息,另一个正在发出新的当前值:

HistoricObservable: - 1 - 2 - 3 - 4 - 5 - 6 - 7 - 8 - ...
CurrentObservable:    - - - - - 5 - 6 - 7 - 8 - 9 - 10 - ...

如何将它们合并到一个可观察对象中,以便从两个可观察对象中获取完整(正确)序列,但一旦我开始从 CurrentObservable 发出值,也可以删除订阅并在 HistoricObservable 订阅上调用 Dispose。

MergedObservable: - 1 - 2 - 3 - 4 - 56 - 7 - 8 - 9 - 10 - ...

我的消息由 Guid 标识,因此该解决方案只能使用 Equal 比较它们,并且不能依赖于每个可观察对象发出的顺序之外的任何顺序。

简而言之,我正在寻找填充方法:

public static IObservable<T> MergeObservables<T>(
    IObservable<T> historicObservable,
    IObservable<T> currentObservable)
    where T : IEquatable<T>
{
    throw new NotImplementedException();
}

MergedObservable 应该继续从 HistoricObservable 发出值,而无需等待来自 CurrentObservable 的第一个值,如果 CurrentObservable 的第一个值之前已经发出,则 MergedObservable 应该跳过 CurrentObservable 中已经发出的任何值,处置对 HistoricObservable 的订阅,并开始从 CurrentObservable 中获取所有新值。当第一个对象由 CurrentObservable 发出时,我也不想立即切换,直到我到达 HistoricObservable 中的那个点,所以我一直很难尝试使用 TakeWhile/TakeUntil。我在使用 CombineLatest 来保存状态方面取得了一些小成功,但我认为可能有更好的方法。

测试用例

对于以下测试用例,假设每条消息都由一个 GUID 表示,如下所示:

A = E021ED8F-F0B7-44A1-B099-9878C6400F34
B = 1139570D-8465-4D7D-982F-E83A183619DE
C = 0AA2422E-19D9-49A7-9E8C-C9333FC46C46
D = F77D0714-2A02-4154-A44C-E593FFC16E3F
E = 14570189-4AAD-4D60-8780-BCDC1D23273D
F = B42983F0-5161-4165-A2F7-074698ECCE77
G = D2506881-F8AB-447F-96FA-896AEAAD1D0A
H = 3063CB7F-CD25-4287-85C3-67C609FA5679
I = 91200C69-CC59-4488-9FBA-AD2D181FD276
J = 2BEA364E-BE86-48FF-941C-4894CEF7A257
K = 67375907-8587-4D77-9C58-3E3254666303
L = C37C2259-C81A-4BC6-BF02-C96A34011479
M = E6F709BE-8910-42AD-A100-2801697496B0
N = 8741D0BB-EDA9-4735-BBAF-CE95629E880D

1) 如果历史 observable 永远不会赶上当前 observable,那么合并后的 observable 不应该从当前 observable 发出任何东西

Historic: - A - B - C - D - E - F - G - H|
Current:    - - - - - - - - - - - - - - - I - J - K - L - M - N|
Merged:   - A - B - C - D - E - F - G - H|

2) 一旦历史 observable 达到当前 observable 发出的第一个值,则合并后的 observable 应立即发出当前 observable 先前发出的所有值,并与历史 observable 断开连接。

Historic: - A - B - C - D - E - F - G - H - I - J|
Current:  - - - - - - E - F - G - H - I - J|
Merged:   - A - B - C - D - EF-G- H - I - J|

3) 解决方案应该能够在历史可观察对象之前处理来自当前可观察对象的值。

Historic: - - - - - A - B - C - D - E - F - G - H - I - J|
Current:  - - C - D - E - F - G - H - I - J - K - L - M - N|
Merged:   - - - - - A - B - CDEF-G-H- I - J - K - L - M - N|

4) 如果当前 observable 中的值已经发出,则解决方案应跳过它们,直到发出新值。

Historic: - A - B - C - D - E - F - G - H - I - J|
Current:  - - - - - - - - B - C - D - E - F - G - H - I - J|
Merged:   - A - B - C - D - - - - - - E - F - G - H - I - J|

5) 对于我的用例,我保证当前的 observable 将是历史的一个子集,但为了完整起见,我认为解决方案将继续从历史 observable 中提取,认为第一个元素将发生在后一点

Historic: - - - - - E - F - G - H - I - J - ... - Z - A|
Current:  - - A - B - C - D - E - F - G - H - I - J|
Merged:   - - - - - E - F - G - H - I - J - ... - Z - ABCDEFGHIJ|

6) 我还保证历史的 observable 一旦同步就不会与当前的 observable 不同,但是如果由于某种原因他们这样做了合并的 observable 应该已经与它断开连接并且不会选择消除任何差异

Historic: - A - B - C - D - E - D - C - B - A|
Current:  - - - - - - E - F - G - H - I - J|
Merged:   - A - B - C - D - EF-G- H - I - J|

创建有效解决方案的帮助,这里是一些输入数据:

var historic = new Subject<int>();
var current = new Subject<int>();

// query & subscription goes here

historic.OnNext(1);
historic.OnNext(2);
current.OnNext(5);
historic.OnNext(3);
current.OnNext(6);
historic.OnNext(4);
current.OnNext(7);
historic.OnNext(5);
current.OnNext(8);
historic.OnNext(6);
current.OnNext(9);
historic.OnNext(7);
current.OnNext(10);

正确的解应该产生从 1 到 10 的数字。

【问题讨论】:

  • 我不确定测试数据是否反映了问题的目标。当 Historic 为 7 时,Current 为 10,因此如果我理解正确,则不应发出 Current 中的任何值。
  • 请查看更新的答案和 cmets。
  • 大家好,很抱歉没有尽我所能。使用示例测试用例更新了描述。还添加了一个新约束,因为我正在使用 GUID,并且只能使用 Equals() 进行比较
  • 仍不清楚。如果您正在使用 guid 并重新表述问题,请去掉有序数字。
  • 数字是传入消息的简单单字符表示。我可以尝试将它们切换为字母,以更好地展示我不依赖于它们的数字顺序的意图,但我宁愿不实际给出包含 10 多个 guid 的示例,因为那样会变得太混乱

标签: c# observable system.reactive


【解决方案1】:

假设您想要不同的结果而不考虑出现的顺序,也许这种方法也有效:

       var replayCurrent = current.Replay();
        replayCurrent.Connect();


        var merged = historic
            .Scan(
                new { history = new List<string>(), firstVal = (string)null },
                (state, val) =>
                { state.history.Add(val); return state; }
                )
            .Merge(
                current.Take(1).Select(v => new { history = (List<string>)null, firstVal = v })

                )
            .Scan(new { history = (List<string>)null, firstVal = (string)null },
                (state, val) =>
                new { history = val.history ?? state.history, firstVal = val.firstVal ?? state.firstVal })
            .TakeWhile(v => 
                (null==v.firstVal || ( null!=v.firstVal && !v.history.Contains(v.firstVal)))
                )
            .Select(v=>v.history.Last())
            .Concat(replayCurrent)
            .Distinct();

        merged.Subscribe(x => Console.WriteLine(x));

【讨论】:

    【解决方案2】:

    试试这个:

    var historic = new Subject<int>();
    var current = new Subject<int>();
    
    var subscription =
        Observable
            .Defer(() =>
            {
                var c = int.MaxValue;
                return
                    current
                        .Do(x => { if (c == int.MaxValue) c = x; })
                        .Publish(pc =>
                            historic
                                .TakeWhile(h => h < c)
                                .Publish(ph =>
                                    Observable
                                        .Merge(
                                            ph.Delay(x => pc.FirstOrDefaultAsync(y => x < y)),
                                            pc.Delay(y => ph.FirstOrDefaultAsync(x => y < x)))))
                        .Distinct();
            })
            .Subscribe(x => Console.WriteLine(x));
    
    historic.OnNext(1);
    historic.OnNext(2);
    current.OnNext(5);
    historic.OnNext(3);
    current.OnNext(6);
    historic.OnNext(4);
    current.OnNext(7);
    historic.OnNext(5);
    current.OnNext(8);
    historic.OnNext(6);
    current.OnNext(9);
    historic.OnNext(7);
    current.OnNext(10);
    

    这按顺序给出了 1 到 10。可能需要进行一些调整才能获得正确的历史值和当前值。


    要确认发出了正确的值,请尝试以下操作:

    var subscription =
        Observable
            .Defer(() =>
            {
                var c = int.MaxValue;
                return
                    current
                        .Do(x => { if (c == int.MaxValue) c = x; })
                        .Select(x => new { source = "current", value = x })
                        .Publish(pc =>
                            historic
                                .TakeWhile(h => h < c)
                                .Finally(() => Console.WriteLine("!"))
                                .Select(x => new { source = "historic", value = x })
                                .Publish(ph =>
                                    Observable
                                        .Merge(
                                            ph.Delay(x => pc.FirstOrDefaultAsync(y => x.value < y.value)),
                                            pc.Delay(y => ph.FirstOrDefaultAsync(x => y.value < x.value)))))
                        .Distinct(x => x.value);
            })
            .Subscribe(x => Console.WriteLine($"{x.source}:{x.value}"));
    

    输出是:

    历史:1 历史:2 历史:3 历史:4 电流:5 当前:6 当前:7 ! 当前:8 当前:9 当前:10

    ! 显示历史 observable 的处置位置。

    【讨论】:

      【解决方案3】:

      回到这一点,我终于在我一直在前进的方向上取得了一些进展。让它通过描述中的所有测试用例,所以我认为这对于我的用例来说已经足够好了。建设性的反馈总是值得赞赏的。

      public static IObservable<T> CombineObservables<T>(
          IObservable<T> historicObservable,
          IObservable<T> currentObservable)
          where T : IEquatable<T>
      {
          var cachedCurrent = currentObservable.Replay();
          cachedCurrent.Connect();
      
          var firstMessage = cachedCurrent.FirstAsync();
      
          var emittedHistoryItems = new List<T>();
      
          var part1 = historicObservable.TakeUntil(firstMessage)
                                        .Do(x => emittedHistoryItems.Add(x));
      
          var part2 = historicObservable.CombineLatest(firstMessage, Tuple.Create)
                                        .TakeWhile(x =>
                                                   {
                                                       var historyItem = x.Item1;
                                                       var first = x.Item2;
      
                                                       return !emittedHistoryItems.Any(y => y.Equals(first)) && !historyItem.Equals(first);
                                                   })
                                        .Select(x => x.Item1)
                                        .Do(x => emittedHistoryItems.Add(x));
      
          var part3 = cachedCurrent.SkipWhile(x => emittedHistoryItems.Contains(x));
      
          return part1.Concat(part2).Concat(part3);
      }
      

      小提琴示例:https://dotnetfiddle.net/6BqfiW

      【讨论】:

      • 很公平。作为一名程序员,开发有效沟通需求的技能可能与知道如何编码一样重要(在我看来)。感谢您的反馈!
      • 我无法抗拒,又看了看(在醒来后心情不那么懒惰:-))
      【解决方案4】:

      如果我正确理解你的问题,你可以

      • 制作一个发出两个可观察对象的可观察对象
      • 第一个是'historic',可以用StartsWith注入
      • 第二个是“当前”,仅在当前的第一个元素上提供,这意味着您需要使用 Take(1) 或其他方式订阅该 observable 以获得第一个元素,然后选择可观察到的本身。 (您可以根据需要添加 .Where 和 .Scan 来比较两个序列中的值)
      • 将两者与 .Switch 结合使用。 Switch 将从 Historic 中取消订阅,并从那时起跟随 Current。

      更新:

      我不认为示例数据反映了问题,至少不是我理解的那样。试试这个

      var historic = new Subject<int>();
      var current = new Subject<int>();
      var observables = new Subject<IObservable<int>>();
      int[] tracker = new int[] { int.MaxValue, int.MaxValue };
      
      var merged = historic
                      .Select(x => new { source = (int)0, val = x })
                      .Merge(current.Select(y => new { source = (int)1, val = y }))
                      .Scan((state, v) => {
                              tracker[v.source] = v.val;
                              return v; })
                      .Do(x => {
                          if ((tracker[1] <= tracker[0]) && tracker[0] != int.MaxValue)
                              observables.OnNext(current.StartWith(x.val));
                          })
                      .Where(x => x.source == 0)
                      .Select(x => x.val);
      
      var streamSelector = observables.Switch();
      
      streamSelector.Subscribe(x => Console.WriteLine(x));
      observables.OnNext(merged);
      
      historic.OnNext(1);
      historic.OnNext(2);
      current.OnNext(5);
      historic.OnNext(3);
      current.OnNext(6);
      historic.OnNext(4);
      current.OnNext(7);
      historic.OnNext(5);
      current.OnNext(8);
      historic.OnNext(6);
      current.OnNext(9);
      historic.OnNext(7);
      current.OnNext(10);
      

      【讨论】:

      • 我已将示例数据添加到 OP 的问题中。你能根据你的要点提出一个可行的解决方案吗?
      • @Enigmativity 你确定吗?我理解这个问题的方式是,使用您添加的测试数据,只会输出历史值,因为“历史”赶不上“当前”。到当前时间为 10 时,历史时间为 7。
      • 我放的数据是基于OP的例子。
      • 你的代码返回1, 2, 5, 3, 4, 6, 7, 8, 9, 10给你的数据和1, 2, 5, 3, 6, 4, 7, 5, 8, 6, 9, 7, 10给定我提出的数据。两者都不正确。
      • @Enigmativity 当然,但 OP 非常明确地说 Current 不应该在 Historic 赶上之前发出。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2018-06-07
      • 2020-01-13
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-06-03
      • 2017-03-31
      相关资源
      最近更新 更多