banner
NEWS LETTER

Java并发编程(2)

Scroll down

ThreadLocal

ThreadLocal有什么用

  • 实现每个线程都有自己的专属本地变量,让每个线程绑定自己的值

如何使用ThreadLocal

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class main{

private static ThreadLocal<String> threadLocal = new ThreadLocal<>();

public static void main(String[] args){
threadLocal.set("This is in main thread");
System.out.println("Thread Name: " + Thread.currentThread().getName() + "---" + threadLocal.get());

//Thread A
new Thread(new Runnable(){
@Override
public void run(){
threadLocal.set("This is in A thread");
System.out.println("Thread Name: " + Thread.currentThread().getName() + "---" + threadLocal.get());
}
},"A").start();

//Thread B
new Thread(new Runnable(){
@Override
public void run(){
threadLocal.set("This is in B thread");
System.out.println("Thread Name: " + Thread.currentThread().getName() + "---" + threadLocal.get());
}
},"B").start();

//Thread C
new Thread(new Runnable(){
@Override
public void run(){
threadLocal.set("This is in C thread");
System.out.println("Thread Name: " + Thread.currentThread().getName() + "---" + threadLocal.get());
}
},"C").start();
}
}

ThreadLocal原理

  • Thread类的源码
    • 持有threadLocalsinheritableThreadLocals两个变量,它们是ThreadLocalMap类型的
1
2
3
4
5
6
public class Thread implements Runnable{
//用于维护该线程的threadLocal
ThreadLocal.ThreadLocalMap threadLocals = null;

ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
}
  • ThreadLocal类的set()方法
    • 可以看出ThreadLocal类其实是对ThreadLocalMap的封装,传入的值最终会放在Thread类里的threadLocals这个变量里
    • threadLocals这个变量是ThreadLocalMap类型的,其中ThreadLocalMap又是ThreadLocal类的一个静态类(这里比较绕)
    • 最终Thread类的threadLocals里存放的是以<ThreadLocal, Object>的键值对
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void set(T value){
//获取当前线程
Thread t = Thread.currentThread();
//获取当前线程的threadlocals变量
ThreadLocalMap map = getMap(t);
if(map != null)
map.set(this,value); //存放进去,这里的this就是这个ThreadLocal对象
else
createMap(t,value);
}

ThreadLocalMap getMap(Thread t){
return t.threadLocals;
}
  • 一个线程可以有多个ThreadLocal实例,声明多个ThreadLocal变量即可,它们都存放在Tread类的threadLocals变量里,并以ThreadLocal的实例为键名取出对应的副本值

总结

ThreadLocal实现了对私有变量的封装,我们可以不用直接对ThreadThreadLocalMap进行操作,而只需像对待共享变量一样操作私有变量即可,因为封装在ThreadLocal里的方法会帮我们从对应的Thread类的threadLocals变量里获取或更改这个变量的值,其中threadLocals变量是ThreadLocalMap类型的,存放的是<ThreadLocal, Object>的键值对,ThreadLocal的实例以自身为键名访问线程的treadLocals变量以获得该私有变量在该线程下的副本值

ThreadLocal内存泄漏问题是怎么导致的

  • ThreadLocalMap中使用的key值为ThreadLocal的弱引用,而value是强引用
  • 如果ThreadLocal没有外部强引用的情况下,在垃圾回收的时候,key会被清理掉而value不会
  • ThreadLocalMap中会出现key为null的Entry,如果我们不做任何措施的话,value就无法被GC回收,因此会产生内存泄漏
  • ThreadLocalMap实现中考虑了该情况,在调用setgetremove方法时会清理掉key为null的记录,但是用了玩ThreadLocal方法后最好手动调用remove方法

线程池

什么是线程池

线程池是管理一系列线程的资源池。当有任务要处理时,直接从线程池获取线程来处理,处理完之后线程并不会立即被销毁,而是等待下一个任务

为什么要用线程池

  • 池化技术的思想主要是为了减少每次获取资源的消耗,提高对资源的利用率

  • 线程池提供了一种限制和管理资源(包括执行一个任务)的方式,每个线程池还维护一些基本统计信息

  • 使用线程池的好处:

    • 降低资源消耗

    • 提高响应速度

    • 提高线程的可管理性

    • 提供更多更强大的功能

