高并发一瞥,線(xiàn)程和線(xiàn)程池的总结

2020/08/19      42342 文(wén)章来源:CSDN 原作者:shu21

1 JAVA線(xiàn)程的实现原理(lǐ)

 

java的線(xiàn)程是基于操作系统原生的線(xiàn)程模型(非用(yòng)户态),通过系统调用(yòng),将線(xiàn)程交给系统调度执行

java線(xiàn)程拥有(yǒu)属于自己的虚拟机栈,当JVM将栈、程序计数器、工作内存等准备好后,会分(fēn)配一个系统原生線(xiàn)程来执行。Java線(xiàn)程结束,原生線(xiàn)程随之被回收

原生線(xiàn)程初始化完毕,会调Java線(xiàn)程的run方法。当JAVA線(xiàn)程结束时,则释放原生線(xiàn)程和Java線(xiàn)程的所有(yǒu)资源

java方法的执行对应虚拟机栈的一个栈帧,用(yòng)于存储局部变量、操作数栈、动态链接、方法出口等

2 JAVA線(xiàn)程的生命周期

 

New(新(xīn)建状态):用(yòng)new关键字创建線(xiàn)程之后,该線(xiàn)程处于新(xīn)建状态,此时仅由JVM為(wèi)其分(fēn)配内存,并初始化其成员变量

Runnable(就绪状态):当调用(yòng)Thread.start方法后,该線(xiàn)程处于就绪状态。JVM会為(wèi)其分(fēn)配虚拟机栈等,然后等待系统调度

Running(运行状态):处于就绪状态的線(xiàn)程获得CPU,执行run方法时,则線(xiàn)程处于运行状态

Blocked(阻塞状态):阻塞状态是指線(xiàn)程放弃了cpu的使用(yòng)权(join,sleep函数的调用(yòng)),处于暂停止状态。Blocked状态的線(xiàn)程需要恢复到Runnable状态,才能(néng)再次被系统调度执行变成Running

Terminated(線(xiàn)程死亡):線(xiàn)程正常run结束、或抛出一个未捕获的Throwable、调用(yòng)Thread.stop来结束该線(xiàn)程,都会导致線(xiàn)程的死亡

 

java線(xiàn)程和linux線(xiàn)程的生命周期基本是一一对应了,就是多(duō)了new阶段

3 JAVA線(xiàn)程的几种常用(yòng)方法

線(xiàn)程启动函数

//Thread.java

//调用(yòng)start启动線(xiàn)程,进入Runnable状态,等待系统调度执行

public synchronized void start(){//synchronized同步执行

    if (threadStatus != 0) //0 代表new状态,非0则抛出错误

            throw new IllegalThreadStateException();

    ...

    start0(); //本地方法方法 private native void start0()

    ...

}

//Running状态,新(xīn)線(xiàn)程执行的代码方法,可(kě)被子类重写

public void run() {

    if (target != null) {

        //target是Runnable,new Thread(Runnable)时传入

        target.run();

    }

}

線(xiàn)程终止函数

//Thread.java

@Deprecated public final void stop();

//中断線(xiàn)程

public void interrupt()

//判断的是当前線(xiàn)程是否处于中断状态

public static boolean interrupted()

用(yòng)stop会强行终止線(xiàn)程,导致線(xiàn)程所持有(yǒu)的全部锁突然释放(不可(kě)控制),而被锁突同步的逻辑遭到破坏。不建议使用(yòng)

interrupt函数中断線(xiàn)程,但它不一定会让線(xiàn)程退出的。它比stop函数优雅,可(kě)控制

当線(xiàn)程处于调用(yòng)sleep、wait的阻塞状态时,会抛出InterruptedException,代码内部捕获,然后结束線(xiàn)程

線(xiàn)程处于非阻塞状态,则需要程序自己调用(yòng)interrupted()判断,再决定是否退出

其他(tā)常用(yòng)方法

//Thread.java

//阻塞等待其他(tā)線(xiàn)程

public final synchronized void join(final long millis)

//暂时让出CPU执行

public static native void yield();

//休眠一段时间

public static native void sleep(long millis) throws InterruptedException;

start与run方法的區(qū)别

start是Thread类的方法,从線(xiàn)程的生命周期来看,start的执行并不意味着新(xīn)線(xiàn)程的执行,而是让JVM分(fēn)配虚拟机栈,进入Runnable状态,start的执行还是在旧線(xiàn)程上

run则是新(xīn)線(xiàn)程被系统调度,获取CPU时执行的方法,函数run则是继承Thread重写的run或者实现接口Runnable的run

Thread.sleep与Object.wait區(qū)别

Thread.sleep需要指定休眠时间,时间一到可(kě)继续运行;和锁机制无关,没有(yǒu)加锁也不用(yòng)释放锁

