【问题标题】:Create observable from periodic async request从周期性异步请求中创建 observable
【发布时间】:2020-11-03 08:44:26
【问题描述】:

我想要一种将异步方法转换为 observable 的通用方法。就我而言,我正在处理使用 HttpClient 从 API 获取数据的方法。

假设我们有一个方法Task<string> GetSomeData(),它需要成为单个Observable<string>,其中的值是由以下组合生成的:

  • 重复定期调用GetSomeData()(例如每 x 秒)
  • 在任何给定时间(例如,当用户点击刷新时)手动触发对 GetSomeData() 的调用。

由于有两种方法可以触发GetSomeData() 的执行,并发可能是个问题。为了避免要求GetSomeData() 是线程安全的,我想限制并发性,以便只有一个线程同时执行该方法。因此,我需要使用某种策略来处理重叠的请求。我做了一个(某种)大理石图,试图描述问题和想要的结果

我的直觉告诉我有一个简单的方法可以实现这一点,所以请给我一些见解:)

这是我目前得到的解决方案。不幸的是,它并没有解决并发问题。

    public class ObservableCreationWrapper<T>
    {
        private Subject<Unit> _manualCallsSubject = new Subject<Unit>();
        private Func<Task<T>> _methodToCall;
        private IObservable<T> _manualCalls;

        public IObservable<T> Stream { get; private set; }

        public ObservableCreationWrapper(Func<Task<T>> methodToCall, TimeSpan period)
        {
            _methodToCall = methodToCall;
            _manualCalls = _manualCallsSubject.AsObservable()
                .Select(x => Observable.FromAsync(x => methodToCall()))
                .Merge(1);

            Stream = Observable.FromAsync(() => _methodToCall())
                .DelayRepeat(period)
                .Merge(_manualCalls);
        }

        public void TriggerAdditionalCall()
        {
            _manualCallsSubject.OnNext(Unit.Default);
        }
    }

延迟重复的扩展方法:

static class Extensions
{
    public static IObservable<T> DelayRepeat<T>(this IObservable<T> source, TimeSpan delay) => source
        .Concat(
            Observable.Create<T>(async observer =>
            {
                await Task.Delay(delay);
                observer.OnCompleted();
            }))
        .Repeat();
}

包含生成 observable 方法的服务示例

class SomeService
{
    private int _ticks = 0;

    public async Task<string> GetSomeValueAsync()
    {
        //Just a hack to dermine if request was triggered manuall or by timer
        var initiatationWay = (new StackTrace()).GetFrame(4).GetMethod().ToString().Contains("System.Threading.CancellationToken") ? "manually" : "by timer";

        //Here we have a data race! We would like to limit access to this method 
        var valueToReturn = $"{_ticks} ({initiatationWay})";

        await Task.Delay(500);
        _ticks += 1; 
        return valueToReturn;
    }
}

这样使用(会发生数据竞争):

static async Task Main(string[] args)
{
    //Running this program will yield non deterministic results due to data-race in GetSomeValueAsync
    var someService = new SomeService();
    var stopwatch = Stopwatch.StartNew();
    var observableWrapper = new ObservableCreationWrapper<string>(someService.GetSomeValueAsync, TimeSpan.FromMilliseconds(2000));
    observableWrapper.Stream
        .Take(6)
        .Subscribe(x => 
            {
                Console.WriteLine($"{stopwatch.ElapsedMilliseconds} | Request: {x} fininshed");
            });

    await Task.Delay(4000);
    observableWrapper.TriggerAdditionalCall();
    observableWrapper.TriggerAdditionalCall();
    Console.ReadLine();
}

【问题讨论】:

  • 你说得对,它很简单,但请给我们一些代码。 “常规异步端点”是什么样的? “来自应用程序的信号”是什么样的?一个按钮点击?计时器?
  • @Enigmativity 我已经添加了到目前为止的示例代码。它有效,但不能完全满足我的要求。另外,我怀疑还有一些需要改进的地方。
  • 我很难理解可接受解决方案的要求。你能设计一个marble diagram 来显示样本输入和输出数据吗?你不需要在 Photoshop 中绘制它,你可以像这样使用纯 ASCII 字符:Source: +--1-2-3--4--|Result: +--A-B-C--D--|
  • @TheodorZoulias 感谢您的反馈!我重写了问题以使其更清晰,并添加了一个(某种)大理石图。
  • 漂亮的大理石图!恕我直言,这个问题现在已经很清楚了。我不知道为什么它被否决了。

标签: c# asynchronous system.reactive rx.net


【解决方案1】:

这是我对这个问题的看法:


