Browse Source

Changed out WebSocketSharp for Fleck and the .NET client. Not 100% sure about the client code, and will need more tests. One thing that needs to be looked at is connecting to a server which is down.

Kenric Nugteren 1 year ago
parent
commit
55b0300657

+ 1 - 0
InABox.Client.RPC/InABox.Client.RPC.csproj

@@ -14,6 +14,7 @@
 
     <ItemGroup>
       <PackageReference Include="H.Pipes" Version="2.0.51" />
+      <PackageReference Include="Websocket.Client" Version="4.6.1" />
     </ItemGroup>
 
 </Project>

+ 121 - 41
InABox.Client.RPC/Transports/Socket/RPCClientSocketTransport.cs

@@ -1,18 +1,23 @@
 using System;
 using System.Collections.Generic;
+using System.IO;
 using System.Linq;
+using System.Net.WebSockets;
 using System.Threading;
 using System.Threading.Tasks;
 using InABox.Clients;
 using InABox.Core;
-using WebSocketSharp;
-using ErrorEventArgs = WebSocketSharp.ErrorEventArgs;
 
 namespace InABox.Rpc
 {
     public class RpcClientSocketTransport : RpcClientTransport, IDisposable
     {
-        private WebSocket? _socket;
+        private ClientWebSocket? _socket;
+        private Task? readTask;
+
+        private string? _host;
+        private object _sendLock = new object();
+        private CancellationTokenSource _tokenSource = new CancellationTokenSource();
         
         private string[] _urls;
 
@@ -20,48 +25,104 @@ namespace InABox.Rpc
         {
             _urls = urls;
         }
-        
-        private void Socket_OnOpen(object? sender, EventArgs e)
-        {
-            DoOpen();
-        }
-        
-        private void Socket_OnMessage(object? sender, MessageEventArgs e)
-        {
-            RpcMessage? message = null;
-            if (e.IsBinary && (e.RawData != null))
-                message = Serialization.ReadBinary<RpcMessage>(e.RawData, BinarySerializationSettings.Latest);
-            else if (e.IsText && !string.IsNullOrWhiteSpace(e.Data))
-                message = Serialization.Deserialize<RpcMessage>(e.Data);
-     
-            Accept(message);
-        }
 
-        private void Socket_OnClose(object? sender, CloseEventArgs e)
+        // Returns true if we are to continue the receive loop.
+        private bool DoReceive()
         {
-            DoClose(RpcTransportCloseEventType.Closed);
-        }
+            if(_socket != null)
+            {
+                try
+                {
+                    var buffer = new ArraySegment<byte>(new byte[1024]);
+                    using (var ms = new MemoryStream())
+                    {
+                        WebSocketReceiveResult result;
+                        do
+                        {
+                            var task = _socket.ReceiveAsync(buffer, _tokenSource.Token);
+                            task.Wait();
+                            result = task.Result;
 
-        private void Socket_OnError(object? sender, ErrorEventArgs e)
-        {
-            DoException(e.Exception);
+                            ms.Write(buffer.Array, buffer.Offset, result.Count);
+                        } while (!result.EndOfMessage);
+
+                        ms.Seek(0, SeekOrigin.Begin);
+
+                        if (result.MessageType == WebSocketMessageType.Close)
+                        {
+                            if (result.CloseStatus == WebSocketCloseStatus.NormalClosure)
+                            {
+                                Task.Run(() =>
+                                {
+                                    DoClose(RpcTransportCloseEventType.Closed);
+                                });
+                                return false;
+                            }
+                            else
+                            {
+                                DoException(new Exception(result.CloseStatusDescription));
+                                Task.Run(() =>
+                                {
+                                    DoClose(RpcTransportCloseEventType.Error);
+                                });
+                                return false;
+                            }
+                        }
+                        else
+                        {
+                            RpcMessage? rpcMessage = null;
+                            if (result.MessageType == WebSocketMessageType.Binary)
+                            {
+                                rpcMessage = Serialization.ReadBinary<RpcMessage>(ms, BinarySerializationSettings.Latest);
+                            }
+                            else if (result.MessageType == WebSocketMessageType.Text)
+                            {
+                                rpcMessage = Serialization.Deserialize<RpcMessage>(ms);
+                            }
+                            Accept(rpcMessage);
+                        }
+                    }
+                }
+                catch(Exception e)
+                {
+                    DoException(e);
+                    if (!IsConnected())
+                    {
+                        Task.Run(() =>
+                        {
+                            DoClose(RpcTransportCloseEventType.Error);
+                        });
+                    }
+                    return false;
+                }
+            }
+            return true;
         }
         
-        private WebSocket? CreateSocket(string url, bool secure)
+        private ClientWebSocket? CreateSocket(string url, bool secure)
         {
-            WebSocket socket = null;
+            var client = new ClientWebSocket();
+
+            //WebsocketClient client;
+
+            //WebSocket socket = null;
             var address = $"{(secure ? "wss" : "ws")}://{url}";
             try
             {
-                socket = new WebSocket(address);
+                client.ConnectAsync(new Uri(address), _tokenSource.Token).Wait();
+                //socket = new WebSocket(address);
             }
             catch (Exception e)
             {
+                client.Dispose();
                 return null;
             }
-            
+
+            DoOpen();
+            _host = url;
+
             // Time to wait before disconnect - the default meant that the client disconnected during debugging, since the ping would fail
-            socket.WaitTime = TimeSpan.FromSeconds(20);
+            /*socket.WaitTime = TimeSpan.FromSeconds(20);
             socket.OnOpen -= Socket_OnOpen;
             socket.OnError -= Socket_OnError;
             socket.OnClose -= Socket_OnClose;
@@ -77,17 +138,19 @@ namespace InABox.Rpc
                 socket.OnMessage += Socket_OnMessage;
 
                 return socket;
-            }
-            return null;
+            }*/
+            return client;
         }
 
         public override void Connect()
         {
-            List<Task<WebSocket>> tasks = new List<Task<WebSocket>>();
+            _socket?.Dispose();
+
+            var tasks = new List<Task<ClientWebSocket?>>();
             foreach (var url in _urls)
             {
-                tasks.Add(Task<WebSocket>.Run(() => CreateSocket(url, true)));
-                tasks.Add(Task<WebSocket>.Run(() => CreateSocket(url, false)));
+                tasks.Add(Task.Run(() => CreateSocket(url, true)));
+                tasks.Add(Task.Run(() => CreateSocket(url, false)));
             }
             while (tasks.Count > 0)
             {
@@ -97,25 +160,40 @@ namespace InABox.Rpc
                 else
                 {
                     _socket = result.Result;
+
+                    Task.Run(() =>
+                    {
+                        while (IsConnected())
+                        {
+                            if (!DoReceive())
+                            {
+                                break;
+                            }
+                        }
+                    });
+
                     return;
                 }
             }
         }
 
-        public override bool IsConnected() => _socket?.ReadyState == WebSocketState.Open;
-        public override bool IsSecure() => _socket?.IsSecure == true;
+        public override bool IsConnected() => _socket?.State == WebSocketState.Open;
+        public override bool IsSecure() => false;
 
-        public override String? ServerName() => _socket?.Url.Host;
+        public override String? ServerName() => _host;
         
         public override void Disconnect()
         {
-            _socket?.Close(CloseStatusCode.Normal);
+            _socket?.CloseAsync(WebSocketCloseStatus.NormalClosure, "", _tokenSource.Token).Wait();
         }
 
         public override void Send(RpcMessage message)
         {
-            var buffer = message.WriteBinary(BinarySerializationSettings.Latest);
-            _socket?.Send(buffer);
+            lock (_sendLock)
+            {
+                var buffer = message.WriteBinary(BinarySerializationSettings.Latest);
+                _socket?.SendAsync(buffer, WebSocketMessageType.Binary, true, _tokenSource.Token)?.Wait();
+            }
         }
 
         protected override RpcClientTransport Clone() => new RpcClientSocketTransport(_urls);
@@ -124,6 +202,8 @@ namespace InABox.Rpc
         {
             if (IsConnected())
                 Disconnect();
+            _tokenSource.Cancel();
+            _socket?.Dispose();
         }
     }
 }

