【问题标题】:How to Exponential Backoff retry on kotlin coroutines如何在 kotlin 协程上进行指数退避重试
【发布时间】:2018-04-02 23:47:22
【问题描述】:

我正在使用 kotlin 协程进行网络请求,使用扩展方法调用改造中的类,像这样

public suspend fun <T : Any> Call<T>.await(): T {

  return suspendCancellableCoroutine { continuation -> 

    enqueue(object : Callback<T> {

        override fun onResponse(call: Call<T>?, response: Response<T?>) {
            if (response.isSuccessful) {
                val body = response.body()
                if (body == null) {
                    continuation.resumeWithException(
                            NullPointerException("Response body is null")
                    )
                } else {
                    continuation.resume(body)
                }
            } else {
                continuation.resumeWithException(HttpException(response))
            }
        }

        override fun onFailure(call: Call<T>, t: Throwable) {
            // Don't bother with resuming the continuation if it is already cancelled.
            if (continuation.isCancelled) return
            continuation.resumeWithException(t)
        }
    })

      registerOnCompletion(continuation)
  }
}

然后从调用方我使用上面这样的方法

private fun getArticles()  = launch(UI) {

    loading.value = true
    try {
        val networkResult = api.getArticle().await()
        articles.value =  networkResult

    }catch (e: Throwable){
        e.printStackTrace()
        message.value = e.message

    }finally {
        loading.value = false
    }

}

我想在某些情况下指数重试这个 api 调用,即(IOException)我怎样才能实现它??

