一篇文章快速搞懂 Atomic(原子整數/CAS/ABA/原子引用/原子數組/LongAdder)

前言

相信大部分開發人員,或多或少都看過或寫過併發編程的代碼。併發關鍵字除了Synchronized,還有另一大分支Atomic。如果大家沒聽過沒用過先看基礎篇,如果聽過用過,請滑至底部看進階篇,深入源碼分析。

提出問題:int線程安全嗎?

看過Synchronized相關文章的小夥伴應該知道其是不安全的,再次用代碼應驗下其不安全性:

public class testInt {
    static int number = 0;

    public static void main(String[] args) throws Exception {
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 100000; i++) {
                    number = number+1;
                }
            }
        };

        Thread t1 = new Thread(runnable);
        t1.start();
        Thread t2 = new Thread(runnable);
        t2.start();

        t1.join();
        t2.join();
        System.out.println("number:" + number);
    }
}

 

 

運行結果:

在上面的例子中,我們定義一個初始值為0的靜態變量number,再新建並運行兩個線程讓其各執行10萬次的自增操作,如果他是線程安全的,應該兩個線程執行后結果為20萬,但是我們發現最終的結果是小於20萬的,即說明他是不安全的。

在之前Synchronized那篇文章中說過,可以在number=number+1這句代碼上下加Synchronized關鍵字實現線程安全。但是其對資源的開銷較大,所以我們今天再看下另外一種實現線程安全的方法Atomic。

Atomic基礎篇分界線

原子整數(基礎類型)

整體介紹

Atomic是jdk提供的一系列包的總稱,這個大家族包括原子整數(AtomicInteger,AtomicLong,AtomicBoolean),原子引用(AtomicReference,AtomicStampedReference,AtomicMarkableReference),原子數組(AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray),更新器(AtomicIntegerFieldUpdater,AtomicLongFieldUpdater,AtomicReferenceFieldUpdater)。

AtomicInteger

AtomicInteger,AtomicBoolean,AtomicLong三者功能類似,咱就以AtomicInteger為主分析原子類。

先看下有哪些API,及其他們具體啥功能:

public class testInt {

    public static void main(String[] args) {
        //定義AtomicInteger類型的變量,值為1
        AtomicInteger i = new AtomicInteger(1);
        //incrementAndGet方法先新增1再返回,所以打印2,此時i為2
        System.out.println(i.incrementAndGet());
        //getAndIncrement方法先返回值再新增1,所以打印2,此時i為3
        System.out.println(i.getAndIncrement());
        //get方法返回當前i值,所以打印3,此時i為3
        System.out.println(i.get());
        //參數為正數即新增,getAndAdd方法先返回值再新增666,所以打印3,此時i為669
        System.out.println(i.getAndAdd(666));
        //參數為負數即減去,getAndAdd方法先返回值再減去1,所以打印669,此時i為668
        System.out.println(i.getAndAdd(-1));
        //參數為正數即新增,addAndGet方法先新增666再返回值,所以打印1334,此時i為1334
        System.out.println(i.addAndGet(666));
        //參數為負數即減去,addAndGet方法先減去-1再返回值,所以打印1333,此時i為1333
        System.out.println(i.addAndGet(-1));
        //getAndUpdate方法IntUnaryOperator參數是一個箭頭函數,後面可以寫任何操作,所以打印1333,此時i為13331
        System.out.println(i.getAndUpdate(x -> (x * 10 + 1)));
        //最終打印i為13331
        System.out.println(i.get());
    }
} 

 

 

執行結果:

對上述int類型的例子改進

public class testInt {
    //1.定義初始值為0的AtomicInteger類型變量number
    static AtomicInteger number = new AtomicInteger(0);

    public static void main(String[] args) throws Exception {
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 100000; i++) {
                    //2.調用incrementAndGet方法,實現加1操作
                    number.incrementAndGet();
                }
            }
        };

        Thread t1 = new Thread(runnable);
        t1.start();
        Thread t2 = new Thread(runnable);
        t2.start();

        t1.join();
        t2.join();
        System.out.println("number:" + number.get());
    }
}

 

 

