Skip to content

Commit

Permalink
Refactor. Register feature
Browse files Browse the repository at this point in the history
  • Loading branch information
Adriano Santos committed Sep 21, 2023
1 parent 750dc35 commit 8246023
Showing 1 changed file with 126 additions and 67 deletions.
193 changes: 126 additions & 67 deletions spawn-rs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub mod value;

use std::collections::HashMap;

use actor::ActorDefinition;
use actor::{ActorDefinition, ActorSettings};
use eigr::spawn::{
actor_deactivation_strategy::Strategy, Action, Actor, ActorDeactivationStrategy, ActorId,
ActorSnapshotStrategy, ActorState, ActorSystem, Metadata, RegistrationRequest, Registry,
Expand Down Expand Up @@ -160,51 +160,40 @@ impl SpawnClient {
definitions: Vec<ActorDefinition>,
) -> Result<Response, Error> {
debug!("Make registration request to Spawn proxy");
let actors: Vec<Actor> = definitions
.iter()
.map(|actor_def| {
let mut settings = actor_def.to_owned().get_settings().to_owned();

let mut id = ActorId::default();
id.system = system.to_string();
id.name = settings.get_name();

let mut deactivate_timeout_strategy = TimeoutStrategy::default();
deactivate_timeout_strategy.timeout = settings.get_deactivated_timeout();
let actors: Vec<Actor> = self.build_actors(system.to_string(), definitions);
let request: RegistrationRequest = self.build_registration_request(system, actors);

let mut deactivate = ActorDeactivationStrategy::default();
deactivate.strategy = Some(Strategy::Timeout(deactivate_timeout_strategy));
let mut request_buffer: Vec<u8> = Vec::new();
prost::Message::encode(&request, &mut request_buffer).unwrap();

let mut snapshot_timeout_strategy = TimeoutStrategy::default();
snapshot_timeout_strategy.timeout = settings.get_snapshot_timeout();
let res = self
.client
.post(format!(
"http://{}:{}/api/v1/system",
self.proxy_host, self.proxy_port,
))
.header("Content-Type", "application/octet-stream")
.body(request_buffer)
.send()
.await?;

let mut snapshot = ActorSnapshotStrategy::default();
snapshot.strategy = Some(eigr::spawn::actor_snapshot_strategy::Strategy::Timeout(
snapshot_timeout_strategy,
));
debug!("Rust SDK registration response status {:?}", res.status());
info!("Actors register response {:?}", res);

let mut actor_settings = eigr::spawn::ActorSettings::default();
actor_settings.deactivation_strategy = Some(deactivate);
actor_settings.snapshot_strategy = Some(snapshot);
actor_settings.stateful = settings.get_stateful();
Ok(res)
}

match settings.get_kind() {
actor::Kind::NAMED => actor_settings.kind = eigr::spawn::Kind::Named.into(),
actor::Kind::UNNAMED => actor_settings.kind = eigr::spawn::Kind::Unamed.into(),
actor::Kind::POOLED => actor_settings.kind = eigr::spawn::Kind::Pooled.into(),
actor::Kind::PROXY => actor_settings.kind = eigr::spawn::Kind::Proxy.into(),
};
fn build_actors(&mut self, system: String, definitions: Vec<ActorDefinition>) -> Vec<Actor> {
let actors: Vec<Actor> = definitions
.iter()
.map(|actor_def| {
let mut settings = actor_def.to_owned().get_settings().to_owned();

let mut ac: Actor = Actor::default();
ac.id = Some(id);
ac.state = Some(ActorState::default());
ac.settings = Some(actor_settings);

// TODO: use correct metadata
let mut metadata = Metadata::default();
metadata.channel_group = settings.get_channel();
metadata.tags = HashMap::new();
ac.metadata = Some(metadata);
ac.id = self.build_actor_id(system.to_string(), &mut settings);
ac.state = self.build_initial_actor_state(&mut settings);
ac.settings = self.build_actor_settings(&mut settings);
ac.metadata = self.build_actor_metadata(&mut settings);

let actions = actor_def
.clone()
Expand All @@ -223,19 +212,110 @@ impl SpawnClient {
})
.collect::<Vec<Actor>>();

actors
}

fn build_registration_request(
&mut self,
system: String,
actors: Vec<Actor>,
) -> RegistrationRequest {
let mut request: RegistrationRequest = RegistrationRequest::default();
request.service_info = self.build_service_info();
request.actor_system = self.build_actor_system(system, actors);

request
}

fn build_actor_system(&mut self, system: String, actors: Vec<Actor>) -> Option<ActorSystem> {
let mut actor_system = ActorSystem::default();
actor_system.name = system;
actor_system.registry = self.build_registry(actors);

Some(actor_system)
}

fn build_actor_settings(
&mut self,
settings: &mut ActorSettings,
) -> Option<eigr::spawn::ActorSettings> {
let mut definition_settings = settings.clone();
let mut actor_settings = eigr::spawn::ActorSettings::default();

actor_settings.deactivation_strategy = self.build_deactivate_timeout(settings);

actor_settings.snapshot_strategy = self.build_snapshot_timeout(settings);
actor_settings.stateful = definition_settings.get_stateful();

match definition_settings.get_kind() {
actor::Kind::NAMED => actor_settings.kind = eigr::spawn::Kind::Named.into(),
actor::Kind::UNNAMED => actor_settings.kind = eigr::spawn::Kind::Unamed.into(),
actor::Kind::POOLED => actor_settings.kind = eigr::spawn::Kind::Pooled.into(),
actor::Kind::PROXY => actor_settings.kind = eigr::spawn::Kind::Proxy.into(),
};

Some(actor_settings)
}

fn build_actor_id(&mut self, system: String, settings: &mut ActorSettings) -> Option<ActorId> {
let mut id = ActorId::default();
id.system = system.to_string();
id.name = settings.get_name();

Some(id)
}

fn build_initial_actor_state(&mut self, _settings: &mut ActorSettings) -> Option<ActorState> {
Some(ActorState::default())
}

fn build_actor_metadata(&mut self, settings: &mut ActorSettings) -> Option<Metadata> {
let mut metadata = Metadata::default();
metadata.channel_group = settings.get_channel();
metadata.tags = HashMap::new();

Some(metadata)
}

fn build_deactivate_timeout(
&mut self,
settings: &mut ActorSettings,
) -> Option<ActorDeactivationStrategy> {
let mut deactivate_timeout_strategy = TimeoutStrategy::default();
deactivate_timeout_strategy.timeout = settings.get_deactivated_timeout();

let mut deactivate = ActorDeactivationStrategy::default();
deactivate.strategy = Some(Strategy::Timeout(deactivate_timeout_strategy));

Some(deactivate)
}

fn build_snapshot_timeout(
&mut self,
settings: &mut ActorSettings,
) -> Option<ActorSnapshotStrategy> {
let mut snapshot_timeout_strategy = TimeoutStrategy::default();
snapshot_timeout_strategy.timeout = settings.get_snapshot_timeout();

let mut snapshot = ActorSnapshotStrategy::default();
snapshot.strategy = Some(eigr::spawn::actor_snapshot_strategy::Strategy::Timeout(
snapshot_timeout_strategy,
));

Some(snapshot)
}

fn build_registry(&mut self, actors: Vec<Actor>) -> Option<Registry> {
let mut registry: Registry = Registry::default();
for item in actors.iter() {
let name = item.id.to_owned().unwrap().name;
registry.actors.insert(name, item.clone());
}

let mut request_buffer: Vec<u8> = Vec::new();
let mut request: RegistrationRequest = RegistrationRequest::default();

let mut actor_system = ActorSystem::default();
actor_system.name = system;
actor_system.registry = Some(registry);
Some(registry)
}

fn build_service_info(&mut self) -> Option<ServiceInfo> {
let mut service_info = ServiceInfo::default();
service_info.service_name = "spawn-rust-sdk".to_string();
service_info.service_version = "0.1.0".to_string();
Expand All @@ -244,27 +324,6 @@ impl SpawnClient {
service_info.protocol_major_version = 1;
service_info.protocol_minor_version = 1;

request.service_info = Some(service_info);
request.actor_system = Some(actor_system);

prost::Message::encode(&request, &mut request_buffer).unwrap();

info!("");

let res = self
.client
.post(format!(
"http://{}:{}/api/v1/system",
self.proxy_host, self.proxy_port,
))
.header("Content-Type", "application/octet-stream")
.body(request_buffer)
.send()
.await?;

debug!("Rust SDK registration response status {:?}", res.status());
info!("Actors register response {:?}", res);

Ok(res)
Some(service_info)
}
}

0 comments on commit 8246023

Please sign in to comment.