关键词搜索

源码搜索 ×
×

Java 并发工具合集 JUC 大爆发!!!

发布2023-04-18浏览656次

详情内容

并发工具类

通常我们所说的并发包也就是 java.util.concurrent (JUC),集中了 Java 并发的各种工具类, 合理地使用它们能帮忙我们快速地完成功能 。

1. CountDownLatch

CountDownLatch 是一个同步计数器,初始化的时候 传入需要计数的线程等待数,可以是需要等待执行完成的线程数,或者大于 ,一般称为发令枪。\

​ countdownlatch 是一个同步类工具,不涉及锁定,当 count 的值为零时当前线程继续运行,不涉及同步,只涉及线程通信的时候,使用它较为合适

1.1 作用

用来协调多个线程之间的同步,或者说起到线程之间的通信(而不是用作互斥的作用),是一组线程等待其他的线程完成工作以后在执行,相当于加强版 join。

注意:这是一个一次性操作 - 计数无法重置。 如果你需要一个重置的版本计数,考虑使用 CyclicBarrier。

1.2 举例

​ 我们去组团游玩一样,总共 30 个人,来的人要等待还没有到的人,一直等到第 30 个人到了,我们才开始出发,在等待过程中,其他人(线程)是等待状态不做任何事情的,一直等所有人(线程)到齐了(准备完成)才开始执行。

1.3 概念

  • countDownLatch 这个类使一个线程等待其他线程各自执行完毕后再执行。
  • 是通过一个计数器来实现的,计数器的初始值是线程的数量。每当一个线程执行完毕后,计数器的值就 - 1,当计数器的值为 0 时,表示所有线程都执行完毕,然后在闭锁上等待的线程就可以恢复工作了。

我们打开 CountDownLatch 的源代码分析,我们发现最重要的方法就是一下这两个方法:

  1. //阻塞当前线程,等待其他线程执行完成,直到计数器计数值减到0。
  2. public void await() throws InterruptedException;
  3. //阻塞当前线程指定的时间,如果达到时间就放行,等待其他线程执行完成,直到计数器计数值减到0。
  4. public boolean await(long timeout, TimeUnit unit) throws InterruptedException
  5. //负责计数器的减一。
  6. public void countDown()

1.4 应用场景

1.4.1 多线程压测

有时我们想同时启动多个线程,实现最大程度的并行性。

​ 例如,我们想测试一个单例类。如果我们创建一个初始计数为 1 的 CountDownLatch,并让所有线程都在这个锁上等待,那么我们可以很轻松地完成测试。我们只需调用 一次 countDown () 方法就可以让所有的等待线程同时恢复执行。

1.4.2 等待其他线程

​ 例如应用程序启动类要确保在处理用户请求前,所有 N 个外部系统已经启动和运行了,例如处理 excel 中多个表单,如果一个一个出来很耗 IO 和性能,我们可以等 100 或者 1000 个线程都完成了表单的操作后一下子写进 excel 表单中。

注意:一个线程不一定只能做 countDown 一次,也可以 countDown 多次

1.5 示例

1.5.1 准备完成后执行

