【发布时间】:2021-12-13 04:57:10
【问题描述】:
C# rx 中有没有办法处理背压? 我正在尝试从分页查询的结果中调用 Web api。这个 web api 非常脆弱,我需要不超过 3 个并发调用,所以,程序应该是这样的:
- 从 db 获取页面
- 调用 web api,页面上每条记录最多三个并发调用
- 将结果保存回数据库
- 获取另一个页面并重复,直到没有更多结果。
我并没有真正得到我所追求的序列,基本上数据库会获取所有记录,无论它们是否可以处理。
我尝试了各种方法,包括调整 ObserveOn 运算符、实现信号量以及其他一些事情。我可以得到一些指导来实现这样的东西吗?
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;
using Castle.Core.Internal;
using Xunit;
using Xunit.Abstractions;
namespace ProductValidation.CLI.Tests.Services
{
public class Example
{
private readonly ITestOutputHelper output;
public Example(ITestOutputHelper output)
{
this.output = output;
}
[Fact]
public async Task RunsObservableToCompletion()
{
var repo = new Repository(output);
var client = new ServiceClient(output);
var results = repo.FetchRecords()
.Select(x => client.FetchMoreInformation(x).ToObservable())
.Merge(1)
.Do(async x => await repo.Save(x));
await results.LastOrDefaultAsync();
}
}
public class Repository
{
private readonly ITestOutputHelper output;
public Repository(ITestOutputHelper output)
{
this.output = output;
}
public IObservable<int> FetchRecords()
{
return Observable.Create<int>(async (observer) =>
{
var page = 1;
var products = await FetchPage(page);
while (!products.IsNullOrEmpty())
{
foreach (var product in products)
{
observer.OnNext(product);
}
page += 1;
products = await FetchPage(page);
}
observer.OnCompleted();
})
.ObserveOn(SynchronizationContext.Current);
}
private async Task<IEnumerable<int>> FetchPage(int page)
{
// Simulate fetching a paged query.
await Task.Delay(500).ToObservable().ObserveOn(new TaskPoolScheduler(new TaskFactory()));
output.WriteLine("Fetching page {0}", page);
if (page >= 4) return Enumerable.Empty<int>();
return Enumerable.Range(1, 3).Select(_ => page);
}
public async Task Save(string id)
{
await Task.Delay(50); //Simulates latency
}
}
public class ServiceClient
{
private readonly ITestOutputHelper output;
private readonly SemaphoreSlim semaphore;
public ServiceClient(ITestOutputHelper output)
{
this.output = output;
this.semaphore = new SemaphoreSlim(2);
}
public async Task<string> FetchMoreInformation(int id)
{
try
{
output.WriteLine("Calling the web client for {0}", id);
await semaphore.WaitAsync(); // Protection for the webapi not sending too many calls
await Task.Delay(1000); //Simulates latency
return id.ToString();
}
finally
{
semaphore.Release();
}
}
}
}
【问题讨论】:
标签: c# system.reactive