前言
相信大部分開發人員,或多或少都看過或寫過併發編程的代碼。併發關鍵字除了Synchronized,還有另一大分支Atomic。如果大家沒聽過沒用過先看基礎篇,如果聽過用過,請滑至底部看進階篇,深入源碼分析。
提出問題:int線程安全嗎?
看過Synchronized相關文章的小夥伴應該知道其是不安全的,再次用代碼應驗下其不安全性:
public class testInt {
static int number = 0;
public static void main(String[] args) throws Exception {
Runnable runnable = new Runnable() {
@Override
public void run() {
for (int i = 0; i < 100000; i++) {
number = number+1;
}
}
};
Thread t1 = new Thread(runnable);
t1.start();
Thread t2 = new Thread(runnable);
t2.start();
t1.join();
t2.join();
System.out.println("number:" + number);
}
}
運行結果:
在上面的例子中,我們定義一個初始值為0的靜態變量number,再新建並運行兩個線程讓其各執行10萬次的自增操作,如果他是線程安全的,應該兩個線程執行后結果為20萬,但是我們發現最終的結果是小於20萬的,即說明他是不安全的。
在之前Synchronized那篇文章中說過,可以在number=number+1這句代碼上下加Synchronized關鍵字實現線程安全。但是其對資源的開銷較大,所以我們今天再看下另外一種實現線程安全的方法Atomic。
Atomic基礎篇分界線
原子整數(基礎類型)
整體介紹
Atomic是jdk提供的一系列包的總稱,這個大家族包括原子整數(AtomicInteger,AtomicLong,AtomicBoolean),原子引用(AtomicReference,AtomicStampedReference,AtomicMarkableReference),原子數組(AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray),更新器(AtomicIntegerFieldUpdater,AtomicLongFieldUpdater,AtomicReferenceFieldUpdater)。
AtomicInteger
AtomicInteger,AtomicBoolean,AtomicLong三者功能類似,咱就以AtomicInteger為主分析原子類。
先看下有哪些API,及其他們具體啥功能:
public class testInt {
public static void main(String[] args) {
//定義AtomicInteger類型的變量,值為1
AtomicInteger i = new AtomicInteger(1);
//incrementAndGet方法先新增1再返回,所以打印2,此時i為2
System.out.println(i.incrementAndGet());
//getAndIncrement方法先返回值再新增1,所以打印2,此時i為3
System.out.println(i.getAndIncrement());
//get方法返回當前i值,所以打印3,此時i為3
System.out.println(i.get());
//參數為正數即新增,getAndAdd方法先返回值再新增666,所以打印3,此時i為669
System.out.println(i.getAndAdd(666));
//參數為負數即減去,getAndAdd方法先返回值再減去1,所以打印669,此時i為668
System.out.println(i.getAndAdd(-1));
//參數為正數即新增,addAndGet方法先新增666再返回值,所以打印1334,此時i為1334
System.out.println(i.addAndGet(666));
//參數為負數即減去,addAndGet方法先減去-1再返回值,所以打印1333,此時i為1333
System.out.println(i.addAndGet(-1));
//getAndUpdate方法IntUnaryOperator參數是一個箭頭函數,後面可以寫任何操作,所以打印1333,此時i為13331
System.out.println(i.getAndUpdate(x -> (x * 10 + 1)));
//最終打印i為13331
System.out.println(i.get());
}
}
執行結果:
對上述int類型的例子改進
public class testInt {
//1.定義初始值為0的AtomicInteger類型變量number
static AtomicInteger number = new AtomicInteger(0);
public static void main(String[] args) throws Exception {
Runnable runnable = new Runnable() {
@Override
public void run() {
for (int i = 0; i < 100000; i++) {
//2.調用incrementAndGet方法,實現加1操作
number.incrementAndGet();
}
}
};
Thread t1 = new Thread(runnable);
t1.start();
Thread t2 = new Thread(runnable);
t2.start();
t1.join();
t2.join();
System.out.println("number:" + number.get());
}
}
我們可以看到運行結果是正確的20萬,說明AtomicInteger的確保證了線程安全性,即在多線程的過程中,運行結果還是正確的。但是這存在一個ABA問題,下面將原子引用的時候再說,先立個flag。
源碼分析
我們以incrementAndGet方法為例,看下底層是如何實現的,AtomicInteger類中的incrementAndGet方法調用了Unsafe類的getAndAddInt方法。
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
我們看下getAndAddInt方法,裏面有個循環,直接值為compareAndSwapInt返回值為true,才結束循環。這裏就不得不提CAS,這就是多線程安全性問題的解決方法。
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
CAS
線程1和線程2同事獲取了主內存變量值0,線程1加1並寫入主內存,現在主內存變量值1,線程2也加2並嘗試寫入主內存,這個時候是不能寫入主內存的,因為會覆蓋掉線程1的操作,具體過程如下圖。
CAS是在線程2嘗試寫入內存的時候,通過比較並設置(CompareAndSet)發現現在主內存當前值為1,和他剛開始讀取的值0不一樣,所以他會放棄本次修改,重新讀取主內存的最新值,然後再重試下線程2的具體邏輯操作,再次嘗試寫入主內存。如果這時候線程1,再次對主內存進行了修改,線程2發現現在主內存的值又和預期不一樣,所以將放棄本次修改,再次讀取主內存最新值,再次重試並嘗試寫入主內存。我們可以發現這是一個重複比較的過程,即直到和預期初始值一樣,才會寫入主內存,否則將一直讀取重試的循環。這就是上面for循環的意義。
CAS的實現實際上利用了CPU指令來實現的,如果操作系統不支持CAS,還是會加鎖的,如果操作系統支持CAS,則使用原子性的CPU指令。
原子引用
在日常使用中,我們不止對上述基本類型進行原子操作,而是需要對一些複雜類型進行原子操作,所以需要AtomicReference。
不安全實現
先看不安全的BigDecimal類型:
public class testReference {
static BigDecimal number = BigDecimal.ZERO;
public static void main(String[] args) throws Exception {
Runnable runnable = new Runnable() {
@Override
public void run() {
for (int i = 0; i < 1000; i++) {
number=number.add(BigDecimal.ONE);
}
}
};
Thread t1 = new Thread(runnable);
t1.start();
Thread t2 = new Thread(runnable);
t2.start();
t1.join();
t2.join();
System.out.println(number);
}
}
運行結果如下圖,我們可以看到兩個線程,自循環1000次加1操作,最終結果應該是2000,可是結果小於2000。
安全實現-使用CAS
public class testReference {
//定義AtomicReference類型BigDecimal變量
static AtomicReference<BigDecimal> number = new AtomicReference<BigDecimal>(BigDecimal.ZERO);
public static void main(String[] args) throws Exception {
Runnable runnable = new Runnable() {
@Override
public void run() {
for (int i = 0; i < 1000; i++) {
//手動寫循環+CAS判斷
while(true){
BigDecimal pre=number.get();
BigDecimal next=number.get().add(BigDecimal.ONE);
if(number.compareAndSet(pre,next)) {
break;
}
}
}
}
};
Thread t1 = new Thread(runnable);
t1.start();
Thread t2 = new Thread(runnable);
t2.start();
t1.join();
t2.join();
System.out.println(number.get());
}
}
運行結果如下:
ABA問題及解決
在上面CAS過程中,是通過值比較來知曉是不是能夠更新成功,那如果線程1先加1再減1,這樣主內存還是原來的值,即線程2還是可以更新成功的。但是這樣邏輯錯了,線程1已經發生了修改,線程2不能直接更新成功。
代碼:
public class testInt {
static AtomicInteger number = new AtomicInteger(0);
public static void main(String[] args) throws Exception {
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
int a = number.get();
System.out.println("開始number:" + a);
try {
Thread.sleep(5000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(number.compareAndSet(a, a++));
}
});
t1.start();
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("開始增加操作");
int a = number.incrementAndGet();
System.out.println("當前number:" + a);
int b = number.decrementAndGet();
System.out.println("當前number:" + b);
}
});
t2.start();
t1.join();
t2.join();
}
}
我們看線程2對其進行了一系列操作,但是最後打印了還是true,表示可以更新成功的。這顯然不對。
那我們可以使用AtomicStampedReference,為其添加一個版本號。線程1在剛開始讀取主內存的時候,獲取到值為0,版本為1,線程2也獲取到這兩個值,線程1進行加1,減1的操作的時候,版本各加1,現在主內存的值為0,版本為2,而線程2還拿着預計值為0,版本為1的數據嘗試寫入主內存,這個時候因版本不同而更新失敗。具體我們用代碼試下:
public class testInt {
static AtomicStampedReference<Integer> number = new AtomicStampedReference<Integer>(0, 0);
public static void main(String[] args) throws Exception {
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
int a = number.getReference();
int s = number.getStamp();
System.out.println("開始number:" + a + ",stamp:" + s);
try {
Thread.sleep(5000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(number.compareAndSet(a, a + 1, s, s + 1));
}
});
t1.start();
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("開始增加操作");
int a = number.getReference();
int s = number.getStamp();
number.compareAndSet(a, a + 1, s, s + 1);
System.out.println("當前number:" + a + ",stamp:" + (s + 1));
a = number.getReference();
s = number.getStamp();
number.compareAndSet(a, a - 1, s, s + 1);
System.out.println("當前number:" + a + ",stamp:" + (s+1));
}
});
t2.start();
t1.join();
t2.join();
}
}
我們可以看到每次操作都會更新stamp(版本號),在最後對比的時候不僅比較值,還比較版本號,所以是不能更新成功的,false.
原子數組
AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray三者類似,所以以AtomicIntegerArray為例,我們可以將下面AtomicIntegerArray看做是AtomicInteger類型的數組,其底層很類似,就不詳細寫了。
AtomicIntegerArray array = new AtomicIntegerArray(10);
array.getAndIncrement(0); // 將第0個元素原子地增加1
AtomicInteger[] array = new AtomicInteger[10];
array[0].getAndIncrement(); // 將第0個元素原子地增加1
字段更新器和原子累加器比較簡單,這裏就不說了。
Atomic進階篇分界線
LongAdder源碼分析
LongAdder使用
LongAdder是jdk1.8之後新加的,那為什麼要加他?這個問題,下面將回答,我們先看下如何使用。
public class testLongAdder {
public static void main(String[] args) throws Exception {
LongAdder number = new LongAdder();
Runnable runnable = new Runnable() {
@Override
public void run() {
for (int j = 0; j < 10000; j++) {
number.add(1L);
}
}
};
Thread t1 = new Thread(runnable);
Thread t2 = new Thread(runnable);
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("number:" + number);
}
}
我們可以看到LongAdder的使用和AtomicLong大致相同,使用兩個線程Thread1,Thread2對number值各進行一萬次的自增操作,最後的number是正確的兩萬。
與Atomic的對比優勢
那問題來了,既然AtomicLong能夠完成對多線程下的number進行線程安全的操作,那為什麼還要LongAdder?我們先來段代碼比較下,兩個在結果都是正確的前提下,性能方面的差距。
public class testLongAdder {
public static void main(String[] args) {
//1個線程,進行100萬次自增操作
test1(1,1000000);
//10個線程,進行100萬次自增操作
test1(10,1000000);
//100個線程,進行100萬次自增操作
test1(100,1000000);
}
static void test1(int threadCount,int times){
long startTime=System.currentTimeMillis();
AtomicLong number1=new AtomicLong();
List<Thread> threads1=new ArrayList<>();
for(int i=0;i<threadCount;i++) {
threads1.add(new Thread(new Runnable() {
@Override
public void run() {
for (int j = 0; j < times; j++) {
number1.incrementAndGet();
}
}
}));
}
threads1.forEach(thread -> thread.start());
threads1.forEach(thread ->{
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
} );
long endTime=System.currentTimeMillis();
System.out.println("AtomicLong:"+number1+",time:"+(endTime-startTime));
LongAdder number2=new LongAdder();
List<Thread> threads2=new ArrayList<>();
for(int i=0;i<threadCount;i++) {
threads2.add(new Thread(new Runnable() {
@Override
public void run() {
for (int j = 0; j < times; j++) {
number2.add(1);
}
}
}));
}
threads2.forEach(thread -> thread.start());
threads2.forEach(thread ->{
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
} );
System.out.println("LongAdder:"+number2+",time:"+(System.currentTimeMillis()-endTime));
}
}
上述代碼對比了1個線程,10個線程,100個線程在進行100百次自增操作后,AtomicLong和LongAdder所花費的時間。通過打印語句,我們發現在最終number1和number2都正確的基礎上,LongAdder花費的時間比AtomicLong少了一個量級。
源碼分析
那為什麼會導致這種情況,我們就要從源碼層面分析。AtomicLong為什麼效率低?因為如果線程數量一多,尤其在高併發的情況下,比如有100個線程同時想要對對象進行操作,肯定只有一個線程會獲取到鎖,其他99個線程可能空轉,一直循環知道線程釋放鎖。如果該線程操作完畢釋放了鎖,其他99個線程再次競爭,也只有一個線程獲取鎖,另外98個線程還是空轉,直到鎖被釋放。這樣CAS操作會浪費大量資源在空轉上,從而使得AtomicLong在線程數越來越多的情況下越來越慢。
AtomicLong是多個線程對同一個value值進行操作,導致多個線程自旋次數太多,性能降低。而LongAdder在無競爭的情況,跟AtomicLong一樣,對同一個base進行操作,當出現競爭關係時則是採用化整為零的做法,從空間換時間,用一個數組cells,將一個value拆分進這個數組cells。多個線程需要同時對value進行操作時候,可以對線程id進行hash得到hash值,再根據hash值映射到這個數組cells的某個下標,再對該下標所對應的值進行自增操作。當所有線程操作完畢,將數組cells的所有值和無競爭值base都加起來作為最終結果。
我們先看下LongAdder裏面的字段,發現其裏面沒有,主要是在其繼承的Stripped64類中,有下面四個主要變量。
/** CPU數量,即cells數組的最大長度*/
static final int NCPU = Runtime.getRuntime().availableProcessors();
/**
*cells數組,為2的冪,2,4,8,16.....,方便以後位運算
*/
transient volatile Cell[] cells;
/**
* 基值,主要用於沒有競爭的情況,通過CAS更新。
*/
transient volatile long base;
/**
* 調整單元格大小(擴容),創建單元格時使用的鎖。
*/
transient volatile int cellsBusy;
下面是add方法開始。
public void add(long x) {
//as:cells數組的引用
//b:base的基礎值
//v:期望值
//m:cells數組大小
//a:當前數組命中的單元
Cell[] as; long b, v; int m; Cell a;
//as不為空(cells已經初始化過,說明之前有其他線程對初始化)或者CAS操作不成功(線程間出現競爭)
if ((as = cells) != null || !casBase(b = base, b + x)) {
//初始化uncontented,true表示未競爭(因為有兩個情況,這裏先初始化,後面對其修改,就能區分這兩種情況)
boolean uncontended = true;
//as等於null(cells未初始化)
//或者線程id哈希出來的下標所對應的值為空(cell等於空),getProbe() & m功能是獲取下標,底層邏輯是位運算
//或者更新失敗為false,即發生競爭,取非就為ture
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
//進入到if裏面,說明更新case失敗,或者更新某個cell也失敗了,或者cell為空,或者cells為空
longAccumulate(x, null, uncontended);
}
}
從LongAdder調用Stripped64的longAccumulate方法,主要是初始化cells,cells的擴容,多個線程同時命中一個cell的競爭操作。
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
//x:add的值,fn:為null,wasUncontended:是否發生競爭,true為發生競爭,false為不發生競爭
int h;//線程的hash值
//如果該線程為0,即第一次進來,所以ThreadLocalRandom強制初始化線程id,再對其hash
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current();
h = getProbe();
wasUncontended = true;
}
//擴容意向,為false肯定不擴容,為true可能擴容
boolean collide = false;
//死循環
for (;;) {
//as:cells數組的引用
//a:當前線程命中的cell
//n:cells的長度
//v:當前線程命中的cell所擁有的value值
Cell[] as; Cell a; int n; long v;
//cells不為空
if ((as = cells) != null && (n = as.length) > 0) {
//當前線程命中的cell為空,下面邏輯是新增cell
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
//發生競爭
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
//沒有競爭,嘗試修改當前線程對應的cell值,成功跳出循環
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
//如果n大於CPU最大數量,不可擴容
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
else if (!collide)
collide = true;
//獲取到了鎖,進行擴容,為2的冪,
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];//左移一位運算符,數量加倍
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
//cells等於空,並且獲取到鎖,開始初始化工作,創建結束釋放鎖,繼續循環
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
結語
結束了,撒花。這篇主要說了Atomic的一些使用,包括Atomic原子類(AtomicInteger,AtomicLong,AtomicBoolean),Atomic原子引用(AtomicReference,AtomicStampedReference),以及1.8之後LongAdder的優勢,源碼分析。過程還穿插了一些CAS,ABA問題引入和解決方式。
參考資料
Java多線程進階(十七)—— J.U.C之atomic框架:LongAdder
CAS原理
Java 8 Performance Improvements: LongAdder vs AtomicLong
AtomicInteger深入理解
原子操作類AtomicInteger詳解
玩轉Java併發工具,精通JUC,成為併發多面手
本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理
【其他文章推薦】
※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面
※網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!
※想知道最厲害的網頁設計公司"嚨底家"!
※別再煩惱如何寫文案,掌握八大原則!
※產品缺大量曝光嗎?你需要的是一流包裝設計!
※回頭車貨運收費標準
※台中搬家公司費用怎麼算?