java线程池workqueue_java线程池的使用例子

java线程池workqueue_java线程池的使用例子Java线程池的内部类Worker详解,一篇文章让你彻底搞懂Worker内部类,带你深度分析底层源码

目录

一、简介

二、Worker类对象的类图

三、Worker类对象的解释

4.2 Worker继承自AQS有何意义?

四、Worker的主要代码

4.1 运行worker

4.2 worker和ThreadPool的关系

五、Worker源码分析

5.1 Worker实现接口Runnable,执行run方法

5.2 核心方法 getTask()

5.3 核心方法 processWorkerExit()

5.4 worker的异常处理

六、Worker工作线程的生命周期


一、简介

Worker是ThreadPoolExecutor中的内部类,它其实就可以相当于线程池中存储的线程,用它来执行提交给线程池的任务,但是这个worker并不是一个单纯的线程,而是它的属性中持有一个线程,它还有其他的属性,用来做一些数据统计以及实现一些执行任务相关的功能。

二、Worker类对象的类图

java线程池workqueue_java线程池的使用例子

三、Worker类对象的解释

线程池中的线程,都会被封装成一个Worker类对象,ThreadPoolExecutor维护的其实就是一组Worker对象,其中用集合workers存储这些Worker对象;

Worker类中有两个属性,一个是firstTask,用来保存传入线程池中的任务,一个是thread,是在构造Worker对象的时候,利用ThreadFactory来创建的线程,用来处理任务的线程;

Worker继承AQS,使用AQS实现独占锁,并且是不可重入的构造Worker对象的时候,会把锁资源状态设置成-1,因为新增的线程,还没有处理过任务,是不允许被中断的

Worker使用AQS来实现的不可重入锁,固定state只有1个。以下为它的AQS实现:

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable{
    ....
    
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }
    
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
    
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }
    
    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }
    ....
}

也就是说Worker对象本身就有lock()、unlock()、tryLock()、isLocked()等方法可以调用,用来给Worker对象加锁。

lock 方法一旦获取了独占锁,表示当前线程正在执行任务中;那么它会有以下几个作用

  • 如果正在执行任务,则不应该中断线程;
  • 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断;
  • 线程池在执行 shutdown 方法或 tryTerminate 方法时会调用 interruptIdleWorkers 方法来中断空闲的线程,interruptIdleWorkers 方法会使用 tryLock 方法来判断线程池中的线程是否是空闲状态
  • 之所以设置为不可重入,是因为我们不希望任务在调用像 setCorePoolSize 这样的线程池控制方法时重新获取锁,这样会中断正在运行的线程

4.2 Worker继承自AQS有何意义?

前面我们看了Worker内部类的定义,它继承自AQS,天生自带锁的特性,那么,它的锁是用来干什么的呢?跟任务的执行有关系吗?

答:既然是跟锁(同步)有关,说明Worker类跨线程使用了,此时我们查看它的lock()方法发现只在runWorker()方法中使用了,但是其tryLock()却是在interruptIdleWorkers()方法中使用的。

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

interruptIdleWorkers()方法的意思是中断空闲线程的意思,它只会中断BlockingQueue的poll()或take()方法,而不会中断正在执行的任务。

一般来说,interruptIdleWorkers()方法的调用不是在本工作线程,而是在主线程中调用的,还记得shutdown()和shutdownNow()方法吗?

观察两个方法中中断线程的方法,shutdown()中就是调用了interruptIdleWorkers()方法来中断线程地,而且interruptIdleWorkers()方法中就用到了tryLock(),只有获取到锁了才能中断线程,如果没有获取到锁则不中断。而调用tryLock()后没获取到锁只有一种原因,那就是lock()所在的地方runWorker()方法中,有任务正在执行。这样shutdown()方法就实现了只中断空闲线程,不会中断正在执行任务的线程。

而shutdownNow()中中断线程则很暴力,并没有tryLock(),而是直接中断了线程,所以调用shutdownNow()可能会中断正在执行的任务。

所以,Worker继承自AQS实际是要使用其锁的能力,这个锁主要是用来控制调用shutdown()时不要中断正在执行任务的线程

