【发布时间】:2019-07-31 23:00:42
【问题描述】:
我有一个 Observable,我想以 10 为模对它们进行分组。我想让结果成为一个热门的 observable,并且当一个新订阅者订阅时,他得到了之前播放的所有 GroupedObservable,而不是获得密钥,我想要最新的值。我也想要更新,跳过最新的值。
在这个例子中,为了简单起见,我们只使用模数等于 5 的结果。但我希望我的解决方案适用于所有情况。
例子:
--------15-----25---------...
让我们在前面的示例中添加一些我们将订阅的时间点:
---S1-----15--S2---25-----S3----...
预期结果是:
S1 接收最新:15,更新:从 25 开始的 observable 和更新 %10 ==5,稍后将到达。 说明: 15 到后会通知 S1,15 是最新的元素,所以我马上要。第二个参数将是一个 observable,它将在未来产生 25 个和 %10 == 5 个元素。
S2 接收最新:15,更新:从 25 开始的 observable 和更新 %10 ==5,稍后将到达。 说明:订阅时会通知 S2,15 是最新的元素,所以我想要它。第二个参数将是一个 observable,它将在未来产生 25 个和 %10 == 5 个元素。
S3 接收最新:25,更新:更新 %10 ==5 的 observable 将稍后到达。 说明:订阅时会通知 S3,25 是最新的元素,所以我马上想要它。第二个参数将是一个 observable,它将在未来产生 %10 == 5 个元素。
这里有一些解决方法的尝试:
下面的代码使用了 Tuple 和 NUnit。
第一次尝试
[Test]
public void WhenWeGroupByReplaying1()
{
var subject = new Subject<uint>();
var observable = subject.GroupBy(t => t%10)
.Select(t =>
{
var connectableObservable = t.Replay(1);
connectableObservable.Connect();
return (key: t.Key, updates: connectableObservable);
}).Replay();
observable.Connect();
// I will block on the First of the lambda below
var getLastAndUpdates = observable
.Select(t => (first: t.updates.First(),updates: t.updates.Skip(1)));
getLastAndUpdates.Subscribe(t =>
{
Console.WriteLine($"[1] - FIRST: {t.first}");
t.updates.Subscribe(t2 => Console.WriteLine($"[1] - UPDATE: {t2}"));
});
subject.OnNext(15);
getLastAndUpdates.Subscribe(t =>
{
Console.WriteLine($"[2] - FIRST: {t.first}");
t.updates.Subscribe(t2 => Console.WriteLine($"[2] - UPDATE: {t2}"));
});
subject.OnNext(25);
getLastAndUpdates.Subscribe(t =>
{
Console.WriteLine($"[3] - FIRST: {t.first}");
t.updates.Subscribe(t2 => Console.WriteLine($"[3] - UPDATE: {t2}"));
});
}
此解决方案将被阻止,如评论所示。
第二次尝试
[Test]
public void WhenWeGroupByReplaying2()
{
var subject = new Subject<uint>();
var observable = subject.GroupBy(t => t, t => t, new ModuloEqualityComparer())
.Select(t =>
{
var connectableObservable = t.Publish(t.Key);
connectableObservable.Connect();
return (key: t.Key, updates: connectableObservable);
}).Replay();
observable.Connect();
var getLastAndUpdates = observable
.Select(t => (first: t.updates.First(),updates: t.updates.Skip(1)));
getLastAndUpdates.Subscribe(t =>
{
Console.WriteLine($"[1] - FIRST: {t.first}");
t.updates.Subscribe(t2 => Console.WriteLine($"[1] - UPDATE: {t2}"));
});
subject.OnNext(15);
getLastAndUpdates.Subscribe(t =>
{
Console.WriteLine($"[2] - FIRST: {t.first}");
t.updates.Subscribe(t2 => Console.WriteLine($"[2] - UPDATE: {t2}"));
});
subject.OnNext(25);
getLastAndUpdates.Subscribe(t =>
{
Console.WriteLine($"[3] - FIRST: {t.first}");
t.updates.Subscribe(t2 => Console.WriteLine($"[3] - UPDATE: {t2}"));
});
}
private class ModuloEqualityComparer : IEqualityComparer<uint>
{
public bool Equals(uint x, uint y)
{
return x % 10 == y % 10;
}
public int GetHashCode(uint obj)
{
return (obj % 10).GetHashCode();
}
}
结果:
[1] - FIRST: 15
[1] - UPDATE: 15
[2] - FIRST: 15
[1] - UPDATE: 25
[2] - UPDATE: 25
[3] - FIRST: 25
预期结果:(确切顺序不重要)
[1] - FIRST: 15
[2] - FIRST: 15
[1] - UPDATE: 25
[2] - UPDATE: 25
[3] - FIRST: 25
第三次尝试
[Test]
public void WhenWeGroupByReplaying3()
{
var subject = new Subject<uint>();
var observable = subject.GroupBy(t => (key: t%10, value:t), t => t, new ModuloEqualityComparer2())
.Select(t =>
{
var connectableObservable = t.Publish(t.Key.Item2);
connectableObservable.Connect();
return (key: t.Key, updates: connectableObservable);
}).Replay();
observable.Connect();
var getLastAndUpdates = observable
.Select(t => (first: t.updates.First(),updates: t.updates.Skip(1)));
getLastAndUpdates.Subscribe(t =>
{
Console.WriteLine($"[1] - FIRST: {t.first}");
t.updates.Subscribe(t2 => Console.WriteLine($"[1] - UPDATE: {t2}"));
});
subject.OnNext(15);
getLastAndUpdates.Subscribe(t =>
{
Console.WriteLine($"[2] - FIRST: {t.first}");
t.updates.Subscribe(t2 => Console.WriteLine($"[2] - UPDATE: {t2}"));
});
subject.OnNext(25);
getLastAndUpdates.Subscribe(t =>
{
Console.WriteLine($"[3] - FIRST: {t.first}");
t.updates.Subscribe(t2 => Console.WriteLine($"[3] - UPDATE: {t2}"));
});
}
private class ModuloEqualityComparer2 : IEqualityComparer<(uint,uint)>
{
private readonly ModuloEqualityComparer _moduloEqualityComparer = new ModuloEqualityComparer();
public bool Equals((uint, uint) x, (uint, uint) y)
{
return _moduloEqualityComparer.Equals(x.Item1, y.Item1);
}
public int GetHashCode((uint, uint) obj)
{
return _moduloEqualityComparer.GetHashCode(obj.Item1);
}
}
结果:
[1] - FIRST: 15
[1] - UPDATE: 15
[2] - FIRST: 15
[1] - UPDATE: 25
[2] - UPDATE: 25
[3] - FIRST: 25
预期结果:(确切顺序不重要)
[1] - FIRST: 15
[2] - FIRST: 15
[1] - UPDATE: 25
[2] - UPDATE: 25
[3] - FIRST: 25
感谢阅读。
【问题讨论】:
标签: c# system.reactive