python機器學習——隨機梯度下降

上一篇我們實現了使用梯度下降法的自適應線性神經元,這個方法會使用所有的訓練樣本來對權重向量進行更新,也可以稱之為批量梯度下降(batch gradient descent)。假設現在我們數據集中擁有大量的樣本,比如百萬條樣本,那麼如果我們現在使用批量梯度下降來訓練模型,每更新一次權重向量,我們都要使用百萬條樣本,訓練時間很長,效率很低,我們能不能找到一種方法,既能使用梯度下降法,但是又不要每次更新權重都要使用到所有的樣本,於是隨機梯度下降法(stochastic gradient descent)便被提出來了。

隨機梯度下降法可以只用一個訓練樣本來對權重向量進行更新:
\[ \eta(y^i-\phi(z^i))x^i \]
這種方法比批量梯度下降法收斂的更快,因為它可以更加頻繁的更新權重向量,並且使用當個樣本來更新權重,相比於使用全部的樣本來更新更具有隨機性,有助於算法避免陷入到局部最小值,使用這個方法的要注意在選取樣本進行更新時一定要隨機選取,每次迭代前都要打亂所有的樣本順序,保證訓練的隨機性,並且在訓練時的學習率也不是固定不變的,可以隨着迭代次數的增加,學習率逐漸減小,這種方法可以有助於算法收斂。

現在我們有了使用全部樣本的批量梯度下降法,也有了使用單個樣本的隨機梯度下降法,那麼一種折中的方法,稱為最小批學習(mini-batch learning),它每次使用一部分訓練樣本來更新權重向量。

接下來我們實現使用隨機梯度下降法的Adaline

from numpy.random import seed
class AdalineSGD(object):
    """ADAptive LInear NEuron classifier.

    Parameters
    ----------
    eta:float
        Learning rate(between 0.0 and 1.0
    n_iter:int
        Passes over the training dataset.

    Attributes
    ----------
    w_: 1d-array
        weights after fitting.
    errors_: list
        Number of miscalssifications in every epoch.
    shuffle:bool(default: True)
        Shuffle training data every epoch
        if True to prevent cycles.
    random_state: int(default: None)
        Set random state for shuffling
        and initalizing the weights.

    """

    def __init__(self, eta=0.01, n_iter=10, shuffle=True, random_state=None):
        self.eta = eta
        self.n_iter = n_iter
        self.w_initialized = False
        self.shuffle = shuffle
        if random_state:
            seed(random_state)

    def fit(self, X, y):
        """Fit training data.

        :param X:{array-like}, shape=[n_samples, n_features]
        :param y: array-like, shape=[n_samples]
        :return:
        self:object

        """

        self._initialize_weights(X.shape[1])
        self.cost_ = []

        for i in range(self.n_iter):
            if self.shuffle:
                X, y = self._shuffle(X, y)
            cost = []
            for xi, target in zip(X, y):
                cost.append(self._update_weights(xi, target))
            avg_cost = sum(cost)/len(y)
            self.cost_.append(avg_cost)
        return self
    
    def partial_fit(self, X, y):
        """Fit training data without reinitializing the weights."""
        if not self.w_initialized:
            self._initialize_weights(X.shape[1])
        if y.ravel().shape[0] > 1:
            for xi, target in zip(X, y):
                self._update_weights(xi, target)
        else:
            self._update_weights(X, y)
        return self
    
    def _shuffle(self, X, y):
        """Shuffle training data"""
        r = np.random.permutation(len(y))
        return X[r], y[r]
    
    def _initialize_weights(self, m):
        """Initialize weights to zeros"""
        self.w_ = np.zeros(1 + m)
        self.w_initialized = True
    
    def _update_weights(self, xi, target):
        """Apply Adaline learning rule to update the weights"""
        output = self.net_input(xi)
        error = (target - output)
        self.w_[1:] += self.eta * xi.dot(error)
        self.w_[0] += self.eta * error
        cost = 0.5 * error ** 2
        return cost
    
    def net_input(self, X):
        """Calculate net input"""
        return np.dot(X, self.w_[1:]) + self.w_[0]
    
    def activation(self, X):
        """Computer linear activation"""
        return self.net_input(X)
    
    def predict(self, X):
        """Return class label after unit step"""
        return np.where(self.activation(X) >= 0.0, 1, -1)

其中_shuffle方法中,調用numpy.random中的permutation函數得到0-100的一個隨機序列,然後這個序列作為特徵矩陣和類別向量的下標,就可以起到打亂樣本順序的功能。

現在開始訓練

ada = AdalineSGD(n_iter=15, eta=0.01, random_state=1)
ada.fit(X_std, y)

畫出分界圖和訓練曲線圖

plot_decision_region(X_std, y, classifier=ada)
plt.title('Adaline - Stochastic Gradient Desent')
plt.xlabel('sepal length [standardized]')
plt.ylabel('petal length [standardized]')
plt.legend(loc = 'upper left')
plt.show()
plt.plot(range(1, len(ada.cost_) + 1), ada.cost_, marker='o')
plt.xlabel('Epochs')
plt.ylabel('Average Cost')
plt.show()

從上圖可以看出,平均損失下降很快,在大概第15次迭代后,分界線和使用批量梯度下降的Adaline分界線很類似。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

※評比前十大台北網頁設計台北網站設計公司知名案例作品心得分享

※智慧手機時代的來臨,RWD網頁設計已成為網頁設計推薦首選

※評比南投搬家公司費用收費行情懶人包大公開

※幫你省時又省力,新北清潔一流服務好口碑

解鎖新夥伴關係 政府、企業與社會的氣候合作

文:李翊僑(荷蘭馬斯垂克大學永續科學、政策與社會碩士生)

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

※評比前十大台北網頁設計台北網站設計公司知名案例作品心得分享

※智慧手機時代的來臨,RWD網頁設計已成為網頁設計推薦首選

※評比南投搬家公司費用收費行情懶人包大公開

※幫你省時又省力,新北清潔一流服務好口碑

電動車商機蓬勃,兩岸投資兩樣情

全球電動車市場持續成長,直接刺激鋰電池與電動馬達等相關零組件的需求量。台灣、中國兩岸廠商積極針對電動車進行投資布局,但局勢因策略與市場的不同而有所差異。

中國投資額高,但潛藏不穩定性

中國是目前全球電動車市場成長速度最快的國家之一,但今年首季因補貼政策所造成的不確定性,使電動車銷量比去年同期暴跌九成。

不少中國地方政府將電動車與新能源車列為新興產業投資鼓勵對象,也吸引了大筆投資。據統計,2015年至今,中國大陸各地規劃或已展開建設的新能源汽車建設專案超過30個,總投資額超過人民幣1,000億元,呈現遍地開花的局勢。

但《經濟日報》引述中國媒體指出,上述專案多以「新能源汽車產業圈」的形式存在,看似投資新能源車,實際上是為了恢復原有的汽車產能。部分電動車生產企業甚至已進入停產狀態,未來若無法改善,可能會被強制退市。

雖然中國電動車相關投資積極,但卻有泡沫化的隱憂。

台電池廠投入國際電動車市場

相較之下,台灣長泓能源科技、宏境科技對電動車產業的佈局較為穩健,且也更為注重其他海外市場。

長泓能源科技與台灣車廠、客運業者合作,推動電動大巴專案,目標搶下台灣政府10年內投入1萬輛電動大巴的政策。同時,長泓已與特斯拉的移動式20kWh儲能系統、台塑汽車啟動電池等展開測試合作,積極步入電池組裝領域。

長泓能源科技董事長陳明德表示,公司強化氧化鋰鐵電池的產品性能與應用整合,安全性高,且有機會搶攻中國大陸電動車市場。

另一方面,宏境科技已取得美國電動沙灘巡邏車電池組的1,000輛訂單,同時與某汽車通路商合作,打入中國大陸NEXT EV油電混和車馬達控制器供應鏈,數量可望超過一萬輛。宏境科技也已取得中瑞電動車每年550輛的動力鋰電池組訂單,每年可貢獻約1億元新台幣營收。

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

※評比前十大台北網頁設計台北網站設計公司知名案例作品心得分享

※智慧手機時代的來臨,RWD網頁設計已成為網頁設計推薦首選

