第6章 Java并发容器和框架

前言

ConcurrentHashMap
ConcurrentLinkedQueue
BlockingQueue
Fork/Join🖊

ConcurrentHashMap

ConcurrentHashMap 是线程安全且高效的HashMap,它使用锁分段技术有效提高并发访问率。给每一段数据各一把锁,不同数据段之间的访问就不会存在锁竞争。

HashMap在多线程环境下的put可能会导致死循环,因为多线程会导致HashMap的Entry链表形成环形数据结构,Entry的next结点用不为空,导致死循环获取Entry。

HashTable使用的是synchronized来保证线程安全,效率低下。

ConcurrentHashMap结构

ConcurrentHashMap由 Segment 数组HashEntry 数组 组成

Segment 是一种可重入锁(ReetrantLock),是ConcurrentHashMap的锁。
Segment 的结构与HashMap相似,Segment数组,其中的一个Segment包含一个HashEntry数组,每一个HashEntry是一个链表结构的元素。每个Segemt守护着一个HashEntry数组里的元素,每次修改其中数据必须先获得对应的Segment锁。

ConcurrentHashMap结构图

ConcurrentHashMap类图

ConcurrentHashMap的实现原理

ConcurrentHashMap 的初始化

  1. 初始化 segments 数组

  2. 初始化 segmentShift 和 segmentMask

  3. 初始化每个 segment

定位 Segment

  1. 散列算法定位到 segment
  2. 对 hash 针对 hashCode 再散列,为了减少散列冲突,使元素均匀分配在不同的 Segment 上,从而提高存取效率。

ConcurrentHashMap操作

get()

get() 先经过一次再散列,在使用散列值定位 Segment,再通过散列算法定位到元素

  • get不需要加锁,因为所有将要使用的共享变量都定义为了volatile类型,能够保证线程之间的可见性。

put()

写操作必须加锁,因为需要对共享变量做修改。put() 方法首先拿到 Segment,然后在Segment里进行插入操作1. 判断是否需要扩容(只针对一个Segnent扩容两倍) 2. 判断存放的位置。

size()

Segemt里的全局变量count是个volatile变量。但求size时直接加会导致不一致的问题。
ConcurrentHashMap 的做法是先不加锁地统计两次各个 segment 的 count,若不一致才采用全加锁的方式统计

ConcurrentLinkedQueue

非阻塞的线程安全的无界链队列。

实现线程安全的队列有两种方法

  1. 阻塞算法:出队和入队共用一把锁(或两个锁)等方式实现
  2. 非阻塞算法:循环CAS的方式实现(ConcurrentLinkedQueue的实现方式)

ConcurrentLinkedQueue的结构

默认情况下head节点存储元素为空,tail节点等于head节点(注意,tail不总是尾节点,head也不总是头节点)

1
private transient volatile Node<E> tail = head;

入队列

  1. 定位尾节点(因为tail不总是尾节点,所以要通过tail再定位)
  2. 将入队节点设置为尾节点的下一个节点(循环CAS设置,不成功则重试)
  3. 更新 tail 节点:(在上一步CAS中顺带完成)
    • 如果 tail 节点的 next 不为空,则将入队节点设为tail
    • // 如果tail节点的 next 为空,则入队节点设为 tail 的 next 节点

由上看出,tail不总是尾节点,只有当tail的next不为空,才修改tail为尾节点

Java使用一个HOPS变量(默认为1)来控制更新tail的频率,以减少CAS更新tail来提高入队的效率。
当tail节点和尾节点的距离大于等于HOPS时,才会更新tail节点。

ConcurrentLinkedQueue入队示意

出队列

head节点也不是每次出队都会更新!

当head节点里有元素时,直接弹出head的元素,而不更新head节点,只有head节点里没有元素时,才会更新head节点。

与入队列同理,也是使用hops变量减少CAS更新head节点的消耗,从而提高出队效率。

ConcurrentLinkedQueue出队示意

阻塞队列 BlockingQueue

阻塞队列相比普通队列额外支持阻塞式插入和阻塞式移除的方法。

阻塞式插入:当队列满时,阻塞插入元素的线程,直到队列不满。

阻塞式移除:当队列为空时,获取元素的线程会等待队列变非空。

阻塞队列支持的插入和移除操作的4种处理方式:

方法 抛出异常 返回特殊值 阻塞 超时退出
插入方法 add(e) offer() put(e) offer(e, time, unit)
移除方法 remove() poll() take() poll(time, unit)
检查方法 element() peek() 不可用 不可用

记忆方式:offer和poll都有o, put和take都有t

如果是无界队列,put或offer永远不会被阻塞,使用offer时永远返回true

实现原理:等待-通知机制,用的是native的park方法和unpark方法

Java的7种阻塞队列

声明方式:new

  1. ArrayBlockingQueue:数组结构的有界阻塞队列
  2. LinkedBlockingQueue:链表结构的无界阻塞队列
  3. PriorityBlockingQueue:支持优先级排序的链表结构的无界优先队列
  4. DelayQueue:支持延时弹出的无界队列,底层实现是优先队列。在创建内部元素时可以指定多久之后才能取出。(元素必须实现Delayed接口)
  5. SynchronousQueue不储存元素的阻塞队列
  6. LinkedTransferQueue:链表结构的无界阻塞队列,用transfer将数据直接传给阻塞接收的线程,若没有消费者则将元素放在队列尾。
  7. LinkedBlockingDeque:链表结构的无界双向阻塞队列

