【问题标题】:GroupBy with latest element and updates instead of key and updatesGroupBy 具有最新的元素和更新,而不是键和更新
【发布时间】:2019-07-31 23:00:42
【问题描述】:

我有一个 Observable,我想以 10 为模对它们进行分组。我想让结果成为一个热门的 observable,并且当一个新订阅者订阅时,他得到了之前播放的所有 GroupedObservable,而不是获得密钥,我想要最新的值。我也想要更新,跳过最新的值。

在这个例子中,为了简单起见,我们只使用模数等于 5 的结果。但我希望我的解决方案适用于所有情况。

例子:

--------15-----25---------...

让我们在前面的示例中添加一些我们将订阅的时间点:

---S1-----15--S2---25-----S3----...

预期结果是:

  • S1 接收最新:15,更新:从 25 开始的 observable 和更新 %10 ==5,稍后将到达。 说明: 15 到后会通知 S1,15 是最新的元素,所以我马上要。第二个参数将是一个 observable,它将在未来产生 25 个和 %10 == 5 个元素。

  • S2 接收最新:15,更新:从 25 开始的 observable 和更新 %10 ==5,稍后将到达。 说明:订阅时会通知 S2,15 是最新的元素,所以我想要它。第二个参数将是一个 observable,它将在未来产生 25 个和 %10 == 5 个元素。

  • S3 接收最新:25,更新:更新 %10 ==5 的 observable 将稍后到达。 说明:订阅时会通知 S3,25 是最新的元素,所以我马上想要它。第二个参数将是一个 observable,它将在未来产生 %10 == 5 个元素。

这里有一些解决方法的尝试:

下面的代码使用了 Tuple 和 NUnit。

第一次尝试

[Test]
public void WhenWeGroupByReplaying1()
{
    var subject = new Subject<uint>();

    var observable = subject.GroupBy(t => t%10)
        .Select(t =>
        {
            var connectableObservable = t.Replay(1);
            connectableObservable.Connect();
            return (key: t.Key, updates: connectableObservable);
        }).Replay();

    observable.Connect();

    // I will block on the First of the lambda below
    var getLastAndUpdates = observable
        .Select(t => (first: t.updates.First(),updates: t.updates.Skip(1)));

    getLastAndUpdates.Subscribe(t =>
    {
        Console.WriteLine($"[1] - FIRST: {t.first}");
        t.updates.Subscribe(t2 => Console.WriteLine($"[1] - UPDATE: {t2}"));
    });

    subject.OnNext(15);

    getLastAndUpdates.Subscribe(t =>
    {
        Console.WriteLine($"[2] - FIRST: {t.first}");
        t.updates.Subscribe(t2 => Console.WriteLine($"[2] - UPDATE: {t2}"));
    });

    subject.OnNext(25);

    getLastAndUpdates.Subscribe(t =>
    {
        Console.WriteLine($"[3] - FIRST: {t.first}");
        t.updates.Subscribe(t2 => Console.WriteLine($"[3] - UPDATE: {t2}"));
    });
}

此解决方案将被阻止,如评论所示。

第二次尝试

[Test]
public void WhenWeGroupByReplaying2()
{
    var subject = new Subject<uint>();

    var observable = subject.GroupBy(t => t, t => t, new ModuloEqualityComparer())
        .Select(t =>
        {
            var connectableObservable = t.Publish(t.Key);
            connectableObservable.Connect();
            return (key: t.Key, updates: connectableObservable);
        }).Replay();

    observable.Connect();

    var getLastAndUpdates = observable
        .Select(t => (first: t.updates.First(),updates: t.updates.Skip(1)));

    getLastAndUpdates.Subscribe(t =>
    {
        Console.WriteLine($"[1] - FIRST: {t.first}");
        t.updates.Subscribe(t2 => Console.WriteLine($"[1] - UPDATE: {t2}"));
    });

    subject.OnNext(15);

    getLastAndUpdates.Subscribe(t =>
    {
        Console.WriteLine($"[2] - FIRST: {t.first}");
        t.updates.Subscribe(t2 => Console.WriteLine($"[2] - UPDATE: {t2}"));
    });

    subject.OnNext(25);

    getLastAndUpdates.Subscribe(t =>
    {
        Console.WriteLine($"[3] - FIRST: {t.first}");
        t.updates.Subscribe(t2 => Console.WriteLine($"[3] - UPDATE: {t2}"));
    });
}

private class ModuloEqualityComparer : IEqualityComparer<uint>
{
    public bool Equals(uint x, uint y)
    {
        return x % 10 == y % 10;
    }

    public int GetHashCode(uint obj)
    {
        return (obj % 10).GetHashCode();
    }
}

结果:

