接前一篇rpc框架之HA/負載均衡構架設計 繼續,寫了一個簡單的thrift 連接池:
先做點準備工作:
package yjmyzz; public class ServerInfo { public String getHost() { return host; } public void setHost(String host) { this.host = host; } public int getPort() { return port; } public void setPort(int port) { this.port = port; } private String host; private int port; public ServerInfo(String host, int port) { this.host = host; this.port = port; } public String toString() { return "host:" + host + ",port:" + port; } }
上面這個類,用來封裝服務端的基本信息,主機名+端口號,連接時需要用到。
package yjmyzz; import org.apache.thrift.transport.TTransport; import java.text.SimpleDateFormat; import java.util.Date; public class TransfortWrapper { private TTransport transport; /** * 是否正忙 */ private boolean isBusy = false; /** * 是否已經掛 */ private boolean isDead = false; /** * 最后使用時間 */ private Date lastUseTime; /** * 服務端Server主機名或IP */ private String host; /** * 服務端Port */ private int port; public TransfortWrapper(TTransport transport, String host, int port, boolean isOpen) { this.lastUseTime = new Date(); this.transport = transport; this.host = host; this.port = port; if (isOpen) { try { transport.open(); } catch (Exception e) { //e.printStackTrace(); System.err.println(host + ":" + port + " " + e.getMessage()); isDead = true; } } } public TransfortWrapper(TTransport transport, String host, int port) { this(transport, host, port, false); } public boolean isBusy() { return isBusy; } public void setIsBusy(boolean isBusy) { this.isBusy = isBusy; } public boolean isDead() { return isDead; } public void setIsDead(boolean isDead) { this.isDead = isDead; } public TTransport getTransport() { return transport; } public void setTransport(TTransport transport) { this.transport = transport; } /** * 當前transport是否可用 * * @return */ public boolean isAvailable() { return !isBusy && !isDead && transport.isOpen(); } public Date getLastUseTime() { return lastUseTime; } public void setLastUseTime(Date lastUseTime) { this.lastUseTime = lastUseTime; } public String getHost() { return host; } public int getPort() { return port; } public String toString() { SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); return "hashCode:" + hashCode() + "," + host + ":" + port + ",isBusy:" + isBusy + ",isDead:" + isDead + ",isOpen:" + transport.isOpen() + ",isAvailable:" + isAvailable() + ",lastUseTime:" + format.format(lastUseTime); } }
這是對TTransport的封裝,主要增加了一些輔助信息,直接看代碼注釋即可。
下面才是連接池的主要內容:
package yjmyzz; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; import java.util.Date; import java.util.List; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; /** * Thrift連接池 * * @author : 菩提樹下的楊過(http://yjmyzz.cnblogs.com/) * @version : 0.1 BETA * @since : 2015-09-27(中秋) */ public class ThriftTransportPool { Semaphore access = null; TransfortWrapper[] pool = null; int poolSize = 1;//連接池大小 int minSize = 1;//池中保持激活狀態的最少連接個數 int maxIdleSecond = 300;//最大空閑時間(秒),超過該時間的空閑時間的連接將被關閉 int checkInvervalSecond = 60;//每隔多少秒,檢測一次空閑連接(默認60秒) List<ServerInfo> serverInfos; boolean allowCheck = true; Thread checkTread = null; public int getCheckInvervalSecond() { return checkInvervalSecond; } public void setCheckInvervalSecond(int checkInvervalSecond) { this.checkInvervalSecond = checkInvervalSecond; } /** * 連接池構造函數 * * @param poolSize 連接池大小 * @param minSize 池中保持激活的最少連接數 * @param maxIdleSecond 單個連接最大空閑時間,超過此值的連接將被斷開 * @param checkInvervalSecond 每隔多少秒檢查一次空閑連接 * @param serverList 服務器列表 */ public ThriftTransportPool(int poolSize, int minSize, int maxIdleSecond, int checkInvervalSecond, List<ServerInfo> serverList) { if (poolSize <= 0) { poolSize = 1; } if (minSize > poolSize) { minSize = poolSize; } if (minSize <= 0) { minSize = 0; } this.maxIdleSecond = maxIdleSecond; this.minSize = minSize; this.poolSize = poolSize; this.serverInfos = serverList; this.allowCheck = true; this.checkInvervalSecond = checkInvervalSecond; init(); check(); } /** * 連接池構造函數(默認最大空閑時間300秒) * * @param poolSize 連接池大小 * @param minSize 池中保持激活的最少連接數 * @param serverList 服務器列表 */ public ThriftTransportPool(int poolSize, int minSize, List<ServerInfo> serverList) { this(poolSize, minSize, 300, 60, serverList); } public ThriftTransportPool(int poolSize, List<ServerInfo> serverList) { this(poolSize, 1, 300, 60, serverList); } public ThriftTransportPool(List<ServerInfo> serverList) { this(serverList.size(), 1, 300, 60, serverList); } /** * 檢查空閑連接 */ private void check() { checkTread = new Thread(new Runnable() { public void run() { while (allowCheck) { //System.out.println("--------------"); System.out.println("開始檢測空閑連接..."); for (int i = 0; i < pool.length; i++) { //if (pool[i] == null) { // System.out.println("pool[" + i + "]為null"); //} //if (pool[i].getTransport() == null) { // System.out.println("pool[" + i + "].getTransport()為null"); //} if (pool[i].isAvailable() && pool[i].getLastUseTime() != null) { long idleTime = new Date().getTime() - pool[i].getLastUseTime().getTime(); //超過空閑閥值的連接,主動斷開,以減少資源消耗 if (idleTime > maxIdleSecond * 1000) { if (getActiveCount() > minSize) { pool[i].getTransport().close(); pool[i].setIsBusy(false); System.out.println(pool[i].hashCode() + "," + pool[i].getHost() + ":" + pool[i].getPort() + " 超過空閑時間閥值被斷開!"); } } } } System.out.println("當前活動連接數:" + getActiveCount()); try { Thread.sleep(checkInvervalSecond * 1000); } catch (Exception e) { e.printStackTrace(); } } } }); checkTread.start(); } /** * 連接池初始化 */ private void init() { access = new Semaphore(poolSize); pool = new TransfortWrapper[poolSize]; for (int i = 0; i < pool.length; i++) { int j = i % serverInfos.size(); TSocket socket = new TSocket(serverInfos.get(j).getHost(), serverInfos.get(j).getPort()); if (i < minSize) { pool[i] = new TransfortWrapper(socket, serverInfos.get(j).getHost(), serverInfos.get(j).getPort(), true); } else { pool[i] = new TransfortWrapper(socket, serverInfos.get(j).getHost(), serverInfos.get(j).getPort()); } } } /** * 從池中取一個可用連接 * @return */ public TTransport get() { try { if (access.tryAcquire(3, TimeUnit.SECONDS)) { synchronized (this) { for (int i = 0; i < pool.length; i++) { if (pool[i].isAvailable()) { pool[i].setIsBusy(true); pool[i].setLastUseTime(new Date()); return pool[i].getTransport(); } } //嘗試激活更多連接 for (int i = 0; i < pool.length; i++) { if (!pool[i].isBusy() && !pool[i].isDead() && !pool[i].getTransport().isOpen()) { try { pool[i].getTransport().open(); pool[i].setIsBusy(true); pool[i].setLastUseTime(new Date()); return pool[i].getTransport(); } catch (Exception e) { //e.printStackTrace(); System.err.println(pool[i].getHost() + ":" + pool[i].getPort() + " " + e.getMessage()); pool[i].setIsDead(true); } } } } } } catch (Exception e) { e.printStackTrace(); throw new RuntimeException("can not get available client"); } throw new RuntimeException("all client is too busy"); } /** * 客戶端調用完成后,必須手動調用此方法,將TTransport恢復為可用狀態 * * @param client */ public void release(TTransport client) { boolean released = false; synchronized (this) { for (int i = 0; i < pool.length; i++) { if (client == pool[i].getTransport() && pool[i].isBusy()) { pool[i].setIsBusy(false); released = true; break; } } } if (released) { access.release(); } } public void destory() { if (pool != null) { for (int i = 0; i < pool.length; i++) { pool[i].getTransport().close(); } } allowCheck = false; checkTread = null; System.out.print("連接池被銷毀!"); } /** * 獲取當前已經激活的連接數 * * @return */ public int getActiveCount() { int result = 0; for (int i = 0; i < pool.length; i++) { if (!pool[i].isDead() && pool[i].getTransport().isOpen()) { result += 1; } } return result; } /** * 獲取當前繁忙狀態的連接數 * * @return */ public int getBusyCount() { int result = 0; for (int i = 0; i < pool.length; i++) { if (!pool[i].isDead() && pool[i].isBusy()) { result += 1; } } return result; } /** * 獲取當前已"掛"掉連接數 * * @return */ public int getDeadCount() { int result = 0; for (int i = 0; i < pool.length; i++) { if (pool[i].isDead()) { result += 1; } } return result; } public String toString() { return "poolsize:" + pool.length + ",minSize:" + minSize + ",maxIdleSecond:" + maxIdleSecond + ",checkInvervalSecond:" + checkInvervalSecond + ",active:" + getActiveCount() + ",busy:" + getBusyCount() + ",dead:" + getDeadCount(); } public String getWrapperInfo(TTransport client) { for (int i = 0; i < pool.length; i++) { if (pool[i].getTransport() == client) { return pool[i].toString(); } } return ""; } }
主要思路:
1.構造器里,傳入 連接池大小,最小連接數,連接最大空閑時間,空間連接檢測時間間隔,服務端列表等基本信息
2.然后調用init方法進行初始化,初始化時把pool[]數組填滿,不過在填充的時候,要根據minsize決定激活多少連接(換句話講,連接實例都都建好了,只是連不連的問題),另外初始化的時候,還要考慮到某個服務器宕機的可能,如果服務端掛了,將對應的實例設置為isDead=true的狀態
3.新開一個線程定時檢查是否有空閑連接,如果空閑時間太長,主動斷開,以節省開銷。
4.get()方法從數組中撈一個可用的連接出來,取的時候要考慮到喚醒"沉睡"連接的情況,即如果當前池中只有2個活動連接,這時又來了請求,沒有活動連接了,要從池中把斷開的連接叫醒一個。
5.要控制并發控制,多個線程同時調用get()想從池中取可用連接時,可用Semaphore+Lock的機制來加以控制,可參考上一篇內容。
測試:
package yjmyzz; import org.apache.thrift.transport.TSocket; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.FutureTask; public class PoolTest { public static void main(String[] args) throws Exception { //初始化一個連接池(poolsize=15,minsize=1,maxIdleSecond=5,checkInvervalSecond=10) final ThriftTransportPool pool = new ThriftTransportPool(15, 1, 5, 10, getServers()); //模擬客戶端調用 createClients(pool); //等候清理空閑連接 Thread.sleep(30000); //再模擬一批客戶端,驗證連接是否會重新增加 createClients(pool); System.out.println("輸入任意鍵退出..."); System.in.read(); //銷毀連接池 pool.destory(); } private static void createClients(final ThriftTransportPool pool) throws Exception { //模擬5個client端 int clientCount = 5; Thread thread[] = new Thread[clientCount]; FutureTask<String> task[] = new FutureTask[clientCount]; for (int i = 0; i < clientCount; i++) { task[i] = new FutureTask<String>(new Callable<String>() { public String call() throws Exception { TSocket scoket = (TSocket) pool.get();//從池中取一個可用連接 //模擬調用RPC會持續一段時間 System.out.println(Thread.currentThread().getName() + " => " + pool.getWrapperInfo(scoket)); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } pool.release(scoket);//記得每次用完,要將連接釋放(恢復可用狀態) return Thread.currentThread().getName() + " done."; } }); thread[i] = new Thread(task[i], "Thread" + i); } //啟用所有client線程 for (int i = 0; i < clientCount; i++) { thread[i].start(); Thread.sleep(10); } System.out.println("--------------"); //等待所有client調用完成 for (int i = 0; i < clientCount; i++) { System.out.println(task[i].get()); System.out.println(pool); System.out.println("--------------"); thread[i] = null; } } private static List<ServerInfo> getServers() { List<ServerInfo> servers = new ArrayList<ServerInfo>(); servers.add(new ServerInfo("localhost", 9000)); servers.add(new ServerInfo("localhost", 9001)); servers.add(new ServerInfo("localhost", 1002));//這一個故意寫錯的,模擬服務器掛了,連接不上的情景 return servers; } }
輸出:
****************************
開始檢測空閑連接...
當前活動連接數:1
Thread1 => hashCode:919192718,localhost:9001,isBusy:true,isDead:false,isOpen:true,isAvailable:false,lastUseTime:2015-09-27 16:09:59
Thread0 => hashCode:1510835162,localhost:9000,isBusy:true,isDead:false,isOpen:true,isAvailable:false,lastUseTime:2015-09-27 16:09:59
localhost:1002 java.net.ConnectException: Connection refused
Thread2 => hashCode:1466719669,localhost:9000,isBusy:true,isDead:false,isOpen:true,isAvailable:false,lastUseTime:2015-09-27 16:10:00
Thread3 => hashCode:2080503518,localhost:9001,isBusy:true,isDead:false,isOpen:true,isAvailable:false,lastUseTime:2015-09-27 16:10:00
localhost:1002 java.net.ConnectException: Connection refused
Thread4 => hashCode:411724643,localhost:9000,isBusy:true,isDead:false,isOpen:true,isAvailable:false,lastUseTime:2015-09-27 16:10:00
--------------
Thread0 done.
poolsize:15,minSize:1,maxIdleSecond:5,checkInvervalSecond:10,active:5,busy:3,dead:2
--------------
Thread1 done.
poolsize:15,minSize:1,maxIdleSecond:5,checkInvervalSecond:10,active:5,busy:3,dead:2
--------------
Thread2 done.
poolsize:15,minSize:1,maxIdleSecond:5,checkInvervalSecond:10,active:5,busy:2,dead:2
--------------
Thread3 done.
poolsize:15,minSize:1,maxIdleSecond:5,checkInvervalSecond:10,active:5,busy:1,dead:2
--------------
Thread4 done.
poolsize:15,minSize:1,maxIdleSecond:5,checkInvervalSecond:10,active:5,busy:0,dead:2
--------------
開始檢測空閑連接...
1510835162,localhost:9000 超過空閑時間閥值被斷開!
919192718,localhost:9001 超過空閑時間閥值被斷開!
1466719669,localhost:9000 超過空閑時間閥值被斷開!
2080503518,localhost:9001 超過空閑時間閥值被斷開!
當前活動連接數:1
開始檢測空閑連接...
當前活動連接數:1
開始檢測空閑連接...
當前活動連接數:1
Thread0 => hashCode:411724643,localhost:9000,isBusy:true,isDead:false,isOpen:true,isAvailable:false,lastUseTime:2015-09-27 16:10:31
Thread1 => hashCode:1510835162,localhost:9000,isBusy:true,isDead:false,isOpen:true,isAvailable:false,lastUseTime:2015-09-27 16:10:31
Thread2 => hashCode:919192718,localhost:9001,isBusy:true,isDead:false,isOpen:true,isAvailable:false,lastUseTime:2015-09-27 16:10:31
Thread3 => hashCode:1466719669,localhost:9000,isBusy:true,isDead:false,isOpen:true,isAvailable:false,lastUseTime:2015-09-27 16:10:31
Thread4 => hashCode:2080503518,localhost:9001,isBusy:true,isDead:false,isOpen:true,isAvailable:false,lastUseTime:2015-09-27 16:10:31
--------------
Thread0 done.
poolsize:15,minSize:1,maxIdleSecond:5,checkInvervalSecond:10,active:5,busy:4,dead:2
--------------
Thread1 done.
poolsize:15,minSize:1,maxIdleSecond:5,checkInvervalSecond:10,active:5,busy:3,dead:2
--------------
Thread2 done.
poolsize:15,minSize:1,maxIdleSecond:5,checkInvervalSecond:10,active:5,busy:2,dead:2
--------------
Thread3 done.
poolsize:15,minSize:1,maxIdleSecond:5,checkInvervalSecond:10,active:5,busy:1,dead:2
--------------
Thread4 done.
poolsize:15,minSize:1,maxIdleSecond:5,checkInvervalSecond:10,active:5,busy:0,dead:2
--------------
輸入任意鍵退出...
q
連接池被銷毀!
***********************
注意上面高亮顏色的部分,2080503518 連接創建后,后來被check方法主動檢測到空閑斷開,然后第二輪調用時,又重新激活。411724643 則幸免于難,一直戰斗到最后。另外由于故意寫錯了一個server地址,池中始終有二個dead的實例。
值得改進的地方:
主要是公平性的問題,在初始化的時候,如果服務器有3臺,而指定的連接池大小為4,目前的做法是,用4對3取模,所以第1、4個連接實例都是連接到服務器1,get取可用連接的時候也有類似情況,是按pool數組從前向后遍歷的,撈到第1個可用的連接就完事了,這樣永遠是排在List前面的服務器壓力會大一些,這樣有點不太符合負載"均衡"的本意。
不過,這個問題也很好解決,有一個很簡單有效的技巧,實際應用中,服務器列表是從zk上取回來的,取回來后,先對數組做隨機排序,這樣整體看來下,多個連接池總體的連接分布情況就比較平均了。
文章列表