如何创建线程池

  1. 通过ThreadPoolExecutor构造函数来创建(推荐)
  2. 通过Java内置Executor框架的工具类Executors来创建(不推荐)
    • FixedThreadPool : 该方法返回一个固定线程数量的线程池。该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务。
    • SingleThreadExecutor 该方法返回一个只有一个线程的线程池。若多余一个任务被提交到该线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。
    • CachedThreadPool 该方法返回一个可根据实际情况调整线程数量的线程池。线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕后,将返回线程池进行复用。
    • ScheduledThreadPool :该返回一个用来在给定的延迟后运行任务或者定期执行任务的线程池

为什么不推荐使用工具类Executors创建线程池

《阿里巴巴Java开发手册》中不允许使用Executors创建,而是通过ThreadPoolExecutor构造函数的方式,这样让写的同学更加明确线程池的运行规则,规避资源耗尽的风险

Executors返回的线程池对象的弊端如下:

  • FixedThreadPoolSingleThreadExecutor : 使用的是无界的 LinkedBlockingQueue,任务队列最大长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。
  • CachedThreadPool :使用的是同步队列 SynchronousQueue, 允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。
  • ScheduledThreadPoolSingleThreadScheduledExecutor : 使用的无界的延迟阻塞队列DelayedWorkQueue,任务队列最大长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。

线程池常见参数有哪些

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* 用给定的初始参数创建一个新的ThreadPoolExecutor。
*/
public ThreadPoolExecutor(int corePoolSize,//线程池的核心线程数量
int maximumPoolSize,//线程池的最大线程数
long keepAliveTime,//当线程数大于核心线程数时,多余的空闲线程存活的最长时间
TimeUnit unit,//时间单位
BlockingQueue<Runnable> workQueue,//任务队列,用来储存等待执行任务的队列
ThreadFactory threadFactory,//线程工厂,用来创建线程,一般默认即可
RejectedExecutionHandler handler//拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
  • 3个最重要的参数
    • corePoolSize:任务队列未达到队列容量时,最大可以同时运行的线程数量
    • maximumPoolSize:任务队列达到队列容量时,当前可以同时运行的线程数量变为最大线程数
    • workQueue:新任务来的时候先判断当前运行的线程数量是否达到核心线程数,如果达到则新任务会被存放在队列中
  • 其他常见参数
    • keepAliveTime:线程池中的线程数量大于corePoolSize的时候,如果没有新的任务提交,核心线程外的线程不会被立即销毁,待等待时间超过该参数后才会被回收销毁
    • unit:keepAliveTime参数的时间单位
    • threadFactory:executor创建新县城的时候用到
    • handler:饱和策略

线程池的饱和策略有哪些

线程池常用的阻塞队列有哪些

新任务到达时会先判断当前运行的线程数量是否达到核心线程数,如果达到则新任务会被存放在队列中

不同的线程池会选用不同的阻塞队列

  • 容量为Intger.MAX_VALUE的LinkedBlockingQueue(无界队列):FixedThreadPool和SingleThreadExector。由于队列永远不会被放满,因此FixesThreadPool最多只能创建核心线程数的线程
  • SychronousQueue(同步队列):CachedThreadPool。SynchronousQueue没有容量,不存储新元素,目的是保证对于提交的任务,如果有空闲线程,则使用空闲线程来处理,否则就新建一个线程来处理任务,也就是说CachedThreadPool的最大线程数是Integer.MAX_VALUE,可以理解为线程数是可以无限扩展的,可能会创建大量线程,从而导致OOM
  • DelayedWorkQueue(延迟阻塞队列):ScheduledThreadPool和SIngleThreadScheduledExecutor。DelayedWorkQueue的内部元素并不是按照放入的时间排序,而是会按照延迟的时间长短对任务进行排序,内部采用的是堆的数据结构,可以保证每次出队的任务是当前队列中执行时间最靠前的。DelayedWorkQueue添加元素满了之后会自动扩容到原来容量的1.5倍,即永远不会阻塞,最大扩容可达Integer.MAX_VALUE,所以最多只能创建核心线程数的线程

线程池处理任务的流程

如何给线程池命名

给线程池命名有利于定位问题

  1. 利用guava的ThreadFactoryBuilder

    1
    2
    ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(threadNamePrefix + "-%d").setDaemon(true).build();
    ExecutorService threadPool = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,KeepAliveTime,TimeUnit.MINUTES,workQueue,threadFactory);
  2. 自己实现ThreadFactory

如何设定线程池的大小

判断是CPU密集型任务(N+1)还是I/O密集型任务(2N)

线程数计算方法:N(CPU核数)* (线程运行总时间/ST(线程计算时间))

如何动态修改线程池的参数

配置三个核心参数:

  • corePoolSize:调用setCorePoolSize()方法

  • maximumPoolSize:调用setMaximumPoolSize()方法

  • workQueue:美团通过自定义一个ResizableCapacityLinkedBlockIngQueue的队列,主要是为了把LinkedBlockingQueue的capacity字段的final关键字去掉

