Skip to content

Commit

Permalink
Add repo as a to param
Browse files Browse the repository at this point in the history
  • Loading branch information
rmn-boiko committed Sep 24, 2024
1 parent ab8451a commit 9c25603
Show file tree
Hide file tree
Showing 13 changed files with 358 additions and 77 deletions.
23 changes: 23 additions & 0 deletions src/adapter/graphql/src/queries/accounts/account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use kamu_accounts::{
DEFAULT_ACCOUNT_ID,
DEFAULT_ACCOUNT_NAME,
};
use kamu_core::DatasetRepository;
use opendatafabric as odf;
use tokio::sync::OnceCell;

Expand Down Expand Up @@ -188,4 +189,26 @@ impl Account {
}
}

#[derive(Debug, Clone, Eq, PartialEq)]
pub struct WorkspaceAccountInfo {
account: Account,
}

#[Object]
impl WorkspaceAccountInfo {
#[graphql(skip)]
pub(crate) fn new(account: Account) -> Self {
Self { account }
}

async fn account(&self) -> &Account {
&self.account
}

async fn is_multi_tenant(&self, ctx: &Context<'_>) -> bool {
let dataset_repo = from_catalog::<dyn DatasetRepository>(ctx).unwrap();
dataset_repo.is_multi_tenant()
}
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
5 changes: 3 additions & 2 deletions src/adapter/graphql/src/queries/accounts/accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

use opendatafabric as odf;

use super::WorkspaceAccountInfo;
use crate::prelude::*;
use crate::queries::Account;

Expand Down Expand Up @@ -54,15 +55,15 @@ impl Accounts {
&self,
ctx: &Context<'_>,
access_token: String,
) -> Result<Option<Account>> {
) -> Result<WorkspaceAccountInfo> {
let authentication_service =
from_catalog::<dyn kamu_accounts::AuthenticationService>(ctx).unwrap();

let account = authentication_service
.account_by_token(access_token)
.await?;

Ok(Some(Account::from_account(account)))
Ok(WorkspaceAccountInfo::new(Account::from_account(account)))
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/app/cli/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use std::path::PathBuf;
use clap::{ArgAction, Parser};
use opendatafabric as odf;

use crate::cli_value_parser::{self as parsers, PushDatasetRef};
use crate::{
cli_value_parser as parsers,
LineageOutputFormat,
MetadataLogOutputFormat,
OutputFormat,
Expand Down Expand Up @@ -904,8 +904,8 @@ pub struct Push {
pub no_alias: bool,

/// Remote alias or a URL to push to
#[arg(long, value_name = "REM", value_parser = parsers::dataset_ref_remote)]
pub to: Option<odf::DatasetRefRemote>,
#[arg(long, value_name = "REM", value_parser = parsers::push_dataset_ref_remote)]
pub to: Option<PushDatasetRef>,

/// Overwrite remote version with local, even if revisions have diverged
#[arg(long, short = 'f')]
Expand Down
40 changes: 39 additions & 1 deletion src/app/cli/src/cli_value_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

use std::str::FromStr;

use opendatafabric as odf;
use opendatafabric::{self as odf, RepoName};
use url::Url;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -74,6 +74,12 @@ pub(crate) fn dataset_ref_remote(s: &str) -> Result<odf::DatasetRefRemote, Strin

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

pub(crate) fn push_dataset_ref_remote(s: &str) -> Result<PushDatasetRef, String> {
PushDatasetRef::from_str(s)
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

pub(crate) fn repo_name(s: &str) -> Result<odf::RepoName, String> {
match odf::RepoName::try_from(s) {
Ok(v) => Ok(v),
Expand Down Expand Up @@ -169,3 +175,35 @@ impl From<DatasetVisibility> for kamu::domain::DatasetVisibility {
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

#[derive(Debug, Clone)]
pub enum PushDatasetRef {
RefRemote(odf::DatasetRefRemote),
Repository(odf::RepoName),
}

impl PushDatasetRef {
fn from_str(s: &str) -> Result<Self, String> {
if let Ok(dataset_ref) = dataset_ref_remote(s) {
return Ok(Self::RefRemote(dataset_ref));
}
if let Ok(repo_name) = repo_name(s) {
return Ok(Self::Repository(repo_name));
}
Err(
"Remote reference should be in form: `did:odf:...` or `repository/account/dataset-id` \
or `scheme://some-url`
or RepositoryID can only contain alphanumerics, dashes, and dots"
.to_string(),
)
}

pub fn into_repo_name(self) -> Option<RepoName> {
match self {
Self::RefRemote(_) => None,
Self::Repository(repo_name) => Some(repo_name),
}
}
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
2 changes: 1 addition & 1 deletion src/app/cli/src/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ pub trait Command {
}

/// Will be called before running to perform various argument sanity checks
async fn validate_args(&mut self) -> Result<(), CLIError> {
async fn validate_args(&self) -> Result<(), CLIError> {
Ok(())
}

Expand Down
99 changes: 54 additions & 45 deletions src/app/cli/src/commands/push_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ use console::style as s;
use futures::TryStreamExt;
use kamu::domain::*;
use kamu::utils::datasets_filtering::filter_datasets_by_any_pattern;
use kamu::UrlExt;
use kamu_accounts::CurrentAccountSubject;
use opendatafabric::*;

use super::{BatchError, CLIError, Command};
use crate::cli_value_parser::PushDatasetRef;
use crate::odf_server;
use crate::output::OutputConfig;

Expand All @@ -39,7 +39,7 @@ pub struct PushCommand {
recursive: bool,
add_aliases: bool,
force: bool,
to: Option<DatasetRefRemote>,
to: Option<PushDatasetRef>,
dataset_visibility: DatasetVisibility,
output_config: Arc<OutputConfig>,
}
Expand All @@ -58,7 +58,7 @@ impl PushCommand {
recursive: bool,
add_aliases: bool,
force: bool,
to: Option<DatasetRefRemote>,
to: Option<PushDatasetRef>,
dataset_visibility: DatasetVisibility,
output_config: Arc<OutputConfig>,
) -> Self
Expand All @@ -84,6 +84,45 @@ impl PushCommand {
}
}

async fn get_remote_repo_opts(
&self,
repo_name_maybe: Option<RepoName>,
) -> Result<Option<PushRemoteRepoOpts>, CLIError> {
let repo_name = if repo_name_maybe.is_some() {
repo_name_maybe
} else {
let remote_repo_names: Vec<_> = self.remote_repo_reg.get_all_repositories().collect();
remote_repo_names.first().cloned()
};
if let Some(remote_repo_name) = repo_name {
let remote_repo = self
.remote_repo_reg
.get_repository(&remote_repo_name)
.map_err(CLIError::failure)?;
let mut result = PushRemoteRepoOpts {
remote_account_name: None,
remote_repo_url: remote_repo.url,
};

if let Some(access_token_info) = self
.access_token_reg_svc
.find_by_backend_url(&result.remote_repo_url)
&& let Some(account_name) = self
.login_svc
.get_remote_account_name_by_access_token(
&result.remote_repo_url,
access_token_info.access_token.access_token.as_str(),
)
.await
.map_err(CLIError::failure)?
{
result.remote_account_name = Some(account_name);
}
return Ok(Some(result));
}
Ok(None)
}

async fn do_push(
&self,
listener: Option<Arc<dyn SyncMultiListener>>,
Expand All @@ -97,7 +136,7 @@ impl PushCommand {
CurrentAccountSubject::Logged(l) => &l.account_name,
};

if let Some(remote_ref) = &self.to {
if let Some(_remote_ref @ PushDatasetRef::RefRemote(dataset_ref_remote)) = &self.to {
let local_ref = match self.refs[0].as_dataset_ref_any() {
Some(dataset_ref_any) => dataset_ref_any
.as_local_ref(|_| !self.dataset_repo.is_multi_tenant())
Expand All @@ -118,13 +157,14 @@ impl PushCommand {
.push_multi_ext(
vec![PushRequest {
local_ref: Some(local_ref),
remote_ref: Some(remote_ref.clone()),
remote_ref: Some(dataset_ref_remote.clone()),
}],
PushMultiOptions {
all: self.all,
recursive: self.recursive,
add_aliases: self.add_aliases,
sync_options: self.sync_options(),
remote_repo_opts: None,
},
listener,
)
Expand All @@ -139,6 +179,13 @@ impl PushCommand {
.try_collect()
.await?;

let repo_name = if let Some(push_dataset_ref) = &self.to {
push_dataset_ref.clone().into_repo_name()
} else {
None
};
let remote_repo_opts = self.get_remote_repo_opts(repo_name).await?;

Ok(self
.push_svc
.push_multi(
Expand All @@ -148,6 +195,7 @@ impl PushCommand {
recursive: self.recursive,
add_aliases: self.add_aliases,
sync_options: self.sync_options(),
remote_repo_opts,
},
listener,
)
Expand All @@ -172,7 +220,7 @@ impl PushCommand {

#[async_trait::async_trait(?Send)]
impl Command for PushCommand {
async fn validate_args(&mut self) -> Result<(), CLIError> {
async fn validate_args(&self) -> Result<(), CLIError> {
if self.refs.is_empty() && !self.all {
return Err(CLIError::usage_error("Specify a dataset or pass --all"));
}
Expand All @@ -183,45 +231,6 @@ impl Command for PushCommand {
));
}

if self.refs.len() == 1
&& self.to.is_none()
&& let Some(dataset_ref_any) = self.refs[0].as_dataset_ref_any()
// ToDo check remote
&& let Ok(dataset_ref) = dataset_ref_any.as_local_ref(|_| false)
&& let Some(dataset_name) = dataset_ref.dataset_name()
{
let remote_repo_names: Vec<_> = self.remote_repo_reg.get_all_repositories().collect();
if let Some(remote_repo_name) = remote_repo_names.first() {
let remote_repo = self
.remote_repo_reg
.get_repository(remote_repo_name)
.map_err(CLIError::failure)?;
let remote_repo_url = remote_repo.url;
if let Some(access_token_info) = self
.access_token_reg_svc
.find_by_backend_url(&remote_repo_url)
{
let account_name = self
.login_svc
.get_account_name_by_access_token(
&remote_repo_url,
access_token_info.access_token.access_token.as_str(),
)
.await
.map_err(CLIError::failure)?;
println!("account_name: {:?}", account_name);
println!("account_name: {:?}", dataset_name);
println!("remote_repo_url: {:?}", remote_repo_url);
let mut odf_url = remote_repo_url
.as_odf_protoocol()
.map_err(CLIError::failure)?;
odf_url.set_path(format!("{}/{dataset_name}", account_name).as_str());
self.to = Some(DatasetRefRemote::from(&odf_url));
println!("self.to: {:?}", self.to);
}
}
}

Ok(())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,6 @@ impl AccessTokenRegistryService {
frontend_url: server_record.frontend_url.clone(),
access_token: ac.clone(),
});
} else {
continue;
}
}
None
Expand All @@ -113,8 +111,6 @@ impl AccessTokenRegistryService {
frontend_url: server_record.frontend_url.clone(),
access_token: ac.clone(),
});
} else {
continue;
}
}
None
Expand All @@ -140,8 +136,6 @@ impl AccessTokenRegistryService {
access_token: ac.clone(),
}
});
} else {
continue;
}
}
None
Expand Down
Loading

0 comments on commit 9c25603

Please sign in to comment.