【问题标题】:Reactor lazy pagination with expand反应堆延迟分页与扩展
【发布时间】:2019-06-22 23:47:59
【问题描述】:

基于How to collect paginated API responses using spring boot WebClient?

我创建了以下爬虫类

class GitlabCrawler(private val client: WebClient, private val token: String) {

    fun fetchCommits(project: URI): Flux<Commit> {
        return fetchCommitsInternal(project).expand { cr: ClientResponse? ->
                val nextUrl = getNextUrl(cr)

                nextUrl?.let { fetchCommitsInternal(URI.create(it)) }
                        ?: Mono.empty<ClientResponse>()
        }.limitRate(1)
                .flatMap { cr: ClientResponse? -> cr?.bodyToFlux(Commit::class.java) ?: Flux.empty() }


    }

    private fun getNextUrl(cr: ClientResponse?):String? {
        // TODO replace with proper link parsing
        return cr?.headers()?.header(HttpHeaders.LINK)?.firstOrNull()
                ?.splitToSequence(",")
                ?.find { it.endsWith("rel=\"next\"") }
                ?.let { it.substring(it.indexOf('<') + 1, it.lastIndexOf('>')) }
    }

    private fun fetchCommitsInternal(url: URI): Mono<ClientResponse> {
        return client.get()
                .uri(url)
                .accept(MediaType.APPLICATION_JSON_UTF8)
                .header("Private-Token", token)
                .exchange()
    }
}


data class Commit(
        val id: String,
        val message: String,
        @JsonProperty("parent_ids") val parentIds: List<String>,
        @JsonProperty("created_at") val createdAt: String)

我想避免不必要的请求,但它执行的请求比完成请求所需的要多。

gitlabCrawler.fetchCommits(URI.create("https://...")).take(15).collectList().block()

只需要一个请求,因为每个页面包含 20 个条目,但它会启动第二个页面请求。它似乎总是要求多一页而不是必要的。我尝试使用limitRate,但这似乎没有效果。

有没有办法让它变得懒惰,即只有在当前用尽时才请求下一页?

【问题讨论】:

    标签: kotlin spring-webflux project-reactor


    【解决方案1】:

    您确定它确实执行了请求吗? fetchCommitInternal被调用意味着WebFlux“准备”了请求,不一定是它被执行(即订阅)。

    您的用例的以下简化显示了差异:

    private static Tuple2<Integer, Flux<Integer>> nextPage(int index, int pageSize) {
        System.out.println("prepared a request for page " + index);
        return Tuples.of(index, Flux.range((pageSize * (index - 1)) + 1, pageSize));
    }
    
    @Test
    public void expandLimitedRequest() {
        int pageSize = 5;
        Flux.just(nextPage(1, pageSize))
            .doOnSubscribe(sub -> System.out.println("requested first page"))
            .expand(page -> {
                int currentPage = page.getT1();
                if (currentPage < 3) {
                    int nextPage = currentPage + 1;
                    return Flux.just(nextPage(nextPage, pageSize))
                               .doOnSubscribe(sub -> System.out.println("requested page " + nextPage));
                }
                return Flux.empty();
            })
            .doOnNext(System.out::println)
            .flatMap(Tuple2::getT2)
            .doOnNext(System.out::println)
            .take(8)
            .blockLast();
    }
    

    打印:

    prepared a request for page 1
    requested first page
    [1,FluxRange]
    1
    2
    3
    4
    5
    prepared a request for page 2
    requested page 2
    [2,FluxRange]
    6
    7
    8
    prepared a request for page 3
    

    如您所见,它为第 3 页准备了请求,但从不执行它(因为 take 下游取消了之前的 expand)。

    【讨论】:

    • 从日志看来,请求是用.exchange() 急切完成的,我有两个带有标题的Received response 日志条目。如果我只take(1),也会发生这种情况。
    猜你喜欢
    • 1970-01-01
    • 2019-06-07
    • 1970-01-01
    • 1970-01-01
    • 2017-05-21
    • 2019-08-27
    • 2019-03-01
    • 1970-01-01
    • 2022-11-24
    相关资源
    最近更新 更多