MapReduce

MapReduce

MapReduce是一種編程模型,用於大規模數據集(大於1TB)的并行運算。概念"Map(映射)"和"Reduce(化簡)",和主要思想,都是從函數式編程語言里來的,有著矢量編程語言的特性。該模型極大地方便了編程人員在不會分散式并行編程的情況下,將程序運行在分散式系統上。當前的軟體實現是指定一個Map(映射)函數,用來把一組鍵值對映射成一組新的鍵值對,指定併發的Reduce(化簡)函數,用來保證所有映射的鍵值對中的每一個共享相同的鍵組。

由來


MapReduce最早是由Google公司研究提出的一種面向大規模數據處理的并行計算模型和方法。Google公司設計MapReduce的初衷主要是為了解決其搜索引擎中大規模網頁數據的并行化處理。Google公司發明了MapReduce之後首先用其重新改寫了其搜索引擎中的Web文檔索引處理系統。但由於MapReduce可以普遍應用於很多大規模數據的計算問題,因此自發明MapReduce以後,Google公司內部進一步將其廣泛應用於很多大規模數據處理問題。Google公司內有上萬個各種不同的演演算法問題和程序都使用MapReduce進行處理。
2003年和2004年,Google公司在國際會議上分別發表了兩篇關於Google分散式文件系統和MapReduce的論文,公布了Google的GFS和MapReduce的基本原理和主要設計思想。
Hadoop的思想來源於Google的幾篇論文,Google的那篇MapReduce論文里說:Our abstraction is inspired by the map and reduce primitives present in Lisp and many other functional languages。這句話提到了MapReduce思想的淵源,大致意思是,MapReduce的靈感來源於函數式語言(比如Lisp)中的內置函數map和reduce。函數式語言也算是陽春白雪了,離普通開發者總是很遠。簡單來說,在函數式語言里,map表示對一個列表(List)中的每個元素做計算,reduce表示對一個列表中的每個元素做迭代計算。它們具體的計算是通過傳入的函數來實現的,map和reduce提供的是計算的框架。不過從這樣的解釋到現實中的MapReduce還太遠,仍然需要一個跳躍。再仔細看,reduce既然能做迭代計算,那就表示列表中的元素是相關的,比如想對列表中的所有元素做相加求和,那麼列表中至少都應該是數值吧。而map是對列表中每個元素做單獨處理的,這表示列表中可以是雜亂無章的數據。這樣看來,就有點聯繫了。在MapReduce里,Map處理的是原始數據,自然是雜亂無章的,每條數據之間互相沒有關係;到了Reduce階段,數據是以key後面跟著若干個value來組織的,這些value有相關性,至少它們都在一個key下面,於是就符合函數式語言里map和reduce的基本思想了。
這樣就可以把MapReduce理解為,把一堆雜亂無章的數據按照某種特徵歸納起來,然後處理並得到最後的結果。Map面對的是雜亂無章的互不相關的數據,它解析每個數據,從中提取出key和value,也就是提取了數據的特徵。經過MapReduce的Shuffle階段之後,在Reduce階段看到的都是已經歸納好的數據了,在此基礎上可以做進一步的處理以便得到結果。這就回到了最初,終於知道MapReduce為何要這樣設計。
2004年,開源項目Lucene(搜索索引程序庫)和Nutch(搜索引擎)的創始人Doug Cutting發現MapReduce正是其所需要的解決大規模Web數據處理的重要技術,因而模仿Google MapReduce,基於Java設計開發了一個稱為Hadoop的開源MapReduce并行計算框架和系統。自此,Hadoop成為Apache開源組織下最重要的項目,自其推出后很快得到了全球學術界和工業界的普遍關注,並得到推廣和普及應用。
MapReduce的推出給大數據并行處理帶來了巨大的革命性影響,使其已經成為事實上的大數據處理的工業標準。儘管MapReduce還有很多局限性,但人們普遍公認,MapReduce是到最為成功、最廣為接受和最易於使用的大數據并行處理技術。MapReduce的發展普及和帶來的巨大影響遠遠超出了發明者和開源社區當初的意料,以至於馬里蘭大學教授、2010年出版的《Data-Intensive Text Processing with MapReduce》一書的作者Jimmy Lin在書中提出:MapReduce改變了組織大規模計算的方式,它代表了第一個有別於馮·諾依曼結構的計算模型,是在集群規模而非單個機器上組織大規模計算的新的抽象模型上的第一個重大突破,是到所見到的最為成功的基於大規模計算資源的計算模型。

映射和化簡


簡單說來,一個映射函數就是對一些獨立元素組成的概念上的列表(例如,一個測試成績的列表)的每一個元素進行指定的操作(比如前面的例子里,有人發現所有學生的成績都被高估了一分,可以定義一個“減一”的映射函數,用來修正這個錯誤。)。事實上,每個元素都是被獨立操作的,而原始列表沒有被更改,因為這裡創建了一個新的列表來保存新的答案。這就是說,Map操作是可以高度并行的,這對高性能要求的應用以及并行計算領域的需求非常有用。
而化簡操作指的是對一個列表的元素進行適當的合併(繼續看前面的例子,如果有人想知道班級的平均分該怎麼做?可以定義一個化簡函數,通過讓列表中的元素跟自己的相鄰的元素相加的方式把列表減半,如此遞歸運算直到列表只剩下一個元素,然後用這個元素除以人數,就得到了平均分。)。雖然不如映射函數那麼并行,但是因為化簡總是有一個簡單的答案,大規模的運算相對獨立,所以化簡函數在高度并行環境下也很有用。

分佈和可靠性


MapReduce通過把對數據集的大規模操作分發給網路上的每個節點實現可靠性;每個節點會周期性的把完成的工作和狀態的更新報告回來。如果一個節點保持沉默超過一個預設的時間間隔,主節點(類同Google File System中的主伺服器)記錄下這個節點狀態為死亡,並把分配給這個節點的數據發到別的節點。每個操作使用命名文件的原子操作以確保不會發生并行線程間的衝突;當文件被改名的時候,系統可能會把其複製到任務名以外的另一個名字上去。(避免副作用)。
化簡操作工作方式很類似,但是由於化簡操作在并行能力較差,主節點會盡量把化簡操作調度在一個節點上,或者離需要操作的數據儘可能進的節點上了;這個特性可以滿足Google的需求,因為有足夠的帶寬,內部網路沒有那麼多的機器。

用途


在Google,MapReduce用在非常廣泛的應用程序中,包括“分佈grep,分佈排序,web連接圖反轉,每台機器的詞矢量,web訪問日誌分析,反向索引構建,文檔聚類,機器學習,基於統計的機器翻譯...”值得注意的是,MapReduce實現以後,它被用來重新生成Google的整個索引,並取代老的ad hoc程序去更新索引。
MapReduce會生成大量的臨時文件,為了提高效率,它利用Google文件系統來管理和訪問這些文件。

主要功能


