【问题标题】:How Kotlin coroutines are better than RxKotlin?Kotlin 协程如何优于 RxKotlin?
【发布时间】:2017-02-06 10:55:51
【问题描述】:

我为什么要使用 Kotlin 的协程?

RxKotlin 库似乎更加通用。 相比之下,Kotlin 的协程看起来没有那么强大,使用起来也更麻烦。

我对协程的看法基于this design talk by Andrey Breslav (JetBrains)

演讲的幻灯片是accessible here.


编辑(感谢@hotkey):

协同程序当前状态的更好来源here.

【问题讨论】:

    标签: kotlin rx-kotlin


    【解决方案1】:

    免责声明:此答案的部分内容无关紧要,因为协程现在具有流 API,与 Rx 非常相似。如果您想要最新的答案,请跳至最后一次编辑。

    Rx 中有两个部分; Observable 模式,以及一组可靠的操作符来操作、转换和组合它们。 Observable 模式本身并没有做很多事情。与协程相同;这只是处理异步的另一种范式。您可以比较回调、Observable 和协程解决给定问题的优缺点,但不能将范例与功能齐全的库进行比较。这就像将语言与框架进行比较。

    Kotlin 协程在哪些方面优于 RxKotlin?还没有使用协程,但它看起来类似于 C# 中的 async/wait。您只需编写顺序代码,一切就像编写同步代码一样简单……除了它异步执行。更容易掌握。

    我为什么要使用 kotlin 协程?我会为自己回答。大多数时候我会坚持使用 Rx,因为我更喜欢事件驱动的架构。但是如果出现我正在编写顺序代码的情况,并且我需要在中间调用一个异步方法,我会很高兴地利用协程来保持这种状态并避免将所有内容都包装在 Observable 中。

    编辑:现在我正在使用协程,是时候更新了。

    RxKotlin 只是在 Kotlin 中使用 RxJava 的语法糖,所以我将在下面讨论 RxJava 而不是 RxKotlin。协程是比 RxJava 更低级和更通用的概念,它们服务于其他用例。也就是说,有一个用例可以比较 RxJava 和协程 (channel),它异步传递数据。协程在这里比 RxJava 有明显的优势:

    协程更好地处理资源

    • 在 RxJava 中,您可以将计算分配给调度程序,但 subscribeOn()ObserveOn() 令人困惑。每个协程都被赋予一个线程上下文并返回父上下文。对于一个通道,双方(生产者、消费者)都在自己的上下文中执行。协程对线程或线程池的影响更直观。
    • 协程可以更好地控制这些计算何时发生。例如,您可以为给定的计算传递手(yield)、优先级(select)、并行化(channel 上的多个producer/actor)或锁定资源(Mutex)。在服务器上(RxJava 首先出现)可能无关紧要,但在资源有限的环境中可能需要这种级别的控制。
    • 由于它的反应性质,背压在 RxJava 中不太适合。在另一端send() 到通道是一个挂起函数,当达到通道容量时会挂起。这是大自然赋予的开箱即用的背压。你也可以offer() 到通道,在这种情况下调用永远不会挂起,但在通道已满的情况下返回false,有效地从 RxJava 复制onBackpressureDrop()。或者您可以编写自己的自定义背压逻辑,这对于协程来说并不困难,尤其是与使用 RxJava 相比。

    还有另一个用例,协程大放异彩,这将回答您的第二个问题“我为什么要使用 Kotlin 协程?”。协程是后台线程或AsyncTask (Android) 的完美替代品。就像launch { someBlockingFunction() } 一样简单。当然,你也可以使用 RxJava 来实现这一点,也许使用 SchedulersCompletable。您不会(或很少)使用观察者模式和作为 RxJava 签名的运算符,这暗示这项工作超出了 RxJava 的范围。 RxJava 的复杂性(这里是无用的税)会使你的代码比 Coroutine 的版本更冗长,更不干净。

    可读性很重要。在这方面,RxJava 和协程方法有很大不同。协程比 RxJava 更简单。如果您对map()flatmap() 和一般的函数式反应式编程不放心,协程操作更容易,涉及基本指令:foriftry/catch……但我个人发现协程的代码对于非平凡的任务更难理解。尤其是它涉及更多的嵌套和缩进,而 RxJava 中的运算符链使所有内容保持一致。函数式编程使处理更加明确。最重要的是,RxJava 可以使用来自其丰富(好吧,太丰富)运算符集的一些标准运算符来解决复杂的转换。当您拥有需要大量组合和转换的复杂数据流时,RxJava 会大放异彩。

    我希望这些考虑将帮助您根据自己的需要选择合适的工具。

    编辑: 协程现在有了 flow,一个非常非常类似于 Rx 的 API。可以比较每种的优缺点,但事实是差异很小。

    协程的核心是一种并发设计模式,带有附加库,其中之一是类似于 Rx 的流 API。显然,Coroutines 的范围比 Rx 要广泛得多,Coroutines 能做的事情有很多 Rx 做不到的,我不能一一列举。但通常如果我在我的一个项目中使用协程,归结为一个原因:

    协程更擅长从代码中移除回调

    我避免使用过多损害可读性的回调。协程使异步代码简单易写。通过利用suspend关键字,您的代码看起来像同步代码。

    我在项目中看到 Rx 主要用于替换回调的相同目的,但如果您不打算修改架构以提交响应式模式,Rx 将是一个负担。考虑这个接口:

    interface Foo {
       fun bar(callback: Callback)
    }
    

    Coroutine 等价物更加明确,返回类型和关键字suspend 表明它是一个异步操作。

    interface Foo {
       suspend fun bar: Result
    }
    

    但是 Rx 等效项存在问题:

    interface Foo {
       fun bar: Single<Result>
    }
    

    当你在回调或协程版本中调用 bar() 时,你会触发计算;使用 Rx 版本,您可以获得可以随意触发的计算表示。您需要调用 bar() 然后订阅 Single。通常没什么大不了的,但对于初学者来说有点混乱,可能会导致一些微妙的问题。

    此类问题的一个例子,假设回调栏函数是这样实现的:

    fun bar(callback: Callback) {
       setCallback(callback)
       refreshData()
    }
    

    如果你没有正确移植它,你会以一个只能触发一次的 Single 结束,因为 refreshData() 是在 bar() 函数中调用的,而不是在订阅时调用的。一个初学者的错误,理所当然,但问题是 Rx 不仅仅是一个回调替代品,许多开发人员都在努力掌握 Rx。

    如果您的目标是将异步任务从回调转换为更好的范例,则协程是完美的选择,而 Rx 会增加一些复杂性。

    【讨论】:

    • 所以不是将所有内容都包装在 Observable 中,而是将所有内容都包装在 Future 中。
    • 幸运的是,Kotlin 协程与 C# 和 JS 完全不同,并且不需要将代码包装在 Future 中。您可以在 Kotlin 协程中使用期货,但基于 Kotlin 协程的惯用代码几乎根本不使用期货。
    • 人们可以相当轻松地使用通道来构建事件驱动架构。
    • 出于一致性和复杂性的原因,我个人会避免混合使用 Coroutines 和 RxJava。根据您的用例,您可以考虑使用 LiveData 或新引入的类型 Flow 的协程:Roman Elizarov: Cold flows, hot channels
    • 此外,“''map()''' 或 '''flatMap()''' 在 Cooutin 中也可用。 Coroutine 的“流”的作用与 Rx 中的 Observable 类似,你也可以为此使用很多运算符。另外,Coroutine 比 Rx 快得多,并且比 Rx 使用更少的资源。让我展示这篇文章。 link.medium.com/o1QNGL2bvZ
    【解决方案2】:

    Kotlin 协程与 Rx 不同。很难将它们进行比较,因为 Kotlin 协程是一种精简的语言特性(只有几个基本概念和一些操作它们的基本函数),而 Rx 是一个相当繁重的库,种类繁多即用型运算符。两者都是为解决异步编程问题而设计的,但是它们的解决方法却大不相同:

    • Rx 带有一种特殊的函数式编程风格,几乎可以用任何编程语言实现,而无需语言本身的支持。当手头的问题很容易分解为一系列标准运算符时,它运行良好,否则效果不佳。

    • Kotlin 协程提供了一种语言特性,可让库编写者实现各种异步编程风格,包括但不限于函数式反应风格 (Rx)。使用 Kotlin 协程,您还可以使用命令式、基于 promise/futures 的风格、actor 风格等编写异步代码。

    将 Rx 与一些基于 Kotlin 协程实现的特定库进行比较更合适。

    kotlinx.coroutines library 为例。这个库提供了一组像async/await 这样的原语和通常被嵌入到其他编程语言中的通道。它还支持轻量级的无未来演员。你可以在Guide to kotlinx.coroutines by example阅读更多内容。

    kotlinx.coroutines 提供的频道可以在某些用例中替换或增加 Rx。有一个单独的Guide to reactive streams with coroutines 更深入地探讨了与 Rx 的异同。

    【讨论】:

    • 错误/异常处理管理怎么样? Rx 比协程好吗?
    • 如果我们将 Rx 与 kotlinx.coroutines 库进行比较,那么它们都提供大致相同的错误/异常处理能力,以样式差异为模。您可以安装全局错误/异常处理程序或使用各种结构在本地处理错误。
    • 我想说协程在错误处理方面肯定更灵活,因为你可以使用旧的try-catch。您可以获得开箱即用的范围控制,可以清晰、直观地划分您要保护的内容。您可以嵌套这些块并编写复杂的错误处理模式,这些模式仍然很容易推理。从语法上讲,所有基于高阶函数的库都可以使用方法链。协程拥有完整的语言。
    • 什么是不能“轻松分解为一系列标准运算符”的问题/代码示例?
    • 嘿 Roman 恭喜 Kotlin 领导层 :) 我对这个问题的回答有什么问题吗?
    【解决方案3】:

    我非常了解 RxJava,最近我改用 Kotlin Coroutines 和 Flow。

    RxKotlin 与 RxJava 基本相同,只是添加了一些语法糖以使其在 Kotlin 中编写 RxJava 代码更加舒适/惯用。

    RxJava 和 Kotlin 协程之间的“公平”比较应该包括 Flow,我将在这里尝试解释原因。这会有点长,但我会尽量用例子保持简单。

    使用 RxJava 你有不同的对象(从版本 2 开始):

    // 0-n events without backpressure management
    fun observeEventsA(): Observable<String>
    
    // 0-n events with explicit backpressure management
    fun observeEventsB(): Flowable<String>
    
    // exactly 1 event
    fun encrypt(original: String): Single<String>
    
    // 0-1 events
    fun cached(key: String): Maybe<MyData>
    
    // just completes with no specific results
    fun syncPending(): Completable
    

    在 kotlin coroutines + flow 中,您不需要很多实体,因为如果您没有事件流,您可以只使用简单的协程(挂起函数):

    // 0-n events, the backpressure is automatically taken care off
    fun observeEvents(): Flow<String>
    
    // exactly 1 event
    suspend fun encrypt(original: String): String
    
    // 0-1 events
    suspend fun cached(key: String): MyData?
    
    // just completes with no specific results
    suspend fun syncPending()
    

    奖励:Kotlin Flow / Coroutines 支持null 值(RxJava 2 移除了支持)

    暂停函数顾名思义:它们是可以暂停代码执行并在函数完成后恢复执行的函数;这使您可以编写更自然的代码。

    运营商呢?

    在 RxJava 中,你有很多运算符(mapfilterflatMapswitchMap、...),其中大多数都有一个对应于每个实体类型的版本(Single.map()、@ 987654335@, ...)。

    Kotlin Coroutines + Flow 不需要那么多运算符,让我们看看为什么用一些最常见的运算符示例

    地图()

    RxJava:

    fun getPerson(id: String): Single<Person>
    fun observePersons(): Observable<Person>
    
    fun getPersonName(id: String): Single<String> {
      return getPerson(id)
         .map { it.firstName }
    }
    
    fun observePersonsNames(): Observable<String> {
      return observePersons()
         .map { it.firstName }
    }
    

    Kotlin 协程 + 流

    suspend fun getPerson(id: String): Person
    fun observePersons(): Flow<Person>
    
    suspend fun getPersonName(id: String): String? {
      return getPerson(id).firstName
    }
    
    fun observePersonsNames(): Flow<String> {
      return observePersons()
         .map { it.firstName }
    }
    

    对于“单一”情况,您不需要运算符,这与Flow 情况非常相似。

    flatMap()

    flatMap 运算符及其兄弟switchMapcontactMap 的存在允许您组合不同的 RxJava 对象,从而在映射事件时执行潜在的异步代码。

    假设每个人都需要从数据库(或远程服务)中获取保险

    RxJava

    fun fetchInsurance(insuranceId: String): Single<Insurance>
    
    fun getPersonInsurance(id: String): Single<Insurance> {
      return getPerson(id)
        .flatMap { person ->
          fetchInsurance(person.insuranceId)
        }
    }
    
    fun observePersonsInsurances(): Observable<Insurance> {
      return observePersons()
        .flatMap { person ->
          fetchInsurance(person.insuranceId) // this is a Single
              .toObservable() // flatMap expect an Observable
        }
    }
    

    让我们看看 Kotlin Coroutiens + Flow

    suspend fun fetchInsurance(insuranceId: String): Insurance
    
    suspend fun getPersonInsurance(id: String): Insurance {
      val person = getPerson(id)
      return fetchInsurance(person.insuranceId)
    }
    
    fun observePersonsInsurances(): Flow<Insurance> {
      return observePersons()
        .map { person ->
          fetchInsurance(person.insuranceId)
        }
    }
    

    像以前一样,在简单的协程案例中,我们不需要运算符,我们只需要像不异步的情况下那样编写代码,只需使用挂起函数即可。

    Flow 不是拼写错误,因此不需要flatMap 运算符,我们可以使用map。原因是 map lambda 是一个挂起函数!我们可以在里面执行挂起代码!!!

    为此,我们不需要其他操作员。

    我在这里作弊了一点

    Rx flatMapswitchMapconcatMap 的行为略有不同。 Rx flatMap 为每个事件生成一个新流,然后将它们全部合并在一起:您在输出中接收到的新流事件的顺序未确定,它可能与输入中的顺序或事件不匹配

    Rx concatMap“修复”了该问题并保证您将按照输入事件的相同顺序获取每个新流

    Rx switchMap 将在收到新事件时处理任何以前运行的流,只有收到的最后一个输入与此运算符有关

    所以你看,Flow.map 不一样,它实际上更类似于 Rx concatMap,这是你期望地图操作员更自然的行为。

    但确实你需要更少的运算符,在 map 中你可以做任何你想要的异步操作并重现flatMap 的行为,因为它是一个可暂停的函数。 RxJava flatMap 的实际等效运算符是 Flow.flatMapMerge 运算符。

    RxJava switchMap 的等效项可以在 Flow 中通过在 map 运算符之前使用 conflate() 运算符来实现。

    对于更复杂的内容,您可以使用 Flow transform() 运算符,它会为每个事件发出您选择的 Flow。

    每个 Flow 运算符都接受一个挂起函数!

    在上一段中我告诉过你我作弊了。但是我所说的 Flow 不需要那么多操作符 的关键在于大多数操作符的回调都是挂起函数。

    所以说你需要filter(),但是你的过滤器需要执行网络调用来知道你是否应该保留这个值,使用 RxJava 你需要将多个运算符与不可读的代码结合起来,使用 Flow 你可以只使用 @ 987654363@!

    fun observePersonsWithValidInsurance(): Flow<Person> {
      return observerPersons()
        .filter { person ->
            val insurance = fetchInsurance(person.insuranceId) // suspending call
            insurance.isValid()
        }
    }
    

    delay(), startWith(), concatWith(), ...

    在 RxJava 中,您有许多运算符用于在前后添加延迟或添加项目:

    • 延迟()
    • delaySubscription()
    • startWith(T)
    • startWith(Observable)
    • concatWith(...)

    使用 kotlin Flow,您可以简单地:

    grabMyFlow()
      .onStart {
        // delay by 3 seconds before starting
        delay(3000L)
        // just emitting an item first
        emit("First item!")
        emit(cachedItem()) // call another suspending function and emit the result
      }
      .onEach { value ->
        // insert a delay of 1 second after a value only on some condition
        if (value.length() > 5) {
          delay(1000L)
        }
      }
      .onCompletion {
        val endingSequence: Flow<String> = grabEndingSequence()
        emitAll(endingSequence)
      }
    

    错误处理

    RxJava 有很多操作符来处理错误:

    • onErrorResumeWith()
    • onErrorReturn()
    • onErrorComplete()

    使用 Flow,您只需要操作员 catch()

      grabMyFlow()
        .catch { error ->
           // emit something from the flow
           emit("We got an error: $error.message")
           // then if we can recover from this error emit it
           if (error is RecoverableError) {
              // error.recover() here is supposed to return a Flow<> to recover
              emitAll(error.recover())
           } else {
              // re-throw the error if we can't recover (aka = don't catch it)
              throw error
           }
        }
    

    而使用挂起功能,您只需使用try {} catch() {}

    您可以使用单个 catch 运算符实现所有 RxJava 错误运算符,因为您获得了暂停功能。

    易于编写流操作符

    由于协同程序在底层为 Flow 提供动力,因此编写运算符更容易。如果您曾经检查过 RxJava 运算符,您会发现它有多难以及您需要学习多少东西。

    编写 Kotlin Flow 运算符更容易,您只需查看已经是 Flow here 一部分的运算符的源代码即可了解。原因是协程使编写异步代码变得更容易,并且操作符使用起来更自然。

    作为奖励,Flow 运算符都是 kotlin 扩展函数,这意味着您或库都可以轻松添加运算符,并且使用起来不会感到奇怪(在 RxJava 中需要 observable.lift()observable.compose() 来结合自定义运算符)。

    上游线程不向下游泄漏

    这是什么意思?

    这解释了为什么在 RxJava 中你有 subscribeOn()observeOn() 而在 Flow 中你只有 flowOn()

    让我们以这个 RxJava 为例:

    urlsToCall()
      .switchMap { url ->
        if (url.scheme == "local") {
           val data = grabFromMemory(url.path)
           Flowable.just(data)
        } else {
           performNetworkCall(url)
            .subscribeOn(Subscribers.io())
            .toObservable()
        }
      }
      .subscribe {
        // in which thread is this call executed?
      }
    

    那么subscribe 中的回调在哪里执行?

    答案是:

    取决于...

    如果它来自网络,它在一个 IO 线程中;如果它来自另一个未定义的分支,则取决于用于发送 url 的线程。

    如果您考虑一下,您编写的任何代码:您不知道它将在哪个线程中执行:始终取决于调用者。这里的问题是线程不再依赖于调用者,它依赖于内部函数调用的作用。

    假设你有这个简单的标准代码:

    fun callUrl(url: Uri) {
      val callResult = if (url.scheme == "local") {
        grabFromMemory(url.path)
      } else {
        performNetworkCall(url)
      }
      return callResult
    }
    

    想象一下,如果不查看grabFromMemory()performNetworkCall() 内部,就无法知道return callResult 行在哪个线程中执行。

    想一想:让线程根据您调用的函数以及它们在内部执行的操作而更改。

    回调 API 经常发生这种情况:除非有文档说明,否则您无法知道您提供的回调将在哪个线程中执行。

    这就是“上游线程向下游泄漏”的概念。

    对于 Flow 和 Coroutines,情况并非如此,除非您明确要求这种行为(使用 Dispatchers.Unconfined)。

    suspend fun myFunction() {
      // execute this coroutine body in the main thread
      withContext(Dispatchers.Main) {
        urlsToCall()
          .conflate() // to achieve the effect of switchMap
          .transform { url ->
            if (url.scheme == "local") {
               val data = grabFromMemory(url.path)
               emit(data)
            } else {
               withContext(Dispatchers.IO) {
                 performNetworkCall(url)
               }
            }
          }
          .collect {
            // this will always execute in the main thread
            // because this is where we collect,
            // inside withContext(Dispatchers.Main)
          }
      }
    }
    

    协程代码将在它们被执行的上下文中运行。并且只有网络调用的部分会在 IO 线程上运行,而我们在这里看到的其他所有内容都会在主线程上运行。

    好吧,实际上,我们不知道grabFromMemory() 中的代码将在哪里运行,但我们并不关心:我们知道它将在主线程中调用,在那个挂起函数中我们可以有另一个 Dispatcher使用过,但我们知道它什么时候会返回结果 val data 这将再次出现在主线程中。

    这意味着,查看一段代码,更容易判断它将在哪个线程中运行,如果你看到一个明确的 Dispatcher = 它就是那个调度器,如果你没有看到它:在任何线程调度器中暂停调用你正在查看正在被调用。

    结构化并发

    这不是 kotlin 发明的概念,但他们比我所知道的任何其他语言都更接受它。

    如果我在这里解释的内容不足以让您阅读this article 或观看this video

    那是什么?

    使用 RxJava,您可以订阅 observable,它们会为您提供 Disposable 对象。

    您需要在不再需要它时处理掉它。因此,您通常要做的是保留对它的引用(或将其放在CompositeDisposable 中),以便稍后在不再需要时调用dispose()。如果你不这样做,linter 会给你一个警告。

    RxJava 比传统线程好一些。当您创建一个新线程并在其上执行某些操作时,这是“一劳永逸”,您甚至无法取消它:Thread.stop() 已被弃用,有害,最近的实现实际上什么也没做。 Thread.interrupt() 让你的线程失败等等。任何异常都会丢失。你明白了。

    使用 kotlin 协程和流程,它们颠倒了“一次性”的概念。没有CoroutineContext,你不能创建协程。

    这个上下文定义了你协程的scope。在其中生成的每个子协程都将共享相同的范围。

    如果您订阅流程,您必须在协程内或提供范围。

    您仍然可以保留对您启动的协程 (Job) 的引用并取消它们。这将自动取消该协程的每个子进程。

    如果您是 Android 开发人员,他们会自动为您提供这些范围。示例:viewModelScope 并且您可以在 viewModel 内启动协程,该范围知道当 viewmodel 被清除时它们将自动取消。

    viewModelScope.launch {
      // my coroutine here
    }
    

    如果任何孩子失败,一些作用域将终止,另一些作用域将让每个孩子离开自己的生命周期,而如果一个孩子失败,则不会停止其他孩子 (SupervisedJob)。

    为什么这是一件好事?

    让我试着像 Roman Elizarov 那样解释它。

    一些旧的编程语言有goto这个概念,基本上可以让你随意从一行代码跳转到另一行代码。

    非常强大,但如果被滥用,您最终可能会得到非常难以理解的代码,难以调试和推理。

    因此,新的编程语言最终将其从语言中完全移除。

    当您使用 ifwhilewhen 时,更容易对代码进行推理:不管这些块内部发生了什么,您最终都会摆脱它们,这是一个“上下文” ",你没有奇怪的跳进跳出。

    启动线程或订阅 RxJava observable 类似于 goto:您正在执行的代码将继续运行,直到“其他地方”停止。

    对于协程,通过要求您提供上下文/作用域,您知道当您的作用域覆盖所有内容时,当您的上下文完成时,协程将完成,无论您有单个协程还是 10000 个协程都无关紧要。

    您仍然可以通过使用 GlobalScope 来“转到”协程,出于同样的原因,您不应该在提供它的语言中使用 goto

    冷与热 - ShareFlow 和 StateFlow

    当我们使用响应式流时,我们总是有冷流和热流的概念。这些是 Rx 世界和 Kotlin Flows 的概念

    Cold 流就像我们代码中的一个函数:它就在那里,在你调用它之前什么都不做。使用 Flow 意味着它定义了流的作用,但在您开始收集它之前它什么也不做。而且,就像一个函数,如果你收集(调用)它两次,流将运行两次。 (例如,执行 http 请求的冷流如果收集两次,将执行两次请求)。

    Hot 流不是这样工作的。当您对它们进行多次对方付费呼叫时,它们都在后台共享同一个 Hot Stream,这意味着您的 Hot Stream 运行一次,您可以拥有多个观察者。

    您通常可以使用某些运算符将冷流转换为热流。

    在 RxJava 上,您可以使用 Connectable Observable/Flowable 这个概念。

    val coldObservable: Observable<Something> = buildColdObservable()
    
    // create an hot observable from the cold one
    val connectableObservable: ConnectableObservable<Something> = coldObservable.publish()
    
    // you can subscribe multiple times to this connectable
    val subADisposable: Disposable = connectableObservable.subscribe(subscriberA)
    val subBDisposable: Disposable = connectableObservable.subscribe(subscriberB)
    
    // but nothing will be emitted there until you call
    val hotDisposable: Disposable = connectableObservable.connect()
    
    // which actually run the cold observable and share the result on bot subscriberA and subscriberB
    
    // while it's active another one can start listening to it
    val subCDisposable: Disposable = connectableObservable.subscribe(subscriberC)
    

    您不会有其他有用的运算符,例如 refCount()autoConnect(),它们会将 Connectable 转回标准流,并在附加第一个订阅者时自动 .connect()

    
    buildColdObservable()
       .replay(1) // when a new subscriber is attached receive the last data instantly
       .autoConnect() // keep the cold observable alive while there's some subscriber
    

    在 Flow 上,您有 shareIn()stateIn() 运算符。你可以看到API设计here。当您“连接”时,它们在处理时不那么“手动”。

    buildColdFlow()
      .shareIn(
        // you need to specify a scope for the cold flow subscription
        scope = myScope,
        // when to "connect"
        started = SharingStarted.WhileSubscribed(),
        // how many events already emitted should be sent to new subscribers
        replay = 1,
      )
    

    范围

    适用于结构化并发。在 RxJava 上,实际上订阅冷 observable 的是 connect() 操作,它给你一个 Disposable 你将不得不在某个地方调用 .dispose()。如果您使用refCount()autoConnect(),它将在第一个订阅者上调用,并且refCount() 永远不会被释放,而autoConnect() 在没有更多订阅者时被释放。

    使用 Flow,您需要提供一个专用的 Scope 来收集冷流,如果您取消该范围,冷流将停止发射并且不再可用。

    开始

    所以这个很简单

    • RxJava refCount() --> Flow SharingStarted.Lazily,开始收集第一个订阅者
    • RxJava autoConnect() -> Flow SharingStarted.WhileSubscribed(),开始收集第一个订阅者并在没有订阅者时取消它
    • RxJava 在任何订阅之前手动调用connect() -> 流SharingStarted.Eagerly(),立即开始收集

    WhileSubscribed() 有有用的参数,check them out

    您还可以为SharingStarted 定义自己的逻辑,以便在从冷流收集时处理。

    行为和背压

    当你有一个 hot observable 时,你总是需要处理背压问题。 1 个数据源被多种方式监听,一个监听器可能比其他监听器慢。

    Flow .shareIn 默认在专用协程中收集冷流并缓冲发射。这意味着如果冷流发射得太快,它将使用缓冲区。您可以更改此行为。

    如果需要,Kotlin SharedFlow 还允许您直接访问重放缓冲区以检查先前的发射。

    取消订阅者不会影响共享流。

    使用flowOn()更改订阅者上的Dispatcher不会影响共享流(如果您需要在某些特定调度程序中运行冷流,请在共享前使用flowOn()

    stateIn

    Flow 有一个“特殊”版本的 ShareFlow,称为 StateFlow,您可以使用 stateIn() 从另一个流创建一个。

    StateFlow总是有1个值,不能为“空”,所以stateIn()时需要提供初始值。

    一个StateFlow永远不能抛出异常,永远不能终止(这种方式类似于RxRelay库中的BehaviorRelay

    StateFlow 只会在状态发生变化时发出(就像它在 distinctUntilChanged() 中构建一样。

    RxJava 主题与可变*流 RxJava 中的 Subject 是一个类,您可以使用它手动将数据推送到其上,同时仍将其用作流。

    在 Flow 中,您可以使用 MutableSharedFlowMutableStateFlow 来实现类似的效果。

    对于 Kotlin 协程,您还可以使用 Channels,但它们被认为是较低级别的 API。

    有什么缺点吗?

    Flow 仍在开发中,RxJava 中可用的一些功能可能在 Kotlin Coroutines Flow 中被标记为实验性的,或者在这里和那里有一些不同。

    某些小众运算符或运算符功能可能尚未实现,您可能必须自己实现(至少更容易)。

    但除此之外,我所知道的没有任何缺点。

    但是有一些差异需要注意,这可能会导致从 RxJava 切换时出现一些摩擦,并且需要您学习新事物。

    结构化并发向前迈进了一步,但引入了您需要学习和习惯的新概念(范围、supervisorJob):取消的处理方式完全不同。

    需要注意一些问题。

    问题:取消异常

    如果您在协程中 cancel() 工作或 throw CancellationException() 除非您使用了主管范围/工作,否则异常会传播到父协程。 如果发生这种情况,父协程也会取消被取消的协程的兄弟协程。

    但是如果你catch(e: Exception),即使使用runCatching {},你必须记得重新抛出CancellationException(),否则你会得到意想不到的结果,因为协程已被取消,但你的代码仍在尝试执行,就像它没有被取消一样.

    问题:UncaughtExceptionHandler

    如果您使用launch { ... } 创建一个新的协程并且该协程抛出by default,这将终止协程但不会使应用程序崩溃,您可能会完全错过一些问题。

    此代码不会使您的应用崩溃。

    launch {
      throw RuntimeException()
    }
    

    在某些情况下,它甚至可能不会在日志中打印任何内容。

    如果是取消异常,它肯定不会在日志中打印任何内容。

    【讨论】:

    • 很好的比较和总结。
    • 谢谢,实际上我通过修复格式中的一些错误并添加了一些我忘记包含的部分来改进它
    • 谢谢。这个答案是杰作。
    • 没问题,很高兴它有帮助
    • 摇滚人,感谢您抽出宝贵时间与我们分享您的知识!
    【解决方案4】:

    您链接的谈话/文档没有谈论频道。通道填补了您当前对协程的理解与事件驱动编程之间的空白。

    使用协程和通道,您可以进行事件驱动编程,就像您可能习惯使用 rx 一样,但您可以使用看起来同步的代码来完成,而无需太多“自定义”运算符。

    如果您想更好地理解这一点,我建议您看看 kotlin 之外,这些概念更成熟和精致(不是实验性的)。查看 Clojure 中的 core.async、Rich Hickey 视频、帖子和相关讨论。

    【讨论】:

      【解决方案5】:

      协程旨在提供一个轻量级的异步编程框架。在启动异步作业所需的资源方面是轻量级的。协程不强制使用外部 API,对用户(程序员)来说更自然。相比之下,RxJava + RxKotlin 有一个额外的数据处理包,在 Kotlin 中并不真正需要,它在标准库中具有非常丰富的 API 用于序列和集合处理。

      如果您想了解更多关于在 Android 上实际使用协程的信息,我可以推荐我的文章: https://www.netguru.com/codestories/android-coroutines-%EF%B8%8Fin-2020

      【讨论】:

        猜你喜欢
        • 2019-12-06
        • 1970-01-01
        • 1970-01-01
        • 2019-02-19
        • 1970-01-01
        • 1970-01-01
        • 2019-11-10
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多