Skip to content

Commit

Permalink
added observers and updated property names
Browse files Browse the repository at this point in the history
  • Loading branch information
wshaddix committed Feb 27, 2018
1 parent 2f7fe7e commit ebec67d
Show file tree
Hide file tree
Showing 12 changed files with 109 additions and 58 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,10 @@
**1.0.3**

* changing the format of the message that the proxy sends to microservices and the pipeline steps to make them more intuitive. Specifically changed `Cookies, ExtendedProperties, QueryParams, RequestHeaders` and `ResponseHeaders` from name/value collections to `Dictionary<string, object>` so they serialize into a more intuitive json string.

**1.0.4**

* changed the call timings from a tuple to a custom class because tuples do not serialize to json with readable property names (see https://github.com/JamesNK/Newtonsoft.Json/issues/1230)
* renamed property `Body` to `RequestBody` on the `NatsMessage`
* renamed property `Response` to `ResponseBody` on the `NatsMessage`
* introduced the concept of Observers to the request pipeline (see README for details)
23 changes: 10 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,33 +58,30 @@ Every http(s) request that comes into the http-nats-proxy can go through a serie

**order:** A numeric value that is the order that your step should be called in relation to other steps in the pipeline

### Observers

There are times when you want to get a copy of the request/response message after it has completed running through all of the pipeline steps. Examples of this would be when you wanted to log the request/response or capture metrics about how long each request took to process, etc. In these cases, the metadata about the request is not available during the execution of the pipeline steps. For these scenarios you can use Observers. Observers are notified via a NATS publish message after all of the pipeline steps have executed and metadata has been stored for the request/response pair. It is a "copy" of the final state of the http request.

#### Example pipeline-config.yaml file
```
steps:
- subject: pipeline.metrics
pattern: publish
direction: outgoing
order: 1
- subject: trace.header
pattern: request
direction: incoming
order: 2
- subject: pipeline.logging
pattern: publish
direction: outgoing
order: 3
order: 1
- subject: authentication
pattern: request
direction: incoming
order: 4
order: 2
- subject: '*'
pattern: request
direction: incoming
order: 5
order: 3
observers:
- subject: 'pipeline.logging'
- subject: 'pipeline.metrics'
```

## Configuration
Expand Down
2 changes: 1 addition & 1 deletion src/LoggingMicroservice/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ private static void PostLog(object sender, MsgHandlerEventArgs e)
{
// extract the message that we will log
var log = Encoding.UTF8.GetString(e.Message.Data);
Console.WriteLine($"Received the log:\r\n{log}");
Console.WriteLine(log);
}
}
}
5 changes: 2 additions & 3 deletions src/MetricsMicroservice/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,15 @@ private static void PostMetric(object sender, MsgHandlerEventArgs e)
{
// deserialize the NATS message
var msg = JsonConvert.DeserializeObject<JObject>(Encoding.UTF8.GetString(e.Message.Data));
var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();

Console.WriteLine($"{DateTime.Now.ToShortTimeString()}: Message Called: {msg["subject"]} Total Execution Time (ms): {now - (long)msg["startedOnUtc"]}");
Console.WriteLine($"{DateTime.Now.ToShortTimeString()}: Message Called: {msg["subject"]} Total Execution Time (ms): {msg["executionTimeMs"]}");
Console.WriteLine("\tBreakdown:");

if (msg["callTimings"] is JArray callTimings)
{
foreach (var token in callTimings)
{
Console.WriteLine($"\t\tSubject: {token["item1"]} Pattern: {token["item2"]}, Execution Time (ms): {token["item3"]}");
Console.WriteLine($"\t\tSubject: {token["subject"]} - Execution Time (ms): {token["ellapsedMs"]}");
}
}

Expand Down
21 changes: 17 additions & 4 deletions src/Proxy/NatsMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,32 @@

namespace Proxy
{
public class CallTiming
{
public long EllapsedMs { get; set; }

public string Subject { get; set; }

public CallTiming(string subject, long ellapsedMs)
{
Subject = subject;
EllapsedMs = ellapsedMs;
}
}

public sealed class NatsMessage
{
public string Body { get; set; }
public List<(string, string, long)> CallTimings { get; set; }
public List<CallTiming> CallTimings { get; set; }
public long CompletedOnUtc { get; set; }
public Dictionary<string, object> Cookies { get; set; }
public string ErrorMessage { get; set; }
public long ExecutionTimeMs => CompletedOnUtc - StartedOnUtc;
public Dictionary<string, object> ExtendedProperties { get; set; }
public string Host { get; set; }
public Dictionary<string, object> QueryParams { get; set; }
public string RequestBody { get; set; }
public Dictionary<string, object> RequestHeaders { get; set; }
public string Response { get; set; }
public string ResponseBody { get; set; }
public string ResponseContentType { get; set; }
public Dictionary<string, object> ResponseHeaders { get; set; }
public int ResponseStatusCode { get; set; }
Expand Down Expand Up @@ -45,7 +58,7 @@ public NatsMessage(string host, string contentType)
QueryParams = new Dictionary<string, object>();
RequestHeaders = new Dictionary<string, object>();
ResponseHeaders = new Dictionary<string, object>();
CallTimings = new List<(string, string, long)>();
CallTimings = new List<CallTiming>();
}

public void MarkComplete()
Expand Down
7 changes: 7 additions & 0 deletions src/Proxy/Observer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace Proxy
{
public class Observer
{
public string Subject { get; set; }
}
}
2 changes: 2 additions & 0 deletions src/Proxy/Pipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ namespace Proxy
{
public class Pipeline
{
public List<Observer> Observers { get; set; }
public List<Step> Steps { get; set; }

public Pipeline()
{
Steps = new List<Step>();
Observers = new List<Observer>();
}
}
}
14 changes: 14 additions & 0 deletions src/Proxy/ProxyConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public class ProxyConfiguration
public Pipeline IncomingPipeline { get; private set; }
public IConnection NatsConnection { get; set; }
public string NatsUrl { get; set; }
public IList<string> Observers { get; set; }
public Pipeline OutgoingPipeline { get; private set; }
public int PatchStatusCode { get; set; }
public string PipelineConfigFile { get; set; }
Expand Down Expand Up @@ -50,6 +51,9 @@ public void Build()

// configure the outgoing pipeline
ConfigureOutgoingPipeline(pipeline);

// configure the observers
ConfigureObservers(pipeline);
}

private Pipeline BuildRequestPipeline()
Expand Down Expand Up @@ -108,6 +112,16 @@ private void ConfigureIncomingPipeline(Pipeline pipeline)
}
}

