# Java 并发

# 1. CompletableFuture

工具类:

public class SmallTool {
    public static void sleepMillis(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public static void printTimeAndThread(String tag) {
        String result = new StringJoiner("\t|\t")
                .add(String.valueOf(System.currentTimeMillis()))
                .add(String.valueOf(Thread.currentThread().getId()))
                .add(Thread.currentThread().getName())
                .add(tag)
                .toString();
        System.out.println(result);
    }
}

# 1.1. supplyAsync 开启

@Test
public void supplyAsync_开启_异步方法() {
    SmallTool.printTimeAndThread("小明 点餐: 番茄炒蛋 + 米饭");

    // 启动方法
    CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
        SmallTool.printTimeAndThread("厨师 炒菜");
        SmallTool.sleepMillis(300);
        SmallTool.printTimeAndThread("厨师 打饭");
        SmallTool.sleepMillis(200);
        return "番茄炒蛋 + 米饭 做好了";
    });

    SmallTool.printTimeAndThread("小明 打游戏");

    // 等待异步执行完毕
    String result = completableFuture.join();

    SmallTool.printTimeAndThread(String.format("%s, 小明吃饭", result));
}

输出:

1763904357849	|	1	|	main	|	小明 点餐: 番茄炒蛋 + 米饭
1763904357852	|	1	|	main	|	小明 打游戏
1763904357852	|	22	|	ForkJoinPool.commonPool-worker-25	|	厨师 炒菜
1763904358164	|	22	|	ForkJoinPool.commonPool-worker-25	|	厨师 打饭
1763904358364	|	1	|	main	|	番茄炒蛋 + 米饭 做好了, 小明吃饭

# 1.2. runAsync 开启

@Test
public void runAsync_开启_异步方法_不需要返回值() {
    SmallTool.printTimeAndThread("小明 点餐: 番茄炒蛋 + 米饭");

    // 主线程不会等待异步任务执行完毕
    CompletableFuture.runAsync(() -> {
        SmallTool.printTimeAndThread("厨师 炒菜");
        SmallTool.sleepMillis(300);
        SmallTool.printTimeAndThread("厨师 打饭");
    });

    SmallTool.printTimeAndThread("小明 打游戏");
}
/* =>
1763980773336	|	1	|	main	|	小明 点餐: 番茄炒蛋 + 米饭
1763980773340	|	1	|	main	|	小明 打游戏
1763980773340	|	26	|	ForkJoinPool.commonPool-worker-1	|	厨师 炒菜
*/

# 1.3. thenCompose 连接

@Test
public void thenCompose_连接_两个异步方法顺序执行() {
    SmallTool.printTimeAndThread("小明 点餐: 番茄炒蛋 + 米饭");

    // 厨师炒完菜,服务员才去打饭,然后组合 菜和饭
    CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
        SmallTool.printTimeAndThread("厨师 炒菜");
        SmallTool.sleepMillis(300);
        return "番茄炒蛋";
    }).thenCompose((dish) -> CompletableFuture.supplyAsync(() -> {
        SmallTool.printTimeAndThread("服务员 打饭");
        SmallTool.sleepMillis(200);
        return String.format("%s + 米饭 做好了", dish);
    }));

    SmallTool.printTimeAndThread("小明 打游戏");

    String result = completableFuture.join();

    SmallTool.printTimeAndThread(String.format("%s, 小明吃饭", result));
}

输出:

1763904387656	|	1	|	main	|	小明 点餐: 番茄炒蛋 + 米饭
1763904387659	|	22	|	ForkJoinPool.commonPool-worker-25	|	厨师 炒菜
1763904387659	|	1	|	main	|	小明 打游戏
1763904387970	|	22	|	ForkJoinPool.commonPool-worker-25	|	服务员 打饭
1763904388172	|	1	|	main	|	番茄炒蛋 + 米饭 做好了, 小明吃饭

# 1.4. thenCombine 合并

拿到前面两个任务的结果进行加工处理

