Skip to content

Commit

Permalink
Merge pull request #304 from asserts/radha/sc-16108
Browse files Browse the repository at this point in the history
Fix NPEs in exporter SaaS mode
  • Loading branch information
jradhakrishnan authored Jul 11, 2023
2 parents 46f1b2c + 1baf98d commit 4001fe5
Show file tree
Hide file tree
Showing 13 changed files with 130 additions and 54 deletions.
4 changes: 3 additions & 1 deletion src/main/java/ai/asserts/aws/MetadataTaskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,9 @@ public void perMinute() {
if (environmentConfig.isEnabled()) {
taskThreadPool.getExecutorService().submit(scrapeConfigProvider::update);
taskThreadPool.getExecutorService().submit(ecsTaskProvider);
taskThreadPool.getExecutorService().submit(ecsServiceDiscoveryExporter);
if (environmentConfig.isSingleTenant()) {
taskThreadPool.getExecutorService().submit(ecsServiceDiscoveryExporter);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.hekate.cluster.ClusterNode;
import lombok.extern.slf4j.Slf4j;

import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -42,16 +43,19 @@ public Set<AWSAccount> getAccounts() {
}

public List<String> pick(List<String> allKeys) {
RendezvousHash<String, String> hash = new RendezvousHash<>(
hashFunction,
(Funnel<String>) (from, into) -> into.putUnencodedChars(from),
(Funnel<String>) (from, into) -> into.putUnencodedChars(from),
hekateCluster.allNodes().stream().map(this::clusterNodeToString).collect(Collectors.toList()));

String localNodeString = clusterNodeToString(hekateCluster.localNode());
return allKeys.stream()
.filter(k -> localNodeString.equals(hash.get(k)))
.collect(Collectors.toList());
if (hekateCluster.clusterDiscovered()) {
RendezvousHash<String, String> hash = new RendezvousHash<>(
hashFunction,
(Funnel<String>) (from, into) -> into.putUnencodedChars(from),
(Funnel<String>) (from, into) -> into.putUnencodedChars(from),
hekateCluster.allNodes().stream().map(this::clusterNodeToString).collect(Collectors.toList()));

String localNodeString = clusterNodeToString(hekateCluster.localNode());
return allKeys.stream()
.filter(k -> localNodeString.equals(hash.get(k)))
.collect(Collectors.toList());
}
return Collections.emptyList();
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,6 @@ private Set<AWSAccount> getAccountsInternal() {
regions = new TreeSet<>();
regions.add("us-west-2");
}
if (scrapeConfig.isScrapeCurrentAccount()) {
String accountId = accountIDProvider.getAccountId();
AWSAccount ac = new AWSAccount(tenantName, accountId, null, null, null, regions);
accountRegions.put(ac.getAccountId(), ac);
log.info("Scraping AWS Accounts {}", accountRegions);
}

// Get Configured AWS Accounts
if (scrapeConfig.isFetchAccountConfigs()) {
Expand Down Expand Up @@ -96,9 +90,15 @@ private Set<AWSAccount> getAccountsInternal() {
accountRegions.putIfAbsent(awsAccount.getAccountId(), awsAccount);
});
}

}
}
if (scrapeConfig.isScrapeCurrentAccount()) {
String accountId = accountIDProvider.getAccountId();
AWSAccount ac = new AWSAccount(tenantName, accountId, null, null, null,
regions);
accountRegions.putIfAbsent(ac.getAccountId(), ac);
log.info("Scraping AWS Accounts {}", accountRegions);
}
return Sets.newHashSet(accountRegions.values());
}

Expand Down
6 changes: 4 additions & 2 deletions src/main/java/ai/asserts/aws/cluster/HekateCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,16 @@
import io.hekate.cluster.event.ClusterEvent;
import io.hekate.cluster.event.ClusterEventListener;
import io.hekate.core.HekateException;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

import java.util.List;

