diff --git a/CHANGELOG.md b/CHANGELOG.md index 4fd5d8503..0c54749a8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md
@@ -11,6 +11,11 @@ Recommendation: for ease of reading, use the following order: - Fixed -->
+## [Unreleased]
+### Changed
+- `kamu push` command can now be called without the `--to` reference; the dataset's push alias or a remote dataset repository will be used as the destination
+- `kamu login` command now stores the repository in the repository registry. A name can be provided with the `--repo-name` flag, and the `--skip-add-repo` flag skips adding the repository
+
 ## [0.204.5] - 2024-10-08
 ### Added
 - Postgres implementation for dataset entry and account Re-BAC repositories
diff --git a/resources/cli-reference.md b/resources/cli-reference.md index c7af457a6..0355784ae 100644 --- a/resources/cli-reference.md +++ b/resources/cli-reference.md
@@ -526,6 +526,8 @@ Authenticates with a remote ODF server interactively
 * `--user` — Store access token in the user home folder rather than in the workspace
 * `--check` — Check whether existing authorization is still valid without triggering a login flow
 * `--access-token <access-token>` — Provide an existing access token
+* `--repo-name <repo-name>` — Repository name under which to store this server in the repositories list
+* `--skip-add-repo` — Don't automatically add a remote repository for this host
diff --git a/src/adapter/http/src/data/query_handler.rs b/src/adapter/http/src/data/query_handler.rs index 4e1ca03e8..e116e1a44 100644 --- a/src/adapter/http/src/data/query_handler.rs +++ b/src/adapter/http/src/data/query_handler.rs
@@ -143,7 +143,7 @@ pub(crate) async fn query_handler_post_v2(
             },
         })
     } else {
-        Err(ApiError::not_implemented(ResponseSigningNotCongigured))?
+        Err(ApiError::not_implemented(ResponseSigningNotConfigured))?
     };
 
     Ok(Json(response))
@@ -223,4 +223,4 @@ pub async fn query_handler(
 #[derive(Debug, thiserror::Error)]
 #[error("Response signing is not enabled by the node operator")]
-struct ResponseSigningNotCongigured;
+struct ResponseSigningNotConfigured;
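To make the new `--to` semantics above concrete, here is a minimal sketch of how a push target string is parsed, based on the `dataset_push_target` value parser added in this diff. Only the `Repository` variant is confirmed by the diff itself; the other accepted forms are taken from the parser's error message, so treat the exact behavior as an assumption:

```rust
use std::str::FromStr;
use opendatafabric::DatasetPushTarget;

fn main() {
    // A bare repository name targets a repo from the registry,
    // e.g. one stored by `kamu login` (assumed parser behavior).
    let target = DatasetPushTarget::from_str("my-repo").unwrap();
    assert!(matches!(target, DatasetPushTarget::Repository(_)));

    // Per the parser's error message, remote aliases and URLs also parse:
    // `repository/account/dataset-name` or `scheme://some-url`.
    assert!(DatasetPushTarget::from_str("my-repo/alice/foo").is_ok());
    assert!(DatasetPushTarget::from_str("odf+https://node.example.com/alice/foo").is_ok());
}
```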
diff --git a/src/adapter/http/src/general/account_handler.rs b/src/adapter/http/src/general/account_handler.rs new file mode 100644 index 000000000..3251e88dc --- /dev/null +++ b/src/adapter/http/src/general/account_handler.rs
@@ -0,0 +1,71 @@
+// Copyright Kamu Data, Inc. and contributors. All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+use axum::extract::Extension;
+use axum::response::Json;
+use database_common_macros::transactional_handler;
+use dill::Catalog;
+use http_common::*;
+use kamu_accounts::{Account, AuthenticationService, CurrentAccountSubject};
+use opendatafabric::{AccountID, AccountName};
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+#[derive(Debug, serde::Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct AccountResponse {
+    pub id: AccountID,
+    pub account_name: AccountName,
+}
+
+impl From<Account> for AccountResponse {
+    fn from(value: Account) -> Self {
+        Self {
+            id: value.id,
+            account_name: value.account_name,
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+#[transactional_handler]
+pub async fn account_handler(
+    Extension(catalog): Extension<Catalog>,
+) -> Result<Json<AccountResponse>, ApiError> {
+    let response = get_account(&catalog).await?;
+    tracing::debug!(?response, "Get account info response");
+    Ok(response)
+}
+
+async fn get_account(catalog: &Catalog) -> Result<Json<AccountResponse>, ApiError> {
+    let current_account_subject = catalog.get_one::<CurrentAccountSubject>().unwrap();
+    match current_account_subject.as_ref() {
+        CurrentAccountSubject::Anonymous(_) => Err(ApiError::new_unauthorized()),
+        CurrentAccountSubject::Logged(account) => {
+            let auth_service = catalog.get_one::<dyn AuthenticationService>().unwrap();
+            let full_account_info = auth_service
+                .account_by_id(&account.account_id)
+                .await?
+                .unwrap();
+            Ok(Json(full_account_info.into()))
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
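For orientation, calling the new endpoint from a client looks roughly like this; it mirrors the `test_account_info` integration test added later in this diff, and `base_url` and `token` are illustrative placeholders:

```rust
// Assumes reqwest with the "json" feature enabled.
async fn whoami(base_url: &str, token: &str) -> reqwest::Result<serde_json::Value> {
    reqwest::Client::new()
        .get(format!("{base_url}/accounts/me"))
        .header("Authorization", format!("Bearer {token}"))
        .send()
        .await?
        .error_for_status()?
        // Response body: {"id": "...", "accountName": "..."} (camelCase per serde attr)
        .json()
        .await
}
```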
diff --git a/src/adapter/http/src/general/dataset_info_handler.rs b/src/adapter/http/src/general/dataset_info_handler.rs new file mode 100644 index 000000000..1b2054980 --- /dev/null +++ b/src/adapter/http/src/general/dataset_info_handler.rs
@@ -0,0 +1,79 @@
+// Copyright Kamu Data, Inc. and contributors. All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+use axum::extract::{Extension, Path};
+use axum::response::Json;
+use database_common_macros::transactional_handler;
+use dill::Catalog;
+use http_common::*;
+use kamu_core::{DatasetRepository, GetDatasetError};
+use opendatafabric::{AccountName, DatasetHandle, DatasetID, DatasetName};
+
+use crate::axum_utils::ensure_authenticated_account;
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+#[derive(Debug, serde::Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct DatasetInfoResponse {
+    pub id: DatasetID,
+    pub account_name: Option<AccountName>,
+    pub dataset_name: DatasetName,
+}
+
+impl From<DatasetHandle> for DatasetInfoResponse {
+    fn from(value: DatasetHandle) -> Self {
+        Self {
+            id: value.id,
+            account_name: value.alias.account_name,
+            dataset_name: value.alias.dataset_name,
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+#[transactional_handler]
+pub async fn dataset_info_handler(
+    Extension(catalog): Extension<Catalog>,
+    Path(dataset_id): Path<DatasetID>,
+) -> Result<Json<DatasetInfoResponse>, ApiError> {
+    let response = get_dataset_by_id(&catalog, &dataset_id).await?;
+    tracing::debug!(?response, "Get dataset by id info response");
+    Ok(response)
+}
+
+async fn get_dataset_by_id(
+    catalog: &Catalog,
+    dataset_id: &DatasetID,
+) -> Result<Json<DatasetInfoResponse>, ApiError> {
+    ensure_authenticated_account(catalog).api_err()?;
+
+    let dataset_repo = catalog.get_one::<dyn DatasetRepository>().unwrap();
+    let dataset_handle = dataset_repo
+        .resolve_dataset_ref(&dataset_id.clone().as_local_ref())
+        .await
+        .map_err(|err| match err {
+            GetDatasetError::NotFound(e) => ApiError::not_found(e),
+            GetDatasetError::Internal(e) => e.api_err(),
+        })?;
+
+    Ok(Json(dataset_handle.into()))
+}
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/src/adapter/http/src/general/mod.rs b/src/adapter/http/src/general/mod.rs new file mode 100644 index 000000000..60f92f82e --- /dev/null +++ b/src/adapter/http/src/general/mod.rs
@@ -0,0 +1,15 @@
+// Copyright Kamu Data, Inc. and contributors. All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+mod account_handler;
+mod dataset_info_handler;
+mod node_info_handler;
+mod router;
+
+pub use router::*;
diff --git a/src/adapter/http/src/general/node_info_handler.rs b/src/adapter/http/src/general/node_info_handler.rs new file mode 100644 index 000000000..f938e2714 --- /dev/null +++ b/src/adapter/http/src/general/node_info_handler.rs
@@ -0,0 +1,51 @@
+// Copyright Kamu Data, Inc. and contributors. All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+use axum::extract::Extension;
+use axum::response::Json;
+use dill::Catalog;
+use http_common::*;
+use kamu_core::DatasetRepository;
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+#[derive(Debug, serde::Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct NodeInfoResponse {
+    pub is_multi_tenant: bool,
+}
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+pub async fn node_info_handler(
+    Extension(catalog): Extension<Catalog>,
+) -> Result<Json<NodeInfoResponse>, ApiError> {
+    let response = get_node_info(&catalog);
+    tracing::debug!(?response, "Get node info response");
+    Ok(response)
+}
+
+fn get_node_info(catalog: &Catalog) -> Json<NodeInfoResponse> {
+    let dataset_repo = catalog.get_one::<dyn DatasetRepository>().unwrap();
+
+    Json(NodeInfoResponse {
+        is_multi_tenant: dataset_repo.is_multi_tenant(),
+    })
+}
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/src/adapter/http/src/general/router.rs b/src/adapter/http/src/general/router.rs new file mode 100644 index 000000000..1dac53d9b --- /dev/null +++ b/src/adapter/http/src/general/router.rs
@@ -0,0 +1,37 @@
+// Copyright Kamu Data, Inc. and contributors. All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+pub fn root_router() -> axum::Router {
+    axum::Router::new()
+        .route(
+            "/info",
+            axum::routing::get(super::node_info_handler::node_info_handler),
+        )
+        .route(
+            "/accounts/me",
+            axum::routing::get(super::account_handler::account_handler),
+        )
+        .route(
+            "/datasets/:id",
+            axum::routing::get(super::dataset_info_handler::dataset_info_handler),
+        )
+}
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/src/adapter/http/src/lib.rs b/src/adapter/http/src/lib.rs index 5be8164c4..0a138e055 100644 --- a/src/adapter/http/src/lib.rs +++ b/src/adapter/http/src/lib.rs
@@ -24,6 +24,7 @@ pub mod smart_protocol;
 mod upload;
 mod ws_common;
 pub use upload::*;
+pub mod general;
 
 pub type SmartTransferProtocolClientWs = smart_protocol::ws_tungstenite_client::WsSmartTransferProtocolClient;
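For orientation, this is how the new router composes with the existing one, mirroring the `test_api_server.rs` and `api_server.rs` changes below; a minimal sketch, assuming an axum version where nesting routers at `"/"` is allowed as these files do:

```rust
fn make_router() -> axum::Router {
    axum::Router::new()
        // Existing data endpoints stay where they were
        .nest("/", kamu_adapter_http::data::root_router())
        // New endpoints: GET /info, GET /accounts/me, GET /datasets/:id
        .nest("/", kamu_adapter_http::general::root_router())
}
```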
diff --git a/src/adapter/http/tests/harness/client_side_harness.rs b/src/adapter/http/tests/harness/client_side_harness.rs index d25df6456..01b44ce41 100644 --- a/src/adapter/http/tests/harness/client_side_harness.rs +++ b/src/adapter/http/tests/harness/client_side_harness.rs
@@ -8,6 +8,7 @@
 // by the Apache License, Version 2.0.
 
 use std::path::PathBuf;
+use std::str::FromStr;
 use std::sync::Arc;
 
 use auth::OdfServerAccessTokenResolver;
@@ -15,6 +16,7 @@ use container_runtime::ContainerRuntime;
 use database_common::NoOpDatabasePlugin;
 use dill::Component;
 use headers::Header;
+use internal_error::{InternalError, ResultIntoInternal};
 use kamu::domain::*;
 use kamu::*;
 use kamu_accounts::CurrentAccountSubject;
@@ -25,9 +27,10 @@ use opendatafabric::{
     AccountID,
     AccountName,
     DatasetID,
+    DatasetPushTarget,
     DatasetRef,
     DatasetRefAny,
-    DatasetRefRemote,
+    RepoName,
 };
 use tempfile::TempDir;
 use time_source::SystemTimeSourceDefault;
@@ -106,6 +109,7 @@ impl ClientSideHarness {
         b.add::();
         b.add::();
+        b.add::();
         b.add::();
@@ -239,16 +243,13 @@ impl ClientSideHarness {
     pub async fn push_dataset(
         &self,
         dataset_local_ref: DatasetRef,
-        dataset_remote_ref: DatasetRefRemote,
+        dataset_remote_ref: DatasetPushTarget,
         force: bool,
         dataset_visibility: DatasetVisibility,
     ) -> Vec<PushResponse> {
         self.push_service
-            .push_multi_ext(
-                vec![PushRequest {
-                    local_ref: Some(dataset_local_ref),
-                    remote_ref: Some(dataset_remote_ref),
-                }],
+            .push_multi(
+                vec![dataset_local_ref],
                 PushMultiOptions {
                     sync_options: SyncOptions {
                         create_if_not_exists: true,
                         force,
                         dataset_visibility,
                         ..SyncOptions::default()
                     },
+                    remote_target: Some(dataset_remote_ref),
                     ..PushMultiOptions::default()
                 },
                 None,
             )
             .await
     }
 
     pub async fn push_dataset_result(
         &self,
         dataset_local_ref: DatasetRef,
-        dataset_remote_ref: DatasetRefRemote,
+        dataset_remote_ref: DatasetPushTarget,
         force: bool,
         dataset_visibility: DatasetVisibility,
     ) -> SyncResult {
@@ -288,6 +290,24 @@ impl ClientSideHarness {
         }
     }
 
+    pub fn add_repository(
+        &self,
+        repo_name: &RepoName,
+        base_url: &str,
+    ) -> Result<(), InternalError> {
+        let remote_repo_reg = self
+            .catalog
+            .get_one::<dyn RemoteRepositoryRegistry>()
+            .unwrap();
+
+        remote_repo_reg
+            .add_repository(
+                repo_name,
+                Url::from_str(&format!("http://{base_url}")).unwrap(),
+            )
+            .int_err()
+    }
+
     pub fn internal_datasets_folder_path(&self) -> PathBuf {
         self.tempdir.path().join("datasets")
     }
diff --git a/src/adapter/http/tests/harness/server_side_harness.rs b/src/adapter/http/tests/harness/server_side_harness.rs index 8c2c91b60..c4d43a65a 100644 --- a/src/adapter/http/tests/harness/server_side_harness.rs +++ b/src/adapter/http/tests/harness/server_side_harness.rs
@@ -62,6 +62,10 @@ pub(crate) trait ServerSideHarness {
         self.dataset_url_with_scheme(dataset_alias, "odf+http")
     }
 
+    fn api_server_addr(&self) -> String;
+
+    fn api_server_account(&self) -> Account;
+
     fn system_time_source(&self) -> &SystemTimeSourceStub;
 
     async fn api_server_run(self) -> Result<(), InternalError>;
@@ -77,22 +81,23 @@ pub(crate) struct ServerSideHarnessOptions {
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 
-pub(crate) fn server_authentication_mock() -> MockAuthenticationService {
-    MockAuthenticationService::resolving_token(
-        kamu_accounts::DUMMY_ACCESS_TOKEN,
-        Account {
-            id: AccountID::new_seeded_ed25519(SERVER_ACCOUNT_NAME.as_bytes()),
-            account_name: AccountName::new_unchecked(SERVER_ACCOUNT_NAME),
-            account_type: AccountType::User,
-            display_name: SERVER_ACCOUNT_NAME.to_string(),
-            email: None,
-            avatar_url: None,
-            registered_at: Utc::now(),
-            is_admin: false,
-            provider: String::from(PROVIDER_PASSWORD),
-            provider_identity_key: String::from(SERVER_ACCOUNT_NAME),
-        },
-    )
+pub(crate) fn server_authentication_mock(account: &Account) ->
MockAuthenticationService { + MockAuthenticationService::resolving_token(kamu_accounts::DUMMY_ACCESS_TOKEN, account.clone()) +} + +pub(crate) fn get_server_account() -> Account { + Account { + id: AccountID::new_seeded_ed25519(SERVER_ACCOUNT_NAME.as_bytes()), + account_name: AccountName::new_unchecked(SERVER_ACCOUNT_NAME), + account_type: AccountType::User, + display_name: SERVER_ACCOUNT_NAME.to_string(), + email: None, + avatar_url: None, + registered_at: Utc::now(), + is_admin: false, + provider: String::from(PROVIDER_PASSWORD), + provider_identity_key: String::from(SERVER_ACCOUNT_NAME), + } } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/adapter/http/tests/harness/server_side_local_fs_harness.rs b/src/adapter/http/tests/harness/server_side_local_fs_harness.rs index 9e55bc18c..997a56687 100644 --- a/src/adapter/http/tests/harness/server_side_local_fs_harness.rs +++ b/src/adapter/http/tests/harness/server_side_local_fs_harness.rs @@ -36,9 +36,10 @@ use kamu::{ DependencyGraphServiceInMemory, ObjectStoreBuilderLocalFs, ObjectStoreRegistryImpl, + RemoteRepositoryRegistryImpl, }; use kamu_accounts::testing::MockAuthenticationService; -use kamu_accounts::AuthenticationService; +use kamu_accounts::{Account, AuthenticationService}; use messaging_outbox::DummyOutboxImpl; use opendatafabric::{AccountName, DatasetAlias, DatasetHandle}; use tempfile::TempDir; @@ -48,6 +49,7 @@ use url::Url; use super::{ create_cli_user_catalog, create_web_user_catalog, + get_server_account, server_authentication_mock, ServerSideHarness, ServerSideHarnessOptions, @@ -64,6 +66,7 @@ pub(crate) struct ServerSideLocalFsHarness { api_server: TestAPIServer, options: ServerSideHarnessOptions, time_source: SystemTimeSourceStub, + account: Account, } impl ServerSideLocalFsHarness { @@ -79,6 +82,8 @@ impl ServerSideLocalFsHarness { let cache_dir = tempdir.path().join("cache"); std::fs::create_dir(&cache_dir).unwrap(); + let account = get_server_account(); + let time_source = SystemTimeSourceStub::new(); let (base_catalog, listener) = { let mut b = match &options.base_catalog { @@ -103,12 +108,13 @@ impl ServerSideLocalFsHarness { ) .bind::() .bind::() - .add_value(server_authentication_mock()) + .add_value(server_authentication_mock(&account)) .bind::() .add_value(ServerUrlConfig::new_test(Some(&base_url_rest))) .add::() .add::() .add::() + .add::() .add::() .add::() .add::() @@ -131,13 +137,10 @@ impl ServerSideLocalFsHarness { api_server, options, time_source, + account, } } - pub fn api_server_addr(&self) -> String { - self.api_server.local_addr().to_string() - } - fn internal_datasets_folder_path(&self) -> PathBuf { self.tempdir.path().join("datasets") } @@ -188,6 +191,14 @@ impl ServerSideHarness for ServerSideLocalFsHarness { cli_catalog.get_one::().unwrap() } + fn api_server_addr(&self) -> String { + self.api_server.local_addr().to_string() + } + + fn api_server_account(&self) -> Account { + self.account.clone() + } + fn dataset_url_with_scheme(&self, dataset_alias: &DatasetAlias, scheme: &str) -> Url { let api_server_address = self.api_server_addr(); Url::from_str( diff --git a/src/adapter/http/tests/harness/server_side_s3_harness.rs b/src/adapter/http/tests/harness/server_side_s3_harness.rs index e1a6db973..a2d5a1981 100644 --- a/src/adapter/http/tests/harness/server_side_s3_harness.rs +++ b/src/adapter/http/tests/harness/server_side_s3_harness.rs @@ -41,7 +41,7 @@ use kamu::{ ObjectStoreRegistryImpl, }; use 
kamu_accounts::testing::MockAuthenticationService; -use kamu_accounts::AuthenticationService; +use kamu_accounts::{Account, AuthenticationService}; use messaging_outbox::DummyOutboxImpl; use opendatafabric::{AccountName, DatasetAlias, DatasetHandle}; use time_source::{SystemTimeSource, SystemTimeSourceStub}; @@ -50,6 +50,7 @@ use url::Url; use super::{ create_cli_user_catalog, create_web_user_catalog, + get_server_account, server_authentication_mock, ServerSideHarness, ServerSideHarnessOptions, @@ -67,6 +68,7 @@ pub(crate) struct ServerSideS3Harness { options: ServerSideHarnessOptions, time_source: SystemTimeSourceStub, _temp_dir: tempfile::TempDir, + account: Account, } impl ServerSideS3Harness { @@ -80,6 +82,8 @@ impl ServerSideS3Harness { let time_source = SystemTimeSourceStub::new(); + let account = get_server_account(); + let (base_catalog, listener) = { let addr = SocketAddr::from(([127, 0, 0, 1], 0)); let listener = tokio::net::TcpListener::bind(addr).await.unwrap(); @@ -95,11 +99,11 @@ impl ServerSideS3Harness { .add_builder( DatasetRepositoryS3::builder() .with_s3_context(s3_context.clone()) - .with_multi_tenant(false), + .with_multi_tenant(options.multi_tenant), ) .bind::() .bind::() - .add_value(server_authentication_mock()) + .add_value(server_authentication_mock(&account)) .bind::() .add_value(ServerUrlConfig::new_test(Some(&base_url_rest))) .add::() @@ -130,16 +134,13 @@ impl ServerSideS3Harness { options, time_source, _temp_dir: temp_dir, + account, } } pub fn internal_bucket_folder_path(&self) -> PathBuf { self.s3.tmp_dir.path().join(&self.s3.bucket) } - - fn api_server_addr(&self) -> String { - self.api_server.local_addr().to_string() - } } #[async_trait::async_trait] @@ -205,6 +206,14 @@ impl ServerSideHarness for ServerSideS3Harness { .unwrap() } + fn api_server_addr(&self) -> String { + self.api_server.local_addr().to_string() + } + + fn api_server_account(&self) -> Account { + self.account.clone() + } + fn dataset_layout(&self, dataset_handle: &DatasetHandle) -> DatasetLayout { DatasetLayout::new( self.internal_bucket_folder_path() diff --git a/src/adapter/http/tests/harness/test_api_server.rs b/src/adapter/http/tests/harness/test_api_server.rs index 24471fbff..9fd34bb00 100644 --- a/src/adapter/http/tests/harness/test_api_server.rs +++ b/src/adapter/http/tests/harness/test_api_server.rs @@ -38,6 +38,7 @@ impl TestAPIServer { axum::routing::post(kamu_adapter_http::platform_file_upload_post_handler), ) .nest("/", kamu_adapter_http::data::root_router()) + .nest("/", kamu_adapter_http::general::root_router()) .nest( if multi_tenant { "/:account_name/:dataset_name" diff --git a/src/adapter/http/tests/tests/mod.rs b/src/adapter/http/tests/tests/mod.rs index 81123bb4f..eabae3bea 100644 --- a/src/adapter/http/tests/tests/mod.rs +++ b/src/adapter/http/tests/tests/mod.rs @@ -7,10 +7,13 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +mod test_account_info; mod test_authentication_layer; mod test_data_ingest; mod test_data_query; mod test_dataset_authorization_layer; +mod test_dataset_info; +mod test_node_info; mod test_platform_login_validate; mod test_protocol_dataset_helpers; mod test_routing; @@ -100,7 +103,7 @@ macro_rules! test_client_server_s3_harness_permutations { paste::paste! 
{ #[test_group::group(containerized)] #[test_log::test(tokio::test)] - async fn [<$test_name "_st_client_st_local_fs_server">] () { + async fn [<$test_name "_st_client_st_s3_server">] () { $test_package::$test_name( ClientSideHarness::new(ClientSideHarnessOptions { multi_tenant: false, authenticated_remotely: true }), ServerSideS3Harness::new(ServerSideHarnessOptions { @@ -116,7 +119,7 @@ macro_rules! test_client_server_s3_harness_permutations { paste::paste! { #[test_group::group(containerized)] #[test_log::test(tokio::test)] - async fn [<$test_name "_st_client_mt_local_fs_server">] () { + async fn [<$test_name "_st_client_mt_s3_server">] () { $test_package::$test_name( ClientSideHarness::new(ClientSideHarnessOptions { multi_tenant: false, authenticated_remotely: true }), ServerSideS3Harness::new(ServerSideHarnessOptions { diff --git a/src/adapter/http/tests/tests/test_account_info.rs b/src/adapter/http/tests/tests/test_account_info.rs new file mode 100644 index 000000000..db3b32b01 --- /dev/null +++ b/src/adapter/http/tests/tests/test_account_info.rs @@ -0,0 +1,99 @@ +// Copyright Kamu Data, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use kamu_accounts::DUMMY_ACCESS_TOKEN; +use kamu_core::RunInfoDir; +use serde_json::json; + +use crate::harness::*; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +#[test_log::test(tokio::test)] +async fn test_get_account_info_with_wrong_token() { + let harness = AccountInfoHarness::new(false).await; + + let client = async move { + let cl = reqwest::Client::new(); + + let res = cl + .get(&format!("{}accounts/me", harness.root_url)) + .send() + .await + .unwrap(); + assert_eq!(401, res.status()); + assert_eq!("Unauthorized", res.text().await.unwrap()); + }; + + await_client_server_flow!(harness.server_harness.api_server_run(), client); +} + +#[test_log::test(tokio::test)] +async fn test_get_account_info() { + let harness = AccountInfoHarness::new(false).await; + let expected_account = harness.server_harness.api_server_account(); + + let client = async move { + let cl = reqwest::Client::new(); + + let res = cl + .get(&format!("{}accounts/me", harness.root_url)) + .header("Authorization", format!("Bearer {DUMMY_ACCESS_TOKEN}")) + .send() + .await + .unwrap(); + + pretty_assertions::assert_eq!( + res.json::().await.unwrap(), + json!({ + "accountName": expected_account.account_name, + "id": expected_account.id, + }) + ); + }; + + await_client_server_flow!(harness.server_harness.api_server_run(), client); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +struct AccountInfoHarness { + #[allow(dead_code)] + run_info_dir: tempfile::TempDir, + root_url: url::Url, + server_harness: ServerSideLocalFsHarness, +} + +impl AccountInfoHarness { + async fn new(is_multi_tenant: bool) -> Self { + let run_info_dir = tempfile::tempdir().unwrap(); + + let catalog = dill::CatalogBuilder::new() + .add_value(RunInfoDir::new(run_info_dir.path())) + .build(); + + let server_harness = ServerSideLocalFsHarness::new(ServerSideHarnessOptions { + multi_tenant: is_multi_tenant, + authorized_writes: true, + base_catalog: Some(catalog), + }) + 
.await; + + let root_url = url::Url::parse( + format!("http://{}", server_harness.api_server_addr()).trim_end_matches('/'), + ) + .unwrap(); + + Self { + run_info_dir, + root_url, + server_harness, + } + } +} diff --git a/src/adapter/http/tests/tests/test_dataset_info.rs b/src/adapter/http/tests/tests/test_dataset_info.rs new file mode 100644 index 000000000..2440354bd --- /dev/null +++ b/src/adapter/http/tests/tests/test_dataset_info.rs @@ -0,0 +1,117 @@ +// Copyright Kamu Data, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use chrono::Utc; +use kamu::testing::MetadataFactory; +use kamu_accounts::DUMMY_ACCESS_TOKEN; +use opendatafabric::{DatasetAlias, DatasetID, DatasetKind, DatasetName}; +use serde_json::json; + +use crate::harness::*; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +#[test_log::test(tokio::test)] +async fn test_get_dataset_info_by_id() { + let harness = DatasetInfoHarness::new(false).await; + + let dataset_alias = DatasetAlias::new(None, DatasetName::new_unchecked("foo")); + let create_result = harness + .server_harness + .cli_create_dataset_use_case() + .execute( + &dataset_alias, + MetadataFactory::metadata_block(MetadataFactory::seed(DatasetKind::Root).build()) + .system_time(Utc::now()) + .build_typed(), + Default::default(), + ) + .await + .unwrap(); + + let client = async move { + let cl = reqwest::Client::new(); + + let res = cl + .get(&format!( + "{}datasets/{}", + harness.root_url, create_result.dataset_handle.id + )) + .header("Authorization", format!("Bearer {DUMMY_ACCESS_TOKEN}")) + .send() + .await + .unwrap() + .error_for_status() + .unwrap(); + + pretty_assertions::assert_eq!( + res.json::().await.unwrap(), + json!({ + "id": create_result.dataset_handle.id, + "datasetName": create_result.dataset_handle.alias.dataset_name, + "accountName": create_result.dataset_handle.alias.account_name, + }) + ); + }; + + await_client_server_flow!(harness.server_harness.api_server_run(), client); +} + +#[test_log::test(tokio::test)] +async fn test_get_dataset_info_by_id_not_found_err() { + let harness = DatasetInfoHarness::new(false).await; + + let client = async move { + let cl = reqwest::Client::new(); + let dataset_id = DatasetID::new_seeded_ed25519(b"foo"); + + let res = cl + .get(&format!("{}datasets/{dataset_id}", harness.root_url)) + .header("Authorization", format!("Bearer {DUMMY_ACCESS_TOKEN}")) + .send() + .await + .unwrap(); + + assert_eq!(404, res.status()); + assert_eq!( + format!("Dataset not found: {dataset_id}"), + res.text().await.unwrap() + ); + }; + + await_client_server_flow!(harness.server_harness.api_server_run(), client); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +struct DatasetInfoHarness { + root_url: url::Url, + server_harness: ServerSideLocalFsHarness, +} + +impl DatasetInfoHarness { + async fn new(is_multi_tenant: bool) -> Self { + let server_harness = ServerSideLocalFsHarness::new(ServerSideHarnessOptions { + multi_tenant: is_multi_tenant, + authorized_writes: true, + base_catalog: None, + }) + .await; + + let root_url = url::Url::parse( + format!("http://{}", 
server_harness.api_server_addr()).trim_end_matches('/'), + ) + .unwrap(); + + Self { + root_url, + server_harness, + } + } +} diff --git a/src/adapter/http/tests/tests/test_node_info.rs b/src/adapter/http/tests/tests/test_node_info.rs new file mode 100644 index 000000000..894bda0b0 --- /dev/null +++ b/src/adapter/http/tests/tests/test_node_info.rs @@ -0,0 +1,94 @@ +// Copyright Kamu Data, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use serde_json::json; + +use crate::harness::*; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +#[test_log::test(tokio::test)] +async fn test_node_info_single_tenant() { + let harness = NodeInfoHarness::new(false).await; + + let client = async move { + let cl = reqwest::Client::new(); + + let res = cl + .get(&format!("{}info", harness.root_url)) + .send() + .await + .unwrap() + .error_for_status() + .unwrap(); + + pretty_assertions::assert_eq!( + res.json::().await.unwrap(), + json!({ + "isMultiTenant": false + }) + ); + }; + + await_client_server_flow!(harness.server_harness.api_server_run(), client); +} + +#[test_log::test(tokio::test)] +async fn test_node_info_multi_tenant() { + let harness = NodeInfoHarness::new(true).await; + + let client = async move { + let cl = reqwest::Client::new(); + + let res = cl + .get(&format!("{}info", harness.root_url)) + .send() + .await + .unwrap() + .error_for_status() + .unwrap(); + + pretty_assertions::assert_eq!( + res.json::().await.unwrap(), + json!({ + "isMultiTenant": true + }) + ); + }; + + await_client_server_flow!(harness.server_harness.api_server_run(), client); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +struct NodeInfoHarness { + root_url: url::Url, + server_harness: ServerSideLocalFsHarness, +} + +impl NodeInfoHarness { + async fn new(is_multi_tenant: bool) -> Self { + let server_harness = ServerSideLocalFsHarness::new(ServerSideHarnessOptions { + multi_tenant: is_multi_tenant, + authorized_writes: true, + base_catalog: None, + }) + .await; + + let root_url = url::Url::parse( + format!("http://{}", server_harness.api_server_addr()).trim_end_matches('/'), + ) + .unwrap(); + + Self { + root_url, + server_harness, + } + } +} diff --git a/src/adapter/http/tests/tests/tests_push/scenarios/mod.rs b/src/adapter/http/tests/tests/tests_push/scenarios/mod.rs index 93a5f9d72..0102506fd 100644 --- a/src/adapter/http/tests/tests/tests_push/scenarios/mod.rs +++ b/src/adapter/http/tests/tests/tests_push/scenarios/mod.rs @@ -15,6 +15,7 @@ mod scenario_existing_evolved_dataset; mod scenario_existing_ref_collision; mod scenario_existing_up_to_date_dataset; mod scenario_new_dataset; +mod scenario_new_dataset_via_repo_ref; mod scenario_new_empty_dataset; pub(crate) use scenario_aborted_write_of_new_rewrite_succeeds::*; @@ -25,4 +26,5 @@ pub(crate) use scenario_existing_evolved_dataset::*; pub(crate) use scenario_existing_ref_collision::*; pub(crate) use scenario_existing_up_to_date_dataset::*; pub(crate) use scenario_new_dataset::*; +pub(crate) use scenario_new_dataset_via_repo_ref::*; pub(crate) use scenario_new_empty_dataset::*; diff --git 
a/src/adapter/http/tests/tests/tests_push/scenarios/scenario_new_dataset_via_repo_ref.rs b/src/adapter/http/tests/tests/tests_push/scenarios/scenario_new_dataset_via_repo_ref.rs new file mode 100644 index 000000000..c09647c7d --- /dev/null +++ b/src/adapter/http/tests/tests/tests_push/scenarios/scenario_new_dataset_via_repo_ref.rs
@@ -0,0 +1,98 @@
+// Copyright Kamu Data, Inc. and contributors. All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+use kamu::domain::*;
+use kamu::testing::MetadataFactory;
+use kamu::DatasetLayout;
+use opendatafabric::*;
+
+use crate::harness::{
+    commit_add_data_event,
+    make_dataset_ref,
+    ClientSideHarness,
+    ServerSideHarness,
+};
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+pub(crate) struct SmartPushNewDatasetViaRepoRefScenario<TServerHarness: ServerSideHarness> {
+    pub client_harness: ClientSideHarness,
+    pub server_harness: TServerHarness,
+    pub server_dataset_layout: DatasetLayout,
+    pub client_dataset_layout: DatasetLayout,
+    pub server_dataset_ref: DatasetPushTarget,
+    pub client_dataset_ref: DatasetRef,
+    pub client_commit_result: CommitResult,
+}
+
+impl<TServerHarness: ServerSideHarness> SmartPushNewDatasetViaRepoRefScenario<TServerHarness> {
+    pub async fn prepare(
+        client_harness: ClientSideHarness,
+        server_harness: TServerHarness,
+    ) -> Self {
+        let client_account_name = client_harness.operating_account_name();
+        let server_account_name = server_harness.operating_account_name();
+
+        let client_create_result = client_harness
+            .create_dataset_from_snapshot()
+            .execute(
+                MetadataFactory::dataset_snapshot()
+                    .name(DatasetAlias::new(
+                        client_account_name.clone(),
+                        DatasetName::new_unchecked("foo"),
+                    ))
+                    .kind(DatasetKind::Root)
+                    .push_event(MetadataFactory::set_polling_source().build())
+                    .push_event(MetadataFactory::set_data_schema().build())
+                    .build(),
+                Default::default(),
+            )
+            .await
+            .unwrap();
+
+        let client_dataset_layout =
+            client_harness.dataset_layout(&client_create_result.dataset_handle.id, "foo");
+
+        let foo_name = DatasetName::new_unchecked("foo");
+
+        let server_dataset_layout = server_harness.dataset_layout(&DatasetHandle::new(
+            client_create_result.dataset_handle.id.clone(),
+            DatasetAlias::new(server_account_name.clone(), foo_name.clone()),
+        ));
+
+        let client_dataset_ref = make_dataset_ref(&client_account_name, "foo");
+        let client_commit_result = commit_add_data_event(
+            client_harness.dataset_repository().as_ref(),
+            &client_dataset_ref,
+            &client_dataset_layout,
+            None,
+        )
+        .await;
+
+        client_harness
+            .add_repository(
+                &RepoName::new_unchecked("foo"),
+                &server_harness.api_server_addr(),
+            )
+            .unwrap();
+        let server_dataset_ref = DatasetPushTarget::Repository(RepoName::new_unchecked("foo"));
+
+        Self {
+            client_harness,
+            server_harness,
+            server_dataset_layout,
+            client_dataset_layout,
+            server_dataset_ref,
+            client_dataset_ref,
+            client_commit_result,
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/src/adapter/http/tests/tests/tests_push/test_smart_push_local_fs.rs b/src/adapter/http/tests/tests/tests_push/test_smart_push_local_fs.rs index 4f8aef377..274afe499 100644 --- a/src/adapter/http/tests/tests/tests_push/test_smart_push_local_fs.rs +++ b/src/adapter/http/tests/tests/tests_push/test_smart_push_local_fs.rs
@@ -80,3 +80,10 @@ test_client_server_local_fs_harness_permutations!( );
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+test_client_server_local_fs_harness_permutations!(
+    test_smart_push_shared,
+    test_smart_push_via_repo_ref
+);
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/src/adapter/http/tests/tests/tests_push/test_smart_push_s3.rs b/src/adapter/http/tests/tests/tests_push/test_smart_push_s3.rs index 0c0aba0ba..841fbd957 100644 --- a/src/adapter/http/tests/tests/tests_push/test_smart_push_s3.rs +++ b/src/adapter/http/tests/tests/tests_push/test_smart_push_s3.rs
@@ -77,3 +77,7 @@ test_client_server_s3_harness_permutations!( );
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+test_client_server_s3_harness_permutations!(test_smart_push_shared, test_smart_push_via_repo_ref);
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/src/adapter/http/tests/tests/tests_push/test_smart_push_shared.rs b/src/adapter/http/tests/tests/tests_push/test_smart_push_shared.rs index 7efa75386..8088d9212 100644 --- a/src/adapter/http/tests/tests/tests_push/test_smart_push_shared.rs +++ b/src/adapter/http/tests/tests/tests_push/test_smart_push_shared.rs
@@ -28,7 +28,7 @@ pub(crate) async fn test_smart_push_new_dataset(
+pub(crate) async fn test_smart_push_via_repo_ref<TServerHarness: ServerSideHarness>(
+    a_client_harness: ClientSideHarness,
+    a_server_harness: TServerHarness,
+) {
+    let scenario =
+        SmartPushNewDatasetViaRepoRefScenario::prepare(a_client_harness, a_server_harness).await;
+
+    let api_server_handle = scenario.server_harness.api_server_run();
+
+    let client_handle = async {
+        let push_result = scenario
+            .client_harness
+            .push_dataset_result(
+                scenario.client_dataset_ref,
+                scenario.server_dataset_ref,
+                false,
+                DatasetVisibility::Private,
+            )
+            .await;
+
+        assert_eq!(
+            SyncResult::Updated {
+                old_head: None,
+                new_head: scenario.client_commit_result.new_head,
+                num_blocks: 4,
+            },
+            push_result
+        );
+
+        DatasetTestHelper::assert_datasets_in_sync(
+            &scenario.server_dataset_layout,
+            &scenario.client_dataset_layout,
+        );
+    };
+
+    await_client_server_flow!(api_server_handle, client_handle);
+}
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/src/adapter/http/tests/tests/tests_push/test_smart_push_special.rs b/src/adapter/http/tests/tests/tests_push/test_smart_push_special.rs index f33459f56..1ab22b597 100644 --- a/src/adapter/http/tests/tests/tests_push/test_smart_push_special.rs +++ b/src/adapter/http/tests/tests/tests_push/test_smart_push_special.rs
@@ -50,7 +50,7 @@ async fn test_smart_push_new_dataset_unauthenticated() {
         .client_harness
         .push_dataset(
             scenario.client_dataset_ref,
-            scenario.server_dataset_ref,
+            scenario.server_dataset_ref.try_into().unwrap(),
             false,
             DatasetVisibility::Private,
         )
@@ -101,7 +101,7 @@ async fn test_smart_push_new_dataset_wrong_user() {
         .client_harness
         .push_dataset(
             scenario.client_dataset_ref,
-            wrong_server_dataset_ref,
+            wrong_server_dataset_ref.try_into().unwrap(),
             false,
             DatasetVisibility::Private,
         )
@@ -145,7 +145,7 @@ async fn test_smart_push_existing_dataset_unauthenticated() {
         .client_harness
         .push_dataset(
             scenario.client_dataset_ref,
-            scenario.server_dataset_ref,
+            scenario.server_dataset_ref.try_into().unwrap(),
             false,
             DatasetVisibility::Private,
         )
@@ -189,7 +189,7 @@ async fn test_smart_push_existing_dataset_unauthorized() {
         .client_harness
         .push_dataset(
             scenario.client_dataset_ref,
-            scenario.server_dataset_ref,
+            scenario.server_dataset_ref.try_into().unwrap(),
             false,
             DatasetVisibility::Private,
         )
@@ -233,7 +233,7 @@ async fn test_smart_push_existing_ref_collision() {
         .client_harness
         .push_dataset(
             scenario.client_dataset_ref,
-            scenario.server_dataset_ref,
+            scenario.server_dataset_ref.try_into().unwrap(),
             false,
             DatasetVisibility::Private,
         )
diff --git a/src/app/cli/src/app.rs b/src/app/cli/src/app.rs index fb7a54fff..bfed1ce7d 100644 --- a/src/app/cli/src/app.rs +++ b/src/app/cli/src/app.rs
@@ -419,6 +419,8 @@ pub fn configure_base_catalog(
     b.add::();
 
+    b.add::();
+
     b.add::();
 
     b.add::();
diff --git a/src/app/cli/src/cli.rs b/src/app/cli/src/cli.rs index ad74510d2..11610e789 100644 --- a/src/app/cli/src/cli.rs +++ b/src/app/cli/src/cli.rs
@@ -12,8 +12,8 @@ use std::path::PathBuf;
 use clap::{ArgAction, Parser};
 use opendatafabric as odf;
 
+use crate::cli_value_parser::{self as parsers};
 use crate::{
-    cli_value_parser as parsers,
     LineageOutputFormat,
     MetadataLogOutputFormat,
     OutputFormat,
@@ -656,6 +656,14 @@ pub struct Login {
     /// ODF server URL (defaults to kamu.dev)
     #[arg()]
     pub server: Option,
+
+    /// Repository name under which to store this server in the repositories list
+    #[arg(long, value_parser = parsers::repo_name)]
+    pub repo_name: Option<odf::RepoName>,
+
+    /// Don't automatically add a remote repository for this host
+    #[arg(long)]
+    pub skip_add_repo: bool,
 }
 
 #[derive(Debug, clap::Subcommand)]
@@ -904,8 +912,8 @@ pub struct Push {
     pub no_alias: bool,
 
     /// Remote alias or a URL to push to
-    #[arg(long, value_name = "REM", value_parser = parsers::dataset_ref_remote)]
-    pub to: Option<odf::DatasetRefRemote>,
+    #[arg(long, value_name = "REM", value_parser = parsers::dataset_push_target)]
+    pub to: Option<odf::DatasetPushTarget>,
 
     /// Overwrite remote version with local, even if revisions have diverged
     #[arg(long, short = 'f')]
@@ -916,8 +924,8 @@ pub struct Push {
     pub visibility: parsers::DatasetVisibility,
 
     /// Local or remote dataset reference(s)
-    #[arg(value_parser = parsers::dataset_ref_pattern_any)]
-    pub dataset: Option<Vec<odf::DatasetRefAnyPattern>>,
+    #[arg(value_parser = parsers::dataset_ref_pattern)]
+    pub dataset: Option<Vec<odf::DatasetRefPattern>>,
 }
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/src/app/cli/src/cli_commands.rs b/src/app/cli/src/cli_commands.rs index f7ef6df72..09c790b6b 100644 --- a/src/app/cli/src/cli_commands.rs +++ b/src/app/cli/src/cli_commands.rs
@@ -211,6 +211,7 @@ pub fn get_command(
             cli_catalog.get_one()?,
             cli_catalog.get_one()?,
             cli_catalog.get_one()?,
+            cli_catalog.get_one()?,
             if c.user {
                 odf_server::AccessTokenStoreScope::User
             } else {
@@ -219,6 +220,8 @@ pub fn get_command(
             c.server.map(Into::into),
             c.access_token,
             c.check,
+            c.repo_name,
+            c.skip_add_repo,
         )),
     },
     cli::Command::Logout(c) => Box::new(LogoutCommand::new(
@@ -276,11 +279,9 @@ pub fn get_command(
         }
     }
     cli::Command::Push(c) => Box::new(PushCommand::new(
-        cli_catalog.get_one()?,
         cli_catalog.get_one()?,
         cli_catalog.get_one()?,
         c.dataset.unwrap_or_default(),
-        cli_catalog.get_one()?,
         c.all,
         c.recursive,
         !c.no_alias,
diff --git a/src/app/cli/src/cli_value_parser.rs b/src/app/cli/src/cli_value_parser.rs index 226535ea6..0407f9bbe 100644 --- a/src/app/cli/src/cli_value_parser.rs +++ b/src/app/cli/src/cli_value_parser.rs
@@ -74,6 +74,20 @@ pub(crate) fn dataset_ref_remote(s: &str) -> Result<odf::DatasetRefRemote, String>
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+pub(crate) fn dataset_push_target(s: &str) -> Result<odf::DatasetPushTarget, String> {
+    match odf::DatasetPushTarget::from_str(s) {
+        Ok(push_dataset_ref) => Ok(push_dataset_ref),
+        Err(_) => Err(
+            "Remote reference should be in the form `repository/account/dataset-name` or \
+             `scheme://some-url`; a repository name can only contain alphanumerics, dashes, \
+             and dots"
+                .to_string(),
+        ),
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
 pub(crate) fn repo_name(s: &str) -> Result<odf::RepoName, String> {
     match odf::RepoName::try_from(s) {
         Ok(v) => Ok(v),
diff --git a/src/app/cli/src/commands/login_command.rs b/src/app/cli/src/commands/login_command.rs index 9c125e221..bbff377c6 100644 --- a/src/app/cli/src/commands/login_command.rs +++ b/src/app/cli/src/commands/login_command.rs
@@ -10,6 +10,8 @@ use std::sync::Arc;
 
 use console::style as s;
+use kamu::domain::{AddRepoError, RemoteRepositoryRegistry};
+use opendatafabric::RepoName;
 use url::Url;
 
 use crate::{odf_server, CLIError, Command, OutputConfig};
@@ -19,31 +21,40 @@ use crate::{odf_server, CLIError, Command, OutputConfig};
 pub struct LoginCommand {
     login_service: Arc<odf_server::LoginService>,
     access_token_registry_service: Arc<odf_server::AccessTokenRegistryService>,
+    remote_repo_reg: Arc<dyn RemoteRepositoryRegistry>,
     output_config: Arc<OutputConfig>,
     scope: odf_server::AccessTokenStoreScope,
     server: Option<Url>,
     access_token: Option<String>,
     check: bool,
+    repo_name: Option<RepoName>,
+    skip_add_repo: bool,
 }
 
 impl LoginCommand {
     pub fn new(
         login_service: Arc<odf_server::LoginService>,
         access_token_registry_service: Arc<odf_server::AccessTokenRegistryService>,
+        remote_repo_reg: Arc<dyn RemoteRepositoryRegistry>,
         output_config: Arc<OutputConfig>,
         scope: odf_server::AccessTokenStoreScope,
         server: Option<Url>,
         access_token: Option<String>,
         check: bool,
+        repo_name: Option<RepoName>,
+        skip_add_repo: bool,
     ) -> Self {
         Self {
             login_service,
             access_token_registry_service,
+            remote_repo_reg,
             output_config,
             scope,
             server,
             access_token,
             check,
+            repo_name,
+            skip_add_repo,
         }
     }
 
@@ -55,18 +66,37 @@ impl LoginCommand {
     fn new_login_with_token(
         &self,
-        odf_server_frontend_url: &Url,
+        odf_server_backend_url: &Url,
         access_token: &str,
     ) -> Result<(), CLIError> {
         self.access_token_registry_service.save_access_token(
             self.scope,
-            Some(odf_server_frontend_url),
-            odf_server_frontend_url,
+            Some(odf_server_backend_url),
+            odf_server_backend_url,
             access_token.to_string(),
         )?;
 
+        self.add_repository(odf_server_backend_url, odf_server_backend_url)?;
+
         Ok(())
     }
 
+    fn add_repository(&self, frontend_url: &Url, backend_url: &Url) -> Result<(), CLIError> {
+        if self.skip_add_repo {
+            return Ok(());
+        }
+        let repo_name = self.repo_name.clone().unwrap_or(
+            RepoName::try_from(frontend_url.host_str().unwrap()).map_err(CLIError::failure)?,
+        );
+        match self
+            .remote_repo_reg
+            .add_repository(&repo_name, backend_url.clone())
+        {
+            Ok(_) => Ok(()),
+            Err(_err @ AddRepoError::AlreadyExists(_)) => Ok(()),
+            Err(e) => Err(CLIError::failure(e)),
+        }
+    }
+
     async fn new_login(&self, odf_server_frontend_url: Url) -> Result<(), CLIError> {
         let login_callback_response = self
             .login_service
@@ -86,6 +116,11 @@ impl LoginCommand {
             login_callback_response.access_token,
         )?;
 
+        self.add_repository(
+            &odf_server_frontend_url,
+            &login_callback_response.backend_url,
+        )?;
+
         eprintln!(
             "{}: {}",
             console::style("Login successful").green().bold(),
@@ -174,7 +209,7 @@ impl Command for LoginCommand {
         if self.check {
             return if let Some(token_find_report) = self
                 .access_token_registry_service
-                .find_by_frontend_or_backend_url(self.scope, &odf_server_url)
+                .find_by_frontend_or_backend_url(&odf_server_url)
             {
                 match self.validate_login(token_find_report).await {
                     Ok(_) => {
@@ -212,7 +247,7 @@ impl Command for
LoginCommand { // non-interactive login, only the interactive one with the browser if let Some(token_find_report) = self .access_token_registry_service - .find_by_frontend_url(self.scope, &odf_server_url) + .find_by_frontend_url(&odf_server_url) { match self.validate_login(token_find_report).await { Ok(_) => { diff --git a/src/app/cli/src/commands/login_silent_command.rs b/src/app/cli/src/commands/login_silent_command.rs index ba5368eeb..f67160ef3 100644 --- a/src/app/cli/src/commands/login_silent_command.rs +++ b/src/app/cli/src/commands/login_silent_command.rs @@ -178,7 +178,7 @@ impl Command for LoginSilentCommand { // Validate token and trigger browser login flow if needed if let Some(token_find_report) = self .access_token_registry_service - .find_by_backend_url(self.scope, &odf_server_backend_url) + .find_by_backend_url(&odf_server_backend_url) { match self.validate_login(token_find_report).await { Ok(_) => { diff --git a/src/app/cli/src/commands/push_command.rs b/src/app/cli/src/commands/push_command.rs index 8e34c5d87..33231f4e4 100644 --- a/src/app/cli/src/commands/push_command.rs +++ b/src/app/cli/src/commands/push_command.rs @@ -13,8 +13,7 @@ use std::time::Duration; use console::style as s; use futures::TryStreamExt; use kamu::domain::*; -use kamu::utils::datasets_filtering::filter_datasets_by_any_pattern; -use kamu_accounts::CurrentAccountSubject; +use kamu::utils::datasets_filtering::filter_datasets_by_local_pattern; use opendatafabric::*; use super::{BatchError, CLIError, Command}; @@ -27,14 +26,12 @@ use crate::output::OutputConfig; pub struct PushCommand { push_svc: Arc, dataset_repo: Arc, - search_svc: Arc, - refs: Vec, - current_account_subject: Arc, + refs: Vec, all: bool, recursive: bool, add_aliases: bool, force: bool, - to: Option, + to: Option, dataset_visibility: DatasetVisibility, output_config: Arc, } @@ -43,26 +40,22 @@ impl PushCommand { pub fn new( push_svc: Arc, dataset_repo: Arc, - search_svc: Arc, refs: I, - current_account_subject: Arc, all: bool, recursive: bool, add_aliases: bool, force: bool, - to: Option, + to: Option, dataset_visibility: DatasetVisibility, output_config: Arc, ) -> Self where - I: IntoIterator, + I: IntoIterator, { Self { push_svc, dataset_repo, - search_svc, refs: refs.into_iter().collect(), - current_account_subject, all, recursive, add_aliases, @@ -77,71 +70,26 @@ impl PushCommand { &self, listener: Option>, ) -> Result, CLIError> { - let current_account_name = match self.current_account_subject.as_ref() { - CurrentAccountSubject::Anonymous(_) => { - return Err(CLIError::usage_error( - "Anonymous account misused, use multi-tenant alias", - )) - } - CurrentAccountSubject::Logged(l) => &l.account_name, - }; - - if let Some(remote_ref) = &self.to { - let local_ref = match self.refs[0].as_dataset_ref_any() { - Some(dataset_ref_any) => dataset_ref_any - .as_local_ref(|_| !self.dataset_repo.is_multi_tenant()) - .map_err(|_| { - CLIError::usage_error( - "When using --to reference should point to a local dataset", - ) - })?, - None => { - return Err(CLIError::usage_error( - "When using --to reference should not point to wildcard pattern", - )) - } - }; - - Ok(self - .push_svc - .push_multi_ext( - vec![PushRequest { - local_ref: Some(local_ref), - remote_ref: Some(remote_ref.clone()), - }], - PushMultiOptions { - all: self.all, - recursive: self.recursive, - add_aliases: self.add_aliases, - sync_options: self.sync_options(), - }, - listener, - ) - .await) - } else { - let dataset_refs: Vec<_> = filter_datasets_by_any_pattern( - 
self.dataset_repo.as_ref(), - self.search_svc.clone(), - self.refs.clone(), - current_account_name, + let dataset_refs: Vec<_> = + filter_datasets_by_local_pattern(self.dataset_repo.as_ref(), self.refs.clone()) + .map_ok(|dataset_handle| dataset_handle.as_local_ref()) + .try_collect() + .await?; + + Ok(self + .push_svc + .push_multi( + dataset_refs, + PushMultiOptions { + all: self.all, + recursive: self.recursive, + add_aliases: self.add_aliases, + sync_options: self.sync_options(), + remote_target: self.to.clone(), + }, + listener, ) - .try_collect() - .await?; - - Ok(self - .push_svc - .push_multi( - dataset_refs, - PushMultiOptions { - all: self.all, - recursive: self.recursive, - add_aliases: self.add_aliases, - sync_options: self.sync_options(), - }, - listener, - ) - .await) - } + .await) } fn sync_options(&self) -> SyncOptions { @@ -232,10 +180,8 @@ impl Command for PushCommand { .into_iter() .filter(|res| res.result.is_err()) .map(|res| { - ( - res.result.err().unwrap(), - format!("Failed to push {}", res.original_request), - ) + let push_err = format!("Failed to push {res}"); + (res.result.err().unwrap(), push_err) }), ) .into()) diff --git a/src/app/cli/src/explore/api_server.rs b/src/app/cli/src/explore/api_server.rs index d045f48a2..3d238c5fc 100644 --- a/src/app/cli/src/explore/api_server.rs +++ b/src/app/cli/src/explore/api_server.rs @@ -114,6 +114,7 @@ impl APIServer { axum::routing::post(kamu_adapter_http::platform_file_upload_post_handler), ) .nest("/", kamu_adapter_http::data::root_router()) + .nest("/", kamu_adapter_http::general::root_router()) .nest( "/odata", if multi_tenant_workspace { diff --git a/src/app/cli/src/services/odf_server/access_token_registry_service.rs b/src/app/cli/src/services/odf_server/access_token_registry_service.rs index daf5384dd..0b3fd29e1 100644 --- a/src/app/cli/src/services/odf_server/access_token_registry_service.rs +++ b/src/app/cli/src/services/odf_server/access_token_registry_service.rs @@ -63,98 +63,84 @@ impl AccessTokenRegistryService { pub fn find_by_frontend_or_backend_url( &self, - scope: AccessTokenStoreScope, odf_server_url: &Url, ) -> Option { - let registry_ptr = match scope { - AccessTokenStoreScope::User => &self.user_registry, - AccessTokenStoreScope::Workspace => &self.workspace_registry, - }; - - let registry = registry_ptr - .lock() - .expect("Could not lock access tokens registry"); - - if let Some(server_record) = registry.iter().find(|c| { - if &c.backend_url == odf_server_url { - true - } else { - match &c.frontend_url { - Some(frontend_url) => frontend_url == odf_server_url, - _ => false, + for registry_ptr in [&self.workspace_registry, &self.user_registry] { + let registry = registry_ptr + .lock() + .expect("Could not lock access tokens registry"); + let server_record_maybe = registry.iter().find(|c| { + if &c.backend_url == odf_server_url { + true + } else { + match &c.frontend_url { + Some(frontend_url) => frontend_url == odf_server_url, + _ => false, + } } + }); + if let Some(server_record) = server_record_maybe { + return server_record + .token_for_account(self.account_name()) + .map(|ac| AccessTokenFindReport { + backend_url: server_record.backend_url.clone(), + frontend_url: server_record.frontend_url.clone(), + access_token: ac.clone(), + }); } - }) { - server_record - .token_for_account(self.account_name()) - .map(|ac| AccessTokenFindReport { - backend_url: server_record.backend_url.clone(), - frontend_url: server_record.frontend_url.clone(), - access_token: ac.clone(), - }) - } else { - None } + None } pub 
fn find_by_frontend_url( &self, - scope: AccessTokenStoreScope, odf_server_frontend_url: &Url, ) -> Option { - let registry_ptr = match scope { - AccessTokenStoreScope::User => &self.user_registry, - AccessTokenStoreScope::Workspace => &self.workspace_registry, - }; - - let registry = registry_ptr - .lock() - .expect("Could not lock access tokens registry"); - - if let Some(server_record) = registry.iter().find(|c| match &c.frontend_url { - Some(frontend_url) => frontend_url == odf_server_frontend_url, - _ => false, - }) { - server_record - .token_for_account(self.account_name()) - .map(|ac| AccessTokenFindReport { - backend_url: server_record.backend_url.clone(), - frontend_url: server_record.frontend_url.clone(), - access_token: ac.clone(), - }) - } else { - None + for registry_ptr in [&self.workspace_registry, &self.user_registry] { + let registry = registry_ptr + .lock() + .expect("Could not lock access tokens registry"); + + let server_record_maybe = registry.iter().find(|c| match &c.frontend_url { + Some(frontend_url) => frontend_url == odf_server_frontend_url, + _ => false, + }); + if let Some(server_record) = server_record_maybe { + return server_record + .token_for_account(self.account_name()) + .map(|ac| AccessTokenFindReport { + backend_url: server_record.backend_url.clone(), + frontend_url: server_record.frontend_url.clone(), + access_token: ac.clone(), + }); + } } + None } pub fn find_by_backend_url( &self, - scope: AccessTokenStoreScope, odf_server_backend_url: &Url, ) -> Option { - let registry_ptr = match scope { - AccessTokenStoreScope::User => &self.user_registry, - AccessTokenStoreScope::Workspace => &self.workspace_registry, - }; - - let registry = registry_ptr - .lock() - .expect("Could not lock access tokens registry"); - - if let Some(token_map) = registry - .iter() - .find(|c| &c.backend_url == odf_server_backend_url) - { - token_map - .token_for_account(self.account_name()) - .map(|ac| AccessTokenFindReport { - backend_url: token_map.backend_url.clone(), - frontend_url: token_map.frontend_url.clone(), - access_token: ac.clone(), - }) - } else { - None + for registry_ptr in [&self.workspace_registry, &self.user_registry] { + let registry = registry_ptr + .lock() + .expect("Could not lock access tokens registry"); + + if let Some(token_map) = registry + .iter() + .find(|c| &c.backend_url == odf_server_backend_url) + { + return token_map.token_for_account(self.account_name()).map(|ac| { + AccessTokenFindReport { + backend_url: token_map.backend_url.clone(), + frontend_url: token_map.frontend_url.clone(), + access_token: ac.clone(), + } + }); + } } + None } pub fn save_access_token( @@ -277,17 +263,8 @@ impl kamu::domain::auth::OdfServerAccessTokenResolver for AccessTokenRegistrySer let origin = odf_dataset_http_url.origin().unicode_serialization(); let odf_server_backend_url = Url::parse(origin.as_str()).unwrap(); - if let Some(token_find_report) = - self.find_by_backend_url(AccessTokenStoreScope::Workspace, &odf_server_backend_url) - { - Some(token_find_report.access_token.access_token) - } else if let Some(token_find_report) = - self.find_by_backend_url(AccessTokenStoreScope::User, &odf_server_backend_url) - { - Some(token_find_report.access_token.access_token) - } else { - None - } + self.find_by_backend_url(&odf_server_backend_url) + .map(|token_report| token_report.access_token.access_token) } } diff --git a/src/app/cli/tests/tests/test_access_token_registry_svc.rs b/src/app/cli/tests/tests/test_access_token_registry_svc.rs index 109a08303..11511856f 100644 --- 
diff --git a/src/app/cli/tests/tests/test_access_token_registry_svc.rs b/src/app/cli/tests/tests/test_access_token_registry_svc.rs
index 109a08303..11511856f 100644
--- a/src/app/cli/tests/tests/test_access_token_registry_svc.rs
+++ b/src/app/cli/tests/tests/test_access_token_registry_svc.rs
@@ -134,16 +134,12 @@ async fn test_find_token() {
     )
     .unwrap();
 
-    let report = svc
-        .find_by_frontend_url(AccessTokenStoreScope::User, &frontend_url)
-        .unwrap();
+    let report = svc.find_by_frontend_url(&frontend_url).unwrap();
     assert_eq!(report.frontend_url.as_ref(), Some(&frontend_url));
     assert_eq!(report.backend_url, backend_url);
     assert_eq!(&report.access_token.access_token, "random-token-user");
 
-    let report = svc
-        .find_by_backend_url(AccessTokenStoreScope::User, &backend_url)
-        .unwrap();
+    let report = svc.find_by_backend_url(&backend_url).unwrap();
     assert_eq!(report.frontend_url.as_ref(), Some(&frontend_url));
     assert_eq!(report.backend_url, backend_url);
     assert_eq!(&report.access_token.access_token, "random-token-user");
@@ -187,19 +183,9 @@ async fn test_drop_token() {
         .unwrap();
     assert!(res);
 
-    assert!(svc
-        .find_by_frontend_url(AccessTokenStoreScope::User, &frontend_url)
-        .is_none());
-    assert!(svc
-        .find_by_frontend_url(AccessTokenStoreScope::Workspace, &frontend_url)
-        .is_none());
-
-    assert!(svc
-        .find_by_backend_url(AccessTokenStoreScope::User, &backend_url)
-        .is_none());
-    assert!(svc
-        .find_by_backend_url(AccessTokenStoreScope::Workspace, &backend_url)
-        .is_none());
+    assert!(svc.find_by_frontend_url(&frontend_url).is_none());
+
+    assert!(svc.find_by_backend_url(&backend_url).is_none());
 }
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/src/domain/accounts/domain/src/testing/mock_authentication_service.rs b/src/domain/accounts/domain/src/testing/mock_authentication_service.rs
index 9394e7e42..b39f1158b 100644
--- a/src/domain/accounts/domain/src/testing/mock_authentication_service.rs
+++ b/src/domain/accounts/domain/src/testing/mock_authentication_service.rs
@@ -163,6 +163,11 @@ impl MockAuthenticationService {
     pub fn resolving_token(access_token: &str, expected_account_info: Account) -> Self {
         let mut mock_authentication_service = MockAuthenticationService::new();
+        let account_cloned = expected_account_info.clone();
+        mock_authentication_service
+            .expect_account_by_id()
+            .with(eq(account_cloned.id.clone()))
+            .returning(move |_| Ok(Some(account_cloned.clone())));
         mock_authentication_service
             .expect_account_by_token()
             .with(eq(access_token.to_string()))
diff --git a/src/domain/core/src/services/push_service.rs b/src/domain/core/src/services/push_service.rs
index c63f96569..f87befbf3 100644
--- a/src/domain/core/src/services/push_service.rs
+++ b/src/domain/core/src/services/push_service.rs
@@ -24,37 +24,33 @@ use crate::{DatasetNotFoundError, GetDatasetError};
 pub trait PushService: Send + Sync {
     async fn push_multi(
         &self,
-        dataset_refs: Vec<DatasetRefAny>,
+        dataset_refs: Vec<DatasetRef>,
         options: PushMultiOptions,
         sync_listener: Option<Arc<dyn SyncMultiListener>>,
     ) -> Vec<PushResponse>;
-
-    async fn push_multi_ext(
-        &self,
-        requests: Vec<PushRequest>,
-        options: PushMultiOptions,
-        sync_listener: Option<Arc<dyn SyncMultiListener>>,
-    ) -> Vec<PushResponse>;
-}
-
-#[derive(Debug, Clone)]
-pub struct PushRequest {
-    pub local_ref: Option<DatasetRef>,
-    pub remote_ref: Option<DatasetRefRemote>,
 }
 
 #[derive(Debug)]
 pub struct PushResponse {
-    /// Parameters passed into the call
-    pub original_request: PushRequest,
     /// Local dataset handle, if resolved
     pub local_handle: Option<DatasetHandle>,
     /// Destination reference, if resolved
-    pub remote_ref: Option<DatasetRefRemote>,
+    pub target: Option<DatasetPushTarget>,
     /// Result of the push operation
     pub result: Result<SyncResult, PushError>,
 }
 
+impl std::fmt::Display for PushResponse {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match (&self.local_handle, &self.target) {
+            (Some(l), None) => write!(f, "{l}"),
+            (None, Some(r)) => write!(f, "{r}"),
+            (Some(l), Some(r)) => write!(f, "{l} to {r}"),
+            (None, None) => write!(f, "???"),
+        }
+    }
+}
+
 #[derive(Debug, Clone)]
 pub struct PushMultiOptions {
     /// Push all dataset dependencies recursively in depth-first order
@@ -65,6 +61,8 @@ pub struct PushMultiOptions {
     pub add_aliases: bool,
     /// Sync options
     pub sync_options: SyncOptions,
+    /// Destination reference for the push, if provided
+    pub remote_target: Option<DatasetPushTarget>,
 }
 
 impl Default for PushMultiOptions {
@@ -74,17 +72,7 @@ impl Default for PushMultiOptions {
             all: false,
             add_aliases: true,
             sync_options: SyncOptions::default(),
-        }
-    }
-}
-
-impl std::fmt::Display for PushRequest {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        match (&self.local_ref, &self.remote_ref) {
-            (Some(l), None) => write!(f, "{l}"),
-            (None, Some(r)) => write!(f, "{r}"),
-            (Some(l), Some(r)) => write!(f, "{l} to {r}"),
-            (None, None) => write!(f, "???"),
-        }
+            remote_target: None,
         }
     }
}
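Taken together, this replaces the two-method surface (`push_multi` plus `push_multi_ext` with `PushRequest` pairs) with a single entry point where the destination travels in the options. A hedged migration sketch for callers (identifiers follow this diff; the `push_one` wrapper is illustrative, and error handling and listener wiring are elided):

```rust
use std::str::FromStr;

use kamu_core::{PushMultiOptions, PushService};
use opendatafabric::{DatasetPushTarget, DatasetRef};

// Roughly what `kamu push [--to <target>]` now boils down to.
async fn push_one(push_svc: &dyn PushService, local: DatasetRef, to: Option<&str>) {
    // `--to` is optional; when absent, the RemoteAliasResolver picks the target.
    let remote_target = to.map(|s| DatasetPushTarget::from_str(s).expect("valid push target"));
    let responses = push_svc
        .push_multi(
            vec![local],
            PushMultiOptions {
                remote_target,
                ..Default::default()
            },
            None, // no sync listener in this sketch
        )
        .await;
    for res in &responses {
        // PushResponse now implements Display as "<local> to <target>"
        println!("{res}: {}", if res.result.is_ok() { "ok" } else { "failed" });
    }
}
```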
diff --git a/src/domain/core/src/services/remote_aliases_registry.rs b/src/domain/core/src/services/remote_aliases_registry.rs
index 85e5c4da0..d181d92bd 100644
--- a/src/domain/core/src/services/remote_aliases_registry.rs
+++ b/src/domain/core/src/services/remote_aliases_registry.rs
@@ -46,3 +46,103 @@ impl From<GetDatasetError> for GetAliasesError {
         }
     }
 }
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+// RemoteAliasResolver
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+#[async_trait]
+pub trait RemoteAliasResolver: Send + Sync {
+    // Resolve remote alias reference.
+    // First try to resolve it from the alias registry; if that fails,
+    // fall back to the repository registry
+    async fn resolve_remote_alias(
+        &self,
+        local_dataset_handle: &DatasetHandle,
+        dataset_push_target_maybe: Option<DatasetPushTarget>,
+        remote_alias_kind: RemoteAliasKind,
+    ) -> Result<RemoteAliasRef, ResolveAliasError>;
+}
+
+#[derive(Debug, Clone)]
+pub struct RemoteAliasRef {
+    pub url: url::Url,
+    pub repo_name: Option<RepoName>,
+    pub dataset_name: Option<DatasetName>,
+    pub account_name: Option<AccountName>,
+}
+
+impl RemoteAliasRef {
+    pub fn new(
+        url: url::Url,
+        repo_name: Option<RepoName>,
+        dataset_name: Option<DatasetName>,
+        account_name: Option<AccountName>,
+    ) -> Self {
+        Self {
+            url,
+            repo_name,
+            dataset_name,
+            account_name,
+        }
+    }
+}
+
+#[derive(Error, Debug)]
+pub enum ResolveAliasError {
+    #[error(transparent)]
+    RepositoryNotFound(
+        #[from]
+        #[backtrace]
+        RepositoryNotFoundError,
+    ),
+    #[error("Cannot choose between multiple repositories")]
+    AmbiguousRepository,
+    #[error("Cannot choose between multiple push aliases")]
+    AmbiguousAlias,
+    #[error("Repositories list is empty")]
+    EmptyRepositoryList,
+    #[error(transparent)]
+    Internal(
+        #[from]
+        #[backtrace]
+        InternalError,
+    ),
+}
+
+impl From<ResolveAliasError> for PushError {
+    fn from(val: ResolveAliasError) -> Self {
+        match val {
+            ResolveAliasError::AmbiguousAlias | ResolveAliasError::AmbiguousRepository => {
+                Self::AmbiguousTarget
+            }
+            ResolveAliasError::EmptyRepositoryList | ResolveAliasError::RepositoryNotFound(_) => {
+                Self::NoTarget
+            }
+            ResolveAliasError::Internal(e) => Self::Internal(e),
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+#[derive(Debug, Error)]
+pub enum GetRemoteAccountError {
+    #[error(transparent)]
+    InvalidResponse(InvalidApiResponseError),
+
+    #[error(transparent)]
+    Internal(InternalError),
+}
+
+#[derive(Debug, Error)]
+#[error("Invalid API response: {response}")]
+pub struct InvalidApiResponseError {
+    pub response: String,
+}
+
+impl From<InternalError> for GetRemoteAccountError {
+    fn from(value: InternalError) -> Self {
+        Self::Internal(value)
+    }
+}
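A small sketch of how the new trait is meant to be consumed during a push: the optional `--to` target flows in as an `Option<DatasetPushTarget>`, and only the resolved URL is needed downstream. The `destination_url` helper below is hypothetical and merely mirrors the trait contract above:

```rust
use kamu_core::{RemoteAliasKind, RemoteAliasResolver, ResolveAliasError};
use opendatafabric::{DatasetHandle, DatasetPushTarget};

// Resolve the destination for one dataset before building a sync request.
async fn destination_url(
    resolver: &dyn RemoteAliasResolver,
    handle: &DatasetHandle,
    to: Option<DatasetPushTarget>,
) -> Result<url::Url, ResolveAliasError> {
    let alias_ref = resolver
        .resolve_remote_alias(handle, to, RemoteAliasKind::Push)
        .await?;
    // repo_name / account_name / dataset_name are informational here;
    // the sync phase only needs the fully combined URL.
    Ok(alias_ref.url)
}
```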
diff --git a/src/domain/opendatafabric/src/identity/dataset_refs.rs b/src/domain/opendatafabric/src/identity/dataset_refs.rs
index e7843fc91..76bffdd93 100644
--- a/src/domain/opendatafabric/src/identity/dataset_refs.rs
+++ b/src/domain/opendatafabric/src/identity/dataset_refs.rs
@@ -11,6 +11,7 @@ use std::fmt;
 use std::str::FromStr;
 use std::sync::Arc;
 
+use thiserror::Error;
 use url::Url;
 
 use super::grammar::Grammar;
@@ -929,3 +930,72 @@ impl std::str::FromStr for DatasetRefAnyPattern {
 super::dataset_identity::impl_parse_error!(DatasetRefAnyPattern);
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum DatasetPushTarget {
+    Url(Url),
+    Alias(DatasetAliasRemote),
+    Repository(RepoName),
+}
+
+impl std::str::FromStr for DatasetPushTarget {
+    type Err = ParseError<Self>;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match DatasetAliasRemote::from_str(s) {
+            Ok(alias) => Ok(Self::Alias(alias)),
+            Err(_) => match RepoName::from_str(s) {
+                Ok(repo_name) => Ok(Self::Repository(repo_name)),
+                Err(_) => match Grammar::match_url(s) {
+                    Some(_) => match Url::from_str(s) {
+                        Ok(url) => Ok(Self::Url(url)),
+                        Err(_) => Err(Self::Err::new(s)),
+                    },
+                    None => Err(Self::Err::new(s)),
+                },
+            },
+        }
+    }
+}
+
+impl DatasetPushTarget {
+    pub fn into_repo_name(self) -> Option<RepoName> {
+        match self {
+            Self::Alias(dataset_alias_remote) => Some(dataset_alias_remote.repo_name),
+            Self::Repository(repo_name) => Some(repo_name),
+            Self::Url(_) => None,
+        }
+    }
+}
+
+impl fmt::Display for DatasetPushTarget {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            Self::Alias(v) => write!(f, "{v}"),
+            Self::Url(v) => write!(f, "{v}"),
+            Self::Repository(v) => write!(f, "{v}"),
+        }
+    }
+}
+
+impl TryFrom<DatasetRefRemote> for DatasetPushTarget {
+    type Error = DatasetPushTargetError;
+
+    fn try_from(value: DatasetRefRemote) -> Result<Self, Self::Error> {
+        match value {
+            DatasetRefRemote::Alias(remote_alias_ref) => Ok(Self::Alias(remote_alias_ref)),
+            DatasetRefRemote::Url(url_ref) => Ok(Self::Url(url_ref.as_ref().clone())),
+            _ => Err(Self::Error::UnsupportedType),
+        }
+    }
+}
+
+#[derive(Error, Debug)]
+pub enum DatasetPushTargetError {
+    #[error("Unsupported type to cast")]
+    UnsupportedType,
+}
+
+super::dataset_identity::impl_parse_error!(DatasetPushTarget);
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/src/domain/opendatafabric/tests/tests/test_dataset_refs.rs b/src/domain/opendatafabric/tests/tests/test_dataset_refs.rs
index ed72ed411..a3f14a3c4 100644
--- a/src/domain/opendatafabric/tests/tests/test_dataset_refs.rs
+++ b/src/domain/opendatafabric/tests/tests/test_dataset_refs.rs
@@ -286,3 +286,50 @@ fn test_dataset_ref_any_pattern() {
         ),
     );
 }
+
+#[test]
+fn test_dataset_push_target() {
+    // Parse valid remote url ref
+    let param = "http://net.example.com";
+    let res = DatasetPushTarget::from_str(param).unwrap();
+
+    assert_eq!(res, DatasetPushTarget::Url(Url::from_str(param).unwrap()));
+
+    // Parse valid remote single tenant alias ref
+    let repo_name = RepoName::new_unchecked("net.example.com");
+    let dataset_name = DatasetName::new_unchecked("foo");
+
+    let res = DatasetPushTarget::from_str(&format!("{repo_name}/{dataset_name}")).unwrap();
+
+    assert_eq!(
+        res,
+        DatasetPushTarget::Alias(DatasetAliasRemote::new(
+            repo_name.clone(),
+            None,
+            dataset_name.clone()
+        ))
+    );
+
+    // Parse valid remote multi tenant alias ref
+    let account_name = AccountName::new_unchecked("bar");
+    let res =
+        DatasetPushTarget::from_str(&format!("{repo_name}/{account_name}/{dataset_name}")).unwrap();
+
+    assert_eq!(
+        res,
+        DatasetPushTarget::Alias(DatasetAliasRemote::new(
+            repo_name.clone(),
+            Some(account_name.clone()),
+            dataset_name.clone()
+        ))
+    );
+
+    // Parse valid repository ref
+    let param = "net.example.com";
+    let res = DatasetPushTarget::from_str(param).unwrap();
+
+    assert_eq!(
+        res,
+        DatasetPushTarget::Repository(RepoName::new_unchecked(param))
+    );
+}
diff --git a/src/infra/core/src/lib.rs b/src/infra/core/src/lib.rs
index fd050b95d..afe7dfa8d 100644
--- a/src/infra/core/src/lib.rs
+++ b/src/infra/core/src/lib.rs
@@ -38,6 +38,7 @@ mod provenance_service_impl;
 mod pull_service_impl;
 mod push_service_impl;
 mod query_service_impl;
+mod remote_alias_resolver_impl;
 mod remote_aliases_registry_impl;
 mod remote_repository_registry_impl;
 mod reset_service_impl;
@@ -60,6 +61,7 @@ pub use provenance_service_impl::*;
 pub use pull_service_impl::*;
 pub use push_service_impl::*;
 pub use query_service_impl::*;
+pub use remote_alias_resolver_impl::*;
 pub use remote_aliases_registry_impl::*;
 pub use remote_repository_registry_impl::*;
 pub use repos::*;
diff --git a/src/infra/core/src/push_service_impl.rs b/src/infra/core/src/push_service_impl.rs
index 8f2c048e9..0c18b6bb9 100644
--- a/src/infra/core/src/push_service_impl.rs
+++ b/src/infra/core/src/push_service_impl.rs
@@ -10,13 +10,13 @@ use
std::sync::Arc;
 
 use dill::*;
-use internal_error::ResultIntoInternal;
 use kamu_core::*;
 use opendatafabric::*;
 
 pub struct PushServiceImpl {
     dataset_repo: Arc<dyn DatasetRepository>,
     remote_alias_reg: Arc<dyn RemoteAliasesRegistry>,
+    remote_alias_resolver: Arc<dyn RemoteAliasResolver>,
     sync_svc: Arc<dyn SyncService>,
 }
 
@@ -26,21 +26,27 @@ impl PushServiceImpl {
     pub fn new(
         dataset_repo: Arc<dyn DatasetRepository>,
         remote_alias_reg: Arc<dyn RemoteAliasesRegistry>,
+        remote_alias_resolver: Arc<dyn RemoteAliasResolver>,
         sync_svc: Arc<dyn SyncService>,
     ) -> Self {
         Self {
             dataset_repo,
             remote_alias_reg,
+            remote_alias_resolver,
             sync_svc,
         }
     }
 
-    async fn collect_plan(&self, items: &Vec<PushRequest>) -> (Vec<PushItem>, Vec<PushResponse>) {
+    async fn collect_plan(
+        &self,
+        items: &Vec<DatasetRef>,
+        push_target: &Option<DatasetPushTarget>,
+    ) -> (Vec<PushItem>, Vec<PushResponse>) {
         let mut plan = Vec::new();
         let mut errors = Vec::new();
 
-        for request in items {
-            match self.collect_plan_item(request.clone()).await {
+        for dataset_ref in items {
+            match self.collect_plan_item(dataset_ref, push_target).await {
                 Ok(item) => plan.push(item),
                 Err(err) => errors.push(err),
             }
@@ -49,137 +55,39 @@ impl PushServiceImpl {
         (plan, errors)
     }
 
-    async fn collect_plan_item(&self, request: PushRequest) -> Result<PushItem, PushResponse> {
+    async fn collect_plan_item(
+        &self,
+        dataset_ref: &DatasetRef,
+        push_target: &Option<DatasetPushTarget>,
+    ) -> Result<PushItem, PushResponse> {
         // Resolve local dataset if we have a local reference
-        let local_handle = if let Some(local_ref) = &request.local_ref {
-            match self.dataset_repo.resolve_dataset_ref(local_ref).await {
-                Ok(h) => Some(h),
-                Err(e) => {
-                    return Err(PushResponse {
-                        local_handle: None,
-                        remote_ref: request.remote_ref.clone(),
-                        result: Err(e.into()),
-                        original_request: request,
-                    })
-                }
+        let local_handle = match self.dataset_repo.resolve_dataset_ref(dataset_ref).await {
+            Ok(h) => h,
+            Err(e) => {
+                return Err(PushResponse {
+                    local_handle: None,
+                    target: push_target.clone(),
+                    result: Err(e.into()),
+                })
             }
-        } else {
-            None
         };
 
-        match &request {
-            PushRequest {
-                local_ref: None,
-                remote_ref: None,
-            } => panic!("Push request must contain either local or remote reference"),
-            PushRequest {
-                local_ref: Some(_),
-                remote_ref: None,
-            } => match self
-                .resolve_push_alias(local_handle.as_ref().unwrap())
-                .await
-            {
-                Ok(remote_ref) => Ok(PushItem {
-                    local_handle: local_handle.unwrap(),
-                    remote_ref,
-                    original_request: request,
-                }),
-                Err(e) => Err(PushResponse {
-                    local_handle,
-                    remote_ref: request.remote_ref.clone(),
-                    result: Err(e),
-                    original_request: request,
-                }),
-            },
-            PushRequest {
-                local_ref: None,
-                remote_ref: Some(remote_ref),
-            } => match self.inverse_lookup_dataset_by_push_alias(remote_ref).await {
-                Ok(local_handle) => Ok(PushItem {
-                    local_handle,
-                    remote_ref: remote_ref.clone(),
-                    original_request: request,
-                }),
-                Err(e) => Err(PushResponse {
-                    local_handle: None,
-                    remote_ref: Some(remote_ref.clone()),
-                    result: Err(e),
-                    original_request: request,
-                }),
-            },
-            PushRequest {
-                local_ref: Some(_),
-                remote_ref: Some(remote_ref),
-            } => Ok(PushItem {
-                local_handle: local_handle.unwrap(),
-                remote_ref: remote_ref.clone(),
-                original_request: request,
-            }),
-        }
-    }
-
-    async fn resolve_push_alias(
-        &self,
-        local_handle: &DatasetHandle,
-    ) -> Result<DatasetRefRemote, PushError> {
-        let remote_aliases = self
-            .remote_alias_reg
-            .get_remote_aliases(&local_handle.as_local_ref())
-            .await
-            .int_err()?;
-
-        let mut push_aliases: Vec<_> = remote_aliases.get_by_kind(RemoteAliasKind::Push).collect();
-
-        match push_aliases.len() {
-            0 => Err(PushError::NoTarget),
-            1 => Ok(push_aliases.remove(0).clone()),
-            _ => Err(PushError::AmbiguousTarget),
-        }
-    }
-
-    // TODO: avoid traversing all datasets for every alias
-    async fn inverse_lookup_dataset_by_push_alias(
-        &self,
-        remote_ref: &DatasetRefRemote,
-    ) -> Result<DatasetHandle, PushError> {
-        // Do a quick check when remote and local names match
-        if let Some(remote_name) = remote_ref.dataset_name() {
-            if let Some(local_handle) = self
-                .dataset_repo
-                .try_resolve_dataset_ref(
-                    &DatasetAlias::new(None, remote_name.clone()).as_local_ref(),
-                )
-                .await?
-            {
-                if self
-                    .remote_alias_reg
-                    .get_remote_aliases(&local_handle.as_local_ref())
-                    .await
-                    .int_err()?
-                    .contains(remote_ref, RemoteAliasKind::Push)
-                {
-                    return Ok(local_handle);
-                }
-            }
-        }
-
-        // No luck - now have to search through aliases
-        use tokio_stream::StreamExt;
-        let mut datasets = self.dataset_repo.get_all_datasets();
-        while let Some(dataset_handle) = datasets.next().await {
-            let dataset_handle = dataset_handle?;
-
-            if self
-                .remote_alias_reg
-                .get_remote_aliases(&dataset_handle.as_local_ref())
-                .await
-                .int_err()?
-                .contains(remote_ref, RemoteAliasKind::Push)
-            {
-                return Ok(dataset_handle);
-            }
+        match self
+            .remote_alias_resolver
+            .resolve_remote_alias(&local_handle, push_target.clone(), RemoteAliasKind::Push)
+            .await
+        {
+            Ok(remote_alias_ref) => Ok(PushItem {
+                local_handle,
+                remote_alias_ref,
+                push_target: push_target.clone(),
+            }),
+            Err(e) => Err(PushResponse {
+                local_handle: Some(local_handle),
+                target: push_target.clone(),
+                result: Err(e.into()),
+            }),
         }
-        Err(PushError::NoTarget)
     }
 }
 
@@ -187,32 +95,7 @@ impl PushService for PushServiceImpl {
     async fn push_multi(
         &self,
-        dataset_refs: Vec<DatasetRefAny>,
-        options: PushMultiOptions,
-        sync_listener: Option<Arc<dyn SyncMultiListener>>,
-    ) -> Vec<PushResponse> {
-        let requests = dataset_refs
-            .into_iter()
-            .map(
-                |r| match r.as_local_ref(|_| !self.dataset_repo.is_multi_tenant()) {
-                    Ok(local_ref) => PushRequest {
-                        local_ref: Some(local_ref),
-                        remote_ref: None,
-                    },
-                    Err(remote_ref) => PushRequest {
-                        local_ref: None,
-                        remote_ref: Some(remote_ref),
-                    },
-                },
-            )
-            .collect();
-
-        self.push_multi_ext(requests, options, sync_listener).await
-    }
-
-    async fn push_multi_ext(
-        &self,
-        initial_requests: Vec<PushRequest>,
+        dataset_refs: Vec<DatasetRef>,
         options: PushMultiOptions,
         sync_listener: Option<Arc<dyn SyncMultiListener>>,
     ) -> Vec<PushResponse> {
@@ -223,7 +106,9 @@ impl PushService for PushServiceImpl {
             unimplemented!("Pushing all datasets is not yet supported")
         }
 
-        let (plan, errors) = self.collect_plan(&initial_requests).await;
+        let (plan, errors) = self
+            .collect_plan(&dataset_refs, &options.remote_target)
+            .await;
         if !errors.is_empty() {
             return errors;
         }
@@ -234,7 +119,7 @@ impl PushService for PushServiceImpl {
                 plan.iter()
                     .map(|pi| SyncRequest {
                         src: pi.local_handle.as_any_ref(),
-                        dst: pi.remote_ref.as_any_ref(),
+                        dst: (&pi.remote_alias_ref.url).into(),
                     })
                     .collect(),
                 options.sync_options,
@@ -244,31 +129,29 @@ impl PushService for PushServiceImpl {
 
         assert_eq!(plan.len(), sync_results.len());
 
-        let results: Vec<_> = std::iter::zip(plan, sync_results)
+        let results: Vec<_> = std::iter::zip(&plan, sync_results)
             .map(|(pi, res)| {
+                let remote_ref: DatasetRefAny = (&pi.remote_alias_ref.url).into();
                 assert_eq!(pi.local_handle.as_any_ref(), res.src);
-                assert_eq!(pi.remote_ref.as_any_ref(), res.dst);
-                pi.into_response(res.result)
+                assert_eq!(remote_ref, res.dst);
+                pi.as_response(res.result)
             })
             .collect();
 
         // If no errors - add aliases to initial items
        if options.add_aliases && results.iter().all(|r| r.result.is_ok()) {
-            for request in &initial_requests {
-                if let PushRequest {
-                    local_ref: Some(local_ref),
-                    remote_ref: Some(remote_ref),
-                } = request
-                {
-                    // TODO: Improve error handling
-                    self.remote_alias_reg
-                        .get_remote_aliases(local_ref)
-                        .await
-                        .unwrap()
-                        .add(remote_ref, RemoteAliasKind::Push)
-                        .await
-                        .unwrap();
-                }
+            for push_item in &plan {
+                // TODO: Improve error handling
+                self.remote_alias_reg
+                    .get_remote_aliases(&(push_item.local_handle.as_local_ref()))
+                    .await
+                    .unwrap()
+                    .add(
+                        &((&push_item.remote_alias_ref.url).into()),
+                        RemoteAliasKind::Push,
+                    )
+                    .await
+                    .unwrap();
             }
         }
 
@@ -278,17 +161,16 @@ impl PushService for PushServiceImpl {
 
 #[derive(Debug)]
 struct PushItem {
-    original_request: PushRequest,
     local_handle: DatasetHandle,
-    remote_ref: DatasetRefRemote,
+    remote_alias_ref: RemoteAliasRef,
+    push_target: Option<DatasetPushTarget>,
 }
 
 impl PushItem {
-    fn into_response(self, result: Result<SyncResult, SyncError>) -> PushResponse {
+    fn as_response(&self, result: Result<SyncResult, SyncError>) -> PushResponse {
         PushResponse {
-            original_request: self.original_request,
-            local_handle: Some(self.local_handle),
-            remote_ref: Some(self.remote_ref),
+            local_handle: Some(self.local_handle.clone()),
+            target: self.push_target.clone(),
             result: result.map_err(Into::into),
         }
     }
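With the `PushRequest` matrix and the inverse alias lookup gone, the planning step is now linear: resolve the local handle, ask the resolver for a destination, and carry both into the sync phase. A condensed sketch of that step (the free-standing `plan_one` function is illustrative, not part of this diff; error branches are folded into `PushError` via the `From` impls above):

```rust
use kamu_core::*;
use opendatafabric::{DatasetHandle, DatasetPushTarget, DatasetRef};

// Condensed view of collect_plan_item(): one dataset ref in,
// a (local handle, resolved destination) pair out.
async fn plan_one(
    dataset_repo: &dyn DatasetRepository,
    resolver: &dyn RemoteAliasResolver,
    dataset_ref: &DatasetRef,
    push_target: Option<DatasetPushTarget>,
) -> Result<(DatasetHandle, RemoteAliasRef), PushError> {
    let local_handle = dataset_repo.resolve_dataset_ref(dataset_ref).await?;
    let remote_alias_ref = resolver
        .resolve_remote_alias(&local_handle, push_target, RemoteAliasKind::Push)
        .await?; // ResolveAliasError -> PushError via the From impl above
    Ok((local_handle, remote_alias_ref))
}
```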
diff --git a/src/infra/core/src/remote_alias_resolver_impl.rs b/src/infra/core/src/remote_alias_resolver_impl.rs
new file mode 100644
index 000000000..3429a6963
--- /dev/null
+++ b/src/infra/core/src/remote_alias_resolver_impl.rs
@@ -0,0 +1,296 @@
+// Copyright Kamu Data, Inc. and contributors. All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+use std::str::FromStr;
+use std::sync::Arc;
+
+use auth::OdfServerAccessTokenResolver;
+use dill::*;
+use internal_error::{InternalError, ResultIntoInternal};
+use kamu_core::*;
+use opendatafabric as odf;
+use url::Url;
+
+use crate::UrlExt;
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+pub struct RemoteAliasResolverImpl {
+    remote_repo_reg: Arc<dyn RemoteRepositoryRegistry>,
+    access_token_resolver: Arc<dyn OdfServerAccessTokenResolver>,
+    remote_alias_reg: Arc<dyn RemoteAliasesRegistry>,
+}
+
+#[component(pub)]
+#[interface(dyn RemoteAliasResolver)]
+impl RemoteAliasResolverImpl {
+    pub fn new(
+        remote_repo_reg: Arc<dyn RemoteRepositoryRegistry>,
+        access_token_resolver: Arc<dyn OdfServerAccessTokenResolver>,
+        remote_alias_reg: Arc<dyn RemoteAliasesRegistry>,
+    ) -> Self {
+        Self {
+            remote_repo_reg,
+            access_token_resolver,
+            remote_alias_reg,
+        }
+    }
+
+    async fn fetch_remote_url(
+        &self,
+        local_handle: &odf::DatasetHandle,
+        remote_alias_kind: RemoteAliasKind,
+    ) -> Result<Option<Url>, ResolveAliasError> {
+        let remote_aliases = self
+            .remote_alias_reg
+            .get_remote_aliases(&local_handle.as_local_ref())
+            .await
+            .int_err()?;
+
+        let aliases: Vec<_> = remote_aliases.get_by_kind(remote_alias_kind).collect();
+
+        match aliases.len() {
+            0 => Ok(None),
+            1 => {
+                if let odf::DatasetRefRemote::Url(remote_url) = aliases[0].clone() {
+                    return Ok(Some(remote_url.as_ref().clone()));
+                }
+                Ok(None)
+            }
+            _ => Err(ResolveAliasError::AmbiguousAlias),
+        }
+    }
+
+    fn combine_remote_url(
+        &self,
+        repo_url: &Url,
+        account_name_maybe: Option<&odf::AccountName>,
+        dataset_name: &odf::DatasetName,
+    ) -> Result<Url, ResolveAliasError> {
+        let mut res_url = repo_url.clone().as_odf_protocol().int_err()?;
+        {
+            let mut path_segments = res_url.path_segments_mut().unwrap();
+            if let Some(account_name) = account_name_maybe {
+                path_segments.push(account_name);
+            }
+            path_segments.push(dataset_name);
+        }
+        Ok(res_url)
+    }
+
+    async fn resolve_remote_dataset_name(
+        &self,
+        dataset_handle: &odf::DatasetHandle,
+        remote_repo_url: &Url,
+        access_token_maybe: Option<&String>,
+    ) -> Result<odf::DatasetName, ResolveAliasError> {
+        let result = if let Some(remote_dataset_name) =
+            RemoteAliasResolverApiHelper::fetch_remote_dataset_name(
+                remote_repo_url,
+                &dataset_handle.id,
+                access_token_maybe,
+            )
+            .await?
+        {
+            remote_dataset_name
+        } else {
+            dataset_handle.alias.dataset_name.clone()
+        };
+        Ok(result)
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+#[async_trait::async_trait]
+impl RemoteAliasResolver for RemoteAliasResolverImpl {
+    async fn resolve_remote_alias(
+        &self,
+        local_dataset_handle: &odf::DatasetHandle,
+        dataset_push_target_maybe: Option<odf::DatasetPushTarget>,
+        remote_alias_kind: RemoteAliasKind,
+    ) -> Result<RemoteAliasRef, ResolveAliasError> {
+        let repo_name: odf::RepoName;
+        let mut account_name = None;
+        let mut dataset_name = None;
+
+        if let Some(dataset_push_target) = &dataset_push_target_maybe {
+            match dataset_push_target {
+                odf::DatasetPushTarget::Alias(dataset_alias_remote) => {
+                    repo_name = dataset_alias_remote.repo_name.clone();
+                    account_name.clone_from(&dataset_alias_remote.account_name);
+                    dataset_name = Some(dataset_alias_remote.dataset_name.clone());
+                }
+                odf::DatasetPushTarget::Url(url_ref) => {
+                    return Ok(RemoteAliasRef::new(
+                        url_ref.clone(),
+                        None,
+                        dataset_name,
+                        account_name,
+                    ));
+                }
+                odf::DatasetPushTarget::Repository(repository_name) => {
+                    repo_name = repository_name.clone();
+                }
+            }
+        } else {
+            if let Some(remote_url) = self
+                .fetch_remote_url(local_dataset_handle, remote_alias_kind)
+                .await?
+            {
+                return Ok(RemoteAliasRef::new(
+                    remote_url,
+                    None,
+                    dataset_name,
+                    account_name,
+                ));
+            }
+            let remote_repo_names: Vec<_> = self.remote_repo_reg.get_all_repositories().collect();
+            if remote_repo_names.len() > 1 {
+                return Err(ResolveAliasError::AmbiguousRepository);
+            }
+            if let Some(repository_name) = remote_repo_names.first() {
+                repo_name = repository_name.clone();
+            } else {
+                return Err(ResolveAliasError::EmptyRepositoryList);
+            }
+        }
+        let remote_repo = self.remote_repo_reg.get_repository(&repo_name).int_err()?;
+
+        let access_token_maybe = self
+            .access_token_resolver
+            .resolve_odf_dataset_access_token(&remote_repo.url);
+        if account_name.is_none() {
+            account_name = RemoteAliasResolverApiHelper::resolve_remote_account_name(
+                &remote_repo.url,
+                access_token_maybe.as_ref(),
+            )
+            .await
+            .int_err()?;
+        }
+        let transfer_dataset_name = dataset_name.clone().unwrap_or(
+            self.resolve_remote_dataset_name(
+                local_dataset_handle,
+                &remote_repo.url,
+                access_token_maybe.as_ref(),
+            )
+            .await?,
+        );
+
+        let remote_url = self.combine_remote_url(
+            &remote_repo.url,
+            account_name.as_ref(),
+            &transfer_dataset_name,
+        )?;
+
+        return Ok(RemoteAliasRef::new(
+            remote_url,
+            Some(repo_name),
+            dataset_name,
+            account_name,
+        ));
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+pub struct RemoteAliasResolverApiHelper {}
+
+impl RemoteAliasResolverApiHelper {
+    fn build_headers_map(access_token_maybe: Option<&String>) -> http::HeaderMap {
+        let mut header_map = http::HeaderMap::new();
+        if let Some(access_token) = access_token_maybe {
+            header_map.append(
+                http::header::AUTHORIZATION,
+                http::HeaderValue::from_str(format!("Bearer {access_token}").as_str()).unwrap(),
+            );
+        };
+        header_map
+    }
+
+    // Returns the account name if the remote workspace runs in multi-tenant mode
+    pub async fn resolve_remote_account_name(
+        server_backend_url: &Url,
+        access_token_maybe: Option<&String>,
+    ) -> Result<Option<odf::AccountName>, GetRemoteAccountError> {
+        let client =
reqwest::Client::new();
+        let header_map = Self::build_headers_map(access_token_maybe);
+
+        let workspace_info_response = client
+            .get(server_backend_url.join("info").unwrap())
+            .headers(header_map.clone())
+            .send()
+            .await
+            .int_err()?
+            .error_for_status()
+            .int_err()?;
+        let json_workspace_info_response: serde_json::Value =
+            workspace_info_response.json().await.int_err()?;
+
+        if let Some(is_multi_tenant) = json_workspace_info_response["isMultiTenant"].as_bool()
+            && !is_multi_tenant
+        {
+            return Ok(None);
+        }
+
+        let account_response = client
+            .get(server_backend_url.join("accounts/me").unwrap())
+            .headers(header_map)
+            .send()
+            .await
+            .int_err()?
+            .error_for_status()
+            .int_err()?;
+        let json_account_response: serde_json::Value = account_response.json().await.int_err()?;
+
+        if let Some(api_account_name) = json_account_response["accountName"].as_str() {
+            let account_name = odf::AccountName::from_str(api_account_name).map_err(|_| {
+                GetRemoteAccountError::InvalidResponse(InvalidApiResponseError {
+                    response: json_account_response.to_string(),
+                })
+            })?;
+            return Ok(Some(account_name));
+        }
+        Err(GetRemoteAccountError::InvalidResponse(
+            InvalidApiResponseError {
+                response: json_account_response.to_string(),
+            },
+        ))
+    }
+
+    pub async fn fetch_remote_dataset_name(
+        server_backend_url: &Url,
+        dataset_id: &odf::DatasetID,
+        access_token_maybe: Option<&String>,
+    ) -> Result<Option<odf::DatasetName>, ResolveAliasError> {
+        let client = reqwest::Client::new();
+        let header_map = Self::build_headers_map(access_token_maybe);
+
+        let response = client
+            .get(
+                server_backend_url
+                    .join(&format!("datasets/{dataset_id}"))
+                    .unwrap(),
+            )
+            .headers(header_map)
+            .send()
+            .await
+            .int_err()?;
+        if response.status() == http::StatusCode::NOT_FOUND {
+            return Ok(None);
+        }
+        let json_response: serde_json::Value = response.json().await.int_err()?;
+
+        if let Some(res_dataset_name) = json_response["datasetName"].as_str() {
+            let dataset_name = odf::DatasetName::try_from(res_dataset_name).int_err()?;
+            return Ok(Some(dataset_name));
+        }
+        Ok(None)
+    }
+}
diff --git a/src/infra/core/src/sync_service_impl.rs b/src/infra/core/src/sync_service_impl.rs
index efa3bb4d0..56d3334ca 100644
--- a/src/infra/core/src/sync_service_impl.rs
+++ b/src/infra/core/src/sync_service_impl.rs
@@ -7,6 +7,7 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
+use std::str::FromStr;
 use std::sync::Arc;
 
 use dill::*;
@@ -695,6 +696,7 @@ impl SyncRef {
 pub trait UrlExt {
     fn ensure_trailing_slash(&mut self);
     fn is_odf_protocol(&self) -> bool;
+    fn as_odf_protocol(&self) -> Result<Url, InternalError>;
     /// Converts from odf+http(s) scheme to plain http(s)
     fn odf_to_transport_protocol(&self) -> Result<Url, InternalError>;
@@ -707,6 +709,11 @@ impl UrlExt for Url {
         }
     }
 
+    fn as_odf_protocol(&self) -> Result<Url, InternalError> {
+        // Prefix the scheme as a whole rather than replacing every "http"
+        // substring, which would also mangle hosts or paths containing it
+        let url_string = format!("odf+{}", self.as_str());
+        Url::from_str(&url_string).int_err()
+    }
+
     fn is_odf_protocol(&self) -> bool {
         self.scheme().starts_with("odf+")
     }
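To make the URL handling concrete: `as_odf_protocol` plus `combine_remote_url` turn a registered repository URL, an optional account, and a dataset name into the final push destination. A standalone sketch of the expected shape (uses only the `url` crate; the host and names are illustrative, not taken from this diff):

```rust
use url::Url;

fn main() {
    // Repo URL as stored in the repository registry
    let repo = Url::parse("https://node.example.com").unwrap();

    // Scheme gains the "odf+" prefix (as_odf_protocol) ...
    let mut target = Url::parse(&format!("odf+{repo}")).unwrap();

    // ... then account (multi-tenant only) and dataset name are appended
    // as path segments (combine_remote_url)
    {
        let mut segments = target.path_segments_mut().unwrap();
        segments.pop_if_empty().push("bar").push("foo");
    }

    println!("{target}"); // odf+https://node.example.com/bar/foo
}
```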