Skip to content

06_协程应用场景

目录

  1. 协程基础回顾
  2. 协程 vs 传统线程
  3. 协程的核心优势
  4. Android 开发中的使用场景
  5. 网络请求场景
  6. 数据库操作场景
  7. 文件读写场景
  8. 并发控制场景
  9. 定时任务场景
  10. 对比分析与选型
  11. 面试高频考点

1. 协程基础回顾

1.1 什么是协程

协程(Coroutine)是轻量级的线程,由应用代码控制调度而非操作系统。协程可以在单个线程中实现高并发,避免了线程切换的开销。

核心概念:

  • 挂起(suspend):协程执行到挂起函数时会暂停,但不阻塞线程
  • 恢复:挂起完成后自动恢复执行
  • 上下文(Context):协程的执行环境,包括调度器、作业、异常处理器等

1.2 基本语法

kotlin
import kotlinx.coroutines.*

// 协程构建器
lifecycleScope.launch {
    // 协程代码
    val result = withContext(Dispatchers.IO) {
        // 后台任务
        doHeavyWork()
    }
    // 自动切换回主线程
    textView.text = result
}

1.3 协程三要素

┌────────┬────────┬────────┐
│ 上下文 │ 启动模式│ 异常处理器│
│ Context│ Start  │ Handler │
└────────┴────────┴────────┘
kotlin
// 完整的协程启动
lifecycleScope.launch(
    context = Dispatchers.Main + SupervisorJob(),
    start = CoroutineStart.DEFAULT,
    block = {
        // 协程代码
    }
)

2. 协程 vs 传统线程

2.1 实现原理对比

特性传统线程Kotlin 协程
实现层操作系统内核语言/编译器
调度方式操作系统调度应用代码调度
内存占用~1MB/线程~几 KB/协程
创建开销极低
切换开销高(上下文切换)低(仅保存状态)
并发能力几百个几十万到百万
阻塞影响阻塞整个线程仅挂起当前协程

2.2 内存对比

线程:
┌─────────────────┐
│ 栈空间 1MB      │  ← 每个线程独立
│ 本地变量        │
│ 参数            │
│ 返回地址        │
└─────────────────┘

协程:
┌───────────┐
│ 帧 几百字节│  ← 多个协程共享线程
│ 局部变量   │
│ 挂起点     │
└───────────┘

2.3 性能对比

kotlin
// 线程方案
fun threadsTest() {
    val threads = (1..10000).map { i ->
        Thread {
            doWork(i)
        }.apply { start() }
    }
    threads.forEach { it.join() }
}

// 协程方案
suspend fun coroutinesTest() {
    coroutineScope {
        (1..10000).forEach { i ->
            launch {
                doWork(i)
            }
        }
    }
}

// 结果对比:
// 线程:10000 个线程 ≈ 10GB 内存,创建和切换开销大
// 协程:10000 个协程 ≈ 几 MB 内存,几乎无切换开销

2.4 代码对比

java
// 传统线程 + 回调
new Thread(new Runnable() {
    public void run() {
        try {
            String data = fetchData();
            handler.post(new Runnable() {
                public void run() {
                    updateUI(data);
                }
            });
        } catch (Exception e) {
            handler.post(new Runnable() {
                public void run() {
                    showError(e);
                }
            });
        }
    }
}).start();
kotlin
// 协程
lifecycleScope.launch {
    try {
        val data = withContext(Dispatchers.IO) {
            fetchData()
        }
        updateUI(data)
    } catch (e: Exception) {
        showError(e)
    }
}

3. 协程的核心优势

3.1 非阻塞异步

传统线程问题:

java
// 线程阻塞等待,浪费线程资源
String data = httpClient.get(url); // 阻塞
updateUI(data);

协程解决方案:

kotlin
// 挂起等待,释放线程
suspend fun fetchData(): String = withContext(Dispatchers.IO) {
    httpClient.get(url) // 挂起,不阻塞
}