@Test
public void thenCombine_合并_多个异步方法并发执行() {
    SmallTool.printTimeAndThread("小明 点餐: 番茄炒蛋 + 米饭");

    // 厨师炒菜、服务员煮饭 同时进行,等都完毕后,再组合 菜和饭
    CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
        SmallTool.printTimeAndThread("厨师 炒菜");
        SmallTool.sleepMillis(300);
        return "番茄炒蛋";
    }).thenCombine(CompletableFuture.supplyAsync(() -> {
        SmallTool.printTimeAndThread("服务员 煮饭");
        SmallTool.sleepMillis(200);
        return "米饭";
    }), (dish, rice) -> {
        SmallTool.printTimeAndThread("服务员 打饭");
        SmallTool.sleepMillis(100);
        return String.format("%s + %s 做好了", dish, rice);
    });

    SmallTool.printTimeAndThread("小明 打游戏");

    String result = completableFuture.join();

    SmallTool.printTimeAndThread(String.format("%s, 小明吃饭", result));
}

输出:

1763904402238	|	1	|	main	|	小明 点餐: 番茄炒蛋 + 米饭
1763904402242	|	22	|	ForkJoinPool.commonPool-worker-25	|	厨师 炒菜
1763904402242	|	23	|	ForkJoinPool.commonPool-worker-18	|	服务员 煮饭
1763904402242	|	1	|	main	|	小明 打游戏
1763904402547	|	22	|	ForkJoinPool.commonPool-worker-25	|	服务员 打饭
1763904402656	|	1	|	main	|	番茄炒蛋 + 米饭 做好了, 小明吃饭

# 1.5. thenAcceptBoth 合并,无返回值

@Test
public void thenAcceptBoth_合并_拿到前面两个任务的结果进行加工处理() {
    // thenAcceptBoth: 拿到前面两个任务的结果进行加工处理,不需要返回值
    CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {
        SmallTool.printTimeAndThread("任务1 执行");
        SmallTool.sleepMillis(300);
        return "任务1 结果";
    }).thenAcceptBoth(CompletableFuture.supplyAsync(() -> {
        SmallTool.printTimeAndThread("任务2 执行");
        SmallTool.sleepMillis(300);
        return "任务2 结果";
    }), (task1_result, task2_result) -> {
        SmallTool.printTimeAndThread(task1_result);
        SmallTool.printTimeAndThread(task2_result);
    });

    completableFuture.join();
}
/*=>
1763981675284	|	27	|	ForkJoinPool.commonPool-worker-2	|	任务2 执行
1763981675284	|	26	|	ForkJoinPool.commonPool-worker-1	|	任务1 执行
1763981675596	|	26	|	ForkJoinPool.commonPool-worker-1	|	任务1 结果
1763981675596	|	26	|	ForkJoinPool.commonPool-worker-1	|	任务2 结果
*/

# 1.6. thenAcceptBoth 合并,无返回值

@Test
public void runAfterBoth_合并_拿到前面两个任务的结果进行加工处理() {
    // thenAcceptBoth: 不需要前面两个任务的结果,不需要返回值
    CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {
        SmallTool.printTimeAndThread("任务1 执行");
        SmallTool.sleepMillis(300);
        return "任务1 结果";
    }).runAfterBoth(CompletableFuture.supplyAsync(() -> {
        SmallTool.printTimeAndThread("任务2 执行");
        SmallTool.sleepMillis(300);
        return "任务2 结果";
    }), () -> {
        SmallTool.printTimeAndThread("执行完毕");
    });

    completableFuture.join();
}
/*=>
1763981945286	|	26	|	ForkJoinPool.commonPool-worker-1	|	任务1 执行
1763981945286	|	27	|	ForkJoinPool.commonPool-worker-2	|	任务2 执行
1763981945601	|	27	|	ForkJoinPool.commonPool-worker-2	|	执行完毕
*/

# 1.7. thenApply

