|
2 | 2 | // It is important to maintain the same structure as in the proto.
|
3 | 3 | pub mod wasmcloud {
|
4 | 4 | pub mod ctl {
|
5 |
| - include!(concat!(env!("OUT_DIR"), "/wasmcloud.types.rs")); |
| 5 | + include!("generated/wasmcloud.types.rs"); |
6 | 6 | }
|
7 | 7 | }
|
8 | 8 |
|
9 | 9 | #[cfg(test)]
|
10 | 10 | mod test {
|
11 | 11 | use super::wasmcloud;
|
| 12 | + use crate::wasmcloud::ctl::{ControlInterfaceServiceClient, ControlInterfaceServiceServer}; |
12 | 13 |
|
13 |
| - use bytes::BytesMut; |
14 |
| - use prost::Message; |
15 |
| - use tokio::task::JoinSet; |
16 |
| - |
17 |
| - // (Host-side) implementation of the ControlInterface |
18 |
| - pub trait ControlServer { |
19 |
| - fn start_component( |
20 |
| - &self, |
21 |
| - request: wasmcloud::ctl::StartComponentRequest, |
22 |
| - ) -> impl futures::Future<Output = Result<wasmcloud::ctl::StartComponentResponse, String>> + Send; |
23 |
| - } |
24 |
| - pub struct WasmcloudControlServer {} |
25 |
| - impl ControlServer for WasmcloudControlServer { |
| 14 | + struct Host; |
| 15 | + impl ControlInterfaceServiceServer for Host { |
26 | 16 | async fn start_component(
|
27 | 17 | &self,
|
28 | 18 | request: wasmcloud::ctl::StartComponentRequest,
|
29 |
| - ) -> Result<wasmcloud::ctl::StartComponentResponse, String> { |
| 19 | + ) -> anyhow::Result<wasmcloud::ctl::StartComponentResponse> { |
30 | 20 | let component_id = format!("{}-{}", request.reference, request.max_instances);
|
31 | 21 | Ok(wasmcloud::ctl::StartComponentResponse {
|
32 | 22 | component_id,
|
33 | 23 | message: "everything went smoothly and we're going to be okay".to_string(),
|
34 | 24 | })
|
35 | 25 | }
|
36 | 26 | }
|
37 |
| - async fn start_server<S>(server: S, client: async_nats::Client) -> Result<JoinSet<()>, String> |
38 |
| - where |
39 |
| - S: ControlServer + Send + 'static, |
40 |
| - { |
41 |
| - let mut sub = client |
42 |
| - .subscribe("wasmbus.ctl.proto.>") |
43 |
| - .await |
44 |
| - .expect("to subscribe to start component"); |
45 |
| - use futures::StreamExt; |
46 |
| - let mut tasks = JoinSet::new(); |
47 |
| - tasks.spawn(async move { |
48 |
| - while let Some(message) = sub.next().await { |
49 |
| - if message.subject.contains("start.component") { |
50 |
| - let request = wasmcloud::ctl::StartComponentRequest::decode(message.payload) |
51 |
| - .expect("to decode request"); |
52 |
| - let reply = server |
53 |
| - .start_component(request) |
54 |
| - .await |
55 |
| - .expect("to start component"); |
56 |
| - if let Some(reply_to) = message.reply { |
57 |
| - let mut buf = BytesMut::with_capacity(reply.encoded_len()); |
58 |
| - reply |
59 |
| - .encode(&mut buf) |
60 |
| - .expect("to encode without reaching capacity"); |
61 |
| - client |
62 |
| - .publish(reply_to, buf.into()) |
63 |
| - .await |
64 |
| - .expect("to send reply"); |
65 |
| - } |
66 |
| - } |
67 |
| - } |
68 |
| - }); |
69 | 27 |
|
70 |
| - Ok(tasks) |
71 |
| - } |
| 28 | + // Test and validate host functionality without NATS |
| 29 | + #[tokio::test] |
| 30 | + async fn unit_test_proto() { |
| 31 | + let host = Host {}; |
| 32 | + let request = wasmcloud::ctl::StartComponentRequest { |
| 33 | + reference: "test".to_string(), |
| 34 | + max_instances: 1, |
| 35 | + }; |
| 36 | + let response = host |
| 37 | + .start_component(request) |
| 38 | + .await |
| 39 | + .expect("host to handle request"); |
72 | 40 |
|
73 |
| - // (wash/wadm/client-side) implementation of the ControlInterface |
74 |
| - pub trait ControlClient { |
75 |
| - async fn start_component( |
76 |
| - &self, |
77 |
| - request: wasmcloud::ctl::StartComponentRequest, |
78 |
| - ) -> Result<wasmcloud::ctl::StartComponentResponse, String>; |
79 |
| - } |
80 |
| - pub struct WasmcloudControlClient { |
81 |
| - client: async_nats::Client, |
82 |
| - } |
83 |
| - impl WasmcloudControlClient { |
84 |
| - pub fn new(client: async_nats::Client) -> Self { |
85 |
| - Self { client } |
86 |
| - } |
87 |
| - } |
88 |
| - impl ControlClient for WasmcloudControlClient { |
89 |
| - async fn start_component( |
90 |
| - &self, |
91 |
| - request: wasmcloud::ctl::StartComponentRequest, |
92 |
| - ) -> Result<wasmcloud::ctl::StartComponentResponse, String> { |
93 |
| - let mut buf = BytesMut::with_capacity(request.encoded_len()); |
94 |
| - request |
95 |
| - .encode(&mut buf) |
96 |
| - .expect("to encode without reaching capacity"); |
97 |
| - let reply = self |
98 |
| - .client |
99 |
| - .request("wasmbus.ctl.proto.start.component", buf.into()) |
100 |
| - .await |
101 |
| - .expect("to send request"); |
102 |
| - wasmcloud::ctl::StartComponentResponse::decode(reply.payload).map_err(|e| e.to_string()) |
103 |
| - } |
| 41 | + assert_eq!(response.component_id, "test-1"); |
| 42 | + assert_eq!( |
| 43 | + response.message, |
| 44 | + "everything went smoothly and we're going to be okay" |
| 45 | + ); |
104 | 46 | }
|
105 | 47 |
|
| 48 | + // Test and validate host functionality with NATS |
106 | 49 | #[tokio::test]
|
107 | 50 | async fn end_to_end_proto() {
|
108 |
| - let server = WasmcloudControlServer {}; |
109 | 51 | let nats_client = async_nats::connect("nats://127.0.0.1:4222")
|
110 | 52 | .await
|
111 | 53 | .expect("should connect to NATS");
|
112 |
| - let _task = start_server(server, nats_client) |
113 |
| - .await |
114 |
| - .expect("to start server"); |
115 |
| - |
116 |
| - let client = WasmcloudControlClient::new( |
117 |
| - async_nats::connect("nats://127.0.0.1:4222") |
| 54 | + let host = Host {}; |
| 55 | + let server = tokio::spawn({ |
| 56 | + let nats_client = nats_client.clone(); |
| 57 | + wasmcloud::ctl::start_server(host, nats_client) |
118 | 58 | .await
|
119 |
| - .expect("should connect to NATS"), |
120 |
| - ); |
121 |
| - let reply = client |
122 |
| - .start_component(wasmcloud::ctl::StartComponentRequest { |
| 59 | + .expect("to subscribe and start server") |
| 60 | + }); |
| 61 | + |
| 62 | + // Let subscriptions get set up |
| 63 | + tokio::time::sleep(std::time::Duration::from_secs(1)).await; |
| 64 | + |
| 65 | + let reply = nats_client |
| 66 | + .start_component(crate::wasmcloud::ctl::StartComponentRequest { |
123 | 67 | reference: "test".to_string(),
|
124 | 68 | max_instances: 1,
|
125 | 69 | })
|
126 | 70 | .await
|
127 |
| - .expect("reply to be okay"); |
| 71 | + .expect("should start component"); |
128 | 72 |
|
129 | 73 | assert_eq!(reply.component_id, "test-1");
|
130 | 74 | assert_eq!(
|
131 | 75 | reply.message,
|
132 | 76 | "everything went smoothly and we're going to be okay"
|
133 | 77 | );
|
| 78 | + |
| 79 | + server.abort(); |
134 | 80 | }
|
135 | 81 | }
|
0 commit comments