Cowboy.WebSockets 是一個托管在 GitHub 上的基於 .NET/C# 實現的開源 WebSocket 網絡庫,其完整地實現了 RFC 6455 (The WebSocket Protocol) 協議標準,並部分實現了 RFC 7692 (Compression Extensions for WebSocket) 協議標準。
WebSocket 可理解為建立在 TCP 連接通道上的更進一步的握手,并確定了消息封裝格式。
通過定義控制幀 (Control Frame) 和數據幀 (Data Frame) 來控制通道內的通信和數據傳輸,下圖使用 ABNF 格式描述了幀頭部的格式。
Cowboy.WebSockets 中對於 WebSocket 的 Client/Server 分別做了實現,分別對應代碼中的 AsyncWebSocketClient 和 AsyncWebSocketServer。
Cowboy.WebSockets 的內部實現是基于 Cowboy.Sockets 中的 TAP 模式的 AsyncTcpSocketServer 和 AsyncTcpSocketClient 。關于 Cowboy.Sockets 可以參考文章《C#高性能TCP服務的多種實現方式》。
可通過 NuGet 查找 Cowboy 來獲取 nuget 包。
WebSocket 服務端應用
實現 AsyncWebSocketServerModule 抽象類,其中 ModulePath 對應著 "ws://host:port/path" 中的 path 部分。可以實現多個 Module,將多個 Module 注入到 AsyncWebSocketServerModuleCatalog 中,或者采用反射機制等自動發現 Module。
/// <summary>
/// Sample server module registered under the "/test" path. It logs session
/// events to the console and echoes each received message back to the sender.
/// </summary>
public class TestWebSocketModule : AsyncWebSocketServerModule
{
    /// <summary>Creates the module and passes "/test" to the base class as its path.</summary>
    public TestWebSocketModule()
        : base(@"/test")
    {
    }

    /// <summary>Writes a "has connected" line containing the session's remote endpoint.</summary>
    /// <param name="session">The session that has just started.</param>
    public override async Task OnSessionStarted(AsyncWebSocketSession session)
    {
        var connectedMessage = string.Format(
            "WebSocket session [{0}] has connected.",
            session.RemoteEndPoint);
        Console.WriteLine(connectedMessage);

        await Task.CompletedTask;
    }

    /// <summary>Logs the received text and sends the same text back on the session.</summary>
    /// <param name="session">The session the text arrived on.</param>
    /// <param name="text">The received text.</param>
    public override async Task OnSessionTextReceived(AsyncWebSocketSession session, string text)
    {
        var prefix = string.Format(
            "WebSocket session [{0}] received Text --> ",
            session.RemoteEndPoint);
        Console.Write(prefix);

        var echoed = string.Format("{0}", text);
        Console.WriteLine(echoed);

        await session.SendTextAsync(text);
    }

    /// <summary>
    /// Decodes the received bytes as UTF-8, logs the decoded text, and sends the
    /// re-encoded UTF-8 bytes back on the session.
    /// </summary>
    /// <param name="session">The session the data arrived on.</param>
    /// <param name="data">Buffer holding the received bytes.</param>
    /// <param name="offset">Index in <paramref name="data"/> where the payload starts.</param>
    /// <param name="count">Number of payload bytes.</param>
    public override async Task OnSessionBinaryReceived(AsyncWebSocketSession session, byte[] data, int offset, int count)
    {
        var decoded = Encoding.UTF8.GetString(data, offset, count);

        var prefix = string.Format(
            "WebSocket session [{0}] received Binary --> ",
            session.RemoteEndPoint);
        Console.Write(prefix);

        var echoed = string.Format("{0}", decoded);
        Console.WriteLine(echoed);

        var payload = Encoding.UTF8.GetBytes(decoded);
        await session.SendBinaryAsync(payload);
    }

    /// <summary>Writes a "has disconnected" line containing the session's remote endpoint.</summary>
    /// <param name="session">The session that has closed.</param>
    public override async Task OnSessionClosed(AsyncWebSocketSession session)
    {
        var disconnectedMessage = string.Format(
            "WebSocket session [{0}] has disconnected.",
            session.RemoteEndPoint);
        Console.WriteLine(disconnectedMessage);

        await Task.CompletedTask;
    }
}
實例化 AsyncWebSocketServer,并將 AsyncWebSocketServerModuleCatalog 實例注入,即可啟動 WebSocket 的服務端監聽。
/// <summary>
/// Console host for the demo WebSocket server. It registers
/// <c>TestWebSocketModule</c>, listens on port 22222, and broadcasts every
/// line typed on the console to the server's clients as UTF-8 binary until
/// "quit" is entered or standard input ends.
/// </summary>
class Program
{
    static AsyncWebSocketServer _server;

    /// <summary>Entry point: starts the server, runs the console loop, then shuts down.</summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        NLogLogger.Use();

