如果用戶量增加后為了解決吞吐量問題,需要引入集群,在openfire中提供了集群的支持,另外也實現了兩個集群插件:hazelcast和clustering。為了了解情況集群的工作原理,我就沿著openfire的源代碼進行了分析,也是一次學習的過程。
數據庫因為對于openfire來說基本上是透明的,所以這塊就交給數據庫本身來實現。緩存數據緩存是存在內存里的,所以這部分是要同步的sessionsession在openfire并不需要所有實例同步,但是需要做用戶路由緩存,否則發消息時找不到對應的會話。由此用戶路由還是要同步的。
- 緩存接口
public interface Cache<K,V> extends java.util.Map<K,V>
如果不開啟集群時緩存的默認緩存容器類是:public class DefaultCache<K, V> ,實際上DefaultCache就是用一個Hashmap來存數據的。
- 緩存工廠類
public class CacheFactory
/** * Returns the named cache, creating it as necessary. * * @param name the name of the cache to create. * @return the named cache, creating it as necessary. */ @SuppressWarnings("unchecked") public static synchronized <T extends Cache> T createCache(String name) { T cache = (T) caches.get(name); if (cache != null) { return cache; } cache = (T) cacheFactoryStrategy.createCache(name); log.info("Created cache [" + cacheFactoryStrategy.getClass().getName() + "] for " + name); return wrapCache(cache, name); }
上面代碼中會通過緩存工廠策略對象來創建一個緩存容器,最后warpCache方法會將此容器放入到caches中。
- 緩存工廠類的策略
- startup
public static synchronized void startup() { if (isClusteringEnabled() && !isClusteringStarted()) { initEventDispatcher(); CacheFactory.startClustering(); } }
- 會使用集群的緩存工廠策略來啟動,同時使自己加入到集群中。
- 開啟一個線程用于同步緩存的狀態
- shutdown
public static void doClusterTask(final ClusterTask<?> task) {
cacheFactoryStrategy.doClusterTask(task);
}
這里有個限定就是必須是ClusterTask派生的類才行,看看它的定義:
public interface ClusterTask<V> extends Runnable, Externalizable {
V getResult();
}
主要是為了異步執行和序列化,異步是因為不能阻塞,而序列化當然就是為了能在集群中傳送。
- 緩存策略工廠類(ClusteredCacheFactory)
public class ClusteredCacheFactory implements CacheFactoryStrategy {
首先是startCluster方法用于啟動集群,主要完成幾件事情:
-
- 設置緩存序列化工具類,ClusterExternalizableUtil。這個是用于集群間數據復制時的序列化工具
- 設置遠程session定位器,RemoteSessionLocator,因為session不同步,所以它主要是用于多實例間的session讀取
- 設置遠程包路由器ClusterPacketRouter,這樣就可以在集群中發送消息了
- 加載Hazelcast的實例設置NodeID,以及設置ClusterListener
/** * Notification message indicating that this JVM has joined a cluster. */ @SuppressWarnings("unchecked") public static synchronized void joinedCluster() { cacheFactoryStrategy = clusteredCacheFactoryStrategy; // Loop through local caches and switch them to clustered cache (copy content) for (Cache cache : getAllCaches()) { // skip local-only caches if (localOnly.contains(cache.getName())) continue; CacheWrapper cacheWrapper = ((CacheWrapper) cache); Cache clusteredCache = cacheFactoryStrategy.createCache(cacheWrapper.getName()); clusteredCache.putAll(cache); cacheWrapper.setWrappedCache(clusteredCache); } clusteringStarting = false; clusteringStarted = true; log.info("Clustering started; cache migration complete"); }
這里可以看到會讀取所有的緩存容器并一個個的使用Wrapper包裝一下,然后用同樣的緩存名稱去createCache一個新的Cache,這步使用的是切換后的集群緩存策略工廠,也就是說會使用ClusteredCacheFactory去創建新的緩存容器。最后再將cache寫入到新的clusteredCache 里,這樣就完成了緩存的切換。
public Cache createCache(String name) { // Check if cluster is being started up while (state == State.starting) { // Wait until cluster is fully started (or failed) try { Thread.sleep(250); } catch (InterruptedException e) { // Ignore } } if (state == State.stopped) { throw new IllegalStateException("Cannot create clustered cache when not in a cluster"); } return new ClusteredCache(name, hazelcast.getMap(name)); }
這里使用的是ClusteredCache,而且最重要的是傳入的第二個map參數換成了hazelcast的了,這樣之后再訪問這個緩存容器時已經不再是原先的本地Cache了,已經是hazelcast的map對象。hazelcast會自動對map的數據進行同步管理,這也就完成了緩存同步的功能。
- 集群計算
那就看hazelcast的實現吧,在ClusteredCacheFactory中doClusterTask舉個例子吧:
public void doClusterTask(final ClusterTask task) { if (cluster == null) { return; } Set<Member> members = new HashSet<Member>(); Member current = cluster.getLocalMember(); for(Member member : cluster.getMembers()) { if (!member.getUuid().equals(current.getUuid())) { members.add(member); } } if (members.size() > 0) { // Asynchronously execute the task on the other cluster members logger.debug("Executing asynchronous MultiTask: " + task.getClass().getName()); hazelcast.getExecutorService(HAZELCAST_EXECUTOR_SERVICE_NAME).submitToMembers( new CallableTask<Object>(task), members); } else { logger.warn("No cluster members selected for cluster task " + task.getClass().getName()); } }
過程就是,先獲取到集群中的實例成員,當然要排除自己。然后hazelcast提供了ExecutorService來執行這個task,方法就是submiteToMembers。這樣就提交了一個運算任務。只不過具體是如何分配計算并匯集結果倒真不太清楚。
總結
文章列表