文章出處

如果用戶量增加后為了解決吞吐量問題,需要引入集群,在openfire中提供了集群的支持,另外也實現了兩個集群插件:hazelcast和clustering。為了了解情況集群的工作原理,我就沿著openfire的源代碼進行了分析,也是一次學習的過程。

 
首先理解集群的一些簡單概念
集群的目的是讓多個實例像一個實例一樣運行,這樣就可以通過增長實例來增長計算能力。也就是所謂的分布式計算問題,這其中最為關注的一個特性就是——CAP理論,也就是所謂的一致性、可用性、分區容錯性。集群中最核心解決的問題就是CAP。
CAP綜合理解就是我上面寫的,多個實例像一個實例一樣運行。
 
所以所謂集群就是把一些數據共享或者同步到不同的實例上,這樣系統使用同樣的算法,取的結果當然應該是相同啦。所以一些數據庫的主從復制,緩存數據集群都是類似這種解決方法。只是代碼實現質量和處理規模的問題。
 
有了這個基礎我們再來看看openfire是怎么解決這個問題的。
 
openfire的集群設計
 
1、哪些需要進行集群間的同步
 對于openfire而言,有這幾方面的數據需要進行保證集群間的同步:數據庫存的數據、緩存數據、session。貌似就這些吧?
數據庫
因為對于openfire來說基本上是透明的,所以這塊就交給數據庫本身來實現。
緩存數據
緩存是存在內存里的,所以這部分是要同步的
session
session在openfire并不需要所有實例同步,但是需要做用戶路由緩存,否則發消息時找不到對應的會話。由此用戶路由還是要同步的。
 
2、緩存的設計
  • 緩存接口
openfire里對緩存的數據容器提供了一個包裝接口,這個接口提供了緩存數據的基本方法,用于統一數據操作。
public interface Cache<K,V> extends java.util.Map<K,V>

如果不開啟集群時緩存的默認緩存容器類是:public class DefaultCache<K, V> ,實際上DefaultCache就是用一個Hashmap來存數據的。

 
  • 緩存工廠類
為了保證緩存是可以擴展的,提供了一個工廠類:
public class CacheFactory

 

CacheFactory類中會管理所有的緩存容器,如下代碼:
    /**
     * Returns the named cache, creating it as necessary.
     *
     * @param name         the name of the cache to create.
     * @return the named cache, creating it as necessary.
     */
    @SuppressWarnings("unchecked")
    public static synchronized <T extends Cache> T createCache(String name) {
        T cache = (T) caches.get(name);
        if (cache != null) {
            return cache;
        }
        cache = (T) cacheFactoryStrategy.createCache(name);
 
        log.info("Created cache [" + cacheFactoryStrategy.getClass().getName() + "] for " + name);
 
        return wrapCache(cache, name);
    }
 

上面代碼中會通過緩存工廠策略對象來創建一個緩存容器,最后warpCache方法會將此容器放入到caches中。

 
  • 緩存工廠類的策略
在CacheFactory中默認是使用一個DefaultLocalCacheStrategy來完成緩存創建的。另外還提供了在集群條件下的緩存策略接入。也就是通過實例化不同的策略來切換緩存管理方案。比如后面要提到的hazelcast就是通過這個來替換了本地緩存策略的。從接口的設計上來看,openfire的緩存策略也就是為了集群與非集群的實現。
 
3、集群的設計
在openfire中的集群主要包括:集群管理、數據同步管理、集群計算任務。
 
集群管理者
在openfire中主要是一個類來實現:ClusterManager,在ClusterManager中實現了集群實例的加入、退出管理,因為沒有使用主從結構,所以ClusterManager實現了一個無中心管理,不知道我理解的對不對。因為只要當前實實例啟用了集群,ClusterManager就會主動的加載集群管理并與其他的集群進行同步。
 
  • startup
startup是啟動集群的方法,代碼:
 
    public static synchronized void startup() {
        if (isClusteringEnabled() && !isClusteringStarted()) {
            initEventDispatcher();
            CacheFactory.startClustering();
        }
    }
首先要判斷是否開啟了集群并且當前集群實例未運行時才去啟動。
先是初始化了事件分發器,用于處理集群的同步事情。
 
然后就是調用CacheFactory的startClustering來運行集群。在startClustering方法中主要是這幾個事情:
    • 會使用集群的緩存工廠策略來啟動,同時使自己加入到集群中。
    • 開啟一個線程用于同步緩存的狀態
 
在前面startup中的initEventDispatcher方法,在這里會注冊一個分發線程監聽到集群事件,收到事件后會執行joinedCluster或者leftCluster的操作,joinedCluster就是加入到集群中的意思。
 
在joinedCluster時會將本地的緩存容器都轉換為集群緩存。由此便完成了集群的初始化并加入到集群中了。
 
  • shutdown
shutdown相對簡單點就是退出集群,并且將緩存工廠恢復為本地緩存。
 
同步管理
上面主要是講了如何管理集群,接著比較重要的就是如何在集群間同步數據呢?這部分主要是看具體的分布式計算系統的實現了,從openfire來說就是將數據放到集群緩存中,然后通過集群組件來完成的,比如使用hazelcast。
 
因為使用緩存來解決,所以在CacheFactory中才會有這些么多關于集群的處理代碼,特別是對于緩存策略的切換,以及集群任務處理都在CacheFactory作為接口方法向外公開。這樣也把集群的實現透明了。
 
集群計算任務 
在這之前一直沒有提到集群中的計算問題,因為既然有了集群是不是可以利用集群的優勢進行一些并行計算呢?這部分我倒沒有太過確定,只是看到相關的代碼所以簡單列一下。
 
