关键词搜索

源码搜索 ×
×

Java并发编程线程任务返回值及顺序问题解决方案

发布2020-04-01浏览1357次

详情内容

目录

核心知识点 

无返回值并发执行顺序控制 

有返回值或先后顺序控制

Future FutureTask CompletionService CompletableFutrue比较

可回调实现方式

Callable

Supplier

并发工具类

CountDownLatch

CyclicBarrier

FutureTask

CompletionService

CompletableFuture


核心知识点 

无返回值并发执行顺序控制 

  • CountDownLatch
  • CyclicBarrier

有返回值或先后顺序控制

  • FutureTask
  • CompletionService
  • CompletableFuture

Future FutureTask CompletionService CompletableFutrue比较

下图参考自:https://blog.csdn.net/u011726984/article/details/79320004

可回调实现方式

Callable

接口定义:

  1. @FunctionalInterface
  2. public interface Callable<V> {
  3. /**
  4. * Computes a result, or throws an exception if unable to do so.
  5. *
  6. * @return computed result
  7. * @throws Exception if unable to compute a result
  8. */
  9. V call() throws Exception;
  10. }

接口实现:

  1. package thread;
  2. import java.util.Random;
  3. import java.util.concurrent.Callable;
  4. /**
  5. *
  6. * @function 功能:线程可回调任务
  7. * @author PJL
  8. * @package thread
  9. * @filename CountTask.java
  10. * @time 2020年4月1日 下午12:50:16
  11. */
  12. public class CallableTask implements Callable<Integer>{
  13. @Override
  14. public Integer call() throws Exception {
  15. int count=new Random().nextInt(10000);
  16. Thread.sleep(count);
  17. System.out.println("---"+count);
  18. return count;
  19. }
  20. }

Supplier

接口定义:

  1. @FunctionalInterface
  2. public interface Supplier<T> {
  3. /**
  4. * Gets a result.
  5. *
  6. * @return a result
  7. */
  8. T get();
  9. }

接口实现:

  1. package thread;
  2. import java.util.Random;
  3. import java.util.function.Supplier;
  4. /**
  5. *
  6. * @function 功能:可回调的Supplier顺序处理线程任务
  7. * @author PJL
  8. * @package thread
  9. * @filename SupplierTask.java
  10. * @time 2020年4月1日 下午1:06:27
  11. */
  12. public class SupplierTask implements Supplier<Integer>{
  13. @Override
  14. public Integer get() {
  15. int count=new Random().nextInt(10000);
  16. try {
  17. Thread.sleep(count);
  18. } catch (InterruptedException e) {
  19. e.printStackTrace();
  20. }
  21. System.out.println("---"+count);
  22. return count;
  23. }
  24. }

并发工具类

CountDownLatch

CountDownLatch 中文名叫闭锁,通俗的叫法叫栅栏发令枪。它可以模拟用户同时到达开始并发执行业务的场景,比如秒杀。

  1. package thread;
  2. import java.text.SimpleDateFormat;
  3. import java.util.Date;
  4. import java.util.concurrent.CountDownLatch;
  5. /**
  6. * 同步辅助类CountDownLatch等待组线程执行完毕
  7. * @author admin
  8. * @link {@link https://www.cnblogs.com/liun1994/p/7396026.html}
  9. * @screen 场景:
  10. * <li>分布式计算进行汇总最终结果</li>
  11. * <li>主线程等待子线程执行完的操作</li>
  12. * <li>等待最终执行如初始化资源完成操作</li>
  13. * <li>等待最终执行如系统资源汇总计算</li>
  14. */
  15. public class WorkerCountDownLatch {
  16. final static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  17. public static void main(String[] args) throws InterruptedException {
  18. CountDownLatch latch = new CountDownLatch(2);// 两个工人的协作
  19. Worker worker1 = new Worker("zhang san", 5000, latch);
  20. Worker worker2 = new Worker("li si", 8000, latch);
  21. worker1.start();//线程就绪
  22. worker2.start();//线程就绪
  23. latch.await();// 等待所有工人完成工作
  24. System.out.println("all work done at " + sdf.format(new Date()));
  25. }
  26. static class Worker extends Thread {
  27. String workerName;
  28. int workTime;
  29. CountDownLatch latch;
  30. public Worker(String workerName, int workTime, CountDownLatch latch) {
  31. this.workerName = workerName;
  32. this.workTime = workTime;
  33. this.latch = latch;
  34. }
  35. public void run() {
  36. System.out.println("Worker " + workerName + " do work begin at " + sdf.format(new Date()));
  37. doWork();// 工作了
  38. System.out.println("Worker " + workerName + " do work complete at " + sdf.format(new Date()));
  39. latch.countDown();// 工人完成工作,计数器减一
  40. }
  41. private void doWork() {
  42. try {
  43. Thread.sleep(workTime);
  44. } catch (InterruptedException e) {
  45. e.printStackTrace();
  46. }
  47. }
  48. }
  49. }