Object.wait需要在synchronized中调用(yòng),否则报IllegalMonitorStateException错误。wait方法会释放锁,需要调用(yòng)相同锁对象Object.notify来唤醒線(xiàn)程

4 線(xiàn)程池及其优点

線(xiàn)程的每次使用(yòng)创建,结束销毁是非常巨大的开销。若用(yòng)缓存的策略(線(xiàn)程池),暂存曾经创建的線(xiàn)程,复用(yòng)这些線(xiàn)程,可(kě)以减少程序的消耗,提高線(xiàn)程的利用(yòng)率

降低资源消耗:重复利用(yòng)線(xiàn)程可(kě)降低線(xiàn)程创建和销毁造成的消耗

提高响应速度:当任務(wù)到达时,不需要等待線(xiàn)程创建就能(néng)立即执行

提高線(xiàn)程的可(kě)管理(lǐ)性:使用(yòng)線(xiàn)程池可(kě)以进行统一的分(fēn)配,监控和调优

5 JDK封装的線(xiàn)程池

 

//ThreadPoolExecutor.java

public ThreadPoolExecutor(

    int corePoolSize,

    int maximumPoolSize,

    long keepAliveTime,

    TimeUnit unit,

    BlockingQueue<Runnable> workQueue,

    ThreadFactory threadFactory,

    RejectedExecutionHandler handler)

1 corePoolSize:核心線(xiàn)程数,線(xiàn)程池维持的線(xiàn)程数量

2 maximumPoolSize:最大的線(xiàn)程数,当阻塞队列不可(kě)再接受任務(wù)时且maximumPoolSize大于corePoolSize则会创建非核心線(xiàn)程来执行。但任務(wù)执行时,会被销毁

3 keepAliveTime:非核心線(xiàn)程在闲暇间的存活时间

4 TimeUnit:和keepAliveTime配合使用(yòng),表示keepAliveTime参数的时间单位

5 workQueue:任務(wù)的等待阻塞队列,正在执行的任務(wù)数超过corePoolSize时,加入该队列

6 threadFactory:線(xiàn)程的创建工厂

7 handler:拒绝策略,線(xiàn)程数达到了maximumPoolSize,还有(yǒu)任務(wù)提交则使用(yòng)拒绝策略处理(lǐ)

6 線(xiàn)程池原理(lǐ)之执行流程

 

//ThreadPoolExecutor.java

public void execute(Runnable command) {

    ...

    if (workerCountOf(c) < corePoolSize) { //plan A

        if (addWorker(command, true))  

            return;

        c = ctl.get();

    }

    if (isRunning(c) && workQueue.offer(command)) { //plan B

        int recheck = ctl.get();

        if (! isRunning(recheck) && remove(command))

            reject(command);

        else if (workerCountOf(recheck) == 0)

            addWorker(null, false);

    }

    //addWorker(command, false) false代表可(kě)创建非核心線(xiàn)程来执行任務(wù)

    else if (!addWorker(command, false)) //plan C

        reject(command);    // //plan D

}

plan A:任務(wù)的execute,先判断核心線(xiàn)程数量达到上限;否,则创建核心線(xiàn)程来执行任務(wù);是,则执行plan B

plan B:当任務(wù)数大于核心数时,任務(wù)被加入阻塞队列,如果超过阻塞队列的容量上限,执行C

plan C: 阻塞队列不能(néng)接受任務(wù)时,且设置的maximumPoolSize大于corePoolSize,创建新(xīn)的非核心線(xiàn)程执行任務(wù)

plan D:当plan A、B、C都无能(néng)為(wèi)力时,使用(yòng)拒绝策略处理(lǐ)

7 阻塞队列的简单了解

队列的阻塞插入:当队列满时,队列会阻塞插入元素的線(xiàn)程,直到队列不满

队列的阻塞移除:当队列為(wèi)空时,获取元素的線(xiàn)程会等待队列变為(wèi)非空

BlockingQueue提供的方法如下,其中put和take是阻塞操作

操作方法

抛出异常

返回特殊值

阻塞線(xiàn)程

超时退出

插入元素

add(e)

offer(e)

put(e)

offer(e, timeout, unit)

移除元素

remove()

poll()

take()

pull(timeout, unit)

检查

element()

peek()

 

ArrayBlockingQueue

ArrayBlockingQueue是用(yòng)数组实现的有(yǒu)界阻塞队列,必须指定队列大小(xiǎo),先进先出(FIFO)原则排队

LinkedBlockingQueue

是用(yòng)链表实现的有(yǒu)界阻塞队列,如果构造LinkedBlockingQueue时没有(yǒu)指定大小(xiǎo),则默认是Integer.MAX_VALUE,无限大

