diff --git a/backend/gateways/spacegate-plugins/src/plugin/audit_log.rs b/backend/gateways/spacegate-plugins/src/plugin/audit_log.rs
index 11c37eb6b..c552e94a9 100644
--- a/backend/gateways/spacegate-plugins/src/plugin/audit_log.rs
+++ b/backend/gateways/spacegate-plugins/src/plugin/audit_log.rs
@@ -3,7 +3,7 @@ use std::collections::HashMap;
 use std::str::FromStr;
 use std::time::Instant;

-use bios_sdk_invoke::clients::spi_log_client;
+use bios_sdk_invoke::clients::spi_log_client::{self, LogItemAddReq};
 use bios_sdk_invoke::invoke_config::InvokeConfig;
 use bios_sdk_invoke::invoke_enumeration::InvokeModuleKind;
 use bios_sdk_invoke::invoke_initializer;
@@ -141,14 +141,16 @@ impl AuditLogPlugin {
     }

     fn init(&mut self) -> Result<(), TardisError> {
-        if !self.log_url.is_empty() && !self.spi_app_id.is_empty() {
-            if let Ok(jsonpath_inst) = JsonPathInst::from_str(&self.success_json_path).map_err(|e| log::error!("[Plugin.AuditLog] invalid json path:{e}")) {
-                self.jsonpath_inst = Some(jsonpath_inst);
-            } else {
-                self.enabled = false;
-                return Ok(());
-            };
-            self.enabled = true;
+        if let Ok(jsonpath_inst) = JsonPathInst::from_str(&self.success_json_path).map_err(|e| log::error!("[Plugin.AuditLog] invalid json path:{e}")) {
+            self.jsonpath_inst = Some(jsonpath_inst);
+        } else {
+            self.enabled = false;
+            return Err(TardisError::bad_request("[Plugin.AuditLog] plugin is not active, miss log_url or spi_app_id.", ""));
+        };
+        self.enabled = true;
+        if self.log_url.is_empty() || self.spi_app_id.is_empty() {
+            warn!("[Plugin.AuditLog] log_url or spi_app_id is empty!");
+        } else {
             invoke_initializer::init(
                 CODE,
                 InvokeConfig {
@@ -156,11 +158,9 @@ impl AuditLogPlugin {
                     module_urls: HashMap::from([(InvokeModuleKind::Log.to_string(), self.log_url.clone())]),
                 },
             )?;
-            Ok(())
-        } else {
-            self.enabled = false;
-            Err(TardisError::bad_request("[Plugin.AuditLog] plugin is not active, miss log_url or spi_app_id.", ""))
         }
+
+        Ok(())
     }

     fn req(&self, mut req: Request<SgBody>) -> Result<Request<SgBody>, Response<SgBody>> {
@@ -206,31 +206,36 @@
             };
             let tag = self.tag.clone();
-            tokio::task::spawn(async move {
-                match spi_log_client::SpiLogClient::add_with_many_params(
-                    &tag,
-                    &TardisFuns::json.obj_to_string(&content).unwrap_or_default(),
-                    Some(content.to_value()),
-                    None,
-                    None,
-                    Some(content.op),
-                    None,
-                    Some(tardis::chrono::Utc::now().to_rfc3339()),
-                    content.user_id,
-                    None,
-                    &funs,
-                    &spi_ctx,
-                )
-                .await
-                {
-                    Ok(_) => {
-                        log::trace!("[Plugin.AuditLog] add log success")
-                    }
-                    Err(e) => {
-                        log::warn!("[Plugin.AuditLog] failed to add log:{e}")
-                    }
-                };
-            });
+            if !self.log_url.is_empty() && !self.spi_app_id.is_empty() {
+                tokio::task::spawn(async move {
+                    match spi_log_client::SpiLogClient::add(
+                        &LogItemAddReq {
+                            tag,
+                            content: TardisFuns::json.obj_to_string(&content).unwrap_or_default(),
+                            kind: None,
+                            ext: Some(content.to_value()),
+                            key: None,
+                            op: Some(content.op),
+                            rel_key: None,
+                            id: None,
+                            ts: Some(tardis::chrono::Utc::now()),
+                            owner: content.user_id,
+                            own_paths: None,
+                        },
+                        &funs,
+                        &spi_ctx,
+                    )
+                    .await
+                    {
+                        Ok(_) => {
+                            log::trace!("[Plugin.AuditLog] add log success")
+                        }
+                        Err(e) => {
+                            log::warn!("[Plugin.AuditLog] failed to add log:{e}")
+                        }
+                    };
+                });
+            }
         }

         Ok(resp)
     }
diff --git a/backend/middlewares/schedule/src/serv/schedule_job_serv_v2.rs b/backend/middlewares/schedule/src/serv/schedule_job_serv_v2.rs
index c4464ccc9..e40198d3b 100644
--- a/backend/middlewares/schedule/src/serv/schedule_job_serv_v2.rs
+++ b/backend/middlewares/schedule/src/serv/schedule_job_serv_v2.rs
@@ -52,7 +52,7 @@ pub(crate) fn init() {
         if !TardisFuns::web_server().is_running().await {
             tardis::tokio::task::yield_now().await;
         } else {
-            break
+            break;
         }
     }
     // 五秒钟轮询一次
