3-1、Aapche Thrift與訊息格式
Apache Thrift支援多種訊息格式封裝。這些訊息格式是如果進行編碼和解碼的是不需要使用者關心的,只需要根據自己的需要制定不同的訊息封裝格式即可。Apache Thrift所有訊息格式封裝的實現,都繼承與TProtocol這個抽象類,如下圖所示:
3-1-1、TBinaryProtocol二進位制流的編碼格式。由於需要支援跨語言,所以Apache Thrift支援有限的幾種通用型別,包括基本型別(Float、Double、Integer、Long、String、Short)、集合型別(Map、Set、List)還有Pojo型別(實際上就是前兩者若干型別的組合形式)。
那麼這個類所生成的二進位制流和傳統的java序列化後生成的二進位制流有什麼樣的區別(或者是優勢)呢?我們可以透過閱讀TBinaryProtocol的原始碼進行研究。
我們以TBinaryProtocol中,對Integer的序列化過程進行詳細的解釋,來對比java提供的其他幾種序列化的方式找到不同。首先java中,如果要將一個Integer物件透過網路傳送出去,要做的第一件事情就是序列化,那麼我們常用的序列化方式有兩種,如下所示:
java中序列化Integer物件的第一種方法:Integer integerObject = 10066329;integerObject.toString().getBytes();
java中序列化Integer物件的第二種方法:
ByteArrayOutputStream aStream = new ByteArrayOutputStream();ObjectOutputStream oStream = new ObjectOutputStream(aStream);oStream.writeObject(integerObject);aStream.toByteArray();
第一種方式是將Integer物件中的值序列化;第二種方式,是將Integer整個物件序列化。這兩種方式雖然都產生byte[],實際上性質是完全不一樣的。我們來看一下這兩種方式產生的byte[]的內容:
序列化Integer的值:[49, 48, 48, 54, 54, 51, 50, 57]
序列化整個Integer物件:[-84, -19, 0, 5, 115, 114, 0, 17, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 73, 110, 116, 101, 103, 101, 114, 18, -30, -96, -92, -9, -127, -121, 56, 2, 0, 1, 73, 0, 5, 118, 97, 108, 117, 101, 120, 114, 0, 16, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 78, 117, 109, 98, 101, 114, -122, -84, -107, 29, 11, -108, -32, -117, 2, 0, 0, 120, 112, 0, -103, -103, -103]
第一種方式序列化後,byte陣列有8個byte元素(因為是首先轉換成字串的,所以實際上這個大小會隨著Integer值的大小增加而增加);第二中方式序列化後,byte陣列一共有 > 20 個byte元素,其中除了記錄Integer的值以外,還包括描述這個型別的其他屬性。
那麼我們再來看看TBinaryProtocol中,是如何序列化Integer型別的。首先我們來看一下TBinaryProtocol進行Integer序列化的這部分原始碼,如下圖所示:
private byte[] i32out = new byte[4];public void writeI32(int i32) throws TException { i32out[0] = (byte)(0xff & (i32 >> 24)); i32out[1] = (byte)(0xff & (i32 >> 16)); i32out[2] = (byte)(0xff & (i32 >> 8)); i32out[3] = (byte)(0xff & (i32)); trans_.write(i32out, 0, 4);}
計算過程可以透過下圖來表示:
透過4次位計算,得到了一個長度為4個byte陣列,並且這個陣列的大小並不會隨著整數大小的增加而變化。並且位運算的速度是所有計算中速度最快的一種計算。反序列化的過程相似,對這個大小為4的byte[]陣列重新進行位計算即可:
((buf[off] & 0xff) << 24) |((buf[off+1] & 0xff) << 16) |((buf[off+2] & 0xff) << 8) |((buf[off+3] & 0xff));1234
由於本文的篇幅和寫作目的所限,不能一一介紹TBinaryProtocol的各種序列化方式,但是透過對TBinaryProtocol中Integer的序列化過程,我們可以找到TBinaryProtocol處理過程的優勢,包括速度和大小的優勢。所以,如果您的使用環境對序列化過程沒有特別的要求(例如後面要提到的大量的負數情況),那麼直接使用TBinaryProtocol進行資料格式的封裝的就可以了。
byte是一個8位二進位制描述(一個位元組),在java中,一個int需要4個byte進行表示,而。“0x”的字首表示16進位制數字,那麼0xff的二進位制表示就是 1111 1111;“&”是“與”運算子,這個運算子用於二進位制計算,1 & 1 = 1,其餘情況都 = 0;“<<” 表示左移運算,0011 << 2 = 1100;”>>”表示右移運算,1100 >> 2 = 0011;
3-1-2、TCompactProtocol使用zigzag編碼方式緊湊傳輸協議。zigzag編碼的優勢在於記錄數字型別(整數、單精度浮點和雙精度浮點),最特別的是zigzag編碼對負數的記錄。在計算機中,都會使用很大的數字表示負數,為了保證節約傳輸量,zigzag編碼採用正數與負數交錯的方式,把負數轉換為一個正數進行記錄。下面我們具體來分析一下TCompactProtocol中對32位整數的序列化方式,以下是TCompactProtocol中對32為整數的處理程式碼:
/** * Write an i32 as a zigzag varint. */public void writeI32(int i32) throws TException { writeVarint32(intToZigZag(i32));}/** * Convert n into a zigzag int. This allows negative numbers to be * represented compactly as a varint. */private int intToZigZag(int n) { return (n << 1) ^ (n >> 31);}/** * Write an i32 as a varint. Results in 1-5 bytes on the wire. * TODO: make a permanent buffer like writeVarint64? */byte[] i32buf = new byte[5];private void writeVarint32(int n) throws TException { int idx = 0; while (true) { if ((n & ~0x7F) == 0) { i32buf[idx++] = (byte)n; // writeByteDirect((byte)n); break; } else { i32buf[idx++] = (byte)((n & 0x7F) | 0x80); // writeByteDirect((byte)((n & 0x7F) | 0x80)); n >>>= 7; } } trans_.write(i32buf, 0, idx);}
以上程式碼片段一共有一個對外的呼叫方法,和兩個分別名為intToZigZag和writeVarint32的私有方法。從字面上的意義我們可以知道:當對一個32位整數進行編碼時,首先將這個32位整數轉成ZigZag編碼格式,然後在序列化為“變長的32位整數”。那麼這個處理的具體過程是什麼樣的呢?我們以一個較大的32位整數(161061273,二進位制計數為:1001100110011001100110011001)為例,進行講解:
首先將整個這個整數做成ZigZag編碼格式:然後進行“變長”處理:可以看到,上面的“變長”計算一共進行了5次,比TBinaryProtocol中的32位整數序列化還要多出一個byte。這是為什麼呢?因為這個數字比較長。但實際處理中,我們一般使用的資料都是比較小的。這也是為什麼首先要使用ZigZag編碼把某個負數的符號位從高位移動到低位的原因。實際上,在實際過程中,變長計算一般只會進行二至三次就完成。這樣,在大多數情況下,完成一個32位整數的序列化,TCompactProtocol做使用的空間就比TBinaryProtocol要小。那麼經過分析,對於TCompactProtocol和TBinaryProtocol的選擇的經驗是:如果傳輸的資訊中,基本都是字串,那麼使用TCompactProtocol還是使用TBinaryProtocol基本上都是差不多的;如果需要傳輸的資訊中,會有較多的“低位數字”,那麼建議使用TCompactProtocol。3-1-3、其他傳輸格式封裝:當然Apache Thrift還提供其他的傳輸格式封裝。不同的需求場景下,您可以使用根據需要選用這些資訊傳輸格式:
3-2、Aapche Thrift與通訊模型Apache Thrift支援阻塞式同步IO通訊模型和非阻塞式非同步IO通訊模型。這裡說明一下,我在這個系列的文章中,已經詳細講述了各種IO模型的特點和工作原理(請參見我另外幾篇文章《架構設計:系統間通訊(3)——IO通訊模型和JAVA實踐 上篇》、《架構設計:系統間通訊(4)——IO通訊模型和JAVA實踐 中篇》、《架構設計:系統間通訊(5)——IO通訊模型和JAVA實踐 下篇》)。所以讀者您如果度過本人的拙作,那麼您一定清楚,要發揮Apache Thrift效能上的優勢,那麼一定要在正式生產環境中採用Apache Thrift對非阻塞式非同步IO通訊模型的支援。下面的程式碼我們將向您展示Apache Thrift的這種特性:
在給出示例程式碼之前一定要再強調一次,Apache Thrift的伺服器端和客戶端一定要採用相同的通訊模型。這就是說如果Apache Thrift的伺服器端採用的是非阻塞非同步通訊模型,那麼Apache Thrift客戶端也一定要採用非阻塞非同步通訊模型,否則就無法通訊。
伺服器的非阻塞非同步通訊程式碼:package testThrift.man;import java.nio.channels.Selector;import java.util.concurrent.Executors;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.log4j.BasicConfigurator;import org.apache.thrift.TProcessor;import org.apache.thrift.protocol.TBinaryProtocol;import org.apache.thrift.server.THsHaServer;import org.apache.thrift.transport.TNonblockingServerSocket;import testThrift.iface.HelloWorldService;import testThrift.iface.HelloWorldService.Iface;import testThrift.impl.HelloWorldServiceImpl;public class HelloNonServerDemo { static { BasicConfigurator.configure(); } /** * 日誌 */ private static Log LOGGER = LogFactory.getLog(HelloNonServerDemo.class); public static final int SERVER_PORT = 8090; public void startServer() { try { // log4j日誌,如果您工程裡面沒有加入log4j的支援,請待用system.out HelloNonServerDemo.LOGGER.info("HelloWorld TSimpleServer start ...."); // 服務執行控制器(告訴apache thrift,實現了HelloWorldService.Iface介面的是具體的哪一個類) // HelloWorldServiceImpl類的程式碼,就不在贅述了,無論採用哪種通訊模型,它的程式碼都不會變化 TProcessor tprocessor = new HelloWorldService.Processor<Iface>(new HelloWorldServiceImpl()); // 非阻塞非同步通訊模型(伺服器端) TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(HelloNonServerDemo.SERVER_PORT); // Selector這個類,是不是很熟悉。 serverTransport.registerSelector(Selector.open()); THsHaServer.Args tArgs = new THsHaServer.Args(serverTransport); tArgs.processor(tprocessor); // 指定訊息的封裝格式(採用二進位制流封裝) tArgs.protocolFactory(new TBinaryProtocol.Factory()); // 指定處理器的所使用的執行緒池。 tArgs.executorService(Executors.newFixedThreadPool(100)); // 啟動服務 THsHaServer server = new THsHaServer(tArgs); server.serve(); } catch (Exception e) { HelloNonServerDemo.LOGGER.error(e); } } /** * @param args */ public static void main(String[] args) { HelloNonServerDemo server = new HelloNonServerDemo(); server.startServer(); }}
客戶端的非阻塞非同步通訊程式碼:package testThrift.client;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.log4j.BasicConfigurator;import org.apache.thrift.TException;import org.apache.thrift.async.AsyncMethodCallback;import org.apache.thrift.async.TAsyncClientManager;import org.apache.thrift.protocol.TBinaryProtocol;import org.apache.thrift.transport.TNonblockingSocket;import testThrift.iface.HelloWorldService;import testThrift.iface.Reponse;import testThrift.iface.HelloWorldService.AsyncClient;import testThrift.iface.HelloWorldService.AsyncClient.send_call;import testThrift.iface.Request;public class HelloNonClient { static { BasicConfigurator.configure(); } /** * 日誌 */ private static final Log LOGGER = LogFactory.getLog(HelloNonClient.class); private static Object WAITOBJECT = new Object(); public static final void main(String[] args) throws Exception { TNonblockingSocket transport = new TNonblockingSocket("127.0.0.1", 8090); TAsyncClientManager clientManager = new TAsyncClientManager(); // 準備呼叫引數(這個testThrift.iface.Request,是我們透過IDL定義,並且生成的) Request request = new Request("{\"param\":\"field1\"}", "\\mySerivce\\queryService"); // 這是客戶端對非阻塞非同步網路通訊方式的支援。 // 注意使用的訊息封裝格式,一定要和伺服器端使用的一致 HelloWorldService.AsyncClient asyncClient = new HelloWorldService.AsyncClient.Factory(clientManager, new TBinaryProtocol.Factory()).getAsyncClient(transport); // 既然是非阻塞非同步模式,所以客戶端一定是透過“事件回撥”方式,接收到伺服器的響應通知的 asyncClient.send(request,new AsyncMethodCallback<AsyncClient.send_call>() { /** * 當伺服器正確響應了客戶端的請求後,這個事件被觸發 */ @Override public void onComplete(send_call call) { Reponse response = null; try { response = call.getResult(); } catch (TException e) { HelloNonClient.LOGGER.error(e); return; } HelloNonClient.LOGGER.info("response = " + response); } /** * 當伺服器沒有正確響應了客戶端的請求,或者其中過程中出現了不可控制的情況。 * 那麼這個事件會被觸發 */ @Override public void onError(Exception exception) { HelloNonClient.LOGGER.info("exception = " + exception); } }); //這段程式碼保證客戶端在得到伺服器回覆前,應用程式本身不會終止 synchronized (HelloNonClient.WAITOBJECT) { HelloNonClient.WAITOBJECT.wait(); } }}1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
以上程式碼是可以直接工作的。讀者可以直接在自己的工程中執行。執行的結果和Apache Thrift上一節中Apache Thrift阻塞模式下的執行結果是一致的,只是執行過程不一樣。目前各種主流的RPC框架基本都支援非阻塞式非同步IO網路通訊,如果您有興趣進行這些RPC框架的效能比較,一定要在相同的IO通訊模型下進行。
3-3、Aapche Thrift與執行緒池在之前的文章(《架構設計:系統間通訊(10)——RPC的基本概念》),我們已經提到影響一款RPC框架效能的主要指標。除了RPC框架實現的資料封裝格式、RPC框架支援的網路通訊模型外,還有一個重要的指標就是它如何執行客戶端的請求。
在Apache Thrift中,它使用執行緒池技術執行具體的介面實現,響應客戶端請求(無論Apahce Thrift使用哪種資料封裝格式、使用哪種網路通訊模型)。
org.apache.thrift.server.THsHaServer.Args.executorService(ExecutorService executorService)1
可以看到,實際上Apache Thrift中設定執行緒池的方法,所要求的引數型別是java.util.concurrent.ExecutorService介面,也就是說只要實現了ExecutorService介面的類都可以被傳入。一般我們常使用的是java.util.concurrent.ThreadPoolExecutor這個類。
4、下文預告在本篇文章中,我們詳細描述了Apache Thrift中和效能息息相關的三個要素:資料封裝格式的實現、網路IO模型的支援 和 處理客戶端請求的方式。正式有這些實現的細節,才使Apache Thrift成為一款主流的RPC框架。那麼我們在正式生產環境中,應該如何使用RPC框架才科學呢?在下文中,我們將結合RPC的特點和我自己的工作經歷,向各位讀者進行介紹。