线程池被创建后里面有线程吗?

线程池被创建后如果没有任务过来,里面是不会有线程的

如果需要预热的话可以调用下面两个方法:

  • prestartAllCoreThreads:全部启动

  • preastartCoreThread:仅启动一个

核心线程会被回收吗

核心线程默认是不会被回收的,如果需要回收核心线程数,需要调用:

其中allowCoreThreadTimeOut该值默认为false

Future

Future类有什么用

当主线程执行某一耗时任务时,可以将该耗时任务交给一个子线程去异步执行,主线程可以继续执行后续的任务,再通过Future类获取到耗时任务的执行结果,使得程序的执行效率获得明显提高

这是多线程中的一种设计模式,即Future模式,核心思想是异步调用

Java中Future类只是一个泛型接口,位于java.util.concurrent包下,定义了5个方法,包括4个功能

  • 取消任务
  • 判断任务是否被取消
  • 判断任务是否已经执行完成
  • 获取任务执行结果
1
2
3
4
5
6
7
public interface Future<V>{
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,TimeoutException
}

Callable和Future有什么关系

CompletableFuture类有什么用

CompletableFuture提供函数式编程、异步任务编排组合(可以将多个异步任务串联起来,组成一个完整的链式调用)等能力

人话:CompletableFuture支持添加回掉函数,即子线程执行完成后自行调用后续回掉函数,而不需要像Future类需要主线程去询问其是否执行完

CompletableFuture同时实现了FutureCompletionStage接口

CompletionStage接口描述了一个异步计算的阶段,很多计算可以分成多个阶段或步骤,此时可以通过它将所有步骤组合起来,形成异步计算的流水线

AQS

AQS是什么

AQS的全称为AbstarctQueuedSynchronizer,即抽象队列同步器

AQS是一个抽象类,主要用于构建锁和同步器

Semaphore有什么用

synchronizedReentrantLock都是一次只允许一个线程访问某个资源,而Semaphore可以用来控制同时访问特定资源的线程数量

Semaphore有公平模式(FIFO)和非公平模式(抢占式的)

Semaphore的原理是什么

Semaphore是共享锁的一种实现,它默认构造AQS的state值为permits

调用semaphore.acquire(),线程尝试获取许可证,如果state-1>=0,则表示可以获取,使用CAS操作修改state=state-1;如果state-1<0,则获取失败,此时创建一个Node节点加入阻塞队列,挂起当前线程

调用semaphore.release(),线程尝试释放许可证,使用CAS操作修改state=state+1。成功释放后会唤醒阻塞队列中的一个线程,被唤醒的线程会重新尝试修改state的值state=state-1,如果state>=0则获取成功,否则重新进入阻塞队列,挂起线程

CountDownLatch有什么用

CountDownLatch允许count个线程阻塞在一个地方,直至这count个线程都执行完成。

CountDownLatch一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,即当CountDownLatch使用完毕后,它不能再次被使用

CountDownLatch的原理是什么

  1. 线程使用CountDown()方法时,使用了tryReleaseShared方法以CAS的操作来减少state,直至state为0
  2. 当调用await()方法时,如果state不为0,说明所有的任务还没有执行完毕,await()方法就会一直阻塞,即await()方法后的语句不会被执行
  3. CountDownLatch会自旋CAS判断state==0,如果state==0就会释放所有等待的线程,await()方法之后的语句得到执行

CyclicBarrier有什么用

CyclicBarrier,即可循环使用(Cyclic)的屏障(Barrier)让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障,屏障才会开门,所有被屏障拦截的线程才会继续执行

CyclicBarrier的原理是什么

  1. CyclicBarrier以一个count变量作为计数器,count的初始值为parties属性的初始化值
  2. 每当一个线程到屏障这里,就将计数器减一
  3. 如果count的值为0,表示这是这一代最后一个线程到达屏障
  4. 然后尝试执行构造方法中输入的任务

参考文献

  1. https://zhuanlan.zhihu.com/p/158033837
  2. https://javaguide.cn/java/concurrent/java-concurrent-questions-03.html#%E4%BB%80%E4%B9%88%E6%98%AF%E7%BA%BF%E7%A8%8B%E6%B1%A0
  3. https://mp.weixin.qq.com/s?__biz=Mzg3NjU3NTkwMQ==&mid=2247505103&idx=1&sn=a041dbec689cec4f1bbc99220baa7219&source=41#wechat_redirect
  4. 《Java并发编程的艺术》