diff --git a/CHANGELOG.md b/CHANGELOG.md index d1a1691bca..52cfa62eaf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,8 @@ If your change does not need a CHANGELOG entry, add the "skip changelog" label t ### Enhancements +- Add CloudWatch EMF metrics exporter with auto instrumentation configuration + ([#1209](https://github.com/aws-observability/aws-otel-java-instrumentation/pull/1209)) - Support X-Ray Trace Id extraction from Lambda Context object, and respect user-configured OTEL_PROPAGATORS in AWS Lamdba instrumentation ([#1191](https://github.com/aws-observability/aws-otel-java-instrumentation/pull/1191)) ([#1218](https://github.com/aws-observability/aws-otel-java-instrumentation/pull/1218)) - Adaptive Sampling improvements: Ensure propagation of sampling rule across services and AWS accounts. Remove unnecessary B3 propagator. diff --git a/awsagentprovider/build.gradle.kts b/awsagentprovider/build.gradle.kts index 5cc97b14fc..003e7d1ac8 100644 --- a/awsagentprovider/build.gradle.kts +++ b/awsagentprovider/build.gradle.kts @@ -53,6 +53,8 @@ dependencies { runtimeOnly("software.amazon.awssdk:sts") implementation("software.amazon.awssdk:auth") implementation("software.amazon.awssdk:http-auth-aws") + // For EMF exporter + implementation("software.amazon.awssdk:cloudwatchlogs") testImplementation("io.opentelemetry:opentelemetry-sdk-extension-autoconfigure") testImplementation("io.opentelemetry:opentelemetry-sdk-testing") diff --git a/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsApplicationSignalsConfigValidator.java b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsApplicationSignalsConfigUtils.java similarity index 64% rename from awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsApplicationSignalsConfigValidator.java rename to awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsApplicationSignalsConfigUtils.java index bd4f6980d2..f77cc305f7 100644 --- a/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsApplicationSignalsConfigValidator.java +++ b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsApplicationSignalsConfigUtils.java @@ -18,16 +18,45 @@ import static software.amazon.opentelemetry.javaagent.providers.AwsApplicationSignalsCustomizerProvider.*; import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties; -import java.util.Arrays; +import java.util.*; import java.util.logging.Level; import java.util.logging.Logger; import java.util.regex.Pattern; /** Utilities class to validate ADOT environment variable configuration. */ -public final class AwsApplicationSignalsConfigValidator { +public final class AwsApplicationSignalsConfigUtils { private static final Logger logger = Logger.getLogger(AwsApplicationSignalsCustomizerProvider.class.getName()); + /** + * Removes "awsemf" from OTEL_METRICS_EXPORTER if present to prevent validation errors from OTel + * dependencies which would try to load metric exporters. We will contribute emf exporter to + * upstream for supporting OTel metrics in SDK + * + * @param configProps the configuration properties + * @return Optional containing string with "awsemf" removed if the original OTEL_METRICS_EXPORTER + * contains "awsemf", otherwise empty Optional if "awsemf" is not found + */ + static Optional removeEmfExporterIfEnabled(ConfigProperties configProps) { + String metricExporters = configProps.getString(OTEL_METRICS_EXPORTER); + + if (metricExporters == null || !metricExporters.contains("awsemf")) { + return Optional.empty(); + } + + String[] exporters = metricExporters.split(","); + List filtered = + Arrays.stream(exporters) + .map(String::trim) + .filter(exp -> !exp.equals("awsemf")) + .collect(java.util.stream.Collectors.toList()); + + // Return empty string instead of "none" because upstream will not call + // customizeMetricExporter if OTEL_METRICS_EXPORTER is set to "none" as it assumes + // no metrics exporter is configured + return Optional.of(filtered.isEmpty() ? "" : String.join(",", filtered)); + } + /** * Is the given configuration correct to enable SigV4 for Logs? * @@ -61,27 +90,21 @@ static boolean isSigV4EnabledLogs(ConfigProperties config) { if (logsHeaders == null || logsHeaders.isEmpty()) { logger.warning( - "Improper configuration: Please configure the environment variable OTEL_EXPORTER_OTLP_LOGS_HEADERS to include x-aws-log-group and x-aws-log-stream"); + String.format( + "Improper configuration: Please configure the environment variable OTEL_EXPORTER_OTLP_LOGS_HEADERS to include %s and %s", + AWS_OTLP_LOGS_GROUP_HEADER, AWS_OTLP_LOGS_STREAM_HEADER)); return false; } + Map parsedHeaders = + AwsApplicationSignalsConfigUtils.parseOtlpHeaders(logsHeaders); - long filteredLogHeaders = - Arrays.stream(logsHeaders.split(",")) - .filter( - pair -> { - if (pair.contains("=")) { - String key = pair.split("=", 2)[0]; - return key.equals(AWS_OTLP_LOGS_GROUP_HEADER) - || key.equals(AWS_OTLP_LOGS_STREAM_HEADER); - } - return false; - }) - .count(); - - if (filteredLogHeaders != 2) { + if (!(parsedHeaders.containsKey(AWS_OTLP_LOGS_GROUP_HEADER) + && parsedHeaders.containsKey(AWS_OTLP_LOGS_STREAM_HEADER))) { logger.warning( - "Improper configuration: Please configure the environment variable OTEL_EXPORTER_OTLP_LOGS_HEADERS to have values for x-aws-log-group and x-aws-log-stream"); + String.format( + "Improper configuration: Please configure the environment variable OTEL_EXPORTER_OTLP_LOGS_HEADERS to have values for %s and %s", + AWS_OTLP_LOGS_GROUP_HEADER, AWS_OTLP_LOGS_STREAM_HEADER)); return false; } @@ -168,4 +191,26 @@ private static boolean isSigv4ValidConfig( return false; } + + /** + * Parse OTLP headers and return a map of header key to value. See: ... + * + * @param headersString the headers string in format "key1=value1,key2=value2" + * @return map of header keys to values + */ + static Map parseOtlpHeaders(String headersString) { + Map headers = new HashMap<>(); + if (headersString == null || headersString.isEmpty()) { + return headers; + } + + for (String pair : headersString.split(",")) { + if (pair.contains("=")) { + String[] keyValue = pair.split("=", 2); + headers.put(keyValue[0].trim(), keyValue[1].trim()); + } + } + return headers; + } } diff --git a/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsApplicationSignalsCustomizerProvider.java b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsApplicationSignalsCustomizerProvider.java index 1073b52bc3..c0fcbdf981 100644 --- a/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsApplicationSignalsCustomizerProvider.java +++ b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/AwsApplicationSignalsCustomizerProvider.java @@ -65,6 +65,8 @@ import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.concurrent.Immutable; +import software.amazon.opentelemetry.javaagent.providers.exporter.aws.metrics.AwsCloudWatchEmfExporter; +import software.amazon.opentelemetry.javaagent.providers.exporter.aws.metrics.ConsoleEmfExporter; import software.amazon.opentelemetry.javaagent.providers.exporter.otlp.aws.logs.OtlpAwsLogsExporterBuilder; import software.amazon.opentelemetry.javaagent.providers.exporter.otlp.aws.traces.OtlpAwsSpanExporterBuilder; @@ -86,6 +88,9 @@ @Immutable public final class AwsApplicationSignalsCustomizerProvider implements AutoConfigurationCustomizerProvider { + // https://docs.aws.amazon.com/cli/v1/userguide/cli-configure-envvars.html + static final String AWS_REGION = "aws.region"; + static final String AWS_DEFAULT_REGION = "aws.default.region"; static final String AWS_LAMBDA_FUNCTION_NAME_CONFIG = "AWS_LAMBDA_FUNCTION_NAME"; static final String LAMBDA_APPLICATION_SIGNALS_REMOTE_ENVIRONMENT = "LAMBDA_APPLICATION_SIGNALS_REMOTE_ENVIRONMENT"; @@ -103,6 +108,7 @@ public final class AwsApplicationSignalsCustomizerProvider // https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-OTLPEndpoint.html#CloudWatch-LogsEndpoint static final String AWS_OTLP_LOGS_GROUP_HEADER = "x-aws-log-group"; static final String AWS_OTLP_LOGS_STREAM_HEADER = "x-aws-log-stream"; + static final String AWS_EMF_METRICS_NAMESPACE = "x-aws-metric-namespace"; private static final String DEPRECATED_SMP_ENABLED_CONFIG = "otel.smp.enabled"; private static final String DEPRECATED_APP_SIGNALS_ENABLED_CONFIG = @@ -132,7 +138,7 @@ public final class AwsApplicationSignalsCustomizerProvider private static final String OTEL_BSP_MAX_EXPORT_BATCH_SIZE_CONFIG = "otel.bsp.max.export.batch.size"; - private static final String OTEL_METRICS_EXPORTER = "otel.metrics.exporter"; + static final String OTEL_METRICS_EXPORTER = "otel.metrics.exporter"; static final String OTEL_LOGS_EXPORTER = "otel.logs.exporter"; static final String OTEL_TRACES_EXPORTER = "otel.traces.exporter"; static final String OTEL_EXPORTER_OTLP_TRACES_PROTOCOL = "otel.exporter.otlp.traces.protocol"; @@ -161,6 +167,7 @@ public final class AwsApplicationSignalsCustomizerProvider private static final int LAMBDA_SPAN_EXPORT_BATCH_SIZE = 10; private Sampler sampler; + private boolean isEmfExporterEnabled = false; public void customize(AutoConfigurationCustomizer autoConfiguration) { autoConfiguration.addPropertiesCustomizer(this::customizeProperties); @@ -171,6 +178,15 @@ public void customize(AutoConfigurationCustomizer autoConfiguration) { autoConfiguration.addMeterProviderCustomizer(this::customizeMeterProvider); autoConfiguration.addSpanExporterCustomizer(this::customizeSpanExporter); autoConfiguration.addLogRecordExporterCustomizer(this::customizeLogsExporter); + autoConfiguration.addMetricExporterCustomizer(this::customizeMetricExporter); + } + + private static Optional getAwsRegionFromConfig(ConfigProperties configProps) { + String region = configProps.getString(AWS_REGION); + if (region != null) { + return Optional.of(region); + } + return Optional.ofNullable(configProps.getString(AWS_DEFAULT_REGION)); } static boolean isLambdaEnvironment() { @@ -190,10 +206,18 @@ private boolean isApplicationSignalsRuntimeEnabled(ConfigProperties configProps) && configProps.getBoolean(APPLICATION_SIGNALS_RUNTIME_ENABLED_CONFIG, true); } - private Map customizeProperties(ConfigProperties configProps) { + Map customizeProperties(ConfigProperties configProps) { Map propsOverride = new HashMap<>(); boolean isLambdaEnvironment = isLambdaEnvironment(); + // Check if awsemf was specified and remove it from OTEL_METRICS_EXPORTER + Optional filteredExporters = + AwsApplicationSignalsConfigUtils.removeEmfExporterIfEnabled(configProps); + if (filteredExporters.isPresent()) { + this.isEmfExporterEnabled = true; + propsOverride.put(OTEL_METRICS_EXPORTER, filteredExporters.get()); + } + // Enable AWS Resource Providers propsOverride.put(OTEL_RESOURCE_PROVIDERS_AWS_ENABLED, "true"); @@ -394,7 +418,6 @@ private SdkTracerProviderBuilder customizeTracerProviderBuilder( private SdkMeterProviderBuilder customizeMeterProvider( SdkMeterProviderBuilder sdkMeterProviderBuilder, ConfigProperties configProps) { - if (isApplicationSignalsRuntimeEnabled(configProps)) { Set registeredScopeNames = new HashSet<>(1); String jmxRuntimeScopeName = "io.opentelemetry.jmx"; @@ -434,7 +457,7 @@ SpanExporter customizeSpanExporter(SpanExporter spanExporter, ConfigProperties c } } - if (AwsApplicationSignalsConfigValidator.isSigV4EnabledTraces(configProps)) { + if (AwsApplicationSignalsConfigUtils.isSigV4EnabledTraces(configProps)) { // can cast here since we've checked that the configuration for OTEL_TRACES_EXPORTER is otlp // and OTEL_EXPORTER_OTLP_TRACES_PROTOCOL is http/protobuf // so the given spanExporter will be an instance of OtlpHttpSpanExporter @@ -480,7 +503,7 @@ private boolean isOtlpSpanExporter(SpanExporter spanExporter) { LogRecordExporter customizeLogsExporter( LogRecordExporter logsExporter, ConfigProperties configProps) { - if (AwsApplicationSignalsConfigValidator.isSigV4EnabledLogs(configProps)) { + if (AwsApplicationSignalsConfigUtils.isSigV4EnabledLogs(configProps)) { // can cast here since we've checked that the configuration for OTEL_LOGS_EXPORTER is otlp and // OTEL_EXPORTER_OTLP_LOGS_PROTOCOL is http/protobuf // so the given logsExporter will be an instance of OtlpHttpLogRecorderExporter @@ -509,6 +532,45 @@ LogRecordExporter customizeLogsExporter( return logsExporter; } + MetricExporter customizeMetricExporter( + MetricExporter metricExporter, ConfigProperties configProps) { + if (isEmfExporterEnabled) { + Map headers = + AwsApplicationSignalsConfigUtils.parseOtlpHeaders( + configProps.getString(OTEL_EXPORTER_OTLP_LOGS_HEADERS)); + Optional awsRegion = getAwsRegionFromConfig(configProps); + + if (awsRegion.isPresent()) { + String namespace = headers.get(AWS_EMF_METRICS_NAMESPACE); + + if (headers.containsKey(AWS_OTLP_LOGS_GROUP_HEADER) + && headers.containsKey(AWS_OTLP_LOGS_STREAM_HEADER)) { + String logGroup = headers.get(AWS_OTLP_LOGS_GROUP_HEADER); + String logStream = headers.get(AWS_OTLP_LOGS_STREAM_HEADER); + return new AwsCloudWatchEmfExporter(namespace, logGroup, logStream, awsRegion.get()); + } + if (isLambdaEnvironment()) { + return new ConsoleEmfExporter(namespace); + } + logger.warning( + String.format( + "Improper EMF Exporter configuration: Please configure the environment variable %s to have values for %s, %s, and %s", + OTEL_EXPORTER_OTLP_LOGS_HEADERS, + AWS_OTLP_LOGS_GROUP_HEADER, + AWS_OTLP_LOGS_STREAM_HEADER, + AWS_EMF_METRICS_NAMESPACE)); + + } else { + logger.warning( + String.format( + "Improper EMF Exporter configuration: AWS region not found in environment variables please set %s or %s", + AWS_REGION, AWS_DEFAULT_REGION)); + } + } + + return metricExporter; + } + static AwsXrayAdaptiveSamplingConfig parseConfigString(String config) throws JsonProcessingException { if (config == null) { diff --git a/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/exporter/aws/metrics/AwsCloudWatchEmfExporter.java b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/exporter/aws/metrics/AwsCloudWatchEmfExporter.java new file mode 100644 index 0000000000..7580da4e1f --- /dev/null +++ b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/exporter/aws/metrics/AwsCloudWatchEmfExporter.java @@ -0,0 +1,73 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.opentelemetry.javaagent.providers.exporter.aws.metrics; + +import io.opentelemetry.sdk.common.CompletableResultCode; +import java.util.logging.Logger; +import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient; +import software.amazon.opentelemetry.javaagent.providers.exporter.aws.metrics.common.BaseEmfExporter; +import software.amazon.opentelemetry.javaagent.providers.exporter.aws.metrics.common.emitter.CloudWatchLogsClientEmitter; +import software.amazon.opentelemetry.javaagent.providers.exporter.aws.metrics.common.emitter.LogEventEmitter; + +/** + * EMF metrics exporter for sending data directly to CloudWatch Logs. + * + *

This exporter converts OTel metrics into CloudWatch EMF logs which are then sent to CloudWatch + * Logs. CloudWatch Logs automatically extracts the metrics from the EMF logs. + * + *

