【问题标题】:Making subject synchronization less painful使主题同步不那么痛苦
【发布时间】:2023-03-16 20:13:01
【问题描述】:

我想同步对BehaviorSubject<T> 的访问,所以我希望使用Subject.Synchronize。但是,我对这个界面有几个痛点,我想知道我是否错过了一种更令人满意的做事方式。

首先,我不得不同时存储原始主题和同步主题。这是因为我有时会在BehaviorSubject<T> 上使用Value 属性。也是因为Synchronize的返回值不是一次性的,所以我必须存储原始主题的一个实例才能正确处理。

其次,Synchronize的返回值为ISubject<TSource, TResult>,与ISubject<T>不兼容。

所以我最终得到了这样的代码:

public class SomeClass
{
    private readonly BehaviorSubject<string> something;
    private readonly ISubject<string, string> synchronizedSomething;

    public SomeClass()
    {
        this.something = new BehaviorSubject<string>(null);

        // having to provide the string type here twice is annoying
        this.synchronizedSomething = Subject.Synchronize<string, string>(this.something);
    }

    // must remember to use synchronizedSomething here (I forgot and had to edit my question again, showing how easy it is to screw this up)
    public IObservable<string> Something => this.synchronizedSomething.AsObservable();

    // could be called from any thread
    public void SomeMethod()
    {
        // do some work

        // also must be careful to use synchronizedSomething here
        this.synchronizedSomething.OnNext("some calculated value");
    }

    public void Dispose()
    {
        // synchronizedSomething is not disposable, so we must dispose the original subject
        this.something.Dispose();
    }
}

我是否缺少更清洁/更好的方法?为了清楚起见,我希望能够做的是这样的事情(伪代码):

public class SomeClass
{
    private readonly IBehaviorSubject<string> something;

    public SomeClass()
    {
        this.something = new BehaviorSubject<string>(null).Synchronized();
    }

    public IObservable<string> Something => this.something.AsObservable();

    // could be called from any thread
    public void SomeMethod()
    {
        // do some work

        this.something.OnNext("some calculated value");
    }

    public void Dispose()
    {
        this.something.Dispose();
    }
}

【问题讨论】:

  • 当你说同步访问时,你的意思是你想要原子写入吗?还是您的意思是您想要序列化读取(订阅者)?您能否将示例扩展为 1) 编译 - 您在 ctor 中缺少 BehaviorSubject 的默认值,2) 公开如何使用 SomeClass - 您只有一个 ctor 和一个 Dispose 方法并且从不公开主题。
  • @LeeCampbell:已编辑

标签: c# .net system.reactive


【解决方案1】:

我从您发布的代码示例中得到了一些注释

  1. IBehaviorSubject&lt;string&gt; 不是 Rx.NET 中定义的类型。也许你的意思是ISubject&lt;string&gt;
  2. 您将null 作为默认值传递给BehaviorSubject&lt;T&gt;,通常当我看到这个时,用户实际上只是想要ReplaySubject&lt;string&gt;(1)。这取决于您在代码库中的某处是否有 Where(x=&gt;x!=null)Skip(1) 作为补偿行为。
  3. 也许您想使用静态方法Subject.Synchronize(ISubject&lt;T&gt;) 而不是扩展方法.Synchronized()

这可能是您上面示例代码的合适替代品。

public class SomeClass
{
    //Exposed as ISubject as I can't see usage of `Value` and `TryGetValue` are not present.
    private readonly ISubject<string> something;

    public SomeClass()
    {
        var source = new BehaviorSubject<string>(null);
        //Maybe this is actually what you want?
        //var source = new ReplaySubject<string>(1);
        this.something = Subject.Synchronize(source);
    }

    public IObservable<string> Something => this.something.AsObservable();

    // could be called from any thread
    public void SomeMethod()
    {
        // do some work

        this.something.OnNext("some calculated value");
    }
}

【讨论】:

  • 呃,谢谢,但我确实需要使用Value 属性。我在我的问题中说过这一点,但忽略了将其包含在代码示例中。代码只是伪代码。我知道没有IBehaviorSubject&lt;T&gt;,但我觉得我需要一个能够针对同步主题访问Value。此外,您的代码不处理处置。
  • 好的,我只是想猜测您的要求是什么,因为您的示例代码无法编译。
  • 您没有显示您希望同步访问主题的位置(或原因)。您想访问Value,但不需要同步?通常使用 Rx,最好序列化(使用调度程序)而不是同步(使用锁)。很抱歉我帮不上什么忙。
【解决方案2】:
public class SynchronizeBehaviorSubject<T> : ISubject<T>, IDisposable
{
    private readonly BehaviorSubject<T> _source;
    private readonly ISubject<T> _sourceSync;

    public SynchronizeBehaviorSubject(BehaviorSubject<T> source)
    {
        _source = source;
        _sourceSync = source.Synchronize();
    }

    public void OnCompleted() => _sourceSync.OnCompleted();

    public void OnError(Exception error) => _sourceSync.OnError(error);

    public void OnNext(T value) => _sourceSync.OnNext(value);

    public IDisposable Subscribe(IObserver<T> observer) => _sourceSync.Subscribe(observer);

    public T Value => _source.Value;

    public bool HasObservers => _source.HasObservers;
    public void Dispose() => _source.Dispose();
    public bool IsDisposed => _source.IsDisposed;
}

public static class ReactiveEx
{

    public static ISubject<T> Synchronize<T>(this ISubject<T> source) =>
        Subject.Synchronize(source);

    public static SynchronizeBehaviorSubject<T> Synchronize<T>(this BehaviorSubject<T> source) =>
        new SynchronizeBehaviorSubject<T>(source);
}

用法:

private readonly SynchronizeBehaviorSubject<bool> _isBool 
                    = new BehaviorSubject(false).Synchronize();

bool isBool = _isBool.Value;

如果你不需要Value,甚至是ISubject&lt;T&gt;

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2010-12-24
    • 1970-01-01
    • 1970-01-01
    • 2016-01-25
    • 2012-02-21
    • 1970-01-01
    相关资源
    最近更新 更多