测试结果:

  1. Worker zhang san do work begin at 2020-04-01 13:47:45
  2. Worker li si do work begin at 2020-04-01 13:47:45
  3. Worker zhang san do work complete at 2020-04-01 13:47:50
  4. Worker li si do work complete at 2020-04-01 13:47:53
  5. all work done at 2020-04-01 13:47:53

 

CyclicBarrier

CyclicBarrier内部采用了可重入锁ReentrantLock和Runnable接口,可以控制多任务循环串行。

  1. package thread;
  2. import java.util.concurrent.CyclicBarrier;
  3. /**
  4. *
  5. * @function 功能:CyclicBarrier允许线程之间互相等待
  6. * @author PJL
  7. * @package thread
  8. * @filename WorkerCyclicBarrier.java
  9. * @time 2020年4月1日 下午1:27:51
  10. */
  11. public class WorkerCyclicBarrier {
  12. static class TaskThread extends Thread {
  13. CyclicBarrier barrier;
  14. public TaskThread(CyclicBarrier barrier) {
  15. this.barrier = barrier;
  16. }
  17. @Override
  18. public void run() {
  19. try {
  20. // 循环执行A直到完成
  21. Thread.sleep(1000);
  22. System.out.println(getName() + " 执行任务A");
  23. barrier.await();
  24. System.out.println(getName() + " 执行任务 A完成");
  25. // 循环执行B直到完成
  26. Thread.sleep(2000);
  27. System.out.println(getName() + " 执行任务B");
  28. barrier.await();
  29. System.out.println(getName() + " 执行任务 B完成");
  30. } catch (Exception e) {
  31. e.printStackTrace();
  32. }
  33. }
  34. }
  35. public static void main(String[] args) {
  36. int threadNum = 5;
  37. CyclicBarrier barrier = new CyclicBarrier(threadNum, new Runnable() {
  38. @Override
  39. public void run() {
  40. // 回调处理
  41. System.out.println(Thread.currentThread().getName() + " 完成最后任务");
  42. }
  43. });
  44. for (int i = 0; i < threadNum; i++) {
  45. new TaskThread(barrier).start();
  46. }
  47. }
  48. }

运行打印结果:

  1. Thread-0 执行任务A
  2. Thread-1 执行任务A
  3. Thread-2 执行任务A
  4. Thread-4 执行任务A
  5. Thread-3 执行任务A
  6. Thread-3 完成最后任务
  7. Thread-3 执行任务 A完成
  8. Thread-2 执行任务 A完成
  9. Thread-0 执行任务 A完成
  10. Thread-4 执行任务 A完成
  11. Thread-1 执行任务 A完成
  12. Thread-0 执行任务B
  13. Thread-4 执行任务B
  14. Thread-3 执行任务B
  15. Thread-2 执行任务B
  16. Thread-1 执行任务B
  17. Thread-1 完成最后任务
  18. Thread-1 执行任务 B完成
  19. Thread-2 执行任务 B完成
  20. Thread-3 执行任务 B完成
  21. Thread-4 执行任务 B完成
  22. Thread-0 执行任务 B完成

FutureTask

