Skip to content

Commit

Permalink
fix: Handle null or empty SQS messages and/or message attributes (#2833)
Browse files Browse the repository at this point in the history
* Null check and collection count check to prevent exceptions

* Some more null checks needed in both agent and test code

* Run SQS tests with AWSConfigs.InitializeCollections set both true and false

* Test receiving empty messages

* Attempt to solve test container conflict

* PR feedback

* Allow tests to run in parallel without container name conflicts

* disabled CentOS / arm64 container tests

---------

Co-authored-by: Marty Tippin <120425148+tippmar-nr@users.noreply.github.com>
  • Loading branch information
nr-ahemsath and tippmar-nr authored Oct 15, 2024
1 parent 0a98da4 commit 758b770
Show file tree
Hide file tree
Showing 10 changed files with 172 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using NewRelic.Agent.Api;
using NewRelic.Agent.Api.Experimental;
using NewRelic.Agent.Extensions.Providers.Wrapper;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,7 @@ public static AfterWrappedMethodDelegate HandleSQSRequest(InstrumentedMethodCall
var ec = executionContext;
var response = ec.ResponseContext.Response; // response is a ReceiveMessageResponse

// accept distributed trace headers from the first message in the response
SqsHelper.AcceptDistributedTraceHeaders(transaction, response.Messages[0].MessageAttributes);
AcceptTracingHeadersIfSafe(transaction, response);
}
);

Expand All @@ -119,10 +118,10 @@ void ProcessResponse(Task responseTask)

// taskResult is a ReceiveMessageResponse
var taskResultGetter = _getRequestResponseFromGeneric.GetOrAdd(responseTask.GetType(), t => VisibilityBypasser.Instance.GeneratePropertyAccessor<object>(t, "Result"));
dynamic receiveMessageResponse = taskResultGetter(responseTask);
dynamic response = taskResultGetter(responseTask);

AcceptTracingHeadersIfSafe(transaction, response);

// accept distributed trace headers from the first message in the response
SqsHelper.AcceptDistributedTraceHeaders(transaction, receiveMessageResponse.Messages[0].MessageAttributes);
}
}

Expand All @@ -131,5 +130,14 @@ private static bool ValidTaskResponse(Task response)
return response?.Status == TaskStatus.RanToCompletion;
}