※評比南投搬家公司費用收費行情懶人包大公開

※幫你省時又省力,新北清潔一流服務好口碑

F-立凱、五龍動力聯手搶攻中國電動車市場

F-立凱公告將與五龍動力、香港上市公司五龍電動車集團簽訂三方交易契約,共同深化電動車產業的上、下游布局,著眼爭取中國乃至於全球的電動車市場。

根據協議,三方將以策略聯盟、資本合作的方式取得股權轉換方案。契約載明,五龍動力將以每股新台幣35元的價格取得立凱電新發行的普通股,佔增資後股本21.8%;思慕完成後,五龍電動車集團在另外以1億港幣(約新台幣4.2億)現金取得立凱電旗下車電事業部門──立凱綠能(蓋曼)股權,以及台灣立凱綠能的部分資產。交易完成後,五龍動力與五龍電動車集團預計將投資凱電超過新台幣20億元。

此外,立凱電同時將以每股0.5元港幣的價格認購五龍電動車集團新發行之430萬普通股與2.75億元港幣的無擔保可轉換公司債,總投資額預計將達港幣4.9億元,並成為五龍電動車集團股東。此舉將幫助立凱電正式邁入中國電動車市場。

F-立凱由尹衍樑投資,為電動巴士系統與磷酸鐵鋰電池正極材料廠商。與五龍動力、五龍電動車集團的交易完成後,將在兩岸三地與國際市場進行明確產業分工,由立凱電繼續投入正極材料研發、製造與銷售,同時為五龍動力提供LFP-NCO奈米金屬氧化物共晶體化磷酸鋰鐵電池正極材料M系列產品的生產與技術顧問服務。三方也將合作於中國建立材料廠,以因應未來中國電動車龐大的材料需求。

此外,台灣立凱綠能也將與五龍電動車在電池芯、電池模組以及電動車技術等領域合作,同時不忘繼續拓展台灣電動巴士業務。F-立凱表示,本次策略聯盟,將可幫助F-立凱、五龍動力、五龍電動車集團、立凱電整合技術、製造、市場、供應鏈與資金,全面進軍中國與全球的儲能市場和電動車市場。

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

※評比前十大台北網頁設計台北網站設計公司知名案例作品心得分享

※智慧手機時代的來臨,RWD網頁設計已成為網頁設計推薦首選

※評比南投搬家公司費用收費行情懶人包大公開

※幫你省時又省力,新北清潔一流服務好口碑

Java 8 Streams API 詳解

流式編程作為Java 8的亮點之一,是繼Java 5之後對集合的再一次升級,可以說Java 8幾大特性中,Streams API 是作為Java 函數式的主角來設計的,誇張的說,有了Streams API之後,萬物皆可一行代碼。

什麼是Stream

Stream被翻譯為流,它的工作過程像將一瓶水導入有很多過濾閥的管道一樣,水每經過一個過濾閥,便被操作一次,比如過濾,轉換等,最後管道的另外一頭有一個容器負責接收剩下的水。

示意圖如下:

首先通過source產生流,然後依次通過一些中間操作,比如過濾,轉換,限制等,最後結束對流的操作。

Stream也可以理解為一個更加高級的迭代器,主要的作用便是遍歷其中每一個元素。

為什麼需要Stream

Stream作為Java 8的一大亮點,它專門針對集合的各種操作提供各種非常便利,簡單,高效的API,Stream API主要是通過Lambda表達式完成,極大的提高了程序的效率和可讀性,同時Stram API中自帶的并行流使得併發處理集合的門檻再次降低,使用Stream API編程無需多寫一行多線程的大門就可以非常方便的寫出高性能的併發程序。使用Stream API能夠使你的代碼更加優雅。

流的另一特點是可無限性,使用Stream,你的數據源可以是無限大的。

在沒有Stream之前,我們想提取出所有年齡大於18的學生,我們需要這樣做:

List<Student> result=new ArrayList<>();
for(Student student:students){
 
    if(student.getAge()>18){
        result.add(student);
    }
}
return result;

使用Stream,我們可以參照上面的流程示意圖來做,首先產生Stream,然後filter過濾,最後歸併到容器中。

轉換為代碼如下:

return students.stream().filter(s->s.getAge()>18).collect(Collectors.toList());
  • 首先stream()獲得流
  • 然後filter(s->s.getAge()>18)過濾
  • 最後collect(Collectors.toList())歸併到容器中

是不是很像在寫sql?

如何使用Stream

我們可以發現,當我們使用一個流的時候,主要包括三個步驟:

  • 獲取流
  • 對流進行操作
  • 結束對流的操作

獲取流

獲取流的方式有多種,對於常見的容器(Collection)可以直接.stream()獲取
例如:

  • Collection.stream()
  • Collection.parallelStream()
  • Arrays.stream(T array) or Stream.of()

對於IO,我們也可以通過lines()方法獲取流:

  • java.nio.file.Files.walk()
  • java.io.BufferedReader.lines()

最後,我們還可以從無限大的數據源中產生流:

  • Random.ints()

值得注意的是,JDK中針對基本數據類型的昂貴的裝箱和拆箱操作,提供了基本數據類型的流:

  • IntStream
  • LongStream
  • DoubleStream

這三種基本數據類型和普通流差不多,不過他們流裏面的數據都是指定的基本數據類型。

Intstream.of(new int[]{1,2,3});
Intstream.rang(1,3);

對流進行操作

這是本章的重點,產生流比較容易,但是不同的業務系統的需求會涉及到很多不同的要求,明白我們能對流做什麼,怎麼做,才能更好的利用Stream API的特點。

流的操作類型分為兩種:

  • Intermediate:中間操作,一個流可以後面跟隨零個或多個intermediate操作。其目的主要是打開流,做出某種程度的數據映射/過濾,然後會返回一個新的流,交給下一個操作使用。這類操作都是惰性化的(lazy),就是說,僅僅調用到這類方法,並沒有真正開始流的遍歷。

    map (mapToInt, flatMap 等)、 filter、 distinct、 sorted、 peek、 limit、 skip、 parallel、 sequential、 unordered

  • Terminal:終結操作,一個流只能有一個terminal操作,當這個操作執行后,流就被使用“光”了,無法再被操作。所以這必定是流的最後一個操作。Terminal操作的執行,才會真正開始流的遍歷,並且會生成一個結果,或者一個 side effect。

    forEach、 forEachOrdered、 toArray、 reduce、 collect、 min、 max、 count、 anyMatch、 allMatch、 noneMatch、 findFirst、 findAny、 iterator

IntermediateTerminal完全可以按照上圖的流程圖理解,Intermediate表示在管道中間的過濾器,水會流入過濾器,然後再流出去,而Terminal操作便是最後一個過濾器,它在管道的最後面,流入Terminal的水,最後便會流出管道。

下面依次詳細的解讀下每一個操作所能產生的效果:

中間操作

對於中間操作,所有的API的返回值基本都是Stream<T>,因此以後看見一個陌生的API也能通過返回值判斷它的所屬類型。

map/flatMap

map顧名思義,就是映射,map操作能夠將流中的每一個元素映射為另外的元素。

 <R> Stream<R> map(Function<? super T, ? extends R> mapper);

可以看到map接受的是一個Function,也就是接收參數,並返回一個值。

比如:

//提取 List<Student>  所有student 的名字 
List<String> studentNames = students.stream().map(Student::getName)
                                             .collect(Collectors.toList());

上面的代碼等同於以前的:

List<String> studentNames=new ArrayList<>();
for(Student student:students){
    studentNames.add(student.getName());
}

再比如:將List中所有字母轉換為大寫:

List<String> words=Arrays.asList("a","b","c");
List<String> upperWords=words.stream().map(String::toUpperCase)
                                      .collect(Collectors.toList());

flatMap顧名思義就是扁平化映射,它具體的操作是將多個stream連接成一個stream,這個操作是針對類似多維數組的,比如容器裡面包含容器等。

List<List<Integer>> ints=new ArrayList<>(Arrays.asList(Arrays.asList(1,2),
                                          Arrays.asList(3,4,5)));
List<Integer> flatInts=ints.stream().flatMap(Collection::stream).
                                       collect(Collectors.toList());

可以看到,相當於降維。

filter

