似乎 Rx 库缺少将 IEnumerable<T> 转换为 IObservable<T> 的机制,方法是惰性枚举它并时移其元素。下面是一个自定义实现。这个想法是Zip 与一个主题可枚举的源,并通过在适当的时刻向主题发送OnNext 通知来控制枚举。
/// <summary>Converts an enumerable sequence to a time shifted observable sequence,
/// based on a time selector function for each element.</summary>
public static IObservable<T> ToObservable<T>(
this IEnumerable<T> source,
Func<T, DateTimeOffset> dueTimeSelector,
IScheduler scheduler = null)
{
scheduler ??= Scheduler.Default;
return Observable.Defer(() =>
{
var subject = new BehaviorSubject<Unit>(default);
return subject
.Zip(source, (_, x) => x)
.Delay(x => Observable.Timer(dueTimeSelector(x), scheduler))
.Do(_ => subject.OnNext(default));
});
}
之所以选择BehaviorSubject,是因为它有一个初始值,因此它可以让轮子自然地运动。
Observable.Defer 运算符用于防止多个订阅共享同一状态(本例中为 subject)并相互干扰。有关此here 的更多信息。
使用示例:
IEnumerable<Event> events = GetEvents();
IObservable<Event> observable = events.ToObservable(x => x.Timestamp);