【问题讨论】:

    标签: android kotlin async-await retrofit2 kotlin-coroutines


    【解决方案1】:

    我建议为您的重试逻辑编写一个助手higher-order function。您可以使用以下实现作为开始:

    suspend fun <T> retryIO(
        times: Int = Int.MAX_VALUE,
        initialDelay: Long = 100, // 0.1 second
        maxDelay: Long = 1000,    // 1 second
        factor: Double = 2.0,
        block: suspend () -> T): T
    {
        var currentDelay = initialDelay
        repeat(times - 1) {
            try {
                return block()
            } catch (e: IOException) {
                // you can log an error here and/or make a more finer-grained
                // analysis of the cause to see if retry is needed
            }
            delay(currentDelay)
            currentDelay = (currentDelay * factor).toLong().coerceAtMost(maxDelay)
        }
        return block() // last attempt
    }
    

    使用这个函数很简单:

    val networkResult = retryIO { api.getArticle().await() }
    

    您可以根据具体情况更改重试参数,例如:

    val networkResult = retryIO(times = 3) { api.doSomething().await() }
    

    您还可以完全更改retryIO 的实现以适应您的应用程序的需要。例如,您可以硬编码所有重试参数,摆脱重试次数限制,更改默认值等。

    【讨论】:

    • 这几天一直萦绕在我的脑海中。很高兴看到解决方案并不比我想象的复杂。我还问自己将这个辅助函数定义为内联函数是否有意义。最后但并非最不重要的一点是:如果您只想在要求用户这样做(例如在对话中)之后执行重试,如何修改 a?
    • 也比 Rx 解决方案干净得多:-O
    • 如果kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/… 也会发出当前的重试次数,它可以用来进行干净的(指数)回退
    【解决方案2】:

    这是我之前答案的更复杂和方便的版本,希望对某人有所帮助:

    class RetryOperation internal constructor(
        private val retries: Int,
        private val initialIntervalMilli: Long = 1000,
        private val retryStrategy: RetryStrategy = RetryStrategy.LINEAR,
        private val retry: suspend RetryOperation.() -> Unit
    ) {
        var tryNumber: Int = 0
            internal set
    
        suspend fun operationFailed() {
            tryNumber++
            if (tryNumber < retries) {
                delay(calculateDelay(tryNumber, initialIntervalMilli, retryStrategy))
                retry.invoke(this)
            }
        }
    }
    
    enum class RetryStrategy {
        CONSTANT, LINEAR, EXPONENTIAL
    }
    
    suspend fun retryOperation(
        retries: Int = 100,
        initialDelay: Long = 0,
        initialIntervalMilli: Long = 1000,
        retryStrategy: RetryStrategy = RetryStrategy.LINEAR,
        operation: suspend RetryOperation.() -> Unit
    ) {
        val retryOperation = RetryOperation(
            retries,
            initialIntervalMilli,
            retryStrategy,
            operation,
        )
    
        delay(initialDelay)
    
        operation.invoke(retryOperation)
    }
    
    internal fun calculateDelay(tryNumber: Int, initialIntervalMilli: Long, retryStrategy: RetryStrategy): Long {
        return when (retryStrategy) {
            RetryStrategy.CONSTANT -> initialIntervalMilli
            RetryStrategy.LINEAR -> initialIntervalMilli * tryNumber
            RetryStrategy.EXPONENTIAL -> 2.0.pow(tryNumber).toLong()
        }
    }
    
    

    用法:

    coroutineScope.launch {
        retryOperation(3) {
            if (!tryStuff()) {
                Log.d(TAG, "Try number $tryNumber")
                operationFailed()
            }
        }
    }
    

    【讨论】:

      【解决方案3】:

      您可以通过简单的用法尝试这种简单但非常灵活的方法:

      编辑:在单独的答案中添加了更复杂的解决方案。

      class Completion(private val retry: (Completion) -> Unit) {
          fun operationFailed() {
              retry.invoke(this)
          }
      }
      
      fun retryOperation(retries: Int, 
                         dispatcher: CoroutineDispatcher = Dispatchers.Default, 
                         operation: Completion.() -> Unit
      ) {
          var tryNumber = 0
      
          val completion = Completion {
              tryNumber++
              if (tryNumber < retries) {
                  GlobalScope.launch(dispatcher) {
                      delay(TimeUnit.SECONDS.toMillis(tryNumber.toLong()))
                      operation.invoke(it)
                  }
              }
          }
      
          operation.invoke(completion)
      }
      

      这样使用它:

      retryOperation(3) {
          if (!tryStuff()) {
              // this will trigger a retry after tryNumber seconds
              operationFailed()
          }
      }
      

      您显然可以在此基础上构建更多内容。

      【讨论】:

        【解决方案4】:

        这里是 FlowretryWhen 函数的示例

        RetryWhen分机:

        fun <T> Flow<T>.retryWhen(
            @FloatRange(from = 0.0) initialDelay: Float = RETRY_INITIAL_DELAY,
            @FloatRange(from = 1.0) retryFactor: Float = RETRY_FACTOR_DELAY,
            predicate: suspend FlowCollector<T>.(cause: Throwable, attempt: Long, delay: Long) -> Boolean
        ): Flow<T> = this.retryWhen { cause, attempt ->
            val retryDelay = initialDelay * retryFactor.pow(attempt.toFloat())
            predicate(cause, attempt, retryDelay.toLong())
        }
        

        用法:

        flow {
            ...
        }.retryWhen { cause, attempt, delay ->
            delay(delay)
            ...
        }
        

        【讨论】:

          【解决方案5】:

          流版本https://github.com/hoc081098/FlowExt

          package com.hoc081098.flowext
          
          import kotlin.time.Duration
          import kotlin.time.ExperimentalTime
          import kotlinx.coroutines.delay
          import kotlinx.coroutines.flow.Flow
          import kotlinx.coroutines.flow.FlowCollector
          import kotlinx.coroutines.flow.emitAll
          import kotlinx.coroutines.flow.flow
          import kotlinx.coroutines.flow.retryWhen
          
          @ExperimentalTime
          public fun <T> Flow<T>.retryWithExponentialBackoff(
            initialDelay: Duration,
            factor: Double,
            maxAttempt: Long = Long.MAX_VALUE,
            maxDelay: Duration = Duration.INFINITE,
            predicate: suspend (cause: Throwable) -> Boolean = { true }
          ): Flow<T> {
            require(maxAttempt > 0) { "Expected positive amount of maxAttempt, but had $maxAttempt" }
            return retryWhenWithExponentialBackoff(
              initialDelay = initialDelay,
              factor = factor,
              maxDelay = maxDelay
            ) { cause, attempt -> attempt < maxAttempt && predicate(cause) }
          }
          
          @ExperimentalTime
          public fun <T> Flow<T>.retryWhenWithExponentialBackoff(
            initialDelay: Duration,
            factor: Double,
            maxDelay: Duration = Duration.INFINITE,
            predicate: suspend FlowCollector<T>.(cause: Throwable, attempt: Long) -> Boolean
          ): Flow<T> = flow {
            var currentDelay = initialDelay
          
            retryWhen { cause, attempt ->
              predicate(cause, attempt).also {
                if (it) {
                  delay(currentDelay)
                  currentDelay = (currentDelay * factor).coerceAtMost(maxDelay)
                }
              }
            }.let { emitAll(it) }
          }
          
          

          【讨论】:

            猜你喜欢
            • 1970-01-01
            • 2021-07-04
            • 1970-01-01
            • 2020-04-24
            • 2015-11-01
            • 1970-01-01
            • 2018-04-20
            • 2018-05-13
            • 2021-12-03
            相关资源
            最近更新 更多