Java线程池详解

Java线程池详解本文包含知识点为什么需要使用线程池? 线程池的创建及重要参数 springboot中使用线程池1.为什么需要线程池?java中经常需要用到多线程来处理一些业务,我们非常不建议单纯使用继承Thread或者实现Runnable接口的方式来创建线程,那样势必有创建及销毁线程耗费资源、线程上下文切换问题。同时创建过多的线程也会占用过多的内存,这个时候引入线程池比较合理,方便线程任务的管理…

本文包含知识点

  • 线程池的使用场景分析
  • 线程池的创建及重要参数
  • 线程池实现线程复用的原理
  • springboot中使用线程池
  • Callabel与Runnable任务
  • 在基于spring体系的业务中正确地关闭线程池
  • 实现优先使用运行线程及调整线程数大小的线程池(线程池的优化)
  • 在java web项目中慎用Executors以及非守护线程

pushpin 1.线程池使用场景?

java中经常需要用到多线程来处理一些业务,我们非常不建议单纯使用继承Thread或者实现Runnable接口的方式来创建线程,那样势必有创建及销毁线程耗费资源、线程上下文切换问题。同时创建过多的线程也可能引发资源耗尽的风险,这个时候引入线程池比较合理,方便线程任务的管理。java中涉及到线程池的相关类均在jdk1.5开始的java.util.concurrent包中,涉及到的几个核心类及接口包括:Executor、Executors、ExecutorService、ThreadPoolExecutor、FutureTask、Callable、Runnable等。

  • 加快请求响应(响应时间优先)

比如用户在饿了么上查看某商家外卖,需要聚合商品库存、店家、价格、红包优惠等等信息返回给用户,接口逻辑涉及到聚合、级联等查询,从这个角度来看接口返回越快越好,那么就可以使用多线程方式,把聚合/级联查询等任务采用并行方式执行,从而缩短接口响应时间。这种场景下使用线程池的目的就是为了缩短响应时间,往往不去设置队列去缓冲并发的请求,而是会适当调高corePoolSize和maxPoolSize去尽可能的创造线程来执行任务

  • 加快处理大任务(吞吐量优先)

比如业务中台每10分钟就调用接口统计每个系统/项目的PV/UV等指标然后写入多个sheet页中返回,这种情况下往往也会使用多线程方式来并行统计。和”时间优先”场景不同,这种场景的关注点不在于尽可能快的返回,而是关注利用有限的资源尽可能的在单位时间内处理更多的任务,即吞吐量优先。这种场景下我们往往会设置队列来缓冲并发任务,并且设置合理的corePoolSize和maxPoolSize参数,这个时候如果设置了太大的corePoolSize和maxPoolSize可能还会因为线程上下文频繁切换降低任务处理速度,从而导致吞吐量降低

以上两种使用场景和JVM里的ParallelScavenge和CMS垃圾收集器有较大的类比性,ParallelScavenge垃圾收集器关注点在于达到可观的吞吐量,而CMS垃圾收集器重点关注尽可能缩短GC停顿时间。

Java线程池详解
本人项目中使用线程池的一个需求场景

pushpin 2.线程池的创建及重要参数

