DQN(Deep Q-learning)入門教程(四)之Q-learning Play Flappy Bird_網頁設計公司

※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

台中景泰電動車行只是一個單純的理由,將來台灣的環境,出門可以自由放心的深呼吸,讓空氣回歸自然的乾淨,減少污染,留給我們下一代有好品質無空污的優質環境

在上一篇博客中,我們詳細的對Q-learning的算法流程進行了介紹。同時我們使用了\(\epsilon-貪婪法\)防止陷入局部最優。

那麼我們可以想一下,最後我們得到的結果是什麼樣的呢?因為我們考慮到了所有的(\(\epsilon-貪婪法\)導致的)情況,因此最終我們將會得到一張如下的Q-Table表。

Q-Table \(a_1\) \(a_2\)
\(s_1\) \(q(s_1,a_1)\) \(q(s_1,a_2)\)
\(s_2\) \(q(s_2,a_1)\) \(q(s_2,a_2)\)
\(s_3\) \(q(s_3,a_1)\) \(q(s_3,a_2)\)

當agent運行到某一個場景\(s\)時,會去查詢已經訓練好的Q-Table,然後從中選擇一個最大的\(q\)對應的action。

訓練內容

這一次,我們將對Flappy-bird遊戲進行訓練。這個遊戲的介紹我就不多說了,可以看一下維基百科的介紹。

遊戲就是控制一隻穿越管道,然後可以獲得分數,對於小鳥來說,他只有兩個動作,跳or不跳,而我們的目標就是使小鳥穿越管道獲得更多的分數。

前置準備

因為我們的目標是來學習“強化學習”的,所以我們不可能說自己去弄一個Flappy-bird(當然自己弄也可以),這裏我們直接使用一個已經寫好的Flappy-bird。

PyGame-Learning-Environment,是一個Python的強化學習環境,簡稱PLE,下面時他Github上面的介紹:

PyGame Learning Environment (PLE) is a learning environment, mimicking the Arcade Learning Environment interface, allowing a quick start to Reinforcement Learning in Python. The goal of PLE is allow practitioners to focus design of models and experiments instead of environment design.

PLE hopes to eventually build an expansive library of games.

然後關於FlappyBird的文檔介紹在這裏,文檔的介紹還是蠻清楚的。安裝步驟如下所示,推薦在Pipenv的環境下安裝,不過你也可以直接clone我的代碼然後然後根據reademe的步驟進行使用。

git clone https://github.com/ntasfi/PyGame-Learning-Environment.git
cd PyGame-Learning-Environment/
pip install -e .

需要的庫如下:

  • pygame
  • numpy
  • pillow

函數說明

在官方文檔有幾個的函數在這裏說下,因為等下我們需要用到。

  • getGameState():獲得遊戲當前的狀態,返回值為一個字典:

    1. player y position.
    2. players velocity.
    3. next pipe distance to player
    4. next pipe top y position
    5. next pipe bottom y position
    6. next next pipe distance to player
    7. next next pipe top y position
    8. next next pipe bottom y position

    部分數據表示如下:

  • reset_game():重新開始遊戲

  • act(action):在遊戲中執行一個動作,參數為動作,返回執行后的分數。

  • game_over():假如遊戲結束,則返回True,否者返回False。

  • getActionSet():獲得遊戲的動作集合。

    ※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

    網站的第一印象網頁設計,決定了客戶是否繼續瀏覽的意願。台北網動廣告製作的RWD網頁設計,採用精簡與質感的CSS語法,提升企業的專業形象與簡約舒適的瀏覽體驗,讓瀏覽者第一眼就愛上它。

我們的窗體大小默認是288*512,其中鳥的速度在-20到10之間(最小速度我並不知道,但是經過觀察,並沒有小於-20的情況,而最大的速度在源代碼裏面已經說明好了為10)

Coding Time

在前面我們說,通過getGameState()函數,我們可以獲得幾個關於環境的數據,在這裏我們選擇如下的數據:

  • next_pipe_dist_to_player:
  • player_y與next_pipe_top_y的差值
  • 的速度

但是我們可以想一想,next_pipe_dist_to_player一共會有多少種的取值:因為窗體大小為288*512,則取值的範圍大約是0~288,也就是說它大約有288個取值,而關於player_y與next_pipe_top_y的差值,則大概有1024個取值。這樣很難讓模型收斂,因此我們將數值進行簡化。其中簡化的思路來自:GitHub

首先我們創建一個Agent類,然後逐漸向裏面添加功能。

class Agent():

    def __init__(self, action_space):
        # 獲得遊戲支持的動作集合
        self.action_set = action_space

        # 創建q-table
        self.q_table = np.zeros((6, 6, 6, 2))

        # 學習率
        self.alpha = 0.7
        # 勵衰減因子
        self.gamma = 0.8
        # 貪婪率
        self.greedy = 0.8

至於為什麼q-table的大小是(6,6,6,2),其中的3個6分別代表next_pipe_dist_to_playerplayer_y與next_pipe_top_y的差值的速度,其中的2代表動作的個數。也就是說,表格中的state一共有$6 \times6 \times 6 $種,表格的大小為\(6 \times6 \times 6 \times 2\)

縮小狀態值的範圍

我們定義一個函數get_state(s),這個函數專門提取遊戲中的狀態,然後返回進行簡化的狀態數據:

    def get_state(self, state):
        """
        提取遊戲state中我們需要的數據
        :param state: 遊戲state
        :return: 返回提取好的數據
        """
        return_state = np.zeros((3,), dtype=int)
        dist_to_pipe_horz = state["next_pipe_dist_to_player"]
        dist_to_pipe_bottom = state["player_y"] - state["next_pipe_top_y"]
        velocity = state['player_vel']
        if velocity < -15:
            velocity_category = 0
        elif velocity < -10:
            velocity_category = 1
        elif velocity < -5:
            velocity_category = 2
        elif velocity < 0:
            velocity_category = 3
        elif velocity < 5:
            velocity_category = 4
        else:
            velocity_category = 5

        if dist_to_pipe_bottom < 8:  # very close or less than 0
            height_category = 0
        elif dist_to_pipe_bottom < 20:  # close
            height_category = 1
        elif dist_to_pipe_bottom < 50:  # not close
            height_category = 2
        elif dist_to_pipe_bottom < 125:  # mid
            height_category = 3
        elif dist_to_pipe_bottom < 250:  # far
            height_category = 4
        else:
            height_category = 5

        # make a distance category
        if dist_to_pipe_horz < 8:  # very close
            dist_category = 0
        elif dist_to_pipe_horz < 20:  # close
            dist_category = 1
        elif dist_to_pipe_horz < 50:  # not close
            dist_category = 2
        elif dist_to_pipe_horz < 125:  # mid
            dist_category = 3
        elif dist_to_pipe_horz < 250:  # far
            dist_category = 4
        else:
            dist_category = 5

        return_state[0] = height_category
        return_state[1] = dist_category
        return_state[2] = velocity_category
        return return_state

更新Q-table

更新的數學公式如下:

\[{\displaystyle Q^{new}(s_{t},a_{t})\leftarrow \underbrace {Q(s_{t},a_{t})} _{\text{舊的值}}+\underbrace {\alpha } _{\text{學習率}}\cdot \overbrace {{\bigg (}\underbrace {\underbrace {r_{t}} _{\text{獎勵}}+\underbrace {\gamma } _{\text{獎勵衰減因子}}\cdot \underbrace {\max _{a}Q(s_{t+1},a)} _{\text{estimate of optimal future value}}} _{\text{new value (temporal difference target)}}-\underbrace {Q(s_{t},a_{t})} _{\text{舊的值}}{\bigg )}} ^{\text{temporal difference}}} \]

下面是更新Q-table的函數代碼:

def update_q_table(self, old_state, current_action, next_state, r):
    """

    :param old_state: 執行動作前的狀態
    :param current_action: 執行的動作
    :param next_state: 執行動作后的狀態
    :param r: 獎勵
    :return:
    """
    next_max_value = np.max(self.q_table[next_state[0], next_state[1], next_state[2]])

    self.q_table[old_state[0], old_state[1], old_state[2], current_action] = (1 - self.alpha) * self.q_table[
        old_state[0], old_state[1], old_state[2], current_action] + self.alpha * (r + next_max_value)

選擇最佳的動作

然後我們就是根據q-table對應的Q值選擇最大的那一個,其中第一個代表(也就是0)跳躍,第2個代表不執行任何操作。

選擇的示意圖如下:

代碼如下所示:

def get_best_action(self, state, greedy=False):
    """
    獲得最佳的動作
    :param state: 狀態
    :是否使用ϵ-貪婪法
    :return: 最佳動作
    """
	
    # 獲得q值
    jump = self.q_table[state[0], state[1], state[2], 0]
    no_jump = self.q_table[state[0], state[1], state[2], 1]
    # 是否執行策略
    if greedy:
        if np.random.rand(1) < self.greedy:
            return np.random.choice([0, 1])
        else:
            if jump > no_jump:
                return 0
            else:
                return 1
    else:
        if jump > no_jump:
            return 0
        else:
            return 1

更新\(\epsilon\)

這個比較簡單,從前面的博客中,我們知道\(\epsilon\)是隨着訓練次數的增加而減少的,有很多種策略可以選擇,這裏乘以\(0.95\)吧。

def update_greedy(self):
    self.greedy *= 0.95

執行動作

在官方文檔中,如果小鳥沒有死亡獎勵為0,越過一個管道,獎勵為1,死亡獎勵為-1,我們稍微的對其進行改變:

def act(self, p, action):
    """
    執行動作
    :param p: 通過p來向遊戲發出動作命令
    :param action: 動作
    :return: 獎勵
    """
    # action_set表示遊戲動作集(119,None),其中119代表跳躍
    r = p.act(self.action_set[action])
    if r == 0:
        r = 1
    if r == 1:
        r = 10
    else:
        r = -1000
    return r

main函數

最後我們就可以執行main函數了。

if __name__ == "__main__":
    # 訓練次數
    episodes = 2000_000000
    # 實例化遊戲對象
    game = FlappyBird()
    # 類似遊戲的一個接口,可以為我們提供一些功能
    p = PLE(game, fps=30, display_screen=False)
    # 初始化
    p.init()
    # 實例化Agent,將動作集傳進去
    agent = Agent(p.getActionSet())
    max_score = 0
	
    for episode in range(episodes):
        # 重置遊戲
        p.reset_game()
        # 獲得狀態
        state = agent.get_state(game.getGameState())
        agent.update_greedy()
        while True:
            # 獲得最佳動作
            action = agent.get_best_action(state)
            # 然後執行動作獲得獎勵
            reward = agent.act(p, action)
            # 獲得執行動作之後的狀態
            next_state = agent.get_state(game.getGameState())
            # 更新q-table
            agent.update_q_table(state, action, next_state, reward)
            # 獲得當前分數
            current_score = p.score()
            state = next_state
            if p.game_over():
                max_score = max(current_score, max_score)
                print('Episodes: %s, Current score: %s, Max score: %s' % (episode, current_score, max_score))
                # 保存q-table
                if current_score > 300:
                    np.save("{}_{}.npy".format(current_score, episode), agent.q_table)
                break

部分的訓練的結果如下:

總結

emm,說實話,我也不知道結果會怎麼樣,因為訓練的時間比較長,我不想放在我的電腦上面跑,然後我就放在樹莓派上面跑,但是樹莓派性能比較低,導致訓練的速度比較慢。但是,我還是覺得我的方法有點問題,get_state()函數中簡化的方法,我感覺不是特別的合理,如果各位有好的看法,可以在評論區留言哦,然後共同學習。

項目地址:https://github.com/xiaohuiduan/flappy-bird-q-learning

參考

  • Use reinforcement learning to train a flappy bird NEVER to die
  • PyGame-Learning-Environment
  • https://github.com/BujuNB/Flappy-Brid-RL

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!

以設計的實用美學觀點,規劃出舒適、美觀的視覺畫面,有效提昇使用者的心理期待,營造出輕鬆、愉悅的網站瀏覽體驗。

Alink漫談(五) : 迭代計算和Superstep_台中搬家公司

※推薦台中搬家公司優質服務,可到府估價

台中搬鋼琴,台中金庫搬運,中部廢棄物處理,南投縣搬家公司,好幫手搬家,西屯區搬家

Alink漫談(五) : 迭代計算和Superstep

