diff --git a/Cargo.lock b/Cargo.lock index 189e8a5..56d2062 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -778,7 +778,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "21dc42e00223fc37204bd4aa177e69420c604ca4a183209a8f9de30c6d934698" dependencies = [ "bytes", - "prost-derive", + "prost-derive 0.11.6", +] + +[[package]] +name = "prost" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4fdd22f3b9c31b53c060df4a0613a1c7f062d4115a2b984dd15b1858f7e340d" +dependencies = [ + "bytes", + "prost-derive 0.12.1", ] [[package]] @@ -795,8 +805,8 @@ dependencies = [ "multimap", "petgraph", "prettyplease", - "prost", - "prost-types", + "prost 0.11.6", + "prost-types 0.11.6", "regex", "syn 1.0.107", "tempfile", @@ -816,6 +826,19 @@ dependencies = [ "syn 1.0.107", ] +[[package]] +name = "prost-derive" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "265baba7fabd416cf5078179f7d2cbeca4ce7a9041111900675ea7c4cb8a4c32" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn 2.0.33", +] + [[package]] name = "prost-types" version = "0.11.6" @@ -823,7 +846,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a5e0526209433e96d83d750dd81a99118edbc55739e7e61a46764fd2ad537788" dependencies = [ "bytes", - "prost", + "prost 0.11.6", +] + +[[package]] +name = "prost-types" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e081b29f63d83a4bc75cfc9f3fe424f9156cf92d8a4f0c9407cce9a1b67327cf" +dependencies = [ + "prost 0.12.1", ] [[package]] @@ -1136,9 +1168,12 @@ name = "spawn-examples" version = "0.1.0" dependencies = [ "env_logger", - "prost-types", + "log", + "prost 0.12.1", + "prost-types 0.12.1", "rocket", "spawn-rs", + "tonic-build", ] [[package]] @@ -1146,8 +1181,9 @@ name = "spawn-rs" version = "0.1.0" dependencies = [ "env_logger", - "prost", - "prost-types", + "log", + "prost 0.12.1", + "prost-types 0.12.1", "rocket", "tonic-build", ] diff --git a/spawn-examples/Cargo.toml b/spawn-examples/Cargo.toml index 3c56eff..061c65c 100644 --- a/spawn-examples/Cargo.toml +++ b/spawn-examples/Cargo.toml @@ -5,6 +5,11 @@ version = "0.1.0" [dependencies] env_logger = "0.10.0" -prost-types = "0.11" +log = {version = "0.4.8", features = ["std"]} +prost = "0.12.1" +prost-types = "0.12.1" rocket = "=0.5.0-rc.3" spawn-rs = {path = "../spawn-rs"} + +[build-dependencies] +tonic-build = "0.8" diff --git a/spawn-examples/build.rs b/spawn-examples/build.rs new file mode 100644 index 0000000..3f53a23 --- /dev/null +++ b/spawn-examples/build.rs @@ -0,0 +1,7 @@ +pub(crate) fn main() -> Result<(), Box> { + tonic_build::configure() + .build_server(false) + .out_dir("src/domain") + .compile(&["proto/domain.proto"], &["proto"])?; + Ok(()) +} diff --git a/spawn-examples/proto/domain.proto b/spawn-examples/proto/domain.proto new file mode 100644 index 0000000..d583bbc --- /dev/null +++ b/spawn-examples/proto/domain.proto @@ -0,0 +1,15 @@ +syntax = "proto3"; + +package domain; + +message State { + repeated string languages = 1; +} + +message Request { + string language = 1; +} + +message Reply { + string response = 1; +} \ No newline at end of file diff --git a/spawn-examples/src/domain/domain.rs b/spawn-examples/src/domain/domain.rs new file mode 100644 index 0000000..624afcb --- /dev/null +++ b/spawn-examples/src/domain/domain.rs @@ -0,0 +1,18 @@ +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct State { + #[prost(string, repeated, tag = "1")] + pub languages: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Request { + #[prost(string, tag = "1")] + pub language: ::prost::alloc::string::String, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Reply { + #[prost(string, tag = "1")] + pub response: ::prost::alloc::string::String, +} diff --git a/spawn-examples/src/domain/mod.rs b/spawn-examples/src/domain/mod.rs new file mode 100644 index 0000000..d7abca1 --- /dev/null +++ b/spawn-examples/src/domain/mod.rs @@ -0,0 +1 @@ +pub mod domain; diff --git a/spawn-examples/src/joe.rs b/spawn-examples/src/joe.rs index 4691d1c..043500f 100644 --- a/spawn-examples/src/joe.rs +++ b/spawn-examples/src/joe.rs @@ -1,10 +1,26 @@ -use prost_types::Any; - +use spawn_examples::domain::domain::{Reply, Request}; use spawn_rs::{context::Context, value::Value, Message}; -pub fn sum(_msg: Message, ctx: Context) -> Value { - return Value::new() - .state(ctx.state().clone()) - .response(Any::default()) - .to_owned(); +use log::info; + +pub fn set_language(msg: Message, ctx: Context) -> Value { + info!("Actor msg: {:?}", msg); + let value: Value = match msg.body::() { + Ok(request) => { + let lang = request.language; + info!("Setlanguage To: {:?}", lang); + let reply = Reply::default(); + + Value::new() + .state(ctx.state().clone()) + .response(&Reply::default()) + .to_owned() + } + Err(e) => Value::new() + .state(ctx.state().clone()) + //.response(Any::default()) + .to_owned(), + }; + + return value; } diff --git a/spawn-examples/src/lib.rs b/spawn-examples/src/lib.rs new file mode 100644 index 0000000..95ab638 --- /dev/null +++ b/spawn-examples/src/lib.rs @@ -0,0 +1,5 @@ +#[macro_use] +extern crate log; +extern crate prost_types; + +pub mod domain; diff --git a/spawn-examples/src/main.rs b/spawn-examples/src/main.rs index d25a24a..66f2cb9 100644 --- a/spawn-examples/src/main.rs +++ b/spawn-examples/src/main.rs @@ -4,14 +4,12 @@ extern crate rocket; mod joe; -use joe::sum; +use joe::set_language; use spawn_rs::actor::{ActorDefinition, ActorSettings, Kind}; use spawn_rs::spawn::Spawn; #[rocket::main] async fn main() -> Result<(), rocket::Error> { - env_logger::init_from_env(env_logger::Env::new().default_filter_or("debug")); - Spawn::new() .create("spawn-system".to_string()) .with_actor( @@ -25,7 +23,7 @@ async fn main() -> Result<(), rocket::Error> { .snapshot_timeout(10000) .to_owned(), ) - .with_action("sum".to_owned(), sum), + .with_action("sum".to_owned(), set_language), ) .start() .await?; diff --git a/spawn-rs/Cargo.toml b/spawn-rs/Cargo.toml index 5d0b0f5..8050207 100644 --- a/spawn-rs/Cargo.toml +++ b/spawn-rs/Cargo.toml @@ -15,8 +15,9 @@ version = "0.1.0" [dependencies] env_logger = "0.10.0" -prost = "0.11" -prost-types = "0.11" +log = {version = "0.4.8", features = ["std"]} +prost = "0.12.1" +prost-types = "0.12.1" rocket = "=0.5.0-rc.3" [build-dependencies] diff --git a/spawn-rs/src/handler/actor_router.rs b/spawn-rs/src/handler/actor_router.rs index dcb37f0..8642322 100644 --- a/spawn-rs/src/handler/actor_router.rs +++ b/spawn-rs/src/handler/actor_router.rs @@ -2,6 +2,15 @@ use std::collections::HashMap; use crate::actor::ActorDefinition; +use crate::context::Context as ActorContext; +use crate::eigr::spawn::actor_invocation::Payload; +use crate::eigr::spawn::{ActorId, ActorInvocation, ActorInvocationResponse, Context, Noop}; +use crate::value::Value; +use crate::Message as ActorMessage; + +use log::{debug, info}; +use prost_types::Any; + #[derive()] pub struct Handler { actors: HashMap, @@ -35,4 +44,50 @@ impl Handler { pub fn get_actors(&mut self) -> &mut HashMap { &mut self.actors } + + pub fn handle(&mut self, request: ActorInvocation) -> ActorInvocationResponse { + info!("Received ActorInvocation request."); + debug!( + "Handle ActorInvocation with incoming request: {:?}", + request + ); + + let actor_id: ActorId = request.actor.unwrap(); + let action: String = request.action_name; + let context: Context = request.current_context.unwrap(); + + let response = ActorInvocationResponse::default(); + + if self.actors.contains_key(actor_id.name.as_str()) { + debug!( + "Forward ActorInvocation to Actor: {:?}", + actor_id.name.as_str() + ); + // handle response + let mut actor_def = self.actors.get(actor_id.name.as_str()).unwrap().clone(); + + if actor_def.get_actions().contains_key(action.as_str()) { + let function: &fn(ActorMessage, ActorContext) -> Value = + actor_def.get_actions().get(action.as_str()).unwrap(); + + let payload = match request.payload { + Some(Payload::Value(value)) => value, + Some(Payload::Noop(_)) => Any::default(), + None => Any::default(), + }; + + let mut msg: ActorMessage = ActorMessage::new(); + msg.set_body(payload); + + let ctx: ActorContext = ActorContext::new(); + + let result: Value = (function)(msg, ctx); + + // TODO: build correct response + return response; + } + } + + return response; + } } diff --git a/spawn-rs/src/lib.rs b/spawn-rs/src/lib.rs index 6bf9a48..5d9f614 100644 --- a/spawn-rs/src/lib.rs +++ b/spawn-rs/src/lib.rs @@ -1,5 +1,6 @@ #[macro_use] extern crate rocket; +extern crate log; extern crate prost_types; mod eigr; @@ -11,18 +12,34 @@ pub mod serializer; pub mod spawn; pub mod value; +use prost::DecodeError; use prost_types::Any; +// fn to_any(message: &T) -> Any +// where +// T: prost::Message, +// { +// Any { +// type_url: T::type_url().to_string(), +// value: message.encode_to_vec(), +// } +// } + +fn from_any(message: &Any) -> Result +where + T: prost::Message + Default, +{ + T::decode(message.value.as_slice()) +} + #[derive(Debug, Clone)] pub struct Message { - action: String, body: Any, } impl Default for Message { fn default() -> Message { Message { - action: String::from(""), body: Any::default(), } } @@ -33,11 +50,14 @@ impl Message { Default::default() } - pub fn action(&self) -> &str { - &self.action + pub fn body(&self) -> Result + where + T: prost::Message + Default, + { + from_any(&self.body) } - pub fn body(&self) -> &Any { - &self.body + pub fn set_body(&mut self, message: Any) { + self.body = message } } diff --git a/spawn-rs/src/spawn.rs b/spawn-rs/src/spawn.rs index 33a05fe..f15812b 100644 --- a/spawn-rs/src/spawn.rs +++ b/spawn-rs/src/spawn.rs @@ -1,9 +1,6 @@ use crate::actor::ActorDefinition; -use crate::context::Context as ActorContext; -use crate::eigr::spawn::{ActorId, ActorInvocation, ActorInvocationResponse, Context}; +use crate::eigr::spawn::{ActorInvocation, ActorInvocationResponse}; use crate::handler::actor_router::Handler; -use crate::value::Value; -use crate::Message as ActorMessage; use prost::Message; use rocket::post; @@ -24,35 +21,12 @@ async fn handle(data: Data<'_>, handler: &State>>) -> io::Res let bytes = data.open(2048.megabytes()).into_bytes().await?; let request_handler = Arc::clone(&handler); - let actors = request_handler.lock().unwrap().get_actors().clone(); let buffer = bytes.into_inner(); let request: ActorInvocation = ActorInvocation::decode(&mut Cursor::new(buffer)).unwrap(); - let actor_id: ActorId = request.actor.unwrap(); - let action: String = request.action_name; - let context: Context = request.current_context.unwrap(); let mut buf: Vec = Vec::new(); - let response = ActorInvocationResponse::default(); - - if actors.contains_key(actor_id.name.as_str()) { - // handle response - let mut actor_def = actors.get(actor_id.name.as_str()).unwrap().clone(); - - if actor_def.get_actions().contains_key(action.as_str()) { - let function: &fn(ActorMessage, ActorContext) -> Value = - actor_def.get_actions().get(action.as_str()).unwrap(); - - let msg: ActorMessage = ActorMessage::new(); - let ctx: ActorContext = ActorContext::new(); - - let result: Value = (function)(msg, ctx); - } - - response.encode(&mut buf).unwrap(); - return Ok(buf); - } - + let response: ActorInvocationResponse = request_handler.lock().unwrap().handle(request); response.encode(&mut buf).unwrap(); return Ok(buf); } @@ -88,6 +62,8 @@ impl Spawn { } pub async fn start(&mut self) -> Result<(), rocket::Error> { + env_logger::init_from_env(env_logger::Env::new().default_filter_or("debug")); + let figment = rocket::Config::figment().merge(("port", 8093)); let mut handler: Handler = Handler::new(); handler.add_actors(self.actors.as_mut()); diff --git a/spawn-rs/src/value.rs b/spawn-rs/src/value.rs index 12b3d2b..58e1228 100644 --- a/spawn-rs/src/value.rs +++ b/spawn-rs/src/value.rs @@ -29,8 +29,11 @@ impl Value { &self.state } - pub fn response(&mut self, response: Any) -> &mut Value { - self.response = response; + pub fn response(&mut self, message: &T) -> &mut Value + where + T: prost::Name, + { + self.response = Any::from_msg(message).unwrap(); self }