在CacheFactory類中有幾個方法:doClusterTask、doSynchronousClusterTask,這兩個都是overload方法,參數有所不同而已。這幾個方法就是用于執行一些計算任務的。就看一下doClusterTask:
    public static void doClusterTask(final ClusterTask<?> task) {
        cacheFactoryStrategy.doClusterTask(task);
    }

這里有個限定就是必須是ClusterTask派生的類才行,看看它的定義:

public interface ClusterTask<V> extends Runnable, Externalizable {
 
    V getResult();
 
}

主要是為了異步執行和序列化,異步是因為不能阻塞,而序列化當然就是為了能在集群中傳送。

 
再看CacheFactory的doClusterTask方法可以發現,它只不過是代理了緩存策略工廠的doClusterTask,具體的實現還是要看集群實現的。
 
看一看hazelcast的實現簡單理解openfire集群
在openfire中有集群的插件實現,這里就以hazelcast為例子簡單的做一下分析與學習。
 
  • 緩存策略工廠類(ClusteredCacheFactory)
 
ClusteredCacheFactory實現了CacheFactoryStrategy,代碼如下:
public class ClusteredCacheFactory implements CacheFactoryStrategy {

首先是startCluster方法用于啟動集群,主要完成幾件事情:

    • 設置緩存序列化工具類,ClusterExternalizableUtil。這個是用于集群間數據復制時的序列化工具
    • 設置遠程session定位器,RemoteSessionLocator,因為session不同步,所以它主要是用于多實例間的session讀取
    • 設置遠程包路由器ClusterPacketRouter,這樣就可以在集群中發送消息了
    • 加載Hazelcast的實例設置NodeID,以及設置ClusterListener
 
在前面說起集群啟動時提到了緩存切換,那具體實現時是如何做的呢?
 
因為集群啟動后就要是CacheFactory.joinedCluster方法來加入集群的。看一下加入的代碼:
 
   /**
     * Notification message indicating that this JVM has joined a cluster.
     */
    @SuppressWarnings("unchecked")
    public static synchronized void joinedCluster() {
        cacheFactoryStrategy = clusteredCacheFactoryStrategy;
        // Loop through local caches and switch them to clustered cache (copy content)
        for (Cache cache : getAllCaches()) {
            // skip local-only caches
            if (localOnly.contains(cache.getName())) continue;
            CacheWrapper cacheWrapper = ((CacheWrapper) cache);
            Cache clusteredCache = cacheFactoryStrategy.createCache(cacheWrapper.getName());
            clusteredCache.putAll(cache);
            cacheWrapper.setWrappedCache(clusteredCache);
        }
        clusteringStarting = false;
        clusteringStarted = true;
        log.info("Clustering started; cache migration complete");
    }
 

這里可以看到會讀取所有的緩存容器并一個個的使用Wrapper包裝一下,然后用同樣的緩存名稱去createCache一個新的Cache,這步使用的是切換后的集群緩存策略工廠,也就是說會使用ClusteredCacheFactory去創建新的緩存容器。最后再將cache寫入到新的clusteredCache 里,這樣就完成了緩存的切換。

 
當然這里還是要看一下ClusteredCacheFactory的createCache實現:
    public Cache createCache(String name) {
        // Check if cluster is being started up
        while (state == State.starting) {
            // Wait until cluster is fully started (or failed)
            try {
                Thread.sleep(250);
            }
            catch (InterruptedException e) {
                // Ignore
            }
        }
        if (state == State.stopped) {
            throw new IllegalStateException("Cannot create clustered cache when not in a cluster");
        }
        return new ClusteredCache(name, hazelcast.getMap(name));
    }
 

這里使用的是ClusteredCache,而且最重要的是傳入的第二個map參數換成了hazelcast的了,這樣之后再訪問這個緩存容器時已經不再是原先的本地Cache了,已經是hazelcast的map對象。hazelcast會自動對map的數據進行同步管理,這也就完成了緩存同步的功能。

 

  • 集群計算

那就看hazelcast的實現吧,在ClusteredCacheFactory中doClusterTask舉個例子吧:

    public void doClusterTask(final ClusterTask task) {
        if (cluster == null) { return; }
        Set<Member> members = new HashSet<Member>();
        Member current = cluster.getLocalMember();
        for(Member member : cluster.getMembers()) {
            if (!member.getUuid().equals(current.getUuid())) {
                members.add(member);
            }
        }
        if (members.size() > 0) {
            // Asynchronously execute the task on the other cluster members
            logger.debug("Executing asynchronous MultiTask: " + task.getClass().getName());
            hazelcast.getExecutorService(HAZELCAST_EXECUTOR_SERVICE_NAME).submitToMembers(
                new CallableTask<Object>(task), members);
        } else {
               logger.warn("No cluster members selected for cluster task " + task.getClass().getName());
        }
    }
 

過程就是,先獲取到集群中的實例成員,當然要排除自己。然后hazelcast提供了ExecutorService來執行這個task,方法就是submiteToMembers。這樣就提交了一個運算任務。只不過具體是如何分配計算并匯集結果倒真不太清楚。

 

總結

花了一天時間看了一下openfire的集群,順手就寫了一篇文章,確實也到了一些東西。和一些網友溝通中好像目前大家更愿意使用redies來完成緩存共享,以及通過代理來實現集群,而不愿意使用openfire的集群方案。這部分我沒有遇到如何大的并發量需求確實不知道區別在哪里。以后有機會還是動手試試寫一個redies的插件。
 
 
原創聲明:本文為原創內容,轉載請注明。http://www.cnblogs.com/5207/p/5705092.html
 

文章列表


不含病毒。www.avast.com
arrow
arrow
    全站熱搜
    創作者介紹
    創作者 大師兄 的頭像
    大師兄

    IT工程師數位筆記本

    大師兄 發表在 痞客邦 留言(0) 人氣()