@Test
public void thenApply_() {
    SmallTool.printTimeAndThread("小明 结账、要求开发票");

    // 同一个线程,顺序执行多个方法,
    CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
        SmallTool.printTimeAndThread("服务员A 收款");
        SmallTool.sleepMillis(300);
        return 500;
    }).thenApply((money) -> {
        SmallTool.printTimeAndThread(String.format("服务员A 开发票 —— 面额 %s 元", money));
        SmallTool.sleepMillis(200);
        return String.format("%s 元发票", money);
    });

    SmallTool.printTimeAndThread("小明 打游戏");

    String result = completableFuture.join();

    SmallTool.printTimeAndThread(String.format("小明拿到 %s, 准备回家", result));
}

输出:

1763904423520	|	1	|	main	|	小明 结账、要求开发票
1763904423524	|	1	|	main	|	小明 打游戏
1763904423524	|	22	|	ForkJoinPool.commonPool-worker-25	|	服务员A 收款
1763904423831	|	22	|	ForkJoinPool.commonPool-worker-25	|	服务员A 开发票 —— 面额 500 元
1763904424034	|	1	|	main	|	小明拿到 500 元发票, 准备回家

# 1.8. thenApplyAsync, 独立任务

@Test
public void thenApplyAsync_() {
    SmallTool.printTimeAndThread("小明 结账、要求开发票");

    // 不同的线程,顺序执行多个方法,与 thenCompose 类似
    CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
        SmallTool.printTimeAndThread("服务员A 收款");
        SmallTool.sleepMillis(300);
        return 500;
    }).thenApplyAsync((money) -> {
        SmallTool.printTimeAndThread(String.format("服务员B 开发票 —— 面额 %s 元", money));
        SmallTool.sleepMillis(200);
        return String.format("%s 元发票", money);
    });

    SmallTool.printTimeAndThread("小明 打游戏");

    String result = completableFuture.join();

    SmallTool.printTimeAndThread(String.format("小明拿到 %s, 准备回家", result));
}

输出:

1763904472241	|	1	|	main	|	小明 结账、要求开发票
1763904472244	|	1	|	main	|	小明 打游戏
1763904472244	|	22	|	ForkJoinPool.commonPool-worker-25	|	服务员A 收款
1763904472545	|	23	|	ForkJoinPool.commonPool-worker-18	|	服务员B 开发票 —— 面额 500 元
1763904472749	|	1	|	main	|	小明拿到 500 元发票, 准备回家

# 1.9. thenAccept,无返回值

@Test
public void thenAccept_() {
    // thenAccept: 需要上一个任务的结果,但当前任务没有返回值
    CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {
        SmallTool.printTimeAndThread("任务1 执行");
        SmallTool.sleepMillis(300);
        return "任务1 的结果";
    }).thenAccept((result) -> {
        SmallTool.printTimeAndThread("任务2 执行");
        SmallTool.sleepMillis(300);
    });

    // 等待异步任务执行完毕
    completableFuture.join();
}
/*=>
1763981134001	|	26	|	ForkJoinPool.commonPool-worker-1	|	任务1 执行
1763981134303	|	26	|	ForkJoinPool.commonPool-worker-1	|	任务2 执行
*/

# 1.10. thenRun,无入参无返回值

@Test
public void thenRun_() {
    // thenAccept: 不需要上一个任务的结果,且当前任务没有返回值
    CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {
        SmallTool.printTimeAndThread("任务1 执行");
        SmallTool.sleepMillis(300);
        return "任务1 的结果";
    }).thenRun(() -> {
        SmallTool.printTimeAndThread("任务2 执行");
        SmallTool.sleepMillis(300);
    });

    // 等待异步任务执行完毕
    completableFuture.join();
}
/*=>
1763981299650	|	26	|	ForkJoinPool.commonPool-worker-1	|	任务1 执行
1763981299957	|	26	|	ForkJoinPool.commonPool-worker-1	|	任务2 执行
*/

# 1.11. applyToEither 及 exceptionally

