Skip to content

Commit

Permalink
Merge pull request #3 from trocco-io/feature/support-with_group_by
Browse files Browse the repository at this point in the history
Add Support for AWS Cost Explorer's withGroupBy Feature
  • Loading branch information
d-hrs authored Mar 25, 2024
2 parents 14db3db + 90ae145 commit 2cd2558
Show file tree
Hide file tree
Showing 8 changed files with 449 additions and 65 deletions.
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ AWS Cost Explorer Cost and Usage Report
- **access_key_id**: AWS Access Key Id (string, required)
- **secret_access_key**: AWS Secret Access Key (string, required)
- **metrics**: metrics of the report (`AmortizedCost` `BlendedCost` `NetAmortizedCost` `NetUnblendedCost` `NormalizedUsageAmount` `UnblendedCost` `UsageQuantity`, default: `UnblendedCost`)
- **groups**: grouping of costs by up to two different groups, either dimensions, tag keys, cost categories, or any two group by types (optional)
- **type**: valid values `DIMENSION` `TAG` `COST_CATEGORY` (string, required)
- **key**: valid values for the `DIMENSION` type
are `AZ` `INSTANCE_TYPE` `LEGAL_ENTITY_NAME` `INVOICING_ENTITY` `LINKED_ACCOUNT` `OPERATION` `PLATFORM` `PURCHASE_TYPE` `SERVICE` `TENANCY` `RECORD_TYPE` and `USAGE_TYPE`
(string, required)
- **start_date**: start date of the report (string, required, format: `2020-01-01`)
- **end_date**: end date of the report (string, required, format: `2020-01-01`)

