转载于 低并发编程

问题

多个线程,一个共享变量,不断 +1

思路一

存在线程安全问题

1
2
3
4
5
6
7
8
9
public class AtomicTest {

private int count;

public void add() {
count++;
}

}

思路二

加锁实现,但效率太低

1
2
3
4
5
6
7
8
9
public class AtomicTest {

private int count;

public synchronized void add() {
count++;
}

}

思路三

无锁算法

1
2
3
4
5
6
7
8
9
public class AtomicTest {

private AtomicInteger i = new AtomicInteger();

public void increment() {
i.incrementAndGet();
}

}

更高级的玩法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class AtomicTest {

private volatile int c;

private static final Unsafe UNSAFE;
private static final long OBJECT_VALUE_OFFSET;

static {
try {
Field field = Unsafe.class.getDeclaredFields()[0];
field.setAccessible(true);
UNSAFE = (Unsafe) field.get(null);

OBJECT_VALUE_OFFSET = UNSAFE.objectFieldOffset(AtomicTest.class.getDeclaredField("c"));
} catch (NoSuchFieldException | IllegalAccessException e) {
e.printStackTrace();
throw new Error(e);
}
}

/**
* @HotSpotIntrinsicCandidate
* public final int getAndAddInt(Object o, long offset, int delta) {
* int v;
* do {
* v = getIntVolatile(o, offset);
* } while (!weakCompareAndSetInt(o, offset, v, v + delta));
* return v;
* }
*
* @author chengxudong chengxudong@microwu.com
* @date 2021/9/11 14:39
*
* @param
* @return void
*/
public void add2() {
int v;
do {
v = UNSAFE.getIntVolatile(this, OBJECT_VALUE_OFFSET);
} while (UNSAFE.compareAndSwapInt(this, OBJECT_VALUE_OFFSET, v, v + 1));

}

}

分析需求

极端场景,成百上千个线程一直连续不断对这个 count 进行 +1 操作,一直加上个一年,一年后,我们只需要看一下最终值即可。

我们以前的实现,每时每刻都将 +1 的操作真正计算了一遍,并赋值给 count。

但是我们只是一年以后要读取这个 count,显然,中间这一年对 count 值准确地计算出结果是不必要的。

恰恰是因为每次都要准确计算出它的结果,导致多线程之间发生了竞争,浪费了资源。

设计思路

事先搞出多个 count 变量,并且用某种方式让不同线程对应到不同 count 变量上。

这样的话,每个线程不存在竞争的问题,过一段时间,获取最终的值,只需要把它们相加即可。

牺牲读性能,换取写性能。用空间换时间。

具体实现

懒加载

首先,我们当然希望,整个过程都不存在线程竞争。

这样我们一开始就创建那么多 count,并且把线程一一映射过去,加入本来他们共同对同一个共享变量 +1 就不会产生竞争,那这种方式就有很大问题了:

  • 浪费空间
  • 多了线程映射的算法逻辑
  • 最终获取值还要相加

所以我们采用懒加载的方式,一开始,仍然是对同一个共享变量 +1,等真正出现竞争了,再开始启用更多的 count。

我们把一开始使用的唯一共享变量叫做 base,把之后开启的多个变量叫做 cell 类,放在一个 cell[] 数组里。cell 类只有一个变量 value,存储累加过程中的值。

数组扩容

一开始,这个 cell 数组是空的。

等 base 变量出现了一次竞争失败的情况,就初始化这个 cell[] 数组,第一里面放两个 cell。

此时,如果只有三个线程 + 1,就可以保证不会发生竞争。

但如果此时又来了一个线程,导致了竞争,即 CAS 失败,那么可以扩容 cell[] 数组。

扩容是翻倍的方式。

线程映射绑定

如何做到线程和 cell 数组中的每个 cell 是一一对应的关系?

我们在每个线程中维护一个局部变量,这个变量属于线程,这个变量的值根据 cell[] 数组的大小哈希取模,就可以映射到其中一个 cell 上了。

那同线程绑定的这个局部变量是怎么来的?

Thread 类的变量 probe

1
2
3
4
5
public class Thread implements Runnable {
...
int threadLocalRandomProbe;
...
}

但是我们不能直接获取,需要借助 ThreadLocalRandom 类获取

1
2
3
static final int getProbe() {
return UNSAFE.getInt(Thread.currentThread(), PROBE);
}

当然,获取出的这个值,可能哈希取模后也会发生冲突

没关系,请注意,这只是哈希取模冲突,也就是多个线程可能要对同一个 cell 的 value 进行 CAS + 1 操作,但不一定会产生竞争。

所以,发生哈希取模冲突后,先尝试 CAS + 1 操作,如果能成功,就没那么多事了。

但假如恰好,CAS 的时候又发生了竞争,导致操作失败了怎么办?

还好,可以用这种方式为该线程的 probe 重新赋值。

1
2
probe = xxx;
UNSAFE.putInt(Thread.currentThread(), PROBE, probe);

重新赋值后的 probe,再次经过哈希取模后,就不会和之前的冲突了。

但很不幸,加入再冲突了怎么办?

那就再次尝试 CAS + 1 操作。

但假如有很不幸,CAS + 1 操作又失败了,那要不要重新赋值 probe 呢?

要,不过,此时说明竞争已经很激烈了。

简单说就是,这个 cell 数组有点拥挤了,此时我们选择将数组扩容!

当然,扩容也要有个限度,我们规定,数组大小超过 CPU 核心数后,就不再扩容了。

执行流程

  1. 最开始只有一个 base 变量,多个线程和谐地进行 CAS + 1 操作。
  2. 直到有一天,两个线程发生了竞争,即其中一个线程 CAS 失败了,那么就创建一个大小为 2 的 cell[] 数组,用线程私有的局部变量 probe 取模,映射到一个 cell 上,对其进行 CAS + 1 操作。
  3. probe 取模,发现那个 cell 已经被绑定过了,不要紧,先 CAS + 1 试一试。
  4. 如果失败,说明此处有竞争 ,那么重新计算一下线程的 probe 值,映射到一个新的 cell 上。
  5. 如果此时又冲突,并且 CAS + 1 失败,那么 cell[] 数组扩容。
  6. 最后当要获取最终的累计和时,用 base 的值,加上所有 cell[] 数组里的 value 值,得出一个和返回给调用方。

LongAdder

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class LongAdder extends Striped64 implements Serializable {
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
// 已经初始化了 Cell 数组
// 或者对 base 变量 CAS +1 操作失败
// 就走到这里了
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
// 下面这行就是 probe 取模操作
(a = as[getProbe() & m]) == null ||
// 取模后发现已经被绑定,冲突了,先不管,直接 cas 试试
!(uncontended = a.cas(v = a.value, v + x)))
// 这里数组未初始化完全、或取模后未冲突、或冲突后 cas 失败,都会往下走
longAccumulate(x, null, uncontended);
}
}
}