@Test
public void applyToEither_() {
    SmallTool.printTimeAndThread("小明 走出餐厅,来到公交站");
    SmallTool.printTimeAndThread("小明 准备坐 700路 或 800路 公交");

    // 两个线程同时执行,哪个先执行完毕 就返回哪个的结果,末尾处理异常
    CompletableFuture<String> completableFuture = CompletableFuture
            .supplyAsync(() -> {
                SmallTool.printTimeAndThread("700路 公交正在赶来");
                SmallTool.sleepMillis(300);
                return "700路 到了";
            })
            .applyToEither(CompletableFuture.supplyAsync(() -> {
                SmallTool.printTimeAndThread("800路 公交正在赶来");
                SmallTool.sleepMillis(200);
                return "800路 到了";
            }), firstComeBus -> {
                if (true) {
                    throw new RuntimeException(String.format("%s,小明 刷公交卡时,发现公交卡余额不足,直接下了公交车", firstComeBus));
                }
                return firstComeBus;
            })
            .exceptionally((e) -> {
                SmallTool.printTimeAndThread(e.getMessage());
                SmallTool.printTimeAndThread("小明 呼叫出租车");
                return "出租车 到了";
            });


    String result = completableFuture.join();

    SmallTool.printTimeAndThread(String.format("%s, 小明坐车回家", result));
}

输出:

1763904539565	|	1	|	main	|	小明 走出餐厅,来到公交站
1763904539565	|	1	|	main	|	小明 准备坐 700路 或 800路 公交
1763904539569	|	23	|	ForkJoinPool.commonPool-worker-18	|	800路 公交正在赶来
1763904539569	|	22	|	ForkJoinPool.commonPool-worker-25	|	700路 公交正在赶来
1763904539779	|	23	|	ForkJoinPool.commonPool-worker-18	|	java.lang.RuntimeException: 800路 到了,小明 刷公交卡时,发现公交卡余额不足,直接下了公交车
1763904539779	|	23	|	ForkJoinPool.commonPool-worker-18	|	小明 呼叫出租车
1763904539779	|	1	|	main	|	出租车 到了, 小明坐车回家

# 1.12. handle

@Test
public void handle_() {
    // handle 处理上一任务的异常
    CompletableFuture<String> completableFuture = CompletableFuture
            .supplyAsync(() -> {
                SmallTool.printTimeAndThread("任务1 执行");
                SmallTool.sleepMillis(300);

                if (true) {
                    throw new RuntimeException("任务1 异常");
                }

                return "任务1 结果";
            })
            .handle((result, e) -> {
                if (e != null) {
                    return "任务1 异常处理,修正后的结果";
                }

                return result;
            });


    String result = completableFuture.join();

    SmallTool.printTimeAndThread(result);
}
/*=>
1763982424184	|	26	|	ForkJoinPool.commonPool-worker-1	|	任务1 执行
1763982424491	|	1	|	main	|	任务1 异常处理,修正后的结果
*/

# 1.13. whenComplete

用于处理上一任务的结果

@Test
public void whenComplete_() {
    // whenComplete 不处理异常,会继续往下传递异常
    CompletableFuture<String> completableFuture = CompletableFuture
            .supplyAsync(() -> {
                SmallTool.printTimeAndThread("任务1 执行");
                SmallTool.sleepMillis(300);

                if (true) {
                    throw new RuntimeException("任务1 异常");
                }

                return "任务1 结果";
            })
            .whenComplete((result, e) -> {
                if (e != null) {
                    SmallTool.printTimeAndThread("任务1 异常");
                    return;
                }
                SmallTool.printTimeAndThread(result);
            });


    String result = completableFuture.join();

    SmallTool.printTimeAndThread(result);
}
/*=>
1763982861502	|	26	|	ForkJoinPool.commonPool-worker-1	|	任务1 执行
1763982861814	|	26	|	ForkJoinPool.commonPool-worker-1	|	任务1 异常

java.util.concurrent.CompletionException: java.lang.RuntimeException: 任务1 异常
*/

# 1.14. 参数列表

大多数 xxx(arg) 方法,都有如下两个重载的方法:

  • xxxAsync(arg)
  • xxxAsync(arg, Executor)

比如 thenApply(arg) / thenApplyAsync(arg)

// thenApply 会将 任务1 和 任务2 合并,放在一个线程里执行
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(
    // 任务1
    () -> {
        return "任务1 的返回值";
    }
).thenApply(
    // 任务2
    (result) -> { // result: "任务1 的返回值"
        return "任务2 的返回值";
    }
);