目錄

  • Alink漫談(五) : 迭代計算和Superstep
    • 0x00 摘要
    • 0x01 緣由
    • 0x02 背景概念
      • 2.1 四層執行圖
      • 2.2 Task和SubTask
      • 2.3 如何劃分 Task 的依據
      • 2.4 JobGraph
      • 2.5 BSP模型和Superstep
        • BSP模型
        • BSP模型的實現
        • Flink-Gelly
    • 0x03 Flink的迭代算法(superstep-based)
      • 3.1 Bulk Iterate
      • 3.2 迭代機制
    • 0x04 Alink如何使用迭代
    • 0x05 深入Flink源碼和runtime來驗證
      • 5.1 向Flink提交Job
      • 5.2 生成JobGraph
      • 5.3 迭代對應的Task
        • 5.3.1 IterationHeadTask
        • 5.3.2 IterationIntermediateTask
        • 5.3.3 IterationTailTask
          • 如何和Head建立聯繫
          • 如何把用戶返回的數值傳給Head
        • 5.3.4 IterationSynchronizationSinkTask
      • 5.4 superstep
    • 0x06 結合KMeans代碼看superset
      • 6.1 K-means算法概要
      • 6.2 KMeansPreallocateCentroid
      • 6.3 KMeansAssignCluster 和 KMeansUpdateCentroids
      • 6.4 KMeansOutputModel
    • 0x07 參考

0x00 摘要

Alink 是阿里巴巴基於實時計算引擎 Flink 研發的新一代機器學習算法平台,是業界首個同時支持批式算法、流式算法的機器學習平台。迭代算法在很多數據分析領域會用到,比如機器學習或者圖計算。本文將通過Superstep入手看看Alink是如何利用Flink迭代API來實現具體算法。

因為Alink的公開資料太少,所以以下均為自行揣測,肯定會有疏漏錯誤,希望大家指出,我會隨時更新。

0x01 緣由

為什麼提到 Superstep 這個概念,是因為在擼KMeans代碼的時候,發現幾個很奇怪的地方,比如以下三個步驟中,都用到了context.getStepNo(),而且會根據其數值的不同進行不同業務操作:

public class KMeansPreallocateCentroid extends ComputeFunction {
    public void calc(ComContext context) {
        LOG.info("liuhao  KMeansPreallocateCentroid ");
        if (context.getStepNo() == 1) {
          /** 具體業務邏輯代碼
           * Allocate memory for pre-round centers and current centers.
           */        
        }
    }
}  

public class KMeansAssignCluster extends ComputeFunction {
    public void calc(ComContext context) {
        ......
        if (context.getStepNo() % 2 == 0) {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID1);
        } else {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID2);
        }
      /** 具體業務邏輯代碼
       * Find the closest cluster for every point and calculate the sums of the points belonging to the same cluster.
       */
    }
}

public class KMeansUpdateCentroids extends ComputeFunction {
    public void calc(ComContext context) {
        if (context.getStepNo() % 2 == 0) {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID2);
        } else {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID1);
        }
      /** 具體業務邏輯代碼
       * Update the centroids based on the sum of points and point number belonging to the same cluster.
       */
    }

查看ComContext的源碼,發現stepNo的來源居然是runtimeContext.getSuperstepNumber()

public class ComContext {
   private final int taskId;
   private final int numTask;
   private final int stepNo; // 對,就是這裏
   private final int sessionId;
	public ComContext(int sessionId, IterationRuntimeContext runtimeContext) {
		this.sessionId = sessionId;
		this.numTask = runtimeContext.getNumberOfParallelSubtasks();
		this.taskId = runtimeContext.getIndexOfThisSubtask();
		this.stepNo = runtimeContext.getSuperstepNumber(); // 這裏進行了變量初始化
	}  
	/**
	 * Get current iteration step number, the same as {@link IterationRuntimeContext#getSuperstepNumber()}.
	 * @return iteration step number.
	 */
	public int getStepNo() {
		return stepNo; // 這裡是使用
	}  
}

看到這裡有的兄弟可能會虎軀一震,這不是BSP模型的概念嘛。我就是想寫個KMeans算法,怎麼除了MPI模型,還要考慮BSP模型。下面就讓我們一步一步挖掘究竟Alink都做了什麼工作。

0x02 背景概念

2.1 四層執行圖

在 Flink 中的執行圖可以分為四層:StreamGraph -> JobGraph -> ExecutionGraph -> 物理執行圖

  • StreamGraph:Stream API 編寫的代碼生成的最初的圖。用來表示程序的拓撲結構。
  • JobGraph:StreamGraph 經過優化後生成了 JobGraph, JobGraph是提交給 JobManager 的數據結構。主要的優化為,將多個符合條件的節點 chain 在一起作為一個節點,這樣可以減少數據在節點之間流動所需要的序列化/反序列化/傳輸消耗。JobGraph是唯一被Flink的數據流引擎所識別的表述作業的數據結構,也正是這一共同的抽象體現了流處理和批處理在運行時的統一。
  • ExecutionGraph:JobManager 根據 JobGraph 生成 ExecutionGraph。ExecutionGraph 是 JobGraph 的并行化版本,是調度層最核心的數據結構。
  • 物理執行圖:JobManager 根據 ExecutionGraph 對 Job 進行調度后,在各個TaskManager 上部署 Task 后形成的“圖”,並不是一個具體的數據結構。

2.2 Task和SubTask

因為某種原因,Flink內部對這兩個概念的使用本身就有些混亂:在Task Manager里這個subtask的概念由一個叫Task的類來實現。Task Manager里談論的Task對象實際上對應的是ExecutionGraph里的一個subtask。

所以這兩個概念需要理清楚。

  • Task(任務) :Task對應JobGraph的一個節點,是一個算子Operator。Task 是一個階段多個功能相同 subTask 的集合,類似於 Spark 中的 TaskSet。
  • subTask(子任務) :subTask 是 Flink 中任務最小執行單元,是一個 Java 類的實例,這個 Java 類中有屬性和方法,完成具體的計算邏輯。在ExecutionGraph里Task被分解為多個并行執行的subtask 。每個subtask作為一個excution分配到Task Manager里執行。
  • Operator Chains(算子鏈) :沒有 shuffle 的多個算子合併在一個 subTask 中,就形成了 Operator Chains,類似於 Spark 中的 Pipeline。Operator subTask 的數量指的就是算子的并行度。同一程序的不同算子也可能具有不同的并行度(因為可以通過 setParallelism() 方法來修改并行度)。

Flink 中的程序本質上是并行的。在執行期間,每一個算子 Operator (Transformation)都有一個或多個算子subTask(Operator SubTask),每個算子的 subTask 之間都是彼此獨立,並在不同的線程中執行,並且可能在不同的機器或容器上執行。

Task( SubTask) 是一個Runnable 對象, Task Manager接受到TDD 後會用它實例化成一個Task對象, 並啟動一個線程執行Task的Run方法。

TaskDeploymentDescriptor(TDD) : 是Task Manager在submitTask是提交給TM的數據結構。 他包含了關於Task的所有描述信息。比如:

  • TaskInfo : 包含該Task 執行的java 類,該類是某個 AbstractInvokable的實現類 , 當然也是某個operator的實現類 (比如DataSourceTask, DataSinkTask, BatchTask,StreamTask 等)。
  • IG描述 :通常包含一個或兩個InputGateDeploymentDescriptor(IGD)。
  • 目標RP的描述: ParitionId, PartitionType, RS個數等等。

2.3 如何劃分 Task 的依據

在以下情況下會重新劃分task

  • 并行度發生變化時
  • keyBy() /window()/apply() 等發生 Rebalance 重新分配;
  • 調用 startNewChain() 方法,開啟一個新的算子鏈;
  • 調用 diableChaining()方法,即:告訴當前算子操作不使用 算子鏈 操作。

比如有如下操作

DataStream<String> text = env.socketTextStream(hostname, port);

DataStream counts = text
    .filter(new FilterClass())
    .map(new LineSplitter())
    .keyBy(0)
    .timeWindow(Time.seconds(10))
    .sum(2)

那麼StreamGraph的轉換流是:

 Source --> Filter --> Map --> Timestamps/Watermarks --> Window(SumAggregator) --> Sink

其task是四個:

  • Source –> Filter –> Map
  • keyBy
  • timeWindow
  • Sink

其中每個task又會被分成分若干subtask。在執行時,一個Task會被并行化成若干個subTask實例進行執行,一個subTask對應一個執行線程。

2.4 JobGraph

以上說了這麼多,就是要說jobGraph和subtask,因為本文中我們在分析源碼和調試時候,主要是從jobGraph這裏開始入手來看subtask

JobGraph是在StreamGraph的基礎之上,對StreamNode進行了關聯合併的操作,比如對於source -> flatMap -> reduce -> sink 這樣一個數據處理鏈,當source和flatMap滿足鏈接的條件時,可以可以將兩個操作符的操作放到一個線程并行執行,這樣可以減少網絡中的數據傳輸,由於在source和flatMap之間的傳輸的數據也不用序列化和反序列化,所以也提高了程序的執行效率。

相比流圖(StreamGraph)以及批處理優化計劃(OptimizedPlan),JobGraph發生了一些變化,已經不完全是“靜態”的數據結構了,因為它加入了中間結果集(IntermediateDataSet)這一“動態”概念。

作業頂點(JobVertex)、中間數據集(IntermediateDataSet)、作業邊(JobEdge)是組成JobGraph的基本元素。這三個對象彼此之間互為依賴:

  • 一個JobVertex關聯着若干個JobEdge作為輸入端以及若干個IntermediateDataSet作為其生產的結果集;每個JobVertex都有諸如并行度和執行代碼等屬性。
  • 一個IntermediateDataSet關聯着一個JobVertex作為生產者以及若干個JobEdge作為消費者;
  • 一個JobEdge關聯着一個IntermediateDataSet可認為是源以及一個JobVertex可認為是目標消費者;

那麼JobGraph是怎麼組織並存儲這些元素的呢?其實JobGraph只以Map的形式存儲了所有的JobVertex,鍵是JobVertexID:

private final Map<JobVertexID, JobVertex> taskVertices = new LinkedHashMap<JobVertexID, JobVertex>();

至於其它的元素,通過JobVertex都可以根據關係找尋到。需要注意的是,用於迭代的反饋邊(feedback edge)當前並不體現在JobGraph中,而是被內嵌在特殊的JobVertex中通過反饋信道(feedback channel)在它們之間建立關係。

2.5 BSP模型和Superstep

BSP模型

BSP模型是并行計算模型的一種。并行計算模型通常指從并行算法的設計和分析出發,將各種并行計算機(至少某一類并行計算機)的基本特徵抽象出來,形成一個抽象的計算模型。

BSP模型是一種異步MIMD-DM模型(DM: distributed memory,SM: shared memory),BSP模型支持消息傳遞系統,塊內異步并行,塊間顯式同步,該模型基於一個master協調,所有的worker同步(lock-step)執行, 數據從輸入的隊列中讀取。

BSP計算模型不僅是一種體繫結構模型,也是設計并行程序的一種方法。BSP程序設計準則是整體同步(bulk synchrony),其獨特之處在於超步(superstep)概念的引入。一個BSP程序同時具有水平和垂直兩個方面的結構。從垂直上看,一個BSP程序由一系列串行的超步(superstep)組成。

BSP模型的實現

BSP模型的實現大概舉例如下:

  • Pregel :Google的大規模圖計算框架,首次提出了將BSP模型應用於圖計算,具體請看Pregel——大規模圖處理系統,不過至今未開源。
  • Apache Giraph :ASF社區的Incubator項目,由Yahoo!貢獻,是BSP的java實現,專註於迭代圖計算(如pagerank,最短連接等),每一個job就是一個沒有reducer過程的hadoop job。
  • Apache Hama :也是ASF社區的Incubator項目,與Giraph不同的是它是一個純粹的BSP模型的java實現,並且不單單是用於圖計算,意在提供一個通用的BSP模型的應用框架。

Flink-Gelly

Flink-Gelly利用Flink的高效迭代算子來支持海量數據的迭代式圖處理。目前,Flink Gelly提供了“Vertex-Centric”,“Scatter-Gather”以及“Gather-Sum-Apply”等計算模型的實現。

“Vertex-Centric”迭代模型也就是我們經常聽到的“Pregel”,是一種從Vertex角度出發的圖計算方式。其中,同步地迭代計算的步驟稱之為“superstep”。在每個“superstep”中,每個頂點都執行一個用戶自定義的函數,且頂點之間通過消息進行通信,當一個頂點知道圖中其他任意頂點的唯一ID時,該頂點就可以向其發送一條消息。

但是實際上,KMeans不是圖處理,Alink也沒有基於Flink-Gelly來構建。也許只是借鑒了其概念。所以我們還需要再探尋。

0x03 Flink的迭代算法(superstep-based)

迭代算法在很多數據分析領域會用到,比如機器學習或者圖計算。為了從大數據中抽取有用信息,這個時候往往會需要在處理的過程中用到迭代計算。

