关键词搜索

源码搜索 ×
×

JAVA多线程之扩展ThreadPoolExecutor

发布2015-12-25浏览12186次

详情内容

欢迎支持笔者新作:《深入理解Kafka:核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客。

欢迎跳转到本文的原文链接:https://honeypps.com/java/java-multi-thread-of-thread-pool-executor/

ThreadPoolExecutor是可扩展的,通过查看源码可以发现,它提供了几个可以在子类化中改写的方法:beforeExecute,afterExecute,terminated.

源码片段如下所示:

 

  1. protected void beforeExecute(Thread t, Runnable r) { }
  2. protected void afterExecute(Runnable r, Throwable t) { }
  3. protected void terminated() { }

可以注意到,这三个方法都是protected的空方法,摆明了是让子类扩展的嘛。

 

在执行任务的线程中将调用beforeExecute和afterExecute等方法,在这些方法中还可以添加日志、计时、监视或者统计信息收集的功能。无论任务是从run中正常返回,还是抛出一个异常而返回,afterExecute都会被调用。如果任务在完成后带有一个Error,那么就不会调用afterExecute。如果beforeExecute抛出一个RuntimeException,那么任务将不被执行,并且afterExecute也不会被调用。

在线程池完成关闭时调用terminated,也就是在所有任务都已经完成并且所有工作者线程也已经关闭后,terminated可以用来释放Executor在其生命周期里分配的各种资源,此外还可以执行发送通知、记录日志或者手机finalize统计等操作。

下面就以给线程池添加统计信息为例(添加日志和及时等功能):

 

  1. package com.threadPool;
  2. import java.util.concurrent.BlockingQueue;
  3. import java.util.concurrent.ThreadPoolExecutor;
  4. import java.util.concurrent.TimeUnit;
  5. import java.util.concurrent.atomic.AtomicLong;
  6. import java.util.logging.Logger;
  7. public class TimingThreadPool extends ThreadPoolExecutor
  8. {
  9. private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();
  10. private final Logger log = Logger.getAnonymousLogger();
  11. private final AtomicLong numTasks = new AtomicLong();
  12. private final AtomicLong totalTime = new AtomicLong();
  13. public TimingThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
  14. BlockingQueue<Runnable> workQueue)
  15. {
  16. super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
  17. }
  18. protected void beforeExecute(Thread t, Runnable r){
  19. super.beforeExecute(t, r);
  20. log.info(String.format("Thread %s: start %s", t,r));
  21. startTime.set(System.nanoTime());
  22. }
  23. protected void afterExecute(Runnable r, Throwable t){
  24. try{
  25. long endTime = System.nanoTime();
  26. long taskTime = endTime-startTime.get();
  27. numTasks.incrementAndGet();
  28. totalTime.addAndGet(taskTime);
  29. log.info(String.format("Thread %s: end %s, time=%dns", t,r,taskTime));
  30. }
  31. finally
  32. {
  33. super.afterExecute(r, t);
  34. }
  35. }
  36. protected void terminated()
  37. {
  38. try
  39. {
  40. log.info(String.format("Terminated: avg time=%dns",totalTime.get()/numTasks.get()));
  41. }
  42. finally
  43. {
  44. super.terminated();
  45. }
  46. }
  47. }

可以看到TimingThreadPool重写了父类的三个方法。

 

下面写一个测试类,参考运行效果:

 

  1. package com.threadPool;
  2. import java.util.concurrent.SynchronousQueue;
  3. import java.util.concurrent.ThreadPoolExecutor;
  4. import java.util.concurrent.TimeUnit;
  5. public class CheckTimingThreadPool
  6. {
  7. public static void main(String[] args)
  8. {
  9. ThreadPoolExecutor exec = new TimingThreadPool(0, Integer.MAX_VALUE,
  10. 60L, TimeUnit.SECONDS,
  11. new SynchronousQueue<Runnable>());
  12. exec.execute(new DoSomething(5));
  13. exec.execute(new DoSomething(4));
  14. exec.execute(new DoSomething(3));
  15. exec.execute(new DoSomething(2));
  16. exec.execute(new DoSomething(1));
  17. exec.shutdown();
  18. }
  19. }
  20. class DoSomething implements Runnable{
  21. private int sleepTime;
  22. public DoSomething(int sleepTime)
  23. {
  24. this.sleepTime = sleepTime;
  25. }
  26. @Override
  27. public void run()
  28. {
  29. System.out.println(Thread.currentThread().getName()+" is running.");
  30. try
  31. {
  32. TimeUnit.SECONDS.sleep(sleepTime);
  33. }
  34. catch (InterruptedException e)
  35. {
  36. e.printStackTrace();
  37. }
  38. }
  39. }

