併發系列(一)——線程池源碼(ThreadPoolExecutor類)簡析

前言

  本文主要是結合源碼去線程池執行任務的過程,基於JDK 11,整個過程基本與JDK 8相同。

  個人水平有限,文中若有表達有誤的,歡迎大夥留言指出,謝謝了!

一、線程池簡介

  1.1 使用線程池的優點

    1)通過復用已創建的線程,降低資源的消耗(線程的創建/銷毀是要消耗資源的)、提高響應速度;

    2)管理線程的個數,線程的個數在初始化線程池的時候指定;

    3)統一管理線程,比如停止,stop()方法;

  1.2 線程池執行任務過程

    線程池執行任務的過程如下圖所示,主要分為以下4步,其中參數的含義會在後面詳細講解:

    1)判斷工作的線程是否小於核心線程數據(workerCountOf(c) < corePoolSize),若小於則會新建一個線程去執行任務,這一步僅僅的是根據線程個數決定;

    2)若核心線程池滿了,就會判斷線程池的狀態,若是running狀態,則嘗試加入任務隊列,若加入成功后還會做一些事情,後面詳細說;

    3)若任務隊列滿了,則加入失敗,此時會判斷整個線程池線程是否滿,若沒有則創建非核心線程執行任務;

    4)若線程池滿了,則根據拒絕測試處理無法執行的任務;

    整體過程如下圖:

二、ThreadPoolExecutor類解析

  2.1 ThreadPoolExecutor的構造函數

    ThreadPoolExecutor類一共提供了4個構造函數,涉及5~7個參數,下面就5個必備參數的構造函數進行說明:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

    1)corePoolSize :初始化核心線程池中線程個數的大小;

    2)maxmumPoolSize:線程池中線程大小;

    3)keepAliveTime:非核心線程的超時時長;

      非核心線程空閑時常大於該值就會被終止。

    4)unit :keepAliveTime的單位,類型可以參見TimeUnit類;

    5)BlockingQueue workQueue:阻塞隊列,維護等待執行的任務;

  2.2  私有類Worker

    在ThreadPoolExecutor類中有兩個集合類型比較重要,一個是用於放置等待任務的workQueue,其類型是阻塞對列;一個是用於用於存放工作線程的works,其是Set類型,其中存放的類型是Worker。

    進一步簡化線程池執行過程,可以理解為works中的工作線程不停的去阻塞對列中取任務,執行結束,線程重新加入大works中。

    為此,有必要簡單了解一下Work類型的組成。

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /** Thread this worker is running in.  Null if factory fails. */
        //工作線程,由線程的工廠類初始化
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;
        //不可重入的鎖
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        .......
    }

    Worker類繼承於隊列同步器(AbstractQueueSynchronizer),隊列同步器是採取鎖或其他同步組件的基礎框架,其主要結構是自旋獲取鎖的同步隊列和等待喚醒的等待隊列,其方法因此可以分為兩類:對state改變的方法 和 入、出隊列的方法,即獲取獲取鎖的資格的變化(可能描述的不準確)。關於隊列同步器後續博客會詳細分析,此處不展開討論。

    Work類中通過CAS設置狀態失敗后直接返回false,而不是判斷當前線程是否已獲取鎖來實現不可重入的鎖,源碼註釋中解釋這樣做的原因是因為避免work tash重新獲取到控制線程池全局的方法,如setCorePoolSize。

  2.3  拒絕策略類

    ThreadPoolExecutor的拒絕策略類是以私有類的方式實現的,有四種策略:

    1)AbortPolicy:丟棄任務並拋出RejectedExecutionException異常(默認拒絕處理策略)。

      2)DiscardPolicy:拋棄新來的任務,但是不拋出異常。

      3)DiscardOldestPolicy:拋棄等待隊列頭部(最舊的)的任務,然後重新嘗試執行程序(失敗則會重複此過程)。

      4)CallerRunsPolicy:由調用線程處理該任務。

    其代碼相對簡單,可以參考源碼。

三、任務執行過程分析

  3.1 execute(Runnable)方法

    execute(Runnable)方法的整體過程如上文1.2所述,其實現方式如下:

public void execute(Runnable command) {
        //執行的任務為空,直接拋出異常
        if (command == null)
            throw new NullPointerException();
        //ctl是ThreadPoolExecutor中很關鍵的一個AtomicInteger,主線程池的控制狀態
        int c = ctl.get();
        //1、判斷是否小於核心線程池的大小,若是則直接嘗試新建一個work線程
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //2、大於核心線程池的大小或新建work失敗(如創建thread失敗),會先判斷線程池是否是running狀態,若是則加入阻塞對列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //重新驗證線程池是否為running,若否,則嘗試從對列中刪除,成功后執行拒絕策略
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //若線程池的狀態為shutdown則,嘗試去執行完阻塞對列中的任務
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //3、新建非核心線程去執行任務,若失敗,則採取拒絕策略
        else if (!addWorker(command, false))
            reject(command);
    }

  3.2 addWorker(Runnable,boole)方法

    execute(Runnable)方法中,新建(非)核心線程執行任務主要是通過addWorker方法實現的,其執行過程如下:

private boolean addWorker(Runnable firstTask, boolean core) {
        //此處反覆檢查線程池的狀態以及工作線程是否超過給定的值
        retry:
        for (int c = ctl.get();;) {
            // Check if queue empty only if necessary.
            if (runStateAtLeast(c, SHUTDOWN)
                && (runStateAtLeast(c, STOP)
                    || firstTask != null
                    || workQueue.isEmpty()))
                return false;

            for (;;) {
            //核心和非核心線程的區別
                if (workerCountOf(c)
                    >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateAtLeast(c, SHUTDOWN))
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            //通過工廠方法初始化,可能失敗,即可能為null
            final Thread t = w.thread;
            if (t != null) {
            //獲取全局鎖
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
                    //線程池處於running狀態
                    //或shutdown狀態但無需要執行的task,個人理解為用於去阻塞隊列中取任務執行
                    if (isRunning(c) ||
                        (runStateLessThan(c, STOP) && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    //執行任務,這裡會執行thread的firstTask獲取阻塞對列中取任務
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
            //開始失敗,則會從workers中刪除新建的work,work數量減1,嘗試關閉線程池,這些過程會獲取全局鎖
                addWorkerFailed(w);
        }
        return workerStarted;
    }

  3.3  runWorker(this) 方法

     在3.2 中當新建的worker線程加入在workers中成功后,就會啟動對應任務,其調用的是Worker類中的run()方法,即調用runWorker(this)方法,其過程如下:

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
        //while()循環中,前者是新建線程執行firstTask,對應線程個數小於核心線程和阻塞隊列滿的情況,
        //getTask()則是從阻塞對列中取任務執行
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                //僅線程池狀態為stop時,線程響應中斷,這裏也就解釋了調用shutdown時,正在工作的線程會繼續工作
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    try {
                    //執行任務
                        task.run();
                        afterExecute(task, null);
                    } catch (Throwable ex) {
                        afterExecute(task, ex);
                        throw ex;
                    }
                } finally {
                    task = null;
                    //完成的個數+1
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            //處理後續工作
            processWorkerExit(w, completedAbruptly);
        }
    }

   3.4 processWorkerExit(Worker,boole)方法

    當任務執行結果后,在滿足一定條件下會新增一個worker線程,代碼如下:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            //對工作線程的增減需要加全局鎖
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
        //嘗試終止線程池
        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
        //線程不是中斷,會維持最小的個數
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            //執行完任務后,線程重新加入workers中
            addWorker(null, false);
        }
    }

  至此,線程池執行任務的過程分析結束,其他方法的實現過程可以參考源碼。

 

Ref:

[1]http://concurrent.redspider.group/article/03/12.html

[2]《Java併發編程的藝術》

 

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

台北網頁設計公司這麼多該如何選擇?

※智慧手機時代的來臨,RWD網頁設計為架站首選

※評比南投搬家公司費用收費行情懶人包大公開

※回頭車貨運收費標準

Dart Memo for Android Developers

Dart Memo for Android Developers

Dart語言一些語法特點和編程規範.

本文適合: 日常使用Kotlin, 突然想寫個Flutter程序的Android程序員.

Dart語言

完整的請看A tour of the Dart language

  • 創建對象可以不用new. -> 並且規範不讓用new, lint會報錯.
  • 聲明變量可以用var, 也可以用具體類型如String. 不變量用final, 常量用const.
  • 沒有訪問修飾符, 用_來表示私有: 文件級別.
  • 字符串可以用單引號'.
  • 語句結尾要用;.
  • 創建數組可以用: var list = [1, 2, 3];.
  • assert()常用來斷定開發時不可能會出現的情況.
  • 空測試操作符: ??.
  • 過濾操作符: where.
  • 兩個點..表示鏈式調用.
  • dynamic說明類型未指定.
  • 除了throw異常, 還可以throw別的東西, 比如字符串.

函數

  • 函數返回值在函數最開頭, 可以不標. -> 但是規範會建議標註返回值.
bool isNoble(int atomicNumber) {
  return _nobleGases[atomicNumber] != null;
}
  • =>箭頭符號, 用來簡化一句話的方法.
bool isNoble(int atomicNumber) => _nobleGases[atomicNumber] != null;

構造函數

  • 構造函數{}表示帶名字, 參數可選, 若要必選加上@required.
const Scrollbar({Key key, @required Widget child})
  • 構造函數名可以是ClassName或者ClassName.identifier.
  • 空構造函數體可以省略, 用;結尾就行:
class Point {
  double x, y;
  Point(this.x, this.y);
}

這裡會初始化相應的變量, 也不用聲明具體的參數類型.

  • factory構造, 可以用來返回緩存實例, 或者返回類型的子類:
factory Logger(String name) {
    return _cache.putIfAbsent(name, () => Logger._internal(name));
}

異步代碼

Future<String> lookUpVersion() async => '1.0.0';

Future checkVersion() async {
  var version = await lookUpVersion();
  // Do something with version
}

編程規範類

完整的規範在這裏: Effective Dart.

有一些Good和Bad的舉例, 這裏僅列出比較常用的幾項.

  • 文件名要蛇形命名: lowercase_with_underscores. 類名: UpperCamelCase.
  • 對自己程序的文件, 兩種import都可以(package開頭或者相對路徑), 但是要保持一致.
  • Flutter程序嵌套比較多, 要用結尾的,來幫助格式化.

本文緣由

年初的時候學了一陣子Flutter, 寫了各種大小demo. 結果隔了兩個月之後, 突然心血來潮想寫個小東西, 打開Android Studio, 首先發現創建Flutter程序的按鈕都不見了. (估計是Android Studio4.0升級之後Flutter的插件沒跟上).

接着用命令行創建了工程, 打開之後稍微整理了一下心情, 然後就….懵逼了.

突然不知道如何下手.
宏觀的東西還記得, 要用什麼package, 基本常用的幾個Widget都是啥, 但是微觀的, 忘了函數和數組都是咋定義的了.
這種懵逼的狀態令我很憤怒, 果然是上年紀了嗎, 無縫切換個語言都不行.

於是就想着還是寫個備忘錄吧.

References

  • A tour of the Dart language
  • Effective Dart

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

南投搬家公司費用,距離,噸數怎麼算?達人教你簡易估價知識!

※教你寫出一流的銷售文案?

※超省錢租車方案

【asp.net core 系列】9 實戰之 UnitOfWork以及自定義代碼生成

0. 前言

在前一篇中我們創建了一個基於EF的數據查詢接口實現基類,這一篇我將帶領大家講一下為這EF補充一些功能,並且提供一個解決避免寫大量配置類的方案。

1. SaveChanges的外移

在之前介紹EF Core的時候,我們提到過使用EF需要在每次使用之後,調用一次SaveChanges將數據提交給數據庫。在實際開發中,我們不能添加一條數據或者做一次修改就調用一次SaveChanges,這完全不現實。因為每次調用SaveChanges是EF向數據庫提交變更的時候,所以EF推薦的是每次執行完用戶的請求之後統一提交數據給數據庫。

這樣就會造成一個問題,可能也不是問題:我們需要一個接口來管理EF 的SaveChanges操作。

1.1 創建一個IUnitOfWork接口

通常我們會在Domain項目中添加一個IUnitOfWork接口,這個接口有一個方法就是SaveChanges,代碼如下:

namespace Domain.Insfrastructure
{
    public interface IUnitOfWork
    {
        void SaveChanges();
    }
}

這個方法的意思表示到執行該方法的時候,一個完整的工作流程執行完成了。也就是說,當執行該方法后,當前請求不會再與數據庫發生連接。

1.2 實現IUnitOfWork接口

在 Domain.Implement中添加IUnitOfWork實現類:

using Domain.Insfrastructure;
using Microsoft.EntityFrameworkCore;

namespace Domain.Implements.Insfrastructure
{
    public class UnitOfWork: IUnitOfWork
    {
        private DbContext DbContext;
        public UnitOfWork(DbContext context)
        {
            DbContext = context;
        }

        public void SaveChanges()
        {
            DbContext.SaveChanges();
        }
    }
}

1.3 調用時機

到現在我們已經創建了一個UnitOfWork的方法,那麼問題來了,我們該在什麼時候調用呢,或者說如何調用呢?

