【发布时间】:2020-11-03 08:44:26
【问题描述】:
我想要一种将异步方法转换为 observable 的通用方法。就我而言,我正在处理使用 HttpClient 从 API 获取数据的方法。
假设我们有一个方法Task<string> GetSomeData(),它需要成为单个Observable<string>,其中的值是由以下组合生成的:
- 重复定期调用
GetSomeData()(例如每 x 秒) - 在任何给定时间(例如,当用户点击刷新时)手动触发对
GetSomeData()的调用。
由于有两种方法可以触发GetSomeData() 的执行,并发可能是个问题。为了避免要求GetSomeData() 是线程安全的,我想限制并发性,以便只有一个线程同时执行该方法。因此,我需要使用某种策略来处理重叠的请求。我做了一个(某种)大理石图,试图描述问题和想要的结果
我的直觉告诉我有一个简单的方法可以实现这一点,所以请给我一些见解:)
这是我目前得到的解决方案。不幸的是,它并没有解决并发问题。
public class ObservableCreationWrapper<T>
{
private Subject<Unit> _manualCallsSubject = new Subject<Unit>();
private Func<Task<T>> _methodToCall;
private IObservable<T> _manualCalls;
public IObservable<T> Stream { get; private set; }
public ObservableCreationWrapper(Func<Task<T>> methodToCall, TimeSpan period)
{
_methodToCall = methodToCall;
_manualCalls = _manualCallsSubject.AsObservable()
.Select(x => Observable.FromAsync(x => methodToCall()))
.Merge(1);
Stream = Observable.FromAsync(() => _methodToCall())
.DelayRepeat(period)
.Merge(_manualCalls);
}
public void TriggerAdditionalCall()
{
_manualCallsSubject.OnNext(Unit.Default);
}
}
延迟重复的扩展方法:
static class Extensions
{
public static IObservable<T> DelayRepeat<T>(this IObservable<T> source, TimeSpan delay) => source
.Concat(
Observable.Create<T>(async observer =>
{
await Task.Delay(delay);
observer.OnCompleted();
}))
.Repeat();
}
包含生成 observable 方法的服务示例
class SomeService
{
private int _ticks = 0;
public async Task<string> GetSomeValueAsync()
{
//Just a hack to dermine if request was triggered manuall or by timer
var initiatationWay = (new StackTrace()).GetFrame(4).GetMethod().ToString().Contains("System.Threading.CancellationToken") ? "manually" : "by timer";
//Here we have a data race! We would like to limit access to this method
var valueToReturn = $"{_ticks} ({initiatationWay})";
await Task.Delay(500);
_ticks += 1;
return valueToReturn;
}
}
这样使用(会发生数据竞争):
static async Task Main(string[] args)
{
//Running this program will yield non deterministic results due to data-race in GetSomeValueAsync
var someService = new SomeService();
var stopwatch = Stopwatch.StartNew();
var observableWrapper = new ObservableCreationWrapper<string>(someService.GetSomeValueAsync, TimeSpan.FromMilliseconds(2000));
observableWrapper.Stream
.Take(6)
.Subscribe(x =>
{
Console.WriteLine($"{stopwatch.ElapsedMilliseconds} | Request: {x} fininshed");
});
await Task.Delay(4000);
observableWrapper.TriggerAdditionalCall();
observableWrapper.TriggerAdditionalCall();
Console.ReadLine();
}
【问题讨论】:
-
你说得对,它很简单,但请给我们一些代码。 “常规异步端点”是什么样的? “来自应用程序的信号”是什么样的?一个按钮点击?计时器?
-
@Enigmativity 我已经添加了到目前为止的示例代码。它有效,但不能完全满足我的要求。另外,我怀疑还有一些需要改进的地方。
-
我很难理解可接受解决方案的要求。你能设计一个marble diagram 来显示样本输入和输出数据吗?你不需要在 Photoshop 中绘制它,你可以像这样使用纯 ASCII 字符:
Source: +--1-2-3--4--|,Result: +--A-B-C--D--|。 -
@TheodorZoulias 感谢您的反馈!我重写了问题以使其更清晰,并添加了一个(某种)大理石图。
-
漂亮的大理石图!恕我直言,这个问题现在已经很清楚了。我不知道为什么它被否决了。
标签: c# asynchronous system.reactive rx.net