Skip to content

07_线程通信

目录

  1. 线程通信概述
  2. Object.wait/notify 机制
  3. CountDownLatch
  4. CyclicBarrier
  5. Semaphore
  6. Exchanger
  7. BlockingQueue
  8. CompletableFuture
  9. 对比分析与选型
  10. 面试高频考点

1. 线程通信概述

1.1 什么是线程通信

线程通信是指多个线程之间进行数据交换、状态同步、任务协调等交互行为。在并发编程中,线程通信是必不可少的环节。

常见场景:

  • 生产者 - 消费者模式
  • 任务等待完成
  • 多阶段任务协调
  • 资源竞争控制

1.2 线程通信方式分类

┌───────────────────────────────────────────────┐
│              线程通信方式                     │
├───────────────────────────────────────────────┤
│  1. 基于锁的通信:synchronized, wait/notify   │
│  2. 基于队列的通信:BlockingQueue             │
│  3. 基于同步器的通信:CountDownLatch 等        │
│  4. 基于 Future 的通信:CompletableFuture     │
│  5. 基于共享变量的通信:volatile, Atomic      │
└───────────────────────────────────────────────┘

1.3 选择通信方式

场景推荐方式理由
等待任务完成CountDownLatch简单直接
多阶段协调CyclicBarrier循环等待
资源控制Semaphore信号量控制
数据交换BlockingQueue线程安全队列
异步计算CompletableFuture链式异步

2. Object.wait/notify 机制

2.1 基本原理

wait/notify 是 Java 内置的线程通信机制,基于 Object 类实现。

核心方法:

  • wait():释放锁,等待被通知
  • notify():唤醒一个等待的线程
  • notifyAll():唤醒所有等待的线程

2.2 基本用法

java
class CommunicationExample {
    private final Object lock = new Object();
    
    public void thread1() {
        synchronized (lock) {
            System.out.println("线程 1 等待");
            try {
                lock.wait(); // 等待通知
            } catch (InterruptedException e) {}
            System.out.println("线程 1 被唤醒");
        }
    }
    
    public void thread2() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {}
        
        synchronized (lock) {
            lock.notify(); // 发送通知
            System.out.println("线程 2 发送通知");
        }
    }
}

2.3 生产者 - 消费者模式

java
class ProducerConsumer {
    private final Queue<Integer> queue = new LinkedList<>();
    private final int MAX_SIZE = 10;
    private final Object lock = new Object();
    
    // 生产者
    public void produce() throws InterruptedException {
        synchronized (lock) {
            while (queue.size() >= MAX_SIZE) {
                lock.wait(); // 队列满,等待
            }
            
            Integer item = System.currentTimeMillis().hashCode();
            queue.add(item);
            System.out.println("生产:" + item + ", 队列大小:" + queue.size());
            
            lock.notifyAll(); // 通知消费者
        }
    }
    
    // 消费者
    public void consume() throws InterruptedException {
        synchronized (lock) {
            while (queue.isEmpty()) {
                lock.wait(); // 队列空,等待
            }
            
            Integer item = queue.poll();
            System.out.println("消费:" + item + ", 队列大小:" + queue.size());
            
            lock.notifyAll(); // 通知生产者
        }
    }
    
    public static void main(String[] args) {
        ProducerConsumer pc = new ProducerConsumer();
        
        // 启动生产者
        new Thread(() -> {
            try {
                for (int i = 0; i < 20; i++) {
                    pc.produce();
                    Thread.sleep(500);
                }
            } catch (InterruptedException e) {}
        }).start();
        
        // 启动消费者
        new Thread(() -> {
            try {
                for (int i = 0; i < 20; i++) {
                    pc.consume();
                    Thread.sleep(800);
                }
            } catch (InterruptedException e) {}
        }).start();
    }
}

2.4 wait/notify 注意事项

1. 必须在同步代码块中调用

java
// ❌ 错误:不在同步块中
lock.wait(); // 抛出 IllegalMonitorStateException

