【问题标题】:Interrupt processing of current item if new items arrive如果新项目到达,则中断当前项目的处理
【发布时间】:2017-07-22 00:05:22
【问题描述】:

我有以下代码:

        Observable.FromEventPattern<PropertyChangingEventArgs>(_parent, "PropertyChanged")
            .Subscribe(ep => 
            {
                switch (ep.EventArgs.PropertyName)
                {
                    case "ValorBlurMenor":
                        LoadBlurMenor();
                        break;
                    case "ValorBlurMaior":
                        LoadBlurMaior();
                        break;
                }
            });

我的问题是 ValorBlurMenorValorBlurMaior 是通过 WPF 滑块更改的,LoadBlurMaior()LoadBlurMenor() 需要一些时间来评估。

如果有新项目到达,我正在寻找一种方法来中断/取消项目的 Subscribe 委托的执行,因此始终只对最后一个项目执行处理。

【问题讨论】:

  • 您能否澄清您所说的“LoadBlurMaior()LoadBlurMenor() 需要一些时间来评估”是什么意思?在处理下一个值之前,Rx 将始终等到前一个订阅委托完成。因此,除非您的意思是 LoadBlurMaior()LoadBlurMenor() 触发某种异步行为,否则您的代码应该已经按照您的要求进行。
  • @Enigmativity 感谢您的关注。当我通过 UI 滑块更改此视图模型中的属性时,我开始触发由 LoadBlur**() 方法执行的一些更改。但是,如果在该处理结束之前,我最终再次更改了滑块中的值,我不想等待前面的处理完成。相反,我想立即处理最新数据,丢弃以前数据的部分进度。我想这或多或少是Observable.Switch 所做的。
  • 有什么“从以前的数据中取得的部分进展”? Rx 从来没有部分进展。是 UI 有部分进度吗?如果是这样,那么这是一个 WPF 问题,与 Rx 无关。
  • @Enigmativity 抱歉,如果我不清楚。假设一个朋友给了我一个橙子,让我去皮。我开始剥橘子,但还没等我完成,他就得到了另一个一个。我所说的“从以前的数据中取得部分进展”的意思是我不再使用的部分剥皮的橙子(不是最好的例子,但无论如何)。我的目标是总是立即开始剥最新的橙子,并且最好在新橙子到来时立即停止剥旧橙子。这与“实时搜索”示例没有什么不同。
  • 所以 Rx 会这样:假设一个朋友(observable)给了我一个橙子(value)并要求我去皮(observer),然后我的朋友会耐心地等到我第一次去皮在告诉我还有一个(如果有的话)之前橙色。

标签: c# wpf system.reactive reactive-programming


【解决方案1】:

让它工作的最简单方法是使用 Observable.FromAsync&lt;&gt; 重载,它使用 CancellationToken 并使用代码中的令牌来经常检查取消:

public class PlainNotifiable : INotifyPropertyChanged
{
    private bool takeFiveSeconds;
    private bool takeTenSeconds;
    public event PropertyChangedEventHandler PropertyChanged;

    public bool TakeFiveSeconds
    {
        get => this.takeFiveSeconds;
        set
        {
            this.takeFiveSeconds = value;
            this.PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(this.TakeFiveSeconds)));
        }
    }
    public bool TakeTenSeconds
    {
        get => this.takeTenSeconds;
        set
        {
            this.takeTenSeconds = value;
            this.PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(this.TakeTenSeconds)));
        }
    }

    public async Task TakeSomeTime(int seconds, CancellationToken token = default(CancellationToken))
    {
        Trace.TraceInformation("Started waiting {0} seconds", seconds);
        await Task.Delay(seconds * 1000, token);
        Trace.TraceInformation("Stoped waiting {0} seconds", seconds);

    }
}
public static async void Test()
{
    var test = new PlainNotifiable();
    async Task<Unit> propertyNameToLongTask(EventPattern<PropertyChangedEventArgs> args, CancellationToken token)
    {
        switch (args.EventArgs.PropertyName)
        {
            case nameof(test.TakeFiveSeconds):
                await test.TakeSomeTime(5, token);
                break;

            case nameof(test.TakeTenSeconds):
                await test.TakeSomeTime(10, token);
                break;
        }
        return Unit.Default;

    }

    Observable.FromEventPattern<PropertyChangedEventArgs>(test, nameof(test.PropertyChanged))
        .Select(x => Observable.FromAsync(token => propertyNameToLongTask(x, token)))
        .Switch()
        .Subscribe(x => Trace.TraceInformation("Beep boop"));

    Trace.TraceInformation("Started sending notifications");
    await Task.Delay(1000);
    test.TakeTenSeconds = true;
    await Task.Delay(2000);
    test.TakeFiveSeconds = true;
    Trace.TraceInformation("Finished sending notifications");

}

它给出以下输出:

SandBox.exe Information: 0 : Started sending notifications
SandBox.exe Information: 0 : Started waiting 10 seconds
SandBox.exe Information: 0 : Started waiting 5 seconds
SandBox.exe Information: 0 : Finished sending notifications
Exception thrown: 'System.Threading.Tasks.TaskCanceledException' in mscorlib.dll
Exception thrown: 'System.Threading.Tasks.TaskCanceledException' in mscorlib.dll
SandBox.exe Information: 0 : Stoped waiting 5 seconds
SandBox.exe Information: 0 : Beep boop

重点是:

  • Observable.FromAsync():使用正确的 CancellationToken 支持创建一个 observable
  • Switch():通过仅订阅最新的 observable 来进行扁平化,它还使用 CancellationToken 属性处置以前的 observable

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2014-09-07
    • 1970-01-01
    • 2019-03-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多