【发布时间】:2017-02-06 10:55:51
【问题描述】:
我为什么要使用 Kotlin 的协程?
RxKotlin 库似乎更加通用。 相比之下,Kotlin 的协程看起来没有那么强大,使用起来也更麻烦。
我对协程的看法基于this design talk by Andrey Breslav (JetBrains)
演讲的幻灯片是accessible here.
编辑(感谢@hotkey):
协同程序当前状态的更好来源here.
【问题讨论】:
我为什么要使用 Kotlin 的协程?
RxKotlin 库似乎更加通用。 相比之下,Kotlin 的协程看起来没有那么强大,使用起来也更麻烦。
我对协程的看法基于this design talk by Andrey Breslav (JetBrains)
演讲的幻灯片是accessible here.
编辑(感谢@hotkey):
协同程序当前状态的更好来源here.
【问题讨论】:
免责声明:此答案的部分内容无关紧要,因为协程现在具有流 API,与 Rx 非常相似。如果您想要最新的答案,请跳至最后一次编辑。
Rx 中有两个部分; Observable 模式,以及一组可靠的操作符来操作、转换和组合它们。 Observable 模式本身并没有做很多事情。与协程相同;这只是处理异步的另一种范式。您可以比较回调、Observable 和协程解决给定问题的优缺点,但不能将范例与功能齐全的库进行比较。这就像将语言与框架进行比较。
Kotlin 协程在哪些方面优于 RxKotlin?还没有使用协程,但它看起来类似于 C# 中的 async/wait。您只需编写顺序代码,一切就像编写同步代码一样简单……除了它异步执行。更容易掌握。
我为什么要使用 kotlin 协程?我会为自己回答。大多数时候我会坚持使用 Rx,因为我更喜欢事件驱动的架构。但是如果出现我正在编写顺序代码的情况,并且我需要在中间调用一个异步方法,我会很高兴地利用协程来保持这种状态并避免将所有内容都包装在 Observable 中。
编辑:现在我正在使用协程,是时候更新了。
RxKotlin 只是在 Kotlin 中使用 RxJava 的语法糖,所以我将在下面讨论 RxJava 而不是 RxKotlin。协程是比 RxJava 更低级和更通用的概念,它们服务于其他用例。也就是说,有一个用例可以比较 RxJava 和协程 (channel),它异步传递数据。协程在这里比 RxJava 有明显的优势:
subscribeOn() 和 ObserveOn() 令人困惑。每个协程都被赋予一个线程上下文并返回父上下文。对于一个通道,双方(生产者、消费者)都在自己的上下文中执行。协程对线程或线程池的影响更直观。yield)、优先级(select)、并行化(channel 上的多个producer/actor)或锁定资源(Mutex)。在服务器上(RxJava 首先出现)可能无关紧要,但在资源有限的环境中可能需要这种级别的控制。send() 到通道是一个挂起函数,当达到通道容量时会挂起。这是大自然赋予的开箱即用的背压。你也可以offer() 到通道,在这种情况下调用永远不会挂起,但在通道已满的情况下返回false,有效地从 RxJava 复制onBackpressureDrop()。或者您可以编写自己的自定义背压逻辑,这对于协程来说并不困难,尤其是与使用 RxJava 相比。还有另一个用例,协程大放异彩,这将回答您的第二个问题“我为什么要使用 Kotlin 协程?”。协程是后台线程或AsyncTask (Android) 的完美替代品。就像launch { someBlockingFunction() } 一样简单。当然,你也可以使用 RxJava 来实现这一点,也许使用 Schedulers 和 Completable。您不会(或很少)使用观察者模式和作为 RxJava 签名的运算符,这暗示这项工作超出了 RxJava 的范围。 RxJava 的复杂性(这里是无用的税)会使你的代码比 Coroutine 的版本更冗长,更不干净。
可读性很重要。在这方面,RxJava 和协程方法有很大不同。协程比 RxJava 更简单。如果您对map()、flatmap() 和一般的函数式反应式编程不放心,协程操作更容易,涉及基本指令:for、if、try/catch……但我个人发现协程的代码对于非平凡的任务更难理解。尤其是它涉及更多的嵌套和缩进,而 RxJava 中的运算符链使所有内容保持一致。函数式编程使处理更加明确。最重要的是,RxJava 可以使用来自其丰富(好吧,太丰富)运算符集的一些标准运算符来解决复杂的转换。当您拥有需要大量组合和转换的复杂数据流时,RxJava 会大放异彩。
我希望这些考虑将帮助您根据自己的需要选择合适的工具。
编辑: 协程现在有了 flow,一个非常非常类似于 Rx 的 API。可以比较每种的优缺点,但事实是差异很小。
协程的核心是一种并发设计模式,带有附加库,其中之一是类似于 Rx 的流 API。显然,Coroutines 的范围比 Rx 要广泛得多,Coroutines 能做的事情有很多 Rx 做不到的,我不能一一列举。但通常如果我在我的一个项目中使用协程,归结为一个原因:
我避免使用过多损害可读性的回调。协程使异步代码简单易写。通过利用suspend关键字,您的代码看起来像同步代码。
我在项目中看到 Rx 主要用于替换回调的相同目的,但如果您不打算修改架构以提交响应式模式,Rx 将是一个负担。考虑这个接口:
interface Foo {
fun bar(callback: Callback)
}
Coroutine 等价物更加明确,返回类型和关键字suspend 表明它是一个异步操作。
interface Foo {
suspend fun bar: Result
}
但是 Rx 等效项存在问题:
interface Foo {
fun bar: Single<Result>
}
当你在回调或协程版本中调用 bar() 时,你会触发计算;使用 Rx 版本,您可以获得可以随意触发的计算表示。您需要调用 bar() 然后订阅 Single。通常没什么大不了的,但对于初学者来说有点混乱,可能会导致一些微妙的问题。
此类问题的一个例子,假设回调栏函数是这样实现的:
fun bar(callback: Callback) {
setCallback(callback)
refreshData()
}
如果你没有正确移植它,你会以一个只能触发一次的 Single 结束,因为 refreshData() 是在 bar() 函数中调用的,而不是在订阅时调用的。一个初学者的错误,理所当然,但问题是 Rx 不仅仅是一个回调替代品,许多开发人员都在努力掌握 Rx。
如果您的目标是将异步任务从回调转换为更好的范例,则协程是完美的选择,而 Rx 会增加一些复杂性。
【讨论】:
Kotlin 协程与 Rx 不同。很难将它们进行比较,因为 Kotlin 协程是一种精简的语言特性(只有几个基本概念和一些操作它们的基本函数),而 Rx 是一个相当繁重的库,种类繁多即用型运算符。两者都是为解决异步编程问题而设计的,但是它们的解决方法却大不相同:
Rx 带有一种特殊的函数式编程风格,几乎可以用任何编程语言实现,而无需语言本身的支持。当手头的问题很容易分解为一系列标准运算符时,它运行良好,否则效果不佳。
Kotlin 协程提供了一种语言特性,可让库编写者实现各种异步编程风格,包括但不限于函数式反应风格 (Rx)。使用 Kotlin 协程,您还可以使用命令式、基于 promise/futures 的风格、actor 风格等编写异步代码。
将 Rx 与一些基于 Kotlin 协程实现的特定库进行比较更合适。
以kotlinx.coroutines library 为例。这个库提供了一组像async/await 这样的原语和通常被嵌入到其他编程语言中的通道。它还支持轻量级的无未来演员。你可以在Guide to kotlinx.coroutines by example阅读更多内容。
kotlinx.coroutines 提供的频道可以在某些用例中替换或增加 Rx。有一个单独的Guide to reactive streams with coroutines 更深入地探讨了与 Rx 的异同。
【讨论】:
try-catch。您可以获得开箱即用的范围控制,可以清晰、直观地划分您要保护的内容。您可以嵌套这些块并编写复杂的错误处理模式,这些模式仍然很容易推理。从语法上讲,所有基于高阶函数的库都可以使用方法链。协程拥有完整的语言。
我非常了解 RxJava,最近我改用 Kotlin Coroutines 和 Flow。
RxKotlin 与 RxJava 基本相同,只是添加了一些语法糖以使其在 Kotlin 中编写 RxJava 代码更加舒适/惯用。
RxJava 和 Kotlin 协程之间的“公平”比较应该包括 Flow,我将在这里尝试解释原因。这会有点长,但我会尽量用例子保持简单。
使用 RxJava 你有不同的对象(从版本 2 开始):
// 0-n events without backpressure management
fun observeEventsA(): Observable<String>
// 0-n events with explicit backpressure management
fun observeEventsB(): Flowable<String>
// exactly 1 event
fun encrypt(original: String): Single<String>
// 0-1 events
fun cached(key: String): Maybe<MyData>
// just completes with no specific results
fun syncPending(): Completable
在 kotlin coroutines + flow 中,您不需要很多实体,因为如果您没有事件流,您可以只使用简单的协程(挂起函数):
// 0-n events, the backpressure is automatically taken care off
fun observeEvents(): Flow<String>
// exactly 1 event
suspend fun encrypt(original: String): String
// 0-1 events
suspend fun cached(key: String): MyData?
// just completes with no specific results
suspend fun syncPending()
奖励:Kotlin Flow / Coroutines 支持null 值(RxJava 2 移除了支持)
暂停函数顾名思义:它们是可以暂停代码执行并在函数完成后恢复执行的函数;这使您可以编写更自然的代码。
在 RxJava 中,你有很多运算符(map、filter、flatMap、switchMap、...),其中大多数都有一个对应于每个实体类型的版本(Single.map()、@ 987654335@, ...)。
Kotlin Coroutines + Flow 不需要那么多运算符,让我们看看为什么用一些最常见的运算符示例
地图()
RxJava:
fun getPerson(id: String): Single<Person>
fun observePersons(): Observable<Person>
fun getPersonName(id: String): Single<String> {
return getPerson(id)
.map { it.firstName }
}
fun observePersonsNames(): Observable<String> {
return observePersons()
.map { it.firstName }
}
Kotlin 协程 + 流
suspend fun getPerson(id: String): Person
fun observePersons(): Flow<Person>
suspend fun getPersonName(id: String): String? {
return getPerson(id).firstName
}
fun observePersonsNames(): Flow<String> {
return observePersons()
.map { it.firstName }
}
对于“单一”情况,您不需要运算符,这与Flow 情况非常相似。
flatMap()
flatMap 运算符及其兄弟switchMap、contactMap 的存在允许您组合不同的 RxJava 对象,从而在映射事件时执行潜在的异步代码。
假设每个人都需要从数据库(或远程服务)中获取保险
RxJava
fun fetchInsurance(insuranceId: String): Single<Insurance>
fun getPersonInsurance(id: String): Single<Insurance> {
return getPerson(id)
.flatMap { person ->
fetchInsurance(person.insuranceId)
}
}
fun observePersonsInsurances(): Observable<Insurance> {
return observePersons()
.flatMap { person ->
fetchInsurance(person.insuranceId) // this is a Single
.toObservable() // flatMap expect an Observable
}
}
让我们看看 Kotlin Coroutiens + Flow
suspend fun fetchInsurance(insuranceId: String): Insurance
suspend fun getPersonInsurance(id: String): Insurance {
val person = getPerson(id)
return fetchInsurance(person.insuranceId)
}
fun observePersonsInsurances(): Flow<Insurance> {
return observePersons()
.map { person ->
fetchInsurance(person.insuranceId)
}
}
像以前一样,在简单的协程案例中,我们不需要运算符,我们只需要像不异步的情况下那样编写代码,只需使用挂起函数即可。
Flow 不是拼写错误,因此不需要flatMap 运算符,我们可以使用map。原因是 map lambda 是一个挂起函数!我们可以在里面执行挂起代码!!!
为此,我们不需要其他操作员。
我在这里作弊了一点
Rx
flatMap、switchMap和concatMap的行为略有不同。 RxflatMap为每个事件生成一个新流,然后将它们全部合并在一起:您在输出中接收到的新流事件的顺序未确定,它可能与输入中的顺序或事件不匹配Rx
concatMap“修复”了该问题并保证您将按照输入事件的相同顺序获取每个新流Rx
switchMap将在收到新事件时处理任何以前运行的流,只有收到的最后一个输入与此运算符有关所以你看,
Flow.map不一样,它实际上更类似于 RxconcatMap,这是你期望地图操作员更自然的行为。但确实你需要更少的运算符,在 map 中你可以做任何你想要的异步操作并重现
flatMap的行为,因为它是一个可暂停的函数。 RxJavaflatMap的实际等效运算符是Flow.flatMapMerge运算符。RxJava
switchMap的等效项可以在 Flow 中通过在map运算符之前使用conflate()运算符来实现。
对于更复杂的内容,您可以使用 Flow transform() 运算符,它会为每个事件发出您选择的 Flow。
每个 Flow 运算符都接受一个挂起函数!
在上一段中我告诉过你我作弊了。但是我所说的 Flow 不需要那么多操作符 的关键在于大多数操作符的回调都是挂起函数。
所以说你需要filter(),但是你的过滤器需要执行网络调用来知道你是否应该保留这个值,使用 RxJava 你需要将多个运算符与不可读的代码结合起来,使用 Flow 你可以只使用 @ 987654363@!
fun observePersonsWithValidInsurance(): Flow<Person> {
return observerPersons()
.filter { person ->
val insurance = fetchInsurance(person.insuranceId) // suspending call
insurance.isValid()
}
}
delay(), startWith(), concatWith(), ...
在 RxJava 中,您有许多运算符用于在前后添加延迟或添加项目:
使用 kotlin Flow,您可以简单地:
grabMyFlow()
.onStart {
// delay by 3 seconds before starting
delay(3000L)
// just emitting an item first
emit("First item!")
emit(cachedItem()) // call another suspending function and emit the result
}
.onEach { value ->
// insert a delay of 1 second after a value only on some condition
if (value.length() > 5) {
delay(1000L)
}
}
.onCompletion {
val endingSequence: Flow<String> = grabEndingSequence()
emitAll(endingSequence)
}
错误处理
RxJava 有很多操作符来处理错误:
使用 Flow,您只需要操作员 catch():
grabMyFlow()
.catch { error ->
// emit something from the flow
emit("We got an error: $error.message")
// then if we can recover from this error emit it
if (error is RecoverableError) {
// error.recover() here is supposed to return a Flow<> to recover
emitAll(error.recover())
} else {
// re-throw the error if we can't recover (aka = don't catch it)
throw error
}
}
而使用挂起功能,您只需使用try {} catch() {}。
您可以使用单个 catch 运算符实现所有 RxJava 错误运算符,因为您获得了暂停功能。
易于编写流操作符
由于协同程序在底层为 Flow 提供动力,因此编写运算符更容易。如果您曾经检查过 RxJava 运算符,您会发现它有多难以及您需要学习多少东西。
编写 Kotlin Flow 运算符更容易,您只需查看已经是 Flow here 一部分的运算符的源代码即可了解。原因是协程使编写异步代码变得更容易,并且操作符使用起来更自然。
作为奖励,Flow 运算符都是 kotlin 扩展函数,这意味着您或库都可以轻松添加运算符,并且使用起来不会感到奇怪(在 RxJava 中需要 observable.lift() 或 observable.compose() 来结合自定义运算符)。
上游线程不向下游泄漏
这是什么意思?
这解释了为什么在 RxJava 中你有 subscribeOn() 和 observeOn() 而在 Flow 中你只有 flowOn()。
让我们以这个 RxJava 为例:
urlsToCall()
.switchMap { url ->
if (url.scheme == "local") {
val data = grabFromMemory(url.path)
Flowable.just(data)
} else {
performNetworkCall(url)
.subscribeOn(Subscribers.io())
.toObservable()
}
}
.subscribe {
// in which thread is this call executed?
}
那么subscribe 中的回调在哪里执行?
答案是:
取决于...
如果它来自网络,它在一个 IO 线程中;如果它来自另一个未定义的分支,则取决于用于发送 url 的线程。
如果您考虑一下,您编写的任何代码:您不知道它将在哪个线程中执行:始终取决于调用者。这里的问题是线程不再依赖于调用者,它依赖于内部函数调用的作用。
假设你有这个简单的标准代码:
fun callUrl(url: Uri) { val callResult = if (url.scheme == "local") { grabFromMemory(url.path) } else { performNetworkCall(url) } return callResult }想象一下,如果不查看
grabFromMemory()和performNetworkCall()内部,就无法知道return callResult行在哪个线程中执行。想一想:让线程根据您调用的函数以及它们在内部执行的操作而更改。
回调 API 经常发生这种情况:除非有文档说明,否则您无法知道您提供的回调将在哪个线程中执行。
这就是“上游线程向下游泄漏”的概念。
对于 Flow 和 Coroutines,情况并非如此,除非您明确要求这种行为(使用 Dispatchers.Unconfined)。
suspend fun myFunction() {
// execute this coroutine body in the main thread
withContext(Dispatchers.Main) {
urlsToCall()
.conflate() // to achieve the effect of switchMap
.transform { url ->
if (url.scheme == "local") {
val data = grabFromMemory(url.path)
emit(data)
} else {
withContext(Dispatchers.IO) {
performNetworkCall(url)
}
}
}
.collect {
// this will always execute in the main thread
// because this is where we collect,
// inside withContext(Dispatchers.Main)
}
}
}
协程代码将在它们被执行的上下文中运行。并且只有网络调用的部分会在 IO 线程上运行,而我们在这里看到的其他所有内容都会在主线程上运行。
好吧,实际上,我们不知道grabFromMemory() 中的代码将在哪里运行,但我们并不关心:我们知道它将在主线程中调用,在那个挂起函数中我们可以有另一个 Dispatcher使用过,但我们知道它什么时候会返回结果 val data 这将再次出现在主线程中。
这意味着,查看一段代码,更容易判断它将在哪个线程中运行,如果你看到一个明确的 Dispatcher = 它就是那个调度器,如果你没有看到它:在任何线程调度器中暂停调用你正在查看正在被调用。
这不是 kotlin 发明的概念,但他们比我所知道的任何其他语言都更接受它。
如果我在这里解释的内容不足以让您阅读this article 或观看this video。
那是什么?
使用 RxJava,您可以订阅 observable,它们会为您提供 Disposable 对象。
您需要在不再需要它时处理掉它。因此,您通常要做的是保留对它的引用(或将其放在CompositeDisposable 中),以便稍后在不再需要时调用dispose()。如果你不这样做,linter 会给你一个警告。
RxJava 比传统线程好一些。当您创建一个新线程并在其上执行某些操作时,这是“一劳永逸”,您甚至无法取消它:Thread.stop() 已被弃用,有害,最近的实现实际上什么也没做。 Thread.interrupt() 让你的线程失败等等。任何异常都会丢失。你明白了。
使用 kotlin 协程和流程,它们颠倒了“一次性”的概念。没有CoroutineContext,你不能创建协程。
这个上下文定义了你协程的scope。在其中生成的每个子协程都将共享相同的范围。
如果您订阅流程,您必须在协程内或提供范围。
您仍然可以保留对您启动的协程 (Job) 的引用并取消它们。这将自动取消该协程的每个子进程。
如果您是 Android 开发人员,他们会自动为您提供这些范围。示例:viewModelScope 并且您可以在 viewModel 内启动协程,该范围知道当 viewmodel 被清除时它们将自动取消。
viewModelScope.launch {
// my coroutine here
}
如果任何孩子失败,一些作用域将终止,另一些作用域将让每个孩子离开自己的生命周期,而如果一个孩子失败,则不会停止其他孩子 (SupervisedJob)。
为什么这是一件好事?
让我试着像 Roman Elizarov 那样解释它。
一些旧的编程语言有goto这个概念,基本上可以让你随意从一行代码跳转到另一行代码。
非常强大,但如果被滥用,您最终可能会得到非常难以理解的代码,难以调试和推理。
因此,新的编程语言最终将其从语言中完全移除。
当您使用 if 或 while 或 when 时,更容易对代码进行推理:不管这些块内部发生了什么,您最终都会摆脱它们,这是一个“上下文” ",你没有奇怪的跳进跳出。
启动线程或订阅 RxJava observable 类似于 goto:您正在执行的代码将继续运行,直到“其他地方”停止。
对于协程,通过要求您提供上下文/作用域,您知道当您的作用域覆盖所有内容时,当您的上下文完成时,协程将完成,无论您有单个协程还是 10000 个协程都无关紧要。
您仍然可以通过使用 GlobalScope 来“转到”协程,出于同样的原因,您不应该在提供它的语言中使用 goto。
当我们使用响应式流时,我们总是有冷流和热流的概念。这些是 Rx 世界和 Kotlin Flows 的概念
Cold 流就像我们代码中的一个函数:它就在那里,在你调用它之前什么都不做。使用 Flow 意味着它定义了流的作用,但在您开始收集它之前它什么也不做。而且,就像一个函数,如果你收集(调用)它两次,流将运行两次。 (例如,执行 http 请求的冷流如果收集两次,将执行两次请求)。
Hot 流不是这样工作的。当您对它们进行多次对方付费呼叫时,它们都在后台共享同一个 Hot Stream,这意味着您的 Hot Stream 运行一次,您可以拥有多个观察者。
您通常可以使用某些运算符将冷流转换为热流。
在 RxJava 上,您可以使用 Connectable Observable/Flowable 这个概念。
val coldObservable: Observable<Something> = buildColdObservable()
// create an hot observable from the cold one
val connectableObservable: ConnectableObservable<Something> = coldObservable.publish()
// you can subscribe multiple times to this connectable
val subADisposable: Disposable = connectableObservable.subscribe(subscriberA)
val subBDisposable: Disposable = connectableObservable.subscribe(subscriberB)
// but nothing will be emitted there until you call
val hotDisposable: Disposable = connectableObservable.connect()
// which actually run the cold observable and share the result on bot subscriberA and subscriberB
// while it's active another one can start listening to it
val subCDisposable: Disposable = connectableObservable.subscribe(subscriberC)
您不会有其他有用的运算符,例如 refCount() 或 autoConnect(),它们会将 Connectable 转回标准流,并在附加第一个订阅者时自动 .connect()。
buildColdObservable()
.replay(1) // when a new subscriber is attached receive the last data instantly
.autoConnect() // keep the cold observable alive while there's some subscriber
在 Flow 上,您有 shareIn() 和 stateIn() 运算符。你可以看到API设计here。当您“连接”时,它们在处理时不那么“手动”。
buildColdFlow()
.shareIn(
// you need to specify a scope for the cold flow subscription
scope = myScope,
// when to "connect"
started = SharingStarted.WhileSubscribed(),
// how many events already emitted should be sent to new subscribers
replay = 1,
)
范围
适用于结构化并发。在 RxJava 上,实际上订阅冷 observable 的是 connect() 操作,它给你一个 Disposable 你将不得不在某个地方调用 .dispose()。如果您使用refCount() 或autoConnect(),它将在第一个订阅者上调用,并且refCount() 永远不会被释放,而autoConnect() 在没有更多订阅者时被释放。
使用 Flow,您需要提供一个专用的 Scope 来收集冷流,如果您取消该范围,冷流将停止发射并且不再可用。
开始
所以这个很简单
refCount() --> Flow SharingStarted.Lazily,开始收集第一个订阅者autoConnect() -> Flow SharingStarted.WhileSubscribed(),开始收集第一个订阅者并在没有订阅者时取消它connect() -> 流SharingStarted.Eagerly(),立即开始收集WhileSubscribed() 有有用的参数,check them out。
您还可以为SharingStarted 定义自己的逻辑,以便在从冷流收集时处理。
行为和背压
当你有一个 hot observable 时,你总是需要处理背压问题。 1 个数据源被多种方式监听,一个监听器可能比其他监听器慢。
Flow .shareIn 默认在专用协程中收集冷流并缓冲发射。这意味着如果冷流发射得太快,它将使用缓冲区。您可以更改此行为。
如果需要,Kotlin SharedFlow 还允许您直接访问重放缓冲区以检查先前的发射。
取消订阅者不会影响共享流。
使用flowOn()更改订阅者上的Dispatcher不会影响共享流(如果您需要在某些特定调度程序中运行冷流,请在共享前使用flowOn())
stateIn
Flow 有一个“特殊”版本的 ShareFlow,称为 StateFlow,您可以使用 stateIn() 从另一个流创建一个。
StateFlow总是有1个值,不能为“空”,所以stateIn()时需要提供初始值。
一个StateFlow永远不能抛出异常,永远不能终止(这种方式类似于RxRelay库中的BehaviorRelay)
StateFlow 只会在状态发生变化时发出(就像它在 distinctUntilChanged() 中构建一样。
RxJava 主题与可变*流
RxJava 中的 Subject 是一个类,您可以使用它手动将数据推送到其上,同时仍将其用作流。
在 Flow 中,您可以使用 MutableSharedFlow 或 MutableStateFlow 来实现类似的效果。
对于 Kotlin 协程,您还可以使用 Channels,但它们被认为是较低级别的 API。
Flow 仍在开发中,RxJava 中可用的一些功能可能在 Kotlin Coroutines Flow 中被标记为实验性的,或者在这里和那里有一些不同。
某些小众运算符或运算符功能可能尚未实现,您可能必须自己实现(至少更容易)。
但除此之外,我所知道的没有任何缺点。
但是有一些差异需要注意,这可能会导致从 RxJava 切换时出现一些摩擦,并且需要您学习新事物。
结构化并发向前迈进了一步,但引入了您需要学习和习惯的新概念(范围、supervisorJob):取消的处理方式完全不同。
需要注意一些问题。
问题:取消异常
如果您在协程中 cancel() 工作或 throw CancellationException() 除非您使用了主管范围/工作,否则异常会传播到父协程。
如果发生这种情况,父协程也会取消被取消的协程的兄弟协程。
但是如果你catch(e: Exception),即使使用runCatching {},你必须记得重新抛出CancellationException(),否则你会得到意想不到的结果,因为协程已被取消,但你的代码仍在尝试执行,就像它没有被取消一样.
问题:UncaughtExceptionHandler
如果您使用launch { ... } 创建一个新的协程并且该协程抛出by default,这将终止协程但不会使应用程序崩溃,您可能会完全错过一些问题。
此代码不会使您的应用崩溃。
launch {
throw RuntimeException()
}
在某些情况下,它甚至可能不会在日志中打印任何内容。
如果是取消异常,它肯定不会在日志中打印任何内容。
【讨论】:
您链接的谈话/文档没有谈论频道。通道填补了您当前对协程的理解与事件驱动编程之间的空白。
使用协程和通道,您可以进行事件驱动编程,就像您可能习惯使用 rx 一样,但您可以使用看起来同步的代码来完成,而无需太多“自定义”运算符。
如果您想更好地理解这一点,我建议您看看 kotlin 之外,这些概念更成熟和精致(不是实验性的)。查看 Clojure 中的 core.async、Rich Hickey 视频、帖子和相关讨论。
【讨论】:
协程旨在提供一个轻量级的异步编程框架。在启动异步作业所需的资源方面是轻量级的。协程不强制使用外部 API,对用户(程序员)来说更自然。相比之下,RxJava + RxKotlin 有一个额外的数据处理包,在 Kotlin 中并不真正需要,它在标准库中具有非常丰富的 API 用于序列和集合处理。
如果您想了解更多关于在 Android 上实际使用协程的信息,我可以推荐我的文章: https://www.netguru.com/codestories/android-coroutines-%EF%B8%8Fin-2020
【讨论】: