一,、引言 1、 簡介 Avro是Hadoop中的一個(gè)子項(xiàng)目,,也是Apache中一個(gè)獨(dú)立的項(xiàng)目,,Avro是一個(gè)基于二進(jìn)制數(shù)據(jù)傳輸高性能的中間件。在Hadoop的其他項(xiàng)目中例如HBase(Ref)和Hive(Ref)的Client端與服務(wù)端的數(shù)據(jù)傳輸也采用了這個(gè)工具,。Avro是一個(gè)數(shù)據(jù)序列化的系統(tǒng),。Avro 可以將數(shù)據(jù)結(jié)構(gòu)或?qū)ο筠D(zhuǎn)化成便于存儲或傳輸?shù)母袷健vro設(shè)計(jì)之初就用來支持?jǐn)?shù)據(jù)密集型應(yīng)用,,適合于遠(yuǎn)程或本地大規(guī)模數(shù)據(jù)的存儲和交換,。 2、 特點(diǎn) 豐富的數(shù)據(jù)結(jié)構(gòu)類型,; 快速可壓縮的二進(jìn)制數(shù)據(jù)形式,,對數(shù)據(jù)二進(jìn)制序列化后可以節(jié)約數(shù)據(jù)存儲空間和網(wǎng)絡(luò)傳輸帶寬; 存儲持久數(shù)據(jù)的文件容器; 可以實(shí)現(xiàn)遠(yuǎn)程過程調(diào)用RPC,; 簡單的動態(tài)語言結(jié)合功能,。 avro支持跨編程語言實(shí)現(xiàn)(C, C++, C#,Java, Python, Ruby, PHP),,類似于Thrift,,但是avro的顯著特征是:avro依賴于模式,動態(tài)加載相關(guān)數(shù)據(jù)的模式,,Avro數(shù)據(jù)的讀寫操作很頻繁,,而這些操作使用的都是模式,這樣就減少寫入每個(gè)數(shù)據(jù)文件的開銷,,使得序列化快速而又輕巧,。這種數(shù)據(jù)及其模式的自我描述方便了動態(tài)腳本語言的使用。當(dāng)Avro數(shù)據(jù)存儲到文件中時(shí),,它的模式也隨之存儲,,這樣任何程序都可以對文件進(jìn)行處理。如果讀取數(shù)據(jù)時(shí)使用的模式與寫入數(shù)據(jù)時(shí)使用的模式不同,,也很容易解決,,因?yàn)樽x取和寫入的模式都是已知的。
Avro和動態(tài)語言結(jié)合后,,讀/寫數(shù)據(jù)文件和使用RPC協(xié)議都不需要生成代碼,,而代碼生成作為一種可選的優(yōu)化只需要在靜態(tài)類型語言中實(shí)現(xiàn)。 Avro依賴于模式(Schema),。通過模式定義各種數(shù)據(jù)結(jié)構(gòu),,只有確定了模式才能對數(shù)據(jù)進(jìn)行解釋,所以在數(shù)據(jù)的序列化和反序列化之前,,必須先確定模式的結(jié)構(gòu),。正是模式的引入,使得數(shù)據(jù)具有了自描述的功能,,同時(shí)能夠?qū)崿F(xiàn)動態(tài)加載,,另外與其他的數(shù)據(jù)序列化系統(tǒng)如Thrift相比,數(shù)據(jù)之間不存在其他的任何標(biāo)識,,有利于提高數(shù)據(jù)處理的效率,。 二、技術(shù)要領(lǐng) 1,、 類型 數(shù)據(jù)類型標(biāo)準(zhǔn)化的意義:一方面使不同系統(tǒng)對相同的數(shù)據(jù)能夠正確解析,,另一方面,數(shù)據(jù)類型的標(biāo)準(zhǔn)定義有利于數(shù)據(jù)序列化/反序列化,。 簡單的數(shù)據(jù)類型:Avro定義了幾種簡單數(shù)據(jù)類型,,下表是其簡單說明:
簡單數(shù)據(jù)類型由類型名稱定義,不包含屬性信息,例如字符串定義如下: {"type": "string"} 復(fù)雜數(shù)據(jù)類型:Avro定義了六種復(fù)雜數(shù)據(jù)類型,,每一種復(fù)雜數(shù)據(jù)類型都具有獨(dú)特的屬性,,下表就每一種復(fù)雜數(shù)據(jù)類型進(jìn)行說明。
每一種復(fù)雜數(shù)據(jù)類型都含有各自的一些屬性,,其中部分屬性是必需的,,部分是可選的。 這里需要說明Record類型中field屬性的默認(rèn)值,,當(dāng)Record Schema實(shí)例數(shù)據(jù)中某個(gè)field屬性沒有提供實(shí)例數(shù)據(jù)時(shí),,則由默認(rèn)值提供,具體值見下表,。Union的field默認(rèn)值由Union定義中的第一個(gè)Schema決定,。
2、 序列化/反序列化 Avro指定兩種數(shù)據(jù)序列化編碼方式:binary encoding 和Json encoding,。使用二進(jìn)制編碼會高效序列化,,并且序列化后得到的結(jié)果會比較小,;而JSON一般用于調(diào)試系統(tǒng)或是基于WEB的應(yīng)用,。 binary encoding規(guī)則如下: 1、 簡單數(shù)據(jù)類型
2,、 復(fù)雜數(shù)據(jù)類型
實(shí)例: records { "type":"record", "name":"test", "fields" : [ {"name": "a","type": "long"}, {"name": "b","type": "string"} ] } 假設(shè):a=27b=”foo” (encoding:36(27), 06(3), 66("f"), 6f("o")) binary encoding:3606 66 6f 6f enums {"type": "enum","name": "Foo", "symbols": ["A","B", "C", "D"] } “D”(encoding: 06(3)) binary encoding: 06 arrays {"type": "array","items": "long"} 設(shè):{3, 27 } (encoding:04(2), 06(3), 36(27) ) binary encoding:0406 36 00 maps 設(shè):{("a":1), ("b":2) } (encoding:61(“a”), 62(“b”), 02(1), 04(2)) binary encoding:0261 02 02 62 04 unions ["string","null"] 設(shè):(1)null; (2) “a” binary encoding: (1) 02,;說明:02代表null在union定義中的位置1; (2) 00 02 61,;說明:00為string在union定義的位置,,02 61為”a”的編碼,。
圖1表示的是Avro本地序列化和反序列化的實(shí)例,,它將用戶定義的模式和具體的數(shù)據(jù)編碼成二進(jìn)制序列存儲在對象容器文件中,例如用戶定義了包含學(xué)號,、姓名,、院系和電話的學(xué)生模式,而Avro對其進(jìn)行編碼后存儲在student.db文件中,,其中存儲數(shù)據(jù)的模式放在文件頭的元數(shù)據(jù)中,,這樣讀取的模式即使與寫入的模式不同,也可以迅速地讀出數(shù)據(jù),。假如另一個(gè)程序需要獲取學(xué)生的姓名和電話,,只需要定義包含姓名和電話的學(xué)生模式,然后用此模式去讀取容器文件中的數(shù)據(jù)即可,。 圖表 1
3,、 模式Schema Schema通過JSON對象表示。Schema定義了簡單數(shù)據(jù)類型和復(fù)雜數(shù)據(jù)類型,其中復(fù)雜數(shù)據(jù)類型包含不同屬性,。通過各種數(shù)據(jù)類型用戶可以自定義豐富的數(shù)據(jù)結(jié)構(gòu),。 Schema由下列JSON對象之一定義: 1. JSON字符串:命名 2. JSON對象:{“type”: “typeName” …attributes…} 3. JSON數(shù)組:Avro中Union的定義 舉例: {"namespace": "example.avro", "type":"record", "name":"User", "fields": [ {"name":"name", "type": "string"}, {"name":"favorite_number", "type": ["int", "null"]}, {"name":"favorite_color", "type": ["string","null"]} ] } 4、 排序 Avro為數(shù)據(jù)定義了一個(gè)標(biāo)準(zhǔn)的排列順序,。比較在很多時(shí)候是經(jīng)常被使用到的對象之間的操作,,標(biāo)準(zhǔn)定義可以進(jìn)行方便有效的比較和排序。同時(shí)標(biāo)準(zhǔn)的定義可以方便對Avro的二進(jìn)制編碼數(shù)據(jù)直接進(jìn)行排序而不需要反序列化,。 只有當(dāng)數(shù)據(jù)項(xiàng)包含相同的Schema的時(shí)候,,數(shù)據(jù)之間的比較才有意義。數(shù)據(jù)的比較按照Schema深度優(yōu)先,,從左至右的順序遞歸的進(jìn)行,。找到第一個(gè)不匹配即可終止比較。 兩個(gè)擁有相同的模式的項(xiàng)的比較按照以下規(guī)則進(jìn)行: null:總是相等,。 int,long,float:按照數(shù)值大小比較,。 boolean:false在true之前。 string:按照字典序進(jìn)行比較,。 bytes,,fixed:按照byte的字典序進(jìn)行比較。 array:按照元素的字典序進(jìn)行比較,。 enum:按照符號在枚舉中的位置比較,。 record:按照域的字典序排序,如果指定了以下屬性: “ascending”,,域值的順序不變,。 “descending”,域值的順序顛倒,。 “ignore”,,排序的時(shí)候忽略域值。 map:不可進(jìn)行比較,。
5,、 對象容器文件 Avro定義了一個(gè)簡單的對象容器文件格式。一個(gè)文件對應(yīng)一個(gè)模式,,所有存儲在文件中的對象都是根據(jù)模式寫入的,。對象按照塊進(jìn)行存儲,塊可以采用壓縮的方式存儲,。為了在進(jìn)行mapreduce處理的時(shí)候有效的切分文件,,在塊之間采用了同步記號。一個(gè)文件可以包含任意用戶定義的元數(shù)據(jù),。 一個(gè)文件由兩部分組成:文件頭和一個(gè)或者多個(gè)文件數(shù)據(jù)塊,。 文件頭: 四個(gè)字節(jié),,ASCII‘O’,‘b’,,‘j’,1,。 文件元數(shù)據(jù),用于描述Schema,。 16字節(jié)的文件同步記號,。 其中,文件元數(shù)據(jù)的格式為: i. 值為-1的長整型,,表明這是一個(gè)元數(shù)據(jù)塊,。 ii. 標(biāo)識塊長度的長整型。 iii. 標(biāo)識塊中key/value對數(shù)目的長整型,。 iv. 每一個(gè)key/value對的string key和bytesvalue,。 v. 標(biāo)識塊中字節(jié)總數(shù)的4字節(jié)長的整數(shù)。 文件數(shù)據(jù)塊: 數(shù)據(jù)是以塊結(jié)構(gòu)進(jìn)行組織的,,一個(gè)文件可以包含一個(gè)或者多個(gè)文件數(shù)據(jù)塊,。 表示文件中塊中對象數(shù)目的長整型。 表示塊中數(shù)據(jù)序列化后的字節(jié)數(shù)長度的長整型,。 序列化的對象,。 16字節(jié)的文件同步記號。 當(dāng)數(shù)據(jù)塊的長度為0時(shí)即為文件數(shù)據(jù)塊的最后一個(gè)數(shù)據(jù),,此后的所有數(shù)據(jù)被自動忽略,。 下圖示對象容器文件的結(jié)構(gòu)分解及說明:
一個(gè)存儲文件由兩部分組成:頭信息(Header)和數(shù)據(jù)塊(Data Block)。而頭信息又由三部分構(gòu)成:四個(gè)字節(jié)的前綴,,文件Meta-data信息和隨機(jī)生成的16字節(jié)同步標(biāo)記符,。Avro目前支持的Meta-data有兩種:schema和codec。 codec表示對后面的文件數(shù)據(jù)塊(File Data Block)采用何種壓縮方式,。Avro的實(shí)現(xiàn)都需要支持下面兩種壓縮方式:null(不壓縮)和deflate(使用Deflate算法壓縮數(shù)據(jù)塊),。除了文檔中認(rèn)定的兩種Meta-data,用戶還可以自定義適用于自己的Meta-data,。這里用long型來表示有多少個(gè)Meta-data數(shù)據(jù)對,,也是讓用戶在實(shí)際應(yīng)用中可以定義足夠的Meta-data信息,。對于每對Meta-data信息,,都有一個(gè)string型的key(需要以“avro.” 為前綴)和二進(jìn)制編碼后的value。對于文件中頭信息之后的每個(gè)數(shù)據(jù)塊,,有這樣的結(jié)構(gòu):一個(gè)long值記錄當(dāng)前塊有多少個(gè)對象,,一個(gè)long值用于記錄當(dāng)前塊經(jīng)過壓縮后的字節(jié)數(shù),真正的序列化對象和16字節(jié)長度的同步標(biāo)記符,。由于對象可以組織成不同的塊,,使用時(shí)就可以不經(jīng)過反序列化而對某個(gè)數(shù)據(jù)塊進(jìn)行操作,。還可以由數(shù)據(jù)塊數(shù),對象數(shù)和同步標(biāo)記符來定位損壞的塊以確保數(shù)據(jù)完整性,。 三,、RPC實(shí)現(xiàn) 當(dāng)在RPC中使用Avro時(shí),服務(wù)器和客戶端可以在握手連接時(shí)交換模式,。服務(wù)器和客戶端有彼此全部的模式,,因此相同命名字段、缺失字段和多余字段等信息之間通信中需要處理的一致性問題就可以容易解決,。如圖2所示,,協(xié)議中定義了用于傳輸?shù)南ⅲ⑹褂每蚣芎蠓湃刖彌_區(qū)中進(jìn)行傳輸,,由于傳輸?shù)某跏季徒粨Q了各自的協(xié)議定義,,因此即使傳輸雙方使用的協(xié)議不同所傳輸?shù)臄?shù)據(jù)也能夠正確解析。 圖表 2 Avro作為RPC框架來使用,??蛻舳讼M?wù)器端交互時(shí),就需要交換雙方通信的協(xié)議,,它類似于模式,,需要雙方來定義,在Avro中被稱為消息(Message),。通信雙方都必須保持這種協(xié)議,,以便于解析從對方發(fā)送過來的數(shù)據(jù),這也就是傳說中的握手階段,。 消息從客戶端發(fā)送到服務(wù)器端需要經(jīng)過傳輸層(Transport Layer),,它發(fā)送消息并接收服務(wù)器端的響應(yīng)。到達(dá)傳輸層的數(shù)據(jù)就是二進(jìn)制數(shù)據(jù),。通常以HTTP作為傳輸模型,,數(shù)據(jù)以POST方式發(fā)送到對方去。在 Avro中,,它的消息被封裝成為一組緩沖區(qū)(Buffer),,類似于下圖的模型: 如上圖,每個(gè)緩沖區(qū)以四個(gè)字節(jié)開頭,,中間是多個(gè)字節(jié)的緩沖數(shù)據(jù),,最后以一個(gè)空緩沖區(qū)結(jié)尾。這種機(jī)制的好處在于,,發(fā)送端在發(fā)送數(shù)據(jù)時(shí)可以很方便地組裝不同數(shù)據(jù)源的數(shù)據(jù),,接收方也可以將數(shù)據(jù)存入不同的存儲區(qū)。還有,,當(dāng)往緩沖區(qū)中寫數(shù)據(jù)時(shí),,大對象可以獨(dú)占一個(gè)緩沖區(qū),,而不是與其它小對象混合存放,便于接收方方便地讀取大對象,。 對象容器文件是Avro的數(shù)據(jù)存儲的具體實(shí)現(xiàn),,數(shù)據(jù)交換則由RPC服務(wù)提供,與對象容器文件類似,,數(shù)據(jù)交換也完全依賴Schema,,所以與Hadoop目前的RPC不同,Avro在數(shù)據(jù)交換之前需要通過握手過程先交換Schema,。 1,、 握手過程 握手的過程是確保Server和Client獲得對方的Schema定義,從而使Server能夠正確反序列化請求信息,,Client能夠正確反序列化響應(yīng)信息,。一般的,Server/Client會緩存最近使用到的一些協(xié)議格式,,所以,,大多數(shù)情況下,握手過程不需要交換整個(gè)Schema文本,。 所有的RPC請求和響應(yīng)處理都建立在已經(jīng)完成握手的基礎(chǔ)上,。對于無狀態(tài)的連接,所有的請求響應(yīng)之前都附有一次握手過程,;對于有狀態(tài)的連接,,一次握手完成,整個(gè)連接的生命期內(nèi)都有效,。 具體過程: Client發(fā)起HandshakeRequest,,其中含有Client本身SchemaHash值和對應(yīng)Server端的Schema Hash值(clientHash!=null,clientProtocol=null, serverHash!=null)。如果本地緩存有serverHash值則直接填充,,如果沒有則通過猜測填充,。 Server用如下之一HandshakeResponse響應(yīng)Client請求: (match=BOTH, serverProtocol=null,serverHash=null):當(dāng)Client發(fā)送正確的serverHash值且Server緩存相應(yīng)的clientHash。握手過程完成,,之后的數(shù)據(jù)交換都遵守本次握手結(jié)果,。 (match=CLIENT, serverProtocol!=null,serverHash!=null):當(dāng)Server緩存有Client的Schema,但是Client請求中ServerHash值不正確,。此時(shí)Server發(fā)送Server端的Schema數(shù)據(jù)和相應(yīng)的Hash值,,此次握手完成,之后的數(shù)據(jù)交換都遵守本次握手結(jié)果,。 (match=NONE):當(dāng)Client發(fā)送的ServerHash不正確且Server端沒有Client Schema的緩存,。這種情況下Client需要重新提交請求信息 (clientHash!=null,clientProtocol!=null, serverHash!=null),,Server響應(yīng) (match=BOTH, serverProtocol=null,serverHash=null),,此次握手過程完成,,之后的數(shù)據(jù)交換都遵守本次握手結(jié)果。 握手過程使用的Schema結(jié)構(gòu)如下示,。 { "type":"record", "name":"HandshakeRequest","namespace":"org.apache.avro.ipc", "fields":[ {"name":"clientHash", "type": {"type": "fixed","name": "MD5", "size": 16}}, {"name":"clientProtocol", "type": ["null","string"]}, {"name":"serverHash", "type": "MD5"}, {"name":"meta", "type": ["null", {"type":"map", "values": "bytes"}]} ] } { "type":"record", "name":"HandshakeResponse", "namespace":"org.apache.avro.ipc", "fields":[ {"name":"match","type": {"type": "enum","name": "HandshakeMatch", "symbols":["BOTH", "CLIENT", "NONE"]}}, {"name":"serverProtocol", "type": ["null","string"]}, {"name":"serverHash","type": ["null", {"type":"fixed", "name": "MD5", "size": 16}]}, {"name":"meta","type": ["null", {"type":"map", "values": "bytes"}]} ] } 2,、 消息幀格式 消息從客戶端發(fā)送到服務(wù)器端需要經(jīng)過傳輸層,它發(fā)送請求并接收服務(wù)器端的響應(yīng),。到達(dá)傳輸層的數(shù)據(jù)就是二進(jìn)制數(shù)據(jù),。通常以HTTP作為傳輸模型,數(shù)據(jù)以POST方式發(fā)送到對方去,。在 Avro中消息首先分幀后被封裝成為一組緩沖區(qū)(Buffer),。 數(shù)據(jù)幀的格式如下: 一系列Buffer: 1、4字節(jié)的Buffer長度 2,、Buffer字節(jié)數(shù)據(jù) 長度為0的Buffer結(jié)束數(shù)據(jù)幀 3,、 Call格式 一個(gè)調(diào)用由請求消息、結(jié)果響應(yīng)消息或者錯(cuò)誤消息組成,。請求和響應(yīng)包含可擴(kuò)展的元數(shù)據(jù),,兩種消息都按照之前提出的方法分幀。 調(diào)用的請求格式為: 請求元數(shù)據(jù),,一個(gè)類型值的映射,。 消息名,一個(gè)Avro字符串,。 消息參數(shù),。參數(shù)根據(jù)消息的請求定義序列化。 調(diào)用的響應(yīng)格式為: 響應(yīng)的元數(shù)據(jù),,一個(gè)類型值的映射,。 一字節(jié)的錯(cuò)誤標(biāo)志位。 如果錯(cuò)誤標(biāo)志為false,,響應(yīng)消息,,根據(jù)響應(yīng)的模式序列化。 如果錯(cuò)誤標(biāo)志位true,,錯(cuò)誤消息,,根據(jù)消息的錯(cuò)誤聯(lián)合模式序列化。 四,、實(shí)例 1,、 本地序列化/反序列化 user.avsc {"namespace":"example.avro", "type": "record", "name": "User", "fields": [ {"name": "name", "type":"string"}, {"name": "favorite_number", "type": ["int", "null"]}, {"name": "favorite_color", "type":["string", "null"]} ] } Main.java public class Main { public static void main(String[] args)throws Exception { User user1 = new User(); user1.setName("Alyssa"); user1.setFavoriteNumber(256); // Leave favorite color null // Alternate constructor User user2 = new User("Ben", 7,"red"); // Construct via builder User user3 = User.newBuilder() .setName("Charlie") .setFavoriteColor("blue") .setFavoriteNumber(null) .build(); // Serialize user1 and user2to disk File file = new File("users.avro"); DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class); DataFileWriter<User> dataFileWriter = newDataFileWriter<User>(userDatumWriter); dataFileWriter.create(user1.getSchema(),new File("users.avro")); dataFileWriter.append(user1); dataFileWriter.append(user2); dataFileWriter.append(user3); dataFileWriter.close(); // Deserialize Usersfrom disk DatumReader<User> userDatumReader = newSpecificDatumReader<User>(User.class); DataFileReader<User> dataFileReader = newDataFileReader<User>(file, userDatumReader); User user = null; while (dataFileReader.hasNext()) { // Reuse user object bypassing it to next(). This saves us from // allocating and garbagecollecting many objects for files with // many items. user = dataFileReader.next(user); System.out.println(user); } } } 2、 RPC mail.avsc {"namespace":"example.proto", "protocol": "Mail", "types": [ {"name": "Message", "type":"record", "fields": [ {"name": "to", "type": "string"}, {"name": "from", "type": "string"}, {"name": "body", "type":"string"} ] } ], "messages": { "send": { "request": [{"name": "message","type": "Message"}], "response": "string" } } } Main.java public class Main { public static class MailImpl implements Mail { // in this simple example just return details of the message public Utf8 send(Message message) { System.out.println("Sending message"); return new Utf8("Sending message to " + message.getTo().toString() + " from " +message.getFrom().toString() + " with body " +message.getBody().toString()); } } private static Server server; private static void startServer() throws IOException { server = new NettyServer(new SpecificResponder(Mail.class,new MailImpl()),newInetSocketAddress(65111)); // the server implements the Mail protocol (MailImpl) } public static void main(String[] args)throws IOException { System.out.println("Starting server"); // usually this would be anotherapp, but for simplicity startServer(); System.out.println("Server started"); NettyTransceiver client = new NettyTransceiver(new InetSocketAddress(65111)); // client code - attach to the server and send a message Mail proxy = (Mail) SpecificRequestor.getClient(Mail.class, client); System.out.println("Client built, got proxy"); // fill in the Message record and send it Message message = new Message(); message.setTo(new Utf8("127.0.0.1")); message.setFrom(new Utf8("127.0.0.1")); message.setBody(new Utf8("this is my message")); System.out.println("Calling proxy.send with message: " + message.toString()); System.out.println("Result: " +proxy.send(message)); // cleanup client.close(); server.close(); } } |
|