线程池可以自动创建也可以手动创建,自动创建体现在Executors工具类中,常见的可以创建newFixedThreadPool、newCachedThreadPool、newSingleThreadExecutor、newScheduledThreadPool;手动创建体现在可以灵活设置线程池的各个参数,体现在代码中即ThreadPoolExecutor类构造器上各个实参的不同:

  public static ExecutorService newFixedThreadPool(int var0) {
        return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
  }
	
  public static ExecutorService newSingleThreadExecutor() {
        return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()));
  }

  public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue());
  }

  public static ScheduledExecutorService newScheduledThreadPool(int var0) {
        return new ScheduledThreadPoolExecutor(var0);
  }
 public ThreadPoolExecutor(int corePoolSize,
                           int maximumPoolSize,
                           long keepAliveTime,
                           TimeUnit unit,
                           BlockingQueue<Runnable> workQueue,
                           ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {……}

books ThreadPoolExecutor中重要的几个参数详解

  • corePoolSize:核心线程数,也是线程池中常驻的线程数,线程池初始化时默认是没有线程的,当任务来临时才开始创建线程去执行任务
  • maximumPoolSize:最大线程数,在核心线程数的基础上可能会额外增加一些非核心线程,需要注意的是只有当workQueue队列填满时才会创建多于corePoolSize的线程(线程池总线程数不超过maxPoolSize)
  • keepAliveTime:非核心线程的空闲时间超过keepAliveTime就会被自动终止回收掉,注意当corePoolSize=maxPoolSize时,keepAliveTime参数也就不起作用了(因为不存在非核心线程);
  • unit:keepAliveTime的时间单位
  • workQueue:用于保存任务的队列,可以为无界、有界、同步移交三种队列类型之一,当池子里的工作线程数大于corePoolSize时,这时新进来的任务会被放到队列中
  • threadFactory:创建线程的工厂类,默认使用Executors.defaultThreadFactory(),也可以使用guava库的ThreadFactoryBuilder来创建
  • handler:线程池无法继续接收任务(队列已满且线程数达到maximunPoolSize)时的饱和策略,取值有AbortPolicy、CallerRunsPolicy、DiscardOldestPolicy、DiscardPolicy

books 线程池中的线程创建流程图:

Java线程池详解

举个栗子:现有一个线程池,corePoolSize=10,maxPoolSize=20,队列长度为100,那么当任务过来会先创建10个核心线程数,接下来进来的任务会进入到队列中直到队列满了,会创建额外的线程来执行任务(最多20个线程),这个时候如果再来任务就会执行拒绝策略。

books workQueue队列

  • SynchronousQueue(同步移交队列):队列不作为任务的缓冲方式,可以简单理解为队列长度为零
  • LinkedBlockingQueue(无界队列):队列长度不受限制,当请求越来越多时(任务处理速度跟不上任务提交速度造成请求堆积)可能导致内存占用过多或OOM
  • ArrayBlockintQueue(有界队列):队列长度受限,当队列满了就需要创建多余的线程来执行任务

books 常见的几种自动创建线程池方式

  • 自动创建线程池的几种方式都封装在Executors工具类中:
  • newFixedThreadPool:使用的构造方式为new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()),设置了corePoolSize=maxPoolSize,keepAliveTime=0(此时该参数没作用),无界队列,任务可以无限放入,当请求过多时(任务处理速度跟不上任务提交速度造成请求堆积)可能导致占用过多内存或直接导致OOM异常
  • newSingleThreadExector:使用的构造方式为new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), var0),基本同newFixedThreadPool,但是将线程数设置为了1,单线程,弊端和newFixedThreadPool一致
  • newCachedThreadPool:使用的构造方式为new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue()),corePoolSize=0,maxPoolSize为很大的数,同步移交队列,也就是说不维护常驻线程(核心线程),每次来请求直接创建新线程来处理任务,也不使用队列缓冲,会自动回收多余线程,由于将maxPoolSize设置成Integer.MAX_VALUE,当请求很多时就可能创建过多的线程,导致资源耗尽OOM
  • newScheduledThreadPool:使用的构造方式为new ThreadPoolExecutor(var1, 2147483647, 0L, TimeUnit.NANOSECONDS, new ScheduledThreadPoolExecutor.DelayedWorkQueue()),支持定时周期性执行,注意一下使用的是延迟队列,弊端同newCachedThreadPool一致

所以根据上面分析我们可以看到,FixedThreadPool和SigleThreadExecutor中之所以用LinkedBlockingQueue无界队列,是因为设置了corePoolSize=maxPoolSize,线程数无法动态扩展,于是就设置了无界阻塞队列来应对不可知的任务量;而CachedThreadPool则使用的是SynchronousQueue同步移交队列,为什么使用这个队列呢?因为CachedThreadPool设置了corePoolSize=0,maxPoolSize=Integer.MAX_VALUE,来一个任务就创建一个线程来执行任务,用不到队列来存储任务;SchduledThreadPool用的是延迟队列DelayedWorkQueue。在实际项目开发中也是推荐使用手动创建线程池的方式,而不用默认方式,关于这点在《阿里巴巴开发规范》中是这样描述的:

Java线程池详解

books handler拒绝策略

  • AbortPolicy:中断抛出异常
  • DiscardPolicy:默默丢弃任务,不进行任何通知
  • DiscardOldestPolicy:丢弃掉在队列中存在时间最久的任务
  • CallerRunsPolicy:让提交任务的线程去执行任务(对比前三种比较友好一丢丢)

books 关闭线程池

  • shutdownNow():立即关闭线程池(暴力),正在执行中的及队列中的任务会被中断,同时该方法会返回被中断的队列中的任务列表
  • shutdown():平滑关闭线程池,正在执行中的及队列中的任务能执行完成,后续进来的任务会被执行拒绝策略
  • isTerminated():当正在执行的任务及对列中的任务全部都执行(清空)完就会返回true

books 3.线程池实现线程复用的原理

    1.线程池里执行的是任务,核心逻辑在ThreadPoolExecutor类的execute方法中,同时ThreadPoolExecutor中维护了HashSet<Worker> workers;
    2.addWorker()方法来创建线程执行任务,如果是核心线程的任务,会赋值给Worker的firstTask属性;
    3.Worker实现了Runnable,本质上也是任务,核心在run()方法里;
    4.run()方法的执行核心runWorker(),自旋拿任务while (task != null || (task = getTask()) != null)),task是核心线程Worker的firstTask或者getTask();
    5.getTask()的核心逻辑:
            1.若当前工作线程数量大于核心线程数->说明此线程是非核心工作线程,通过poll()拿任务,未拿到任务即getTask()返回null,然后会在processWorkerExit(w, completedAbruptly)方法释放掉这个非核心工作线程的引用;
            2.若当前工作线程数量小于核心线程数->说明此时线程是核心工作线程,通过take()拿任务
            3.take()方式取任务,如果队列中没有任务了会调用await()阻塞当前线程,直到新任务到来,所以核心工作线程不会被回收; 当执行execute方法里的workQueue.offer(command)时会调用Condition.singal()方法唤醒一个之前阻塞的线程,这样核心线程即可复用

Java线程池详解

Java线程池详解

books 手动创建线程池(推荐)

那么上面说了使用Executors工具类创建的线程池有隐患,那如何使用才能避免这个隐患呢?对症下药,建立自己的线程工厂类,灵活设置关键参数:

//这里默认拒绝策略为AbortPolicy
private static ExecutorService executor = new ThreadPoolExecutor(10,10,60L, TimeUnit.SECONDS,new ArrayBlockingQueue(10));

使用guava包中的ThreadFactoryBuilder工厂类来构造线程池:

private static ThreadFactory threadFactory = new ThreadFactoryBuilder().build();

private static ExecutorService executorService = new ThreadPoolExecutor(10, 10, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10), threadFactory, new ThreadPoolExecutor.AbortPolicy());

通过guava的ThreadFactory工厂类还可以指定线程组名称,这对于后期定位错误时也是很有帮助的

ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("thread-pool-d%").build();

pushpin 4.Springboot中使用线程池

springboot可以说是非常流行了,下面说说如何在springboot中优雅的使用线程池

/**
 * @ClassName ThreadPoolConfig
 * @Description 配置类中构建线程池实例,方便调用
 * @Author simonsfan
 * @Date 2018/12/20
 * Version  1.0
 */