MapReduce提供了以下的主要功能:
1)數據劃分和計算任務調度:系統自動將一個作業(Job)待處理的大數據劃分為很多個數據塊,每個數據塊對應於一個計算任務(Task),並自動調度計算節點來處理相應的數據塊。作業和任務調度功能主要負責分配和調度計算節點(Map節點或Reduce節點),同時負責監控這些節點的執行狀態,並負責Map節點執行的同步控制。
2)數據/代碼互定位:為了減少數據通信,一個基本原則是本地化數據處理,即一個計算節點儘可能處理其本地磁碟上所分佈存儲的數據,這實現了代碼向數據的遷移;當無法進行這種本地化數據處理時,再尋找其他可用節點並將數據從網路上傳送給該節點(數據向代碼遷移),但將儘可能從數據所在的本地機架上尋找可用節點以減少通信延遲。
3)系統優化:為了減少數據通信開銷,中間結果數據進入Reduce節點前會進行一定的合併處理;一個Reduce節點所處理的數據可能會來自多個Map節點,為了避免Reduce計算階段發生數據相關性,Map節點輸出的中間結果需使用一定的策略進行適當的劃分處理,保證相關性數據發送到同一個Reduce節點;此外,系統還進行一些計算性能優化處理,如對最慢的計算任務採用多備份執行、選最快完成者作為結果。
4)出錯檢測和恢復:以低端商用伺服器構成的大規模MapReduce計算集群中,節點硬體(主機、磁碟、內存等)出錯和軟體出錯是常態,因此MapReduce需要能檢測並隔離出錯節點,並調度分配新的節點接管出錯節點的計算任務。同時,系統還將維護數據存儲的可靠性,用多備份冗餘存儲機制提高數據存儲的可靠性,並能及時檢測和恢復出錯的數據。
主要技術特徵
MapReduce設計上具有以下主要的技術特徵:
1)向“外”橫向擴展,而非向“上”縱向擴展即MapReduce集群的構建完全選用價格便宜、易於擴展的低端商用伺服器,而非價格昂貴、不易擴展的高端伺服器。對於大規模數據處理,由於有大量數據存儲需要,顯而易見,基於低端伺服器的集群遠比基於高端伺服器的集群優越,這就是為什麼MapReduce并行計算集群會基於低端伺服器實現的原因。
2)失效被認為是常態MapReduce集群中使用大量的低端伺服器,因此,節點硬體失效和軟體出錯是常態,因而一個良好設計、具有高容錯性的并行計算系統不能因為節點失效而影響計算服務的質量,任何節點失效都不應當導致結果的不一致或不確定性;任何一個節點失效時,其他節點要能夠無縫接管失效節點的計算任務;當失效節點恢復后應能自動無縫加入集群,而不需要管理員人工進行系統配置。MapReduce并行計算軟體框架使用了多種有效的錯誤檢測和恢復機制,如節點自動重啟技術,使集群和計算框架具有對付節點失效的健壯性,能有效處理失效節點的檢測和恢復。
3)把處理向數據遷移傳統高性能計算系統通常有很多處理器節點與一些外存儲器節點相連,如用存儲區域網路(Storage Area,SAN Network)連接的磁碟陣列,因此,大規模數據處理時外存文件數據I/O訪問會成為一個制約系統性能的瓶頸。為了減少大規模數據并行計算系統中的數據通信開銷,代之以把數據傳送到處理節點(數據向處理器或代碼遷移),應當考慮將處理向數據靠攏和遷移。MapReduce採用了數據/代碼互定位的技術方法,計算節點將首先盡量負責計算其本地存儲的數據,以發揮數據本地化特點,僅當節點無法處理本地數據時,再採用就近原則尋找其他可用計算節點,並把數據傳送到該可用計算節點。
4)順序處理數據、避免隨機訪問數據大規模數據處理的特點決定了大量的數據記錄難以全部存放在內存,而通常只能放在外存中進行處理。由於磁碟的順序訪問要遠比隨機訪問快得多,因此MapReduce主要設計為面向順序式大規模數據的磁碟訪問處理。為了實現面向大數據集批處理的高吞吐量的并行處理,MapReduce可以利用集群中的大量數據存儲節點同時訪問數據,以此利用分佈集群中大量節點上的磁碟集合提供高帶寬的數據訪問和傳輸。
5)為應用開發者隱藏系統層細節軟體工程實踐指南中,專業程序員認為之所以寫程序困難,是因為程序員需要記住太多的編程細節(從變數名到複雜演演算法的邊界情況處理),這對大腦記憶是一個巨大的認知負擔,需要高度集中注意力;而并行程序編寫有更多困難,如需要考慮多線程中諸如同步等複雜繁瑣的細節。由於併發執行中的不可預測性,程序的調試查錯也十分困難;而且,大規模數據處理時程序員需要考慮諸如數據分佈存儲管理、數據分發、數據通信和同步、計算結果收集等諸多細節問題。MapReduce提供了一種抽象機制將程序員與系統層細節隔離開來,程序員僅需描述需要計算什麼(What to compute),而具體怎麼去計算(How to compute)就交由系統的執行框架處理,這樣程序員可從系統層細節中解放出來,而致力於其應用本身計算問題的演演算法設計。
6)平滑無縫的可擴展性這裡指出的可擴展性主要包括兩層意義上的擴展性:數據擴展和系統規模擴展性。理想的軟體演演算法應當能隨著數據規模的擴大而表現出持續的有效性,性能上的下降程度應與數據規模擴大的倍數相當;在集群規模上,要求演演算法的計算性能應能隨著節點數的增加保持接近線性程度的增長。絕大多數現有的單機演演算法都達不到以上理想的要求;把中間結果數據維護在內存中的單機演演算法在大規模數據處理時很快失效;從單機到基於大規模集群的并行計算從根本上需要完全不同的演演算法設計。奇妙的是,MapReduce在很多情形下能實現以上理想的擴展性特徵。多項研究發現,對於很多計算問題,基於MapReduce的計算性能可隨節點數目增長保持近似於線性的增長。
案例:統計詞頻
MapReduce偽代碼實現Map和Reduce兩個函數Map函數和Reduce函數是交給用戶實現的,這兩個函數定義了任務本身。
Map函數接受一個鍵值對(key-value pair),產生一組中間鍵值對。MapReduce框架會將map函數產生的中間鍵值對里鍵相同的值傳遞給一個reduce函數。ClassMappermethodmap(String input_key, String input_value):// input_key: text document name// input_value: document contentsfor eachword w ininput_value:EmitIntermediate(w, "1");
Reduce函數接受一個鍵,以及相關的一組值,將這組值進行合併產生一組規模更小的值(通常只有一個或零個值)。ClassReducermethod reduce(String output_key,Iteratorintermediate_values):// output_key: a word// output_values: a list of countsintresult = 0;for eachv inintermediate_values:result += ParseInt(v);Emit(AsString(result));