// ✅ 正确:在同步块中
synchronized (lock) {
    lock.wait();
}

2. 使用 while 循环而不是 if

java
// ❌ 错误:可能被虚假唤醒
synchronized (lock) {
    if (condition) {
        lock.wait();
    }
}

// ✅ 正确:检查条件
synchronized (lock) {
    while (!condition) {
        lock.wait();
    }
}

3. notify 和 notifyAll 的选择

java
// notify:唤醒任意一个等待线程
// 可能唤醒错误的线程

// notifyAll:唤醒所有等待线程
// 被唤醒的线程需要重新检查条件
// 更安全,推荐使用

2.5 wait/notify 完整示例

java
class Printer {
    private final Object lock = new Object();
    private int counter = 1;
    private int max = 10;
    
    public void print() throws InterruptedException {
        synchronized (lock) {
            while (counter <= max) {
                System.out.println(counter++);
                if (counter > max) break;
                lock.wait();
            }
        }
    }
    
    public void notify() {
        synchronized (lock) {
            lock.notify();
        }
    }
}

3. CountDownLatch

3.1 概述

CountDownLatch(倒数门闩)允许一个或多个线程等待其他线程完成操作。

核心方法:

  • countDown():计数减一
  • await():等待计数归零

3.2 基本用法

java
// 等待 5 个任务完成
CountDownLatch latch = new CountDownLatch(5);

for (int i = 0; i < 5; i++) {
    new Thread(() -> {
        // 执行任务
        doWork();
        
        // 完成任务,计数减一
        latch.countDown();
    }).start();
}

// 等待所有任务完成
latch.await();
System.out.println("所有任务完成");

3.3 场景一:主线程等待子线程

java
class Worker {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(5);
        
        // 启动 5 个工作线程
        for (int i = 0; i < 5; i++) {
            final int taskId = i;
            new Thread(() -> {
                try {
                    System.out.println("任务 " + taskId + " 开始");
                    Thread.sleep(1000);
                    System.out.println("任务 " + taskId + " 完成");
                } catch (InterruptedException e) {}
                finally {
                    latch.countDown();
                }
            }).start();
        }
        
        // 主线程等待
        System.out.println("等待所有任务完成");
        latch.await();
        System.out.println("所有任务完成,可以汇总结果");
    }
}

3.4 场景二:服务启动等待

java
class ServiceStartup {
    private static CountDownLatch latch;
    
    public static void startAll() throws InterruptedException {
        latch = new CountDownLatch(3);
        
        // 启动数据库服务
        new Thread(() -> {
            System.out.println("启动数据库服务");
            try { Thread.sleep(2000); } catch (e) {}
            System.out.println("数据库服务启动完成");
            latch.countDown();
        }).start();
        
        // 启动缓存服务
        new Thread(() -> {
            System.out.println("启动缓存服务");
            try { Thread.sleep(1500); } catch (e) {}
            System.out.println("缓存服务启动完成");
            latch.countDown();
        }).start();
        
        // 启动 Web 服务
        new Thread(() -> {
            System.out.println("启动 Web 服务");
            try { Thread.sleep(1000); } catch (e) {}
            System.out.println("Web 服务启动完成");
            latch.countDown();
        }).start();
        
        // 等待所有服务启动
        System.out.println("等待所有服务启动");
        latch.await();
        System.out.println("所有服务启动完成,开始接收请求");
    }
}

3.5 场景三:并行计算汇总

java
class ParallelComputation {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(4);
        AtomicInteger sum = new AtomicInteger(0);
        
        // 4 个线程并行计算
        for (int i = 0; i < 4; i++) {
            final int startIndex = i * 25;
            final int endIndex = (i + 1) * 25;
            
            new Thread(() -> {
                int partialSum = 0;
                for (int j = startIndex; j < endIndex; j++) {
                    partialSum += j;
                }
                sum.addAndGet(partialSum);
                latch.countDown();
            }).start();
        }
        