@Configuration
public class ThreadPoolConfig {
    @Bean(value = "threadPoolInstance")
    public ExecutorService createThreadPoolInstance() {
        //通过guava类库的ThreadFactoryBuilder来实现线程工厂类并设置线程名称
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("thread-pool-%d").build();
        ExecutorService threadPool = new ThreadPoolExecutor(10, 16, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100), threadFactory, new ThreadPoolExecutor.AbortPolicy());
        return threadPool;
    }
}
  //通过name=threadPoolInstance引用线程池实例
  @Resource(name = "threadPoolInstance")
  private ExecutorService executorService;

  @Override
  public void spikeConsumer() {
    //TODO
    executorService.execute(new Runnable() {
    @Override
    public void run() {
      //TODO
     }});
  }

pushpin 5.其它相关

在ThreadPoolExecutor类中有两个比较重要的方法引起了我们的注意:beforeExecute和afterExecute

 protected void beforeExecute(Thread var1, Runnable var2) {
 }

 protected void afterExecute(Runnable var1, Throwable var2) {
 }

这两个方法是protected修饰的,很显然是留给开发人员去重写方法体实现自己的业务逻辑,非常适合做钩子函数,在任务run方法的前后增加业务逻辑,比如添加日志、统计等。这个和我们springmvc中拦截器的preHandle和afterCompletion方法很类似,都是对方法进行环绕,类似于spring的AOP,参考下图:

Java线程池详解


Callable和Runnable

Runnable和Callable都可以理解为任务,里面封装这任务的具体逻辑,用于提交给线程池执行,区别在于Runnable任务执行没有返回值,且Runnable任务逻辑中不能通过throws抛出cheched异常(但是可以try catch),而Callable可以获取到任务的执行结果返回值且抛出checked异常。

@FunctionalInterface
public interface Runnable {
    public abstract void run();
}

@FunctionalInterface
public interface Callable<V> {
    V call() throws Exception;
}

Future和FutureTask

Future接口用来表示执行异步任务的结果存储器,当一个任务的执行时间过长就可以采用这种方式:把任务提交给子线程去处理,主线程不用同步等待,当向线程池提交了一个Callable或Runnable任务时就会返回Future,用Future可以获取任务执行的返回结果。Future的主要方法包括:

  • get()方法:返回任务的执行结果,若任务还未执行完,则会一直阻塞直到完成为止,如果执行过程中发生异常,则抛出异常,但是主线程是感知不到并且不受影响的,除非调用get()方法进行获取结果则会抛出ExecutionException异常;
  • get(long timeout, TimeUnit unit):在指定时间内返回任务的执行结果,超时未返回会抛出TimeoutException,这个时候需要显式的取消任务;
  • cancel(boolean mayInterruptIfRunning):取消任务,boolean类型入参表示如果任务正在运行中是否强制中断;
  • isDone():判断任务是否执行完毕,执行完毕不代表任务一定成功执行,比如任务执行失但也执行完毕、任务被中断了也执行完毕都会返回true,它仅仅表示一种状态说后面任务不会再执行了;
  • isCancelled():判断任务是否被取消;

下面来实际演示Future和FutureTask的用法:知识库页面发布接口逻辑

    @Override
    public ResultResponse publish(PageIndex page) {
        ResultResponse<Boolean> result = new ResultResponse<>();
        ResultResponse<PageIndex> recordResult = getPageRecord(page.getPageId());
        if (!recordResult.isSuccess()) {
            return recordResult;
        }
        PageIndex record = recordResult.getData();
        String currentUser = RequestContext.getUser().getUserCode();
        // 生成html文件,后续导出pdf要用
        FutureTask htmlFileTask = new FutureTask<>(new CreateHtmlFileTask(page.setName(record.getName())));
        // 同步更新页面所属知识库的最后更新人及更新时间
        FutureTask wikiUpdateTask = new FutureTask<>(new WikiUpdateTask(record.getWikiId(), currentUser));
        // 生成页面历史版本记录
        FutureTask pageVersionTask = new FutureTask<>(new GeneratePageVersionTask(new PageVersion(record.getPageId(), record.getWikiId(), page.getHtmlContent(), page.getContent(), currentUser, new Date(), null, null)));
        // 更新页面最新信息
        FutureTask pageUpdateTask = new FutureTask<>(new PageUpdateTask(page));
        List<FutureTask<String>> futureTaskList = ImmutableList.of(htmlFileTask, wikiUpdateTask, pageVersionTask, pageUpdateTask);
        // 任务提交
        futureTaskList.forEach(task -> executorService.submit(task));
        try {
            for (FutureTask<String> task : futureTaskList) {
                // 阻塞式等待任务执行结果
                String taskResult = task.get();
                logger.info("publish taskResult={}", taskResult);
            }
        } catch (Exception e) {
            logger.error("发布接口异常:{}", e, e.getMessage());
            e.printStackTrace();
        }
        // 释放锁
        pageLockService.unlock(page.getPageId(), currentUser);
        return result.data(true);
    }

    class CreateHtmlFileTask implements Callable<String> {
        private PageIndex pageIndex;

        public CreateHtmlFileTask(PageIndex pageIndex) {
            this.pageIndex = pageIndex;
        }

        @Override
        public String call() throws Exception {
            return generateHtmlFile(pageIndex);
        }
    }

