RPCClientSocketTransport.cs 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304
  1. using System;
  2. using System.Collections.Generic;
  3. using System.IO;
  4. using System.Linq;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. using Expressive;
  8. using InABox.Clients;
  9. using InABox.Core;
  10. using WebSocket4Net;
  11. namespace InABox.Rpc
  12. {
  13. public class RpcClientSocketTransport : RpcClientTransport, IDisposable
  14. {
  /// <summary>
  /// The socket selected by <see cref="Connect"/>; null until a candidate connection has opened.
  /// </summary>
  private WebSocket? _socket;

  // NOTE(review): not referenced by any live code in this class; kept so the class layout is unchanged.
  private Task? readTask;

  /// <summary>
  /// Set by the Client_Opened and Client_Closed handlers; Connect resets it and waits on it
  /// when re-opening an existing socket. Never reassigned, hence readonly.
  /// </summary>
  private readonly ManualResetEventSlim openEvent = new ManualResetEventSlim();

  /// <summary>The url of a candidate that opened successfully (assigned in CreateSocket).</summary>
  private string? _host;

  /// <summary>Set to true by Connect and Client_Opened, and cleared by Client_Closed.</summary>
  private bool _connected = false;

  /// <summary>Candidate server urls, without a scheme; assigned only in the constructor.</summary>
  private readonly string[] _urls;

  /// <summary>The url of the server this transport connected to, or null if none yet.</summary>
  public string? Host => _host;

  /// <summary>
  /// Creates a transport that will try to reach any of the given urls when <see cref="Connect"/> is called.
  /// </summary>
  /// <param name="urls">Candidate addresses; each is tried with both wss:// and ws://.</param>
  public RpcClientSocketTransport(string[] urls)
  {
      _urls = urls;
  }
  26. // Returns true if we are to continue the receive loop.
  27. /*private bool DoReceive()
  28. {
  29. if(_socket != null)
  30. {
  31. try
  32. {
  33. var buffer = new ArraySegment<byte>(new byte[1024]);
  34. using (var ms = new MemoryStream())
  35. {
  36. WebSocketReceiveResult result;
  37. do
  38. {
  39. var task = _socket.ReceiveAsync(buffer, _tokenSource.Token);
  40. task.Wait();
  41. result = task.Result;
  42. ms.Write(buffer.Array, buffer.Offset, result.Count);
  43. } while (!result.EndOfMessage);
  44. ms.Seek(0, SeekOrigin.Begin);
  45. if (result.MessageType == WebSocketMessageType.Close)
  46. {
  47. if (result.CloseStatus == WebSocketCloseStatus.NormalClosure)
  48. {
  49. Task.Run(() =>
  50. {
  51. DoClose(RpcTransportCloseEventType.Closed);
  52. });
  53. return false;
  54. }
  55. else
  56. {
  57. DoException(new Exception(result.CloseStatusDescription));
  58. Task.Run(() =>
  59. {
  60. DoClose(RpcTransportCloseEventType.Error);
  61. });
  62. return false;
  63. }
  64. }
  65. else
  66. {
  67. RpcMessage? rpcMessage = null;
  68. if (result.MessageType == WebSocketMessageType.Binary)
  69. {
  70. rpcMessage = Serialization.ReadBinary<RpcMessage>(ms, BinarySerializationSettings.Latest);
  71. }
  72. else if (result.MessageType == WebSocketMessageType.Text)
  73. {
  74. rpcMessage = Serialization.Deserialize<RpcMessage>(ms);
  75. }
  76. Accept(rpcMessage);
  77. }
  78. }
  79. }
  80. catch(Exception e)
  81. {
  82. DoException(e);
  83. if (!IsConnected())
  84. {
  85. Task.Run(() =>
  86. {
  87. DoClose(RpcTransportCloseEventType.Error);
  88. });
  89. }
  90. return false;
  91. }
  92. }
  93. return true;
  94. }*/
  /// <summary>
  /// Opens a single candidate WebSocket connection and blocks until it has opened, closed, or the
  /// wait has been cancelled. Connect runs several of these concurrently, one per url and scheme.
  /// </summary>
  /// <param name="url">Address without a scheme; "wss://" or "ws://" is prepended here.</param>
  /// <param name="secure">True to connect with wss://, false to connect with ws://.</param>
  /// <param name="ct">Cancels the wait for the connection to open.</param>
  /// <returns>
  /// The open socket, or null when the socket closed before opening or <paramref name="ct"/> was cancelled.
  /// </returns>
  /// <remarks>
  /// Any other exception is logged and rethrown; the partially set-up socket is disposed first so
  /// that a failed attempt does not leak a socket with this transport's handlers still attached.
  /// </remarks>
  private WebSocket? CreateSocket(string url, bool secure, CancellationToken ct)
  {
      WebSocket? client = null;
      var address = $"{(secure ? "wss" : "ws")}://{url}";
      try
      {
          client = new WebSocket(address);

          // Per-attempt signal. Named differently from the openEvent field on purpose, so it is
          // obvious that the temporary handlers below never touch the transport-wide event.
          var attemptFinished = new ManualResetEventSlim();
          var open = false;

          void onOpen(object s, EventArgs e)
          {
              open = true;
              attemptFinished.Set();
          }

          void onClose(object s, EventArgs e)
          {
              // Closed before Opened means the attempt failed; 'open' stays false.
              attemptFinished.Set();
          }

          client.Opened += onOpen;
          client.Closed += onClose;
          client.EnableAutoSendPing = true;
          // NOTE(review): presumably seconds - confirm against the WebSocket4Net documentation.
          client.AutoSendPingInterval = 10;
          client.Error += Client_Error;
          client.MessageReceived += Client_MessageReceived;
          client.DataReceived += Client_DataReceived;
          client.Open();

          try
          {
              attemptFinished.Wait(ct);
          }
          catch (OperationCanceledException)
          {
              client.Dispose();
              return null;
          }

          if (!open)
          {
              client.Dispose();
              return null;
          }

          // The attempt succeeded: swap the temporary handlers for the long-lived ones, which
          // track later close / re-open cycles of this socket.
          client.Opened -= onOpen;
          client.Closed -= onClose;
          client.Opened += Client_Opened;
          client.Closed += Client_Closed;

          DoOpen();
          _host = url;
          return client;
      }
      catch (Exception e)
      {
          Logger.Send(LogType.Error, ClientFactory.UserID, $"Error in CreateSocket(): {e.Message}");

          // Release the socket before propagating; a failure while disposing must not replace
          // the original exception, so it is only logged.
          try
          {
              client?.Dispose();
          }
          catch (Exception disposeError)
          {
              Logger.Send(LogType.Error, ClientFactory.UserID, $"Error disposing socket in CreateSocket(): {disposeError.Message}");
          }

          throw;
      }
  }
  /// <summary>
  /// Closed handler attached to a socket once it has opened. Clears the connected flag, releases
  /// any thread waiting on openEvent, and reports the close only if the transport had been
  /// marked as connected beforehand.
  /// </summary>
  private void Client_Closed(object sender, EventArgs e)
  {
      var previouslyConnected = _connected;
      _connected = false;
      openEvent.Set();

      if (!previouslyConnected)
      {
          return;
      }

      DoClose(RpcTransportCloseEventType.Closed);
  }
  /// <summary>
  /// Binary-frame handler: reads an RpcMessage from the received bytes using the latest binary
  /// serialization settings and passes it to Accept.
  /// </summary>
  private void Client_DataReceived(object sender, DataReceivedEventArgs e)
      => Accept(Serialization.ReadBinary<RpcMessage>(e.Data, BinarySerializationSettings.Latest));
  /// <summary>
  /// Text-frame handler: deserializes the received text into an RpcMessage and passes it to Accept.
  /// </summary>
  private void Client_MessageReceived(object sender, MessageReceivedEventArgs e)
      => Accept(Serialization.Deserialize<RpcMessage>(e.Message));
  /// <summary>
  /// Error handler: forwards the exception reported by the socket to DoException.
  /// </summary>
  private void Client_Error(object sender, SuperSocket.ClientEngine.ErrorEventArgs e)
      => DoException(e.Exception);
  /// <summary>
  /// Opened handler attached after the initial connection has succeeded, so it fires when the
  /// same socket is opened again. Marks the transport connected, calls DoOpen, and then
  /// releases any thread waiting on openEvent.
  /// </summary>
  private void Client_Opened(object sender, EventArgs e)
  {
      // The flag is set before DoOpen runs and before the waiter in Connect is released.
      _connected = true;

      DoOpen();

      openEvent.Set();
  }
  /// <summary>
  /// Connects the transport. If a socket already exists it is re-opened; otherwise every url is
  /// tried concurrently with both wss:// and ws://, and the first candidate to open is kept.
  /// </summary>
  /// <param name="ct">Cancels the connection attempt.</param>
  /// <returns>True if the transport is connected when the method returns.</returns>
  /// <exception cref="OperationCanceledException">
  /// <paramref name="ct"/> was cancelled while waiting for an existing socket to re-open.
  /// </exception>
  /// <exception cref="AggregateException">
  /// No candidate connected and at least one candidate threw; contains the candidates' exceptions.
  /// </exception>
  public override bool Connect(CancellationToken ct = default)
  {
      if (_socket != null)
      {
          // Re-open the socket chosen earlier; Client_Opened / Client_Closed signal openEvent.
          openEvent.Reset();
          _socket.Open();
          openEvent.Wait(ct);
          return _connected;
      }

      using var childCts = CancellationTokenSource.CreateLinkedTokenSource(ct);

      // Read the token once, up front: the lambdas below may run after childCts has been
      // disposed, and the Token getter throws on a disposed source.
      var token = childCts.Token;

      var tasks = new List<Task<WebSocket?>>();
      var hostByTask = new Dictionary<Task<WebSocket?>, string>();
      foreach (var url in _urls)
      {
          // Secure first, so wss is preferred when both attempts have already finished.
          foreach (var secure in new[] { true, false })
          {
              var attempt = Task.Run(() => CreateSocket(url, secure, token));
              tasks.Add(attempt);
              hostByTask[attempt] = url;
          }
      }

      var failures = new List<Exception>();
      while (tasks.Count > 0)
      {
          var finished = Task.WhenAny(tasks).Result;
          tasks.Remove(finished);

          if (finished.IsFaulted)
          {
              // One broken candidate must not abort the others; remember the error and only
              // surface it if nothing connects at all.
              if (finished.Exception != null)
              {
                  failures.AddRange(finished.Exception.InnerExceptions);
              }
              continue;
          }

          var winner = finished.Status == TaskStatus.RanToCompletion ? finished.Result : null;
          if (winner == null)
          {
              continue;
          }

          // Stop the attempts still waiting, and clean up any that have opened (or open in the
          // meantime): left alone they would stay subscribed to this transport's handlers.
          childCts.Cancel();
          var winnerHost = hostByTask[finished];
          foreach (var loser in tasks)
          {
              loser.ContinueWith(t => ReleaseLosingCandidate(t, winnerHost), TaskScheduler.Default);
          }

          if (_socket != null)
          {
              Logger.Send(LogType.Error, "", "Socket already exists!");
          }

          _socket = winner;
          // CreateSocket assigns _host for every candidate that opens, so pin it to the winner.
          _host = winnerHost;
          _connected = true;
          return _connected;
      }

      if (failures.Count > 0)
      {
          throw new AggregateException(failures);
      }

      return _connected;
  }

  /// <summary>
  /// Cleans up a candidate connection that lost the race in Connect: detaches this transport's
  /// handlers, disposes the socket, and restores the winner's host name.
  /// </summary>
  private void ReleaseLosingCandidate(Task<WebSocket?> candidate, string winnerHost)
  {
      if (candidate.Status != TaskStatus.RanToCompletion)
      {
          // Read the exception so it counts as observed; CreateSocket has already logged it.
          _ = candidate.Exception;
          return;
      }

      var socket = candidate.Result;
      if (socket == null)
      {
          return;
      }

      // Detach first so that closing this socket cannot mark the transport as disconnected.
      socket.Opened -= Client_Opened;
      socket.Closed -= Client_Closed;
      socket.Error -= Client_Error;
      socket.MessageReceived -= Client_MessageReceived;
      socket.DataReceived -= Client_DataReceived;

      try
      {
          socket.Dispose();
      }
      catch (Exception e)
      {
          Logger.Send(LogType.Error, ClientFactory.UserID, $"Error disposing unused socket: {e.Message}");
      }

      // This candidate opened, so CreateSocket overwrote _host with its url; put the winner back.
      _host = winnerHost;
  }
  /// <summary>
  /// Reports whether a socket exists and its state is <c>WebSocketState.Open</c>.
  /// </summary>
  public override bool IsConnected()
  {
      var socket = _socket;
      return socket != null && socket.State == WebSocketState.Open;
  }
  /// <summary>
  /// Returns true when a socket exists and its security options hold at least one certificate;
  /// false otherwise, including when there is no socket.
  /// </summary>
  /// <remarks>
  /// NOTE(review): this inspects the certificate collection rather than the ws/wss scheme that
  /// was used to connect - confirm that this is the intended meaning of "secure".
  /// </remarks>
  public override bool IsSecure()
  {
      var socket = _socket;
      if (socket == null)
      {
          return false;
      }

      return socket.Security.Certificates.Count > 0;
  }
  /// <summary>
  /// Returns the url of the server this transport connected to, or null if no connection has opened yet.
  /// </summary>
  public override String? ServerName()
  {
      return _host;
  }
  /// <summary>
  /// Closes the current socket, if there is one. The socket is kept, so a later Connect
  /// re-opens it rather than creating a new one.
  /// </summary>
  public override void Disconnect()
  {
      var socket = _socket;
      if (socket == null)
      {
          return;
      }

      socket.Close();
  }
  /// <summary>
  /// Serializes <paramref name="message"/> with the latest binary settings and sends it over
  /// the socket. When there is no socket the message is serialized but not sent.
  /// </summary>
  /// <param name="message">The message to transmit.</param>
  public override void Send(RpcMessage message)
  {
      var payload = message.WriteBinary(BinarySerializationSettings.Latest);

      var socket = _socket;
      if (socket != null)
      {
          socket.Send(payload, 0, payload.Length);
      }
  }
  /// <summary>
  /// Creates a new, unconnected transport that targets the same list of urls as this one.
  /// </summary>
  protected override RpcClientTransport Clone()
  {
      return new RpcClientSocketTransport(_urls);
  }
  /// <summary>
  /// Closes and disposes the socket, if any. Safe to call more than once: the socket reference
  /// is taken and cleared atomically, so only the first call touches the socket.
  /// </summary>
  public void Dispose()
  {
      // Claim the socket so that repeated or concurrent Dispose calls never call Close() on an
      // already-disposed socket.
      var socket = Interlocked.Exchange(ref _socket, null);
      if (socket == null)
      {
          return;
      }

      try
      {
          socket.Close();
      }
      finally
      {
          // Always release the socket, even if Close() throws.
          socket.Dispose();
      }
  }
  264. }
  265. }