在实际项目中可能有些线程需要资源准备完成后才能进行执行,这个时候就可以使用 countDownLatch

  1. package chapter02.countdownlatch;
  2. import java.util.Random;
  3. import java.util.concurrent.*;
  4. /**
  5. * countdownlatch 示例
  6. */
  7. public class CountDownLatchTest {
  8. private static ExecutorService executorService = Executors.newFixedThreadPool(10);
  9. private static Random random = new Random();
  10. public static void execute(CountDownLatch countDownLatch) {
  11. //获取一个随机数
  12. long sleepTime = random.nextInt(10);
  13. //获取线程ID
  14. long threadId = Thread.currentThread().getId();
  15. System.out.println("线程ID" + threadId + ",开始执行--countDown");
  16. try {
  17. //睡眠随机秒
  18. Thread.sleep(sleepTime * 1000);
  19. } catch (InterruptedException e) {
  20. e.printStackTrace();
  21. }
  22. //计数器减1
  23. countDownLatch.countDown();
  24. System.out.println("线程ID" + threadId + ",准备任务完成耗时:" + sleepTime + "当前时间" + System.currentTimeMillis());
  25. try {
  26. //线程等待其他任务完成后唤醒
  27. countDownLatch.await();
  28. } catch (InterruptedException e) {
  29. e.printStackTrace();
  30. }
  31. System.out.println("线程ID" + threadId + ",开始执行任务,当前时间:" + System.currentTimeMillis());
  32. }
  33. public static void main(String[] args) throws InterruptedException {
  34. CountDownLatch countDownLatch = new CountDownLatch(5);
  35. for (int i = 0; i < 5; i++) {
  36. executorService.submit(() -> {
  37. execute(countDownLatch);
  38. });
  39. }
  40. //线程等待其他任务完成后唤醒
  41. countDownLatch.await();
  42. Thread.sleep(1000);
  43. executorService.shutdown();
  44. System.out.println("全部任务执行完成");
  45. }
  46. }

1.5.2 多线程压测

在实战项目中,我们除了使用 jemter 等工具进行压测外,还可以自己动手使用 CountDownLatch 类编写压测代码。

​ 可以说 jemter 的并发压测背后也是使用的 CountDownLatch,可见掌握 CountDownLatch 类的使用是有多么的重要, CountDownLatch 是 Java 多线程同步器的四大金刚之一,CountDownLatch 能够使一个线程等待其他线程完成各自的工作后再执行。

  1. package chapter02.countdownlatch;
  2. import java.util.concurrent.CountDownLatch;
  3. import java.util.concurrent.ExecutorService;
  4. import java.util.concurrent.Executors;
  5. /**
  6. * countDownLatch 压测
  7. */
  8. public class CountDownLatchPressure {
  9. /**
  10. * 压测业务代码
  11. */
  12. public void testLoad() {
  13. System.out.println("压测:" + Thread.currentThread().getId() + ":" + System.currentTimeMillis());
  14. }
  15. /**
  16. * 压测启动
  17. * 主线程负责压测线程准备工作
  18. * 压测线程准备完成后 调用 start.countDown(); 启动线程执行
  19. * @throws InterruptedException
  20. */
  21. private void latchTest() throws InterruptedException {
  22. //压测线程数
  23. int testThreads = 300;
  24. final CountDownLatch start = new CountDownLatch(1);
  25. final CountDownLatch end = new CountDownLatch(testThreads);
  26. //创建线程池
  27. ExecutorService exce = Executors.newFixedThreadPool(testThreads);
  28. //准备线程准备
  29. for (int i = 0; i < testThreads; i++) {
  30. //添加到线程池
  31. exce.submit(() -> {
  32. try {
  33. //启动后等待 唤醒
  34. start.await();
  35. testLoad();
  36. } catch (InterruptedException e) {
  37. e.printStackTrace();
  38. } finally {
  39. //压测完成
  40. end.countDown();
  41. }
  42. });
  43. }
  44. //连接池线程初始化完成 开始压测
  45. start.countDown();
  46. //压测完成后结束
  47. end.await();
  48. //关闭线程池
  49. exce.shutdown();
  50. }
  51. public static void main(String[] args) throws InterruptedException {
  52. CountDownLatchPressure countDownLatchPressure = new CountDownLatchPressure();
  53. //开始压测
  54. countDownLatchPressure.latchTest();
  55. }
  56. }

2. CyclicBarrier

2.1 简介

CyclicBarrier,是 JDK1.5 的 java.util.concurrent (JUC) 并发包中提供的一个并发工具类

C yclicBarrier 可以使一定数量的线程反复地在栅栏位置处汇集,当线程到达栅栏位置时将调用 await 方法,这个方法将阻塞直到所有线程都到达栅栏位置,如果所有线程都到达栅栏位置,那么栅栏将打开,此时所有的线程都将被释放,而栅栏将被重置以便下次使用。

2.2 举例

