From ebec67d0aba5a1d034195ed6f105a167c6816a04 Mon Sep 17 00:00:00 2001 From: Wes Shaddix Date: Tue, 27 Feb 2018 11:47:39 -0500 Subject: [PATCH] added observers and updated property names --- CHANGELOG.md | 7 ++++ README.md | 23 +++++----- src/LoggingMicroservice/Program.cs | 2 +- src/MetricsMicroservice/Program.cs | 5 +-- src/Proxy/NatsMessage.cs | 21 ++++++++-- src/Proxy/Observer.cs | 7 ++++ src/Proxy/Pipeline.cs | 2 + src/Proxy/ProxyConfiguration.cs | 14 +++++++ src/Proxy/RequestHandler.cs | 58 +++++++++++++++++--------- src/Proxy/pipeline.yaml | 19 +++------ src/TestMicroservice/Program.cs | 6 +-- src/TraceHeaderPipelineStep/Program.cs | 3 +- 12 files changed, 109 insertions(+), 58 deletions(-) create mode 100644 src/Proxy/Observer.cs diff --git a/CHANGELOG.md b/CHANGELOG.md index cc79fea..af797d7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,3 +26,10 @@ **1.0.3** * changing the format of the message that the proxy sends to microservices and the pipeline steps to make them more intuitive. Specifically changed `Cookies, ExtendedProperties, QueryParams, RequestHeaders` and `ResponseHeaders` from name/value collections to `Dictionary` so they serialize into a more intuitive json string. + +**1.0.4** + +* changed the call timings from a tuple to a custom class because tuples do not serialize to json with readable property names (see https://github.com/JamesNK/Newtonsoft.Json/issues/1230) +* renamed property `Body` to `RequestBody` on the `NatsMessage` +* renamed property `Response` to `ResponseBody` on the `NatsMessage` +* introduced the concept of Observers to the request pipeline (see README for details) \ No newline at end of file diff --git a/README.md b/README.md index 6f63148..f24462a 100644 --- a/README.md +++ b/README.md @@ -58,33 +58,30 @@ Every http(s) request that comes into the http-nats-proxy can go through a serie **order:** A numeric value that is the order that your step should be called in relation to other steps in the pipeline +### Observers + +There are times when you want to get a copy of the request/response message after it has completed running through all of the pipeline steps. Examples of this would be when you wanted to log the request/response or capture metrics about how long each request took to process, etc. In these cases, the metadata about the request is not available during the execution of the pipeline steps. For these scenarios you can use Observers. Observers are notified via a NATS publish message after all of the pipeline steps have executed and metadata has been stored for the request/response pair. It is a "copy" of the final state of the http request. + #### Example pipeline-config.yaml file ``` steps: - - subject: pipeline.metrics - pattern: publish - direction: outgoing - order: 1 - - subject: trace.header pattern: request direction: incoming - order: 2 - - - subject: pipeline.logging - pattern: publish - direction: outgoing - order: 3 + order: 1 - subject: authentication pattern: request direction: incoming - order: 4 + order: 2 - subject: '*' pattern: request direction: incoming - order: 5 + order: 3 +observers: + - subject: 'pipeline.logging' + - subject: 'pipeline.metrics' ``` ## Configuration diff --git a/src/LoggingMicroservice/Program.cs b/src/LoggingMicroservice/Program.cs index 40c05f1..d5401f0 100644 --- a/src/LoggingMicroservice/Program.cs +++ b/src/LoggingMicroservice/Program.cs @@ -40,7 +40,7 @@ private static void PostLog(object sender, MsgHandlerEventArgs e) { // extract the message that we will log var log = Encoding.UTF8.GetString(e.Message.Data); - Console.WriteLine($"Received the log:\r\n{log}"); + Console.WriteLine(log); } } } \ No newline at end of file diff --git a/src/MetricsMicroservice/Program.cs b/src/MetricsMicroservice/Program.cs index 2060d61..c9e120c 100644 --- a/src/MetricsMicroservice/Program.cs +++ b/src/MetricsMicroservice/Program.cs @@ -42,16 +42,15 @@ private static void PostMetric(object sender, MsgHandlerEventArgs e) { // deserialize the NATS message var msg = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(e.Message.Data)); - var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); - Console.WriteLine($"{DateTime.Now.ToShortTimeString()}: Message Called: {msg["subject"]} Total Execution Time (ms): {now - (long)msg["startedOnUtc"]}"); + Console.WriteLine($"{DateTime.Now.ToShortTimeString()}: Message Called: {msg["subject"]} Total Execution Time (ms): {msg["executionTimeMs"]}"); Console.WriteLine("\tBreakdown:"); if (msg["callTimings"] is JArray callTimings) { foreach (var token in callTimings) { - Console.WriteLine($"\t\tSubject: {token["item1"]} Pattern: {token["item2"]}, Execution Time (ms): {token["item3"]}"); + Console.WriteLine($"\t\tSubject: {token["subject"]} - Execution Time (ms): {token["ellapsedMs"]}"); } } diff --git a/src/Proxy/NatsMessage.cs b/src/Proxy/NatsMessage.cs index a657863..a22cfde 100644 --- a/src/Proxy/NatsMessage.cs +++ b/src/Proxy/NatsMessage.cs @@ -5,10 +5,22 @@ namespace Proxy { + public class CallTiming + { + public long EllapsedMs { get; set; } + + public string Subject { get; set; } + + public CallTiming(string subject, long ellapsedMs) + { + Subject = subject; + EllapsedMs = ellapsedMs; + } + } + public sealed class NatsMessage { - public string Body { get; set; } - public List<(string, string, long)> CallTimings { get; set; } + public List CallTimings { get; set; } public long CompletedOnUtc { get; set; } public Dictionary Cookies { get; set; } public string ErrorMessage { get; set; } @@ -16,8 +28,9 @@ public sealed class NatsMessage public Dictionary ExtendedProperties { get; set; } public string Host { get; set; } public Dictionary QueryParams { get; set; } + public string RequestBody { get; set; } public Dictionary RequestHeaders { get; set; } - public string Response { get; set; } + public string ResponseBody { get; set; } public string ResponseContentType { get; set; } public Dictionary ResponseHeaders { get; set; } public int ResponseStatusCode { get; set; } @@ -45,7 +58,7 @@ public NatsMessage(string host, string contentType) QueryParams = new Dictionary(); RequestHeaders = new Dictionary(); ResponseHeaders = new Dictionary(); - CallTimings = new List<(string, string, long)>(); + CallTimings = new List(); } public void MarkComplete() diff --git a/src/Proxy/Observer.cs b/src/Proxy/Observer.cs new file mode 100644 index 0000000..2969af3 --- /dev/null +++ b/src/Proxy/Observer.cs @@ -0,0 +1,7 @@ +namespace Proxy +{ + public class Observer + { + public string Subject { get; set; } + } +} \ No newline at end of file diff --git a/src/Proxy/Pipeline.cs b/src/Proxy/Pipeline.cs index e4f1e83..e90e777 100644 --- a/src/Proxy/Pipeline.cs +++ b/src/Proxy/Pipeline.cs @@ -4,11 +4,13 @@ namespace Proxy { public class Pipeline { + public List Observers { get; set; } public List Steps { get; set; } public Pipeline() { Steps = new List(); + Observers = new List(); } } } \ No newline at end of file diff --git a/src/Proxy/ProxyConfiguration.cs b/src/Proxy/ProxyConfiguration.cs index fcb4010..109ebfd 100644 --- a/src/Proxy/ProxyConfiguration.cs +++ b/src/Proxy/ProxyConfiguration.cs @@ -21,6 +21,7 @@ public class ProxyConfiguration public Pipeline IncomingPipeline { get; private set; } public IConnection NatsConnection { get; set; } public string NatsUrl { get; set; } + public IList Observers { get; set; } public Pipeline OutgoingPipeline { get; private set; } public int PatchStatusCode { get; set; } public string PipelineConfigFile { get; set; } @@ -50,6 +51,9 @@ public void Build() // configure the outgoing pipeline ConfigureOutgoingPipeline(pipeline); + + // configure the observers + ConfigureObservers(pipeline); } private Pipeline BuildRequestPipeline() @@ -108,6 +112,16 @@ private void ConfigureIncomingPipeline(Pipeline pipeline) } } + private void ConfigureObservers(Pipeline pipeline) + { + Observers = new List(); + + foreach (var observer in pipeline.Observers) + { + Observers.Add(observer.Subject); + } + } + private void ConfigureOutgoingPipeline(Pipeline pipeline) { OutgoingPipeline = new Pipeline(); diff --git a/src/Proxy/RequestHandler.cs b/src/Proxy/RequestHandler.cs index 7a7ca1b..80ebf8b 100644 --- a/src/Proxy/RequestHandler.cs +++ b/src/Proxy/RequestHandler.cs @@ -48,7 +48,7 @@ public async Task HandleAsync(HttpContext context) sw.Stop(); // record how long the step took to execute - message.CallTimings.Add((step.Subject, step.Pattern, sw.ElapsedMilliseconds)); + message.CallTimings.Add(new CallTiming(step.Subject, sw.ElapsedMilliseconds)); // if the step requested termination we should stop processing steps if (message.ShouldTerminateRequest) @@ -65,11 +65,12 @@ public async Task HandleAsync(HttpContext context) sw.Stop(); // record how long the step took to execute - message.CallTimings.Add((step.Subject, step.Pattern, sw.ElapsedMilliseconds)); + message.CallTimings.Add(new CallTiming(step.Subject, sw.ElapsedMilliseconds)); } // set the response status code - context.Response.StatusCode = message.ResponseStatusCode == -1 ? DetermineStatusCode(context) : message.ResponseStatusCode; + message.ResponseStatusCode = message.ResponseStatusCode == -1 ? DetermineStatusCode(context) : message.ResponseStatusCode; + context.Response.StatusCode = message.ResponseStatusCode; // set any response headers foreach (var header in message.ResponseHeaders) @@ -80,6 +81,12 @@ public async Task HandleAsync(HttpContext context) // capture the execution time that it took to process the message message.MarkComplete(); + // notify any observers that want a copy of the completed request/response + foreach (var observer in _config.Observers) + { + NotifyObserver(message, observer); + } + // if the response message includes an error, then return it if (!string.IsNullOrWhiteSpace(message.ErrorMessage)) { @@ -92,9 +99,9 @@ public async Task HandleAsync(HttpContext context) } // return the response to the api client - if (!string.IsNullOrWhiteSpace(message.Response)) + if (!string.IsNullOrWhiteSpace(message.ResponseBody)) { - await context.Response.WriteAsync(message.Response); + await context.Response.WriteAsync(message.ResponseBody); } } catch (Exception ex) @@ -147,26 +154,30 @@ private static string ExtractSubject(string method, string path) return string.Concat(method, subjectPath).ToLower(); } - private static NatsMessage MergeMessageProperties(NatsMessage message, NatsMessage responseMessage) + private static void MergeMessageProperties(NatsMessage message, NatsMessage responseMessage) { // we don't want to lose data on the original message if a microservice fails to return all of the data so we're going to just copy // non-null properties from the responseMessage onto the message message.ShouldTerminateRequest = responseMessage.ShouldTerminateRequest; message.ResponseStatusCode = responseMessage.ResponseStatusCode; - message.Response = responseMessage.Response; + message.ResponseBody = responseMessage.ResponseBody; message.ErrorMessage = responseMessage.ErrorMessage ?? message.ErrorMessage; // we want to concatenate the extended properties as each step in the pipeline may be adding information - message.ExtendedProperties = message.ExtendedProperties - .Concat(responseMessage.ExtendedProperties) - .ToDictionary(e => e.Key, e => e.Value); + message.ExtendedProperties.ToList().ForEach(h => + { + if (!message.ExtendedProperties.ContainsKey(h.Key)) + { + message.ExtendedProperties.Add(h.Key, h.Value); + } + }); // we want to add any request headers that the pipeline step could have added that are not already in the RequestHeaders dictionary responseMessage.RequestHeaders.ToList().ForEach(h => { if (!message.RequestHeaders.ContainsKey(h.Key)) { - message.RequestHeaders[h.Key] = h.Value; + message.RequestHeaders.Add(h.Key, h.Value); } }); @@ -175,12 +186,9 @@ private static NatsMessage MergeMessageProperties(NatsMessage message, NatsMessa { if (!message.ResponseHeaders.ContainsKey(h.Key)) { - message.ResponseHeaders[h.Key] = h.Value; + message.ResponseHeaders.Add(h.Key, h.Value); } }); - - // return the merged message - return message; } private static void ParseHttpRequest(HttpRequest request, NatsMessage message) @@ -188,25 +196,25 @@ private static void ParseHttpRequest(HttpRequest request, NatsMessage message) // if there is a body with the request then read it using (var reader = new StreamReader(request.Body, Encoding.UTF8)) { - message.Body = reader.ReadToEnd(); + message.RequestBody = reader.ReadToEnd(); } // parse the headers foreach (var header in request.Headers) { - message.RequestHeaders[header.Key] = header.Value; + message.RequestHeaders.Add(header.Key, string.Join(',', header.Value)); } // parse the cookies foreach (var cookie in request.Cookies) { - message.Cookies[cookie.Key] = cookie.Value; + message.Cookies.Add(cookie.Key, string.Join(',', cookie.Value)); } // parse the query string parameters foreach (var param in request.Query) { - message.QueryParams[param.Key] = param.Value; + message.QueryParams.Add(param.Key, string.Join(',', param.Value)); } } @@ -285,5 +293,17 @@ private async Task ExecuteStep(NatsMessage message, Step step) throw new StepException(subject, step.Pattern, ex.GetBaseException().Message); } } + + private void NotifyObserver(NatsMessage message, string observer) + { + // ensure the nats connection is still in a CONNECTED state + if (_config.NatsConnection.State != ConnState.CONNECTED) + { + throw new Exception($"Cannot send message to the NATS server because the connection is in a {_config.NatsConnection.State} state"); + } + + // send the message to the nats server + _config.NatsConnection.Publish(observer, message.ToBytes(_config.JsonSerializerSettings)); + } } } \ No newline at end of file diff --git a/src/Proxy/pipeline.yaml b/src/Proxy/pipeline.yaml index d8ef419..2f5cdb9 100644 --- a/src/Proxy/pipeline.yaml +++ b/src/Proxy/pipeline.yaml @@ -1,25 +1,18 @@ steps: - - subject: pipeline.metrics - pattern: publish - direction: outgoing - order: 1 - - subject: trace.header pattern: request direction: incoming - order: 2 - - - subject: pipeline.logging - pattern: publish - direction: outgoing - order: 3 + order: 1 - subject: authentication pattern: request direction: incoming - order: 4 + order: 2 - subject: '*' pattern: request direction: incoming - order: 5 \ No newline at end of file + order: 3 +observers: + - subject: 'pipeline.logging' + - subject: 'pipeline.metrics' \ No newline at end of file diff --git a/src/TestMicroservice/Program.cs b/src/TestMicroservice/Program.cs index f5ea94a..734206b 100644 --- a/src/TestMicroservice/Program.cs +++ b/src/TestMicroservice/Program.cs @@ -50,7 +50,7 @@ private static void GetCustomer(object sender, MsgHandlerEventArgs e) }; // store the reply on the NATS message - msg["response"] = JsonConvert.SerializeObject(result); + msg["responseBody"] = JsonConvert.SerializeObject(result); // send the NATS message (with the response now set) back to the caller _connection.Publish(e.Message.Reply, PackageResponse(msg)); @@ -101,7 +101,7 @@ private static void PostCustomer(object sender, MsgHandlerEventArgs e) }; // store the reply on the NATS message - msg["response"] = JsonConvert.SerializeObject(result); + msg["responseBody"] = JsonConvert.SerializeObject(result); // send the NATS message (with the response now set) back to the caller _connection.Publish(e.Message.Reply, PackageResponse(msg)); @@ -121,7 +121,7 @@ private static void PutCustomer(object sender, MsgHandlerEventArgs e) }; // store the reply on the NATS message - msg["response"] = JsonConvert.SerializeObject(result); + msg["responseBody"] = JsonConvert.SerializeObject(result); // lets also override the response status code from the default 201 msg["responseStatusCode"] = 200; diff --git a/src/TraceHeaderPipelineStep/Program.cs b/src/TraceHeaderPipelineStep/Program.cs index dbff237..f55de56 100644 --- a/src/TraceHeaderPipelineStep/Program.cs +++ b/src/TraceHeaderPipelineStep/Program.cs @@ -4,7 +4,6 @@ using Newtonsoft.Json.Serialization; using System; using System.Collections.Generic; -using System.Linq; using System.Text; using System.Threading; @@ -29,7 +28,7 @@ private static void InjectTraceHeader(object sender, MsgHandlerEventArgs e) var msg = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(e.Message.Data)); // if the msg doesn't include a trace header we need to inject one - var traceHeader = msg["requestHeaders"][TraceHeaderName]?.FirstOrDefault(); + var traceHeader = msg.SelectToken($"requestHeaders.{TraceHeaderName}"); if (null == traceHeader) {