lifecycleScope.launch {
    val data = fetchData() // 等待但不阻塞
    updateUI(data)
}

3.2 结构化并发

kotlin
// 父协程管理子协程
lifecycleScope.launch {
    val job1 = launch { task1() }
    val job2 = launch { task2() }
    val job3 = launch { task3() }
    
    // 等待所有子协程完成
    job1.join()
    job2.join()
    job3.join()
    
    // 或者
    coroutineScope {
        launch { task1() }
        launch { task2() }
        launch { task3() }
        // 自动等待
    }
    
    // 父协程结束,所有子协程取消
}

3.3 异常处理

kotlin
// 1. try-catch 处理单个异常
lifecycleScope.launch {
    try {
        val result = withContext(Dispatchers.IO) {
            riskyOperation()
        }
    } catch (e: Exception) {
        handleError(e)
    }
}

// 2. SupervisorJob 隔离子协程异常
lifecycleScope.launch(SupervisorJob()) {
    launch {
        throw RuntimeException("任务 1 失败")
    }
    launch {
        // 不受影响,继续执行
        doWork()
    }
}

// 3. 全局异常处理器
class MyExceptionHandler : CoroutineExceptionHandler {
    override fun handleException(context: CoroutineContext, exception: Throwable) {
        Log.e("Coroutine", "异常:", exception)
    }
}

// 使用
lifecycleScope.launch(MyExceptionHandler()) {
    // 未捕获异常会被处理
}

3.4 线程安全

kotlin
// MutableStateFlow 线程安全
private val _count = MutableStateFlow(0)
val count: StateFlow<Int> = _count.asStateFlow()

// 从任何线程更新
lifecycleScope.launch {
    _count.value++ // 线程安全
}

// 主线程观察
count.observe { value ->
    textView.text = value.toString()
}

3.5 生命周期感知

kotlin
// ViewModel 自动管理协程生命周期
class MyViewModel : ViewModel() {
    fun loadData() {
        viewModelScope.launch {
            // Fragment 销毁时自动取消
            val data = repository.getData()
            _data.value = data
        }
    }
}

// Activity 生命周期感知
class MyActivity : AppCompatActivity() {
    fun loadData() {
        lifecycleScope.launch {
            // Activity 销毁时自动取消
            val data = repository.getData()
            updateUI(data)
        }
    }
}

4. Android 开发中的使用场景

4.1 场景分类

┌─────────────┬─────────────┬─────────────┐
│  网络请求   │  数据库操作  │  文件读写   │
├─────────────┼─────────────┼─────────────┤
│  并发控制   │  定时任务    │  UI 动画     │
├─────────────┼─────────────┼─────────────┤
│  事件流     │  状态管理    │  依赖注入   │
└─────────────┴─────────────┴─────────────┘

4.2 调度器选择

kotlin
// Dispatchers.Main - 主线程
lifecycleScope.launch(Dispatchers.Main) {
    // UI 操作
    textView.text = "Hello"
}

// Dispatchers.IO - IO 密集型
lifecycleScope.launch(Dispatchers.IO) {
    // 网络请求
    // 数据库操作
    // 文件读写
    val data = repository.getData()
}

// Dispatchers.Default - CPU 密集型
lifecycleScope.launch(Dispatchers.Default) {
    // 复杂计算
    // 图片处理
    // 加密解密
    val result = heavyCalculation()
}

// Dispatchers.Unconfined - 不受约束
lifecycleScope.launch(Dispatchers.Unconfined) {
    // 不会阻塞,在哪个线程挂起就在哪个线程恢复
    // 通常不推荐使用
}

5. 网络请求场景

5.1 单请求

kotlin
// Retrofit + 协程
interface ApiService {
    @GET("user/{id}")
    suspend fun getUser(@Path("id") userId: Long): User
}

// ViewModel
class UserViewModel : ViewModel() {
    private val apiService = RetrofitClient.api
    