        // 等待所有计算完成
        latch.await();
        System.out.println("总结果:" + sum.get());
    }
}

3.6 带超时等待

java
// 等待最多 5 秒
if (latch.await(5, TimeUnit.SECONDS)) {
    System.out.println("任务完成");
} else {
    System.out.println("超时,任务未完成");
}

3.7 CountDownLatch 特点

不可重用: 计数归零后无法重置

java
latch.countDown(); // 多次调用
latch.await();     // 第一次成功

// 第二次 await 立即返回(如果计数已为 0)
// 不能重复使用同一个 CountDownLatch

4. CyclicBarrier

4.1 概述

CyclicBarrier(循环屏障)允许一组线程互相等待,直到所有线程都到达屏障点。

特点:

  • 可重复使用
  • 所有线程在屏障点等待
  • 可选择在屏障点执行公共任务

4.2 基本用法

java
// 3 个线程互相等待
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
    // 所有线程到达后执行
    System.out.println("所有线程到达屏障");
});

for (int i = 0; i < 3; i++) {
    final int threadId = i;
    new Thread(() -> {
        try {
            System.out.println("线程" + threadId + "到达屏障");
            barrier.await();
            System.out.println("线程" + threadId + "继续执行");
        } catch (Exception e) {}
    }).start();
}

4.3 场景一:并行计算分阶段

java
class ParallelComputation {
    public static void main(String[] args) {
        final int THREAD_COUNT = 4;
        CyclicBarrier barrier = new CyclicBarrier(THREAD_COUNT, () -> {
            System.out.println("=== 所有线程到达阶段 1,进入下一阶段 ===");
        });
        
        for (int i = 0; i < THREAD_COUNT; i++) {
            final int threadId = i;
            new Thread(() -> {
                try {
                    // 阶段 1
                    System.out.println("线程" + threadId + "执行阶段 1");
                    Thread.sleep((long)(Math.random() * 1000));
                    barrier.await();
                    
                    // 阶段 2
                    System.out.println("线程" + threadId + "执行阶段 2");
                    Thread.sleep((long)(Math.random() * 1000));
                    barrier.await();
                    
                    // 阶段 3
                    System.out.println("线程" + threadId + "执行阶段 3");
                    
                } catch (Exception e) {}
            }).start();
        }
    }
}

4.4 场景二:数据分片处理

java
class DataSharding {
    public static void main(String[] args) throws InterruptedException {
        final int SHARD_COUNT = 4;
        final int DATA_PER_SHARD = 1000;
        
        // 每轮处理后的回调
        CyclicBarrier barrier = new CyclicBarrier(SHARD_COUNT, () -> {
            System.out.println("一轮处理完成,所有分片数据已就绪");
        });
        
        for (int i = 0; i < SHARD_COUNT; i++) {
            final int shardId = i;
            new Thread(() -> {
                try {
                    for (int round = 0; round < 3; round++) {
                        System.out.println("分片" + shardId + "处理第" + round + "轮");
                        // 处理数据
                        for (int j = 0; j < DATA_PER_SHARD; j++) {
                            // 处理数据...
                        }
                        barrier.await(); // 等待所有分片
                    }
                } catch (Exception e) {}
            }).start();
        }
    }
}

4.5 带超时和异常处理

java
class CyclicBarrierWithTimeout {
    public static void main(String[] args) {
        CyclicBarrier barrier = new CyclicBarrier(3);
        
        for (int i = 0; i < 3; i++) {
            final int threadId = i;
            new Thread(() -> {
                try {
                    System.out.println("线程" + threadId + "准备到达");
                    // 模拟不同到达时间
                    Thread.sleep((long)(threadId * 500));
                    
                    // 等待屏障,超时 1 秒
                    barrier.await(1, TimeUnit.SECONDS);
                    System.out.println("线程" + threadId + "通过屏障");
                    
                } catch (BrokenBarrierException e) {
                    System.out.println("线程" + threadId + "屏障被破坏");
                } catch (TimeoutException e) {
                    System.out.println("线程" + threadId + "等待超时");
                } catch (InterruptedException e) {
                    System.out.println("线程" + threadId + "被中断");
                }
            }).start();
        }
    }
}

