- 行业动态 >
- 资讯详情
高并发一瞥,線(xiàn)程和線(xiàn)程池的总结
1 JAVA線(xiàn)程的实现原理(lǐ)
l java的線(xiàn)程是基于操作系统原生的線(xiàn)程模型(非用(yòng)户态),通过系统调用(yòng),将線(xiàn)程交给系统调度执行
l java線(xiàn)程拥有(yǒu)属于自己的虚拟机栈,当JVM将栈、程序计数器、工作内存等准备好后,会分(fēn)配一个系统原生線(xiàn)程来执行。Java線(xiàn)程结束,原生線(xiàn)程随之被回收
l 原生線(xiàn)程初始化完毕,会调Java線(xiàn)程的run方法。当JAVA線(xiàn)程结束时,则释放原生線(xiàn)程和Java線(xiàn)程的所有(yǒu)资源
l java方法的执行对应虚拟机栈的一个栈帧,用(yòng)于存储局部变量、操作数栈、动态链接、方法出口等
2 JAVA線(xiàn)程的生命周期
l New(新(xīn)建状态):用(yòng)new关键字创建線(xiàn)程之后,该線(xiàn)程处于新(xīn)建状态,此时仅由JVM為(wèi)其分(fēn)配内存,并初始化其成员变量
l Runnable(就绪状态):当调用(yòng)Thread.start方法后,该線(xiàn)程处于就绪状态。JVM会為(wèi)其分(fēn)配虚拟机栈等,然后等待系统调度
l Running(运行状态):处于就绪状态的線(xiàn)程获得CPU,执行run方法时,则線(xiàn)程处于运行状态
l Blocked(阻塞状态):阻塞状态是指線(xiàn)程放弃了cpu的使用(yòng)权(join,sleep函数的调用(yòng)),处于暂停止状态。Blocked状态的線(xiàn)程需要恢复到Runnable状态,才能(néng)再次被系统调度执行变成Running
l Terminated(線(xiàn)程死亡):線(xiàn)程正常run结束、或抛出一个未捕获的Throwable、调用(yòng)Thread.stop来结束该線(xiàn)程,都会导致線(xiàn)程的死亡
l java線(xiàn)程和linux線(xiàn)程的生命周期基本是一一对应了,就是多(duō)了new阶段
3 JAVA線(xiàn)程的几种常用(yòng)方法
l 線(xiàn)程启动函数
//Thread.java
//调用(yòng)start启动線(xiàn)程,进入Runnable状态,等待系统调度执行
public synchronized void start(){//synchronized同步执行
if (threadStatus != 0) //0 代表new状态,非0则抛出错误
throw new IllegalThreadStateException();
...
start0(); //本地方法方法 private native void start0()
...
}
//Running状态,新(xīn)線(xiàn)程执行的代码方法,可(kě)被子类重写
public void run() {
if (target != null) {
//target是Runnable,new Thread(Runnable)时传入
target.run();
}
}
l 線(xiàn)程终止函数
//Thread.java
@Deprecated public final void stop();
//中断線(xiàn)程
public void interrupt()
//判断的是当前線(xiàn)程是否处于中断状态
public static boolean interrupted()
l 用(yòng)stop会强行终止線(xiàn)程,导致線(xiàn)程所持有(yǒu)的全部锁突然释放(不可(kě)控制),而被锁突同步的逻辑遭到破坏。不建议使用(yòng)
l interrupt函数中断線(xiàn)程,但它不一定会让線(xiàn)程退出的。它比stop函数优雅,可(kě)控制
l 当線(xiàn)程处于调用(yòng)sleep、wait的阻塞状态时,会抛出InterruptedException,代码内部捕获,然后结束線(xiàn)程
l 線(xiàn)程处于非阻塞状态,则需要程序自己调用(yòng)interrupted()判断,再决定是否退出
l 其他(tā)常用(yòng)方法
//Thread.java
//阻塞等待其他(tā)線(xiàn)程
public final synchronized void join(final long millis)
//暂时让出CPU执行
public static native void yield();
//休眠一段时间
public static native void sleep(long millis) throws InterruptedException;
l start与run方法的區(qū)别
l start是Thread类的方法,从線(xiàn)程的生命周期来看,start的执行并不意味着新(xīn)線(xiàn)程的执行,而是让JVM分(fēn)配虚拟机栈,进入Runnable状态,start的执行还是在旧線(xiàn)程上
l run则是新(xīn)線(xiàn)程被系统调度,获取CPU时执行的方法,函数run则是继承Thread重写的run或者实现接口Runnable的run
l Thread.sleep与Object.wait區(qū)别
l Thread.sleep需要指定休眠时间,时间一到可(kě)继续运行;和锁机制无关,没有(yǒu)加锁也不用(yòng)释放锁
l Object.wait需要在synchronized中调用(yòng),否则报IllegalMonitorStateException错误。wait方法会释放锁,需要调用(yòng)相同锁对象Object.notify来唤醒線(xiàn)程
4 線(xiàn)程池及其优点
l 線(xiàn)程的每次使用(yòng)创建,结束销毁是非常巨大的开销。若用(yòng)缓存的策略(線(xiàn)程池),暂存曾经创建的線(xiàn)程,复用(yòng)这些線(xiàn)程,可(kě)以减少程序的消耗,提高線(xiàn)程的利用(yòng)率
l 降低资源消耗:重复利用(yòng)線(xiàn)程可(kě)降低線(xiàn)程创建和销毁造成的消耗
l 提高响应速度:当任務(wù)到达时,不需要等待線(xiàn)程创建就能(néng)立即执行
l 提高線(xiàn)程的可(kě)管理(lǐ)性:使用(yòng)線(xiàn)程池可(kě)以进行统一的分(fēn)配,监控和调优
5 JDK封装的線(xiàn)程池
//ThreadPoolExecutor.java
public ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
l 1 corePoolSize:核心線(xiàn)程数,線(xiàn)程池维持的線(xiàn)程数量
l 2 maximumPoolSize:最大的線(xiàn)程数,当阻塞队列不可(kě)再接受任務(wù)时且maximumPoolSize大于corePoolSize则会创建非核心線(xiàn)程来执行。但任務(wù)执行时,会被销毁
l 3 keepAliveTime:非核心線(xiàn)程在闲暇间的存活时间
l 4 TimeUnit:和keepAliveTime配合使用(yòng),表示keepAliveTime参数的时间单位
l 5 workQueue:任務(wù)的等待阻塞队列,正在执行的任務(wù)数超过corePoolSize时,加入该队列
l 6 threadFactory:線(xiàn)程的创建工厂
l 7 handler:拒绝策略,線(xiàn)程数达到了maximumPoolSize,还有(yǒu)任務(wù)提交则使用(yòng)拒绝策略处理(lǐ)
6 線(xiàn)程池原理(lǐ)之执行流程
//ThreadPoolExecutor.java
public void execute(Runnable command) {
...
if (workerCountOf(c) < corePoolSize) { //plan A
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { //plan B
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//addWorker(command, false) false代表可(kě)创建非核心線(xiàn)程来执行任務(wù)
else if (!addWorker(command, false)) //plan C
reject(command); // //plan D
}
l plan A:任務(wù)的execute,先判断核心線(xiàn)程数量达到上限;否,则创建核心線(xiàn)程来执行任務(wù);是,则执行plan B
l plan B:当任務(wù)数大于核心数时,任務(wù)被加入阻塞队列,如果超过阻塞队列的容量上限,执行C
l plan C: 阻塞队列不能(néng)接受任務(wù)时,且设置的maximumPoolSize大于corePoolSize,创建新(xīn)的非核心線(xiàn)程执行任務(wù)
l plan D:当plan A、B、C都无能(néng)為(wèi)力时,使用(yòng)拒绝策略处理(lǐ)
7 阻塞队列的简单了解
l 队列的阻塞插入:当队列满时,队列会阻塞插入元素的線(xiàn)程,直到队列不满
l 队列的阻塞移除:当队列為(wèi)空时,获取元素的線(xiàn)程会等待队列变為(wèi)非空
l BlockingQueue提供的方法如下,其中put和take是阻塞操作
操作方法 |
抛出异常 |
返回特殊值 |
阻塞線(xiàn)程 |
超时退出 |
插入元素 |
add(e) |
offer(e) |
put(e) |
offer(e, timeout, unit) |
移除元素 |
remove() |
poll() |
take() |
pull(timeout, unit) |
检查 |
element() |
peek() |
无 |
无 |
l ArrayBlockingQueue
l ArrayBlockingQueue是用(yòng)数组实现的有(yǒu)界阻塞队列,必须指定队列大小(xiǎo),先进先出(FIFO)原则排队
l LinkedBlockingQueue
l 是用(yòng)链表实现的有(yǒu)界阻塞队列,如果构造LinkedBlockingQueue时没有(yǒu)指定大小(xiǎo),则默认是Integer.MAX_VALUE,无限大
l 该队列生产端和消费端使用(yòng)独立的锁来控制数据操作,以此来提高队列的并发性
l PriorityBlockingQueue
l public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator)
l 基于数组,元素具有(yǒu)优先级的无界阻塞队列,优先级由Comparator决定
l PriorityBlockingQueue不会阻塞生产者,却会在没有(yǒu)可(kě)消费的任務(wù)时,阻塞消费者
l DelayQueue
l 支持延时获取元素的无界阻塞队列,基于PriorityQueue实现
l 元素必须实现Delayed接口,指定多(duō)久才能(néng)从队列中获取该元素。
l 可(kě)用(yòng)于缓存系统的设计、定时任務(wù)调度等场景的使用(yòng)
l SynchronousQueue
l SynchronousQueue是一种无缓冲的等待队列,添加一个元素必须等待被取走后才能(néng)继续添加元素
l LinkedTransferQueue
l 由链表组成的TransferQueue无界阻塞队列,相比其他(tā)队列多(duō)了tryTransfer和transfer函数
l transfer:当前有(yǒu)消费者正在等待元素,则直接传给消费者,否则存入队尾,并阻塞等待元素被消费才返回
l tryTransfer:试探传入的元素是否能(néng)直接传给消费者。如果没消费者等待消费元素,元素加入队尾,返回false
l LinkedBlockingDeque
l LinkedBlockingDeque是由链表构建的双向阻塞队列,多(duō)了一端可(kě)操作入队出队,少了一半的竞争,提高并发性
8 Executors的四种線(xiàn)程池浅析
//Executors.java
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
指定核心線(xiàn)程数,队列是LinkedBlockingQueue无界阻塞队列,永遠(yuǎn)不可(kě)能(néng)拒绝任務(wù);适合用(yòng)在稳定且固定的并发场景,建议線(xiàn)程设置為(wèi)CPU核数
l newCachedThreadPool
//Executors.java
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
l 核心池大小(xiǎo)為(wèi)0,線(xiàn)程池最大線(xiàn)程数為(wèi)最大整型,任務(wù)提交先加入到阻塞队列中,非核心線(xiàn)程60s没任務(wù)执行则销毁,阻塞队列為(wèi)SynchronousQueue。newCachedThreadPool会不断的创建新(xīn)線(xiàn)程来执行任務(wù),不建议用(yòng)
l newScheduledThreadPool
//Executors.java
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue(), threadFactory);
}
//指定延迟执行时间
public <V> ScheduledFuture<V>
schedule(Callable<V> callable, long delay, TimeUnit unit)
l ScheduledThreadPoolExecutor(STPE)其实是ThreadPoolExecutor的子类,可(kě)指定核心線(xiàn)程数,队列是STPE的内部类DelayedWorkQueue。STPE的好处是 A 延时可(kě)执行任務(wù),B 可(kě)执行带有(yǒu)返回值的任務(wù)
l newSingleThreadExecutor
//Executors.java
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>())); //无界队列
}
l 和newFixedThreadPool构造方法一致,不过線(xiàn)程数被设置為(wèi)1了。SingleThreadExecutor比new个線(xiàn)程的好处是;線(xiàn)程运行时抛出异常的时候会有(yǒu)新(xīn)的線(xiàn)程加入線(xiàn)程池完成接下来的任務(wù);阻塞队列可(kě)以保证任務(wù)按FIFO执行
9 如果优雅地关闭線(xiàn)程池
l 線(xiàn)程池的关闭,就要先关闭池中的線(xiàn)程,上文(wén)第三点有(yǒu)提,暴力强制性stop線(xiàn)程会导致同步数据的不一致,因此我们要调用(yòng)interrupt关闭線(xiàn)程
l 而線(xiàn)程池提供了两个关闭方法,shutdownNow和shuwdown
l shutdownNow:線(xiàn)程池拒接收新(xīn)任務(wù),同时立马关闭線(xiàn)程池(进行中的任務(wù)会执行完),队列的任務(wù)不再执行,返回未执行任務(wù)List
public List<Runnable> shutdownNow() {
...
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); //加锁
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers(); //interrupt关闭線(xiàn)程
tasks = drainQueue(); //未执行任務(wù)
...
l shuwdown:線(xiàn)程池拒接收新(xīn)任務(wù),同时等待線(xiàn)程池里的任務(wù)执行完毕后关闭線(xiàn)程池,代码和shutdownNow类似就不贴了
10 線(xiàn)程池為(wèi)什么使用(yòng)的是阻塞队列
先考虑下為(wèi)啥線(xiàn)程池的線(xiàn)程不会被释放,它是怎么管理(lǐ)線(xiàn)程的生命周期的呢(ne)
//ThreadPoolExecutor.Worker.class
final void runWorker(Worker w) {
...
//工作線(xiàn)程会进入一个循环获取任務(wù)执行的逻辑
while (task != null || (task = getTask()) != null)
...
}
private Runnable getTask(){
...
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)
: workQueue.take(); //線(xiàn)程会阻塞挂起等待任務(wù),
...
}
可(kě)以看出,无任務(wù)执行时,線(xiàn)程池其实是利用(yòng)阻塞队列的take方法挂起,从而维持核心線(xiàn)程的存活
11 線(xiàn)程池的worker继承AQS的意义
//Worker class,一个worker一个線(xiàn)程
Worker(Runnable firstTask) {
//禁止新(xīn)線(xiàn)程未开始就被中断
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
final void runWorker(Worker w) {
....
//对应构造Worker是的setState(-1)
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
....
w.lock(); //加锁同步
....
try {
...
task.run();
afterExecute(task, null);
} finally {
....
w.unlock(); //释放锁
}
worker继承AQS的意义:A 禁止線(xiàn)程未开始就被中断;B 同步runWorker方法的处理(lǐ)逻辑
12 拒绝策略
l AbortPolicy 丢弃任務(wù)并抛出RejectedExecutionException异常
l DiscardOldestPolicy 丢弃队列最前面的任務(wù),然后重新(xīn)提交被拒绝的任務(wù)
l DiscardPolicy 丢弃任務(wù),但是不抛出异常
l CallerRunsPolicy
A handler for rejected tasks that runs the rejected task directly in the calling thread of the {@code execute} method, unless the executor has been shut down, in which case the task is discarded.
如果任務(wù)被拒绝了,则由提交任務(wù)的線(xiàn)程执行此任務(wù)
13 ForkJoinPool了解一波
l ForkJoinPool和ThreadPoolExecutor不同,它适合执行可(kě)以分(fēn)解子任務(wù)的任務(wù),如树的遍历,归并排序等一些递归场景
l ForkJoinPool每个線(xiàn)程有(yǒu)一个对应的双端队列deque;当線(xiàn)程中的任務(wù)被fork分(fēn)裂,分(fēn)裂出来的子任務(wù)会放入線(xiàn)程自己的deque,减少線(xiàn)程的竞争
l work-stealing工作窃取算法
当線(xiàn)程执行完自己deque的任務(wù),且其他(tā)線(xiàn)程deque还有(yǒu)多(duō)的任務(wù),则会启动窃取策略,从其他(tā)線(xiàn)程deque队尾获取線(xiàn)程
l 使用(yòng)RecursiveTask实现ForkJoin流程demo
public class ForkJoinPoolTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ForkJoinPool forkJoinPool = new ForkJoinPool();
for (int i = 0; i < 10; i++) {
ForkJoinTask task = forkJoinPool.submit(new Fibonacci(i));
System.out.println(task.get());
}
}
static class Fibonacci extends RecursiveTask<Integer> {
int n;
public Fibonacci(int n) { this.n = n; }
@Override
protected Integer compute() {
if (n <= 1) { return n; }
Fibonacci fib1 = new Fibonacci(n - 1);
fib1.fork(); //相当于开启新(xīn)線(xiàn)程执行
Fibonacci fib2 = new Fibonacci(n - 2);
fib2.fork(); //相当于开启新(xīn)線(xiàn)程执行
return fib1.join() + fib2.join(); //合并返回结果
}
}
}