我們可以看到運行結果是正確的20萬,說明AtomicInteger的確保證了線程安全性,即在多線程的過程中,運行結果還是正確的。但是這存在一個ABA問題,下面將原子引用的時候再說,先立個flag。

源碼分析

我們以incrementAndGet方法為例,看下底層是如何實現的,AtomicInteger類中的incrementAndGet方法調用了Unsafe類的getAndAddInt方法。

public final int incrementAndGet() {
        return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}

 

我們看下getAndAddInt方法,裏面有個循環,直接值為compareAndSwapInt返回值為true,才結束循環。這裏就不得不提CAS,這就是多線程安全性問題的解決方法。

public final int getAndAddInt(Object var1, long var2, int var4) {
        int var5;
        do {
            var5 = this.getIntVolatile(var1, var2);
        } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

        return var5;
}

 

CAS

線程1和線程2同事獲取了主內存變量值0,線程1加1並寫入主內存,現在主內存變量值1,線程2也加2並嘗試寫入主內存,這個時候是不能寫入主內存的,因為會覆蓋掉線程1的操作,具體過程如下圖。

CAS是在線程2嘗試寫入內存的時候,通過比較並設置(CompareAndSet)發現現在主內存當前值為1,和他剛開始讀取的值0不一樣,所以他會放棄本次修改,重新讀取主內存的最新值,然後再重試下線程2的具體邏輯操作,再次嘗試寫入主內存。如果這時候線程1,再次對主內存進行了修改,線程2發現現在主內存的值又和預期不一樣,所以將放棄本次修改,再次讀取主內存最新值,再次重試並嘗試寫入主內存。我們可以發現這是一個重複比較的過程,即直到和預期初始值一樣,才會寫入主內存,否則將一直讀取重試的循環。這就是上面for循環的意義。

CAS的實現實際上利用了CPU指令來實現的,如果操作系統不支持CAS,還是會加鎖的,如果操作系統支持CAS,則使用原子性的CPU指令。

原子引用

在日常使用中,我們不止對上述基本類型進行原子操作,而是需要對一些複雜類型進行原子操作,所以需要AtomicReference。

不安全實現

先看不安全的BigDecimal類型:

public class testReference {
    static BigDecimal number = BigDecimal.ZERO;

    public static void main(String[] args) throws Exception {

        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 1000; i++) {
                   number=number.add(BigDecimal.ONE);
                }
            }
        };

        Thread t1 = new Thread(runnable);
        t1.start();

        Thread t2 = new Thread(runnable);
        t2.start();

        t1.join();
        t2.join();

        System.out.println(number);
    }
} 

 

 

運行結果如下圖,我們可以看到兩個線程,自循環1000次加1操作,最終結果應該是2000,可是結果小於2000。

安全實現-使用CAS

public class testReference {
    //定義AtomicReference類型BigDecimal變量
    static AtomicReference<BigDecimal> number = new AtomicReference<BigDecimal>(BigDecimal.ZERO);

    public static void main(String[] args) throws Exception {

        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 1000; i++) {
                    //手動寫循環+CAS判斷
                    while(true){
                        BigDecimal pre=number.get();
                        BigDecimal next=number.get().add(BigDecimal.ONE);
                      if(number.compareAndSet(pre,next))  {
                          break;
                      }
                    }
                }
            }
        };

        Thread t1 = new Thread(runnable);
        t1.start();

        Thread t2 = new Thread(runnable);
        t2.start();

        t1.join();
        t2.join();

        System.out.println(number.get());

    }
}

 

 

運行結果如下:

ABA問題及解決

在上面CAS過程中,是通過值比較來知曉是不是能夠更新成功,那如果線程1先加1再減1,這樣主內存還是原來的值,即線程2還是可以更新成功的。但是這樣邏輯錯了,線程1已經發生了修改,線程2不能直接更新成功。