diff --git a/backend/spi/spi-object/src/object_constants.rs b/backend/spi/spi-object/src/object_constants.rs
index 2dd3bd174..f34602dcb 100644
--- a/backend/spi/spi-object/src/object_constants.rs
+++ b/backend/spi/spi-object/src/object_constants.rs
@@ -1,2 +1,3 @@
 pub const DOMAIN_CODE: &str = "spi-object";
 pub const SPI_S3_KIND_CODE: &str = "spi-bs-s3";
+pub const SPI_OBS_KIND_CODE: &str = "spi-bs-obs";
diff --git a/backend/spi/spi-object/src/object_initializer.rs b/backend/spi/spi-object/src/object_initializer.rs
index bb0ec0a08..90516ca6d 100644
--- a/backend/spi/spi-object/src/object_initializer.rs
+++ b/backend/spi/spi-object/src/object_initializer.rs
@@ -41,6 +41,8 @@ pub async fn init_fun(bs_cert: SpiBsCertResp, ctx: &TardisContext, mgr: bool) -> TardisResult<SpiBsInst> {
     let inst = match bs_cert.kind_code.as_str() {
         #[cfg(feature = "spi-s3")]
         object_constants::SPI_S3_KIND_CODE => serv::s3::object_s3_initializer::init(&bs_cert, ctx, mgr).await,
+        #[cfg(feature = "spi-s3")]
+        object_constants::SPI_OBS_KIND_CODE => serv::obs::object_obs_initializer::init(&bs_cert, ctx, mgr).await,
         _ => Err(bs_cert.bs_not_implemented())?,
     }?;
     info!("[BIOS.Object] Fun [{}]({}) initialized", bs_cert.kind_code, bs_cert.conn_uri);
diff --git a/backend/spi/spi-object/src/serv.rs b/backend/spi/spi-object/src/serv.rs
index 1d30ccd4d..1c21f0167 100644
--- a/backend/spi/spi-object/src/serv.rs
+++ b/backend/spi/spi-object/src/serv.rs
@@ -1,2 +1,3 @@
 pub mod object_obj_serv;
+pub mod obs;
 pub mod s3;
diff --git a/backend/spi/spi-object/src/serv/object_obj_serv.rs b/backend/spi/spi-object/src/serv/object_obj_serv.rs
index afd9eb07c..9dd8dfec9 100644
--- a/backend/spi/spi-object/src/serv/object_obj_serv.rs
+++ b/backend/spi/spi-object/src/serv/object_obj_serv.rs
@@ -8,7 +8,8 @@ use tardis::TardisFunsInst;
 use crate::dto::object_dto::{ObjectBatchBuildCreatePresignUrlReq, ObjectCompleteMultipartUploadReq, ObjectInitiateMultipartUploadReq, ObjectObjPresignKind};
 use crate::{object_constants, object_initializer};

-use super::s3;
+use super::s3::S3 as _;
+use super::{obs, s3};

 pub async fn presign_obj_url(
     presign_kind: ObjectObjPresignKind,
@@ -26,7 +27,11 @@ pub async fn presign_obj_url(
     match inst.kind_code() {
         #[cfg(feature = "spi-s3")]
         object_constants::SPI_S3_KIND_CODE => {
-            s3::object_s3_obj_serv::presign_obj_url(presign_kind, object_path, max_width, max_height, exp_secs, private, special, obj_exp, funs, ctx, &inst).await
+            s3::object_s3_obj_serv::S3Service::presign_obj_url(presign_kind, object_path, max_width, max_height, exp_secs, private, special, obj_exp, funs, ctx, &inst).await
+        }
+        #[cfg(feature = "spi-s3")]
+        object_constants::SPI_OBS_KIND_CODE => {
+            obs::object_obs_obj_serv::OBSService::presign_obj_url(presign_kind, object_path, max_width, max_height, exp_secs, private, special, obj_exp, funs, ctx, &inst).await
         }
         kind_code => Err(funs.bs_not_implemented(kind_code)),
     }
@@ -44,7 +49,13 @@ pub async fn batch_get_presign_obj_url(
     let inst = funs.init(ctx, true, object_initializer::init_fun).await?;
     match inst.kind_code() {
         #[cfg(feature = "spi-s3")]
-        object_constants::SPI_S3_KIND_CODE => s3::object_s3_obj_serv::batch_get_presign_obj_url(object_paths, exp_secs, private, special, obj_exp, funs, ctx, &inst).await,
+        object_constants::SPI_S3_KIND_CODE => {
+            s3::object_s3_obj_serv::S3Service::batch_get_presign_obj_url(object_paths, exp_secs, private, special, obj_exp, funs, ctx, &inst).await
+        }
+        #[cfg(feature = "spi-s3")]
+        object_constants::SPI_OBS_KIND_CODE => {
+            obs::object_obs_obj_serv::OBSService::batch_get_presign_obj_url(object_paths, exp_secs, private, special, obj_exp, funs, ctx, &inst).await
+        }
         kind_code => Err(funs.bs_not_implemented(kind_code)),
     }
 }
@@ -54,7 +65,11 @@ pub async fn initiate_multipart_upload(req: ObjectInitiateMultipartUploadReq, fu
     match inst.kind_code() {
         #[cfg(feature = "spi-s3")]
         object_constants::SPI_S3_KIND_CODE => {
-            s3::object_s3_obj_serv::initiate_multipart_upload(&req.object_path, req.content_type, req.private, req.special, funs, ctx, &inst).await
+            s3::object_s3_obj_serv::S3Service::initiate_multipart_upload(&req.object_path, req.content_type, req.private, req.special, funs, ctx, &inst).await
+        }
+        #[cfg(feature = "spi-s3")]
+        object_constants::SPI_OBS_KIND_CODE => {
+            obs::object_obs_obj_serv::OBSService::initiate_multipart_upload(&req.object_path, req.content_type, req.private, req.special, funs, ctx, &inst).await
         }
         kind_code => Err(funs.bs_not_implemented(kind_code)),
     }
@@ -65,7 +80,22 @@ pub async fn batch_build_create_presign_url(req: ObjectBatchBuildCreatePresignUr
     match inst.kind_code() {
         #[cfg(feature = "spi-s3")]
         object_constants::SPI_S3_KIND_CODE => {
-            s3::object_s3_obj_serv::batch_build_create_presign_url(
+            s3::object_s3_obj_serv::S3Service::batch_build_create_presign_url(
+                &req.object_path,
+                &req.upload_id,
+                req.part_number,
+                req.expire_sec,
+                req.private,
+                req.special,
+                funs,
+                ctx,
+                &inst,
+            )
+            .await
+        }
+        #[cfg(feature = "spi-s3")]
+        object_constants::SPI_OBS_KIND_CODE => {
+            obs::object_obs_obj_serv::OBSService::batch_build_create_presign_url(
                 &req.object_path,
                 &req.upload_id,
                 req.part_number,
@@ -87,7 +117,11 @@ pub async fn complete_multipart_upload(req: ObjectCompleteMultipartUploadReq, fu
     match inst.kind_code() {
         #[cfg(feature = "spi-s3")]
         object_constants::SPI_S3_KIND_CODE => {
-            s3::object_s3_obj_serv::complete_multipart_upload(&req.object_path, &req.upload_id, req.parts, req.private, req.special, funs, ctx, &inst).await
+            s3::object_s3_obj_serv::S3Service::complete_multipart_upload(&req.object_path, &req.upload_id, req.parts, req.private, req.special, funs, ctx, &inst).await
+        }
+        #[cfg(feature = "spi-s3")]
+        object_constants::SPI_OBS_KIND_CODE => {
+            obs::object_obs_obj_serv::OBSService::complete_multipart_upload(&req.object_path, &req.upload_id, req.parts, req.private, req.special, funs, ctx, &inst).await
         }
         kind_code => Err(funs.bs_not_implemented(kind_code)),
     }
@@ -104,7 +138,9 @@ pub async fn object_delete(
     let inst = funs.init(ctx, true, object_initializer::init_fun).await?;
     match inst.kind_code() {
         #[cfg(feature = "spi-s3")]
-        object_constants::SPI_S3_KIND_CODE => s3::object_s3_obj_serv::object_delete(&object_path, private, special, obj_exp, funs, ctx, &inst).await,
+        object_constants::SPI_S3_KIND_CODE => s3::object_s3_obj_serv::S3Service::object_delete(&object_path, private, special, obj_exp, funs, ctx, &inst).await,
+        #[cfg(feature = "spi-s3")]
+        object_constants::SPI_OBS_KIND_CODE => obs::object_obs_obj_serv::OBSService::object_delete(&object_path, private, special, obj_exp, funs, ctx, &inst).await,
         kind_code => Err(funs.bs_not_implemented(kind_code)),
     }
 }
@@ -120,7 +156,9 @@ pub async fn batch_object_delete(
     let inst = funs.init(ctx, true, object_initializer::init_fun).await?;
     match inst.kind_code() {
         #[cfg(feature = "spi-s3")]
-        object_constants::SPI_S3_KIND_CODE => s3::object_s3_obj_serv::batch_object_delete(object_paths, private, special, obj_exp, funs, ctx, &inst).await,
+        object_constants::SPI_S3_KIND_CODE => s3::object_s3_obj_serv::S3Service::batch_object_delete(object_paths, private, special, obj_exp, funs, ctx, &inst).await,
+        #[cfg(feature = "spi-s3")]
+        object_constants::SPI_OBS_KIND_CODE => obs::object_obs_obj_serv::OBSService::batch_object_delete(object_paths, private, special, obj_exp, funs, ctx, &inst).await,
         kind_code => Err(funs.bs_not_implemented(kind_code)),
     }
 }
@@ -129,7 +167,9 @@ pub async fn object_copy(from: String, to: String, private: Option<bool>, specia
     let inst = funs.init(ctx, true, object_initializer::init_fun).await?;
     match inst.kind_code() {
         #[cfg(feature = "spi-s3")]
-        object_constants::SPI_S3_KIND_CODE => s3::object_s3_obj_serv::object_copy(&from, &to, private, special, funs, ctx, &inst).await,
+        object_constants::SPI_S3_KIND_CODE => s3::object_s3_obj_serv::S3Service::object_copy(&from, &to, private, special, funs, ctx, &inst).await,
+        #[cfg(feature = "spi-s3")]
+        object_constants::SPI_OBS_KIND_CODE => obs::object_obs_obj_serv::OBSService::object_copy(&from, &to, private, special, funs, ctx, &inst).await,
         kind_code => Err(funs.bs_not_implemented(kind_code)),
     }
 }
@@ -145,7 +185,9 @@ pub async fn object_exist(
     let inst = funs.init(ctx, true, object_initializer::init_fun).await?;
     match inst.kind_code() {
         #[cfg(feature = "spi-s3")]
-        object_constants::SPI_S3_KIND_CODE => s3::object_s3_obj_serv::object_exist(&object_paths, private, special, obj_exp, funs, ctx, &inst).await,
+        object_constants::SPI_S3_KIND_CODE => s3::object_s3_obj_serv::S3Service::object_exist(&object_paths, private, special, obj_exp, funs, ctx, &inst).await,
+        #[cfg(feature = "spi-s3")]
+        object_constants::SPI_OBS_KIND_CODE => obs::object_obs_obj_serv::OBSService::object_exist(&object_paths, private, special, obj_exp, funs, ctx, &inst).await,
         kind_code => Err(funs.bs_not_implemented(kind_code)),
     }
 }
diff --git a/backend/spi/spi-object/src/serv/obs.rs b/backend/spi/spi-object/src/serv/obs.rs
new file mode 100644
index 000000000..013f47c52
--- /dev/null
+++ b/backend/spi/spi-object/src/serv/obs.rs
@@ -0,0 +1,2 @@
+pub mod object_obs_initializer;
+pub mod object_obs_obj_serv;
diff --git a/backend/spi/spi-object/src/serv/obs/object_obs_initializer.rs b/backend/spi/spi-object/src/serv/obs/object_obs_initializer.rs
new file mode 100644
index 000000000..9fd232549
--- /dev/null
+++ b/backend/spi/spi-object/src/serv/obs/object_obs_initializer.rs
@@ -0,0 +1,8 @@
+use bios_basic::spi::{dto::spi_bs_dto::SpiBsCertResp, spi_funs::SpiBsInst};
+use tardis::basic::{dto::TardisContext, result::TardisResult};
+
+use crate::serv::s3;
+
+pub async fn init(bs_cert: &SpiBsCertResp, ctx: &TardisContext, mgr: bool) -> TardisResult<SpiBsInst> {
+    s3::object_s3_initializer::init(bs_cert, ctx, mgr).await
+}
diff --git a/backend/spi/spi-object/src/serv/obs/object_obs_obj_serv.rs b/backend/spi/spi-object/src/serv/obs/object_obs_obj_serv.rs
new file mode 100644
index 000000000..a99a93c3e
--- /dev/null
+++ b/backend/spi/spi-object/src/serv/obs/object_obs_obj_serv.rs
@@ -0,0 +1,16 @@
+use tardis::{
+    basic::{result::TardisResult},
+    os::{
+        os_client::TardisOSClient,
+    },
+};
+
+use crate::serv::s3::S3;
+
+/// OBS requires lifecycle rules to be configured manually.
+pub(crate) struct OBSService;
+impl S3 for OBSService {
+    async fn rebuild_path(_bucket_name: Option<&str>, origin_path: &str, _obj_exp: Option<u32>, _client: &TardisOSClient) -> TardisResult<String> {
+        Ok(origin_path.to_string())
+    }
+}
diff --git a/backend/spi/spi-object/src/serv/s3.rs b/backend/spi/spi-object/src/serv/s3.rs
index 3db23160b..87b4aca0c 100644
--- a/backend/spi/spi-object/src/serv/s3.rs
+++ b/backend/spi/spi-object/src/serv/s3.rs
@@ -1,2 +1,217 @@
 pub mod object_s3_initializer;
 pub mod object_s3_obj_serv;
+
+use std::collections::HashMap;
+
+use bios_basic::spi::{serv::spi_bs_serv::SpiBsServ, spi_funs::SpiBsInst, spi_initializer::common};
+use itertools::Itertools;
+use tardis::{
+    basic::{dto::TardisContext, error::TardisError, result::TardisResult},
+    futures::future::join_all,
+    os::os_client::TardisOSClient,
+    TardisFunsInst,
+};
+
+use crate::dto::object_dto::ObjectObjPresignKind;
+
+pub trait S3 {
+    ///
+    /// obj_exp: sets the object's expiration time, in days
+    async fn presign_obj_url(
+        presign_kind: ObjectObjPresignKind,
+        object_path: &str,
+        _max_width: Option<u32>,
+        _max_height: Option<u32>,
+        exp_secs: u32,
+        private: Option<bool>,
+        special: Option<bool>,
+        obj_exp: Option<u32>,
+        funs: &TardisFunsInst,
+        ctx: &TardisContext,
+        inst: &SpiBsInst,
+    ) -> TardisResult<String> {
+        let bs_inst = inst.inst::<TardisOSClient>();
+        let client = bs_inst.0;
+        let bucket_name = Self::get_bucket_name(private, special, obj_exp.map(|_| true), inst);
+        let path = Self::rebuild_path(bucket_name.as_deref(), object_path, obj_exp, client).await?;
+        match presign_kind {
+            ObjectObjPresignKind::Upload => client.object_create_url(&path, exp_secs, bucket_name.as_deref()).await,
+            ObjectObjPresignKind::Delete => client.object_delete_url(&path, exp_secs, bucket_name.as_deref()).await,
+            ObjectObjPresignKind::View => {
+                if private.unwrap_or(true) || special.unwrap_or(false) {
+                    client.object_get_url(&path, exp_secs, bucket_name.as_deref()).await
+                } else {
+                    let spi_bs = SpiBsServ::get_bs_by_rel(&ctx.ak, None, funs, ctx).await?;
+                    let Some(bucket_name) = bucket_name else {
+                        return Err(TardisError::internal_error(
+                            "Cannot get public bucket name while presign object url, it may due to the lack of isolation_flag",
+                            "500-spi-object-s3-cannot-get-bucket-name",
+                        ));
+                    };
+                    Ok(format!("{}/{}/{}", spi_bs.conn_uri, bucket_name, object_path))
+                }
+            }
+        }
+    }
+
+    async fn object_delete(
+        object_path: &str,
+        private: Option<bool>,
+        special: Option<bool>,
+        obj_exp: Option<u32>,
+        _funs: &TardisFunsInst,
+        _ctx: &TardisContext,
+        inst: &SpiBsInst,
+    ) -> TardisResult<()> {
+        let bs_inst = inst.inst::<TardisOSClient>();
+        let client = bs_inst.0;
+        let bucket_name = Self::get_bucket_name(private, special, obj_exp.map(|_| true), inst);
+        let path = Self::rebuild_path(bucket_name.as_deref(), object_path, obj_exp, client).await?;
+        client.object_delete(&path, bucket_name.as_deref()).await
+    }
+
+    async fn batch_object_delete(
+        object_paths: Vec<String>,
+        private: Option<bool>,
+        special: Option<bool>,
+        obj_exp: Option<u32>,
+        _funs: &TardisFunsInst,
+        _ctx: &TardisContext,
+        inst: &SpiBsInst,
+    ) -> TardisResult<Vec<String>> {
+        let failed_object_paths = join_all(
+            object_paths
+                .into_iter()
+                .map(|object_path| async move {
+                    let result = Self::object_delete(&object_path, private, special, obj_exp, _funs, _ctx, inst).await;
+                    if result.is_err() {
+                        object_path.to_string()
+                    } else {
+                        "".to_string()
+                    }
+                })
+                .collect_vec(),
+        )
+        .await;
+        Ok(failed_object_paths.into_iter().filter(|object_path| !object_path.is_empty()).collect_vec())
+    }
+
+    async fn object_copy(from: &str, to: &str, private: Option<bool>, special: Option<bool>, _funs: &TardisFunsInst, _ctx: &TardisContext, inst: &SpiBsInst) -> TardisResult<()> {
+        let bs_inst = inst.inst::<TardisOSClient>();
+        let client = bs_inst.0;
+        let bucket_name = Self::get_bucket_name(private, special, None, inst);
+        client.object_copy(from, to, bucket_name.as_deref()).await
+    }
+
+    async fn object_exist(
+        object_path: &str,
+        private: Option<bool>,
+        special: Option<bool>,
+        obj_exp: Option<u32>,
+        _funs: &TardisFunsInst,
+        _ctx: &TardisContext,
+        inst: &SpiBsInst,
+    ) -> TardisResult<bool> {
+        let bs_inst = inst.inst::<TardisOSClient>();
+        let client = bs_inst.0;
+        let bucket_name = Self::get_bucket_name(private, special, obj_exp.map(|_| true), inst);
+        let path = Self::rebuild_path(bucket_name.as_deref(), object_path, obj_exp, client).await?;
+        client.object_exist(&path, bucket_name.as_deref()).await
+    }
+
+    async fn batch_get_presign_obj_url(
+        object_paths: Vec<String>,
+        exp_secs: u32,
+        private: Option<bool>,
+        special: Option<bool>,
+        obj_exp: Option<u32>,
+        _funs: &TardisFunsInst,
+        _ctx: &TardisContext,
+        inst: &SpiBsInst,
+    ) -> TardisResult<HashMap<String, String>> {
+        let result = join_all(
+            object_paths
+                .into_iter()
+                .map(|object_path| async move {
+                    let result = Self::presign_obj_url(ObjectObjPresignKind::View, &object_path, None, None, exp_secs, private, special, obj_exp, _funs, _ctx, inst).await;
+                    if let Ok(presign_obj_url) = result {
+                        (object_path.to_string(), presign_obj_url)
+                    } else {
+                        ("".to_string(), "".to_string())
+                    }
+                })
+                .collect_vec(),
+        )
+        .await;
+        Ok(result.into_iter().filter(|(object_path, _)| !object_path.is_empty()).collect::<HashMap<String, String>>())
+    }
+
+    async fn initiate_multipart_upload(
+        object_path: &str,
+        content_type: Option<String>,
+        private: Option<bool>,
+        special: Option<bool>,
+        _funs: &TardisFunsInst,
+        _ctx: &TardisContext,
+        inst: &SpiBsInst,
+    ) -> TardisResult<String> {
+        let bs_inst = inst.inst::<TardisOSClient>();
+        let client = bs_inst.0;
+        let bucket_name = Self::get_bucket_name(private, special, None, inst);
+        client.initiate_multipart_upload(object_path, content_type.as_deref(), bucket_name.as_deref()).await
+    }
+
+    async fn batch_build_create_presign_url(
+        object_path: &str,
+        upload_id: &str,
+        part_number: u32,
+        expire_sec: u32,
+        private: Option<bool>,
+        special: Option<bool>,
+        _funs: &TardisFunsInst,
+        _ctx: &TardisContext,
+        inst: &SpiBsInst,
+    ) -> TardisResult<Vec<String>> {
+        let bs_inst = inst.inst::<TardisOSClient>();
+        let client = bs_inst.0;
+        let bucket_name = Self::get_bucket_name(private, special, None, inst);
+        client.batch_build_create_presign_url(object_path, upload_id, part_number, expire_sec, bucket_name.as_deref()).await
+    }
+
+    async fn complete_multipart_upload(
+        object_path: &str,
+        upload_id: &str,
+        parts: Vec<String>,
+        private: Option<bool>,
+        special: Option<bool>,
+        _funs: &TardisFunsInst,
+        _ctx: &TardisContext,
+        inst: &SpiBsInst,
+    ) -> TardisResult<()> {
+        let bs_inst = inst.inst::<TardisOSClient>();
+        let client = bs_inst.0;
+        let bucket_name = Self::get_bucket_name(private, special, None, inst);
+        client.complete_multipart_upload(object_path, upload_id, parts, bucket_name.as_deref()).await
+    }
+
+    fn get_bucket_name(private: Option<bool>, special: Option<bool>, tamp: Option<bool>, inst: &SpiBsInst) -> Option<String> {
+        let bs_inst = inst.inst::<TardisOSClient>();
+        common::get_isolation_flag_from_ext(bs_inst.1).map(|bucket_name_prefix| {
+            format!(
+                "{}-{}",
+                bucket_name_prefix,
+                if special.unwrap_or(false) {
+                    "spe"
+                } else if tamp.unwrap_or(false) {
+                    "tamp"
+                } else if private.unwrap_or(true) {
+                    "pri"
+                } else {
+                    "pub"
+                }
+            )
+        })
+    }
+
+    async fn rebuild_path(bucket_name: Option<&str>, origin_path: &str, obj_exp: Option<u32>, client: &TardisOSClient) -> TardisResult<String>;
+}
diff --git a/backend/spi/spi-object/src/serv/s3/object_s3_obj_serv.rs b/backend/spi/spi-object/src/serv/s3/object_s3_obj_serv.rs
index 1c440c235..0f942c900 100644
--- a/backend/spi/spi-object/src/serv/s3/object_s3_obj_serv.rs
+++ b/backend/spi/spi-object/src/serv/s3/object_s3_obj_serv.rs
@@ -1,230 +1,46 @@
-use std::collections::HashMap;
-
-use bios_basic::spi::{serv::spi_bs_serv::SpiBsServ, spi_funs::SpiBsInst, spi_initializer::common};
-use itertools::Itertools;
 use tardis::{
-    basic::{dto::TardisContext, error::TardisError, result::TardisResult},
-    futures::future::join_all,
+    basic::{error::TardisError, result::TardisResult},
     os::{
         os_client::TardisOSClient,
         serde_types::{BucketLifecycleConfiguration, Expiration, LifecycleFilter, LifecycleRule},
     },
-    TardisFunsInst,
 };

-use crate::dto::object_dto::ObjectObjPresignKind;
-
-///
-/// obj_exp: 设置obj的过期时间 单位为天
-pub async fn presign_obj_url(
-    presign_kind: ObjectObjPresignKind,
-    object_path: &str,
-    _max_width: Option<u32>,
-    _max_height: Option<u32>,
-    exp_secs: u32,
-    private: Option<bool>,
-    special: Option<bool>,
-    obj_exp: Option<u32>,
-    funs: &TardisFunsInst,
-    ctx: &TardisContext,
-    inst: &SpiBsInst,
-) -> TardisResult<String> {
-    let bs_inst = inst.inst::<TardisOSClient>();
-    let client = bs_inst.0;
-    let bucket_name = get_bucket_name(private, special, obj_exp.map(|_| true), inst);
-    match presign_kind {
-        ObjectObjPresignKind::Upload => {
-            client.object_create_url(&rebuild_path(bucket_name.as_deref(), object_path, obj_exp, client).await?, exp_secs, bucket_name.as_deref()).await
-        }
-        ObjectObjPresignKind::Delete => {
-            client.object_delete_url(&rebuild_path(bucket_name.as_deref(), object_path, obj_exp, client).await?, exp_secs, bucket_name.as_deref()).await
-        }
-        ObjectObjPresignKind::View => {
-            if private.unwrap_or(true) || special.unwrap_or(false) {
-                client.object_get_url(&rebuild_path(bucket_name.as_deref(), object_path, obj_exp, client).await?, exp_secs, bucket_name.as_deref()).await
-            } else {
-                let spi_bs = SpiBsServ::get_bs_by_rel(&ctx.ak, None, funs, ctx).await?;
-                let Some(bucket_name) = bucket_name else {
-                    return Err(TardisError::internal_error(
-                        "Cannot get public bucket name while presign object url, it may due to the lack of isolation_flag",
-                        "500-spi-object-s3-cannot-get-bucket-name",
-                    ));
-                };
-                Ok(format!("{}/{}/{}", spi_bs.conn_uri, bucket_name, object_path))
-            }
-        }
-    }
-}
-
-pub async fn object_delete(
-    object_path: &str,
-    private: Option<bool>,
-    special: Option<bool>,
-    obj_exp: Option<u32>,
-    _funs: &TardisFunsInst,
-    _ctx: &TardisContext,
-    inst: &SpiBsInst,
-) -> TardisResult<()> {
-    let bs_inst = inst.inst::<TardisOSClient>();
-    let client = bs_inst.0;
-    let bucket_name = get_bucket_name(private, special, obj_exp.map(|_| true), inst);
-    client.object_delete(object_path, bucket_name.as_deref()).await
-}
-
-pub async fn batch_object_delete(
-    object_paths: Vec<String>,
-    private: Option<bool>,
-    special: Option<bool>,
-    obj_exp: Option<u32>,
-    _funs: &TardisFunsInst,
-    _ctx: &TardisContext,
-    inst: &SpiBsInst,
-) -> TardisResult<Vec<String>> {
-    let failed_object_paths = join_all(
-        object_paths
-            .into_iter()
-            .map(|object_path| async move {
-                let result = object_delete(&object_path, private, special, obj_exp, _funs, _ctx, inst).await;
-                if result.is_err() {
-                    object_path.to_string()
-                } else {
-                    "".to_string()
-                }
-            })
-            .collect_vec(),
-    )
-    .await;
-    Ok(failed_object_paths.into_iter().filter(|object_path| !object_path.is_empty()).collect_vec())
-}
-
-pub async fn object_copy(from: &str, to: &str, private: Option<bool>, special: Option<bool>, _funs: &TardisFunsInst, _ctx: &TardisContext, inst: &SpiBsInst) -> TardisResult<()> {
-    let bs_inst = inst.inst::<TardisOSClient>();
-    let client = bs_inst.0;
-    let bucket_name = get_bucket_name(private, special, None, inst);
-    client.object_copy(from, to, bucket_name.as_deref()).await
-}
-
-pub async fn object_exist(
-    object_path: &str,
-    private: Option<bool>,
-    special: Option<bool>,
-    obj_exp: Option<u32>,
-    _funs: &TardisFunsInst,
-    _ctx: &TardisContext,
-    inst: &SpiBsInst,
-) -> TardisResult<bool> {
-    let bs_inst = inst.inst::<TardisOSClient>();
-    let client = bs_inst.0;
-    let bucket_name = get_bucket_name(private, special, obj_exp.map(|_| true), inst);
-    client.object_exist(object_path, bucket_name.as_deref()).await
-}
-
-pub async fn batch_get_presign_obj_url(
-    object_paths: Vec<String>,
-    exp_secs: u32,
-    private: Option<bool>,
-    special: Option<bool>,
-    obj_exp: Option<u32>,
-    _funs: &TardisFunsInst,
-    _ctx: &TardisContext,
-    inst: &SpiBsInst,
-) -> TardisResult<HashMap<String, String>> {
-    let result = join_all(
-        object_paths
-            .into_iter()
-            .map(|object_path| async move {
-                let result = presign_obj_url(ObjectObjPresignKind::View, &object_path, None, None, exp_secs, private, special, obj_exp, _funs, _ctx, inst).await;
-                if let Ok(presign_obj_url) = result {
-                    (object_path.to_string(), presign_obj_url)
-                } else {
-                    ("".to_string(), "".to_string())
+pub(crate) struct S3Service;
+impl super::S3 for S3Service {
+    async fn rebuild_path(bucket_name: Option<&str>, origin_path: &str, obj_exp: Option<u32>, client: &TardisOSClient) -> TardisResult<String> {
+        if let Some(obj_exp) = obj_exp {
+            let resp = client.get_lifecycle(bucket_name).await;
+            match resp {
+                Ok(config) => {
+                    let mut rules = config.rules;
+                    let prefix = if let Some(is_have_prefix) = rules
+                        .iter()
+                        .filter(|r| r.status == *"Enabled" && r.expiration.clone().is_some_and(|exp| exp.days.is_some_and(|days| days == obj_exp)))
+                        .filter_map(|r| r.filter.clone())
+                        .find_map(|f| f.prefix)
+                    {
+                        is_have_prefix
+                    } else {
+                        let rand_id = tardis::rand::random::<usize>().to_string();
+                        let prefix = format!("{}/", rand_id);
+                        //add rule
+                        let add_rule = LifecycleRule::builder("Enabled")
+                            .id(&rand_id)
+                            .expiration(Expiration::new(None, Some(obj_exp), None))
+                            .filter(LifecycleFilter::new(None, None, None, Some(prefix.clone()), None))
+                            .build();
+                        rules.push(add_rule);
+                        client.put_lifecycle(bucket_name, BucketLifecycleConfiguration::new(rules)).await?;
+                        prefix
+                    };
+                    Ok(format!("{}{}", prefix, origin_path))
                 }
-            })
-            .collect_vec(),
-    )
-    .await;
-    Ok(result.into_iter().filter(|(object_path, _)| !object_path.is_empty()).collect::<HashMap<String, String>>())
-}
-
-pub async fn initiate_multipart_upload(
-    object_path: &str,
-    content_type: Option<String>,
-    private: Option<bool>,
-    special: Option<bool>,
-    _funs: &TardisFunsInst,
-    _ctx: &TardisContext,
-    inst: &SpiBsInst,
-) -> TardisResult<String> {
-    let bs_inst = inst.inst::<TardisOSClient>();
-    let client = bs_inst.0;
-    let bucket_name = get_bucket_name(private, special, None, inst);
-    client.initiate_multipart_upload(object_path, content_type.as_deref(), bucket_name.as_deref()).await
-}
-
-pub async fn batch_build_create_presign_url(
-    object_path: &str,
-    upload_id: &str,
-    part_number: u32,
-    expire_sec: u32,
-    private: Option<bool>,
-    special: Option<bool>,
-    _funs: &TardisFunsInst,
-    _ctx: &TardisContext,
-    inst: &SpiBsInst,
-) -> TardisResult<Vec<String>> {
-    let bs_inst = inst.inst::<TardisOSClient>();
-    let client = bs_inst.0;
-    let bucket_name = get_bucket_name(private, special, None, inst);
-    client.batch_build_create_presign_url(object_path, upload_id, part_number, expire_sec, bucket_name.as_deref()).await
-}
-
-pub async fn complete_multipart_upload(
-    object_path: &str,
-    upload_id: &str,
-    parts: Vec<String>,
-    private: Option<bool>,
-    special: Option<bool>,
-    _funs: &TardisFunsInst,
-    _ctx: &TardisContext,
-    inst: &SpiBsInst,
-) -> TardisResult<()> {
-    let bs_inst = inst.inst::<TardisOSClient>();
-    let client = bs_inst.0;
-    let bucket_name = get_bucket_name(private, special, None, inst);
-    client.complete_multipart_upload(object_path, upload_id, parts, bucket_name.as_deref()).await
-}
-
-fn get_bucket_name(private: Option<bool>, special: Option<bool>, tamp: Option<bool>, inst: &SpiBsInst) -> Option<String> {
-    let bs_inst = inst.inst::<TardisOSClient>();
-    common::get_isolation_flag_from_ext(bs_inst.1).map(|bucket_name_prefix| {
-        format!(
-            "{}-{}",
-            bucket_name_prefix,
-            if special.unwrap_or(false) {
-                "spe"
-            } else if tamp.unwrap_or(false) {
-                "tamp"
-            } else if private.unwrap_or(true) {
-                "pri"
-            } else {
-                "pub"
-            }
-        )
-    })
-}
-async fn rebuild_path(bucket_name: Option<&str>, origin_path: &str, obj_exp: Option<u32>, client: &TardisOSClient) -> TardisResult<String> {
-    if let Some(obj_exp) = obj_exp {
-        let resp = client.get_lifecycle(bucket_name).await;
-        match resp {
-            Ok(config) => {
-                let mut rules = config.rules;
-                let prefix = if let Some(is_have_prefix) = rules
-                    .iter()
-                    .filter(|r| r.status == *"Enabled" && r.expiration.clone().is_some_and(|exp| exp.days.is_some_and(|days| days == obj_exp)))
-                    .filter_map(|r| r.filter.clone())
-                    .find_map(|f| f.prefix)
-                {
-                    is_have_prefix
-                } else {
+                Err(e) => {
+                    if e.code != "404" {
+                        return Err(TardisError::internal_error(&format!("Bucket {:?} get lifecycle failed", bucket_name), &format!("{:?}", e)));
+                    }
+                    let mut rules = vec![];
                     let rand_id = tardis::rand::random::<usize>().to_string();
                     let prefix = format!("{}/", rand_id);
                     //add rule
@@ -235,29 +51,11 @@ async fn rebuild_path(bucket_name: Option<&str>, origin_path: &str, obj_exp: Opt
                         .build();
                     rules.push(add_rule);
                     client.put_lifecycle(bucket_name, BucketLifecycleConfiguration::new(rules)).await?;
-                    prefix
-                };
-                Ok(format!("{}{}", prefix, origin_path))
-            }
-            Err(e) => {
-                if e.code != "404" {
-                    return Err(TardisError::internal_error(&format!("Bucket {:?} get lifecycle failed", bucket_name), &format!("{:?}", e)));
+                    Ok(format!("{}{}", prefix, origin_path))
                 }
-                let mut rules = vec![];
-                let rand_id = tardis::rand::random::<usize>().to_string();
-                let prefix = format!("{}/", rand_id);
-                //add rule
-                let add_rule = LifecycleRule::builder("Enabled")
-                    .id(&rand_id)
-                    .expiration(Expiration::new(None, Some(obj_exp), None))
-                    .filter(LifecycleFilter::new(None, None, None, Some(prefix.clone()), None))
-                    .build();
-                rules.push(add_rule);
-                client.put_lifecycle(bucket_name, BucketLifecycleConfiguration::new(rules)).await?;
-                Ok(format!("{}{}", prefix, origin_path))
             }
+        } else {
+            Ok(origin_path.to_string())
         }
-    } else {
-        Ok(origin_path.to_string())
     }
 }
diff --git a/backend/spi/spi-stats/tests/test_stats_metric.rs b/backend/spi/spi-stats/tests/test_stats_metric.rs
index 3a8d6abd0..9d6f13948 100644
--- a/backend/spi/spi-stats/tests/test_stats_metric.rs
+++ b/backend/spi/spi-stats/tests/test_stats_metric.rs
@@ -262,7 +262,7 @@ pub async fn test_metric_query(client: &mut TestHttpClient) -> TardisResult<()> {
     assert_eq!(resp.group.as_object().unwrap()["ROLLUP"]["plan_hours__sum"], "200");
     assert_eq!(resp.group.as_object().unwrap()["hangzhou"]["key__count"], 8);
     assert_eq!(resp.group.as_object().unwrap()["hangzhou"]["plan_hours__sum"], "160");
-
+    // todo The following test case is problematic
     // test simple two dimensions
     let resp: StatsQueryMetricsResp = client
diff --git a/backend/supports/iam/src/basic/serv/iam_role_serv.rs b/backend/supports/iam/src/basic/serv/iam_role_serv.rs
index af983ad69..adc014334 100644
--- a/backend/supports/iam/src/basic/serv/iam_role_serv.rs
+++ b/backend/supports/iam/src/basic/serv/iam_role_serv.rs
@@ -468,7 +468,7 @@ impl IamRoleServ {
         let kind = if scope_level == RBUM_SCOPE_LEVEL_APP { IamRoleKind::App } else { IamRoleKind::Tenant };
         if let Some(base_role) = Self::find_one_item(
             &IamRoleFilterReq {
-                kind: Some(kind),
+                kind: Some(kind), // in_embed: Some(true),
                 extend_role_id: Some(extend_role_id.to_string()),
                 ..Default::default()