欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

Kotlin之协程(三)Flow异步流

时间:2023-08-10
flow介绍

挂起函数可以异步返回单个值,那如何异步多次返回多个值呢?
使用flow,flow的特点:

flow{…}块中的代码可以挂起使用flow,suspend修饰符可以省略流使用emit函数发射值流使用collect的函数收集值flow类似冷流,flow中代码直到流被收集(调用collect)的时候才运行,类似lazy,什么时候用,什么时候执行。流的连续性:流收集都是按顺序收集的flowOn可更改流发射的上下文,即可以指定在主线程或子线程中执行

//使用flow,suspend修饰符可以省略 fun doflow() = flow { for (i in 1..5) { //这里是挂起,不是阻塞 delay(500) emit(i) } }.flowOn(Dispatchers.IO)//调用 runBlocking { doflow().collect { log("value=$it") } }打印(多次返回多个值)com.z.zjetpack V/zx: value=1com.z.zjetpack V/zx: value=2com.z.zjetpack V/zx: value=3com.z.zjetpack V/zx: value=4com.z.zjetpack V/zx: value=5

flow的应用场景

文件下载场景

//正在下载(文件总大小为5) fun doflow() = flow { for (i in 1..5) { delay(500) emit(i.toDouble()) } //flowOn来指定在IO线程中下载 }.flowOn(Dispatchers.IO)//读取进度 runBlocking { doflow().collect { log("当前下载=${it / 5 * 100}%") } }打印:com.z.zjetpack V/zx: 当前下载=20.0%com.z.zjetpack V/zx: 当前下载=40.0%com.z.zjetpack V/zx: 当前下载=60.0%com.z.zjetpack V/zx: 当前下载=80.0%com.z.zjetpack V/zx: 当前下载=100.0%

流构建器

flowof 和asflow

runBlocking { flowOf(1, 2, 3) .onEach { delay(500) } .collect { log("value = $it") } (5..8).asFlow() .onEach { delay(500) } .collect { log("value = $it") } }

使用launchin替换collect在单独的协程中启动收集流。

fun event() = (1..3) .asFlow() .onEach { delay(500) }.flowOn(Dispatchers.IO) //调用 runBlocking { val job = event().onEach { log("value = $it") }.launchIn(CoroutineScope(Dispatchers.IO)) //主线程可用this //.launchIn(this) job.join() }

流的取消

超时的时候取消

fun cancelFlow() = flow { for (i in 1..5) { delay(1000) emit(i) } }//调用 runBlocking { //超时的时候取消流 withTimeoutOrNull(2500) { cancelFlow().collect { log("value = $it") } } }打印:在2.5秒的时候超时了,取消了com.z.zjetpack V/zx: value = 1com.z.zjetpack V/zx: value = 2

直接取消

runBlocking { cancelFlow().collect { log("value = $it") if(it == 3){ cancel() } } }

繁忙的任务是不能直接取消的,需要检测取消(cancellable)

runBlocking { (1..5).asFlow().cancellable().collect { if(it == 3) { cancel() } } }

背压:生产者效率 > 消费者效率

使用缓冲和flowon来处理背压

buffer():并发运行流中发射元素的代码
conflate():合并发射项,不对每个值处理
collectLatest():取消并重新发送最后一个值

模拟背压代码:

fun preFlow() = flow { for (i in 1..5) { delay(100) emit(i) log("发送$i") } }//调用 //100ms发送一次,300ms接收一次就产生了背压 runBlocking { val time = measureTimeMillis { preFlow() //buffer可以增加缓冲,提高效率 //.buffer(100) //flowOn自带缓冲功能 //.flowOn(Dispatchers.IO) //conflate不对每个值处理 //.conflate() //.collect //取消并重新发送最后一个值 .collectLatest { delay(300) log("接收到:$it") } } log("总耗时 $time") }打印:com.z.zjetpack V/zx: 接收到:1com.z.zjetpack V/zx: 发送1com.z.zjetpack V/zx: 接收到:2com.z.zjetpack V/zx: 发送2com.z.zjetpack V/zx: 接收到:3com.z.zjetpack V/zx: 发送3com.z.zjetpack V/zx: 接收到:4com.z.zjetpack V/zx: 发送4com.z.zjetpack V/zx: 接收到:5com.z.zjetpack V/zx: 发送5com.z.zjetpack V/zx: 总耗时 2033使用buffer后com.z.zjetpack V/zx: 发送1com.z.zjetpack V/zx: 发送2com.z.zjetpack V/zx: 发送3com.z.zjetpack V/zx: 接收到:1com.z.zjetpack V/zx: 发送4com.z.zjetpack V/zx: 发送5com.z.zjetpack V/zx: 接收到:2com.z.zjetpack V/zx: 接收到:3com.z.zjetpack V/zx: 接收到:4com.z.zjetpack V/zx: 接收到:5com.z.zjetpack V/zx: 总耗时 1634使用flowOn后com.z.zjetpack V/zx: 发送1com.z.zjetpack V/zx: 发送2com.z.zjetpack V/zx: 发送3com.z.zjetpack V/zx: 接收到:1com.z.zjetpack V/zx: 发送4com.z.zjetpack V/zx: 发送5com.z.zjetpack V/zx: 接收到:2com.z.zjetpack V/zx: 接收到:3com.z.zjetpack V/zx: 接收到:4com.z.zjetpack V/zx: 接收到:5com.z.zjetpack V/zx: 总耗时 1639使用conflate后com.z.zjetpack V/zx: 发送1com.z.zjetpack V/zx: 发送2com.z.zjetpack V/zx: 发送3com.z.zjetpack V/zx: 接收到:1com.z.zjetpack V/zx: 发送4com.z.zjetpack V/zx: 发送5com.z.zjetpack V/zx: 接收到:3com.z.zjetpack V/zx: 接收到:5com.z.zjetpack V/zx: 总耗时 1034使用collectLatest后com.z.zjetpack V/zx: 发送1com.z.zjetpack V/zx: 发送2com.z.zjetpack V/zx: 发送3com.z.zjetpack V/zx: 发送4com.z.zjetpack V/zx: 发送5com.z.zjetpack V/zx: 接收到:5com.z.zjetpack V/zx: 总耗时 843