代碼:

public class testInt {

    static AtomicInteger number = new AtomicInteger(0);

    public static void main(String[] args) throws Exception {


        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                int a = number.get();
                System.out.println("開始number:" + a);
                try {
                    Thread.sleep(5000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(number.compareAndSet(a, a++));


            }
        });
        t1.start();
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("開始增加操作");
                int a = number.incrementAndGet();
                System.out.println("當前number:" + a);
                int b = number.decrementAndGet();
                System.out.println("當前number:" + b);
            }
        });
        t2.start();

        t1.join();
        t2.join();
    }
} 

 

 

我們看線程2對其進行了一系列操作,但是最後打印了還是true,表示可以更新成功的。這顯然不對。

那我們可以使用AtomicStampedReference,為其添加一個版本號。線程1在剛開始讀取主內存的時候,獲取到值為0,版本為1,線程2也獲取到這兩個值,線程1進行加1,減1的操作的時候,版本各加1,現在主內存的值為0,版本為2,而線程2還拿着預計值為0,版本為1的數據嘗試寫入主內存,這個時候因版本不同而更新失敗。具體我們用代碼試下:

public class testInt {

    static AtomicStampedReference<Integer> number = new AtomicStampedReference<Integer>(0, 0);

    public static void main(String[] args) throws Exception {


        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                int a = number.getReference();
                int s = number.getStamp();
                System.out.println("開始number:" + a + ",stamp:" + s);
                try {
                    Thread.sleep(5000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(number.compareAndSet(a, a + 1, s, s + 1));


            }
        });
        t1.start();
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("開始增加操作");
                int a = number.getReference();
                int s = number.getStamp();
                number.compareAndSet(a, a + 1, s, s + 1);
                System.out.println("當前number:" + a + ",stamp:" + (s + 1));
                a = number.getReference();
                s = number.getStamp();
                number.compareAndSet(a, a - 1, s, s + 1);
                System.out.println("當前number:" + a + ",stamp:" + (s+1));
            }
        });
        t2.start();

        t1.join();
        t2.join();
    }
} 

 

我們可以看到每次操作都會更新stamp(版本號),在最後對比的時候不僅比較值,還比較版本號,所以是不能更新成功的,false.

原子數組

AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray三者類似,所以以AtomicIntegerArray為例,我們可以將下面AtomicIntegerArray看做是AtomicInteger類型的數組,其底層很類似,就不詳細寫了。

AtomicIntegerArray  array = new AtomicIntegerArray(10);
array.getAndIncrement(0);   // 將第0個元素原子地增加1

 

AtomicInteger[]  array = new AtomicInteger[10];
array[0].getAndIncrement();  // 將第0個元素原子地增加1

 

字段更新器和原子累加器比較簡單,這裏就不說了。 

Atomic進階篇分界線

LongAdder源碼分析

LongAdder使用

LongAdder是jdk1.8之後新加的,那為什麼要加他?這個問題,下面將回答,我們先看下如何使用。

public class testLongAdder {
    public static void main(String[] args) throws Exception {
        LongAdder number = new LongAdder();
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                for (int j = 0; j < 10000; j++) {
                    number.add(1L);
                }
            }
        };
        Thread t1 = new Thread(runnable);
        Thread t2 = new Thread(runnable);
        t1.start();
        t2.start();

        t1.join();
        t2.join();

        System.out.println("number:" + number);
    }
}

 

我們可以看到LongAdder的使用和AtomicLong大致相同,使用兩個線程Thread1,Thread2對number值各進行一萬次的自增操作,最後的number是正確的兩萬。

與Atomic的對比優勢

那問題來了,既然AtomicLong能夠完成對多線程下的number進行線程安全的操作,那為什麼還要LongAdder?我們先來段代碼比較下,兩個在結果都是正確的前提下,性能方面的差距。