... + */ +public class AwsCloudWatchEmfExporter extends BaseEmfExporter { + private static final Logger logger = Logger.getLogger(AwsCloudWatchEmfExporter.class.getName()); + + /** + * Initialize the CloudWatch EMF exporter. + * + * @param namespace CloudWatch namespace for metrics (default: "default") + * @param logGroupName CloudWatch log group name + * @param logStreamName CloudWatch log stream name (auto-generated if null) + * @param awsRegion AWS region + */ + public AwsCloudWatchEmfExporter( + String namespace, String logGroupName, String logStreamName, String awsRegion) { + super(namespace, new CloudWatchLogsClientEmitter(logGroupName, logStreamName, awsRegion)); + } + + /** + * Initialize the CloudWatch EMF exporter with a custom emitter. + * + * @param namespace CloudWatch namespace for metrics + * @param emitter Custom log emitter + */ + public AwsCloudWatchEmfExporter(String namespace, LogEventEmitter emitter) { + super(namespace, emitter); + } + + @Override + public CompletableResultCode flush() { + this.emitter.flushEvents(); + logger.fine("AwsCloudWatchEmfExporter force flushes the buffered metrics"); + return CompletableResultCode.ofSuccess(); + } + + @Override + public CompletableResultCode shutdown() { + this.flush(); + logger.fine("AwsCloudWatchEmfExporter shutdown called"); + return CompletableResultCode.ofSuccess(); + } +} diff --git a/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/exporter/aws/metrics/ConsoleEmfExporter.java b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/exporter/aws/metrics/ConsoleEmfExporter.java new file mode 100644 index 0000000000..409f667ea3 --- /dev/null +++ b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/exporter/aws/metrics/ConsoleEmfExporter.java @@ -0,0 +1,63 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.opentelemetry.javaagent.providers.exporter.aws.metrics; + +import io.opentelemetry.sdk.common.CompletableResultCode; +import java.io.PrintStream; +import java.util.logging.Level; +import java.util.logging.Logger; +import software.amazon.opentelemetry.javaagent.providers.exporter.aws.metrics.common.BaseEmfExporter; +import software.amazon.opentelemetry.javaagent.providers.exporter.aws.metrics.common.emitter.ConsoleEmitter; +import software.amazon.opentelemetry.javaagent.providers.exporter.aws.metrics.common.emitter.LogEventEmitter; + +/** EMF metrics exporter for printing data to Standard Out. */ +public class ConsoleEmfExporter extends BaseEmfExporter { + private static final Logger logger = Logger.getLogger(ConsoleEmfExporter.class.getName()); + + /** + * Initialize the Console EMF exporter. + * + * @param namespace CloudWatch namespace for metrics (defaults to "default") + */ + public ConsoleEmfExporter(String namespace) { + super(namespace, new ConsoleEmitter()); + } + + /** + * Initialize the Console EMF exporter with custom emitter. + * + * @param namespace CloudWatch namespace for metrics + * @param emitter Custom emitter + */ + public ConsoleEmfExporter(String namespace, LogEventEmitter emitter) { + super(namespace, emitter); + } + + @Override + public CompletableResultCode flush() { + this.emitter.flushEvents(); + logger.log( + Level.FINE, + "ConsoleEmfExporter force_flush called - no buffering to flush for console output"); + return CompletableResultCode.ofSuccess(); + } + + @Override + public CompletableResultCode shutdown() { + logger.log(Level.FINE, "ConsoleEmfExporter shutdown called"); + return CompletableResultCode.ofSuccess(); + } +} diff --git a/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/exporter/aws/metrics/common/BaseEmfExporter.java b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/exporter/aws/metrics/common/BaseEmfExporter.java new file mode 100644 index 0000000000..4b84340bb4 --- /dev/null +++ b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/exporter/aws/metrics/common/BaseEmfExporter.java @@ -0,0 +1,170 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.opentelemetry.javaagent.providers.exporter.aws.metrics.common; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.metrics.Aggregation; +import io.opentelemetry.sdk.metrics.InstrumentType; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; +import io.opentelemetry.sdk.metrics.data.Data; +import io.opentelemetry.sdk.metrics.data.ExponentialHistogramData; +import io.opentelemetry.sdk.metrics.data.ExponentialHistogramPointData; +import io.opentelemetry.sdk.metrics.data.GaugeData; +import io.opentelemetry.sdk.metrics.data.HistogramData; +import io.opentelemetry.sdk.metrics.data.HistogramPointData; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.data.PointData; +import io.opentelemetry.sdk.metrics.data.SumData; +import io.opentelemetry.sdk.metrics.export.MetricExporter; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.logging.Logger; +import software.amazon.opentelemetry.javaagent.providers.exporter.aws.metrics.common.emitter.LogEventEmitter; + +/** + * Base class for EMF metric exporters that converts OpenTelemetry metrics to CloudWatch EMF format. + */ +public abstract class BaseEmfExporter implements MetricExporter { + private static final Logger logger = Logger.getLogger(BaseEmfExporter.class.getName()); + private final String namespace; + protected final LogEventEmitter emitter; + + /** + * Creates a new EMF exporter with the specified namespace and log emitter. + * + * @param namespace the CloudWatch metric namespace, defaults to "default" if null + * @param emitter the log event emitter for sending EMF logs + */ + protected BaseEmfExporter(String namespace, LogEventEmitter emitter) { + this.namespace = namespace != null ? namespace : "default"; + this.emitter = emitter; + } + + @Override + public CompletableResultCode export(Collection metricsData) { + try { + if (metricsData.isEmpty()) { + return CompletableResultCode.ofSuccess(); + } + + // Group metrics by attributes and timestamp + Map> groupedMetrics = new HashMap<>(); + + for (MetricData metric : metricsData) { + Data metricData = metric.getData(); + if (metricData == null || metricData.getPoints().isEmpty()) { + continue; + } + + for (PointData point : metricData.getPoints()) { + MetricRecord record = null; + + if (metricData instanceof GaugeData || metricData instanceof SumData) { + record = MetricRecord.convertGaugeAndSum(metric, point); + } + if (metricData instanceof HistogramData && point instanceof HistogramPointData) { + record = MetricRecord.convertHistogram(metric, (HistogramPointData) point); + } + if (metricData instanceof ExponentialHistogramData + && point instanceof ExponentialHistogramPointData) { + record = + MetricRecord.convertExponentialHistogram( + metric, (ExponentialHistogramPointData) point); + } + + if (record == null) { + logger.fine( + String.format( + "Unsupported metric data type: %s", metricData.getClass().getSimpleName())); + continue; + } + + String groupKey = this.groupByAttributesAndTimestamp(record); + groupedMetrics.computeIfAbsent(groupKey, k -> new ArrayList<>()).add(record); + } + } + + // Process each group to create EMF logs + for (List records : groupedMetrics.values()) { + if (!records.isEmpty()) { + MetricRecord firstRecord = records.get(0); + // Get resource from first metric in the collection + MetricData firstMetric = metricsData.iterator().next(); + Map emfLog = + MetricRecord.createEmfLog( + records, + firstMetric.getResource().getAttributes(), + this.namespace, + firstRecord.getTimestamp()); + + Map logEvent = new HashMap<>(); + logEvent.put("message", new ObjectMapper().writeValueAsString(emfLog)); + logEvent.put("timestamp", firstRecord.getTimestamp()); + this.emitter.emit(logEvent); + } + } + + return CompletableResultCode.ofSuccess(); + } catch (Exception e) { + logger.severe(String.format("Failed to export metrics: %s", e.getMessage())); + return CompletableResultCode.ofFailure(); + } + } + + @Override + public abstract CompletableResultCode flush(); + + @Override + public abstract CompletableResultCode shutdown(); + + @Override + public AggregationTemporality getAggregationTemporality(InstrumentType instrumentType) { + return AggregationTemporality.DELTA; + } + + @Override + public Aggregation getDefaultAggregation(InstrumentType instrumentType) { + if (instrumentType == InstrumentType.HISTOGRAM) { + return Aggregation.base2ExponentialBucketHistogram(); + } + return Aggregation.defaultAggregation(); + } + + private String groupByAttributesAndTimestamp(MetricRecord record) { + // Java doesn't have built-in, hash-able tuples, so we + // concatenate the attributes key and timestamp into a single string to create a unique + // grouping key for the HashMap. + String attrsKey = getAttributesKey(record.getAttributes()); + return attrsKey + "_" + record.getTimestamp(); + } + + private String getAttributesKey(Attributes attributes) { + // Sort the attributes to ensure consistent keys + // Using TreeMap: The map is sorted + // according to the natural ordering of its keys, or by a Comparator provided at map creation + // time, depending on which constructor is used. + // https://docs.oracle.com/javase/8/docs/api/java/util/TreeMap.html + Map sortedAttrs = new TreeMap<>(); + attributes.forEach((key, value) -> sortedAttrs.put(key.getKey(), value)); + return sortedAttrs.toString(); + } +} diff --git a/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/exporter/aws/metrics/common/MetricRecord.java b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/exporter/aws/metrics/common/MetricRecord.java new file mode 100644 index 0000000000..9c83760fc3 --- /dev/null +++ b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/exporter/aws/metrics/common/MetricRecord.java @@ -0,0 +1,435 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.opentelemetry.javaagent.providers.exporter.aws.metrics.common; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.sdk.metrics.data.DoublePointData; +import io.opentelemetry.sdk.metrics.data.ExponentialHistogramBuckets; +import io.opentelemetry.sdk.metrics.data.ExponentialHistogramPointData; +import io.opentelemetry.sdk.metrics.data.HistogramPointData; +import io.opentelemetry.sdk.metrics.data.LongPointData; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.data.PointData; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.logging.Logger; +import javax.annotation.Nullable; + +/** The metric data unified representation of all OTel metrics for OTel to CW EMF conversion. */ +public class MetricRecord { + private static final Logger logger = Logger.getLogger(MetricRecord.class.getName()); + + // CloudWatch EMF supported units + // Ref: https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_MetricDatum.html + private static final Set EMF_SUPPORTED_UNITS = + new HashSet<>( + Arrays.asList( + "Seconds", + "Microseconds", + "Milliseconds", + "Bytes", + "Kilobytes", + "Megabytes", + "Gigabytes", + "Terabytes", + "Bits", + "Kilobits", + "Megabits", + "Gigabits", + "Terabits", + "Percent", + "Count", + "Bytes/Second", + "Kilobytes/Second", + "Megabytes/Second", + "Gigabytes/Second", + "Terabytes/Second", + "Bits/Second", + "Kilobits/Second", + "Megabits/Second", + "Gigabits/Second", + "Terabits/Second", + "Count/Second", + "None")); + + // OTel to CloudWatch unit mapping + // Ref: opentelemetry-collector-contrib/blob/main/exporter/awsemfexporter/grouped_metric.go#L188 + private static final Map UNIT_MAPPING = new HashMap<>(); + + static { + UNIT_MAPPING.put("1", ""); + UNIT_MAPPING.put("ns", ""); + UNIT_MAPPING.put("ms", "Milliseconds"); + UNIT_MAPPING.put("s", "Seconds"); + UNIT_MAPPING.put("us", "Microseconds"); + UNIT_MAPPING.put("By", "Bytes"); + UNIT_MAPPING.put("bit", "Bits"); + } + + // Instrument metadata + private final String name; + private final String unit; + + @Nullable private Long timestamp; + private Attributes attributes = Attributes.empty(); + + // Different metric type data - only one will be set per record + @Nullable private Double value; + @Nullable private Map histogramData; + @Nullable private Map expHistogramData; + + /** + * Initialize metric record. + * + * @param metricName Name of the metric + * @param metricUnit Unit of the metric + */ + public MetricRecord(String metricName, String metricUnit) { + this.name = metricName; + this.unit = metricUnit; + } + + /** + * Create EMF log dictionary from metric records. + * + * @param metricRecords List of metric records grouped by attributes + * @param resource Resource attributes + * @param namespace CloudWatch namespace + * @param timestamp Optional timestamp + * @return EMF log as Map + */ + static Map createEmfLog( + List metricRecords, + Attributes resource, + String namespace, + @Nullable Long timestamp) { + Map emfLog = new HashMap<>(); + + // Base structure + List cloudWatchMetrics = new ArrayList<>(); + Map aws = new HashMap<>(); + aws.put("Timestamp", timestamp != null ? timestamp : System.currentTimeMillis()); + aws.put("CloudWatchMetrics", cloudWatchMetrics); + emfLog.put("_aws", aws); + emfLog.put("Version", "1"); + + // Add resource attributes to EMF log but not as dimensions + // OTel collector EMF Exporter has a resource_to_telemetry_conversion flag that will convert + // resource attributes + // as regular metric attributes(potential dimensions). However, for this SDK EMF implementation, + // we align with the OpenTelemetry concept that all metric attributes are treated as dimensions. + // And have resource attributes as just additional metadata in EMF, added otel.resource as + // prefix to distinguish. + if (resource != null) { + resource.forEach( + (key, value) -> emfLog.put("otel.resource." + key.getKey(), value.toString())); + } + + List> metricDefinitions = new ArrayList<>(); + // Collect attributes from all records (they should be the same for all records in the group) + // Only collect once from the first record and apply to all records + Attributes allAttributes = + !metricRecords.isEmpty() ? metricRecords.get(0).getAttributes() : Attributes.empty(); + + // Process each metric record + for (MetricRecord record : metricRecords) { + String metricName = record.getName(); + if (metricName == null || metricName.isEmpty()) { + continue; + } + + Map metricData = new HashMap<>(); + metricData.put("Name", metricName); + + String unit = MetricRecord.convertUnit(record); + if (!unit.isEmpty()) { + metricData.put("Unit", unit); + } + + boolean hasMetricData = false; + if (record.getExpHistogramData() != null) { + emfLog.put(metricName, record.getExpHistogramData()); + hasMetricData = true; + } + if (record.getHistogramData() != null) { + emfLog.put(metricName, record.getHistogramData()); + hasMetricData = true; + } + if (record.getValue() != null) { + emfLog.put(metricName, record.getValue()); + hasMetricData = true; + } + if (!hasMetricData) { + logger.fine("Skipping metric " + metricName + " as it does not have valid metric value"); + continue; + } + + metricDefinitions.add(metricData); + } + + // Add attribute values to EMF log + allAttributes.forEach((key, value) -> emfLog.put(key.getKey(), value.toString())); + + // Add CloudWatch Metrics + if (!metricDefinitions.isEmpty()) { + Map cloudwatchMetric = new HashMap<>(); + cloudwatchMetric.put("Namespace", namespace); + cloudwatchMetric.put("Metrics", metricDefinitions); + + List dimensionNames = new ArrayList<>(); + allAttributes.forEach((key, value) -> dimensionNames.add(key.getKey())); + + if (!dimensionNames.isEmpty()) { + cloudwatchMetric.put("Dimensions", Collections.singletonList(dimensionNames)); + } + + cloudWatchMetrics.add(cloudwatchMetric); + } + + return emfLog; + } + + static MetricRecord convertHistogram(MetricData metric, HistogramPointData dataPoint) { + MetricRecord record = new MetricRecord(metric.getName(), metric.getUnit()); + long timestampMs = + dataPoint.getEpochNanos() != 0 + ? MetricRecord.normalizeTimestamp(dataPoint.getEpochNanos()) + : System.currentTimeMillis(); + Map histogramMap = new HashMap<>(); + histogramMap.put("Count", dataPoint.getCount()); + histogramMap.put("Sum", dataPoint.getSum()); + histogramMap.put("Min", dataPoint.getMin()); + histogramMap.put("Max", dataPoint.getMax()); + + record.setTimestamp(timestampMs); + record.setAttributes(dataPoint.getAttributes()); + record.setHistogramData(histogramMap); + + return record; + } + + static MetricRecord convertExponentialHistogram( + MetricData metric, ExponentialHistogramPointData dataPoint) { + MetricRecord record = new MetricRecord(metric.getName(), metric.getUnit()); + long timestampMs = + dataPoint.getEpochNanos() != 0 + ? MetricRecord.normalizeTimestamp(dataPoint.getEpochNanos()) + : System.currentTimeMillis(); + List values = new ArrayList<>(); + List counts = new ArrayList<>(); + + double base = Math.pow(2, Math.pow(2, -1 * dataPoint.getScale())); + + record.setTimestamp(timestampMs); + record.setAttributes(dataPoint.getAttributes()); + + // Process positive buckets + ExponentialHistogramBuckets positiveBuckets = dataPoint.getPositiveBuckets(); + if (positiveBuckets != null && !positiveBuckets.getBucketCounts().isEmpty()) { + int positiveOffset = positiveBuckets.getOffset(); + List positiveBucketCounts = positiveBuckets.getBucketCounts(); + + double bucketBegin = 0; + double bucketEnd = 0; + + for (int bucketIndex = 0; bucketIndex < positiveBucketCounts.size(); bucketIndex++) { + long count = positiveBucketCounts.get(bucketIndex); + if (count > 0) { + int index = bucketIndex + positiveOffset; + + if (bucketBegin == 0) { + bucketBegin = Math.pow(base, index); + } else { + bucketBegin = bucketEnd; + } + + bucketEnd = Math.pow(base, index + 1); + + // Calculate midpoint value of the bucket + double metricVal = (bucketBegin + bucketEnd) / 2; + + values.add((float) metricVal); + counts.add((float) count); + } + } + } + + long zeroCount = dataPoint.getZeroCount(); + if (zeroCount > 0) { + values.add(0f); + counts.add((float) zeroCount); + } + + // Process negative buckets + ExponentialHistogramBuckets negativeBuckets = dataPoint.getNegativeBuckets(); + if (negativeBuckets != null && !negativeBuckets.getBucketCounts().isEmpty()) { + int negativeOffset = negativeBuckets.getOffset(); + List negativeBucketCounts = negativeBuckets.getBucketCounts(); + + double bucketBegin = 0; + double bucketEnd = 0; + + for (int bucketIndex = 0; bucketIndex < negativeBucketCounts.size(); bucketIndex++) { + long count = negativeBucketCounts.get(bucketIndex); + if (count > 0) { + int index = bucketIndex + negativeOffset; + + if (bucketEnd == 0) { + bucketEnd = -Math.pow(base, index); + } else { + bucketEnd = bucketBegin; + } + + bucketBegin = -Math.pow(base, index + 1); + + // Calculate midpoint value of the bucket + double metricVal = (bucketBegin + bucketEnd) / 2; + + values.add((float) metricVal); + counts.add((float) count); + } + } + } + + Map expHistogramMap = new HashMap<>(); + expHistogramMap.put("Values", values); + expHistogramMap.put("Counts", counts); + expHistogramMap.put("Count", dataPoint.getCount()); + expHistogramMap.put("Sum", dataPoint.getSum()); + expHistogramMap.put("Max", dataPoint.getMax()); + expHistogramMap.put("Min", dataPoint.getMin()); + record.setExpHistogramData(expHistogramMap); + + return record; + } + + /** + * Convert a Gauge or Sum metric datapoint to a metric record. + * + * @param metric The metric object + * @param dataPoint The datapoint to convert + * @return MetricRecord with populated timestamp, attributes, and value + */ + static MetricRecord convertGaugeAndSum(MetricData metric, PointData dataPoint) { + MetricRecord record = new MetricRecord(metric.getName(), metric.getUnit()); + + long timestampMs = + dataPoint.getEpochNanos() != 0 + ? MetricRecord.normalizeTimestamp(dataPoint.getEpochNanos()) + : System.currentTimeMillis(); + + record.setTimestamp(timestampMs); + record.setAttributes(dataPoint.getAttributes()); + + if (dataPoint instanceof DoublePointData) { + record.setValue(((DoublePointData) dataPoint).getValue()); + } + if (dataPoint instanceof LongPointData) { + record.setValue((double) ((LongPointData) dataPoint).getValue()); + } + + return record; + } + + private static long normalizeTimestamp(long timestampNs) { + return timestampNs / 1_000_000; + } + + /** + * Converts OTel unit to equivalent CloudWatch unit. + * + * @param record The metric record + * @return CloudWatch-compatible unit or empty string. + */ + private static String convertUnit(MetricRecord record) { + String unit = record.getUnit(); + + if (unit == null || unit.isEmpty()) { + return ""; + } + + if (EMF_SUPPORTED_UNITS.contains(unit)) { + return unit; + } + + // Convert non-units that use curly braces to annotate a quantity to Count + // See: https://opentelemetry.io/docs/specs/semconv/general/metrics/#instrument-units + if (unit.startsWith("{") && unit.endsWith("}")) { + return "Count"; + } + + String mappedUnit = UNIT_MAPPING.get(unit); + return mappedUnit != null ? mappedUnit : ""; + } + + String getName() { + return this.name; + } + + String getUnit() { + return this.unit; + } + + @Nullable + Long getTimestamp() { + return this.timestamp; + } + + void setTimestamp(@Nullable Long timestamp) { + this.timestamp = timestamp; + } + + Attributes getAttributes() { + return this.attributes; + } + + void setAttributes(Attributes attributes) { + this.attributes = attributes; + } + + @Nullable + Double getValue() { + return this.value; + } + + void setValue(@Nullable Double value) { + this.value = value; + } + + @Nullable + Map getHistogramData() { + return this.histogramData; + } + + void setHistogramData(Map histogramData) { + this.histogramData = histogramData; + } + + @Nullable + Map getExpHistogramData() { + return this.expHistogramData; + } + + void setExpHistogramData(@Nullable Map expHistogramData) { + this.expHistogramData = expHistogramData; + } +} diff --git a/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/exporter/aws/metrics/common/emitter/CloudWatchLogsClientEmitter.java b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/exporter/aws/metrics/common/emitter/CloudWatchLogsClientEmitter.java new file mode 100644 index 0000000000..402db1d4e6 --- /dev/null +++ b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/exporter/aws/metrics/common/emitter/CloudWatchLogsClientEmitter.java @@ -0,0 +1,410 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.opentelemetry.javaagent.providers.exporter.aws.metrics.common.emitter; + +import java.time.Duration; +import java.util.*; +import java.util.logging.Logger; +import software.amazon.awssdk.awscore.exception.AwsServiceException; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.retries.StandardRetryStrategy; +import software.amazon.awssdk.retries.api.BackoffStrategy; +import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient; +import software.amazon.awssdk.services.cloudwatchlogs.model.*; + +/** + * A log event emitter that sends Log Events to CloudWatch Logs. + * + *

This class handles the batching logic and CloudWatch Logs API interactions for sending EMF + * logs while respecting CloudWatch Logs constraints. + */ +public class CloudWatchLogsClientEmitter implements LogEventEmitter { + private static final Logger logger = + Logger.getLogger(CloudWatchLogsClientEmitter.class.getName()); + + // Constants for CloudWatch Logs limits + // http://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/cloudwatch_limits_cwl.html + // http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html + private static final int CW_MAX_EVENT_PAYLOAD_BYTES = 256 * 1024; // 256KB + private static final int CW_MAX_REQUEST_EVENT_COUNT = 10000; + private static final int CW_PER_EVENT_HEADER_BYTES = 26; + private static final long BATCH_FLUSH_INTERVAL = 60 * 1000; // 60 seconds + private static final int CW_MAX_REQUEST_PAYLOAD_BYTES = 1024 * 1024; // 1MB + private static final String CW_TRUNCATED_SUFFIX = "[Truncated...]"; + // None of the log events in the batch can be older than 14 days + private static final long CW_EVENT_TIMESTAMP_LIMIT_PAST = 14 * 24 * 60 * 60 * 1000L; + // None of the log events in the batch can be more than 2 hours in the future + private static final long CW_EVENT_TIMESTAMP_LIMIT_FUTURE = 2 * 60 * 60 * 1000L; + + private CloudWatchLogsClient logsClient; + private final String logGroupName; + private final String logStreamName; + private final String awsRegion; + private LogEventBatch eventBatch; + + /** + * Initialize the CloudWatch Logs client wrapper. + * + * @param logGroupName CloudWatch log group name + * @param logStreamName CloudWatch log stream name (auto-generated if null) + * @param awsRegion AWS region + */ + public CloudWatchLogsClientEmitter(String logGroupName, String logStreamName, String awsRegion) { + this.logGroupName = logGroupName; + this.logStreamName = logStreamName != null ? logStreamName : generateLogStreamName(); + this.awsRegion = awsRegion; + } + + @Override + public CloudWatchLogsClient getEmitter() { + if (this.logsClient == null) { + // TODO: Add support for Retry-After header: + // https://opentelemetry.io/docs/specs/otlp/#otlphttp-throttling + // Current implementation uses exponential backoff but doesn't respect server-provided retry + // delays + this.logsClient = + CloudWatchLogsClient.builder() + .overrideConfiguration( + config -> config.retryStrategy(createExponentialBackoffRetryStrategy()).build()) + .region(Region.of(this.awsRegion)) + .build(); + } + return this.logsClient; + } + + @Override + public void emit(Map logEvent) { + try { + if (!isValidLogEvent(logEvent)) { + throw new IllegalArgumentException("Log event validation failed"); + } + + String message = (String) logEvent.get("message"); + Long timestamp = (Long) logEvent.get("timestamp"); + int eventSize = message.length() + CW_PER_EVENT_HEADER_BYTES; + + if (eventBatch == null) { + eventBatch = new LogEventBatch(); + } + + LogEventBatch currentBatch = eventBatch; + + if (willEventBatchExceedLimit(currentBatch, eventSize) + || !isBatchActive(currentBatch, timestamp)) { + this.sendLogBatch(currentBatch.getLogEvents()); + eventBatch = new LogEventBatch(); + currentBatch = eventBatch; + } + + currentBatch.addEvent(message, timestamp, eventSize); + + } catch (Exception error) { + logger.severe(String.format("Failed to process log event: %s", error.getMessage())); + throw error; + } + } + + @Override + public void flushEvents() { + if (eventBatch != null && !eventBatch.getLogEvents().isEmpty()) { + LogEventBatch currentBatch = eventBatch; + this.sendLogBatch(currentBatch.getLogEvents()); + eventBatch = new LogEventBatch(); + } + logger.fine("CloudWatchLogClient flushed the buffered log events"); + } + + /** Generate a unique log stream name. */ + private String generateLogStreamName() { + String uniqueId = UUID.randomUUID().toString().substring(0, 8); + return "otel-java-" + uniqueId; + } + + /** Create log group if it doesn't exist. */ + private void createLogGroupIfNeeded() { + try { + CreateLogGroupRequest request = + CreateLogGroupRequest.builder().logGroupName(this.logGroupName).build(); + + this.getEmitter().createLogGroup(request); + logger.info(String.format("Created log group: %s", this.logGroupName)); + } catch (ResourceAlreadyExistsException e) { + logger.fine(String.format("Log group %s already exists", this.logGroupName)); + } catch (AwsServiceException e) { + logger.severe( + String.format("Failed to create log group %s: %s", this.logGroupName, e.getMessage())); + throw e; + } + } + + /** Create log stream if it doesn't exist. */ + private void createLogStreamIfNeeded() { + try { + CreateLogStreamRequest request = + CreateLogStreamRequest.builder() + .logGroupName(this.logGroupName) + .logStreamName(this.logStreamName) + .build(); + + this.getEmitter().createLogStream(request); + logger.info(String.format("Created log stream: %s", this.logStreamName)); + } catch (ResourceAlreadyExistsException e) { + logger.fine(String.format("Log stream %s already exists", this.logStreamName)); + } catch (AwsServiceException e) { + logger.severe( + String.format("Failed to create log stream %s: %s", this.logStreamName, e.getMessage())); + throw e; + } + } + + /** + * Send a batch of log events to CloudWatch Logs. Creates log group and stream if they don't + * exist. + * + * @param batch The event batch to send + */ + private void sendLogBatch(List batch) { + if (batch.isEmpty()) { + return; + } + batch.sort(Comparator.comparing(InputLogEvent::timestamp)); + + PutLogEventsRequest request = + PutLogEventsRequest.builder() + .logGroupName(this.logGroupName) + .logStreamName(this.logStreamName) + .logEvents(batch) + .build(); + + long startTime = System.currentTimeMillis(); + + try { + this.getEmitter().putLogEvents(request); + + long elapsedMs = System.currentTimeMillis() - startTime; + int batchSizeKB = + batch.stream().mapToInt(logEvent -> logEvent.message().length()).sum() / 1024; + + logger.fine( + String.format( + "Successfully sent %d log events (%d KB) in %d ms", + batch.size(), batchSizeKB, elapsedMs)); + + } catch (ResourceNotFoundException e) { + logger.info("Log group or stream not found, creating resources and retrying"); + try { + createLogGroupIfNeeded(); + createLogStreamIfNeeded(); + + // Retry the PutLogEvents call + this.getEmitter().putLogEvents(request); + + long elapsedMs = System.currentTimeMillis() - startTime; + int batchSizeKB = + batch.stream().mapToInt(logEvent -> logEvent.message().length()).sum() / 1024; + logger.fine( + String.format( + "Successfully sent %d log events (%d KB) in %d ms after creating resources", + batch.size(), batchSizeKB, elapsedMs)); + + } catch (AwsServiceException retryError) { + logger.severe( + String.format( + "Failed to send log events after creating resources: %s", retryError.getMessage())); + throw retryError; + } + } catch (AwsServiceException e) { + logger.severe(String.format("Failed to send log events: %s", e.getMessage())); + throw e; + } + } + + /** + * Validate the log event according to CloudWatch Logs constraints. Truncates the log event + * message to CW_MAX_EVENT_PAYLOAD_BYTES if it exceeds size limits. + * + * @param logEvent The log event to validate + * @return True if the log event is valid, false otherwise + */ + private boolean isValidLogEvent(Map logEvent) { + String message = (String) logEvent.get("message"); + Long timestamp = (Long) logEvent.get("timestamp"); + + if (timestamp == null) { + timestamp = System.currentTimeMillis(); + } + + // Check empty message + if (message == null || message.trim().isEmpty()) { + logger.severe("Empty log event message"); + return false; + } + + // Check message size + int messageSize = message.length() + CW_PER_EVENT_HEADER_BYTES; + if (messageSize > CW_MAX_EVENT_PAYLOAD_BYTES) { + logger.warning( + String.format( + "Log event size %d exceeds maximum allowed size %d. Truncating.", + messageSize, CW_MAX_EVENT_PAYLOAD_BYTES)); + int maxMessageSize = + CW_MAX_EVENT_PAYLOAD_BYTES - CW_PER_EVENT_HEADER_BYTES - CW_TRUNCATED_SUFFIX.length(); + logEvent.put("message", message.substring(0, maxMessageSize) + CW_TRUNCATED_SUFFIX); + } + + // Check timestamp constraints + long currentTime = System.currentTimeMillis(); + long timeDiff = currentTime - timestamp; + + // Check if too old or too far in the future + if (timeDiff > CW_EVENT_TIMESTAMP_LIMIT_PAST + || timeDiff < -1 * CW_EVENT_TIMESTAMP_LIMIT_FUTURE) { + logger.severe( + String.format( + "Log event timestamp %d is either older than 14 days or more than 2 hours in the future. Current time: %d", + timestamp, currentTime)); + return false; + } + + return true; + } + + /** + * Will adding the next log event exceed CloudWatch Logs limits? + * + * @param batch The current batch of events + * @param nextEventSize Size of the next event in bytes + * @return True if adding the next event would exceed limits + */ + private boolean willEventBatchExceedLimit(LogEventBatch batch, int nextEventSize) { + int currentBatchSize = 0; + for (InputLogEvent event : batch.getLogEvents()) { + currentBatchSize += event.message().length() + CW_PER_EVENT_HEADER_BYTES; + } + + return batch.size() >= CW_MAX_REQUEST_EVENT_COUNT + || currentBatchSize + nextEventSize > CW_MAX_REQUEST_PAYLOAD_BYTES; + } + + /** + * Has the log event batch spanned for more than 24 hours? + * + * @param batch The log event batch + * @param targetTimestamp The timestamp of the event to add + * @return True if the batch is active and can accept the event + */ + private boolean isBatchActive(LogEventBatch batch, long targetTimestamp) { + // New log event batch + if (batch.getMinTimestampMs() == 0 || batch.getMaxTimestampMs() == 0) { + return true; + } + + // Check if adding the event would make the batch span more than 24 hours + if (targetTimestamp - batch.getMinTimestampMs() > 24 * 3600 * 1000L) { + return false; + } + + if (batch.getMaxTimestampMs() - targetTimestamp > 24 * 3600 * 1000L) { + return false; + } + + // Flush the event batch when reached 60s interval + return System.currentTimeMillis() - batch.getCreatedTimestampMs() < BATCH_FLUSH_INTERVAL; + } + + private static StandardRetryStrategy createExponentialBackoffRetryStrategy() { + // TODO: Add support for Retry-After header: + // https://opentelemetry.io/docs/specs/otlp/#otlphttp-throttling + BackoffStrategy backoffStrategy = + attempt -> { + // Exponential base-2 backoff: 1s, 2s, 4s, 8s, 16s, 32s, 64s max + long exponentialDelay = + Math.min( + Duration.ofSeconds(1).toMillis() * (1L << (attempt - 1)), + Duration.ofSeconds(64).toMillis()); + return Duration.ofMillis(exponentialDelay); + }; + + return StandardRetryStrategy.builder().backoffStrategy(backoffStrategy).maxAttempts(7).build(); + } + + /** + * Container for a batch of CloudWatch log events with metadata. + * + *

Tracks the log events, total byte size, and timestamps for batching and validation. + */ + private static class LogEventBatch { + private final List logEvents = new ArrayList<>(); + private int byteTotal = 0; + private long minTimestampMs = 0; + private long maxTimestampMs = 0; + private final long createdTimestampMs = System.currentTimeMillis(); + + private void addEvent(String message, Long timestamp, int eventSize) { + if (timestamp == null) { + timestamp = System.currentTimeMillis(); + } + + InputLogEvent inputLogEvent = + InputLogEvent.builder().message(message).timestamp(timestamp).build(); + + logEvents.add(inputLogEvent); + byteTotal += eventSize; + + if (minTimestampMs == 0 || timestamp < minTimestampMs) { + minTimestampMs = timestamp; + } + if (maxTimestampMs == 0 || timestamp > maxTimestampMs) { + maxTimestampMs = timestamp; + } + } + + private List getLogEvents() { + return logEvents; + } + + private int getByteTotal() { + return byteTotal; + } + + private long getMinTimestampMs() { + return minTimestampMs; + } + + private long getMaxTimestampMs() { + return maxTimestampMs; + } + + private long getCreatedTimestampMs() { + return createdTimestampMs; + } + + private boolean isEmpty() { + return logEvents.isEmpty(); + } + + private int size() { + return logEvents.size(); + } + + private void clear() { + logEvents.clear(); + byteTotal = 0; + minTimestampMs = 0; + maxTimestampMs = 0; + } + } +} diff --git a/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/exporter/aws/metrics/common/emitter/ConsoleEmitter.java b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/exporter/aws/metrics/common/emitter/ConsoleEmitter.java new file mode 100644 index 0000000000..15f799fc1d --- /dev/null +++ b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/exporter/aws/metrics/common/emitter/ConsoleEmitter.java @@ -0,0 +1,65 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.opentelemetry.javaagent.providers.exporter.aws.metrics.common.emitter; + +import java.io.PrintStream; +import java.util.Map; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** A log event emitter that prints Log Events to Standard Out. */ +public class ConsoleEmitter implements LogEventEmitter { + private static final Logger logger = Logger.getLogger(ConsoleEmitter.class.getName()); + private final PrintStream emitter; + + public ConsoleEmitter() { + this.emitter = System.out; + } + + public ConsoleEmitter(PrintStream emitter) { + this.emitter = emitter; + } + + @Override + public PrintStream getEmitter() { + return this.emitter; + } + + @Override + public void emit(Map logEvent) { + try { + Object messageObj = logEvent.get("message"); + String message = messageObj != null ? messageObj.toString() : ""; + if (message.isEmpty()) { + logger.log(Level.WARNING, String.format("Empty message in log event: %s", logEvent)); + return; + } + this.emitter.println(message); + this.emitter.flush(); + } catch (Exception error) { + logger.log( + Level.SEVERE, + String.format( + "Failed to write EMF log to console. Log event: %s. Error: %s", + logEvent, error.getMessage())); + } + } + + @Override + public void flushEvents() { + this.emitter.flush(); + } +} diff --git a/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/exporter/aws/metrics/common/emitter/LogEventEmitter.java b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/exporter/aws/metrics/common/emitter/LogEventEmitter.java new file mode 100644 index 0000000000..694f90b1a2 --- /dev/null +++ b/awsagentprovider/src/main/java/software/amazon/opentelemetry/javaagent/providers/exporter/aws/metrics/common/emitter/LogEventEmitter.java @@ -0,0 +1,43 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.opentelemetry.javaagent.providers.exporter.aws.metrics.common.emitter; + +import java.util.Map; + +/** + * Generic interface for log event emitters. + * + * @param The type of the underlying emitter client + */ +public interface LogEventEmitter { + + /** + * Get the underlying emitter client. + * + * @return The emitter client + */ + T getEmitter(); + + /** + * Emit a log event. + * + * @param logEvent The log event to emit + */ + void emit(Map logEvent); + + /** Flush any pending events. */ + void flushEvents(); +} diff --git a/awsagentprovider/src/test/java/software/amazon/opentelemetry/javaagent/providers/AwsApplicationSignalsCustomizerProviderTest.java b/awsagentprovider/src/test/java/software/amazon/opentelemetry/javaagent/providers/AwsApplicationSignalsCustomizerProviderTest.java index 93d6a97f11..624c4a521c 100644 --- a/awsagentprovider/src/test/java/software/amazon/opentelemetry/javaagent/providers/AwsApplicationSignalsCustomizerProviderTest.java +++ b/awsagentprovider/src/test/java/software/amazon/opentelemetry/javaagent/providers/AwsApplicationSignalsCustomizerProviderTest.java @@ -18,15 +18,235 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatException; import static org.assertj.core.api.Assertions.assertThatNoException; +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.when; +import static software.amazon.opentelemetry.javaagent.providers.AwsApplicationSignalsCustomizerProvider.*; import com.fasterxml.jackson.core.JsonProcessingException; import io.opentelemetry.contrib.awsxray.AwsXrayAdaptiveSamplingConfig; +import io.opentelemetry.exporter.otlp.http.logs.OtlpHttpLogRecordExporter; +import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporter; +import io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporter; +import io.opentelemetry.exporter.otlp.logs.OtlpGrpcLogRecordExporter; +import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter; +import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties; +import io.opentelemetry.sdk.autoconfigure.spi.internal.DefaultConfigProperties; +import io.opentelemetry.sdk.logs.export.LogRecordExporter; +import io.opentelemetry.sdk.metrics.export.MetricExporter; +import io.opentelemetry.sdk.trace.export.SpanExporter; import java.io.File; import java.net.URISyntaxException; import java.net.URL; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.function.BiFunction; +import java.util.regex.Pattern; +import java.util.regex.PatternSyntaxException; +import java.util.stream.Stream; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.MockedStatic; +import software.amazon.opentelemetry.javaagent.providers.exporter.aws.metrics.AwsCloudWatchEmfExporter; +import software.amazon.opentelemetry.javaagent.providers.exporter.otlp.aws.logs.OtlpAwsLogsExporter; +import software.amazon.opentelemetry.javaagent.providers.exporter.otlp.aws.traces.OtlpAwsSpanExporter; class AwsApplicationSignalsCustomizerProviderTest { + private AwsApplicationSignalsCustomizerProvider provider; + private final LogRecordExporter defaultHttpLogsExporter = OtlpHttpLogRecordExporter.getDefault(); + private final SpanExporter defaultHttpSpanExporter = OtlpHttpSpanExporter.getDefault(); + private final MetricExporter defaultHttpMetricsExporter = OtlpHttpMetricExporter.getDefault(); + + @BeforeEach + void init() { + this.provider = new AwsApplicationSignalsCustomizerProvider(); + } + + @ParameterizedTest + @MethodSource("validSigv4LogsConfigProvider") + void testShouldEnableSigV4LogsExporterIfConfigIsCorrect(Map validSigv4Config) { + customizeExporterTest( + validSigv4Config, + defaultHttpLogsExporter, + this.provider::customizeLogsExporter, + OtlpAwsLogsExporter.class); + } + + @ParameterizedTest + @MethodSource("invalidSigv4LogsConfigProvider") + void testShouldNotUseSigv4LogsExporterIfConfigIsIncorrect( + Map invalidSigv4Config) { + customizeExporterTest( + invalidSigv4Config, + defaultHttpLogsExporter, + this.provider::customizeLogsExporter, + OtlpHttpLogRecordExporter.class); + } + + @Test + void testShouldNotUseSigv4LogsExporterIfValidatorThrows() { + try (MockedStatic ignored = mockStatic(Pattern.class)) { + when(Pattern.compile(any())).thenThrow(PatternSyntaxException.class); + customizeExporterTest( + Map.of( + OTEL_EXPORTER_OTLP_LOGS_ENDPOINT, + "https://logs.us-east-1.amazonaws.com/v1/logs", + OTEL_EXPORTER_OTLP_LOGS_HEADERS, + "x-aws-log-group=test1,x-aws-log-stream=test2", + OTEL_EXPORTER_OTLP_LOGS_PROTOCOL, + "http/protobuf", + OTEL_LOGS_EXPORTER, + "otlp"), + defaultHttpSpanExporter, + this.provider::customizeSpanExporter, + OtlpHttpSpanExporter.class); + } + } + + @ParameterizedTest + @MethodSource("validSigv4TracesConfigProvider") + void testShouldEnableSigV4SpanExporterIfConfigIsCorrect(Map validSigv4Config) { + customizeExporterTest( + validSigv4Config, + defaultHttpSpanExporter, + this.provider::customizeSpanExporter, + OtlpAwsSpanExporter.class); + } + + @ParameterizedTest + @MethodSource("invalidSigv4TracesConfigProvider") + void testShouldNotUseSigv4SpanExporterIfConfigIsIncorrect( + Map invalidSigv4Config) { + customizeExporterTest( + invalidSigv4Config, + defaultHttpSpanExporter, + this.provider::customizeSpanExporter, + OtlpHttpSpanExporter.class); + } + + @Test + void testShouldNotUseSigv4SpanExporterIfValidatorThrows() { + try (MockedStatic ignored = mockStatic(Pattern.class)) { + when(Pattern.compile(any())).thenThrow(PatternSyntaxException.class); + customizeExporterTest( + Map.of( + OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, + "http://xray.us-east-1.amazonaws.com/v1/traces", + OTEL_EXPORTER_OTLP_TRACES_PROTOCOL, + "http/protobuf", + OTEL_TRACES_EXPORTER, + "otlp"), + defaultHttpSpanExporter, + this.provider::customizeSpanExporter, + OtlpHttpSpanExporter.class); + } + } + + // This technically should never happen as the validator checks for the correct env variables. But + // just to be safe. + @Test + void testShouldThrowIllegalStateExceptionIfIncorrectSpanExporter() { + assertThrows( + IllegalStateException.class, + () -> + customizeExporterTest( + Map.of( + OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, + "https://xray.us-east-1.amazonaws.com/v1/traces", + OTEL_EXPORTER_OTLP_TRACES_PROTOCOL, + "http/protobuf", + OTEL_TRACES_EXPORTER, + "otlp"), + OtlpGrpcSpanExporter.getDefault(), + this.provider::customizeSpanExporter, + OtlpHttpSpanExporter.class)); + } + + // This technically should never happen as the validator checks for the correct env variables + @Test + void testShouldThrowIllegalStateExceptionIfIncorrectLogsExporter() { + assertThrows( + IllegalStateException.class, + () -> + customizeExporterTest( + Map.of( + OTEL_EXPORTER_OTLP_LOGS_ENDPOINT, + "https://logs.us-east-1.amazonaws.com/v1/logs", + OTEL_EXPORTER_OTLP_LOGS_HEADERS, + "x-aws-log-group=test1,x-aws-log-stream=test2", + OTEL_EXPORTER_OTLP_LOGS_PROTOCOL, + "http/protobuf", + OTEL_LOGS_EXPORTER, + "otlp"), + OtlpGrpcLogRecordExporter.getDefault(), + this.provider::customizeLogsExporter, + OtlpHttpLogRecordExporter.class)); + } + + @Test + void testEnableApplicationSignalsSpanExporter() { + customizeExporterTest( + Map.of( + APPLICATION_SIGNALS_ENABLED_CONFIG, + "true", + OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, + "http://localhost:4318/v1/traces", + OTEL_EXPORTER_OTLP_TRACES_PROTOCOL, + "http/protobuf", + OTEL_TRACES_EXPORTER, + "otlp"), + defaultHttpSpanExporter, + this.provider::customizeSpanExporter, + AwsMetricAttributesSpanExporter.class); + } + + @Test + void testSigv4ShouldNotDisableApplicationSignalsSpanExporter() { + customizeExporterTest( + Map.of( + APPLICATION_SIGNALS_ENABLED_CONFIG, + "true", + OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, + "https://xray.us-east-1.amazonaws.com/v1/traces", + OTEL_EXPORTER_OTLP_TRACES_PROTOCOL, + "http/protobuf", + OTEL_TRACES_EXPORTER, + "otlp"), + defaultHttpSpanExporter, + this.provider::customizeSpanExporter, + AwsMetricAttributesSpanExporter.class); + } + + @ParameterizedTest + @MethodSource("validEmfConfigProvider") + void testShouldEnableEmfExporterIfConfigIsCorrect(Map validEmfConfig) { + DefaultConfigProperties configProps = DefaultConfigProperties.createFromMap(validEmfConfig); + this.provider.customizeProperties(configProps); + + customizeExporterTest( + validEmfConfig, + defaultHttpMetricsExporter, + this.provider::customizeMetricExporter, + AwsCloudWatchEmfExporter.class); + } + + @ParameterizedTest + @MethodSource("invalidEmfConfigProvider") + void testShouldNotUseEmfExporterIfConfigIsIncorrect(Map invalidEmfConfig) { + DefaultConfigProperties configProps = DefaultConfigProperties.createFromMap(invalidEmfConfig); + this.provider.customizeProperties(configProps); + + customizeExporterTest( + invalidEmfConfig, + defaultHttpMetricsExporter, + this.provider::customizeMetricExporter, + OtlpHttpMetricExporter.class); + } @Test void setAdaptiveSamplingConfigFromString_validConfig() throws JsonProcessingException { @@ -99,4 +319,321 @@ void setAdaptiveSamplingConfigFromFile_invalidYaml() throws URISyntaxException { assertThatException() .isThrownBy(() -> AwsApplicationSignalsCustomizerProvider.parseConfigString(absolutePath)); } + + private static void customizeExporterTest( + Map config, + Exporter defaultExporter, + BiFunction executor, + Class expectedExporterType) { + + DefaultConfigProperties configProps = DefaultConfigProperties.createFromMap(config); + Exporter result = executor.apply(defaultExporter, configProps); + assertEquals(expectedExporterType, result.getClass()); + } + + static Stream validSigv4TracesConfigProvider() { + List> args = new ArrayList<>(); + String[] tracesGoodEndpoints = { + "https://xray.us-east-1.amazonaws.com/v1/traces", + "https://XRAY.US-EAST-1.AMAZONAWS.COM/V1/TRACES", + "https://xray.us-east-1.amazonaws.com/v1/traces", + "https://XRAY.US-EAST-1.amazonaws.com/v1/traces", + "https://xray.US-EAST-1.AMAZONAWS.com/v1/traces", + "https://Xray.Us-East-1.amazonaws.com/v1/traces", + "https://xRAY.us-EAST-1.amazonaws.com/v1/traces", + "https://XRAY.us-EAST-1.AMAZONAWS.com/v1/TRACES", + "https://xray.US-EAST-1.amazonaws.com/V1/Traces", + "https://xray.us-east-1.AMAZONAWS.COM/v1/traces", + "https://XrAy.Us-EaSt-1.AmAzOnAwS.cOm/V1/TrAcEs", + "https://xray.US-EAST-1.amazonaws.com/v1/traces", + "https://xray.us-east-1.amazonaws.com/V1/TRACES", + "https://XRAY.US-EAST-1.AMAZONAWS.COM/v1/traces", + "https://xray.us-east-1.AMAZONAWS.COM/V1/traces" + }; + + for (String endpoint : tracesGoodEndpoints) { + Map badEndpoint = + Map.of( + OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, endpoint, + OTEL_EXPORTER_OTLP_TRACES_PROTOCOL, "http/protobuf", + OTEL_TRACES_EXPORTER, "otlp"); + + args.add(badEndpoint); + } + + return args.stream().map(Arguments::of); + } + + static Stream invalidSigv4TracesConfigProvider() { + List> args = new ArrayList<>(); + String[] tracesBadEndpoints = { + "http://localhost:4318/v1/traces", + "http://xray.us-east-1.amazonaws.com/v1/traces", + "ftp://xray.us-east-1.amazonaws.com/v1/traces", + "https://ray.us-east-1.amazonaws.com/v1/traces", + "https://xra.us-east-1.amazonaws.com/v1/traces", + "https://x-ray.us-east-1.amazonaws.com/v1/traces", + "https://xray.amazonaws.com/v1/traces", + "https://xray.us-east-1.amazon.com/v1/traces", + "https://xray.us-east-1.aws.com/v1/traces", + "https://xray.us_east_1.amazonaws.com/v1/traces", + "https://xray.us.east.1.amazonaws.com/v1/traces", + "https://xray..amazonaws.com/v1/traces", + "https://xray.us-east-1.amazonaws.com/traces", + "https://xray.us-east-1.amazonaws.com/v2/traces", + "https://xray.us-east-1.amazonaws.com/v1/trace", + "https://xray.us-east-1.amazonaws.com/v1/traces/", + "https://xray.us-east-1.amazonaws.com//v1/traces", + "https://xray.us-east-1.amazonaws.com/v1//traces", + "https://xray.us-east-1.amazonaws.com/v1/traces?param=value", + "https://xray.us-east-1.amazonaws.com/v1/traces#fragment", + "https://xray.us-east-1.amazonaws.com:443/v1/traces", + "https:/xray.us-east-1.amazonaws.com/v1/traces", + "https:://xray.us-east-1.amazonaws.com/v1/traces", + }; + + Map invalidProtocol = + Map.of( + OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, "https://xray.us-east-1.amazonaws.com/v1/traces", + OTEL_EXPORTER_OTLP_TRACES_PROTOCOL, "http/json", + OTEL_TRACES_EXPORTER, "otlp"); + + Map consoleExporter = + Map.of( + OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, "https://xray.us-east-1.amazonaws.com/v1/traces", + OTEL_EXPORTER_OTLP_TRACES_PROTOCOL, "http/protobuf", + OTEL_TRACES_EXPORTER, "console"); + + for (String endpoint : tracesBadEndpoints) { + Map badEndpoint = + Map.of( + OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, endpoint, + OTEL_EXPORTER_OTLP_TRACES_PROTOCOL, "http/protobuf", + OTEL_TRACES_EXPORTER, "otlp"); + + args.add(badEndpoint); + } + + args.add(consoleExporter); + args.add(invalidProtocol); + + return args.stream().map(Arguments::of); + } + + static Stream validSigv4LogsConfigProvider() { + List> args = new ArrayList<>(); + String[] logsGoodEndpoints = { + "https://logs.us-east-1.amazonaws.com/v1/logs", + "https://LOGS.US-EAST-1.AMAZONAWS.COM/V1/LOGS", + "https://logs.us-east-1.amazonaws.com/v1/logs", + "https://LOGS.US-EAST-1.amazonaws.com/v1/logs", + "https://logs.US-EAST-1.AMAZONAWS.com/v1/logs", + "https://Logs.Us-East-1.amazonaws.com/v1/logs", + "https://lOGS.us-EAST-1.amazonaws.com/v1/logs", + "https://LOGS.us-EAST-1.AMAZONAWS.com/v1/LOGS", + "https://logs.US-EAST-1.amazonaws.com/V1/Logs", + "https://logs.us-east-1.AMAZONAWS.COM/v1/logs", + "https://LoGs.Us-EaSt-1.AmAzOnAwS.cOm/V1/LoGs", + "https://logs.US-EAST-1.amazonaws.com/v1/logs", + "https://logs.us-east-1.amazonaws.com/V1/LOGS", + "https://LOGS.US-EAST-1.AMAZONAWS.COM/v1/logs", + "https://logs.us-east-1.AMAZONAWS.COM/V1/logs" + }; + + for (String endpoint : logsGoodEndpoints) { + Map badEndpoint = + Map.of( + OTEL_EXPORTER_OTLP_LOGS_ENDPOINT, + endpoint, + OTEL_EXPORTER_OTLP_LOGS_HEADERS, + "x-aws-log-group=test1,x-aws-log-stream=test2", + OTEL_EXPORTER_OTLP_LOGS_PROTOCOL, + "http/protobuf", + OTEL_LOGS_EXPORTER, + "otlp"); + + args.add(badEndpoint); + } + + return args.stream().map(Arguments::of); + } + + static Stream invalidSigv4LogsConfigProvider() { + List> args = new ArrayList<>(); + String[] logsBadEndpoints = { + "http://localhost:4318/v1/logs", + "http://logs.us-east-1.amazonaws.com/v1/logs", + "ftp://logs.us-east-1.amazonaws.com/v1/logs", + "https://log.us-east-1.amazonaws.com/v1/logs", + "https://logging.us-east-1.amazonaws.com/v1/logs", + "https://cloud-logs.us-east-1.amazonaws.com/v1/logs", + "https://logs.amazonaws.com/v1/logs", + "https://logs.us-east-1.amazon.com/v1/logs", + "https://logs.us-east-1.aws.com/v1/logs", + "https://logs.US-EAST-1.amazonaws.com/v1/logs", + "https://logs.us_east_1.amazonaws.com/v1/logs", + "https://logs.us.east.1.amazonaws.com/v1/logs", + "https://logs..amazonaws.com/v1/logs", + "https://logs.us-east-1.amazonaws.com/logs", + "https://logs.us-east-1.amazonaws.com/v2/logs", + "https://logs.us-east-1.amazonaws.com/v1/log", + "https://logs.us-east-1.amazonaws.com/v1/logs/", + "https://logs.us-east-1.amazonaws.com//v1/logs", + "https://logs.us-east-1.amazonaws.com/v1//logs", + "https://logs.us-east-1.amazonaws.com/v1/logs?param=value", + "https://logs.us-east-1.amazonaws.com/v1/logs#fragment", + "https://logs.us-east-1.amazonaws.com:443/v1/logs", + "https:/logs.us-east-1.amazonaws.com/v1/logs", + "https:://logs.us-east-1.amazonaws.com/v1/logs", + "https://LOGS.us-east-1.amazonaws.com/v1/logs", + "https://logs.us-east-1.amazonaws.com/V1/LOGS", + "https://logs.us-east-1.amazonaws.com/v1/logging", + "https://logs.us-east-1.amazonaws.com/v1/cloudwatchlogs", + "https://logs.us-east-1.amazonaws.com/v1/cwlogs" + }; + + Map noLogGroupHeader = + Map.of( + OTEL_EXPORTER_OTLP_LOGS_ENDPOINT, + "https://logs.us-east-1.amazonaws.com/v1/logs", + OTEL_EXPORTER_OTLP_LOGS_HEADERS, + "x-aws-log-stream=test2", + OTEL_EXPORTER_OTLP_LOGS_PROTOCOL, + "http/protobuf", + OTEL_LOGS_EXPORTER, + "otlp"); + + Map noLogStreamHeader = + Map.of( + OTEL_EXPORTER_OTLP_LOGS_ENDPOINT, + "https://logs.us-east-1.amazonaws.com/v1/logs", + OTEL_EXPORTER_OTLP_LOGS_HEADERS, + "x-aws-log-group=test2", + OTEL_EXPORTER_OTLP_LOGS_PROTOCOL, + "http/protobuf", + OTEL_LOGS_EXPORTER, + "otlp"); + + Map badLogStreamHeader = + Map.of( + OTEL_EXPORTER_OTLP_LOGS_ENDPOINT, + "https://logs.us-east-1.amazonaws.com/v1/logs", + OTEL_EXPORTER_OTLP_LOGS_HEADERS, + "x-aws-log-group=test1,x-aws-log-strea21=test2", + OTEL_EXPORTER_OTLP_LOGS_PROTOCOL, + "http/protobuf", + OTEL_LOGS_EXPORTER, + "otlp"); + + Map invalidProtocol = + Map.of( + OTEL_EXPORTER_OTLP_LOGS_ENDPOINT, + "https://logs.us-east-1.amazonaws.com/v1/logs", + OTEL_EXPORTER_OTLP_LOGS_HEADERS, + "x-aws-log-group=test1,x-aws-log-stream=test2", + OTEL_EXPORTER_OTLP_LOGS_PROTOCOL, + "grpc", + OTEL_LOGS_EXPORTER, + "otlp"); + + Map consoleExporter = + Map.of( + OTEL_EXPORTER_OTLP_LOGS_ENDPOINT, "https://logs.us-east-1.amazonaws.com/v1/logs", + OTEL_EXPORTER_OTLP_LOGS_HEADERS, "x-aws-log-stream=test2", + OTEL_EXPORTER_OTLP_LOGS_PROTOCOL, "http/protobuf", + OTEL_LOGS_EXPORTER, "console"); + + for (String endpoint : logsBadEndpoints) { + Map badEndpoint = + Map.of( + OTEL_EXPORTER_OTLP_LOGS_ENDPOINT, endpoint, + OTEL_EXPORTER_OTLP_LOGS_PROTOCOL, "http/protobuf", + OTEL_LOGS_EXPORTER, "otlp"); + + args.add(badEndpoint); + } + + args.add(badLogStreamHeader); + args.add(noLogStreamHeader); + args.add(noLogGroupHeader); + args.add(invalidProtocol); + args.add(consoleExporter); + + return args.stream().map(Arguments::of); + } + + static Stream invalidEmfConfigProvider() { + List> args = new ArrayList<>(); + + Map wrongExporter = + Map.of( + OTEL_METRICS_EXPORTER, + "otlp", + OTEL_EXPORTER_OTLP_LOGS_HEADERS, + "x-aws-log-group=test-group,x-aws-log-stream=test-stream,x-aws-metric-namespace=test-namespace", + AWS_REGION, + "us-east-1"); + + Map missingHeaders = + Map.of(OTEL_METRICS_EXPORTER, "awsemf", AWS_REGION, "us-east-1"); + + Map missingRegion = + Map.of( + OTEL_METRICS_EXPORTER, "awsemf", + OTEL_EXPORTER_OTLP_LOGS_HEADERS, + "x-aws-log-group=test-group,x-aws-log-stream=test-stream,x-aws-metric-namespace=test-namespace"); + + Map missingLogGroup = + Map.of( + OTEL_METRICS_EXPORTER, + "awsemf", + OTEL_EXPORTER_OTLP_LOGS_HEADERS, + "x-aws-log-stream=test-stream,x-aws-metric-namespace=test-namespace", + AWS_REGION, + "us-east-1"); + + Map missingLogStream = + Map.of( + OTEL_METRICS_EXPORTER, + "awsemf", + OTEL_EXPORTER_OTLP_LOGS_HEADERS, + "x-aws-log-group=test-group,x-aws-metric-namespace=test-namespace", + AWS_REGION, + "us-east-1"); + + args.add(wrongExporter); + args.add(missingHeaders); + args.add(missingRegion); + args.add(missingLogGroup); + args.add(missingLogStream); + + return args.stream().map(Arguments::of); + } + + static Stream validEmfConfigProvider() { + List> args = new ArrayList<>(); + + Map awsRegionConfig = + Map.of( + OTEL_METRICS_EXPORTER, + "awsemf", + OTEL_EXPORTER_OTLP_LOGS_HEADERS, + "x-aws-log-group=test-group,x-aws-log-stream=test-stream,x-aws-metric-namespace=test-namespace", + AWS_REGION, + "us-east-1"); + + Map awsDefaultRegionConfig = + Map.of( + OTEL_METRICS_EXPORTER, + "awsemf", + OTEL_EXPORTER_OTLP_LOGS_HEADERS, + "x-aws-log-group=test-group,x-aws-log-stream=test-stream,x-aws-metric-namespace=test-namespace", + AWS_DEFAULT_REGION, + "us-west-2"); + + args.add(awsRegionConfig); + args.add(awsDefaultRegionConfig); + + return args.stream().map(Arguments::of); + } } diff --git a/awsagentprovider/src/test/java/software/amazon/opentelemetry/javaagent/providers/AwsApplicationSignalsCustomizerTest.java b/awsagentprovider/src/test/java/software/amazon/opentelemetry/javaagent/providers/AwsApplicationSignalsCustomizerTest.java deleted file mode 100644 index 1732c95627..0000000000 --- a/awsagentprovider/src/test/java/software/amazon/opentelemetry/javaagent/providers/AwsApplicationSignalsCustomizerTest.java +++ /dev/null @@ -1,459 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package software.amazon.opentelemetry.javaagent.providers; - -import static org.junit.jupiter.api.Assertions.*; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mockStatic; -import static org.mockito.Mockito.when; -import static software.amazon.opentelemetry.javaagent.providers.AwsApplicationSignalsCustomizerProvider.*; - -import io.opentelemetry.exporter.otlp.http.logs.OtlpHttpLogRecordExporter; -import io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporter; -import io.opentelemetry.exporter.otlp.logs.OtlpGrpcLogRecordExporter; -import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter; -import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties; -import io.opentelemetry.sdk.autoconfigure.spi.internal.DefaultConfigProperties; -import io.opentelemetry.sdk.logs.export.LogRecordExporter; -import io.opentelemetry.sdk.trace.export.SpanExporter; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.function.BiFunction; -import java.util.regex.Pattern; -import java.util.regex.PatternSyntaxException; -import java.util.stream.Stream; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; -import org.mockito.MockedStatic; -import org.mockito.junit.jupiter.MockitoExtension; -import software.amazon.opentelemetry.javaagent.providers.exporter.otlp.aws.logs.OtlpAwsLogsExporter; -import software.amazon.opentelemetry.javaagent.providers.exporter.otlp.aws.traces.OtlpAwsSpanExporter; - -@ExtendWith({MockitoExtension.class}) -public class AwsApplicationSignalsCustomizerTest { - private AwsApplicationSignalsCustomizerProvider provider; - private final LogRecordExporter defaultHttpLogsExporter = OtlpHttpLogRecordExporter.getDefault(); - private final SpanExporter defaultHttpSpanExporter = OtlpHttpSpanExporter.getDefault(); - - @BeforeEach - void init() { - this.provider = new AwsApplicationSignalsCustomizerProvider(); - } - - @AfterEach - void reset() {} - - @ParameterizedTest - @MethodSource("validSigv4LogsConfigProvider") - void testShouldEnableSigV4LogsExporterIfConfigIsCorrect(Map validSigv4Config) { - customizeExporterTest( - validSigv4Config, - defaultHttpLogsExporter, - this.provider::customizeLogsExporter, - OtlpAwsLogsExporter.class); - } - - @ParameterizedTest - @MethodSource("invalidSigv4LogsConfigProvider") - void testShouldNotUseSigv4LogsExporter(Map invalidSigv4Config) { - customizeExporterTest( - invalidSigv4Config, - defaultHttpLogsExporter, - this.provider::customizeLogsExporter, - OtlpHttpLogRecordExporter.class); - } - - @Test - void testShouldNotUseSigv4LogsExporterIfValidatorThrows() { - try (MockedStatic ignored = mockStatic(Pattern.class)) { - when(Pattern.compile(any())).thenThrow(PatternSyntaxException.class); - customizeExporterTest( - Map.of( - OTEL_EXPORTER_OTLP_LOGS_ENDPOINT, - "https://logs.us-east-1.amazonaws.com/v1/logs", - OTEL_EXPORTER_OTLP_LOGS_HEADERS, - "x-aws-log-group=test1,x-aws-log-stream=test2", - OTEL_EXPORTER_OTLP_LOGS_PROTOCOL, - "http/protobuf", - OTEL_LOGS_EXPORTER, - "otlp"), - defaultHttpSpanExporter, - this.provider::customizeSpanExporter, - OtlpHttpSpanExporter.class); - } - } - - @ParameterizedTest - @MethodSource("validSigv4TracesConfigProvider") - void testShouldEnableSigV4SpanExporterIfConfigIsCorrect(Map validSigv4Config) { - customizeExporterTest( - validSigv4Config, - defaultHttpSpanExporter, - this.provider::customizeSpanExporter, - OtlpAwsSpanExporter.class); - } - - @ParameterizedTest - @MethodSource("invalidSigv4TracesConfigProvider") - void testShouldNotUseSigv4SpanExporter(Map invalidSigv4Config) { - customizeExporterTest( - invalidSigv4Config, - defaultHttpSpanExporter, - this.provider::customizeSpanExporter, - OtlpHttpSpanExporter.class); - } - - @Test - void testShouldNotUseSigv4SpanExporterIfValidatorThrows() { - try (MockedStatic ignored = mockStatic(Pattern.class)) { - when(Pattern.compile(any())).thenThrow(PatternSyntaxException.class); - customizeExporterTest( - Map.of( - OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, - "http://xray.us-east-1.amazonaws.com/v1/traces", - OTEL_EXPORTER_OTLP_TRACES_PROTOCOL, - "http/protobuf", - OTEL_TRACES_EXPORTER, - "otlp"), - defaultHttpSpanExporter, - this.provider::customizeSpanExporter, - OtlpHttpSpanExporter.class); - } - } - - // This technically should never happen as the validator checks for the correct env variables. But - // just to be safe. - @Test - void testShouldThrowIllegalStateExceptionIfIncorrectSpanExporter() { - assertThrows( - IllegalStateException.class, - () -> - customizeExporterTest( - Map.of( - OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, - "https://xray.us-east-1.amazonaws.com/v1/traces", - OTEL_EXPORTER_OTLP_TRACES_PROTOCOL, - "http/protobuf", - OTEL_TRACES_EXPORTER, - "otlp"), - OtlpGrpcSpanExporter.getDefault(), - this.provider::customizeSpanExporter, - OtlpHttpSpanExporter.class)); - } - - // This technically should never happen as the validator checks for the correct env variables - @Test - void testShouldThrowIllegalStateExceptionIfIncorrectLogsExporter() { - assertThrows( - IllegalStateException.class, - () -> - customizeExporterTest( - Map.of( - OTEL_EXPORTER_OTLP_LOGS_ENDPOINT, - "https://logs.us-east-1.amazonaws.com/v1/logs", - OTEL_EXPORTER_OTLP_LOGS_HEADERS, - "x-aws-log-group=test1,x-aws-log-stream=test2", - OTEL_EXPORTER_OTLP_LOGS_PROTOCOL, - "http/protobuf", - OTEL_LOGS_EXPORTER, - "otlp"), - OtlpGrpcLogRecordExporter.getDefault(), - this.provider::customizeLogsExporter, - OtlpHttpLogRecordExporter.class)); - } - - @Test - void testEnableApplicationSignalsSpanExporter() { - customizeExporterTest( - Map.of( - APPLICATION_SIGNALS_ENABLED_CONFIG, - "true", - OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, - "http://localhost:4318/v1/traces", - OTEL_EXPORTER_OTLP_TRACES_PROTOCOL, - "http/protobuf", - OTEL_TRACES_EXPORTER, - "otlp"), - defaultHttpSpanExporter, - this.provider::customizeSpanExporter, - AwsMetricAttributesSpanExporter.class); - } - - @Test - void testSigv4ShouldNotDisableApplicationSignalsSpanExporter() { - customizeExporterTest( - Map.of( - APPLICATION_SIGNALS_ENABLED_CONFIG, - "true", - OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, - "https://xray.us-east-1.amazonaws.com/v1/traces", - OTEL_EXPORTER_OTLP_TRACES_PROTOCOL, - "http/protobuf", - OTEL_TRACES_EXPORTER, - "otlp"), - defaultHttpSpanExporter, - this.provider::customizeSpanExporter, - AwsMetricAttributesSpanExporter.class); - } - - private static void customizeExporterTest( - Map config, - Exporter defaultExporter, - BiFunction executor, - Class expectedExporterType) { - - DefaultConfigProperties configProps = DefaultConfigProperties.createFromMap(config); - Exporter result = executor.apply(defaultExporter, configProps); - assertEquals(expectedExporterType, result.getClass()); - } - - static Stream validSigv4TracesConfigProvider() { - List> args = new ArrayList<>(); - String[] tracesGoodEndpoints = { - "https://xray.us-east-1.amazonaws.com/v1/traces", - "https://XRAY.US-EAST-1.AMAZONAWS.COM/V1/TRACES", - "https://xray.us-east-1.amazonaws.com/v1/traces", - "https://XRAY.US-EAST-1.amazonaws.com/v1/traces", - "https://xray.US-EAST-1.AMAZONAWS.com/v1/traces", - "https://Xray.Us-East-1.amazonaws.com/v1/traces", - "https://xRAY.us-EAST-1.amazonaws.com/v1/traces", - "https://XRAY.us-EAST-1.AMAZONAWS.com/v1/TRACES", - "https://xray.US-EAST-1.amazonaws.com/V1/Traces", - "https://xray.us-east-1.AMAZONAWS.COM/v1/traces", - "https://XrAy.Us-EaSt-1.AmAzOnAwS.cOm/V1/TrAcEs", - "https://xray.US-EAST-1.amazonaws.com/v1/traces", - "https://xray.us-east-1.amazonaws.com/V1/TRACES", - "https://XRAY.US-EAST-1.AMAZONAWS.COM/v1/traces", - "https://xray.us-east-1.AMAZONAWS.COM/V1/traces" - }; - - for (String endpoint : tracesGoodEndpoints) { - Map badEndpoint = - Map.of( - OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, endpoint, - OTEL_EXPORTER_OTLP_TRACES_PROTOCOL, "http/protobuf", - OTEL_TRACES_EXPORTER, "otlp"); - - args.add(badEndpoint); - } - - return args.stream().map(Arguments::of); - } - - static Stream invalidSigv4TracesConfigProvider() { - List> args = new ArrayList<>(); - String[] tracesBadEndpoints = { - "http://localhost:4318/v1/traces", - "http://xray.us-east-1.amazonaws.com/v1/traces", - "ftp://xray.us-east-1.amazonaws.com/v1/traces", - "https://ray.us-east-1.amazonaws.com/v1/traces", - "https://xra.us-east-1.amazonaws.com/v1/traces", - "https://x-ray.us-east-1.amazonaws.com/v1/traces", - "https://xray.amazonaws.com/v1/traces", - "https://xray.us-east-1.amazon.com/v1/traces", - "https://xray.us-east-1.aws.com/v1/traces", - "https://xray.us_east_1.amazonaws.com/v1/traces", - "https://xray.us.east.1.amazonaws.com/v1/traces", - "https://xray..amazonaws.com/v1/traces", - "https://xray.us-east-1.amazonaws.com/traces", - "https://xray.us-east-1.amazonaws.com/v2/traces", - "https://xray.us-east-1.amazonaws.com/v1/trace", - "https://xray.us-east-1.amazonaws.com/v1/traces/", - "https://xray.us-east-1.amazonaws.com//v1/traces", - "https://xray.us-east-1.amazonaws.com/v1//traces", - "https://xray.us-east-1.amazonaws.com/v1/traces?param=value", - "https://xray.us-east-1.amazonaws.com/v1/traces#fragment", - "https://xray.us-east-1.amazonaws.com:443/v1/traces", - "https:/xray.us-east-1.amazonaws.com/v1/traces", - "https:://xray.us-east-1.amazonaws.com/v1/traces", - }; - - Map invalidProtocol = - Map.of( - OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, "https://xray.us-east-1.amazonaws.com/v1/traces", - OTEL_EXPORTER_OTLP_TRACES_PROTOCOL, "http/json", - OTEL_TRACES_EXPORTER, "otlp"); - - Map consoleExporter = - Map.of( - OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, "https://xray.us-east-1.amazonaws.com/v1/traces", - OTEL_EXPORTER_OTLP_TRACES_PROTOCOL, "http/protobuf", - OTEL_TRACES_EXPORTER, "console"); - - for (String endpoint : tracesBadEndpoints) { - Map badEndpoint = - Map.of( - OTEL_EXPORTER_OTLP_TRACES_ENDPOINT, endpoint, - OTEL_EXPORTER_OTLP_TRACES_PROTOCOL, "http/protobuf", - OTEL_TRACES_EXPORTER, "otlp"); - - args.add(badEndpoint); - } - - args.add(consoleExporter); - args.add(invalidProtocol); - - return args.stream().map(Arguments::of); - } - - static Stream validSigv4LogsConfigProvider() { - List> args = new ArrayList<>(); - String[] logsGoodEndpoints = { - "https://logs.us-east-1.amazonaws.com/v1/logs", - "https://LOGS.US-EAST-1.AMAZONAWS.COM/V1/LOGS", - "https://logs.us-east-1.amazonaws.com/v1/logs", - "https://LOGS.US-EAST-1.amazonaws.com/v1/logs", - "https://logs.US-EAST-1.AMAZONAWS.com/v1/logs", - "https://Logs.Us-East-1.amazonaws.com/v1/logs", - "https://lOGS.us-EAST-1.amazonaws.com/v1/logs", - "https://LOGS.us-EAST-1.AMAZONAWS.com/v1/LOGS", - "https://logs.US-EAST-1.amazonaws.com/V1/Logs", - "https://logs.us-east-1.AMAZONAWS.COM/v1/logs", - "https://LoGs.Us-EaSt-1.AmAzOnAwS.cOm/V1/LoGs", - "https://logs.US-EAST-1.amazonaws.com/v1/logs", - "https://logs.us-east-1.amazonaws.com/V1/LOGS", - "https://LOGS.US-EAST-1.AMAZONAWS.COM/v1/logs", - "https://logs.us-east-1.AMAZONAWS.COM/V1/logs" - }; - - for (String endpoint : logsGoodEndpoints) { - Map badEndpoint = - Map.of( - OTEL_EXPORTER_OTLP_LOGS_ENDPOINT, - endpoint, - OTEL_EXPORTER_OTLP_LOGS_HEADERS, - "x-aws-log-group=test1,x-aws-log-stream=test2", - OTEL_EXPORTER_OTLP_LOGS_PROTOCOL, - "http/protobuf", - OTEL_LOGS_EXPORTER, - "otlp"); - - args.add(badEndpoint); - } - - return args.stream().map(Arguments::of); - } - - static Stream invalidSigv4LogsConfigProvider() { - List> args = new ArrayList<>(); - String[] logsBadEndpoints = { - "http://localhost:4318/v1/logs", - "http://logs.us-east-1.amazonaws.com/v1/logs", - "ftp://logs.us-east-1.amazonaws.com/v1/logs", - "https://log.us-east-1.amazonaws.com/v1/logs", - "https://logging.us-east-1.amazonaws.com/v1/logs", - "https://cloud-logs.us-east-1.amazonaws.com/v1/logs", - "https://logs.amazonaws.com/v1/logs", - "https://logs.us-east-1.amazon.com/v1/logs", - "https://logs.us-east-1.aws.com/v1/logs", - "https://logs.US-EAST-1.amazonaws.com/v1/logs", - "https://logs.us_east_1.amazonaws.com/v1/logs", - "https://logs.us.east.1.amazonaws.com/v1/logs", - "https://logs..amazonaws.com/v1/logs", - "https://logs.us-east-1.amazonaws.com/logs", - "https://logs.us-east-1.amazonaws.com/v2/logs", - "https://logs.us-east-1.amazonaws.com/v1/log", - "https://logs.us-east-1.amazonaws.com/v1/logs/", - "https://logs.us-east-1.amazonaws.com//v1/logs", - "https://logs.us-east-1.amazonaws.com/v1//logs", - "https://logs.us-east-1.amazonaws.com/v1/logs?param=value", - "https://logs.us-east-1.amazonaws.com/v1/logs#fragment", - "https://logs.us-east-1.amazonaws.com:443/v1/logs", - "https:/logs.us-east-1.amazonaws.com/v1/logs", - "https:://logs.us-east-1.amazonaws.com/v1/logs", - "https://LOGS.us-east-1.amazonaws.com/v1/logs", - "https://logs.us-east-1.amazonaws.com/V1/LOGS", - "https://logs.us-east-1.amazonaws.com/v1/logging", - "https://logs.us-east-1.amazonaws.com/v1/cloudwatchlogs", - "https://logs.us-east-1.amazonaws.com/v1/cwlogs" - }; - - Map noLogGroupHeader = - Map.of( - OTEL_EXPORTER_OTLP_LOGS_ENDPOINT, - "https://logs.us-east-1.amazonaws.com/v1/logs", - OTEL_EXPORTER_OTLP_LOGS_HEADERS, - "x-aws-log-stream=test2", - OTEL_EXPORTER_OTLP_LOGS_PROTOCOL, - "http/protobuf", - OTEL_LOGS_EXPORTER, - "otlp"); - - Map noLogStreamHeader = - Map.of( - OTEL_EXPORTER_OTLP_LOGS_ENDPOINT, - "https://logs.us-east-1.amazonaws.com/v1/logs", - OTEL_EXPORTER_OTLP_LOGS_HEADERS, - "x-aws-log-group=test2", - OTEL_EXPORTER_OTLP_LOGS_PROTOCOL, - "http/protobuf", - OTEL_LOGS_EXPORTER, - "otlp"); - - Map badLogStreamHeader = - Map.of( - OTEL_EXPORTER_OTLP_LOGS_ENDPOINT, - "https://logs.us-east-1.amazonaws.com/v1/logs", - OTEL_EXPORTER_OTLP_LOGS_HEADERS, - "x-aws-log-group=test1,x-aws-log-strea21=test2", - OTEL_EXPORTER_OTLP_LOGS_PROTOCOL, - "http/protobuf", - OTEL_LOGS_EXPORTER, - "otlp"); - - Map invalidProtocol = - Map.of( - OTEL_EXPORTER_OTLP_LOGS_ENDPOINT, - "https://logs.us-east-1.amazonaws.com/v1/logs", - OTEL_EXPORTER_OTLP_LOGS_HEADERS, - "x-aws-log-group=test1,x-aws-log-stream=test2", - OTEL_EXPORTER_OTLP_LOGS_PROTOCOL, - "grpc", - OTEL_LOGS_EXPORTER, - "otlp"); - - Map consoleExporter = - Map.of( - OTEL_EXPORTER_OTLP_LOGS_ENDPOINT, "https://logs.us-east-1.amazonaws.com/v1/logs", - OTEL_EXPORTER_OTLP_LOGS_HEADERS, "x-aws-log-stream=test2", - OTEL_EXPORTER_OTLP_LOGS_PROTOCOL, "http/protobuf", - OTEL_LOGS_EXPORTER, "console"); - - for (String endpoint : logsBadEndpoints) { - Map badEndpoint = - Map.of( - OTEL_EXPORTER_OTLP_LOGS_ENDPOINT, endpoint, - OTEL_EXPORTER_OTLP_LOGS_PROTOCOL, "http/protobuf", - OTEL_LOGS_EXPORTER, "otlp"); - - args.add(badEndpoint); - } - - args.add(badLogStreamHeader); - args.add(noLogStreamHeader); - args.add(noLogGroupHeader); - args.add(invalidProtocol); - args.add(consoleExporter); - - return args.stream().map(Arguments::of); - } -} diff --git a/awsagentprovider/src/test/java/software/amazon/opentelemetry/javaagent/providers/exporter/aws/metrics/AwsCloudWatchEmfExporterTest.java b/awsagentprovider/src/test/java/software/amazon/opentelemetry/javaagent/providers/exporter/aws/metrics/AwsCloudWatchEmfExporterTest.java new file mode 100644 index 0000000000..c77c8c65c0 --- /dev/null +++ b/awsagentprovider/src/test/java/software/amazon/opentelemetry/javaagent/providers/exporter/aws/metrics/AwsCloudWatchEmfExporterTest.java @@ -0,0 +1,310 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.opentelemetry.javaagent.providers.exporter.aws.metrics; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.*; + +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.metrics.export.MetricExporter; +import java.util.*; +import java.util.stream.Stream; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.ArgumentCaptor; +import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient; +import software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogGroupRequest; +import software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogStreamRequest; +import software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsRequest; +import software.amazon.awssdk.services.cloudwatchlogs.model.ResourceAlreadyExistsException; +import software.amazon.awssdk.services.cloudwatchlogs.model.ResourceNotFoundException; +import software.amazon.opentelemetry.javaagent.providers.exporter.aws.metrics.common.emitter.CloudWatchLogsClientEmitter; +import software.amazon.opentelemetry.javaagent.providers.exporter.aws.metrics.common.emitter.LogEventEmitter; + +public class AwsCloudWatchEmfExporterTest extends BaseEmfExporterTest { + private static final String LOG_GROUP_NAME = "test-log-group"; + private static final String LOG_STREAM_NAME = "test-stream"; + private static final String REGION = "us-east-1"; + + private AwsCloudWatchEmfExporter mockExporter; + private CloudWatchLogsClientEmitter testMockEmitter; + private CloudWatchLogsClient mockClient; + private CloudWatchLogsClientEmitter wrapper; + private long currentTime; + + @BeforeEach + void setUp() { + super.setup(); + this.currentTime = System.currentTimeMillis(); + this.testMockEmitter = mock(CloudWatchLogsClientEmitter.class); + this.mockExporter = new AwsCloudWatchEmfExporter(NAMESPACE, this.testMockEmitter); + this.mockClient = mock(CloudWatchLogsClient.class); + this.wrapper = spy(new CloudWatchLogsClientEmitter(LOG_GROUP_NAME, LOG_STREAM_NAME, REGION)); + doReturn(this.mockClient).when(this.wrapper).getEmitter(); + } + + @Override + protected LogEventEmitter createEmitter() { + return mock(CloudWatchLogsClientEmitter.class); + } + + @Override + protected MetricExporter createExporter() { + return new AwsCloudWatchEmfExporter(NAMESPACE, this.mockEmitter); + } + + @Test + void testShutdown() { + AwsCloudWatchEmfExporter spyExporter = spy(this.mockExporter); + doNothing().when(this.testMockEmitter).flushEvents(); + + CompletableResultCode result = spyExporter.shutdown(); + + assertTrue(result.isSuccess()); + verify(spyExporter).flush(); + } + + @Test + void testLogEventBatch() { + long laterTime = this.currentTime + 1000; + + this.wrapper.emit(createLogEvent("first message", this.currentTime)); + this.wrapper.emit(createLogEvent("second message", laterTime)); + + // Verify both events are batched together before flush + verify(this.mockClient, never()).putLogEvents(any(PutLogEventsRequest.class)); + + this.wrapper.flushEvents(); + + ArgumentCaptor requestCaptor = + ArgumentCaptor.forClass(PutLogEventsRequest.class); + verify(this.mockClient).putLogEvents(requestCaptor.capture()); + + PutLogEventsRequest request = requestCaptor.getValue(); + assertEquals(2, request.logEvents().size()); + assertEquals("first message", request.logEvents().get(0).message()); + assertEquals("second message", request.logEvents().get(1).message()); + assertEquals(this.currentTime, request.logEvents().get(0).timestamp()); + assertEquals(laterTime, request.logEvents().get(1).timestamp()); + } + + @Test + void testLogEventsSortedByTimestamp() { + ArgumentCaptor requestCaptor = + ArgumentCaptor.forClass(PutLogEventsRequest.class); + + // Add events in non-chronological order + this.wrapper.emit(createLogEvent("third", this.currentTime + 2000)); + this.wrapper.emit(createLogEvent("first", this.currentTime)); + this.wrapper.emit(createLogEvent("second", this.currentTime + 1000)); + this.wrapper.flushEvents(); + + // Verify putLogEvents was called with sorted events + verify(this.mockClient).putLogEvents(requestCaptor.capture()); + PutLogEventsRequest request = requestCaptor.getValue(); + + assertEquals(3, request.logEvents().size()); + assertEquals("first", request.logEvents().get(0).message()); + assertEquals("second", request.logEvents().get(1).message()); + assertEquals("third", request.logEvents().get(2).message()); + } + + @Test + void testCreateLogStreamIfNeededAlreadyExists() { + when(this.mockClient.putLogEvents(any(PutLogEventsRequest.class))) + .thenThrow(ResourceNotFoundException.builder().build()) + .thenReturn(null); + when(this.mockClient.createLogStream(any(CreateLogStreamRequest.class))) + .thenThrow(ResourceAlreadyExistsException.builder().build()); + + // Should make a call to create a Log Stream if it does not exist. + assertDoesNotThrow( + () -> { + this.wrapper.emit(createLogEvent("test", this.currentTime)); + this.wrapper.flushEvents(); + }); + + verify(this.mockClient).createLogStream(any(CreateLogStreamRequest.class)); + } + + @Test + void testCreateLogGroupIfNeededAlreadyExists() { + when(this.mockClient.putLogEvents(any(PutLogEventsRequest.class))) + .thenThrow(ResourceNotFoundException.builder().build()) + .thenReturn(null); + when(this.mockClient.createLogGroup(any(CreateLogGroupRequest.class))) + .thenThrow(ResourceAlreadyExistsException.builder().build()); + + // Should make a call to create a Log Group if it does not exist. + assertDoesNotThrow( + () -> { + this.wrapper.emit(createLogEvent("test", this.currentTime)); + this.wrapper.flushEvents(); + }); + + verify(this.mockClient).createLogGroup(any(CreateLogGroupRequest.class)); + } + + @Test + void testBatchActiveNewBatch() { + Map logEvent = createLogEvent("test", this.currentTime); + + // Should batch multiple events with the same timestamp together + this.wrapper.emit(logEvent); + this.wrapper.emit(logEvent); + this.wrapper.emit(logEvent); + + // Should only call emit once per event since batch is active + verify(this.wrapper, times(3)).emit(logEvent); + } + + @Test + void testSendLogEventForceBatchSend() { + // Send events up to the limit (should all be batched) + for (int i = 0; i < 10000; i++) { + this.wrapper.emit(createLogEvent("test message " + i, this.currentTime)); + } + + // At this point, no batch should have been sent yet + verify(this.mockClient, never()).putLogEvents(any(PutLogEventsRequest.class)); + + // Send one more event and should trigger batch send due to count limit + this.wrapper.emit(createLogEvent("final message", this.currentTime)); + + verify(this.mockClient, times(1)).putLogEvents(any(PutLogEventsRequest.class)); + } + + @Test + void testLogEventBatchClear() { + this.wrapper.emit(createLogEvent("test", this.currentTime)); + + this.wrapper.flushEvents(); + verify(this.mockClient, times(1)).putLogEvents(any(PutLogEventsRequest.class)); + + // Add another event after flush - should create new batch + this.wrapper.emit(createLogEvent("new test", this.currentTime + 1000)); + + // Flush again - should send the new event + this.wrapper.flushEvents(); + verify(this.mockClient, times(2)).putLogEvents(any(PutLogEventsRequest.class)); + } + + @Test + void testBatch24HourBoundaryEdgeCases() { + long baseTime = this.currentTime - (25L * 60 * 60 * 1000); // 25 hours ago + + this.wrapper.emit(createLogEvent("first", baseTime)); + + // Should still batch the events together at exactly 24 hours + long exactly24Hours = baseTime + (24L * 60 * 60 * 1000); + this.wrapper.emit(createLogEvent("boundary", exactly24Hours)); + + // Should still be batched - no putLogEvents call yet + verify(this.mockClient, never()).putLogEvents(any(PutLogEventsRequest.class)); + + // Add event just over 24 hour boundary (should trigger new batch) + long over24Hours = baseTime + (24L * 60 * 60 * 1000 + 1); + this.wrapper.emit(createLogEvent("over", over24Hours)); + + verify(this.mockClient, times(1)).putLogEvents(any(PutLogEventsRequest.class)); + } + + @Test + void testBatchInactiveAfter24Hours() { + long futureTime = this.currentTime + (25L * 60 * 60 * 1000); // 25 hours later + Map firstEvent = createLogEvent("test1", this.currentTime); + Map secondEvent = createLogEvent("test2", futureTime); + + this.testMockEmitter.emit(firstEvent); + this.testMockEmitter.emit(secondEvent); + + // Should trigger 2 separate batch sends due to 24-hour span limit + verify(this.testMockEmitter, times(1)).emit(firstEvent); + verify(this.testMockEmitter, times(1)).emit(secondEvent); + } + + @ParameterizedTest + @MethodSource("batchLimitScenarios") + void testEventBatchLimits( + Map logEvent, int eventCount, boolean shouldExceedLimit) { + for (int i = 0; i < eventCount; i++) { + this.testMockEmitter.emit(logEvent); + } + + if (shouldExceedLimit) { + verify(this.testMockEmitter, atLeast(2)).emit(logEvent); + } else { + verify(this.testMockEmitter, times(eventCount)).emit(logEvent); + } + } + + @ParameterizedTest + @MethodSource("invalidLogEvents") + void testValidateLogEventInvalid(Map logEvent) { + assertThrows(IllegalArgumentException.class, () -> this.wrapper.emit(logEvent)); + } + + static Stream batchLimitScenarios() { + Map smallEvent = new HashMap<>(); + smallEvent.put("message", "test"); + smallEvent.put("timestamp", System.currentTimeMillis()); + + Map largeEvent = new HashMap<>(); + largeEvent.put("message", "x".repeat(1024 * 1024)); + largeEvent.put("timestamp", System.currentTimeMillis()); + + return Stream.of( + Arguments.of(smallEvent, 10001, true), // count limit exceeded + Arguments.of(largeEvent, 2, true), // size limit exceeded + Arguments.of(smallEvent, 10, false) // within limits + ); + } + + static Stream invalidLogEvents() { + long currentTime = System.currentTimeMillis(); + Map oldTimestampEvent = new HashMap<>(); + oldTimestampEvent.put("message", "{\"test\":\"data\"}"); + oldTimestampEvent.put("timestamp", currentTime - (15L * 24 * 60 * 60 * 1000)); + + Map futureTimestampEvent = new HashMap<>(); + futureTimestampEvent.put("message", "{\"test\":\"data\"}"); + futureTimestampEvent.put("timestamp", currentTime + (3L * 60 * 60 * 1000)); + + Map emptyMessageEvent = new HashMap<>(); + emptyMessageEvent.put("message", ""); + emptyMessageEvent.put("timestamp", currentTime); + + Map whitespaceMessageEvent = new HashMap<>(); + whitespaceMessageEvent.put("message", " "); + whitespaceMessageEvent.put("timestamp", currentTime); + + Map missingMessageEvent = new HashMap<>(); + missingMessageEvent.put("timestamp", currentTime); + + return Stream.of( + Arguments.of(oldTimestampEvent), + Arguments.of(futureTimestampEvent), + Arguments.of(emptyMessageEvent), + Arguments.of(whitespaceMessageEvent), + Arguments.of(missingMessageEvent)); + } +} diff --git a/awsagentprovider/src/test/java/software/amazon/opentelemetry/javaagent/providers/exporter/aws/metrics/BaseEmfExporterTest.java b/awsagentprovider/src/test/java/software/amazon/opentelemetry/javaagent/providers/exporter/aws/metrics/BaseEmfExporterTest.java new file mode 100644 index 0000000000..e9379338b2 --- /dev/null +++ b/awsagentprovider/src/test/java/software/amazon/opentelemetry/javaagent/providers/exporter/aws/metrics/BaseEmfExporterTest.java @@ -0,0 +1,348 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.opentelemetry.javaagent.providers.exporter.aws.metrics; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.*; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.metrics.data.Data; +import io.opentelemetry.sdk.metrics.data.DoublePointData; +import io.opentelemetry.sdk.metrics.data.ExponentialHistogramBuckets; +import io.opentelemetry.sdk.metrics.data.ExponentialHistogramData; +import io.opentelemetry.sdk.metrics.data.ExponentialHistogramPointData; +import io.opentelemetry.sdk.metrics.data.GaugeData; +import io.opentelemetry.sdk.metrics.data.HistogramData; +import io.opentelemetry.sdk.metrics.data.HistogramPointData; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.data.PointData; +import io.opentelemetry.sdk.metrics.data.SumData; +import io.opentelemetry.sdk.metrics.export.MetricExporter; +import io.opentelemetry.sdk.resources.Resource; +import java.util.*; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; +import software.amazon.opentelemetry.javaagent.providers.exporter.aws.metrics.common.emitter.LogEventEmitter; + +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) +public abstract class BaseEmfExporterTest { + private static final double PRECISION_TOLERANCE = 0.00001; + private static final Random RANDOM = new Random(); + private static long timestampCounter = System.nanoTime(); + private static final ObjectMapper objectMapper = new ObjectMapper(); + private MetricData metricData; + static final String NAMESPACE = "test-namespace"; + List> capturedLogEvents; + MetricExporter exporter; + LogEventEmitter mockEmitter; + + abstract LogEventEmitter createEmitter(); + + abstract MetricExporter createExporter(); + + @BeforeEach + void setup() { + capturedLogEvents = new ArrayList<>(); + mockEmitter = createEmitter(); + exporter = createExporter(); + metricData = mock(MetricData.class); + + doAnswer( + invocation -> { + capturedLogEvents.add(invocation.getArgument(0)); + return null; + }) + .when(mockEmitter) + .emit(any()); + } + + @Test + void testExportEmptyMetrics() { + CompletableResultCode result = exporter.export(Collections.emptyList()); + assertTrue(result.isSuccess()); + assertEquals(0, capturedLogEvents.size()); + } + + @Test + void testExportFailureHandling() { + doThrow(new RuntimeException("Test exception")).when(mockEmitter).emit(any()); + + GaugeData gaugeData = mock(GaugeData.class); + DoublePointData pointData = mock(DoublePointData.class); + when(pointData.getValue()).thenReturn(10.0); + when(pointData.getAttributes()).thenReturn(Attributes.empty()); + when(pointData.getEpochNanos()).thenReturn(timestampCounter += 1_000_000); + + when(metricData.getName()).thenReturn("test.metric"); + when(metricData.getUnit()).thenReturn("1"); + when(metricData.getData()).thenReturn(gaugeData); + when(metricData.getResource()).thenReturn(Resource.getDefault()); + when(gaugeData.getPoints()).thenReturn(Collections.singletonList(pointData)); + + CompletableResultCode result = exporter.export(Collections.singletonList(metricData)); + + assertFalse(result.isSuccess()); + } + + @Test + void testSumMetricProcessing() { + List expectedValues = generateRandomNumbers(2); + String name = "test.sum"; + + SumData sumData = mock(SumData.class); + List points = new ArrayList<>(); + for (Number value : expectedValues) { + DoublePointData pointData = mock(DoublePointData.class); + when(pointData.getValue()).thenReturn(value.doubleValue()); + when(pointData.getAttributes()).thenReturn(Attributes.empty()); + when(pointData.getEpochNanos()).thenReturn(timestampCounter += 1_000_000); + points.add(pointData); + } + when(metricData.getName()).thenReturn(name); + when(metricData.getUnit()).thenReturn("1"); + when(metricData.getData()).thenReturn(sumData); + when(metricData.getResource()).thenReturn(Resource.getDefault()); + when(sumData.getPoints()).thenReturn(points); + + CompletableResultCode result = exporter.export(Collections.singletonList(metricData)); + + assertTrue(result.isSuccess()); + assertEquals(expectedValues.size(), capturedLogEvents.size()); + for (Map logEvent : capturedLogEvents) { + Map emfLog = this.validateEmfStructure(logEvent, name).orElseThrow(); + double actualValue = ((Number) emfLog.get(name)).doubleValue(); + assertTrue( + expectedValues.stream() + .anyMatch(v -> Math.abs(v.doubleValue() - actualValue) < PRECISION_TOLERANCE), + "Actual value " + actualValue + " not found in expected values: " + expectedValues); + } + } + + @Test + void testGaugeMetricProcessing() { + List expectedValues = generateRandomNumbers(3); + String name = "test.gauge"; + + GaugeData gaugeData = mock(GaugeData.class); + List points = new ArrayList<>(); + for (Number value : expectedValues) { + DoublePointData pointData = mock(DoublePointData.class); + when(pointData.getValue()).thenReturn(value.doubleValue()); + when(pointData.getAttributes()).thenReturn(Attributes.empty()); + when(pointData.getEpochNanos()).thenReturn(timestampCounter += 1_000_000); + points.add(pointData); + } + when(metricData.getName()).thenReturn(name); + when(metricData.getUnit()).thenReturn("1"); + when(metricData.getData()).thenReturn(gaugeData); + when(metricData.getResource()).thenReturn(Resource.getDefault()); + when(gaugeData.getPoints()).thenReturn(points); + + CompletableResultCode result = exporter.export(Collections.singletonList(metricData)); + + assertTrue(result.isSuccess()); + assertEquals(expectedValues.size(), capturedLogEvents.size()); + for (Map logEvent : capturedLogEvents) { + Map emfLog = this.validateEmfStructure(logEvent, name).orElseThrow(); + double actualValue = ((Number) emfLog.get(name)).doubleValue(); + assertTrue( + expectedValues.stream() + .anyMatch(v -> Math.abs(v.doubleValue() - actualValue) < PRECISION_TOLERANCE), + "Actual value " + actualValue + " not found in expected values: " + expectedValues); + } + } + + @Test + void testHistogramMetricProcessing() { + String name = "test.histogram"; + + HistogramData histogramData = mock(HistogramData.class); + HistogramPointData pointData = mock(HistogramPointData.class); + when(pointData.getCount()).thenReturn(10L); + when(pointData.getSum()).thenReturn(100.0); + when(pointData.getMin()).thenReturn(5.0); + when(pointData.getMax()).thenReturn(25.0); + when(pointData.getAttributes()).thenReturn(Attributes.empty()); + when(pointData.getEpochNanos()).thenReturn(timestampCounter += 1_000_000); + when(metricData.getName()).thenReturn(name); + when(metricData.getUnit()).thenReturn("ms"); + when(metricData.getData()).thenReturn((Data) histogramData); + when(metricData.getResource()).thenReturn(Resource.getDefault()); + when(histogramData.getPoints()).thenReturn(Collections.singletonList(pointData)); + + CompletableResultCode result = exporter.export(Collections.singletonList(metricData)); + + assertTrue(result.isSuccess()); + assertEquals(1, capturedLogEvents.size()); + Map emfLog = + this.validateEmfStructure(capturedLogEvents.get(0), name).orElseThrow(); + Map histogramDataMap = (Map) emfLog.get(name); + assertEquals(10, ((Number) histogramDataMap.get("Count")).intValue()); + assertEquals(100.0, ((Number) histogramDataMap.get("Sum")).doubleValue(), PRECISION_TOLERANCE); + assertEquals(5.0, ((Number) histogramDataMap.get("Min")).doubleValue(), PRECISION_TOLERANCE); + assertEquals(25.0, ((Number) histogramDataMap.get("Max")).doubleValue(), PRECISION_TOLERANCE); + } + + @Test + void testExponentialHistogramMetricProcessing() { + String name = "test.exp_histogram"; + + ExponentialHistogramPointData dataPoint = mock(ExponentialHistogramPointData.class); + ExponentialHistogramBuckets positiveBuckets = mock(ExponentialHistogramBuckets.class); + ExponentialHistogramBuckets negativeBuckets = mock(ExponentialHistogramBuckets.class); + when(metricData.getName()).thenReturn(name); + when(metricData.getUnit()).thenReturn("s"); + when(dataPoint.getCount()).thenReturn(10L); + when(dataPoint.getSum()).thenReturn(50.0); + when(dataPoint.getMin()).thenReturn(1.0); + when(dataPoint.getMax()).thenReturn(20.0); + when(dataPoint.getScale()).thenReturn(1); + when(dataPoint.getZeroCount()).thenReturn(0L); + when(dataPoint.getAttributes()).thenReturn(Attributes.builder().put("env", "test").build()); + when(dataPoint.getEpochNanos()).thenReturn(1609459200000000000L); + when(dataPoint.getPositiveBuckets()).thenReturn(positiveBuckets); + when(positiveBuckets.getOffset()).thenReturn(0); + when(positiveBuckets.getBucketCounts()).thenReturn(Arrays.asList(1L, 2L, 1L)); + when(dataPoint.getNegativeBuckets()).thenReturn(negativeBuckets); + when(negativeBuckets.getOffset()).thenReturn(0); + when(negativeBuckets.getBucketCounts()).thenReturn(Collections.emptyList()); + ExponentialHistogramData expHistogramData = mock(ExponentialHistogramData.class); + when(expHistogramData.getPoints()).thenReturn(Collections.singletonList(dataPoint)); + when(metricData.getData()).thenReturn((Data) expHistogramData); + when(metricData.getResource()).thenReturn(Resource.getDefault()); + + CompletableResultCode result = exporter.export(Collections.singletonList(metricData)); + + assertTrue(result.isSuccess()); + assertEquals(1, capturedLogEvents.size()); + Map emfLog = + this.validateEmfStructure(capturedLogEvents.get(0), name).orElseThrow(); + Map expHistogramRecord = (Map) emfLog.get(name); + assertTrue(expHistogramRecord.containsKey("Count")); + assertTrue(expHistogramRecord.containsKey("Sum")); + assertTrue(expHistogramRecord.containsKey("Values")); + assertTrue(expHistogramRecord.containsKey("Counts")); + assertEquals(10, ((Number) expHistogramRecord.get("Count")).intValue()); + assertEquals(50.0, ((Number) expHistogramRecord.get("Sum")).doubleValue(), PRECISION_TOLERANCE); + + List values = (List) expHistogramRecord.get("Values"); + List counts = (List) expHistogramRecord.get("Counts"); + assertEquals(values.size(), counts.size()); + List expectedValues = Arrays.asList(1.2071068, 1.7071068, 2.4142136); + List expectedCounts = Arrays.asList(1L, 2L, 1L); + assertEquals(expectedValues.size(), values.size()); + for (int i = 0; i < expectedValues.size(); i++) { + assertEquals(expectedValues.get(i), values.get(i).doubleValue(), PRECISION_TOLERANCE); + assertEquals(expectedCounts.get(i).longValue(), counts.get(i).longValue()); + } + } + + @Test + void testGroupByAttributesAndTimestamp() { + String name1 = "test.metric1"; + String name2 = "test.metric2"; + long timestamp = 1234567890000000L; + Attributes attrs = Attributes.builder().put("env", "test").build(); + + MetricData metric1 = mock(MetricData.class); + GaugeData gaugeData1 = mock(GaugeData.class); + DoublePointData pointData1 = mock(DoublePointData.class); + when(pointData1.getValue()).thenReturn(10.0); + when(pointData1.getAttributes()).thenReturn(attrs); + when(pointData1.getEpochNanos()).thenReturn(timestamp); + when(metric1.getName()).thenReturn(name1); + when(metric1.getUnit()).thenReturn("1"); + when(metric1.getData()).thenReturn(gaugeData1); + when(metric1.getResource()).thenReturn(Resource.getDefault()); + when(gaugeData1.getPoints()).thenReturn(Collections.singletonList(pointData1)); + + MetricData metric2 = mock(MetricData.class); + GaugeData gaugeData2 = mock(GaugeData.class); + DoublePointData pointData2 = mock(DoublePointData.class); + when(pointData2.getValue()).thenReturn(20.0); + when(pointData2.getAttributes()).thenReturn(attrs); + when(pointData2.getEpochNanos()).thenReturn(timestamp); + when(metric2.getName()).thenReturn(name2); + when(metric2.getUnit()).thenReturn("1"); + when(metric2.getData()).thenReturn((Data) gaugeData2); + when(metric2.getResource()).thenReturn(Resource.getDefault()); + when(gaugeData2.getPoints()).thenReturn(Collections.singletonList(pointData2)); + + CompletableResultCode result = exporter.export(Arrays.asList(metric1, metric2)); + + assertTrue(result.isSuccess()); + assertEquals(1, capturedLogEvents.size()); + Map emfLog = + this.validateEmfStructure(capturedLogEvents.get(0), name1).orElseThrow(); + assertTrue(emfLog.containsKey(name1)); + assertTrue(emfLog.containsKey(name2)); + assertEquals(10.0, ((Number) emfLog.get(name1)).doubleValue(), PRECISION_TOLERANCE); + assertEquals(20.0, ((Number) emfLog.get(name2)).doubleValue(), PRECISION_TOLERANCE); + assertEquals("test", emfLog.get("env")); + } + + protected Optional> validateEmfStructure( + Map logEvent, String metricName) { + assertTrue(logEvent.containsKey("message")); + assertTrue(logEvent.containsKey("timestamp")); + + String messageJson = (String) logEvent.get("message"); + try { + Map emfLog = + objectMapper.readValue(messageJson, new TypeReference>() {}); + assertTrue(emfLog.containsKey("_aws")); + Map awsMetadata = (Map) emfLog.get("_aws"); + assertTrue(awsMetadata.containsKey("CloudWatchMetrics")); + List> cloudWatchMetrics = + (List>) awsMetadata.get("CloudWatchMetrics"); + assertEquals(1, cloudWatchMetrics.size()); + Map metricGroup = cloudWatchMetrics.get(0); + assertEquals(NAMESPACE, metricGroup.get("Namespace")); + List> metrics = (List>) metricGroup.get("Metrics"); + assertTrue(metrics.size() >= 1); + boolean foundMetric = metrics.stream().anyMatch(m -> metricName.equals(m.get("Name"))); + assertTrue(foundMetric, "Expected metric " + metricName + " not found in metrics list"); + assertTrue(emfLog.containsKey(metricName)); + return Optional.of(emfLog); + } catch (Exception e) { + fail("Failed to parse JSON message: " + e.getMessage()); + return Optional.empty(); + } + } + + protected Map createLogEvent(String message, long timestamp) { + Map logEvent = new HashMap<>(); + logEvent.put("message", message); + logEvent.put("timestamp", timestamp); + return logEvent; + } + + private List generateRandomNumbers(int count) { + List values = new ArrayList<>(); + for (int i = 0; i < count; i++) { + values.add(RANDOM.nextDouble() * 100); + } + return values; + } +} diff --git a/awsagentprovider/src/test/java/software/amazon/opentelemetry/javaagent/providers/exporter/aws/metrics/ConsoleEmfExporterTest.java b/awsagentprovider/src/test/java/software/amazon/opentelemetry/javaagent/providers/exporter/aws/metrics/ConsoleEmfExporterTest.java new file mode 100644 index 0000000000..5b04465a54 --- /dev/null +++ b/awsagentprovider/src/test/java/software/amazon/opentelemetry/javaagent/providers/exporter/aws/metrics/ConsoleEmfExporterTest.java @@ -0,0 +1,160 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.opentelemetry.javaagent.providers.exporter.aws.metrics; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.*; + +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.export.MetricExporter; +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.util.Collections; +import java.util.Map; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import software.amazon.opentelemetry.javaagent.providers.exporter.aws.metrics.common.emitter.ConsoleEmitter; +import software.amazon.opentelemetry.javaagent.providers.exporter.aws.metrics.common.emitter.LogEventEmitter; + +public class ConsoleEmfExporterTest extends BaseEmfExporterTest { + private LogEventEmitter testMockEmitter; + private ConsoleEmfExporter testExporter; + private ByteArrayOutputStream outputStream; + private ConsoleEmitter consoleEmitter; + + @BeforeEach + void setUp() { + super.setup(); + this.testMockEmitter = mock(ConsoleEmitter.class); + this.testExporter = new ConsoleEmfExporter(NAMESPACE, this.testMockEmitter); + this.outputStream = new ByteArrayOutputStream(); + this.consoleEmitter = new ConsoleEmitter(new PrintStream(this.outputStream)); + } + + @Override + protected LogEventEmitter createEmitter() { + return mock(ConsoleEmitter.class); + } + + @Override + protected MetricExporter createExporter() { + return new ConsoleEmfExporter(NAMESPACE, mockEmitter); + } + + @Test + void testFlush() { + PrintStream mockPrintStream = mock(PrintStream.class); + ConsoleEmitter testEmitter = new ConsoleEmitter(mockPrintStream); + ConsoleEmfExporter exporter = new ConsoleEmfExporter(NAMESPACE, testEmitter); + + assertTrue(exporter.flush().isSuccess()); + verify(mockPrintStream, times(1)).flush(); + } + + @Test + void testShutdown() { + assertTrue(this.testExporter.shutdown().isSuccess()); + } + + @Test + void testIntegrationWithMetricsData() { + MetricData mockMetricData = mock(MetricData.class); + when(mockMetricData.getData()).thenReturn(null); + + CompletableResultCode result = + this.testExporter.export(Collections.singletonList(mockMetricData)); + + assertTrue(result.isSuccess()); + } + + @Test + void testIntegrationExportWithEmptyMetrics() { + CompletableResultCode result = this.testExporter.export(Collections.emptyList()); + + assertTrue(result.isSuccess()); + } + + @Test + void testExportFailureHandling() { + LogEventEmitter failingEmitter = mock(ConsoleEmitter.class); + doThrow(new IllegalStateException("Test exception")).when(failingEmitter).emit(any()); + ConsoleEmfExporter failingExporter = new ConsoleEmfExporter(NAMESPACE, failingEmitter); + + MetricData mockMetricData = mock(MetricData.class); + when(mockMetricData.getData()).thenThrow(new IllegalStateException("Test exception")); + + CompletableResultCode result = + failingExporter.export(Collections.singletonList(mockMetricData)); + + assertFalse(result.isSuccess()); + } + + @Test + void testExportLogEventSuccess() { + String testMessage = + "{\"_aws\":{\"Timestamp\":1640995200000,\"CloudWatchMetrics\":[{\"Namespace\":\"TestNamespace\",\"Dimensions\":[[\"Service\"]],\"Metrics\":[{\"Name\":\"TestMetric\",\"Unit\":\"Count\"}]}]},\"Service\":\"test-service\",\"TestMetric\":42}"; + Map logEvent = createLogEvent(testMessage, 1640995200000L); + + this.consoleEmitter.emit(logEvent); + + String capturedOutput = this.outputStream.toString().trim(); + assertEquals(testMessage, capturedOutput); + } + + @Test + void testExportLogEventEmptyMessage() { + Map logEvent = createLogEvent("", 1640995200000L); + + this.consoleEmitter.emit(logEvent); + + String capturedOutput = this.outputStream.toString().trim(); + assertEquals("", capturedOutput); + } + + @Test + void testExportLogEventMissingMessage() { + Map logEvent = createLogEvent(null, 1640995200000L); + logEvent.remove("message"); + + this.consoleEmitter.emit(logEvent); + + String capturedOutput = this.outputStream.toString().trim(); + assertEquals("", capturedOutput); + } + + @Test + void testExportLogEventWithNullMessage() { + Map logEvent = createLogEvent(null, 1640995200000L); + + this.consoleEmitter.emit(logEvent); + + String capturedOutput = this.outputStream.toString().trim(); + assertEquals("", capturedOutput); + } + + @Test + void testExportLogEventPrintException() { + Map logEvent = createLogEvent("test message", 1640995200000L); + PrintStream failingPrintStream = mock(PrintStream.class); + doThrow(new IllegalStateException("Print failed")) + .when(failingPrintStream) + .println(anyString()); + ConsoleEmitter failingEmitter = new ConsoleEmitter(failingPrintStream); + + assertDoesNotThrow(() -> failingEmitter.emit(logEvent)); + } +} diff --git a/awsagentprovider/src/test/java/software/amazon/opentelemetry/javaagent/providers/OtlpAwsExporterTest.java b/awsagentprovider/src/test/java/software/amazon/opentelemetry/javaagent/providers/exporter/otlp/aws/OtlpAwsExporterTest.java similarity index 99% rename from awsagentprovider/src/test/java/software/amazon/opentelemetry/javaagent/providers/OtlpAwsExporterTest.java rename to awsagentprovider/src/test/java/software/amazon/opentelemetry/javaagent/providers/exporter/otlp/aws/OtlpAwsExporterTest.java index 0af51a345a..4c809e8fec 100644 --- a/awsagentprovider/src/test/java/software/amazon/opentelemetry/javaagent/providers/OtlpAwsExporterTest.java +++ b/awsagentprovider/src/test/java/software/amazon/opentelemetry/javaagent/providers/exporter/otlp/aws/OtlpAwsExporterTest.java @@ -13,7 +13,7 @@ * permissions and limitations under the License. */ -package software.amazon.opentelemetry.javaagent.providers; +package software.amazon.opentelemetry.javaagent.providers.exporter.otlp.aws; import static org.junit.jupiter.api.Assertions.*; import static org.mockito.ArgumentMatchers.any;