Skip to content

Commit

Permalink
feat: update codebase to use Config Injection (#265)
Browse files Browse the repository at this point in the history
* feat: update codebase to use Config Injection

* trigger ci

* checkstyle
  • Loading branch information
paullatzelsperger authored Nov 18, 2024
1 parent ff03664 commit b815d90
Show file tree
Hide file tree
Showing 8 changed files with 40 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import java.util.Map;

import static java.util.Optional.ofNullable;
import static org.eclipse.edc.catalog.cache.FederatedCatalogDefaultServicesExtension.NUM_CRAWLER_SETTING;
import static org.eclipse.edc.catalog.spi.CacheSettings.DEFAULT_NUMBER_OF_CRAWLERS;
import static org.eclipse.edc.catalog.spi.CatalogConstants.DATASPACE_PROTOCOL;
import static org.eclipse.edc.protocol.dsp.spi.type.DspConstants.DSP_TRANSFORMER_CONTEXT_V_08;
Expand All @@ -67,8 +66,11 @@ public class FederatedCatalogCacheExtension implements ServiceExtension {

public static final String NAME = "Federated Catalog Cache";

@Setting
public static final String CRAWLING_ENABLED_PROPERTY = "edc.catalog.cache.execution.enabled";
@Setting(description = "Determines whether catalog crawling is globally enabled or disabled", key = "edc.catalog.cache.execution.enabled", defaultValue = "true")
private boolean executionEnabled;

@Setting(description = "The number of crawlers (execution threads) that should be used. The engine will re-use crawlers when necessary.", key = "edc.catalog.cache.partition.num.crawlers", defaultValue = DEFAULT_NUMBER_OF_CRAWLERS + "")
private int numCrawlers;

@Inject
private FederatedCatalogCache store;
Expand Down Expand Up @@ -112,13 +114,9 @@ public void initialize(ServiceExtensionContext context) {
if (healthCheckService != null) {
healthCheckService.addReadinessProvider(() -> HealthCheckResult.Builder.newInstance().component("Crawler Subsystem").build());
}
int numCrawlers = context.getSetting(NUM_CRAWLER_SETTING, DEFAULT_NUMBER_OF_CRAWLERS);

// by default only uses FC nodes that are not "self"
nodeFilter = ofNullable(nodeFilter).orElse(node -> !node.name().equals(context.getRuntimeId()));

var isEnabled = context.getConfig().getBoolean(CRAWLING_ENABLED_PROPERTY, true);

executionManager = ExecutionManager.Builder.newInstance()
.monitor(context.getMonitor().withPrefix("ExecutionManager"))
.preExecutionTask(() -> {
Expand All @@ -130,7 +128,7 @@ public void initialize(ServiceExtensionContext context) {
.onSuccess(this::persist)
.nodeDirectory(directory)
.nodeFilterFunction(nodeFilter)
.isEnabled(isEnabled)
.isEnabled(executionEnabled)
.build();

registerTransformers(context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,13 @@ public class FederatedCatalogDefaultServicesExtension implements ServiceExtensio

public static final String NAME = "Federated Catalog Default Services";

@Setting("The time to elapse between two crawl runs")
public static final String EXECUTION_PLAN_PERIOD_SECONDS = "edc.catalog.cache.execution.period.seconds";
@Setting("The number of crawlers (execution threads) that should be used. The engine will re-use crawlers when necessary.")
public static final String NUM_CRAWLER_SETTING = "edc.catalog.cache.partition.num.crawlers";
@Setting("The initial delay for the cache crawler engine")
public static final String EXECUTION_PLAN_DELAY_SECONDS = "edc.catalog.cache.execution.delay.seconds";
@Setting(description = "The time to elapse between two crawl runs", key = "edc.catalog.cache.execution.period.seconds", defaultValue = DEFAULT_EXECUTION_PERIOD_SECONDS + "")
private long periodSeconds;

@Setting(description = "The number of crawlers (execution threads) that should be used. The engine will re-use crawlers when necessary.", key = "edc.catalog.cache.partition.num.crawlers", defaultValue = DEFAULT_NUMBER_OF_CRAWLERS + "")
private int numCrawlers;
@Setting(description = "The initial delay for the cache crawler engine", key = "edc.catalog.cache.execution.delay.seconds", required = false)
private Integer delaySeconds;

@Inject
private FederatedCatalogCache store;
Expand All @@ -79,23 +80,20 @@ public QueryService defaultQueryEngine() {

@Provider(isDefault = true)
public ExecutionPlan createRecurringExecutionPlan(ServiceExtensionContext context) {
var periodSeconds = context.getSetting(EXECUTION_PLAN_PERIOD_SECONDS, DEFAULT_EXECUTION_PERIOD_SECONDS);
var setting = context.getSetting(EXECUTION_PLAN_DELAY_SECONDS, null);
int initialDelaySeconds;
if ("random".equals(setting) || setting == null) {
if (delaySeconds == null) {
initialDelaySeconds = randomSeconds();
} else {
try {
initialDelaySeconds = Integer.parseInt(setting);
initialDelaySeconds = delaySeconds;
} catch (NumberFormatException ex) {
initialDelaySeconds = 0;
}
}
var monitor = context.getMonitor();
if (periodSeconds < LOW_EXECUTION_PERIOD_SECONDS_THRESHOLD) {
var crawlers = context.getSetting(NUM_CRAWLER_SETTING, DEFAULT_NUMBER_OF_CRAWLERS);
monitor.warning(format("An execution period of %d seconds is very low (threshold = %d). This might result in the work queue to be ever growing." +
" A longer execution period or more crawler threads (currently using %d) should be considered.", periodSeconds, LOW_EXECUTION_PERIOD_SECONDS_THRESHOLD, crawlers));
" A longer execution period or more crawler threads (currently using %d) should be considered.", periodSeconds, LOW_EXECUTION_PERIOD_SECONDS_THRESHOLD, numCrawlers));
}
return new RecurringExecutionPlan(Duration.ofSeconds(periodSeconds), Duration.ofSeconds(initialDelaySeconds), monitor);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.eclipse.edc.junit.extensions.DependencyInjectionExtension;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.spi.system.configuration.Config;
import org.eclipse.edc.spi.system.configuration.ConfigFactory;
import org.eclipse.edc.spi.system.health.HealthCheckService;
import org.eclipse.edc.spi.types.TypeManager;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -36,18 +36,16 @@

import java.time.Duration;
import java.util.List;
import java.util.Map;

import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.eclipse.edc.catalog.cache.FederatedCatalogCacheExtension.CRAWLING_ENABLED_PROPERTY;
import static org.eclipse.edc.catalog.test.TestUtil.TEST_PROTOCOL;
import static org.eclipse.edc.catalog.test.TestUtil.createCatalog;
import static org.eclipse.edc.catalog.test.TestUtil.createNode;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -86,7 +84,6 @@ void initialize(ServiceExtensionContext context) {
extension.initialize(context);

verify(context, atLeastOnce()).getMonitor();
verify(context).getSetting("edc.catalog.cache.partition.num.crawlers", 2);
}

@Test
Expand All @@ -102,8 +99,7 @@ void initialize_withHealthCheck(ServiceExtensionContext context, ObjectFactory f

@Test
void initialize_withDisabledExecution(ServiceExtensionContext context, ObjectFactory factory) {
var mockedConfig = mock(Config.class);
when(mockedConfig.getBoolean(eq(CRAWLING_ENABLED_PROPERTY), anyBoolean())).thenReturn(false);
var mockedConfig = ConfigFactory.fromMap(Map.of("edc.catalog.cache.execution.enabled", Boolean.FALSE.toString()));
when(context.getConfig()).thenReturn(mockedConfig);
var mockedPlan = mock(ExecutionPlan.class);
context.registerService(ExecutionPlan.class, mockedPlan);
Expand All @@ -114,7 +110,6 @@ void initialize_withDisabledExecution(ServiceExtensionContext context, ObjectFac
extension.start();

verifyNoInteractions(mockedPlan);

}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,15 @@
import org.eclipse.edc.spi.types.TypeManager;
import org.eclipse.edc.sql.QueryExecutor;
import org.eclipse.edc.sql.bootstrapper.SqlSchemaBootstrapper;
import org.eclipse.edc.sql.configuration.DataSourceName;
import org.eclipse.edc.transaction.datasource.spi.DataSourceRegistry;
import org.eclipse.edc.transaction.spi.TransactionContext;

@Provides(FederatedCatalogCache.class)
@Extension(value = "SQL federated catalog cache")
public class SqlFederatedCatalogCacheExtension implements ServiceExtension {

@Deprecated(since = "0.8.1")
@Setting
public static final String DATASOURCE_SETTING_NAME = "edc.datasource.federatedcatalog.name";
@Setting(value = "The datasource to be used", defaultValue = DataSourceRegistry.DEFAULT_DATASOURCE)
public static final String DATASOURCE_NAME = "edc.sql.store.federatedcatalog.datasource";
@Setting(description = "The datasource to be used", defaultValue = DataSourceRegistry.DEFAULT_DATASOURCE, key = "edc.sql.store.federatedcatalog.datasource")
private String dataSourceName;

@Inject
private DataSourceRegistry dataSourceRegistry;
Expand All @@ -59,7 +55,6 @@ public class SqlFederatedCatalogCacheExtension implements ServiceExtension {
@Override
public void initialize(ServiceExtensionContext context) {
typeManager.registerTypes(Catalog.class, Dataset.class);
var dataSourceName = getDataSourceName(context);
var store = new SqlFederatedCatalogCache(dataSourceRegistry, dataSourceName, trxContext,
typeManager.getMapper(), queryExecutor, getStatementImpl());
context.registerService(FederatedCatalogCache.class, store);
Expand All @@ -73,7 +68,4 @@ private FederatedCatalogCacheStatements getStatementImpl() {
return statements != null ? statements : new PostgresDialectStatements();
}

private String getDataSourceName(ServiceExtensionContext context) {
return DataSourceName.getDataSourceName(DATASOURCE_NAME, DATASOURCE_SETTING_NAME, context.getConfig(), context.getMonitor());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@
import org.junit.jupiter.api.extension.ExtendWith;

import static org.assertj.core.api.Assertions.assertThat;
import static org.eclipse.edc.catalog.store.sql.SqlFederatedCatalogCacheExtension.DATASOURCE_NAME;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -57,6 +55,5 @@ void shouldInitializeTheStore(SqlFederatedCatalogCacheExtension extension, Servi
assertThat(service).isInstanceOf(SqlFederatedCatalogCache.class);

verify(typeManager).registerTypes(Catalog.class, Dataset.class);
verify(config).getString(eq(DATASOURCE_NAME), any());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
@Extension(value = "SQL target node directory")
public class SqlTargetNodeDirectoryExtension implements ServiceExtension {

@Setting(value = "The datasource to be used", defaultValue = DataSourceRegistry.DEFAULT_DATASOURCE)
public static final String DATASOURCE_NAME = "edc.sql.store.targetnodedirectory.datasource";
@Setting(description = "The datasource to be used", defaultValue = DataSourceRegistry.DEFAULT_DATASOURCE, key = "edc.sql.store.targetnodedirectory.datasource")
private String dataSourceName;

@Inject
private DataSourceRegistry dataSourceRegistry;
Expand All @@ -54,7 +54,6 @@ public class SqlTargetNodeDirectoryExtension implements ServiceExtension {
@Override
public void initialize(ServiceExtensionContext context) {
typeManager.registerTypes(TargetNode.class);
var dataSourceName = context.getSetting(DATASOURCE_NAME, DataSourceRegistry.DEFAULT_DATASOURCE);
var targetNodeDirectory = new SqlTargetNodeDirectory(dataSourceRegistry, dataSourceName, trxContext,
typeManager.getMapper(), queryExecutor, getStatementImpl());
context.registerService(TargetNodeDirectory.class, targetNodeDirectory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,11 @@
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.spi.system.configuration.Config;
import org.eclipse.edc.spi.types.TypeManager;
import org.eclipse.edc.transaction.datasource.spi.DataSourceRegistry;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import static org.assertj.core.api.Assertions.assertThat;
import static org.eclipse.edc.catalog.store.sql.SqlTargetNodeDirectoryExtension.DATASOURCE_NAME;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -56,6 +54,5 @@ void shouldInitializeTheStore(SqlTargetNodeDirectoryExtension extension, Service
assertThat(service).isInstanceOf(SqlTargetNodeDirectory.class);

verify(typeManager).registerTypes(TargetNode.class);
verify(config).getString(DATASOURCE_NAME, DataSourceRegistry.DEFAULT_DATASOURCE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import java.util.Map;
import java.util.HashMap;

import static io.restassured.RestAssured.given;
import static org.awaitility.Awaitility.await;
Expand Down Expand Up @@ -55,16 +55,22 @@ class FederatedCatalogDcp extends SmokeTest {
@RegisterExtension
protected RuntimeExtension runtime =
new RuntimePerMethodExtension(new EmbeddedRuntime("fc-dcp-bom",
Map.of(
"edc.iam.sts.oauth.token.url", "https://sts.com/token",
"edc.iam.sts.oauth.client.id", "test-clientid",
"edc.iam.sts.oauth.client.secret.alias", "test-alias",
"web.http.port", "8080",
"web.http.path", "/api",
"web.http.catalog.port", "8081",
"web.http.catalog.path", "/api/catalog",
"edc.catalog.cache.execution.period.seconds", "5",
"edc.catalog.cache.execution.delay.seconds", "0"),
new HashMap<>() {
{
put("edc.iam.sts.oauth.token.url", "https://sts.com/token");
put("edc.iam.sts.oauth.client.id", "test-clientid");
put("edc.iam.sts.oauth.client.secret.alias", "test-alias");
put("web.http.port", "8080");
put("web.http.path", "/api");
put("web.http.catalog.port", "8081");
put("web.http.catalog.path", "/api/catalog");
put("edc.catalog.cache.execution.period.seconds", "5");
put("edc.iam.issuer.id", "did:web:testparticipant");
put("edc.iam.sts.privatekey.alias", "private-alias");
put("edc.iam.sts.publickey.id", "public-key-id");
put("edc.catalog.cache.execution.delay.seconds", "0");
}
},
":dist:bom:federatedcatalog-dcp-bom"
));
}
Expand Down

0 comments on commit b815d90

Please sign in to comment.