參考


Dean, Jeffrey & Ghemawat, Sanjay (2004). "MapReduce: Simplified Data Processing on Large Clusters". Retrieved Apr. 6, 2005
http://labs.google.com/papers/mapreduce.html"Our abstraction is inspired by the map and reduce primitives present in Lisp and many other functional languages." -"MapReduce: Simplified Data Processing on Large Clusters"
外部鏈接
Interpreting the Data: Parallel Analysis with Sawzall- a paper on an internal tool at Google, Sawzall, which acts as an interface to MapReduce, intended to make MapReduce much easier to use.
http://labs.google.com/papers/sawzall.html
Discussion on Lambda the Ultimate.
微塵程序員網站資料庫 -> Google研究 -> MapReduce
http://tinyfool.googlepages.com/mapreduce

經典實例


MapReduce的一個經典實例是Hadoop。用於處理大型分散式資料庫。由於Hadoop關聯到雲以及雲部署,大多數人忽略了一點,Hadoop有些屬性不適合一般企業的需求,特別是移動應用程序。下面是其中的一些特點:
● Hadoop的最大價值在於資料庫,而Hadoop所用的資料庫是移動應用程序所用資料庫的10到1000倍。對於許多人來說,使用Hadoop就是殺雞用牛刀。
● Hadoop有顯著的設置和處理開銷。 Hadoop工作可能會需要幾分鐘的時間,即使相關數據量不是很大。
● Hadoop在支持具有多維上下文數據結構方面不是很擅長。例如,一個定義給定地理變數值的記錄,然後使用垂直連接,來連續定義一個比hadoop使用的鍵值對定義更複雜的數據結構關係。
● Hadoop必須使用迭代方法處理的問題方面用處不大——尤其是幾個連續有依賴性步驟的問題。 
MapReduce (EMR),這是一項Hadoop服務。Hadoop旨在同期文件系統工作,以HDFS著稱。
當用戶用EMR創建了一個Hadoop集群,可以從AWS S3或者一些其他的數據存儲複製數據到集群上的HDFS,或者也可以直接從S3訪問數據。HDFS使用本地存儲,而且通常提供了比從S3恢復更好的性能,但是在運行Hadoop工作之前,也需要時間從S3複製數據到HDFS。如果EMR集群要運行一段時間,且針對多項工作使用相同的數據,可能值得額外的啟動時間來從S3複製數據到HDFS。

案例


統計詞頻
如果想統計下過去10年計算機論文出現最多的幾個單詞,看看人們都在研究些什麼,那收集好論文後,該怎麼辦呢?
方法一:可以寫一個小程序,把所有論文按順序遍歷一遍,統計每一個遇到的單詞的出現次數,最後就可以知道哪幾個單詞最熱門了。
這種方法在數據集比較耗時,是非常有效的,而且實現最簡單,用來解決這個問題很合適。
方法二:寫一個多線程程序,併發遍歷論文。
這個問題理論上是可以高度併發的,因為統計一個文件時不會影響統計另一個文件。當機器是多核或者多處理器,方法二肯定比方法一高效。但是寫一個多線程程序要比方法一困難多了,必須用戶同步共享數據,比如要防止兩個線程重複統計文件。
方法三:把作業交給多個計算機去完成。
可以使用方法一的程序,部署到N台機器上去,然後把論文集分成N份,一台機器跑一個作業。這個方法跑得足夠快,但是部署起來很麻煩,人工把程序copy到別的機器,要人工把論文集分開,最痛苦的是還要把N個運行結果進行整合(當然用戶也可以再寫一個程序)。