并发容器

并发容器概览

image-20200830152844873

ConcurrentHashMap

为什么HashMap是线程不安全的?

  • 同时put碰撞导致数据丢失
    • 主要问题出在addEntry方法的new Entry (hash, key, value, e),如果两个线程都同时取得了e,则他们下一个元素都是e,然后赋值给table元素的时候有一个成功有一个丢失。
  • 同时put扩容导致数据丢失 
    • 如果多个线程同时检测到元素个数超过数组大小* loadFactor ,这样就会发生多个线程同时对 Node 数组进行扩容,都在重新计算元素位置以及复制数据,但是最终只有一个线程扩容后的数组会赋给 table,也就是说其他线程的都会丢失,并且各自线程 put 的数据也丢失。
  • 死循环造成的CPU100%(HashMap在高并发下的死循环(仅在JDK7及以前存在))

JDK7中的ConcurrentHashMap

ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment实际继承自可重入锁**(ReentrantLock)**,在ConcurrentHashMap里扮演锁的角色;HashEntry则用于存储键值对数据。一个ConcurrentHashMap里包含一个Segment数组,每个Segment里包含一个HashEntry数组,我们称之为table,每个HashEntry是一个链表结构的元素。

  • Segment数组
  • HashEntry组成
  • 和HashMap一样,仍然是数组加链表
/**
 * Segment 数组,存放数据时首先需要定位到具体的 Segment 中。
 */
final Segment<K,V>[] segments;
transient Set<K> keySet;
transient Set<Map.Entry<K,V>> entrySet;

Segment 是 ConcurrentHashMap 的一个内部类,主要的组成如下:

static final class Segment<K,V> extends ReentrantLock implements Serializable {
       private static final long serialVersionUID = 2249069246763182397L;
       // 和 HashMap 中的 HashEntry 作用一样,真正存放数据的桶
       transient volatile HashEntry<K,V>[] table;
       transient int count;
       transient int modCount;
       transient int threshold;
       final float loadFactor;
}

map1

  • 唯一的区别就是其中的核心数据如 value ,以及链表都是 volatile 修饰的,保证了获取时的可见性。
  • ConcurrentHashMap 采用了分段锁技术,其中 Segment 继承于 ReentrantLock。
  • 不会像HashTable那样不管是put还是get操作都需要做同步处理,理论上 ConcurrentHashMap 支持 CurrencyLevel (Segment 数组数量)的线程并发。
  • 每当一个线程占用锁访问一个 Segment 时,不会影响到其他的 Segment。

map
只要我们的hash值足够分散,那么每次put的时候就会put到不同的segment中去。 而segment自己本身就是一个锁,put的时候,当前segment会将自己锁住,此时其他线程无法操作这个segment, 但不会影响到其他segment的操作。这是锁分段带来的好处。

put实现:

public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    int hash = hash(key);
    int j = (hash >>> segmentShift) & segmentMask;
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        s = ensureSegment(j);
    return s.put(key, hash, value, false);
}

当执行put方法插入数据时,根据key的hash值,在Segment数组中找到相应的位置,如果相应位置的Segment还未初始化,则通过CAS进行赋值,接着执行Segment对象的put方法通过加锁机制插入数据,实现如下:

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    HashEntry<K,V> node = tryLock() ? null :
        scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        HashEntry<K,V>[] tab = table;
        int index = (tab.length - 1) & hash;
        HashEntry<K,V> first = entryAt(tab, index);
        for (HashEntry<K,V> e = first;;) {
            if (e != null) {
                K k;
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                e = e.next;
            }
            else {
                if (node != null)
                    node.setNext(first);
                else
                    node = new HashEntry<K,V>(hash, key, value, first);
                int c = count + 1;
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node);
                else
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        unlock();
    }
    return oldValue;
}
  • 虽然HashEntry中的value是用volatile关键字修饰的,但是并不能保证并发的原子性,所以put操作仍然需要加锁处理。
  • 首先第一步的时候会尝试获取锁,如果获取失败肯定就是其他线程存在竞争,则利用 scanAndLockForPut() 自旋获取锁。

map2

  • 尝试获取自旋锁
  • 如果重试的次数达到了MAX_SCAN_RETRIES 则改为阻塞锁获取,保证能获取成功。

map3

  • 将当前的Segment中的table通过key的hashcode定位到HashEntry
  • 遍历该HashEntry,如果不为空则判断传入的key和当前遍历的key是否相等,相等则覆盖旧的value
  • 不为空则需要新建一个HashEntry并加入到Segment中,同时会先判断是否需要扩容
  • 最后会解除在1中所获取当前Segment的锁。