4.6 CyclicBarrier vs CountDownLatch

特性CyclicBarrierCountDownLatch
可重用性可重用不可重用
等待方式互相等待单向等待
公共任务支持不支持
线程角色平等主从关系

5. Semaphore

5.1 概述

Semaphore(信号量)用于控制同时访问资源的线程数量。

核心方法:

  • acquire():获取许可
  • release():释放许可
  • tryAcquire():尝试获取许可
  • availablePermits():获取可用许可数

5.2 基本用法

java
// 最多允许 3 个线程访问资源
Semaphore semaphore = new Semaphore(3);

for (int i = 0; i < 10; i++) {
    final int taskId = i;
    new Thread(() -> {
        try {
            semaphore.acquire(); // 获取许可
            System.out.println("任务" + taskId + "获取许可,开始执行");
            Thread.sleep(1000);
            System.out.println("任务" + taskId + "执行完成");
        } catch (InterruptedException e) {}
        finally {
            semaphore.release(); // 释放许可
        }
    }).start();
}

5.3 场景一:数据库连接池

java
class DatabaseConnectionPool {
    private final Semaphore semaphore;
    private final Queue<Connection> connections = new LinkedList<>();
    
    public DatabaseConnectionPool(int size) {
        this.semaphore = new Semaphore(size);
        // 初始化连接
        for (int i = 0; i < size; i++) {
            connections.offer(new Connection());
        }
    }
    
    public Connection getConnection() throws InterruptedException {
        semaphore.acquire();
        try {
            return connections.poll();
        } catch (Exception e) {
            throw e;
        }
    }
    
    public void releaseConnection(Connection conn) {
        connections.offer(conn);
        semaphore.release();
    }
    
    // 使用
    public void executeQuery() throws InterruptedException {
        Connection conn = getConnection();
        try {
            // 执行查询
            conn.query("SELECT * FROM users");
        } finally {
            releaseConnection(conn);
        }
    }
    
    class Connection {
        public void query(String sql) {
            System.out.println("执行查询:" + sql);
        }
    }
}

5.4 场景二:文件访问控制

java
class FileAccessController {
    private final Semaphore readSemaphore = new Semaphore(5);  // 最多 5 个读
    private final Semaphore writeSemaphore = new Semaphore(1); // 最多 1 个写
    
    public void readFile() throws InterruptedException {
        readSemaphore.acquire();
        try {
            System.out.println("读取文件");
        } finally {
            readSemaphore.release();
        }
    }
    
    public void writeFile() throws InterruptedException {
        // 写操作需要获取读和写两个锁
        writeSemaphore.acquire();
        readSemaphore.acquire();
        try {
            System.out.println("写入文件");
        } finally {
            readSemaphore.release();
            writeSemaphore.release();
        }
    }
}

5.5 公平 Semaphore

java
// 公平 Semaphore(按请求顺序获取许可)
Semaphore fairSemaphore = new Semaphore(3, true);

// 非公平 Semaphore(默认)
Semaphore unfairSemaphore = new Semaphore(3);

5.6 尝试获取许可

java
// 立即返回,不阻塞
if (semaphore.tryAcquire()) {
    try {
        // 访问资源
    } finally {
        semaphore.release();
    }
} else {
    System.out.println("无法获取许可");
}

// 带超时
if (semaphore.tryAcquire(2, TimeUnit.SECONDS)) {
    try {
        // 访问资源
    } finally {
        semaphore.release();
    }
}

6. Exchanger

6.1 概述

Exchanger 用于两个线程之间交换数据。

核心方法:

  • exchange(T object):交换对象

6.2 基本用法

java
Exchanger<String> exchanger = new Exchanger<>();