public class testLongAdder {
    public static void main(String[] args) {
       //1個線程,進行100萬次自增操作
        test1(1,1000000);
      //10個線程,進行100萬次自增操作
        test1(10,1000000);
     //100個線程,進行100萬次自增操作
        test1(100,1000000);
    }

    static void test1(int threadCount,int times){
        long startTime=System.currentTimeMillis();
        AtomicLong number1=new AtomicLong();
        List<Thread> threads1=new ArrayList<>();
        for(int i=0;i<threadCount;i++) {
            threads1.add(new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < times; j++) {
                        number1.incrementAndGet();
                    }
                }
            }));
        }
        threads1.forEach(thread -> thread.start());
        threads1.forEach(thread ->{
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } );

        long endTime=System.currentTimeMillis();
        System.out.println("AtomicLong:"+number1+",time:"+(endTime-startTime));

        LongAdder number2=new LongAdder();
        List<Thread> threads2=new ArrayList<>();
        for(int i=0;i<threadCount;i++) {
            threads2.add(new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < times; j++) {
                        number2.add(1);
                    }
                }
            }));
        }
        threads2.forEach(thread -> thread.start());
        threads2.forEach(thread ->{
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } );

        System.out.println("LongAdder:"+number2+",time:"+(System.currentTimeMillis()-endTime));

    }
} 

 

上述代碼對比了1個線程,10個線程,100個線程在進行100百次自增操作后,AtomicLong和LongAdder所花費的時間。通過打印語句,我們發現在最終number1和number2都正確的基礎上,LongAdder花費的時間比AtomicLong少了一個量級。

源碼分析

那為什麼會導致這種情況,我們就要從源碼層面分析。AtomicLong為什麼效率低?因為如果線程數量一多,尤其在高併發的情況下,比如有100個線程同時想要對對象進行操作,肯定只有一個線程會獲取到鎖,其他99個線程可能空轉,一直循環知道線程釋放鎖。如果該線程操作完畢釋放了鎖,其他99個線程再次競爭,也只有一個線程獲取鎖,另外98個線程還是空轉,直到鎖被釋放。這樣CAS操作會浪費大量資源在空轉上,從而使得AtomicLong在線程數越來越多的情況下越來越慢。

AtomicLong是多個線程對同一個value值進行操作,導致多個線程自旋次數太多,性能降低。而LongAdder在無競爭的情況,跟AtomicLong一樣,對同一個base進行操作,當出現競爭關係時則是採用化整為零的做法,從空間換時間,用一個數組cells,將一個value拆分進這個數組cells。多個線程需要同時對value進行操作時候,可以對線程id進行hash得到hash值,再根據hash值映射到這個數組cells的某個下標,再對該下標所對應的值進行自增操作。當所有線程操作完畢,將數組cells的所有值和無競爭值base都加起來作為最終結果。

我們先看下LongAdder裏面的字段,發現其裏面沒有,主要是在其繼承的Stripped64類中,有下面四個主要變量。

 /** CPU數量,即cells數組的最大長度*/
    static final int NCPU = Runtime.getRuntime().availableProcessors();

    /**
     *cells數組,為2的冪,2,4,8,16.....,方便以後位運算
     */
    transient volatile Cell[] cells;

    /**
     * 基值,主要用於沒有競爭的情況,通過CAS更新。
     */
    transient volatile long base;

    /**
     * 調整單元格大小(擴容),創建單元格時使用的鎖。
     */
    transient volatile int cellsBusy;

 

 