2021-03-29补充:评论区看到同行们的留言,刚好近期看到一些关于线程池的好文章,特此补充上。需要重点声明的是:补充内容来自https://www.iflym.com,重要的内容说三遍!!!

books实现优先使用运行线程及调整线程数大小的线程池(线程池的优化)

当前在JDK中默认使用的线程池 ThreadPoolExecutor,在具体使用场景中,有以下几个缺点

  1. core线程一般不会timeOut
  2. 新任务提交时,如果工作线程数小于 coreSize,会自动先创建线程,即使当前工作线程已经空闲,这样会造成空闲线程浪费
  3. 设置的maxSize参数只有在队列满之后,才会生效,而默认情况下容器队列会很大(比如1000)

如一个coreSize为10,maxSize为100,队列长度为1000的线程池,在运行一段时间之后的效果会是以下2个效果:

  1. 系统空闲时,线程池中始终保持10个线程不变,有一部分线程在执行任务,另一部分线程一直wait中(即使设置allowCoreThreadTimeOut)
  2. 系统繁忙时,线程池中线程仍然为10个,但队列中有还没有执行的任务(不超过1000),存在任务堆积现象

本文将描述一下简单版本的线程池,参考于 Tomcat ThreadPoolExecutor, 实现以下3个目标

  1. 新任务提交时,如果有空闲线程,直接让空闲线程执行任务,而非创建新线程
  2. 如果coreSize满了,并且线程数没有超过maxSize,则优先创建线程,而不是放入队列
  3. 其它规则与ThreadPoolExecutor一致,如 timeOut机制

首先看一下ThreadPoolExecutor的执行逻辑, 其基本逻辑如下

  1. 如果线程数小于coreSize,直接创建新线程并执行(coreSize逻辑)
  2. 尝试放入队列
  3. 放入队列失败,则尝试创建新线程(maxSize逻辑)

而执行线程的任务执行逻辑,就是不断地从队列里面获取任务并执行,换言之,即如果有执行线程,直接往队列里面放任务,执行线程就会被通知到并直接执行任务.

空闲线程优先

空闲线程优先在基本逻辑中,即如果线程数小于coreSize,但如果有空闲线程,就取消创建线程的逻辑. 在有空闲线程的情况下,直接将任务放入队列中,即达到任务执行的目的。

这里的逻辑即是直接调整默认的ThreadPoolExecutor逻辑,通过重载 execute(Runnable) 方法达到效果. 具体代码如下所示:

1

2

3

4

5

6

7

8

9

10

public void execute(Runnable command) {

    //此处优先处理有活跃线程的情况,避免在<coreSize时,直接创建线程

    if(getActiveCount() < getPoolSize()) {

        if(pool1.offer(command)) {

            return;

        }

    }

    super.execute(command);

}

coreSize满了优先创建线程

从之前的逻辑来看,如果放入队列失败,则尝试创建新线程。在这个时候,相应的coreSize肯定已经满了。那么,只需要处理一下逻辑,将其offer调整为false,即可以实现相应的目的。