+ 1 - 1
InABox.Server/InABox.Server.csproj

@@ -7,6 +7,7 @@
         <GenerateAssemblyInfo>false</GenerateAssemblyInfo>
     </PropertyGroup>
     <ItemGroup>
+        <PackageReference Include="Fleck" Version="1.2.0" />
         <PackageReference Include="GenHTTP.Core" Version="6.3.5" />
         <PackageReference Include="GenHTTP.Modules.Practices" Version="6.3.5" />
         <PackageReference Include="H.Pipes" Version="2.0.51" />
@@ -23,7 +24,6 @@
         <ProjectReference Include="..\InABox.Mailer.Exchange\InABox.Mailer.Exchange.csproj" />
         <ProjectReference Include="..\InABox.Mailer.IMAP\InABox.Mailer.IMAP.csproj" />
         <ProjectReference Include="..\InABox.RPC.Shared\InABox.RPC.Shared.csproj" />
-        <ProjectReference Include="..\InABox.Server.WebSocket\InABox.Server.WebSocket.csproj" />
         <ProjectReference Include="..\InABox.WebSocket.Shared\InABox.WebSocket.Shared.csproj" />
     </ItemGroup>
 

+ 4 - 4
InABox.Server/RPC/RPCServer.cs

@@ -41,14 +41,14 @@ namespace InABox.Rpc
 
         private void Transport_OnOpen(IRpcTransport transport, RpcTransportOpenArgs e)
         {
-            // if ((e.Session?.ID ?? Guid.Empty) != Guid.Empty)
-            //     OnLog?.Invoke(LogType.Information, "", $"Client Connected [{e.Session?.ID}]");
+            if ((e.Session?.ID ?? Guid.Empty) != Guid.Empty)
+                OnLog?.Invoke(LogType.Information, "", $"Client Connected [{e.Session?.ID}]");
         }
 
         private void Transport_OnClose(IRpcTransport transport, RpcTransportCloseArgs e)
         {
-            // if ((e.Session?.ID ?? Guid.Empty) != Guid.Empty)
-            //     OnLog?.Invoke(LogType.Information, "", $"Client Disconnected({e.Type}) [{e.Session?.ID}]");
+            if ((e.Session?.ID ?? Guid.Empty) != Guid.Empty)
+                OnLog?.Invoke(LogType.Information, "", $"Client Disconnected({e.Type}) [{e.Session?.ID}]");
         }
 
         private void Transport_BeforeMessage(IRpcTransport transport, RpcTransportMessageArgs e)

