目录
5.1 Worker实现接口Runnable,执行run方法
一、简介
Worker是ThreadPoolExecutor中的内部类,它其实就可以相当于线程池中存储的线程,用它来执行提交给线程池的任务,但是这个worker并不是一个单纯的线程,而是它的属性中持有一个线程,它还有其他的属性,用来做一些数据统计以及实现一些执行任务相关的功能。
二、Worker类对象的类图
三、Worker类对象的解释
线程池中的线程,都会被封装成一个Worker类对象,ThreadPoolExecutor维护的其实就是一组Worker对象,其中用集合workers存储这些Worker对象;
Worker类中有两个属性,一个是firstTask,用来保存传入线程池中的任务,一个是thread,是在构造Worker对象的时候,利用ThreadFactory来创建的线程,用来处理任务的线程;
Worker继承AQS,使用AQS实现独占锁,并且是不可重入的,构造Worker对象的时候,会把锁资源状态设置成-1,因为新增的线程,还没有处理过任务,是不允许被中断的
Worker使用AQS来实现的不可重入锁,固定state只有1个。以下为它的AQS实现:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable{
....
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
....
}
也就是说Worker对象本身就有lock()、unlock()、tryLock()、isLocked()等方法可以调用,用来给Worker对象加锁。
lock 方法一旦获取了独占锁,表示当前线程正在执行任务中;那么它会有以下几个作用
- 如果正在执行任务,则不应该中断线程;
- 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断;
- 线程池在执行 shutdown 方法或 tryTerminate 方法时会调用 interruptIdleWorkers 方法来中断空闲的线程,interruptIdleWorkers 方法会使用 tryLock 方法来判断线程池中的线程是否是空闲状态
- 之所以设置为不可重入,是因为我们不希望任务在调用像 setCorePoolSize 这样的线程池控制方法时重新获取锁,这样会中断正在运行的线程
4.2 Worker继承自AQS有何意义?
前面我们看了Worker内部类的定义,它继承自AQS,天生自带锁的特性,那么,它的锁是用来干什么的呢?跟任务的执行有关系吗?
答:既然是跟锁(同步)有关,说明Worker类跨线程使用了,此时我们查看它的lock()方法发现只在runWorker()方法中使用了,但是其tryLock()却是在interruptIdleWorkers()方法中使用的。
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
interruptIdleWorkers()方法的意思是中断空闲线程的意思,它只会中断BlockingQueue的poll()或take()方法,而不会中断正在执行的任务。
一般来说,interruptIdleWorkers()方法的调用不是在本工作线程,而是在主线程中调用的,还记得shutdown()和shutdownNow()方法吗?
观察两个方法中中断线程的方法,shutdown()中就是调用了interruptIdleWorkers()方法来中断线程地,而且interruptIdleWorkers()方法中就用到了tryLock(),只有获取到锁了才能中断线程,如果没有获取到锁则不中断。而调用tryLock()后没获取到锁只有一种原因,那就是lock()所在的地方runWorker()方法中,有任务正在执行。这样shutdown()方法就实现了只中断空闲线程,不会中断正在执行任务的线程。
而shutdownNow()中中断线程则很暴力,并没有tryLock(),而是直接中断了线程,所以调用shutdownNow()可能会中断正在执行的任务。
所以,Worker继承自AQS实际是要使用其锁的能力,这个锁主要是用来控制调用shutdown()时不要中断正在执行任务的线程。
那么为什么Worker使用AQS实现锁,而不直接用ReentrantLock呢?我们可以看到Worker的tryAcquire 方法,它是不允许重入的,而 ReentrantLock是允许重入的。所以这是为了实现不可重入的特性去反应线程现在的执行状态。
四、Worker的主要代码
private final class Workerextends AbstractQueuedSynchronizerimplements Runnable{ /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** 这个就是worker持有的线程,也就是线程池中的线程 */ final Thread thread; /** 这个就是提交给线程池的任务 */ Runnable firstTask; /** 每一个线程执行的任务数量的计数器 */ volatile long completedTasks; /** * 我们在调用addWorker方法的时候就会调用这个构造方法,有可能是创建新线程并执行任务,那么firstTask就是传给线程池要执行的任务,如果只是了 * 单纯的想创建一个线程,只需要传入null就可以 */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; // 这个是通过线程工厂类创建一个线程,也就是给线程池创建一个线程 this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } 省略上面已经讲过的利用AQS实现锁的部分......}
以上就是Worker内部类的源码,所以Worker内部类的实现很简单,没有多少代码,它调用的很多方法其实仍然是ThreadPoolExecutor中的方法。
总体来看,worker其实就是一个Runable,其也是需要构造成一个Thread对象,然后调用Thread.start()方法运行的。只不过在worker的run方法中是定义了一个runWoker的方法。这个方法的主要内容从 for 循环不停的从task队列中获取对应的runable的task,然后同步调用这个task的run()方法。其实就是在某个线程中,不停的拿队列中的任务进行执行。
4.1 运行worker
可以看到构造方法内,有一个Thread对象,其使用了ThreadFactory构造了一个新的线程,并且线程的runable是worker本身。
this.thread = getThreadFactory().newThread(this);
所以需要执行worker的时候,只需要调用worker中的thread的start()方法即可,并且可以调用thread的方法来控制worker的状态,例如interrupt等。
只有运行启动了的线程,才能够去执行线程池中的任务。因为线程去获取任务执行的逻辑是定义在Worker的runWorker()方法中的,而runWorker()方法放在了Worker的run()方法中,想要执行run()方法,就需要去调用Worker类中thread对象的start()方法。所以当向线程池中成功添加worker线程后,需要将该worker线程启动才能正常使用该线程去执行任务。
4.2 worker和ThreadPool的关系
在ThreadPool中是有一个worker集合的。通过这个集合,我们可以知道有多少worker线程在进行工作等,每一个worker都是各自进行工作,工作的内容就是不停的获取task,然后执行task即可。
五、Worker源码分析
下面我们对Worker的一些关键方法源码进行分析,来帮助我们理解Worker的运作原理。下面就是工作线程执行任务时用到的核心方法分析。
5.1 Worker实现接口Runnable,执行run方法
Worker的run()方法中,实际上执行的是runWorker()方法。该方法实际上就是让线程去线程池中拿任务来执行。
runWorker()方法是线程池真正执行任务的地方。
这里要注意,run()方法是Worker类的方法,但是其调用的runWorker()方法是ThreadPoolExecutor中的方法。
java.util.concurrent.ThreadPoolExecutor#runWorker
// 向线程池中添加线程成功,并且启动也成功,则会执行Worker对象的run方法,进入runWorker方法逻辑
final void runWorker(ThreadPoolExecutor.Worker w) {
// 获取当前线程,其实这个当前线程,就是worker对象持有的线程,从线程池中拿到的任务就是由这个线程执行的
Thread wt = Thread.currentThread();
// 在构造Worker对象的时候,会把一个任务添加进Worker对象
// 因此需要把其作为新增线程的第一个任务来执行
Runnable task = w.firstTask;
// 上面已经将该任务拿出来准备进行执行了(将firstTask取出赋值给task),则需要将该worker对象即线程池中的线程对象持有的任务清空
w.firstTask = null;
// 将AQS锁资源的状态由-1变成0,运行该线程进行中断 因为在创建的时候将state设为-1了,现在开始执行任务了,也就需要加锁了,所以要把state再重新变为0,这样在后面执行任务的时候才能用来加锁,保证任务在执行过程中不会出现并发异常
// 解锁
w.unlock();
// 用来判断执行任务的过程中,是否出现了异常
boolean completedAbruptly = true;
try {
// 线程池中的线程循环处理线程池中的任务,直到线程池中的所有任务都被处理完后则跳出循环
while (task != null || (task = getTask()) != null) { // 这一步的getTask()就说明Worker一直在轮询的从队列中获取任务,getTask()方法将从队列获取到的任务返回,赋值给task
// 给该worker加锁,一个线程只处理一个任务。注意加锁是给worker线程加锁,不是给任务线程加锁,因为worker线程之前一直在轮询地在队列中取任务,但是当执行任务的时候,为了避免执行任务出现异常,就对其加锁
w.lock();
// 线程池是否是STOP状态
// 如果是,则确保当前线程是中断状态
// 如果不是,则确保当前线程不是中断状态
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
// 注意这里中断的是当前线程,也就是worker对象持有的线程
wt.interrupt();
try {
// 扩展使用,在执行任务的run方法之前执行
beforeExecute(wt, task);
// 记录执行任务过程中,出现的异常
Throwable thrown = null;
try {
// 执行任务的run方法 当前线程环境就是worker对象持有的线程,所以本质就是woker对象在执行task任务的run()方法
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 扩展使用,在执行任务的run方法之后执行
afterExecute(task, thrown);
}
} finally {
// 执行完任务后,就将任务对象清空
task = null;
w.completedTasks++; // 该worker已经完成的任务数+1
w.unlock(); // 将worker线程地锁释放
}
}
// 正常执行完任务
completedAbruptly = false;
} finally {
// 线程池中所有的任务都处理完后,或者执行任务的过程中出现了异常,就会执行该方法
processWorkerExit(w, completedAbruptly);
}
}
这个方法主要做几件事
- 如果 task 不为空,则开始执行 task
- 如果 task 为空,则通过 getTask()再去取任务,并赋值给 task,如果取到的 Runnable 不为空,则执行该任务
- 执行完毕后,通过 while 循环继续 getTask()取任务
- 如果 getTask()取到的任务依然是空,那么整个 runWorker()方法执行完毕
这个方法比较简单,如果忽略状态检测和锁的内容,本质就是如果有第一个任务,就先执行之,之后再从任务队列中取任务来执行,获取任务是通过getTask()来进行的。
Worker执行任务流程图:
5.2 核心方法 getTask()
这个方法用来向队列中轮询地尝试获取任务。该方法也是ThreadPoolExecutor中的方法。
这里重要的地方是第二个 if 判断,目的是控制线程池的有效线程数量。
由上文中的分析可以知道,在执行 execute 方法时,如果当前线程池的线程数量超过了 corePoolSize 且小于maximumPoolSize,并且 workQueue 已满时,则可以增加工作线程,但这时如果超时没有获取到任务,也就是 timedOut 为 true 的情况,说明 workQueue 已经为空了,也就说明了当前线程池中不需要那么多线程来执行任务了,可以把多于 corePoolSize 数量的线程销毁掉,保持线程数量在 corePoolSize 即可。
什么时候会销毁?
当然是 runWorker()方法执行完之后,也就是 Worker 中的 run 方法执行完,由 JVM 自动回收。
// 返回任务Runnable
private Runnable getTask() {
// timedOut表示 记录上一次从队列中获取任务是否超时
boolean timedOut = false; // Did the last poll() time out?
// 自旋
for (;;) {
// 这一部分是判断线程池状态
// 获取线程池的状态和线程池中线程数量组成的整形字段,32位
// 高3位代表线程池的状态,低29位代表线程池中线程的数量
int c = ctl.get();
// 获取高3位的值,即线程池的状态
int rs = runStateOf(c);
// 如果线程池状态不是Running状态,并且 线程也不是SHUTDOWN状态 或者任务队列已空
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// 则将线程池中的线程数量减1 就是说该线程已经不是运行状态了,所以要这个worker线程也没有用了,直接将该worker去掉。这个是原子操作
decrementWorkerCount();
//返回一个空任务,因为:
// 1:如果任务队列已空,则想返回任务也没有
// 2:如果线程池处于STOP或者之上的状态,则线程池不允许再处理任务
return null;
}
// 这一部分是判断线程池有效线程数量
// 获取低29位的值,即线程池中线程的数量
int wc = workerCountOf(c);
// timed是否需要进行超时控制
// allowCoreThreadTimeOut默认false
// 当线程池中线程的数量没有达到核心线程数量时,获取任务的时候允许超时 如果将allowCoreThreadTimeOut设为true,那也不允许超时
// 当线程池中线程的数量超过核心线程数量时,获取任务的时候不允许超时
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 这个很好理解
// wc > maximumPoolSize的情况是因为可能在此方法执行阶段同时执行了setMaximumPoolSize方法;
// timed && timedOut 如果为true,表示当前操作需要进行超时控制,并且上次从阻塞队列中获取任务发生了超时
// 接下来判断,如果有效线程数量大于1,或者阻塞队列是空的,那么尝试将workerCount减1;
// 如果减1失败,则continue返回重试
// 如果wc == 1时,也就说明当前线程是线程池中唯一的一个线程了。
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
// 如果上面都没问题,就可以获取任务了
try {
// 获取任务
// 如果timed = true ,说明需要做超时控制,则根据keepAliveTime设置的时间内,阻塞等待从队列中获取任务
// 如果timed = false,说明不需要做超时控制,则阻塞,直到从队列中获取到任务为止
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
// 如果获取到任务,则把任务返回
if (r != null)
return r;
// 执行到这里,说明在允许的时间内,没有获取到任务
timedOut = true;
} catch (InterruptedException retry) {
// 获取任务没有超时,但是出现异常了,将timedOut设置为false
timedOut = false;
}
}
}
注意,这里取任务会根据工作线程的数量判断是使用BlockingQueue的poll(timeout, unit)方法还是take()方法。
poll(timeout, unit)方法会在超时时返回null,如果timeout<=0,队列为空时直接返回null。
take()方法会一直阻塞直到取到任务或抛出中断异常。
所以,如果keepAliveTime设置为0,当任务队列为空时,非核心线程取不出来任务,会立即结束其生命周期。
默认情况下,是不允许核心线程超时的,但是可以通过下面这个方法设置使核心线程也可超时:
public void allowCoreThreadTimeOut(boolean value) {
if (value && keepAliveTime <= 0)
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
if (value != allowCoreThreadTimeOut) {
allowCoreThreadTimeOut = value;
if (value)
// 中断空闲的线程
interruptIdleWorkers();
}
}
getTask()方法返回 null 时,在 runWorker()方法中会跳出 while 循环,然后会执行processWorkerExit 方法。
5.3 核心方法 processWorkerExit()
runWorker 的 while 循环执行完毕以后,在 finally 中会调用 processWorkerExit()方法,来销毁工作线程。该方法就是判断当前线程是需要将其删除还是继续执行任务。该方法也是ThreadPoolExecutor中的方法。
但这个方法只有在一定情况下才会起作用,如果已经从队列中取不出任务来了,或者在worker执行任务过程中出现了错误,这个方法就起到了作用,如果正常的话这个方法就没啥用。
private void processWorkerExit(ThreadPoolExecutor.Worker w, boolean completedAbruptly) {
// 如果 completedAbruptly = true ,则线程执行任务的时候出现了异常,需要从线程池中减少一个线程
// 如果 completedAbruptly = false,则执行getTask方法的时候已经减1,这里无需在进行减1操作
if (completedAbruptly)
decrementWorkerCount();
// 获取线程池的锁,因为后面是线程池的操作,为了并发安全,需要对线程池加锁
final ReentrantLock mainLock = this.mainLock;
// 线程池加锁
mainLock.lock();
try {
// 统计该线程池完成的任务数
completedTaskCount += w.completedTasks;
// 从线程池中移除一个工作线程 works是线程池持有的一个集合
workers.remove(w); // 将没用的worker去掉,也就是当前传入的worker
} finally {
// 线程池解锁
mainLock.unlock();
}
// 根据线程池的状态,决定是否结束该线程池
tryTerminate(); // 钩子方法
// 判断线程池是否需要增加线程
// 获取线程池的状态
int c = ctl.get();
// -当线程池是RUNNING或SHUTDOWN状态时
// --如果worker是异常结束(即completedAbruptly为false),那么会直接addWorker;
// ---如果allowCoreThreadTimeOut = true,并且等待队列有任务,至少保留一个worker;
// ---如果allowCoreThreadTimeOut = false,活跃线程数不少于corePoolSize
if (runStateLessThan(c, STOP)) { // 线程池状态小于STOP,就说明当前线程池是RUNNING或SHUTDOWN状态
// 如果worker是异常结束的,不进入下面的分支,直接去addWorker
if (!completedAbruptly) {
// 根据allowCoreThreadTimeOut的值,来设置线程池中最少的活跃线程数是0还是corePoolSize
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 如果等待队列中有任务,要至少保留一个worker
if (min == 0 && ! workQueue.isEmpty())
// 至少保留一个worker
min = 1;
// 如果活跃线程数大于等于min,直接返回,不需要再调用addWorker来增加线程池中的线程了
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 增加线程池中的worker
addWorker(null, false);
}
}
5.4 worker的异常处理
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
通过源代码可以看出,对应的Exception都是保存在thrown中,在finally中交给了afterExecute进行了处理。
所以可以自己实现对应的afterExecute来进行处理系统内部发生的异常问题。
六、Worker工作线程的生命周期
Worker执行任务的模型如下图所示:
相关文章:【线程池】Java的线程池
【线程池】Java线程池的核心参数
【线程池】Executors框架创建线程池
【线程池】ScheduledExecutorService接口和ScheduledThreadPoolExecutor定时任务线程池使用详解 【线程池】线程池的拒绝策略(饱和策略)
今天的文章java线程池workqueue_java线程池的使用例子分享到此就结束了,感谢您的阅读。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/82751.html