filter顧名思義,就是過濾,通過測試的元素會被留下來並生成一個新的Stream

Stream<T> filter(Predicate<? super T> predicate);

同理,我們可以filter接收的參數是Predicate,也就是推斷型函數式接口,接收參數,並返回boolean值。

比如:

//獲取所有大於18歲的學生
List<Student> studentNames = students.stream().filter(s->s.getAge()>18)
                                              .collect(Collectors.toList());

distinct

distinct是去重操作,它沒有參數

  Stream<T> distinct();

sorted

sorted排序操作,默認是從小到大排列,sorted方法包含一個重載,使用sorted方法,如果沒有傳遞參數,那麼流中的元素就需要實現Comparable<T>方法,也可以在使用sorted方法的時候傳入一個Comparator<T>

Stream<T> sorted(Comparator<? super T> comparator);

Stream<T> sorted();

值得一說的是這個ComparatorJava 8之後被打上了@FunctionalInterface,其他方法都提供了default實現,因此我們可以在sort中使用Lambda表達式

例如:

//以年齡排序
students.stream().sorted((s,o)->Integer.compare(s.getAge(),o.getAge()))
                                  .forEach(System.out::println);;

然而還有更方便的,Comparator默認也提供了實現好的方法引用,使得我們更加方便的使用:

例如上面的代碼可以改成如下:

//以年齡排序 
students.stream().sorted(Comparator.comparingInt(Student::getAge))
                            .forEach(System.out::println);;

或者:

//以姓名排序
students.stream().sorted(Comparator.comparing(Student::getName)).
                          forEach(System.out::println);

是不是更加簡潔。

peek

peek有遍歷的意思,和forEach一樣,但是它是一个中間操作。

peek接受一個消費型的函數式接口。

Stream<T> peek(Consumer<? super T> action);

例如:

//去重以後打印出來,然後再歸併為List
List<Student> sortedStudents= students.stream().distinct().peek(System.out::println).
                                                collect(Collectors.toList());

limit

limit裁剪操作,和String::subString(0,x)有點先溝通,limit接受一個long類型參數,通過limit之後的元素只會剩下min(n,size)個元素,n表示參數,size表示流中元素個數

 Stream<T> limit(long maxSize);

例如:

//只留下前6個元素並打印
students.stream().limit(6).forEach(System.out::println);

skip

skip表示跳過多少個元素,和limit比較像,不過limit是保留前面的元素,skip是保留後面的元素

Stream<T> skip(long n);

例如:

//跳過前3個元素並打印 
students.stream().skip(3).forEach(System.out::println);

終結操作

一個流處理中,有且只能有一個終結操作,通過終結操作之後,流才真正被處理,終結操作一般都返回其他的類型而不再是一個流,一般來說,終結操作都是將其轉換為一個容器。

forEach

forEach是終結操作的遍歷,操作和peek一樣,但是forEach之後就不會再返迴流

 void forEach(Consumer<? super T> action);

例如:

//遍歷打印
students.stream().forEach(System.out::println);

上面的代碼和一下代碼效果相同:

for(Student student:students){
    System.out.println(sudents);
}

toArray

toArrayList##toArray()用法差不多,包含一個重載。

默認的toArray()返回一個Object[]

也可以傳入一個IntFunction<A[]> generator指定數據類型

一般建議第二種方式。

Object[] toArray();

<A> A[] toArray(IntFunction<A[]> generator);

例如:

 Student[] studentArray = students.stream().skip(3).toArray(Student[]::new);

max/min

max/min即使找出最大或者最小的元素。max/min必須傳入一個Comparator

Optional<T> min(Comparator<? super T> comparator);

Optional<T> max(Comparator<? super T> comparator);

count

count返迴流中的元素數量

long count();

例如:

long  count = students.stream().skip(3).count();

reduce

reduce為歸納操作,主要是將流中各個元素結合起來,它需要提供一個起始值,然後按一定規則進行運算,比如相加等,它接收一個二元操作 BinaryOperator函數式接口。從某種意義上來說,sum,min,max,average都是特殊的reduce

reduce包含三個重載:

T reduce(T identity, BinaryOperator<T> accumulator);

Optional<T> reduce(BinaryOperator<T> accumulator);

 <U> U reduce(U identity,
                 BiFunction<U, ? super T, U> accumulator,
                 BinaryOperator<U> combiner);

例如:

List<Integer> integers = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
        
long count = integers.stream().reduce(0,(x,y)->x+y);

以上代碼等同於:

long count = integers.stream().reduce(Integer::sum).get();

reduce兩個參數和一個參數的區別在於有沒有提供一個起始值,

如果提供了起始值,則可以返回一個確定的值,如果沒有提供起始值,則返回Opeational防止流中沒有足夠的元素。

anyMatch allMatch noneMatch

測試是否有任意元素\所有元素\沒有元素匹配表達式

他們都接收一個推斷類型的函數式接口:Predicate

 boolean anyMatch(Predicate<? super T> predicate);

 boolean allMatch(Predicate<? super T> predicate);

 boolean noneMatch(Predicate<? super T> predicate)

例如:

 boolean test = integers.stream().anyMatch(x->x>3);

findFirst、 findAny

獲取元素,這兩個API都不接受任何參數,findFirt返迴流中第一個元素,findAny返迴流中任意一個元素。

Optional<T> findFirst();

Optional<T> findAny();

也有有人會問findAny()這麼奇怪的操作誰會用?這個API主要是為了在并行條件下想要獲取任意元素,以最大性能獲取任意元素

例如:

int foo = integers.stream().findAny().get();

collect

collect收集操作,這個API放在後面將是因為它太重要了,基本上所有的流操作最後都會使用它。

我們先看collect的定義:

 <R> R collect(Supplier<R> supplier,
                  BiConsumer<R, ? super T> accumulator,
                  BiConsumer<R, R> combiner);

<R, A> R collect(Collector<? super T, A, R> collector);

可以看到,collect包含兩個重載:

一個參數和三個參數,

三個參數我們很少使用,因為JDK提供了足夠我們使用的Collector供我們直接使用,我們可以簡單了解下這三個參數什麼意思:

  • Supplier:用於產生最後存放元素的容器的生產者
  • accumulator:將元素添加到容器中的方法
  • combiner:將分段元素全部添加到容器中的方法

前兩個元素我們都很好理解,第三個元素是幹嘛的呢?因為流提供了并行操作,因此有可能一個流被多個線程分別添加,然後再將各個子列表依次添加到最終的容器中。

↓ – – – – – – – – –

↓ — — —

↓ ———

如上圖,分而治之。

例如:

List<String> result = stream.collect(ArrayList::new, List::add, List::addAll);

接下來看只有一個參數的collect

一般來說,只有一個參數的collect,我們都直接傳入Collectors中的方法引用即可:

List<Integer> = integers.stream().collect(Collectors.toList());

Collectors中包含很多常用的轉換器。toList(),toSet()等。

Collectors中還包括一個groupBy(),他和Sql中的groupBy一樣都是分組,返回一個Map

例如:

//按學生年齡分組
Map<Integer,List<Student>> map= students.stream().
                                collect(Collectors.groupingBy(Student::getAge));