+ 12 - 4
InABox.Server/RPC/Transports/Socket/RPCServerSocketConnection.cs

@@ -1,11 +1,9 @@
+using Fleck;
 using InABox.Core;
-using WebSocketSharp;
-using WebSocketSharp.Server;
-using ErrorEventArgs = WebSocketSharp.ErrorEventArgs;
 
 namespace InABox.Rpc
 {
-    public class RpcServerSocketConnection: WebSocketBehavior
+    /*public class RpcServerSocketConnection: WebSocketBehavior
     {
         
         public RpcServerSocketTransport? Transport { get; set; }
@@ -60,5 +58,15 @@ namespace InABox.Rpc
             Send(Serialization.WriteBinary(message, BinarySerializationSettings.Latest));
         }
 
+    }*/
+
+    public class RpcServerSocketConnection2
+    {
+        public IWebSocketConnection Connection { get; set; }
+
+        public RpcServerSocketConnection2(IWebSocketConnection connection)
+        {
+            Connection = connection;
+        }
     }
 }

+ 39 - 17
InABox.Server/RPC/Transports/Socket/RPCServerSocketTransport.cs

@@ -1,14 +1,14 @@
 using System.Net.Security;
 using System.Security.Authentication;
 using System.Security.Cryptography.X509Certificates;
+using System.Security.Cryptography.Xml;
+using Fleck;
 using InABox.Core;