更新:通过借鉴 Enigmativity 的 answer 的想法,我能够大大简化我建议的解决方案。 Observable.StartAsync 方法自动处理取消的杂乱事务¹,并且可以简单地通过使用 SemaphoreSlim 来强制执行非重叠执行的要求。

/// <summary>
/// Creates an observable sequence containing the results of an asynchronous
/// function that is invoked periodically and manually. Overlapping invocations
/// are prevented. Timer ticks that would cause overlapping are ignored.
/// Manual invocations cancel previous invocations, and restart the timer.
/// </summary>
public static IObservable<T> PeriodicAndManual<T>(
    Func<bool, CancellationToken, Task<T>> functionAsync,
    TimeSpan period,
    out Action manualInvocation)
{
    // Arguments validation omitted
    var manualSubject = new Subject<bool>();
    manualInvocation = () => manualSubject.OnNext(true);
    return Observable.Defer(() =>
    {
        var semaphore = new SemaphoreSlim(1, 1); // Ensure no overlapping
        return Observable
            .Interval(period)
            .Select(_ => false) // Not manual
            .Merge(manualSubject)
            .TakeUntil(isManual => isManual) // Stop on first manual
            .Repeat() // ... and restart the timer
            .Prepend(false) // Skip the initial interval delay
            .Select(isManual =>
            {
                if (isManual)
                {
                    // Triggered manually
                    return Observable.StartAsync(async ct =>
                    {
                        await semaphore.WaitAsync(ct);
                        try { return await functionAsync(isManual, ct); }
                        finally { semaphore.Release(); }
                    });
                }
                else if (semaphore.Wait(0))
                {
                    // Triggered by the timer and semaphore acquired synchronously
                    return Observable
                        .StartAsync(ct => functionAsync(isManual, ct))
                        .Finally(() => semaphore.Release());
                }
                return null; // Otherwise ignore the signal
            })
            .Where(op => op != null)
            .Switch(); // Pending operations are unsubscribed and canceled
    });
}

out Action manualInvocation 参数是触发手动调用的机制。

使用示例:

int ticks = 0;
var subscription = PeriodicAndManual(async (isManual, token) =>
{
    var id = $"{++ticks} " + (isManual ? "manual" : "periodic");
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} Begin {id}");
    await Task.Delay(500, token);
    return id;
}, TimeSpan.FromMilliseconds(1000), out var manualInvocation)
.Do(x => Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} Received #{x}"))
.Subscribe();

await Task.Delay(3200);
manualInvocation();
await Task.Delay(200);
manualInvocation();
await Task.Delay(3200);

subscription.Dispose();

输出:

19:52:43.684 Begin 1 periodic
19:52:44.208 Received #1 periodic
19:52:44.731 Begin 2 periodic
19:52:45.235 Received #2 periodic
19:52:45.729 Begin 3 periodic
19:52:46.232 Received #3 periodic
19:52:46.720 Begin 4 periodic
19:52:46.993 Begin 5 manual
19:52:47.220 Begin 6 manual
19:52:47.723 Received #6 manual
19:52:48.223 Begin 7 periodic
19:52:48.728 Received #7 periodic
19:52:49.227 Begin 8 periodic
19:52:49.730 Received #8 periodic
19:52:50.226 Begin 9 periodic

使用ScanDistinctUntilChanged 运算符以便在前一个异步操作运行时删除元素的技术是从this 问题借用的。

¹ Rx 库似乎并不能令人满意地处理这种混乱的业务,因为它只是 omits disposing of the CancellationTokenSources it creates

【讨论】:

  • 如果Observable.StartAsync 返回一个既是Task 又是IObservable 的类型,实现会变得更加简单。但可惜不是。创建这样的类型是可能的,但是tricky.
  • 非常令人印象深刻的答案!我已经测试过了,它可以工作并解决我的问题。我很可能会标记为正确答案,但首先我想完全理解所有建议的答案。 @TheodorZoulias 是防止此版本重叠所必需的 CancellationToken?
  • @figursagsmats 谢谢老兄!不,观察CancellationToken 是可选的,不会影响单并发执行限制的执行。但最好functionAsync 对取消信号迅速作出反应。否则,丢弃的周期性操作可能会在后台继续运行,从而阻止手动请求的操作及时启动。
【解决方案2】:

这是您需要的查询:

var subject = new Subject<Unit>();
var delay = TimeSpan.FromSeconds(1.0);

IObservable<string> query =
    subject
        .StartWith(Unit.Default)
        .Select(x => Observable.Timer(TimeSpan.Zero, delay))
        .Switch()
        .SelectMany(x => Observable.FromAsync(() => GetSomeData()));