那么为什么Worker使用AQS实现锁,而不直接用ReentrantLock呢?我们可以看到Worker的tryAcquire 方法,它是不允许重入的,而 ReentrantLock是允许重入的。所以这是为了实现不可重入的特性去反应线程现在的执行状态。

四、Worker的主要代码

private final class Workerextends AbstractQueuedSynchronizerimplements Runnable{	/**	 * This class will never be serialized, but we provide a	 * serialVersionUID to suppress a javac warning.	 */	private static final long serialVersionUID = 6138294804551838833L;		/** 这个就是worker持有的线程,也就是线程池中的线程 */    	final Thread thread;		/** 这个就是提交给线程池的任务 */    	Runnable firstTask;		/** 每一个线程执行的任务数量的计数器 */  	volatile long completedTasks;		/**	 * 我们在调用addWorker方法的时候就会调用这个构造方法,有可能是创建新线程并执行任务,那么firstTask就是传给线程池要执行的任务,如果只是了	 * 单纯的想创建一个线程,只需要传入null就可以	 */	Worker(Runnable firstTask) {	    setState(-1); // inhibit interrupts until runWorker	    this.firstTask = firstTask;	        // 这个是通过线程工厂类创建一个线程,也就是给线程池创建一个线程	    this.thread = getThreadFactory().newThread(this); 	}		/** Delegates main run loop to outer runWorker  */	public void run() {	    runWorker(this);	}		省略上面已经讲过的利用AQS实现锁的部分......}

以上就是Worker内部类的源码,所以Worker内部类的实现很简单,没有多少代码,它调用的很多方法其实仍然是ThreadPoolExecutor中的方法。

总体来看,worker其实就是一个Runable,其也是需要构造成一个Thread对象,然后调用Thread.start()方法运行的。只不过在worker的run方法中是定义了一个runWoker的方法。这个方法的主要内容从 for 循环不停的从task队列中获取对应的runable的task,然后同步调用这个task的run()方法其实就是在某个线程中,不停的拿队列中的任务进行执行。

4.1 运行worker

可以看到构造方法内,有一个Thread对象,其使用了ThreadFactory构造了一个新的线程,并且线程的runable是worker本身。

this.thread = getThreadFactory().newThread(this);

所以需要执行worker的时候,只需要调用worker中的thread的start()方法即可,并且可以调用thread的方法来控制worker的状态,例如interrupt等。

只有运行启动了的线程,才能够去执行线程池中的任务。因为线程去获取任务执行的逻辑是定义在Worker的runWorker()方法中的,而runWorker()方法放在了Worker的run()方法中,想要执行run()方法,就需要去调用Worker类中thread对象的start()方法。所以当向线程池中成功添加worker线程后,需要将该worker线程启动才能正常使用该线程去执行任务。

4.2 workerThreadPool的关系

在ThreadPool中是有一个worker集合的。通过这个集合,我们可以知道有多少worker线程在进行工作等,每一个worker都是各自进行工作,工作的内容就是不停的获取task,然后执行task即可。

五、Worker源码分析

下面我们对Worker的一些关键方法源码进行分析,来帮助我们理解Worker的运作原理。下面就是工作线程执行任务时用到的核心方法分析。

5.1 Worker实现接口Runnable,执行run方法

Worker的run()方法中,实际上执行的是runWorker()方法。该方法实际上就是让线程去线程池中拿任务来执行。

runWorker()方法是线程池真正执行任务的地方。

这里要注意,run()方法是Worker类的方法,但是其调用的runWorker()方法是ThreadPoolExecutor中的方法。

java.util.concurrent.ThreadPoolExecutor#runWorker

// 向线程池中添加线程成功,并且启动也成功,则会执行Worker对象的run方法,进入runWorker方法逻辑
final void runWorker(ThreadPoolExecutor.Worker w) {
    // 获取当前线程,其实这个当前线程,就是worker对象持有的线程,从线程池中拿到的任务就是由这个线程执行的
    Thread wt = Thread.currentThread();
    // 在构造Worker对象的时候,会把一个任务添加进Worker对象
    // 因此需要把其作为新增线程的第一个任务来执行
    Runnable task = w.firstTask;
    // 上面已经将该任务拿出来准备进行执行了(将firstTask取出赋值给task),则需要将该worker对象即线程池中的线程对象持有的任务清空
    w.firstTask = null;
    // 将AQS锁资源的状态由-1变成0,运行该线程进行中断 因为在创建的时候将state设为-1了,现在开始执行任务了,也就需要加锁了,所以要把state再重新变为0,这样在后面执行任务的时候才能用来加锁,保证任务在执行过程中不会出现并发异常
    // 解锁
    w.unlock();
    // 用来判断执行任务的过程中,是否出现了异常
    boolean completedAbruptly = true;
    try {
        // 线程池中的线程循环处理线程池中的任务,直到线程池中的所有任务都被处理完后则跳出循环
        while (task != null || (task = getTask()) != null) {  // 这一步的getTask()就说明Worker一直在轮询的从队列中获取任务,getTask()方法将从队列获取到的任务返回,赋值给task
            // 给该worker加锁,一个线程只处理一个任务。注意加锁是给worker线程加锁,不是给任务线程加锁,因为worker线程之前一直在轮询地在队列中取任务,但是当执行任务的时候,为了避免执行任务出现异常,就对其加锁
            w.lock();
            // 线程池是否是STOP状态
            // 如果是,则确保当前线程是中断状态
            // 如果不是,则确保当前线程不是中断状态
            if ((runStateAtLeast(ctl.get(), STOP) ||
                    (Thread.interrupted() &&
                            runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                // 注意这里中断的是当前线程,也就是worker对象持有的线程
                wt.interrupt();
            
            try {
                // 扩展使用,在执行任务的run方法之前执行
                beforeExecute(wt, task);
                // 记录执行任务过程中,出现的异常
                Throwable thrown = null;
                try {
                    // 执行任务的run方法   当前线程环境就是worker对象持有的线程,所以本质就是woker对象在执行task任务的run()方法
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 扩展使用,在执行任务的run方法之后执行
                    afterExecute(task, thrown);
                }
            } finally {
                // 执行完任务后,就将任务对象清空
                task = null;
                w.completedTasks++; // 该worker已经完成的任务数+1
                w.unlock();  // 将worker线程地锁释放
            }
        }
        // 正常执行完任务
        completedAbruptly = false;
    } finally {
        // 线程池中所有的任务都处理完后,或者执行任务的过程中出现了异常,就会执行该方法
        processWorkerExit(w, completedAbruptly);
    }
}

这个方法主要做几件事

  1. 如果 task 不为空,则开始执行 task
  2. 如果 task 为空,则通过 getTask()再去取任务,并赋值给 task,如果取到的 Runnable 不为空,则执行该任务
  3. 执行完毕后,通过 while 循环继续 getTask()取任务
  4. 如果 getTask()取到的任务依然是空,那么整个 runWorker()方法执行完毕

这个方法比较简单,如果忽略状态检测和锁的内容,本质就是如果有第一个任务,就先执行之,之后再从任务队列中取任务来执行,获取任务是通过getTask()来进行的。

Worker执行任务流程图:

java线程池workqueue_java线程池的使用例子

5.2 核心方法 getTask()

这个方法用来向队列中轮询地尝试获取任务。该方法也是ThreadPoolExecutor中的方法。

这里重要的地方是第二个 if 判断,目的是控制线程池的有效线程数量。

由上文中的分析可以知道,在执行 execute 方法时,如果当前线程池的线程数量超过了 corePoolSize 且小于maximumPoolSize,并且 workQueue 已满时,则可以增加工作线程,但这时如果超时没有获取到任务,也就是 timedOut 为 true 的情况,说明 workQueue 已经为空了,也就说明了当前线程池中不需要那么多线程来执行任务了,可以把多于 corePoolSize 数量的线程销毁掉,保持线程数量在 corePoolSize 即可。

什么时候会销毁?

当然是 runWorker()方法执行完之后,也就是 Worker 中的 run 方法执行完,由 JVM 自动回收。

// 返回任务Runnable
private Runnable getTask() {
    // timedOut表示 记录上一次从队列中获取任务是否超时
    boolean timedOut = false; // Did the last poll() time out?
    // 自旋
    for (;;) {
        // 这一部分是判断线程池状态
        // 获取线程池的状态和线程池中线程数量组成的整形字段,32位
        // 高3位代表线程池的状态,低29位代表线程池中线程的数量
        int c = ctl.get();
        // 获取高3位的值,即线程池的状态
        int rs = runStateOf(c);
        // 如果线程池状态不是Running状态,并且 线程也不是SHUTDOWN状态 或者任务队列已空
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            // 则将线程池中的线程数量减1  就是说该线程已经不是运行状态了,所以要这个worker线程也没有用了,直接将该worker去掉。这个是原子操作
            decrementWorkerCount();
            //返回一个空任务,因为:
            // 1:如果任务队列已空,则想返回任务也没有
            // 2:如果线程池处于STOP或者之上的状态,则线程池不允许再处理任务
            return null;
        }
        // 这一部分是判断线程池有效线程数量
        // 获取低29位的值,即线程池中线程的数量
        int wc = workerCountOf(c);
        // timed是否需要进行超时控制
        // allowCoreThreadTimeOut默认false
        // 当线程池中线程的数量没有达到核心线程数量时,获取任务的时候允许超时  如果将allowCoreThreadTimeOut设为true,那也不允许超时
        // 当线程池中线程的数量超过核心线程数量时,获取任务的时候不允许超时   
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 这个很好理解

        // wc > maximumPoolSize的情况是因为可能在此方法执行阶段同时执行了setMaximumPoolSize方法;
        // timed && timedOut 如果为true,表示当前操作需要进行超时控制,并且上次从阻塞队列中获取任务发生了超时
        // 接下来判断,如果有效线程数量大于1,或者阻塞队列是空的,那么尝试将workerCount减1;
        // 如果减1失败,则continue返回重试
        // 如果wc == 1时,也就说明当前线程是线程池中唯一的一个线程了。
        if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        // 如果上面都没问题,就可以获取任务了
        try {
            // 获取任务
            // 如果timed = true ,说明需要做超时控制,则根据keepAliveTime设置的时间内,阻塞等待从队列中获取任务
            // 如果timed = false,说明不需要做超时控制,则阻塞,直到从队列中获取到任务为止
            Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
            // 如果获取到任务,则把任务返回
            if (r != null)
                return r;
            // 执行到这里,说明在允许的时间内,没有获取到任务
            timedOut = true;
        } catch (InterruptedException retry) {
            // 获取任务没有超时,但是出现异常了,将timedOut设置为false
            timedOut = false;
        }
    }
}

注意,这里取任务会根据工作线程的数量判断是使用BlockingQueue的poll(timeout, unit)方法还是take()方法。

poll(timeout, unit)方法会在超时时返回null,如果timeout<=0,队列为空时直接返回null。

take()方法会一直阻塞直到取到任务或抛出中断异常。

所以,如果keepAliveTime设置为0,当任务队列为空时,非核心线程取不出来任务,会立即结束其生命周期。

默认情况下,是不允许核心线程超时的,但是可以通过下面这个方法设置使核心线程也可超时:

public void allowCoreThreadTimeOut(boolean value) {
    if (value && keepAliveTime <= 0)
        throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
    if (value != allowCoreThreadTimeOut) {
        allowCoreThreadTimeOut = value;
        if (value)
            // 中断空闲的线程
            interruptIdleWorkers();
    }
}

getTask()方法返回 null 时,在 runWorker()方法中会跳出 while 循环,然后会执行processWorkerExit 方法。

5.3 核心方法 processWorkerExit()

runWorker 的 while 循环执行完毕以后,在 finally 中会调用 processWorkerExit()方法,来销毁工作线程。该方法就是判断当前线程是需要将其删除还是继续执行任务。该方法也是ThreadPoolExecutor中的方法。

但这个方法只有在一定情况下才会起作用,如果已经从队列中取不出任务来了,或者在worker执行任务过程中出现了错误,这个方法就起到了作用,如果正常的话这个方法就没啥用。

private void processWorkerExit(ThreadPoolExecutor.Worker w, boolean completedAbruptly) {
    // 如果 completedAbruptly = true ,则线程执行任务的时候出现了异常,需要从线程池中减少一个线程
    // 如果 completedAbruptly = false,则执行getTask方法的时候已经减1,这里无需在进行减1操作
    if (completedAbruptly)
        decrementWorkerCount();
    
    // 获取线程池的锁,因为后面是线程池的操作,为了并发安全,需要对线程池加锁
    final ReentrantLock mainLock = this.mainLock;
    // 线程池加锁
    mainLock.lock();
    try {
        // 统计该线程池完成的任务数
        completedTaskCount += w.completedTasks;
        // 从线程池中移除一个工作线程    works是线程池持有的一个集合  
        workers.remove(w); // 将没用的worker去掉,也就是当前传入的worker
    } finally {
        // 线程池解锁
        mainLock.unlock();
    }
    // 根据线程池的状态,决定是否结束该线程池
    tryTerminate(); // 钩子方法

    // 判断线程池是否需要增加线程
    // 获取线程池的状态
    int c = ctl.get();
    // -当线程池是RUNNING或SHUTDOWN状态时
    // --如果worker是异常结束(即completedAbruptly为false),那么会直接addWorker;
    // ---如果allowCoreThreadTimeOut = true,并且等待队列有任务,至少保留一个worker;
    // ---如果allowCoreThreadTimeOut = false,活跃线程数不少于corePoolSize
    if (runStateLessThan(c, STOP)) { // 线程池状态小于STOP,就说明当前线程池是RUNNING或SHUTDOWN状态
        // 如果worker是异常结束的,不进入下面的分支,直接去addWorker
        if (!completedAbruptly) {
            // 根据allowCoreThreadTimeOut的值,来设置线程池中最少的活跃线程数是0还是corePoolSize
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            // 如果等待队列中有任务,要至少保留一个worker
            if (min == 0 && ! workQueue.isEmpty())
                // 至少保留一个worker
                min = 1;
            // 如果活跃线程数大于等于min,直接返回,不需要再调用addWorker来增加线程池中的线程了
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        // 增加线程池中的worker
        addWorker(null, false);
    }
}

5.4 worker的异常处理

try {
    while (task != null || (task = getTask()) != null) {
        w.lock();
        // If pool is stopping, ensure thread is interrupted;
        // if not, ensure thread is not interrupted.  This
        // requires a recheck in second case to deal with
        // shutdownNow race while clearing interrupt
        if ((runStateAtLeast(ctl.get(), STOP) ||
             (Thread.interrupted() &&
              runStateAtLeast(ctl.get(), STOP))) &&
            !wt.isInterrupted())
            wt.interrupt();
        try {
            beforeExecute(wt, task);
            Throwable thrown = null;
            try {
                task.run();
            } catch (RuntimeException x) {
                thrown = x; throw x;
            } catch (Error x) {
                thrown = x; throw x;
            } catch (Throwable x) {
                thrown = x; throw new Error(x);
            } finally {
                afterExecute(task, thrown);
            }
        } finally {
            task = null;
            w.completedTasks++;
            w.unlock();
        }
    }
    completedAbruptly = false;
} finally {
    processWorkerExit(w, completedAbruptly);
}

通过源代码可以看出,对应的Exception都是保存在thrown中,在finally中交给了afterExecute进行了处理。

所以可以自己实现对应的afterExecute来进行处理系统内部发生的异常问题。

六、Worker工作线程的生命周期

java线程池workqueue_java线程池的使用例子

 Worker执行任务的模型如下图所示:

java线程池workqueue_java线程池的使用例子


相关文章:【线程池】Java的线程池
                  【线程池】Java线程池的核心参数
                  【线程池】Executors框架创建线程池        
                                  【线程池】ScheduledExecutorService接口和ScheduledThreadPoolExecutor定时任务线程池使用详解                 【线程池】线程池的拒绝策略(饱和策略) 

今天的文章java线程池workqueue_java线程池的使用例子分享到此就结束了,感谢您的阅读。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/82751.html

(0)
编程小号编程小号

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注