Class containerClass = loader.loadClass("org.jivesoftware.openfire.XMPPServer");
containerClass.newInstance();
- 初始化配置參數
- 檢查是否需要安裝
- 初始化Module
- 啟動統計模塊
- 啟動plugin
if (!setupMode) { verifyDataSource(); // First load all the modules so that modules may access other modules while // being initialized loadModules(); // Initize all the modules initModules(); // Start all the modules startModules(); }
public interface Module { /** * Returns the name of the module for display in administration interfaces. * * @return The name of the module. */ String getName(); /** * Initialize the module with the container. * Modules may be initialized and never started, so modules * should be prepared for a call to destroy() to follow initialize(). * * @param server the server hosting this module. */ void initialize(XMPPServer server); /** * Start the module (must return quickly). Any long running * operations should spawn a thread and allow the method to return * immediately. */ void start(); /** * Stop the module. The module should attempt to free up threads * and prepare for either another call to initialize (reconfigure the module) * or for destruction. */ void stop(); /** * Module should free all resources and prepare for deallocation. */ void destroy(); }
// Load this module always last since we don't want to start listening for clients // before the rest of the modules have been started loadModule(ConnectionManagerImpl.class.getName());
private final ConnectionListener clientListener; private final ConnectionListener clientSslListener; private final ConnectionListener boshListener; private final ConnectionListener boshSslListener; private final ConnectionListener serverListener; private final ConnectionListener componentListener; private final ConnectionListener componentSslListener; private final ConnectionListener connectionManagerListener; // Also known as 'multiplexer' private final ConnectionListener connectionManagerSslListener; // Also known as 'multiplexer' private final ConnectionListener webAdminListener; private final ConnectionListener webAdminSslListener;
- client:表示客戶端連接
- bosh:就是HTTP綁定的連接
- server:服務器到服務器的socket連接
- component:組件到服務器的連接
- connectionManager:是指通過connectionManager連接器過來的連接
- webAdmin:是指web控制臺的連接
if ( getType() == ConnectionType.SOCKET_S2S ) { connectionAcceptor = new LegacyConnectionAcceptor( generateConnectionConfiguration() ); } else { connectionAcceptor = new MINAConnectionAcceptor( generateConnectionConfiguration() ); }




@Override StanzaHandler createStanzaHandler(NIOConnection connection) { return new ClientStanzaHandler(XMPPServer.getInstance().getPacketRouter(), connection); }
@Override public void sessionOpened(IoSession session) throws Exception { // Create a new XML parser for the new connection. The parser will be used by the XMPPDecoder filter. final XMLLightweightParser parser = new XMLLightweightParser(StandardCharsets.UTF_8); session.setAttribute(XML_PARSER, parser); // Create a new NIOConnection for the new session final NIOConnection connection = createNIOConnection(session); session.setAttribute(CONNECTION, connection); session.setAttribute(HANDLER, createStanzaHandler(connection)); // Set the max time a connection can be idle before closing it. This amount of seconds // is divided in two, as Openfire will ping idle clients first (at 50% of the max idle time) // before disconnecting them (at 100% of the max idle time). This prevents Openfire from // removing connections without warning. final int idleTime = getMaxIdleTime() / 2; if (idleTime > 0) { session.getConfig().setIdleTime(IdleStatus.READER_IDLE, idleTime); } }
這樣每一個session在打開時都會設置handler,而具體的handler由各個派生類創建返回。這里的StanzHandler就是Openfire里的數據包處理單元。和connection類型一樣,包處理也是對應的幾個類:


@Override public void messageReceived(IoSession session, Object message) throws Exception { // Get the stanza handler for this session StanzaHandler handler = (StanzaHandler) session.getAttribute(HANDLER); // Get the parser to use to process stanza. For optimization there is going // to be a parser for each running thread. Each Filter will be executed // by the Executor placed as the first Filter. So we can have a parser associated // to each Thread final XMPPPacketReader parser = PARSER_CACHE.get(); // Update counter of read btyes updateReadBytesCounter(session); //System.out.println("RCVD: " + message); // Let the stanza handler process the received stanza try { handler.process((String) message, parser); } catch (Exception e) { Log.error("Closing connection due to error while processing message: " + message, e); final Connection connection = (Connection) session.getAttribute(CONNECTION); if ( connection != null ) { connection.close(); } } }
在接收到數據包后獲取到StanzaHandler,然后調用了它的process方法,也就是讓實際的包處理者去處理數據。這樣就回到了StanzeHanler,以ClientStanzaHandler為例子。只不過這個派生類中沒有重寫process方法,也就是說要看父類的實現:
public void process(String stanza, XMPPPacketReader reader) throws Exception { boolean initialStream = stanza.startsWith("<stream:stream") || stanza.startsWith("<flash:stream"); if (!sessionCreated || initialStream) { if (!initialStream) { .......... // Found an stream:stream tag... if (!sessionCreated) { sessionCreated = true; MXParser parser = reader.getXPPParser(); parser.setInput(new StringReader(stanza)); createSession(parser); } .......... return; } .......... }
由于代碼較多,我省略了一些代碼。看到這應該明白了吧,對于當前的連接沒有創建Openfire的session對象時,會進行創建過程createSession,對于不同的StanzeHandler會有些不一樣,這里ClientStanzaHandler的實現就是把創建好的session放到本地的LocalClientSession中:
@Override boolean createSession(String namespace, String serverName, XmlPullParser xpp, Connection connection) throws XmlPullParserException { if ("jabber:client".equals(namespace)) { // The connected client is a regular client so create a ClientSession session = LocalClientSession.createSession(serverName, xpp, connection); return true; } return false; }
public ClientSession getSession(JID from) { // Return null if the JID is null or belongs to a foreign server. If the server is // shutting down then serverName will be null so answer null too in this case. if (from == null || serverName == null || !serverName.equals(from.getDomain())) { return null; } // Initially Check preAuthenticated Sessions if (from.getResource() != null) { ClientSession session = localSessionManager.getPreAuthenticatedSessions().get(from.getResource()); if (session != null) { return session; } } if (from.getResource() == null || from.getNode() == null) { return null; } return routingTable.getClientRoute(from); }
先是獲取本地的session,如果能找到直接返回,找不到則跳到routingTable里獲取客戶端的路由信息。
@Override public ClientSession getClientRoute(JID jid) { // Check if this session is hosted by this cluster node ClientSession session = (ClientSession) localRoutingTable.getRoute(jid.toString()); if (session == null) { // The session is not in this JVM so assume remote RemoteSessionLocator locator = server.getRemoteSessionLocator(); if (locator != null) { // Check if the session is hosted by other cluster node ClientRoute route = usersCache.get(jid.toString()); if (route == null) { route = anonymousUsersCache.get(jid.toString()); } if (route != null) { session = locator.getClientSession(route.getNodeID().toByteArray(), jid); } } } return session; }
這里更直接的可以看到,查找本地路由不null則會通過RemoteSessionLocator來完成。當然這里最大的奧秘其實是usersCache和anonymousUsersCache這兩個cache。之前寫的集群源碼分析中提過,最終openfire集群后會對緩存進行同步,這樣每臺服務器上都會有緩存的副本。所以usersCache是擁有所有用戶信息的,有了user的信息就有了jid的信息,這樣不管是哪臺服務器都可以對數據包處理并發送給客戶端。
public Collection<ClientSession> getSessions() { return routingTable.getClientsRoutes(false); }
其實就是訪問路由表,因為路由表里有所有的cache,和獲取單個的session不一樣,需要對所有的路由都遍歷返回。
@Override public Collection<ClientSession> getClientsRoutes(boolean onlyLocal) { // Add sessions hosted by this cluster node Collection<ClientSession> sessions = new ArrayList<ClientSession>(localRoutingTable.getClientRoutes()); if (!onlyLocal) { // Add sessions not hosted by this JVM RemoteSessionLocator locator = server.getRemoteSessionLocator(); if (locator != null) { // Add sessions of non-anonymous users hosted by other cluster nodes for (Map.Entry<String, ClientRoute> entry : usersCache.entrySet()) { ClientRoute route = entry.getValue(); if (!server.getNodeID().equals(route.getNodeID())) { sessions.add(locator.getClientSession(route.getNodeID().toByteArray(), new JID(entry.getKey()))); } } // Add sessions of anonymous users hosted by other cluster nodes for (Map.Entry<String, ClientRoute> entry : anonymousUsersCache.entrySet()) { ClientRoute route = entry.getValue(); if (!server.getNodeID().equals(route.getNodeID())) { sessions.add(locator.getClientSession(route.getNodeID().toByteArray(), new JID(entry.getKey()))); } } } } return sessions; }
文章列表