这里的逻辑,即是重新定义一个BlockingDeque,重载相应的offer方法,相应的参考如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

public boolean offer(Runnable o) {

    //这里的parent为ThreadPoolExecutor的引用

    int poolSize = parent.getPoolSize();

    int maxPoolSize = parent.getMaximumPoolSize();

    //还没到最大值,先创建线程

    if(poolSize < maxPoolSize) {

        return false;

    }

    //默认逻辑

    return super.offer(o);

}

即判定当前线程池中线程数如果小于最大线程数,即直接返回false,达到放入队列失败的效果。

总结

按照以上的调整,只需要通过继承自默认的ThreadPoolExecutor和默认的BlockingQueue(如LinkedBlockingDeque),重载2个主要的方法 ThreadPoolExecutor#execute 和 LinkedBlockingDeque#offer 即达到调整的目的。

缺点在于,实现后的类,在定义时,需要互相引用,因为相应的逻辑中需要互相调用相应的方法,以处理逻辑。此外,ThreadPoolExecutor的相应方法 getXXX 方法,在调用时都为通过加锁式实现,以精确返回数据,这里在多线程环境中可能会存在一些性能上的考虑。

在Tomcat默认的worker线程池中,即是采用以上的逻辑来达到工作线程的调整逻辑。因此在spring boot tomcat embedded中,通过参数 server.tomcat.max-thread 和 min-thread 即是通过优先调整线程来达到控制工作线程的目的。 相应的处理类为 org.apache.tomcat.util.threads 下的 ThreadPoolExecutor 和 TaskQueue。

books在基于spring体系的业务中正确地关闭线程池

在业务代码中,特别是基于spring体系的代码中,均会使用线程池进行一些操作,比如异步处理消息,定时任务,以及一些需要与当前业务分离开的操作等。常规情况下,使用spring体系的TaskExecutor或者是自己定义ExecutorService,均可以正常地完成相应的操作。不论是定义一个spring bean,或者是使用 static Thread工具类均是能满足条件。

但是,如果需要正常地关闭spring容器时,这些线程池就不一定能够按照预期地关闭了。结果就是,当使用代码 context.close() 时,期望进程会正常地退出,但实际上进程并不会退出掉.原因就在于这些线程池中还在运行的线程。
本文描述了在基于spring boot的项目中,如何正确地配置线程池,以保证线程池能够正确的在整个spring容器周期内运行,并且在容器正常关闭时能够一并退出掉.

  • 定义bean时添加destroyMethod方法或相应生命周期方法
  • 设置线程池中线程为daemon
  • 为每个线程池正确地命名及使用ThreadFactory
  • 丢弃不再需要的周期性任务
  • 监听ContextClosedEvent,触发额外操作

定义Bean时的Lifecycle定义

将线程池定义为spring bean,为保证在容器关闭时,线程池一并关闭,可以为其定义相应的关闭方法。以下为特定的bean的关闭方法

1

2

3

ThreadPoolTaskExecutor#destroy

ThreadPoolTaskExecutor#destroy

ScheduledThreadPoolExecutor#shutdown

针对 ConcurrentTaskExecutor 这种并没有实现关闭方法的bean,则需要保证其所使用 executor 能够被正常地关闭

在spring 体系,让一个bean可以接收Lifecycle管理,有多种方法,如下所示

1

2

3

4

5

6

@Bean(destroyMethod) //定义时

@PreDestroy //bean方法

DisposableBean  //接口

AutoCloseable   //接口

Lifecycle   //接口

//有方法名为 shutdown

设置线程池中线程为Daemon

一般情况下,关闭线程池后,线程池会自行将其中的线程结束掉.但针对一些自己伪装或直接new Thread()的这种线程,则仍会阻塞进程关闭。

按照,java进程关闭判定方法,当只存在Daemon线程时,进程才会正常关闭。因此,这里建议这些非主要线程均设置为 daemon,即不会阻塞进程关闭.

1

