RPCClient.cs 11 KB


  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Collections.Generic;
  4. using System.Linq;
  5. using System.Threading;
  6. using InABox.Clients;
  7. using InABox.Core;
  8. namespace InABox.Rpc
  9. {
  10. public class RpcClient<TEntity> : BaseClient<TEntity> where TEntity : Entity, new()
  11. {
  // Transport that carries every request issued by this client; assigned once in the constructor.
  private readonly IRpcClientTransport _transport;

  // Wait handles for in-flight requests, keyed by request id. Queue() adds entries and
  // GetResult() removes them.
  private readonly ConcurrentDictionary<Guid, ManualResetEventSlim> _events = new ConcurrentDictionary<Guid, ManualResetEventSlim>();

  // Responses keyed by request id, consumed by GetResult().
  // NOTE(review): nothing in this class adds entries to this table - confirm where it is populated.
  private readonly ConcurrentDictionary<Guid, RpcMessage> _responses = new ConcurrentDictionary<Guid, RpcMessage>();

  // Default timeout, in milliseconds, for Send(RpcMessage, int).
  private const int DefaultRequestTimeout = 5 * 60 * 1000; // 5 minutes
  /// <summary>
  /// Creates a client bound to <paramref name="transport"/>, subscribes to its
  /// <c>OnMessage</c> event and connects the transport if it is not already connected.
  /// </summary>
  /// <param name="transport">Transport used for all RPC traffic; must not be null.</param>
  /// <exception cref="ArgumentNullException"><paramref name="transport"/> is null.</exception>
  public RpcClient(IRpcClientTransport transport)
  {
      // Fail with a descriptive exception instead of a NullReferenceException on the
      // event subscription below.
      _transport = transport ?? throw new ArgumentNullException(nameof(transport));
      _transport.OnMessage += Transport_Message;
      if (!_transport.IsConnected())
      {
          _transport.Connect();
      }
  }
  /// <summary>
  /// Unsubscribes this client from the transport's <c>OnMessage</c> event.
  /// </summary>
  // NOTE(review): while the transport's event still references this handler the client
  // stays reachable through the transport, so this presumably only runs once the transport
  // itself is unreachable - consider IDisposable for deterministic unsubscription.
  ~RpcClient()
  {
      // A finalizer also runs for an instance whose constructor threw, in which case the
      // field may still be null; an exception escaping a finalizer terminates the process.
      var transport = _transport;
      if (transport != null)
      {
          transport.OnMessage -= Transport_Message;
      }
  }
  27. #region TransportManagement
  /// <summary>Reports whether a transport is present and currently connected.</summary>
  /// <returns><c>true</c> only when the transport is non-null and reports that it is connected.</returns>
  public override bool IsConnected()
  {
      return _transport != null && _transport.IsConnected();
  }
  // Handler attached to the transport's OnMessage event. It only writes the message's
  // command and payload to the log; it does not touch the pending-request tables.
  // NOTE(review): presumably meant for unsolicited messages (notifications, etc) - confirm.
  private void Transport_Message(IRpcTransport transport, RpcTransportMessageArgs e)
  {
      var received = e.Message;
      RaiseLogEvent(LogType.Error, "", "Message received: ({0}) -> {1}", received.Command, received.Payload);
  }
  /// <summary>
  /// Sends a typed command: serializes <paramref name="parameters"/> into a new
  /// <see cref="RpcMessage"/>, passes it to <c>Send(RpcMessage, int)</c> and deserializes
  /// the response payload into <typeparamref name="TResult"/>.
  /// </summary>
  /// <typeparam name="TCommand">Command type; its type name is used as the message command.</typeparam>
  /// <typeparam name="TParameters">Binary-serializable parameter type.</typeparam>
  /// <typeparam name="TResult">Binary-serializable result type.</typeparam>
  /// <param name="parameters">Parameters to serialize into the request payload.</param>
  /// <returns>The deserialized result; never null.</returns>
  /// <exception cref="Exception">
  /// The response carries an error other than <c>RpcError.NONE</c>, or the payload deserialized to null.
  /// </exception>
  public TResult Send<TCommand, TParameters, TResult>(TParameters parameters)
      where TCommand : IRpcCommand<TParameters, TResult>
      where TParameters : ISerializeBinary
      where TResult : ISerializeBinary, new()
  {
      var request = new RpcMessage()
      {
          // Guid.NewGuid() produces a unique id per request. The parameterless Guid
          // constructor always yields Guid.Empty, which would make every request share
          // the same key in the pending-request tables.
          Id = Guid.NewGuid(),
          Command = typeof(TCommand).Name,
          Payload = parameters.WriteBinary(BinarySerializationSettings.Latest)
      };

      var response = Send(request);
      if (response.Error != RpcError.NONE)
          throw new Exception($"Exception in {typeof(TCommand).Name}({request.Id}): {response.Error}");

      var result = Serialization.ReadBinary<TResult>(response.Payload, BinarySerializationSettings.Latest);
      if (result == null)
          throw new Exception($"{typeof(TCommand).Name}({request.Id}) returned NULL");

      return result;
  }
  /// <summary>
  /// Registers a wait handle for <paramref name="request"/>, hands the request to the
  /// transport and then waits for the matching response.
  /// </summary>
  /// <param name="request">Message to send; its <c>Id</c> is the key the response is matched on.</param>
  /// <param name="timeout">Maximum time to wait for the response, in milliseconds.</param>
  /// <returns>Whatever <c>GetResult</c> produces for this request id.</returns>
  public RpcMessage Send(RpcMessage request, int timeout = DefaultRequestTimeout)
  {
      var ev = Queue(request.Id);
      try
      {
          _transport.Send(request);
      }
      catch
      {
          // The request never left, so nobody will ever wait on this handle;
          // drop it rather than leaving it in the pending table.
          _events.TryRemove(request.Id, out _);
          throw;
      }

      return GetResult(request.Id, ev, timeout);
  }
  /// <summary>
  /// Creates a fresh, unsignalled wait handle and stores it in the pending-request table
  /// under <paramref name="id"/>, replacing any handle already stored for that id.
  /// </summary>
  /// <param name="id">Request id to register.</param>
  /// <returns>The newly created wait handle.</returns>
  public ManualResetEventSlim Queue(Guid id)
  {
      var handle = new ManualResetEventSlim();
      _events[id] = handle;
      return handle;
  }
  /// <summary>
  /// Returns the response stored for <paramref name="id"/>, waiting on
  /// <paramref name="ev"/> for up to <paramref name="timeout"/> milliseconds if none is
  /// available yet.
  /// </summary>
  /// <param name="id">Request id whose response is wanted.</param>
  /// <param name="ev">Wait handle registered for the request via <c>Queue</c>.</param>
  /// <param name="timeout">Maximum wait in milliseconds.</param>
  /// <returns>
  /// The stored response; a message with <c>RpcError.TIMEOUT</c> if the wait timed out; or a
  /// message with <c>RpcError.UNKNOWN</c> if the handle was signalled but no response was stored.
  /// </returns>
  public RpcMessage GetResult(Guid id, ManualResetEventSlim ev, int timeout)
  {
      try
      {
          // Fast path: the response is already here. TryRemove fetches and removes it
          // atomically, so two callers can never both obtain the same response.
          if (_responses.TryRemove(id, out var early))
          {
              return early;
          }

          bool signalled;
          try
          {
              signalled = ev.Wait(timeout);
          }
          catch (Exception e)
          {
              RaiseLogEvent(LogType.Error, "", e.Message);
              throw;
          }

          if (!signalled)
          {
              return new RpcMessage() { Id = id, Error = RpcError.TIMEOUT };
          }

          _responses.TryRemove(id, out var result);
          return result ?? new RpcMessage() { Id = id, Error = RpcError.UNKNOWN };
      }
      finally
      {
          // Always forget the wait handle - on success, timeout and failure alike -
          // so abandoned requests do not accumulate in the pending table.
          _events.TryRemove(id, out _);
      }
  }
  91. #endregion
  92. #region Client Interface
  /// <summary>Requests database information by sending an <c>RpcInfoCommand</c> through the transport.</summary>
  /// <returns>The <c>Info</c> member of the command result.</returns>
  public override DatabaseInfo Info()
      => _transport
          .Send<RpcInfoCommand, RpcInfoParameters, RpcInfoResult>(new RpcInfoParameters())
          .Info;
  // Lazily built list of supported type names; filled on the first SupportedTypes() call.
  private static string[]? _types;

  /// <summary>
  /// Lists the names of all entities in <c>CoreUtils.Entities</c> that implement
  /// <c>IPersistent</c>, with every '.' in the entity name replaced by '_'.
  /// </summary>
  /// <returns>The cached array of names; computed once and reused afterwards.</returns>
  public override IEnumerable<string> SupportedTypes()
  {
      if (_types == null)
      {
          _types = (from entity in CoreUtils.Entities
                    where entity.GetInterfaces().Contains(typeof(IPersistent))
                    select entity.EntityName().Replace(".", "_")).ToArray();
      }

      return _types;
  }
  107. #region Validate & 2FA
  /// <summary>Validates a user by user id and password via an <c>RpcValidateCommand</c>.</summary>
  /// <param name="userid">User id to validate.</param>
  /// <param name="password">Password for that user.</param>
  /// <param name="session">Session id forwarded with the request.</param>
  /// <returns>The validation result returned by the transport.</returns>
  protected override IValidationData DoValidate(string userid, string password, Guid session = default)
  {
      // PIN validation is explicitly switched off for this overload.
      return _transport.Send<RpcValidateCommand, RpcValidateParameters, RpcValidateResult>(
          new RpcValidateParameters
          {
              UserID = userid,
              Password = password,
              PIN = "",
              UsePIN = false,
              SessionID = session,
              Platform = ClientFactory.Platform,
              Version = ClientFactory.Version
          });
  }
  /// <summary>Validates a user by PIN via an <c>RpcValidateCommand</c>.</summary>
  /// <param name="pin">PIN to validate.</param>
  /// <param name="session">Session id forwarded with the request.</param>
  /// <returns>The validation result returned by the transport.</returns>
  protected override IValidationData DoValidate(string pin, Guid session = default)
  {
      // Current UTC tick count; read UTC directly instead of converting local time.
      var ticks = DateTime.UtcNow.Ticks.ToString();

      // The user id and password fields both carry the tick string, each encrypted with a
      // different literal.
      // NOTE(review): the second argument to Encryption.Encrypt looks like a hard-coded key
      // embedded in source - presumably the server holds the matching values; confirm and
      // consider moving them out of the client code.
      var parameters = new RpcValidateParameters()
      {
          UserID = Encryption.Encrypt(ticks, "wCq9rryEJEuHIifYrxRjxg", true),
          Password = Encryption.Encrypt(ticks, "7mhvLnqMwkCAzN+zNGlyyg", true),
          PIN = pin,
          UsePIN = true,
          SessionID = session,
          Platform = ClientFactory.Platform,
          Version = ClientFactory.Version
      };

      return _transport.Send<RpcValidateCommand, RpcValidateParameters, RpcValidateResult>(parameters);
  }
  /// <summary>
  /// Validates using only a session id: user id, password and PIN are sent as empty strings.
  /// </summary>
  /// <param name="session">Session id forwarded with the request.</param>
  /// <returns>The validation result returned by the transport.</returns>
  protected override IValidationData DoValidate(Guid session = default)
  {
      return _transport.Send<RpcValidateCommand, RpcValidateParameters, RpcValidateResult>(
          new RpcValidateParameters
          {
              UserID = "",
              Password = "",
              PIN = "",
              UsePIN = false,
              SessionID = session,
              Platform = ClientFactory.Platform,
              Version = ClientFactory.Version
          });
  }
  /// <summary>Checks a two-factor code via an <c>RpcCheck2FACommand</c>.</summary>
  /// <param name="code">Code to verify.</param>
  /// <param name="session">Session id; <c>Guid.Empty</c> is sent when null.</param>
  /// <returns>The <c>Valid</c> flag of the command result.</returns>
  protected override bool DoCheck2FA(string code, Guid? session)
  {
      var response = _transport.Send<RpcCheck2FACommand, RpcCheck2FAParameters, RpcCheck2FAResult>(
          new RpcCheck2FAParameters
          {
              Code = code,
              SessionId = session ?? Guid.Empty,
          });
      return response.Valid;
  }
  161. #endregion
  /// <summary>
  /// Runs a single query for <typeparamref name="TEntity"/> via an <c>RpcQueryCommand</c>.
  /// </summary>
  /// <param name="filter">Optional filter forwarded in the query definition.</param>
  /// <param name="columns">Optional column selection forwarded in the query definition.</param>
  /// <param name="sort">Optional sort order forwarded in the query definition.</param>
  /// <returns>The table of the first entry in the result's <c>Tables</c>.</returns>
  protected override CoreTable DoQuery(Filter<TEntity>? filter, Columns<TEntity>? columns, SortOrder<TEntity>? sort = null)
  {
      var definition = new RpcQueryDefinition
      {
          Type = typeof(TEntity),
          Filter = filter,
          Columns = columns,
          Sort = sort
      };
      var request = new RpcQueryParameters { Queries = new[] { definition } };

      // Exactly one query was sent, so only the first returned table is read.
      var response = _transport.Send<RpcQueryCommand, RpcQueryParameters, RpcQueryResult>(request);
      return response.Tables[0].Table;
  }
  /// <summary>
  /// Loads entities by running <c>DoQuery</c> with no column restriction and converting
  /// every returned row to a <typeparamref name="TEntity"/>.
  /// </summary>
  /// <param name="filter">Optional filter passed to the query.</param>
  /// <param name="sort">Optional sort order passed to the query.</param>
  /// <returns>One entity per row of the query result.</returns>
  protected override TEntity[] DoLoad(Filter<TEntity>? filter = null, SortOrder<TEntity>? sort = null)
  {
      var table = DoQuery(filter, null, sort);
      var loaded = table.Rows.Select(row => row.ToObject<TEntity>());
      return loaded.ToArray();
  }
  /// <summary>
  /// Sends several keyed queries in one <c>RpcQueryCommand</c> and returns the resulting
  /// tables indexed by the key reported for each of them.
  /// </summary>
  /// <param name="queries">Query definitions keyed by caller-chosen name.</param>
  /// <returns>A dictionary with one entry per table in the response.</returns>
  protected override Dictionary<string, CoreTable> DoQueryMultiple(Dictionary<string, IQueryDef> queries)
  {
      var tables = new Dictionary<String, CoreTable>();

      var definitions =
          (from query in queries
           select new RpcQueryDefinition()
           {
               Key = query.Key,
               Type = query.Value.Type,
               Filter = query.Value.Filter,
               Columns = query.Value.Columns,
               Sort = query.Value.SortOrder
           }).ToArray();
      var request = new RpcQueryParameters() { Queries = definitions };

      var response = _transport.Send<RpcQueryCommand, RpcQueryParameters, RpcQueryResult>(request);
      foreach (var entry in response.Tables)
      {
          // Indexer assignment: a repeated key overwrites the earlier table.
          tables[entry.Key] = entry.Table;
      }

      return tables;
  }
  /// <summary>Saves a single entity by delegating to the multi-entity overload.</summary>
  /// <param name="entity">Entity to save.</param>
  /// <param name="auditnote">Audit note passed on to the multi-entity overload.</param>
  protected override void DoSave(TEntity entity, string auditnote)
      => DoSave(new[] { entity }, auditnote);
  /// <summary>
  /// Saves the given entities via an <c>RpcSaveCommand</c> and applies the deltas returned
  /// by the server back onto the local instances.
  /// </summary>
  /// <param name="entities">Entities to save; enumerated once into an array.</param>
  /// <param name="auditnote">
  /// Audit note. NOTE(review): it is not forwarded in the request built here, unlike in
  /// <c>DoDelete</c> - confirm whether the save parameters should carry it.
  /// </param>
  protected override void DoSave(IEnumerable<TEntity> entities, string auditnote)
  {
      var items = entities.ToArray();
      var parameters = new RpcSaveParameters()
      {
          Type = typeof(TEntity),
          Items = items
      };
      var result = _transport.Send<RpcSaveCommand, RpcSaveParameters, RpcSaveResult>(parameters);

      // Deltas are matched to entities by position.
      for (int i = 0; i < result.Deltas.Length; i++)
      {
          var item = items[i];

          // Observation is switched off while the returned values are written back and committed.
          item.SetObserving(false);
          try
          {
              foreach (var (key, value) in result.Deltas[i])
              {
                  // Keys that do not resolve to a TEntity property are skipped.
                  if (CoreUtils.TryGetProperty<TEntity>(key, out var property))
                      CoreUtils.SetPropertyValue(item, key, CoreUtils.ChangeType(value, property.PropertyType));
              }
              item.CommitChanges();
          }
          finally
          {
              // Restore observation even if applying a delta throws, so a failed
              // write-back cannot leave the entity permanently unobserved.
              item.SetObserving(true);
          }
      }
  }
  /// <summary>Deletes a single entity by delegating to the list overload.</summary>
  /// <param name="entity">Entity to delete.</param>
  /// <param name="auditnote">Audit note passed on to the list overload.</param>
  protected override void DoDelete(TEntity entity, string auditnote)
      => DoDelete(new[] { entity }, auditnote);
  /// <summary>
  /// Deletes the given entities via an <c>RpcDeleteCommand</c>, identifying them by their
  /// <c>ID</c> values.
  /// </summary>
  /// <param name="entities">Entities whose ids are sent for deletion.</param>
  /// <param name="auditnote">Audit note forwarded with the request.</param>
  protected override void DoDelete(IList<TEntity> entities, string auditnote)
  {
      // The command result is not inspected.
      _transport.Send<RpcDeleteCommand, RpcDeleteParameters, RpcDeleteResult>(
          new RpcDeleteParameters
          {
              Type = typeof(TEntity),
              IDs = entities.Select(entity => entity.ID).ToArray(),
              AuditNote = auditnote
          });
  }
  /// <summary>Pings the server by sending an <c>RpcPingCommand</c> through the transport.</summary>
  /// <returns><c>true</c> if the send completed without throwing; <c>false</c> on any exception.</returns>
  protected override bool DoPing()
  {
      var reachable = true;
      try
      {
          _transport.Send<RpcPingCommand, RpcPingParameters, RpcPingResult>(new RpcPingParameters());
      }
      catch (Exception)
      {
          // Best-effort probe: any failure simply means the ping did not succeed.
          reachable = false;
      }

      return reachable;
  }
  256. #endregion
  257. }
  258. }