CompactionService decoupled from DatasetRepository
zaychenko-sergei committed Oct 11, 2024
1 parent 7053f4a commit 9cf7a83
Showing 12 changed files with 107 additions and 75 deletions.
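In short: CompactionService::compact_dataset now takes a ResolvedDataset (a new entity pairing an Arc<dyn Dataset> with its DatasetHandle) instead of a bare &DatasetHandle, so the service no longer resolves handles through DatasetRepository itself. The before/after signature, condensed from the trait diff below:

    // Before: the service looked the dataset up internally via DatasetRepository
    async fn compact_dataset(
        &self,
        dataset_handle: &DatasetHandle,
        options: CompactionOptions,
        listener: Option<Arc<dyn CompactionListener>>,
    ) -> Result<CompactionResult, CompactionError>;

    // After: callers resolve the dataset and pass it in together with its handle
    async fn compact_dataset(
        &self,
        target: ResolvedDataset,
        options: CompactionOptions,
        listener: Option<Arc<dyn CompactionListener>>,
    ) -> Result<CompactionResult, CompactionError>;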
@@ -91,10 +91,14 @@ impl<TServerHarness: ServerSideHarness> SmartPullExistingDivergedDatasetScenario
         )
         .await;

+        let server_dataset = server_harness
+            .cli_dataset_repository()
+            .get_dataset_by_handle(&server_create_result.dataset_handle);
+
         let compaction_service = server_harness.cli_compaction_service();
         let server_compaction_result = compaction_service
             .compact_dataset(
-                &server_create_result.dataset_handle,
+                ResolvedDataset::new(server_dataset, server_create_result.dataset_handle.clone()),
                 CompactionOptions::default(),
                 None,
             )
@@ -93,11 +93,15 @@ impl<TServerHarness: ServerSideHarness> SmartPushExistingDivergedDatasetScenario
         )
         .await;

+        let client_dataset = client_harness
+            .dataset_repository()
+            .get_dataset_by_handle(&client_create_result.dataset_handle);
+
         // Compact at client side
         let compaction_service = client_harness.compaction_service();
         let client_compaction_result = compaction_service
             .compact_dataset(
-                &client_create_result.dataset_handle,
+                ResolvedDataset::new(client_dataset, client_create_result.dataset_handle.clone()),
                 CompactionOptions::default(),
                 None,
             )
2 changes: 2 additions & 0 deletions src/domain/core/src/entities/mod.rs
@@ -12,8 +12,10 @@ pub mod dataset_summary;
 pub mod engine;
 pub mod metadata_chain;
 pub mod metadata_stream;
+pub mod resolved_dataset;

 pub use dataset::*;
 pub use dataset_summary::*;
 pub use metadata_chain::*;
 pub use metadata_stream::*;
+pub use resolved_dataset::*;
29 changes: 29 additions & 0 deletions src/domain/core/src/entities/resolved_dataset.rs
@@ -0,0 +1,29 @@
+// Copyright Kamu Data, Inc. and contributors. All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+use std::sync::Arc;
+
+use opendatafabric::DatasetHandle;
+
+use super::Dataset;
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+pub struct ResolvedDataset {
+    pub dataset: Arc<dyn Dataset>,
+    pub handle: DatasetHandle,
+}
+
+impl ResolvedDataset {
+    pub fn new(dataset: Arc<dyn Dataset>, handle: DatasetHandle) -> Self {
+        Self { dataset, handle }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
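A minimal usage sketch of the new entity, following the call-site pattern this commit applies throughout (here repo and handle are placeholders for any DatasetRepository/DatasetRegistry and an already resolved DatasetHandle):

    // Resolve once at the boundary, then hand the pair to the service:
    let dataset: Arc<dyn Dataset> = repo.get_dataset_by_handle(&handle);
    let target = ResolvedDataset::new(dataset, handle.clone());
    compaction_service
        .compact_dataset(target, CompactionOptions::default(), None)
        .await?;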
6 changes: 3 additions & 3 deletions src/domain/core/src/services/compaction_service.rs
@@ -27,7 +27,7 @@ pub const DEFAULT_MAX_SLICE_RECORDS: u64 = 10_000;
 pub trait CompactionService: Send + Sync {
     async fn compact_dataset(
         &self,
-        dataset_handle: &DatasetHandle,
+        target: ResolvedDataset,
         options: CompactionOptions,
         listener: Option<Arc<dyn CompactionListener>>,
     ) -> Result<CompactionResult, CompactionError>;
@@ -114,9 +114,9 @@ impl From<SetRefError> for CompactionError {
 }

 #[derive(Error, Debug)]
-#[error("Dataset {dataset_name} in not root kind")]
+#[error("Dataset '{dataset_alias}' in not root kind")]
 pub struct InvalidDatasetKindError {
-    pub dataset_name: DatasetName,
+    pub dataset_alias: DatasetAlias,
 }

 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
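For consumers, the other visible change in this file is the error payload: InvalidDatasetKindError now carries the full dataset_alias (a DatasetAlias) rather than just dataset_name (a DatasetName). A hypothetical match arm showing the renamed field (the surrounding handler and other arms are assumed, not part of this commit):

    match compaction_result {
        Err(CompactionError::InvalidDatasetKind(e)) => {
            // e.dataset_alias replaces the old e.dataset_name field
            eprintln!("Dataset '{}' is not of root kind", e.dataset_alias);
        }
        _ => { /* other outcomes elided */ }
    }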
@@ -23,6 +23,7 @@ use kamu_core::{
     PullService,
     ResetError,
     ResetService,
+    ResolvedDataset,
     TransformError,
 };
 use kamu_datasets::{DatasetEnvVar, DatasetEnvVarService};
@@ -137,14 +138,18 @@
     ) -> Result<TaskOutcome, InternalError> {
         let compaction_svc = self.catalog.get_one::<dyn CompactionService>().int_err()?;
         let dataset_repo = self.catalog.get_one::<dyn DatasetRepository>().int_err()?;
+
         let dataset_handle = dataset_repo
             .resolve_dataset_ref(&args.dataset_id.as_local_ref())
             .await
             .int_err()?;

         let compaction_result = compaction_svc
             .compact_dataset(
-                &dataset_handle,
+                ResolvedDataset::new(
+                    dataset_repo.get_dataset_by_handle(&dataset_handle),
+                    dataset_handle.clone(),
+                ),
                 CompactionOptions {
                     max_slice_size: args.max_slice_size,
                     max_slice_records: args.max_slice_records,
15 changes: 5 additions & 10 deletions src/infra/core/src/compaction_service_impl.rs
@@ -32,7 +32,6 @@ use internal_error::ResultIntoInternal;
 use kamu_core::*;
 use opendatafabric::{
     Checkpoint,
-    DatasetHandle,
     DatasetKind,
     DatasetVocabulary,
     MetadataEvent,
@@ -50,7 +49,6 @@ use crate::*;
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

 pub struct CompactionServiceImpl {
-    dataset_repo: Arc<dyn DatasetRepository>,
     object_store_registry: Arc<dyn ObjectStoreRegistry>,
     time_source: Arc<dyn SystemTimeSource>,
     run_info_dir: Arc<RunInfoDir>,
@@ -102,13 +100,11 @@ struct ChainFilesInfo {
 #[interface(dyn CompactionService)]
 impl CompactionServiceImpl {
     pub fn new(
-        dataset_repo: Arc<dyn DatasetRepository>,
         object_store_registry: Arc<dyn ObjectStoreRegistry>,
         time_source: Arc<dyn SystemTimeSource>,
         run_info_dir: Arc<RunInfoDir>,
     ) -> Self {
         Self {
-            dataset_repo,
             object_store_registry,
             time_source,
             run_info_dir,
@@ -463,13 +459,12 @@ impl CompactionService for CompactionServiceImpl {
     #[tracing::instrument(level = "info", skip_all)]
     async fn compact_dataset(
         &self,
-        dataset_handle: &DatasetHandle,
+        target: ResolvedDataset,
         options: CompactionOptions,
         maybe_listener: Option<Arc<dyn CompactionListener>>,
     ) -> Result<CompactionResult, CompactionError> {
-        let dataset = self.dataset_repo.get_dataset_by_handle(dataset_handle);
-
-        let dataset_kind = dataset
+        let dataset_kind = target
+            .dataset
             .get_summary(GetSummaryOpts::default())
             .await
             .int_err()?
@@ -478,7 +473,7 @@
         if !options.keep_metadata_only && dataset_kind != DatasetKind::Root {
             return Err(CompactionError::InvalidDatasetKind(
                 InvalidDatasetKindError {
-                    dataset_name: dataset_handle.alias.dataset_name.clone(),
+                    dataset_alias: target.handle.alias.clone(),
                 },
             ));
         }
@@ -492,7 +487,7 @@

         match self
             .compact_dataset_impl(
-                dataset,
+                target.dataset,
                 max_slice_size,
                 max_slice_records,
                 options.keep_metadata_only,
5 changes: 4 additions & 1 deletion src/infra/core/src/transform_service_impl.rs
@@ -710,7 +710,10 @@ impl TransformServiceImpl {
         let compaction_result = self
             .compaction_svc
             .compact_dataset(
-                &dataset_handle,
+                ResolvedDataset::new(
+                    self.dataset_repo.get_dataset_by_handle(&dataset_handle),
+                    dataset_handle.clone(),
+                ),
                 CompactionOptions {
                     keep_metadata_only: true,
                     ..Default::default()
14 changes: 13 additions & 1 deletion src/infra/core/src/use_cases/compact_dataset_use_case_impl.rs
@@ -20,7 +20,9 @@ use kamu_core::{
     CompactionResponse,
     CompactionResult,
     CompactionService,
+    DatasetRegistry,
     NullCompactionMultiListener,
+    ResolvedDataset,
 };
 use opendatafabric::DatasetHandle;
@@ -30,16 +32,19 @@
 #[interface(dyn CompactDatasetUseCase)]
 pub struct CompactDatasetUseCaseImpl {
     compaction_service: Arc<dyn CompactionService>,
+    dataset_registry: Arc<dyn DatasetRegistry>,
     dataset_action_authorizer: Arc<dyn DatasetActionAuthorizer>,
 }

 impl CompactDatasetUseCaseImpl {
     pub fn new(
         compaction_service: Arc<dyn CompactionService>,
+        dataset_registry: Arc<dyn DatasetRegistry>,
         dataset_action_authorizer: Arc<dyn DatasetActionAuthorizer>,
     ) -> Self {
         Self {
             compaction_service,
+            dataset_registry,
             dataset_action_authorizer,
         }
     }
@@ -58,9 +63,16 @@ impl CompactDatasetUseCase for CompactDatasetUseCaseImpl {
             .check_action_allowed(dataset_handle, DatasetAction::Write)
             .await?;

+        // Resolve dataset
+        let dataset = self.dataset_registry.get_dataset_by_handle(dataset_handle);
+
+        // Actual action
         self.compaction_service
-            .compact_dataset(dataset_handle, options, maybe_listener)
+            .compact_dataset(
+                ResolvedDataset::new(dataset, dataset_handle.clone()),
+                options,
+                maybe_listener,
+            )
             .await
     }
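Taken together, the use case now owns the full authorize, resolve, compact sequence: authorization is checked against the handle first, the handle is then resolved through the newly injected DatasetRegistry, and CompactionService receives only the already resolved dataset. This call-site resolution is what allows CompactionServiceImpl to drop its repository dependency entirely.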
1 change: 0 additions & 1 deletion src/infra/core/tests/tests/engine/test_engine_io.rs
@@ -69,7 +69,6 @@ async fn test_engine_io_common<
         engine_provisioner.clone(),
         Arc::new(SystemTimeSourceDefault),
         Arc::new(CompactionServiceImpl::new(
-            dataset_repo.clone(),
             object_store_registry.clone(),
             time_source.clone(),
             run_info_dir.clone(),
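With the repository parameter gone, constructing the service in tests and wiring code takes one argument fewer; a minimal sketch based on the test above (the surrounding variables are assumed to be in scope):

    // CompactionServiceImpl no longer needs a DatasetRepository:
    let compaction_svc = Arc::new(CompactionServiceImpl::new(
        object_store_registry.clone(),
        time_source.clone(),
        run_info_dir.clone(),
    ));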
(Diffs for the remaining 2 of the 12 changed files are not shown.)