groupingBy可以接受3個參數,分別是

  1. 第一個參數:分組按照什麼分類
  2. 第二個參數:分組最後用什麼容器保存返回(當只有兩個參數是,此參數默認為HashMap
  3. 第三個參數:按照第一個參數分類后,對應的分類的結果如何收集

有時候單參數的groupingBy不滿足我們需求的時候,我們可以使用多個參數的groupingBy

例如:

//將學生以年齡分組,每組中只存學生的名字而不是對象
Map<Integer,List<String>> map =  students.stream().
  collect(Collectors.groupingBy(Student::getAge,Collectors.mapping(Student::getName,Collectors.toList())));

toList默認生成的是ArrayList,toSet默認生成的是HashSet,如果想要指定其他容器,可以如下操作:

 students.stream().collect(Collectors.toCollection(TreeSet::new));

Collectors還包含一個toMap,利用這個API我們可以將List轉換為Map

  Map<Integer,Student> map=students.stream().
                           collect(Collectors.toMap(Student::getAge,s->s));

值得注意的一點是,IntStreamLongStream,DoubleStream是沒有collect()方法的,因為對於基本數據類型,要進行裝箱,拆箱操作,SDK並沒有將它放入流中,對於基本數據類型流,我們只能將其toArray()

優雅的使用Stream

了解了Stream API,下面詳細介紹一下如果優雅的使用Steam

  • 了解流的惰性操作

    前面說到,流的中間操作是惰性的,如果一個流操作流程中只有中間操作,沒有終結操作,那麼這個流什麼都不會做,整個流程中會一直等到遇到終結操作操作才會真正的開始執行。

    例如:

    students.stream().peek(System.out::println);

    這樣的流操作只有中間操作,沒有終結操作,那麼不管流裡面包含多少元素,他都不會執行任何操作。

  • 明白流操作的順序的重要性

    Stream API中,還包括一類Short-circuiting,它能夠改變流中元素的數量,一般這類API如果是中間操作,最好寫在靠前位置:

    考慮下面兩行代碼:

    students.stream().sorted(Comparator.comparingInt(Student::getAge)).
                      peek(System.out::println).
                      limit(3).              
                      collect(Collectors.toList());
    students.stream().limit(3).
                      sorted(Comparator.comparingInt(Student::getAge)).
                      peek(System.out::println).
                      collect(Collectors.toList());

    兩段代碼所使用的API都是相同的,但是由於順序不同,帶來的結果都非常不一樣的,

    第一段代碼會先排序所有的元素,再依次打印一遍,最後獲取前三個最小的放入list中,

    第二段代碼會先截取前3個元素,在對這三個元素排序,然後遍歷打印,最後放入list中。

  • 明白Lambda的局限性

    由於Java目前只能Pass-by-value,因此對於Lambda也和有匿名類一樣的final的局限性。

    具體原因可以參考

    因此我們無法再lambda表達式中修改外部元素的值。

    同時,在Stream中,我們無法使用break提前返回。

  • 合理編排Stream的代碼格式

    由於可能在使用流式編程的時候會處理很多的業務邏輯,導致API非常長,此時最後使用換行將各個操作分離開來,使得代碼更加易讀。

    例如:

    students.stream().limit(3).
                      sorted(Comparator.comparingInt(Student::getAge)).
                      peek(System.out::println).
                      collect(Collectors.toList());

    而不是:

    students.stream().limit(3).sorted(Comparator.comparingInt(Student::getAge)).peek(System.out::println).collect(Collectors.toList());

    同時由於Lambda表達式省略了參數類型,因此對於變量,盡量使用完成的名詞,比如student而不是s,增加代碼的可讀性。

    盡量寫出敢在代碼註釋上留下你的名字的代碼!

總結

總之,Stream是Java 8 提供的簡化代碼的神器,合理使用它,能讓你的代碼更加優雅。

尊重勞動成功,轉載註明出處

參考鏈接:

《Effective Java》3th

如果覺得寫得不錯,歡迎關注微信公眾號:逸游Java ,每天不定時發布一些有關Java乾貨的文章,感謝關注

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

※評比前十大台北網頁設計台北網站設計公司知名案例作品心得分享

※智慧手機時代的來臨,RWD網頁設計已成為網頁設計推薦首選

※評比南投搬家公司費用收費行情懶人包大公開

不甩歐盟反對 川普批准制裁俄歐天然氣管線

摘錄自2019年12月21日中央社報導

法新社報導,美國總統川普21日批准,可制裁替俄國建設天然氣管線到德國的企業。美國國會擔心,管線將讓俄國獲得影響歐洲盟邦的危險籌碼;但歐盟反對制裁,認為自己有權決定能源政策。

美國的制裁鎖定在波羅的海建造北溪天然氣2號管線(Nord Stream 2)的相關公司;造價近110億美元的這條管線,可讓俄國輸送至歐洲經濟體龍頭德國的天然氣量倍增。美國國會議員警告,管線會助長一個含敵意的俄國政府,並在歐陸緊張升溫之際,大增俄國總統蒲亭(Vladimir Putin)的影響力。

不過華府此舉已激怒莫斯科及歐盟,後者表示他們有權決定自己的能源政策。

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

※評比前十大台北網頁設計台北網站設計公司知名案例作品心得分享

※智慧手機時代的來臨,RWD網頁設計已成為網頁設計推薦首選

※評比南投搬家公司費用收費行情懶人包大公開

戴姆勒總裁蔡澈:電動汽車續航里程應達499公里以上

日前,戴姆勒集團總裁迪特•蔡澈(Dieter Zetsche)稱,電動汽車一次充電必須能供應至少310英里(499公里)的行程,才能替代燃油汽車,成為主流。

但蔡澈在接受美國媒體採訪時稱,在短時期內讓所有消費者接受電動汽車不太可能,並稱這是一個連續的過程。首先要降低車載電池的成本。蔡澈估計該價格在170美元每千瓦時左右,而110-130美元則會使汽車很有競爭優勢。

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

※評比前十大台北網頁設計台北網站設計公司知名案例作品心得分享

※智慧手機時代的來臨,RWD網頁設計已成為網頁設計推薦首選

※評比南投搬家公司費用收費行情懶人包大公開

如何高效的學習技術

  我們相信努力學習一定會有收穫,但是方法不當,既讓人身心疲憊,也沒有切實的回報。高中時代,我的同桌是個漂亮女同學。她的物理成績很差,雖然她非常勤奮的學習,但成績總是不理想。為了鞏固純潔的同學關係,我親密無間地輔導她的物理,發現她不知道題目考什麼。我們的教科書與試題都圍繞着考試大綱展開,看到一道題,應該先想想它在考哪些定理和公式的運用。
  不少朋友每天都閱讀技術文章,但是第二天就忘乾淨了。工作中領導和同事都認可你的溝通和技術能力,但是跳槽面試卻屢屢碰壁。面試官問技術方案,明明心裏清楚,用嘴說出來卻前言不搭后語。面試官再問底層算法,你說看過但是忘記了。他不在乎你看沒看過,答不上就是零分。正如男女相親,男方談吐瀟洒才能吸引姑娘。可是男方緊張了,平時挺能說,關鍵時候卻支支吾吾,姑娘必然認為他不行。人生充滿了許多考試,有形的和無形的,每次考試的機會只有一次。
  工作五年十年後,別人成了架構師,自己還在基層打滾,原因是什麼?職場上無法成功升遷的原因有很多,沒有持續學習、學習效果不好、無法通過心儀公司的的面試,一定是很重要的原因。
  把自己當成一台計算機,既有輸入,也要有輸出,用輸出倒逼輸入

  近些年誕生了許多新技術,比如最時髦的AI(目前還在智障階段),數學基礎是初中就接觸過的概率統計。萬丈高樓從地起,不要被新工具或者中間件迷住雙眼,一味地追新求快。基礎知識是所有技術的基石,在未來很長的時間都不會變化,應該花費足夠的時間鞏固基礎。
  以數據結構和算法為例,大家閱讀一下Java的BitSet的源碼,裏面有大量的移位操作,移位運算掌握的好,看這份源碼就沒問題。Java同步工具類AQS用到了雙向鏈表,鏈表知識不過關,肯定搞不懂它的原理。互聯網大廠都喜歡考算法,為了通過面試也要精通算法。
  以Java工程師應該掌握的知識為例,按重要程度排出六個梯度:

  • 第一梯度:計算機組成原理、數據結構和算法、網絡通信原理、操作系統原理;
  • 第二梯度:Java基礎、JVM內存模型和GC算法、JVM性能調優、JDK工具、設計模式;
  • 第三梯度:Spring系列、Mybatis、Dubbo等主流框架的運用和原理;
  • 第四梯度:MySQL(含SQL編程)、Redis、RabbitMQ/RocketMQ/Kafka、ZooKeeper等數據庫或者中間件的運用和原理;
  • 第五梯度:CAP理論、BASE理論、Paxos和Raft算法等其他分佈式理論;
  • 第六梯度:容器化、大數據、AI、區塊鏈等等前沿技術理論;

有同學認為第五梯度應該在移到第一梯度。其實很多小公司的日活犹如古天樂一樣平平無奇,離大型分佈式架構還遠得很。學習框架和中間件的時候,順手掌握分佈式理論,效果更好。

  許多公司的招聘JD沒有設定技術人員年齡門檻,但是會加上一句“具備與年齡相當的知識的廣度與深度”。多廣才算廣,多深才算深?這是很主觀的話題,這裏不展開討論。
  如何變得更廣更深呢?突破收入上升的瓶頸,發掘自己真正的興趣
  大多數人只是公司的普通職員,收入上升的瓶頸就是升職加薪。許多IT公司會對技術人員有個評級,如果你的評級不高,那就依照晉級章程努力升級。如果你在一個小公司,收入一般,發展前景不明,準備大廠的面試就是最好的學習過程。在這些過程中,你必然學習更多知識,變得更廣更深。
  個人興趣是前進的動力之一,許多知名開源項目都源於作者的興趣。個人興趣並不局限技術領域,可以是其他學科。我有個朋友喜歡玩山地自行車,還給一些做自行車話題的自媒體投稿。久而久之,居然能夠寫一手好文章了,我相信他也能寫好技術文檔。

  哲學不是故作高深的學科,它的現實意義就是解決問題。年輕小伙是怎麼泡妞的?三天兩頭花不斷,大庭廣眾跪求愛。這類套路為什麼總是能成功呢?禮物滿足女人的物慾,當眾求愛滿足女人的虛榮心,投其所好。食堂大媽打菜的手越來越抖,辣子雞丁變成辣子辣丁,為什麼呢?食堂要控製成本,直接提價會惹眾怒。
  科學上的哲學,一般指研究事物發展的規律,歸納終極的解決方案。軟件行業充滿哲學味道的作品非常多,比如。舉個例子,當軟件系統遇到性能問題,嘗試下面兩種哲學思想提升性能:

  • 空間換時間:比如引入緩存,消耗額外的存儲提高響應速度。
  • 時間換空間:比如大文件的分片處理,分段處理后再匯總結果。

設計穩健高可用的系統,嘗試從三個方面考慮問題:

  • 存儲:數據會丟失嗎,數據一致性怎麼解決。
  • 計算:計算怎麼擴容,應用允許任意增加節點嗎。
  • 傳輸:網絡中斷或擁塞怎麼辦。

從無數的失敗或者成功的經驗中,總結出高度概括性的方案,讓我們下一步做的更好。

  英語是極為重要的基礎,學好英語與掌握編程語言一樣重要。且不說外企對英語的要求,許多知名博客就是把英文翻譯成中文,充當知識的搬運工。如果英語足夠好,直接閱讀一手英語資料,避免他人翻譯存在的謬誤。

  體系化的知識比零散的更容易記憶和理解,這正如一部好的電視劇,劇情環環相扣才能吸引觀眾。建議大家使用思維導圖羅列知識點,構建體繫結構,如下圖所示:

  高中是我們知識的巔峰時刻,每周小考每月大考,教輔資料堆成山,地獄式的反覆操練強化記憶。複習是對抗遺忘的唯一辦法。大腦的遺忘是有規律的,先快后慢。一天後,學到的知識只剩下原來的25%,甚至更低。隨着時間的推移,遺忘的速度減慢,遺忘的數量也就減少。

時間間隔 記憶量
剛看完 100%
20分鐘后 60%
1小時后 40%
1天後 30%
2天後 27%

每個人的遺忘程度都不一樣,建議第二天複習前一天的內容,七天後複習這段時間的所有內容。

  不少朋友利用碎片時間學習,比如在公交上看公眾號的推送。其實我們都高估了自己的抗干擾能力,如果處在嘈雜的環境,注意力容易被打斷,記憶留存度也很低。碎片時間適合學習簡單孤立的知識點,比如鏈表的定義與實現。
  學習複雜的知識,需要大段的連續時間。圖書館是個好地方,安靜氛圍好。手機放一邊,不要理會QQ微信,最好閱讀紙質書,泡上一整天。有些城市出現了付費自習室,提供格子間、茶水等等,也是非常好的選擇。

  從下面這張圖我們可以看到,教授他人是知識留存率最高的方式。

  準備PPT和演講內容,給同事來一場技術分享。不光複習知識,還鍛煉口才。曾經有個同事說話又快又急,口頭禪也多,比如”對吧、是不是”,別人經常聽不清,但是他本人不以為然。領導讓他做了幾次技術分享,聽眾的反應可想而知,他才徹底認清缺點。
  堅持寫技術博客,別在意你寫的東西在網上已經重複千百遍。當自己動手的時候,才會意識到眼高手低。讓文章讀起來流暢清晰,需要嘔心瀝血的刪改。寫作是對大腦的長期考驗,想不到肯定寫不出,想不清楚肯定寫不清楚。

我們經常說不要重複造輪子。為了開發效率,可以不造輪子,但是必須具備造輪子的能力。建議造一個簡單的MQ,你能用到通信協議、設計模式、隊列等許多知識。在造輪子的過程中,你會頻繁的翻閱各種手冊或者博客,這就是用輸出倒逼輸入

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

※評比前十大台北網頁設計台北網站設計公司知名案例作品心得分享

※智慧手機時代的來臨,RWD網頁設計已成為網頁設計推薦首選

※評比南投搬家公司費用收費行情懶人包大公開

詳解Kafka Producer

上一篇文章我們主要介紹了什麼是 Kafka,Kafka 的基本概念是什麼,Kafka 單機和集群版的搭建,以及對基本的配置文件進行了大致的介紹,還對 Kafka 的幾個主要角色進行了描述,我們知道,不管是把 Kafka 用作消息隊列、消息總線還是數據存儲平台來使用,最終是繞不過消息這個詞的,這也是 Kafka 最最核心的內容,Kafka 的消息從哪裡來?到哪裡去?都干什麼了?別著急,一步一步來,先說說 Kafka 的消息從哪來。

生產者概述

在 Kafka 中,我們把產生消息的那一方稱為生產者,比如我們經常回去淘寶購物,你打開淘寶的那一刻,你的登陸信息,登陸次數都會作為消息傳輸到 Kafka 後台,當你瀏覽購物的時候,你的瀏覽信息,你的搜索指數,你的購物愛好都會作為一個個消息傳遞給 Kafka 後台,然後淘寶會根據你的愛好做智能推薦,致使你的錢包從來都禁不住誘惑,那麼這些生產者產生的消息是怎麼傳到 Kafka 應用程序的呢?發送過程是怎麼樣的呢?

儘管消息的產生非常簡單,但是消息的發送過程還是比較複雜的,如圖

我們從創建一個ProducerRecord 對象開始,ProducerRecord 是 Kafka 中的一個核心類,它代表了一組 Kafka 需要發送的 key/value 鍵值對,它由記錄要發送到的主題名稱(Topic Name),可選的分區號(Partition Number)以及可選的鍵值對構成。

在發送 ProducerRecord 時,我們需要將鍵值對對象由序列化器轉換為字節數組,這樣它們才能夠在網絡上傳輸。然後消息到達了分區器。

如果發送過程中指定了有效的分區號,那麼在發送記錄時將使用該分區。如果發送過程中未指定分區,則將使用key 的 hash 函數映射指定一個分區。如果發送的過程中既沒有分區號也沒有,則將以循環的方式分配一個分區。選好分區后,生產者就知道向哪個主題和分區發送數據了。

ProducerRecord 還有關聯的時間戳,如果用戶沒有提供時間戳,那麼生產者將會在記錄中使用當前的時間作為時間戳。Kafka 最終使用的時間戳取決於 topic 主題配置的時間戳類型。

  • 如果將主題配置為使用 CreateTime,則生產者記錄中的時間戳將由 broker 使用。
  • 如果將主題配置為使用LogAppendTime,則生產者記錄中的時間戳在將消息添加到其日誌中時,將由 broker 重寫。

然後,這條消息被存放在一個記錄批次里,這個批次里的所有消息會被發送到相同的主題和分區上。由一個獨立的線程負責把它們發到 Kafka Broker 上。

Kafka Broker 在收到消息時會返回一個響應,如果寫入成功,會返回一個 RecordMetaData 對象,它包含了主題和分區信息,以及記錄在分區里的偏移量,上面兩種的時間戳類型也會返回給用戶。如果寫入失敗,會返回一個錯誤。生產者在收到錯誤之後會嘗試重新發送消息,幾次之後如果還是失敗的話,就返回錯誤消息。

創建 Kafka 生產者

要往 Kafka 寫入消息,首先需要創建一個生產者對象,並設置一些屬性。Kafka 生產者有3個必選的屬性

  • bootstrap.servers

該屬性指定 broker 的地址清單,地址的格式為 host:port。清單里不需要包含所有的 broker 地址,生產者會從給定的 broker 里查找到其他的 broker 信息。不過建議至少要提供兩個 broker 信息,一旦其中一個宕機,生產者仍然能夠連接到集群上。

  • key.serializer

broker 需要接收到序列化之後的 key/value值,所以生產者發送的消息需要經過序列化之後才傳遞給 Kafka Broker。生產者需要知道採用何種方式把 Java 對象轉換為字節數組。key.serializer 必須被設置為一個實現了org.apache.kafka.common.serialization.Serializer 接口的類,生產者會使用這個類把鍵對象序列化為字節數組。這裏拓展一下 Serializer 類

Serializer 是一個接口,它表示類將會採用何種方式序列化,它的作用是把對象轉換為字節,實現了 Serializer 接口的類主要有 ByteArraySerializerStringSerializerIntegerSerializer ,其中 ByteArraySerialize 是 Kafka 默認使用的序列化器,其他的序列化器還有很多,你可以通過 查看其他序列化器。要注意的一點:key.serializer 是必須要設置的,即使你打算只發送值的內容

  • value.serializer

與 key.serializer 一樣,value.serializer 指定的類會將值序列化。

下面代碼演示了如何創建一個 Kafka 生產者,這裏只指定了必要的屬性,其他使用默認的配置

private Properties properties = new Properties();
properties.put("bootstrap.servers","broker1:9092,broker2:9092");
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties = new KafkaProducer<String,String>(properties);

來解釋一下這段代碼

  • 首先創建了一個 Properties 對象
  • 使用 StringSerializer 序列化器序列化 key / value 鍵值對
  • 在這裏我們創建了一個新的生產者對象,併為鍵值設置了恰當的類型,然後把 Properties 對象傳遞給他。

實例化生產者對象后,接下來就可以開始發送消息了,發送消息主要由下面幾種方式

直接發送,不考慮結果

使用這種發送方式,不會關心消息是否到達,會丟失一些消息,因為 Kafka 是高可用的,生產者會自動嘗試重發,這種發送方式和 UDP 運輸層協議很相似。

同步發送

同步發送仍然使用 send() 方法發送消息,它會返回一個 Future 對象,調用 get() 方法進行等待,就可以知道消息時候否發送成功。

異步發送

異步發送指的是我們調用 send() 方法,並制定一個回調函數,服務器在返迴響應時調用該函數。

下一節我們會重新討論這三種實現。

向 Kafka 發送消息

簡單消息發送

Kafka 最簡單的消息發送如下:

ProducerRecord<String,String> record =
                new ProducerRecord<String, String>("CustomerCountry","West","France");

producer.send(record);

代碼中生產者(producer)的 send() 方法需要把 ProducerRecord 的對象作為參數進行發送,ProducerRecord 有很多構造函數,這個我們下面討論,這裏調用的是

public ProducerRecord(String topic, K key, V value) {}

這個構造函數,需要傳遞的是 topic主題,key 和 value。

把對應的參數傳遞完成后,生產者調用 send() 方法發送消息(ProducerRecord對象)。我們可以從生產者的架構圖中看出,消息是先被寫入分區中的緩衝區中,然後分批次發送給 Kafka Broker。

發送成功后,send() 方法會返回一個 Future(java.util.concurrent) 對象,Future 對象的類型是 RecordMetadata 類型,我們上面這段代碼沒有考慮返回值,所以沒有生成對應的 Future 對象,所以沒有辦法知道消息是否發送成功。如果不是很重要的信息或者對結果不會產生影響的信息,可以使用這種方式進行發送。

我們可以忽略發送消息時可能發生的錯誤或者在服務器端可能發生的錯誤,但在消息發送之前,生產者還可能發生其他的異常。這些異常有可能是 SerializationException(序列化失敗)BufferedExhaustedException 或 TimeoutException(說明緩衝區已滿),又或是 InterruptedException(說明發送線程被中斷)

同步發送消息

第二種消息發送機制如下所示

ProducerRecord<String,String> record =
                new ProducerRecord<String, String>("CustomerCountry","West","France");

try{
  RecordMetadata recordMetadata = producer.send(record).get();
}catch(Exception e){
  e.printStackTrace();
}

這種發送消息的方式較上面的發送方式有了改進,首先調用 send() 方法,然後再調用 get() 方法等待 Kafka 響應。如果服務器返回錯誤,get() 方法會拋出異常,如果沒有發生錯誤,我們會得到 RecordMetadata 對象,可以用它來查看消息記錄。

生產者(KafkaProducer)在發送的過程中會出現兩類錯誤:其中一類是重試錯誤,這類錯誤可以通過重發消息來解決。比如連接的錯誤,可以通過再次建立連接來解決;無錯誤則可以通過重新為分區選舉首領來解決。KafkaProducer 被配置為自動重試,如果多次重試后仍無法解決問題,則會拋出重試異常。另一類錯誤是無法通過重試來解決的,比如消息過大對於這類錯誤,KafkaProducer 不會進行重試,直接拋出異常。

異步發送消息

同步發送消息都有個問題,那就是同一時間只能有一個消息在發送,這會造成許多消息無法直接發送,造成消息滯后,無法發揮效益最大化。

比如消息在應用程序和 Kafka 集群之間一個來回需要 10ms。如果發送完每個消息后都等待響應的話,那麼發送100個消息需要 1 秒,但是如果是異步方式的話,發送 100 條消息所需要的時間就會少很多很多。大多數時候,雖然Kafka 會返回 RecordMetadata 消息,但是我們並不需要等待響應。

為了在異步發送消息的同時能夠對異常情況進行處理,生產者提供了回掉支持。下面是回調的一個例子

ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("CustomerCountry", "Huston", "America");
        producer.send(producerRecord,new DemoProducerCallBack());


class DemoProducerCallBack implements Callback {

  public void onCompletion(RecordMetadata metadata, Exception exception) {
    if(exception != null){
      exception.printStackTrace();;
    }
  }
}

首先實現回調需要定義一個實現了org.apache.kafka.clients.producer.Callback的類,這個接口只有一個 onCompletion方法。如果 kafka 返回一個錯誤,onCompletion 方法會拋出一個非空(non null)異常,這裏我們只是簡單的把它打印出來,如果是生產環境需要更詳細的處理,然後在 send() 方法發送的時候傳遞一個 Callback 回調的對象。

生產者分區機制

Kafka 對於數據的讀寫是以分區為粒度的,分區可以分佈在多個主機(Broker)中,這樣每個節點能夠實現獨立的數據寫入和讀取,並且能夠通過增加新的節點來增加 Kafka 集群的吞吐量,通過分區部署在多個 Broker 來實現負載均衡的效果。

上面我們介紹了生產者的發送方式有三種:不管結果如何直接發送發送並返回結果發送並回調。由於消息是存在主題(topic)的分區(partition)中的,所以當 Producer 生產者發送產生一條消息發給 topic 的時候,你如何判斷這條消息會存在哪個分區中呢?

這其實就設計到 Kafka 的分區機制了。

分區策略

Kafka 的分區策略指的就是將生產者發送到哪個分區的算法。Kafka 為我們提供了默認的分區策略,同時它也支持你自定義分區策略。

如果要自定義分區策略的話,你需要显示配置生產者端的參數 Partitioner.class,我們可以看一下這個類它位於 org.apache.kafka.clients.producer 包下

public interface Partitioner extends Configurable, Closeable {
  
  public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

  public void close();
  
  default public void onNewBatch(String topic, Cluster cluster, int prevPartition) {}
}

Partitioner 類有三個方法,分別來解釋一下

  • partition(): 這個類有幾個參數: topic,表示需要傳遞的主題;key 表示消息中的鍵值;keyBytes表示分區中序列化過後的key,byte數組的形式傳遞;value 表示消息的 value 值;valueBytes 表示分區中序列化后的值數組;cluster表示當前集群的原數據。Kafka 給你這麼多信息,就是希望讓你能夠充分地利用這些信息對消息進行分區,計算出它要被發送到哪個分區中。
  • close() : 繼承了 Closeable 接口能夠實現 close() 方法,在分區關閉時調用。
  • onNewBatch(): 表示通知分區程序用來創建新的批次

其中與分區策略息息相關的就是 partition() 方法了,分區策略有下面這幾種

順序輪訓

順序分配,消息是均勻的分配給每個 partition,即每個分區存儲一次消息。就像下面這樣

上圖表示的就是輪訓策略,輪訓策略是 Kafka Producer 提供的默認策略,如果你不使用指定的輪訓策略的話,Kafka 默認會使用順序輪訓策略的方式。

隨機輪訓

隨機輪訓簡而言之就是隨機的向 partition 中保存消息,如下圖所示

實現隨機分配的代碼只需要兩行,如下

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());