private void ConfigureObservers(Pipeline pipeline)
{
Observers = new List<string>();

foreach (var observer in pipeline.Observers)
{
Observers.Add(observer.Subject);
}
}

private void ConfigureOutgoingPipeline(Pipeline pipeline)
{
OutgoingPipeline = new Pipeline();
Expand Down
58 changes: 39 additions & 19 deletions src/Proxy/RequestHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public async Task HandleAsync(HttpContext context)
sw.Stop();

// record how long the step took to execute
message.CallTimings.Add((step.Subject, step.Pattern, sw.ElapsedMilliseconds));
message.CallTimings.Add(new CallTiming(step.Subject, sw.ElapsedMilliseconds));

// if the step requested termination we should stop processing steps
if (message.ShouldTerminateRequest)
Expand All @@ -65,11 +65,12 @@ public async Task HandleAsync(HttpContext context)
sw.Stop();

// record how long the step took to execute
message.CallTimings.Add((step.Subject, step.Pattern, sw.ElapsedMilliseconds));
message.CallTimings.Add(new CallTiming(step.Subject, sw.ElapsedMilliseconds));
}

// set the response status code
context.Response.StatusCode = message.ResponseStatusCode == -1 ? DetermineStatusCode(context) : message.ResponseStatusCode;
message.ResponseStatusCode = message.ResponseStatusCode == -1 ? DetermineStatusCode(context) : message.ResponseStatusCode;
context.Response.StatusCode = message.ResponseStatusCode;

// set any response headers
foreach (var header in message.ResponseHeaders)
Expand All @@ -80,6 +81,12 @@ public async Task HandleAsync(HttpContext context)
// capture the execution time that it took to process the message
message.MarkComplete();

// notify any observers that want a copy of the completed request/response
foreach (var observer in _config.Observers)
{
NotifyObserver(message, observer);
}

// if the response message includes an error, then return it
if (!string.IsNullOrWhiteSpace(message.ErrorMessage))
{
Expand All @@ -92,9 +99,9 @@ public async Task HandleAsync(HttpContext context)
}

// return the response to the api client
if (!string.IsNullOrWhiteSpace(message.Response))
if (!string.IsNullOrWhiteSpace(message.ResponseBody))
{
await context.Response.WriteAsync(message.Response);
await context.Response.WriteAsync(message.ResponseBody);
}
}
catch (Exception ex)
Expand Down Expand Up @@ -147,26 +154,30 @@ private static string ExtractSubject(string method, string path)
return string.Concat(method, subjectPath).ToLower();
}