下面是add方法開始。

 public void add(long x) {
        //as:cells數組的引用
        //b:base的基礎值
        //v:期望值
        //m:cells數組大小
        //a:當前數組命中的單元
        Cell[] as; long b, v; int m; Cell a;
        //as不為空(cells已經初始化過,說明之前有其他線程對初始化)或者CAS操作不成功(線程間出現競爭)
        if ((as = cells) != null || !casBase(b = base, b + x)) {
        //初始化uncontented,true表示未競爭(因為有兩個情況,這裏先初始化,後面對其修改,就能區分這兩種情況)
        boolean uncontended = true;
        //as等於null(cells未初始化)
        //或者線程id哈希出來的下標所對應的值為空(cell等於空),getProbe() & m功能是獲取下標,底層邏輯是位運算
        //或者更新失敗為false,即發生競爭,取非就為ture
        if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))
        //進入到if裏面,說明更新case失敗,或者更新某個cell也失敗了,或者cell為空,或者cells為空
                longAccumulate(x, null, uncontended);
        }
    }

 

從LongAdder調用Stripped64的longAccumulate方法,主要是初始化cellscells的擴容多個線程同時命中一個cell的競爭操作。

final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
        //x:add的值,fn:為null,wasUncontended:是否發生競爭,true為發生競爭,false為不發生競爭
        int h;//線程的hash值
        //如果該線程為0,即第一次進來,所以ThreadLocalRandom強制初始化線程id,再對其hash
        if ((h = getProbe()) == 0) {
            ThreadLocalRandom.current(); 
            h = getProbe();
            wasUncontended = true;
        }
        //擴容意向,為false肯定不擴容,為true可能擴容
        boolean collide = false; 
        //死循環               
        for (;;) {
            //as:cells數組的引用
            //a:當前線程命中的cell
            //n:cells的長度
            //v:當前線程命中的cell所擁有的value值
            Cell[] as; Cell a; int n; long v;
            //cells不為空
            if ((as = cells) != null && (n = as.length) > 0) {
                //當前線程命中的cell為空,下面邏輯是新增cell
                if ((a = as[(n - 1) & h]) == null) {
                    if (cellsBusy == 0) {       // Try to attach new Cell
                        Cell r = new Cell(x);   // Optimistically create
                        if (cellsBusy == 0 && casCellsBusy()) {
                            boolean created = false;
                            try {               // Recheck under lock
                                Cell[] rs; int m, j;
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                //發生競爭
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                //沒有競爭,嘗試修改當前線程對應的cell值,成功跳出循環
                else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x))))
                    break;
                //如果n大於CPU最大數量,不可擴容
                else if (n >= NCPU || cells != as)
                    collide = false;            // At max size or stale
                else if (!collide)
                    collide = true;
                //獲取到了鎖,進行擴容,為2的冪,
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        if (cells == as) {      // Expand table unless stale
                            Cell[] rs = new Cell[n << 1];//左移一位運算符,數量加倍
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                h = advanceProbe(h);
            }
            //cells等於空,並且獲取到鎖,開始初始化工作,創建結束釋放鎖,繼續循環
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                boolean init = false;
                try {                           // Initialize table
                    if (cells == as) {
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            else if (casBase(v = base, ((fn == null) ? v + x :
                                        fn.applyAsLong(v, x))))
                break;                          // Fall back on using base
        }
    }

 

 

結語

結束了,撒花。這篇主要說了Atomic的一些使用,包括Atomic原子類(AtomicInteger,AtomicLong,AtomicBoolean),Atomic原子引用(AtomicReference,AtomicStampedReference),以及1.8之後LongAdder的優勢,源碼分析。過程還穿插了一些CAS,ABA問題引入和解決方式。

 

參考資料

Java多線程進階(十七)—— J.U.C之atomic框架:LongAdder

CAS原理

Java 8 Performance Improvements: LongAdder vs AtomicLong

AtomicInteger深入理解

原子操作類AtomicInteger詳解

玩轉Java併發工具,精通JUC,成為併發多面手

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※想知道最厲害的網頁設計公司"嚨底家"!

※別再煩惱如何寫文案,掌握八大原則!

※產品缺大量曝光嗎?你需要的是一流包裝設計!

※回頭車貨運收費標準

台中搬家公司費用怎麼算?