就像生活中我们会约朋友们到某个餐厅一起吃饭,有些朋友可能会早到,有些朋友可能会晚到,但是这个餐厅规定必须等到所有人到齐之后才会让我们进去。

​ 这里的朋友们就是各个线程,餐厅就是 CyclicBarrier,感觉和 CountDownLatch 是一样的,但是他们是有区别的,吃完饭之后可以选择去玩一会,去处理任务,然后等待第二次聚餐,重复循环。

2.3 功能

CyclicBarrier 和 CountDownLatch 是非常类似的,CyclicBarrier 核心的概念是在于设置一个等待线程的数量边界,到达了此边界之后进行执行。

​ CyclicBarrier 类是一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点(Common Barrier Point)。

​ CyclicBarrier 类是一种同步机制,它能够对处理一些算法的线程实现同。换句话讲,它就是一个所有线程必须等待的一个栅栏,直到所有线程都到达这里,然后所有线程才可以继续做其他事情。

​ 通过调用 CyclicBarrier 对象的 await() 方法,两个线程可以实现互相等待,一旦 N 个线程在等待 CyclicBarrier 达成,所有线程将被释放掉去继续执行。

2.4 构造方法

我们可以看下 CyclicBarrier 源码的构造方法

  1. public CyclicBarrier(int parties)
  2. public CyclicBarrier(int parties, Runnable barrierAction)

2.4.1 参数介绍

  • parties : 是参与线程的个数,其参数表示屏障拦截的线程数量,每个线程调用 await 方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。

  • barrierAction : 优先执行线程,用于在线程到达屏障时,优先执行 barrierAction,方便处理更复杂的业务场景,一般用于数据的整理以及汇总,例如 excel 插入一样,等所有线程都插入完了,到达了屏障后,barrierAction 线程开始进行保存操作,完成后,接下来由其他线程开始进行插入,然后到达屏障接着就是保存,不断循环。

CyclicBarrier 可以用于多线程计算数据,最后合并计算结果的场景。

2.5 重要方法

我们上面介绍了构造方法,下面我们介绍下 CyclicBarrier 中重要的方法

  1. //阻塞当前线程,等待其他线程执行完成。
  2. public int await() throws InterruptedException, BrokenBarrierException
  3. //阻塞当前线程指定的时间,如果达到时间就放行,等待其他线程执行完成,
  4. public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException
  • 线程调用 await () 表示自己已经到达栅栏
  • BrokenBarrierException 表示栅栏已经被破坏,破坏的原因可能是其中一个线程 await () 时被中断或者超时

2.6 基本使用

一个线程组的线程需要等待所有线程完成任务后再继续执行下一次任务

  1. package chapter02.cyclicbarrier;
  2. import java.util.Random;
  3. import java.util.concurrent.BrokenBarrierException;
  4. import java.util.concurrent.CyclicBarrier;
  5. import java.util.concurrent.ExecutorService;
  6. import java.util.concurrent.Executors;
  7. public class CyclicBarrierTest {
  8. private static Random random = new Random();
  9. /**
  10. * 执行任务
  11. *
  12. * @param barrier
  13. */
  14. public static void execute(CyclicBarrier barrier) {
  15. //获取一个随机数
  16. long sleepTime = random.nextInt(10);
  17. //获取线程id
  18. long threadId = Thread.currentThread().getId();
  19. try {
  20. //睡眠随机秒
  21. Thread.sleep(sleepTime * 1000);
  22. } catch (InterruptedException e) {
  23. e.printStackTrace();
  24. }
  25. System.out.println("线程ID" + threadId + ",准备任务完成耗时:" + sleepTime + "当前时间" + System.currentTimeMillis());
  26. //线程等待其他任务完成后唤醒
  27. try {
  28. barrier.await();
  29. } catch (InterruptedException e) {
  30. e.printStackTrace();
  31. } catch (BrokenBarrierException e) {
  32. e.printStackTrace();
  33. }
  34. System.out.println("线程ID" + threadId + ",开始执行任务,当前时间:" + System.currentTimeMillis());
  35. }
  36. public static void main(String[] args) {
  37. //初始化线程数量
  38. int threadNum = 5;
  39. //初始化一般的线程
  40. CyclicBarrier barrier = new CyclicBarrier(5, () -> System.out.println("整理任务开始..."));
  41. ExecutorService executor = Executors.newFixedThreadPool(threadNum);
  42. for (int i = 0; i < threadNum; i++) {
  43. executor.submit(() -> {
  44. execute(barrier);
  45. });
  46. }
  47. }
  48. }

