前言
ConcurrentHashMap
ConcurrentLinkedQueue
BlockingQueue
Fork/Join🖊
ConcurrentHashMap
ConcurrentHashMap 是线程安全且高效的HashMap,它使用锁分段技术有效提高并发访问率。给每一段数据各一把锁,不同数据段之间的访问就不会存在锁竞争。
HashMap在多线程环境下的put可能会导致死循环,因为多线程会导致HashMap的Entry链表形成环形数据结构,Entry的next结点用不为空,导致死循环获取Entry。
HashTable使用的是synchronized来保证线程安全,效率低下。
ConcurrentHashMap结构
ConcurrentHashMap由 Segment 数组和 HashEntry 数组 组成
Segment 是一种可重入锁(ReetrantLock),是ConcurrentHashMap的锁。
Segment 的结构与HashMap相似,Segment数组,其中的一个Segment包含一个HashEntry数组,每一个HashEntry是一个链表结构的元素。每个Segemt守护着一个HashEntry数组里的元素,每次修改其中数据必须先获得对应的Segment锁。
ConcurrentHashMap的实现原理
ConcurrentHashMap 的初始化
初始化 segments 数组
初始化 segmentShift 和 segmentMask
- 初始化每个 segment
定位 Segment
- 散列算法定位到 segment
- 对 hash 针对 hashCode 再散列,为了减少散列冲突,使元素均匀分配在不同的 Segment 上,从而提高存取效率。
ConcurrentHashMap操作
get()
get() 先经过一次再散列,在使用散列值定位 Segment,再通过散列算法定位到元素
- get不需要加锁,因为所有将要使用的共享变量都定义为了volatile类型,能够保证线程之间的可见性。
put()
写操作必须加锁,因为需要对共享变量做修改。put() 方法首先拿到 Segment,然后在Segment里进行插入操作1. 判断是否需要扩容(只针对一个Segnent扩容两倍) 2. 判断存放的位置。
size()
Segemt里的全局变量count是个volatile变量。但求size时直接加会导致不一致的问题。
ConcurrentHashMap 的做法是先不加锁地统计两次各个 segment 的 count,若不一致才采用全加锁的方式统计
ConcurrentLinkedQueue
非阻塞的线程安全的无界链队列。
实现线程安全的队列有两种方法
- 阻塞算法:出队和入队共用一把锁(或两个锁)等方式实现
- 非阻塞算法:循环CAS的方式实现(ConcurrentLinkedQueue的实现方式)
ConcurrentLinkedQueue的结构
默认情况下head节点存储元素为空,tail节点等于head节点(注意,tail不总是尾节点,head也不总是头节点)
1 | private transient volatile Node<E> tail = head; |
入队列
- 定位尾节点(因为tail不总是尾节点,所以要通过tail再定位)
- 将入队节点设置为尾节点的下一个节点(循环CAS设置,不成功则重试)
- 更新 tail 节点:(在上一步CAS中顺带完成)
- 如果 tail 节点的 next 不为空,则将入队节点设为tail
- // 如果tail节点的 next 为空,则入队节点设为 tail 的 next 节点
由上看出,tail不总是尾节点,只有当tail的next不为空,才修改tail为尾节点
Java使用一个HOPS变量(默认为1)来控制更新tail的频率,以减少CAS更新tail来提高入队的效率。
当tail节点和尾节点的距离大于等于HOPS时,才会更新tail节点。
出队列
head节点也不是每次出队都会更新!
当head节点里有元素时,直接弹出head的元素,而不更新head节点,只有head节点里没有元素时,才会更新head节点。
与入队列同理,也是使用hops变量减少CAS更新head节点的消耗,从而提高出队效率。
阻塞队列 BlockingQueue
阻塞队列相比普通队列额外支持阻塞式插入和阻塞式移除的方法。
阻塞式插入:当队列满时,阻塞插入元素的线程,直到队列不满。
阻塞式移除:当队列为空时,获取元素的线程会等待队列变非空。
阻塞队列支持的插入和移除操作的4种处理方式:
方法 | 抛出异常 | 返回特殊值 | 阻塞 | 超时退出 |
---|---|---|---|---|
插入方法 | add(e) | offer() | put(e) | offer(e, time, unit) |
移除方法 | remove() | poll() | take() | poll(time, unit) |
检查方法 | element() | peek() | 不可用 | 不可用 |
记忆方式:offer和poll都有o, put和take都有t
如果是无界队列,put或offer永远不会被阻塞,使用offer时永远返回true
实现原理:等待-通知机制,用的是native的park方法和unpark方法
Java的7种阻塞队列
声明方式:new
- ArrayBlockingQueue:数组结构的有界阻塞队列
- LinkedBlockingQueue:链表结构的无界阻塞队列
- PriorityBlockingQueue:支持优先级排序的链表结构的无界优先队列
- DelayQueue:支持延时弹出的无界队列,底层实现是优先队列。在创建内部元素时可以指定多久之后才能取出。(元素必须实现Delayed接口)
- SynchronousQueue:不储存元素的阻塞队列
- LinkedTransferQueue:链表结构的无界阻塞队列,用transfer将数据直接传给阻塞接收的线程,若没有消费者则将元素放在队列尾。
- LinkedBlockingDeque:链表结构的无界双向阻塞队列
1. ArrayBlockingQueue
数组实现的有界阻塞队列,排序规则是FIFO
支持公平性和非公平性(默认)的进入队列,公平性和非公平性是针对:等待在队列外的线程。
公平性访问:队列外的线程先等待先访问(但是会降低吞吐量)公平性是用可重入锁实现的。
非公平性访问:当队列可用时,阻塞的线程都可以争夺访问资格。
指定公平性的声明方式如下:
1 | ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000, true); |
2. LinkedBlockingQueue
链表实现的无界阻塞队列,默认和最大长度是 Integer.MAX_VALUE。使用FIFO排序
3. PriorityBlockingQueue
支持优先级排序的无界阻塞队列,默认情况使用自然升序排列。可以自定义类实现compareTo()方法来指定元素排序规则,或者在构造函数中指定一个Comparator定制排序规则。(不稳定:不保证同优先级的元素顺序)
4. DelayQueue
支持延时获取元素的无界阻塞队列。内部使用 PriorityQueue 实现。
- 队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才可以从队列中取出元素。
使用场景:
- 缓存系统的设计:可以用DelayQueue保存缓存元素的有效期。当取出元素时,说明缓存到期了
- 定时任务调度:使用DelayQueue保存当天将要执行的任务和执行时间,一旦获取到就开始执行。
如何给元素实现Delayed接口
- 初始化基本数据,使用time记录延时时限。
- 实现getDelay方法,该方法返回当前元素还需要延时多长时间
- 实现compareTo方法指定顺序。如让延时最长的放在队列尾。
5. SynchronousQueue
不存储元素的阻塞队列,每一个put都必须等待另一个线程从队列 take() 元素,否则不能继续添加元素
支持公平性访问,默认使用非公平策略。
使用场景
适合传递性场景,相当于一个传球手。负责把生产者线程处理的数据直接交给消费者。
吞吐量高
6. LinkedTransferQueue
链表结构的无界阻塞 TransferQueue 队列,相比其他队列多了 tryTransfer() 和 transfer() 方法
(1)transfer() 方法
将元素立即传给正在 poll(time) 或正在 take() 而导致阻塞的线程,若没有正在阻塞接收数据的线程,则将数据放在队列尾。
(2)tryTransfer() 方法
试探数据能否直接传给消费者,如果不能返回false。
7. LinkedBlockingDeque
链表结构的双向阻塞队列。
双向队列英文多了一个操作队列的入口,在多线程同时入队时,可以减少一半的竞争。
相比其他队列多了addFirst、addLast等等方法。
使用场景
使用在Fork-Join框架的工作窃取中。当一个线程完成了自己的任务队列中的任务,可以从别的线程的任务队列里,从另一头获取任务执行,从而减轻别的线程的压力。
Fork/Join 框架
Java7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干小任务,最终汇总每个小任务结构或得到大任务的结果的框架。
工作窃取算法
提前完成任务的线程帮助其他线程完成其一一对应的队列里的任务(使用双向队列,一个从头获取任务,一个从尾取任务)
优点:充分利用线程进行并行计算,减少了线程之间的竞争。(但竞争不可避免,比如双端队列里只有一个任务时,反而更消耗资源,如线程和队列的消耗)
Fork/Join 框架的设计
Task.fork() 异步执行任务
Task.join() 阻塞当前线程并等待获取结果
示例:
1 | public class CountTask extends RecursiveTask<Integer> { |