所謂迭代運算,就是給定一個初值,用所給的算法公式計算初值得到一个中間結果,然後將中間結果作為輸入參數進行反覆計算,在滿足一定條件的時候得到計算結果。

大數據處理框架很多,比如spark,mr。實際上這些實現迭代計算都是很困難的。

Flink直接支持迭代計算。Flink實現迭代的思路也是很簡單,就是實現一個step函數,然後將其嵌入到迭代算子中去。有兩種迭代操作算子: Iterate和Delta Iterate。兩個操作算子都是在未收到終止迭代信號之前一直調用step函數。

3.1 Bulk Iterate

這種迭代方式稱為全量迭代,它會將整個數據輸入,經過一定的迭代次數,最終得到你想要的結果。

迭代操作算子包括了簡單的迭代形式:每次迭代,step函數會消費全量數據(本次輸入和上次迭代的結果),然後計算得到下輪迭代的輸出(例如,map,reduce,join等)

迭代過程主要分為以下幾步:

  • Iteration Input(迭代輸入):是初始輸入值或者上一次迭代計算的結果。
  • Step Function(step函數):每次迭代都會執行step函數。它迭代計算DataSet,由一系列的operator組成,比如map,flatMap,join等,取決於具體的業務邏輯。
  • Next Partial Solution(中間結果):每一次迭代計算的結果,被發送到下一次迭代計算中。
  • Iteration Result(迭代結果):最後一次迭代輸出的結果,被輸出到datasink或者發送到下游處理。

它迭代的結束條件是:

  • 達到最大迭代次數
  • 自定義收斂聚合函數

編程的時候,需要調用iterate(int),該函數返回的是一個IterativeDataSet,當然我們可以對它進行一些操作,比如map等。Iterate函數唯一的參數是代表最大迭代次數。

迭代是一個環。我們需要進行閉環操作,那麼這時候就要用到closeWith(Dataset)操作了,參數就是需要循環迭代的dataset。也可以可選的指定一個終止標準,操作closeWith(DataSet, DataSet),可以通過判斷第二個dataset是否為空,來終止迭代。如果不指定終止迭代條件,迭代就會在迭代了最大迭代次數后終止。

3.2 迭代機制

DataSet API引進了獨特的同步迭代機制(superstep-based),僅限於用在有界的流。

我們將迭代操作算子的每個步驟函數的執行稱為單個迭代。在并行設置中,在迭代狀態的不同分區上并行計算step函數的多個實例。在許多設置中,對所有并行實例上的step函數的一次評估形成了所謂的superstep,這也是同步的粒度。因此,迭代的所有并行任務都需要在初始化下一個superstep之前完成superstep。終止準則也將被評估為superstep同步屏障。

下面是Apache原文

We referred to each execution of the step function of an iteration operator as a single iteration. In parallel setups, multiple instances of the step function are evaluated in parallel on different partitions of the iteration state. In many settings, one evaluation of the step function on all parallel instances forms a so called superstep, which is also the granularity of synchronization. Therefore, all parallel tasks of an iteration need to complete the superstep, before a next superstep will be initialized. Termination criteria will also be evaluated at superstep barriers.

下面是apache原圖

概括如下:

每次迭代都是一個superstep
    每次迭代中有若干subtask在不同的partition上分別執行step
      	 每個step有一個HeadTask,若干IntermediateTask,一個TailTask
    每個superstep有一個SynchronizationSinkTask 同步,因為迭代的所有并行任務需要在下一個迭代前完成

由此我們可以知道,superstep這是Flink DataSet API的概念,但是你從這裡能夠看到BSP模型的影子,比如:

  • 在傳統的BSP模型中,一個superstep被分為3步: 本地的計算, 消息的傳遞, 同步的barrier.
  • Barrier Synchronization又叫障礙同步或柵欄同步。每一次同步也是一個超步的完成和下一個超步的開始;
  • Superstep超步 是一次計算迭代,從起始每往前步進一層對應一個超步。
  • 程序該什麼時候結束是程序自己控制

0x04 Alink如何使用迭代

KMeansTrainBatchOp.iterateICQ函數中,生成了一個IterativeComQueue,而IterativeComQueue之中就用到了superstep-based迭代。

return new IterativeComQueue()
   .initWithPartitionedData(TRAIN_DATA, data)
   .initWithBroadcastData(INIT_CENTROID, initCentroid)
   .initWithBroadcastData(KMEANS_STATISTICS, statistics)
   .add(new KMeansPreallocateCentroid())
   .add(new KMeansAssignCluster(distance))
   .add(new AllReduce(CENTROID_ALL_REDUCE))
   .add(new KMeansUpdateCentroids(distance))
   .setCompareCriterionOfNode0(new KMeansIterTermination(distance, tol)) // 終止條件
   .closeWith(new KMeansOutputModel(distanceType, vectorColName, latitudeColName, longitudeColName)) 
   .setMaxIter(maxIter) // 迭代最大次數
   .exec();

而BaseComQueue.exec函數中則有:

public DataSet<Row> exec() {
   IterativeDataSet<byte[]> loop // Flink 迭代API
      = loopStartDataSet(executionEnvironment)
      .iterate(maxIter);
     // 後續操作能看出來,之前添加在queue上的比如KMeansPreallocateCentroid,都是在loop之上運行的。
  		if (null == compareCriterion) {
        loopEnd = loop.closeWith...
     	} else {     
        // compare Criterion.
        DataSet<Boolean> criterion = input ... compareCriterion
        loopEnd = loop.closeWith( ... criterion ... )
      }   
}

再仔細研究代碼,我們可以看出:

superstep包括:

.add(new KMeansPreallocateCentroid())
.add(new KMeansAssignCluster(distance))
.add(new AllReduce(CENTROID_ALL_REDUCE))
.add(new KMeansUpdateCentroids(distance))

終止標準就是

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

擁有後台管理系統的網站,將擁有強大的資料管理與更新功能,幫助您隨時新增網站的內容並節省網站開發的成本。

利用KMeansIterTermination構建了一個RichMapPartitionFunction作為終止標準。最後結束時候調用 KMeansOutputModel完成業務操作。

最大循環就是

.setMaxIter(maxIter)

於是我們可以得出結論,superstep-based Bulk Iterate 迭代算子是用來實現整體KMeans算法,KMeans算法就是一個superstep進行迭代。但是在superstep內容如果需要通訊或者柵欄同步,則採用了MPI的allReduce。

0x05 深入Flink源碼和runtime來驗證

我們需要深入到Flink內部去挖掘驗證,如果大家有興趣,可以參見下面調用棧,自己添加斷點來研究。

execute:56, LocalExecutor (org.apache.flink.client.deployment.executors)
executeAsync:944, ExecutionEnvironment (org.apache.flink.api.java)
execute:860, ExecutionEnvironment (org.apache.flink.api.java)
execute:844, ExecutionEnvironment (org.apache.flink.api.java)
collect:413, DataSet (org.apache.flink.api.java)
sinkFrom:44, PrintBatchOp (com.alibaba.alink.operator.batch.utils)
sinkFrom:20, PrintBatchOp (com.alibaba.alink.operator.batch.utils)
linkFrom:31, BaseSinkBatchOp (com.alibaba.alink.operator.batch.sink)
linkFrom:17, BaseSinkBatchOp (com.alibaba.alink.operator.batch.sink)
link:89, BatchOperator (com.alibaba.alink.operator.batch)
linkTo:239, BatchOperator (com.alibaba.alink.operator.batch)
print:337, BatchOperator (com.alibaba.alink.operator.batch)
main:35, KMeansExample (com.alibaba.alink)

5.1 向Flink提交Job

Alink和Flink構建聯繫,是在print調用中完成的。因為是本地調試,Flink會啟動一個miniCluster,然後會做如下操作。

  • 首先生成執行計劃Plan。Plan以數據流形式來表示批處理程序,但它只是批處理程序最初的表示,然後計劃會被優化以生成更高效的方案OptimizedPlan。
  • 然後,計劃被編譯生成JobGraph。這個圖是要交給flink去生成task的圖。
  • 生成一系列配置。
  • 將JobGraph和配置交給flink集群去運行。如果不是本地運行的話,還會把jar文件通過網絡發給其他節點。
  • 以本地模式運行的話,可以看到啟動過程,如啟動性能度量、web模塊、JobManager、ResourceManager、taskManager等等。

當我們看到了submitJob調用,就知道KMeans代碼已經和Flink構建了聯繫

@Internal
public class LocalExecutor implements PipelineExecutor {

   public static final String NAME = "local";

   @Override
   public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration) throws Exception {

      // we only support attached execution with the local executor.
      checkState(configuration.getBoolean(DeploymentOptions.ATTACHED));

      final JobGraph jobGraph = getJobGraph(pipeline, configuration);
      final MiniCluster miniCluster = startMiniCluster(jobGraph, configuration);
      final MiniClusterClient clusterClient = new MiniClusterClient(configuration, miniCluster);

      CompletableFuture<JobID> jobIdFuture = clusterClient.submitJob(jobGraph);

      jobIdFuture
            .thenCompose(clusterClient::requestJobResult)
            .thenAccept((jobResult) -> clusterClient.shutDownCluster());

      return jobIdFuture.thenApply(jobID ->
            new ClusterClientJobClientAdapter<>(() -> clusterClient, jobID));
   }

5.2 生成JobGraph

生成jobGraph的具體流程是:

  • IterativeDataSet.closeWith會生成一個BulkIterationResultSet。
  • PrintBatchOp.sinkFrom中會調用到ExecutionEnvironment.executeAsync
  • 調用createProgramPlan構建一個Plan
  • OperatorTranslation.translate函數發現if (dataSet instanceof BulkIterationResultSet),則調用translateBulkIteration(bulkIterationResultSet);
  • 這時候生成了執行計劃Plan
  • ExecutionEnvironment.executeAsync調用LocalExecutor.execute
  • 然後調用FlinkPipelineTranslationUtil.getJobGraph來生成jobGraph
  • GraphCreatingVisitor.preVisit中會判斷 if (c instanceof BulkIterationBase),以生成BulkIterationNode
  • PlanTranslator.translateToJobGraph會調用到JobGraphGenerator.compileJobGraph,最終調用到createBulkIterationHead就生成了迭代處理的Head。
  • 最後將jobGraph提交給Cluster ,jobGraph 變形為 ExceutionGraph在JM和TM上執行。

5.3 迭代對應的Task

前面代碼中,getJobGraph函數作用是生成了job graph。

然後 JobManager 根據 JobGraph 生成 ExecutionGraph。ExecutionGraph 是 JobGraph 的并行化版本,是調度層最核心的數據結構。

最後 JobManager 根據 ExecutionGraph 對 Job 進行調度后,在各個TaskManager 上部署 Task。

所以我們需要看看最終運行時候,迭代API對應着哪些Task。

針對IterativeDataSet,即superstep-based Bulk Iterate,Flink生成了如下的task。

  • IterationHeadTask
  • IterationIntermediateTask
  • IterationTailTask
  • IterationSynchronizationSinkTask

5.3.1 IterationHeadTask

IterationHeadTask主要作用是協調一次迭代。

它會讀取初始輸入,和迭代Tail建立一個BlockingBackChannel。在成功處理輸入之後,它會發送EndOfSuperstep事件給自己的輸出。它在每次superstep之後會聯繫 synchronization task,等到自己收到一個用來同步的AllWorkersDoneEvent。AllWorkersDoneEvent表示所有其他的heads已經完成了自己的迭代。

下一次迭代時候,上一次迭代中tail的輸出就經由backchannel傳輸,形成了head的輸入。何時進入到下一個迭代,是由HeadTask完成的。一旦迭代完成,head將發送TerminationEvent給所有和它關聯的task,告訴他們shutdown。

				barrier.waitForOtherWorkers();

				if (barrier.terminationSignaled()) {
					requestTermination();
					nextStepKickoff.signalTermination();
				} else {
					incrementIterationCounter();
					String[] globalAggregateNames = barrier.getAggregatorNames();
					Value[] globalAggregates = barrier.getAggregates();
					aggregatorRegistry.updateGlobalAggregatesAndReset(globalAggregateNames, globalAggregates);
          // 在這裏發起下一次Superstep。
					nextStepKickoff.triggerNextSuperstep();
				}
			}

IterationHeadTask是在JobGraphGenerator.createBulkIterationHead中構建的。其例子如下:

"PartialSolution (Bulk Iteration) (org.apache.flink.runtime.iterative.task.IterationHeadTask)"

5.3.2 IterationIntermediateTask

IterationIntermediateTask是superstep中間段的task,其將傳輸EndOfSuperstepEvent和TerminationEvent給所有和它關聯的tasks。此外,IterationIntermediateTask能更新the workset或者the solution set的迭代狀態。

