Java线程池

什么是线程池?

线程池的基本思想是一种对象池,在程序启动时就开辟一块内存空间,里面存放了众多(未死亡)的线程,池中线程执行调度由池管理器来处理。当有线程任务时,从池中取一个,执行完成后线程对象归池,这样可以避免反复创建线程对象所带来的性能开销,节省了系统的资源。

线程池的好处

  • **降低资源消耗。**通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • **提高响应速度。**当任务到达时,任务可以不需要的等到线程创建就能立即执行。
  • **提高线程的可管理性。**线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

线程池适合应用的场合

  • 服务器接受到大量请求时,使用线程池技术是非常合适的,他可以大大减少线程的创建和销毁次数,提高服务器的工作效率。
  • 实际上,在开发中,如果需要创建五个以上的线程,那么就可以使用线程池来管理。

创建线程池

线程池构造函数的参数

  • corePoolSize(核心线程池的基本大小):当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads方法,线程池会提前创建并启动所有基本线程。
  1. 如果运行的线程少于 corePoolSize,则创建新线程来处理任务,即使线程池中的其他线程是空闲的;
  2. 如果线程池中的线程数量大于等于 corePoolSize 且小于 maximumPoolSize,则只有当workQueue满时才创建新的线程去处理任务;
  3. 如果设置的corePoolSize 和 maximumPoolSize相同,则创建的线程池的大小是固定的,这时如果有新任务提交,若workQueue未满,则将请求放入workQueue中,等待有空闲的线程去从workQueue中取任务并处理;
  4. 如果运行的线程数量大于等于maximumPoolSize,这时如果workQueue已经满了,则通过handler所指定的策略来处理任务;

所以,任务提交时,判断的顺序为 corePoolSize --> workQueue --> maximumPoolSize。

image.png

  • maximumPoolSize(线程池最大大小):线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是如果使用了无界的任务队列这个参数就没什么效果。
  • workQueue(任务队列):用于保存等待执行的任务的阻塞队列。
  1. 直接切换SynchronousQueue
  2. 使用无界队列:一般使用基于链表的阻塞队列LinkedBlockingQueue。如果使用这种方式,那么线程池中能够创建的最大线程数就是corePoolSize,而maximumPoolSize就不会起作用了(后面也会说到)。当线程池中所有的核心线程都是RUNNING状态时,这时一个新的任务提交就会放入等待队列中。
  3. 使用有界队列:一般使用ArrayBlockingQueue。使用该方式可以将线程池的最大线程数量限制为maximumPoolSize。
  • keepAliveTime(线程活动保持时间):线程池维护线程所允许的空闲时间。当线程池中的线程数量大于corePoolSize的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了keepAliveTime;所以如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。
  • ThreadFactory:它是ThreadFactory类型的变量,用来创建新线程。默认使Executors.defaultThreadFactory() 来创建线程。使用默认的ThreadFactory来创建线程时,会使新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设置了线程的名称。
  • handler:它是RejectedExecutionHandler类型的变量,表示线程池的饱和策略。如果阻塞队列满了并且没有空闲的线程,这时如果继续提交任务,就需要采取一种策略处理该任务。线程池提供了4种策略:
    - AbortPolicy:直接抛出异常,这是默认策略;
    - CallerRunsPolicy:用调用者所在的线程来执行任务;
    - DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
    - DiscardPolicy:直接丢弃任务;
  • TimeUnit(线程活动保持时间的单位):可选的单位有天(DAYS),小时(HOURS),分钟(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。

向线程池提交任务

我们可以使用 execute 提交任务,但是 execute 方法没有返回值,所以无法判断任务是否被线程池执行成功。
通过以下代码可知 execute 方法输入的任务是一个 Runnable 实例。

threadsPool.execute(new Runnable() {
            @Override
            public void run() {
                // TODO Auto-generated method stub
            }
        });

我们也可以使用 submit 方法来提交任务,它会返回一个 Future ,那么我们可以通过这个 Future 来判断任务是否执行成功。
通过 Futureget 方法来获取返回值,get 方法会阻塞住直到任务完成。而使用 get(long timeout, TimeUnit unit) 方法则会阻塞一段时间后立即返回,这时有可能任务没有执行完。

Future<Object> future = executor.submit(harReturnValuetask);
try {
     Object s = future.get();
} catch (InterruptedException e) {
    // 处理中断异常
} catch (ExecutionException e) {
    // 处理无法执行任务异常
} finally {
    // 关闭线程池
    executor.shutdown();
}

Java通过Executors提供的常见线程池

newFixedThreadPool

创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。指定线程池中的线程数量和最大线程数量一样,也就线程数量固定不变。
FixedThreadPool 是一个典型且优秀的线程池,它具有线程池提高程序效率和节省创建线程时所耗的开销的优点。但是,在线程池空闲时,即线程池中没有可运行任务时,它不会释放工作线程,还会占用一定的系统资源。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

一个小栗子:

public class FixedThreadPoolTest {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 1000; i++) {
            executorService.execute(new Task());
        }
    }
}
class Task implements Runnable {
    @Override
    public void run() {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName());
    }
}

**缺点:**由于传进去的LinkedBlockingQueue是没有容量上限的,所以当请求数量越来越多,并且无法及时处理完毕的时候,也就是请求堆积的时候,会容易造成占用大量的内存,可能会导致OOM。例如,并设置
image.png

public class FixedThreadPoolOOM {
    private static ExecutorService executorService = Executors.newFixedThreadPool(1);
    public static void main(String[] args) {
        for (int i = 0; i < Integer.MAX_VALUE; i++) {
            executorService.execute(new SubThread());
        }
    }
}

class SubThread implements Runnable {
    @Override
    public void run() {
        try {
            Thread.sleep(1000000000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

image.png

newSingleThreadExecutor

创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。请求堆积的时候,也会容易造成占用大量的内存。如果这个线程异常结束,会有另一个取代它,保证顺序执行。单工作线程最大的特点是可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的。

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

一个小栗子:

public class SingleThreadExecutor {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 1000; i++) {
            executorService.execute(new Task());
        }
    }
}
class Task implements Runnable {
    @Override
    public void run() {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName());
    }
}

newCachedThreadPool

创建一个定长线程池,使用SynchronousQueue支持定时及周期性任务执行。无界线程池,具有自动回收多余线程的功能。

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

这种类型的线程池特点是:

  • 工作线程的创建数量几乎没有限制(其实也有限制的,数目为 Interger.MAX_VALUE), 这样可灵活的往线程池中添加线程。
  • 如果长时间没有往线程池中提交任务,即如果工作线程空闲了指定的时间(默认为 1 分钟),则该工作线程将自动终止。终止后,如果你又提交了新的任务,则线程池重新创建一个工作线程。
  • 在使用 CachedThreadPool 时,一定要注意控制任务的数量,否则,由于大量线程同时运行,很有会造成系统瘫痪OOM。
public class CachedThreadPool {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 1000; i++) {
            executorService.execute(new Task());
        }
    }
}
class Task implements Runnable {
    @Override
    public void run() {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName());
    }
}

newScheduleThreadPool

创建一个定长线程池,支持定时及周期性任务执行。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

如下.表示延迟1秒后每3秒执行一次

public static void main(String[] args) {
    ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
    scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
        public void run() {
            System.out.println(Thread.currentThread().getName() + ": delay 1 seconds, and excute every 3 seconds");
        }
    }, 1, 3, TimeUnit.SECONDS);// 表示延迟1秒后每3秒执行一次
}

image.png
线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。 说明:Executors各个方法的弊端:
1)newFixedThreadPool和newSingleThreadExecutor:
  主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM。
2)newCachedThreadPool和newScheduledThreadPool:
  主要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。

四种常见线程池总结

image.png

newWorkStealingPool

 在Java 8中,引入了一种新型的线程池(其他 1.5),作为newWorkStealingPool()来补充现有的线程池。WorkStealingPool线程池,来维持相应的并行级别,它会通过工作窃取的方式,使得多核的 CPU 不会闲置,总会有活着的线程让 CPU 去运行。

 顾名思义,它基于 工作窃取算法,其中任务可以生成其他较小的任务,这些任务将添加到并行处理线程的队列中。如果一个线程完成了工作并且无事可做,则可以从另一线程的队列中“窃取”工作。

public static ExecutorService newWorkStealingPool() {
    return new ForkJoinPool
        (Runtime.getRuntime().availableProcessors(),
         ForkJoinPool.defaultForkJoinWorkerThreadFactory,
         null, true);
}

image.png

线程数量的设定

任务一般分为:CPU密集型、IO密集型、混合型,对于不同类型的任务需要分配不同大小的线程池
1、CPU密集型(加密,计算hash等)
尽量使用较小的线程池,一般Cpu核心数+1
因为CPU密集型任务CPU的使用率很高,若开过多的线程,只能增加线程上下文的切换次数,带来额外的开销
2、IO密集型(读写数据库、文件、网络读写)
**方法一:**可以使用较大的线程池,一般CPU核心数 * 2
IO密集型CPU使用率不高,可以让CPU等待IO的时候处理别的任务,充分利用cpu时间
**方法二:**线程等待时间所占比例越高,需要越多线程。线程CPU时间所占比例越高,需要越少线程。
下面举个例子:
比如平均每个线程CPU运行时间为0.5s,而线程等待时间(非CPU运行时间,比如IO)为1.5s,CPU核心数为8,那么根据上面这个公式估算得到:((0.5+1.5)/0.5)8=32。这个公式进一步转化为:
最佳线程数目 = (线程等待时间与线程CPU时间之比 + 1) CPU数目

3、混合型
可以将任务分为CPU密集型和IO密集型,然后分别使用不同的线程池去处理,按情况而定。

停止线程池的正确做法

shutDown()

当线程池调用该方法时,线程池的状态则立刻变成SHUTDOWN状态。此时,则不能再往线程池中添加任何任务,否则将会抛出RejectedExecutionException异常。但是,此时线程池不会立刻退出,直到添加到线程池中的任务都已经处理完成,才会退出。

shutdownNow()

根据JDK文档描述,大致意思是:执行该方法,线程池的状态立刻变成STOP状态,并试图停止所有正在执行的线程,不再处理还在池队列中等待的任务,当然,它会返回那些未执行的任务。
它试图终止线程的方法是通过调用Thread.interrupt()方法来实现的,但是大家知道,这种方法的作用有限,如果线程中没有sleep、wait、Condition、定时锁等应用, interrupt()方法是无法中断当前的线程的。所以,ShutdownNow()并不代表线程池就一定立即就能退出,它可能必须要等待所有正在执行的任务都执行完成了才能退出。

isShutdown()、isTerminaed()

只要调用了这两个关闭方法的其中一个,isShutdown 方法就会返回 true。当所有的任务都已关闭后,才表示线程池关闭成功,这时调用 isTerminaed 方法会返回 true。至于我们应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用 shutdown 来关闭线程池,如果任务不一定要执行完,则可以调用 shutdownNow。

四种拒绝策略

新建线程 -> 达到核心数 -> 加入队列 -> 新建线程(非核心) -> 达到最大数 -> 触发拒绝策略
AbortPolicy:丢弃任务并抛出RejectedExecutionException异常

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    throw new RejectedExecutionException("Task " + r.toString() +
                                         " rejected from " +
                                         e.toString());
}

DiscardPolicy:丢弃任务,但是不抛出异常

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}

DisCardOldSetPolicy:丢弃队列最前面的任务,然后提交新来的任务

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        e.getQueue().poll();
        e.execute(r);
    }
}

CallerRunPolicy:由调用线程(提交任务的线程,主线程)处理该任务

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        r.run();
    }
}

Executor框架

image.png

线程池组成部分

  • 线程池管理器(ThreadPool):用于创建并管理线程池,包括 创建线程池,销毁线程池,添加新任务;
  • 工作线程(WorkThread):线程池中线程,在没有任务时处于等待状态,可以循环的执行任务;
  • 任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等;
  • 任务队列(taskQueue):用于存放没有处理的任务。提供一种缓冲机制。

image.png

image.png

Executor

只有一个方法:

void execute(Runnable command);

ExecutorService

ExecutorService extends Executor{
    .......
}

image.png

Executors

工具类,快速创建线程池
image.png 

ThreadPoolExecutor

java.uitl.concurrent.ThreadPoolExecutor 类是 Executor 框架中最核心的一个类。
ThreadPoolExecutor 有四个构造方法,前三个都是基于第四个实现。第四个构造方法定义如下:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
        null :
    AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

参考:
Java线程池解析
简单说说Java线程池
Java面试经典题:线程池专题
线程池总结

Q.E.D.

知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议