2.7 CyclicBarrier 与 CountDownLatch 区别

  • CountDownLatch 是一次性的,CyclicBarrier 是可循环利用的
  • CountDownLatch.await 一般阻塞工作线程,所有的进行预备工作的线程执行 countDown,而 CyclicBarrier 通过工作线程调用 await 从而自行阻塞,直到所有工作线程达到指定屏障,再大家一起往下走。
  • CountDownLatch 参与的线程的职责是不一样的,有的在倒计时,有的在等待倒计时结束。CyclicBarrier 参与的线程职责是一样的。
  • 在控制多个线程同时运行上,CountDownLatch 可以不限线程数量,而 CyclicBarrier 是固定线程数。
  • CyclicBarrier 还可以提供一个 barrierAction,合并多线程计算结果。

3. Semaphore

3.1 简介

Semaphore 也叫信号量,在 JDK1.5 被引入,可以用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用资源。

Semaphore 内部维护了一组虚拟的许可,许可的数量可以通过构造函数的参数指定。

  • 访问特定资源前,必须使用 acquire 方法获得许可,如果许可数量为 0,该线程则一直阻塞,直到有可用许可。
  • 访问资源后,使用 release 释放许可。

​ Semaphore 是一种在多线程环境下使用的设施,该设施负责协调各个线程,以保证它们能够正确、合理的使用公共资源的设施,也是操作系统中用于控制进程同步互斥的量。Semaphore 是一种计数信号量,用于管理一组资源,内部是基于 AQS 的共享模式。它相当于给线程规定一个量从而控制允许活动的线程数。

​ 可以用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源

3.2 举例

​ 这里面令牌就像停车位一样,来了十辆车,停车位只有三个,只有三辆车能够进行,只有等其他车开走后,其他车才能开进去,和锁的不一样的地方是,锁一次只能进入一辆车,但是 Semaphore 允许一次进入很多车,这个令牌是可以调整的,随时可以增减令牌。

3.3 应用场景

Semaphore 是 synchronized 的加强版,作用是控制线程的并发数量。就这一点而言,单纯的 synchronized 关键字是实现不了的。

​ Semaphore 可以用于做流量控制,特别是公用资源有限的应用场景,比如数据库连接。假如有一个需求,要读取几万个文件的数据,因为都是 IO 密集型任务,我们可以启动几十个线程并发地读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有 10 个,这时我们必须控制只有 10 个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。这个时候,就可以使用 Semaphore 来做流量控制

3.4 工作原理

以一个停车场是运作为例,为了简单起见,假设停车场只有三个车位,一开始三个车位都是空的。

​ 这时如果同时来了五辆车,看门人允许其中三辆不受阻碍的进入,然后放下车拦,剩下的车则必须在入口等待,此后来的车也都不得不在入口处等待。

​ 这时,有一辆车离开停车场,看门人得知后,打开车拦,放入一辆,如果又离开两辆,则又可以放入两辆,如此往复。

​ 这个停车系统中,每辆车就好比一个线程,看门人就好比一个信号量,看门人限制了可以活动的线程,假如里面依然是三个车位,但是看门人改变了规则,要求每次只能停两辆车,那么一开始进入两辆车,后面得等到有车离开才能有车进入,但是得保证最多停两辆车。

​ 对于 Semaphore 类而言,就如同一个看门人,限制了可活动的线程数。

3.5 构造方法

创建具有给定许可数的计数信号量并设置为非公平信号量