先計算出該主題總的分區數,然後隨機地返回一個小於它的正整數。

本質上看隨機策略也是力求將數據均勻地打散到各個分區,但從實際表現來看,它要遜於輪詢策略,所以如果追求數據的均勻分佈,還是使用輪詢策略比較好。事實上,隨機策略是老版本生產者使用的分區策略,在新版本中已經改為輪詢了。

按照 key 進行消息保存

這個策略也叫做 key-ordering 策略,Kafka 中每條消息都會有自己的key,一旦消息被定義了 Key,那麼你就可以保證同一個 Key 的所有消息都進入到相同的分區裏面,由於每個分區下的消息處理都是有順序的,故這個策略被稱為按消息鍵保序策略,如下圖所示

實現這個策略的 partition 方法同樣簡單,只需要下面兩行代碼即可:

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return Math.abs(key.hashCode()) % partitions.size();

上面這幾種分區策略都是比較基礎的策略,除此之外,你還可以自定義分區策略。

生產者壓縮機制

壓縮一詞簡單來講就是一種互換思想,它是一種經典的用 CPU 時間去換磁盤空間或者 I/O 傳輸量的思想,希望以較小的 CPU 開銷帶來更少的磁盤佔用或更少的網絡 I/O 傳輸。如果你還不了解的話我希望你先讀完這篇文章 ,然後你就明白壓縮是怎麼回事了。

