目录
我们都知道线程的创建与销毁需要一定的开销,如果每个任务都需要创建一个线程将会消耗大量的计算资源,JDK 5之后把工作单元和执行机制区分开了,工作单元包括Runnable和Callable,而执行机制则由Executor框架提供。Executor框架为线程的启动、执行和关闭提供了便利,底层使用线程池实现。同时,Executor 的实现还提供了对任务执行的生命周期管理的支持。使用Executor框架管理线程的好处在于简化管理、提高效率。
从代码上看,Executor 是一个简单的接口,但它却是整个异步任务执行框架的基础,这个框架能支持多种不同类型的任务执行策略。他提供了一种标准的方法将任务的提交过程和执行过程解耦开来,任务用 Runnable 来表示。Executor 基于生产者-消费者模式,提交任务的线程相当于生产者,执行任务的线程相当于消费者。
简而言之,Executor框架实现了工作单元与执行单元的分离。
一、两级调度模型
示意图:
Executor框架使用了两级调度模型进行线程的调度。在上层,Java多线程程序通常把应用分解为多个任务,然后使用用户调度框架Executor将这些任务映射为固定数量的线程;在底层,操作系统内核将这些线程映射到硬件处理器上。
二、Executor框架的结构
任务的产生:包括被执行任务需要实现的接口:Runnable接口或者Callable接口
首先主线程创建实现Runnable或者Callable接口的任务对象,工具类Executors可以把一个Runnable对象封装成一个Callable对象,使用Executors.callable(Runnable task)。
任务的执行:包括执行机制的核心接口Executor,以及继承Executor的ExecutorService接口。Executor框架有两个关键类,ThreadPoolExecutor和ScheduledThreadPoolExecutor。这两种线程池类均可以通过工厂类Executors来创建。
将任务对象交给ExecutorService用execute或者submit执行,execute不返回结果,submit可以返回实现了Future接口的对象(FutureTask对象)。FutureTask实现了Runnable,我们可以创建FutureTask,然后直接交给ExecutorService。
异步计算的结果:包括接口Future和实现Future接口的FutureTask类。
最后主线程可以执行FutureTask.get()方法等待任务完成(阻塞方法),也可以使用FutureTask.cancell(boolean mayInterruptIfRunning) 来取消任务的执行。
三、Executor框架中的成员
1、ThreadPoolExecutor线程池
通常通过工厂类Exectors创建,也可以自定义创建, Executors的创建有三个方法:
1)newFixedThreadPool:可重用且固定线程数的线程池,如果线程池中的所有线程都处于活动状态,此时再提交任务就在队列中等待,直到有可用线程;如果线程池中的某个线程由于异常而结束时,线程池就会再补充一条新线程。适用于负载比较重的服务器。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
//使用一个基于FIFO排序的阻塞队列,在所有corePoolSize线程都忙时新任务将在队列中等待
new LinkedBlockingQueue<Runnable>());
}
实例化过程中传入的五个参数分别是corePoolSize,maximumPoolSize,KeepAliveTime,unit,workQueue
corePoolSize:线程池的基本大小,即在没有任务需要执行的时候线程池的大小,并且只有在工作队列满了的情况下才会创建超出这个数量的线程。
maximumPoolSize:线程池中允许的最大线程数,线程池中的当前线程数目不会超过该值。如果队列中任务已满,并且当前线程个数小于maximumPoolSize,那么会创建新的线程来执行任务。
KeepAliveTime:如果一个线程处在空闲状态的时间超过了该属性值,就会因为超时而退出。举个例子,如果线程池的核心大小corePoolSize=5,而当前大小poolSize =8,那么超出核心大小的线程,会按照keepAliveTime的值判断是否会超时退出。
unit:KeepAliveTime的时间单位。
workQueue:线程池底层使用的队列类型。
可见newFixedThreadPool的corePoolSize和maximumPoolSize是相等的。并且使用LinkedBlockingQueue(阻塞最大值Integer.MAX_VALUE)作为阻塞队列,在所有corePoolSize线程都忙时新任务将在队列中等待,所以这里一般不会有拒绝任务。
2)newSingleThreadExecutor:创建一个单线程的Executor,如果该线程因为异常而结束就新建一条线程来继续执行后续的任务,适用于需要保证顺序的执行各个任务,并且在任意时间点不会有多个线程时活动的场景。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
基本和ThreadPoolExecutor类似,不过是corePoolSize和maximumPoolSize设置为了1。
3)newCachedThreadPool:创建可缓存的线程池,如果线程池中的线程在60秒内未被使用就将被移除,在执行新的任务时,当线程池中有之前创建的可用线程就重用可用线程,否则就新建一条线程,它是大小无界的线程池,适用于执行很多的短期异步任务的小程序,或者负载较轻的服务器。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
核心线程池为0,最大线程池可以达到Integer.MAX_VALUE个,keepAliveTime设置为60表示空闲线程最长等待任务时间60秒后就会被终止,阻塞队列使用没有容量的SynchronousQueue队列,而这个又是无界的,MAME当主线程提交任务的书读高于线程的处理速度,那么CachedThreadPool就会不听的创建线程,极端的情况下CachedThreadPool回应为过多创建线程而耗尽CPU和内存资源。
2、ScheduledThreadPoolExecutor周期执行线程池
继承ThreadPoolExecutor类,主要用来在给定延迟后运行任务,或者定期执行任务,类似Timer,但是可以创建多个后台线程数进行执行。通常通过工厂类Exectors创建。
1)newScheduledThreadPool:创建一个可延迟执行或定期执行的线程池,包含若干个线程。适用于需要多个后台程序执行周期的任务,同时为了满足资源管理的需求而限制后台线程的数量的场景。
2)newSingleThreadScheduledExecutor:只包含一个线程的执行延迟或者定期的线程池。适用于需要单个后台线程执行周期任务,同时需要保证顺序地执行各个任务的场景。
3、 FutureTask:除了实现Future接口外,还实现了Runnable接口,因此FutureTask可以交给Executor执行,也可以调用线程直接执行FutureTask.run() ,详情可见Java并发编程:Executor框架
4、Runnable和Callable就不在此多说了
四、Demos
1)FixedThreadPool
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class FixedThreadPoolDemo{
public static void main(String[] args){
ExecutorService pool = Executors.newFixedThreadPool(2);
//循环添加5个任务
for (int i = 1; i <= 5; i++){
final int taskNo = i;
pool.execute(new Runnable(){
//任务内容:执行3次打印语句打印1,2,3
@Override
public void run(){
for(int j = 1;j<=3;j++){
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程名:"+Thread.currentThread().getName()+" 任务编号"+ taskNo +" 打印"+j);
}
}
});
}
pool.shutdown();
}
}
运行结果:
2)CachedThreadPool
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class FixedThreadPoolDemo{
public static void main(String[] args) throws InterruptedException{
ExecutorService pool = Executors.newCachedThreadPool();
//循环添加5个任务
for (int i = 1; i <= 5; i++){
final int taskNo = i;
pool.execute(new Runnable(){
//任务内容:执行3次打印语句打印1,2,3
@Override
public void run(){
for(int j = 1;j<=3;j++){
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程名:"+Thread.currentThread().getName()+" 任务编号"+ taskNo +" 打印"+j);
}
}
});
}
Thread.sleep(2000);
pool.execute(new Runnable(){
public void run() {
for(int j = 1;j<=3;j++){
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程名:"+Thread.currentThread().getName()+" 任务编号6"+" 打印"+j);
}
}
});
pool.shutdown();
}
}
运行结果:
其他线程池比较好理解,在这里就不做测试了:)
参考资料
1、Java并发编程:Executor框架
2、Java并发编程:Executor框架
3、Java并发编程:Executor框架
4、Java并发编程:Executor框架
今天的文章java 并发框架_java高并发编程详解pdf「建议收藏」分享到此就结束了,感谢您的阅读。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/65343.html