查看 Semaphore 源码发现他有这两个构造方法

  1. public Semaphore(int permits)
  2. public Semaphore(int permits, boolean fair)

3.5.1 参数介绍

  • permits 是设置同时允许通过的线程数

  • fair 等于 true 时,创建具有给定许可数的计数信号量并设置为公平信号量。

3.6 其他方法

Semaphore 类里面还有一些重要的方法

  1. //从此信号量获取一个许可前线程将一直阻塞。相当于一辆车占了一个车位
  2. public void acquire() throws InterruptedException
  3. //从此信号量获取给定数目许可,在提供这些许可前一直将线程阻塞。比如n=2,就相当于一辆车占了两个车位。
  4. public void acquire(int permits) throws InterruptedException
  5. //释放一个许可,将其返回给信号量。就如同车开走返回一个车位。
  6. public void release()
  7. //获取当前可用许可数
  8. public void release(int permits)
  9. //获取当前可用许可数
  10. public int availablePermits()

3.7 示例代码

共有 5 个车位但是有 100 个线程进行占用,车停几秒后会离开,释放车位给其他线程。

  1. package chapter02.semaphore;
  2. import java.util.Random;
  3. import java.util.concurrent.*;
  4. public class SemaphoreTest {
  5. private static ExecutorService executorService = Executors.newCachedThreadPool();
  6. private static Random random = new Random();
  7. //阻塞队列
  8. private static BlockingQueue<String> parks = new LinkedBlockingQueue<>(5);
  9. public static void execute(Semaphore semaphore) {
  10. //获取一个随机数
  11. long sleepTime = random.nextInt(10);
  12. long threadId = Thread.currentThread().getId();
  13. String park = null;
  14. try {
  15. /**
  16. * 获取许可,首先判断semaphore内部的数字是否大于0,如果大于0
  17. * 才能获得许可,然后将初始值5减去1,线程才会接着去执行;如果没有
  18. * 获得许可(原因是因为已经有5个线程获得到许可,semaphore内部的数字为0),
  19. * 线程会阻塞直到已经获得到许可的线程,调用release()方法,释放掉许可,
  20. * 也就是将semaphore内部的数字加1,该线程才有可能获得许可。
  21. */
  22. semaphore.acquire();
  23. /**
  24. * 对应的线程会到阻塞对,对应车辆去获取到车位,如果没有拿到一致阻塞,
  25. * 直到其他车辆归还车位。
  26. */
  27. park = parks.take();
  28. System.out.println("线程ID" + threadId + ",开始占用车位:" + park + ",当前剩余车位" + semaphore.availablePermits());
  29. } catch (InterruptedException e) {
  30. e.printStackTrace();
  31. }
  32. try {
  33. //睡眠随机秒
  34. Thread.sleep(sleepTime * 1000);
  35. } catch (InterruptedException e) {
  36. e.printStackTrace();
  37. }
  38. //归还车位
  39. parks.offer(park);
  40. System.out.println("线程ID" + threadId + ",开始归还车位:" + park + ",共占用" + sleepTime + "秒");
  41. //线程释放掉许可,通俗来将就是将semaphore内部的数字加1
  42. semaphore.release();
  43. }
  44. public static void main(String[] args) {
  45. //初始化线程数量
  46. int threadNum = 100;
  47. parks.offer("车位一");
  48. parks.offer("车位二");
  49. parks.offer("车位三");
  50. parks.offer("车位四");
  51. parks.offer("车位五");
  52. // 初始化5个许可证
  53. Semaphore semaphore = new Semaphore(5);
  54. //可以提前释放但是车位就会被多个线程同时占用
  55. //semaphore.release(5);
  56. for (int i = 0; i < threadNum; i++) {
  57. executorService.submit(() -> {
  58. execute(semaphore);
  59. });
  60. }
  61. }
  62. }

3.8 注意事项

即使创建信号量的时候,指定了信号量的大小 ,但是在通过 release () 操作释放信号量仍然能释放超过配置的大小,也就有可能同时执行的线程数量比最开始设置的要大,没有任何线程获取信号量的时候,依然能够释放并且释放的有效。