    fun loadUser(userId: Long) {
        viewModelScope.launch {
            try {
                _loading.value = true
                val user = withContext(Dispatchers.IO) {
                    apiService.getUser(userId)
                }
                _user.value = user
            } catch (e: Exception) {
                _error.value = e.message
            } finally {
                _loading.value = false
            }
        }
    }
}

5.2 多请求并发

kotlin
// 并发请求多个接口
suspend fun fetchAllData(): List<Any> {
    return withContext(Dispatchers.IO) {
        coroutineScope {
            val userDeferred = async { apiService.getUser(1) }
            val postDeferred = async { apiService.getPosts(1) }
            val commentDeferred = async { apiService.getComments(1) }
            
            listOf(
                userDeferred.await(),
                postDeferred.await(),
                commentDeferred.await()
            )
        }
    }
}

// 使用
viewModelScope.launch {
    val data = fetchAllData()
    _data.value = data
}

5.3 限流并发

kotlin
// 限制并发数量
suspend fun downloadFilesLimited(urls: List<String>, limit: Int = 3) {
    val semaphore = Semaphore(limit)
    
    coroutineScope {
        urls.forEach { url ->
            async {
                semaphore.acquire()
                try {
                    downloadFile(url)
                } finally {
                    semaphore.release()
                }
            }
        }
    }
}

5.4 重试机制

kotlin
// 带重试的网络请求
suspend fun <T> retryableRequest(
    times: Int = 3,
    delay: Long = 1000,
    block: suspend () -> T
): T {
    var lastException: Exception? = null
    
    repeat(times) { attempt ->
        try {
            return block()
        } catch (e: Exception) {
            lastException = e
            if (attempt < times - 1) {
                delay(delay * (attempt + 1)) // 指数退避
            }
        }
    }
    
    throw lastException!!
}

// 使用
viewModelScope.launch {
    try {
        val user = retryableRequest {
            apiService.getUser(userId)
        }
        _user.value = user
    } catch (e: Exception) {
        _error.value = "请求失败"
    }
}

5.5 取消请求

kotlin
// 可取消的网络请求
class MyViewModel : ViewModel() {
    private var job: Job? = null
    
    fun loadUser(userId: Long) {
        // 取消之前的请求
        job?.cancel()
        
        job = viewModelScope.launch {
            try {
                val user = withContext(Dispatchers.IO) {
                    apiService.getUser(userId)
                }
                _user.value = user
            } catch (e: CancellationException) {
                // 请求被取消
            } catch (e: Exception) {
                _error.value = e.message
            }
        }
    }
    
    override fun onCleared() {
        super.onCleared()
        job?.cancel()
    }
}

6. 数据库操作场景

6.1 Room + 协程

kotlin
// DAO 接口
@Dao
interface UserDao {
    @Query("SELECT * FROM users")
    suspend fun getAllUsers(): List<User>
    
    @Query("SELECT * FROM users WHERE id = :id")
    suspend fun getUserById(id: Long): User
    
    @Insert
    suspend fun insertUser(user: User)
    
    @Update
    suspend fun updateUser(user: User)
    
    @Delete
    suspend fun deleteUser(user: User)
}

// Repository
class UserRepository(private val userDao: UserDao) {
    suspend fun getAllUsers(): List<User> = userDao.getAllUsers()
    
    suspend fun getUserById(id: Long): User = userDao.getUserById(id)
    
    suspend fun insertUser(user: User): Long = userDao.insertUser(user)
    
    suspend fun updateUser(user: User): Int = userDao.updateUser(user)
    
    suspend fun deleteUser(user: User): Int = userDao.deleteUser(user)
}

// ViewModel
class UserViewModel : ViewModel() {
    private val repository = UserRepository(userDao)
    
    fun loadUsers() {
        viewModelScope.launch {
            _users.value = repository.getAllUsers()
        }
    }
}