运行结果:

 

 

十二月 25, 2015 4:18:42 下午 com.threadPool.TimingThreadPool beforeExecute
信息: Thread Thread[pool-1-thread-1,5,main]: start com.threadPool.DoSomething@43f459c2
十二月 25, 2015 4:18:42 下午 com.threadPool.TimingThreadPool beforeExecute
信息: Thread Thread[pool-1-thread-3,5,main]: start com.threadPool.DoSomething@33891d5d
pool-1-thread-3 is running.
十二月 25, 2015 4:18:42 下午 com.threadPool.TimingThreadPool beforeExecute
信息: Thread Thread[pool-1-thread-4,5,main]: start com.threadPool.DoSomething@33891d5d
pool-1-thread-4 is running.
十二月 25, 2015 4:18:42 下午 com.threadPool.TimingThreadPool beforeExecute
信息: Thread Thread[pool-1-thread-5,5,main]: start com.threadPool.DoSomething@10747b4
pool-1-thread-5 is running.
十二月 25, 2015 4:18:42 下午 com.threadPool.TimingThreadPool beforeExecute
信息: Thread Thread[pool-1-thread-2,5,main]: start com.threadPool.DoSomething@7d4af469
pool-1-thread-2 is running.
pool-1-thread-1 is running.
十二月 25, 2015 4:18:43 下午 com.threadPool.TimingThreadPool afterExecute
信息: Thread null: end com.threadPool.DoSomething@10747b4, time=999589906ns
十二月 25, 2015 4:18:44 下午 com.threadPool.TimingThreadPool afterExecute
信息: Thread null: end com.threadPool.DoSomething@33891d5d, time=1999461618ns
十二月 25, 2015 4:18:45 下午 com.threadPool.TimingThreadPool afterExecute
信息: Thread null: end com.threadPool.DoSomething@33891d5d, time=3000507593ns
十二月 25, 2015 4:18:46 下午 com.threadPool.TimingThreadPool afterExecute
信息: Thread null: end com.threadPool.DoSomething@7d4af469, time=3999691253ns
十二月 25, 2015 4:18:47 下午 com.threadPool.TimingThreadPool afterExecute
信息: Thread null: end com.threadPool.DoSomething@43f459c2, time=4999778490ns
十二月 25, 2015 4:18:47 下午 com.threadPool.TimingThreadPool terminated
信息: Terminated: avg time=2999805772ns

可以看到,在测试类CheckTimingThreadPool中通过execute了五个线程,然后分别对这五个线程进行统计,最后统计出各个线程的耗时平均时间。

 

这里说明下TimingThreadPool的构造函数,它直接调用了父类的构造方法,在ThreadPoolExecutor中有许多构造方法,有兴趣的朋友可以查看jdk api或者源码进行查看。

简要说明下构造函数的参数的含义:

corePoolSize:线程池维护线程的最少数量

maximumPoolSize:线程池维护线程的最大数量

keepAliveTime:线程池维护线程所允许的空闲时间

unit:线程池维护所允许的空闲时间的单位

workQueue:线程池所使用的缓存队列


欢迎跳转到本文的原文链接:https://honeypps.com/java/java-multi-thread-of-thread-pool-executor/

欢迎支持笔者新作:《深入理解Kafka:核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客。

相关技术文章

点击QQ咨询
开通会员
返回顶部
×
微信扫码支付
微信扫码支付
确定支付下载
请使用微信描二维码支付
×

提示信息

×

选择支付方式

  • 微信支付
  • 支付宝付款
确定支付下载