public class HekateCluster implements ClusterEventListener {
private ClusterTopology clusterTopology;

public boolean clusterDiscovered() {
return clusterTopology != null;
}

@Override
public void onEvent(ClusterEvent event) throws HekateException {
clusterTopology = event.topology();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import ai.asserts.aws.EnvironmentConfig;
import ai.asserts.aws.ObjectMapperFactory;
import ai.asserts.aws.ScrapeConfigProvider;
import ai.asserts.aws.account.AccountTenantMapper;
import ai.asserts.aws.config.ScrapeConfig;
import ai.asserts.aws.config.ScrapeConfig.SubnetDetails;
import ai.asserts.aws.resource.ResourceMapper;
Expand Down Expand Up @@ -66,6 +67,7 @@ public class ECSServiceDiscoveryExporter implements InitializingBean, Runnable {
private final ECSTaskProvider ecsTaskProvider;

private final AccountIDProvider accountIDProvider;
private final AccountTenantMapper accountTenantMapper;

@Getter
private final AtomicReference<SubnetDetails> subnetDetails = new AtomicReference<>(null);
Expand All @@ -77,7 +79,8 @@ public ECSServiceDiscoveryExporter(EnvironmentConfig environmentConfig,
RestTemplate restTemplate, AccountIDProvider accountIDProvider,
ScrapeConfigProvider scrapeConfigProvider,
ResourceMapper resourceMapper, ECSTaskUtil ecsTaskUtil,
ObjectMapperFactory objectMapperFactory, ECSTaskProvider ecsTaskProvider) {
ObjectMapperFactory objectMapperFactory, ECSTaskProvider ecsTaskProvider,
AccountTenantMapper accountTenantMapper) {
this.environmentConfig = environmentConfig;
this.restTemplate = restTemplate;
this.scrapeConfigProvider = scrapeConfigProvider;
Expand All @@ -86,6 +89,7 @@ public ECSServiceDiscoveryExporter(EnvironmentConfig environmentConfig,
this.objectMapperFactory = objectMapperFactory;
this.ecsTaskProvider = ecsTaskProvider;
this.accountIDProvider = accountIDProvider;
this.accountTenantMapper = accountTenantMapper;
if (environmentConfig.isEnabled()) {
identifySubnetsToScrape();
}
Expand Down Expand Up @@ -128,9 +132,11 @@ public void afterPropertiesSet() throws Exception {
* <code>false</code> otherwise.
*/
public boolean isPrimaryExporter() {
ScrapeConfig scrapeConfig = scrapeConfigProvider.getScrapeConfig(null);
String accountId = accountIDProvider.getAccountId();
String tenantName = accountTenantMapper.getTenantName(accountId);
ScrapeConfig scrapeConfig = scrapeConfigProvider.getScrapeConfig(tenantName);
Map<String, SubnetDetails> primaryExportersByAccount = scrapeConfig.getPrimaryExporterByAccount();
SubnetDetails primaryConfig = primaryExportersByAccount.get(accountIDProvider.getAccountId());
SubnetDetails primaryConfig = primaryExportersByAccount.get(accountId);
return primaryConfig == null ||
(!hasLength(primaryConfig.getVpcId()) || runningInVPC(primaryConfig.getVpcId())) &&
(!hasLength(primaryConfig.getSubnetId()) || runningInSubnet(primaryConfig.getSubnetId()));
Expand All @@ -153,7 +159,8 @@ public boolean runningInSubnet(String subnetId) {
@Override
public void run() {
if (environmentConfig.isSingleTenant()) {
ScrapeConfig scrapeConfig = scrapeConfigProvider.getScrapeConfig(null);
String tenantName = accountTenantMapper.getTenantName(accountIDProvider.getAccountId());
ScrapeConfig scrapeConfig = scrapeConfigProvider.getScrapeConfig(tenantName);
if (scrapeConfig.isDiscoverECSTasks()) {
List<StaticConfig> targets = new ArrayList<>(ecsTaskProvider.getScrapeTargets());
// If scrapes need to happen over TLS, split the configs into TLS and non-TLS.
Expand Down
7 changes: 6 additions & 1 deletion src/main/java/ai/asserts/aws/exporter/ECSTaskProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import ai.asserts.aws.TaskExecutorUtil;
import ai.asserts.aws.account.AWSAccount;
import ai.asserts.aws.account.AccountProvider;
import ai.asserts.aws.account.AccountTenantMapper;
import ai.asserts.aws.config.ScrapeConfig;
import ai.asserts.aws.exporter.ECSServiceDiscoveryExporter.StaticConfig;
import ai.asserts.aws.resource.Resource;
Expand Down Expand Up @@ -222,9 +223,13 @@ void buildNewTargets(AWSAccount account, ScrapeConfig scrapeConfig,
taskResponse.tasks().stream()
.filter(ecsTaskUtil::hasAllInfo)
.forEach(task -> resourceMapper.map(task.taskArn()).ifPresent(taskResource -> {
String tenantName = account.getName();
List<StaticConfig> staticConfigs =
ecsTaskUtil.buildScrapeTargets(
scrapeConfigProvider.getScrapeConfig(null), ecsClient, cluster,
account,
scrapeConfigProvider.getScrapeConfig(tenantName),
ecsClient,
cluster,
getService(task), task);
Map<Resource, List<StaticConfig>> clusterTargets =
tasksByCluster.computeIfAbsent(cluster, k -> new HashMap<>());
Expand Down
14 changes: 9 additions & 5 deletions src/main/java/ai/asserts/aws/exporter/ECSTaskUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,13 @@ public boolean hasAllInfo(Task task) {
}

@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
public List<StaticConfig> buildScrapeTargets(ScrapeConfig scrapeConfig, EcsClient ecsClient,
public List<StaticConfig> buildScrapeTargets(AWSAccount account, ScrapeConfig scrapeConfig, EcsClient ecsClient,
Resource cluster, Optional<String> service, Task task) {
Map<String, String> tagLabels = tagUtil.tagLabels(scrapeConfig, task.tags().stream()
.map(ecsTag -> Tag.builder().key(ecsTag.key()).value(ecsTag.value()).build())
.collect(Collectors.toList()));

LabelsBuilder labelsBuilder = getLabelsBuilder(cluster, service, task);
LabelsBuilder labelsBuilder = getLabelsBuilder(account, cluster, service, task);

String ipAddress = getIPAddress(task);
if (ipAddress == null) {
Expand Down Expand Up @@ -188,7 +188,7 @@ public List<StaticConfig> buildScrapeTargets(ScrapeConfig scrapeConfig, EcsClien
}

@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
private LabelsBuilder getLabelsBuilder(Resource cluster, Optional<String> service, Task task) {
private LabelsBuilder getLabelsBuilder(AWSAccount account, Resource cluster, Optional<String> service, Task task) {
Resource taskDefResource = resourceMapper.map(task.taskDefinitionArn())
.orElseThrow(() -> new RuntimeException("Unknown resource ARN: " + task.taskDefinitionArn()));
Resource taskResource = resourceMapper.map(task.taskArn())
Expand All @@ -207,7 +207,7 @@ private LabelsBuilder getLabelsBuilder(Resource cluster, Optional<String> servic
.accountId(cluster.getAccount())
.region(cluster.getRegion())
.cluster(cluster.getName())
.env(envName != null ? envName : cluster.getAccount())
.env(getEnv(account))
.site(cluster.getRegion())
.taskDefName(taskDefResource.getName())
.taskDefVersion(taskDefResource.getVersion())
Expand All @@ -223,7 +223,7 @@ private LabelsBuilder getLabelsBuilder(Resource cluster, Optional<String> servic
.accountId(cluster.getAccount())
.region(cluster.getRegion())
.cluster(cluster.getName())
.env(envName != null ? envName : cluster.getAccount())
.env(getEnv(account))
.site(cluster.getRegion())
.taskDefName(taskDefResource.getName())
.taskDefVersion(taskDefResource.getVersion())
Expand All @@ -232,6 +232,10 @@ private LabelsBuilder getLabelsBuilder(Resource cluster, Optional<String> servic
return labelsBuilder;
}

private String getEnv(AWSAccount account) {
return envName != null ? envName : account.getName() != null ? account.getName() : account.getAccountId();
}

private String getIPAddress(Task task) {
String ipAddress = null;
Optional<KeyValuePair> ipAddressOpt = task.attachments().stream()
Expand Down
1 change: 1 addition & 0 deletions src/test/java/ai/asserts/aws/MetadataTaskManagerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ public void perMinuteTasks_singleInstancePrimaryMode() {
Capture<Runnable> capture0 = newCapture();
Capture<Runnable> capture1 = newCapture();
Capture<Runnable> capture2 = newCapture();
expect(environmentConfig.isSingleTenant()).andReturn(true);
expect(taskThreadPool.getExecutorService()).andReturn(executorService).anyTimes();
expect(executorService.submit(capture(capture0))).andReturn(null);
expect(scrapeConfigProvider.getScrapeConfig("")).andReturn(scrapeConfig).anyTimes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -73,8 +74,18 @@ String clusterNodeToString(ClusterNode node) {
};
}

@Test
public void getAccounts_ClusterNotDiscovered() {
expect(delegate.getAccounts()).andReturn(allAccounts);
expect(hekateCluster.clusterDiscovered()).andReturn(false);
replayAll();
assertEquals(Collections.emptySet(), testClass.getAccounts());
verifyAll();
}

@Test
public void getAccounts_OnlyOneNode() {
expect(hekateCluster.clusterDiscovered()).andReturn(true);
expect(delegate.getAccounts()).andReturn(allAccounts);
expect(hekateCluster.localNode()).andReturn(clusterNode1);
expect(hekateCluster.allNodes()).andReturn(ImmutableList.of(clusterNode1)).anyTimes();
Expand All @@ -85,6 +96,7 @@ public void getAccounts_OnlyOneNode() {

@Test
public void getAccounts_TwoNodes() {
expect(hekateCluster.clusterDiscovered()).andReturn(true).anyTimes();
expect(delegate.getAccounts()).andReturn(allAccounts).anyTimes();
expect(hekateCluster.localNode()).andReturn(clusterNode1);
expect(hekateCluster.localNode()).andReturn(clusterNode2);
Expand Down
4 changes: 4 additions & 0 deletions src/test/java/ai/asserts/aws/cluster/HekateClusterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

import static org.easymock.EasyMock.expect;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class HekateClusterTest extends EasyMockSupport {
@Test
Expand All @@ -29,9 +31,11 @@ public void clusterSetup() {
expect(clusterTopology.nodes()).andReturn(allNodes);

replayAll();
assertFalse(hekateCluster.clusterDiscovered());
hekateCluster.onEvent(clusterEvent);
assertEquals(node1, hekateCluster.localNode());
assertEquals(allNodes, hekateCluster.allNodes());
assertTrue(hekateCluster.clusterDiscovered());
verifyAll();
}
}
Loading

0 comments on commit 4001fe5

Please sign in to comment.