本篇文章给大家谈谈批八字实战,以及批八字术语的知识点,希望对各位有所帮助,不要忘了收藏本站喔。
注:本篇基于JDK1.8。
为什么要使用ConcurrentHashMap?在思考这个问题之前,我们可以思考:如果不用ConcurrentHashMap的话,有哪些其他的容器供我们选择呢?并且它们的缺陷是什么?
哈希表利用哈希算法能够花费O(1)的时间复杂度高效地根据key找到value值,能够满足这个需求的容器还有HashTable和HashMap。
HashTable
HashTable使用synchronized关键字保证了多线程环境下的安全性,但加锁的实现方式是独占式的,所有访问HashTable的线程都必须竞争同一把锁,性能较为低下。
public synchronized V put(K key, V value) {
// ...
}
HashMap
JDK1.8版本的HashMap在读取hash槽的时候读取的是工作内存中引用指向的对象,在多线程环境下,其他线程修改的值不能被及时读到。
关于HashMap的源码解析:【JDK1.8】Java集合源码学习系列:HashMap源码详解
ConcurrentHashMap的结构特点Java8之前这就引发出可能存在的一些问题: 比如在插入操作的时候,第一次将会根据key的hash值判断当前的槽内是否被占用,如果没有的话就会插入value。在并发环境下,如果A线程判断槽未被占用,在执行写入操作的时候正巧时间片耗尽,此时B线程正巧也执行了同样的操作,率先插入了B的value,此时A正巧被CPU重新调度继续执行写入操作,进而将线程B的value覆盖。 还有一种情况是在同一个hash槽内,HashMap总是保持key唯一,在插入的时候,如果存在key,就会进行value覆盖。并发情况下,如果A线程判断最后一个节点仍未发现重复的key,那么将会执行插入操作,如果B线程在A判断和插入之间执行了同样的操作,也会发生数据的覆盖,也就是数据的丢失。 当然,像这样的并发问题其实还有一些,这里就不细说了,刚兴趣的小伙伴可以查阅下资料。
在Java8之前,底层采用Segment+HashEntry的方式实现。
采用分段锁的概念,底层使用Segment数组,Segment通过继承ReentrantLock来进行加锁,每次需要加锁的操作会锁住一个segment,分段保证每个段是线程安全的。
Java8之后JDK1.8之后采用CAS + Synchronized的方式来保证并发安全。
采用【Node数组】加【链表】加【红黑树】的结构,与HashMap类似。
基本常量过一遍即可,不用过于纠结,有些字段也许是为了兼容Java8之前的版本。
/* ---------------- Constants -------------- */
// 允许的最大容量
private static final int MAXIMUM_CAPACITY = 1 << 30;
// 默认初始值16,必须是2的幂
private static final int DEFAULT_CAPACITY = 16;
// toArray相关方法可能需要的量
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
//为了和Java8之前的分段相关内容兼容,并未使用
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
// 负载因子
private static final float LOAD_FACTOR = 0.75f;
// 链表转红黑树阀值> 8 链表转换为红黑树
static final int TREEIFY_THRESHOLD = 8;
// 树转链表阀值,小于等于6(tranfer时,lc、hc=0两个计数器分别++记录原bin、新binTreeNode数量,<=UNTREEIFY_THRESHOLD 则untreeify(lo))
static final int UNTREEIFY_THRESHOLD = 6;
// 链表树化的最小容量 treeifyBin的时候,容量如果不足64,会优先选择扩容到64
static final int MIN_TREEIFY_CAPACITY = 64;
// 每一步最小重绑定数量
private static final int MIN_TRANSFER_STRIDE = 16;
// sizeCtl中用于生成标记的位数
private static int RESIZE_STAMP_BITS = 16;
// 2^15-1,help resize的最大线程数
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
// 32-16=16,sizeCtl中记录size大小的偏移量
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
// forwarding nodes的hash值
static final int MOVED = -1;
// 树根节点的hash值
static final int TREEBIN = -2;
// ReservationNode的hash值
static final int RESERVED = -3;
// 提供给普通node节点hash用
static final int HASH_BITS = 0x7fffffff;
// 可用处理器数量
static final int NCPU = Runtime.getRuntime().availableProcessors();
重要成员变量
/* ---------------- Fields -------------- */
// 就是我们说的底层的Node数组,懒初始化的,在第一次插入的时候才初始化,大小需要是2的幂
transient volatile Node
构造方法
和HashMap一样,table数组的初始化是在第一次插入的时候才进行的。
/**
* 创建一个新的,空的map,默认大小为16
*/
public ConcurrentHashMap() {
}
/**
* 指定初始容量
*/
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
// hashmap中讲过哦,用来返回的是大于等于传入值的最小2的幂次方
// https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/104095675#tableSizeFor_105
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
// sizeCtl 初始化为容量
this.sizeCtl = cap;
}
/**
* 接收一个map对象
*/
public ConcurrentHashMap(Map extends K, ? extends V> m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}
/**
* 指定初始容量和负载因子
* @since 1.6
*/
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1);
}
/**
* 最全的:容量、负载因子、并发级别
*/
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}
这里的tableSizeFor方法在HashMap中有解析过:
https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/104095675#tableSizeFor_105
我们通过注解可以知道,这个方法返回的是大于等于传入值的最小2的幂次方(传入1时,为1)。它到底是怎么实现的呢,我们来看看具体的源码:
static final int tableSizeFor(int cap) {
int n = cap - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}
123456789
说实话,我在看到这个方法具体实现之后,感叹了一句,数学好牛!我通过代入具体数字,翻阅了许多关于这部分的文章与视频,通过简单的例子,来做一下总结。
我们先试想一下,我们想得到比n大的最小2次幂只需要在最高位的前一位置1,后面全置0就ok了吧。如0101代表的是5,1000就符合我们的需求为8。
我们再传入更大的数,为了写着方便,这里就以8位为例:
第一步int n = cap -1这一步其实是为了防止cap本身为2的幂次的情况,如果没有这一步的话,在一顿操作之后,会出现翻倍的情况。比如传入为8,算出来会是16,所以事先减去1,保证结果。
最后n<0的情况的判定,排除了传入容量为0的情况。
n>=MAXIMUM_CAPACITY的情况的判定,排除了移位和或运算之后全部为1的情况。
put方法存值putValpublic V put(K key, V value) {
return putVal(key, value, false);
}
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
// 对key进行散列计算 : (h ^ (h >>> 16)) & HASH_BITS;
int hash = spread(key.hashCode());
// 记录链表长度
int binCount = 0;
for (Node
其实你会发现,如果你看过HashMap的源码,理解ConcurrentHashMap的操作其实还是比较清晰的,相信看下来你已经基本了解了。接下来将会具体分析一下几个关键的方法
initTable采用延迟初始化,第一次put的时候,调用initTable()初始化Node数组。
/**
* Initializes table, using the size recorded in sizeCtl.
*/
private final Node
初始化的并发问题如何解决呢?
通过volatile的sizeCtl变量进行标识,在第一次初始化的时候,如果有多个线程同时进行初始化操作,他们将会判断sizeCtl是否小于0,小于0表示已经有其他线程在进行初始化了。
因为获取到初始化权的线程已经通过cas操作将sizeCtl的值改为-1了,且volatile的特性保证了变量在各个线程之间的可见性。
接着,将会创建合适容量的数组,并将sizeCtl的值设置为cap*loadFactor。
treeifyBin这部分包含链表转红黑树的逻辑,当然,需要满足一些前提条件:
首先当然是需要链表的节点数量>=TREEIFY_THRESHOLD的时候啦,默认是8。
if (binCount != 0) {
// 判断是否需要将链表转换为红黑树 节点数 >= 8的时候
if (binCount >= TREEIFY_THRESHOLD)
// 树化
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
其实还有一个条件,就是:如果数组长度< MIN_TREEIFY_CAPACITY的时候,会优先调用tryPresize进行数组扩容。
private final void treeifyBin(Node
tryPresize
数组扩容操作一般都是核心,仔细看看。
private final void tryPresize(int size) {
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);// tableSizeFor(1.5*size)
int sc;
while ((sc = sizeCtl) >= 0) {
Node
resizeStamp(int n)方法可以参照
https://blog.csdn.net/tp7309/article/details/76532366的解析。
这个方法涉及到数据迁移的操作,支持并发执行,第一个发起数据迁移的线程,nextTab参数传null,之后再调用此方法时,nextTab不会为null。
并发执行实现:使用stride将一次迁移任务拆分成一个个的小任务,第一个发起数据迁移的线程将会将transferIndex指向原数组最后的位置,然后从后向前的stride分任务属于第一个线程,再将transferIndex指向新的位置,再往前的stride个任务属于第二个线程,依次类推。
private final void transfer(Node
get方法取值get
get方法相对来说就简单很多了,根据key计算出的hash值,找到对应的位置,判断头节点是不是要的值,不是的话就从红黑树或者链表里找。
public V get(Object key) {
Node
可以看到get方法是无锁的,通过volatile修饰的next来每次都获取最新的值。
总结再提一下吧: volatile能够保证变量在线程之间的可见性,能够被多线程同时读且保证不会读到过期的值,因为为根据Java内存模型的happen before原则,对volatile字段的写入操作先于读操作,即使两个线程同时修改和获取volatile变量,get操作也能拿到最新的值。 但只能被单线程写,【有一种情况可以被多线程写,就是写入的值不依赖于原值】,读操作是不需要加锁的。
ConcurrentHashMap在JDK1.7和JDK1.8的实现思路上发生比较大的变化:JDK1.7采用Segment+HashEntry的方式和分段锁的概念实现。JDK1.8放弃了锁分段的概念,使用Node + CAS + Synchronized 的方式实现并发,使用数组+链表+红黑树的结构。
put操作会进行如下判断:如果没有初始化,先进行初始化[懒初始化],默认容量为16,同时设置了SizeCtl。如果tab数组对应的hash槽位置上没有节点,CAS操作给该位置赋值,成功则跳出循环。如果当前插槽节点正处于迁移状态即f.hash == MOVED,则先帮助节点完成迁移操作。发生hash冲突,率先使用synchronized锁住首节点,接下来判断是链表节点或是红黑树节点,找到合适的位置,插入或覆盖值。如果节点数量超过树化的阈值8,且数组容量也达到树化的阈值64,进行树化。
当某个桶的位置的节点数量超过8,但是数组容量没有达到64时,会先进行扩容操作n*2,执行tryPresize。
扩容操作涉及到transfer数据迁移,支持并发,通过将数据迁移分为stride个小任务,通过transferIndex和nextBound两个指针来分配任务。
get操作不需要加锁,根据key计算出的hash值,找到对应的位置,判断头节点是不是要的值,不是的话就从红黑树或者链表里找。
原文链接:
https://www.cnblogs.com/summerday152/p/14318494.html
如果觉得本文对你有帮助,可以转发关注支持一下