操作符

转换操作符:map ,transform
限长操作符:取指定数量,take
末端操作符:末端操作符用于启动流收集的挂起函数,collect,tolist,toset,reduce,fold
组合操作符:zip
展平操作符:flatMapConcat(连接),flatMapMerge(合并),flatMapLatest(最新)

map

suspend fun perRequest(req: Int): String { delay(1000) return "转换 $req" } runBlocking { (1..3).asFlow().map { perRequest(it) }.collect { log(it) } }打印:com.z.zjetpack V/zx: 转换 1com.z.zjetpack V/zx: 转换 2com.z.zjetpack V/zx: 转换 3

transform

runBlocking {(5..6).asFlow().transform { emit("s $it") emit(perRequest(it)) emit("e $it") } //.take(4) .collect { log(it) } }打印:com.z.zjetpack V/zx: s 5com.z.zjetpack V/zx: 转换 5com.z.zjetpack V/zx: e 5com.z.zjetpack V/zx: s 6com.z.zjetpack V/zx: 转换 6com.z.zjetpack V/zx: e 6

take

加上take之后com.z.zjetpack V/zx: s 5com.z.zjetpack V/zx: 转换 5com.z.zjetpack V/zx: e 5com.z.zjetpack V/zx: s 6

末端操作符:collect,tolist,toset,reduce,fold

runBlocking { val sum = (1..5).asFlow().map { it * it }.reduce { a, b -> a + b } log("sum = $sum") val nList = (1..5).asFlow().toList() log("nList = $nList") val nSet = listOf(1, 2, 2, 3, 3, 5).asFlow().toSet() log("nSet = $nSet") }打印:com.z.zjetpack V/zx: sum = 55com.z.zjetpack V/zx: nList = [1, 2, 3, 4, 5]com.z.zjetpack V/zx: nSet = [1, 2, 3, 5]

展平操作符

只使用map的时候

//返回值是一个flow fun reqFlow(i: Int) = flow { emit("start $i") delay(500) emit("end $i") } runBlocking { (0..1).asFlow().map { reqFlow(it) }.collect { log("首次collect = $it") it.collect { log("二次 = $it") } } }打印:由于返回是flow所以需要collect 两次才能拿到值,Flow>com.z.zjetpack V/zx: 首次collect = kotlinx.coroutines.flow.SafeFlow@63db1bfcom.z.zjetpack V/zx: 二次 = start 0com.z.zjetpack V/zx: 二次 = end 0com.z.zjetpack V/zx: 首次collect = kotlinx.coroutines.flow.SafeFlow@d27108ccom.z.zjetpack V/zx: 二次 = start 1com.z.zjetpack V/zx: 二次 = end 1

flatMapConcat

runBlocking { (0..1).asFlow().flatMapConcat { reqFlow(it) }.collect { log("首次collect = $it") } }打印:直接展开了com.z.zjetpack V/zx: 首次collect = start 0com.z.zjetpack V/zx: 首次collect = end 0com.z.zjetpack V/zx: 首次collect = start 1com.z.zjetpack V/zx: 首次collect = end 1

runBlocking { (0..1).asFlow().flatMapMerge { reqFlow(it) }.collect { log("首次collect = $it") } }打印:com.z.zjetpack V/zx: 首次collect = start 0com.z.zjetpack V/zx: 首次collect = start 1com.z.zjetpack V/zx: 首次collect = end 0com.z.zjetpack V/zx: 首次collect = end 1

flatMapLatest

runBlocking { (0..1).asFlow().flatMapLatest { reqFlow(it) }.collect { log("首次collect = $it") } }打印:com.z.zjetpack V/zx: 首次collect = start 0com.z.zjetpack V/zx: 首次collect = start 1com.z.zjetpack V/zx: 首次collect = end 1

流的异常处理

catch函数 和 try catch

flow { emit(1) throw NullPointerException() //catch函数只捕获上游的异常 }.catch { log("exception $it") //在异常后恢复 emit(20) }.flowOn(Dispatchers.IO) .collect { log("msg $it") }打印:com.z.zjetpack V/zx: exception java.lang.NullPointerExceptioncom.z.zjetpack V/zx: msg 1com.z.zjetpack V/zx: msg 20

//不建议通过这种方式捕获上游的异常,违反了flow原则,这种适合捕获下游的异常 try { (1..3).asFlow().collect { check(it > 2) { "ex $it" } } } catch (e: Exception) { log("异常 $e") }打印:com.z.zjetpack V/zx: 异常 java.lang.IllegalStateException: ex 1

流的完成

finally 和 onCompletion

try { (1..3).asFlow().collect { check(it > 2) { "ex $it" } } } catch (e: Exception) { log("异常 $e") } finally { log("流已完成") } //发生异常onCompletion可以拿到异常信息,但不会捕获 try { (1..3).asFlow().onCompletion { log("onCompletion $it") }.collect { check(it > 2) { "ex $it" } } } catch (e: Exception) { log("异常 $e") }打印:com.z.zjetpack V/zx: 异常 java.lang.IllegalStateException: ex 1com.z.zjetpack V/zx: 流已完成com.z.zjetpack V/zx: onCompletion java.lang.IllegalStateException: ex 1com.z.zjetpack V/zx: 异常 java.lang.IllegalStateException: ex 1

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。