Thread.setDaemon(true)

不过更方便的是使用ThreadFactory,其在 newThread 时,可以直接操作定义出来的Thread对象(如后面所示)

正确命名Thread

在使用线程池时,一般会接受 ThreadFactory 对象,来控制如何创建thread。在java自带的ExecutorService时,如果没有设置此参数,则会使用默认的 DefaultThreadFactory. 效果就是,你会在 线程栈列表中,看到一堆的 pool-x-thread-y,在实际使用 jstack时,根本看不清这些线程每个所属的组,以及具体作用。

这里建议使用 guava 工具类 ThreadFactoryBuilder 来构建,如下代码参考所示

1

ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("定义任务-%d").build()

此代码,一是定义所有创建出来的线程为 daemon,此外相应的线程name均为 指定前缀开始。在线程栈中可以很方便地查找和定位. 此外,在判断影响进程退出时,也可以很方便地判断出是否是相关的线程池存在问题(查看daemon属性及name).

丢弃不再可用周期性任务

一般情况下,使用 java 自带的 ScheduledThreadPoolExecutor, 调用 scheduleAtFixedRate 及 scheduleWithFixedDelay 均会将任务设置为周期性的(period)。在线程池关闭时,这些任务均可以直接被丢弃掉(默认情况下). 但如果使用 schedule 添加远期的任务时,线程池则会因为其不是 周期性任务而不会关闭所对应的线程

如 spring 体系中 TriggerTask(包括CronTask), 来进行定时调度的任务,其最终均是通过 schedule 来实现调度,并在单个任务完成之后,再次 schedule 下一次任务的方式来执行。这种方式会被认为并不是 period. 因此,使用此调度方式时,尽管容器关闭时,执行了 shutdown 方法,但相应底层的 ScheduledExecutorService 仍然不会成功关闭掉(尽管所有的状态均已经设置完)。最终效果就是,会看到一个已经处于shutdown状态的线程池,但线程仍然在运行(状态为 wait 任务)的情况.

为解决此方法,java 提供一个额外的设置参数 executeExistingDelayedTasksAfterShutdown, 此值默认为true,即 shutdown 之后,仍然执行。可以通过在定义线程池时将其设置为 false,即线程池关闭之后,不再运行这些延时任务.

监听ContextClosedEvent事件

针对在业务中自己构建的bean时,可以通过上面的方式进行相应的控制,如果是三方的代码。则需要在容器关闭时通过触发额外的操作,来显示地进行三方的代码。如,在代码中获取到三方的对象,主动调用其的close方法.

惟一需要作的就是监听到容器关闭事件,然后在回调代码中进行相应的操作。因此,只需要定义beqn,实现以下接口即可

1

2

3

CallbackObj implements ApplicationListener<ContextClosedEvent> {

    public void onApplicationEvent(ContextClosedEvent event) {}

}

当此对象声明为bean时,当容器关闭时,会触发相应的事件,并回调起相应的操作.

总结

上面的几个方面均是从仔细控制线程池的创建和销毁来进行描述。基本思路即是从对象管理,生命周期以及代码控制多个方面来处理.在实际项目中,业务使用方只需要使用相应的组件即可,但组件提供方需要正确地提供,才能保证基础架构的完整性.

books在java web项目中慎用Executors以及非守护线程

 最近研究embeded tomcat,特别是关于tomcat启动和关闭的模块。通过查看相应的源代码, 我们知道tomcat的关闭是通过往相应的关闭端口发送指定的关闭指令来达到关闭tomcat的目的。但是有的时候,通过shutdown.bat或shutdown.sh却不能有效地关闭tomcat,网上也有很多人提出这个问题。通过相关资料,最后问题出现线程上。

     首先看java虚拟机退出的条件,如下所示:

1

2

a,调用了 Runtime 类的 exit 方法,并且安全管理器允许退出操作发生。

