Skip to content

Commit

Permalink
fix: config
Browse files (browse the repository at this point in the history)
  • Loading branch information
kamilczaja committed Oct 24, 2024
1 parent 171f91c commit 39abdf5
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import de.sovity.edc.ext.catalog.crawler.dao.CatalogPatchApplier;
import de.sovity.edc.ext.catalog.crawler.dao.config.DataSourceFactory;
import de.sovity.edc.ext.catalog.crawler.dao.config.DslContextFactory;
import de.sovity.edc.ext.catalog.crawler.dao.config.FlywayService;
import de.sovity.edc.ext.catalog.crawler.dao.connectors.ConnectorQueries;
import de.sovity.edc.ext.catalog.crawler.dao.connectors.ConnectorStatusUpdater;
import de.sovity.edc.ext.catalog.crawler.dao.contract_offers.ContractOfferQueries;
Expand Down Expand Up @@ -112,8 +111,6 @@ public static CrawlerExtensionContext buildContext(
// DB
var dataSourceFactory = new DataSourceFactory(config);
var dataSource = dataSourceFactory.newDataSource();
var flywayService = new FlywayService(config, monitor, dataSource);
flywayService.validateOrMigrateInTests();

// Dao
var dataOfferQueries = new DataOfferQueries();
Expand Down Expand Up @@ -183,10 +180,10 @@ public static CrawlerExtensionContext buildContext(

// Schedules
List<CronJobRef<?>> jobs = List.of(
getOnlineConnectorRefreshCronJob(dslContextFactory, connectorQueueFiller),
getOfflineConnectorRefreshCronJob(dslContextFactory, connectorQueueFiller),
getDeadConnectorRefreshCronJob(dslContextFactory, connectorQueueFiller),
getOfflineConnectorCleanerCronJob(dslContextFactory, offlineConnectorCleaner)
getOnlineConnectorRefreshCronJob(dslContextFactory, connectorQueueFiller, config),
getOfflineConnectorRefreshCronJob(dslContextFactory, connectorQueueFiller, config),
getDeadConnectorRefreshCronJob(dslContextFactory, connectorQueueFiller, config),
getOfflineConnectorCleanerCronJob(dslContextFactory, offlineConnectorCleaner, config)
);

// Startup
Expand Down Expand Up @@ -264,9 +261,9 @@ private static AssetMapper newAssetMapper(

@NotNull
private static CronJobRef<OfflineConnectorCleanerJob> getOfflineConnectorCleanerCronJob(DslContextFactory dslContextFactory,
OfflineConnectorCleaner offlineConnectorCleaner) {
OfflineConnectorCleaner offlineConnectorCleaner, Config config) {
return new CronJobRef<>(
CrawlerExtension.SCHEDULED_KILL_OFFLINE_CONNECTORS,
CrawlerConfigProps.CRAWLER_SCHEDULED_KILL_OFFLINE_CONNECTORS.getStringOrThrow(config),
OfflineConnectorCleanerJob.class,
() -> new OfflineConnectorCleanerJob(dslContextFactory, offlineConnectorCleaner)
);
Expand All @@ -275,10 +272,11 @@ private static CronJobRef<OfflineConnectorCleanerJob> getOfflineConnectorCleaner
@NotNull
private static CronJobRef<OnlineConnectorRefreshJob> getOnlineConnectorRefreshCronJob(
DslContextFactory dslContextFactory,
ConnectorQueueFiller connectorQueueFiller
ConnectorQueueFiller connectorQueueFiller,
Config config
) {
return new CronJobRef<>(
CrawlerExtension.CRON_ONLINE_CONNECTOR_REFRESH,
CrawlerConfigProps.CRAWLER_CRON_ONLINE_CONNECTOR_REFRESH.getStringOrThrow(config),
OnlineConnectorRefreshJob.class,
() -> new OnlineConnectorRefreshJob(dslContextFactory, connectorQueueFiller)
);
Expand All @@ -287,20 +285,21 @@ private static CronJobRef<OnlineConnectorRefreshJob> getOnlineConnectorRefreshCr
@NotNull
private static CronJobRef<OfflineConnectorRefreshJob> getOfflineConnectorRefreshCronJob(
DslContextFactory dslContextFactory,
ConnectorQueueFiller connectorQueueFiller
ConnectorQueueFiller connectorQueueFiller,
Config config
) {
return new CronJobRef<>(
CrawlerExtension.CRON_OFFLINE_CONNECTOR_REFRESH,
CrawlerConfigProps.CRAWLER_CRON_OFFLINE_CONNECTOR_REFRESH.getStringOrThrow(config),
OfflineConnectorRefreshJob.class,
() -> new OfflineConnectorRefreshJob(dslContextFactory, connectorQueueFiller)
);
}

@NotNull
private static CronJobRef<DeadConnectorRefreshJob> getDeadConnectorRefreshCronJob(DslContextFactory dslContextFactory,
ConnectorQueueFiller connectorQueueFiller) {
ConnectorQueueFiller connectorQueueFiller, Config config) {
return new CronJobRef<>(
CrawlerExtension.CRON_DEAD_CONNECTOR_REFRESH,
CrawlerConfigProps.CRAWLER_CRON_DEAD_CONNECTOR_REFRESH.getStringOrThrow(config),
DeadConnectorRefreshJob.class,
() -> new DeadConnectorRefreshJob(dslContextFactory, connectorQueueFiller)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package de.sovity.edc.ext.catalog.crawler.dao.config;

import com.zaxxer.hikari.HikariDataSource;
import de.sovity.edc.ext.catalog.crawler.CrawlerConfigProps;
import de.sovity.edc.ext.catalog.crawler.CrawlerExtension;
import de.sovity.edc.extension.postgresql.HikariDataSourceFactory;
import de.sovity.edc.extension.postgresql.JdbcCredentials;
Expand All @@ -36,8 +37,8 @@ public class DataSourceFactory {
*/
public HikariDataSource newDataSource() {
var jdbcCredentials = getJdbcCredentials();
int maxPoolSize = config.getInteger(CrawlerExtension.DB_CONNECTION_POOL_SIZE);
int connectionTimeoutInMs = config.getInteger(CrawlerExtension.DB_CONNECTION_TIMEOUT_IN_MS);
int maxPoolSize = CrawlerConfigProps.CRAWLER_DB_CONNECTION_POOL_SIZE.getInt(config);
int connectionTimeoutInMs = CrawlerConfigProps.CRAWLER_DB_CONNECTION_TIMEOUT_IN_MS.getInt(config);
return HikariDataSourceFactory.newDataSource(
jdbcCredentials,
maxPoolSize,
Expand All @@ -48,16 +49,9 @@ public HikariDataSource newDataSource() {

public JdbcCredentials getJdbcCredentials() {
return new JdbcCredentials(
getRequiredStringProperty(config, CrawlerExtension.JDBC_URL),
getRequiredStringProperty(config, CrawlerExtension.JDBC_USER),
getRequiredStringProperty(config, CrawlerExtension.JDBC_PASSWORD)
CrawlerConfigProps.CRAWLER_DB_JDBC_URL.getStringOrThrow(config),
CrawlerConfigProps.CRAWLER_DB_JDBC_USER.getStringOrThrow(config),
CrawlerConfigProps.CRAWLER_DB_JDBC_PASSWORD.getStringOrThrow(config)
);
}

// Fetches a config property, failing fast with a descriptive message when it is absent or blank.
private String getRequiredStringProperty(Config config, String name) {
    var propertyValue = config.getString(name, "");
    Validate.notBlank(propertyValue, "EDC Property '%s' is required".formatted(name));
    return propertyValue;
}

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package de.sovity.edc.ext.catalog.crawler.orchestration.config;

import de.sovity.edc.ext.catalog.crawler.CrawlerConfigProps;
import de.sovity.edc.ext.catalog.crawler.CrawlerExtension;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
Expand All @@ -27,11 +28,11 @@ public class CrawlerConfigFactory {
private final Config config;

public CrawlerConfig buildCrawlerConfig() {
var environmentId = config.getString(CrawlerExtension.ENVIRONMENT_ID);
var numThreads = config.getInteger(CrawlerExtension.NUM_THREADS, 1);
var killOfflineConnectorsAfter = getDuration(CrawlerExtension.KILL_OFFLINE_CONNECTORS_AFTER, Duration.ofDays(5));
var maxDataOffers = config.getInteger(CrawlerExtension.MAX_DATA_OFFERS_PER_CONNECTOR, -1);
var maxContractOffers = config.getInteger(CrawlerExtension.MAX_CONTRACT_OFFERS_PER_DATA_OFFER, -1);
var environmentId = CrawlerConfigProps.CRAWLER_ENVIRONMENT_ID.getStringOrThrow(config);
var numThreads = CrawlerConfigProps.CRAWLER_NUM_THREADS.getInt(config);
var killOfflineConnectorsAfter = Duration.parse(CrawlerConfigProps.CRAWLER_KILL_OFFLINE_CONNECTORS_AFTER.getStringOrThrow(config));
var maxDataOffers = CrawlerConfigProps.CRAWLER_MAX_DATA_OFFERS_PER_CONNECTOR.getInt(config);
var maxContractOffers = CrawlerConfigProps.CRAWLER_MAX_CONTRACT_OFFERS_PER_DATA_OFFER.getInt(config);

return CrawlerConfig.builder()
.environmentId(environmentId)
Expand All @@ -41,14 +42,4 @@ public CrawlerConfig buildCrawlerConfig() {
.maxContractOffersPerDataOffer(maxContractOffers)
.build();
}

// Parses a duration from config; an unset/blank property yields the supplied default.
// Assumes the property value is in ISO-8601 duration form (Duration.parse) — TODO confirm with callers.
private Duration getDuration(@NonNull String configProperty, Duration defaultValue) {
    var raw = config.getString(configProperty, "");
    return StringUtils.isBlank(raw) ? defaultValue : Duration.parse(raw);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import com.zaxxer.hikari.HikariDataSource;
import de.sovity.edc.ext.catalog.crawler.dao.config.DslContextFactory;
import de.sovity.edc.ext.catalog.crawler.dao.config.FlywayService;
import de.sovity.edc.extension.e2e.db.TestDatabaseViaTestcontainers;
import de.sovity.edc.extension.postgresql.FlywayExecutionParams;
import de.sovity.edc.extension.postgresql.FlywayUtils;
import de.sovity.edc.extension.postgresql.HikariDataSourceFactory;
import de.sovity.edc.extension.postgresql.JdbcCredentials;
Expand Down Expand Up @@ -39,7 +39,7 @@ public void beforeAll(ExtensionContext extensionContext) throws Exception {
dslContextFactory = new DslContextFactory(dataSource);

// Migrate DB
var params = FlywayService.baseConfig("classpath:/migration-test-utils")
var params = baseConfig("classpath:/migration-test-utils")
.migrate(true)
.build();
try {
Expand All @@ -59,4 +59,14 @@ public void afterAll(ExtensionContext extensionContext) throws Exception {
// Close DB
db.afterAll(extensionContext);
}

/**
 * Builds the Flyway execution params used by tests: the default migration location
 * plus the given additional one, targeting the standard history table.
 *
 * @param additionalMigrationLocations extra classpath location appended to the defaults
 * @return a pre-populated builder (caller finishes configuration and builds)
 */
public static FlywayExecutionParams.FlywayExecutionParamsBuilder baseConfig(String additionalMigrationLocations) {
    var locations = FlywayUtils.parseFlywayLocations(
        "classpath:/db/migration,%s".formatted(additionalMigrationLocations)
    );
    return FlywayExecutionParams.builder()
        .migrationLocations(locations)
        .table("flyway_schema_history");
}
}

0 comments on commit 39abdf5

Please sign in to comment.