// thenApplyAsync 任务1 执行完毕后,在一个新的线程里执行 任务2
// 任务2 是一个独立的任务
CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(
    // 任务1
    () -> {
        return "任务1 的返回值";
    }
).thenApplyAsync(
    // 任务2
    (result) -> { // result: "任务1 的返回值"
        return "任务2 的返回值";
    }
);

# 2. CompletableFuture 性能问题

# 2.1. ForkJoinPool

CompletableFuture 默认使用 ForkJoinPool 线程池

ForkJoinPool 的最大线程池数量为: CPU核心数-1

可以全局修改 ForkJoinPool 的最大线程数(无法动态修改),但不推荐。

@Test
public void testForkJoinPool() {
    // CompletableFuture 底层用到的 ForkJoinPool

    System.out.println("处理器核心数: " + Runtime.getRuntime().availableProcessors());
    //=> 20

    System.out.println("ForkJoinPool 当前线程数:" + ForkJoinPool.commonPool().getPoolSize());
    //=> 0

    System.out.println("ForkJoinPool 最大线程数:" + ForkJoinPool.getCommonPoolParallelism());
    //=> 19
}

@Test
public void ForkJoinPool_设置的最大线程数() {
    // 设置虚拟机参数
    //-Djava.util.concurrent.ForkJoinPool.common.parallelism=40

    // 或者在程序启动时设置
    System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "40");

    System.out.println("ForkJoinPool 最大线程数:" + ForkJoinPool.getCommonPoolParallelism());
    //=> 40
}

使用自定义的线程池来设置线程数量,用完后直接销毁线程池。

@Test
public void test1() {
    ExecutorService executor = Executors.newCachedThreadPool();
    try {
        long start = System.currentTimeMillis();
        int total = 100;

        SmallTool.printTimeAndThread("进餐厅点菜");

        CompletableFuture[] dishCompletableFutures = IntStream.rangeClosed(1, total)
                .mapToObj(i -> new Dish("菜" + i, 1))
                .map(dish -> CompletableFuture.runAsync(dish::make, executor))
                .toArray(CompletableFuture[]::new);

        CompletableFuture.allOf(dishCompletableFutures).join();

        long seconds = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - start);

        SmallTool.printTimeAndThread(String.format("做完 %s 道菜,耗时 %s 秒", total, seconds));
    } finally {
        executor.shutdown();
    }
}
/*=>
使用 睡眠 来模拟耗时是不准确的。
1763991372481	|	1	|	main	|	做完 100 道菜,耗时 1 秒
*/

public class Dish {
    /** 菜名  */
    private String name;
    /** 做菜时间()  */
    private Integer productionSeconds;

    public Dish(String name, Integer productionSeconds) {
        this.name = name;
        this.productionSeconds = productionSeconds;
    }

    public void make() {
        SmallTool.sleepMillis(TimeUnit.SECONDS.toMillis(this.productionSeconds));
        SmallTool.printTimeAndThread(this.name + " 制作完毕");
    }
}

# 3. 线程池

# 3.1. ThreadPoolExecutor 构造函数

public ThreadPoolExecutor(
        int corePoolSize,       // 核心线程数,最小线程数
        int maximumPoolSize,    // 最大线程数

        long keepAliveTime,     // 超过核心线程数的线程 的存活时间
        TimeUnit unit,

        BlockingQueue<Runnable> workQueue,  // 工作队列
        ThreadFactory threadFactory,        // 线程工厂
        
        // 拒绝策略,任务装满队列后如何处理
            // 1. 直接丢弃
            // 2. 替换工作队列中的最后一个
            // 3. 抛异常
            // 4. 谁提交的任务,谁去处理
            // 自行扩展...
        RejectedExecutionHandler handler    
) {

}

这个构造函数过于复杂,一般不使用

# 3.2. 创建线程池

/*
单线程线程池
new ThreadPoolExecutor(
        1,
        1,
        0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>()
)
*/
Executors.newSingleThreadExecutor();