​ 推荐的做法是一个线程先 acquire 然后 release,如果释放线程和获取线程不是同一个,那么最好保证这种对应关系。不要释放过多的许可证。

4. Fork/Join

4.1 简介

java 下多线程的开发可以我们自己启用多线程,线程池,还可以使用 forkjoin,forkjoin 可以让我们不去了解诸如 Thread,Runnable 等相关的知识,只要遵循 forkjoin 的开发模式,就可以写出很好的多线程并发程序

​ Fork/Join 框架是 Java7 提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。

​ Fork/Join 框架是一个实现了 ExecutorService 接口的多线程处理器。它可以把一个大的任务划分为若干个小的任务并发执行,充分利用可用的资源,进而提高应用的执行效率。

Fork/Join 框架简化了并行程序的原因有 :

  • 它简化了线程的创建,在框架中线程是自动被创建和管理。
  • 它自动使用多个处理器,因此程序可以扩展到使用可用处理器。

4.2 举例

​ 就像我需要处理一百万行的 excel,普通的处理是一个一个的 excel 进行处理,但是使用 Fork/Join 框架后的处理方式呢,加入我们定义 100 条数据为一个批次,那么 Fork/Join 就会拆分这个 excel 先到中间拆分成各有 50 万的数据,然后还比 100 大就继续拆分,不断的细分,最后分到了每一个线程分得到了 100 条然后才开始执行。

4.3 分而治之

“分而治之” 一直是一个有效的处理大量数据的方法。著名的 MapReduce 也是采取了分而治之的思想。

​ 简单来说,就是如果你要处理 1000 个数据,但是你并不具备处理 1000 个数据的能力,那么你可以只处理其中的 10 个,然后,分阶段处理 100 次,将 100 次的结果进行合成,那就是最终想要的对原始的 1000 个数据的处理结果。

​ 同时 forkjoin 在处理某一类问题时非常的有用,哪一类问题?分而治之的问题。十大计算机经典算法:快速排序、堆排序、归并排序、二分查找、线性查找、深度优先、广度优先、Dijkstra、动态规划、朴素贝叶斯分类,有几个属于分而治之?3 个,快速排序、归并排序、二分查找,还有大数据中 M/R 都是。

4.3.1 分治法的设计思想

​ 将一个难以直接解决的大问题,分割成一些规模较小的相同问题,以便各个击破,分而治之。

4.3.2 分治策略

​ 对于一个规模为 n 的问题,若该问题可以容易地解决(比如说规模 n 较小)则直接解决,否则将其分解为 k 个规模较小的子问题,这些子问题互相独立且与原问题形式相同 (子问题相互之间有联系就会变为动态规范算法),递归地解这些子问题,然后将各子问题的解合并得到原问题的解。这种算法设计策略叫做分治法。

4.4 Fork-Join 原理

​ Fork/Join 实现了 ExecutorService,所以它的任务也需要放在线程池中执行。它的不同在于它使用了工作窃取算法,空闲的线程可以从满负荷的线程中窃取任务来帮忙执行。

​ 由于线程池中的每个线程都有一个队列,而且线程间互不影响,那么线程每次都从自己的任务队列的头部获取一个任务出来执行。如果某个时候一个线程的任务队列空了,而其余的线程任务队列中还有任务,那么这个线程就会从其他线程的任务队列中取一个任务出来帮忙执行。就像偷取了其他人的工作一样

4.4.1 任务分割和合并

Fork/Join 框架的基本思想就是将一个大任务分解(Fork)成一系列子任务,子任务可以继续往下分解,当多个不同的子任务都执行完成后,可以将它们各自的结果合并(Join)成一个大结果,最终合并成大任务的结果

我们看下面这个图

​ 首先 main Task 先 fork 成 0,1 两个任务 接着,因为还是太大,继续 fork 成 0-0,0-1,1-0,1-1 然后进行计算计算完成后进行 join 操作,0-0,1-1 join 到 0, 1-0,1-1 join 到 1 然后 0 和 1 继续 join 到 mainTask,完成计算任务。