-using WebSocketSharp;
-using WebSocketSharp.Server;
 using Logger = InABox.Core.Logger;
 
 namespace InABox.Rpc
 {
-    public class RpcServerSocketTransport : RpcServerTransport<RpcServerSocketConnection>
+    public class RpcServerSocketTransport : RpcServerTransport<RpcServerSocketConnection2>
     {
         private WebSocketServer? _server;
 
@@ -22,22 +22,26 @@ namespace InABox.Rpc
         {
             
             Certificate = certificate;
-            
-            _server = new WebSocketServer(port, Certificate != null);
+
+            var protocol = certificate != null ? "wss" : "ws";
+
+            _server = new WebSocketServer($"{protocol}://0.0.0.0:{port}");
+
             if (Certificate != null)
             {
-                _server.SslConfiguration.ServerCertificate = Certificate;
+                /*_server.SslConfiguration.ServerCertificate = Certificate;
                 _server.SslConfiguration.ClientCertificateRequired = false;
                 _server.SslConfiguration.CheckCertificateRevocation = false;
                 _server.SslConfiguration.ClientCertificateValidationCallback = WSSCallback;
-                _server.SslConfiguration.EnabledSslProtocols = SslProtocols.Tls12;
+                _server.SslConfiguration.EnabledSslProtocols = SslProtocols.Tls12;*/
+                _server.Certificate = Certificate;
             }
 
-            _server?.AddWebSocketService<RpcServerSocketConnection>("/", (connection) =>
+            /*_server?.AddWebSocketService<RpcServerSocketConnection>("/", (connection) =>
             {
                 connection.Transport = this;
                 //new RpcServerSocketConnection() { Transport = this };
-            });
+            });*/
         }
 
         private bool WSSCallback(object sender, X509Certificate? certificate, X509Chain? chain, SslPolicyErrors sslpolicyerrors)
@@ -47,27 +51,45 @@ namespace InABox.Rpc
 
         public override void Start()
         {
-            _server?.Start();
+            _server?.Start(socket =>
+            {
+                var connection = new RpcServerSocketConnection2(socket);
+                socket.OnOpen = () => ConnectionOpened(connection);
+                socket.OnClose = () => ConnectionClosed(connection);
+                socket.OnError = (e) => ConnectionException(connection, e);
+                socket.OnBinary = (data) => Task.Run(() =>
+                {
+                    RpcMessage? request = Serialization.ReadBinary<RpcMessage>(data, BinarySerializationSettings.Latest);
+                    var response = DoMessage(connection, request);
+                    socket.Send(Serialization.WriteBinary(response, BinarySerializationSettings.Latest));
+                });
+                socket.OnMessage = (data) => Task.Run(() =>
+                {
+                    RpcMessage? request = Serialization.Deserialize<RpcMessage>(data);
+                    var response = DoMessage(connection, request);
+                    socket.Send(Serialization.Serialize(response));
+                });
+            });
         }
 
-        public override void Send(RpcServerSocketConnection connection, RpcMessage message)
+        public override void Send(RpcServerSocketConnection2 connection, RpcMessage message)
         {
-            connection.Send(message);
+            connection.Connection.Send(Serialization.WriteBinary(message, BinarySerializationSettings.Latest));
         }
 
         public override void Stop()
         {
-            _server?.Stop();
+            _server?.ListenerSocket.Close();
         }
 
-        public void ConnectionOpened(RpcServerSocketConnection connection) 
+        public void ConnectionOpened(RpcServerSocketConnection2 connection) 
             => DoOpen(connection);
 
-        public void ConnectionException(RpcServerSocketConnection connection, Exception e) 
+        public void ConnectionException(RpcServerSocketConnection2 connection, Exception e) 
             => DoException(connection, e);
 
-        public void ConnectionClosed(RpcServerSocketConnection connection, CloseEventArgs e)
-            => DoClose(connection, (e.Code == 1000) ? RpcTransportCloseEventType.Closed : RpcTransportCloseEventType.Error);
+        public void ConnectionClosed(RpcServerSocketConnection2 connection)
+            => DoClose(connection, RpcTransportCloseEventType.Closed);
         
     }
 }

+ 4 - 1
inabox.wpf/DynamicGrid/DynamicDataGrid.cs

@@ -474,8 +474,11 @@ namespace InABox.DynamicGrid
             var tag = GetTag();
 
             var user = Task.Run(() => new UserConfiguration<DynamicGridColumns>(tag).Load());
+            user.Wait();
+
             var global = Task.Run(() => new GlobalConfiguration<DynamicGridColumns>(tag).Load());
-            Task.WaitAll(user, global);
+            global.Wait();
+            //Task.WaitAll(user, global);
             var columns = user.Result.Any() ? user.Result : global.Result;
 
             if (!columns.Any())