如果迭代狀態被更新,本task的輸出將傳送回IterationHeadTask,在這種情況下,本task將作為head再次被安排。

IterationIntermediateTask的例子如下:

 "MapPartition (computation@KMeansUpdateCentroids) (org.apache.flink.runtime.iterative.task.IterationIntermediateTask)"
   
 "Combine (SUM(0), at kMeansPlusPlusInit(KMeansInitCentroids.java:135) (org.apache.flink.runtime.iterative.task.IterationIntermediateTask)"
   
 "MapPartition (AllReduceSend) (org.apache.flink.runtime.iterative.task.IterationIntermediateTask)"
   
"Filter (Filter at kMeansPlusPlusInit(KMeansInitCentroids.java:130)) (org.apache.flink.runtime.iterative.task.IterationIntermediateTask)"
   

5.3.3 IterationTailTask

IterationTailTask是迭代的最末尾。如果迭代狀態被更新,本task的輸出將通過BlockingBackChannel傳送回IterationHeadTask,反饋給迭代頭就意味着一個迭代完整邏輯的完成,那麼就可以關閉這個迭代閉合環了。這種情況下,本task將在head所在的實例上重新被調度。

這裡有幾個關鍵點需要注意:

如何和Head建立聯繫

Flink有一個BlockingQueueBroker類,這是一個阻塞式的隊列代理,它的作用是對迭代併發進行控制。Broker是單例的,迭代頭任務和尾任務會生成同樣的broker ID,所以頭尾在同一個JVM中會基於相同的dataChannel進行通信。dataChannel由迭代頭創建。

IterationHeadTask中會生成BlockingBackChannel,這是一個容量為1的阻塞隊列。

// 生成channel
BlockingBackChannel backChannel = new BlockingBackChannel(new SerializedUpdateBuffer(segments, segmentSize, this.getIOManager())); 

// 然後block在這裏,等待Tail
superstepResult = backChannel.getReadEndAfterSuperstepEnded();

IterationTailTask則是如下:

// 在基類得到channel,因為是單例,所以會得到同一個
worksetBackChannel = BlockingBackChannelBroker.instance().getAndRemove(brokerKey());

// notify iteration head if responsible for workset update 在這裏通知Head
worksetBackChannel.notifyOfEndOfSuperstep();

而兩者都是利用如下辦法來建立聯繫,在同一個subtask中會使用同一個brokerKey,這樣首尾就聯繫起來了。

public String brokerKey() {
    if (this.brokerKey == null) {
        int iterationId = this.config.getIterationId();
        this.brokerKey = this.getEnvironment().getJobID().toString() + '#' + iterationId + '#' + this.getEnvironment().getTaskInfo().getIndexOfThisSubtask();
    }

    return this.brokerKey;
}
如何把用戶返回的數值傳給Head

這是通過output.collect來完成的。

首先,在Tail初始化時候,會生成一個outputCollector,這個outputCollector會被設置為本task的輸出outputCollector。這樣就保證了用戶函數的輸出都會轉流到outputCollector。

而outputCollector的輸出就是worksetBackChannel的輸出,這裏設置為同一個instance。這樣用戶輸出就輸出到backChannel中。

	@Override
	protected void initialize() throws Exception {
		super.initialize();
    
		// set the last output collector of this task to reflect the iteration tail state update:
		// a) workset update,
		// b) solution set update, or
		// c) merged workset and solution set update

		Collector<OT> outputCollector = null;
		if (isWorksetUpdate) {
      // 生成一個outputCollector
			outputCollector = createWorksetUpdateOutputCollector();

			// we need the WorksetUpdateOutputCollector separately to count the collected elements
			if (isWorksetIteration) {
				worksetUpdateOutputCollector = (WorksetUpdateOutputCollector<OT>) outputCollector;
			}
		}
    
    ......
    // 把outputCollector設置為本task的輸出
		setLastOutputCollector(outputCollector);
	}

outputCollector的輸出就是worksetBackChannel的輸出buffer,這裏設置為同一個instance。

	protected Collector<OT> createWorksetUpdateOutputCollector(Collector<OT> delegate) {
		DataOutputView outputView = worksetBackChannel.getWriteEnd();
		TypeSerializer<OT> serializer = getOutputSerializer();
		return new WorksetUpdateOutputCollector<OT>(outputView, serializer, delegate);
	}