Kafka 壓縮是什麼

Kafka 的消息分為兩層:消息集合 和 消息。一個消息集合中包含若干條日誌項,而日誌項才是真正封裝消息的地方。Kafka 底層的消息日誌由一系列消息集合日誌項組成。Kafka 通常不會直接操作具體的一條條消息,它總是在消息集合這個層面上進行寫入操作。

在 Kafka 中,壓縮會發生在兩個地方:Kafka Producer 和 Kafka Consumer,為什麼啟用壓縮?說白了就是消息太大,需要變小一點 來使消息發的更快一些。

Kafka Producer 中使用 compression.type 來開啟壓縮

private Properties properties = new Properties();
properties.put("bootstrap.servers","192.168.1.9:9092");
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("compression.type", "gzip");

Producer<String,String> producer = new KafkaProducer<String, String>(properties);

ProducerRecord<String,String> record =
  new ProducerRecord<String, String>("CustomerCountry","Precision Products","France");

上面代碼錶明該 Producer 的壓縮算法使用的是 GZIP

有壓縮必有解壓縮,Producer 使用壓縮算法壓縮消息后併發送給服務器后,由 Consumer 消費者進行解壓縮,因為採用的何種壓縮算法是隨着 key、value 一起發送過去的,所以消費者知道採用何種壓縮算法。

