Decoupled ResetService from DatasetRepository
zaychenko-sergei committed Oct 11, 2024
1 parent 9cf7a83 commit 5815c41
Showing 5 changed files with 29 additions and 21 deletions.
6 changes: 5 additions & 1 deletion src/domain/core/src/services/reset_service.rs
@@ -7,18 +7,22 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::sync::Arc;

use internal_error::InternalError;
use opendatafabric::*;
use thiserror::Error;

use crate::entities::SetRefError;
use crate::*;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

#[async_trait::async_trait]
pub trait ResetService: Send + Sync {
    async fn reset_dataset(
        &self,
        dataset_handle: &DatasetHandle,
        dataset: Arc<dyn Dataset>,
        block_hash: Option<&Multihash>,
        old_head_maybe: Option<&Multihash>,
    ) -> Result<Multihash, ResetError>;
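The call sites changed below all follow the same pattern: the caller resolves the Arc<dyn Dataset> first and hands it to the service, so ResetService itself no longer needs a DatasetRepository. A minimal sketch of that calling convention, assuming the crate-root re-exports used elsewhere in this commit; the helper function and its name are illustrative, not part of the change:

use std::sync::Arc;

use kamu_core::{DatasetRepository, ResetError, ResetService};
use opendatafabric::{DatasetHandle, Multihash};

// Hypothetical helper, shown only to illustrate the decoupled call pattern.
async fn reset_to_block(
    reset_svc: Arc<dyn ResetService>,
    dataset_repo: Arc<dyn DatasetRepository>,
    dataset_handle: &DatasetHandle,
    new_head: &Multihash,
    old_head: &Multihash,
) -> Result<Multihash, ResetError> {
    // The caller resolves the dataset; the service never sees the repository.
    let dataset = dataset_repo.get_dataset_by_handle(dataset_handle);
    reset_svc
        .reset_dataset(dataset, Some(new_head), Some(old_head))
        .await
}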
@@ -107,14 +107,15 @@ impl TaskLogicalPlanRunnerImpl {
    async fn run_reset(&self, args: &ResetDataset) -> Result<TaskOutcome, InternalError> {
        let reset_svc = self.catalog.get_one::<dyn ResetService>().int_err()?;
        let dataset_repo = self.catalog.get_one::<dyn DatasetRepository>().int_err()?;

        let dataset_handle = dataset_repo
            .resolve_dataset_ref(&args.dataset_id.as_local_ref())
            .await
            .int_err()?;

        let reset_result_maybe = reset_svc
            .reset_dataset(
                &dataset_handle,
                dataset_repo.get_dataset_by_handle(&dataset_handle),
                args.new_head_hash.as_ref(),
                args.old_head_hash.as_ref(),
            )
14 changes: 2 additions & 12 deletions src/infra/core/src/reset_service_impl.rs
@@ -16,28 +16,18 @@ use opendatafabric::*;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

pub struct ResetServiceImpl {
    dataset_repo: Arc<dyn DatasetRepository>,
}

#[component(pub)]
#[interface(dyn ResetService)]
impl ResetServiceImpl {
    pub fn new(dataset_repo: Arc<dyn DatasetRepository>) -> Self {
        Self { dataset_repo }
    }
}
pub struct ResetServiceImpl {}

#[async_trait::async_trait]
impl ResetService for ResetServiceImpl {
    async fn reset_dataset(
        &self,
        dataset_handle: &DatasetHandle,
        dataset: Arc<dyn Dataset>,
        new_head_maybe: Option<&Multihash>,
        old_head_maybe: Option<&Multihash>,
    ) -> Result<Multihash, ResetError> {
        let dataset = self.dataset_repo.get_dataset_by_handle(dataset_handle);

        let new_head = if let Some(new_head) = new_head_maybe {
            new_head
        } else {
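With the repository dependency gone, ResetServiceImpl is an empty struct that the DI container can build on its own. A rough sketch of registering and resolving it, assuming dill's CatalogBuilder API and the kamu crate path for the implementation type (neither is shown in this commit; only get_one::<dyn ResetService>() is confirmed by the task-runner hunk above):

use dill::CatalogBuilder;
use kamu::ResetServiceImpl; // path assumed
use kamu_core::ResetService;

fn build_catalog() -> dill::Catalog {
    // With no constructor arguments left, the #[component(pub)] and
    // #[interface(dyn ResetService)] attributes are enough for dill to
    // construct ResetServiceImpl and expose it behind the trait.
    let mut b = CatalogBuilder::new();
    b.add::<ResetServiceImpl>();
    b.build()
}

fn resolve_reset_service(catalog: &dill::Catalog) -> std::sync::Arc<dyn ResetService> {
    // Resolution by interface, mirroring catalog.get_one::<dyn ResetService>()
    // in the task runner.
    catalog.get_one::<dyn ResetService>().unwrap()
}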
10 changes: 8 additions & 2 deletions src/infra/core/src/use_cases/reset_dataset_use_case_impl.rs
@@ -11,7 +11,7 @@ use std::sync::Arc;

use dill::{component, interface};
use kamu_core::auth::{DatasetAction, DatasetActionAuthorizer};
use kamu_core::{ResetDatasetUseCase, ResetError, ResetService};
use kamu_core::{DatasetRegistry, ResetDatasetUseCase, ResetError, ResetService};
use opendatafabric::{DatasetHandle, Multihash};

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -20,16 +20,19 @@ use opendatafabric::{DatasetHandle, Multihash};
#[interface(dyn ResetDatasetUseCase)]
pub struct ResetDatasetUseCaseImpl {
    reset_service: Arc<dyn ResetService>,
    dataset_registry: Arc<dyn DatasetRegistry>,
    dataset_action_authorizer: Arc<dyn DatasetActionAuthorizer>,
}

impl ResetDatasetUseCaseImpl {
    pub fn new(
        reset_service: Arc<dyn ResetService>,
        dataset_registry: Arc<dyn DatasetRegistry>,
        dataset_action_authorizer: Arc<dyn DatasetActionAuthorizer>,
    ) -> Self {
        Self {
            reset_service,
            dataset_registry,
            dataset_action_authorizer,
        }
    }
@@ -48,9 +51,12 @@ impl ResetDatasetUseCase for ResetDatasetUseCaseImpl {
            .check_action_allowed(dataset_handle, DatasetAction::Write)
            .await?;

        // Resolve dataset
        let dataset = self.dataset_registry.get_dataset_by_handle(dataset_handle);

        // Actual action
        self.reset_service
            .reset_dataset(dataset_handle, maybe_new_head, maybe_old_head)
            .reset_dataset(dataset, maybe_new_head, maybe_old_head)
            .await
    }
}
17 changes: 12 additions & 5 deletions src/infra/core/tests/tests/test_reset_service_impl.rs
@@ -30,7 +30,6 @@ async fn test_reset_dataset_with_2revisions_drop_last() {
    assert_eq!(test_case.hash_polling_source_block, current_head);

    let result = harness
        .reset_svc
        .reset_dataset(
            &test_case.dataset_handle,
            Some(&test_case.hash_seed_block),
@@ -57,7 +56,6 @@ async fn test_reset_dataset_with_2revisions_without_changes() {
    assert_eq!(test_case.hash_polling_source_block, current_head);

    let result = harness
        .reset_svc
        .reset_dataset(
            &test_case.dataset_handle,
            Some(&test_case.hash_polling_source_block),
@@ -84,7 +82,6 @@ async fn test_reset_dataset_to_non_existing_block_fails() {
        Multihash::from_multibase("zW1a3CNT52HXiJNniLkWMeev3CPRy9QiNRMWGyTrVNg4hY8").unwrap();

    let result = harness
        .reset_svc
        .reset_dataset(
            &test_case.dataset_handle,
            Some(&a_hash_not_present_in_chain),
@@ -100,7 +97,6 @@ async fn test_reset_dataset_with_wrong_head() {
    let test_case = harness.a_chain_with_2_blocks().await;

    let result = harness
        .reset_svc
        .reset_dataset(
            &test_case.dataset_handle,
            Some(&test_case.hash_seed_block),
@@ -119,7 +115,6 @@ async fn test_reset_dataset_with_default_seed_block() {
    assert_eq!(test_case.hash_polling_source_block, current_head);

    let result = harness
        .reset_svc
        .reset_dataset(
            &test_case.dataset_handle,
            None,
@@ -228,6 +223,18 @@ impl ResetTestHarness {
        ChainWith2BlocksTestCase::new(dataset_handle, hash_seed_block, hash_polling_source_block)
    }

    async fn reset_dataset(
        &self,
        dataset_handle: &DatasetHandle,
        block_hash: Option<&Multihash>,
        old_head_maybe: Option<&Multihash>,
    ) -> Result<Multihash, ResetError> {
        let dataset = self.resolve_dataset(dataset_handle);
        self.reset_svc
            .reset_dataset(dataset, block_hash, old_head_maybe)
            .await
    }

    async fn get_dataset_head(&self, dataset_handle: &DatasetHandle) -> Multihash {
        let dataset = self.resolve_dataset(dataset_handle);
        dataset
