Skip to content

Commit

Permalink
No more DatasetRepository in RemoteAliasesRegistry, `DatasetChang…
Browse files Browse the repository at this point in the history
…esService`, `DatasetOwnershipService`, `ProvenanceService`
  • Loading branch information
zaychenko-sergei committed Oct 11, 2024
1 parent 62d5ce9 commit c1af778
Show file tree
Hide file tree
Showing 23 changed files with 199 additions and 186 deletions.
8 changes: 4 additions & 4 deletions src/adapter/graphql/src/mutations/dataset_mut.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

use chrono::{DateTime, Utc};
use domain::{DeleteDatasetError, RenameDatasetError};
use kamu_core::{self as domain};
use kamu_core::{self as domain, SetWatermarkUseCase};
use opendatafabric as odf;

use super::{DatasetEnvVarsMut, DatasetFlowsMut, DatasetMetadataMut};
Expand Down Expand Up @@ -124,9 +124,9 @@ impl DatasetMut {
ctx: &Context<'_>,
watermark: DateTime<Utc>,
) -> Result<SetWatermarkResult> {
let pull_svc = from_catalog::<dyn domain::PullService>(ctx).unwrap();
match pull_svc
.set_watermark(&self.dataset_handle, watermark)
let set_watermark_use_case = from_catalog::<dyn SetWatermarkUseCase>(ctx).unwrap();
match set_watermark_use_case
.execute(&self.dataset_handle, watermark)
.await
{
Ok(domain::PullResult::UpToDate(_)) => {
Expand Down
1 change: 1 addition & 0 deletions src/app/cli/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,7 @@ pub fn configure_base_catalog(
b.add::<RenameDatasetUseCaseImpl>();
b.add::<ResetDatasetUseCaseImpl>();
b.add::<SetWatermarkUseCaseImpl>();
b.add::<VerifyDatasetUseCaseImpl>();

b.add::<kamu_accounts_services::LoginPasswordAuthProvider>();

Expand Down
1 change: 1 addition & 0 deletions src/app/cli/src/cli_commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ pub fn get_command(
)),
cli::RepoSubCommand::Alias(sc) => match sc.subcommand {
cli::RepoAliasSubCommand::Add(ssc) => Box::new(AliasAddCommand::new(
cli_catalog.get_one()?,
cli_catalog.get_one()?,
cli_catalog.get_one()?,
ssc.dataset,
Expand Down
15 changes: 12 additions & 3 deletions src/app/cli/src/commands/alias_add_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,18 @@ use opendatafabric::*;
use super::{CLIError, Command};

pub struct AliasAddCommand {
dataset_registry: Arc<dyn DatasetRegistry>,
remote_repo_reg: Arc<dyn RemoteRepositoryRegistry>,
remote_alias_reg: Arc<dyn RemoteAliasesRegistry>,
dataset: DatasetRef,
dataset_ref: DatasetRef,
alias: DatasetRefRemote,
pull: bool,
push: bool,
}

impl AliasAddCommand {
pub fn new(
dataset_registry: Arc<dyn DatasetRegistry>,
remote_repo_reg: Arc<dyn RemoteRepositoryRegistry>,
remote_alias_reg: Arc<dyn RemoteAliasesRegistry>,
dataset: DatasetRef,
Expand All @@ -33,9 +35,10 @@ impl AliasAddCommand {
push: bool,
) -> Self {
Self {
dataset_registry,
remote_repo_reg,
remote_alias_reg,
dataset,
dataset_ref: dataset,
alias,
pull,
push,
Expand All @@ -58,9 +61,15 @@ impl Command for AliasAddCommand {
.map_err(CLIError::failure)?;
}

let dataset = self
.dataset_registry
.get_dataset_by_ref(&self.dataset_ref)
.await
.map_err(CLIError::failure)?;

let mut aliases = self
.remote_alias_reg
.get_remote_aliases(&self.dataset)
.get_remote_aliases(dataset)
.await
.map_err(CLIError::failure)?;

Expand Down
34 changes: 18 additions & 16 deletions src/app/cli/src/commands/alias_delete_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use super::{CLIError, Command};
pub struct AliasDeleteCommand {
dataset_registry: Arc<dyn DatasetRegistry>,
remote_alias_reg: Arc<dyn RemoteAliasesRegistry>,
dataset: Option<DatasetRef>,
alias: Option<DatasetRefRemote>,
maybe_dataset_ref: Option<DatasetRef>,
maybe_alias: Option<DatasetRefRemote>,
all: bool,
pull: bool,
push: bool,
Expand All @@ -29,27 +29,33 @@ impl AliasDeleteCommand {
pub fn new(
dataset_registry: Arc<dyn DatasetRegistry>,
remote_alias_reg: Arc<dyn RemoteAliasesRegistry>,
dataset: Option<DatasetRef>,
alias: Option<DatasetRefRemote>,
maybe_dataset_ref: Option<DatasetRef>,
maybe_alias: Option<DatasetRefRemote>,
all: bool,
pull: bool,
push: bool,
) -> Self {
Self {
dataset_registry,
remote_alias_reg,
dataset,
alias,
maybe_dataset_ref,
maybe_alias,
all,
pull,
push,
}
}

async fn delete_dataset_alias(&self) -> Result<usize, CLIError> {
let dataset = self
.dataset_registry
.get_dataset_by_ref(self.maybe_dataset_ref.as_ref().unwrap())
.await
.map_err(CLIError::failure)?;

let mut aliases = self
.remote_alias_reg
.get_remote_aliases(self.dataset.as_ref().unwrap())
.get_remote_aliases(dataset)
.await
.map_err(CLIError::failure)?;

Expand All @@ -58,7 +64,7 @@ impl AliasDeleteCommand {
if self.all {
count += aliases.clear(RemoteAliasKind::Pull).await?;
count += aliases.clear(RemoteAliasKind::Push).await?;
} else if let Some(alias) = &self.alias {
} else if let Some(alias) = &self.maybe_alias {
let both = !self.pull && !self.push;

if (self.pull || both) && aliases.delete(alias, RemoteAliasKind::Pull).await? {
Expand All @@ -78,13 +84,9 @@ impl AliasDeleteCommand {
let mut count = 0;

let mut stream = self.dataset_registry.get_all_datasets();
while let Some(dataset_handle) =
stream.next().await.transpose().map_err(CLIError::failure)?
{
let mut aliases = self
.remote_alias_reg
.get_remote_aliases(&dataset_handle.into_local_ref())
.await?;
while let Some(hdl) = stream.next().await.transpose().map_err(CLIError::failure)? {
let dataset = self.dataset_registry.get_dataset_by_handle(&hdl);
let mut aliases = self.remote_alias_reg.get_remote_aliases(dataset).await?;

// --all --push - clears all push aliases only
// --all --pull - clears all pull aliases only
Expand All @@ -104,7 +106,7 @@ impl AliasDeleteCommand {
#[async_trait::async_trait(?Send)]
impl Command for AliasDeleteCommand {
async fn run(&mut self) -> Result<(), CLIError> {
let count = if self.dataset.is_some() {
let count = if self.maybe_dataset_ref.is_some() {
self.delete_dataset_alias().await
} else if self.all {
self.delete_all_aliases().await
Expand Down
20 changes: 9 additions & 11 deletions src/app/cli/src/commands/alias_list_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,21 @@ pub struct AliasListCommand {
dataset_registry: Arc<dyn DatasetRegistry>,
remote_alias_reg: Arc<dyn RemoteAliasesRegistry>,
output_config: Arc<OutputConfig>,
dataset_ref: Option<DatasetRef>,
maybe_dataset_ref: Option<DatasetRef>,
}

impl AliasListCommand {
pub fn new(
dataset_registry: Arc<dyn DatasetRegistry>,
remote_alias_reg: Arc<dyn RemoteAliasesRegistry>,
output_config: Arc<OutputConfig>,
dataset_ref: Option<DatasetRef>,
maybe_dataset_ref: Option<DatasetRef>,
) -> Self {
Self {
dataset_registry,
remote_alias_reg,
output_config,
dataset_ref,
maybe_dataset_ref,
}
}

Expand Down Expand Up @@ -68,11 +68,9 @@ impl AliasListCommand {
let mut col_kind = Vec::new();
let mut col_alias = Vec::new();

for ds in datasets {
let aliases = self
.remote_alias_reg
.get_remote_aliases(&ds.as_local_ref())
.await?;
for hdl in datasets {
let dataset = self.dataset_registry.get_dataset_by_handle(hdl);
let aliases = self.remote_alias_reg.get_remote_aliases(dataset).await?;
let mut pull_aliases: Vec<_> = aliases
.get_by_kind(RemoteAliasKind::Pull)
.map(ToString::to_string)
Expand All @@ -86,13 +84,13 @@ impl AliasListCommand {
push_aliases.sort();

for alias in pull_aliases {
col_dataset.push(ds.alias.to_string());
col_dataset.push(hdl.alias.to_string());
col_kind.push("Pull");
col_alias.push(alias);
}

for alias in push_aliases {
col_dataset.push(ds.alias.to_string());
col_dataset.push(hdl.alias.to_string());
col_kind.push("Push");
col_alias.push(alias);
}
Expand All @@ -115,7 +113,7 @@ impl AliasListCommand {
#[async_trait::async_trait(?Send)]
impl Command for AliasListCommand {
async fn run(&mut self) -> Result<(), CLIError> {
let mut datasets: Vec<_> = if let Some(dataset_ref) = &self.dataset_ref {
let mut datasets: Vec<_> = if let Some(dataset_ref) = &self.maybe_dataset_ref {
let hdl = self
.dataset_registry
.resolve_dataset_ref(dataset_ref)
Expand Down
8 changes: 3 additions & 5 deletions src/app/cli/src/commands/complete_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,9 @@ impl CompleteCommand {
if let Some(registry) = self.dataset_registry.as_ref() {
if let Some(reg) = self.remote_alias_reg.as_ref() {
let mut datasets = registry.get_all_datasets();
while let Some(dataset_handle) = datasets.try_next().await.unwrap() {
let aliases = reg
.get_remote_aliases(&dataset_handle.as_local_ref())
.await
.unwrap();
while let Some(hdl) = datasets.try_next().await.unwrap() {
let dataset = registry.get_dataset_by_handle(&hdl);
let aliases = reg.get_remote_aliases(dataset).await.unwrap();
for alias in aliases.get_by_kind(RemoteAliasKind::Pull) {
if alias.to_string().starts_with(prefix) {
writeln!(output, "{alias}").unwrap();
Expand Down
3 changes: 2 additions & 1 deletion src/app/cli/src/commands/ingest_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,10 @@ impl IngestCommand {
&self,
dataset_handle: &DatasetHandle,
) -> Result<(), CLIError> {
let dataset = self.dataset_registry.get_dataset_by_handle(dataset_handle);
let aliases = self
.remote_alias_reg
.get_remote_aliases(&dataset_handle.as_local_ref())
.get_remote_aliases(dataset)
.await
.map_err(CLIError::failure)?;
let pull_aliases: Vec<_> = aliases
Expand Down
3 changes: 2 additions & 1 deletion src/app/cli/src/commands/list_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,10 @@ impl ListCommand {
handle: &DatasetHandle,
summary: &DatasetSummary,
) -> Result<String, CLIError> {
let dataset = self.dataset_registry.get_dataset_by_handle(handle);
let is_remote = self
.remote_alias_reg
.get_remote_aliases(&handle.as_local_ref())
.get_remote_aliases(dataset)
.await?
.get_by_kind(RemoteAliasKind::Pull)
.next()
Expand Down
3 changes: 2 additions & 1 deletion src/app/cli/src/commands/verify_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,10 @@ impl VerifyCommand {
let mut missed_dependencies = vec![];

for hdl in dataset_handles {
let dataset = self.dataset_registry.get_dataset_by_handle(&hdl);
let is_remote = self
.remote_alias_reg
.get_remote_aliases(&hdl.as_local_ref())
.get_remote_aliases(dataset)
.await
.unwrap()
.get_by_kind(RemoteAliasKind::Pull)
Expand Down
1 change: 0 additions & 1 deletion src/app/cli/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ impl From<GetDatasetError> for CLIError {
impl From<GetAliasesError> for CLIError {
fn from(v: GetAliasesError) -> Self {
match v {
e @ GetAliasesError::DatasetNotFound(_) => Self::failure(e),
e @ GetAliasesError::Internal(_) => Self::critical(e),
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/domain/core/src/services/pull_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub trait PullService: Send + Sync {
/// Manually advances the watermark of a root dataset
async fn set_watermark(
&self,
dataset_handle: &DatasetHandle,
dataset: Arc<dyn Dataset>,
watermark: DateTime<Utc>,
) -> Result<PullResult, SetWatermarkError>;
}
Expand Down
27 changes: 9 additions & 18 deletions src/domain/core/src/services/remote_aliases_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,27 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use async_trait::async_trait;
use std::sync::Arc;

use internal_error::InternalError;
use opendatafabric::*;
use thiserror::Error;

use crate::*;

#[async_trait]
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

#[async_trait::async_trait]
pub trait RemoteAliasesRegistry: Send + Sync {
async fn get_remote_aliases(
&self,
dataset_ref: &DatasetRef,
dataset: Arc<dyn Dataset>,
) -> Result<Box<dyn RemoteAliases>, GetAliasesError>;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

#[derive(Error, Debug)]
pub enum GetAliasesError {
#[error(transparent)]
DatasetNotFound(
#[from]
#[backtrace]
DatasetNotFoundError,
),
#[error(transparent)]
Internal(
#[from]
Expand All @@ -38,11 +36,4 @@ pub enum GetAliasesError {
),
}

impl From<GetDatasetError> for GetAliasesError {
fn from(v: GetDatasetError) -> Self {
match v {
GetDatasetError::NotFound(e) => Self::DatasetNotFound(e),
GetDatasetError::Internal(e) => Self::Internal(e),
}
}
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
11 changes: 5 additions & 6 deletions src/infra/core/src/dataset_changes_service_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ use kamu_core::{
Dataset,
DatasetChangesService,
DatasetIntervalIncrement,
DatasetRepository,
DatasetRepositoryExt,
DatasetRegistry,
GetDatasetError,
GetIncrementError,
GetRefError,
Expand All @@ -30,23 +29,23 @@ use opendatafabric::{DataSlice, DatasetID, MetadataEvent, Multihash};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

pub struct DatasetChangesServiceImpl {
dataset_repo: Arc<dyn DatasetRepository>,
dataset_registry: Arc<dyn DatasetRegistry>,
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

#[component(pub)]
#[interface(dyn DatasetChangesService)]
impl DatasetChangesServiceImpl {
pub fn new(dataset_repo: Arc<dyn DatasetRepository>) -> Self {
Self { dataset_repo }
pub fn new(dataset_registry: Arc<dyn DatasetRegistry>) -> Self {
Self { dataset_registry }
}

async fn resolve_dataset_by_id(
&self,
dataset_id: &DatasetID,
) -> Result<Arc<dyn Dataset>, GetIncrementError> {
self.dataset_repo
self.dataset_registry
.get_dataset_by_ref(&dataset_id.as_local_ref())
.await
.map_err(|e| match e {
Expand Down
Loading

0 comments on commit c1af778

Please sign in to comment.