new Thread(() -> {
    try {
        String dataFromOther = exchanger.exchange("线程 1 的数据");
        System.out.println("线程 1 收到:" + dataFromOther);
    } catch (InterruptedException e) {}
}).start();

new Thread(() -> {
    try {
        Thread.sleep(1000);
        String dataFromOther = exchanger.exchange("线程 2 的数据");
        System.out.println("线程 2 收到:" + dataFromOther);
    } catch (InterruptedException e) {}
}).start();

6.3 场景:读写线程交换

java
class DataExchanger {
    public static void main(String[] args) {
        Exchanger<DataBuffer> exchanger = new Exchanger<>();
        DataBuffer[] buffers = {new DataBuffer(), new DataBuffer()};
        
        // 写线程
        new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    DataBuffer empty = buffers[i % 2];
                    // 填充数据
                    empty.setData("批次" + i + "的数据");
                    
                    // 交换
                    DataBuffer full = exchanger.exchange(empty);
                    
                    // 处理数据
                    System.out.println("写线程处理:" + full.getData());
                }
            } catch (InterruptedException e) {}
        }).start();
        
        // 读线程
        new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    DataBuffer empty = buffers[(i + 1) % 2];
                    // 填充数据
                    empty.setData("批次" + i + "的读取数据");
                    
                    // 交换
                    DataBuffer full = exchanger.exchange(empty);
                    
                    // 处理数据
                    System.out.println("读线程处理:" + full.getData());
                }
            } catch (InterruptedException e) {}
        }).start();
    }
    
    static class DataBuffer {
        private String data;
        public void setData(String data) { this.data = data; }
        public String getData() { return data; }
    }
}

7. BlockingQueue

7.1 概述

BlockingQueue 是阻塞队列,提供线程安全的队列操作。

核心方法:

  • put(T e):插入元素,队列满时阻塞
  • take():取出元素,队列空时阻塞
  • offer(T e, long timeout, TimeUnit unit):带超时的插入
  • poll(long timeout, TimeUnit unit):带超时的取出

7.2 常见实现

实现类特点
ArrayBlockingQueue有界数组队列
LinkedBlockingQueue可选有界链表队列
PriorityBlockingQueue优先级队列
DelayQueue延迟队列
SynchronousQueue不存储元素的队列

7.3 生产者 - 消费者实现

java
class BlockingQueueExample {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
        
        // 生产者
        new Thread(() -> {
            try {
                for (int i = 0; i < 20; i++) {
                    queue.put(i);
                    System.out.println("生产:" + i);
                    Thread.sleep(500);
                }
            } catch (InterruptedException e) {}
        }).start();
        
        // 消费者
        new Thread(() -> {
            try {
                for (int i = 0; i < 20; i++) {
                    Integer value = queue.take();
                    System.out.println("消费:" + value);
                    Thread.sleep(800);
                }
            } catch (InterruptedException e) {}
        }).start();
    }
}

7.4 四组方法对比

java
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);

// 1. 异常方式
try {
    queue.add(1);      // 满时抛出异常
    queue.remove();    // 空时抛出异常
} catch (Exception e) {}

// 2. 布尔方式
queue.offer(2);        // 满时返回 false
queue.poll();          // 空时返回 null

// 3. 阻塞方式
try {
    queue.put(3);      // 满时阻塞
    queue.take();      // 空时阻塞
} catch (InterruptedException e) {}

// 4. 超时方式
queue.offer(4, 1, TimeUnit.SECONDS);  // 超时返回 false
queue.poll(1, TimeUnit.SECONDS);       // 超时返回 null

7.5 优先级队列

java
class Task implements Comparable<Task> {
    private final int priority;
    private final String name;
    
    public Task(int priority, String name) {
        this.priority = priority;
        this.name = name;
    }
    
    @Override
    public int compareTo(Task other) {
        return Integer.compare(this.priority, other.priority);
    }
}

PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<>();
queue.offer(new Task(3, "任务 3"));
queue.offer(new Task(1, "任务 1"));
queue.offer(new Task(2, "任务 2"));