1. ArrayBlockingQueue

数组实现的有界阻塞队列,排序规则是FIFO

支持公平性和非公平性(默认)的进入队列,公平性和非公平性是针对:等待在队列外的线程。

  • 公平性访问:队列外的线程先等待先访问(但是会降低吞吐量)公平性是用可重入锁实现的。

  • 非公平性访问:当队列可用时,阻塞的线程都可以争夺访问资格。

指定公平性的声明方式如下:

1
ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000, true);

2. LinkedBlockingQueue

链表实现的无界阻塞队列,默认和最大长度是 Integer.MAX_VALUE。使用FIFO排序

3. PriorityBlockingQueue

支持优先级排序的无界阻塞队列,默认情况使用自然升序排列。可以自定义类实现compareTo()方法来指定元素排序规则,或者在构造函数中指定一个Comparator定制排序规则。(不稳定:不保证同优先级的元素顺序)

4. DelayQueue

支持延时获取元素的无界阻塞队列。内部使用 PriorityQueue 实现。

  • 队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才可以从队列中取出元素。

使用场景:

  • 缓存系统的设计:可以用DelayQueue保存缓存元素的有效期。当取出元素时,说明缓存到期了
  • 定时任务调度:使用DelayQueue保存当天将要执行的任务和执行时间,一旦获取到就开始执行。

如何给元素实现Delayed接口

  1. 初始化基本数据,使用time记录延时时限。
  2. 实现getDelay方法,该方法返回当前元素还需要延时多长时间
  3. 实现compareTo方法指定顺序。如让延时最长的放在队列尾。

5. SynchronousQueue

不存储元素的阻塞队列,每一个put都必须等待另一个线程从队列 take() 元素,否则不能继续添加元素

支持公平性访问,默认使用非公平策略。

使用场景

适合传递性场景,相当于一个传球手。负责把生产者线程处理的数据直接交给消费者。

吞吐量高

6. LinkedTransferQueue

链表结构的无界阻塞 TransferQueue 队列,相比其他队列多了 tryTransfer() 和 transfer() 方法

(1)transfer() 方法

将元素立即传给正在 poll(time) 或正在 take() 而导致阻塞的线程,若没有正在阻塞接收数据的线程,则将数据放在队列尾。

(2)tryTransfer() 方法

试探数据能否直接传给消费者,如果不能返回false。

7. LinkedBlockingDeque

链表结构的双向阻塞队列。

双向队列英文多了一个操作队列的入口,在多线程同时入队时,可以减少一半的竞争。

相比其他队列多了addFirst、addLast等等方法。

使用场景

使用在Fork-Join框架的工作窃取中。当一个线程完成了自己的任务队列中的任务,可以从别的线程的任务队列里,从另一头获取任务执行,从而减轻别的线程的压力。

Fork/Join 框架

Java7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干小任务,最终汇总每个小任务结构或得到大任务的结果的框架。

工作窃取算法

提前完成任务的线程帮助其他线程完成其一一对应的队列里的任务(使用双向队列,一个从头获取任务,一个从尾取任务)

优点:充分利用线程进行并行计算,减少了线程之间的竞争。(但竞争不可避免,比如双端队列里只有一个任务时,反而更消耗资源,如线程和队列的消耗)

Fork/Join 框架的设计

Task.fork() 异步执行任务

Task.join() 阻塞当前线程并等待获取结果

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class CountTask extends RecursiveTask<Integer> {
private static final int THRESHOLD = 2; // 阈值
private int start;
private int end;

public CountTask(int start, int end) {
this.start = start;
this.end = end;
}

@Override
protected Integer compute() {
int sum = 0;
// 5. 如果任务足够小就计算任务
boolean canCompute = (end - start) <= THRESHOLD;
if (canCompute) { // 5. 如果任务足够小就计算
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
// 如果任务大于阈值,就分裂成两个子任务计算
int middle = (start + end) / 2; // 1. 划分任务到子任务中
CountTask leftTask = new CountTask(start, middle); // 划分到子任务里
CountTask rightTask = new CountTask(middle + 1, end); // 划分到子任务里
// 执行子任务
leftTask.fork(); // 2. 执行子任务RecursiveTask.fork()
rightTask.fork(); // 2. 执行子任务RecursiveTask.fork()
// 等待子任务执行完,并得到其结果
int leftResult = leftTask.join(); // 3. join()计算,得到结果
int rightResult = rightTask.join(); // 3. join()计算,得到结果
// 合并子任务
sum = leftResult + rightResult; // 4. 合并子任务的结果
}
return sum; // 返回本次任务的值
}

public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
// 生成一个计算任务,负责计算1+2+3+4
CountTask task = new CountTask(1, 4);
// 执行一个任务
Future<Integer> result = forkJoinPool.submit(task);
try {
System.out.println(result.get());
} catch (InterruptedException | ExecutionException ignored) {}
}
}

本文标题:第6章 Java并发容器和框架

文章作者:Aaron.H

发布时间:2018年05月25日 - 19:05

最后更新:2018年09月10日 - 17:09

原始链接:https://uncleaaron.github.io/Blog/Java/Java并发编程艺术/第6章-Java并发容器和框架/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。