如果您在任何时候调用subject.OnNext(Unit.Default),它将立即触发对GetSomeData 的调用,然后根据delay 中设置的TimeSpan 重复调用。

.StartWith(Unit.Default) 的使用将设置在有订阅者时立即进行查询。

使用.Switch() 会取消任何基于正在调用的新subject.OnNext(Unit.Default) 的挂起操作。

这应该与您的大理石图相匹配。


以上版本没有引入值之间的延迟。

版本 2 应该。

var subject = new Subject<Unit>();
var delay = TimeSpan.FromSeconds(5.0);

var source = Observable.FromAsync(() => GetSomeData());

IObservable<string> query =
    subject
        .StartWith(Unit.Default)
        .Select(x => source.Expand(n => Observable.Timer(delay).SelectMany(y => source)))
        .Switch();

我使用Expand 运算符在值之间引入延迟。只要source 只产生一个值(FromAsync 会产生一个值),这应该可以正常工作。

【讨论】:

  • 这个实现中有一些非常巧妙和巧妙的技巧!但似乎这并不能防止重叠执行(由持续时间大于delay 的任务引起),并且在触发手动调用时它也不会取消正在运行的操作。所以所有调用的结果最终都会出现在结果流中。
  • @TheodorZoulias - 它肯定会用.Switch() 取消,但是使用Task&lt;string&gt; GetSomeData() 的签名没有办法取消它,除非它更改为Task&lt;string&gt; GetSomeData(CancellationToken ct) - 然后最后一行查询变为.SelectMany(x =&gt; Observable.FromAsync(ct =&gt; GetSomeData(ct)));
  • @TheodorZoulias - 我已经用一个应该添加所需延迟的查询更新了我的答案。它在我的测试中有效。
  • 我尝试了SwitchObservable.FromAsync 的可取消版本。很有意思!尽管它要求Switch 影响FromAsync 可观察对象,但在您的版本1 示例中并非如此。相反,它会影响Timer observables。 FromAsync 是在Switch 之后创建的,它们的取消会在它们正常完成后立即发生(这为时已晚且无效)。取消也是“尽力而为”,这意味着它是合作的。在开始新任务之前不会等待旧任务,因此仍然可以重叠。
  • 第 2 版也很有趣。从好的方面来说,取消是功能性的并且避免了重叠(前提是GetSomeData 尊重令牌并在收到通知时同步完成)。缺点是它使用Experimental 运算符,并且调用不是周期性的。在完成一个动作和开始下一个动作之间有一个恒定的延迟,而不是在后续动作的起点之间有一个恒定的延迟(这是 OP 的大理石图所指示的)。
【解决方案3】:

我建议不要尝试取消已经开始的通话。事情会变得太混乱。 如果 GetSomeValueAsync 中的逻辑涉及数据库调用和/或 Web API 调用,您根本无法真正取消调用。

我认为这里的关键是确保对 GetSomeValueAsync 的所有调用都被序列化。

我基于 Enigmativity 的版本 1 创建了以下解决方案。 它在 asp.net core 3.1 上的 webassembly blazor 页面上进行了测试,运行良好。

private int _ticks = 0; //simulate a resource you want serialized access

//for manual event, trigger will be 0; for Timer event, trigger will be 1,2,3...
protected async Task<string> GetSomeValueAsync(string trigger)
{
    var valueToReturn = $"{DateTime.Now.Ticks.ToString()}: {_ticks.ToString()} | ({trigger})";

    await Task.Delay(1000);
    _ticks += 1;
    return valueToReturn;
}

//define two subjects
private Subject<string> _testSubject = new Subject<string>();
private Subject<string> _getDataSubject = new Subject<string>();

//driving observable, based on Enigmativity's Version 1
var delay = TimeSpan.FromSeconds(3.0);
IObservable<string> getDataObservable =
    _testSubject
   .StartWith("Init")
   .Select(x => Observable.Timer(TimeSpan.Zero, delay).Select(i => i.ToString()))
   .Switch()
   .WithLatestFrom(_getDataSubject.AsObservable().StartWith("IDLE"))
   .Where(a => a.Second == "IDLE")
   .Select(a => a.First);

//_disposables is CompositeDisposable defined in the page
_disposables.Add(getDataObservable.Subscribe(async t =>
{
     _getDataSubject.OnNext("WORKING");
     //_service.LogToConsole is my helper function to log data to console
     await _service.LogToConsole(await GetSomeValueAsync(t)); 
     _getDataSubject.OnNext("IDLE");
}));

就是这样。我使用一个按钮来触发手动事件。 输出中的_ticks始终是按顺序排列的,即没有发生重叠。

【讨论】:

  • 只需在页面上添加一个按钮并点击它 -
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多