场景:线程A和线程B同时执行相同Segment对象的put方法
1、线程A执行tryLock()方法成功获取锁,则把HashEntry对象插入到相应的位置;
2、线程B获取锁失败,则执行scanAndLockForPut()方法,在scanAndLockForPut方法中,会通过重复执行tryLock()方法尝试获取锁,在多处理器环境下,重复次数为64,单处理器重复次数为1,当执行tryLock()方法的次数超过上限时,则执行lock()方法挂起线程B;
3、当线程A执行完插入操作时,会通过unlock()方法释放锁,接着唤醒线程B继续执行;

get逻辑:

public V get(Object key) {
    Segment<K,V> s; // manually integrate access methods to reduce overhead
    HashEntry<K,V>[] tab;
    int h = hash(key);
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    return null;
}

get 逻辑比较简单,只需要将 Key 通过 Hash 之后定位到具体的 Segment ,再通过一次 Hash 定位到具体的元素上。
由于 HashEntry 中的 value 属性是用 volatile 关键词修饰的,保证了内存可见性,所以每次获取时都是最新值。
ConcurrentHashMap 的 get 方法是非常高效的,因为整个过程都不需要加锁

JDK8中ConcurrentHashMap

在jdk1.8中ConcurrentHashMap抛弃了原有的 Segment 分段锁,利用CAS+Synchronized来确保线程安全,它的底层数据结构依然是数组+链表+红黑树。
跟HashMap很像,也把之前的HashEntry改成了Node,但是作用不变,把值和next采用了volatile去修饰,保证了可见性,并且也引入了红黑树,在链表大于一定值的时候会转换(默认是8)。
map4

基本属性

// node数组最大容量:2^30=1073741824
private static final int MAXIMUM_CAPACITY = 1 << 30;
// 默认初始值,必须是2的幕数
private static final int DEFAULT_CAPACITY = 16;
//数组可能最大值,需要与toArray()相关方法关联
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
//并发级别,遗留下来的,为兼容以前的版本
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
// 负载因子
private static final float LOAD_FACTOR = 0.75f;
// 链表转红黑树阀值,> 8 链表转换为红黑树
static final int TREEIFY_THRESHOLD = 8;
//树转链表阀值,小于等于6(tranfer时,lc、hc=0两个计数器分别++记录原bin、新binTreeNode数量,<=UNTREEIFY_THRESHOLD 则untreeify(lo))
static final int UNTREEIFY_THRESHOLD = 6;
static final int MIN_TREEIFY_CAPACITY = 64;
private static final int MIN_TRANSFER_STRIDE = 16;
private static int RESIZE_STAMP_BITS = 16;
// 2^15-1,help resize的最大线程数
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
// 32-16=16,sizeCtl中记录size大小的偏移量
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
// forwarding nodes的hash值
static final int MOVED     = -1; 
// 树根节点的hash值
static final int TREEBIN   = -2; 
// ReservationNode的hash值
static final int RESERVED  = -3; 
// 可用处理器数量
static final int NCPU = Runtime.getRuntime().availableProcessors();
//存放node的数组
transient volatile Node<K,V>[] table;
/*控制标识符,用来控制table的初始化和扩容的操作,不同的值有不同的含义
 *当为负数时:-1代表正在初始化,-N代表有N-1个线程正在 进行扩容
 *当为0时:代表当时的table还没有被初始化
 *当为正数时:表示初始化或者下一次进行扩容的大小*/
private transient volatile int sizeCtl;

map5

  • 也将 1.7 中存放数据的 HashEntry 改为 Node,但作用都是相同的。
  • 其中的 val next 都用了 volatile 修饰,保证了可见性。

put实现

与HashMap.put()不同点:用CAS添加节点,用synchronized锁住头结点,扩容方面多线程辅助扩容。

map6

  • 根据key计算出hashcode
  • 判断是否需要进行初始化
  • f即为当前key定位出的Node,如果为空表示当前位置可以写入数据,利用CAS尝试写入,失败则自旋保证成功。
  • 如果当前位置的hashcode == MOVED == -1,则需要进行扩容
  • 如果都不满足,则利用synchronized锁写入数据
  • 如果数量大于TREEIFY_THRESHOLD 则要转换为红黑树。

get操作

map7

  • 根据计算出来的 hashcode 寻址,如果就在桶上那么直接返回值。
  • 如果是红黑树那就按照树的方式获取值。
  • 就不满足那就按照链表的方式遍历获取值。

总结

套路:

  • 谈谈你理解的 HashMap,讲讲其中的 get put 过程。
  • 1.8 做了什么优化?
  • 是线程安全的嘛?
  • 不安全会导致哪些问题?
  • 如何解决?有没有线程安全的并发容器?
  • ConcurrentHashMap 是如何实现的? 1.7、1.8 实现有何不同?为什么这么做?

CopyOnWriteArrayList 并发版ArrayList

