using System; using System.Collections.Generic; using System.IO; using System.Linq; using System.Threading; using System.Threading.Tasks; using Expressive; using InABox.Clients; using InABox.Core; using WebSocket4Net; namespace InABox.Rpc { public class RpcClientSocketTransport : RpcClientTransport, IDisposable { private WebSocket? _socket; private Task? readTask; private ManualResetEventSlim openEvent = new ManualResetEventSlim(); private string? _host; private bool _connected = false; private string[] _urls; public string? Host => _host; public RpcClientSocketTransport(string[] urls) { _urls = urls; } // Returns true if we are to continue the receive loop. /*private bool DoReceive() { if(_socket != null) { try { var buffer = new ArraySegment(new byte[1024]); using (var ms = new MemoryStream()) { WebSocketReceiveResult result; do { var task = _socket.ReceiveAsync(buffer, _tokenSource.Token); task.Wait(); result = task.Result; ms.Write(buffer.Array, buffer.Offset, result.Count); } while (!result.EndOfMessage); ms.Seek(0, SeekOrigin.Begin); if (result.MessageType == WebSocketMessageType.Close) { if (result.CloseStatus == WebSocketCloseStatus.NormalClosure) { Task.Run(() => { DoClose(RpcTransportCloseEventType.Closed); }); return false; } else { DoException(new Exception(result.CloseStatusDescription)); Task.Run(() => { DoClose(RpcTransportCloseEventType.Error); }); return false; } } else { RpcMessage? rpcMessage = null; if (result.MessageType == WebSocketMessageType.Binary) { rpcMessage = Serialization.ReadBinary(ms, BinarySerializationSettings.Latest); } else if (result.MessageType == WebSocketMessageType.Text) { rpcMessage = Serialization.Deserialize(ms); } Accept(rpcMessage); } } } catch(Exception e) { DoException(e); if (!IsConnected()) { Task.Run(() => { DoClose(RpcTransportCloseEventType.Error); }); } return false; } } return true; }*/ private WebSocket? CreateSocket(string url, bool secure, CancellationToken ct) { WebSocket? client = null; var address = $"{(secure ? "wss" : "ws")}://{url}"; try { client = new WebSocket(address); var openEvent = new ManualResetEventSlim(); var open = false; void onOpen(object s, EventArgs e) { open = true; openEvent.Set(); } void onClose(object s, EventArgs e) { openEvent.Set(); } client.Opened += onOpen; client.Closed += onClose; client.EnableAutoSendPing = true; client.AutoSendPingInterval = 10; client.Error += Client_Error; client.MessageReceived += Client_MessageReceived; client.DataReceived += Client_DataReceived; client.Open(); try { openEvent.Wait(ct); } catch (OperationCanceledException) { client.Dispose(); return null; } if (!open) { client.Dispose(); return null; } client.Opened -= onOpen; client.Closed -= onClose; client.Opened += Client_Opened; client.Closed += Client_Closed; DoOpen(); _host = url; // Time to wait before disconnect - the default meant that the client disconnected during debugging, since the ping would fail /*socket.WaitTime = TimeSpan.FromSeconds(20); socket.OnOpen -= Socket_OnOpen; socket.OnError -= Socket_OnError; socket.OnClose -= Socket_OnClose; socket.OnMessage -= Socket_OnMessage; socket.Connect(); if (socket.ReadyState == WebSocketState.Open) { DoOpen(); socket.OnOpen += Socket_OnOpen; socket.OnError += Socket_OnError; socket.OnClose += Socket_OnClose; socket.OnMessage += Socket_OnMessage; return socket; }*/ } catch (Exception e) { Logger.Send(LogType.Error, ClientFactory.UserID, $"Error in CreateSocket(): {e.Message}"); throw; } return client; } private void Client_Closed(object sender, EventArgs e) { var wasConnected = _connected; _connected = false; openEvent.Set(); if (wasConnected) { DoClose(RpcTransportCloseEventType.Closed); } } private void Client_DataReceived(object sender, DataReceivedEventArgs e) { var rpcMessage = Serialization.ReadBinary(e.Data, BinarySerializationSettings.Latest); Accept(rpcMessage); } private void Client_MessageReceived(object sender, MessageReceivedEventArgs e) { var rpcMessage = Serialization.Deserialize(e.Message); Accept(rpcMessage); } private void Client_Error(object sender, SuperSocket.ClientEngine.ErrorEventArgs e) { DoException(e.Exception); } private void Client_Opened(object sender, EventArgs e) { _connected = true; DoOpen(); openEvent.Set(); } public override bool Connect(CancellationToken ct = default) { if (_socket != null) { openEvent.Reset(); _socket.Open(); openEvent.Wait(ct); return _connected; } else { var childCts = CancellationTokenSource.CreateLinkedTokenSource(ct); var tasks = new List>(); foreach (var url in _urls) { tasks.Add(Task.Run(() => CreateSocket(url, true, childCts.Token))); tasks.Add(Task.Run(() => CreateSocket(url, false, childCts.Token))); } while (tasks.Count > 0) { var result = Task.WhenAny(tasks).Result; if (result.Result == null) tasks.Remove(result); else { childCts.Cancel(); if(_socket != null) Logger.Send(LogType.Error,"","Socket already exists!"); _socket = result.Result; _connected = true; /*Task.Run(() => { while (IsConnected()) { if (!DoReceive()) { break; } } });*/ return _connected; } } return _connected; } } public override bool IsConnected() => _socket?.State == WebSocketState.Open; public override bool IsSecure() => _socket?.Security.Certificates.Count > 0; public override String? ServerName() => _host; public override void Disconnect() { _socket?.Close(); //_socket?.CloseAsync(WebSocketCloseStatus.NormalClosure, "", _tokenSource.Token).Wait(); } public override void Send(RpcMessage message) { var buffer = message.WriteBinary(BinarySerializationSettings.Latest); _socket?.Send(buffer, 0, buffer.Length);// ?.SendAsync(buffer, WebSocketMessageType.Binary, true, _tokenSource.Token)?.Wait(); } protected override RpcClientTransport Clone() => new RpcClientSocketTransport(_urls); public void Dispose() { _socket?.Close(); _socket?.Dispose(); /*if (IsConnected()) Disconnect(); _tokenSource.Cancel(); _socket?.Dispose();*/ } } }