// 取出顺序:任务 1, 任务 2, 任务 3
Task task = queue.take();
System.out.println(task.name);

8. CompletableFuture

8.1 概述

CompletableFuture 是 Java 8 引入的异步计算框架,支持链式调用和组合。

核心方法:

  • supplyAsync():异步供应
  • thenApply():转换结果
  • thenAccept():消费结果
  • thenCompose():组合 Future
  • exceptionally():异常处理

8.2 基本用法

java
// 异步执行
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    return "异步结果";
});

// 获取结果
future.thenAccept(result -> {
    System.out.println("结果:" + result);
});

// 阻塞获取
String result = future.join();

8.3 链式调用

java
CompletableFuture.supplyAsync(() -> {
    System.out.println("步骤 1");
    return 100;
})
.thenApply(result -> {
    System.out.println("步骤 2");
    return result * 2;
})
.thenAccept(result -> {
    System.out.println("步骤 3,最终结果:" + result);
})
.exceptionally(ex -> {
    System.out.println("异常:" + ex.getMessage());
    return null;
});

8.4 组合多个 Future

java
// allOf - 等待所有完成
CompletableFuture<Void> all = CompletableFuture.allOf(
    CompletableFuture.supplyAsync(() -> "任务 1"),
    CompletableFuture.supplyAsync(() -> "任务 2"),
    CompletableFuture.supplyAsync(() -> "任务 3")
);

all.thenRun(() -> {
    System.out.println("所有任务完成");
});

// anyOf - 等待任意一个完成
CompletableFuture<Object> any = CompletableFuture.anyOf(
    CompletableFuture.supplyAsync(() -> "任务 1"),
    CompletableFuture.supplyAsync(() -> "任务 2")
);

any.thenAccept(result -> {
    System.out.println("第一个完成的任务:" + result);
});

8.5 串行组合

java
// thenCompose - 串行执行(依赖前一个结果)
CompletableFuture.supplyAsync(() -> {
    return 1;
})
.thenCompose(id -> CompletableFuture.supplyAsync(() -> {
    // 使用上一个结果
    return "用户" + id;
}))
.thenCompose(user -> CompletableFuture.supplyAsync(() -> {
    // 使用用户信息
    return user + "的订单";
}))
.thenAccept(result -> {
    System.out.println(result);
});

8.6 异常处理

java
// 方式一:handle
CompletableFuture.supplyAsync(() -> {
    throw new RuntimeException("错误");
})
.handle((result, ex) -> {
    if (ex != null) {
        System.out.println("异常:" + ex.getMessage());
        return "默认值";
    }
    return result;
});

// 方式二:exceptionally
CompletableFuture.supplyAsync(() -> {
    throw new RuntimeException("错误");
})
.exceptionally(ex -> {
    System.out.println("异常:" + ex.getMessage());
    return "默认值";
});

8.7 自定义线程池

java
ExecutorService executor = new ThreadPoolExecutor(
    4, 8, 60L, TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(100),
    new ThreadFactoryBuilder().setNameFormat("async-%d").build(),
    new ThreadPoolExecutor.CallerRunsPolicy()
);

CompletableFuture.supplyAsync(() -> {
    return "异步结果";
}, executor);

9. 对比分析与选型

9.1 工具对比

工具特点适用场景
wait/notify基础,需手动管理简单同步
CountDownLatch单向等待等待任务完成
CyclicBarrier循环等待分阶段并行
Semaphore信号量控制资源访问控制
BlockingQueue线程安全队列生产者 - 消费者
CompletableFuture异步链式复杂异步计算

9.2 选型建议

┌─────────────────┬──────────────┬─────────────────────┐
│     场景        │  推荐工具    │      理由           │
├─────────────────┼──────────────┼─────────────────────┤
│ 等待多个任务完成 │ CountDownLatch│ 简单直接           │
│ 多阶段并行       │ CyclicBarrier│ 循环使用           │
│ 限制并发数       │ Semaphore    │ 信号量控制         │
│ 数据传递         │ BlockingQueue│ 线程安全           │
│ 异步链式调用     │ CompletableFuture│ 强大组合能力  │
│ 简单同步         │ wait/notify  │ 基础够用           │
└─────────────────┴──────────────┴─────────────────────┘

