苦惱中尋找方法
在開始做即時通信時就知道了消息回執這個概念,目的是解決通訊消息因為各種原因未送達對方而提供的一種保障機制。產生這個問題的原因主要是網絡不穩定、服務器或者客戶端一些異常導致沒有接收到消息。
因為產品中使用的是openfire和spark的組合,所以一直就想在這個范圍內找一個現成的方案,只不過通過閱讀一些開發者的總結提到說openfire沒有消息回執的方案。于是也看到了別人的方案:
- 發送者發送消息給服務端
- 服務端接收到消息后發送回執給發送者
- 發送者確認收到則結束,如果未收到就重發
- 服務端將消息記錄一下,并推送給接收者,等待接收者的回執
- 接收者接收消息并發回執給服務端
- 服務端接收回執刪除掉消息回執記錄,表示已經發送完畢
- 如果一定時間內沒收到重新推送消息給客戶端
- 接收者如果收到消息進行去重處理,如果不重復的執行第5-6步
這個流程基本就是完成了消息回執的功能,核心點就是在于發送者-服務端-接收者三者之間建立一個消息確認機制。這個方案如果要自己實現的話需要定制一套消息協議了,這個實現方法比較多,對于XMPP來說發message、iq都可以。當然也可以看到這套方案會帶來問題,就是每條消息都要執行一套確認,所以會增大流量和計算量。
流量對于移動網絡來說還是很重要的,而且移動網絡因為移動的原因很容易出現不穩定,所以自然這部分的流量可能會更大些。但是也正因為移動網絡的不穩定就更需要消息回執來確認消息狀態了,解決丟包的問題。
于是這就變成了一個雙向的問題,只要能是盡量減少消息的體積以此來減少流量吧。
只不過對于我來說方法有了,怎么做是個問題,畢竟要實現一套這樣的功能,還要保證穩定,否則這個消息回執功能本身不穩定還不如不要呢。基本的設計思路也有了:
- 客戶端維護兩個列表(發送回執隊列和接收回執隊列),用于保存發送/接收消息回執情況
- 服務端也維護一個列表,用于記錄消息回執的接收與發送情況,服務端對列表進行超時檢查,如果回執未發送的重發消息,如果收到重復的消息則去重處理
- 客戶端定期檢查兩個列表里的回執狀態,如果未收到回執的要做重發處理,如果收到的是重復的回執則進行去重處理
方案差不多有了,只不過在檢閱網上資料時有了新的發現。
柳暗花明
在看別人的總結時發現XMPP有擴展協議是支持消息回執功能的,就是XEP-0184.了解下來這個協議確實是一套消息回執的實現方法,但是呢。。
- 它必須在openfire3.9以上版本才支持,這個可以在openfire的版本日志里可以看到
- 它只是一種端到端的消息回執,而且只有接收端收到消息后才會返回回執,這樣對于發送者來說很麻煩,如果接收者不在線無法得知消息是否發出了,因為服務端不會告知發送者已經拿到消息了。只有等到接收者上線獲取了消息后,由接收者發送一條確認的回執給接收者
這個看起來很美好的東西,發現不大好用啊。于是看了自己的openfire是4以上版本的,所以確實支持。然后檢查了客戶端使用的smack包里確實有XEP-0184的實現。
//這個類是一個統一調用的類
org.jivesoftware.smackx.receipts.DeliveryReceiptManager
//這個是發送者發送一個回執請求,告知客戶端我要消息回執
org.jivesoftware.smackx.receipts.DeliveryReceiptRequest
//這個是接收者收到消息后返回的回執確認
org.jivesoftware.smackx.receipts.DeliveryReceipt
//這個是用于發送者監聽接收者發來回執確認的事件
public interface ReceiptReceivedListener {
/**
* Callback invoked when a new receipt got received.
* <p>
* {@code receiptId} correspondents to the message ID, which can be obtained with
* {@link org.jivesoftware.smack.packet.Stanza#getStanzaId()}.
* </p>
*
* @param fromJid the jid that send this receipt
* @param toJid the jid which received this receipt
* @param receiptId the message ID of the stanza(/packet) which has been received and this receipt is for
* @param receipt the receipt
*/
void onReceiptReceived(String fromJid, String toJid, String receiptId, Stanza receipt);
}
有了這三個家伙確實是可以做一套消息確認的機制,但是要在客戶端發送消息時發送一個DeliveryReceiptRequest,然后等待接收者發送回來的消息確認DeliveryReceipt。
public class ChatDemo {
public static void main(String[] args) {
AbstractXMPPConnection connection = SesseionHelper.newConn("192.168.11.111", 5222, "abc", "user1", "pwd1");
//在發消息之前通過DeliveryReceiptManager訂閱回執
DeliveryReceiptManager drm = DeliveryReceiptManager.getInstanceFor(connection);
drm.addReceiptReceivedListener(new ReceiptReceivedListener() {
@Override
public void onReceiptReceived(String fromJid, String toJid,
String receiptId, Stanza receipt) {
System.err.println((new Date()).toString()+ " - drm:" + receipt.toXML());
}
});
Message msg = new Message("100069@bkos");
msg.setBody("回復我的消息1.");
msg.setType(Type.chat);
//將消息放到DeliveryReceiptRequest中,這樣就可以在發送Message后發送回執請求
DeliveryReceiptRequest.addTo(msg);
try {
connection.sendStanza(msg);
} catch (NotConnectedException e) {
e.printStackTrace();
}
connection.addAsyncStanzaListener(new StanzaListener() {
@Override
public void processPacket(Stanza packet) throws NotConnectedException {
System.out.println((new Date()).toString()+ "- processPacket:" + packet.toXML());
}
}, new StanzaFilter() {
@Override
public boolean accept(Stanza stanza) {
return stanza instanceof Message;
}
});
while (true) {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
上面代碼是發送者要完成的代碼,這里并沒有看到接收者返回回執的過程,這個實現在DeliveryReceiptManager里完成的。
private DeliveryReceiptManager(XMPPConnection connection) {
super(connection);
ServiceDiscoveryManager sdm = ServiceDiscoveryManager.getInstanceFor(connection);
sdm.addFeature(DeliveryReceipt.NAMESPACE);
// Add the packet listener to handling incoming delivery receipts
connection.addAsyncStanzaListener(new StanzaListener() {
@Override
public void processPacket(Stanza packet) throws NotConnectedException {
DeliveryReceipt dr = DeliveryReceipt.from((Message) packet);
// notify listeners of incoming receipt
for (ReceiptReceivedListener l : receiptReceivedListeners) {
l.onReceiptReceived(packet.getFrom(), packet.getTo(), dr.getId(), packet);
}
}
}, MESSAGES_WITH_DELIVERY_RECEIPT);
// Add the packet listener to handle incoming delivery receipt requests
connection.addAsyncStanzaListener(new StanzaListener() {
@Override
public void processPacket(Stanza packet) throws NotConnectedException {
final String from = packet.getFrom();
final XMPPConnection connection = connection();
switch (autoReceiptMode) {
case disabled:
return;
case ifIsSubscribed:
if (!Roster.getInstanceFor(connection).isSubscribedToMyPresence(from)) {
return;
}
break;
case always:
break;
}
final Message messageWithReceiptRequest = (Message) packet;
Message ack = receiptMessageFor(messageWithReceiptRequest);
if (ack == null) {
LOGGER.warning("Received message stanza with receipt request from '" + from
+ "' without a stanza ID set. Message: " + messageWithReceiptRequest);
return;
}
connection.sendStanza(ack);
}
}, MESSAGES_WITH_DEVLIERY_RECEIPT_REQUEST);
}
DeliveryReceiptManager里會訂閱消息事件,當收到消息是需要回執時發送ack包,這里的ack就是帶了DeliveryReceipt的一個消息包。
好了,這個XEP-0184差不多看明白了,但并不是想要的那種消息回執。它更像是手機消息或者郵件的那種接收確認回執。是端到端的一種確認機制。但是如果在服務端對這個消息做一些截取處理,做一個中間狀態也是可以達到我們要的消息回執的狀態的。
做法就是在服務端截取XEP-0184的消息,如果是請求消息DeliveryReceiptRequest則在服務端保存記錄,同時服務端發送DeliveryReceipt(ack)給發送方。然后客戶端照樣接收消息返回ack后服務端截獲更新服務端記錄即可。
這種做法就是借用xep-0184協議來完成消息回執的功能。
真正的又一村
也不知道是否意外,在看一篇博文時發現了一個更有意思東西,就是XEP-0198.
它是干啥的呢?
流管理背后的基本概念是,初始化的實體(一個服務端或者客戶端)和接收的實體(一個服務端)可以為更靈活的管理stream交換命令.下面兩條流管理的特性被廣泛的關注,因為它們可以提高網絡的可靠性和終端用戶的體驗:
- Stanza確認(Stanza Acknowledgements) – 能夠確認一段或者一系列Stanza是否已被某一方接收.
- 流恢復(Stream Resumption) – 能夠迅速的恢復(resume)一個已經被終止的流.
這就突然發現又一村原來在這啊,XMPP畢竟最開始是基于TCP協議的,可以在流的基礎上完成消息到達回執。它的特征也表明了這點,一是可以做消息確認,保證消息是否被另一方接收。另外一點就是在消息未確認接收時可以做恢復(也就是重試)。這不就完全滿足我們消息回執的要求了嗎?
只不過在smack要4.1.x以上版本,而且默認是不開啟流管理功能的,所以要手動的開啟一下,剩下的事情由smack和openfire來完成。在建立TCPConnection前執行正面這句:
XMPPTCPConnection.setUseStreamManagementResumptionDefault(true);
這個代碼就是說開啟流恢復,當然流恢復開啟了Stanza確認也是要開啟的,可以看setUseStreamManagementResumptionDefault的實現,里面調用setUseStreamManagementDefault:
public static void setUseStreamManagementResumptionDefault(boolean useSmResumptionDefault) {
if (useSmResumptionDefault) {
// Also enable SM is resumption is enabled
setUseStreamManagementDefault(useSmResumptionDefault);
}
XMPPTCPConnection.useSmResumptionDefault = useSmResumptionDefault;
}
openfire服務端默認是開啟這個功能的,在openfire.xml里有設置:
<!-- XEP-0198 properties -->
<stream>
<management>
<!-- Whether stream management is offered to clients by server. -->
<active>true</active>
<!-- Number of stanzas sent to client before a stream management
acknowledgement request is made. -->
<requestFrequency>5</requestFrequency>
</management>
</stream>
好了,這樣就完成了消息回執的功能了。沒想到XMPP協議已經支持了整個流程,省去了很多事情,同時openfire中websocket也是支持xep-198,所以手機端應該也是可以支持。
參考與引用
http://developerworks.github.io/2014/10/03/xmpp-xep-0198-stream-management/
http://blog.csdn.net/chszs/article/details/48576553
文章列表