        try
        {
            var catalog = new AsyncWebSocketServerModuleCatalog();
            catalog.RegisterModule(new TestWebSocketModule());

            var config = new AsyncWebSocketServerConfiguration();
            //config.SslEnabled = true;
            //config.SslServerCertificate = new System.Security.Cryptography.X509Certificates.X509Certificate2(@"D:\\Cowboy.pfx", "Cowboy");
            //config.SslPolicyErrorsBypassed = true;

            _server = new AsyncWebSocketServer(22222, catalog, config);
            _server.Listen();

            Console.WriteLine("WebSocket server has been started on [{0}].", _server.ListenedEndPoint);
            Console.WriteLine("Type something to send to clients...");
            while (true)
            {
                try
                {
                    string text = Console.ReadLine();

                    // Console.ReadLine returns null once standard input is exhausted
                    // (redirected input, Ctrl+Z / Ctrl+D). Treat that like "quit";
                    // otherwise the loop would spin forever queuing broadcasts that
                    // fail on a null payload.
                    if (text == null || text == "quit")
                        break;

                    Task.Run(async () =>
                    {
                        // The task is not awaited, so the outer try/catch never sees
                        // exceptions raised here; report them from inside the task.
                        try
                        {
                            //await _server.BroadcastText(text);
                            //Console.WriteLine("WebSocket server [{0}] broadcasts text -> [{1}].", _server.ListenedEndPoint, text);
                            await _server.BroadcastBinaryAsync(Encoding.UTF8.GetBytes(text));
                            Console.WriteLine("WebSocket server [{0}] broadcasts binary -> [{1}].", _server.ListenedEndPoint, text);
                        }
                        catch (Exception ex)
                        {
                            Console.WriteLine(ex.Message);
                        }
                    });
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Message);
                }
            }

            _server.Shutdown();
            Console.WriteLine("WebSocket server has been stopped on [{0}].", _server.ListenedEndPoint);
        }
        catch (Exception ex)
        {
            Logger.Get<Program>().Error(ex.Message, ex);
        }

        Console.ReadKey();
    }
}
WebSocket 客戶端應用
客戶端側在實例化 AsyncWebSocketClient 時有兩種方式:
- 實現 IAsyncWebSocketClientMessageDispatcher 接口;
- 直接通過構造函數注入接收各種事件的 Func<> 實現。
/// <summary>
/// Callback contract that an <c>AsyncWebSocketClient</c> can be given in order
/// to receive its events, as an alternative to passing individual delegates to
/// the client's constructor.
/// </summary>
public interface IAsyncWebSocketClientMessageDispatcher
{
    // ---- Connection lifecycle ----

    /// <summary>Connection-established callback.</summary>
    /// <param name="client">The client raising the event.</param>
    Task OnServerConnected(AsyncWebSocketClient client);

    /// <summary>Connection-closed callback.</summary>
    /// <param name="client">The client raising the event.</param>
    Task OnServerDisconnected(AsyncWebSocketClient client);

    // ---- Complete messages ----

    /// <summary>Text-message callback.</summary>
    /// <param name="client">The client raising the event.</param>
    /// <param name="text">The received text.</param>
    Task OnServerTextReceived(AsyncWebSocketClient client, string text);

    /// <summary>Binary-message callback.</summary>
    /// <param name="client">The client raising the event.</param>
    /// <param name="data">Buffer holding the received bytes.</param>
    /// <param name="offset">Index in <paramref name="data"/> where the payload starts.</param>
    /// <param name="count">Number of payload bytes.</param>
    Task OnServerBinaryReceived(AsyncWebSocketClient client, byte[] data, int offset, int count);

    // ---- Fragmented messages ----
    // NOTE(review): judging by the names, these three presumably correspond to the
    // first, intermediate and final frames of a fragmented message; confirm
    // against the library implementation.

    /// <summary>Fragmentation-stream-opened callback.</summary>
    /// <param name="client">The client raising the event.</param>
    /// <param name="data">Buffer holding the received bytes.</param>
    /// <param name="offset">Index in <paramref name="data"/> where the payload starts.</param>
    /// <param name="count">Number of payload bytes.</param>
    Task OnServerFragmentationStreamOpened(AsyncWebSocketClient client, byte[] data, int offset, int count);

    /// <summary>Fragmentation-stream-continued callback.</summary>
    /// <param name="client">The client raising the event.</param>
    /// <param name="data">Buffer holding the received bytes.</param>
    /// <param name="offset">Index in <paramref name="data"/> where the payload starts.</param>
    /// <param name="count">Number of payload bytes.</param>
    Task OnServerFragmentationStreamContinued(AsyncWebSocketClient client, byte[] data, int offset, int count);

    /// <summary>Fragmentation-stream-closed callback.</summary>
    /// <param name="client">The client raising the event.</param>
    /// <param name="data">Buffer holding the received bytes.</param>
    /// <param name="offset">Index in <paramref name="data"/> where the payload starts.</param>
    /// <param name="count">Number of payload bytes.</param>
    Task OnServerFragmentationStreamClosed(AsyncWebSocketClient client, byte[] data, int offset, int count);
}
下面的 DEMO 采用了方式二。
/// <summary>
/// Console host for the demo WebSocket client. It connects to
/// ws://127.0.0.1:22222/test, sends every line typed on the console to the
/// server as UTF-8 binary, and prints whatever the server sends back. The
/// loop ends on "quit", on end of standard input, or once the connection is
/// no longer open.
/// </summary>
class Program
{
    static AsyncWebSocketClient _client;