9.3 性能对比

┌───────────────┬──────┬──────────┐
│     工具      │ 性能 │   开销   │
├───────────────┼──────┼──────────┤
│ wait/notify   │ 高   │ 低       │
│ CountDownLatch│ 高   │ 低       │
│ CyclicBarrier │ 中   │ 中       │
│ Semaphore     │ 高   │ 低       │
│ BlockingQueue │ 中   │ 中       │
│ CompletableFuture│高  │ 中       │
└───────────────┴──────┴──────────┘

10. 面试高频考点

考点一:wait/notify

Q: wait() 和 sleep() 的区别?

A:

  • wait 释放锁,sleep 不释放
  • wait 在同步块中使用,sleep 任意位置
  • wait 需要 notify 唤醒,sleep 自动唤醒

Q: notify 和 notifyAll 的区别?

A:

  • notify 唤醒任意一个
  • notifyAll 唤醒所有
  • notifyAll 更安全

考点二:CountDownLatch

Q: CountDownLatch 如何实现?

A:

  • 基于 AQS 实现
  • 维护一个 count 计数器
  • countDown 减一,await 等待归零

Q: CountDownLatch 可重用吗?

A: 不可重用,计数归零后无法重置

考点三:CyclicBarrier

Q: CyclicBarrier 和 CountDownLatch 的区别?

A:

  • CyclicBarrier 可重用
  • CyclicBarrier 互相等待
  • CyclicBarrier 支持公共任务

考点四:Semaphore

Q: Semaphore 的用途?

A:

  • 控制并发访问资源的线程数
  • 信号量控制
  • 数据库连接池

考点五:BlockingQueue

Q: BlockingQueue 有哪些实现?

A:

  • ArrayBlockingQueue
  • LinkedBlockingQueue
  • PriorityBlockingQueue
  • SynchronousQueue

考点六:CompletableFuture

Q: CompletableFuture 的优势?

A:

  • 链式调用
  • 组合 Future
  • 异常处理完善

考点七:线程通信选择

Q: 如何选择线程通信方式?

A:

  • 简单等待:CountDownLatch
  • 循环等待:CyclicBarrier
  • 资源控制:Semaphore
  • 数据传递:BlockingQueue
  • 异步计算:CompletableFuture

考点八:AQS 原理

Q: AQS 是什么?

A: AbstractQueuedSynchronizer,抽象队列同步器,Java 并发包的基础框架

考点九:虚假唤醒

Q: 什么是虚假唤醒?如何避免?

A:

  • 线程在没有被 notify 的情况下被唤醒
  • 使用 while 循环检查条件

考点十:死锁

Q: 什么是死锁?如何避免?

A:

  • 线程互相等待对方释放资源
  • 统一加锁顺序
  • 使用超时
  • 避免嵌套锁

最佳实践总结

1. 使用 while 检查条件

java
while (!condition) {
    lock.wait();
}

2. 使用 try-finally 释放资源

java
semaphore.acquire();
try {
    // 使用资源
} finally {
    semaphore.release();
}

3. 使用超时避免无限等待

java
if (latch.await(5, TimeUnit.SECONDS)) {
    // 成功
} else {
    // 超时处理
}

4. 优先使用高级同步器

java
// 使用 CountDownLatch 代替 wait/notify
CountDownLatch latch = new CountDownLatch(n);

5. 使用 CompletableFuture 组合任务

java
CompletableFuture.allOf(future1, future2)
    .thenRun(() -> {
        // 所有任务完成
    });

总结: 线程通信是并发编程的核心。掌握各种同步工具的特点和使用场景,能够编写更高效、更安全的并发代码。在实际开发中,应根据具体需求选择合适的同步工具。