Appearance
04_线程池
目录
1. 线程池概述
1.1 为什么要使用线程池
直接创建线程的问题:
java
// ❌ 问题:频繁创建销毁线程
for (int i = 0; i < 1000; i++) {
new Thread(() -> {
// 执行任务
}).start();
}问题分析:
- 资源消耗大:线程创建和销毁消耗 CPU 和内存资源
- 系统压力大:大量线程竞争 CPU 时间片,导致上下文切换频繁
- 难以管理:线程创建后无法回收,容易造成内存泄漏
- 无控制能力:无法控制并发线程数量,可能导致系统崩溃
线程池的优势:
- 降低资源消耗:复用已创建的线程
- 提高响应速度:任务到达时无需等待线程创建
- 提高可管理性:统一分配、调优和监控
- 控制并发数:防止系统过载
1.2 线程池的核心接口
Executor 执行器接口
├── ExecutorService 执行服务接口
│ └── ThreadPoolExecutor 线程池实现
└── ScheduledExecutorService 定时执行服务接口
└── ScheduledThreadPoolExecutor 定时线程池实现java
// Executor 接口
public interface Executor {
void execute(Runnable command);
}
// ExecutorService 接口
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit);
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}2. ThreadPoolExecutor 详解
2.1 完整构造方法
java
public ThreadPoolExecutor(
int corePoolSize, // 核心线程数
int maximumPoolSize, // 最大线程数
long keepAliveTime, // 空闲线程存活时间
TimeUnit unit, // 时间单位
BlockingQueue<Runnable> workQueue, // 任务队列
ThreadFactory threadFactory, // 线程工厂
RejectedExecutionHandler handler // 拒绝策略
)2.2 构造函数完整示例
java
public class ThreadPoolExample {
public static void main(String[] args) {
// 创建线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, // corePoolSize
10, // maximumPoolSize
60L, // keepAliveTime
TimeUnit.SECONDS, // unit
new ArrayBlockingQueue<>(100), // workQueue
new NamedThreadFactory("Pool-"), // threadFactory
new AbortPolicy() // handler
);
// 提交任务
for (int i = 0; i < 20; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("执行任务:" + taskId);
});
}
// 关闭线程池
executor.shutdown();
}
}2.3 常用方法
java
public class ThreadPoolMethods {
private ThreadPoolExecutor executor;
// 1. 提交任务(无返回值)
public void executeTask() {
executor.execute(() -> {
// 任务内容
});
}
// 2. 提交任务(有返回值)
public Future<String> submitTask() {
return executor.submit(() -> {
return "任务完成";
});
}
// 3. 批量提交任务
public void submitBatch() {
List<Callable<String>> tasks = Arrays.asList(
() -> "任务 1",
() -> "任务 2",
() -> "任务 3"
);
// 等待所有任务完成
List<Future<String>> futures = executor.invokeAll(tasks);
for (Future<String> future : futures) {
try {
String result = future.get();
System.out.println(result);
} catch (Exception e) {
e.printStackTrace();
}
}
}
// 4. 获取线程池状态
public void getStatus() {
int poolSize = executor.getPoolSize(); // 当前线程数
int coreSize = executor.getCorePoolSize(); // 核心线程数
int maxSize = executor.getMaximumPoolSize(); // 最大线程数
long taskCount = executor.getTaskCount(); // 已处理任务数
long completedCount = executor.getCompletedTaskCount(); // 已完成任务数
boolean isShuttingDown = executor.isShutdown(); // 是否关闭
boolean isRunning = executor.isTerminated(); // 是否终止
}
// 5. 动态调整参数
public void dynamicConfig() {
executor.setCorePoolSize(10);
executor.setMaximumPoolSize(20);
executor.setKeepAliveTime(30L, TimeUnit.SECONDS);
}
// 6. 优雅关闭
public void gracefulShutdown() {
executor.shutdown(); // 停止接收新任务
try {
// 等待 60 秒
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
// 强制关闭
executor.shutdownNow();
// 再等 60 秒
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
System.err.println("线程池未正常关闭");
}
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}3. 线程池七大核心参数
3.1 核心线程数(corePoolSize)
定义: 线程池中长期存活的最小线程数,即使线程空闲也不会被回收(除非设置 allowCoreThreadTimeOut)。
示例:
java
// 核心线程数为 5,即使空闲也会保留
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, // corePoolSize
10,
60L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100),
new ThreadFactoryBuilder().setNameFormat("pool-%d").build(),
new AbortPolicy()
);3.2 最大线程数(maximumPoolSize)
定义: 线程池中允许创建的最大线程数。
注意: 当队列满后,会创建非核心线程,直到达到 maximumPoolSize。
3.3 空闲线程存活时间(keepAliveTime)
定义: 非核心线程在空闲状态下的最大存活时间,超时后被回收。
示例:
java
// 非核心线程空闲 60 秒后回收
executor = new ThreadPoolExecutor(
5,
10,
60L, // keepAliveTime
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100),
Executors.defaultThreadFactory(),
new AbortPolicy()
);
// 核心线程也回收
executor.allowCoreThreadTimeOut(true);
executor.setKeepAliveTime(60L, TimeUnit.SECONDS);3.4 任务队列(workQueue)
定义: 用于存放等待执行的任务的阻塞队列。
常见队列类型:
| 队列类型 | 特点 | 适用场景 |
|---|---|---|
| ArrayBlockingQueue | 有界数组队列 | 任务量可控 |
| LinkedBlockingQueue | 可选有界链表队列 | 默认选择 |
| SynchronousQueue | 不存储元素的队列 | 直接传递 |
| PriorityBlockingQueue | 优先级队列 | 任务有优先级 |
java
// 1. 有界队列(推荐)
new ArrayBlockingQueue<>(100)
// 2. 无界队列(可能导致 OOM)
new LinkedBlockingQueue<>()
// 3. 有界链表队列
new LinkedBlockingQueue<>(1000)
// 4. 不存储元素的队列
new SynchronousQueue<>()
// 5. 优先级队列
new PriorityBlockingQueue<>(
100,
(a, b) -> Integer.compare(a.getPriority(), b.getPriority())
)3.5 线程工厂(ThreadFactory)
定义: 用于创建新线程的工厂。
自定义线程工厂:
java
class NamedThreadFactory implements ThreadFactory {
private final String namePrefix;
private final boolean isDaemon;
private final AtomicInteger seq = new AtomicInteger(1);
public NamedThreadFactory(String namePrefix, boolean isDaemon) {
this.namePrefix = namePrefix;
this.isDaemon = isDaemon;
}
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, namePrefix + seq.getAndIncrement());
thread.setDaemon(isDaemon);
return thread;
}
}
// 使用
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5,
10,
60L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100),
new NamedThreadFactory("MyApp-", false),
new AbortPolicy()
);3.6 拒绝策略(RejectedExecutionHandler)
定义: 当线程池无法处理新任务时的处理策略。
详见第 5 节。
3.7 参数配置最佳实践
java
public class ThreadPoolConfig {
// CPU 密集型任务
public static ThreadPoolExecutor createCpuIntensivePool() {
// CPU 核数 + 1
int coreSize = Runtime.getRuntime().availableProcessors() + 1;
return new ThreadPoolExecutor(
coreSize,
coreSize,
60L,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
new NamedThreadFactory("CPU-"),
new AbortPolicy()
);
}
// IO 密集型任务
public static ThreadPoolExecutor createIoIntensivePool() {
// 2 * CPU 核数 或 CPU 核数 * 2
int coreSize = Runtime.getRuntime().availableProcessors() * 2;
return new ThreadPoolExecutor(
coreSize,
coreSize * 2,
60L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1000),
new NamedThreadFactory("IO-"),
new CallerRunsPolicy()
);
}
//混合型任务
public static ThreadPoolExecutor createMixedPool() {
int coreSize = Runtime.getRuntime().availableProcessors();
return new ThreadPoolExecutor(
coreSize,
coreSize * 4,
30L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(2000),
new NamedThreadFactory("Mixed-"),
new CallerRunsPolicy()
);
}
}4. 线程池工作流程
4.1 完整流程图解
┌──────────────────────────────────────────────────────────────┐
│ 提交任务 │
└─────────────────────┬────────────────────────────────────────┘
│
▼
┌────────────────────────┐
│ corePoolSize == 0? │
└─────────┬──────────────┘
│ 是
▼
┌──────────────────────┐
│ 创建核心线程执行任务 │
└─────────┬────────────┘
│
┌────────┴────────┐
│ │
否 任务执行
│ │
▼ │
┌─────────────────┐ │
│ workQueue 有空?│ │
└────────┬────────┘ │
│ 是 │
▼ │
┌─────────────────┐ │
│ 加入任务队列 │ │
└────────┬────────┘ │
│ │
└───────┬───────┘
│
┌───────┴───────┐
│ │
否 队列满
│ │
▼ │
┌──────────────────┐ │
│ maxPoolSize 已满?│ │
└────────┬─────────┘ │
│ 否 │
▼ │
┌──────────────────┐ │
│ 创建非核心线程 │ │
└────────┬─────────┘ │
│ │
└───────┬───────┘
│ 是
▼
┌─────────────────┐
│ 执行拒绝策略 │
└─────────────────┘4.2 工作流程代码演示
java
public class WorkflowDemo {
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // corePoolSize
4, // maximumPoolSize
60L, // keepAliveTime
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(3), // 队列容量 3
Executors.defaultThreadFactory(),
new LoggedAbortPolicy() // 自定义拒绝策略
);
// 监控线程池状态
new Thread(() -> {
while (true) {
System.out.println(
"poolSize=" + executor.getPoolSize() +
", queueSize=" + executor.getQueue().size() +
", completed=" + executor.getCompletedTaskCount()
);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {}
}
}).start();
// 提交 10 个任务
for (int i = 0; i < 10; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("任务" + taskId + "执行中");
try {
Thread.sleep(2000); // 模拟耗时
} catch (InterruptedException e) {}
System.out.println("任务" + taskId + "完成");
});
try {
Thread.sleep(500); // 每隔 500ms 提交一个任务
} catch (InterruptedException e) {}
}
executor.shutdown();
}
}4.3 不同队列的行为差异
java
// 1. ArrayBlockingQueue:队列满后立即创建非核心线程
ArrayBlockingQueue<Runnable> arrayQueue = new ArrayBlockingQueue<>(2);
// 结果:2 个核心线程 + 2 个任务在队列 + 2 个非核心线程
// 2. SynchronousQueue:不存储元素,直接创建非核心线程
SynchronousQueue<Runnable> syncQueue = new SynchronousQueue<>();
// 结果:2 个核心线程 + 超出部分直接创建非核心线程
// 3. LinkedBlockingQueue:队列容量大时,核心线程可能闲置
LinkedBlockingQueue<Runnable> linkQueue = new LinkedBlockingQueue<>(1000);
// 结果:任务先入队,核心线程处理完再创建新线程5. 四种拒绝策略
5.1 AbortPolicy(默认)
行为: 抛出 RejectedExecutionException 异常
java
class AbortPolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("任务被拒绝:" + r.toString());
}
}
// 使用
new ThreadPoolExecutor(
2, 2, 60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2),
Executors.defaultThreadFactory(),
new AbortPolicy() // 默认策略
);5.2 CallerRunsPolicy
行为: 用调用者线程执行被拒绝的任务
java
class CallerRunsPolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run(); // 在当前线程执行
}
}
}
// 使用
new ThreadPoolExecutor(
2, 2, 60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2),
Executors.defaultThreadFactory(),
new CallerRunsPolicy()
);特点:
- 不会丢失任务
- 降低提交速度(背压机制)
- 适合对任务丢失敏感的场景
5.3 DiscardPolicy
行为: 直接丢弃被拒绝的任务,不抛出异常
java
class DiscardPolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// 什么都不做,直接丢弃
}
}适用场景:
- 日志系统
- 监控数据
- 可以丢失的非关键任务
5.4 DiscardOldestPolicy
行为: 丢弃队列中最老的任务,然后尝试重新提交当前任务
java
class DiscardOldestPolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll(); // 移除最老的任务
e.execute(r); // 尝试重新提交
}
}
}5.5 自定义拒绝策略
java
// 1. 记录日志后丢弃
class LoggingRejectPolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
Log.e("ThreadPool", "任务被拒绝:" + r.toString());
}
}
// 2. 加入重试机制
class RetryRejectPolicy implements RejectedExecutionHandler {
private int maxRetries = 3;
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
for (int i = 0; i < maxRetries; i++) {
try {
e.execute(r);
return;
} catch (RejectedExecutionException ex) {
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
return;
}
}
}
Log.e("ThreadPool", "重试" + maxRetries + "次后失败");
}
}
// 3. 任务回源到外部队列
class BackingQueueRejectPolicy implements RejectedExecutionHandler {
private final BlockingQueue<Runnable> backupQueue = new LinkedBlockingQueue<>();
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
try {
backupQueue.put(r); // 加入备用队列
// 后台消费备用队列
new Thread(() -> {
while (!backupQueue.isEmpty()) {
try {
e.execute(backupQueue.take());
} catch (Exception ex) {}
}
}).start();
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
}6. Executors 工厂类
6.1 六种工厂方法
java
// 1. 单线程线程池
ExecutorService single = Executors.newSingleThreadExecutor();
// 等价于:new ThreadPoolExecutor(1, 1, 0, MS, new LinkedBlockingQueue<>())
// 2. 固定大小线程池
ExecutorService fixed = Executors.newFixedThreadPool(5);
// 等价于:new ThreadPoolExecutor(5, 5, 0, MS, new LinkedBlockingQueue<>())
// 3. 缓存线程池
ExecutorService cached = Executors.newCachedThreadPool();
// 等价于:new ThreadPoolExecutor(0, MAX, 60S, MS, new SynchronousQueue<>())
// 4. 定时线程池
ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(5);
// 支持定时和周期性任务
// 5. 单线程定时线程池
ScheduledExecutorService singleScheduled =
Executors.newSingleThreadScheduledExecutor();
// 6. 上下文保留(Java 8+)
ExecutorService context = Executors.newContextService(
Executors.newFixedThreadPool(5)
);6.2 为什么推荐使用 ThreadPoolExecutor 构造器
Executors 的缺陷:
java
// ❌ 问题 1: newFixedThreadPool 和 newSingleThreadExecutor
// 允许请求队列长度为 Integer.MAX_VALUE,可能 OOM
// ❌ 问题 2: newCachedThreadPool
// 允许创建线程数量为 Integer.MAX_VALUE,可能 OOM
// ✅ 推荐:直接使用 ThreadPoolExecutor 构造函数
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, // corePoolSize
10, // maximumPoolSize
60L, // keepAliveTime
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1000), // 有界队列
new NamedThreadFactory("app-"),
new AbortPolicy()
);6.3 Guava 线程池工厂
java
// 使用 Guava 的 ThreadFactoryBuilder
ThreadFactory daemonThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("daemon-%d")
.setDaemon(true)
.build();
// 使用 Guava 的 ListeningExecutorService
ListeningExecutorService executor = MoreExecutors.listeningDecorator(
new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100), daemonThreadFactory)
);
// 提交任务并添加回调
ListenableFuture<String> future = executor.submit(() -> {
return "任务完成";
});
ListenableFuture.transformAsync(future, result -> {
// 处理结果
return result.toUpperCase();
});7. 线程池参数调优
7.1 调优原则
CPU 密集型任务:
java
// 原则:CPU 核数 + 1(减少上下文切换)
int coreSize = Runtime.getRuntime().availableProcessors() + 1;
ThreadPoolExecutor cpuPool = new ThreadPoolExecutor(
coreSize,
coreSize,
60L,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
new NamedThreadFactory("CPU-"),
new AbortPolicy()
);IO 密集型任务:
java
// 原则:CPU 核数 * 2 或 更多
int coreSize = Runtime.getRuntime().availableProcessors() * 2;
ThreadPoolExecutor ioPool = new ThreadPoolExecutor(
coreSize,
coreSize * 2,
60L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1000),
new NamedThreadFactory("IO-"),
new CallerRunsPolicy()
);混合型任务:
java
// 原则:根据任务比例拆分线程池
// 70% IO 任务 + 30% CPU 任务
ThreadPoolExecutor mixedPool = new ThreadPoolExecutor(
coreSize,
coreSize * 4,
30L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(2000),
new NamedThreadFactory("Mixed-"),
new CallerRunsPolicy()
);7.2 监控线程池状态
java
class ThreadPoolMonitor {
public void monitor(ThreadPoolExecutor executor) {
// 1. 基础指标
int poolSize = executor.getPoolSize();
int activeCount = executor.getActiveCount();
int queuedCount = executor.getQueue().size();
long completedCount = executor.getCompletedTaskCount();
long taskCount = executor.getTaskCount();
// 2. 计算指标
double utilization = (double) activeCount / executor.getCorePoolSize();
double queueRatio = (double) queuedCount / executor.getQueue().size();
// 3. 日志输出
Log.d("PoolMonitor",
String.format("poolSize=%d, active=%d, queued=%d, completed=%d, util=%.2f%%",
poolSize, activeCount, queuedCount, completedCount, utilization * 100));
// 4. 告警
if (utilization > 0.9) {
Log.w("PoolMonitor", "线程池利用率过高");
}
if (queueRatio > 0.8) {
Log.w("PoolMonitor", "队列使用率过高");
}
}
}7.3 动态调整参数
java
class DynamicThreadPool {
private final ThreadPoolExecutor executor;
public DynamicThreadPool() {
this.executor = new ThreadPoolExecutor(
5, 10, 60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100),
new NamedThreadFactory("Dynamic-"),
new AbortPolicy()
);
}
// 根据负载动态调整
public void adjustByLoad() {
int activeCount = executor.getActiveCount();
int queueSize = executor.getQueue().size();
if (queueSize > 80 && activeCount == executor.getMaximumPoolSize()) {
// 增加线程数
executor.setMaximumPoolSize(executor.getMaximumPoolSize() + 5);
} else if (queueSize < 10 && activeCount > executor.getCorePoolSize()) {
// 减少线程数
executor.setMaximumPoolSize(executor.getMaximumPoolSize() - 2);
}
}
// 设置核心线程超时
public void allowCoreThreadTimeout() {
executor.allowCoreThreadTimeOut(true);
executor.setKeepAliveTime(30L, TimeUnit.SECONDS);
}
// 切换拒绝策略
public void changeRejectPolicy() {
executor.setRejectedExecutionHandler(new CallerRunsPolicy());
}
}7.4 压力测试
java
class StressTest {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, 10, 60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100),
new NamedThreadFactory("Stress-"),
new AbortPolicy()
);
long startTime = System.currentTimeMillis();
// 提交 1000 个任务
for (int i = 0; i < 1000; i++) {
executor.submit(() -> {
try {
Thread.sleep(100); // 模拟耗时
} catch (InterruptedException e) {}
});
}
executor.shutdown();
executor.awaitTermination(1, TimeUnit.HOURS);
long endTime = System.currentTimeMillis();
System.out.println("耗时:" + (endTime - startTime) + "ms");
System.out.println("平均 TPS: " + (1000.0 / (endTime - startTime) * 1000));
}
}8. Android 中的线程池
8.1 Looper 线程模型
┌─────────────────────────────────────────┐
│ Android 应用进程 │
├─────────────────────────────────────────┤
│ ┌─────────────┐ │
│ │ 主线程 UI │ ────▶ Looper + Handler │
│ │ (Binder 线程) │ + MessageQueue │
│ └─────────────┘ │
│ │
│ ┌─────────────┐ │
│ │ 工作线程 │ ────▶ 线程池 │
│ │ (后台任务) │ + AsyncTask(V) │
│ └─────────────┘ │
└─────────────────────────────────────────┘8.2 Android 默认线程池
java
// Executors 提供的线程池不推荐直接使用
// 因为参数配置不合理(无界队列)
// ✅ 推荐:创建自定义线程池
class AndroidThreadPool {
private static ThreadPoolExecutor executor = null;
static {
executor = new ThreadPoolExecutor(
4, // 核心线程数
Math.min(10, // 最大线程数
Runtime.getRuntime().availableProcessors() * 2),
10L, // 空闲时间
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000), // 有界队列
new ThreadFactory() {
private final AtomicInteger count = new AtomicInteger();
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "AndroidPool-" + count.getAndIncrement());
}
},
new ThreadPoolExecutor.CallerRunsPolicy()
);
}
public static void execute(Runnable task) {
executor.execute(task);
}
public static Future<?> submit(Callable<?> task) {
return executor.submit(task);
}
}8.3 与 Handler 配合使用
java
class HandlerWithPool {
private final Handler mainHandler = new Handler(Looper.getMainLooper());
private final ThreadPoolExecutor pool = new ThreadPoolExecutor(
2, 4, 60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100),
new NamedThreadFactory("Pool-"),
new AbortPolicy()
);
public void doAsyncWork(final int id) {
pool.execute(() -> {
// 后台执行耗时操作
doHeavyWork(id);
// 切换回主线程更新 UI
mainHandler.post(() -> {
updateUI(id);
});
});
}
private void doHeavyWork(int id) {
// 模拟耗时操作
try {
Thread.sleep(1000);
} catch (InterruptedException e) {}
}
private void updateUI(int id) {
Log.d("UI", "更新 UI: " + id);
}
}8.4 Android 11+ 的 Executor
java
// Android 11 (API 30+) 提供的 Executor
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.S) {
Executor mainExecutor = Executors.mainThreadExecutor();
// 后台线程池
Executor backgroundExecutor = Executors.newFixedThreadPool(4);
// 使用
mainExecutor.execute(() -> {
// 在主线程执行
});
backgroundExecutor.execute(() -> {
// 在后台执行
});
}9. 面试高频考点
考点一:线程池参数
Q: 线程池的核心参数有哪些?
A:
- corePoolSize:核心线程数
- maximumPoolSize:最大线程数
- keepAliveTime:空闲线程存活时间
- unit:时间单位
- workQueue:任务队列
- threadFactory:线程工厂
- handler:拒绝策略
考点二:工作流程
Q: 线程池的工作流程是怎样的?
A:
- 判断核心线程是否满,未满则创建核心线程执行
- 核心线程满,则加入队列
- 队列满,判断最大线程是否满,未满则创建非核心线程
- 最大线程满,执行拒绝策略
考点三:拒绝策略
Q: 四种拒绝策略的区别?
A:
- AbortPolicy:抛出异常(默认)
- CallerRunsPolicy:调用者线程执行
- DiscardPolicy:直接丢弃
- DiscardOldestPolicy:丢弃最老的任务
考点四:Executors 问题
Q: 为什么推荐使用 ThreadPoolExecutor 构造器而不是 Executors?
A:
- newFixedThreadPool 和 newSingleThreadExecutor 的队列是无界的,可能 OOM
- newCachedThreadPool 的最大线程数是 Integer.MAX_VALUE,可能 OOM
- 应该根据业务场景自定义参数
考点五:线程池关闭
Q: shutdown() 和 shutdownNow() 的区别?
A:
- shutdown():优雅关闭,不接收新任务,等待已提交任务完成
- shutdownNow():立即关闭,尝试终止正在执行的任务,返回未执行的任务
考点六:参数调优
Q: CPU 密集型和 IO 密集型任务如何设置线程池参数?
A:
- CPU 密集型:coreSize = CPU 核数 + 1
- IO 密集型:coreSize = CPU 核数 * 2 或更多
考点七:队列选择
Q: 不同队列类型的选择?
A:
- ArrayBlockingQueue:有界,适合任务量可控
- LinkedBlockingQueue:可选有界,默认选择
- SynchronousQueue:不存储,直接传递
- PriorityBlockingQueue:优先级队列
考点八:线程池监控
Q: 如何监控线程池?
A:
- 获取 poolSize、activeCount、queueSize、taskCount 等指标
- 设置告警阈值
- 记录拒绝任务日志
考点九:动态调整
Q: 如何动态调整线程池参数?
A:
- setCorePoolSize()、setMaximumPoolSize()
- setKeepAliveTime()
- setRejectedExecutionHandler()
- 根据监控指标动态调整
考点十:异常处理
Q: 线程池中任务抛异常怎么办?
A:
- 任务内部 try-catch
- 使用 Future 的 get() 获取异常
- 设置 Thread.UncaughtExceptionHandler
- 记录日志便于排查
最佳实践总结
1. 参数配置
java
// ✅ 推荐配置
ThreadPoolExecutor executor = new ThreadPoolExecutor(
coreSize,
maxSize,
keepAliveTime,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(queueCapacity), // 有界队列
new NamedThreadFactory("pool-"),
new CallerRunsPolicy() // 不会丢失任务
);2. 优雅关闭
java
executor.shutdown();
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow();
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
Log.e("Pool", "未正常关闭");
}
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}3. 监控告警
java
// 定期检查线程池状态
if (executor.getQueue().size() > threshold) {
Log.w("Pool", "队列堆积");
}4. 异常处理
java
executor.execute(() -> {
try {
// 任务
} catch (Exception e) {
Log.e("Pool", "任务异常", e);
}
});总结: 线程池是 Android 并发编程的核心工具。掌握 ThreadPoolExecutor 的工作原理、参数调优、监控方法,能够在实际项目中高效使用线程池,提升应用性能和稳定性。