From 9cf7a8312d1d39c7f4835abe8cffad5fc51d01f1 Mon Sep 17 00:00:00 2001
From: Sergei Zaychenko
Date: Fri, 11 Oct 2024 01:29:46 -0700
Subject: [PATCH] `CompactionService` decoupled from `DatasetRepository`

---
 .../scenario_existing_diverged_dataset.rs     |  6 +-
 .../scenario_existing_diverged_dataset.rs     |  6 +-
 src/domain/core/src/entities/mod.rs           |  2 +
 .../core/src/entities/resolved_dataset.rs     | 29 +++++++
 .../core/src/services/compaction_service.rs   |  6 +-
 .../src/task_logical_plan_runner_impl.rs      |  7 +-
 src/infra/core/src/compaction_service_impl.rs | 15 ++--
 src/infra/core/src/transform_service_impl.rs  |  5 +-
 .../compact_dataset_use_case_impl.rs          | 14 ++-
 .../core/tests/tests/engine/test_engine_io.rs |  1 -
 .../tests/tests/test_compact_service_impl.rs  | 85 +++++++------------
 .../tests/test_transform_service_impl.rs      |  6 +-
 12 files changed, 107 insertions(+), 75 deletions(-)
 create mode 100644 src/domain/core/src/entities/resolved_dataset.rs

diff --git a/src/adapter/http/tests/tests/tests_pull/scenarios/scenario_existing_diverged_dataset.rs b/src/adapter/http/tests/tests/tests_pull/scenarios/scenario_existing_diverged_dataset.rs
index abc4089f3..a99469f71 100644
--- a/src/adapter/http/tests/tests/tests_pull/scenarios/scenario_existing_diverged_dataset.rs
+++ b/src/adapter/http/tests/tests/tests_pull/scenarios/scenario_existing_diverged_dataset.rs
@@ -91,10 +91,14 @@ impl SmartPullExistingDivergedDatasetScenario
             )
             .await;
 
+        let server_dataset = server_harness
+            .cli_dataset_repository()
+            .get_dataset_by_handle(&server_create_result.dataset_handle);
+
         let compaction_service = server_harness.cli_compaction_service();
         let server_compaction_result = compaction_service
             .compact_dataset(
-                &server_create_result.dataset_handle,
+                ResolvedDataset::new(server_dataset, server_create_result.dataset_handle.clone()),
                 CompactionOptions::default(),
                 None,
             )
diff --git a/src/adapter/http/tests/tests/tests_push/scenarios/scenario_existing_diverged_dataset.rs b/src/adapter/http/tests/tests/tests_push/scenarios/scenario_existing_diverged_dataset.rs
index 979181ba3..f70817b4e 100644
--- a/src/adapter/http/tests/tests/tests_push/scenarios/scenario_existing_diverged_dataset.rs
+++ b/src/adapter/http/tests/tests/tests_push/scenarios/scenario_existing_diverged_dataset.rs
@@ -93,11 +93,15 @@ impl SmartPushExistingDivergedDatasetScenario
             )
             .await;
 
+        let client_dataset = client_harness
+            .dataset_repository()
+            .get_dataset_by_handle(&client_create_result.dataset_handle);
+
         // Compact at client side
         let compaction_service = client_harness.compaction_service();
         let client_compaction_result = compaction_service
             .compact_dataset(
-                &client_create_result.dataset_handle,
+                ResolvedDataset::new(client_dataset, client_create_result.dataset_handle.clone()),
                 CompactionOptions::default(),
                 None,
             )
diff --git a/src/domain/core/src/entities/mod.rs b/src/domain/core/src/entities/mod.rs
index a9a5137a6..1b02abdad 100644
--- a/src/domain/core/src/entities/mod.rs
+++ b/src/domain/core/src/entities/mod.rs
@@ -12,8 +12,10 @@ pub mod dataset_summary;
 pub mod engine;
 pub mod metadata_chain;
 pub mod metadata_stream;
+pub mod resolved_dataset;
 
 pub use dataset::*;
 pub use dataset_summary::*;
 pub use metadata_chain::*;
 pub use metadata_stream::*;
+pub use resolved_dataset::*;
diff --git a/src/domain/core/src/entities/resolved_dataset.rs b/src/domain/core/src/entities/resolved_dataset.rs
new file mode 100644
index 000000000..da88f5553
--- /dev/null
+++ b/src/domain/core/src/entities/resolved_dataset.rs
@@ -0,0 +1,29 @@
+// Copyright Kamu Data, Inc. and contributors. All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+use std::sync::Arc;
+
+use opendatafabric::DatasetHandle;
+
+use super::Dataset;
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+pub struct ResolvedDataset {
+    pub dataset: Arc<dyn Dataset>,
+    pub handle: DatasetHandle,
+}
+
+impl ResolvedDataset {
+    pub fn new(dataset: Arc<dyn Dataset>, handle: DatasetHandle) -> Self {
+        Self { dataset, handle }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/src/domain/core/src/services/compaction_service.rs b/src/domain/core/src/services/compaction_service.rs
index 811fb4a05..30e742007 100644
--- a/src/domain/core/src/services/compaction_service.rs
+++ b/src/domain/core/src/services/compaction_service.rs
@@ -27,7 +27,7 @@ pub const DEFAULT_MAX_SLICE_RECORDS: u64 = 10_000;
 pub trait CompactionService: Send + Sync {
     async fn compact_dataset(
         &self,
-        dataset_handle: &DatasetHandle,
+        target: ResolvedDataset,
         options: CompactionOptions,
         listener: Option<Arc<dyn CompactionListener>>,
     ) -> Result<CompactionResult, CompactionError>;
@@ -114,9 +114,9 @@ impl From for CompactionError {
 }
 
 #[derive(Error, Debug)]
-#[error("Dataset {dataset_name} in not root kind")]
+#[error("Dataset '{dataset_alias}' is not of root kind")]
 pub struct InvalidDatasetKindError {
-    pub dataset_name: DatasetName,
+    pub dataset_alias: DatasetAlias,
 }
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/src/domain/task-system/services/src/task_logical_plan_runner_impl.rs b/src/domain/task-system/services/src/task_logical_plan_runner_impl.rs
index 904e92ffe..0c30b2870 100644
--- a/src/domain/task-system/services/src/task_logical_plan_runner_impl.rs
+++ b/src/domain/task-system/services/src/task_logical_plan_runner_impl.rs
@@ -23,6 +23,7 @@ use kamu_core::{
     PullService,
     ResetError,
     ResetService,
+    ResolvedDataset,
     TransformError,
 };
 use kamu_datasets::{DatasetEnvVar, DatasetEnvVarService};
@@ -137,6 +138,7 @@ impl TaskLogicalPlanRunnerImpl {
     ) -> Result<TaskOutcome, InternalError> {
         let compaction_svc = self.catalog.get_one::<dyn CompactionService>().int_err()?;
         let dataset_repo = self.catalog.get_one::<dyn DatasetRepository>().int_err()?;
+
         let dataset_handle = dataset_repo
             .resolve_dataset_ref(&args.dataset_id.as_local_ref())
             .await
@@ -144,7 +146,10 @@
         let compaction_result = compaction_svc
             .compact_dataset(
-                &dataset_handle,
+                ResolvedDataset::new(
+                    dataset_repo.get_dataset_by_handle(&dataset_handle),
+                    dataset_handle.clone(),
+                ),
                 CompactionOptions {
                     max_slice_size: args.max_slice_size,
                     max_slice_records: args.max_slice_records,
diff --git a/src/infra/core/src/compaction_service_impl.rs b/src/infra/core/src/compaction_service_impl.rs
index 5c46d6576..9992cab79 100644
--- a/src/infra/core/src/compaction_service_impl.rs
+++ b/src/infra/core/src/compaction_service_impl.rs
@@ -32,7 +32,6 @@ use internal_error::ResultIntoInternal;
 use kamu_core::*;
 use opendatafabric::{
     Checkpoint,
-    DatasetHandle,
     DatasetKind,
     DatasetVocabulary,
     MetadataEvent,
@@ -50,7 +49,6 @@ use crate::*;
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 
 pub struct CompactionServiceImpl {
-    dataset_repo: Arc<dyn DatasetRepository>,
     object_store_registry: Arc<dyn ObjectStoreRegistry>,
     time_source: Arc<dyn SystemTimeSource>,
     run_info_dir: Arc<RunInfoDir>,
@@ -102,13 +100,11 @@ struct ChainFilesInfo {
 #[interface(dyn CompactionService)]
 impl CompactionServiceImpl {
     pub fn new(
-        dataset_repo: Arc<dyn DatasetRepository>,
         object_store_registry: Arc<dyn ObjectStoreRegistry>,
         time_source: Arc<dyn SystemTimeSource>,
         run_info_dir: Arc<RunInfoDir>,
     ) -> Self {
         Self {
-            dataset_repo,
             object_store_registry,
             time_source,
             run_info_dir,
@@ -463,13 +459,12 @@ impl CompactionService for CompactionServiceImpl {
     #[tracing::instrument(level = "info", skip_all)]
     async fn compact_dataset(
         &self,
-        dataset_handle: &DatasetHandle,
+        target: ResolvedDataset,
         options: CompactionOptions,
         maybe_listener: Option<Arc<dyn CompactionListener>>,
     ) -> Result<CompactionResult, CompactionError> {
-        let dataset = self.dataset_repo.get_dataset_by_handle(dataset_handle);
-
-        let dataset_kind = dataset
+        let dataset_kind = target
+            .dataset
             .get_summary(GetSummaryOpts::default())
             .await
             .int_err()?
@@ -478,7 +473,7 @@ impl CompactionService for CompactionServiceImpl {
             .kind;
 
         if !options.keep_metadata_only && dataset_kind != DatasetKind::Root {
             return Err(CompactionError::InvalidDatasetKind(
                 InvalidDatasetKindError {
-                    dataset_name: dataset_handle.alias.dataset_name.clone(),
+                    dataset_alias: target.handle.alias.clone(),
                 },
             ));
         }
@@ -492,7 +487,7 @@ impl CompactionService for CompactionServiceImpl {
         match self
             .compact_dataset_impl(
-                dataset,
+                target.dataset,
                 max_slice_size,
                 max_slice_records,
                 options.keep_metadata_only,
diff --git a/src/infra/core/src/transform_service_impl.rs b/src/infra/core/src/transform_service_impl.rs
index 8fead7347..ca7809be4 100644
--- a/src/infra/core/src/transform_service_impl.rs
+++ b/src/infra/core/src/transform_service_impl.rs
@@ -710,7 +710,10 @@ impl TransformServiceImpl {
         let compaction_result = self
             .compaction_svc
             .compact_dataset(
-                &dataset_handle,
+                ResolvedDataset::new(
+                    self.dataset_repo.get_dataset_by_handle(&dataset_handle),
+                    dataset_handle.clone(),
+                ),
                 CompactionOptions {
                     keep_metadata_only: true,
                     ..Default::default()
diff --git a/src/infra/core/src/use_cases/compact_dataset_use_case_impl.rs b/src/infra/core/src/use_cases/compact_dataset_use_case_impl.rs
index 6ef6ff9a6..fc5a9da43 100644
--- a/src/infra/core/src/use_cases/compact_dataset_use_case_impl.rs
+++ b/src/infra/core/src/use_cases/compact_dataset_use_case_impl.rs
@@ -20,7 +20,9 @@ use kamu_core::{
     CompactionResponse,
     CompactionResult,
     CompactionService,
+    DatasetRegistry,
     NullCompactionMultiListener,
+    ResolvedDataset,
 };
 use opendatafabric::DatasetHandle;
@@ -30,16 +32,19 @@
 #[interface(dyn CompactDatasetUseCase)]
 pub struct CompactDatasetUseCaseImpl {
     compaction_service: Arc<dyn CompactionService>,
+    dataset_registry: Arc<dyn DatasetRegistry>,
     dataset_action_authorizer: Arc<dyn DatasetActionAuthorizer>,
 }
 
 impl CompactDatasetUseCaseImpl {
     pub fn new(
         compaction_service: Arc<dyn CompactionService>,
+        dataset_registry: Arc<dyn DatasetRegistry>,
         dataset_action_authorizer: Arc<dyn DatasetActionAuthorizer>,
     ) -> Self {
         Self {
             compaction_service,
+            dataset_registry,
             dataset_action_authorizer,
         }
     }
@@ -58,9 +63,16 @@ impl CompactDatasetUseCase for CompactDatasetUseCaseImpl {
             .check_action_allowed(dataset_handle, DatasetAction::Write)
             .await?;
 
+        // Resolve dataset
+        let dataset = self.dataset_registry.get_dataset_by_handle(dataset_handle);
+
         // Actual action
         self.compaction_service
-            .compact_dataset(dataset_handle, options, maybe_listener)
+            .compact_dataset(
+                ResolvedDataset::new(dataset, dataset_handle.clone()),
+                options,
+                maybe_listener,
+            )
             .await
     }
diff --git a/src/infra/core/tests/tests/engine/test_engine_io.rs b/src/infra/core/tests/tests/engine/test_engine_io.rs
index 73aed3992..d20c561b3 100644
--- a/src/infra/core/tests/tests/engine/test_engine_io.rs
+++ b/src/infra/core/tests/tests/engine/test_engine_io.rs
@@ -69,7 +69,6 @@ async fn test_engine_io_common<
         engine_provisioner.clone(),
         Arc::new(SystemTimeSourceDefault),
         Arc::new(CompactionServiceImpl::new(
-            dataset_repo.clone(),
             object_store_registry.clone(),
             time_source.clone(),
             run_info_dir.clone(),
diff --git a/src/infra/core/tests/tests/test_compact_service_impl.rs b/src/infra/core/tests/tests/test_compact_service_impl.rs
index 920dea467..63e486306 100644
--- a/src/infra/core/tests/tests/test_compact_service_impl.rs
+++ b/src/infra/core/tests/tests/test_compact_service_impl.rs
@@ -67,12 +67,7 @@ async fn test_dataset_compact() {
 
     assert_matches!(
         harness
-            .compaction_svc
-            .compact_dataset(
-                &created.dataset_handle,
-                CompactionOptions::default(),
-                Some(Arc::new(NullCompactionListener {}))
-            )
+            .compact_dataset(&created, CompactionOptions::default())
             .await,
         Ok(CompactionResult::NothingToDo)
     );
@@ -111,12 +106,7 @@ async fn test_dataset_compact() {
 
     assert_matches!(
         harness
-            .compaction_svc
-            .compact_dataset(
-                &created.dataset_handle,
-                CompactionOptions::default(),
-                Some(Arc::new(NullCompactionListener {}))
-            )
+            .compact_dataset(&created, CompactionOptions::default())
             .await,
         Ok(CompactionResult::Success {
             new_head,
@@ -209,12 +199,7 @@ async fn test_dataset_compact_s3() {
 
     assert_matches!(
         harness
-            .compaction_svc
-            .compact_dataset(
-                &created.dataset_handle,
-                CompactionOptions::default(),
-                Some(Arc::new(NullCompactionListener {}))
-            )
+            .compact_dataset(&created, CompactionOptions::default())
             .await,
         Ok(CompactionResult::NothingToDo)
     );
@@ -253,13 +238,8 @@ async fn test_dataset_compact_s3() {
 
     assert_matches!(
         harness
-            .compaction_svc
-            .compact_dataset(
-                &created.dataset_handle,
-                CompactionOptions::default(),
-                Some(Arc::new(NullCompactionListener {}))
-            )
-            .await,
+            .compact_dataset(&created, CompactionOptions::default())
+            .await,
         Ok(CompactionResult::Success {
             new_head,
             old_head,
@@ -371,12 +351,7 @@ async fn test_dataset_compaction_watermark_only_blocks() {
     // After: ... <- add_data(6 records, wm2, src2)
     let res = harness
-        .compaction_svc
-        .compact_dataset(
-            &created.dataset_handle,
-            CompactionOptions::default(),
-            Some(Arc::new(NullCompactionListener {})),
-        )
+        .compact_dataset(&created, CompactionOptions::default())
         .await
         .unwrap();
@@ -538,14 +513,12 @@ async fn test_dataset_compaction_limits() {
 
     assert_matches!(
         harness
-            .compaction_svc
             .compact_dataset(
-                &created.dataset_handle,
+                &created,
                 CompactionOptions {
                     max_slice_records: Some(6),
                     ..CompactionOptions::default()
                 },
-                Some(Arc::new(NullCompactionListener {}))
             )
             .await,
         Ok(CompactionResult::Success {
@@ -698,11 +671,9 @@ async fn test_dataset_compaction_keep_all_non_data_blocks() {
 
     assert_matches!(
         harness
-            .compaction_svc
             .compact_dataset(
-                &created.dataset_handle,
+                &created,
                 CompactionOptions::default(),
-                Some(Arc::new(NullCompactionListener {}))
             )
             .await,
         Ok(CompactionResult::Success {
@@ -785,12 +756,7 @@ async fn test_dataset_compaction_derive_error() {
 
     assert_matches!(
         harness
-            .compaction_svc
-            .compact_dataset(
-                &created.dataset_handle,
-                CompactionOptions::default(),
-                Some(Arc::new(NullCompactionListener {}))
-            )
+            .compact_dataset(&created, CompactionOptions::default())
             .await,
         Err(CompactionError::InvalidDatasetKind(_)),
     );
@@ -844,15 +810,13 @@ async fn test_large_dataset_compact() {
 
     assert_matches!(
         harness
-            .compaction_svc
             .compact_dataset(
-                &created.dataset_handle,
+                &created,
                 CompactionOptions {
                     max_slice_records: Some(10),
                     max_slice_size: None,
                     ..CompactionOptions::default()
                 },
-                Some(Arc::new(NullCompactionListener {}))
             )
             .await,
         Ok(CompactionResult::Success {
@@ -942,14 +906,12 @@ async fn test_dataset_keep_metadata_only_compact() {
 
     assert_matches!(
         harness
-            .compaction_svc
             .compact_dataset(
-                &created_derived.dataset_handle,
+                &created_derived,
                 CompactionOptions {
                     keep_metadata_only: true,
                     ..CompactionOptions::default()
                 },
-                Some(Arc::new(NullCompactionListener {}))
             )
             .await,
         Ok(CompactionResult::NothingToDo)
@@ -979,14 +941,12 @@ async fn test_dataset_keep_metadata_only_compact() {
 
     assert_matches!(
         harness
-            .compaction_svc
             .compact_dataset(
-                &created_derived.dataset_handle,
+                &created_derived,
                 CompactionOptions {
                     keep_metadata_only: true,
                     ..CompactionOptions::default()
                 },
-                Some(Arc::new(NullCompactionListener {}))
             )
             .await,
         Ok(CompactionResult::Success {
@@ -1026,14 +986,12 @@ async fn test_dataset_keep_metadata_only_compact() {
 
     assert_matches!(
         harness
-            .compaction_svc
             .compact_dataset(
-                &created_root.dataset_handle,
+                &created_root,
                 CompactionOptions {
                     keep_metadata_only: true,
                     ..CompactionOptions::default()
                 },
-                Some(Arc::new(NullCompactionListener {}))
             )
             .await,
         Ok(CompactionResult::Success {
@@ -1401,4 +1359,21 @@ impl CompactTestHarness {
 
         result.outcome.is_ok()
     }
+
+    async fn compact_dataset(
+        &self,
+        dataset_create_result: &CreateDatasetResult,
+        compaction_options: CompactionOptions,
+    ) -> Result<CompactionResult, CompactionError> {
+        self.compaction_svc
+            .compact_dataset(
+                ResolvedDataset::new(
+                    dataset_create_result.dataset.clone(),
+                    dataset_create_result.dataset_handle.clone(),
+                ),
+                compaction_options,
+                Some(Arc::new(NullCompactionListener {})),
+            )
+            .await
+    }
 }
diff --git a/src/infra/core/tests/tests/test_transform_service_impl.rs b/src/infra/core/tests/tests/test_transform_service_impl.rs
index 71cad6799..8784532af 100644
--- a/src/infra/core/tests/tests/test_transform_service_impl.rs
+++ b/src/infra/core/tests/tests/test_transform_service_impl.rs
@@ -644,10 +644,14 @@ async fn test_transform_with_compaction_retry() {
     assert_matches!(transform_result, Ok(TransformResult::Updated { .. }));
 
+    let foo_dataset = harness
+        .dataset_repo
+        .get_dataset_by_handle(&foo_created_result.dataset_handle);
+
     harness
         .compaction_service
         .compact_dataset(
-            &foo_created_result.dataset_handle,
+            ResolvedDataset::new(foo_dataset, foo_created_result.dataset_handle.clone()),
             CompactionOptions::default(),
             None,
         )
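-- 
Reviewer note, not part of the patch: the new calling convention end to end. A minimal sketch under assumptions: `compact_by_ref`, `catalog`, and `dataset_ref` are illustrative stand-ins for the DI container and dataset reference a caller already holds; it uses only the calls that appear in the diff above (`get_one`, `resolve_dataset_ref`, `get_dataset_by_handle`, `compact_dataset`).

use internal_error::{InternalError, ResultIntoInternal};
use kamu_core::*;
use opendatafabric::DatasetRef;

// Hypothetical helper: resolve the dataset once at the boundary, then hand the
// service a ResolvedDataset so it no longer needs a DatasetRepository of its own.
async fn compact_by_ref(
    catalog: &dill::Catalog,
    dataset_ref: &DatasetRef,
) -> Result<CompactionResult, InternalError> {
    let dataset_repo = catalog.get_one::<dyn DatasetRepository>().int_err()?;
    let compaction_svc = catalog.get_one::<dyn CompactionService>().int_err()?;

    // Look up the handle and the dataset object once, at the call site.
    let dataset_handle = dataset_repo
        .resolve_dataset_ref(dataset_ref)
        .await
        .int_err()?;
    let dataset = dataset_repo.get_dataset_by_handle(&dataset_handle);

    // Everything the service needs now travels in one value; the listener is
    // omitted here, as in the pull/push scenarios above.
    compaction_svc
        .compact_dataset(
            ResolvedDataset::new(dataset, dataset_handle),
            CompactionOptions::default(),
            None,
        )
        .await
        .int_err()
}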