From 824602376b5cb1af1cc13ee93768c78246e49997 Mon Sep 17 00:00:00 2001 From: Adriano Santos Date: Thu, 21 Sep 2023 11:30:31 -0300 Subject: [PATCH] Refactor. Register feature --- spawn-rs/src/lib.rs | 193 +++++++++++++++++++++++++++++--------------- 1 file changed, 126 insertions(+), 67 deletions(-) diff --git a/spawn-rs/src/lib.rs b/spawn-rs/src/lib.rs index c63ec48..f8d5f3d 100644 --- a/spawn-rs/src/lib.rs +++ b/spawn-rs/src/lib.rs @@ -13,7 +13,7 @@ pub mod value; use std::collections::HashMap; -use actor::ActorDefinition; +use actor::{ActorDefinition, ActorSettings}; use eigr::spawn::{ actor_deactivation_strategy::Strategy, Action, Actor, ActorDeactivationStrategy, ActorId, ActorSnapshotStrategy, ActorState, ActorSystem, Metadata, RegistrationRequest, Registry, @@ -160,51 +160,40 @@ impl SpawnClient { definitions: Vec, ) -> Result { debug!("Make registration request to Spawn proxy"); - let actors: Vec = definitions - .iter() - .map(|actor_def| { - let mut settings = actor_def.to_owned().get_settings().to_owned(); - - let mut id = ActorId::default(); - id.system = system.to_string(); - id.name = settings.get_name(); - - let mut deactivate_timeout_strategy = TimeoutStrategy::default(); - deactivate_timeout_strategy.timeout = settings.get_deactivated_timeout(); + let actors: Vec = self.build_actors(system.to_string(), definitions); + let request: RegistrationRequest = self.build_registration_request(system, actors); - let mut deactivate = ActorDeactivationStrategy::default(); - deactivate.strategy = Some(Strategy::Timeout(deactivate_timeout_strategy)); + let mut request_buffer: Vec = Vec::new(); + prost::Message::encode(&request, &mut request_buffer).unwrap(); - let mut snapshot_timeout_strategy = TimeoutStrategy::default(); - snapshot_timeout_strategy.timeout = settings.get_snapshot_timeout(); + let res = self + .client + .post(format!( + "http://{}:{}/api/v1/system", + self.proxy_host, self.proxy_port, + )) + .header("Content-Type", "application/octet-stream") + .body(request_buffer) + .send() + .await?; - let mut snapshot = ActorSnapshotStrategy::default(); - snapshot.strategy = Some(eigr::spawn::actor_snapshot_strategy::Strategy::Timeout( - snapshot_timeout_strategy, - )); + debug!("Rust SDK registration response status {:?}", res.status()); + info!("Actors register response {:?}", res); - let mut actor_settings = eigr::spawn::ActorSettings::default(); - actor_settings.deactivation_strategy = Some(deactivate); - actor_settings.snapshot_strategy = Some(snapshot); - actor_settings.stateful = settings.get_stateful(); + Ok(res) + } - match settings.get_kind() { - actor::Kind::NAMED => actor_settings.kind = eigr::spawn::Kind::Named.into(), - actor::Kind::UNNAMED => actor_settings.kind = eigr::spawn::Kind::Unamed.into(), - actor::Kind::POOLED => actor_settings.kind = eigr::spawn::Kind::Pooled.into(), - actor::Kind::PROXY => actor_settings.kind = eigr::spawn::Kind::Proxy.into(), - }; + fn build_actors(&mut self, system: String, definitions: Vec) -> Vec { + let actors: Vec = definitions + .iter() + .map(|actor_def| { + let mut settings = actor_def.to_owned().get_settings().to_owned(); let mut ac: Actor = Actor::default(); - ac.id = Some(id); - ac.state = Some(ActorState::default()); - ac.settings = Some(actor_settings); - - // TODO: use correct metadata - let mut metadata = Metadata::default(); - metadata.channel_group = settings.get_channel(); - metadata.tags = HashMap::new(); - ac.metadata = Some(metadata); + ac.id = self.build_actor_id(system.to_string(), &mut settings); + ac.state = self.build_initial_actor_state(&mut settings); + ac.settings = self.build_actor_settings(&mut settings); + ac.metadata = self.build_actor_metadata(&mut settings); let actions = actor_def .clone() @@ -223,19 +212,110 @@ impl SpawnClient { }) .collect::>(); + actors + } + + fn build_registration_request( + &mut self, + system: String, + actors: Vec, + ) -> RegistrationRequest { + let mut request: RegistrationRequest = RegistrationRequest::default(); + request.service_info = self.build_service_info(); + request.actor_system = self.build_actor_system(system, actors); + + request + } + + fn build_actor_system(&mut self, system: String, actors: Vec) -> Option { + let mut actor_system = ActorSystem::default(); + actor_system.name = system; + actor_system.registry = self.build_registry(actors); + + Some(actor_system) + } + + fn build_actor_settings( + &mut self, + settings: &mut ActorSettings, + ) -> Option { + let mut definition_settings = settings.clone(); + let mut actor_settings = eigr::spawn::ActorSettings::default(); + + actor_settings.deactivation_strategy = self.build_deactivate_timeout(settings); + + actor_settings.snapshot_strategy = self.build_snapshot_timeout(settings); + actor_settings.stateful = definition_settings.get_stateful(); + + match definition_settings.get_kind() { + actor::Kind::NAMED => actor_settings.kind = eigr::spawn::Kind::Named.into(), + actor::Kind::UNNAMED => actor_settings.kind = eigr::spawn::Kind::Unamed.into(), + actor::Kind::POOLED => actor_settings.kind = eigr::spawn::Kind::Pooled.into(), + actor::Kind::PROXY => actor_settings.kind = eigr::spawn::Kind::Proxy.into(), + }; + + Some(actor_settings) + } + + fn build_actor_id(&mut self, system: String, settings: &mut ActorSettings) -> Option { + let mut id = ActorId::default(); + id.system = system.to_string(); + id.name = settings.get_name(); + + Some(id) + } + + fn build_initial_actor_state(&mut self, _settings: &mut ActorSettings) -> Option { + Some(ActorState::default()) + } + + fn build_actor_metadata(&mut self, settings: &mut ActorSettings) -> Option { + let mut metadata = Metadata::default(); + metadata.channel_group = settings.get_channel(); + metadata.tags = HashMap::new(); + + Some(metadata) + } + + fn build_deactivate_timeout( + &mut self, + settings: &mut ActorSettings, + ) -> Option { + let mut deactivate_timeout_strategy = TimeoutStrategy::default(); + deactivate_timeout_strategy.timeout = settings.get_deactivated_timeout(); + + let mut deactivate = ActorDeactivationStrategy::default(); + deactivate.strategy = Some(Strategy::Timeout(deactivate_timeout_strategy)); + + Some(deactivate) + } + + fn build_snapshot_timeout( + &mut self, + settings: &mut ActorSettings, + ) -> Option { + let mut snapshot_timeout_strategy = TimeoutStrategy::default(); + snapshot_timeout_strategy.timeout = settings.get_snapshot_timeout(); + + let mut snapshot = ActorSnapshotStrategy::default(); + snapshot.strategy = Some(eigr::spawn::actor_snapshot_strategy::Strategy::Timeout( + snapshot_timeout_strategy, + )); + + Some(snapshot) + } + + fn build_registry(&mut self, actors: Vec) -> Option { let mut registry: Registry = Registry::default(); for item in actors.iter() { let name = item.id.to_owned().unwrap().name; registry.actors.insert(name, item.clone()); } - let mut request_buffer: Vec = Vec::new(); - let mut request: RegistrationRequest = RegistrationRequest::default(); - - let mut actor_system = ActorSystem::default(); - actor_system.name = system; - actor_system.registry = Some(registry); + Some(registry) + } + fn build_service_info(&mut self) -> Option { let mut service_info = ServiceInfo::default(); service_info.service_name = "spawn-rust-sdk".to_string(); service_info.service_version = "0.1.0".to_string(); @@ -244,27 +324,6 @@ impl SpawnClient { service_info.protocol_major_version = 1; service_info.protocol_minor_version = 1; - request.service_info = Some(service_info); - request.actor_system = Some(actor_system); - - prost::Message::encode(&request, &mut request_buffer).unwrap(); - - info!(""); - - let res = self - .client - .post(format!( - "http://{}:{}/api/v1/system", - self.proxy_host, self.proxy_port, - )) - .header("Content-Type", "application/octet-stream") - .body(request_buffer) - .send() - .await?; - - debug!("Rust SDK registration response status {:?}", res.status()); - info!("Actors register response {:?}", res); - - Ok(res) + Some(service_info) } }