private static void AcceptTracingHeadersIfSafe(ITransaction transaction, dynamic response)
{
if (response.Messages != null && response.Messages.Count > 0 && response.Messages[0].MessageAttributes != null)
{
// accept distributed trace headers from the first message in the response
SqsHelper.AcceptDistributedTraceHeaders(transaction, response.Messages[0].MessageAttributes);
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ private AmazonSQSClient GetSqsClient()
var awsCredentials = new Amazon.Runtime.BasicAWSCredentials("dummy", "dummy");
var config = new AmazonSQSConfig
{
ServiceURL = "http://localstack-containertest:4566",
ServiceURL = "http://localstack:4566",
AuthenticationRegion = "us-west-2"
};

Expand Down Expand Up @@ -117,23 +117,34 @@ public async Task<IEnumerable<Message>> SQS_ReceiveMessageAsync(int maxMessagesT
if (messageAttributeNames.Count != 1)
throw new Exception("Expected messageAttributeNames to have a single element");

foreach (var message in response.Messages)
if (response.Messages != null)
{
Console.WriteLine($"Message: {message.Body}");
foreach (var attr in message.MessageAttributes)
foreach (var message in response.Messages)
{
Console.WriteLine($"MessageAttributes: {attr.Key} = {{ DataType = {attr.Value.DataType}, StringValue = {attr.Value.StringValue}}}");
Console.WriteLine($"Message: {message.Body}");
if (message.MessageAttributes != null)
{
foreach (var attr in message.MessageAttributes)
{
Console.WriteLine($"MessageAttributes: {attr.Key} = {{ DataType = {attr.Value.DataType}, StringValue = {attr.Value.StringValue}}}");
}
}

// delete message
await _amazonSqsClient.DeleteMessageAsync(new DeleteMessageRequest
{
QueueUrl = _sqsQueueUrl,
ReceiptHandle = message.ReceiptHandle
});
}

// delete message
await _amazonSqsClient.DeleteMessageAsync(new DeleteMessageRequest
{
QueueUrl = _sqsQueueUrl,
ReceiptHandle = message.ReceiptHandle
});
return response.Messages;
}
else
{
// received an empty response, so return an empty list of messages
return new List<Message>();
}

return response.Messages;
}

// send message batch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,19 @@ ARG NEW_RELIC_HOST
ARG NEW_RELIC_LICENSE_KEY
ARG NEW_RELIC_APP_NAME

# Control whether or not 'empty' things (e.g. message attributes) are initialized
# to an empty collection or left null
ARG AWSSDK_INITCOLLECTIONS

ENV CORECLR_ENABLE_PROFILING=1 \
CORECLR_PROFILER={36032161-FFC0-4B61-B559-F6C5D41BAE5A} \
CORECLR_NEW_RELIC_HOME=/usr/local/newrelic-dotnet-agent \
CORECLR_PROFILER_PATH=/usr/local/newrelic-dotnet-agent/libNewRelicProfiler.so \
NEW_RELIC_HOST=${NEW_RELIC_HOST} \
NEW_RELIC_LICENSE_KEY=${NEW_RELIC_LICENSE_KEY} \
NEW_RELIC_APP_NAME=${NEW_RELIC_APP_NAME} \
NEW_RELIC_LOG_DIRECTORY=/app/logs
NEW_RELIC_LOG_DIRECTORY=/app/logs \
AWSSDK_INITCOLLECTIONS=${AWSSDK_INITCOLLECTIONS}

WORKDIR /app
COPY --from=publish /app/publish .
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.IO;
using System.Net;
using System.Threading.Tasks;
using Amazon;
using AwsSdkTestApp.SQSBackgroundService;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
Expand All @@ -23,6 +24,10 @@ public static async Task Main(string[] args)
builder.Logging.ClearProviders();
builder.Logging.AddConsole();

var initCollections = GetBoolFromEnvVar("AWSSDK_INITCOLLECTIONS", true);

AWSConfigs.InitializeCollections = initCollections;

// Add services to the container.
builder.Services.AddControllers();

Expand Down Expand Up @@ -57,4 +62,27 @@ static void CreatePidFile()
using var file = File.CreateText(pidFileNameAndPath);
file.WriteLine(pid);
}

static bool GetBoolFromEnvVar(string name, bool defaultValue)
{
bool returnVal = defaultValue;
var envVarVal = Environment.GetEnvironmentVariable(name);
if (envVarVal != null)
{
Console.WriteLine($"Value of env var {name}={envVarVal}");
if (bool.TryParse(envVarVal, out returnVal))
{
Console.WriteLine($"Parsed bool from env var: {returnVal}");
}
else
{
Console.WriteLine("Could not parse bool from env var val: " + envVarVal);
}
}
else
{
Console.WriteLine($"{name} is not set in the environment");
}
return returnVal;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@

services:
localstack:
container_name: "localstack-containertest"
image: localstack/localstack:stable
expose: # ports are only available intneral to the service, not external so there's no chance for conflicts
expose: # ports are only available internal to the service, not external so there's no chance for conflicts
- "4566" # LocalStack Gateway
- "4510-4559" # external services port range
environment:
Expand Down Expand Up @@ -52,8 +51,15 @@ services:
NEW_RELIC_HOST: ${NEW_RELIC_HOST}
DOTNET_VERSION: ${DOTNET_VERSION}
APP_DOTNET_VERSION: ${APP_DOTNET_VERSION}
AWSSDK_INITCOLLECTIONS: ${AWSSDK_INITCOLLECTIONS}
ports:
- "${PORT}:80"
volumes:
- ${AGENT_PATH}:/usr/local/newrelic-dotnet-agent # AGENT_PATH from .env, points to newrelichome_linux_x64
- ${LOG_PATH}:/app/logs # LOG_PATH from .env, should be a folder unique to this run of the smoketest app

networks:
default:
driver: bridge
driver_opts:
com.docker.network.bridge.enable_icc: "true"
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// SPDX-License-Identifier: Apache-2.0

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using NewRelic.Agent.ContainerIntegrationTests.Applications;
using NewRelic.Agent.ContainerIntegrationTests.Fixtures;
Expand All @@ -14,7 +13,7 @@ public abstract class AwsSdkContainerTestFixtureBase(
string distroTag,
ContainerApplication.Architecture containerArchitecture,
string dockerfile,
string dockerComposeFile = "docker-compose-awssdk.yml")
string dockerComposeFile = "docker-compose-awssdk.yml")
: RemoteApplicationFixture(new ContainerApplication(distroTag, containerArchitecture, DotnetVersion, dockerfile,
dockerComposeFile, "awssdktestapp"))
{
Expand Down Expand Up @@ -50,7 +49,7 @@ public string ExerciseSQS_SendAndReceiveInSeparateTransactions(string queueName)
{
var address = $"http://localhost:{Port}/awssdk";

var queueUrl = GetString($"{address}/SQS_InitializeQueue?queueName={queueName}");
var queueUrl = GetString($"{address}/SQS_InitializeQueue?queueName={queueName}");

GetAndAssertStatusCode($"{address}/SQS_SendMessageToQueue?message=Hello&messageQueueUrl={queueUrl}", System.Net.HttpStatusCode.OK);

Expand All @@ -61,4 +60,17 @@ public string ExerciseSQS_SendAndReceiveInSeparateTransactions(string queueName)
return messagesJson;
}

public string ExerciseSQS_ReceiveEmptyMessage(string queueName)
{
var address = $"http://localhost:{Port}/awssdk";

var queueUrl = GetString($"{address}/SQS_InitializeQueue?queueName={queueName}");

var messagesJson = GetString($"{address}/SQS_ReceiveMessageFromQueue?messageQueueUrl={queueUrl}");

GetAndAssertStatusCode($"{address}/SQS_DeleteQueue?messageQueueUrl={queueUrl}", System.Net.HttpStatusCode.OK);

return messagesJson;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,24 @@

namespace NewRelic.Agent.ContainerIntegrationTests.Tests.AwsSdk;

public class AwsSdkSQSTest : NewRelicIntegrationTest<AwsSdkContainerSQSTestFixture>
public abstract class AwsSdkSQSTestBase : NewRelicIntegrationTest<AwsSdkContainerSQSTestFixture>
{
private readonly AwsSdkContainerSQSTestFixture _fixture;

private readonly string _testQueueName1 = $"TestQueue1-{Guid.NewGuid()}";
private readonly string _testQueueName2 = $"TestQueue2-{Guid.NewGuid()}";
private readonly string _testQueueName3 = $"TestQueue3-{Guid.NewGuid()}";
private readonly string _metricScope1 = "WebTransaction/MVC/AwsSdk/SQS_SendReceivePurge/{queueName}";
private readonly string _metricScope2 = "WebTransaction/MVC/AwsSdk/SQS_SendMessageToQueue/{message}/{messageQueueUrl}";
private bool _initCollections;

public AwsSdkSQSTest(AwsSdkContainerSQSTestFixture fixture, ITestOutputHelper output) : base(fixture)
protected AwsSdkSQSTestBase(AwsSdkContainerSQSTestFixture fixture, ITestOutputHelper output, bool initCollections) : base(fixture)
{
_fixture = fixture;
_fixture.TestLogger = output;
_initCollections = initCollections;

_fixture.SetAdditionalEnvironmentVariable("AWSSDK_INITCOLLECTIONS", initCollections.ToString());


_fixture.Actions(setupConfiguration: () =>
Expand All @@ -44,6 +49,7 @@ public AwsSdkSQSTest(AwsSdkContainerSQSTestFixture fixture, ITestOutputHelper ou

_fixture.ExerciseSQS_SendReceivePurge(_testQueueName1);
_fixture.ExerciseSQS_SendAndReceiveInSeparateTransactions(_testQueueName2);
_fixture.ExerciseSQS_ReceiveEmptyMessage(_testQueueName3);

_fixture.AgentLog.WaitForLogLine(AgentLogBase.MetricDataLogLineRegex, TimeSpan.FromMinutes(2));
_fixture.AgentLog.WaitForLogLine(AgentLogBase.TransactionTransformCompletedLogLineRegex, TimeSpan.FromMinutes(2));
Expand All @@ -60,6 +66,12 @@ public AwsSdkSQSTest(AwsSdkContainerSQSTestFixture fixture, ITestOutputHelper ou
[Fact]
public void Test()
{
// Making sure there are no application errors or wrapper exceptions
// See https://github.com/newrelic/newrelic-dotnet-agent/issues/2811

Assert.Equal(0, _fixture.AgentLog.GetWrapperExceptionLineCount());
Assert.Equal(0, _fixture.AgentLog.GetApplicationErrorLineCount());

var metrics = _fixture.AgentLog.GetMetrics().ToList();

var expectedMetrics = new List<Assertions.ExpectedMetric>
Expand All @@ -76,9 +88,18 @@ public void Test()
new() { metricName = $"MessageBroker/SQS/Queue/Consume/Named/{_testQueueName2}", callCount = 1},
new() { metricName = $"MessageBroker/SQS/Queue/Consume/Named/{_testQueueName2}", callCount = 1, metricScope = "OtherTransaction/Custom/AwsSdkTestApp.SQSBackgroundService.SQSReceiverService/ProcessRequestAsync"},

new () { metricName = "Supportability/TraceContext/Accept/Success", callCount = 1}, // only one accept should occur (from the SQSReceiverService/ProcessRequestAsync transaction)
// Only consume metrics for queue 3
new() { metricName = $"MessageBroker/SQS/Queue/Consume/Named/{_testQueueName3}", callCount = 1},
new() { metricName = $"MessageBroker/SQS/Queue/Consume/Named/{_testQueueName3}", callCount = 1, metricScope = "OtherTransaction/Custom/AwsSdkTestApp.SQSBackgroundService.SQSReceiverService/ProcessRequestAsync"},

};

// If the AWS SDK is configured to NOT initialize empty collections, trace headers will not be accepted
if (_initCollections)
{
expectedMetrics.Add(new() { metricName = "Supportability/TraceContext/Accept/Success", callCount = 1 });
}

var sendReceivePurgeTransactionEvent = _fixture.AgentLog.TryGetTransactionEvent(_metricScope1);
var sendReceivePurgeTransactionSample = _fixture.AgentLog.TryGetTransactionSample(_metricScope1);
var sendReceivePurgeExpectedTransactionTraceSegments = new List<string>
Expand All @@ -102,25 +123,34 @@ public void Test()
() => Assert.True(receiveMessageTransactionEvent != null, "receiveMessageTransactionEvent should not be null")
);

// verify that distributed trace worked as expected -- the last produce span should have the same traceId and parentId as the last consume span
var queueProduce = $"MessageBroker/SQS/Queue/Produce/Named/{_testQueueName2}";
var queueConsume = $"MessageBroker/SQS/Queue/Consume/Named/{_testQueueName2}";

var spans = _fixture.AgentLog.GetSpanEvents().ToList();
var produceSpan = spans.LastOrDefault(s => s.IntrinsicAttributes["name"].Equals(queueProduce));
var consumeSpan = spans.LastOrDefault(s => s.IntrinsicAttributes["name"].Equals(queueConsume));
var processRequestSpan = spans.LastOrDefault(s => s.IntrinsicAttributes["name"].Equals("OtherTransaction/Custom/AwsSdkTestApp.SQSBackgroundService.SQSReceiverService/ProcessRequestAsync"));

NrAssert.Multiple(
() => Assert.True(produceSpan != null, "produceSpan should not be null"),
() => Assert.True(consumeSpan != null, "consumeSpan should not be null"),
() => Assert.True(processRequestSpan != null, "processRequestSpan should not be null"),
() => Assert.True(produceSpan!.IntrinsicAttributes.ContainsKey("traceId")),
() => Assert.True(produceSpan!.IntrinsicAttributes.ContainsKey("guid")),
() => Assert.True(consumeSpan!.IntrinsicAttributes.ContainsKey("traceId")),
() => Assert.True(processRequestSpan!.IntrinsicAttributes.ContainsKey("parentId")),
() => Assert.Equal(produceSpan!.IntrinsicAttributes["traceId"], consumeSpan!.IntrinsicAttributes["traceId"]),
() => Assert.Equal(produceSpan!.IntrinsicAttributes["guid"], processRequestSpan!.IntrinsicAttributes["parentId"]),
() => Assert.True(consumeSpan!.IntrinsicAttributes.ContainsKey("traceId"))
);

if (_initCollections)
{
// verify that distributed trace worked as expected -- the last produce span should have the same traceId and parentId as the last consume span
var processRequestSpan = spans.LastOrDefault(s => s.IntrinsicAttributes["name"].Equals("OtherTransaction/Custom/AwsSdkTestApp.SQSBackgroundService.SQSReceiverService/ProcessRequestAsync") && s.IntrinsicAttributes.ContainsKey("parentId"));

NrAssert.Multiple(
() => Assert.True(processRequestSpan != null, "processRequestSpan should not be null"),
() => Assert.Equal(produceSpan!.IntrinsicAttributes["traceId"], consumeSpan!.IntrinsicAttributes["traceId"]),
() => Assert.Equal(produceSpan!.IntrinsicAttributes["guid"], processRequestSpan!.IntrinsicAttributes["parentId"])
);
}

NrAssert.Multiple(
// entity relationship attributes
() => Assert.Equal(produceSpan!.AgentAttributes["messaging.system"], "aws_sqs"),
() => Assert.Equal(produceSpan!.AgentAttributes["messaging.destination.name"], _testQueueName2),
Expand All @@ -133,3 +163,17 @@ public void Test()
);
}
}

public class AwsSdkSQSTestInitializedCollections : AwsSdkSQSTestBase
{
public AwsSdkSQSTestInitializedCollections(AwsSdkContainerSQSTestFixture fixture, ITestOutputHelper output) : base(fixture, output, true)
{
}
}
public class AwsSdkSQSTestNullCollections : AwsSdkSQSTestBase
{
public AwsSdkSQSTestNullCollections(AwsSdkContainerSQSTestFixture fixture, ITestOutputHelper output) : base(fixture, output, false)
{
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,13 @@ public CentosX64ContainerTest(CentosX64ContainerTestFixture fixture, ITestOutput
}
}

public class CentosArm64ContainerTest : LinuxContainerTest<CentosArm64ContainerTestFixture>
{
public CentosArm64ContainerTest(CentosArm64ContainerTestFixture fixture, ITestOutputHelper output) : base(fixture, output)
{
}
}
// temporarily disabled until the checksum issue is resolved
// public class CentosArm64ContainerTest : LinuxContainerTest<CentosArm64ContainerTestFixture>
// {
// public CentosArm64ContainerTest(CentosArm64ContainerTestFixture fixture, ITestOutputHelper output) : base(fixture, output)
// {
// }
// }

public class AmazonX64ContainerTest : LinuxContainerTest<AmazonX64ContainerTestFixture>
{
Expand Down
Loading

0 comments on commit 758b770

Please sign in to comment.