6.2 观察数据库变化

kotlin
// Flow 观察数据库变化
@Dao
interface UserDao {
    @Query("SELECT * FROM users")
    fun observeAllUsers(): Flow<List<User>>
}

// ViewModel
class UserViewModel : ViewModel() {
    private val userDao = database.userDao()
    
    val users: StateFlow<List<User>> = viewModelScope.viewModelScope
        .repeatOnLifecycle(Lifecycle.State.STARTED) {
            userDao.observeAllUsers()
        }.stateIn(
            scope = viewModelScope,
            started = SharingStarted.WhileSubscribed(5000),
            initialValue = emptyList()
        )
}

6.3 事务处理

kotlin
// 数据库事务
@Dao
interface UserDao {
    @Transaction
    suspend fun updateUserProfile(userId: Long, name: String, email: String) {
        val user = getUserById(userId)
        user.name = name
        user.email = email
        updateUser(user)
        
        // 关联操作
        updateUserHistory(userId, "更新个人信息")
    }
}

// 使用
viewModelScope.launch {
    try {
        withContext(Dispatchers.IO) {
            userDao.updateUserProfile(1, "新名字", "新邮箱")
        }
        _success.value = true
    } catch (e: Exception) {
        _error.value = e.message
    }
}

7. 文件读写场景

7.1 读取文件

kotlin
suspend fun readFile(filePath: String): String {
    return withContext(Dispatchers.IO) {
        File(filePath).readText()
    }
}

// 使用
viewModelScope.launch {
    try {
        val content = readFile("/path/to/file.txt")
        _content.value = content
    } catch (e: Exception) {
        _error.value = e.message
    }
}

7.2 写入文件

kotlin
suspend fun writeFile(filePath: String, content: String) {
    withContext(Dispatchers.IO) {
        File(filePath).writeText(content)
    }
}

// 带进度
suspend fun writeFileWithProgress(
    filePath: String,
    data: ByteArray,
    progress: (Int) -> Unit
) {
    withContext(Dispatchers.IO) {
        val file = File(filePath)
        file.outputStream().use { output ->
            var written = 0L
            val buffer = ByteArray(8192)
            var count: Int
            
            while (data.copyOfRange(written.toInt(), minOf(written.toInt() + 8192, data.size))
                .also { count = it.size }.copyInto(buffer) != 0) {
                output.write(buffer, 0, count)
                written += count
                progress((written * 100 / data.size).toInt())
            }
        }
    }
}

7.3 文件下载

kotlin
suspend fun downloadFile(url: String, filePath: String): Long {
    return withContext(Dispatchers.IO) {
        val urlConnection = URL(url).openConnection() as HttpURLConnection
        urlConnection.inputStream.use { input ->
            File(filePath).outputStream().use { output ->
                val buffer = ByteArray(8192)
                var count: Int
                var total = 0L
                
                while (input.read(buffer).also { count = it } != -1) {
                    output.write(buffer, 0, count)
                    total += count
                }
                total
            }
        }
    }
}

// 带进度
suspend fun downloadFileWithProgress(
    url: String,
    filePath: String,
    progress: (Int) -> Unit
): Long {
    return withContext(Dispatchers.IO) {
        val connection = URL(url).openConnection() as HttpURLConnection
        val totalSize = connection.contentLength
        
        connection.inputStream.use { input ->
            File(filePath).outputStream().use { output ->
                val buffer = ByteArray(8192)
                var count: Int
                var downloaded = 0L
                
                while (input.read(buffer).also { count = it } != -1) {
                    output.write(buffer, 0, count)
                    downloaded += count
                    progress((downloaded * 100 / totalSize).toInt())
                }
                downloaded
            }
        }
    }
}

8. 并发控制场景

8.1 并行执行

kotlin
// 并行执行多个任务
suspend fun parallelTasks() {
    val results = coroutineScope {
        listOf(
            async { task1() },
            async { task2() },
            async { task3() }
        ).awaitAll()
    }
}

