diff --git a/src/domain/core/src/services/reset_service.rs b/src/domain/core/src/services/reset_service.rs index 17240e704..f56a1e013 100644 --- a/src/domain/core/src/services/reset_service.rs +++ b/src/domain/core/src/services/reset_service.rs @@ -7,6 +7,8 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use std::sync::Arc; + use internal_error::InternalError; use opendatafabric::*; use thiserror::Error; @@ -14,11 +16,13 @@ use thiserror::Error; use crate::entities::SetRefError; use crate::*; +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + #[async_trait::async_trait] pub trait ResetService: Send + Sync { async fn reset_dataset( &self, - dataset_handle: &DatasetHandle, + dataset: Arc<dyn Dataset>, block_hash: Option<&Multihash>, old_head_maybe: Option<&Multihash>, ) -> Result<Multihash, ResetError>; diff --git a/src/domain/task-system/services/src/task_logical_plan_runner_impl.rs b/src/domain/task-system/services/src/task_logical_plan_runner_impl.rs index 0c30b2870..42d78ee09 100644 --- a/src/domain/task-system/services/src/task_logical_plan_runner_impl.rs +++ b/src/domain/task-system/services/src/task_logical_plan_runner_impl.rs @@ -107,6 +107,7 @@ impl TaskLogicalPlanRunnerImpl { async fn run_reset(&self, args: &ResetDataset) -> Result<TaskOutcome, InternalError> { let reset_svc = self.catalog.get_one::<dyn ResetService>().int_err()?; let dataset_repo = self.catalog.get_one::<dyn DatasetRepository>().int_err()?; + let dataset_handle = dataset_repo .resolve_dataset_ref(&args.dataset_id.as_local_ref()) .await .int_err()?; @@ -114,7 +115,7 @@ let reset_result_maybe = reset_svc .reset_dataset( - &dataset_handle, + dataset_repo.get_dataset_by_handle(&dataset_handle), args.new_head_hash.as_ref(), args.old_head_hash.as_ref(), ) diff --git a/src/infra/core/src/reset_service_impl.rs b/src/infra/core/src/reset_service_impl.rs index 8c9ca3b29..0c9697498 100644 --- a/src/infra/core/src/reset_service_impl.rs +++ 
b/src/infra/core/src/reset_service_impl.rs @@ -16,28 +16,18 @@ use opendatafabric::*; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -pub struct ResetServiceImpl { - dataset_repo: Arc<dyn DatasetRepository>, -} - #[component(pub)] #[interface(dyn ResetService)] -impl ResetServiceImpl { - pub fn new(dataset_repo: Arc<dyn DatasetRepository>) -> Self { - Self { dataset_repo } - } -} +pub struct ResetServiceImpl {} #[async_trait::async_trait] impl ResetService for ResetServiceImpl { async fn reset_dataset( &self, - dataset_handle: &DatasetHandle, + dataset: Arc<dyn Dataset>, new_head_maybe: Option<&Multihash>, old_head_maybe: Option<&Multihash>, ) -> Result<Multihash, ResetError> { - let dataset = self.dataset_repo.get_dataset_by_handle(dataset_handle); - let new_head = if let Some(new_head) = new_head_maybe { new_head } else { diff --git a/src/infra/core/src/use_cases/reset_dataset_use_case_impl.rs b/src/infra/core/src/use_cases/reset_dataset_use_case_impl.rs index 9b2356f6a..17816aec3 100644 --- a/src/infra/core/src/use_cases/reset_dataset_use_case_impl.rs +++ b/src/infra/core/src/use_cases/reset_dataset_use_case_impl.rs @@ -11,7 +11,7 @@ use std::sync::Arc; use dill::{component, interface}; use kamu_core::auth::{DatasetAction, DatasetActionAuthorizer}; -use kamu_core::{ResetDatasetUseCase, ResetError, ResetService}; +use kamu_core::{DatasetRegistry, ResetDatasetUseCase, ResetError, ResetService}; use opendatafabric::{DatasetHandle, Multihash}; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -20,16 +20,19 @@ use opendatafabric::{DatasetHandle, Multihash}; #[interface(dyn ResetDatasetUseCase)] pub struct ResetDatasetUseCaseImpl { reset_service: Arc<dyn ResetService>, + dataset_registry: Arc<dyn DatasetRegistry>, dataset_action_authorizer: Arc<dyn DatasetActionAuthorizer>, } impl ResetDatasetUseCaseImpl { pub fn new( reset_service: Arc<dyn ResetService>, + dataset_registry: Arc<dyn DatasetRegistry>, dataset_action_authorizer: Arc<dyn DatasetActionAuthorizer>, ) -> Self { Self { reset_service, + dataset_registry, 
dataset_action_authorizer, } } @@ -48,9 +51,12 @@ impl ResetDatasetUseCase for ResetDatasetUseCaseImpl { .check_action_allowed(dataset_handle, DatasetAction::Write) .await?; + // Resolve dataset + let dataset = self.dataset_registry.get_dataset_by_handle(dataset_handle); + // Actual action self.reset_service - .reset_dataset(dataset_handle, maybe_new_head, maybe_old_head) + .reset_dataset(dataset, maybe_new_head, maybe_old_head) .await } } diff --git a/src/infra/core/tests/tests/test_reset_service_impl.rs b/src/infra/core/tests/tests/test_reset_service_impl.rs index 072e07f2f..2d67e4cbf 100644 --- a/src/infra/core/tests/tests/test_reset_service_impl.rs +++ b/src/infra/core/tests/tests/test_reset_service_impl.rs @@ -30,7 +30,6 @@ async fn test_reset_dataset_with_2revisions_drop_last() { assert_eq!(test_case.hash_polling_source_block, current_head); let result = harness - .reset_svc .reset_dataset( &test_case.dataset_handle, Some(&test_case.hash_seed_block), @@ -57,7 +56,6 @@ async fn test_reset_dataset_with_2revisions_without_changes() { assert_eq!(test_case.hash_polling_source_block, current_head); let result = harness - .reset_svc .reset_dataset( &test_case.dataset_handle, Some(&test_case.hash_polling_source_block), @@ -84,7 +82,6 @@ async fn test_reset_dataset_to_non_existing_block_fails() { Multihash::from_multibase("zW1a3CNT52HXiJNniLkWMeev3CPRy9QiNRMWGyTrVNg4hY8").unwrap(); let result = harness - .reset_svc .reset_dataset( &test_case.dataset_handle, Some(&a_hash_not_present_in_chain), @@ -100,7 +97,6 @@ async fn test_reset_dataset_with_wrong_head() { let test_case = harness.a_chain_with_2_blocks().await; let result = harness - .reset_svc .reset_dataset( &test_case.dataset_handle, Some(&test_case.hash_seed_block), @@ -119,7 +115,6 @@ async fn test_reset_dataset_with_default_seed_block() { assert_eq!(test_case.hash_polling_source_block, current_head); let result = harness - .reset_svc .reset_dataset( &test_case.dataset_handle, None, @@ -228,6 +223,18 @@ impl 
ResetTestHarness { ChainWith2BlocksTestCase::new(dataset_handle, hash_seed_block, hash_polling_source_block) } + async fn reset_dataset( + &self, + dataset_handle: &DatasetHandle, + block_hash: Option<&Multihash>, + old_head_maybe: Option<&Multihash>, + ) -> Result<Multihash, ResetError> { + let dataset = self.resolve_dataset(dataset_handle); + self.reset_svc + .reset_dataset(dataset, block_hash, old_head_maybe) + .await + } + async fn get_dataset_head(&self, dataset_handle: &DatasetHandle) -> Multihash { let dataset = self.resolve_dataset(dataset_handle); dataset