RPCServerTransport.cs 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. using System.Collections.Concurrent;
  2. using System.Text;
  3. using InABox.Core;
  4. namespace InABox.Rpc
  5. {
  /// <summary>
  /// Base class for RPC server transports. It tracks one <see cref="RpcServerSession"/> per
  /// connection, dispatches incoming messages to the registered command handlers, raises
  /// lifecycle/message events, and pushes messages out to connected clients.
  /// </summary>
  /// <typeparam name="TConnection">
  /// The transport-specific connection handle, used as the key of the session table.
  /// </typeparam>
  public abstract class RpcServerTransport<TConnection> : IPusher, IRpcServerTransport where TConnection : notnull
  {
      // Session table: one session per live connection.
      private readonly ConcurrentDictionary<TConnection, RpcServerSession> _sessions = new();

      // Command handlers keyed by the simple type name of the command.
      // NOTE(review): this is a plain Dictionary; presumably handlers are all registered
      // before messages start arriving - confirm against callers.
      private readonly Dictionary<string, IRpcCommandHandler> _handlers = new();

      /// <summary>
      /// Implemented by concrete transports.
      /// NOTE(review): presumably reports whether the underlying channel is secured - confirm
      /// against the implementations.
      /// </summary>
      public abstract bool IsSecure();

      /// <summary>
      /// Creates a new session and registers it for <paramref name="connection"/>, replacing
      /// any session previously stored for that connection.
      /// </summary>
      /// <param name="connection">The connection the session belongs to.</param>
      /// <returns>The newly created session.</returns>
      protected RpcServerSession CreateSession(TConnection connection)
      {
          var session = new RpcServerSession();
          _sessions[connection] = session;
          return session;
      }

      /// <summary>
      /// Looks up the session registered for <paramref name="connection"/>.
      /// </summary>
      /// <param name="connection">The connection to look up; may be null.</param>
      /// <returns>
      /// The session, or null when <paramref name="connection"/> is null or has no session.
      /// </returns>
      public RpcServerSession? GetSession(TConnection? connection)
      {
          if (connection is null)
          {
              return null;
          }

          return _sessions.TryGetValue(connection, out var session) ? session : null;
      }

      /// <summary>
      /// Removes the session registered for <paramref name="connection"/>.
      /// </summary>
      /// <param name="connection">The connection whose session should be removed.</param>
      /// <returns>The removed session, or null when none was registered.</returns>
      protected RpcServerSession? DeleteSession(TConnection connection)
      {
          // TryRemove performs the lookup and the removal as one atomic operation. The
          // IDictionary Remove(key, out value) extension does them as two separate steps,
          // which lets two concurrent callers both believe they removed the session.
          return _sessions.TryRemove(connection, out var session) ? session : null;
      }

      /// <summary>
      /// Registers <paramref name="handler"/> under the simple type name of
      /// <typeparamref name="TCommand"/>, replacing any handler already stored under that name.
      /// </summary>
      /// <param name="handler">The handler to register.</param>
      public void AddHandler<TSender, TCommand, TProperties, TResult>(RpcCommandHandler<TSender, TCommand, TProperties, TResult> handler)
          where TSender : class
          where TCommand : IRpcCommand<TProperties, TResult>
          where TProperties : IRpcCommandParameters, new()
          where TResult : IRpcCommandResult, new()
      {
          _handlers[typeof(TCommand).Name] = handler;
      }

      /// <summary>Starts the transport; the behaviour is defined by the concrete implementation.</summary>
      public abstract void Start();

      /// <summary>Stops the transport; the behaviour is defined by the concrete implementation.</summary>
      public abstract void Stop();

      /// <summary>Raised by <see cref="DoOpen"/> after a session has been created for a connection.</summary>
      public event RpcTransportOpenEvent? OnOpen;

      /// <summary>
      /// Creates a session for <paramref name="connection"/> and raises <see cref="OnOpen"/>.
      /// </summary>
      /// <param name="connection">The connection that was opened.</param>
      protected void DoOpen(TConnection connection)
      {
          var session = CreateSession(connection);
          OnOpen?.Invoke(this, new RpcTransportOpenArgs(session));
      }

      /// <summary>Raised by <see cref="DoClose"/> after the connection's session has been removed.</summary>
      public event RpcTransportCloseEvent? OnClose;

      /// <summary>
      /// Removes the session of <paramref name="connection"/> and raises <see cref="OnClose"/>.
      /// The session passed to the event is null when the connection had no session.
      /// </summary>
      /// <param name="connection">The connection that was closed.</param>
      /// <param name="type">The kind of close that occurred.</param>
      protected void DoClose(TConnection connection, RpcTransportCloseEventType type)
      {
          var session = DeleteSession(connection);
          OnClose?.Invoke(this, new RpcTransportCloseArgs(session, type));
      }

      /// <summary>Raised by <see cref="DoException"/>.</summary>
      public event RpcTransportExceptionEvent? OnException;

      /// <summary>
      /// Raises <see cref="OnException"/> with the session of <paramref name="connection"/>
      /// (null when there is none) and the exception <paramref name="e"/>.
      /// </summary>
      /// <param name="connection">The connection the error relates to; may be null.</param>
      /// <param name="e">The exception to report.</param>
      protected void DoException(TConnection? connection, Exception e)
      {
          var session = GetSession(connection);
          OnException?.Invoke(this, new RpcTransportExceptionArgs(session, e));
      }

      /// <summary>Raised by <see cref="DoBeforeMessage"/>.</summary>
      public event RpcTransportMessageEvent? BeforeMessage;

      /// <summary>Raises <see cref="BeforeMessage"/> for the given session and message.</summary>
      /// <param name="session">The session the message belongs to; may be null.</param>
      /// <param name="message">The message about to be handled; may be null.</param>
      protected void DoBeforeMessage(RpcServerSession? session, RpcMessage? message)
      {
          BeforeMessage?.Invoke(this, new RpcTransportMessageArgs(session, message));
      }

      /// <summary>Raised by <see cref="DoAfterMessage"/>.</summary>
      public event RpcTransportMessageEvent? AfterMessage;

      /// <summary>Raises <see cref="AfterMessage"/> for the given session and message.</summary>
      /// <param name="session">The session the message belongs to; may be null.</param>
      /// <param name="message">The message that was produced; may be null.</param>
      protected void DoAfterMessage(RpcServerSession? session, RpcMessage? message)
      {
          AfterMessage?.Invoke(this, new RpcTransportMessageArgs(session, message));
      }

      /// <summary>
      /// Handle a message from a client.
      /// </summary>
      /// <remarks>
      /// <see cref="BeforeMessage"/> is raised for every non-null message, even when the
      /// connection has no session. <see cref="AfterMessage"/> is raised only when a session
      /// exists. Failures are reported through <see cref="OnException"/> and encoded in the
      /// response's error field rather than thrown to the caller.
      /// </remarks>
      /// <param name="connection">The client connection.</param>
      /// <param name="message">The message to be handled.</param>
      /// <returns>
      /// The response to be sent back to the client, or null when <paramref name="message"/> is null.
      /// </returns>
      public RpcMessage? DoMessage(TConnection? connection, RpcMessage? message)
      {
          if (message is null)
          {
              DoException(connection, new Exception("NULL Message Received"));
              return null;
          }

          var logger = new Logger(message.Id);
          var response = new RpcMessage() { Id = message.Id, Command = message.Command };
          try
          {
              var session = GetSession(connection);
              DoBeforeMessage(session, message);

              if (session is null)
              {
                  DoException(connection, new Exception("Session not Found"));
                  response.Error = RpcError.SESSIONNOTFOUND;
                  return response;
              }

              // Rebuilt after the BeforeMessage handlers have run, so the response carries the
              // message's Id/Command as they stand at this point.
              // NOTE(review): may simply be redundant with the allocation above - confirm.
              response = new RpcMessage() { Id = message.Id, Command = message.Command };

              if (_handlers.TryGetValue(message.Command, out var handler))
              {
                  try
                  {
                      response.Payload = handler.Execute(session, message.Payload, logger);
                  }
                  catch (RpcException err)
                  {
                      // An RpcException carries its own error code; its message becomes the payload.
                      response.Payload = Encoding.UTF8.GetBytes(err.Message);
                      response.Error = err.Error;
                  }
              }
              else
              {
                  DoException(connection, new Exception("Command Not Found"));
                  response.Error = RpcError.COMMANDNOTFOUND;
              }

              DoAfterMessage(session, response);
          }
          catch (Exception e)
          {
              // Anything else is reported and turned into a generic server error response.
              DoException(connection, e);
              response.Payload = Encoding.UTF8.GetBytes(e.Message);
              response.Error = RpcError.SERVERERROR;
          }

          return response;
      }

      /// <summary>
      /// Send a message to a particular client connection.
      /// </summary>
      /// <param name="connection">The connection to send to.</param>
      /// <param name="message">The message to send.</param>
      public abstract void Send(TConnection connection, RpcMessage message);

      #region Pusher Stuff

      /// <summary>
      /// Sends <paramref name="message"/> to the first connection whose session has the given
      /// ID; does nothing when no such session is registered.
      /// </summary>
      /// <param name="session">The ID of the target session.</param>
      /// <param name="message">The message to send.</param>
      private void SendToSession(Guid session, RpcMessage message)
      {
          // Search explicitly rather than via FirstOrDefault(...).Key: for a value-type
          // TConnection the default key is not null, so a failed lookup would be
          // indistinguishable from a match and the message would go to a bogus connection.
          foreach (var pair in _sessions)
          {
              if (pair.Value.ID == session)
              {
                  Send(pair.Key, message);
                  return;
              }
          }
      }

      /// <summary>
      /// Sends <paramref name="push"/> as a "Push" message to every connection that currently
      /// has a session.
      /// </summary>
      /// <param name="push">The object to push.</param>
      public void PushToAll<TPush>(TPush push) where TPush : BaseObject
      {
          var message = new RpcMessage(Guid.NewGuid(), "Push", RpcPush.Create(push).WriteBinary(BinarySerializationSettings.Latest));
          foreach (var connection in _sessions.Keys)
          {
              Send(connection, message);
          }
      }

      /// <summary>
      /// Sends <paramref name="push"/> as a "Push" message to the connection owning the session
      /// with the given ID; does nothing when no such session is registered.
      /// </summary>
      /// <param name="session">The ID of the target session.</param>
      /// <param name="push">The object to push.</param>
      public void PushToSession<TPush>(Guid session, TPush push) where TPush : BaseObject
      {
          var message = new RpcMessage(Guid.NewGuid(), "Push", RpcPush.Create(push).WriteBinary(BinarySerializationSettings.Latest));
          SendToSession(session, message);
      }

      /// <summary>
      /// Sends <paramref name="push"/>, tagged with the type <paramref name="TPush"/>, as a
      /// "Push" message to the connection owning the session with the given ID; does nothing
      /// when no such session is registered.
      /// </summary>
      /// <param name="session">The ID of the target session.</param>
      /// <param name="TPush">The type recorded in the push envelope.</param>
      /// <param name="push">The object to push.</param>
      public void PushToSession(Guid session, Type TPush, BaseObject push)
      {
          var envelope = new RpcPush
          {
              Object = push,
              Type = TPush
          };
          var message = new RpcMessage(Guid.NewGuid(), "Push", envelope.WriteBinary(BinarySerializationSettings.Latest));
          SendToSession(session, message);
      }

      /// <summary>Returns the IDs of all sessions whose user GUID equals <paramref name="user"/>.</summary>
      /// <param name="user">The user GUID to match.</param>
      public IEnumerable<Guid> GetUserSessions(Guid user) =>
          _sessions.Values.Where(x => x.UserGuid == user).Select(x => x.ID);

      /// <summary>Returns the IDs of all sessions whose platform equals <paramref name="platform"/>.</summary>
      /// <param name="platform">The platform to match.</param>
      public IEnumerable<Guid> GetSessions(Platform platform) =>
          _sessions.Values.Where(x => x.Platform == platform).Select(x => x.ID);

      #endregion
  }
  165. }