diff --git a/src/naming/message/request/mod.rs b/src/naming/message/request/mod.rs index 43f93fe..9e7ece4 100644 --- a/src/naming/message/request/mod.rs +++ b/src/naming/message/request/mod.rs @@ -1,6 +1,7 @@ mod batch_instance_request; mod instance_request; mod notify_subscriber_request; +mod persistent_instance_request; mod service_list_request; mod service_query_request; mod subscribe_service_request; @@ -8,6 +9,7 @@ mod subscribe_service_request; pub(crate) use batch_instance_request::*; pub(crate) use instance_request::*; pub(crate) use notify_subscriber_request::*; +pub(crate) use persistent_instance_request::*; pub(crate) use service_list_request::*; pub(crate) use service_query_request::*; pub(crate) use subscribe_service_request::*; diff --git a/src/naming/message/request/persistent_instance_request.rs b/src/naming/message/request/persistent_instance_request.rs new file mode 100644 index 0000000..2291762 --- /dev/null +++ b/src/naming/message/request/persistent_instance_request.rs @@ -0,0 +1,85 @@ +use nacos_macro::request; + +use crate::{api::naming::ServiceInstance, common::remote::generate_request_id}; + +#[request(identity = "PersistentInstanceRequest", module = "naming")] +pub(crate) struct PersistentInstanceRequest { + #[serde(rename = "type")] + pub(crate) r_type: String, + pub(crate) instance: ServiceInstance, +} + +impl PersistentInstanceRequest { + pub(crate) fn register( + instance: ServiceInstance, + service_name: Option, + namespace: Option, + group_name: Option, + ) -> Self { + PersistentInstanceRequest::new( + RequestType::Register, + instance, + service_name, + namespace, + group_name, + ) + } + + pub(crate) fn deregister( + instance: ServiceInstance, + service_name: Option, + namespace: Option, + group_name: Option, + ) -> Self { + PersistentInstanceRequest::new( + RequestType::Deregister, + instance, + service_name, + namespace, + group_name, + ) + } + + fn new( + request_type: RequestType, + instance: ServiceInstance, + service_name: Option, + namespace: Option, + group_name: Option, + ) -> Self { + let request_id = Some(generate_request_id()); + Self { + r_type: request_type.request_code(), + instance, + request_id, + namespace, + service_name, + group_name, + ..Default::default() + } + } +} + +pub(crate) enum RequestType { + Register, + Deregister, +} + +impl RequestType { + pub(crate) fn request_code(&self) -> String { + match self { + RequestType::Register => "registerInstance".to_string(), + RequestType::Deregister => "deregisterInstance".to_string(), + } + } +} + +impl From<&str> for RequestType { + fn from(code: &str) -> Self { + match code { + "registerInstance" => RequestType::Register, + "deregisterInstance" => RequestType::Deregister, + _ => RequestType::Register, + } + } +} diff --git a/src/naming/mod.rs b/src/naming/mod.rs index e448882..9f784d1 100644 --- a/src/naming/mod.rs +++ b/src/naming/mod.rs @@ -161,12 +161,15 @@ impl NacosNamingService { } impl NacosNamingService { - async fn register_instance_async( + async fn register_ephemeral_instance_async( &self, service_name: String, group_name: Option, service_instance: ServiceInstance, ) -> Result<()> { + info!( + "register ephemeral instance: service_name: {service_name}, group_name: {group_name:?}" + ); let namespace = Some(self.namespace.clone()); let group_name = group_name .filter(|data| !data.is_empty()) @@ -195,7 +198,7 @@ impl NacosNamingService { .await?; if !body.is_success() { return Err(ErrResult(format!( - "naming service register service failed: resultCode: {}, errorCode:{}, message:{}", + "naming service register ephemeral service failed: resultCode: {}, errorCode:{}, message:{}", body.result_code, body.error_code, body.message.unwrap_or_default() @@ -206,12 +209,60 @@ impl NacosNamingService { Ok(()) } - async fn deregister_instance_async( + async fn register_persistent_instance_async( &self, service_name: String, group_name: Option, service_instance: ServiceInstance, ) -> Result<()> { + info!("register persistent instance: service_name: {service_name}, group_name: {group_name:?}"); + let namespace = Some(self.namespace.clone()); + let group_name = group_name + .filter(|data| !data.is_empty()) + .unwrap_or_else(|| crate::api::constants::DEFAULT_GROUP.to_owned()); + let request = PersistentInstanceRequest::register( + service_instance, + Some(service_name), + namespace, + Some(group_name), + ); + + // automatic request + let auto_request: Arc = Arc::new(request.clone()); + let redo_task = Arc::new(NamingRedoTask::new( + self.nacos_grpc_client.clone(), + auto_request, + )); + + // active redo task + redo_task.active(); + // add redo task to executor + self.redo_task_executor.add_task(redo_task.clone()).await; + + let body = self + .request_to_server::(request) + .await?; + if !body.is_success() { + return Err(ErrResult(format!( + "naming service register persistent service failed: resultCode: {}, errorCode:{}, message:{}", + body.result_code, + body.error_code, + body.message.unwrap_or_default() + ))); + } + + redo_task.frozen(); + Ok(()) + } + + async fn deregister_ephemeral_instance_async( + &self, + service_name: String, + group_name: Option, + service_instance: ServiceInstance, + ) -> Result<()> { + info!("deregister ephemeral instance: service_name: {service_name}, group_name: {group_name:?}"); + let namespace = Some(self.namespace.clone()); let group_name = group_name .filter(|data| !data.is_empty()) @@ -232,7 +283,44 @@ impl NacosNamingService { .await?; if !body.is_success() { - return Err(ErrResult(format!("naming service deregister service failed: resultCode: {}, errorCode:{}, message:{}", body.result_code, body.error_code, body.message.unwrap_or_default()))); + return Err(ErrResult(format!("naming service deregister ephemeral service failed: resultCode: {}, errorCode:{}, message:{}", body.result_code, body.error_code, body.message.unwrap_or_default()))); + } + + // remove redo task from executor + self.redo_task_executor + .remove_task(redo_task.task_key().as_str()) + .await; + Ok(()) + } + + async fn deregister_persistent_instance_async( + &self, + service_name: String, + group_name: Option, + service_instance: ServiceInstance, + ) -> Result<()> { + info!("deregister persistent instance: service_name: {service_name}, group_name: {group_name:?}"); + let namespace = Some(self.namespace.clone()); + let group_name = group_name + .filter(|data| !data.is_empty()) + .unwrap_or_else(|| crate::api::constants::DEFAULT_GROUP.to_owned()); + let request = PersistentInstanceRequest::deregister( + service_instance, + Some(service_name), + namespace, + Some(group_name), + ); + + // automatic request + let auto_request: Arc = Arc::new(request.clone()); + let redo_task = NamingRedoTask::new(self.nacos_grpc_client.clone(), auto_request); + + let body = self + .request_to_server::(request) + .await?; + + if !body.is_success() { + return Err(ErrResult(format!("naming service deregister persistent service failed: resultCode: {}, errorCode:{}, message:{}", body.result_code, body.error_code, body.message.unwrap_or_default()))); } // remove redo task from executor @@ -563,8 +651,21 @@ impl NamingService for NacosNamingService { group_name: Option, service_instance: ServiceInstance, ) -> Result<()> { - let future = self.deregister_instance_async(service_name, group_name, service_instance); - futures::executor::block_on(future) + if service_instance.ephemeral { + let future = self.deregister_ephemeral_instance_async( + service_name, + group_name, + service_instance, + ); + futures::executor::block_on(future) + } else { + let future = self.deregister_persistent_instance_async( + service_name, + group_name, + service_instance, + ); + futures::executor::block_on(future) + } } #[instrument(fields(client_id = &self.client_id, group = group_name), skip_all)] @@ -648,8 +749,15 @@ impl NamingService for NacosNamingService { group_name: Option, service_instance: ServiceInstance, ) -> Result<()> { - let future = self.register_instance_async(service_name, group_name, service_instance); - futures::executor::block_on(future) + if service_instance.ephemeral { + let future = + self.register_ephemeral_instance_async(service_name, group_name, service_instance); + futures::executor::block_on(future) + } else { + let future = + self.register_persistent_instance_async(service_name, group_name, service_instance); + futures::executor::block_on(future) + } } #[instrument(fields(client_id = &self.client_id, group = group_name), skip_all)] @@ -677,8 +785,13 @@ impl NamingService for NacosNamingService { group_name: Option, service_instance: ServiceInstance, ) -> Result<()> { - self.deregister_instance_async(service_name, group_name, service_instance) - .await + if service_instance.ephemeral { + self.deregister_ephemeral_instance_async(service_name, group_name, service_instance) + .await + } else { + self.deregister_persistent_instance_async(service_name, group_name, service_instance) + .await + } } #[instrument(fields(client_id = &self.client_id, group = group_name), skip_all)] @@ -760,8 +873,13 @@ impl NamingService for NacosNamingService { group_name: Option, service_instance: ServiceInstance, ) -> Result<()> { - self.register_instance_async(service_name, group_name, service_instance) - .await + if service_instance.ephemeral { + self.register_ephemeral_instance_async(service_name, group_name, service_instance) + .await + } else { + self.register_persistent_instance_async(service_name, group_name, service_instance) + .await + } } #[instrument(fields(client_id = &self.client_id, group = group_name), skip_all)] @@ -792,7 +910,7 @@ pub(crate) mod tests { #[test] #[ignore] - fn test_register_service() -> Result<()> { + fn test_ephemeral_register_service() -> Result<()> { tracing_subscriber::fmt() .with_thread_names(true) .with_file(true) @@ -816,8 +934,11 @@ pub(crate) mod tests { ..Default::default() }; - let ret = - naming_service.register_instance("test-service".to_string(), None, service_instance); + let ret = naming_service.register_instance( + "test-ephemeral-service".to_string(), + None, + service_instance, + ); info!("response. {ret:?}"); let ten_millis = time::Duration::from_secs(100); @@ -827,7 +948,95 @@ pub(crate) mod tests { #[test] #[ignore] - fn test_register_and_deregister_service() -> Result<()> { + fn test_persistent_register_service() -> Result<()> { + tracing_subscriber::fmt() + .with_thread_names(true) + .with_file(true) + .with_level(true) + .with_line_number(true) + .with_thread_ids(true) + .with_max_level(LevelFilter::DEBUG) + .init(); + + let props = ClientProps::new().server_addr("127.0.0.1:8848"); + + let mut metadata = HashMap::::new(); + metadata.insert("netType".to_string(), "external".to_string()); + metadata.insert("version".to_string(), "2.0".to_string()); + + let naming_service = NacosNamingService::new(props, Arc::new(NoopAuthPlugin::default()))?; + let service_instance = ServiceInstance { + ip: "127.0.0.1".to_string(), + port: 8848, + ephemeral: false, + metadata, + ..Default::default() + }; + + let ret = naming_service.register_instance( + "test-persistent-service".to_string(), + None, + service_instance, + ); + info!("response. {ret:?}"); + + let ten_millis = time::Duration::from_secs(100); + thread::sleep(ten_millis); + Ok(()) + } + + #[test] + #[ignore] + fn test_register_and_deregister_persistent_service() -> Result<()> { + tracing_subscriber::fmt() + .with_thread_names(true) + .with_file(true) + .with_level(true) + .with_line_number(true) + .with_thread_ids(true) + .with_max_level(LevelFilter::DEBUG) + .init(); + + let props = ClientProps::new().server_addr("127.0.0.1:8848"); + + let mut metadata = HashMap::::new(); + metadata.insert("netType".to_string(), "external".to_string()); + metadata.insert("version".to_string(), "2.0".to_string()); + + let naming_service = NacosNamingService::new(props, Arc::new(NoopAuthPlugin::default()))?; + let service_instance = ServiceInstance { + ip: "127.0.0.1".to_string(), + port: 8848, + metadata, + ephemeral: false, + ..Default::default() + }; + + let ret = naming_service.register_instance( + "test-persistent-service".to_string(), + None, + service_instance.clone(), + ); + info!("response. {ret:?}"); + + let ten_millis = time::Duration::from_secs(30); + thread::sleep(ten_millis); + + let ret = naming_service.deregister_instance( + "test-persistent-service".to_string(), + Some(crate::api::constants::DEFAULT_GROUP.to_string()), + service_instance, + ); + info!("response. {ret:?}"); + + let ten_millis = time::Duration::from_secs(30); + thread::sleep(ten_millis); + Ok(()) + } + + #[test] + #[ignore] + fn test_register_and_deregister_ephemeral_service() -> Result<()> { tracing_subscriber::fmt() .with_thread_names(true) .with_file(true) @@ -852,7 +1061,7 @@ pub(crate) mod tests { }; let ret = naming_service.register_instance( - "test-service".to_string(), + "test-ephemeral-service".to_string(), None, service_instance.clone(), ); @@ -862,7 +1071,7 @@ pub(crate) mod tests { thread::sleep(ten_millis); let ret = naming_service.deregister_instance( - "test-service".to_string(), + "test-ephemeral-service".to_string(), Some(crate::api::constants::DEFAULT_GROUP.to_string()), service_instance, ); diff --git a/src/naming/redo/automatic_request/mod.rs b/src/naming/redo/automatic_request/mod.rs index 272fc9a..ce9d3f6 100644 --- a/src/naming/redo/automatic_request/mod.rs +++ b/src/naming/redo/automatic_request/mod.rs @@ -1,3 +1,4 @@ pub(crate) mod batch_instance_request; pub(crate) mod instance_request; +pub(crate) mod persistent_instance_request; pub(crate) mod subscribe_service_request; diff --git a/src/naming/redo/automatic_request/persistent_instance_request.rs b/src/naming/redo/automatic_request/persistent_instance_request.rs new file mode 100644 index 0000000..e3486cc --- /dev/null +++ b/src/naming/redo/automatic_request/persistent_instance_request.rs @@ -0,0 +1,49 @@ +use std::sync::Arc; +use tonic::async_trait; +use tracing::debug; +use tracing::error; +use tracing::instrument; +use tracing::Instrument; + +use crate::common::remote::generate_request_id; +use crate::common::remote::grpc::message::GrpcMessageData; +use crate::common::remote::grpc::NacosGrpcClient; +use crate::naming::message::request::PersistentInstanceRequest; +use crate::naming::message::response::InstanceResponse; +use crate::naming::redo::AutomaticRequest; +use crate::naming::redo::CallBack; + +#[async_trait] +impl AutomaticRequest for PersistentInstanceRequest { + #[instrument(skip_all)] + async fn run(&self, grpc_client: Arc, call_back: CallBack) { + let mut request = self.clone(); + request.request_id = Some(generate_request_id()); + debug!("automatically execute persistent instance request. {request:?}"); + let ret = grpc_client + .send_request::(request) + .in_current_span() + .await; + if let Err(e) = ret { + error!("automatically execute persistent instance request occur an error. {e:?}"); + call_back(Err(e)); + } else { + call_back(Ok(())); + } + } + + fn name(&self) -> String { + let namespace = self.namespace.as_deref().unwrap_or(""); + let service_name = self.service_name.as_deref().unwrap_or(""); + let group_name = self.group_name.as_deref().unwrap_or(""); + + let request_name = format!( + "{}@@{}@@{}@@{}", + namespace, + group_name, + service_name, + PersistentInstanceRequest::identity() + ); + request_name + } +}