[1] - FIRST: 15
[1] - UPDATE: 15
[2] - FIRST: 15
[1] - UPDATE: 25
[2] - UPDATE: 25
[3] - FIRST: 25

预期结果:(确切顺序不重要)

[1] - FIRST: 15
[2] - FIRST: 15
[1] - UPDATE: 25
[2] - UPDATE: 25
[3] - FIRST: 25

第三次尝试

[Test]
public void WhenWeGroupByReplaying3()
{
    var subject = new Subject<uint>();

    var observable = subject.GroupBy(t => (key: t%10, value:t), t => t, new ModuloEqualityComparer2())
        .Select(t =>
        {
            var connectableObservable = t.Publish(t.Key.Item2);
            connectableObservable.Connect();
            return (key: t.Key, updates: connectableObservable);
        }).Replay();

    observable.Connect();

    var getLastAndUpdates = observable
        .Select(t => (first: t.updates.First(),updates: t.updates.Skip(1)));

    getLastAndUpdates.Subscribe(t =>
    {
        Console.WriteLine($"[1] - FIRST: {t.first}");
        t.updates.Subscribe(t2 => Console.WriteLine($"[1] - UPDATE: {t2}"));
    });

    subject.OnNext(15);

    getLastAndUpdates.Subscribe(t =>
    {
        Console.WriteLine($"[2] - FIRST: {t.first}");
        t.updates.Subscribe(t2 => Console.WriteLine($"[2] - UPDATE: {t2}"));
    });

    subject.OnNext(25);

    getLastAndUpdates.Subscribe(t =>
    {
        Console.WriteLine($"[3] - FIRST: {t.first}");
        t.updates.Subscribe(t2 => Console.WriteLine($"[3] - UPDATE: {t2}"));
    });
}

private class ModuloEqualityComparer2 : IEqualityComparer<(uint,uint)>
{
    private readonly ModuloEqualityComparer _moduloEqualityComparer = new ModuloEqualityComparer();
    public bool Equals((uint, uint) x, (uint, uint) y)
    {
        return _moduloEqualityComparer.Equals(x.Item1, y.Item1);
    }


    public int GetHashCode((uint, uint) obj)
    {
        return _moduloEqualityComparer.GetHashCode(obj.Item1);
    }
}

结果:

[1] - FIRST: 15
[1] - UPDATE: 15
[2] - FIRST: 15
[1] - UPDATE: 25
[2] - UPDATE: 25
[3] - FIRST: 25

预期结果:(确切顺序不重要)

[1] - FIRST: 15
[2] - FIRST: 15
[1] - UPDATE: 25
[2] - UPDATE: 25
[3] - FIRST: 25

感谢阅读。

【问题讨论】:

    标签: c# system.reactive


    【解决方案1】:

    我不完全确定您要达到的目标,但希望这会对您有所帮助:

    您的代码有几个问题:

    1. .First() 已过时是有原因的。你不应该在 Rx 中使用阻塞代码
    2. .Replay() 需要虚拟订阅才能正常工作。我不确定这是否是困扰您的代码的原因,但为了实现您的目标,您需要这样做。
    3. 嵌套订阅通常是个坏主意。我已将嵌套订阅替换为 .Merge()

    如果这不能解决您的问题,我建议您修改您的问题以描述您尝试通过使用 Rx 来完成什么。这闻起来有点像XY situation

    代码如下:

    var subject = new Subject<uint>();
    
    var observable = subject.GroupBy(t => t % 10)
        .Select(t => t.Replay(1).RefCount()).Replay().RefCount();
    
    // dummy subscriptions required for Replay to work correctly.
    var dummySub = observable.Merge().Subscribe();
    
    observable
        .Select(o => o.Select((t, index) => (t.Key, t.num, index)))
        .Merge()
        .Subscribe(t =>
        {
            if (t.index == 0)
                Console.WriteLine($"[1] - FIRST: {t.num}");
            else
                Console.WriteLine($"[1] - UPDATE: {t.num}");
        });
    
    subject.OnNext(15);
    
    observable
        .Select(o => o.Select((t, index) => (t.Key, t.num, index)))
        .Merge()
        .Subscribe(t =>
        {
            if (t.index == 0)
                Console.WriteLine($"[1] - FIRST: {t.num}");
            else
                Console.WriteLine($"[1] - UPDATE: {t.num}");
        });
    
    subject.OnNext(25);
    
    observable
        .Select(o => o.Select((t, index) => (t.Key, t.num, index)))
        .Merge()
        .Subscribe(t =>
        {
            if (t.index == 0)
                Console.WriteLine($"[1] - FIRST: {t.num}");
            else
                Console.WriteLine($"[1] - UPDATE: {t.num}");
        });
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-11-13
      • 1970-01-01
      • 1970-01-01
      • 2019-04-21
      • 1970-01-01
      相关资源
      最近更新 更多