運行時候如下:

	@Override
	public void run() throws Exception {

		SuperstepKickoffLatch nextSuperStepLatch = SuperstepKickoffLatchBroker.instance().get(brokerKey());

		while (this.running && !terminationRequested()) {

      // 用戶在這裏輸出,最後會輸出到output.collect,也就是worksetBackChannel的輸出buffer。
			super.run();

      // 這時候以及輸出到channel完畢,只是通知head進行讀取。
			if (isWorksetUpdate) {
				// notify iteration head if responsible for workset update
				worksetBackChannel.notifyOfEndOfSuperstep();
			} else if (isSolutionSetUpdate) {
				// notify iteration head if responsible for solution set update
				solutionSetUpdateBarrier.notifySolutionSetUpdate();
			}

      ...
	}

IterationTailTask例子如下:

"Pipe (org.apache.flink.runtime.iterative.task.IterationTailTask)"

5.3.4 IterationSynchronizationSinkTask

IterationSynchronizationSinkTask作用是同步所有的iteration heads,IterationSynchronizationSinkTask被是實現成一個 output task。其只是用來協調,不處理任何數據。

在每一次superstep,IterationSynchronizationSinkTask只是等待直到它從每一個head都收到一個WorkerDoneEvent。這表示下一次superstep可以開始了。

這裏需要注意的是 SynchronizationSinkTask 如何等待各個并行度的headTask。比如Flink的并行度是5,那麼SynchronizationSinkTask怎麼做到等待這5個headTask。

在IterationSynchronizationSinkTask中,註冊了SyncEventHandler來等待head的WorkerDoneEvent。

this.eventHandler = new SyncEventHandler(numEventsTillEndOfSuperstep, this.aggregators, this.getEnvironment().getUserClassLoader());
this.headEventReader.registerTaskEventListener(this.eventHandler, WorkerDoneEvent.class);

在SyncEventHandler中,我們可以看到,在構建時候,numberOfEventsUntilEndOfSuperstep就被設置為并行度,每次收到一個WorkerDoneEvent,workerDoneEventCounter就遞增,當等於numberOfEventsUntilEndOfSuperstep,即并行度時候,就說明本次superstep中,所有headtask都成功了。

    private void onWorkerDoneEvent(WorkerDoneEvent workerDoneEvent) {
        if (this.endOfSuperstep) {
            throw new RuntimeException("Encountered WorderDoneEvent when still in End-of-Superstep status.");
        } else {
          // 每次遞增
            ++this.workerDoneEventCounter;
            String[] aggNames = workerDoneEvent.getAggregatorNames();
            Value[] aggregates = workerDoneEvent.getAggregates(this.userCodeClassLoader);
            if (aggNames.length != aggregates.length) {
                throw new RuntimeException("Inconsistent WorkerDoneEvent received!");
            } else {
                for(int i = 0; i < aggNames.length; ++i) {
                    Aggregator<Value> aggregator = (Aggregator)this.aggregators.get(aggNames[i]);
                    aggregator.aggregate(aggregates[i]);
                }

              // numberOfEventsUntilEndOfSuperstep就是并行度,等於并行度時候就說明所有head都成功了。
                if (this.workerDoneEventCounter % this.numberOfEventsUntilEndOfSuperstep == 0) {
                    this.endOfSuperstep = true;
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

IterationSynchronizationSinkTask的例子如下:

"Sync (BulkIteration (Bulk Iteration)) (org.apache.flink.runtime.iterative.task.IterationSynchronizationSinkTask)"

5.4 superstep

綜上所述,我們最終得到superstep如下:

***** 文字描述如下 *****
  
每次迭代都是一個superstep
  每次迭代中有若干subtask在不同的partition上分別執行step
     每個step有一個HeadTask,若干IntermediateTask,一個TailTask
  每個superstep有一個SynchronizationSinkTask
  
***** 偽代碼大致如下 *****
  
for maxIter :
  begin superstep
      for maxSubTask :
         begin step
           IterationHeadTask
           IterationIntermediateTask
           IterationIntermediateTask
           ...
           IterationIntermediateTask
           IterationIntermediateTask
           IterationTailTask
         end step
    IterationSynchronizationSinkTask
  end superstep

0x06 結合KMeans代碼看superset

6.1 K-means算法概要

K-means算法的過程,為了盡量不用數學符號,所以描述的不是很嚴謹,大概就是這個意思,“物以類聚、人以群分”:

  1. 首先輸入k的值,即我們希望將數據集經過聚類得到k個分組。
  2. 從數據集中隨機選擇k個數據點作為初始大哥(質心,Centroid)
  3. 對集合中每一個小弟,計算與每一個大哥的距離(距離的含義後面會講),離哪個大哥距離近,就跟定哪個大哥。
  4. 這時每一個大哥手下都聚集了一票小弟,這時候召開人民代表大會,每一群選出新的大哥(其實是通過算法選出新的質心)。
  5. 如果新大哥和老大哥之間的距離小於某一個設置的閾值(表示重新計算的質心的位置變化不大,趨於穩定,或者說收斂),可以認為我們進行的聚類已經達到期望的結果,算法終止。
  6. 如果新大哥和老大哥距離變化很大,需要迭代3~5步驟。

6.2 KMeansPreallocateCentroid

KMeansPreallocateCentroid也是superstep一員,但是只有context.getStepNo() == 1的時候,才會進入實際業務邏輯,預分配Centroid。當superstep為大於1的時候,本task會執行,但不會進入具體業務代碼。

public class KMeansPreallocateCentroid extends ComputeFunction {
    private static final Logger LOG = LoggerFactory.getLogger(KMeansPreallocateCentroid.class);

    @Override
    public void calc(ComContext context) {
        // 每次superstep都會進到這裏
        LOG.info("  KMeansPreallocateCentroid 我每次都會進的呀   ");
        if (context.getStepNo() == 1) {
          // 實際預分配業務只進入一次
        }
    }
}

6.3 KMeansAssignCluster 和 KMeansUpdateCentroids

KMeansAssignCluster 作用是為每個點(point)計算最近的聚類中心,為每個聚類中心的點坐標的計數和求和。

KMeansUpdateCentroids 作用是基於計算出來的點計數和坐標,計算新的聚類中心。

Alink在整個計算過程中維護一個特殊節點來記住待求中心點當前的結果。

這就是為啥迭代時候需要區分奇數次和偶數次的原因了。奇數次就表示老大哥,偶數次就表示新大哥。每次superstep只會計算一批大哥,留下另外一批大哥做距離比對。

另外要注意的一點是:普通的迭代計算,是通過Tail給Head回傳用戶數據,但是KMeans這裏的實現並沒有採用這個辦法,而是把計算出來的中心點都存在共享變量中,在各個intermediate之間互相交互。

public class KMeansAssignCluster extends ComputeFunction {
    public void calc(ComContext context) {
        ......
        if (context.getStepNo() % 2 == 0) {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID1);
        } else {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID2);
        }
      /** 具體業務邏輯代碼
       * Find the closest cluster for every point and calculate the sums of the points belonging to the same cluster.
       */
    }
}

public class KMeansUpdateCentroids extends ComputeFunction {
    public void calc(ComContext context) {
        if (context.getStepNo() % 2 == 0) {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID2);
        } else {
            stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID1);
        }
      /** 具體業務邏輯代碼
       * Update the centroids based on the sum of points and point number belonging to the same cluster.
       */
    }

6.4 KMeansOutputModel

這裏要特殊說明,因為KMeansOutputModel是最終輸出模型,而KMeans算法的實現是:所有subtask都擁有所有中心點,就是說所有subtask都會有相同的模型,就沒有必要全部輸出,所以這裏限定了第一個subtask才能輸出,其他的都不輸出。

	@Override
	public List <Row> calc(ComContext context) {
    // 只有第一個subtask才輸出模型數據。
		if (context.getTaskId() != 0) {
			return null;
		}

    ....
      
		modelData.params = new KMeansTrainModelData.ParamSummary();
		modelData.params.k = k;
		modelData.params.vectorColName = vectorColName;
		modelData.params.distanceType = distanceType;
		modelData.params.vectorSize = vectorSize;
		modelData.params.latitudeColName = latitudeColName;
		modelData.params.longtitudeColName = longtitudeColName;

		RowCollector collector = new RowCollector();
		new KMeansModelDataConverter().save(modelData, collector);
		return collector.getRows();
	}

0x07 參考

幾種并行計算模型的區別(BSP LogP PRAM)

https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/batch/iterations.html
聚類、K-Means、例子、細節

Flink-Gelly:Iterative Graph Processing

從BSP模型到Apache Hama

Flink DataSet迭代運算

幾種并行計算模型的區別(BSP LogP PRAM)

Flink架構,源碼及debug

Flink 之 Dataflow、Task、subTask、Operator Chains、Slot 介紹

Flink 任務和調度

Flink運行時之生成作業圖

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

台中搬家公司教你幾個打包小技巧,輕鬆整理裝箱!

還在煩惱搬家費用要多少哪?台中大展搬家線上試算搬家費用,從此不再擔心「物品怎麼計費」、「多少車才能裝完」

Elasticsearch系列—生產集群部署(上)_網頁設計公司

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

當全世界的人們隨著網路時代而改變向上時您還停留在『網站美醜不重要』的舊有思維嗎?機會是留給努力改變現況的人們,別再浪費一分一秒可以接觸商機的寶貴時間!

概要

本篇開始介紹Elasticsearch生產集群的搭建及相關參數的配置。

ES集群的硬件特性

我們從開始編程就接觸過各種各樣的組件,而每種功能的組件,對硬件要求的特性都不太相同,有的需要很強的CPU計算能力,有的對內存需求量大,有的對網卡要求高等待,下面我們討論一下ES集群對幾種硬件的特性需求。

CPU

ES集群對CPU的要求相對低一些,畢竟純計算的比重要小一些,選用主流的CPU,2核到8核的都可以。

如果有兩種CPU可以挑選,一種是主頻高但核數少的CPU,另一種是主頻一般核數多的CPU,肯定選后一種,因為多核的CPU可以提供更多的併發處理能力,遠比單核高性能帶來的效益要高。

內存

ES集群對內存的要求很高,部署ES集群時,要把大部分資源投入到內存當中。內存分配主要有兩部分,JVM heap內存(堆內存)和OS Cache內存。

JVM heap內存用得不多,主要是OS Cache,我們知道,ES建立的倒排索引,正排索引,過濾器緩存,都是優先放在內存當中的,OS Cache的大小直接決定搜索的性能,如果OS Cache不夠,ES搜索等操作只有被迫讀硬盤,延時就會從毫秒級升到秒級。

OS Cache具體在多大才算夠,取決於數據量,如果是百萬級別的數據,16GB左右應該可以接受,如果是億級,一般單節點都是64GB內存。生產環境最低要求內存應不低於8GB。

硬盤

硬盤成本本身比較便宜,能用SSD就用SSD,訪問速度肯定比机械硬盤快,預估好數據量后就盡可能多規劃一些容量。

另外盡量使用本地存儲,網絡存儲還依賴於網絡傳輸,這個容易造成一些延遲。

網絡

對ES集群這種分佈式系統來說,快速並且可靠的網絡還是比較重要的,shard的分配和rebalance都需要佔用大量的帶寬,集群最好部署在同一個局域網內,異地容災等跨數據中心的部署方案,要考慮到網絡故障帶來的影響。

JVM選擇

使用ES官網推薦的JDK版本,服務端和客戶端盡量使用同一個版本的JDK。

涉及到ES服務端的JVM調優設置,保持原樣不要輕易改動,畢竟ES已經花了大量人力物力驗證過的,隨意調整jvm參數可能適得其反。

容量規劃

規劃集群里,要規劃好投入幾台服務器,數據量上限是多少,業務模型數據讀寫的比例是多少,歷史數據的遷移方案等,一般來說,百萬到10億內的數據量,使用ES集群還是能夠支撐下來的,ES節點數建議不要超過100個。

舉個例子:數據量10億以內,部署5台服務器,8核64GB內存,是能夠支撐的。

生產案例模擬

Linux操作系統搭建

我們使用Linux虛擬機來演示一個生產ES集群的搭建。我們創建4台虛擬機,每台2核CPU,4GB內存,操作系統為CentOS 7 64bit。

虛擬機我用的是VMware workstation,有用virtual box也行,CentOS 7、JDK的安裝不贅述。記得把CentOS的防火牆關了。

修改每台機器的hostname信息,命令
vi /etc/hostname,修改文件,保存即可,建議修改成elasticsearch01,elasticsearch02,elasticsearch03,elasticsearch04。

假定我們4台虛擬機的域名和IP是這樣分配的:

192.168.17.138 elasticsearch01
192.168.17.137 elasticsearch02
192.168.17.132 elasticsearch03
192.168.17.139 elasticsearch04

把這段配置放在 /etc/hosts文件末尾,4台機器做相同的配置。

這4台機器之間,可以配置免密登錄,如在elasticsearch01機器上,我們執行以下操作:

  1. 生成公鑰文件,命令:
ssh-keygen -t rsa

一直輸入回車,不要設置密碼默認會將公鑰放在/root/.ssh目錄下生成id_rsa.pub和id_rsa兩個文件

  1. 拷貝公鑰文件
cp id_rsa.pub authorized_keys
  1. 將公鑰文件拷貝到另外三台機器
ssh-copy-id -i elasticsearch02
ssh-copy-id -i elasticsearch03
ssh-copy-id -i elasticsearch03

拷貝完成后,可以在目標機器上/root/.ssh/目錄下看到多了一個authorized_keys文件。

  1. 嘗試免密登錄,在elasticsearch01機器上輸入ssh elasticsearch02,如果不需要輸入密碼就能登錄到elasticsearch02,說明配置成功,其他機器類似。

這4台機器也可以相互做ssh免密設置。

這裏補充一點免密登錄的方向性問題,上面的案例是在elasticsearch01機器生成的公鑰,並且發送給了elasticsearch02等三台機器,那麼我從elasticsearch01跳到elasticsearch02是不需要密碼的,反過來從elasticsearch02登錄到elasticsearch01,還是需要密碼的。

最後補充幾個常用檢查命令:

  • 檢查NetManager的狀態:systemctl status NetworkManager.service
  • 檢查NetManager管理的網絡接口:nmcli dev status
  • 檢查NetManager管理的網絡連接:nmcli connection show

Elasticsearch服務端

這裏選用的JDK版本為1.8.0_211,Elasticsearch版本為6.3.1,自行安裝不贅述。

ES解壓后的目錄結構:

# 用 "tree -L 1" 命令得到的樹狀結構
.
├── bin
├── config
├── lib
├── LICENSE.txt
├── logs
├── modules
├── NOTICE.txt
├── plugins
└── README.textile
  • bin:存放es的一些可執行腳本,比如用於啟動進程的elasticsearch命令,以及用於安裝插件的elasticsearch-plugin插件
  • config:用於存放es的配置文件,比如elasticsearch.yml
  • logs:用於存放es的日誌文件
  • plugins:用於存放es的插件
  • data:用於存放es的數據文件的默認目錄,就是每個索引的shard的數據文件,一般會另外指定一個目錄。

Elasticsearch參數設置

在config目錄下的文件,包含了ES的基本配置信息:

.
├── elasticsearch.yml
├── jvm.options
├── log4j2.properties
├── role_mapping.yml
├── roles.yml
├── users
└── users_roles

默認參數

Elasticsearch的配置項比較豐富並且默認配置已經非常優秀了,基本上我們需要改動的是跟服務器環境相關的配置,如IP地址,集群名稱,數據存儲位置,日誌存儲位置等外圍參數,涉及到內部機制及JVM參數的,一般不干預,不恰當的JVM參數調整反而會導致集群出現性能故障,如果沒有充足的理由或數據驗證結果,不要輕易嘗試修改。

集群和節點名稱

在elasticsearch.yml文件里這項配置表示集群名稱,配置項默認是註釋掉的,集群名稱默認為elasticsearch。

#cluster.name: my-application

這個配置項強烈建議打開,用項目約定的命名規範進行重命名,並且將研發環境、測試環境、STG准生產環境、生產環境分別命名,如elasticsearch_music_app_dev表示研發環境,elasticsearch_music_app_sit表示測試環境,elasticsearch_music_app_pro表示生產環境等。避免開發測試環境連錯環境,無意中加入集群導致數據問題。

cluster.name: elasticsearch_music_app_pro

節點名稱的配置項

#node.name: node-1

默認也是註釋掉的,ES啟動時會分配一個隨機的名稱,建議還是自行分配一個名稱,這樣容易記住是哪台機器,如

node.name: es_node_001_data

文件路徑

涉及到文件路徑的幾個參數,主要有數據、日誌、插件等,默認這幾個地址都是在Elasticsearch安裝的根目錄下,但Elasticsearch升級時,有些目錄可能會有影響,安全起見,可以單獨設置目錄。

#
# ----------------------------------- Paths ------------------------------------
#
# Path to directory where to store the data (separate multiple locations by comma):
#
#path.data: /path/to/data
#
# Path to log files:
#
#path.logs: /path/to/logs
#

例如我們可以在/var目錄下創建相應的文件夾,並且賦予相應的讀寫權限,如:

path.data: /var/es/data
path.logs: /var/es/logs

日誌文件配置

log4j2.properties文件,ES日誌框架選用的是log4j2,也就是log4j的進化版本,對Java技術棧熟悉的童鞋,看到這個配置文件會非常熟悉,默認的日誌輸入配置、格式均能滿足日常的故障定位和分析,也不需要什麼改動。

默認是一天生成一個日期文件,如果ES承載的數據量特別大,可以調整日誌文件產生頻率和每個日誌文件的大小,以及ES最多存儲日誌的大小、數量。

Elasticsearch集群發現機制

配置參數

Zen Discovery是Elasticsearch集群發現機制的默認實現,底層通信依賴transport組件,我們完成Elasticsearch集群的配置主要有下面幾個參數:

  • cluster.name 指定集群的名稱。
  • node.name 節點名稱。
  • network.host 節點綁定的IP。
  • node.master 可選值為true/false,決定該節點類型為master eligible或data node。
  • discovery.zen.ping.unicast.hosts gossip路由服務的IP地址,即集群發現協議通信的公共節點,可以寫多個,有節點啟動時會向裏面的IP發送消息,獲取集群其他節點的信息,最後加入集群。

Elasticsearch集群是點對點(P2P)的分佈式系統架構,數據索引、搜索操作是node之間直接通信的,沒有中心式的master節點,但Elasticsearch集群內的節點也分成master node和data node兩種角色。

正常情況下,Elasticsearch集群只有一個master節點,它負責維護整個集群的狀態信息,集群的元數據信息,有新的node加入或集群內node宕機下線時,重新分配shard,並同步node的狀態信息給所有的node節點,這樣所有的node節點都有一份完整的cluster state信息。

集群發現的一般步驟如下:

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

透過資料庫的網站架設建置,建立公司的形象或購物系統,並提供最人性化的使用介面,讓使用者能即時接收到相關的資訊

  1. 節點配置network.host綁定內網地址,配置各自的node.name信息,cluster.name設置為相同的值。
  2. discovery.zen.ping.unicast.hosts配置了幾個gossip路由的node。
  3. 所有node都可以發送ping消息到路由node,再從路由node獲取cluster state回來。
  4. 所有node執行master選舉。
  5. 所有node都會跟master進行通信,然後加入master的集群。

master選舉

node.master設置為true的,將成為master eligible node,也叫master候選節點,只有master eligible node才能被選舉成master node。如果是個小集群,那麼所有節點都可以是master eligible node,10個節點以上的集群,可以考慮拆分master node和data node,一般建議master eligible node給3個即可。

master選舉過程是自動完成的,有幾個參數可以影響選舉的過程:

  • discovery.zen.ping_timeout: 選舉超時時間,默認3秒,網絡狀況不好時可以增加超時時間。

  • discovery.zen.join_timeout: 有新的node加入集群時,會發送一個join request到master node,同樣因為網絡原因可以調大,如果一次超時,默認最多重試20次。

  • discovery.zen.master_election.ignore_non_master_pings:如果master node意外宕機了,集群進行重新選舉,如果此值為true,那麼只有master eligible node才有資格被選為master。

  • discovery.zen.minimum_master_nodes: 新選舉master時,要求必須有多少個 master eligible node去連接那個新選舉的master。而且還用於設置一個集群中必須擁有的master eligible node。如果這些要求沒有被滿足,那麼master node就會被停止,然後會重新選舉一個新的master。這個參數必須設置為我們的master eligible node的quorum數量。一般避免說只有兩個master eligible node,因為2的quorum還是2。如果在那個情況下,任何一個master候選節點宕機了,集群就無法正常運作了。

集群故障探查

有兩種集群故障探查機制

  1. master主動對集群中所有的其他node發起ping命令,判斷它們是否是存活着的。
  2. 每個node向master node發送ping請求,判斷master node是否存活,否則就會發起一個選舉過程。

有下面三個參數用來配置集群故障的探查過程:

  • ping_interval:ping一次node的間隔時間,默認是1s
  • ping_timeout:每次ping的timeout等待時長,默認是30s
  • ping_retries:對node的ping請求失敗了,重試次數,默認3次。

集群狀態更新

master node是集群中唯一可以對cluster state進行更新的node。更新的步驟如下:

  1. master node收到更新事件,如shard移動,可能會有多條事件,但master node一次只處理一個集群狀態的更新事件。
  2. master node將事件更新到本地,併發布publish message到集群所有的node上。
  3. node接收publish message后,對這個message返回ack響應,但是不會立即更新。
  4. 如果master沒有在指定的時間內(discovery.zen.commit_timeout配置項,默認是30s),從至少N個節點(discovery.zen.minimum_master_nodes配置項)獲取ack響應,那麼這次cluster state change事件就會被reject,最終不會被提交。
  5. 如果在指定時間內,指定數量的node都返回了ack消息,那麼cluster state就會被commit,然後master node把 commit message發送給所有的node。所有的node接收到那個commit message之後,接着才會將之前接收到的集群狀態應用到自己本地的狀態副本中去。
  6. master會等待所有node的commit message 的ack消息,在一個等待超時時長內,如果接收到了響應,表示狀態更新成功,master node繼續處理內存queue中保存的下一個更新事件。

discovery.zen.publish_timeout默認是30s,這個超時等待時長是從plublish cluster state開始計算的。

我們可以參照此圖:

master node宕機問題

Elasticsearch集群中,master node扮演着非常重要的角色,如果master node宕機了,那豈不是群龍無首了?雖然有master選舉,但這個也是要時間的,沒有master node那段空檔期集群該怎麼辦?

說了一半,基本上是完了,但我們也可以設置,群龍無首時哪些操作可以做,哪些操作不能做。

discovery.zen.no_master_block配置項可以控制在群龍無首時的策略:

  • all: 一旦master宕機,那麼所有的操作都會被拒絕。
  • write:默認的選項,所有寫操作都會被拒絕,但是讀操作是被允許的。

split-brain(腦分裂問題)

在Elasticsearch集群中,master node非常重要,並且只有一個,相當於整個集群的大腦,控制將整個集群狀態的更新,如果Elasticsearch集群節點之間出現區域性的網絡中斷,比如10個節點的Elasticsearch集群,4台node部署在機房A區,6台node部署在機房B區,如果A區與B區的交換機故障,導致兩個區隔離開來了,那麼沒有master node的那個區,會觸發master選舉,如果選舉了新的master,那麼整個集群就會出現兩個master node,這種現象叫做腦分裂。

這樣現象很嚴重,會破壞集群的數據,該如何避免呢?

回到我們前面提到的discovery.zen.minimum_master_nodes參數,這個值的正確設置,可以避免上述的腦分裂問題。

discovery.zen.minimum_master_nodes參數表示至少需要多少個master eligible node,才可以成功地選舉出master,否則不進行選舉。

足夠的master eligible node計算公式:

quorum = master_eligible_nodes / 2 + 1

如上圖我們10個node的集群,如果全部是master eligible node,那麼quorum = 10/2 + 1 = 6。

如果我們有3個master eligible node,7個data node,那麼quorum = 3/2 + 1 = 2。

如果集群只有2個節點,並且全是master eligible node,那麼quorum = 2/2 + 1 = 2,問題就來了,如果隨便一個node宕機,在只剩下一個node情況下,無法滿足quorum的值,master永遠選舉不成功,集群就徹底無法寫入了,所以只能設置成1,後果是只要這兩個node之間網絡斷了,就會發生腦分裂的現象。

所以一個Elasticsearch集群至少得有3個node,全部為master eligible node的話,quorum = 3/2 + 1 = 2。如果我們設置minimum_master_nodes=2,分析一下會不會出現腦分裂的問題。

場景一:A區一個node,為master,B區兩個node,為master eligible node

A區因為只剩下一個node,無法滿足quorum的條件,此時master取消當前的master角色,且無法選舉成功。

B區兩個master eligible node,滿足quorum條件,成功選舉出master。

此時集群還是只有一個master,待網絡故障恢復后,集群數據正常。

場景二:A區一個node,為master eligible node,B區2個node,其中一個是master

A區只有一個master eligible node,不滿足quorum的條件,無法進行選舉。

B區原本的master存在,不需要進行選舉,並且滿quorum的條件,master角色可以保留。

此時集群還是一個master,正常。

綜上所述:3個節點的集群,全部為master eligible node,配置discovery.zen.minimum_master_nodes: 2,就可以避免腦裂問題的產生。

minimum_master_nodes動態修改

因為集群是可以動態增加和下線節點的,quorum的值也會跟着改變。minimum_master_nodes參數值需要通過api隨時修改的,特別是在節點上線和下線的時候,都需要作出對應的修改。而且一旦修改過後,這個配置就會持久化保存下來。

修改api請求如下:

PUT /_cluster/settings
{
    "persistent" : {
        "discovery.zen.minimum_master_nodes" : 2
    }
}

響應報文:

{
  "acknowledged": true,
  "persistent": {
    "discovery": {
      "zen": {
        "minimum_master_nodes": "2"
      }
    }
  },
  "transient": {}
}

也可以通過命令查詢當前的配置:

GET /_cluster/settings

響應結果如下:

{
  "persistent": {
    "discovery": {
      "zen": {
        "minimum_master_nodes": "1"
      }
    }
  },
  "transient": {}
}
留一個問題

上圖10個節點的集群,假設全是master eligible node,按照上述的網絡故障,會不會出現腦分裂現象 ?配置項minimum_master_nodes最低要配置成多少,才不會出現腦分裂的問題?

小結

本篇主要介紹了Elasticsearch集群的部署和參數設置等知識,大部分都不需要人工干預,默認值已經是最優選,集群發現機制和master選舉機制了解一下就OK。

專註Java高併發、分佈式架構,更多技術乾貨分享與心得,請關注公眾號:Java架構社區
可以掃左邊二維碼添加好友,邀請你加入Java架構社區微信群共同探討技術

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

※想知道最厲害的網頁設計公司嚨底家"!

RWD(響應式網頁設計)是透過瀏覽器的解析度來判斷要給使用者看到的樣貌

我眼中的檳榔_潭子電動車

※Google地圖已可更新顯示潭子電動車充電站設置地點!!

日本、大陸,發現這些先進的國家已經早就讓電動車優先上路,而且先進國家空氣品質相當好,電動車節能減碳可以減少空污

初識檳榔已是多年前的事情了,在很小的時候,大人們嚼檳榔,我們就已經耳濡目染了。後來我們長大了參加工作了。第一次吃檳榔還是朋友給的,在一次偶然的飯桌上,酒足飯飽后,朋友遞過來一顆小小的檳榔果,它穿着一層厚厚的芝麻大衣,包裹着細小的果肉,放入口中細細咀嚼,沁人心脾的清爽的感覺,刺激着味蕾,清香的果味混合著芝麻的香醇,回味悠久。

       

從那之後,我跟檳榔的接觸就越來越多了,我也開始了解了一些檳榔的文化,原來檳榔在很久很久以前的時候就已經為人們所知,併為人們所用了。

       

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

有別於一般網頁架設公司,除了模組化的架站軟體,我們的營業主軸還包含:資料庫程式開發、網站建置、網頁設計、電子商務專案開發、系統整合、APP設計建置、專業網路行銷。

相傳在很久以前,關於“檳榔”,有個神話傳說。古時候炎帝有個女兒叫賓,她的郎君長的英俊瀟洒,為人善良,平時逞強除惡,而兩人的感情非常的好,心意相通,有一次她的郎君在同妖魔相鬥時不幸被殺死,賓為了紀念她的郎君就將他葬於崑崙石下,化成為一片樹林,賓每天都悉心照顧着這片樹林,後來樹上結出綠油油的果實。

       於是賓將果實裝在茶包里,帶在身旁,以示紀念。後來賓把自己製作的茶包送給人們品嘗,據說人們吃了這種果實后就再不怕妖魔作惡了。於是人們就給這個綠油油的果實以賓加郎命名,於是就取名為檳榔。所以在古代的時候檳榔就已經為人們所食用了,而且還富有很多的價值。

       隨着後來檳榔的廣泛食用,檳榔的功效越來越多的被人們所知道。比如檳榔有很強的藥用價值,它能殺蟲消積,用於腸道寄生蟲,蛔蟲病、蟯蟲病。,而且檳榔是被歷代醫學學者作為治病的葯果,又被稱為“洗瘴丹”,咀食檳榔不僅可以消食下氣、祛痰導滯,而且可以促進腸胃吸收,增強腸道收縮能力,有潤腸通便的效果。

       除此之外,檳榔還被過去住高山地帶的人們常用來禦寒,和消除緊張勞動后的疲勞。所以檳榔還有提神醒腦的作用,當然檳榔的功效還不止是這些,隨便我對檳榔越來越多的認識,越來越覺得這顆小小的檳榔果實渾身上下都是寶。

本站聲明:網站內容來http://www.societynews.cn/html/wh/fq/,如有侵權,請聯繫我們,我們將及時處理

※Google地圖已可更新顯示潭子電動車充電站設置地點!!

日本、大陸,發現這些先進的國家已經早就讓電動車優先上路,而且先進國家空氣品質相當好,電動車節能減碳可以減少空污

大峽谷里的甜蜜事業_包裝設計

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

網動廣告出品的網頁設計,採用精簡與質感的CSS語法,提升企業的專業形象與簡約舒適的瀏覽體驗,讓瀏覽者第一眼就愛上她。

  王文攝(影像中國)

  登高臨水,車窗外,在高黎貢山和碧羅雪山的夾擊下,怒江艱難盤旋,峽谷之中左沖右突,闖前撞后,鐘鼓雷鳴,奔騰不息,和瀾滄江、金沙江一起衝激“三江併流”的奇瑰壯麗景象。我想,那些奔波在怒江峽谷間的扶貧隊伍,不就是一道道奮力衝出貧困大峽谷的瑰麗激流嗎?

   

  一

  許多人都讀過楊朔的散文名篇《荔枝蜜》,文中主人公“老梁”如今成了“梁老”,是廣東省蜂產品協會的名譽會長。不久前,偶然得知“梁老”被珠海市駐雲南省怒江傈僳族自治州扶貧工作組請了過去,到怒江考察養蜂扶貧。他不顧高齡,翻山越嶺考察之後,認為怒江十分適合養蜂,還順道給怒江的幹部們傳授了生動的“養蜂經”。隨後,珠海在怒江幫扶了兩百多個本地中華蜂養殖點,幫助當地群眾成就甜蜜的事業。

  中秋節前,為了看個究竟,我馬不停蹄去怒江州跑了幾天。古人曾以“水無不怒石,山有欲來峰”來描寫怒江。我震撼於高山峽谷、石裂川奔的壯美景色,目睹水石相搏、響之激越的驚濤駭浪,親見碧羅雪山和高黎貢山“兩山夾一江,一江拽二山”的畫面:“掛”在陡坡的耕地,木棚屋的篝火塘,危險的溜索,以及茶馬古道。正如宋代王安石所言:“世之奇偉、瑰怪,非常之觀,常在於險遠,而人之所罕至焉,故非有志者不能至也。”壯麗的風景往往綻放有志者非同尋常的壯美人生。

  珠海駐怒江扶貧工作組組長張松,一上車就說起他的養蜂扶貧經歷:從珠海帶來的企業,考察一遍多數都跑了。為啥?交通不便,百分之九十八的高山峽谷,怒江五十多萬人口,貧困的佔了一半。當地無機場、無火車、無航運、無高速公路,進出一趟要幾天時間。一年兩百多天都在下雨,道路常見塌方。

  一路顛簸一路看,我逐漸對怒江作為我國最貧困的“三區三州”之一有了切身認識。怒江的大山裡,很多地方風景優美,卻也是與貧困作鬥爭的一線。海拔越低,平地越少,山頂的村民除了在緩坡上種些玉米等糧食外,少有可脫貧致富的產業。

  怒江,渾身透露着大自然的那股韌勁兒。一邊雲霧繚繞,風光旖旎,幽谷懸岩,絕壑奔流,穠花異草,山水如畫;一邊面臨着艱巨的脫貧攻堅任務。鮮紅的三角梅,一簇簇迎風搖曳,山坡上開得正艷。張松說,扶貧還是要結合怒江當地實際,找准產業扶貧項目。“這不,我們把梁老請了過來,一番考察,發現怒江生態環境好,蜂蜜質量高,技術容易學,當地就有樹桶養蜂的傳統。而養蜂最需要的花源和蜜源,怒江都具備。玉米多,有充足的花粉供養小蜜蜂,利於蜂群繁殖。崇山峻岭,生態植被好,野桂花、杜鵑花、板栗花、草果花、油菜花……各種花多,有充足的蜜源,缺的是技術培訓和扶貧引領。”

  扶貧先扶志,有志者事竟成。扶貧項目確定后,工作組找到珠海專業養蜂的何伯農業公司合作,請他們派技術人員負責培訓,雙方一拍即合。一年來,派駐的一百多名珠海養蜂人行走在怒江的高山峽谷間,風餐露宿,披星戴月,培訓了兩千多名怒江養蜂人。

  養蜂人的到來打破了峽谷的寧靜,給怒江的山水增添了幾分熱鬧,也為我重新審視怒江打開了想象空間。怒江州因怒江而聞名。我以前覺得,怒江之美,最壯觀的是大峽谷,但現在覺得,怒江之美,還有那行走峽谷間,日夜與貧困頑強抗爭的養蜂人的堅韌。對為幫助他人實現美好生活而奔走的人,我們應該表達足夠的敬意!這正是人們綿延不息、不屈不撓、砥礪前行的精神偉力!

  登高臨水,車窗外,在高黎貢山和碧羅雪山的夾擊下,怒江艱難盤旋,峽谷之中左沖右突,闖前撞后,鐘鼓雷鳴,奔騰不息,和瀾滄江、金沙江一起衝激“三江併流”的奇瑰壯麗景象。我想,那些奔波在怒江峽谷間的扶貧隊伍,不就是一道道奮力衝出貧困大峽谷的瑰麗激流嗎?

  二

  扶貧工作組的同志熱情邀請我們去一個養蜂培訓點看看。綿綿細雨中,車子從丙中洛出發,沿怒江而行,崎嶇山路,斗折蛇行。遠眺山峰,一座座高腳木樓,錯落有致。約一個小時,來到一處山坡。坡地一字形擺放好幾排蜂箱,小蜜蜂從箱底沿小孔忙碌地進進出出。

  這裡是貢山獨龍族怒族自治縣閃當村吉木登小組的養蜂培訓點。村小組幹部領我們爬上山坡,鑽進一個簡陋的塑料棚。地上幾個木箱,箱上鋪着軍大衣,這就是守夜的床了。剛一坐下,他就介紹起小組養蜂培訓情況。

  “包了五十箱蜂,有一箱跑了蜂王,還有四十九箱蜂。”

  “大夥培訓了一個多月,每天傍晚5點到9點,集中培訓三四個小時。”

南投搬家公司費用需注意的眉眉角角,別等搬了再說!

上新台中搬家公司提供您一套專業有效率且人性化的辦公室搬遷、公司行號搬家及工廠遷廠的搬家服務

  他指着蜂箱旁邊席地散坐的十餘個村民說:“來培訓的都是建檔立卡貧困戶,免費培訓,我們村幹部要先學會,再教組裡的人。我有信心帶領大家學好養蜂技術,帶動脫貧。”

  老譚,是珠海何伯公司派駐的技術員,負責吉木登村小組培訓。他一身黃色的技術員服,十分醒目。談起養蜂,老譚滔滔不絕:“先要學習分蜂,繁育蜂群,一箱分成兩箱,一群分成兩群,也就幾天工夫。再有十多天就可以產蜜了。”

  “分群就要育王,育王先要製作育王杯。”

  老譚舉起一排蜂蠟製作的蜂巢說:“瞧,這就是育王杯,把一到三天的蜜蜂幼蟲放進去,喂以蜂王漿,長大就是蜂王,再挑選出強壯的放進蜂箱就可以分群了。”

  “新蜂王放進去前,先要把老蜂王關起來,不然會打架。”老譚小心地打開蜂箱,從箱角拿起一個火柴盒大小的白塑料盒,有細欄杆遮攔。細瞅,裏面關着一隻大個頭的黑蜂王。“蜂王個頭大,鑽不出蜂盒,瞧,那幾隻進出忙碌的小蜜蜂在忙着喂它呢。”

  “蜂群如果沒有蜂王,蜜蜂就會四散而去。剛放的新蜂王需要老蜂王壓壓陣容,等到新蜂王立穩了,就可以和老蜂王分開,分出新的群。蜂群多,產蜜才多啊!”

  我好奇地問:“為什麼老要換新蜂王呢?”

  “蜂王壽命三年,繁殖能力下降。新蜂王年輕力壯,蜂群的繁殖就快。年輕的蜂王一天多的可產八百多粒子。”

  面對眾人的好奇,老譚小心翼翼取出一巢框蜜蜂給我們看,巢框密密麻麻爬滿蜜蜂。蜂王早早躲藏起來。他指着巢框白色的部分說:“白色的是蜂蠟,下面覆蓋的就是蜂蜜。取蜜時要先割掉蠟。黃色的部分是花粉,哺喂花粉長大的就是小蜜蜂。”

  “蜂蜜是蜜蜂的糧食,人類都取走後,蜜蜂怎麼辦?”

  老譚笑着說:“不用擔心,每次取蜜,都會給蜜蜂留口糧的。蜜蜂雖然很勤勞,但是,當蜂蜜充足的時候,蜜蜂也會懶惰不再工作。人們取走蜂蜜后,小蜜蜂就又會辛勤勞動了。”

  我開始為蜜蜂着迷。蜜蜂的團結合作、勤勞奉獻,給我留下了深刻印象。它們以自己獨特的方式“參与”到東西部結對扶貧事業中。今年,珠海投入幫扶資金三千多萬元,向怒江一百八十七個養殖點贈送一萬八千餘箱蜜蜂。村裡每個建檔立卡貧困戶可分到五箱蜜蜂,按一箱年產十斤蜜、每斤五十元計算,一戶年收入兩千五百元。如果學會分箱技術,收入就更高,並且由何伯公司以保底價收購。貧困戶可以採取承包養、合作社養等多種方式。

  “收蜜啦,收蜜啦!”山花綻放,老窩河歡快地匯入怒江,清脆的歡呼聲傳遍山林。瀘水新寨村的養蜂點,第一次收蜜就搖出三百斤純蜜,不等收購,被本地市場一掃而光。去年,怒江三十個養殖點,就幫扶建檔立卡貧困戶一千五百多戶,六千五百多人受益,其中七百四十多人當年脫貧。

  養蜂扶貧賬,越算越開心。“這中間還有個小曲折,剛開始貢山縣個別同志對引進外地中蜂養殖有疑慮,我趕到縣裡組織養蜂座談會,從科學角度作解釋,說服大家同意引進珠海中蜂養殖。”梁老的一番話解開了謎底,“過去怒江人也養蜂,是從樹洞、岩穴里的蜂窩獵蜜取子,毀巢取蜜,方法原始。如今科學馴化養蜂,送蜂上門、技術幫扶、保底收購,加上其他種養項目,收入倍增,很快邁上脫貧奔小康之路……”

  短短几天,我目睹了峽谷萬物從不放棄生長的強大生命力。樹木枝繁恭弘=叶 恭弘茂,灌木千姿百態。野薔薇,就跟裝了彈簧似的,要蹦到太陽身邊去。萬物生長,增添了我對蜜蜂的喜愛,也啟迪我找到了此行的收穫:扶貧一定要扶到群眾心坎上。只有切合實際的精準扶貧,村民收穫才會更大。不僅要扶智扶志,還要做好技術服務,這樣扶貧會更有生命力。

  養蜂的知識還沒學夠,催生植物的雨水又到了,滴滴答答地送來了漫山遍野的花期。像蜜蜂抓緊花期釀蜜一樣,怒江也到了脫貧攻堅的關鍵期。讓我感動的是,梁老以七十七歲高齡與三百多名珠海扶貧幹部一起,奔走在怒江的高山峽谷間,為消除貧困而跋山涉水。珠海市三年來在此地投入幫扶資金近九億元,支持了養蜂、中藥種植、蔬菜基地、扶貧搬遷等四百多個扶貧項目,帶動建檔立卡貧困群眾兩萬多人脫貧。當地群眾正在擺脫深度貧困,迎來新的歷史性跨越。

  望着飛來飛去的小蜜蜂,我也忽然想到,這些勤勞的小蜜蜂知不知道自己的辛勤勞動,也是在為怒江脫貧攻堅而釀造甜蜜的未來呢?

本站聲明:網站內容來http://www.societynews.cn/html/wh/fq/,如有侵權,請聯繫我們,我們將及時處理

※產品缺大量曝光嗎?你需要的是一流包裝設計!

窩窩觸角包含自媒體、自有平台及其他國家營銷業務等,多角化經營並具有國際觀的永續理念。

國務院教育督導委員會辦公室:將開展義務教育教師工資待遇落實情況督導_台中搬家

台中搬家遵守搬運三大原則,讓您的家具不再被破壞!

台中搬家公司推薦超過30年經驗,首選台中大展搬家

新華社北京12月20日電(記者施雨岑)國務院教育督導委員會辦公室近日發出通知,將於2020年開展義務教育教師工資待遇落實情況督導,要求高度重視義務教育教師工資待遇保障工作,義務教育教師平均工資收入水平應當不低於當地公務員平均工資收入水平。

台中搬家公司費用怎麼算?

擁有20年純熟搬遷經驗,提供免費估價且流程透明更是5星評價的搬家公司

通知強調,各地要做好統籌安排,按照國務院關於保障義務教育教師工資待遇的工作部署,加大工作力度。在年終為公務員發放獎勵性補貼及安排下一年度財政預算時,務必統籌考慮義務教育教師待遇保障問題。

通知明確,各省(自治區、直轄市)要結合當地實際,於2020年上半年對本行政區域內義務教育教師工資收入落實情況組織督導檢查。對發現政策落實不到位的,要採取約談、問責等多種措施督促整改。2020年,國務院教育督導委員會辦公室將把義務教育教師工資收入保障情況作為重點內容進行督導。

本站聲明:網站內容來http://www.societynews.cn/html/wh/fq/,如有侵權,請聯繫我們,我們將及時處理

台中搬家公司費用怎麼算?

擁有20年純熟搬遷經驗,提供免費估價且流程透明更是5星評價的搬家公司

Apple TV 確認降臨 Chromecast with Google TV 電視棒_網頁設計

※推薦評價好的iphone維修中心

擁有專業的維修技術團隊,同時聘請資深iphone手機維修專家,現場說明手機問題,快速修理,沒修好不收錢

2020 雖然是很O的一年,但同時也是各廠更願意正視居家娛樂與通訊交流的一年。至少除了各種通訊服務與視訊功能以外,今年我們更開始看到 Apple 服務開始在各種平台上廣為散佈。而在蘋果開放讓 Apple Music 正式登陸 Google Nest 智慧喇叭後(雖然台灣還沒看到選項 QQ),現在則是輪到了當今最流行的電視串流棒之一 Chromecast。繼續閱讀 Apple TV 確認降臨 Chromecast with Google TV 電視棒報導內文。

▲圖片來源:Google

Apple TV 確認降臨 Chromecast with Google TV 電視棒

可能是現階段可以用最低成本(最新版 Chromecast 僅美金 49.99 元,也就約台幣 1,500),就能讓電視升級成能觀看 Apple TV 的智慧電視的途徑 — 雖然,還要等到 2021 年初才會更新(反正台灣還不能直接買囉)。

Google 今天在官方部落格宣佈 Apple TV app 將正式支援 Google 電視棒產品的消息。而首個支援的產品則是最新直接提供遙控器的 Chromecast with Google TV — 想知道它好不好用的,可以參考我們的 Chromecast with Google TV 開箱體驗(傳送門)。

雖然確定會不會支援更多之前的 Google TV 串流裝置(希望可以)。不過在 Apple TV app 正式登上 Chromecast 後,訂閱蘋果影視串流服務的使用者,將可以輕鬆以遙控器或是語音的方式瀏覽觀看 Apple TV+ 的各式影片內容。

現階段 Apple TV 登上各平台的速度真的遠超過以往,近期已經有不少電視產品直接內建(最新的應該是 Sony BRAVIA 系列宣布提供此支援);主流家用遊戲主機 PlayStation 更是不僅在最新的 PS5 上架 Apple TV app,甚至也往下支援 PS4 舊款主機。不得不說,這對於愛用 TV+ 的朋友們而言真的是很棒的發展趨勢啊。

本篇圖片 / 引用來源

延伸閱讀:

網頁設計最專業,超強功能平台可客製化

窩窩以「數位行銷」「品牌經營」「網站與應用程式」「印刷品設計」等四大主軸,為每一位客戶客製建立行銷脈絡及洞燭市場先機。

Chromecast with Google TV 快速開箱動手玩,讓失智電視也聰明起來

Google Home / Nest 智慧喇叭開始支援 Apple Music

您也許會喜歡:

【推爆】終身$0月租 打電話只要1元/分

立達合法徵信社-讓您安心的選擇

台北網頁設計公司這麼多該如何選擇?

網動是一群專業、熱情、向前行的工作團隊,我們擁有靈活的組織與溝通的能力,能傾聽客戶聲音,激發創意的火花,呈現完美的作品

Intel 發表新一代 Optane SSD 系列,並同步推出三款採 144 層單元 TLC、QLC 的新款 SSD_貨運

※評比南投搬家公司費用收費行情懶人包大公開

搬家價格與搬家費用透明合理,不亂收費。本公司提供下列三種搬家計費方案,由資深專業組長到府估價,替客戶量身規劃選擇最經濟節省的計費方式

等了許久的時間,Intel 終於推出了新一代 Optane SSD 系列,分別為專為資料中心設計的 Intel Optane SSD P5800X,以及專為消費端打造的 Intel Optane Memory H20。另外,這次還同步亮相 3 款導入 144 層單元的 NAND SSD。

Intel 發表新一代 Optane SSD 系列與三款採 144 層單元的 SSD

Intel Optane Memory H20 結合 Intel  Optane Memory 與 Intel QLC 3D NAND 技術,為下一世代輕薄筆電所設計,可放入更有限的空間內,透過 PCIe 的高效傳輸速度,讓使用者體驗到更快的反應速度,如:搜尋檔案、執行應用程式等等:

Intel Optane SSD P5800X 是 Intel 首款導入 PCIe 4.0 控制器的 SSD,相較於前一世代產品,效能上提升 3 倍以上,並擁有更出色的耐用性:

D7-P5600 與P5800X 的 KIOPS 差異:

而 3 款 144 層單元的 NAND SSD 分別為:「Intel SSD 670p」、「Intel SSD D7-P5510」與「Intel SSD D5-P5316」。

Intel SSD 670p 採用 144 層 QLC 3D NAND 設計,提供端到端資料保護與 Pyrite 2.0 的安全性和斷電通知:

跟上一代 660p 相比,670 的動態 SLC Cache 提升 11%:

※智慧手機時代的來臨,RWD網頁設計為架站首選

網動結合了許多網際網路業界的菁英共同研發簡單易操作的架站工具,及時性的更新,為客戶創造出更多的網路商機。

Intel SSD D7-P5510 為全球首款 144 層 TLC NAND 設計的 SSD,專為雲端高容量儲存市場打造,具備 U.2 外型,容量部分有 3.84TB 與 7.68TB 兩種選擇,也內建改進後的裝置健康監控,預計 2020 年底開賣:

Intel SSD D5-P5316 是業界首款針對大容量儲存裝置研發的 144 層 QLC NAND SSD,每個晶粒具備 128GB 容量,預計提供 15.36TB 與 30.72TB 兩種容量。而藉由 QLC 優勢,相較於傳統硬碟不僅提升 200% 的讀取效能與 38% 隨機讀取效能,也降低 48% 的存取延遲。外型方面有 U.2 與 E1.L 兩種,預計 2021 上半年開賣:

至於詳細推出日期與價格部分目前都還不知道,但猜測離開賣時間越近,Intel 就會公布更詳細的售價資訊。

部分圖片來源:anandtech

Intel Evo 平台認證筆電精銳盡出,絕佳效能讓你搶佔先機

您也許會喜歡:

【推爆】終身$0月租 打電話只要1元/分

立達合法徵信社-讓您安心的選擇

※回頭車貨運收費標準

宇安交通關係企業,自成立迄今,即秉持著「以誠待人」、「以實處事」的企業信念

Netflix 正在悄悄向部分Android 版用戶推出音訊播放模式_網頁設計公司

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

節能減碳愛地球是景泰電動車的理念,是創立景泰電動車行的初衷,滿意態度更是服務客戶的最高品質,我們的成長來自於你的推薦。

有些時候,你會不會只想聽電影或電視節目的音訊而不想看影片?有些節目確實用聽的就好,Netflix 想要覆蓋全部用戶所希望的功能,如果你是一個 Android 裝置的用戶,那你可得好好期待一下。已經有用戶發現自己的應用程式中出現了音訊播放的選項,下次去跑步時就聽著你最愛的影集吧!

Netflix 正在悄悄向部分Android 版用戶推出音訊播放模式

早在今年 10 月間,XDA Developer 就已經在 Android 應用程式中發現了音訊模式的蹤跡,只是當時設定中的該選項併不能使用,而現在 Netflix 似乎開始正式推出這項功能供用戶選擇。據國外媒體 Android Police 報導,這項功能出現在應用程式版本 7.84.1 build 28 35243 中,當你獲得該功能並且於設定中啟用它後,當影片全螢幕播放時,你會看到一個新的影片關閉按鈕,只要啟動音訊模式,Netflix 用戶將能在不看影片的情況下單純聆聽喜愛的節目、影集與電影。

▲圖片來源:Android Police

在介面上你還是可以看到熟悉的控制項,包含改變播放的速度、倒帶、快轉、暫停等,與看影片時的體驗一模一樣,只是少了影像而已。對於某些行動網路沒有辦理吃到飽方案的人,又或是網路費用較高的國家來說,光只有音訊播放可以節省下一些網路流量。你可以把它想像成就像在聽 Podcast,但又不太一樣,對於很多電影與節目來說,影像對整體來說並不是關鍵,特別像是家庭情境喜劇、脫口秀、記錄片之類的節目。

▲圖片來源:Android Police

對於提供線上串流影片服務來說,很多人會覺得光只有聲音這點怪怪的,但有時一卯起來看就是會讓人欲罷不能,音訊模式在家裡可能比較沒有意義,但在通勤途中或當作工作時的背景噪音就會很實用。

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

透過選單樣式的調整、圖片的縮放比例、文字的放大及段落的排版對應來給使用者最佳的瀏覽體驗,所以不用擔心有手機版網站兩個後台的問題,而視覺效果也是透過我們前端設計師優秀的空間比例設計,不會因為畫面變大變小而影響到整體視覺的美感。

◎資料來源:Android Police、

您也許會喜歡:

【推爆】終身$0月租 打電話只要1元/分

立達合法徵信社-讓您安心的選擇

南投搬家公司費用,距離,噸數怎麼算?達人教你簡易估價知識!

搬家費用:依消費者運送距離、搬運樓層、有無電梯、步行距離、特殊地形、超重物品等計價因素後,評估每車次單

Robot Framework(15)- 擴展關鍵字_潭子電動車

※Google地圖已可更新顯示潭子電動車充電站設置地點!!

日本、大陸,發現這些先進的國家已經早就讓電動車優先上路,而且先進國家空氣品質相當好,電動車節能減碳可以減少空污

如果你還想從頭學起Robot Framework,可以看看這個系列的文章哦!

https://www.cnblogs.com/poloyy/category/1770899.html

 

前言

  • 什麼是擴展關鍵字?就是你自己寫的 Python 文件,裡面包含了函數或者類
  • 然後 RF 導入這個 Python 模塊,就可以調用函數或者類方法,它們就是擴展關鍵字

 

Python 模塊作為測試庫

模塊文件名作為測試庫的名字

比如:Python 模塊名叫 MyLibrary,文件名是 MyLibrary.py,那麼測試庫的名字就叫做 MyLibrary

 

Python 模塊和 Robot 文件同目錄下的栗子

這是目錄結構哈

python 模塊的代碼

def returnlist():
    return [i for i in range(10)]


def return_dict():
    return {"a": "hahhahahaahah"}


# 以下劃線開頭的函數不能作為RF關鍵字
def _returnlist2():
    return [1, 2]

robot 代碼

進入test目錄下,運行以下命令

 robot -P . test.robot 

執行結果

知識點

  • _前綴的方法不會作為關鍵字,在Python裏面, _ 開頭的方法是私有方法,RF 不會識別到它
  • Python 方法作為關鍵字也是大小寫不敏感
  • RF 中會把關鍵字的 _ 和單個空格忽略掉,所以 returndict、return dict、return_dict 都是調用同一個關鍵字

 

Python 類作為測試庫的栗子

項目目錄

所有 Python 測試代碼都在 tlib2.py 裏面哦

最終運行是在【15_擴展關鍵字】目錄下運行的,命令如下

robot -P . testrf

 

栗子一:類初始化不需要傳參

python 代碼

class SubLibrary:
    def __init__(self):
        pass

    def returnint(self):
        return 2020

    def _returnint2(self):
        return 4

robot 代碼

測試結果

知識點

  • 在類裏面, _ 前綴的方法不會當做關鍵字
  • 同樣,類中聲明的方法當做關鍵字的話,大小寫不敏感

 

栗子二:類初始化需要傳參

python 代碼

from robot.api import logger
class SubLibrary2: def __init__(self, host, port, table='test'): self.host = host self.port = port self.table = table def printaddr2(self): logger.console('host:%s,port:%s,table:%s' % (self.host, self.port, self.table))

robot 代碼

測試結果

知識點

如果類的 __init__ 初始化方法需要傳參,則在導入庫後面跟對應的參數列表

拓展 Python 知識點:先有類對象,還是先執行類初始化方法?

 __new__ 方法產生對象

 __init__ 對象的初始化方法

先 new 一個對象,再 init 一個對象

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

有別於一般網頁架設公司,除了模組化的架站軟體,我們的營業主軸還包含:資料庫程式開發、網站建置、網頁設計、電子商務專案開發、系統整合、APP設計建置、專業網路行銷。

 

栗子三:類名和模塊名相同

python 代碼

from robot.api import logger

class tlib2:
    def __init__(self, host, port):
        self.host = host
        self.port = port

    def printaddr(self):
        logger.console('host:%s,port:%s' % (self.host, self.port))

robot 代碼

測試結果

知識點

如果類名和模塊名相同,可以不用導入類名

 

栗子四:使用路徑法導入 Python 模塊

Python 代碼用的還是栗子三的

robot 代碼

測試結果

知識點

如果用路徑法,需要注意導入 Python 模塊需要有文件後綴哦,且用 / 來表示目錄下

重點:使用路徑法,只能導入和模塊名相同的類名!

 

Python 擴展庫的搜索規則

統一的規則

  • 先根據 robot 文件自身當前目錄下查找庫文件
  • 如果沒有找到則再根據 –pythonpath 和 -P 提供的搜索路徑進行搜索
  • 最後找 Python 安裝的路徑

 

Python 庫引入了其他模塊

背景

當 robot 文件導入的 Python 測試庫引入了其他模塊時,應該怎麼寫導入路徑?

正確寫法

確保導入的模塊路徑和RF導入的模塊起始路徑統一

看栗子

 testother.robot  導入 test.py 模塊, test.py  模塊引入了 login.py 模塊的方法

目錄結構

login.py 代碼

from robot.api import logger


def login_test():
    logger.console('test login')

test.py 代碼

from pylib.login import login_test
# from login import login_test 報錯

def test():
    login_test()

robot 的代碼

在 othertest 目錄下運行下面命令

robot -P . testother.robot

測試結果

結論

  • 可以看到 robot 文件引入的路徑是 pylib 開頭, test 模塊引入 login 模塊的路徑也是 pylib 開頭
  • 如果路徑是 login 開頭導入,那麼運行robot文件將會報錯(如下圖,包含了解析錯誤)

 

Python 庫中的 class 存在繼承

背景

當 robot 文件導入 Python 測試庫的類繼承了另一個類,應該怎麼寫導入路徑?

正確寫法

  • 確保導入的模塊路徑和RF導入的模塊起始路徑統一
  • 使用的時候 RF 文件只需導入子類即可

看栗子

 test.robot 引入了 other.py  模塊下的 Child 類,而 Child 類繼承了 Base.py 模塊下的 Father 類

目錄結構

base.py 的代碼

from robot.libraries.BuiltIn import logger


class Father:
    def __init__(self):
        logger.console('init Father')

    def money(self):
        return '$10000'

other.py 的代碼

from robot.api import logger
from pylib.Base import Father


class Child(Father):
    def __init__(self):
        Father.__init__(self)
        logger.console('init Child')

    def use_money(self):
        return self.money()

    def make_money(self):
        return '$9999'

robot 的代碼

在 testClass 目錄下運行下面命令

robot -P . test.robot

測試結果

 

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

※Google地圖已可更新顯示潭子電動車充電站設置地點!!

日本、大陸,發現這些先進的國家已經早就讓電動車優先上路,而且先進國家空氣品質相當好,電動車節能減碳可以減少空污