/*
缓存线程池,无上限,过多时会被操作系统限制
new ThreadPoolExecutor(
        0,
        Integer.MAX_VALUE,
        60L, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>()
)
*/
Executors.newCachedThreadPool();

/*
固定线程数量
new ThreadPoolExecutor(
    nThreads,
    nThreads,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>())
*/
Executors.newFixedThreadPool(3);


ExecutorService executor = Executors.newSingleThreadExecutor();

// Java8 之前 执行任务 的方法
// executor.execute();
// executor.submit()

# 4. 阻塞队列

  • LinkedBlockingQueue
    • 数据结构: 链表
    • 特点: 同时存取
  • ArrayBlockingQueue
    • 数据结构: 数组
    • 特点: 公平存储
  • SynchronousQueue
    • 数据结构: 没有容量
    • 特点: 双方就绪

# 4.1. 引入

阻塞队列的功能:

  1. 队列需要有一个固定的容量
  2. 如果队列是空的
    • 取不出来东西,需要等到有元素为止
  3. 如果队列是满的
    • 放不进去东西,需要等到有空位为止

LinkedBlockingQueue:

@Test
public void 放的时候满了_一直等待() {
    LinkedBlockingQueue<String> linkedBlockingQueue = new LinkedBlockingQueue<>(1);

    try {
        linkedBlockingQueue.put("A");
        linkedBlockingQueue.put("B"); // 这里会一直等待
    } catch (InterruptedException e) {
        SmallTool.printTimeAndThread("放元素时被中断");
    }
}
@Test
public void 取的时候空了_一直等待() {
    LinkedBlockingQueue<String> linkedBlockingQueue = new LinkedBlockingQueue<>(1);

    try {
        String item = linkedBlockingQueue.take();// 这里会一直等待
    } catch (InterruptedException e) {
        SmallTool.printTimeAndThread("取元素时被中断");
    }
}

好处:

  1. 不用考虑 多线程
  2. 不用考虑 变量被共享

# 4.2. 基础

说明:

  • 支持多生产者、多消费者

取:

  • take(): 死等
  • poll(Long, TimeUnit): 超时等
  • poll(): 不等

放:

  • put(E): 死等
  • offer(E, Long, TimeUnit): 超时等
  • offer(E): 不等

# 4.3. LinkedBlockingQueue

说明:

  • 支持同时存取
  • 有两把锁,一把控制消费者,一把控制生产者,所以生产者、消费者 可以同时操作这个队列

取的时候,设置超时时间,避免无限等待:

@Test
public void 取的时候空了_设置超时时间() {
    LinkedBlockingQueue<String> linkedBlockingQueue = new LinkedBlockingQueue<>(1);

    try {
        SmallTool.printTimeAndThread("开始取元素");

        // 最多等 3 秒,超时返回 null
        String item = linkedBlockingQueue.poll(3, TimeUnit.SECONDS);

        if (item == null) {
            SmallTool.printTimeAndThread("超时未取到");
        }
    } catch (InterruptedException e) {
        SmallTool.printTimeAndThread("取元素时被中断");
    }
}

取的时候,有就取,没有就返回 null:

@Test
public void 取的时候空了_不等待() {
    LinkedBlockingQueue<String> linkedBlockingQueue = new LinkedBlockingQueue<>(1);

    SmallTool.printTimeAndThread("开始取元素");

    String item = linkedBlockingQueue.poll();

    if (item == null) {
        SmallTool.printTimeAndThread("未取到");
    }
}

# 4.4. ArrayBlockingQueue

说明:

  • 支持公平
  • 只有一把锁

公平:

  • 先来的消费者,一定先取出来
  • 先来的生产者,一定先放进去

示例:

@Test
public void 消费者取的时候_不排队_后来的有可能先取到() throws InterruptedException {
    BlockingQueue<String> queue = new LinkedBlockingQueue<>(3);

    两个线程先后取元素(queue);
}
/* =>
1764069650882	|	26	|	Thread-0	|	消费者A 开始取
1764069650992	|	27	|	Thread-1	|	消费者B 开始取
1764069650993	|	27	|	Thread-1	|	消费者B 取到了: 苹果
*/

