挂起函数可以异步返回单个值,那如何异步多次返回多个值呢?
使用flow,flow的特点:
flow{…}块中的代码可以挂起使用flow,suspend修饰符可以省略流使用emit函数发射值流使用collect的函数收集值flow类似冷流,flow中代码直到流被收集(调用collect)的时候才运行,类似lazy,什么时候用,什么时候执行。流的连续性:流收集都是按顺序收集的flowOn可更改流发射的上下文,即可以指定在主线程或子线程中执行
//使用flow,suspend修饰符可以省略 fun doflow() = flow
文件下载场景
//正在下载(文件总大小为5) fun doflow() = flow
flowof 和asflow
runBlocking { flowOf(1, 2, 3) .onEach { delay(500) } .collect { log("value = $it") } (5..8).asFlow() .onEach { delay(500) } .collect { log("value = $it") } }
使用launchin替换collect在单独的协程中启动收集流。
fun event() = (1..3) .asFlow() .onEach { delay(500) }.flowOn(Dispatchers.IO) //调用 runBlocking { val job = event().onEach { log("value = $it") }.launchIn(CoroutineScope(Dispatchers.IO)) //主线程可用this //.launchIn(this) job.join() }
流的取消超时的时候取消
fun cancelFlow() = flow
直接取消
runBlocking { cancelFlow().collect { log("value = $it") if(it == 3){ cancel() } } }
繁忙的任务是不能直接取消的,需要检测取消(cancellable)
runBlocking { (1..5).asFlow().cancellable().collect { if(it == 3) { cancel() } } }
背压:生产者效率 > 消费者效率
使用缓冲和flowon来处理背压
buffer():并发运行流中发射元素的代码
conflate():合并发射项,不对每个值处理
collectLatest():取消并重新发送最后一个值
模拟背压代码:
fun preFlow() = flow
转换操作符:map ,transform
限长操作符:取指定数量,take
末端操作符:末端操作符用于启动流收集的挂起函数,collect,tolist,toset,reduce,fold
组合操作符:zip
展平操作符:flatMapConcat(连接),flatMapMerge(合并),flatMapLatest(最新)
suspend fun perRequest(req: Int): String { delay(1000) return "转换 $req" } runBlocking { (1..3).asFlow().map { perRequest(it) }.collect { log(it) } }打印:com.z.zjetpack V/zx: 转换 1com.z.zjetpack V/zx: 转换 2com.z.zjetpack V/zx: 转换 3
transformrunBlocking {(5..6).asFlow().transform { emit("s $it") emit(perRequest(it)) emit("e $it") } //.take(4) .collect { log(it) } }打印:com.z.zjetpack V/zx: s 5com.z.zjetpack V/zx: 转换 5com.z.zjetpack V/zx: e 5com.z.zjetpack V/zx: s 6com.z.zjetpack V/zx: 转换 6com.z.zjetpack V/zx: e 6
take加上take之后com.z.zjetpack V/zx: s 5com.z.zjetpack V/zx: 转换 5com.z.zjetpack V/zx: e 5com.z.zjetpack V/zx: s 6
末端操作符:collect,tolist,toset,reduce,foldrunBlocking { val sum = (1..5).asFlow().map { it * it }.reduce { a, b -> a + b } log("sum = $sum") val nList = (1..5).asFlow().toList() log("nList = $nList") val nSet = listOf(1, 2, 2, 3, 3, 5).asFlow().toSet() log("nSet = $nSet") }打印:com.z.zjetpack V/zx: sum = 55com.z.zjetpack V/zx: nList = [1, 2, 3, 4, 5]com.z.zjetpack V/zx: nSet = [1, 2, 3, 5]
展平操作符只使用map的时候
//返回值是一个flow fun reqFlow(i: Int) = flow
runBlocking { (0..1).asFlow().flatMapConcat { reqFlow(it) }.collect { log("首次collect = $it") } }打印:直接展开了com.z.zjetpack V/zx: 首次collect = start 0com.z.zjetpack V/zx: 首次collect = end 0com.z.zjetpack V/zx: 首次collect = start 1com.z.zjetpack V/zx: 首次collect = end 1
runBlocking { (0..1).asFlow().flatMapMerge { reqFlow(it) }.collect { log("首次collect = $it") } }打印:com.z.zjetpack V/zx: 首次collect = start 0com.z.zjetpack V/zx: 首次collect = start 1com.z.zjetpack V/zx: 首次collect = end 0com.z.zjetpack V/zx: 首次collect = end 1
flatMapLatestrunBlocking { (0..1).asFlow().flatMapLatest { reqFlow(it) }.collect { log("首次collect = $it") } }打印:com.z.zjetpack V/zx: 首次collect = start 0com.z.zjetpack V/zx: 首次collect = start 1com.z.zjetpack V/zx: 首次collect = end 1
流的异常处理catch函数 和 try catch
flow { emit(1) throw NullPointerException() //catch函数只捕获上游的异常 }.catch { log("exception $it") //在异常后恢复 emit(20) }.flowOn(Dispatchers.IO) .collect { log("msg $it") }打印:com.z.zjetpack V/zx: exception java.lang.NullPointerExceptioncom.z.zjetpack V/zx: msg 1com.z.zjetpack V/zx: msg 20
//不建议通过这种方式捕获上游的异常,违反了flow原则,这种适合捕获下游的异常 try { (1..3).asFlow().collect { check(it > 2) { "ex $it" } } } catch (e: Exception) { log("异常 $e") }打印:com.z.zjetpack V/zx: 异常 java.lang.IllegalStateException: ex 1
流的完成finally 和 onCompletion
try { (1..3).asFlow().collect { check(it > 2) { "ex $it" } } } catch (e: Exception) { log("异常 $e") } finally { log("流已完成") } //发生异常onCompletion可以拿到异常信息,但不会捕获 try { (1..3).asFlow().onCompletion { log("onCompletion $it") }.collect { check(it > 2) { "ex $it" } } } catch (e: Exception) { log("异常 $e") }打印:com.z.zjetpack V/zx: 异常 java.lang.IllegalStateException: ex 1com.z.zjetpack V/zx: 流已完成com.z.zjetpack V/zx: onCompletion java.lang.IllegalStateException: ex 1com.z.zjetpack V/zx: 异常 java.lang.IllegalStateException: ex 1
完