【发布时间】:2018-05-11 18:51:06
【问题描述】:
我有 2 个IConnectableObservables,其中一个正在重放旧的历史消息,另一个正在发出新的当前值:
HistoricObservable: - 1 - 2 - 3 - 4 - 5 - 6 - 7 - 8 - ...
CurrentObservable: - - - - - 5 - 6 - 7 - 8 - 9 - 10 - ...
如何将它们合并到一个可观察对象中,以便从两个可观察对象中获取完整(正确)序列,但一旦我开始从 CurrentObservable 发出值,也可以删除订阅并在 HistoricObservable 订阅上调用 Dispose。
MergedObservable: - 1 - 2 - 3 - 4 - 56 - 7 - 8 - 9 - 10 - ...
我的消息由 Guid 标识,因此该解决方案只能使用 Equal 比较它们,并且不能依赖于每个可观察对象发出的顺序之外的任何顺序。
简而言之,我正在寻找填充方法:
public static IObservable<T> MergeObservables<T>(
IObservable<T> historicObservable,
IObservable<T> currentObservable)
where T : IEquatable<T>
{
throw new NotImplementedException();
}
MergedObservable 应该继续从 HistoricObservable 发出值,而无需等待来自 CurrentObservable 的第一个值,如果 CurrentObservable 的第一个值之前已经发出,则 MergedObservable 应该跳过 CurrentObservable 中已经发出的任何值,处置对 HistoricObservable 的订阅,并开始从 CurrentObservable 中获取所有新值。当第一个对象由 CurrentObservable 发出时,我也不想立即切换,直到我到达 HistoricObservable 中的那个点,所以我一直很难尝试使用 TakeWhile/TakeUntil。我在使用 CombineLatest 来保存状态方面取得了一些小成功,但我认为可能有更好的方法。
测试用例
对于以下测试用例,假设每条消息都由一个 GUID 表示,如下所示:
A = E021ED8F-F0B7-44A1-B099-9878C6400F34
B = 1139570D-8465-4D7D-982F-E83A183619DE
C = 0AA2422E-19D9-49A7-9E8C-C9333FC46C46
D = F77D0714-2A02-4154-A44C-E593FFC16E3F
E = 14570189-4AAD-4D60-8780-BCDC1D23273D
F = B42983F0-5161-4165-A2F7-074698ECCE77
G = D2506881-F8AB-447F-96FA-896AEAAD1D0A
H = 3063CB7F-CD25-4287-85C3-67C609FA5679
I = 91200C69-CC59-4488-9FBA-AD2D181FD276
J = 2BEA364E-BE86-48FF-941C-4894CEF7A257
K = 67375907-8587-4D77-9C58-3E3254666303
L = C37C2259-C81A-4BC6-BF02-C96A34011479
M = E6F709BE-8910-42AD-A100-2801697496B0
N = 8741D0BB-EDA9-4735-BBAF-CE95629E880D
1) 如果历史 observable 永远不会赶上当前 observable,那么合并后的 observable 不应该从当前 observable 发出任何东西
Historic: - A - B - C - D - E - F - G - H|
Current: - - - - - - - - - - - - - - - I - J - K - L - M - N|
Merged: - A - B - C - D - E - F - G - H|
2) 一旦历史 observable 达到当前 observable 发出的第一个值,则合并后的 observable 应立即发出当前 observable 先前发出的所有值,并与历史 observable 断开连接。
Historic: - A - B - C - D - E - F - G - H - I - J|
Current: - - - - - - E - F - G - H - I - J|
Merged: - A - B - C - D - EF-G- H - I - J|
3) 解决方案应该能够在历史可观察对象之前处理来自当前可观察对象的值。
Historic: - - - - - A - B - C - D - E - F - G - H - I - J|
Current: - - C - D - E - F - G - H - I - J - K - L - M - N|
Merged: - - - - - A - B - CDEF-G-H- I - J - K - L - M - N|
4) 如果当前 observable 中的值已经发出,则解决方案应跳过它们,直到发出新值。
Historic: - A - B - C - D - E - F - G - H - I - J|
Current: - - - - - - - - B - C - D - E - F - G - H - I - J|
Merged: - A - B - C - D - - - - - - E - F - G - H - I - J|
5) 对于我的用例,我保证当前的 observable 将是历史的一个子集,但为了完整起见,我认为解决方案将继续从历史 observable 中提取,认为第一个元素将发生在后一点
Historic: - - - - - E - F - G - H - I - J - ... - Z - A|
Current: - - A - B - C - D - E - F - G - H - I - J|
Merged: - - - - - E - F - G - H - I - J - ... - Z - ABCDEFGHIJ|
6) 我还保证历史的 observable 一旦同步就不会与当前的 observable 不同,但是如果由于某种原因他们这样做了合并的 observable 应该已经与它断开连接并且不会选择消除任何差异
Historic: - A - B - C - D - E - D - C - B - A|
Current: - - - - - - E - F - G - H - I - J|
Merged: - A - B - C - D - EF-G- H - I - J|
创建有效解决方案的帮助,这里是一些输入数据:
var historic = new Subject<int>();
var current = new Subject<int>();
// query & subscription goes here
historic.OnNext(1);
historic.OnNext(2);
current.OnNext(5);
historic.OnNext(3);
current.OnNext(6);
historic.OnNext(4);
current.OnNext(7);
historic.OnNext(5);
current.OnNext(8);
historic.OnNext(6);
current.OnNext(9);
historic.OnNext(7);
current.OnNext(10);
正确的解应该产生从 1 到 10 的数字。
【问题讨论】:
-
我不确定测试数据是否反映了问题的目标。当 Historic 为 7 时,Current 为 10,因此如果我理解正确,则不应发出 Current 中的任何值。
-
请查看更新的答案和 cmets。
-
大家好,很抱歉没有尽我所能。使用示例测试用例更新了描述。还添加了一个新约束,因为我正在使用 GUID,并且只能使用 Equals() 进行比较
-
仍不清楚。如果您正在使用 guid 并重新表述问题,请去掉有序数字。
-
数字是传入消息的简单单字符表示。我可以尝试将它们切换为字母,以更好地展示我不依赖于它们的数字顺序的意图,但我宁愿不实际给出包含 10 多个 guid 的示例,因为那样会变得太混乱
标签: c# observable system.reactive