@Test
public void 消费者取的时候_排队_先来的先取() throws InterruptedException {
    BlockingQueue<String> queue = new ArrayBlockingQueue<>(3, true);

    两个线程先后取元素(queue);
}

private void 两个线程先后取元素(BlockingQueue<String> queue) throws InterruptedException {
    new Thread(() -> {
        SmallTool.printTimeAndThread("消费者A 开始取");
        try {
            String element = queue.poll(1, TimeUnit.SECONDS);
            SmallTool.printTimeAndThread(String.format("消费者A 取到了: %s", element));
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }).start();

    Thread.sleep(100); // 模拟 消费者A 先到

    new Thread(() -> {
        SmallTool.printTimeAndThread("消费者B 开始取");
        try {
            String element = queue.poll(1, TimeUnit.SECONDS);
            SmallTool.printTimeAndThread(String.format("消费者B 取到了: %s", element));
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }).start();

    queue.offer("苹果");
}

# 4.5. SynchronousQueue

说明:

  • 没有容量
  • 支持公平

示例:

@Test
public void 没有容量的阻塞队列() {
    BlockingQueue<String> queue = new SynchronousQueue<>();

    new Thread(() -> {
        try {
            SmallTool.printTimeAndThread("生产者 生产 苹果");
            queue.put("苹果");

            SmallTool.sleepMillis(500);

            SmallTool.printTimeAndThread("生产者 生产 梨子");
            queue.put("梨子");
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }).start();

    new Thread(() -> {
        try {
            SmallTool.printTimeAndThread(String.format("消费者 消费 %s", queue.take()));
            SmallTool.sleepMillis(100);
            SmallTool.printTimeAndThread(String.format("消费者 消费 %s", queue.take()));
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }).start();

    SmallTool.sleepMillis(2000);
}
/*=>
1764071066601	|	26	|	Thread-0	|	生产者 生产 苹果
1764071066602	|	27	|	Thread-1	|	消费者 消费 苹果
1764071067114	|	26	|	Thread-0	|	生产者 生产 梨子
1764071067114	|	27	|	Thread-1	|	消费者 消费 梨子
*/

# 4.6. PriorityBlockingQueue

说明:

  • 先取小的
  • 元素要么实现 Comparable 接口,要么构造队列是传入比较器
  • 队列容量没有限制
  • 遍历队列时,元素是无序的

示例:

class Person {
    private String name;
    private Level level;

    public Level getLevel() {
        return level;
    }

    public Person(String name, Level level) {
        this.name = name;
        this.level = level;
    }
    @Override
    public String toString() {
        return String.format("{ name: \"%s\", level: %s }", name, level.name());
    }
}
@Test
public void 排序队列_从小到大排序_小的先出队() {
    BlockingQueue<Person> blockingQueue = new PriorityBlockingQueue<>(
            3, // 初始容量
            Comparator.comparing(Person::getLevel) // 比较器
    );

    Thread threadOne = new Thread(() -> {
        Person person1 = new Person("张三", Level.Three);
        Person person2 = new Person("李四", Level.One);
        Person person3 = new Person("王五", Level.Two);

        blockingQueue.offer(person1);
        SmallTool.printTimeAndThread("进: " + person1);

        blockingQueue.offer(person2);
        SmallTool.printTimeAndThread("进: " + person2);

        blockingQueue.offer(person3);
        SmallTool.printTimeAndThread("进: " + person3);
    }, "线程一");

    threadOne.start();

    try {
        threadOne.join(); // 等待执行完毕
    } catch (InterruptedException e) {
        SmallTool.printTimeAndThread("被中断" + e.getMessage());
    }

    new Thread(() -> {
        SmallTool.printTimeAndThread("出: " + blockingQueue.poll());
    }, "线程二").start();
}
/*
1764824138373	|	26	|	线程一	|	进: { name: "张三", level: Three }
1764824138373	|	26	|	线程一	|	进: { name: "李四", level: One }
1764824138373	|	26	|	线程一	|	进: { name: "王五", level: Two }

1764824138373	|	27	|	线程二	|	出: { name: "李四", level: One }
    */
本章目录