该队列生产端和消费端使用(yòng)独立的锁来控制数据操作,以此来提高队列的并发性

PriorityBlockingQueue

public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator)

基于数组,元素具有(yǒu)优先级的无界阻塞队列,优先级由Comparator决定

PriorityBlockingQueue不会阻塞生产者,却会在没有(yǒu)可(kě)消费的任務(wù)时,阻塞消费者

DelayQueue

支持延时获取元素的无界阻塞队列,基于PriorityQueue实现

元素必须实现Delayed接口,指定多(duō)久才能(néng)从队列中获取该元素。

可(kě)用(yòng)于缓存系统的设计、定时任務(wù)调度等场景的使用(yòng)

SynchronousQueue

SynchronousQueue是一种无缓冲的等待队列,添加一个元素必须等待被取走后才能(néng)继续添加元素

LinkedTransferQueue

由链表组成的TransferQueue无界阻塞队列,相比其他(tā)队列多(duō)了tryTransfer和transfer函数

transfer:当前有(yǒu)消费者正在等待元素,则直接传给消费者,否则存入队尾,并阻塞等待元素被消费才返回

tryTransfer:试探传入的元素是否能(néng)直接传给消费者。如果没消费者等待消费元素,元素加入队尾,返回false

LinkedBlockingDeque

LinkedBlockingDeque是由链表构建的双向阻塞队列,多(duō)了一端可(kě)操作入队出队,少了一半的竞争,提高并发性

8 Executors的四种線(xiàn)程池浅析

newFixedThreadPool

//Executors.java

public static ExecutorService newFixedThreadPool(int nThreads) {

    return new ThreadPoolExecutor(nThreads, nThreads,

                0L, TimeUnit.MILLISECONDS,

                new LinkedBlockingQueue<Runnable>());

}

指定核心線(xiàn)程数,队列是LinkedBlockingQueue无界阻塞队列,永遠(yuǎn)不可(kě)能(néng)拒绝任務(wù);适合用(yòng)在稳定且固定的并发场景,建议線(xiàn)程设置為(wèi)CPU核数

newCachedThreadPool

//Executors.java

public static ExecutorService newCachedThreadPool() {

    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,

                60L, TimeUnit.SECONDS,

                new SynchronousQueue<Runnable>());

}

核心池大小(xiǎo)為(wèi)0,線(xiàn)程池最大線(xiàn)程数為(wèi)最大整型,任務(wù)提交先加入到阻塞队列中,非核心線(xiàn)程60s没任務(wù)执行则销毁,阻塞队列為(wèi)SynchronousQueue。newCachedThreadPool会不断的创建新(xīn)線(xiàn)程来执行任務(wù),不建议用(yòng)

newScheduledThreadPool

//Executors.java

public ScheduledThreadPoolExecutor(int corePoolSize,

                                   ThreadFactory threadFactory) {

    super(corePoolSize, Integer.MAX_VALUE,

          DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,

          new DelayedWorkQueue(), threadFactory);

}

//指定延迟执行时间    

public <V> ScheduledFuture<V>

schedule(Callable<V> callable, long delay, TimeUnit unit)    

ScheduledThreadPoolExecutor(STPE)其实是ThreadPoolExecutor的子类,可(kě)指定核心線(xiàn)程数,队列是STPE的内部类DelayedWorkQueue。STPE的好处是 A 延时可(kě)执行任務(wù),B 可(kě)执行带有(yǒu)返回值的任務(wù)

newSingleThreadExecutor

//Executors.java

public static ExecutorService newSingleThreadExecutor() {

    return new FinalizableDelegatedExecutorService

        (new ThreadPoolExecutor(1, 1,

                        0L, TimeUnit.MILLISECONDS,

                        new LinkedBlockingQueue<Runnable>())); //无界队列

}

newFixedThreadPool构造方法一致,不过線(xiàn)程数被设置為(wèi)1了。SingleThreadExecutor比new个線(xiàn)程的好处是;線(xiàn)程运行时抛出异常的时候会有(yǒu)新(xīn)的線(xiàn)程加入線(xiàn)程池完成接下来的任務(wù);阻塞队列可(kě)以保证任務(wù)按FIFO执行

9 如果优雅地关闭線(xiàn)程池

線(xiàn)程池的关闭,就要先关闭池中的線(xiàn)程,上文(wén)第三点有(yǒu)提,暴力强制性stop線(xiàn)程会导致同步数据的不一致,因此我们要调用(yòng)interrupt关闭線(xiàn)程

而線(xiàn)程池提供了两个关闭方法,shutdownNow和shuwdown

shutdownNow:線(xiàn)程池拒接收新(xīn)任務(wù),同时立马关闭線(xiàn)程池(进行中的任務(wù)会执行完),队列的任務(wù)不再执行,返回未执行任務(wù)List