Kafka 重要參數配置

在上一篇文章 中,我們主要介紹了一下 kafka 集群搭建的參數,本篇文章我們來介紹一下 Kafka 生產者重要的配置,生產者有很多可配置的參數,在文檔里(

key.serializer

用於 key 鍵的序列化,它實現了 org.apache.kafka.common.serialization.Serializer 接口

value.serializer

用於 value 值的序列化,實現了 org.apache.kafka.common.serialization.Serializer 接口

acks

acks 參數指定了要有多少個分區副本接收消息,生產者才認為消息是寫入成功的。此參數對消息丟失的影響較大

  • 如果 acks = 0,就表示生產者也不知道自己產生的消息是否被服務器接收了,它才知道它寫成功了。如果發送的途中產生了錯誤,生產者也不知道,它也比較懵逼,因為沒有返回任何消息。這就類似於 UDP 的運輸層協議,只管發,服務器接受不接受它也不關心。
  • 如果 acks = 1,只要集群的 Leader 接收到消息,就會給生產者返回一條消息,告訴它寫入成功。如果發送途中造成了網絡異常或者 Leader 還沒選舉出來等其他情況導致消息寫入失敗,生產者會受到錯誤消息,這時候生產者往往會再次重發數據。因為消息的發送也分為 同步異步,Kafka 為了保證消息的高效傳輸會決定是同步發送還是異步發送。如果讓客戶端等待服務器的響應(通過調用 Future 中的 get() 方法),顯然會增加延遲,如果客戶端使用回調,就會解決這個問題。
  • 如果 acks = all,這種情況下是只有當所有參与複製的節點都收到消息時,生產者才會接收到一個來自服務器的消息。不過,它的延遲比 acks =1 時更高,因為我們要等待不只一個服務器節點接收消息。

buffer.memory

此參數用來設置生產者內存緩衝區的大小,生產者用它緩衝要發送到服務器的消息。如果應用程序發送消息的速度超過發送到服務器的速度,會導致生產者空間不足。這個時候,send() 方法調用要麼被阻塞,要麼拋出異常,具體取決於 block.on.buffer.null 參數的設置。

compression.type

此參數來表示生產者啟用何種壓縮算法,默認情況下,消息發送時不會被壓縮。該參數可以設置為 snappy、gzip 和 lz4,它指定了消息發送給 broker 之前使用哪一種壓縮算法進行壓縮。下面是各壓縮算法的對比

retries

生產者從服務器收到的錯誤有可能是臨時性的錯誤(比如分區找不到首領),在這種情況下,reteis 參數的值決定了生產者可以重發的消息次數,如果達到這個次數,生產者會放棄重試並返回錯誤。默認情況下,生產者在每次重試之間等待 100ms,這個等待參數可以通過 retry.backoff.ms 進行修改。

batch.size

當有多個消息需要被發送到同一個分區時,生產者會把它們放在同一個批次里。該參數指定了一個批次可以使用的內存大小,按照字節數計算。當批次被填滿,批次里的所有消息會被發送出去。不過生產者井不一定都會等到批次被填滿才發送,任意條數的消息都可能被發送。

client.id

此參數可以是任意的字符串,服務器會用它來識別消息的來源,一般配置在日誌里

max.in.flight.requests.per.connection

此參數指定了生產者在收到服務器響應之前可以發送多少消息,它的值越高,就會佔用越多的內存,不過也會提高吞吐量。把它設為1 可以保證消息是按照發送的順序寫入服務器。

timeout.ms、request.timeout.ms 和 metadata.fetch.timeout.ms

request.timeout.ms 指定了生產者在發送數據時等待服務器返回的響應時間,metadata.fetch.timeout.ms 指定了生產者在獲取元數據(比如目標分區的首領是誰)時等待服務器返迴響應的時間。如果等待時間超時,生產者要麼重試發送數據,要麼返回一個錯誤。timeout.ms 指定了 broker 等待同步副本返回消息確認的時間,與 asks 的配置相匹配—-如果在指定時間內沒有收到同步副本的確認,那麼 broker 就會返回一個錯誤。

max.block.ms

此參數指定了在調用 send() 方法或使用 partitionFor() 方法獲取元數據時生產者的阻塞時間當生產者的發送緩衝區已捕,或者沒有可用的元數據時,這些方法就會阻塞。在阻塞時間達到 max.block.ms 時,生產者會拋出超時異常。

max.request.size

該參數用於控制生產者發送的請求大小。它可以指能發送的單個消息的最大值,也可以指單個請求里所有消息的總大小。

receive.buffer.bytes 和 send.buffer.bytes

Kafka 是基於 TCP 實現的,為了保證可靠的消息傳輸,這兩個參數分別指定了 TCP Socket 接收和發送數據包的緩衝區的大小。如果它們被設置為 -1,就使用操作系統的默認值。如果生產者或消費者與 broker 處於不同的數據中心,那麼可以適當增大這些值。

文章參考:

《Kafka 權威指南》

極客時間 -《Kafka 核心技術與實戰》

Kafka 源碼

關注公眾號獲取更多優質电子書,關注一下你就知道資源是有多好了

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

※評比前十大台北網頁設計台北網站設計公司知名案例作品心得分享

※智慧手機時代的來臨,RWD網頁設計已成為網頁設計推薦首選

※評比南投搬家公司費用收費行情懶人包大公開

敲開通往架構師的門

最近學習了一些關於架構設計的知識想分享給大家。俗話說得好,不想當架構師的程序員不是好廚子。那麼如何成為一名架構師呢?接下來就聊一聊我的一些想法。

什麼是架構師

之前有同學問我,做了幾年技術,應該轉管理還是轉架構師?對於這位同學,我給他的答案是,你要先踏踏實實做好現在的工作。因為就他提的問題來看,應該是剛入行不久或者是在校學生。

專心做技術的,都想做架構師。但架構師並不是說技術做時間長了可以轉的。隨着你的知識深度和廣度的增加,在工作中會扮演更重要的角色,承擔更大的責任,最終自然而然就會接觸到架構設計的工作。

而架構師的主要工作,其實是利用架構設計知識以及豐富的工作經驗,在設計架構時,結合實際情況,在不同的選項中做出取捨。

架構設計的真正目的?

為什麼要進行架構設計?因為架構設計很重要?可是為什麼重要呢?似乎說不清楚。

因為可以提升開發效率嗎?也不一定,因為只有簡單的設計才會使開發效率更高。而架構設計出於多方面考慮,不得已會引入一些複雜度,因此架構設計並不一定能提升開發效率。

是為了大多數口中的“高可用”、“高性能”、“可擴展”嗎?其實也不是。我們的系統可能並不一定需要這些。

那架構設計的真正目的是什麼呢?我認為架構設計的真正目的是與系統複雜度做鬥爭。

系統複雜度的來源有:高性能、高可用、可擴展性、低成本、安全、規模

前面我們聊到有些系統可能不需要高可用、高性能。有些同學可能不理解,這些難道不是軟件開發最基本的要求嗎?這樣的說法是存在一定偏差的。我們舉一個簡單的例子說明一下。

如果讓你為一所學校設計一個學生信息管理系統。針對上述幾個複雜度的來源,你會做出怎樣的取捨?我們來逐條分析一下。

首先是高性能,學校的學生最多也就幾萬人,而且平時也不可能幾萬人同時用系統。因此我們並不需要考慮高性能。數據的CRUD直接用關係型數據庫就足夠了。

然後是高可用,對於學生系統而言,即使宕機幾個小時,影響也不會太大。不過數據的可靠性還是要保證的,如果大量數據丟失而又沒有備份的話,數據修復將會是一項繁重的工作。所以這裏需要做一些數據高可靠的設計。

接下來是可擴展性,學生管理系統一般比較穩定,不會出現需要擴展的情況。因此我們也不太需要考慮可擴展性。

至此,我們在設計系統時習慣考慮的高可用、高性能和可擴展,在這個系統中都不需要過多關注了。我們再來看看剩下的幾個複雜度來源。

關於低成本,由於我們並不需要高可用和高性能的設計,所以幾台服務器的成本對於學校來說也不足為慮。

安全性而言,學生信息需要一定的安全保證,但也不必做到金融級安全。所以只需要做好數據庫權限管理,登錄密碼管理就足夠了。

最後是系統規模,學生管理系統往往不會很複雜。也不會迭代出許多功能。因此規模是比較固定且比較小的,不會帶來很多的複雜度。

從我們的分析中可以看出,學生管理系統是一個並不複雜的系統,我們真正需要着重考慮的就只有數據高可靠和數據安全兩方面。面對複雜的系統,我們也應該按照這個步驟來思考並設計出合理的架構。在合理的情況下,盡量減少系統的複雜度。

架構設計原則

前面我們提到,架構師的工作其實就是在多種選項中做出合理的取捨,取捨沒有對錯之分,只有是否合適一說。為了更好的做出選擇,架構設計應該遵循三個原則:合適原則、簡單原則、演化原則。下面我來一一介紹這三個原則。

合適原則

我們一直在說,架構設計中架構師要做出取捨,選擇合適的架構。之所以一直強調合適,是因為我們在架構設計過程中需要結合實際情況來考慮。

那麼脫離實際情況的設計通常是怎樣發生的呢?不知道大家在開發時有沒有遇到過這樣的需求:“我們決定做一個電商網站,就按照淘寶做一個一模一樣的吧。“這時作為開發的你一定是黑人問號臉,心裏也會萬馬奔騰。

在架構設計時也是一樣,最忌諱的就是不顧實際情況,盲目的使用業界最優的架構設計。有同學可能不太理解,使用最優設計有什麼錯呢?

這裏我們所說的實際情況就是你的業務。試想如果你的業務剛剛起步,QPS剛過百,這時,你設計的架構是能支持1000QPS還是3000QPS對於系統來說沒什麼區別。但對於開發成本來說就提升了不止3倍。而對於這樣的業務體量來說,開發團隊一般只有十幾人或幾十人這樣的規模。要讓這樣的團隊來開發的話,大概率是無法完成的。

演化原則

聊完了合適原則,我們再來聊一聊演化原則。就像北京的城市規劃一樣,它一定是先有二環,慢慢向外擴建,才逐漸有了三四五六環。而我們現在所使用的大多數軟件,也都是經過了許多版本的迭代才有了現在的功能。

對於一名合格的架構師來說,我們首先要遵循合適原則,然後再逐步演化。切不可想着一步到位,從而引起過度設計。當業務發展到一定階段時,我們不可避免的會需要對架構進行擴展、重構甚至重寫。在這一過程中,我們應該保留下好的設計,對不好的設計進行完善。就像淘寶的架構一樣,它是經歷了多次“雙十一”之後,才有了現在這樣能支撐每天上千億成交額的架構。

因此,我們在設計架構時要遵循的第二個原則就是循序漸進的演化原則,而不是追求一步到位。

簡單原則

最後再來說簡單原則。前面我們也說了,架構設計其實是在和系統的複雜度做鬥爭。為什麼要有簡單原則?我認為原因主要有兩點。

第一,複雜的架構開發成本更高。在開發資源有限的情況下,如果我們的架構設計很複雜,勢必會提升開發成本。而對於當今飛速發展的市場來說,時間就是生命。如果你設計的架構開發周期非常長,那麼公司也許就會放棄這個項目,那麼架構也就沒有存在的意義了。

第二,複雜的架構往往會帶來更多的故障。舉個栗子,電動牙刷和普通牙刷相比,壞的概率一定會高一點,電動牙刷可能出現刷頭磨損,電路問題,充電故障等等,而普通牙刷只會出現刷頭磨損的情況。也就是說,系統的組件越多,系統出現故障的概率也就越大。在此基礎上還有一個問題就是,一旦出了故障,定位問題的速度而言,簡單系統相較於複雜系統也有着很大的優勢。

至此,架構設計的三個原則我們都已經聊完了。細心的同學可能注意到了,我在詳細介紹時的順序和最開始提到的順序並不一致。這不是我不注意細節。而是我在詳細介紹時,對這三個原則的重要程度排了一個順序。這也是作為架構師的一種取捨,當三種原則無法同時滿足時,應該以哪個為重?這裏我的答案是合適>演化>簡單

關於架構設計,我已經有了一個大體的認識,不知道在讀完本文以後你是否也有同樣的感覺。如果有任何困惑,歡迎和我一起討論交流。

最後,架構師是需要有很深的技術積累的,而我在這方面做得還不夠。所以後面還是要以技術積累為主,同時也會嘗試將架構設計的知識引入到日常工作中。後續有什麼新的體會我會繼續和大家分享。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

※評比前十大台北網頁設計台北網站設計公司知名案例作品心得分享

※智慧手機時代的來臨,RWD網頁設計已成為網頁設計推薦首選

※評比南投搬家公司費用收費行情懶人包大公開