前言:说起threadpoolexector应该大家多少都接触过,现在我详细的讲解下其的用法
一:解析参数
为了更好地理解threadpoolexecutor,我先讲一个例子,话说一个工作多年的高T,一天突然决定自己要单干组织一个团队,经过仔细的考虑他做出了如下的决定
1、团队的核心人员为10个
2、如果一旦出现项目过多人员不足的时候,则会聘请5个外包人员
3、接的项目单子最多堆积100
4、如果项目做完了团队比较空闲,则裁掉外包人员,空闲的时间为一个月
5、如果接的单子超过100个,则后续考虑一些兜底策略(比如拒绝多余的单子,或者把多出100个以外的单子直接交付第三方公司做)
6、同时他还考虑了如果效益一直不好,那么就裁掉所有人,宣布公司直接倒闭
上面的例子恰恰和我们的线程池非常的像,我们来看下threadpoolexecutor的定义。
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler);
corePoolSize:核心线程数(就是我们上面说的一个公司核心人员)
maximumPoolSize:最大线程数(就是我们说的一旦公司接收的单子过多则聘请外包,此时也是公司最大的人员了,因为人多了办公地方不够了)
keepAliveTime:超多核心线程数之外线程的存活时间(就是如果公司一旦活不多要多久进行裁掉外包人员)
unit:上面时间的单元(可以年月日时分秒等)
workQueue:任务队列(就是如果公司最大能存的单子)
handler:拒绝策略(就是一旦任务满了应该如果处理多余的单子)
allowCoreThreadTimeOut:设置是否清理核心线程(如果设置true,如果任务少于实际执行的线程则会清理核心线程,默认为false)
二:实际演练
先验证核心线程数
public class ThreadPoolExecutorTest { public static void main(String[] args) throws InterruptedException { RejectedExecutionHandler handler = new RejectedExecutionHandlerImpl(); ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2, 5, 20L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(9)); for (int i = 0; i < 11; i++) { AppTask appTask = new AppTask(i); poolExecutor.execute(appTask); System.out.println("线程池中线程的数目:" + poolExecutor.getPoolSize() + ",线程池中等待的队列数目:" + poolExecutor.getQueue().size() + ";线程池中已执行完毕的任务数据:" + poolExecutor.getCompletedTaskCount()); } poolExecutor.allowCoreThreadTimeOut(true); if (!poolExecutor.isShutdown()) { poolExecutor.shutdown(); } } static class AppTask implements Runnable { private int taskNum; public AppTask(int num) { this.taskNum = num; } @Override public void run() { try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"_task_" + this.taskNum + ":执行完毕"); } } static LinkedBlockingQueue<Runnable> xs = new LinkedBlockingQueue(10000); static int i = 0; static class RejectedExecutionHandlerImpl implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { try { xs.put(r); } catch (InterruptedException e) { e.printStackTrace(); } } }
代码块1
那么我们来看看运行的结果
我们可以看出线程池此时只有2个线程,足矣说明当队列的数+核心线程数<=任务数,则线程池中只会有核心线程工作。
如果把上述代码中for循环中最大值改为14呢,那么我们在来看运行的结果
我们可以看的出线程池中的线程已经是5个了,说明当任务>队列最大值+核心线程数的时候线程池则会生成新的线程来处理任务。
上面我们基本弄明白了核心线程数,最大线程数这些概念,现在我们再来看2个比较重要的参数队列和拒绝策略
队列
1、直接提交(SynchronousQueue,不会保存任何任务)
2、无界队列(LinkedBlockingQueue,对于新加来的任务全部存入队列中,量大可能会导致oom)
3、有界队列(ArrayBlockingQueue,队列有一个最大值,超过最大值的任务交给拒绝策略处理)
拒绝策略
1、当线程池中的数量等于最大线程数时对于新传入的任务则抛异常(AbortPolicy)
2、当线程池中的数量等于最大线程数时抛弃消息,不做任务处理(DiscardPolicy)
3、当线程池中的数量等于最大线程数时主线程处理新传入的任务消息(CallerRunsPolicy)
4、当线程池中的数量等于最大线程数时 、抛弃线程池中最后一个要执行的任务,并执行新传入的任务(DiscardOldestPolicy)
队列这里就不多说了大家可以自己去实践,这里我们主要说拒绝策略,现在线程池默认的是第一种拒绝策略,直接抛异常,第二种不做处理的这种我们也不提了,我们主要来看下第三种和第四种拒绝策略,先看下第三种拒绝策略的源码
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } }
代码块2
从中我们可以看出新传入的任务并没有交接线程池来处理,直接交给主线程来处理的,看我下面这块代码以及执行结果
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2, 5, 20L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5),new ThreadPoolExecutor.CallerRunsPolicy()); for (int i = 0; i < 12; i++) { AppTask appTask = new AppTask(i); poolExecutor.execute(appTask); System.out.println("线程池中线程的数目:" + poolExecutor.getPoolSize() + ",线程池中等待的队列数目:" + poolExecutor.getQueue().size() + ";线程池中已执行完毕的任务数据:" + poolExecutor.getCompletedTaskCount()); } poolExecutor.allowCoreThreadTimeOut(true); if (!poolExecutor.isShutdown()) { poolExecutor.shutdown(); }
代码块3
运行结果
从结果显示对于多出队列的任务则由主线程来执行,主线程执行完毕后,由于队列被释放了一些任务,新来的任务又会交给线程池来处理。
第四种情况我们可以看下源码
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } }
代码块4
我们可以看出先执行新进入的任务,然后将队列头的任务抛弃,这样会导致一部分消息丢失
三:使用场景
1、异步处理日志,这个是比较场景的采用线程池来解决的
2、定时任务,定时对数据库备份、定时更新redis配置等,定时发送邮件
3、数据迁移
这些常见的一些场景我们就应该优先来考虑线程池来解决
四:堵塞线程池和非堵塞线程池
这里我说下通过日志处理方式来讲解堵塞线程池和非堵塞线程池.日志的作用便于我们分析问题,但是对于服务本身而言却不是必须的,这也是为什么我们一般都会异步来处理日志的情况。
1、堵塞线程池
在上面几种几种队列中,如果我们选取有有界队列来,拒绝策略可以采用CallerRunsPolicy,这样一来就不会出现消息丢失、内存溢出等问题,当然我们也可以重写拒绝策略,我们来看下面一段代码
RejectedExecutionHandler handler = new RejectedExecutionHandlerImpl(); ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2, 5, 20L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5),handler); for (int i = 0; i < 12; i++) { AppTask appTask = new AppTask(i); poolExecutor.execute(appTask); System.out.println("线程池中线程的数目:" + poolExecutor.getPoolSize() + ",线程池中等待的队列数目:" + poolExecutor.getQueue().size() + ";线程池中已执行完毕的任务数据:" + poolExecutor.getCompletedTaskCount()); } poolExecutor.allowCoreThreadTimeOut(true); if (!poolExecutor.isShutdown()) { poolExecutor.shutdown(); } static int count=0; static class RejectedExecutionHandlerImpl implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { try { executor.getQueue().put(r); count++; System.out.println("阻塞队列中个数:"+count); } catch (InterruptedException e) { e.printStackTrace(); } } }
代码块5
运行结果如下
我们可以看出所有的任务都执行完毕了,因为拒绝策略中我们把新进入的任务在次放入队列中,我们用put这个方法,这个是java提供给我们的阻塞队列,如果满了就会一直等待直到队列中其他任务被释放。
优点:
1、不会造成内存溢出
2、不会出现消息丢失
3、对消费者来说并不需要非常复杂的处理就能满足业务需求
缺点:
对生成者来说就要变得复杂的多,如果消息量qps过高消费者消费能力不足(如果消费者不足以处理生产者的任务则会堵塞等待,那么生产者肯定得不到响应,会报出超时异常),如果不采用一些措施将会导致消息丢失,所以对生成者来说必须要持久化的记录消息,而且设置最大量,如果出现超过最大量的80%则报警,所以在设计生产者的时候必须考虑这样的场景,
总结:
堵塞线程池其实就是把消费者消费能力不足的压力交给生产者来处理
2、非堵塞线程池
非堵塞线程池说白就是消息来了就处理,处理不足则进行存储,或者记录日志,我们看如下的代码
RejectedExecutionHandler handler = new RejectedExecutionHandlerImpl(); ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2, 5, 20L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5), handler); for (int i = 0; i < 12; i++) { AppTask appTask = new AppTask(i); poolExecutor.execute(appTask); System.out.println("线程池中线程的数目:" + poolExecutor.getPoolSize() + ",线程池中等待的队列数目:" + poolExecutor.getQueue().size() + ";线程池中已执行完毕的任务数据:" + poolExecutor.getCompletedTaskCount()); } MainThread.exec(); poolExecutor.allowCoreThreadTimeOut(true); if (!poolExecutor.isShutdown()) { poolExecutor.shutdown(); } static LinkedBlockingQueue<Runnable> blockingQueue = new LinkedBlockingQueue(10000); static class RejectedExecutionHandlerImpl implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { try { blockingQueue.put(r); } catch (InterruptedException e) { e.printStackTrace(); } } } static int count=0; static class MainThread { public static void exec() throws InterruptedException { while (true) { if (blockingQueue.size() >= 8000) { System.out.println("报警"); } Runnable r = blockingQueue.poll(); if (r == null) { Thread.sleep(1000); continue; } count++; r.run(); System.out.println("非堵塞线程池执行的个数:"+count); } } }
代码块6
执行结果
有人问和上面有什么区别呢,其实区别就是这个不会出现堵塞,这里我虽然采用堵塞队列来存储,为了更好的展示,其实这里你可以打印日志,或者存入redis中然后进行处理。
优点:
1、也不会造成内存溢出
2、消费不会出现堵塞
缺点:
这样设计明显会复杂的很多,而且获取消息值不易,就我目前来看我更倾向于采用堵塞线程池
五:线程池的设计
线程池的设计最重要的一点就是计算出生产者最大的qps量和单个线程消费的能力,比如我们生产者qps是1万,但是我们单个线程处理每个任务的时间是2毫秒,如果我们cpu是4核,那么我们核心线程是4*2=8个,所以我们每秒处理的任务数是1000/2*8=4000,很显然我们的消费能力远远不足,这个时候我们应该考虑采用多台机器处理,有人不是可以堵塞队列么,其实那是一种兜底策略,避免消息丢失,但这并不是我们设计的核心。如果我们能计算出单条消息的大小(如1k)我们分配这个消息服务的内存是300M,那么我们可以做个折中150M来存储多余的消息,那么可以存储的量是1百万,如果我们的流量高峰是30分钟,每秒处理剩余的消息是200,那么这半小时之内总共剩余的消息总量是30*60*200=360000,这样一来完全可以满足我们的业务需求。
六:线程池的有点
1、减少频繁的创建和销毁线程(由于线程创建和销毁都会耗用一定的内存)
2、线程池也是多线程,充分利用CPU,提高系统的效率
七:Executors下的4种创建线程池的方式
1、newSingleThreadExecutor();
这个线程池的特点是核心线程数只有一个,最大线程数也只有一个,采用的队列是无界队列,可能会导致内存溢出
2、newFixedThreadPool(5)
这个线程池特点是核心线程数由自己设置并且一旦设置就是固定的不在改变,采用的队列是无界队列,可能会导致内存溢出
3、newCachedThreadPool()
这个线程池特点是核心线程数为0,最大线程数无界,这样也会造成内存溢出
4、newScheduledThreadPool(5)
这个线程池特点是可以周期性执行任务,核心线程数需要自己进行初始化,最大线程数无界,这样也会造成内存溢出
我们在实际操作中,尽量避免使用Executors来创建多线程,因为如果消息量过大会导致内存溢出,消息丢失
最新评论