b,非守护线程的所有线程都已停止运行,无论是通过从对 run 方法的调用中返回,还是通过抛出一个传播到 run 方法之外的异常。

    如上所示,第一条是通过exit退出,第二条指出了一个正常程序退出的条件,就是所有的非守护线程都已经停止运行。我们看相应embed tomcat的启动代码,如下所示:

1

2

tomcat.start();

        tomcat.getServer().await();

    最后一条有效的运行命令即是await,通过调用shutdown命令时,这个await就会成功的返回。按照常理来说,整个程序即会成功的完成。但是程序有时候并没有成功的结束,原因就在于程序中还存在着非守护进程。
    对于tomcat来说,tomcat程序中开启的所有进程都是守护进程,所以tomcat自身可以保证程序的正常结束。当await结束时,tomcat所就正常的结束了,包括相应的监听端口等,都已经成功结束。然而,由于项目程序中仍然还有其它线程在运行,所以导致java虚拟机并没有成功的退出。

     在我们的项目中,很多时候都运用到了线程。比如,异步调用等。不过,幸运的是,这些线程往往都是守护线程,原因就在于tomcat在运行我们的项目时,对于每一个请求,tomcat是使用了守护线程来进行相应的请求调用,这个保证在以下代码:

AprEndpoint

01

02

03

04

05

06

07

08

09

10

// Start poller threads

            pollers = new Poller[pollerThreadCount];

            for (int i = 0; i < pollerThreadCount; i++) {

                pollers[i] = new Poller(false);

                pollers[i].init();

                Thread pollerThread = new Thread(pollers[i], getName() + "-Poller-" + i);

                pollerThread.setPriority(threadPriority);

                pollerThread.setDaemon(true);

                pollerThread.start();

            }

    所以,一般情况下,在我们的项目代码中使用new Thread建立的线程都是守护线程,原因就是新建线程默认上使用建立线程时的当前线程所处的守护状态。tomcat的请求处理线程为守护线程,所以我们一般情况下建立的线程也是守护线程。然而,Executors除外。

     使用Executors建立后台线程并执行一些多线程操作时,Executors会使用相对应的threadFactory来对runnable建立新的thread,所以使用默认的threadFactory时就会出问题。默认的ThreadFactory强制性的将新创建的线程设置为非守护状态,如下所示:

01

02

03

04

05

06

07

08

09

10

public Thread newThread(Runnable r) {

            Thread t = new Thread(group, r,

                                  namePrefix + threadNumber.getAndIncrement(),

                                  0);

            if (t.isDaemon())

                t.setDaemon(false);

            if (t.getPriority() != Thread.NORM_PRIORITY)

                t.setPriority(Thread.NORM_PRIORITY);

            return t;

        }

    所以,一般情况下,我们使用executors创建多线程时,就会使用默认的threadFactory(即调用只有一个参数的工厂方法),而创建出来的线程就是非守护的。而相应的程序就永远不会退出,如采用Executors创建定时调度任务时,这个调试任务永远不会退出。解决的办法就是重写相对应的threadFactory,如下所示:

1

2

3

4

5

6

7

new ThreadFactory() {

        public Thread newThread(Runnable r) {

            Thread s = Executors.defaultThreadFactory().newThread(r);

            s.setDaemon(true);

            return s;

        }

    }

    同理,对于java web项目中的线程程序,一定要记住将相应的线程标记为守护线程(尽管它默认就是守护的)。而对于使用Executors,一定要记住传递相应的threadFactory实现,以重写相应的newThread方法,将线程标记为守护线程。
    以上的结论对于普通的java项目同样有效,为了正常的结束相应的程序,一定要正确的使用相应的线程,以避免java程序不能退出的问题。


参考资料:《阿里巴巴Java开发手册v1.2.0》、JDK1.7 java.util.current包、https://www.iflym.com

引申阅读

CyclicBarrier(同步屏障)的简单使用
CountDownLatch(闭锁)的简单使用

今天的文章Java线程池详解分享到此就结束了,感谢您的阅读,如果确实帮到您,您可以动动手指转发给其他人。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/24925.html

(0)
编程小号编程小号

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注