代替Vector和SynchronizedList,就和ConcurrentHashMap代替SynchronizedMap的原因一样 Vector和SynchronizedList的锁的粒度太大,并发效率相对比较低,并且迭代时无法编辑 Copy-On-Write并发容器还包括CopyOnWriteArraySet,用来替代同步Set。

并发版ArrayList,底层结构也是数组,和ArrayList不同之处在于:当新增和删除元素时会创建一个新的数组,在新的数组中增加或者排除指定对象,最后用新增数组替换原来的数组。

  • 适用场景:由于读操作不加锁,写(增、删、改)操作加锁,因此适用于读多写少的场景。
  • 局限:由于读的时候不会加锁(读的效率高,就和普通ArrayList一样),读取的当前副本,因此可能读取到脏数据。如果介意,建议不用。
  • 原理:利用高并发往往是读多写少的特性,对读操作不加锁,对写操作,先复制一份新的集合,在新的集合上面修改,然后将新集合赋值给旧的引用,并通过volatile 保证其可见性,当然写操作的锁是必不可少的。这样做的好处是我们可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器。

list1

  • 数据一致性问题:CopyOnWrite容器只能保证数据的最终一致性,不能保证数据的实时一致性。所以如果你希望写入的数据,马上能读到,请不要使用CopyOnWrite容器。 
  • 内存占用问题:因为CopyOnWrite的写是复制机制,所以在进行写操作的时候,内存里会同时驻扎两个对象的内存。

并发队列Queue 阻塞队列

list2

什么是阻塞队列?

  • 阻塞队列是具有阻塞功能的队列,所以它首先是一个队列,其次是具有阻塞功能。 通常,阻塞队列的一端是给生产者放数据用,另一端给消费者拿数据用。阻塞队列是线程安全的,所以生产者和消费者都可以是多线程
  • 阻塞功能:最有特色的两个带有阻塞功能的方法是
    • take()方法:获取并移除队列的头结点,一旦如果执行take的时候,队列里无数据,则阻塞,直到队列里有数据
    • put()方法:插入元素。但是如果队列已满,那么就无法继续插入,则阻塞,直到队列里有了空闲空间
  • 是否有界(容量有多大):这是一个非常重要的属性,无界队列意味着里面可以容纳非常多(Integer.MAX_VALUE,约为2的31次,是非常大的一个数,可以近似认为是无限容量) 阻塞队列和线程池的关系:阻塞队列是线程池的重要组成部分

主要方法介绍

阻塞方法:put,take
抛异常方法:add,remove, element(返回头元素)
返回空值方法:offer,poll,peek

ArrayBlockingQueue

ArrayBlockingQueue 是 BlockingQueue 接口的实现类。
使用数组实现的有界阻塞队列,按照FIFO的原则对元素排序;内部使用重入锁可实现公平访问。内部使用一个重入锁来控制并发修改操作,即同一时刻,只能进行放或取中的一个操作。初始化时,必须指定容量大小。
ArrayBlockingQueue 默认情况下不能保证线程访问队列的公平性,所谓公平性是指严格按照线程等待的绝对时间顺序,即最先等待的线程能够最先访问到 ArrayBlockingQueue。

LinkedBlockingQueue

LinkedBlockingQueue 底层基于单向链表实现的阻塞队列,可以当做无界队列也可以当做有界队列来使用,同样满足FIFO的特性,与ArrayBlockingQueue 相比起来具有更高的吞吐量,为了防止 LinkedBlockingQueue 容量迅速增,损耗大量内存。通常在创建LinkedBlockingQueue 对象时,会指定其大小,如果未指定,容量等于Integer.MAX_VALUE。
内部使用两个重入锁来控制并发操作,即同一时刻,允许同时进行放和取。
list3

PriorityBlockingQueue 线程安全的优先队列

PriorityBlockingQueue 是一个支持优先级的无界阻塞队列。默认情况下元素采用自然顺序进行排序,也可以通过自定义类实现 compareTo() 方法来指定元素排序规则,或者初始化时通过构造器参数 Comparator 来指定排序规则。

SynchronousQueue 数据同步交换的队列

不存储元素的阻塞队列,每一个put操作必须等待一个take操作,否则不能继续添加元素,支持公平访问队列,非常适合传递性场景,即把生产者线程处理的数据直接传递给消费者线程,队列本身不存储任何元素。

DelayQueue

无界队列 延迟队列,根据延迟时间排序 元素需要实现Delayed接口,规定排序规则。

非阻塞并发队列

ConcurrentLinkedQueue 并发队列(基于链表)

基于链表实现的并发队列,使用乐观锁(CAS)保证线程安全。因为数据结构是链表,所以理论上是没有队列大小限制的,也就是说添加数据一定能成功。

Q.E.D.

知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议