【Java 并发编程】主线程等待子线程的多种方法
2022年4月25日大约 4 分钟
提醒:
1 部分案例使用线程池创建线程。方便起见使用 Executors.newFixedThreadPool()方法创建一个固定大小的线程池。
2 Runnable 使用 Lambda 表达式创建
3 代码在 main()方法中执行,出于方便演示,代码中有几处不规范的地方
4 JDK 使用 17 版本
1.Thread sleep()
public static void main(String[] args) throws Exception {
long timeout1 = 1000;
long timeout2 = timeout1 + 500;
new Thread(() -> {
try {
Thread.sleep(timeout1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}).start();
// 满足 timeout2 > timeout1 即可
Thread.sleep(timeout2);
System.out.println("主线程执行!\ndo something...");
}方法问题太多,不可取,仅作为参考
2.Thread join()
private static void test06() throws Exception {
CopyOnWriteArrayList<Thread> list = new CopyOnWriteArrayList<>();
for (int i = 0; i < 3; i++) {
Thread thread = new Thread(() -> {
try {
// 随机生成睡眠时间
long timeout = new Random().nextLong(1000, 3000);
TimeUnit.MILLISECONDS.sleep(timeout);
System.out.printf("%s 子线程执行完毕!耗时:%sms\n", Thread.currentThread().getName(), timeout);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
list.add(thread);
thread.start();
}
for (Thread thread : list) {
thread.join();
}
System.out.println("主线程执行!\ndo something...");
}3.synchronized 等待唤醒机制
public static void main(String[] args) throws Exception {
Object lock = new Object();
// 启动子线程
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println("子线程执行完毕");
// 获取对象锁
synchronized (lock) {
// 子线程唤醒
lock.notify();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}).start();
synchronized (lock) {
// 主线程等待
lock.wait();
}
System.out.println("主线程执行!\ndo something...");
}4.ExecutorService isTerminated() + while 轮询判断
思路:所有任务提交后,调用线程池的 shutdown()方法,然后在死循环里每隔几秒调用一次线程池的 isTerminated()
方法,判断所有线程在线程池关闭后是否都已完成。需要注意的是调用 isTerminated()前一定要先调用 shutdown()或 shutdownNow()
方法,原因可以在 isTerminated()的源码中找到,位于 java.util.concurrent.ExecutorService 198 行左右,内容如下:
翻译为中文:如果所有任务在关闭后都已完成,则返回 true。请注意,除非首先调用了 shutdown 或 shutdownNow,否则 isTerminated
永远不会为真。
实现代码:
public static void main(String[] args) throws Exception {
int n = 3;
String[] tasks = {"发送短信消息完毕", "发送微信消息完毕", "发送邮箱消息完毕"};
int[] executeTimes = new int[]{2, 5, 1};
ExecutorService threadPool = Executors.newFixedThreadPool(n);
long start = System.currentTimeMillis();
for (int i = 0; i < n; i++) {
int finalI = i;
threadPool.execute(() -> {
try {
TimeUnit.SECONDS.sleep(executeTimes[finalI]);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(tasks[finalI]);
});
}
threadPool.shutdown();
// 关键代码
while (true) {
if (threadPool.isTerminated()) {
break;
} else {
// 每隔1s判断一次
TimeUnit.SECONDS.sleep(1);
}
}
System.out.println("所有消息都发送完毕了,执行主线程任务。\n耗时ms:" + (System.currentTimeMillis() - start));
}缺点:若线程池中的任务完成耗时不确定,则不能及时执行主线程的任务
5.ExecutorService awaitTermination()
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i < 3; i++) {
executorService.execute(() -> {
long timeout = new Random().nextLong(1000, 3000);
try {
TimeUnit.MILLISECONDS.sleep(timeout);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.printf("%s 子线程执行完毕!耗时:%sms\n", Thread.currentThread().getName(), timeout);
});
}
executorService.shutdown();
// 阻塞当前线程,直到所有已提交的任务完成执行,或者发生超时,或者当前线程中断,以先发生者为准。
executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
System.out.println("主线程执行!\ndo something...");
}6.Future
private static void test03() throws Exception{
ExecutorService executorService = Executors.newFixedThreadPool(1);
Future<String> future = executorService.submit(() -> {
try {
System.out.println("do something...");
TimeUnit.SECONDS.sleep(1);
System.out.println("子线程任务完成");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "子线程返回的结果";
});
System.out.println(future.get());
executorService.shutdown();
System.out.println("主线程执行!");
}7.CountDownLatch
public static void main(String[] args) throws Exception {
int n = 3;
String[] tasks = {"发短信完毕", "发微信完毕", "发QQ完毕"};
int[] executeTimes = new int[]{2, 5, 1};
CountDownLatch countDownLatch = new CountDownLatch(n);
ExecutorService executorService = Executors.newFixedThreadPool(n);
long start = System.currentTimeMillis();
for (int i = 0; i < n; i++) {
int finalI = i;
executorService.submit(() -> {
try {
TimeUnit.SECONDS.sleep(executeTimes[finalI]);
System.out.println(tasks[finalI]);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await();
System.out.println("所有消息都发送完毕了,执行主线程任务。\n耗时ms:" + (System.currentTimeMillis() - start));
// 不要忘记关闭线程池,不然会导致主线程阻塞无法退出
executorService.shutdown();
}8.CyclicBarrier
private static void test07() throws Exception{
CyclicBarrier cyclicBarrier = new CyclicBarrier(6);
// 最后一个达到屏障的线程留给主线程,因此循环5次
for (int i = 0; i < 5; i++) {
new Thread(() -> {
long timeout = new Random().nextLong(1000, 3000);
try {
TimeUnit.MILLISECONDS.sleep(timeout);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.printf("%s 子线程执行完毕!耗时:%sms\n", Thread.currentThread().getName(), timeout);
// 到达屏障
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (BrokenBarrierException e) {
throw new RuntimeException(e);
}
}).start();
}
cyclicBarrier.await();
System.out.println("主线程执行!\ndo something...");
}9.BlockingQueue
public static void main(String[] args) throws Exception {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(1);
queue.put("OK");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
System.out.println(queue.take());
System.out.println("主线程执行!\ndo something...");
}10.CompletableFuture(本质还是 Future 的 get 方法)
public static void main(String[] args) throws Exception {
CompletableFuture<Void> cf1 = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println("cf1 任务完成");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
CompletableFuture<Void> cf2 = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
System.out.println("cf2 任务完成");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
CompletableFuture<Void> cf3 = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(0);
System.out.println("cf3 任务完成");
// int n = 1 / 0;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
CompletableFuture<Void> allOf = CompletableFuture.allOf(cf1, cf2, cf3);
// 若子任务都未发生异常则返回null,否则返回异常。此步会阻塞主线程
System.out.println(allOf.get());
System.out.println("主线程执行!");
}
11.LockSupport(23.03.17 更新)
public static void main(String[] args) {
Thread mainThread = Thread.currentThread();
new Thread(() -> {
try {
System.out.println("子线程开始执行");
// 模拟子线程执行任务
TimeUnit.SECONDS.sleep(2);
System.out.println("子线程执行完毕!");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// 如果线程在 park 上被阻塞,那么它将解除阻塞
LockSupport.unpark(mainThread);
}).start();
// 禁用当前线程(主线程)
LockSupport.park();
System.out.println("主线程执行!\ndo something...");
}