    /// <summary>Entry point: connects, runs the console loop, then closes the connection.</summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        NLogLogger.Use();

        Task.Run(async () =>
        {
            try
            {
                var config = new AsyncWebSocketClientConfiguration();
                //config.SslTargetHost = "Cowboy";
                //config.SslClientCertificates.Add(new System.Security.Cryptography.X509Certificates.X509Certificate2(@"D:\\Cowboy.cer"));
                //config.SslPolicyErrorsBypassed = true;

                //var uri = new Uri("ws://echo.websocket.org/");
                //var uri = new Uri("wss://127.0.0.1:22222/test");
                var uri = new Uri("ws://127.0.0.1:22222/test");
                _client = new AsyncWebSocketClient(uri,
                    OnServerTextReceived,
                    OnServerBinaryReceived,
                    OnServerConnected,
                    OnServerDisconnected,
                    config);
                await _client.Connect();

                Console.WriteLine("WebSocket client has connected to server [{0}].", uri);
                Console.WriteLine("Type something to send to server...");
                while (_client.State == WebSocketState.Open)
                {
                    try
                    {
                        string text = Console.ReadLine();

                        // Console.ReadLine returns null once standard input is exhausted
                        // (redirected input, Ctrl+Z / Ctrl+D). Treat that like "quit";
                        // otherwise the loop would spin for as long as the socket stays
                        // open, queuing sends that fail on a null payload.
                        if (text == null || text == "quit")
                            break;

                        Task.Run(async () =>
                        {
                            // The task is not awaited, so the outer try/catch never sees
                            // exceptions raised here; report them from inside the task.
                            try
                            {
                                //await _client.SendText(text);
                                //Console.WriteLine("Client [{0}] send text -> [{1}].", _client.LocalEndPoint, text);
                                await _client.SendBinaryAsync(Encoding.UTF8.GetBytes(text));
                                Console.WriteLine("Client [{0}] send binary -> [{1}].", _client.LocalEndPoint, text);
                            }
                            catch (Exception ex)
                            {
                                Console.WriteLine(ex.Message);
                            }
                        }).Forget();
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine(ex.Message);
                    }
                }

                await _client.Close(WebSocketCloseCode.NormalClosure);
                Console.WriteLine("WebSocket client has disconnected from server [{0}].", uri);
            }
            catch (Exception ex)
            {
                Logger.Get<Program>().Error(ex.Message, ex);
            }
        }).Wait();

        Console.ReadKey();
    }

    /// <summary>Writes a "has connected" line containing the client's remote endpoint.</summary>
    /// <param name="client">The client raising the event.</param>
    private static async Task OnServerConnected(AsyncWebSocketClient client)
    {
        Console.WriteLine(string.Format("WebSocket server [{0}] has connected.", client.RemoteEndPoint));
        await Task.CompletedTask;
    }

    /// <summary>Prints the text received from the server.</summary>
    /// <param name="client">The client raising the event.</param>
    /// <param name="text">The received text.</param>
    private static async Task OnServerTextReceived(AsyncWebSocketClient client, string text)
    {
        Console.Write(string.Format("WebSocket server [{0}] received Text --> ", client.RemoteEndPoint));
        Console.WriteLine(string.Format("{0}", text));
        await Task.CompletedTask;
    }

    /// <summary>Decodes the received bytes as UTF-8 and prints the resulting text.</summary>
    /// <param name="client">The client raising the event.</param>
    /// <param name="data">Buffer holding the received bytes.</param>
    /// <param name="offset">Index in <paramref name="data"/> where the payload starts.</param>
    /// <param name="count">Number of payload bytes.</param>
    private static async Task OnServerBinaryReceived(AsyncWebSocketClient client, byte[] data, int offset, int count)
    {
        var text = Encoding.UTF8.GetString(data, offset, count);
        Console.Write(string.Format("WebSocket server [{0}] received Binary --> ", client.RemoteEndPoint));
        Console.WriteLine(string.Format("{0}", text));
        await Task.CompletedTask;
    }

    /// <summary>Writes a "has disconnected" line containing the client's remote endpoint.</summary>
    /// <param name="client">The client raising the event.</param>
    private static async Task OnServerDisconnected(AsyncWebSocketClient client)
    {
        Console.WriteLine(string.Format("WebSocket server [{0}] has disconnected.", client.RemoteEndPoint));
        await Task.CompletedTask;
    }
}
相關資料
- RFC6455: The WebSocket Protocol
- RFC7692: Compression Extensions for WebSocket
- IANA: WebSocket Opcode Registry
- LZ77 and LZ78
- LZ77 Compression Algorithm
- DeflateStream Class
- zlib 1.2.8 Manual
- WebSocket Extensions in Tyrus
本篇文章《Cowboy 開源 WebSocket 網絡庫》由 Dennis Gao 發表自博客園個人博客,未經作者本人同意禁止以任何的形式轉載,任何自動的或人為的爬蟲轉載行為均為耍流氓。
文章列表
留言列表