Expand All @@ -25,6 +30,11 @@ in:
access_key_id: xxx
secret_access_key: yyy
metrics: UnblendedCost
groups:
- type: DIMENSION
key: USAGE_TYPE
- type: TAG
key: Environment
start_date: "2020-04-01"
end_date: "2020-04-20"
```
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ targetCompatibility = 1.8
dependencies {
    compile "org.embulk:embulk-core:0.9.23"
    provided "org.embulk:embulk-core:0.9.23"
    // AWS SDK module that provides the Cost Explorer client and its request/response model classes.
    // Declared once: the artifact must not be listed at two different versions.
    compile "com.amazonaws:aws-java-sdk-costexplorer:1.12.666"
    testCompile "junit:junit:4.+"
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,129 +1,117 @@
package org.embulk.input.aws_cost_explorer;

import java.util.List;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.costexplorer.AWSCostExplorer;
import com.amazonaws.services.costexplorer.AWSCostExplorerClientBuilder;
import com.amazonaws.services.costexplorer.model.DateInterval;
import com.amazonaws.services.costexplorer.model.GetCostAndUsageRequest;
import com.amazonaws.services.costexplorer.model.GetCostAndUsageResult;
import com.amazonaws.services.costexplorer.model.Granularity;
import com.amazonaws.services.costexplorer.model.MetricValue;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;

import org.embulk.config.Config;
import org.embulk.config.ConfigDefault;
import org.embulk.config.ConfigDiff;
import org.embulk.config.ConfigSource;
import org.embulk.config.Task;
import org.embulk.config.TaskReport;
import org.embulk.config.TaskSource;
import org.embulk.spi.Column;
import org.embulk.input.aws_cost_explorer.client.AwsCostExplorerClient;
import org.embulk.input.aws_cost_explorer.client.AwsCostExplorerClientFactory;
import org.embulk.input.aws_cost_explorer.client.AwsCostExplorerRequestParametersFactory;
import org.embulk.spi.Exec;
import org.embulk.spi.InputPlugin;
import org.embulk.spi.PageBuilder;
import org.embulk.spi.PageOutput;
import org.embulk.spi.Schema;
import org.embulk.spi.time.TimestampParser;
import org.embulk.spi.type.Types;
import org.slf4j.Logger;

public class AwsCostExplorerInputPlugin implements InputPlugin
{
protected final Logger logger = Exec.getLogger(getClass());
import java.util.List;
import java.util.Map;

public interface PluginTask extends Task
public class AwsCostExplorerInputPlugin
implements InputPlugin
{
/**
 * Configuration of one plugin run. Each getter is bound to the configuration key
 * named in its {@code @Config} annotation; keys without a {@code @ConfigDefault}
 * have no fallback value declared here.
 */
public interface PluginTask
        extends Task
{
    /** AWS access key id (config key {@code access_key_id}). */
    @Config("access_key_id")
    String getAccessKeyId();

    /** AWS secret access key (config key {@code secret_access_key}). */
    @Config("secret_access_key")
    String getSecretAccessKey();

    /** Name of the cost metric to report; defaults to {@code UnblendedCost}. */
    @Config("metrics")
    @ConfigDefault("\"UnblendedCost\"")
    String getMetrics();

    /**
     * Group-by definitions, each a map holding a {@code type} and a {@code key} entry;
     * defaults to an empty list (no grouping).
     */
    @Config("groups")
    @ConfigDefault("[]")
    List<Map<String, String>> getGroups();

    /** Start date of the report period (config key {@code start_date}). */
    @Config("start_date")
    String getStartDate();

    /** End date of the report period (config key {@code end_date}). */
    @Config("end_date")
    String getEndDate();
}

/**
 * Loads and validates the plugin configuration, derives the output schema from it
 * and hands control to {@link #resume}.
 *
 * @param config  the raw plugin configuration
 * @param control the Embulk control object passed through to {@code resume}
 * @return the config diff produced by {@code resume}
 */
@Override
public ConfigDiff transaction(final ConfigSource config, final InputPlugin.Control control)
{
    final PluginTask task = config.loadConfig(PluginTask.class);

    // Reject an invalid "groups" setting before any schema is built from it.
    GroupsConfigValidator.validate(task.getGroups());

    // The schema depends on the number of configured groups, so it is built from the task
    // rather than from a fixed column list.
    final Schema schema = createSchema(task);
    final int taskCount = 1; // number of run() method calls

    return resume(task.dump(), schema, taskCount, control);
}

/**
 * Builds the output schema: the time period and metric name columns first, then one
 * column per configured group, then the amount, unit and estimated-flag columns.
 *
 * @param task the loaded plugin task whose groups determine the extra columns
 * @return the assembled schema
 */
private Schema createSchema(PluginTask task)
{
    final Schema.Builder schemaBuilder = Schema.builder();

    // Leading fixed columns.
    schemaBuilder.add("time_period_start", Types.TIMESTAMP);
    schemaBuilder.add("time_period_end", Types.TIMESTAMP);
    schemaBuilder.add("metrics", Types.STRING);

    // Variable part: one column for each group-by definition.
    addGroupsToSchema(schemaBuilder, task.getGroups());

    // Trailing fixed columns.
    schemaBuilder.add("amount", Types.DOUBLE);
    schemaBuilder.add("unit", Types.STRING);
    schemaBuilder.add("estimated", Types.BOOLEAN);

    return schemaBuilder.build();
}

/**
 * Appends one string column per group to the schema under construction.
 * Columns are named {@code group_key1}, {@code group_key2}, ... in list order.
 *
 * @param builder the schema builder to append to
 * @param groups  the configured group-by definitions; only their count is used
 */
private void addGroupsToSchema(Schema.Builder builder, List<Map<String, String>> groups)
{
    for (int index = 0; index < groups.size(); index++) {
        final int oneBasedNumber = index + 1;
        builder.add("group_key" + oneBasedNumber, Types.STRING);
    }
}

@Override
public ConfigDiff resume(final TaskSource taskSource, final Schema schema, final int taskCount,
final InputPlugin.Control control)
final InputPlugin.Control control)
{
control.run(taskSource, schema, taskCount);
return Exec.newConfigDiff();
}

@Override
public void cleanup(final TaskSource taskSource, final Schema schema, final int taskCount,
final List<TaskReport> successTaskReports)
final List<TaskReport> successTaskReports)
{
}

@Override
public TaskReport run(final TaskSource taskSource, final Schema schema, final int taskIndex,
final PageOutput output)
final PageOutput output)
{
final PluginTask task = taskSource.loadTask(PluginTask.class);

final AWSCredentials cred = new BasicAWSCredentials(task.getAccessKeyId(), task.getSecretAccessKey());

final AWSCostExplorer awsCostExplorerClient = AWSCostExplorerClientBuilder.standard().withRegion("us-east-1")
.withCredentials(new AWSStaticCredentialsProvider(cred)).build();
GetCostAndUsageRequest request = new GetCostAndUsageRequest()
.withTimePeriod(new DateInterval().withStart(task.getStartDate()).withEnd(task.getEndDate()))
.withGranularity(Granularity.DAILY).withMetrics(task.getMetrics());

GetCostAndUsageResult result = awsCostExplorerClient.getCostAndUsage(request);
final TimestampParser parser = TimestampParser.of("%Y-%m-%d", "UTC");
final AwsCostExplorerClient awsCostExplorerClient = AwsCostExplorerClientFactory.create(task);
final GetCostAndUsageRequest requestParameters = AwsCostExplorerRequestParametersFactory.create(task);

try (final PageBuilder pageBuilder = new PageBuilder(Exec.getBufferAllocator(), schema, output)) {
result.getResultsByTime().forEach(resultsByTime -> {
System.out.println(resultsByTime.toString());
logger.info("Cost Explorer API results: {}", resultsByTime.toString());
String start = resultsByTime.getTimePeriod().getStart();
String end = resultsByTime.getTimePeriod().getEnd();
pageBuilder.setTimestamp(schema.getColumn(0), parser.parse(start));
pageBuilder.setTimestamp(schema.getColumn(1), parser.parse(end));
pageBuilder.setString(schema.getColumn(2), task.getMetrics());
MetricValue metricValue = resultsByTime.getTotal().get(task.getMetrics());
pageBuilder.setDouble(schema.getColumn(3), Double.parseDouble(metricValue.getAmount()));
pageBuilder.setString(schema.getColumn(4), metricValue.getUnit());
pageBuilder.setBoolean(schema.getColumn(5), resultsByTime.isEstimated());
pageBuilder.addRecord();
});
awsCostExplorerClient.requestAll(requestParameters).forEach(response -> response.addRecordsToPage(pageBuilder, task));
pageBuilder.finish();
}

return null;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package org.embulk.input.aws_cost_explorer;

import com.amazonaws.services.costexplorer.model.Dimension;
import com.amazonaws.services.costexplorer.model.GroupDefinitionType;
import org.embulk.config.ConfigException;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * Validates the {@code groups} plugin setting: a list of at most two maps, each with a
 * {@code type} and a {@code key} entry. All failures are reported as {@link ConfigException}.
 */
public final class GroupsConfigValidator
{
    private GroupsConfigValidator()
    {
        // Static utility; not instantiable.
    }

    /**
     * Validates the groups configuration.
     *
     * <p>An empty list is valid. Otherwise the list must hold at most two entries, every entry
     * must pass the per-group checks, and no two entries may share the same type and key.
     *
     * @param groups the groups configuration
     * @throws ConfigException if any of the rules above is violated
     */
    public static void validate(List<Map<String, String>> groups)
    {
        if (groups.isEmpty()) {
            return;
        }

        if (groups.size() > 2) {
            throw new ConfigException("groups must have at most 2 elements.");
        }

        for (Map<String, String> group : groups) {
            validateGroup(group);
        }

        validateDuplicateGroups(groups);
    }

    /**
     * Checks a single group: it must carry both a type and a key, the type must be a known
     * {@link GroupDefinitionType}, and the key must be valid for that type.
     */
    private static void validateGroup(Map<String, String> group)
    {
        // Guard against a null list element so the caller sees a ConfigException
        // rather than a NullPointerException from the lookups below.
        if (group == null) {
            throw new ConfigException("group must have a type and a key.");
        }

        final String groupType = group.get("type");
        final String groupKey = group.get("key");

        if (groupType == null || groupKey == null) {
            throw new ConfigException("group must have a type and a key.");
        }

        // DIMENSION keys come from a closed set, so they get their own stricter check.
        if (GroupDefinitionType.DIMENSION.name().equals(groupType)) {
            validateDimensionGroupKey(groupKey);
            return;
        }

        // Every other known type accepts a free-form key.
        for (GroupDefinitionType enumEntry : GroupDefinitionType.values()) {
            if (enumEntry.toString().equals(groupType)) {
                validateGroupKey(groupKey);
                return;
            }
        }

        throw new ConfigException("group type must be one of the following: "
                + Arrays.stream(GroupDefinitionType.values()).map(Enum::name).collect(Collectors.joining(", ")));
    }

    /** Requires the key to be the name of one of the {@link Dimension} constants. */
    private static void validateDimensionGroupKey(String groupKey)
    {
        for (Dimension enumEntry : Dimension.values()) {
            if (enumEntry.name().equals(groupKey)) {
                return;
            }
        }

        throw new ConfigException("dimension group key must be one of the following: "
                + Arrays.stream(Dimension.values()).map(Enum::name).collect(Collectors.joining(", ")));
    }

    /** Requires a free-form key to be non-empty and at most 1024 characters long. */
    private static void validateGroupKey(String groupKey)
    {
        if (!groupKey.isEmpty() && groupKey.length() <= 1024) {
            return;
        }

        throw new ConfigException("group key must be non-empty and at most 1024 characters long.");
    }

    /**
     * Rejects lists in which two groups share the same type and key.
     * Must run after every group has passed {@link #validateGroup}, which rules out null entries.
     */
    private static void validateDuplicateGroups(List<Map<String, String>> groups)
    {
        Set<String> typeAndKeyPairs = new HashSet<>();

        for (Map<String, String> group : groups) {
            String type = group.get("type");
            String key = group.get("key");
            String pair = type + ":" + key;

            // Set.add returns false when the pair was already present.
            if (!typeAndKeyPairs.add(pair)) {
                throw new ConfigException("groups must not have duplicate type and key pairs.");
            }
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package org.embulk.input.aws_cost_explorer.client;

import com.amazonaws.services.costexplorer.AWSCostExplorer;
import com.amazonaws.services.costexplorer.model.GetCostAndUsageRequest;
import com.amazonaws.services.costexplorer.model.GetCostAndUsageResult;

import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/**
 * Thin wrapper around {@link AWSCostExplorer} that exposes the paginated
 * {@code getCostAndUsage} call as a stream of responses, one per page.
 */
public class AwsCostExplorerClient
{
    private final AWSCostExplorer client;

    /**
     * @param client the AWS SDK client every request is sent through
     */
    public AwsCostExplorerClient(AWSCostExplorer client)
    {
        this.client = client;
    }

    /**
     * Request all pages of the cost and usage data.
     *
     * <p>The returned stream is sequential; a page is requested each time the stream is advanced.
     * The next-page token of {@code requestParameters} is updated in place as pages are read.
     *
     * @param requestParameters The request parameters for the cost and usage data.
     * @return A stream of the cost and usage data.
     */
    public Stream<AwsCostExplorerResponse> requestAll(GetCostAndUsageRequest requestParameters)
    {
        return StreamSupport.stream(makeResponseIterator(requestParameters), false);
    }

    /**
     * Creates a spliterator that yields one response per page and ends after the first
     * response that carries no (or an empty) next-page token.
     */
    private Spliterator<AwsCostExplorerResponse> makeResponseIterator(GetCostAndUsageRequest requestParameters)
    {
        return new Spliterator<AwsCostExplorerResponse>()
        {
            // Set once the last page has been delivered. Without it, a further tryAdvance call
            // would send the request again with a cleared token and start over from page one.
            private boolean exhausted;

            @Override
            public boolean tryAdvance(Consumer<? super AwsCostExplorerResponse> action)
            {
                if (action == null) {
                    throw new NullPointerException("action");
                }
                if (exhausted) {
                    return false;
                }

                final AwsCostExplorerResponse response = request(requestParameters);
                action.accept(response);

                final String nextPageToken = response.getNextPageToken();
                requestParameters.setNextPageToken(nextPageToken);
                exhausted = nextPageToken == null || nextPageToken.isEmpty();

                // Per the Spliterator contract, true means "an element was delivered",
                // not "more elements remain".
                return true;
            }

            private AwsCostExplorerResponse request(GetCostAndUsageRequest pageRequest)
            {
                final GetCostAndUsageResult result = client.getCostAndUsage(pageRequest);
                return new AwsCostExplorerResponse(result);
            }

            @Override
            public Spliterator<AwsCostExplorerResponse> trySplit()
            {
                return null; // Parallel processing is not supported.
            }

            @Override
            public long estimateSize()
            {
                return Long.MAX_VALUE; // Size is unknown in advance.
            }

            @Override
            public int characteristics()
            {
                return Spliterator.NONNULL | Spliterator.IMMUTABLE;
            }
        };
    }
}
Loading

0 comments on commit 2cd2558

Please sign in to comment.