4.4.2 工作密取

即当前线程的 Task 已经全被执行完毕,则自动取到其他线程的 Task 池中取出 Task 继续执行即如果一个工作线程没有事情要做,它可以从其他仍然忙碌的线程窃取任务。

​ ForkJoinPool 中维护着多个线程(一般为 CPU 核数)在不断地执行 Task,每个线程除了执行自己职务内的 Task 之外,还会根据自己工作线程的闲置情况去获取其他繁忙的工作线程的 Task,如此一来就能能够减少线程阻塞或是闲置的时间,提高 CPU 利用率。

4.5 相关子类

​ 我们已经很清楚 Fork/Join 框架的需求了,那么我们可以思考一下,如果让我们来设计一个 Fork/Join 框架,该如何设计?这个思考有助于你理解 Fork/Join 框架的设计。

​ 第一步分割任务。首先我们需要有一个 fork 类来把大任务分割成子任务,有可能子任务还是很大,所以还需要不停的分割,直到分割出的子任务足够小。

​ 第二步执行任务并合并结果。分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据。

Fork/Join 使用两个类来完成以上两件事情:

4.5.1 ForkJoinTask

​ 我们要使用 ForkJoin 框架,必须首先创建一个 ForkJoin 任务。它提供在任务中执行 fork () 和 join () 操作的机制,通常情况下我们不需要直接继承 ForkJoinTask 类,而只需要继承它的子类,Fork/Join 框架提供了以下两个子类:

4.5.1.1 RecursiveAction

用于没有返回结果的任务

4.5.1.2 RecursiveTask

用于有返回结果的任务。

4.5.2 ForkJoinPool

​ ForkJoinTask 需要通过 ForkJoinPool 来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务

4.6 Fork/Join 使用

​ Task 要通过 ForkJoinPool 来执行,使用 submit 或 invoke 提交,两者的区别是:invoke 是同步执行,调用之后需要等待任务完成,才能执行后面的代码;submit 是异步执行,join () 和 get 方法当任务完成的时候返回计算结果

​ 在我们自己实现的 compute 方法里,首先需要判断任务是否足够小,如果足够小就直接执行任务。如果不足够小,就必须分割成两个子任务,每个子任务在调用 invokeAll 方法时,又会进入 compute 方法,看看当前子任务是否需要继续分割成孙任务,如果不需要继续分割,则执行当前子任务并返回结果。使用 join 方法会等待子任务执行完并得到其结果。

4.6.1 任务的提交逻辑

fork/join 其实大部分逻辑处理操作都集中在提交任务和处理任务这两块,了解任务的提交基本上后面就很容易理解了, fork/join 提交任务主要分为两种:

4.6.1.1 第一次提交到 forkJoinPool

  1. //创建初始化任务
  2. SubmitTask submitTask = new SubmitTask(start, end);
  3. //将初始任务扔进连接池中执行
  4. forkJoinPool.invoke(submitTask);

4.6.1.2 任务切分之后的提交

  1. //没有达到阈值 计算一个中间值
  2. long mid = (start + end) / 2;
  3. //拆分 左边的
  4. SubmitTask left = new SubmitTask(start, mid);
  5. //拆分右边的
  6. SubmitTask right = new SubmitTask(mid + 1, end);
  7. //添加到任务列表
  8. invokeAll(left, right);

4.6.1.3 合并任务

  1. //合并结果并返回
  2. return left.join() + right.join();

