很多朋友對于mapreduce的shuffle過程和mapreduce的shuffle作用不太懂,今天就由小編來為大家分享,希望可以幫助到大家,下面一起來看看吧!
spark中的廣播變量是怎么設(shè)計和實(shí)現(xiàn)的
Spark官網(wǎng)上對廣播變量的描述:
Broadcastvariablesallowtheprogrammertokeeparead-onlyvariablecachedoneachmachineratherthanshippingacopyofitwithtasks.Theycanbeused,forexample,togiveeverynodeacopyofalargeinputdatasetinanefficientmanner.Sparkalsoattemptstodistributebroadcastvariablesusingefficientbroadcastalgorithmstoreducecommunicationcost.
大意是,使用廣播變量,每個Executor的內(nèi)存中,只駐留一份變量副本,而不是對每個task都傳輸一次大變量,省了很多的網(wǎng)絡(luò)傳輸,對性能提升具有很大幫助,而且會通過高效的廣播算法來減少傳輸代價。
使用廣播變量的場景很多,我們都知道spark一種常見的優(yōu)化方式就是小表廣播,使用mapjoin來代替reducejoin,我們通過把小的數(shù)據(jù)集廣播到各個節(jié)點(diǎn)上,節(jié)省了一次特別expensive的shuffle操作。
比如driver上有一張數(shù)據(jù)量很小的表,其他節(jié)點(diǎn)上的task都需要lookup這張表,那么driver可以先把這張表copy到這些節(jié)點(diǎn),這樣task就可以在本地查表了。
今天我們來看下Spark對廣播變量的設(shè)計和實(shí)現(xiàn)。
Spark廣播的方式:
Spark歷史上采用了兩種廣播的方式,一種是通過Http協(xié)議傳輸數(shù)據(jù),一種是通過Torrent協(xié)議來傳輸數(shù)據(jù),但是最新的spark版本中,http的方式已經(jīng)廢棄了(pr在此https://github.com/apache/spark/pull/10531),spark是在spark1.1版本中引入了TorrentBroadcast,此后就沒有更新HttpBroadcast和相關(guān)文檔了,spark2.0的時候完全可以刪除HttpBroadcast了,之后統(tǒng)一把TorrentBroadcast作為廣播變量的唯一實(shí)現(xiàn)方式。但是代碼沒有寫死,還是保留了擴(kuò)展性(BroadcastFactory作為一個trait,TorrentBroadcastFactory只是一種實(shí)現(xiàn)方式,符合依賴倒置原則,依賴抽象,不依賴具體實(shí)現(xiàn)),萬一之后想到了更牛x的實(shí)現(xiàn)方式,可以方便的加上,但是我估計一時半會應(yīng)該沒有了。
本著過時不講的原則,我們這里只說TorrentBroadcast,大家可以到這里看下動圖。
http://mg8.org/processing/bt.html
你能看到不同的數(shù)據(jù)塊是來自不同的節(jié)點(diǎn),多個節(jié)點(diǎn)一起組成一個網(wǎng)絡(luò),在你下載的同時,你也在上傳,所以說在享受別人提供的下載的同時,你也在貢獻(xiàn),最終所有人一起受益。
們看下BitTorrent協(xié)議,wiki定義:
BitTorrent協(xié)議(簡稱BT,俗稱比特洪流、BT下載)是用在對等網(wǎng)絡(luò)中文件分享的網(wǎng)絡(luò)協(xié)議程序。和點(diǎn)對點(diǎn)(point-to-point)的協(xié)議程序不同,它是用戶群對用戶群(peer-to-peer),而且用戶越多,下載同一文件的人越多,下載該檔案的速度越快。且下載后,繼續(xù)維持上傳的狀態(tài),就可以“分享”,成為其用戶端節(jié)點(diǎn)下載的種子文件(.torrent),同時上傳及下載。
具體感興趣的可以看下這個論文。
http://www.webpaas.com/usr/uploads/2015/01/52279564.pdf
關(guān)鍵的幾個點(diǎn):
下載者要下載文件內(nèi)容,需要先得到相應(yīng)的種子文件,然后使用BT客戶端軟件進(jìn)行下載。
提供下載的文件虛擬分成大小相等的塊,并把每個塊的索引信息和Hash驗(yàn)證碼寫入種子文件中
有一個Tracker負(fù)責(zé)維護(hù)元信息,所有的客戶端都可以通過Tracker找到每個快離自己最近的其他下載者
下載時,BT客戶端首先解析種子文件得到Tracker地址,然后連接Tracker服務(wù)器。Tracker服務(wù)器回應(yīng)下載者的請求,提供下載者其他下載者(包括發(fā)布者)的IP。下載者再連接其他下載者,根據(jù)種子文件,兩者分別告知對方自己已經(jīng)有的塊,然后交換對方所沒有的數(shù)據(jù)。此時不需要其他服務(wù)器參與,分散了單個線路上的數(shù)據(jù)流量,因此減輕了服務(wù)器負(fù)擔(dān)。
下載者每得到一個塊,需要算出下載塊的Hash驗(yàn)證碼與種子文件中的對比,如果一樣則說明塊正確,不一樣則需要重新下載這個塊。這種規(guī)定是為了解決下載內(nèi)容準(zhǔn)確性的問題。
針對以上的幾個點(diǎn),spark是怎么做的,我們看下:
TorrentBroadcast底層使用的是BlockManager,下載每個數(shù)據(jù)塊先要去master去獲取Block所在的位置(location)。
在把大變量寫到廣播變量的時候,通過ChunkedByteBufferOutputStream把輸入的數(shù)據(jù)分成多個小塊,zipWithIndex中,為每個小塊加一個唯一標(biāo)識,形如broadcast_broadcastId_pieceId。作為BlockId,存儲在BlockManager中。而且對每個小的數(shù)據(jù)塊加上一個校驗(yàn)碼。
BlockManagerMaster作為tracker維護(hù)所有Block塊的元信息,知道每個數(shù)據(jù)塊所在的executor和存儲級別。Broadcast變量中維護(hù)屬于自己的所有小塊的BlockId
通過value方法讀取Boradcast變量的時候,取出所有小塊的BlockId,對于每個BlockId,通過BlockManagerMaster獲取了該BlockId的位置的集合,隨機(jī)化,位置集合被打亂,優(yōu)先找同主機(jī)的地址(這樣可以走回環(huán)),然后從隨機(jī)的地址集合按順序取地址一個一個嘗試去獲取數(shù)據(jù),因?yàn)殡S機(jī)化了地址,那么executor不只會從Driver去獲取數(shù)據(jù)。分散了driver上的壓力。
取到Blockpiece后,使用校驗(yàn)碼進(jìn)行校驗(yàn),看看數(shù)據(jù)塊有沒有損壞,如果沒有損壞,然后按照順序拼在一起。
大家比較一下,流程是不是差不多,基本貫穿了BitTorrent的思想原理。
開始的時候,大家都是通過driver拿數(shù)據(jù),但是一旦其他executor上有了數(shù)據(jù)塊之后,所有的executor都是有機(jī)會通過別的executor來獲取數(shù)據(jù)塊,這樣就分散了driver的壓力。套用一句話,下載的executor越多,下載的越快。
Spark廣播變量的使用姿勢:
valarray:Array[Int]=???
valbroadcasted=sc.broadcast(array)
valrdd:RDD[Int]=???
rdd.map(i=>array.contains(i))//這種沒有使用broadcast,每次task都要傳一下數(shù)組,浪費(fèi)內(nèi)網(wǎng)帶寬
rdd.map(i=>broadcasted.value.contains(i))
上面的一個小的demo就是把一個數(shù)組通過broadcast的方式廣播出去,然后就可以在task里面使用數(shù)組變量了,這個數(shù)組變量是駐留在executor上的,不用每次調(diào)度task運(yùn)行的時候都得傳輸一次數(shù)組。
我們可以看到對于broadcast的使用,無非就是sc.broadcast定義了一個廣播變量和broadcasted.value使用廣播變量的value方法,找到真正的數(shù)組。
sparkcontext初始化的時候,sparkEnv中初始化了一個broadcastManager,初始化方法里面,現(xiàn)在默認(rèn)使用的TorrentBroadcastFactory,調(diào)用sc.broadcast方法,就會使用工廠模式創(chuàng)建一個TorrentBroadcast,這時候就會調(diào)用寫操作,把數(shù)據(jù)分成小塊寫到BlockManager中,broadcasted只是一個TorrentBroadcast類型的實(shí)例,并沒有數(shù)組數(shù)據(jù),這個實(shí)例只維護(hù)了數(shù)據(jù)的元信息,也就是一組BlockId信息,這個實(shí)例被序列化被傳到executor上,在executor上調(diào)用這個實(shí)例的value方法,才會觸發(fā)去BlockManager上讀真正的數(shù)據(jù)。
廣播變量的回收:
在調(diào)用sc.Broadcast方法中,會去ContextCleaner中注冊一下,之前講的緩存RDD的時候也要去ContextCleaner中注冊一下,兩個差不多,都是為了回收。
cleaner.foreach(_.registerBroadcastForCleanup(bc))
當(dāng)廣播變量引用為null的時候,在contextcleaner里面會回調(diào)broadcastManager.unbroadcast方法,會把Broadcast變量從BlockManager存儲中干掉。
為什么只能broadcast只讀的變量:
這就涉及一致性的問題,如果變量可以被更新,那么一旦變量被某個節(jié)點(diǎn)更新,其他節(jié)點(diǎn)要不要一塊更新?如果多個節(jié)點(diǎn)同時在更新,更新順序是什么?怎么做同步?仔細(xì)想一下,每個都很頭疼,spark目前就索性搞成了只讀的。因?yàn)榉植际綇?qiáng)一致性真的很蛋疼。
mapreduce如何實(shí)現(xiàn)計算向數(shù)據(jù)移動
MapReduce數(shù)據(jù)預(yù)處理,從數(shù)據(jù)進(jìn)入到處理程序到處理完成后輸出到存儲中,整個過程分為如下5個階段:
InputSplit或Read數(shù)據(jù)階段InputSplit,是從數(shù)據(jù)分片出發(fā),把數(shù)據(jù)輸入到處理程序中。
Read則是從處理程序出發(fā)反向來看,把數(shù)據(jù)從文件中讀取到處理程序中來。這個階段表達(dá)的是我們數(shù)據(jù)從哪里來。這是整個過程的開始。
Map階段當(dāng)數(shù)據(jù)輸入進(jìn)來以后,我們進(jìn)行的是map階段的處理。例如對一行的單詞進(jìn)行分割,然后每個單詞進(jìn)行計數(shù)為1進(jìn)行輸出。
Shuffle階段Shuffle階段是整個MapReduce的核心,介于Map階段跟Reduce階段之間。
Reduce階段數(shù)據(jù)經(jīng)過Map階段處理,數(shù)據(jù)再經(jīng)過Shuffle階段,最后到Reduce,相同的key值的數(shù)據(jù)會到同一個Reduce任務(wù)中進(jìn)行最后的匯總。
Output階段這個階段的事情就是將Reduce階段計算好的結(jié)果,存儲到某個地方去,這是整個過程的結(jié)束。
sparksql為什么比hive處理速度快
Spark為什么快呢?
消除了冗余的HDFS讀寫
Hadoop每次shuffle操作后,必須寫到磁盤,而Spark在shuffle后不一定落盤,可以cache到內(nèi)存中,以便迭代時使用。如果操作復(fù)雜,很多的shufle操作,那么Hadoop的讀寫IO時間會大大增加。、
消除了冗余的MapReduce階段
Hadoop的shuffle操作一定連著完整的MapReduce操作,冗余繁瑣。而Spark基于RDD提供了豐富的算子操作,且action操作產(chǎn)生shuffle數(shù)據(jù),可以緩存在內(nèi)存中。
JVM的優(yōu)化
Hadoop每次MapReduce操作,啟動一個Task便會啟動一次JVM,基于進(jìn)程的操作。而Spark每次MapReduce操作是基于線程的,只在啟動Executor時啟動一次JVM,內(nèi)存的Task操作是在線程復(fù)用的。
每次啟動JVM的時間可能就需要幾秒甚至十幾秒,那么當(dāng)Task多了,這個時間Hadoop不知道比Spark慢了多少。
總結(jié):Spark比Mapreduce運(yùn)行更快,主要得益于其對mapreduce操作的優(yōu)化以及對JVM使用的優(yōu)化。
impala為什么比hive快
Impala自稱數(shù)據(jù)查詢效率比Hive快幾倍甚至數(shù)十倍,它之所以這么快的原因大致有以下幾點(diǎn):
真正的MPP查詢引擎。
使用C++開發(fā)而不是Java,降低運(yùn)行負(fù)荷。
運(yùn)行時代碼生成(LLVMIR),提高效率。
全新的執(zhí)行引擎(不是Mapreduce)。
在執(zhí)行SQL語句的時候,Impala不會把中間數(shù)據(jù)寫入到磁盤,而是在內(nèi)存中完成了所有的處理。
使用Impala的時候,查詢?nèi)蝿?wù)會馬上執(zhí)行而不是生產(chǎn)Mapreduce任務(wù),這會節(jié)約大量的初始化時間。
Impala查詢計劃解析器使用更智能的算法在多節(jié)點(diǎn)上分布式執(zhí)行各個查詢步驟,同時避免了sorting和shuffle這兩個非常耗時的階段,這兩個階段往往是不需要的。
Impala擁有HDFS上面各個datablock的信息,當(dāng)它處理查詢的時候能夠在各個datanode上面更均衡的分發(fā)查詢。
另外一個關(guān)鍵原因是,Impala為每個查詢產(chǎn)生匯編級的代碼,當(dāng)Impala在本地內(nèi)存中運(yùn)行的時候,這些匯編代碼執(zhí)行效率比其它任何代碼框架都更快,因?yàn)榇a框架會增加額外的延遲。
如何管理Spark內(nèi)存
Spark內(nèi)存管理
Spark執(zhí)行應(yīng)用程序時,Spark集群會啟動Driver和Executor兩種JVM進(jìn)程,Driver負(fù)責(zé)創(chuàng)建SparkContext上下文,提交任務(wù),task的分發(fā)等。Executor負(fù)責(zé)task的計算任務(wù),并將結(jié)果返回給Driver。同時需要為需要持久化的RDD提供儲存。Driver端的內(nèi)存管理比較簡單,這里所說的Spark內(nèi)存管理針對Executor端的內(nèi)存管理。
Spark內(nèi)存管理分為靜態(tài)內(nèi)存管理和統(tǒng)一內(nèi)存管理,Spark1.6之前使用的是靜態(tài)內(nèi)存管理,Spark1.6之后引入了統(tǒng)一內(nèi)存管理。
靜態(tài)內(nèi)存管理中存儲內(nèi)存、執(zhí)行內(nèi)存和其他內(nèi)存的大小在Spark應(yīng)用程序運(yùn)行期間均為固定的,但用戶可以應(yīng)用程序啟動前進(jìn)行配置。
統(tǒng)一內(nèi)存管理與靜態(tài)內(nèi)存管理的區(qū)別在于儲存內(nèi)存和執(zhí)行內(nèi)存共享同一塊空間,可以互相借用對方的空間。
Spark1.6以上版本默認(rèn)使用的是統(tǒng)一內(nèi)存管理,可以通過參數(shù)spark.memory.useLegacyMode設(shè)置為true(默認(rèn)為false)使用靜態(tài)內(nèi)存管理。
一、具體細(xì)節(jié)
1、靜態(tài)內(nèi)存管理分布圖
2、統(tǒng)一內(nèi)存管理分布圖
3、reduce中OOM如何處理?
拉取數(shù)據(jù)的時候一次都放不下,放下的話可以溢寫磁盤
1)減少每次拉取的數(shù)據(jù)量
2)提高shuffle聚合的內(nèi)存比例
3)提高Excutor的總內(nèi)存
4、Shuffle調(diào)優(yōu)
spark.shuffle.file.buffer默認(rèn)值:32k參數(shù)說明:該參數(shù)用于設(shè)置shufflewritetask的BufferedOutputStream的buffer緩沖大小。將數(shù)據(jù)寫到磁盤文件之前,會先寫入buffer緩沖中,待緩沖寫滿之后,才會溢寫到磁盤。調(diào)優(yōu)建議:如果作業(yè)可用的內(nèi)存資源較為充足的話,可以適當(dāng)增加這個參數(shù)的大小(比如64k,一定是成倍的增加),從而減少shufflewrite過程中溢寫磁盤文件的次數(shù),也就可以減少磁盤IO次數(shù),進(jìn)而提升性能。在實(shí)踐中發(fā)現(xiàn),合理調(diào)節(jié)該參數(shù),性能會有1%~5%的提升。spark.reducer.maxSizeInFlight默認(rèn)值:48m參數(shù)說明:該參數(shù)用于設(shè)置shufflereadtask的buffer緩沖大小,而這個buffer緩沖決定了每次能夠拉取多少數(shù)據(jù)。調(diào)優(yōu)建議:如果作業(yè)可用的內(nèi)存資源較為充足的話,可以適當(dāng)增加這個參數(shù)的大小(比如96m),從而減少拉取數(shù)據(jù)的次數(shù),也就可以減少網(wǎng)絡(luò)傳輸?shù)拇螖?shù),進(jìn)而提升性能。在實(shí)踐中發(fā)現(xiàn),合理調(diào)節(jié)該參數(shù),性能會有1%~5%的提升。spark.shuffle.io.maxRetries默認(rèn)值:3參數(shù)說明:shufflereadtask從shufflewritetask所在節(jié)點(diǎn)拉取屬于自己的數(shù)據(jù)時,如果因?yàn)榫W(wǎng)絡(luò)異常導(dǎo)致拉取失敗,是會自動進(jìn)行重試的。該參數(shù)就代表了可以重試的最大次數(shù)。如果在指定次數(shù)之內(nèi)拉取還是沒有成功,就可能會導(dǎo)致作業(yè)執(zhí)行失敗。調(diào)優(yōu)建議:對于那些包含了特別耗時的shuffle操作的作業(yè),建議增加重試最大次數(shù)(比如60次),以避免由于JVM的fullgc或者網(wǎng)絡(luò)不穩(wěn)定等因素導(dǎo)致的數(shù)據(jù)拉取失敗。在實(shí)踐中發(fā)現(xiàn),對于針對超大數(shù)據(jù)量(數(shù)十億~上百億)的shuffle過程,調(diào)節(jié)該參數(shù)可以大幅度提升穩(wěn)定性。shufflefilenotfindtaskScheduler不負(fù)責(zé)重試task,由DAGScheduler負(fù)責(zé)重試stagespark.shuffle.io.retryWait默認(rèn)值:5s參數(shù)說明:具體解釋同上,該參數(shù)代表了每次重試?yán)?shù)據(jù)的等待間隔,默認(rèn)是5s。調(diào)優(yōu)建議:建議加大間隔時長(比如60s),以增加shuffle操作的穩(wěn)定性。spark.shuffle.memoryFraction默認(rèn)值:0.2參數(shù)說明:該參數(shù)代表了Executor內(nèi)存中,分配給shufflereadtask進(jìn)行聚合操作的內(nèi)存比例,默認(rèn)是20%。調(diào)優(yōu)建議:在資源參數(shù)調(diào)優(yōu)中講解過這個參數(shù)。如果內(nèi)存充足,而且很少使用持久化操作,建議調(diào)高這個比例,給shuffleread的聚合操作更多內(nèi)存,以避免由于內(nèi)存不足導(dǎo)致聚合過程中頻繁讀寫磁盤。在實(shí)踐中發(fā)現(xiàn),合理調(diào)節(jié)該參數(shù)可以將性能提升10%左右。spark.shuffle.manager默認(rèn)值:sort|hash參數(shù)說明:該參數(shù)用于設(shè)置ShuffleManager的類型。Spark1.5以后,有三個可選項(xiàng):hash、sort和tungsten-sort。HashShuffleManager是Spark1.2以前的默認(rèn)選項(xiàng),但是Spark1.2以及之后的版本默認(rèn)都是SortShuffleManager了。tungsten-sort與sort類似,但是使用了tungsten計劃中的堆外內(nèi)存管理機(jī)制,內(nèi)存使用效率更高。調(diào)優(yōu)建議:由于SortShuffleManager默認(rèn)會對數(shù)據(jù)進(jìn)行排序,因此如果你的業(yè)務(wù)邏輯中需要該排序機(jī)制的話,則使用默認(rèn)的SortShuffleManager就可以;而如果你的業(yè)務(wù)邏輯不需要對數(shù)據(jù)進(jìn)行排序,那么建議參考后面的幾個參數(shù)調(diào)優(yōu),通過bypass機(jī)制或優(yōu)化的HashShuffleManager來避免排序操作,同時提供較好的磁盤讀寫性能。這里要注意的是,tungsten-sort要慎用,因?yàn)橹鞍l(fā)現(xiàn)了一些相應(yīng)的bug。spark.shuffle.sort.bypassMergeThreshold默認(rèn)值:200參數(shù)說明:當(dāng)ShuffleManager為SortShuffleManager時,如果shufflereadtask的數(shù)量小于這個閾值(默認(rèn)是200),則shufflewrite過程中不會進(jìn)行排序操作,而是直接按照未經(jīng)優(yōu)化的HashShuffleManager的方式去寫數(shù)據(jù),但是最后會將每個task產(chǎn)生的所有臨時磁盤文件都合并成一個文件,并會創(chuàng)建單獨(dú)的索引文件。調(diào)優(yōu)建議:當(dāng)你使用SortShuffleManager時,如果的確不需要排序操作,那么建議將這個參數(shù)調(diào)大一些,大于shufflereadtask的數(shù)量。那么此時就會自動啟用bypass機(jī)制,map-side就不會進(jìn)行排序了,減少了排序的性能開銷。但是這種方式下,依然會產(chǎn)生大量的磁盤文件,因此shufflewrite性能有待提高。spark.shuffle.consolidateFiles默認(rèn)值:false參數(shù)說明:如果使用HashShuffleManager,該參數(shù)有效。如果設(shè)置為true,那么就會開啟consolidate機(jī)制,會大幅度合并shufflewrite的輸出文件,對于shufflereadtask數(shù)量特別多的情況下,這種方法可以極大地減少磁盤IO開銷,提升性能。調(diào)優(yōu)建議:如果的確不需要SortShuffleManager的排序機(jī)制,那么除了使用bypass機(jī)制,還可以嘗試將spark.shffle.manager參數(shù)手動指定為hash,使用HashShuffleManager,同時開啟consolidate機(jī)制。在實(shí)踐中嘗試過,發(fā)現(xiàn)其性能比開啟了bypass機(jī)制的SortShuffleManager要高出10%~30%。
5、Shuffle調(diào)優(yōu)設(shè)置
SparkShuffle調(diào)優(yōu)配置項(xiàng)如何使用?
1)在代碼中,不推薦使用,硬編碼。
newSparkConf().set(“spark.shuffle.file.buffer”,”64”)
2)在提交spark任務(wù)的時候,推薦使用。
spark-submit--confspark.shuffle.file.buffer=64–conf….
3)在conf下的spark-default.conf配置文件中,不推薦,因?yàn)槭菍懰篮笏袘?yīng)用程序都要用。
關(guān)于mapreduce的shuffle過程,mapreduce的shuffle作用的介紹到此結(jié)束,希望對大家有所幫助。