private static NatsMessage MergeMessageProperties(NatsMessage message, NatsMessage responseMessage)
private static void MergeMessageProperties(NatsMessage message, NatsMessage responseMessage)
{
// we don't want to lose data on the original message if a microservice fails to return all of the data so we're going to just copy
// non-null properties from the responseMessage onto the message
message.ShouldTerminateRequest = responseMessage.ShouldTerminateRequest;
message.ResponseStatusCode = responseMessage.ResponseStatusCode;
message.Response = responseMessage.Response;
message.ResponseBody = responseMessage.ResponseBody;
message.ErrorMessage = responseMessage.ErrorMessage ?? message.ErrorMessage;

// we want to concatenate the extended properties as each step in the pipeline may be adding information
message.ExtendedProperties = message.ExtendedProperties
.Concat(responseMessage.ExtendedProperties)
.ToDictionary(e => e.Key, e => e.Value);
message.ExtendedProperties.ToList().ForEach(h =>
{
if (!message.ExtendedProperties.ContainsKey(h.Key))
{
message.ExtendedProperties.Add(h.Key, h.Value);
}
});

// we want to add any request headers that the pipeline step could have added that are not already in the RequestHeaders dictionary
responseMessage.RequestHeaders.ToList().ForEach(h =>
{
if (!message.RequestHeaders.ContainsKey(h.Key))
{
message.RequestHeaders[h.Key] = h.Value;
message.RequestHeaders.Add(h.Key, h.Value);
}
});

Expand All @@ -175,38 +186,35 @@ private static NatsMessage MergeMessageProperties(NatsMessage message, NatsMessa
{
if (!message.ResponseHeaders.ContainsKey(h.Key))
{
message.ResponseHeaders[h.Key] = h.Value;
message.ResponseHeaders.Add(h.Key, h.Value);
}
});

// return the merged message
return message;
}

private static void ParseHttpRequest(HttpRequest request, NatsMessage message)
{
// if there is a body with the request then read it
using (var reader = new StreamReader(request.Body, Encoding.UTF8))
{
message.Body = reader.ReadToEnd();
message.RequestBody = reader.ReadToEnd();
}

// parse the headers
foreach (var header in request.Headers)
{
message.RequestHeaders[header.Key] = header.Value;
message.RequestHeaders.Add(header.Key, string.Join(',', header.Value));
}

// parse the cookies
foreach (var cookie in request.Cookies)
{
message.Cookies[cookie.Key] = cookie.Value;
message.Cookies.Add(cookie.Key, string.Join(',', cookie.Value));
}

// parse the query string parameters
foreach (var param in request.Query)
{
message.QueryParams[param.Key] = param.Value;
message.QueryParams.Add(param.Key, string.Join(',', param.Value));
}
}

Expand Down Expand Up @@ -285,5 +293,17 @@ private async Task ExecuteStep(NatsMessage message, Step step)
throw new StepException(subject, step.Pattern, ex.GetBaseException().Message);
}
}

private void NotifyObserver(NatsMessage message, string observer)
{
// ensure the nats connection is still in a CONNECTED state
if (_config.NatsConnection.State != ConnState.CONNECTED)
{
throw new Exception($"Cannot send message to the NATS server because the connection is in a {_config.NatsConnection.State} state");
}

// send the message to the nats server
_config.NatsConnection.Publish(observer, message.ToBytes(_config.JsonSerializerSettings));
}
}
}
19 changes: 6 additions & 13 deletions src/Proxy/pipeline.yaml
Original file line number Diff line number Diff line change
@@ -1,25 +1,18 @@
steps:
- subject: pipeline.metrics
pattern: publish
direction: outgoing
order: 1

- subject: trace.header
pattern: request
direction: incoming
order: 2

- subject: pipeline.logging
pattern: publish
direction: outgoing
order: 3
order: 1

- subject: authentication
pattern: request
direction: incoming
order: 4
order: 2

- subject: '*'
pattern: request
direction: incoming
order: 5
order: 3
observers:
- subject: 'pipeline.logging'
- subject: 'pipeline.metrics'
6 changes: 3 additions & 3 deletions src/TestMicroservice/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ private static void GetCustomer(object sender, MsgHandlerEventArgs e)
};

// store the reply on the NATS message
msg["response"] = JsonConvert.SerializeObject(result);
msg["responseBody"] = JsonConvert.SerializeObject(result);

// send the NATS message (with the response now set) back to the caller
_connection.Publish(e.Message.Reply, PackageResponse(msg));
Expand Down Expand Up @@ -101,7 +101,7 @@ private static void PostCustomer(object sender, MsgHandlerEventArgs e)
};

// store the reply on the NATS message
msg["response"] = JsonConvert.SerializeObject(result);
msg["responseBody"] = JsonConvert.SerializeObject(result);

// send the NATS message (with the response now set) back to the caller
_connection.Publish(e.Message.Reply, PackageResponse(msg));
Expand All @@ -121,7 +121,7 @@ private static void PutCustomer(object sender, MsgHandlerEventArgs e)
};

// store the reply on the NATS message
msg["response"] = JsonConvert.SerializeObject(result);
msg["responseBody"] = JsonConvert.SerializeObject(result);

// lets also override the response status code from the default 201
msg["responseStatusCode"] = 200;
Expand Down
Loading

0 comments on commit ebec67d

Please sign in to comment.