RPCServerTransport.cs 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. using System.Collections.Concurrent;
  2. using System.Text;
  3. using InABox.Core;
  4. namespace InABox.Rpc
  5. {
  6. public abstract class RpcServerTransport<TConnection> : IPusher, IRpcServerTransport where TConnection : notnull
  7. {
  8. public abstract bool IsSecure();
  9. private ConcurrentDictionary<TConnection, RpcServerSession> _sessions = new ConcurrentDictionary<TConnection, RpcServerSession>();
  10. protected RpcServerSession CreateSession(TConnection connection)
  11. {
  12. var result = new RpcServerSession();
  13. _sessions[connection] = result;
  14. return result;
  15. }
  16. public RpcServerSession? GetSession(TConnection? connection)
  17. {
  18. if (connection == null)
  19. return null;
  20. _sessions.TryGetValue(connection, out RpcServerSession? result);
  21. return result;
  22. }
  23. protected RpcServerSession? DeleteSession(TConnection connection)
  24. {
  25. _sessions.Remove(connection, out RpcServerSession? session);
  26. return session;
  27. }
  28. private Dictionary<string, IRpcCommandHandler> _handlers = new Dictionary<string, IRpcCommandHandler>();
  29. public void AddHandler<TSender, TCommand, TProperties, TResult>(RpcCommandHandler<TSender, TCommand, TProperties, TResult> handler)
  30. where TSender : class
  31. where TCommand : IRpcCommand<TProperties, TResult>
  32. where TProperties : IRpcCommandParameters, new()
  33. where TResult : IRpcCommandResult, new()
  34. {
  35. _handlers[typeof(TCommand).Name] = handler;
  36. }
  37. public abstract void Start();
  38. public abstract void Stop();
  39. public event RpcTransportOpenEvent? OnOpen;
  40. protected void DoOpen(TConnection connection)
  41. {
  42. var session = CreateSession(connection);
  43. OnOpen?.Invoke(this, new RpcTransportOpenArgs(session) );
  44. }
  45. public event RpcTransportCloseEvent? OnClose;
  46. protected void DoClose(TConnection connection, RpcTransportCloseEventType type)
  47. {
  48. var session = DeleteSession(connection);
  49. OnClose?.Invoke(this, new RpcTransportCloseArgs(session, type));
  50. }
  51. public event RpcTransportExceptionEvent? OnException;
  52. protected void DoException(TConnection? connection, Exception e)
  53. {
  54. var session = GetSession(connection);
  55. OnException?.Invoke(this, new RpcTransportExceptionArgs(session, e));
  56. }
  57. public event RpcTransportMessageEvent? BeforeMessage;
  58. protected void DoBeforeMessage(RpcServerSession? session, RpcMessage? message)
  59. {
  60. BeforeMessage?.Invoke(this, new RpcTransportMessageArgs(session,message));
  61. }
  62. public event RpcTransportMessageEvent? AfterMessage;
  63. protected void DoAfterMessage(RpcServerSession? session, RpcMessage? message)
  64. {
  65. AfterMessage?.Invoke(this, new RpcTransportMessageArgs(session,message));
  66. }
  67. /// <summary>
  68. /// Handle a message from a client.
  69. /// </summary>
  70. /// <param name="connection">The client connection.</param>
  71. /// <param name="message">The message to be handled.</param>
  72. /// <returns>The response to be sent back to the client.</returns>
  73. public RpcMessage? DoMessage(TConnection? connection, RpcMessage? message)
  74. {
  75. if(message is null)
  76. {
  77. DoException(connection, new Exception("NULL Message Received"));
  78. return null;
  79. }
  80. var response = new RpcMessage() { Id = message.Id, Command = message.Command };
  81. try
  82. {
  83. var session = GetSession(connection);
  84. DoBeforeMessage(session, message);
  85. if (session != null)
  86. {
  87. response = new RpcMessage() { Id = message.Id, Command = message.Command };
  88. if (_handlers.TryGetValue(message.Command, out var command))
  89. {
  90. try
  91. {
  92. response.Payload = command.Execute(session, message.Payload);
  93. }
  94. catch (RpcException err)
  95. {
  96. response.Payload = Encoding.UTF8.GetBytes(err.Message);
  97. response.Error = err.Error;
  98. }
  99. }
  100. else
  101. {
  102. DoException(connection, new Exception("Command Not Found"));
  103. response.Error = RpcError.COMMANDNOTFOUND;
  104. }
  105. DoAfterMessage(session, response);
  106. }
  107. else
  108. {
  109. DoException(connection, new Exception("Session not Found"));
  110. response.Error = RpcError.SESSIONNOTFOUND;
  111. }
  112. }
  113. catch(Exception e)
  114. {
  115. DoException(connection, e);
  116. response.Payload = Encoding.UTF8.GetBytes(e.Message);
  117. response.Error = RpcError.SERVERERROR;
  118. }
  119. return response;
  120. }
  121. /// <summary>
  122. /// Send a message to a particular client connection.
  123. /// </summary>
  124. /// <param name="connection">The connection to send to.</param>
  125. /// <param name="message">The message to send.</param>
  126. public abstract void Send(TConnection connection, RpcMessage message);
  127. #region Pusher Stuff
  128. public void PushToAll<TPush>(TPush push) where TPush : BaseObject
  129. {
  130. var message = new RpcMessage(Guid.NewGuid(), "Push", RpcPush.Create(push).WriteBinary(BinarySerializationSettings.Latest));
  131. foreach (var connection in _sessions.Keys)
  132. {
  133. Send(connection, message);
  134. }
  135. }
  136. public void PushToSession<TPush>(Guid session, TPush push) where TPush : BaseObject
  137. {
  138. var message = new RpcMessage(Guid.NewGuid(), "Push", RpcPush.Create(push).WriteBinary(BinarySerializationSettings.Latest));
  139. var sessionConnection = _sessions.FirstOrDefault(x => x.Value.ID == session).Key;
  140. if(sessionConnection is not null)
  141. {
  142. Send(sessionConnection, message);
  143. }
  144. }
  145. public void PushToSession(Guid session, Type TPush, BaseObject push)
  146. {
  147. var message = new RpcMessage(Guid.NewGuid(), "Push", new RpcPush
  148. {
  149. Object = push,
  150. Type = TPush
  151. }.WriteBinary(BinarySerializationSettings.Latest));
  152. var sessionConnection = _sessions.FirstOrDefault(x => x.Value.ID == session).Key;
  153. if (sessionConnection is not null)
  154. {
  155. Send(sessionConnection, message);
  156. }
  157. }
  158. public IEnumerable<Guid> GetUserSessions(Guid user) =>
  159. _sessions.Values.Where(x => x.UserGuid == user).Select(x => x.ID);
  160. public IEnumerable<Guid> GetSessions(Platform platform) =>
  161. _sessions.Values.Where(x => x.Platform == platform).Select(x => x.ID);
  162. #endregion
  163. }
  164. }