public List<Runnable> shutdownNow() {

    ...

    final ReentrantLock mainLock = this.mainLock;

    mainLock.lock(); //加锁

    try {

        checkShutdownAccess();

        advanceRunState(STOP);

        interruptWorkers(); //interrupt关闭線(xiàn)程

        tasks = drainQueue(); //未执行任務(wù)

    ...    

shuwdown:線(xiàn)程池拒接收新(xīn)任務(wù),同时等待線(xiàn)程池里的任務(wù)执行完毕后关闭線(xiàn)程池,代码和shutdownNow类似就不贴了

10 線(xiàn)程池為(wèi)什么使用(yòng)的是阻塞队列

先考虑下為(wèi)啥線(xiàn)程池的線(xiàn)程不会被释放,它是怎么管理(lǐ)線(xiàn)程的生命周期的呢(ne)

 

//ThreadPoolExecutor.Worker.class

final void runWorker(Worker w) {

    ...

    //工作線(xiàn)程会进入一个循环获取任務(wù)执行的逻辑

    while (task != null || (task = getTask()) != null)

    ...

}

 

private Runnable getTask(){

    ...

    Runnable r = timed ?

        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)

        : workQueue.take(); //線(xiàn)程会阻塞挂起等待任務(wù),

    ...    

}

可(kě)以看出,无任務(wù)执行时,線(xiàn)程池其实是利用(yòng)阻塞队列的take方法挂起,从而维持核心線(xiàn)程的存活

11 線(xiàn)程池的worker继承AQS的意义

//Worker class,一个worker一个線(xiàn)程

Worker(Runnable firstTask) {

    //禁止新(xīn)線(xiàn)程未开始就被中断

    setState(-1); // inhibit interrupts until runWorker

    this.firstTask = firstTask;

    this.thread = getThreadFactory().newThread(this);

}

 

final void runWorker(Worker w) {

    ....

    //对应构造Worker是的setState(-1)

    w.unlock(); // allow interrupts

    boolean completedAbruptly = true;

        ....

        w.lock(); //加锁同步

        ....

        try {

            ...

            task.run();

            afterExecute(task, null);

        } finally {

            ....

            w.unlock(); //释放锁

        }

worker继承AQS的意义:A 禁止線(xiàn)程未开始就被中断;B 同步runWorker方法的处理(lǐ)逻辑

12 拒绝策略

AbortPolicy 丢弃任務(wù)并抛出RejectedExecutionException异常

DiscardOldestPolicy 丢弃队列最前面的任務(wù),然后重新(xīn)提交被拒绝的任務(wù)

DiscardPolicy 丢弃任務(wù),但是不抛出异常

CallerRunsPolicy

A handler for rejected tasks that runs the rejected task directly in the calling thread of the {@code execute} method, unless the executor has been shut down, in which case the task is discarded.

 

如果任務(wù)被拒绝了,则由提交任務(wù)的線(xiàn)程执行此任務(wù)

13 ForkJoinPool了解一波

ForkJoinPool和ThreadPoolExecutor不同,它适合执行可(kě)以分(fēn)解子任務(wù)的任務(wù),如树的遍历,归并排序等一些递归场景

  




ForkJoinPool每个線(xiàn)程有(yǒu)一个对应的双端队列deque;当線(xiàn)程中的任務(wù)被fork分(fēn)裂,分(fēn)裂出来的子任務(wù)会放入線(xiàn)程自己的deque,减少線(xiàn)程的竞争

work-stealing工作窃取算法

当線(xiàn)程执行完自己deque的任務(wù),且其他(tā)線(xiàn)程deque还有(yǒu)多(duō)的任務(wù),则会启动窃取策略,从其他(tā)線(xiàn)程deque队尾获取線(xiàn)程

使用(yòng)RecursiveTask实现ForkJoin流程demo

public class ForkJoinPoolTest {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        ForkJoinPool forkJoinPool = new ForkJoinPool();

        for (int i = 0; i < 10; i++) {

            ForkJoinTask task = forkJoinPool.submit(new Fibonacci(i));

            System.out.println(task.get());

        }

    }

    static class Fibonacci extends RecursiveTask<Integer> {

        int n;

        public Fibonacci(int n) {  this.n = n;  }

        @Override

        protected Integer compute() {

            if (n <= 1) { return n; }

            Fibonacci fib1 = new Fibonacci(n - 1);

            fib1.fork(); //相当于开启新(xīn)線(xiàn)程执行

            Fibonacci fib2 = new Fibonacci(n - 2);

            fib2.fork(); //相当于开启新(xīn)線(xiàn)程执行

            return fib1.join() + fib2.join(); //合并返回结果

        }

    }

}