# Java 并发
# 1. CompletableFuture
工具类:
public class SmallTool {
public static void sleepMillis(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
public static void printTimeAndThread(String tag) {
String result = new StringJoiner("\t|\t")
.add(String.valueOf(System.currentTimeMillis()))
.add(String.valueOf(Thread.currentThread().getId()))
.add(Thread.currentThread().getName())
.add(tag)
.toString();
System.out.println(result);
}
}
# 1.1. supplyAsync 开启
@Test
public void supplyAsync_开启_异步方法() {
SmallTool.printTimeAndThread("小明 点餐: 番茄炒蛋 + 米饭");
// 启动方法
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
SmallTool.printTimeAndThread("厨师 炒菜");
SmallTool.sleepMillis(300);
SmallTool.printTimeAndThread("厨师 打饭");
SmallTool.sleepMillis(200);
return "番茄炒蛋 + 米饭 做好了";
});
SmallTool.printTimeAndThread("小明 打游戏");
// 等待异步执行完毕
String result = completableFuture.join();
SmallTool.printTimeAndThread(String.format("%s, 小明吃饭", result));
}
输出:
1763904357849 | 1 | main | 小明 点餐: 番茄炒蛋 + 米饭
1763904357852 | 1 | main | 小明 打游戏
1763904357852 | 22 | ForkJoinPool.commonPool-worker-25 | 厨师 炒菜
1763904358164 | 22 | ForkJoinPool.commonPool-worker-25 | 厨师 打饭
1763904358364 | 1 | main | 番茄炒蛋 + 米饭 做好了, 小明吃饭
# 1.2. runAsync 开启
@Test
public void runAsync_开启_异步方法_不需要返回值() {
SmallTool.printTimeAndThread("小明 点餐: 番茄炒蛋 + 米饭");
// 主线程不会等待异步任务执行完毕
CompletableFuture.runAsync(() -> {
SmallTool.printTimeAndThread("厨师 炒菜");
SmallTool.sleepMillis(300);
SmallTool.printTimeAndThread("厨师 打饭");
});
SmallTool.printTimeAndThread("小明 打游戏");
}
/* =>
1763980773336 | 1 | main | 小明 点餐: 番茄炒蛋 + 米饭
1763980773340 | 1 | main | 小明 打游戏
1763980773340 | 26 | ForkJoinPool.commonPool-worker-1 | 厨师 炒菜
*/
# 1.3. thenCompose 连接
@Test
public void thenCompose_连接_两个异步方法顺序执行() {
SmallTool.printTimeAndThread("小明 点餐: 番茄炒蛋 + 米饭");
// 厨师炒完菜,服务员才去打饭,然后组合 菜和饭
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
SmallTool.printTimeAndThread("厨师 炒菜");
SmallTool.sleepMillis(300);
return "番茄炒蛋";
}).thenCompose((dish) -> CompletableFuture.supplyAsync(() -> {
SmallTool.printTimeAndThread("服务员 打饭");
SmallTool.sleepMillis(200);
return String.format("%s + 米饭 做好了", dish);
}));
SmallTool.printTimeAndThread("小明 打游戏");
String result = completableFuture.join();
SmallTool.printTimeAndThread(String.format("%s, 小明吃饭", result));
}
输出:
1763904387656 | 1 | main | 小明 点餐: 番茄炒蛋 + 米饭
1763904387659 | 22 | ForkJoinPool.commonPool-worker-25 | 厨师 炒菜
1763904387659 | 1 | main | 小明 打游戏
1763904387970 | 22 | ForkJoinPool.commonPool-worker-25 | 服务员 打饭
1763904388172 | 1 | main | 番茄炒蛋 + 米饭 做好了, 小明吃饭
# 1.4. thenCombine 合并
拿到前面两个任务的结果进行加工处理
@Test
public void thenCombine_合并_多个异步方法并发执行() {
SmallTool.printTimeAndThread("小明 点餐: 番茄炒蛋 + 米饭");
// 厨师炒菜、服务员煮饭 同时进行,等都完毕后,再组合 菜和饭
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
SmallTool.printTimeAndThread("厨师 炒菜");
SmallTool.sleepMillis(300);
return "番茄炒蛋";
}).thenCombine(CompletableFuture.supplyAsync(() -> {
SmallTool.printTimeAndThread("服务员 煮饭");
SmallTool.sleepMillis(200);
return "米饭";
}), (dish, rice) -> {
SmallTool.printTimeAndThread("服务员 打饭");
SmallTool.sleepMillis(100);
return String.format("%s + %s 做好了", dish, rice);
});
SmallTool.printTimeAndThread("小明 打游戏");
String result = completableFuture.join();
SmallTool.printTimeAndThread(String.format("%s, 小明吃饭", result));
}
输出:
1763904402238 | 1 | main | 小明 点餐: 番茄炒蛋 + 米饭
1763904402242 | 22 | ForkJoinPool.commonPool-worker-25 | 厨师 炒菜
1763904402242 | 23 | ForkJoinPool.commonPool-worker-18 | 服务员 煮饭
1763904402242 | 1 | main | 小明 打游戏
1763904402547 | 22 | ForkJoinPool.commonPool-worker-25 | 服务员 打饭
1763904402656 | 1 | main | 番茄炒蛋 + 米饭 做好了, 小明吃饭
# 1.5. thenAcceptBoth 合并,无返回值
@Test
public void thenAcceptBoth_合并_拿到前面两个任务的结果进行加工处理() {
// thenAcceptBoth: 拿到前面两个任务的结果进行加工处理,不需要返回值
CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {
SmallTool.printTimeAndThread("任务1 执行");
SmallTool.sleepMillis(300);
return "任务1 结果";
}).thenAcceptBoth(CompletableFuture.supplyAsync(() -> {
SmallTool.printTimeAndThread("任务2 执行");
SmallTool.sleepMillis(300);
return "任务2 结果";
}), (task1_result, task2_result) -> {
SmallTool.printTimeAndThread(task1_result);
SmallTool.printTimeAndThread(task2_result);
});
completableFuture.join();
}
/*=>
1763981675284 | 27 | ForkJoinPool.commonPool-worker-2 | 任务2 执行
1763981675284 | 26 | ForkJoinPool.commonPool-worker-1 | 任务1 执行
1763981675596 | 26 | ForkJoinPool.commonPool-worker-1 | 任务1 结果
1763981675596 | 26 | ForkJoinPool.commonPool-worker-1 | 任务2 结果
*/
# 1.6. thenAcceptBoth 合并,无返回值
@Test
public void runAfterBoth_合并_拿到前面两个任务的结果进行加工处理() {
// thenAcceptBoth: 不需要前面两个任务的结果,不需要返回值
CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {
SmallTool.printTimeAndThread("任务1 执行");
SmallTool.sleepMillis(300);
return "任务1 结果";
}).runAfterBoth(CompletableFuture.supplyAsync(() -> {
SmallTool.printTimeAndThread("任务2 执行");
SmallTool.sleepMillis(300);
return "任务2 结果";
}), () -> {
SmallTool.printTimeAndThread("执行完毕");
});
completableFuture.join();
}
/*=>
1763981945286 | 26 | ForkJoinPool.commonPool-worker-1 | 任务1 执行
1763981945286 | 27 | ForkJoinPool.commonPool-worker-2 | 任务2 执行
1763981945601 | 27 | ForkJoinPool.commonPool-worker-2 | 执行完毕
*/
# 1.7. thenApply
@Test
public void thenApply_() {
SmallTool.printTimeAndThread("小明 结账、要求开发票");
// 同一个线程,顺序执行多个方法,
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
SmallTool.printTimeAndThread("服务员A 收款");
SmallTool.sleepMillis(300);
return 500;
}).thenApply((money) -> {
SmallTool.printTimeAndThread(String.format("服务员A 开发票 —— 面额 %s 元", money));
SmallTool.sleepMillis(200);
return String.format("%s 元发票", money);
});
SmallTool.printTimeAndThread("小明 打游戏");
String result = completableFuture.join();
SmallTool.printTimeAndThread(String.format("小明拿到 %s, 准备回家", result));
}
输出:
1763904423520 | 1 | main | 小明 结账、要求开发票
1763904423524 | 1 | main | 小明 打游戏
1763904423524 | 22 | ForkJoinPool.commonPool-worker-25 | 服务员A 收款
1763904423831 | 22 | ForkJoinPool.commonPool-worker-25 | 服务员A 开发票 —— 面额 500 元
1763904424034 | 1 | main | 小明拿到 500 元发票, 准备回家
# 1.8. thenApplyAsync, 独立任务
@Test
public void thenApplyAsync_() {
SmallTool.printTimeAndThread("小明 结账、要求开发票");
// 不同的线程,顺序执行多个方法,与 thenCompose 类似
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
SmallTool.printTimeAndThread("服务员A 收款");
SmallTool.sleepMillis(300);
return 500;
}).thenApplyAsync((money) -> {
SmallTool.printTimeAndThread(String.format("服务员B 开发票 —— 面额 %s 元", money));
SmallTool.sleepMillis(200);
return String.format("%s 元发票", money);
});
SmallTool.printTimeAndThread("小明 打游戏");
String result = completableFuture.join();
SmallTool.printTimeAndThread(String.format("小明拿到 %s, 准备回家", result));
}
输出:
1763904472241 | 1 | main | 小明 结账、要求开发票
1763904472244 | 1 | main | 小明 打游戏
1763904472244 | 22 | ForkJoinPool.commonPool-worker-25 | 服务员A 收款
1763904472545 | 23 | ForkJoinPool.commonPool-worker-18 | 服务员B 开发票 —— 面额 500 元
1763904472749 | 1 | main | 小明拿到 500 元发票, 准备回家
# 1.9. thenAccept,无返回值
@Test
public void thenAccept_() {
// thenAccept: 需要上一个任务的结果,但当前任务没有返回值
CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {
SmallTool.printTimeAndThread("任务1 执行");
SmallTool.sleepMillis(300);
return "任务1 的结果";
}).thenAccept((result) -> {
SmallTool.printTimeAndThread("任务2 执行");
SmallTool.sleepMillis(300);
});
// 等待异步任务执行完毕
completableFuture.join();
}
/*=>
1763981134001 | 26 | ForkJoinPool.commonPool-worker-1 | 任务1 执行
1763981134303 | 26 | ForkJoinPool.commonPool-worker-1 | 任务2 执行
*/
# 1.10. thenRun,无入参无返回值
@Test
public void thenRun_() {
// thenAccept: 不需要上一个任务的结果,且当前任务没有返回值
CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {
SmallTool.printTimeAndThread("任务1 执行");
SmallTool.sleepMillis(300);
return "任务1 的结果";
}).thenRun(() -> {
SmallTool.printTimeAndThread("任务2 执行");
SmallTool.sleepMillis(300);
});
// 等待异步任务执行完毕
completableFuture.join();
}
/*=>
1763981299650 | 26 | ForkJoinPool.commonPool-worker-1 | 任务1 执行
1763981299957 | 26 | ForkJoinPool.commonPool-worker-1 | 任务2 执行
*/
# 1.11. applyToEither 及 exceptionally
@Test
public void applyToEither_() {
SmallTool.printTimeAndThread("小明 走出餐厅,来到公交站");
SmallTool.printTimeAndThread("小明 准备坐 700路 或 800路 公交");
// 两个线程同时执行,哪个先执行完毕 就返回哪个的结果,末尾处理异常
CompletableFuture<String> completableFuture = CompletableFuture
.supplyAsync(() -> {
SmallTool.printTimeAndThread("700路 公交正在赶来");
SmallTool.sleepMillis(300);
return "700路 到了";
})
.applyToEither(CompletableFuture.supplyAsync(() -> {
SmallTool.printTimeAndThread("800路 公交正在赶来");
SmallTool.sleepMillis(200);
return "800路 到了";
}), firstComeBus -> {
if (true) {
throw new RuntimeException(String.format("%s,小明 刷公交卡时,发现公交卡余额不足,直接下了公交车", firstComeBus));
}
return firstComeBus;
})
.exceptionally((e) -> {
SmallTool.printTimeAndThread(e.getMessage());
SmallTool.printTimeAndThread("小明 呼叫出租车");
return "出租车 到了";
});
String result = completableFuture.join();
SmallTool.printTimeAndThread(String.format("%s, 小明坐车回家", result));
}
输出:
1763904539565 | 1 | main | 小明 走出餐厅,来到公交站
1763904539565 | 1 | main | 小明 准备坐 700路 或 800路 公交
1763904539569 | 23 | ForkJoinPool.commonPool-worker-18 | 800路 公交正在赶来
1763904539569 | 22 | ForkJoinPool.commonPool-worker-25 | 700路 公交正在赶来
1763904539779 | 23 | ForkJoinPool.commonPool-worker-18 | java.lang.RuntimeException: 800路 到了,小明 刷公交卡时,发现公交卡余额不足,直接下了公交车
1763904539779 | 23 | ForkJoinPool.commonPool-worker-18 | 小明 呼叫出租车
1763904539779 | 1 | main | 出租车 到了, 小明坐车回家
# 1.12. handle
@Test
public void handle_() {
// handle 处理上一任务的异常
CompletableFuture<String> completableFuture = CompletableFuture
.supplyAsync(() -> {
SmallTool.printTimeAndThread("任务1 执行");
SmallTool.sleepMillis(300);
if (true) {
throw new RuntimeException("任务1 异常");
}
return "任务1 结果";
})
.handle((result, e) -> {
if (e != null) {
return "任务1 异常处理,修正后的结果";
}
return result;
});
String result = completableFuture.join();
SmallTool.printTimeAndThread(result);
}
/*=>
1763982424184 | 26 | ForkJoinPool.commonPool-worker-1 | 任务1 执行
1763982424491 | 1 | main | 任务1 异常处理,修正后的结果
*/
# 1.13. whenComplete
用于处理上一任务的结果
@Test
public void whenComplete_() {
// whenComplete 不处理异常,会继续往下传递异常
CompletableFuture<String> completableFuture = CompletableFuture
.supplyAsync(() -> {
SmallTool.printTimeAndThread("任务1 执行");
SmallTool.sleepMillis(300);
if (true) {
throw new RuntimeException("任务1 异常");
}
return "任务1 结果";
})
.whenComplete((result, e) -> {
if (e != null) {
SmallTool.printTimeAndThread("任务1 异常");
return;
}
SmallTool.printTimeAndThread(result);
});
String result = completableFuture.join();
SmallTool.printTimeAndThread(result);
}
/*=>
1763982861502 | 26 | ForkJoinPool.commonPool-worker-1 | 任务1 执行
1763982861814 | 26 | ForkJoinPool.commonPool-worker-1 | 任务1 异常
java.util.concurrent.CompletionException: java.lang.RuntimeException: 任务1 异常
*/
# 1.14. 参数列表
大多数 xxx(arg) 方法,都有如下两个重载的方法:
xxxAsync(arg)xxxAsync(arg, Executor)
比如 thenApply(arg) / thenApplyAsync(arg)
// thenApply 会将 任务1 和 任务2 合并,放在一个线程里执行
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(
// 任务1
() -> {
return "任务1 的返回值";
}
).thenApply(
// 任务2
(result) -> { // result: "任务1 的返回值"
return "任务2 的返回值";
}
);
// thenApplyAsync 任务1 执行完毕后,在一个新的线程里执行 任务2
// 任务2 是一个独立的任务
CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(
// 任务1
() -> {
return "任务1 的返回值";
}
).thenApplyAsync(
// 任务2
(result) -> { // result: "任务1 的返回值"
return "任务2 的返回值";
}
);
# 2. CompletableFuture 性能问题
# 2.1. ForkJoinPool
CompletableFuture 默认使用 ForkJoinPool 线程池
ForkJoinPool 的最大线程池数量为: CPU核心数-1
可以全局修改 ForkJoinPool 的最大线程数(无法动态修改),但不推荐。
@Test
public void testForkJoinPool() {
// CompletableFuture 底层用到的 ForkJoinPool
System.out.println("处理器核心数: " + Runtime.getRuntime().availableProcessors());
//=> 20
System.out.println("ForkJoinPool 当前线程数:" + ForkJoinPool.commonPool().getPoolSize());
//=> 0
System.out.println("ForkJoinPool 最大线程数:" + ForkJoinPool.getCommonPoolParallelism());
//=> 19
}
@Test
public void ForkJoinPool_设置的最大线程数() {
// 设置虚拟机参数
//-Djava.util.concurrent.ForkJoinPool.common.parallelism=40
// 或者在程序启动时设置
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "40");
System.out.println("ForkJoinPool 最大线程数:" + ForkJoinPool.getCommonPoolParallelism());
//=> 40
}
使用自定义的线程池来设置线程数量,用完后直接销毁线程池。
@Test
public void test1() {
ExecutorService executor = Executors.newCachedThreadPool();
try {
long start = System.currentTimeMillis();
int total = 100;
SmallTool.printTimeAndThread("进餐厅点菜");
CompletableFuture[] dishCompletableFutures = IntStream.rangeClosed(1, total)
.mapToObj(i -> new Dish("菜" + i, 1))
.map(dish -> CompletableFuture.runAsync(dish::make, executor))
.toArray(CompletableFuture[]::new);
CompletableFuture.allOf(dishCompletableFutures).join();
long seconds = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - start);
SmallTool.printTimeAndThread(String.format("做完 %s 道菜,耗时 %s 秒", total, seconds));
} finally {
executor.shutdown();
}
}
/*=>
使用 睡眠 来模拟耗时是不准确的。
1763991372481 | 1 | main | 做完 100 道菜,耗时 1 秒
*/
public class Dish {
/** 菜名 */
private String name;
/** 做菜时间() */
private Integer productionSeconds;
public Dish(String name, Integer productionSeconds) {
this.name = name;
this.productionSeconds = productionSeconds;
}
public void make() {
SmallTool.sleepMillis(TimeUnit.SECONDS.toMillis(this.productionSeconds));
SmallTool.printTimeAndThread(this.name + " 制作完毕");
}
}
# 3. 线程池
# 3.1. ThreadPoolExecutor 构造函数
public ThreadPoolExecutor(
int corePoolSize, // 核心线程数,最小线程数
int maximumPoolSize, // 最大线程数
long keepAliveTime, // 超过核心线程数的线程 的存活时间
TimeUnit unit,
BlockingQueue<Runnable> workQueue, // 工作队列
ThreadFactory threadFactory, // 线程工厂
// 拒绝策略,任务装满队列后如何处理
// 1. 直接丢弃
// 2. 替换工作队列中的最后一个
// 3. 抛异常
// 4. 谁提交的任务,谁去处理
// 自行扩展...
RejectedExecutionHandler handler
) {
}
这个构造函数过于复杂,一般不使用
# 3.2. 创建线程池
/*
单线程线程池
new ThreadPoolExecutor(
1,
1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()
)
*/
Executors.newSingleThreadExecutor();
/*
缓存线程池,无上限,过多时会被操作系统限制
new ThreadPoolExecutor(
0,
Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>()
)
*/
Executors.newCachedThreadPool();
/*
固定线程数量
new ThreadPoolExecutor(
nThreads,
nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>())
*/
Executors.newFixedThreadPool(3);
ExecutorService executor = Executors.newSingleThreadExecutor();
// Java8 之前 执行任务 的方法
// executor.execute();
// executor.submit()
# 4. 阻塞队列
- LinkedBlockingQueue
- 数据结构: 链表
- 特点: 同时存取
- ArrayBlockingQueue
- 数据结构: 数组
- 特点: 公平存储
- SynchronousQueue
- 数据结构: 没有容量
- 特点: 双方就绪
# 4.1. 引入
阻塞队列的功能:
- 队列需要有一个固定的容量
- 如果队列是空的
- 取不出来东西,需要等到有元素为止
- 如果队列是满的
- 放不进去东西,需要等到有空位为止
LinkedBlockingQueue:
@Test
public void 放的时候满了_一直等待() {
LinkedBlockingQueue<String> linkedBlockingQueue = new LinkedBlockingQueue<>(1);
try {
linkedBlockingQueue.put("A");
linkedBlockingQueue.put("B"); // 这里会一直等待
} catch (InterruptedException e) {
SmallTool.printTimeAndThread("放元素时被中断");
}
}
@Test
public void 取的时候空了_一直等待() {
LinkedBlockingQueue<String> linkedBlockingQueue = new LinkedBlockingQueue<>(1);
try {
String item = linkedBlockingQueue.take();// 这里会一直等待
} catch (InterruptedException e) {
SmallTool.printTimeAndThread("取元素时被中断");
}
}
好处:
- 不用考虑 多线程
- 不用考虑 变量被共享
# 4.2. 基础
说明:
- 支持多生产者、多消费者
取:
take(): 死等poll(Long, TimeUnit): 超时等poll(): 不等
放:
put(E): 死等offer(E, Long, TimeUnit): 超时等offer(E): 不等
# 4.3. LinkedBlockingQueue
说明:
- 支持同时存取
- 有两把锁,一把控制消费者,一把控制生产者,所以生产者、消费者 可以同时操作这个队列
取的时候,设置超时时间,避免无限等待:
@Test
public void 取的时候空了_设置超时时间() {
LinkedBlockingQueue<String> linkedBlockingQueue = new LinkedBlockingQueue<>(1);
try {
SmallTool.printTimeAndThread("开始取元素");
// 最多等 3 秒,超时返回 null
String item = linkedBlockingQueue.poll(3, TimeUnit.SECONDS);
if (item == null) {
SmallTool.printTimeAndThread("超时未取到");
}
} catch (InterruptedException e) {
SmallTool.printTimeAndThread("取元素时被中断");
}
}
取的时候,有就取,没有就返回 null:
@Test
public void 取的时候空了_不等待() {
LinkedBlockingQueue<String> linkedBlockingQueue = new LinkedBlockingQueue<>(1);
SmallTool.printTimeAndThread("开始取元素");
String item = linkedBlockingQueue.poll();
if (item == null) {
SmallTool.printTimeAndThread("未取到");
}
}
# 4.4. ArrayBlockingQueue
说明:
- 支持公平
- 只有一把锁
公平:
- 先来的消费者,一定先取出来
- 先来的生产者,一定先放进去
示例:
@Test
public void 消费者取的时候_不排队_后来的有可能先取到() throws InterruptedException {
BlockingQueue<String> queue = new LinkedBlockingQueue<>(3);
两个线程先后取元素(queue);
}
/* =>
1764069650882 | 26 | Thread-0 | 消费者A 开始取
1764069650992 | 27 | Thread-1 | 消费者B 开始取
1764069650993 | 27 | Thread-1 | 消费者B 取到了: 苹果
*/
@Test
public void 消费者取的时候_排队_先来的先取() throws InterruptedException {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(3, true);
两个线程先后取元素(queue);
}
private void 两个线程先后取元素(BlockingQueue<String> queue) throws InterruptedException {
new Thread(() -> {
SmallTool.printTimeAndThread("消费者A 开始取");
try {
String element = queue.poll(1, TimeUnit.SECONDS);
SmallTool.printTimeAndThread(String.format("消费者A 取到了: %s", element));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}).start();
Thread.sleep(100); // 模拟 消费者A 先到
new Thread(() -> {
SmallTool.printTimeAndThread("消费者B 开始取");
try {
String element = queue.poll(1, TimeUnit.SECONDS);
SmallTool.printTimeAndThread(String.format("消费者B 取到了: %s", element));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}).start();
queue.offer("苹果");
}
# 4.5. SynchronousQueue
说明:
- 没有容量
- 支持公平
示例:
@Test
public void 没有容量的阻塞队列() {
BlockingQueue<String> queue = new SynchronousQueue<>();
new Thread(() -> {
try {
SmallTool.printTimeAndThread("生产者 生产 苹果");
queue.put("苹果");
SmallTool.sleepMillis(500);
SmallTool.printTimeAndThread("生产者 生产 梨子");
queue.put("梨子");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}).start();
new Thread(() -> {
try {
SmallTool.printTimeAndThread(String.format("消费者 消费 %s", queue.take()));
SmallTool.sleepMillis(100);
SmallTool.printTimeAndThread(String.format("消费者 消费 %s", queue.take()));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}).start();
SmallTool.sleepMillis(2000);
}
/*=>
1764071066601 | 26 | Thread-0 | 生产者 生产 苹果
1764071066602 | 27 | Thread-1 | 消费者 消费 苹果
1764071067114 | 26 | Thread-0 | 生产者 生产 梨子
1764071067114 | 27 | Thread-1 | 消费者 消费 梨子
*/
# 4.6. PriorityBlockingQueue
说明:
- 先取小的
- 元素要么实现 Comparable 接口,要么构造队列是传入比较器
- 队列容量没有限制
- 遍历队列时,元素是无序的
示例:
class Person {
private String name;
private Level level;
public Level getLevel() {
return level;
}
public Person(String name, Level level) {
this.name = name;
this.level = level;
}
@Override
public String toString() {
return String.format("{ name: \"%s\", level: %s }", name, level.name());
}
}
@Test
public void 排序队列_从小到大排序_小的先出队() {
BlockingQueue<Person> blockingQueue = new PriorityBlockingQueue<>(
3, // 初始容量
Comparator.comparing(Person::getLevel) // 比较器
);
Thread threadOne = new Thread(() -> {
Person person1 = new Person("张三", Level.Three);
Person person2 = new Person("李四", Level.One);
Person person3 = new Person("王五", Level.Two);
blockingQueue.offer(person1);
SmallTool.printTimeAndThread("进: " + person1);
blockingQueue.offer(person2);
SmallTool.printTimeAndThread("进: " + person2);
blockingQueue.offer(person3);
SmallTool.printTimeAndThread("进: " + person3);
}, "线程一");
threadOne.start();
try {
threadOne.join(); // 等待执行完毕
} catch (InterruptedException e) {
SmallTool.printTimeAndThread("被中断" + e.getMessage());
}
new Thread(() -> {
SmallTool.printTimeAndThread("出: " + blockingQueue.poll());
}, "线程二").start();
}
/*
1764824138373 | 26 | 线程一 | 进: { name: "张三", level: Three }
1764824138373 | 26 | 线程一 | 进: { name: "李四", level: One }
1764824138373 | 26 | 线程一 | 进: { name: "王五", level: Two }
1764824138373 | 27 | 线程二 | 出: { name: "李四", level: One }
*/
上一篇: 下一篇:
本章目录