只需使用接受DateTimeOffset 值的Timer 的重载。您可以使用Defer 和Repeat 来创建“绝对间隔”。
Observable.Defer(() =>
DateTime.Now.Hour < 9
? Observable.Timer(DateTime.Today.AddHours(9))
: DateTime.Now.Hour < 13
? Observable.Timer(DateTime.Today.AddHours(13))
: Observable.Timer(DateTime.Today.AddDays(1).AddHours(9)))
.Repeat()
.Subscribe(...);
Rx automatically ensures,尽其所能,您的通知将在指定的确切日期和时间发生,即使是关于计时器漂移和如果系统时钟在计时器持续时间结束之前发生变化。
这是一个进一步概括问题的扩展方法。
用法:
Observable2.Daily(TimeSpan.FromHours(9), TimeSpan.FromHours(13)).Subscribe(...);
定义:
public static partial class Observable2
{
public static IObservable<long> Daily(params TimeSpan[] times)
{
Contract.Requires(times != null);
Contract.Requires(Contract.ForAll(times, time => time > TimeSpan.Zero));
Contract.Requires(Contract.ForAll(times, time => time.TotalDays < 1));
return Daily(Scheduler.Default, times);
}
public static IObservable<long> Daily(IScheduler scheduler, params TimeSpan[] times)
{
Contract.Requires(times != null);
Contract.Requires(Contract.ForAll(times, time => time > TimeSpan.Zero));
Contract.Requires(Contract.ForAll(times, time => time.TotalDays < 1));
if (times.Length == 0)
return Observable.Never<long>();
// Do not sort in place.
var sortedTimes = times.ToList();
sortedTimes.Sort();
return Observable.Defer(() =>
{
var now = DateTime.Now;
var next = sortedTimes.FirstOrDefault(time => now.TimeOfDay < time);
var date = next > TimeSpan.Zero
? now.Date.Add(next)
: now.Date.AddDays(1).Add(sortedTimes[0]);
Debug.WriteLine("Next @" + date + " from " + sortedTimes.Aggregate("", (s, t) => s + t + ", "));
return Observable.Timer(date, scheduler);
})
.Repeat()
.Scan(-1L, (n, _) => n + 1);
}
}
更新:
根据 Jeroen Mostert 的回答,如果您希望通过将输入定义为来自迭代器块的无限序列,从而在您的方法中更具“功能性”,那么您可以使用 Generate,如下所示。
Observable.Generate(
GetScheduleTimes().GetEnumerator(),
e => e.MoveNext(),
e => e,
e => e.Current,
e => e.Current);
Jeroen Mostert 的回答 (deleted) 提供了 GetScheduleTimes 的示例实现,但基本上它只是一个迭代器块,它在 while 循环中产生无限序列的 DateTimeOffset 值,每个循环将值的天数加 1。