// 使用场景:同时加载多个数据源
suspend fun loadDashboardData() {
    val (users, posts, comments) = coroutineScope {
        triple(
            async { repository.getUsers() },
            async { repository.getPosts() },
            async { repository.getComments() }
        ).await()
    }
    
    _dashboardData.value = DashboardData(users, posts, comments)
}

8.2 串行执行

kotlin
// 串行执行(按顺序)
suspend fun serialTasks() {
    val result1 = task1()
    val result2 = task2(result1)
    val result3 = task3(result2)
}

// 使用场景:依赖链式操作
suspend fun processOrder(orderId: Long) {
    val order = repository.getOrder(orderId)
    val inventory = checkInventory(order)
    val payment = processPayment(order)
    val shipment = createShipment(order, inventory, payment)
}

8.3 限流

kotlin
// 使用 Semaphore 限流
val semaphore = Semaphore(5)

suspend fun limitedTask(taskId: Int) {
    semaphore.acquire()
    try {
        doWork(taskId)
    } finally {
        semaphore.release()
    }
}

// 批量执行限流任务
suspend fun executeLimited(tasks: List<Int>) {
    coroutineScope {
        tasks.forEach { taskId ->
            launch {
                limitedTask(taskId)
            }
        }
    }
}

8.4 竞争与屏障

kotlin
// 第一个完成的协程胜出
suspend fun raceTasks(): Result {
    return withContext(Dispatchers.Default) {
        coroutineScope {
            val job1 = async {
                delay(1000)
                Result("任务 1 完成")
            }
            val job2 = async {
                delay(500)
                Result("任务 2 完成")
            }
            
            // 等待第一个完成
            awaitFirst(job1, job2)
        }
    }
}

// 所有线程在屏障处等待
suspend fun cyclicBarrierTasks() {
    val barrier = CyclicBarrier(3) {
        // 所有线程到达后执行
        Log.d("Barrier", "所有线程到达")
    }
    
    coroutineScope {
        repeat(3) { i ->
            launch {
                delay(i * 100)
                Log.d("Task", "任务 $i 到达")
                barrier.await()
                Log.d("Task", "任务 $i 继续")
            }
        }
    }
}

9. 定时任务场景

9.1 周期性任务

kotlin
// 使用 flow 实现周期性任务
fun periodicTask(interval: Long = 1000): Flow<Unit> = flow {
    while (true) {
        emit(Unit)
        delay(interval)
    }
}.shareIn(
    scope = lifecycleScope,
    started = SharingStarted.WhileSubscribed(5000)
)

// 使用
lifecycleScope.launch {
    periodicTask(1000).collect {
        refreshData()
    }
}

9.2 延迟执行

kotlin
// 延迟执行
lifecycleScope.launch {
    delay(3000)
    doSomething()
}

// 可取消的延迟
lifecycleScope.launch {
    try {
        delay(3000)
        doSomething()
    } catch (e: CancellationException) {
        // 被取消
    }
}

9.3 定时调度

kotlin
// 模拟 ScheduledExecutorService
fun periodicWork(initialDelay: Long, period: Long, block: suspend () -> Unit) {
    viewModelScope.launch {
        delay(initialDelay)
        while (isActive) {
            block()
            delay(period)
        }
    }
}

// 使用
periodicWork(1000, 5000) {
    refreshData()
}

10. 对比分析与选型

10.1 场景选型表

场景推荐方案理由
简单异步协程代码简洁
复杂数据流RxJava/Flow强大的转换能力
UI 动画协程 + flow易控制
网络请求协程 + Retrofit集成好
数据库协程 + Room官方支持
定时任务协程易取消
并发控制协程 + Semaphore灵活

10.2 协程 vs 协程工具

kotlin
// 1. launch - 无返回值
lifecycleScope.launch {
    // 执行任务
    doWork()
}