4.6.1.4 代码案例

  1. package chapter02.forkjoin;
  2. import java.util.concurrent.ForkJoinPool;
  3. import java.util.concurrent.RecursiveTask;
  4. /**
  5. * 计算 0-10000 阶乘
  6. */
  7. public class SubmitTask extends RecursiveTask<Long> {
  8. /**
  9. * 起始值
  10. */
  11. private long start;
  12. /**
  13. * 结束值
  14. */
  15. private long end;
  16. /**
  17. * 阈值
  18. */
  19. private long threshold = 10L;
  20. public SubmitTask(long start, long end) {
  21. this.start = start;
  22. this.end = end;
  23. }
  24. /**
  25. * 计算逻辑
  26. * 进行任务的拆分 以及 达到阈值的计算
  27. *
  28. * @return
  29. */
  30. @Override
  31. protected Long compute() {
  32. //校验是否达到了阈值
  33. if (isLessThanThreshold()) {
  34. //处理并返回结果
  35. return handle();
  36. } else {
  37. //没有达到阈值 计算一个中间值
  38. long mid = (start + end) / 2;
  39. //拆分 左边的
  40. SubmitTask left = new SubmitTask(start, mid);
  41. //拆分右边的
  42. SubmitTask right = new SubmitTask(mid + 1, end);
  43. //添加到任务列表
  44. invokeAll(left, right);
  45. //合并结果并返回
  46. return left.join() + right.join();
  47. }
  48. }
  49. /**
  50. * 处理的任务
  51. *
  52. * @return
  53. */
  54. public Long handle() {
  55. long sum = 0;
  56. for (long i = start; i <= end; i++) {
  57. sum += i;
  58. try {
  59. Thread.sleep(1);
  60. } catch (InterruptedException e) {
  61. e.printStackTrace();
  62. }
  63. }
  64. return sum;
  65. }
  66. /*是否达到了阈值*/
  67. private boolean isLessThanThreshold() {
  68. return end - start <= threshold;
  69. }
  70. /**
  71. * forkJoin 方式调用
  72. *
  73. * @param start
  74. * @param end
  75. */
  76. public static void forkJoinInvok(long start, long end) {
  77. long sum = 0;
  78. long currentTime = System.currentTimeMillis();
  79. //创建ForkJoinPool 连接池
  80. ForkJoinPool forkJoinPool = new ForkJoinPool();
  81. //创建初始化任务
  82. SubmitTask submitTask = new SubmitTask(start, end);
  83. //将初始任务扔进连接池中执行
  84. forkJoinPool.invoke(submitTask);
  85. //forkJoinPool.submit(submitTask);
  86. // System.out.println("异步方式,任务结束才会调用该方法,当前耗时"+(System.currentTimeMillis() - currentTime));
  87. //等待返回结果
  88. sum = submitTask.join();
  89. //forkjoin调用方式耗时
  90. System.out.println("forkJoin调用:result:" + sum);
  91. System.out.println("forkJoin调用耗时:" + (System.currentTimeMillis() - currentTime));
  92. }
  93. /**
  94. * 普通方式调用
  95. *
  96. * @param start
  97. * @param end
  98. */
  99. public static void normalInvok(long start, long end) {
  100. long sum = 0;
  101. long currentTime = System.currentTimeMillis();
  102. for (long i = start; i <= end; i++) {
  103. sum += i;
  104. try {
  105. Thread.sleep(1);
  106. } catch (InterruptedException e) {
  107. e.printStackTrace();
  108. }
  109. }
  110. //普通调动方式耗时
  111. System.out.println("普通调用:result:" + sum);
  112. System.out.println("普通调用耗时:" + (System.currentTimeMillis() - currentTime));
  113. }
  114. public static void main(String[] args) {
  115. //起始值的大小
  116. long start = 0;
  117. //结束值的大小
  118. long end = 10000;
  119. //forkJoin 调用
  120. forkJoinInvok(start, end);
  121. System.out.println("========================");
  122. //普通调用
  123. normalInvok(start, end);
  124. }
  125. }

原文:Java并发工具合集JUC大爆发!!! - 博学谷狂野架构师的个人空间 - OSCHINA - 中文开源技术交流社区 

相关技术文章

点击QQ咨询
开通会员
返回顶部
×
微信扫码支付
微信扫码支付
确定支付下载
请使用微信描二维码支付
×

提示信息

×

选择支付方式

  • 微信支付
  • 支付宝付款
确定支付下载