FutureTask 实现了RunnableFuture接口

  1. /**
  2. * A cancellable asynchronous computation. This class provides a base
  3. * implementation of {@link Future}, with methods to start and cancel
  4. * a computation, query to see if the computation is complete, and
  5. * retrieve the result of the computation. The result can only be
  6. * retrieved when the computation has completed; the {@code get}
  7. * methods will block if the computation has not yet completed. Once
  8. * the computation has completed, the computation cannot be restarted
  9. * or cancelled (unless the computation is invoked using
  10. * {@link #runAndReset}).
  11. *
  12. * <p>A {@code FutureTask} can be used to wrap a {@link Callable} or
  13. * {@link Runnable} object. Because {@code FutureTask} implements
  14. * {@code Runnable}, a {@code FutureTask} can be submitted to an
  15. * {@link Executor} for execution.
  16. *
  17. * <p>In addition to serving as a standalone class, this class provides
  18. * {@code protected} functionality that may be useful when creating
  19. * customized task classes.
  20. *
  21. * @since 1.5
  22. * @author Doug Lea
  23. * @param <V> The result type returned by this FutureTask's {@code get} methods
  24. */
  25. public class FutureTask<V> implements RunnableFuture<V> {

而RunnableFutrue接口继承自Runnable和Future接口

  1. /**
  2. * A {@link Future} that is {@link Runnable}. Successful execution of
  3. * the {@code run} method causes completion of the {@code Future}
  4. * and allows access to its results.
  5. * @see FutureTask
  6. * @see Executor
  7. * @since 1.6
  8. * @author Doug Lea
  9. * @param <V> The result type returned by this Future's {@code get} method
  10. */
  11. public interface RunnableFuture<V> extends Runnable, Future<V> {
  12. /**
  13. * Sets this Future to the result of its computation
  14. * unless it has been cancelled.
  15. */
  16. void run();
  17. }

FutureTask测试:

  1. package thread;
  2. import java.util.concurrent.ExecutionException;
  3. import java.util.concurrent.ExecutorService;
  4. import java.util.concurrent.Executors;
  5. import java.util.concurrent.FutureTask;
  6. /**
  7. *
  8. * @function 功能:FutureTask线程返回值获取方式
  9. * @author PJL
  10. * @package thread
  11. * @filename WorkerFutureTask.java
  12. * @time 2020年4月1日 下午1:11:41
  13. */
  14. public class WorkerFutureTask {
  15. public static void main(String[] args) throws InterruptedException, ExecutionException {
  16. ExecutorService executor = Executors.newCachedThreadPool();
  17. FutureTask<Integer> task1 = new FutureTask<Integer>(new CallableTask());
  18. FutureTask<Integer> task2 = new FutureTask<Integer>(new CallableTask());
  19. FutureTask<Integer> task3 = new FutureTask<Integer>(new CallableTask());
  20. executor.execute(task1);
  21. executor.execute(task2);
  22. executor.execute(task3);
  23. long start = System.currentTimeMillis();
  24. System.out.println("task1结果=" + task1.get());
  25. System.out.println("task2结果=" + task2.get());
  26. System.out.println("task3结果=" + task3.get());
  27. long end = System.currentTimeMillis();
  28. System.out.println("总共耗时=" + (end - start) + "ms");
  29. }
  30. }

测试结果:

  1. ---181
  2. ---2594
  3. task1结果=2594
  4. task2结果=181
  5. ---2816
  6. task3结果=2816
  7. 总共耗时=2817ms

 

注意:FutureTask的get()方法是阻塞的,不区分完成先后顺序,所以最长的时间可能是加起来所有任务耗时的总和。 

CompletionService

CompletionService 是通过线程干预的模式对线程完成状态进行监控,它是JDK并发包的接口,其实现是java.util.concurrent.ExecutorCompletionService,需要结合线程池来使用。

测试示例:

  1. package thread;
  2. import java.util.concurrent.CompletionService;
  3. import java.util.concurrent.ExecutionException;
  4. import java.util.concurrent.ExecutorCompletionService;
  5. import java.util.concurrent.ExecutorService;
  6. import java.util.concurrent.Executors;
  7. import java.util.concurrent.Future;
  8. /**
  9. *
  10. * @function 功能:线程完成顺序干预先后排序
  11. * @author PJL
  12. * @package thread
  13. * @filename WorkerCompletionService.java
  14. * @time 2020年4月1日 下午1:20:14
  15. */
  16. public class WorkerCompletionService {
  17. public static void main(String[] args) throws InterruptedException, ExecutionException {
  18. ExecutorService executor = Executors.newCachedThreadPool();
  19. CompletionService<Integer> completionService=new ExecutorCompletionService<Integer>(executor);
  20. Future<Integer> task1=completionService.submit(new CallableTask());
  21. Future<Integer> task2=completionService.submit(new CallableTask());
  22. Future<Integer> task3=completionService.submit(new CallableTask());
  23. long start = System.currentTimeMillis();
  24. System.out.println("task1结果=" + task1.get());
  25. System.out.println("task2结果=" + task2.get());
  26. System.out.println("task3结果=" + task3.get());
  27. long end = System.currentTimeMillis();
  28. System.out.println("总共耗时=" + (end - start) + "ms");
  29. }
  30. }

测试结果:

  1. ---1539
  2. task1结果=1539
  3. ---5169
  4. task2结果=5169
  5. ---9412
  6. task3结果=9412
  7. 总共耗时=9414ms

 

CompletableFuture

CompletableFuture 实现了Future和CompletionState接口,可实现线程先后完成排序。

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {

测试示例:

  1. package thread;
  2. import java.util.concurrent.CompletableFuture;
  3. import java.util.concurrent.ExecutionException;
  4. import java.util.concurrent.ExecutorService;
  5. import java.util.concurrent.Executors;
  6. /**
  7. *
  8. * @function 功能:多线程任务按照完成先后顺序返回
  9. * @author PJL
  10. * @package thread
  11. * @filename WorkerCompletableFuture.java
  12. * @time 2020年4月1日 下午1:04:00
  13. */
  14. public class WorkerCompletableFuture {
  15. public static void main(String[] args) throws InterruptedException, ExecutionException {
  16. ExecutorService executor = Executors.newCachedThreadPool();
  17. CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(new SupplierTask(), executor);
  18. CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(new SupplierTask(), executor);
  19. CompletableFuture<Integer> task3 = CompletableFuture.supplyAsync(new SupplierTask(), executor);
  20. long start = System.currentTimeMillis();
  21. System.out.println("task1结果=" + task1.get());
  22. System.out.println("task2结果=" + task2.get());
  23. System.out.println("task3结果=" + task3.get());
  24. long end = System.currentTimeMillis();
  25. System.out.println("总共耗时=" + (end - start) + "ms");
  26. }
  27. }

测试结果:

  1. ---609
  2. ---782
  3. task1结果=782
  4. task2结果=609
  5. ---5614
  6. task3结果=5614
  7. 总共耗时=5615ms

 

相关技术文章

点击QQ咨询
开通会员
返回顶部
×
微信扫码支付
微信扫码支付
确定支付下载
请使用微信描二维码支付
×

提示信息

×

选择支付方式

  • 微信支付
  • 支付宝付款
确定支付下载