【问题标题】:reactive net 4.2.0 await in subscribe响应式网络 4.2.0 在订阅中等待
【发布时间】:2019-11-12 15:48:58
【问题描述】:

我可以这样做吗?好像没有。

List<Int32> Seq = new List<Int32>() { 5, 6, 7, 8 };
IObservable<Int32> obs_seq = Seq.ToObservable();

obs_seq.Subscribe(
    onNext:
    async (_seqMember) =>
    {
        Stopwatch DelayWatc = Stopwatch.StartNew();
        await Task.Delay(delay: TimeSpan.FromSeconds(3));
        Debug.WriteLine($"{nameof(DelayWatc.Elapsed.TotalMilliseconds)} {DelayWatc.Elapsed.TotalMilliseconds:N1}.");                        
    });

如果我将 await 放在 Write 之后,它会立即打印所有这些。

如果我将 await 放在 Write 之前,它永远不会打印任何内容。

我认为没有人在等待订阅异步 lambda 任务操作。

在大约一万个 Ticks 的某个地方,它决定它不能再等待了,只是将那个 lambda 扔到数据洞中。或 5 万分之一秒。

await Task.Delay(delay: TimeSpan.FromSeconds( (Double) 1E-4 )); 将产生 4 个秒表(0.1 毫秒,3 个 0.0009 毫秒)。

await Task.Delay(delay: TimeSpan.FromSeconds((Double) 5E-4)); 从不产生任何输出

充其量是奇怪的。这是什么语义?协议是什么? 如果订阅不支持 ASYNC LAMBDA,即使编译也应该编译

更新

更长的等待确实会产生输出,但只有在 Thread.Sleep 完成之后,所以再说一遍,协议是什么? Thread.Sleep 是否应该阻止继续但 UI 消息泵线程不阻止?

充满希望的最终更新

IObservable<Int64> obs_interval = Observable.Interval(period: TimeSpan.FromSeconds(3));
obs_interval.Publish().Connect();

Task T = Task.Run(() =>
{
obs_interval.Latest().First();
});

while (!T.IsCompleted)
{
T.Wait(timeout: TimeSpan.FromMilliseconds(1E2));
Thread.Sleep(timeout: TimeSpan.FromMilliseconds(1E2));
Thread.Yield();
}

这将阻塞每个序列,直到间隔周期(+- 1E2 毫秒),但似乎没有办法让空闲时间返回,所以一旦订阅方法调用返回,所有线程都会被阻塞。

我觉得 Zip 是一个很好的解决方案,但有人向我解释了如何在不阻塞我的代码中的线程睡眠的情况下观察结果序列如何停止三秒钟。它必须通过提升调度进行硬编码,因为我的 subscribe lambda 不受支持。

我想要做的是将同步逻辑放在异步 lambda 中,表示任务等待名义上的一些网络延迟,然后如果没有完成,只需在下一个时钟滴答时发送另一个 CONGESTION PRODUCING ping。

【问题讨论】:

  • 因为您已将观察者标记为异步,序列中每个整数的观察者会立即执行,而无需等待前一个完成。我不确定是否可以让 .Subscribe() 等待每个异步完成。
  • 我拿了你的例子并运行它。看起来它像你预期的那样运行。 3 秒后 4 行。我只能看到一种情况,它无法向您返回结果,即您在任务完成(3 秒)之前停止应用程序。
  • 如果我在写入前等待 3 秒,则每个订阅会在 10k 滴答声后自发完成,然后剩下 2.9999 秒并且永远不会执行并且永远不会命中写入。
  • @Che 见上文...
  • 相关:如果我有两个时钟滴答我想兑现怎么办?但我不知道我的代码中哪个更长,没有信息,我只是做 Zip.Zip.Zip 等等?此外,为什么 Zip 的魔法限制为 16?

标签: c# reactive


【解决方案1】:

这似乎是答案,利用

Select...FromAsync...Concat...Subscribe

Howto call back async function from rx subscribe?

虽然结构的清晰度似乎很差。 Concat 是否总是与延迟的 Observables 表现相同? Concat 会始终尊重其 FromAsync 源的异步性质吗?

这会在多个平台(Java/Windows)上以相同的方式运行吗?

【讨论】:

    【解决方案2】:

    这就是我普遍寻找的:

    IObservable<Int32> obs_sync= Observable.Create<Int32>(subscribeAsync:
    (async (obsrvr) =>
    {
        Task _B = Task.Run(async () =>
        {
            while (DateTime.Now.Second % 2 != 0);
        });
    
         Task _AB = Task.Run(() =>
         {
            Observable.Interval(period: TimeSpan.FromMilliseconds(5E2)).Take(2);
         });
    
        while (!(_B.IsCompleted && _AB.IsCompleted))
        {
            await Task.Delay(TimeSpan.FromMilliseconds(15E1));
        }
    
    obsrvr.OnNext(0);
    
    }));
    
    Stopwatch BUSWATCH = Stopwatch.StartNew();
    obs_sync.Publish().Connect();
    Task.Run(() => obs_sync.Repeat().Take(5).Wait()).Wait();
    //Debug.WriteLine($"{nameof(DateTime.Now.Millisecond)} : {DateTime.Now.Millisecond:N2}.");
    Debug.WriteLine($"{nameof(BUSWATCH.Elapsed.TotalMilliseconds)} : {BUSWATCH.Elapsed.TotalMilliseconds:N2}.");
    
    猜你喜欢
    • 2020-03-16
    • 2023-02-06
    • 1970-01-01
    • 2021-06-08
    • 1970-01-01
    • 2023-03-04
    • 2021-05-06
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多