我的建議是創建一個ActionFilter,針對所有的控制器進行SaveChanges進行處理。當然了,也可以在控制器中持有一個IUnitOfWork的示例,然後在Action結束的時候,執行SaveChanges。不過這樣存在一個問題,可能會存在遺漏的方法。所以我推薦這樣操作,這裏簡單演示一下如何創建攔截器:

在Web的根目錄下,創建一個Filters目錄,這個目錄里用來存儲一些過濾器,創建我們需要的過濾器:

using Domain.Insfrastructure;
using Microsoft.AspNetCore.Mvc.Filters;

namespace Web.Filters
{
    public class UnitOfWorkFilterAttribute : ActionFilterAttribute
    {
        public IUnitOfWork UnitOfWork;

        public override void OnActionExecuted(ActionExecutedContext context)
        {
            UnitOfWork.SaveChanges();
        }
    }
}

使用一個ActionFilter可以很方便的解決一些容易遺漏但又必須執行的代碼。這裏就先不介紹如何配置Filter的啟用和詳細介紹了,請允許我賣個關子。當然了,有些小夥伴肯定也能猜到這是一個Attribute類,所以可以按照Attribute給Controller打標記。

2. 創建一個簡單的代碼生成方法

之前在介紹EF的時候,有個小夥伴跟我說,還要寫配置文件啊,太麻煩了。是的,之前我介紹了很多關於寫配置文件不使用特性的好處,但不解決這個問題就無法真正體檢配置類的好處。

雖然說,EF Core約定優先,但是如果默認約定的話,得在DBContext中聲明 DbSet<T> 來聲明這個字段,實體類少的話,比較簡單。如果多個數據表的話,就會非常麻煩。

所以這時候就要使用工具類, 那麼簡單的分析一下,這個工具類需要有哪些功能:

  • 第一步,找到實體類並解析出實體類的類名
  • 第二步,生成配置文件
  • 第三步,創建對應的Repository接口和實現類

很簡單的三步,但是難點就是找實體類並解析出實體類名。

在Util項目中添加一個Develop目錄,並創建Develop類:

namespace Utils.Develop
{
    public class Develop
    {
        
    }
}

定位當前類所在目錄,通過

Directory.GetCurrentDirectory()

這個方法可以獲取當前執行的DLL所在目錄,當然不同的編譯器在執行的時候,會有微妙的不同。所以我們需要以此為根據然後獲取項目的根目錄,一個簡單的方法,查找*.sln 所在目錄:

public static string CurrentDirect
{
    get
    {
        var execute = Directory.GetCurrentDirectory();
        var parent = Directory.GetParent(execute);
        while(parent.GetFiles("*.sln",SearchOption.TopDirectoryOnly).Length == 0)
        {
            parent = parent.Parent;
            if(parent == null)
            {
                return null;
            }
        }
        return parent.FullName;
    }
}

2.1 獲取實體類

那麼獲取到根目錄之後,我們下一步就是獲取實體類。因為我們的實體類都要求是繼承BaseEntity或者命名空間都是位於Data.Models下面。當然這個名稱都是根據實際業務場景約束的,這裏只是以當前項目舉例。那麼,我們可以通過以下方法找到我們設置的實體類:

public static Type[] LoadEntities()
{
    var assembly = Assembly.Load("Data");
    var allTypes = assembly.GetTypes();
    var ofNamespace = allTypes.Where(t => t.Namespace == "Data.Models" || t.Namespace.StartsWith("Data.Models."));
    var subTypes = allTypes.Where(t => t.BaseType.Name == "BaseEntity`1");
    return ofNamespace.Union(subTypes).ToArray();
}

通過 Assembly加載Data的程序集,然後選擇出符合我們要求的實體類。

2.2 編寫Repository接口

我們先約定Model的Repository接口定義在 Domain/Repository目錄下,所以它們的命名空間應該是:

namespace Domain.Repository	
{
}

假設目錄情況與Data/Models下面的代碼結構保持一致,然後生成代碼應該如下:

public static void CreateRepositoryInterface(Type type)
{
    var targetNamespace = type.Namespace.Replace("Data.Models", "");
    if (targetNamespace.StartsWith("."))
    {
        targetNamespace = targetNamespace.Remove(0);
    }
    var targetDir = Path.Combine(new[]{CurrentDirect,"Domain", "Repository"}.Concat(
        targetNamespace.Split('.')).ToArray());
    if (!Directory.Exists(targetDir))
    {
        Directory.CreateDirectory(targetDir);
    }

    var baseName = type.Name.Replace("Entity","");

    if (!string.IsNullOrEmpty(targetNamespace))
    {
        targetNamespace = $".{targetNamespace}";
    }
    var file = $"using {type.Namespace};\r\n"
        + $"using Domain.Insfrastructure;\r\n"
        + $"namespace Domain.Repository{targetNamespace}\r\n"
        + "{\r\n"
        + $"\tpublic interface I{baseName}ModifyRepository : IModifyRepository<{type.Name}>\r\n" +
        "\t{\r\n\t}\r\n"
        + $"\tpublic interface I{baseName}SearchRepository : ISearchRepository<{type.Name}>\r\n" +
        "\t{\r\n\t}\r\n}";

    File.WriteAllText(Path.Combine(targetDir, $"{baseName}Repository.cs"), file);
}

2.3 編寫Repository的實現類

因為我們提供了一個基類,所以我們在生成方法的時候,推薦繼承這個類,那麼實現方法應該如下:

public static void CreateRepositoryImplement(Type type)
{
    var targetNamespace = type.Namespace.Replace("Data.Models", "");
    if (targetNamespace.StartsWith("."))
    {
        targetNamespace = targetNamespace.Remove(0);
    }

    var targetDir = Path.Combine(new[] {CurrentDirect, "Domain.Implements", "Repository"}.Concat(
        targetNamespace.Split('.')).ToArray());
    if (!Directory.Exists(targetDir))
    {
        Directory.CreateDirectory(targetDir);
    }
    var baseName = type.Name.Replace("Entity", "");
    if (!string.IsNullOrEmpty(targetNamespace))
    {
        targetNamespace = $".{targetNamespace}";
    }

    var file = $"using {type.Namespace};" +
        $"\r\nusing Domain.Implements.Insfrastructure;" +
        $"\r\nusing Domain.Repository{targetNamespace};" +
        $"\r\nusing Microsoft.EntityFrameworkCore;" +
        $"namespace Domain.Implements.Repository{targetNamespace}\r\n" +
        "{" +
        $"\r\n\tpublic class {baseName}Repository :BaseRepository<{type.Name}> ,I{baseName}ModifyRepository,I{baseName}SearchRepository " +
        "\r\n\t{" +
        $"\r\n\t\tpublic {baseName}Repository(DbContext context) : base(context)"+
        "\r\n\t\t{"+
        "\r\n\t\t}\r\n"+
        "\t}\r\n}";
    File.WriteAllText(Path.Combine(targetDir, $"{baseName}Repository.cs"), file);
}

2.4 配置文件的生成

仔細觀察一下代碼,可以發現整體都是十分簡單的。所以這篇就不掩飾如何生成配置文件了,小夥伴們可以自行嘗試一下哦。具體實現可以等一下篇哦。

3. 總結

這一篇初略的介紹了兩個用來輔助EF Core實現的方法或類,這在開發中很重要。UnitOfWork用來確保一次請求一個工作流程,簡單的代碼生成類讓我們能讓我們忽略那些繁重的創建同類代碼的工作。

更多內容煩請關注我的博客《高先生小屋》

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※Google地圖已可更新顯示潭子電動車充電站設置地點!!

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※別再煩惱如何寫文案,掌握八大原則!

網頁設計最專業,超強功能平台可客製化

曹工說Redis源碼(8)–面試時,redis 內存淘汰總被問,但是總答不好

文章導航

Redis源碼系列的初衷,是幫助我們更好地理解Redis,更懂Redis,而怎麼才能懂,光看是不夠的,建議跟着下面的這一篇,把環境搭建起來,後續可以自己閱讀源碼,或者跟着我這邊一起閱讀。由於我用c也是好幾年以前了,些許錯誤在所難免,希望讀者能不吝指出。

曹工說Redis源碼(1)– redis debug環境搭建,使用clion,達到和調試java一樣的效果

曹工說Redis源碼(2)– redis server 啟動過程解析及簡單c語言基礎知識補充

曹工說Redis源碼(3)– redis server 啟動過程完整解析(中)

曹工說Redis源碼(4)– 通過redis server源碼來理解 listen 函數中的 backlog 參數

曹工說Redis源碼(5)– redis server 啟動過程解析,以及EventLoop每次處理事件前的前置工作解析(下)

曹工說Redis源碼(6)– redis server 主循環大體流程解析

曹工說Redis源碼(7)– redis server 的周期執行任務,到底要做些啥

什麼是內存淘汰

內存淘汰,和平時我們設置redis key的過期時間,不是一回事;內存淘汰是說,假設我們限定redis只能使用8g內存,現在已經使用了這麼多了(包括設置了過期時間的key和沒設過期時間的key),那,後續的set操作,還怎麼辦呢?

是不是只能報錯了?

那不行啊,不科學吧,因為有的key,可能已經很久沒人用了,可能以後也不會再用到了,那我們是不是可以把這類key給幹掉呢?

幹掉key的過程,就是內存淘汰。

內存淘汰什麼時候啟用

當我們在配置文件里設置了如下屬性時:

# maxmemory <bytes>

默認,該屬性是被註釋掉的。

其實,這個配置項的註釋,相當有價值,我們來看看:

# Don't use more memory than the specified amount of bytes.
# When the memory limit is reached Redis will try to remove keys
# according to the eviction policy selected (see maxmemory-policy).
#
# If Redis can't remove keys according to the policy, or if the policy is
# set to 'noeviction', Redis will start to reply with errors to commands
# that would use more memory, like SET, LPUSH, and so on, and will continue
# to reply to read-only commands like GET.
#
# This option is usually useful when using Redis as an LRU cache, or to set
# a hard memory limit for an instance (using the 'noeviction' policy).
#
# WARNING: If you have slaves attached to an instance with maxmemory on,
# the size of the output buffers needed to feed the slaves are subtracted
# from the used memory count, so that network problems / resyncs will
# not trigger a loop where keys are evicted, and in turn the output
# buffer of slaves is full with DELs of keys evicted triggering the deletion
# of more keys, and so forth until the database is completely emptied.
#
# In short... if you have slaves attached it is suggested that you set a lower
# limit for maxmemory so that there is some free RAM on the system for slave
# output buffers (but this is not needed if the policy is 'noeviction').
#
# maxmemory <bytes>

渣翻譯如下:

不能使用超過指定數量bytes的內存。當該內存限制被達到時,redis會根據過期策略(eviction policy,通過參數 maxmemory-policy來指定)來驅逐key。

如果redis根據指定的策略,或者策略被設置為“noeviction”,redis會開始針對如下這種命令,回復錯誤。什麼命令呢?會使用更多內存的那類命令,比如set、lpush;只讀命令還是不受影響,可以正常響應。

該選項通常在redis使用LRU緩存時有用,或者在使用noeviction策略時,設置一個進程級別的內存limit。

內存淘汰策略

所謂策略,意思是,當我們要刪除部分key的時候,刪哪些,不刪哪些?是不是需要一個策略?比如是隨機刪,就像滅霸一樣?還是按照lru時間來刪,lru的策略意思就是,最近最少使用的key,將被優先刪除。

總之,我們需要定一個規則。

redis默認支持以下策略:

# MAXMEMORY POLICY: how Redis will select what to remove when maxmemory
# is reached. You can select among five behaviors:
# 
# volatile-lru -> remove the key with an expire set using an LRU algorithm
# allkeys-lru -> remove any key accordingly to the LRU algorithm
# volatile-random -> remove a random key with an expire set
# allkeys-random -> remove a random key, any key
# volatile-ttl -> remove the key with the nearest expire time (minor TTL)
# noeviction -> don't expire at all, just return an error on write operations
# 
# Note: with any of the above policies, Redis will return an error on write
#       operations, when there are not suitable keys for eviction.
#
#       At the date of writing this commands are: set setnx setex append
#       incr decr rpush lpush rpushx lpushx linsert lset rpoplpush sadd
#       sinter sinterstore sunion sunionstore sdiff sdiffstore zadd zincrby
#       zunionstore zinterstore hset hsetnx hmset hincrby incrby decrby
#       getset mset msetnx exec sort
#
# The default is:
#
# maxmemory-policy noeviction
maxmemory-policy allkeys-lru
針對設置了過期時間的,使用lru算法
# volatile-lru -> remove the key with an expire set using an LRU algorithm

針對全部key,使用lru算法
# allkeys-lru -> remove any key accordingly to the LRU algorithm

針對設置了過期時間的,隨機刪
# volatile-random -> remove a random key with an expire set

針對全部key,隨機刪
# allkeys-random -> remove a random key, any key

針對設置了過期時間的,馬上要過期的,刪掉
# volatile-ttl -> remove the key with the nearest expire time (minor TTL)

不過期,不能寫了,就報錯
# noeviction -> don't expire at all, just return an error on write operations

一般呢,我們會設置為:

allkeys-lru,即,針對全部key,進行lru。

源碼實現

配置讀取

在如下結構體中,定義了如下字段:

struct redisServer {
	...
	unsigned long long maxmemory;   /* Max number of memory bytes to use */
    int maxmemory_policy;           /* Policy for key eviction */
    int maxmemory_samples;          /* Pricision of random sampling */
    ...
}

當我們在配置文件中,進入如下配置時,該結構體中幾個字段的值如下:

maxmemory 3mb
maxmemory-policy allkeys-lru
# maxmemory-samples 5  這個取了默認值

maxmemory_policy為3,是因為枚舉值為3:

#define REDIS_MAXMEMORY_VOLATILE_LRU 0
#define REDIS_MAXMEMORY_VOLATILE_TTL 1
#define REDIS_MAXMEMORY_VOLATILE_RANDOM 2
#define REDIS_MAXMEMORY_ALLKEYS_LRU 3
#define REDIS_MAXMEMORY_ALLKEYS_RANDOM 4
#define REDIS_MAXMEMORY_NO_EVICTION 5
#define REDIS_DEFAULT_MAXMEMORY_POLICY REDIS_MAXMEMORY_NO_EVICTION

處理命令時,判斷是否進行內存淘汰

在處理命令的時候,會調用中的

redis.c  processCommand
    