// 2. async - 有返回值
val deferred = lifecycleScope.async {
    return@async doWork()
}
val result = deferred.await()

// 3. flow - 流式数据
val flow = flow {
    emit(1)
    emit(2)
    emit(3)
}

// 4. channel - 通道
val channel = Channel<Int>()

10.3 StateFlow vs SharedFlow vs Flow

kotlin
// Flow - 冷流
val flow = flow {
    emit(1)
    emit(2)
}

// StateFlow - 有状态的冷流
val stateFlow = MutableStateFlow(0)

// SharedFlow - 热流
val sharedFlow = MutableSharedFlow<Int>()

10.4 选型建议

┌─────────────────┬──────────────────┐
│     场景        │     选型         │
├─────────────────┼──────────────────┤
│ 单次异步操作    │ launch          │
│ 需要返回值      │ async           │
│ 状态管理        │ StateFlow       │
│ 事件广播        │ SharedFlow      │
│ 数据处理        │ Flow            │
│ 通道通信        │ Channel         │
└─────────────────┴──────────────────┘

11. 面试高频考点

考点一:协程原理

Q: 协程是怎么实现的?

A:

  1. 使用状态机 + 挂起/恢复
  2. 编译器转换(suspend 函数)
  3. 基于 Continuation 实现

考点二:调度器

Q: Dispatchers 有哪些?区别?

A:

  • Main:主线程,UI 操作
  • IO:IO 密集型,数据库/网络
  • Default:CPU 密集型,默认线程池
  • Unconfined:不受约束,不推荐

考点三:结构化并发

Q: 什么是结构化并发?

A:

  1. 父协程管理子协程
  2. 子协程异常传播到父协程
  3. 父协程取消,子协程一起取消

考点四:协程取消

Q: 协程如何取消?

A:

  1. job.cancel()
  2. 抛出 CancellationException
  3. 检查 isActive

考点五:异常处理

Q: 协程异常如何处理?

A:

  1. try-catch
  2. SupervisorJob
  3. CoroutineExceptionHandler

考点六:挂起函数

Q: 什么是挂起函数?

A:

  1. 标记为 suspend 的函数
  2. 可以挂起而不阻塞线程
  3. 只能在协程或挂起函数中调用

考点七:Flow 使用

Q: Flow 是什么?怎么用?

A:

  1. 冷流,按需执行
  2. 支持操作符转换
  3. collect 收集数据

考点八:状态管理

Q: StateFlow vs LiveData?

A:

  • StateFlow 有初始值
  • StateFlow 可热订阅
  • LiveData 生命周期感知
  • StateFlow 更现代

考点九:并发控制

Q: 如何控制协程并发数?

A:

  1. Semaphore
  2. Dispatchers
  3. coroutineScope

考点十:内存泄漏

Q: 协程如何避免内存泄漏?

A:

  1. 使用 lifecycleScope
  2. 使用 viewModelScope
  3. 正确取消协程

最佳实践总结

1. 选择合适的 Scope

kotlin
// ViewModel
viewModelScope.launch { }

// Activity/Fragment
lifecycleScope.launch { }

2. 正确选择调度器

kotlin
// IO 任务
withContext(Dispatchers.IO) { }

// CPU 任务
withContext(Dispatchers.Default) { }

3. 异常处理

kotlin
try {
    doWork()
} catch (e: Exception) {
    handleError(e)
}

4. 取消管理

kotlin
job?.cancel()
viewModelScope.coroutineContext[Job]?.cancelChildren()

5. 使用状态流

kotlin
private val _state = MutableStateFlow(State.Initial)
val state: StateFlow<State> = _state.asStateFlow()

总结: 协程是现代 Android 开发的核心技术。掌握协程的各种应用场景和最佳实践,能够编写更高效、更安全的代码。在实际项目中,应根据具体场景选择合适的协程构建器和工具。