diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 049e8e5..e70d12e 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -14,10 +14,11 @@ jobs: strategy: fail-fast: false matrix: - os: [ "ubuntu-latest", "windows-latest", "macos-latest" ] - # go: [ "1.20.x", "1.21.x" ] + os: [ "ubuntu-latest", "macos-latest" ] runs-on: ${{ matrix.os }} steps: - uses: actions/checkout@v3 - name: Build run: cargo build --verbose + - name: Test + run: cargo test --verbose diff --git a/Cargo.lock b/Cargo.lock index efcbaa1..2c72d2f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3,7 +3,7 @@ version = 3 [[package]] -name = "WASMable-Transport" +name = "W-A-T-E-R" version = "0.1.0" dependencies = [ "anyhow", @@ -14,6 +14,8 @@ dependencies = [ "futures", "libc", "once_cell", + "pprof", + "rand", "rustls", "rustls-pemfile", "serde", @@ -59,6 +61,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2c99f64d1e06488f620f932677e24bc6e2897582980441ae90a671415bd7ec2f" dependencies = [ "cfg-if", + "getrandom", "once_cell", "version_check", ] @@ -138,6 +141,12 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2d098ff73c1ca148721f37baad5ea6a465a13f9573aba8641fbbbae8164a54e" +[[package]] +name = "arrayvec" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" + [[package]] name = "async-trait" version = "0.1.73" @@ -146,7 +155,7 @@ checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.32", ] [[package]] @@ -212,6 +221,12 @@ version = "3.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a3e2c3daef883ecc1b5d58c15adae93470a91d425f3532ba1695849656af3fc1" +[[package]] +name = "bytemuck" +version = "1.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "374d28ec25809ee0e23827c2ab573d729e293f281dfe393500e7ad618baa61c6" + [[package]] name = "byteorder" version = "1.4.3" @@ -334,7 +349,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn", + "syn 2.0.32", ] [[package]] @@ -358,6 +373,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "cpp_demangle" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e8227005286ec39567949b33df9896bcadfa6051bccca2488129f108ca23119" +dependencies = [ + "cfg-if", +] + [[package]] name = "cpufeatures" version = "0.2.9" @@ -658,6 +682,12 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" +[[package]] +name = "fastrand" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5" + [[package]] name = "fd-lock" version = "4.0.0" @@ -679,6 +709,24 @@ dependencies = [ "log", ] +[[package]] +name = "findshlibs" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40b9e59cd0f7e0806cca4be089683ecb6434e602038df21fe6bf6711b2f07f64" +dependencies = [ + "cc", + "lazy_static", + "libc", + "winapi", +] + +[[package]] +name = "fixedbitset" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" + [[package]] name = "form_urlencoded" version = "1.2.0" @@ -755,7 +803,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.32", ] [[package]] @@ -881,6 +929,15 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "443144c8cdadd93ebf52ddb4056d257f5b52c04d3c804e657d19eb73fc33668b" +[[package]] +name = "home" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5444c27eef6923071f7ebcc33e3444508466a76f7a2b93da00ed6e19f30c1ddb" +dependencies = [ + "windows-sys", +] + [[package]] name = "humantime" version = "2.1.0" @@ -924,6 +981,24 @@ dependencies = [ "serde", ] +[[package]] +name = "inferno" +version = "0.11.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c50453ec3a6555fad17b1cd1a80d16af5bc7cb35094f64e429fd46549018c6a3" +dependencies = [ + "ahash", + "indexmap 2.0.0", + "is-terminal", + "itoa", + "log", + "num-format", + "once_cell", + "quick-xml", + "rgb", + "str_stack", +] + [[package]] name = "io-extras" version = "0.18.0" @@ -1051,6 +1126,16 @@ version = "0.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1a9bad9f94746442c783ca431b22403b519cd7fbeed0533fdd6328b2f2212128" +[[package]] +name = "lock_api" +version = "0.4.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1cc9717a20b1bb222f333e6a92fd32f7d8a18ddc5a3191a11af45dcbf4dcd16" +dependencies = [ + "autocfg", + "scopeguard", +] + [[package]] name = "log" version = "0.4.20" @@ -1087,6 +1172,15 @@ dependencies = [ "rustix 0.37.23", ] +[[package]] +name = "memmap2" +version = "0.5.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83faa42c0a078c393f6b29d5db232d8be22776a891f8f56e5284faee4a20b327" +dependencies = [ + "libc", +] + [[package]] name = "memoffset" version = "0.9.0" @@ -1116,6 +1210,23 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "multimap" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" + +[[package]] +name = "nix" +version = "0.26.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "598beaf3cc6fdd9a5dfb1630c2800c7acd31df7aaf0f565796fba2b53ca1af1b" +dependencies = [ + "bitflags 1.3.2", + "cfg-if", + "libc", +] + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -1126,6 +1237,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "num-format" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a652d9771a63711fd3c3deb670acfbe5c30a4072e664d7a3bf5a9e1056ac72c3" +dependencies = [ + "arrayvec", + "itoa", +] + [[package]] name = "num_cpus" version = "1.16.0" @@ -1169,6 +1290,29 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" +[[package]] +name = "parking_lot" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93f00c865fe7cabf650081affecd3871070f26767e7b2070a3ffae14c654b447" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall 0.3.5", + "smallvec", + "windows-targets", +] + [[package]] name = "paste" version = "1.0.14" @@ -1181,6 +1325,16 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94" +[[package]] +name = "petgraph" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1d3afd2628e69da2be385eb6f2fd57c8ac7977ceeff6dc166ff1657b0e386a9" +dependencies = [ + "fixedbitset", + "indexmap 2.0.0", +] + [[package]] name = "pin-project-lite" version = "0.2.13" @@ -1199,12 +1353,49 @@ version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964" +[[package]] +name = "pprof" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "196ded5d4be535690899a4631cc9f18cdc41b7ebf24a79400f46f48e49a11059" +dependencies = [ + "backtrace", + "cfg-if", + "findshlibs", + "inferno", + "libc", + "log", + "nix", + "once_cell", + "parking_lot", + "prost", + "prost-build", + "prost-derive", + "protobuf", + "protobuf-codegen-pure", + "sha2", + "smallvec", + "symbolic-demangle", + "tempfile", + "thiserror", +] + [[package]] name = "ppv-lite86" version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" +[[package]] +name = "prettyplease" +version = "0.1.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c8646e95016a7a6c4adea95bafa8a16baab64b583356217f2c85db4a39d9a86" +dependencies = [ + "proc-macro2", + "syn 1.0.109", +] + [[package]] name = "proc-macro2" version = "1.0.66" @@ -1214,6 +1405,85 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prost" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b82eaa1d779e9a4bc1c3217db8ffbeabaae1dca241bf70183242128d48681cd" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-build" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "119533552c9a7ffacc21e099c24a0ac8bb19c2a2a3f363de84cd9b844feab270" +dependencies = [ + "bytes", + "heck", + "itertools", + "lazy_static", + "log", + "multimap", + "petgraph", + "prettyplease", + "prost", + "prost-types", + "regex", + "syn 1.0.109", + "tempfile", + "which", +] + +[[package]] +name = "prost-derive" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5d2d8d10f3c6ded6da8b05b5fb3b8a5082514344d56c9f871412d29b4e075b4" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn 1.0.109", +] + +[[package]] +name = "prost-types" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "213622a1460818959ac1181aaeb2dc9c7f63df720db7d788b3e24eacd1983e13" +dependencies = [ + "prost", +] + +[[package]] +name = "protobuf" +version = "2.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" + +[[package]] +name = "protobuf-codegen" +version = "2.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "033460afb75cf755fcfc16dfaed20b86468082a2ea24e05ac35ab4a099a017d6" +dependencies = [ + "protobuf", +] + +[[package]] +name = "protobuf-codegen-pure" +version = "2.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95a29399fc94bcd3eeaa951c715f7bea69409b2445356b00519740bcd6ddd865" +dependencies = [ + "protobuf", + "protobuf-codegen", +] + [[package]] name = "psm" version = "0.1.21" @@ -1234,6 +1504,15 @@ dependencies = [ "unicase", ] +[[package]] +name = "quick-xml" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f50b1c63b38611e7d4d7f68b82d3ad0cc71a2ad2e7f61fc10f1328d917c93cd" +dependencies = [ + "memchr", +] + [[package]] name = "quote" version = "1.0.33" @@ -1304,6 +1583,15 @@ dependencies = [ "bitflags 1.3.2", ] +[[package]] +name = "redox_syscall" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "567664f262709473930a4bf9e51bf2ebf3348f2e748ccc50dea20646858f8f29" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "redox_users" version = "0.4.3" @@ -1311,7 +1599,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b033d837a7cf162d7993aded9304e30a83213c648b6e389db233191f891e5c2b" dependencies = [ "getrandom", - "redox_syscall", + "redox_syscall 0.2.16", "thiserror", ] @@ -1357,6 +1645,15 @@ version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dbb5fb1acd8a1a18b3dd5be62d25485eb770e05afb408a9627d14d451bae12da" +[[package]] +name = "rgb" +version = "0.8.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20ec2d3e3fc7a92ced357df9cebd5a10b6fb2aa1ee797bf7e9ce2f17dffc8f59" +dependencies = [ + "bytemuck", +] + [[package]] name = "ring" version = "0.16.20" @@ -1479,7 +1776,7 @@ checksum = "4eca7ac642d82aa35b60049a6eccb4be6be75e599bd2e9adb5f875a737654af2" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.32", ] [[package]] @@ -1580,12 +1877,52 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" +[[package]] +name = "str_stack" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9091b6114800a5f2141aee1d1b9d6ca3592ac062dc5decb3764ec5895a47b4eb" + [[package]] name = "strsim" version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" +[[package]] +name = "symbolic-common" +version = "10.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b55cdc318ede251d0957f07afe5fed912119b8c1bc5a7804151826db999e737" +dependencies = [ + "debugid", + "memmap2", + "stable_deref_trait", + "uuid", +] + +[[package]] +name = "symbolic-demangle" +version = "10.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79be897be8a483a81fff6a3a4e195b4ac838ef73ca42d348b3f722da9902e489" +dependencies = [ + "cpp_demangle 0.4.3", + "rustc-demangle", + "symbolic-common", +] + +[[package]] +name = "syn" +version = "1.0.109" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + [[package]] name = "syn" version = "2.0.32" @@ -1619,6 +1956,19 @@ version = "0.12.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d0e916b1148c8e263850e1ebcbd046f333e0683c724876bb0da63ea4373dc8a" +[[package]] +name = "tempfile" +version = "3.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb94d2f3cc536af71caac6b6fcebf65860b347e7ce0cc9ebe8f70d3e521054ef" +dependencies = [ + "cfg-if", + "fastrand", + "redox_syscall 0.3.5", + "rustix 0.38.13", + "windows-sys", +] + [[package]] name = "termcolor" version = "1.2.0" @@ -1645,7 +1995,7 @@ checksum = "49922ecae66cc8a249b77e68d1d0623c1b2c514f0060c27cdc68bd62a1219d35" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.32", ] [[package]] @@ -1753,7 +2103,7 @@ checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.32", ] [[package]] @@ -1952,7 +2302,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn", + "syn 2.0.32", "wasm-bindgen-shared", ] @@ -1974,7 +2324,7 @@ checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.32", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -2110,7 +2460,7 @@ dependencies = [ "anyhow", "proc-macro2", "quote", - "syn", + "syn 2.0.32", "wasmtime-component-util", "wasmtime-wit-bindgen", "wit-parser", @@ -2208,7 +2558,7 @@ dependencies = [ "anyhow", "bincode", "cfg-if", - "cpp_demangle", + "cpp_demangle 0.3.5", "gimli 0.27.3", "ittapi", "log", @@ -2296,7 +2646,7 @@ checksum = "39ca36fa6cad8ef885bc27d7d50c8b1cb7da0534251188a824f4953b07875703" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.32", ] [[package]] @@ -2423,6 +2773,18 @@ dependencies = [ "untrusted", ] +[[package]] +name = "which" +version = "4.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87ba24419a2078cd2b0f2ede2691b6c66d8e47836da3b6db8265ebad47afbfc7" +dependencies = [ + "either", + "home", + "once_cell", + "rustix 0.38.13", +] + [[package]] name = "wiggle" version = "12.0.1" @@ -2449,7 +2811,7 @@ dependencies = [ "proc-macro2", "quote", "shellexpand", - "syn", + "syn 2.0.32", "witx", ] @@ -2461,7 +2823,7 @@ checksum = "5677f7d740bc41f9f6af4a6a719a07fbe1aa8ec66e0ec1ca4d3617f2b27d5361" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.32", "wiggle-generate", ] diff --git a/Cargo.toml b/Cargo.toml index 6addb2a..a71cc78 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,16 +1,16 @@ [package] -name = "WASMable-Transport" +name = "W-A-T-E-R" version = "0.1.0" author = "cerikccc@gmail.com" edition = "2021" [lib] # crate-type = ["cdylib"] -name = "wasmable_transport" +name = "water" path = "src/lib.rs" [[bin]] -name = "wasmable_transport" +name = "water_cli" path = "src/main.rs" [dependencies] @@ -36,4 +36,9 @@ bincode = "1.3" rustls = "0.20.6" rustls-pemfile = "1.0.0" -zeroize = { version = "1.5.4", features = ["alloc"] } \ No newline at end of file +zeroize = { version = "1.5.4", features = ["alloc"] } + +# test & benchmark dependency +[dev-dependencies] +rand = "0.8" +pprof = { version = "0.11.1", features = ["flamegraph", "protobuf-codec", "prost-codec"] } diff --git a/src/cli/mod.rs b/src/cli/mod.rs index cc66cd8..2d65e2c 100644 --- a/src/cli/mod.rs +++ b/src/cli/mod.rs @@ -1,11 +1,35 @@ -use wasmable_transport::*; +use crate::{config::Config, runtime}; use std::sync::Arc; +pub fn parse() -> Result { + // Parse command-line arguments and execute the appropriate commands + let conf = Config::from_args()?; + Ok(conf) +} pub fn parse_and_execute() -> Result<(), anyhow::Error> { - // Parse command-line arguments and execute the appropriate commands - let conf = Config::init()?; - - execute(conf) + execute(parse()?) +} + +pub fn execute(conf: Config) -> Result<(), anyhow::Error> { + let mut water_client = runtime::WATERClient::new(conf)?; + + // // FIXME: hardcoded the addr & port for now + // water_client.connect("", 0)?; + + // loop { + // // keep reading from stdin and call read and write function from water_client.stream + // let mut buf = String::new(); + // std::io::stdin().read_line(&mut buf)?; + + // water_client.write(buf.as_bytes())?; + + // let mut buf = vec![0; 1024]; + // water_client.read(&mut buf)?; + + // println!("read: {:?}", String::from_utf8_lossy(&buf)); + // } + + Ok(()) } \ No newline at end of file diff --git a/src/config/mod.rs b/src/config/mod.rs index f1632e1..fdc28ca 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -6,7 +6,7 @@ use clap::Parser; use crate::globals::{WASM_PATH, MAIN, CONFIF_WASM_PATH}; // pub mod parser; -pub mod sharedconfig; +pub mod wasm_shared_config; #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] @@ -23,32 +23,35 @@ struct Args { #[arg(short, long, default_value_t = String::from(MAIN))] entry_fn: String, - /// Optional argument specifying the .wasm file to load + /// Optional argument specifying the config file #[arg(short, long, default_value_t = String::from(CONFIF_WASM_PATH))] config_wasm: String, + + /// Optional argument specifying the client_type, default to be Runner + #[arg(short, long, default_value_t = 2)] + type_client: u32, } pub struct Config { pub filepath: String, pub entry_fn: String, - pub config_wasm: String + pub config_wasm: String, + pub client_type: u32, } impl Config { - // pub fn init() -> Result, anyhow::Error> { - pub fn init() -> Result { + pub fn from_args() -> Result { let args = Args::parse(); - // let config = Arc::new(Config { - // filepath: args.wasm_path, - // entry_fn: args.entry_fn, - // config_wasm: args.config_wasm, - // }); + Self::init(args.wasm_path, args.entry_fn, args.config_wasm, args.type_client) + } + pub fn init(wasm_path: String, entry_fn: String, config_wasm: String, client_type: u32) -> Result { Ok(Config { - filepath: args.wasm_path, - entry_fn: args.entry_fn, - config_wasm: args.config_wasm, + filepath: wasm_path, + entry_fn: entry_fn, + config_wasm: config_wasm, + client_type: client_type, }) } } \ No newline at end of file diff --git a/src/config/sharedconfig.rs b/src/config/wasm_shared_config.rs similarity index 100% rename from src/config/sharedconfig.rs rename to src/config/wasm_shared_config.rs diff --git a/src/lib.rs b/src/lib.rs index feace28..0b93a6c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,34 +3,4 @@ pub mod runtime; pub mod errors; pub mod utils; pub mod globals; - -use config::*; -// use runtime::{Host, WASMRuntime}; - -use std::sync::Arc; - -// Re-export main components for easier access -// pub use wasmruntime::{RuntimeConn, RuntimeDialer, RuntimeDialerConn}; -pub use config::Config; - -pub fn execute(conf: Config) -> Result<(), anyhow::Error> { - let mut water_client = runtime::WATERClient::new(conf)?; - - // FIXME: hardcoded the addr & port for now - water_client.connect("", 0)?; - - loop { - // keep reading from stdin and call read and write function from water_client.stream - let mut buf = String::new(); - std::io::stdin().read_line(&mut buf)?; - - water_client.stream.write(buf.as_bytes())?; - - let mut buf = vec![0; 1024]; - water_client.stream.read(&mut buf)?; - - println!("read: {:?}", String::from_utf8_lossy(&buf)); - } - - Ok(()) -} \ No newline at end of file +pub mod cli; \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 18cbf60..6349021 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,5 @@ -mod cli; +use water::*; -use wasmable_transport; use tracing_subscriber; use tracing::Level; diff --git a/src/runtime/core.rs b/src/runtime/core.rs new file mode 100644 index 0000000..8ebc180 --- /dev/null +++ b/src/runtime/core.rs @@ -0,0 +1,165 @@ +use crate::runtime::*; + +#[derive(Default, Clone)] +pub struct Host { + pub preview1_ctx: Option, + pub wasi_threads: Option>>, +} + +pub struct H2O { + pub engine: Engine, + pub linker: Linker, + pub instance: Instance, + pub store: Store, + pub module: Module, +} + +impl H2O { + pub fn init(conf: &Config) -> Result { + info!("[HOST] WATERCore H2O initing..."); + + let mut wasm_config = wasmtime::Config::new(); + wasm_config.wasm_threads(true); + + let engine = Engine::new(&wasm_config)?; + let mut linker: Linker = Linker::new(&engine); + + let module = Module::from_file(&engine, &conf.filepath)?; + + let host = Host::default(); + let mut store = Store::new(&engine, host); + + // let path = unsafe { Dir::open_ambient_dir(".", ambient_authority())? }; + + // store.data_mut().preview1_ctx = Some(WasiCtxBuilder::new().inherit_stdio().preopened_dir(path, ".")?.build()); + store.data_mut().preview1_ctx = Some(WasiCtxBuilder::new().inherit_stdio().build()); + + wasmtime_wasi::add_to_linker(&mut linker, |h: &mut Host| { + h.preview1_ctx.as_mut().unwrap() + })?; + + // initializing stuff for multithreading + #[cfg(feature = "multithread")] + { + + store.data_mut().wasi_threads = Some(Arc::new(WasiThreadsCtx::new( + module.clone(), + Arc::new(linker.clone()), + )?)); + + wasmtime_wasi_threads::add_to_linker(&mut linker, &store, &module, |h: &mut Host| { + h.wasi_threads.as_ref().unwrap() + })?; + } + + // export functions -- link connect functions + export_tcp_connect(&mut linker); + export_tcplistener_create(&mut linker); + + let instance = linker.instantiate(&mut store, &module)?; + + Ok(H2O { + engine: engine, + linker: linker, + instance: instance, + store: store, + module: module, + }) + } + + pub fn _prepare(&mut self, conf: &Config) -> Result<(), anyhow::Error> { + self._version()?; + self._init()?; + self._process_config(&conf)?; + Ok(()) + } + + pub fn _init(&mut self) -> Result<(), anyhow::Error> { + info!("[HOST] WATERCore H2O calling _init from WASM..."); + + let init_fn = match self.instance.get_func(&mut self.store, INIT_FN) { + Some(func) => func, + None => return Err(anyhow::Error::msg("init function not found")), + }; + + // TODO: check if we need to pass in any arguments / configs later + match init_fn.call(&mut self.store, &[], &mut []) { + Ok(_) => {}, + Err(e) => return Err(anyhow::Error::msg(format!("init function failed: {}", e))), + } + + Ok(()) + } + + pub fn _version(&mut self) -> Result<(), anyhow::Error> { + info!("[HOST] WATERCore H2O calling _version from WASM..."); + + let version_fn = match self.instance.get_func(&mut self.store, VERSION_FN) { + Some(func) => func, + None => return Err(anyhow::Error::msg("version function not found")), + }; + + // let mut res = vec![Val::I32(0); version_fn.ty(&self.core.store).results().len()]; + + // NOTE: below is the most generic way to setup the res vector, to avoid panic from calling + let mut res: Vec = version_fn + .ty(&self.store) + .results() + .map(|ty| // map each type to a default value + match ty { + // i32 and i64 are the only integer types in WebAssembly as of 2021 + ValType::I32 => Val::I32(0), + ValType::I64 => Val::I64(0), + ValType::F32 => Val::F32(0), + ValType::F64 => Val::F64(0), + _ => panic!("Unsupported type"), + } + ) + .collect(); // collect the default values into a Vec + + // TODO: add error handling code like this for all other func.call()'s + match version_fn.call(&mut self.store, &[], &mut res) { + Ok(_) => {}, + Err(e) => return Err(anyhow::Error::msg(format!("version function failed: {}", e))), + } + + match res.get(0) { + Some(Val::I32(v)) => { + if *v != RUNTIME_VERSION_MAJOR { + return Err(anyhow::Error::msg(format!("WASI module version {} is not compatible with runtime version {}", v, RUNTIME_VERSION))); + } + }, + Some(_) => return Err(anyhow::Error::msg("version function returned unexpected type")), + None => return Err(anyhow::Error::msg("version function has no return")), + }; + + Ok(()) + } + + pub fn _process_config(&mut self, config: &Config) -> Result<(), anyhow::Error> { + info!("[HOST] WATERCore H2O calling _process_config from WASM..."); + + // _required to implement _process_config(i32) in WASM, which will be parsing all the configurations + let config_fn = match self.instance.get_func(&mut self.store, CONFIG_FN) { + Some(func) => func, + None => return Err(anyhow::Error::msg("_process_config function not found in WASM")), + }; + + // open the config file and insert to WASM + let dir = Dir::open_ambient_dir(".", ambient_authority())?; // Open the root directory + let wasi_file = dir.open_with(&config.config_wasm, OpenOptions::new().read(true).write(true))?; + let wasi_file = wasmtime_wasi::sync::file::File::from_cap_std(wasi_file); + + // TODO: might be better to ask WASM for the fd? Or if it is fixed in the pipeline, then 3 is fine + let ctx = self.store.data_mut().preview1_ctx.as_mut().unwrap(); + let config_fd = ctx.push_file(Box::new(wasi_file), FileAccessMode::all())? as i32; + + let params = vec![Val::I32(config_fd); config_fn.ty(&self.store).params().len()]; + match config_fn.call(&mut self.store, ¶ms, &mut []) { + Ok(_) => {}, + Err(e) => return Err(anyhow::Error::msg(format!("_process_config function in WASM failed: {}", e))), + } + + Ok(()) + } +} \ No newline at end of file diff --git a/src/runtime/funcs.rs b/src/runtime/funcs.rs index 22e399b..1fc03c7 100644 --- a/src/runtime/funcs.rs +++ b/src/runtime/funcs.rs @@ -1,5 +1,5 @@ use crate::runtime::*; -use crate::sharedconfig::StreamConfig; +use crate::config::wasm_shared_config::StreamConfig; use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; pub fn export_tcp_connect(linker: &mut Linker) { @@ -38,14 +38,14 @@ pub fn export_tcp_connect(linker: &mut Linker) { }; let tcp = match (host.as_str(), port) { - ("localhost", port) => std::net::TcpStream::connect(SocketAddr::V4(SocketAddrV4::new( - Ipv4Addr::LOCALHOST, - port, - ))), - addr => std::net::TcpStream::connect(addr), - } - .map(TcpStream::from_std) - .context("failed to connect to endpoint").unwrap(); + ("localhost", port) => std::net::TcpStream::connect(SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::LOCALHOST, + port, + ))), + addr => std::net::TcpStream::connect(addr), + } + .map(TcpStream::from_std) + .context("failed to connect to endpoint").unwrap(); // Connecting Tcp let socket_file: Box = wasmtime_wasi::net::Socket::from(tcp).into(); diff --git a/src/runtime/listener.rs b/src/runtime/listener.rs new file mode 100644 index 0000000..99615e5 --- /dev/null +++ b/src/runtime/listener.rs @@ -0,0 +1,152 @@ +use crate::runtime::*; + +pub struct WATERListener { + // WASM functions for reading & writing + + // the reader in WASM (read from net -- n2w) + // returns the number of bytes read + pub reader: Func, + + // the writer in WASM (write to net -- w2n) + // returns the number of bytes written + pub writer: Func, + + pub caller_reader: UnixStream, // the reader in Caller (read from WASM -- w2u) + pub caller_writer: UnixStream, // the writer in Caller (write to WASM -- u2w) + + pub core: H2O, // core WASM runtime (engine, linker, instance, store, module) +} + +impl WATERListener { + + /// Read from the target address + pub fn read(&mut self, buf: &mut Vec) -> Result { + info!("[HOST] WATERStream reading..."); + + let mut res = vec![Val::I64(0); self.reader.ty(&self.core.store).results().len()]; + match self.reader.call(&mut self.core.store, &[], &mut res) { + Ok(_) => {}, + Err(e) => return Err(anyhow::Error::msg(format!("{} function failed: {}", READER_FN, e))), + } + + let nums: i64 = match res.get(0) { + Some(wasmtime::Val::I64(v)) => { + *v + }, + _ => return Err(anyhow::Error::msg(format!("{} function returned unexpected type / no return", READER_FN))), + }; + + // read from WASM's caller_reader + buf.resize(nums as usize, 0); + match self.caller_reader.read(&mut buf[..]) { + Ok(_) => {}, + Err(e) => return Err(anyhow::Error::msg(format!("failed to read from caller_reader: {}", e))), + } + + Ok(nums) + } + + /// Write to the target address + pub fn write(&mut self, buf: &[u8]) -> Result<(), anyhow::Error> { + info!("[HOST] WATERStream writing..."); + + // write to WASM's caller_writer + match self.caller_writer.write_all(buf) { + Ok(_) => {}, + Err(e) => return Err(anyhow::Error::msg(format!("failed to write to caller_writer: {}", e))), + } + + let params = vec![Val::I64(buf.len() as i64)]; + let mut res = vec![Val::I64(0)]; + match self.writer.call(&mut self.core.store, ¶ms, &mut res) { + Ok(_) => { + match res.get(0) { + Some(wasmtime::Val::I64(v)) => { + if *v != buf.len() as i64 { + return Err(anyhow::Error::msg(format!("WASM write function returned unexpected value: {}", *v))); + } + }, + _ => return Err(anyhow::Error::msg("user_write_done function returned unexpected type / no return")), + }; + }, + Err(e) => return Err(anyhow::Error::msg(format!("{} function failed: {}", WRITER_FN, e))), + } + + Ok(()) + } + + /// Listening at the addr:port with running the WASM listen function + pub fn listen(&mut self, conf: &Config, addr: &str, port: u16) -> Result<(), anyhow::Error> { + info!("[HOST] WATERStream listening..."); + + // TODO: add addr:port sharing with WASM, for now WASM is using config.json's remote_addr:port + let fnc = self.core.instance.get_func(&mut self.core.store, &conf.entry_fn).unwrap(); + match fnc.call(&mut self.core.store, &[], &mut []) { + Ok(_) => {}, + Err(e) => return Err(anyhow::Error::msg(format!("connect function failed: {}", e))), + } + + + Ok(()) + } + + pub fn init(conf: &Config) -> Result { + info!("[HOST] WATERStream init..."); + + let mut core = H2O::init(conf)?; + core._prepare(conf)?; + + // constructing 2 pairs of UnixStream for communicating between WASM and Host + // returns (read_end, write_end) for caller + let (caller_read_end, water_write_end) = UnixStream::pair()?; + let (water_read_end, caller_write_end) = UnixStream::pair()?; + + let water_write_file = unsafe { cap_std::fs::File::from_raw_fd(water_write_end.as_raw_fd()) }; + let water_read_file = unsafe { cap_std::fs::File::from_raw_fd(water_read_end.as_raw_fd()) }; + + // insert file here + let wasi_water_reader = wasmtime_wasi::sync::file::File::from_cap_std(water_read_file); + let wasi_water_writer = wasmtime_wasi::sync::file::File::from_cap_std(water_write_file); + + std::mem::forget(water_write_end); + std::mem::forget(water_read_end); + + let ctx = core.store.data_mut().preview1_ctx.as_mut().unwrap(); + let water_reader_fd = ctx.push_file(Box::new(wasi_water_reader), FileAccessMode::all())?; + let water_writer_fd = ctx.push_file(Box::new(wasi_water_writer), FileAccessMode::all())?; + + let water_bridging = match core.instance.get_func(&mut core.store, WATER_BRIDGING_FN) { + Some(func) => func, + None => return Err(anyhow::Error::msg(format!("{} function not found in WASM", WATER_BRIDGING_FN))), + }; + + let params = vec![Val::I32(water_reader_fd as i32), Val::I32(water_writer_fd as i32)]; + match water_bridging.call(&mut core.store, ¶ms, &mut []) { + Ok(_) => {}, + Err(e) => return Err(anyhow::Error::msg(format!("{} function failed: {}", WATER_BRIDGING_FN, e))), + } + + // getting reader & writer func from WASM + let reader = match core.instance.get_func(&mut core.store, READER_FN) { + Some(func) => func, + None => return Err(anyhow::Error::msg(format!("{} function not found in WASM", READER_FN))), + }; + + let writer = match core.instance.get_func(&mut core.store, WRITER_FN) { + Some(func) => func, + None => return Err(anyhow::Error::msg(format!("{} function not found in WASM", WRITER_FN))), + }; + + let runtime = WATERListener { + reader: reader, + writer: writer, + + caller_reader: caller_read_end, + caller_writer: caller_write_end, + + core: core, + }; + + Ok(runtime) + } +} \ No newline at end of file diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index c54559a..76bd7f8 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -1,6 +1,9 @@ pub mod net; pub mod funcs; pub mod stream; +pub mod core; +pub mod listener; +pub mod runner; use std::sync::Arc; use anyhow::{Context, Result}; @@ -28,8 +31,8 @@ use tracing::{info, trace}; use cap_std::net::{TcpListener, TcpStream}; -use crate::Config; -use crate::config::sharedconfig::WASMSharedConfig; +use crate::config::Config; +use crate::config::wasm_shared_config::WASMSharedConfig; use crate::globals::READER_FN; use crate::globals::WRITER_FN; @@ -38,21 +41,42 @@ use funcs::{export_tcplistener_create, export_tcp_connect}; use crate::globals::{VERSION_FN, RUNTIME_VERSION_MAJOR, RUNTIME_VERSION, INIT_FN, USER_READ_FN, WRITE_DONE_FN, CONFIG_FN, WATER_BRIDGING_FN}; -use stream::{WATERStream, Host}; +use stream::{WATERStream}; +use listener::{WATERListener}; +use runner::{WATERRunner}; +// use core::WATERCore; +use core::{H2O, Host}; + + +pub enum WATERClientType { + Dialer(WATERStream), + Listener(WATERListener), + Runner(WATERRunner), // This is a customized runner -- not like any stream +} pub struct WATERClient { debug: bool, pub config: Config, - pub stream: WATERStream, + pub stream: WATERClientType, } impl WATERClient { pub fn new(conf: Config) -> Result { - let mut water = WATERStream::init(&conf)?; - water._version()?; - water._init()?; - water._process_config(&conf)?; + // client_type: 0 -> Dialer, 1 -> Listener, 2 -> Runner + let mut water: WATERClientType; + if conf.client_type == 0 { + let mut stream = WATERStream::init(&conf)?; + water = WATERClientType::Dialer(stream); + } else if conf.client_type == 1 { + let mut stream = WATERListener::init(&conf)?; + water = WATERClientType::Listener(stream); + } else if conf.client_type == 2 { + let mut runner = WATERRunner::init(&conf)?; + water = WATERClientType::Runner(runner); + } else { + return Err(anyhow::anyhow!("Invalid client type")); + } Ok(WATERClient { config: conf, @@ -65,9 +89,76 @@ impl WATERClient { self.debug = debug; } + pub fn execute(&mut self) -> Result<(), anyhow::Error> { + info!("[HOST] WATERClient Executing ..."); + + match &mut self.stream { + WATERClientType::Runner(runner) => { + runner.run(&self.config)?; + }, + _ => { + return Err(anyhow::anyhow!("This client is not a Runner")); + } + } + Ok(()) + } + pub fn connect(&mut self, addr: &str, port: u16) -> Result<(), anyhow::Error> { + info!("[HOST] WATERClient connecting ..."); // NOTE: After creating the WATERStream, do some initial calls to WASM (e.g. version, init, etc.) - self.stream.connect(&self.config, addr, port)?; + match &mut self.stream { + WATERClientType::Dialer(dialer) => { + dialer.connect(&self.config, addr, port)?; + }, + _ => { + return Err(anyhow::anyhow!("This client is not a listener")); + } + } + Ok(()) + } + + pub fn listen(&mut self, addr: &str, port: u16) -> Result<(), anyhow::Error> { + info!("[HOST] WATERClient listening ..."); + // NOTE: After creating the WATERStream, do some initial calls to WASM (e.g. version, init, etc.) + match &mut self.stream { + WATERClientType::Listener(listener) => { + listener.listen(&self.config, addr, port)?; + }, + _ => { + return Err(anyhow::anyhow!("This client is not a listener")); + } + } + Ok(()) + } + + pub fn read(&mut self, buf: &mut Vec) -> Result { + let read_bytes = match self.stream { + WATERClientType::Dialer(ref mut dialer) => { + dialer.read(buf)? + }, + WATERClientType::Listener(ref mut listener) => { + listener.read(buf)? + }, + _ => { + return Err(anyhow::anyhow!("This client is not supporting read")); + } + }; + + Ok(read_bytes) + } + + pub fn write(&mut self, buf: &[u8]) -> Result<(), anyhow::Error> { + match self.stream { + WATERClientType::Dialer(ref mut dialer) => { + dialer.write(buf)?; + }, + WATERClientType::Listener(ref mut listener) => { + listener.write(buf)?; + }, + _ => { + return Err(anyhow::anyhow!("This client is not supporting write")); + } + } Ok(()) } } \ No newline at end of file diff --git a/src/runtime/runner.rs b/src/runtime/runner.rs new file mode 100644 index 0000000..b54cf58 --- /dev/null +++ b/src/runtime/runner.rs @@ -0,0 +1,33 @@ +use crate::runtime::*; + +pub struct WATERRunner { + pub core: H2O, // core WASM runtime (engine, linker, instance, store, module) +} + +impl WATERRunner { + /// Run the entry function + pub fn run(&mut self, conf: &Config) -> Result<(), anyhow::Error> { + info!("[HOST] WATERRunner running..."); + + let fnc = self.core.instance.get_func(&mut self.core.store, &conf.entry_fn).unwrap(); + match fnc.call(&mut self.core.store, &[], &mut []) { + Ok(_) => {}, + Err(e) => return Err(anyhow::Error::msg(format!("run function failed: {}", e))), + } + + Ok(()) + } + + pub fn init(conf: &Config) -> Result { + info!("[HOST] WATERRunner init..."); + + let mut core = H2O::init(conf)?; + core._prepare(conf)?; + + let runtime = WATERRunner { + core: core, + }; + + Ok(runtime) + } +} \ No newline at end of file diff --git a/src/runtime/stream.rs b/src/runtime/stream.rs index 7efd81a..0915fd9 100644 --- a/src/runtime/stream.rs +++ b/src/runtime/stream.rs @@ -1,14 +1,11 @@ use crate::runtime::*; -#[derive(Default, Clone)] -pub struct Host { - pub preview1_ctx: Option, - pub wasi_threads: Option>>, -} +/// This file contains the WATERStream implementation +/// which is a TcpStream liked definition with utilizing WASM // UnixSocket Connection created with Host // Write => u2w +----------------+ w2n -// ----->| Decode |------> +// ----->| WATERStream |------> // Caller | WASM Runtime | n2w Destination // <-----| Decode/Encode |<------ // Read => w2u +----------------+ @@ -28,21 +25,17 @@ pub struct WATERStream { pub caller_reader: UnixStream, // the reader in Caller (read from WASM -- w2u) pub caller_writer: UnixStream, // the writer in Caller (write to WASM -- u2w) - pub engine: Engine, - pub linker: Linker, - pub instance: Instance, - pub store: Store, - pub module: Module, + pub core: H2O, // core WASM runtime (engine, linker, instance, store, module) } impl WATERStream { /// Read from the target address - pub fn read(&mut self, buf: &mut Vec) -> Result<(), anyhow::Error> { + pub fn read(&mut self, buf: &mut Vec) -> Result { info!("[HOST] WATERStream reading..."); - let mut res = vec![Val::I64(0); self.reader.ty(&self.store).results().len()]; - match self.reader.call(&mut self.store, &[], &mut res) { + let mut res = vec![Val::I64(0); self.reader.ty(&self.core.store).results().len()]; + match self.reader.call(&mut self.core.store, &[], &mut res) { Ok(_) => {}, Err(e) => return Err(anyhow::Error::msg(format!("{} function failed: {}", READER_FN, e))), } @@ -61,7 +54,7 @@ impl WATERStream { Err(e) => return Err(anyhow::Error::msg(format!("failed to read from caller_reader: {}", e))), } - Ok(()) + Ok(nums) } /// Write to the target address @@ -76,7 +69,7 @@ impl WATERStream { let params = vec![Val::I64(buf.len() as i64)]; let mut res = vec![Val::I64(0)]; - match self.writer.call(&mut self.store, ¶ms, &mut res) { + match self.writer.call(&mut self.core.store, ¶ms, &mut res) { Ok(_) => { match res.get(0) { Some(wasmtime::Val::I64(v)) => { @@ -93,13 +86,13 @@ impl WATERStream { Ok(()) } - /// Connect to the target address with running the WASM entry function + /// Connect to the target address with running the WASM connect function pub fn connect(&mut self, conf: &Config, addr: &str, port: u16) -> Result<(), anyhow::Error> { info!("[HOST] WATERStream connecting..."); // TODO: add addr:port sharing with WASM, for now WASM is using config.json's remote_addr:port - let fnc = self.instance.get_func(&mut self.store, &conf.entry_fn).unwrap(); - match fnc.call(&mut self.store, &[], &mut []) { + let fnc = self.core.instance.get_func(&mut self.core.store, &conf.entry_fn).unwrap(); + match fnc.call(&mut self.core.store, &[], &mut []) { Ok(_) => {}, Err(e) => return Err(anyhow::Error::msg(format!("connect function failed: {}", e))), } @@ -110,47 +103,10 @@ impl WATERStream { pub fn init(conf: &Config) -> Result { info!("[HOST] WATERStream init..."); - - let mut wasm_config = wasmtime::Config::new(); - wasm_config.wasm_threads(true); - - let engine = Engine::new(&wasm_config)?; - let mut linker: Linker = Linker::new(&engine); - - let module = Module::from_file(&engine, &conf.filepath)?; - - let host = Host::default(); - let mut store = Store::new(&engine, host); - // let path = unsafe { Dir::open_ambient_dir(".", ambient_authority())? }; - - // store.data_mut().preview1_ctx = Some(WasiCtxBuilder::new().inherit_stdio().preopened_dir(path, ".")?.build()); - store.data_mut().preview1_ctx = Some(WasiCtxBuilder::new().inherit_stdio().build()); + let mut core = H2O::init(conf)?; + core._prepare(conf)?; - wasmtime_wasi::add_to_linker(&mut linker, |h: &mut Host| { - h.preview1_ctx.as_mut().unwrap() - })?; - - // initializing stuff for multithreading - #[cfg(feature = "multithread")] - { - - store.data_mut().wasi_threads = Some(Arc::new(WasiThreadsCtx::new( - module.clone(), - Arc::new(linker.clone()), - )?)); - - wasmtime_wasi_threads::add_to_linker(&mut linker, &store, &module, |h: &mut Host| { - h.wasi_threads.as_ref().unwrap() - })?; - } - - // export functions -- link connect functions - export_tcp_connect(&mut linker); - export_tcplistener_create(&mut linker); - - let instance = linker.instantiate(&mut store, &module)?; - // constructing 2 pairs of UnixStream for communicating between WASM and Host // returns (read_end, write_end) for caller let (caller_read_end, water_write_end) = UnixStream::pair()?; @@ -166,28 +122,28 @@ impl WATERStream { std::mem::forget(water_write_end); std::mem::forget(water_read_end); - let ctx = store.data_mut().preview1_ctx.as_mut().unwrap(); + let ctx = core.store.data_mut().preview1_ctx.as_mut().unwrap(); let water_reader_fd = ctx.push_file(Box::new(wasi_water_reader), FileAccessMode::all())?; let water_writer_fd = ctx.push_file(Box::new(wasi_water_writer), FileAccessMode::all())?; - let water_bridging = match instance.get_func(&mut store, WATER_BRIDGING_FN) { + let water_bridging = match core.instance.get_func(&mut core.store, WATER_BRIDGING_FN) { Some(func) => func, None => return Err(anyhow::Error::msg(format!("{} function not found in WASM", WATER_BRIDGING_FN))), }; let params = vec![Val::I32(water_reader_fd as i32), Val::I32(water_writer_fd as i32)]; - match water_bridging.call(&mut store, ¶ms, &mut []) { + match water_bridging.call(&mut core.store, ¶ms, &mut []) { Ok(_) => {}, Err(e) => return Err(anyhow::Error::msg(format!("{} function failed: {}", WATER_BRIDGING_FN, e))), } // getting reader & writer func from WASM - let reader = match instance.get_func(&mut store, READER_FN) { + let reader = match core.instance.get_func(&mut core.store, READER_FN) { Some(func) => func, None => return Err(anyhow::Error::msg(format!("{} function not found in WASM", READER_FN))), }; - let writer = match instance.get_func(&mut store, WRITER_FN) { + let writer = match core.instance.get_func(&mut core.store, WRITER_FN) { Some(func) => func, None => return Err(anyhow::Error::msg(format!("{} function not found in WASM", WRITER_FN))), }; @@ -199,102 +155,9 @@ impl WATERStream { caller_reader: caller_read_end, caller_writer: caller_write_end, - engine: engine, - linker: linker, - instance: instance, - store: store, - module: module, + core: core, }; Ok(runtime) } - - pub fn _version(&mut self) -> Result<(), anyhow::Error> { - info!("[HOST] WATERStream calling _version from WASM..."); - - let version_fn = match self.instance.get_func(&mut self.store, VERSION_FN) { - Some(func) => func, - None => return Err(anyhow::Error::msg("version function not found")), - }; - - // let mut res = vec![Val::I32(0); version_fn.ty(&self.store).results().len()]; - - // NOTE: below is the most generic way to setup the res vector, to avoid panic from calling - let mut res: Vec = version_fn - .ty(&self.store) - .results() - .map(|ty| // map each type to a default value - match ty { - // i32 and i64 are the only integer types in WebAssembly as of 2021 - ValType::I32 => Val::I32(0), - ValType::I64 => Val::I64(0), - ValType::F32 => Val::F32(0), - ValType::F64 => Val::F64(0), - _ => panic!("Unsupported type"), - } - ) - .collect(); // collect the default values into a Vec - - // TODO: add error handling code like this for all other func.call()'s - match version_fn.call(&mut self.store, &[], &mut res) { - Ok(_) => {}, - Err(e) => return Err(anyhow::Error::msg(format!("version function failed: {}", e))), - } - - match res.get(0) { - Some(Val::I32(v)) => { - if *v != RUNTIME_VERSION_MAJOR { - return Err(anyhow::Error::msg(format!("WASI module version {} is not compatible with runtime version {}", v, RUNTIME_VERSION))); - } - }, - Some(_) => return Err(anyhow::Error::msg("version function returned unexpected type")), - None => return Err(anyhow::Error::msg("version function has no return")), - }; - - Ok(()) - } - - pub fn _init(&mut self) -> Result<(), anyhow::Error> { - info!("[HOST] WATERStream calling _init from WASM..."); - - let init_fn = match self.instance.get_func(&mut self.store, INIT_FN) { - Some(func) => func, - None => return Err(anyhow::Error::msg("init function not found")), - }; - - // TODO: check if we need to pass in any arguments / configs later - match init_fn.call(&mut self.store, &[], &mut []) { - Ok(_) => {}, - Err(e) => return Err(anyhow::Error::msg(format!("init function failed: {}", e))), - } - - Ok(()) - } - - pub fn _process_config(&mut self, config: &Config) -> Result<(), anyhow::Error> { - info!("[HOST] WATERStream calling _process_config from WASM..."); - - // _required to implement _process_config(i32) in WASM, which will be parsing all the configurations - let config_fn = match self.instance.get_func(&mut self.store, CONFIG_FN) { - Some(func) => func, - None => return Err(anyhow::Error::msg("_process_config function not found in WASM")), - }; - - // open the config file and insert to WASM - let dir = Dir::open_ambient_dir(".", ambient_authority())?; // Open the root directory - let wasi_file = dir.open_with(&config.config_wasm, OpenOptions::new().read(true).write(true))?; - let wasi_file = wasmtime_wasi::sync::file::File::from_cap_std(wasi_file); - - // TODO: might be better to ask WASM for the fd? Or if it is fixed in the pipeline, then 3 is fine - let ctx = self.store.data_mut().preview1_ctx.as_mut().unwrap(); - let config_fd = ctx.push_file(Box::new(wasi_file), FileAccessMode::all())? as i32; - - let params = vec![Val::I32(config_fd); config_fn.ty(&self.store).params().len()]; - match config_fn.call(&mut self.store, ¶ms, &mut []) { - Ok(_) => {}, - Err(e) => return Err(anyhow::Error::msg(format!("_process_config function in WASM failed: {}", e))), - } - - Ok(()) - } } \ No newline at end of file diff --git a/tests/benchmarking_v0.rs b/tests/benchmarking_v0.rs new file mode 100644 index 0000000..3ee03f7 --- /dev/null +++ b/tests/benchmarking_v0.rs @@ -0,0 +1,80 @@ +use water::*; +use rand; + +use pprof::protos::Message; +use std::io::Write; +use std::thread; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; + +use tracing_subscriber; +use tracing::Level; + +#[test] +fn benchmarking_v0() -> Result<(), anyhow::Error> { + tracing_subscriber::fmt() + .with_max_level(Level::INFO) + .init(); + + // Start the listener in a new thread + let listener_handle = std::thread::spawn(|| -> Result<(), anyhow::Error> { + let conf = config::Config::init(String::from("./tests/test_wasm/proxy.wasm"), String::from("listen"), String::from("./tests/test_data/config.json"), 2)?; + + let mut water_client = runtime::WATERClient::new(conf)?; + + water_client.execute() + }); + + // Give the listener some time to start + std::thread::sleep(std::time::Duration::from_millis(5000)); + + // --------- start to dial the listener --------- + let dial_handle = std::thread::spawn(|| -> Result<(), anyhow::Error> { + let conf = config::Config::init(String::from("./tests/test_wasm/proxy.wasm"), String::from("dial"), String::from("./tests/test_data/config.json"), 0)?; + + let mut water_client = runtime::WATERClient::new(conf)?; + + // FIXME: hardcoded the addr & port for now + water_client.connect("", 0)?; + + let guard = pprof::ProfilerGuard::new(100).unwrap(); + + for _ in 0..10000 { + let random_data: Vec = (0..1000).map(|_| rand::random::()).collect(); + + water_client.write(&random_data)?; + + let mut buf = vec![0; 1000]; + water_client.read(&mut buf)?; + + // println!("read: {:?}", String::from_utf8_lossy(&buf)); + } + + // Stop and report profiler data + if let Ok(report) = guard.report().build() { + // println!("{:?}", report); + // report.flamegraph(std::io::stdout())?; + let mut file = std::fs::File::create("flamegraph.svg")?; + report.flamegraph(file)?; + + // let mut file = std::fs::File::create("profile.pb")?; + // report.pprof(file)?; + let mut file = std::fs::File::create("profile.pb").unwrap(); + let profile = report.pprof().unwrap(); + + let mut content = Vec::new(); + // profile.encode(&mut content).unwrap(); + profile.write_to_vec(&mut content).unwrap(); + file.write_all(&content).unwrap(); + } + + Ok(()) + }); + + dial_handle.join().expect("Listener thread panicked")?; + + // // Signal the listener thread to stop + // should_stop.store(true, Ordering::Relaxed); + + Ok(()) +} \ No newline at end of file diff --git a/tests/test_data/config.json b/tests/test_data/config.json new file mode 100644 index 0000000..bc6f7c7 --- /dev/null +++ b/tests/test_data/config.json @@ -0,0 +1,6 @@ +{ + "remote_address": "127.0.0.1", + "remote_port": 8088, + "local_address": "127.0.0.1", + "local_port": 8088 +} \ No newline at end of file diff --git a/tests/test_wasm/proxy.wasm b/tests/test_wasm/proxy.wasm new file mode 100644 index 0000000..eccdbf7 Binary files /dev/null and b/tests/test_wasm/proxy.wasm differ