int processCommand(redisClient *c) {
    /* The QUIT command is handled separately. Normal command procs will
     * go through checking for replication and QUIT will cause trouble
     * when FORCE_REPLICATION is enabled and would be implemented in
     * a regular command proc. */
    // 特別處理 quit 命令
    void *commandName = c->argv[0]->ptr;
    redisLog(REDIS_NOTICE, "The server is now processing %s", commandName);

    if (!strcasecmp(c->argv[0]->ptr, "quit")) {
        addReply(c, shared.ok);
        c->flags |= REDIS_CLOSE_AFTER_REPLY;
        return REDIS_ERR;
    }

    /* Now lookup the command and check ASAP about trivial error conditions
     * such as wrong arity, bad command name and so forth. */
    // 1 查找命令,並進行命令合法性檢查,以及命令參數個數檢查
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    if (!c->cmd) {
        // 沒找到指定的命令
        flagTransaction(c);
        addReplyErrorFormat(c, "unknown command '%s'",
                            (char *) c->argv[0]->ptr);
        return REDIS_OK;
    }

    /* Check if the user is authenticated */
    //2 檢查認證信息
    if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand) {
        flagTransaction(c);
        addReply(c, shared.noautherr);
        return REDIS_OK;
    }

    /* If cluster is enabled perform the cluster redirection here.
     *
     * 3 如果開啟了集群模式,那麼在這裏進行轉向操作。
     *
     * However we don't perform the redirection if:
     *
     * 不過,如果有以下情況出現,那麼節點不進行轉向:
     *
     * 1) The sender of this command is our master.
     *    命令的發送者是本節點的主節點
     *
     * 2) The command has no key arguments. 
     *    命令沒有 key 參數
     */
    if (server.cluster_enabled &&
        !(c->flags & REDIS_MASTER) &&
        !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0)) {
        int hashslot;

        // 集群已下線
        if (server.cluster->state != REDIS_CLUSTER_OK) {
            flagTransaction(c);
            addReplySds(c, sdsnew("-CLUSTERDOWN The cluster is down. Use CLUSTER INFO for more information\r\n"));
            return REDIS_OK;

            // 集群運作正常
        } else {
            int error_code;
            clusterNode *n = getNodeByQuery(c, c->cmd, c->argv, c->argc, &hashslot, &error_code);
            // 不能執行多鍵處理命令
            if (n == NULL) {
                flagTransaction(c);
                if (error_code == REDIS_CLUSTER_REDIR_CROSS_SLOT) {
                    addReplySds(c, sdsnew("-CROSSSLOT Keys in request don't hash to the same slot\r\n"));
                } else if (error_code == REDIS_CLUSTER_REDIR_UNSTABLE) {
                    /* The request spawns mutliple keys in the same slot,
                     * but the slot is not "stable" currently as there is
                     * a migration or import in progress. */
                    addReplySds(c, sdsnew("-TRYAGAIN Multiple keys request during rehashing of slot\r\n"));
                } else {
                    redisPanic("getNodeByQuery() unknown error.");
                }
                return REDIS_OK;

                //3.1 命令針對的槽和鍵不是本節點處理的,進行轉向
            } else if (n != server.cluster->myself) {
                flagTransaction(c);
                // -<ASK or MOVED> <slot> <ip>:<port>
                // 例如 -ASK 10086 127.0.0.1:12345
                addReplySds(c, sdscatprintf(sdsempty(),
                                            "-%s %d %s:%d\r\n",
                                            (error_code == REDIS_CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
                                            hashslot, n->ip, n->port));

                return REDIS_OK;
            }

            // 如果執行到這裏,說明鍵 key 所在的槽由本節點處理
            // 或者客戶端執行的是無參數命令
        }
    }

    /* Handle the maxmemory directive.
     *
     * First we try to free some memory if possible (if there are volatile
     * keys in the dataset). If there are not the only thing we can do
     * is returning an error. */
    //4 如果設置了最大內存,那麼檢查內存是否超過限制,並做相應的操作
    if (server.maxmemory) {
        //4.1 如果內存已超過限制,那麼嘗試通過刪除過期鍵來釋放內存
        int retval = freeMemoryIfNeeded();
        // 如果即將要執行的命令可能佔用大量內存(REDIS_CMD_DENYOOM)
        // 並且前面的內存釋放失敗的話
        // 那麼向客戶端返回內存錯誤
        if ((c->cmd->flags & REDIS_CMD_DENYOOM) && retval == REDIS_ERR) {
            flagTransaction(c);
            addReply(c, shared.oomerr);
            return REDIS_OK;
        }
    }    
    ....
  • 1處,查找命令,對應的函數指針(類似於java里的策略模式,根據命令,找對應的策略)
  • 2處,檢查,是否密碼正確
  • 3處,集群相關操作;
  • 3.1處,不是本節點處理,直接返回ask,指示客戶端轉向
  • 4處,判斷是否設置了maxMemory,這裏就是本文重點:設置了maxMemory時,內存淘汰策略
  • 4.1處,調用了下方的 freeMemoryIfNeeded

接下來,深入4.1處:


int freeMemoryIfNeeded(void) {
    size_t mem_used, mem_tofree, mem_freed;
    int slaves = listLength(server.slaves);

    /* Remove the size of slaves output buffers and AOF buffer from the
     * count of used memory. */
    // 計算出 Redis 目前佔用的內存總數,但有兩個方面的內存不會計算在內:
    // 1)從服務器的輸出緩衝區的內存
    // 2)AOF 緩衝區的內存
    mem_used = zmalloc_used_memory();
    if (slaves) {
		...
    }
    if (server.aof_state != REDIS_AOF_OFF) {
        mem_used -= sdslen(server.aof_buf);
        mem_used -= aofRewriteBufferSize();
    }

    /* Check if we are over the memory limit. */
    //1 如果目前使用的內存大小比設置的 maxmemory 要小,那麼無須執行進一步操作
    if (mem_used <= server.maxmemory) return REDIS_OK;

    //2 如果佔用內存比 maxmemory 要大,但是 maxmemory 策略為不淘汰,那麼直接返回
    if (server.maxmemory_policy == REDIS_MAXMEMORY_NO_EVICTION)
        return REDIS_ERR; /* We need to free memory, but policy forbids. */

    /* Compute how much memory we need to free. */
    // 3 計算需要釋放多少字節的內存
    mem_tofree = mem_used - server.maxmemory;

    // 初始化已釋放內存的字節數為 0
    mem_freed = 0;

    // 根據 maxmemory 策略,
    //4 遍歷字典,釋放內存並記錄被釋放內存的字節數
    while (mem_freed < mem_tofree) {
        int j, k, keys_freed = 0;

        // 遍歷所有字典
        for (j = 0; j < server.dbnum; j++) {
            long bestval = 0; /* just to prevent warning */
            sds bestkey = NULL;
            dictEntry *de;
            redisDb *db = server.db + j;
            dict *dict;

            if (server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_LRU ||
                server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_RANDOM) {
                // 如果策略是 allkeys-lru 或者 allkeys-random 
                //5 那麼淘汰的目標為所有數據庫鍵
                dict = server.db[j].dict;
            } else {
                // 如果策略是 volatile-lru 、 volatile-random 或者 volatile-ttl 
                //6 那麼淘汰的目標為帶過期時間的數據庫鍵
                dict = server.db[j].expires;
            }


            /* volatile-random and allkeys-random policy */
            // 如果使用的是隨機策略,那麼從目標字典中隨機選出鍵
            if (server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_RANDOM ||
                server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_RANDOM) {
                de = dictGetRandomKey(dict);
                bestkey = dictGetKey(de);
            }
            /* volatile-lru and allkeys-lru policy */
            //7 
            else if (server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_LRU ||
                     server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_LRU) {
                struct evictionPoolEntry *pool = db->eviction_pool;

                while (bestkey == NULL) {
                    // 8 
                    evictionPoolPopulate(dict, db->dict, db->eviction_pool);
                    /* Go backward from best to worst element to evict. */
                    for (k = REDIS_EVICTION_POOL_SIZE - 1; k >= 0; k--) {
                        if (pool[k].key == NULL) continue;
                        // 8.1
                        de = dictFind(dict, pool[k].key);

                        /* 8.2 Remove the entry from the pool. */
                        sdsfree(pool[k].key);
                        /* Shift all elements on its right to left. */
                        memmove(pool + k, pool + k + 1,
                                sizeof(pool[0]) * (REDIS_EVICTION_POOL_SIZE - k - 1));
                        /* Clear the element on the right which is empty
                         * since we shifted one position to the left.  */
                        pool[REDIS_EVICTION_POOL_SIZE - 1].key = NULL;
                        pool[REDIS_EVICTION_POOL_SIZE - 1].idle = 0;

                        /* If the key exists, is our pick. Otherwise it is
                         * a ghost and we need to try the next element. */
                        // 8.3
                        if (de) {
                            bestkey = dictGetKey(de);
                            break;
                        } else {
                            /* Ghost... */
                            continue;
                        }
                    }
                }
            }

                /* volatile-ttl */
                // 策略為 volatile-ttl ,從一集 sample 鍵中選出過期時間距離當前時間最接近的鍵
            else if (server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_TTL) {
                ...
            }

            /* Finally remove the selected key. */
            // 8.4 刪除被選中的鍵
            if (bestkey) {
                long long delta;

                robj *keyobj = createStringObject(bestkey, sdslen(bestkey));
                propagateExpire(db, keyobj);
                /* We compute the amount of memory freed by dbDelete() alone.
                 * It is possible that actually the memory needed to propagate
                 * the DEL in AOF and replication link is greater than the one
                 * we are freeing removing the key, but we can't account for
                 * that otherwise we would never exit the loop.
                 *
                 * AOF and Output buffer memory will be freed eventually so
                 * we only care about memory used by the key space. */
                // 計算刪除鍵所釋放的內存數量
                delta = (long long) zmalloc_used_memory();
                dbDelete(db, keyobj);
                delta -= (long long) zmalloc_used_memory();
                mem_freed += delta;

                // 對淘汰鍵的計數器增一
                server.stat_evictedkeys++;

                notifyKeyspaceEvent(REDIS_NOTIFY_EVICTED, "evicted",
                                    keyobj, db->id);
                decrRefCount(keyobj);
                keys_freed++;
				...
            }
        }

        if (!keys_freed) return REDIS_ERR; /* nothing to free... */
    }

    return REDIS_OK;
}
  • 1處,如果目前使用的內存大小比設置的 maxmemory 要小,那麼無須執行進一步操作

  • 2處,如果佔用內存比 maxmemory 要大,但是 maxmemory 策略為不淘汰,那麼直接返回

  • 3處,計算需要釋放多少字節的內存

  • 4處,遍歷字典,釋放內存並記錄被釋放內存的字節數

  • 5處,如果策略是 allkeys-lru 或者 allkeys-random 那麼淘汰的目標為所有數據庫鍵

  • 6處,如果策略是 volatile-lru 、 volatile-random 或者 volatile-ttl ,那麼淘汰的目標為帶過期時間的數據庫鍵

  • 7處,如果使用的是 LRU 策略, 那麼從 sample 鍵中選出 IDLE 時間最長的那個鍵

  • 8處,調用evictionPoolPopulate,該函數在下面講解,該函數的功能是,傳入一個鏈表,即這裏的db->eviction_pool,然後在函數內部,隨機找出n個key,放入傳入的鏈表中,並按照空閑時間排序,空閑最久的,放到最後。

    當該函數,返回后,db->eviction_pool這個鏈表裡就存放了我們要淘汰的key。

  • 8.1處,找到這個key,這個key,在後邊會被刪除

  • 8.2處,下面這一段,從db->eviction_pool將這個已經處理了的key刪掉

  • 8.3處,如果這個key,是存在的,則跳出循環,在後面8.4處,會被刪除

  • 8.4處,刪除這個key

選擇哪些key作為被淘汰的key

前面我們看到,在7處,如果為lru策略,則會進入8處的函數:

evictionPoolPopulate。

該函數的名稱為:填充(populate)驅逐(eviction)對象池(pool)。驅逐的意思,就是現在達到了maxmemory,沒辦法,只能開始刪除掉一部分元素,來騰空間了,不然新的put類型的命令,根本沒辦法執行。

該方法的大概思路是,使用lru的時候,隨機找n個key,類似於抽樣,然後放到一個鏈表,根據空閑時間排序。

具體看看該方法的實現:

void evictionPoolPopulate(dict *sampledict, dict *keydict, struct evictionPoolEntry *pool) {

其中,傳入的第三個參數,是要被填充的對象,在c語言中,習慣傳入一個入參,然後在函數內部填充或者修改入參對象的屬性。

該屬性,就是前面說的那個鏈表,用來存放收集的隨機的元素,該鏈表中節點的結構如下:

struct evictionPoolEntry {
    unsigned long long idle;    /* Object idle time. */
    sds key;                    /* Key name. */
};

該結構共2個字段,一個存儲key,一個存儲空閑時間。

該鏈表中,共maxmemory-samples個元素,會按照idle時間長短排序,idle時間長的在鏈表尾部,(假設頭在左,尾在右)。

void evictionPoolPopulate(dict *sampledict, dict *keydict, struct evictionPoolEntry *pool) {
    int j, k, count;
    dictEntry *_samples[EVICTION_SAMPLES_ARRAY_SIZE];
    dictEntry **samples;

    /* Try to use a static buffer: this function is a big hit...
     * Note: it was actually measured that this helps. */
    if (server.maxmemory_samples <= EVICTION_SAMPLES_ARRAY_SIZE) {
        samples = _samples;
    } else {
        samples = zmalloc(sizeof(samples[0]) * server.maxmemory_samples);
    }

    /* 1 Use bulk get by default. */
    count = dictGetRandomKeys(sampledict, samples, server.maxmemory_samples);

	// 2
    for (j = 0; j < count; j++) {
        unsigned long long idle;
        sds key;
        robj *o;
        dictEntry *de;

        de = samples[j];
        key = dictGetKey(de);
        /* If the dictionary we are sampling from is not the main
         * dictionary (but the expires one) we need to lookup the key
         * again in the key dictionary to obtain the value object. */
        if (sampledict != keydict) de = dictFind(keydict, key);
        // 3
        o = dictGetVal(de);
        // 4
        idle = estimateObjectIdleTime(o);

        /* 5  Insert the element inside the pool.
         * First, find the first empty bucket or the first populated
         * bucket that has an idle time smaller than our idle time. */
        k = 0;
        while (k < REDIS_EVICTION_POOL_SIZE &&
               pool[k].key &&
               pool[k].idle < idle)
            k++;
        
		...
            
        // 6
        pool[k].key = sdsdup(key);
        pool[k].idle = idle;
    }
    if (samples != _samples) zfree(samples);
}
  • 1處,獲取 server.maxmemory_samples個key,這裡是隨機獲取的,(dictGetRandomKeys),這個值,默認值為5,放到samples中

  • 2處,遍歷返回來的samples

  • 3處,調用如下宏,獲取val

    he的類型為dictEntry:

    /*
     * 哈希表節點
     */
    typedef struct dictEntry {
        
        // 鍵
        void *key;
    
        // 值
        union {
            // 1
            void *val;
            uint64_t u64;
            int64_t s64;
        } v;
    
        // 指向下個哈希表節點,形成鏈表
        struct dictEntry *next;
    
    } dictEntry;
    

    所以,這裏去

    robj *o; 
    
    o = dictGetVal(de);
    

    實際就是獲取其v屬性中的val,(1處):

    #define dictGetVal(he) ((he)->v.val)
    
  • 4處,準備計算該val的空閑時間

    我們上面3處,看到,獲取的o的類型為robj。我們現在看看怎麼計算對象的空閑時長:

    /* Given an object returns the min number of milliseconds the object was never
     * requested, using an approximated LRU algorithm. */
    unsigned long long estimateObjectIdleTime(robj *o) {
        //4.1 獲取系統的當前時間
        unsigned long long lruclock = LRU_CLOCK();
        // 4.2
        if (lruclock >= o->lru) {
            // 4.3
            return (lruclock - o->lru) * REDIS_LRU_CLOCK_RESOLUTION;
        } else {
            return (lruclock + (REDIS_LRU_CLOCK_MAX - o->lru)) *
                        REDIS_LRU_CLOCK_RESOLUTION;
        }
    }
    

    這裏,4.1處,獲取系統的當前時間;

    4.2處,如果系統時間,大於對象的lru時間

    4.3處,則用系統時間減去對象的lru時間,再乘以單位,換算為毫秒,最終返回的單位,為毫秒(可以看註釋。)

    #define REDIS_LRU_CLOCK_RESOLUTION 1000 /* LRU clock resolution in ms */
    
  • 5處,這裏拿當前元素,和pool中已經放進去的元素,從第0個開始比較,如果當前元素的idle時長,大於pool中指針0指向的元素,則和pool中索引1的元素比較;直到條件不滿足為止。

    這句話意思就是,類似於冒泡,把當前元素一直往後冒,直到idle時長小於被比較的元素為止。

  • 6處,把當前元素放進pool中。

經過上面的處理后,鏈表中存放了全部的抽樣元素,且ide時間最長的,在最右邊。

對象還有字段存儲空閑時間?

前面4處,說到,用系統的當前時間,減去對象的lru時間。

大家看看對象的結構體

typedef struct redisObject {

    // 類型
    unsigned type:4;

    // 編碼
    unsigned encoding:4;

    //1 對象最後一次被訪問的時間
    unsigned lru:REDIS_LRU_BITS; /* lru time (relative to server.lruclock) */

    // 引用計數
    int refcount;

    // 指向實際值的指針
    void *ptr;

} robj;

上面1處,lru屬性,就是用來存儲這個。

創建對象時,直接使用當前系統時間創建

robj *createObject(int type, void *ptr) {

    robj *o = zmalloc(sizeof(*o));

    o->type = type;
    o->encoding = REDIS_ENCODING_RAW;
    o->ptr = ptr;
    o->refcount = 1;

    /*1 Set the LRU to the current lruclock (minutes resolution). */
    o->lru = LRU_CLOCK();
    return o;
}

1處即是。

robj *createEmbeddedStringObject(char *ptr, size_t len) {
    robj *o = zmalloc(sizeof(robj)+sizeof(struct sdshdr)+len+1);
    struct sdshdr *sh = (void*)(o+1);

    o->type = REDIS_STRING;
    o->encoding = REDIS_ENCODING_EMBSTR;
    o->ptr = sh+1;
    o->refcount = 1;
    // 1
    o->lru = LRU_CLOCK();

    sh->len = len;
    sh->free = 0;
    if (ptr) {
        memcpy(sh->buf,ptr,len);
        sh->buf[len] = '\0';
    } else {
        memset(sh->buf,0,len+1);
    }
    return o;
}

1處即是。

每次查找該key時,刷新時間

robj *lookupKey(redisDb *db, robj *key) {

    // 查找鍵空間
    dictEntry *de = dictFind(db->dict,key->ptr);

    // 節點存在
    if (de) {
        

        // 取出值
        robj *val = dictGetVal(de);

        /* Update the access time for the ageing algorithm.
         * Don't do it if we have a saving child, as this will trigger
         * a copy on write madness. */
        // 更新時間信息(只在不存在子進程時執行,防止破壞 copy-on-write 機制)
        if (server.rdb_child_pid == -1 && server.aof_child_pid == -1)
            // 1
            val->lru = LRU_CLOCK();

        // 返回值
        return val;
    } else {

        // 節點不存在

        return NULL;
    }
}

1處即是,包括get、set等各種操作,都會刷新該時間。

仔細看下面的堆棧,set的,get同理:

總結

大家有沒有更清楚一些呢?

總的來說,就是,設置了max-memory后,達到該內存限制后,會在處理命令時,檢查是否要進行內存淘汰;如果要淘汰,則根據maxmemory-policy的策略來。

隨機選擇maxmemory-sample個元素,按照空閑時間排序,拉鏈表;挨個挨個清除。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

南投搬家公司費用需注意的眉眉角角,別等搬了再說!

※教你寫出一流的銷售文案?

老大吩咐的可重入分佈式鎖,終於完美的實現了!!!

重做永遠比改造簡單

最近在做一個項目,將一個其他公司的實現系統(下文稱作舊系統),完整的整合到自己公司的系統(下文稱作新系統)中,這其中需要將對方實現的功能完整在自己系統也實現一遍。

舊系統還有一批存量商戶,為了不影響存量商戶的體驗,新系統提供的對外接口,還必須得跟以前一致。最後系統完整切換之後,功能只運行在新系統中,這就要求舊系統的數據還需要完整的遷移到新系統中。

當然這些在做這個項目之前就有預期,想過這個過程很難,但是沒想到有那麼難。原本感覺排期大半年,時間還是挺寬裕,現在感覺就是大坑,還不得不在坑裡一點點去填。

哎,說多都是淚,不吐槽了,等到下次做完再給大家復盤下真正心得體會。

回到正文,上篇文章Redis 分佈式鎖,咱們基於 Redis 實現一個分佈式鎖。這個分佈式鎖基本功能沒什麼問題,但是缺少可重入的特性,所以這篇文章小黑哥就帶大家來實現一下可重入的分佈式鎖。

本篇文章將會涉及以下內容:

  • 可重入
  • 基於 ThreadLocal 實現方案
  • 基於 Redis Hash 實現方案

先贊后看,養成習慣。微信搜索「程序通事」,關注就完事了~

可重入

說到可重入鎖,首先我們來看看一段來自 wiki 上可重入的解釋:

若一個程序或子程序可以“在任意時刻被中斷然後操作系統調度執行另外一段代碼,這段代碼又調用了該子程序不會出錯”,則稱其為可重入(reentrant或re-entrant)的。即當該子程序正在運行時,執行線程可以再次進入並執行它,仍然獲得符合設計時預期的結果。與多線程併發執行的線程安全不同,可重入強調對單個線程執行時重新進入同一個子程序仍然是安全的。

當一個線程執行一段代碼成功獲取鎖之後,繼續執行時,又遇到加鎖的代碼,可重入性就就保證線程能繼續執行,而不可重入就是需要等待鎖釋放之後,再次獲取鎖成功,才能繼續往下執行。

用一段 Java 代碼解釋可重入:

public synchronized void a() {
    b();
}

public synchronized void b() {
    // pass
}

假設 X 線程在 a 方法獲取鎖之後,繼續執行 b 方法,如果此時不可重入,線程就必須等待鎖釋放,再次爭搶鎖。

鎖明明是被 X 線程擁有,卻還需要等待自己釋放鎖,然後再去搶鎖,這看起來就很奇怪,我釋放我自己~

可重入性就可以解決這個尷尬的問題,當線程擁有鎖之後,往後再遇到加鎖方法,直接將加鎖次數加 1,然後再執行方法邏輯。退出加鎖方法之後,加鎖次數再減 1,當加鎖次數為 0 時,鎖才被真正的釋放。

可以看到可重入鎖最大特性就是計數,計算加鎖的次數。所以當可重入鎖需要在分佈式環境實現時,我們也就需要統計加鎖次數。

分佈式可重入鎖實現方式有兩種:

  • 基於 ThreadLocal 實現方案
  • 基於 Redis Hash 實現方案

首先我們看下基於 ThreadLocal 實現方案。

基於 ThreadLocal 實現方案

實現方式

Java 中 ThreadLocal可以使每個線程擁有自己的實例副本,我們可以利用這個特性對線程重入次數進行技術。

下面我們定義一個ThreadLocal的全局變量 LOCKS,內存存儲 Map 實例變量。

private static ThreadLocal<Map<String, Integer>> LOCKS = ThreadLocal.withInitial(HashMap::new);

每個線程都可以通過 ThreadLocal獲取自己的 Map實例,Mapkey 存儲鎖的名稱,而 value存儲鎖的重入次數。

加鎖的代碼如下:

/**
 * 可重入鎖
 *
 * @param lockName  鎖名字,代表需要爭臨界資源
 * @param request   唯一標識,可以使用 uuid,根據該值判斷是否可以重入
 * @param leaseTime 鎖釋放時間
 * @param unit      鎖釋放時間單位
 * @return
 */
public Boolean tryLock(String lockName, String request, long leaseTime, TimeUnit unit) {
    Map<String, Integer> counts = LOCKS.get();
    if (counts.containsKey(lockName)) {
        counts.put(lockName, counts.get(lockName) + 1);
        return true;
    } else {
        if (redisLock.tryLock(lockName, request, leaseTime, unit)) {
            counts.put(lockName, 1);
            return true;
        }
    }
    return false;
}

ps: redisLock#tryLock 為上一篇文章實現的分佈鎖。

由於公號外鏈無法直接跳轉,關注『程序通事』,回復分佈式鎖獲取源代碼。

加鎖方法首先判斷當前線程是否已經已經擁有該鎖,若已經擁有,直接對鎖的重入次數加 1。

若還沒擁有該鎖,則嘗試去 Redis 加鎖,加鎖成功之後,再對重入次數加 1 。

釋放鎖的代碼如下:

/**
 * 解鎖需要判斷不同線程池
 *
 * @param lockName
 * @param request
 */
public void unlock(String lockName, String request) {
    Map<String, Integer> counts = LOCKS.get();
    if (counts.getOrDefault(lockName, 0) <= 1) {
        counts.remove(lockName);
        Boolean result = redisLock.unlock(lockName, request);
        if (!result) {
            throw new IllegalMonitorStateException("attempt to unlock lock, not locked by lockName:+" + lockName + " with request: "
                    + request);
        }

    } else {
        counts.put(lockName, counts.get(lockName) - 1);
    }
}

釋放鎖的時首先判斷重入次數,若大於 1,則代表該鎖是被該線程擁有,所以直接將鎖重入次數減 1 即可。

若當前可重入次數小於等於 1,首先移除 Map中鎖對應的 key,然後再到 Redis 釋放鎖。

這裏需要注意的是,當鎖未被該線程擁有,直接解鎖,可重入次數也是小於等於 1 ,這次可能無法直接解鎖成功。

ThreadLocal 使用過程要記得及時清理內部存儲實例變量,防止發生內存泄漏,上下文數據串用等問題。

下次咱來聊聊最近使用 ThreadLocal 寫的 Bug。

相關問題

使用 ThreadLocal 這種本地記錄重入次數,雖然真的簡單高效,但是也存在一些問題。

過期時間問題

上述加鎖的代碼可以看到,重入加鎖時,僅僅對本地計數加 1 而已。這樣可能就會導致一種情況,由於業務執行過長,Redis 已經過期釋放鎖。

而再次重入加鎖時,由於本地還存在數據,認為鎖還在被持有,這就不符合實際情況。

如果要在本地增加過期時間,還需要考慮本地與 Redis 過期時間一致性的,代碼就會變得很複雜。

不同線程/進程可重入問題

狹義上可重入性應該只是對於同一線程的可重入,但是實際業務可能需要不同的應用線程之間可以重入同把鎖。

ThreadLocal的方案僅僅只能滿足同一線程重入,無法解決不同線程/進程之間重入問題。

不同線程/進程重入問題就需要使用下述方案 Redis Hash 方案解決。

基於 Redis Hash 可重入鎖

實現方式

ThreadLocal 的方案中我們使用了 Map 記載鎖的可重入次數,而 Redis 也同樣提供了 Hash (哈希表)這種可以存儲鍵值對數據結構。所以我們可以使用 Redis Hash 存儲的鎖的重入次數,然後利用 lua 腳本判斷邏輯。

加鎖的 lua 腳本如下:

---- 1 代表 true
---- 0 代表 false

if (redis.call('exists', KEYS[1]) == 0) then
    redis.call('hincrby', KEYS[1], ARGV[2], 1);
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return 1;
end ;
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
    redis.call('hincrby', KEYS[1], ARGV[2], 1);
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return 1;
end ;
return 0;

如果 KEYS:[lock],ARGV[1000,uuid]

不熟悉 lua 語言同學也不要怕,上述邏輯還是比較簡單的。

加鎖代碼首先使用 Redis exists 命令判斷當前 lock 這個鎖是否存在。

如果鎖不存在的話,直接使用 hincrby創建一個鍵為 lock hash 表,並且為 Hash 表中鍵為 uuid 初始化為 0,然後再次加 1,最後再設置過期時間。

如果當前鎖存在,則使用 hexists判斷當前 lock 對應的 hash 表中是否存在 uuid 這個鍵,如果存在,再次使用 hincrby 加 1,最後再次設置過期時間。

最後如果上述兩個邏輯都不符合,直接返回。

加鎖代碼如下:

// 初始化代碼

String lockLuaScript = IOUtils.toString(ResourceUtils.getURL("classpath:lock.lua").openStream(), Charsets.UTF_8);
lockScript = new DefaultRedisScript<>(lockLuaScript, Boolean.class);

/**
 * 可重入鎖
 *
 * @param lockName  鎖名字,代表需要爭臨界資源
 * @param request   唯一標識,可以使用 uuid,根據該值判斷是否可以重入
 * @param leaseTime 鎖釋放時間
 * @param unit      鎖釋放時間單位
 * @return
 */
public Boolean tryLock(String lockName, String request, long leaseTime, TimeUnit unit) {
    long internalLockLeaseTime = unit.toMillis(leaseTime);
    return stringRedisTemplate.execute(lockScript, Lists.newArrayList(lockName), String.valueOf(internalLockLeaseTime), request);
}

Spring-Boot 2.2.7.RELEASE

只要搞懂 Lua 腳本加鎖邏輯,Java 代碼實現還是挺簡單的,直接使用 SpringBoot 提供的 StringRedisTemplate 即可。

解鎖的 Lua 腳本如下:

-- 判斷 hash set 可重入 key 的值是否等於 0
-- 如果為 0 代表 該可重入 key 不存在
if (redis.call('hexists', KEYS[1], ARGV[1]) == 0) then
    return nil;
end ;
-- 計算當前可重入次數
local counter = redis.call('hincrby', KEYS[1], ARGV[1], -1);
-- 小於等於 0 代表可以解鎖
if (counter > 0) then
    return 0;
else
    redis.call('del', KEYS[1]);
    return 1;
end ;
return nil;

首先使用 hexists 判斷 Redis Hash 表是否存給定的域。

如果 lock 對應 Hash 表不存在,或者 Hash 表不存在 uuid 這個 key,直接返回 nil

若存在的情況下,代表當前鎖被其持有,首先使用 hincrby使可重入次數減 1 ,然後判斷計算之後可重入次數,若小於等於 0,則使用 del 刪除這把鎖。

解鎖的 Java 代碼如下:

// 初始化代碼:


String unlockLuaScript = IOUtils.toString(ResourceUtils.getURL("classpath:unlock.lua").openStream(), Charsets.UTF_8);
unlockScript = new DefaultRedisScript<>(unlockLuaScript, Long.class);

/**
 * 解鎖
 * 若可重入 key 次數大於 1,將可重入 key 次數減 1 <br>
 * 解鎖 lua 腳本返回含義:<br>
 * 1:代表解鎖成功 <br>
 * 0:代表鎖未釋放,可重入次數減 1 <br>
 * nil:代表其他線程嘗試解鎖 <br>
 * <p>
 * 如果使用 DefaultRedisScript<Boolean>,由於 Spring-data-redis eval 類型轉化,<br>
 * 當 Redis 返回  Nil bulk, 默認將會轉化為 false,將會影響解鎖語義,所以下述使用:<br>
 * DefaultRedisScript<Long>
 * <p>
 * 具體轉化代碼請查看:<br>
 * JedisScriptReturnConverter<br>
 *
 * @param lockName 鎖名稱
 * @param request  唯一標識,可以使用 uuid
 * @throws IllegalMonitorStateException 解鎖之前,請先加鎖。若為加鎖,解鎖將會拋出該錯誤
 */
public void unlock(String lockName, String request) {
    Long result = stringRedisTemplate.execute(unlockScript, Lists.newArrayList(lockName), request);
    // 如果未返回值,代表其他線程嘗試解鎖
    if (result == null) {
        throw new IllegalMonitorStateException("attempt to unlock lock, not locked by lockName:+" + lockName + " with request: "
                + request);
    }
}

解鎖代碼執行方式與加鎖類似,只不過解鎖的執行結果返回類型使用 Long。這裏之所以沒有跟加鎖一樣使用 Boolean ,這是因為解鎖 lua 腳本中,三個返回值含義如下:

  • 1 代表解鎖成功,鎖被釋放
  • 0 代表可重入次數被減 1
  • null 代表其他線程嘗試解鎖,解鎖失敗

如果返回值使用 BooleanSpring-data-redis 進行類型轉換時將會把 null 轉為 false,這就會影響我們邏輯判斷,所以返回類型只好使用 Long

以下代碼來自 JedisScriptReturnConverter

相關問題

spring-data-redis 低版本問題

如果 Spring-Boot 使用 Jedis 作為連接客戶端,並且使用Redis Cluster 集群模式,需要使用 2.1.9 以上版本的spring-boot-starter-data-redis,不然執行過程中將會拋出:

org.springframework.dao.InvalidDataAccessApiUsageException: EvalSha is not supported in cluster environment.

如果當前應用無法升級 spring-data-redis也沒關係,可以使用如下方式,直接使用原生 Jedis 連接執行 lua 腳本。

以加鎖代碼為例:

public boolean tryLock(String lockName, String reentrantKey, long leaseTime, TimeUnit unit) {
    long internalLockLeaseTime = unit.toMillis(leaseTime);
    Boolean result = stringRedisTemplate.execute((RedisCallback<Boolean>) connection -> {
        Object innerResult = eval(connection.getNativeConnection(), lockScript, Lists.newArrayList(lockName), Lists.newArrayList(String.valueOf(internalLockLeaseTime), reentrantKey));
        return convert(innerResult);
    });
    return result;
}

private Object eval(Object nativeConnection, RedisScript redisScript, final List<String> keys, final List<String> args) {

    Object innerResult = null;
    // 集群模式和單點模式雖然執行腳本的方法一樣,但是沒有共同的接口,所以只能分開執行
    // 集群
    if (nativeConnection instanceof JedisCluster) {
        innerResult = evalByCluster((JedisCluster) nativeConnection, redisScript, keys, args);
    }
    // 單點
    else if (nativeConnection instanceof Jedis) {
        innerResult = evalBySingle((Jedis) nativeConnection, redisScript, keys, args);
    }
    return innerResult;
}

數據類型轉化問題

如果使用 Jedis 原生連接執行 Lua 腳本,那麼可能又會碰到數據類型的轉換坑。

可以看到 Jedis#eval返回 Object,我們需要具體根據 Lua 腳本的返回值的,再進行相關轉化。這其中就涉及到 Lua 數據類型轉化為 Redis 數據類型。

下面主要我們來講下 Lua 數據轉化 Redis 的規則中幾條比較容易踩坑:

1、Lua number 與 Redis 數據類型轉換

Lua 中 number 類型是一個雙精度的浮點數,但是 Redis 只支持整數類型,所以這個轉化過程將會丟棄小數位。

2、Lua boolean 與 Redis 類型轉換

這個轉化比較容易踩坑,Redis 中是不存在 boolean 類型,所以當Lua 中 true 將會轉為 Redis 整數 1。而 Lua 中 false 並不是轉化整數,而是轉化 null 返回給客戶端。

3、Lua nil 與 Redis 類型轉換

Lua nil 可以當做是一個空值,可以等同於 Java 中的 null。在 Lua 中如果 nil 出現在條件表達式,將會當做 false 處理。

所以 Lua nil 也將會 null 返回給客戶端。

其他轉化規則比較簡單,詳情參考:

http://doc.redisfans.com/script/eval.html

總結

可重入分佈式鎖關鍵在於對於鎖重入的計數,這篇文章主要給出兩種解決方案,一種基於 ThreadLocal 實現方案,這種方案實現簡單,運行也比較高效。但是若要處理鎖過期的問題,代碼實現就比較複雜。

另外一種採用 Redis Hash 數據結構實現方案,解決了 ThreadLocal 的缺陷,但是代碼實現難度稍大,需要熟悉 Lua 腳本,以及Redis 一些命令。另外使用 spring-data-redis 等操作 Redis 時不經意間就會遇到各種問題。

幫助

https://www.sofastack.tech/blog/sofa-jraft-rheakv-distributedlock/

https://tech.meituan.com/2016/09/29/distributed-system-mutually-exclusive-idempotence-cerberus-gtis.html

最後說兩句(求關注)

看完文章,哥哥姐姐們點個吧,周更真的超累,不知覺又寫了两天,拒絕白嫖,來點正反饋唄~。

最後感謝各位的閱讀,才疏學淺,難免存在紕漏,如果你發現錯誤的地方,可以留言指出。如果看完文章還有其他不懂的地方,歡迎加我,互相學習,一起成長~

最後謝謝大家支持~

最最後,重要的事再說一篇~

快來關注我呀~
快來關注我呀~
快來關注我呀~

歡迎關注我的公眾號:程序通事,獲得日常乾貨推送。如果您對我的專題內容感興趣,也可以關注我的博客:studyidea.cn

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※想知道最厲害的網頁設計公司"嚨底家"!

※別再煩惱如何寫文案,掌握八大原則!

※產品缺大量曝光嗎?你需要的是一流包裝設計!

面試官:線程池如何按照core、max、queue的執行循序去執行?(內附詳細解析)

前言

這是一個真實的面試題。

前幾天一個朋友在群里分享了他剛剛面試候選者時問的問題:“線程池如何按照core、max、queue的執行循序去執行?”

我們都知道線程池中代碼執行順序是:corePool->workQueue->maxPool,源碼我都看過,你現在問題讓我改源碼??

一時間群里炸開了鍋,小夥伴們紛紛打聽他所在的公司,然後拉黑避坑。(手動狗頭,大家一起調侃٩(๑ᴗ๑)۶)

關於線程池他一共問了這麼幾個問題:

  • 線程池如何按照core、max、queue的順序去執行?
  • 子線程拋出的異常,主線程能感知到么?
  • 線程池發生了異常改怎樣處理?

全是一些有意思的問題,我之前也寫過一篇很詳細的圖文教程:【萬字圖文-原創】 | 學會Java中的線程池,這一篇也許就夠了! ,不了解的小夥伴可以再回顧下~

但是針對這幾個問題,可能大家一時間也有點懵。今天的文章我們以源碼為基礎來分析下該如何回答這三個問題。(之前沒閱讀過源碼也沒關係,所有的分析都會貼出源碼及圖解)

線程池如何按照core、max、queue的順序執行?

問題思考

對於這個問題,很多小夥伴肯定會疑惑:“別人源碼中寫好的執行流程你為啥要改?這面試官腦子有病吧……”

這裏來思考一下現實工作場景中是否有這種需求?之前也看到過一份簡歷也寫到過這個問題:

一個線程池執行的任務屬於IO密集型,CPU大多屬於閑置狀態,系統資源未充分利用。如果一瞬間來了大量請求,如果線程池數量大於coreSize時,多餘的請求都會放入到等待隊列中。等待着corePool中的線程執行完成后再來執行等待隊列中的任務。

試想一下,這種場景我們該如何優化?

我們可以修改線程池的執行順序為corePool->maxPool->workQueue。 這樣就能夠充分利用CPU資源,提交的任務會被優先執行。當線程池中線程數量大於maxSize時才會將任務放入等待隊列中。

你就說巧不巧?面試官的這個問題顯然是經過認真思考來提問的,這是一個很有意思的溫恩提,下面就一起看看如何解決吧。

線程池運行流程

我們都知道線程池執行流程是先corePoolworkQueue,最後才是maxPool的一個執行流程。

線程池核心參數

在回顧下ThreadPoolExecutor.execute()源碼前我們先回顧下線程池中的幾個重要參數:

我們來看下這幾個參數的定義:
corePoolSize: 線程池中核心線程數量
maximumPoolSize: 線程池中最大線程數量
keepAliveTime: 非核心的空閑線程等待新任務的時間
unit: 時間單位。配合allowCoreThreadTimeOut也會清理核心線程池中的線程。
workQueue: 基於Blocking的任務隊列,最好選用有界隊列,指定隊列長度
threadFactory: 線程工廠,最好自定義線程工廠,可以自定義每個線程的名稱
handler: 拒絕策略,默認是AbortPolicy

ThreadPoolExecutor.execute()源碼分析

我們可以看下execute()如下:

接着來分析下執行過程:

  1. 第一步:workerCountOf(c)時間計算當前線程池中線程的個數,當線程個數小於核心線程數
  2. 第二步:線程池線程數量大於核心線程數,此時提交的任務會放入workQueue中,使用offer()進行操作
  3. 第三步:workQueue.offer()執行失敗,新提交的任務會直接執行,addWorker()會判斷如果當前線程池數量大於最大線程數,則執行拒絕策略

好了,到了這裏我們都已經很清楚了,關鍵在於第二步和第三步如何交換順序執行呢?

解決思路

仔細想一想,如果修改workQueue.offer()的實現不就可以達到目的了?我們先來畫圖來看一下:

現在的問題就在於,如果當前線程池中coreSize < workCount < maxSize時,一定會先執行offer()操作。

我們如果修改offer的實現是否可以完成執行順序的更換呢?這裏也是畫圖來展示一下:

Dubbo中EagerThreadPool解決方案

湊巧Dubbo中也有類似的實現,在DubboEagerThreadPool自定義了一個BlockingQueue,在offer()方法中,如果當前線程池數量小於最大線程池時,直接返回false,這裏就達到了調節線程池執行順序的目的。

源碼直達:https://github.com/apache/dubbo/blob/master/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/eager/TaskQueue.java

看到這裏一切都真相大白了,解決思路以及方案都很簡單,學會了沒有?

這個問題背後還隱藏了一些場景的優化、源碼的擴展等等知識,果然是一個值得思考的好問題。

子線程拋出的異常,主線程能感知到么?

問題思考

這個問題其實也很容易回答,也僅僅是一個面試題而已,實際工作中子線程的異常不應該由主線程來捕獲。

針對這個問題,希望大家清楚的是: 我們要明確線程代碼的邊界,異步化過程中,子線程拋出的異常應該由子線程自己去處理,而不是需要主線程感知來協助處理。

解決方案

解決方案很簡單,在虛擬機中,當一個線程如果沒有顯式處理異常而拋出時會將該異常事件報告給該線程對象的 java.lang.Thread.UncaughtExceptionHandler 進行處理,如果線程沒有設置 UncaughtExceptionHandler,則默認會把異常棧信息輸出到終端而使程序直接崩潰。

所以如果我們想在線程意外崩潰時做一些處理就可以通過實現 UncaughtExceptionHandler 來滿足需求。

我們使用線程池設置ThreadFactory時可以指定UncaughtExceptionHandler,這樣就可以捕獲到子線程拋出的異常了。

代碼示例

具體代碼如下:

/**
 * 測試子線程異常問題
 *
 * @author wangmeng
 * @date 2020/6/13 18:08
 */
public class ThreadPoolExceptionTest {

    public static void main(String[] args) throws InterruptedException {
        MyHandler myHandler = new MyHandler();
        ExecutorService execute = new ThreadPoolExecutor(10, 10,
                0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactoryBuilder().setUncaughtExceptionHandler(myHandler).build());

        TimeUnit.SECONDS.sleep(5);
        for (int i = 0; i < 10; i++) {
            execute.execute(new MyRunner());
        }
    }


    private static class MyRunner implements Runnable {
        @Override
        public void run() {
            int count = 0;
            while (true) {
                count++;
                System.out.println("我要開始生產Bug了============");
                if (count == 10) {
                    System.out.println(1 / 0);
                }

                if (count == 20) {
                    System.out.println("這裡是不會執行到的==========");
                    break;
                }
            }
        }
    }
}

class MyHandler implements Thread.UncaughtExceptionHandler {
    private final static Logger LOGGER = LoggerFactory.getLogger(MyHandler.class);
    @Override
    public void uncaughtException(Thread t, Throwable e) {
        LOGGER.error("threadId = {}, threadName = {}, ex = {}", t.getId(), t.getName(), e.getMessage());
    }
}

執行結果:

UncaughtExceptionHandler 解析

我們來看下Thread中的內部接口UncaughtExceptionHandler

public class Thread {
    ......
    /**
     * 當一個線程因未捕獲的異常而即將終止時虛擬機將使用 Thread.getUncaughtExceptionHandler()
     * 獲取已經設置的 UncaughtExceptionHandler 實例,並通過調用其 uncaughtException(...) 方
     * 法而傳遞相關異常信息。
     * 如果一個線程沒有明確設置其 UncaughtExceptionHandler,則將其 ThreadGroup 對象作為其
     * handler,如果 ThreadGroup 對象對異常沒有什麼特殊的要求,則 ThreadGroup 會將調用轉發給
     * 默認的未捕獲異常處理器(即 Thread 類中定義的靜態未捕獲異常處理器對象)。
     *
     * @see #setDefaultUncaughtExceptionHandler
     * @see #setUncaughtExceptionHandler
     * @see ThreadGroup#uncaughtException
     */
    @FunctionalInterface
    public interface UncaughtExceptionHandler {
        /**
         * 未捕獲異常崩潰時回調此方法
         */
        void uncaughtException(Thread t, Throwable e);
    }

    /**
     * 靜態方法,用於設置一個默認的全局異常處理器。
     */
    public static void setDefaultUncaughtExceptionHandler(UncaughtExceptionHandler eh) {
         defaultUncaughtExceptionHandler = eh;
     }

    /**
     * 針對某個 Thread 對象的方法,用於對特定的線程進行未捕獲的異常處理。
     */
    public void setUncaughtExceptionHandler(UncaughtExceptionHandler eh) {
        checkAccess();
        uncaughtExceptionHandler = eh;
    }

    /**
     * 當 Thread 崩潰時會調用該方法獲取當前線程的 handler,獲取不到就會調用 group(handler 類型)。
     * group 是 Thread 類的 ThreadGroup 類型屬性,在 Thread 構造中實例化。
     */
    public UncaughtExceptionHandler getUncaughtExceptionHandler() {
        return uncaughtExceptionHandler != null ?
            uncaughtExceptionHandler : group;
    }

    /**
     * 線程全局默認 handler。
     */
    public static UncaughtExceptionHandler getDefaultUncaughtExceptionHandler() {
        return defaultUncaughtExceptionHandler;
    }
    ......
}

部分內容參考自:https://mp.weixin.qq.com/s/ghnNQnpou6-NemhFjpl4Jg

線程池發生了異常改怎樣處理?

線程池中線程運行過程中出現了異常該怎樣處理呢?線程池提交任務有兩種方式,分別是execute()submit(),這裡會依次說明。

ThreadPoolExecutor.runWorker()實現

不管是使用execute()還是submit()提交任務,最終都會執行到ThreadPoolExecutor.runWorker(),我們來看下源碼(源碼基於JDK1.8):

我們看到在執行task.run()時,出現異常會直接向上拋出,這裏處理的最好的方式就是在我們業務代碼中使用try...catch()來捕獲異常。

FutureTask.run()實現

如果我們使用submit()來提交任務,在ThreadPoolExecutor.runWorker()方法執行時最終會調用到FutureTask.run()方法裏面去,不清楚的小夥伴也可以看下我之前的文章:

線程池續:你必須要知道的線程池submit()實現原理之FutureTask!

這裏可以看到,如果業務代碼拋出異常后,會被catch捕獲到,然後調用setExeception()方法:

可以看到其實類似於直接吞掉了,當我們調用get()方法的時候異常信息會包裝到FutureTask內部的變量outcome中,我們也會獲取到對應的異常信息。

ThreadPoolExecutor.runWorker()最後finally中有一個afterExecute()鈎子方法,如果我們重寫了afterExecute()方法,就可以獲取到子線程拋出的具體異常信息Throwable了。

結論

對於線程池、包括線程的異常處理推薦以下方式:

  1. 直接使用try/catch,這個也是最推薦的方式
  2. 在我們構造線程池的時候,重寫uncaughtException()方法,上面示例代碼也有提到:
public class ThreadPoolExceptionTest {

    public static void main(String[] args) throws InterruptedException {
        MyHandler myHandler = new MyHandler();
        ExecutorService execute = new ThreadPoolExecutor(10, 10,
                0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactoryBuilder().setUncaughtExceptionHandler(myHandler).build());

        TimeUnit.SECONDS.sleep(5);
        for (int i = 0; i < 10; i++) {
            execute.execute(new MyRunner());
        }
    }
}

class MyHandler implements Thread.UncaughtExceptionHandler {
    private final static Logger LOGGER = LoggerFactory.getLogger(MyHandler.class);
    @Override
    public void uncaughtException(Thread t, Throwable e) {
        LOGGER.error("threadId = {}, threadName = {}, ex = {}", t.getId(), t.getName(), e.getMessage());
    }
}

3 直接重寫afterExecute()方法,感知異常細節

總結

這篇文章到這裏就結束了,不知道小夥伴們有沒有一些感悟或收穫?

通過這幾個面試問題,我也深刻的感受到學習知識要多思考,看源碼的過程中要多設置一些場景,這樣才會收穫更多。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※別再煩惱如何寫文案,掌握八大原則!

※教你寫出一流的銷售文案?

※超省錢租車方案

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※產品缺大量曝光嗎?你需要的是一流包裝設計!

[.NET 開源] 高性能的 Swifter.MessagePack 已發布,併發布新版本的 Swifter.Json 和 Swifter.Data。

抱歉各位朋友,由於各種私事公事,本應該在 19 年底發布的 Swifter.MessagePack 庫延遲了這麼久才發布,我深感抱歉。

MsgPack 簡介

MsgPack 一種非常輕巧的二進制數據交換格式,巧妙的設計讓它相比其他二進制數據格式更可讀,並且有着不錯的壓縮率和邏輯性能,是目前相當火熱的數據交換格式。

Swifter.MessagePack 遵循 MsgPack 新的規範實現;相比 .NET 其他 MsgPack 序列化庫,Swifter.MessagePack 有着更好的性能,生成的內容更緊湊合理且更簡單易用。

Nuget:Swifter.MessagePackSwifter.JsonSwifter.Data

GitHub:Swifter.MessagePackSwifter.Json

如果您想使用 Swifter 庫,請在 Nuget 上安裝/下載最新版本,如需單文件版本,請自行生成/合併。

 

簡單使用 Swifter.MessagePack

MessagePackFormatter 類內部還有十個方法重載,包括靜態和實例方法,總有一些適合您;這些方法都是線程安全的。

更多使用方法請參考早期關於 Swifter.Json 的文章,GitHub 或 Wiki;學習交流進 Swifter 的 QQ 群:133630914(新群,歡迎加入)。

 

Swifter 框架的特性

(1) Swifter 可以運行在 .NET Framework 2.0+, .NET Core 2.0+, .NET Standard 2.0+, MONO JIT, MONO AOT, Xamarin.Android, Xamarin.iOS, Unity JIT 等平台/運行時上,Unity IL2CPP 運行時由於沒有我們測試環境,不知可否正常運行,更多信息請看下面的 AOT 說明

(2) Swifter 有着深層的抽象封裝,這雖然帶來了一些性能和內存的損耗,但也獲得了更高的擴展性;Swifter.Json/Swifter.MessagePack/Swifter.Data 的可公用的代碼非常多,這使得在 Swifter 上實現一個新的序列化庫只需要編寫少量代碼即可實現,這是其他框架難實現的。

(3) 雖然 Swifter 有很多接口和抽象編程,但是 Swifter 並沒有因此比其他的框架慢或內存佔用大,反比它們更快和更小內存佔用;這是因為 Swifter 從來都是使用更好算法和邏輯來獲取性能,而不是使用更直接的代碼獲取直接的性能。

(4) 作為類庫開發者,我們深知每個人開發和測試的側重點都與他人不一樣,自己找出自己的問題太難,所以 Swifter.Json 和 Swifter.MessagePack 除了我們自己的測試單元之外, 還 “偷” 了 Newtonsoft, Neuecc 和 Spanjson 的 5000+ 個測試單元( 去除了 Newtonsoft 的部分測試單元);現已測試通過 4200+ 個,不通過 800+ 個是我們認為可以允許或是更加合理的行為。(不勞而獲的測試單元確實用着很爽,但事實是我們”搬”這些測試單元用了 3 天,無腦替換改到手指抽筋)

 

Swifter.Json 和 Swifter.MessagePack

(1) Swifter.MessagePack 和 Swifter.Json 一樣,都有着非常優異的性能和極小的額外內存分配。

(2) Swifter.MessagePack 和 Swifter.Json 的 API 大致相同,如果使用者同時使用它們,那麼可以極小成本在它們之間切換。

(3) 得益於 Swifter.Core 的強大數據映射,Swifter.MessagePack 和 Swifter.Json 都同時支持 .NET 上大多數常用的數據結構和類型。

(4) Swifter.MessagePack 和 Swifter.Json 對重複引用的對象的表示方式不一樣,在開啟 MultiReferencingReference 配置項后,Swifter.Json 將使用 { “$ref”: “#/obj/1/target” } 來表示重複引用的對象,而 Swifter.MessagePack 使用對象在 MsgPack 內容的偏移量表示重複引用的對象;相比之下 Swifter.MessagePack 的方案更簡單性能更快,但是可讀性較差,不過說來 MsgPack 本來就是要專門的工具才能閱讀。

(5) Swifter.MessagePack 在序列化基礎類型時,在保證精度不丟失的前提下,將大數據類型轉換為更小數據類型,以得到更緊湊的 MsgPack 內容(如將 double 123 轉換為 int 123,int 123 只需要 1 個字節即可表示,如果不做轉換則需要 9 個字節表示)。

(6) Swifter.MessagePack 在序列化未知長度的集合時(如 Enumerable<T>),會將長度定義為四字節 (FixArray32),然後在寫入完成后把實際長度賦予這四字節長度;這樣雖然在較短的未知長度集合時,將產生 1-3 個 0;但是這避免了將未知長度的集合轉換為 List<T> 或 T[], 這提高了性能也減少了內存分配,這是不虧的(因為未知長度的集合很常用,如 Linq,DbDataReader 等)。

 

新版本做了啥?

(1) 主要是解決了已知 BUG,包括了 Issues 上提到的幾個。

(2) 允許將 “” 值解析為 DateTime, int?, double 等基礎類型的默認值,但是需要啟用 EmptyStringAsDefault 配置項,默認未開啟。

(3) 解決了 Swifter.Json 浮點數: float, double 失真的問題,並增加了 UseSystemFloatingPointsMethods 配置項使用系統的浮點數方法,此配置項的更多說明請看該配置項的註釋。

(4) 增加了序列化的事件:ObjectFiltering 和 ArrayFiltering,這兩個事件可以對正在序列化中的 鍵/值 做處理和篩選,包括駝峰命名法,忽略一些值等。它們被放在 JsonFormatter 和 MessagePackFormatter 的實例裏面。

(5) 增加了 .NET 對象的持久序列化和反序列化功能,這個功能將對象序列化為包含類型信息和字段值的內容,不包含邏輯信息;使用 SerializationBox<T> 盒子使用此功能。圖示:

更多新增的功能請繼續看以下內容。

 

 

AOT

在 Swifter 新版本里,AOT 的 JIT 的界限更加明顯,由 VersionDifferences.IsSupportEmit 字段標識;當這個字段為 true 表示當前平台是 JIT 運行時,Swifter 將在一些類中使用 Emit 技術提高性能;當此字段為 false 時,Swifter 會完全不使用 Emit 技術。

因我們設備有限,無法提供大規模的平台測試,但我們非常希望可以 Swifter 可以支持更多的平台,所以希望朋友們加 Swifter 交流 QQ 群:(133630914),在這裏我們可以更快的提供反饋。

 

直接文檔讀取/寫入的 API

通常情況下,將小型對象序列化為 Json/MsgPack 和將小型 Json/MsgPack 反序列化為對象是 .NET 程序中常見的操作,Swifter 也正以此為常用場景做優化,所以 Swifter 在對小型數據操作時性能最佳,且相比其他 Json/MsgPack 解析庫優勢明顯。

但在大型數據下優勢減少,這主要原因是大型數據的存儲需要實體類或字典/集合存儲,創建/填充/遍歷這些對象消耗了大量資源(接口編程的損耗);所以 Swifter 提供了直接讀取/寫入的 API 來繞開了對存儲介質的操作,以更快更小損耗的讀寫大型數據。

使用 JsonFormatter.CreateJsonReader/MessagePackFormatter.CreateMessagePackReader 函數來創建文檔讀取器,使用 JsonFormatter.CreateJsonWriter/MessagePackFormatter.CreateMessagePackWriter 函數來創建文檔寫入器。

使用文檔讀取器完整的讀取一個 Json/MsgPack 文檔將比反序列化為對象快 4-8 倍!使用文檔寫入器生成文檔的性能與將實體類序列化為 Json/MsgPack 相差較小,前提是您已構建好了這些對象。

讀取器演示:

寫入器演示:

 

擁有簡單預測數組的長度的能力

Swifter 在對小型數組,部分集合寫入時,會根據數組的類型,來源(Data,Json,MsgPack 等),名稱等信息並結合之前的一些長度記錄,簡單的預測出新的數組的長度;在寫入完成后,如果預測長度與實際長度不符,則擴展或壓縮為實際長度;如果與實際長度相符,則不需要重新創建新數組。此能力有效提高反序列化小型數組和部分集合性能,並且減少額外內存分配。

在其他高性能的 Json 解析庫,它們使用 ArrayPool<T> 同樣可以提高性能和減少內存分配;但是由於 Swifter 對兼容性的要求,使得我們不能使用 ArrayPool<T> 方案;在數組的長度比較穩定的情況下,我們的方案更好;但在數組長度非常不穩定的情況下,我們的方案可能仍需要 1-3 次的擴容/壓縮。

 

假定有序的對象反序列化

Swifter.Json 和 Swifter.MessagePack 都支持了假定有序的對象反序列化,當一個 Json/MsgPack 的對象與當前的實體類對象的字段順序一致時,將有效提升反序列化性能。

此操作默認不開啟,可以使用 AsOrderedObjectDeserialize 配置項開啟。

 

高性能的反射封裝

Swifter.Core 里提供了一些對反射封裝的類,它們放在 Swifter.Reflection 命名空間下;這些類型主要功能就是提高了系統反射的性能;XObjectRW 正是使用它們實現不依賴 Emit 的高性能對象讀寫器。

雖然放棄一些安全性檢查可以提高更多的性能,但是我們並沒有這麼做;我們仍然有類型安全檢查和防溢出檢查(事實上讀寫字段和屬性大多數的損害都在這裏,如果去掉這些檢查將得到上百倍的性能;事實上這些檢查只起到了提示程序員不能這麼做的作用,程序實際運行時這些檢查無意義)。

 

高效的数字 ToString 和 Parse 方法

Swifter.Core 提供了一些高性能数字算法,包括 Int64, UInt64, Double, Single, Decimal 的 Parse 和 ToString 算法,它們被放在 Swifter.Tools.NumberHelper 里,這些算法被應用與 Swifter.Json 和一些其他地方,這些算法支持 2-64 進制。

 

XConvert 萬能類型轉換器

Swifter.Tools.XConvert.Convert<TSource, TDestination> 是一個功能強大的萬能類型轉換函數,它在初始化時嘗試以下方式獲取合適的轉換函數:

(1) 包含在 System.Convert 里的基礎轉換函數;

(2) 類型兼容的隱式轉換(如:從子類轉換為父類,從 Int32 轉換為 Int64,從 Int64 轉換為 Double)。

(3) 原類型和目標類型中的 static implicit operator (隱式轉換) 函數。

(4) 原類型中的 ToXXX 實例函數。

(5) 目標類型中的 Parse 和 ValueOf 靜態函數。

(7) 目標類型的構造函數。

(8) 原類型和目標類型中的 static explicit operator (顯式轉換) 函數。

(9) 當以上方法都沒有找到合適函數時,將使用 (TDestination)(object)value 進行強制轉換。

簡單示例:

 

性能測試

ServiceStack.Json, Jil, LitJson, NetJson 等庫因為出錯太多未展示出來;如果有需要,您可以到 GitHub 上自行克隆/修改/運行,已收錄了 .NET 的大多數 Json 序列化庫。

 

更多實用功能等你發現…

Swifter.Core 還提供了許許多多的工具類,包括反射,委託,類型轉換,字符串,加密,哈希,数字,日期,數組和集合等工具,它們被放在 Swifter.Tools 命名空間下,您可以使用它們來提高開發效率和運行效率。

Swifter.RW 命名空間是整個 Swifter 框架的核心,它主要邏輯是:從讀取器中讀取值,寫入到寫入器中;如:從 JsonReader 讀取值到 ObjectWriter 或 DictionaryWriter 中;熟悉它們就等於精通了 Swifter 框架。

Swifter.Json/Swifter.MessagePack 有一個非常重要的配置項 JsonFormatterOptions/MessagePackFormatterOptions;使用前建議先閱讀它們,以配置更適合您系統的序列化和反序列化方案。

 

最後附上 Swifter.Data 的簡介

Swifter.Data 是一個小型的 ORM 工具,它相比 Dapper 性能要快一些,功能要強大一些。

 

感謝閱讀

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※別再煩惱如何寫文案,掌握八大原則!

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※超省錢租車方案

※教你寫出一流的銷售文案?

網頁設計最專業,超強功能平台可客製化

※產品缺大量曝光嗎?你需要的是一流包裝設計!

foreach 集合又拋經典異常了,這次一定要刨根問底

一:背景

1. 講故事

最近同事在寫一段業務邏輯的時候,程序跑起來總是報:集合已修改;可能無法執行枚舉操作,硬是沒有找到什麼情況下會導致這個異常產生,就讓我來找一下bug,其實這個異常在座的每個程序員幾乎都遇到過,誰也不是一生下就是大牛,簡單看了下代碼,確實是多線程操作foreach,但並沒有對foreach進行Add,Remove操作,掃完代碼其實我也是有點懵,沒撤只能調試了,在foreach里套一層trycatch,查看異常的線程堆棧從而找出了問題代碼,代碼簡化如下:


        static void Main(string[] args)
        {
            var dict = new Dictionary<int, int>()
            {
                [1001] = 1,
                [1002] = 10,
                [1003] = 20
            };

            foreach (var userid in dict.Keys)
            {
                dict[userid] = dict[userid] + 1;
            }
        }

先尋找點安慰,說實話,憑肉眼你覺得這段代碼會拋出異常嗎? 反正我是被騙過了,大寫的尷尬,結論如下,運行一下便知。

從圖中看確實是異常,說明在foreach的過程中連迭代集合的 value 都不可以修改,這讓我激起了強烈的探索欲,看看FCL中到底是怎麼限制的。

二:源碼探索

1. 從IL中尋找答案

C#已發展到 9.0 了,到處都充斥着語法糖,有時候不看一下底層的IL都不知道到底是轉化成了什麼,所以這個是必須的。


	IL_000d: callvirt instance void class [System.Collections]System.Collections.Generic.Dictionary`2<int32, int32>::set_Item(!0, !1)
	IL_001b: callvirt instance void class [System.Collections]System.Collections.Generic.Dictionary`2<int32, int32>::set_Item(!0, !1)
	IL_0029: callvirt instance void class [System.Collections]System.Collections.Generic.Dictionary`2<int32, int32>::set_Item(!0, !1)
	IL_0037: callvirt instance valuetype [System.Collections]System.Collections.Generic.Dictionary`2/KeyCollection/Enumerator<!0, !1> class [System.Collections]System.Collections.Generic.Dictionary`2/KeyCollection<int32, int32>::GetEnumerator()

	.try
	{
		IL_003d: br.s IL_005a
		// loop start (head: IL_005a)
			IL_003f: ldloca.s 1
			IL_0041: call instance !0 valuetype [System.Collections]System.Collections.Generic.Dictionary`2/KeyCollection/Enumerator<int32, int32>::get_Current()
			IL_004c: callvirt instance !1 class [System.Collections]System.Collections.Generic.Dictionary`2<int32, int32>::get_Item(!0)
			IL_0053: callvirt instance void class [System.Collections]System.Collections.Generic.Dictionary`2<int32, int32>::set_Item(!0, !1)
			IL_005a: ldloca.s 1
			IL_005c: call instance bool valuetype [System.Collections]System.Collections.Generic.Dictionary`2/KeyCollection/Enumerator<int32, int32>::MoveNext()
			IL_0061: brtrue.s IL_003f
		// end loop

		IL_0063: leave.s IL_0074
	} // end .try
	finally
	{

	} // end handler    

從IL代碼中可以看到,先執行了三次字典的索引器操作,然後調用了 Dictionary.GetEnumerator 來生成字典的迭代類,這思路就非常清晰了,然後我們看一下類索引器都做了些什麼。

從圖中可以看到,每一次的索引器操作,這裏都執行了version++,所以字典初始化完成之後,這裏的 version=3,沒有問題吧,然後繼續看代碼,尋找 Dictionary.GetEnumerator 方法啟動迭代類。

上面代碼的 _version = dictionary._version; 一定要看仔細了,在啟動迭代類的時候記錄了當時字典的版本號,也就是_version=3,然後繼續探索moveNext方法幹了什麼,如下圖:

從圖中可以看到,當每次執行moveNext的過程中,都會判斷一下字典的 version 和 當初初始化迭代類中的version 版本號是否一致,如果不一致就拋出異常,所以這行代碼就是點睛之筆了,當在foreach體中執行了 dict[userid] = dict[userid] + 1; 語句,相當於又執行了一次類索引器操作,這時候字典的version就變成 4 了,而當初初始化迭代類的時候還是3,自然下一次執行 moveNext 就是 3 != 4 拋出異常了。

如果你非要讓我證明給你看,這裏可以使用dnspy直接調試源碼,在異常那裡下一個斷點再查看兩個version版本號不就知道啦。。。

2. 面對疾風

有些朋友可能要說,碼農今天分享的這篇一點水準都沒有,我18年前就知道字典是不能動態修改的,還分析的頭頭是勁。

但是我有話要說,這個還確實是我的一個盲區,平時在迭代字典的時候value一般都是引用類型,動態修改引用類型的值自然是沒有問題的,這是因為你不管怎麼修改都不會改變 _version 版本號,但質疑我的也不要把話說的太滿,因為這種操作是非常語義化非常大眾的需求,你能保證後面net版本不支持這個嗎??? 如果你說不可能,那恭喜你,被我帶到坑裡面去啦。

下面我用原封不動的代碼在 .net 5 下跑一次,睜大眼睛好好看哦~~~

驚訝吧, 居然在 .Net 5 中可以的,接下來用ILSpy去查查底層源碼,.netcore 3.1 和 net5 中分別對 類索引器 都做了啥修改。

  • netcore 3.1

Path: C:\Program Files\dotnet\shared\Microsoft.NETCore.App\3.1.2\System.Private.CoreLib.dll

  • net5

Path: C:\Program Files\dotnet\shared\Microsoft.NETCore.App\5.0.0-preview.5.20278.1\System.Private.CoreLib.dll

對比兩張圖你會發現 .Net5 中並沒有做 _version++ 操作,這就了,如果你再細讀代碼,你還發現 .Net5 對字典進行了較大幅度的優化,哈哈,當初在 .Net5 之前產生的錯誤,在 .Net5 中居然沒有啦!

四: 總結

源碼面前,不談隱私,沒事多翻翻源碼,有可能還有意外收穫,比如在 .Net 5下的這點新發現,可能還是全網第一個哦,這要是兩個大牛爭吵,讓小白去相信誰呢,嘿嘿,源碼才是真正的專家~

如您有更多問題與我互動,掃描下方進來吧~

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※教你寫出一流的銷售文案?

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※回頭車貨運收費標準

※別再煩惱如何寫文案,掌握八大原則!

※超省錢租車方案

※產品缺大量曝光嗎?你需要的是一流包裝設計!

.Net Core微服務入門全紀錄(三)——Consul-服務註冊與發現(下)

前言

上一篇【.Net Core微服務入門全紀錄(二)——Consul-服務註冊與發現(上)】已經成功將我們的服務註冊到Consul中,接下來就該客戶端通過Consul去做服務發現了。

服務發現

  • 同樣Nuget安裝一下Consul:

  • 改造一下業務系統的代碼:

ServiceHelper.cs:

    public class ServiceHelper : IServiceHelper
    {
        private readonly IConfiguration _configuration;

        public ServiceHelper(IConfiguration configuration)
        {
            _configuration = configuration;
        }

        public async Task<string> GetOrder()
        {
            //string[] serviceUrls = { "http://localhost:9060", "http://localhost:9061", "http://localhost:9062" };//訂單服務的地址,可以放在配置文件或者數據庫等等...

            var consulClient = new ConsulClient(c =>
            {
                //consul地址
                c.Address = new Uri(_configuration["ConsulSetting:ConsulAddress"]);
            });

            //consulClient.Catalog.Services().Result.Response;
            //consulClient.Agent.Services().Result.Response;
            var services = consulClient.Health.Service("OrderService", null, true, null).Result.Response;//健康的服務

            string[] serviceUrls = services.Select(p => $"http://{p.Service.Address + ":" + p.Service.Port}").ToArray();//訂單服務地址列表

            if (!serviceUrls.Any())
            {
                return await Task.FromResult("【訂單服務】服務列表為空");
            }

            //每次隨機訪問一個服務實例
            var Client = new RestClient(serviceUrls[new Random().Next(0, serviceUrls.Length)]);
            var request = new RestRequest("/orders", Method.GET);

            var response = await Client.ExecuteAsync(request);
            return response.Content;
        }

        public async Task<string> GetProduct()
        {
            //string[] serviceUrls = { "http://localhost:9050", "http://localhost:9051", "http://localhost:9052" };//產品服務的地址,可以放在配置文件或者數據庫等等...

            var consulClient = new ConsulClient(c =>
            {
                //consul地址
                c.Address = new Uri(_configuration["ConsulSetting:ConsulAddress"]);
            });

            //consulClient.Catalog.Services().Result.Response;
            //consulClient.Agent.Services().Result.Response;
            var services = consulClient.Health.Service("ProductService", null, true, null).Result.Response;//健康的服務

            string[] serviceUrls = services.Select(p => $"http://{p.Service.Address + ":" + p.Service.Port}").ToArray();//產品服務地址列表

            if (!serviceUrls.Any())
            {
                return await Task.FromResult("【產品服務】服務列表為空");
            }

            //每次隨機訪問一個服務實例
            var Client = new RestClient(serviceUrls[new Random().Next(0, serviceUrls.Length)]);
            var request = new RestRequest("/products", Method.GET);

            var response = await Client.ExecuteAsync(request);
            return response.Content;
        }
    }

appsettings.json:

{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft": "Warning",
      "Microsoft.Hosting.Lifetime": "Information"
    }
  },
  "AllowedHosts": "*",
  "ConsulSetting": {
    "ConsulAddress": "http://localhost:8500"
  }
}

OK,以上代碼就完成了服務列表的獲取。

瀏覽器測試一下:

隨便停止2個服務:

繼續訪問:

這時候停止的服務地址就獲取不到了,客戶端依然正常運行。

這時候解決了服務的發現,新的問題又來了…

  • 客戶端每次要調用服務,都先去Consul獲取一下地址,這不僅浪費資源,還增加了請求的響應時間,這顯然讓人無法接受。

那麼怎麼保證不要每次請求都去Consul獲取地址,同時又要拿到可用的地址列表呢?
Consul提供的解決方案:——Blocking Queries (阻塞的請求)。詳情請見官網:https://www.consul.io/api-docs/features/blocking

Blocking Queries

這是什麼意思呢,簡單來說就是當客戶端請求Consul獲取地址列表時,需要攜帶一個版本號信息,Consul會比較這個客戶端版本號是否和Consul服務端的版本號一致,如果一致,則Consul會阻塞這個請求,直到Consul中的服務列表發生變化,或者到達阻塞時間上限;如果版本號不一致,則立即返回。這個阻塞時間默認是5分鐘,支持自定義。
那麼我們另外啟動一個線程去干這件事情,就不會影響每次的用戶請求了。這樣既保證了客戶端服務列表的準確性,又節約了客戶端請求服務列表的次數。

  • 繼續改造代碼:
    IServiceHelper增加一個獲取服務列表的接口方法:
    public interface IServiceHelper
    {
        /// <summary>
        /// 獲取產品數據
        /// </summary>
        /// <returns></returns>
        Task<string> GetProduct();

        /// <summary>
        /// 獲取訂單數據
        /// </summary>
        /// <returns></returns>
        Task<string> GetOrder();

        /// <summary>
        /// 獲取服務列表
        /// </summary>
        void GetServices();
    }

ServiceHelper實現接口:

    public class ServiceHelper : IServiceHelper
    {
        private readonly IConfiguration _configuration;
        private readonly ConsulClient _consulClient;
        private ConcurrentBag<string> _orderServiceUrls;
        private ConcurrentBag<string> _productServiceUrls;

        public ServiceHelper(IConfiguration configuration)
        {
            _configuration = configuration;
            _consulClient = new ConsulClient(c =>
            {
                //consul地址
                c.Address = new Uri(_configuration["ConsulSetting:ConsulAddress"]);
            });
        }

        public async Task<string> GetOrder()
        {
            if (_productServiceUrls == null)
                return await Task.FromResult("【訂單服務】正在初始化服務列表...");

            //每次隨機訪問一個服務實例
            var Client = new RestClient(_orderServiceUrls.ElementAt(new Random().Next(0, _orderServiceUrls.Count())));
            var request = new RestRequest("/orders", Method.GET);

            var response = await Client.ExecuteAsync(request);
            return response.Content;
        }

        public async Task<string> GetProduct()
        {
            if(_productServiceUrls == null)
                return await Task.FromResult("【產品服務】正在初始化服務列表...");

            //每次隨機訪問一個服務實例
            var Client = new RestClient(_productServiceUrls.ElementAt(new Random().Next(0, _productServiceUrls.Count())));
            var request = new RestRequest("/products", Method.GET);

            var response = await Client.ExecuteAsync(request);
            return response.Content;
        }

        public void GetServices()
        {
            var serviceNames = new string[] { "OrderService", "ProductService" };
            Array.ForEach(serviceNames, p =>
            {
                Task.Run(() =>
                {
                    //WaitTime默認為5分鐘
                    var queryOptions = new QueryOptions { WaitTime = TimeSpan.FromMinutes(10) };
                    while (true)
                    {
                        GetServices(queryOptions, p);
                    }
                });
            });
        }
        private void GetServices(QueryOptions queryOptions, string serviceName)
        {
            var res = _consulClient.Health.Service(serviceName, null, true, queryOptions).Result;
            
            //控制台打印一下獲取服務列表的響應時間等信息
            Console.WriteLine($"{DateTime.Now}獲取{serviceName}:queryOptions.WaitIndex:{queryOptions.WaitIndex}  LastIndex:{res.LastIndex}");

            //版本號不一致 說明服務列表發生了變化
            if (queryOptions.WaitIndex != res.LastIndex)
            {
                queryOptions.WaitIndex = res.LastIndex;

                //服務地址列表
                var serviceUrls = res.Response.Select(p => $"http://{p.Service.Address + ":" + p.Service.Port}").ToArray();

                if (serviceName == "OrderService")
                    _orderServiceUrls = new ConcurrentBag<string>(serviceUrls);
                else if (serviceName == "ProductService")
                    _productServiceUrls = new ConcurrentBag<string>(serviceUrls);
            }
        }
    }

Startup的Configure方法中調用一下獲取服務列表:

        public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IServiceHelper serviceHelper)
        {
            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }
            else
            {
                app.UseExceptionHandler("/Home/Error");
            }
            app.UseStaticFiles();

            app.UseRouting();

            app.UseAuthorization();

            app.UseEndpoints(endpoints =>
            {
                endpoints.MapControllerRoute(
                    name: "default",
                    pattern: "{controller=Home}/{action=Index}/{id?}");
            });

            //程序啟動時 獲取服務列表
            serviceHelper.GetServices();
        }

代碼完成,運行測試:

現在不用每次先請求服務列表了,是不是流暢多了?

看一下控制台打印:

這時候如果服務列表沒有發生變化的話,獲取服務列表的請求會一直阻塞到我們設置的10分鐘。

隨便停止2個服務:

這時候可以看到,數據被立馬返回了。

繼續訪問客戶端網站,同樣流暢。
(gif圖傳的有點問題。。。)

至此,我們就通過Consul完成了服務的註冊與發現。
接下來又引發新的思考。。。

  1. 每個客戶端系統都去維護這一堆服務地址,合理嗎?
  2. 服務的ip端口直接暴露給所有客戶端,安全嗎?
  3. 這種模式下怎麼做到客戶端的統一管理呢?

代碼放在:https://github.com/xiajingren/NetCoreMicroserviceDemo

未完待續…

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※超省錢租車方案

※別再煩惱如何寫文案,掌握八大原則!

※回頭車貨運收費標準

※教你寫出一流的銷售文案?

※產品缺大量曝光嗎?你需要的是一流包裝設計!

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益