diff --git a/Cargo.lock b/Cargo.lock index 9ea52fd..b969455 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -161,6 +161,321 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "aws-config" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b30c39ebe61f75d1b3785362b1586b41991873c9ab3e317a9181c246fb71d82" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-sdk-sso", + "aws-sdk-ssooidc", + "aws-sdk-sts", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand", + "hex", + "http 0.2.11", + "hyper", + "ring", + "time", + "tokio", + "tracing", + "zeroize", +] + +[[package]] +name = "aws-credential-types" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33cc49dcdd31c8b6e79850a179af4c367669150c7ac0135f176c61bec81a70f7" +dependencies = [ + "aws-smithy-async", + "aws-smithy-runtime-api", + "aws-smithy-types", + "zeroize", +] + +[[package]] +name = "aws-runtime" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb031bff99877c26c28895766f7bb8484a05e24547e370768d6cc9db514662aa" +dependencies = [ + "aws-credential-types", + "aws-sigv4", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand", + "http 0.2.11", + "http-body", + "percent-encoding", + "pin-project-lite", + "tracing", + "uuid", +] + +[[package]] +name = "aws-sdk-sqs" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1bd3f54f12028a536c9b4450c425d7c510f1472d81169d69b603263a4dbf6a3" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "http 0.2.11", + "once_cell", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-sso" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f486420a66caad72635bc2ce0ff6581646e0d32df02aa39dc983bfe794955a5b" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "http 0.2.11", + "once_cell", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-ssooidc" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39ddccf01d82fce9b4a15c8ae8608211ee7db8ed13a70b514bbfe41df3d24841" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "http 0.2.11", + "once_cell", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-sts" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a591f8c7e6a621a501b2b5d2e88e1697fcb6274264523a6ad4d5959889a41ce" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-query", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-smithy-xml", + "aws-types", + "http 0.2.11", + "once_cell", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sigv4" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c371c6b0ac54d4605eb6f016624fb5c7c2925d315fdf600ac1bf21b19d5f1742" +dependencies = [ + "aws-credential-types", + "aws-smithy-http", + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "form_urlencoded", + "hex", + "hmac", + "http 0.2.11", + "http 1.0.0", + "once_cell", + "percent-encoding", + "sha2", + "time", + "tracing", +] + +[[package]] +name = "aws-smithy-async" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72ee2d09cce0ef3ae526679b522835d63e75fb427aca5413cd371e490d52dcc6" +dependencies = [ + "futures-util", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "aws-smithy-http" +version = "0.60.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dab56aea3cd9e1101a0a999447fb346afb680ab1406cebc44b32346e25b4117d" +dependencies = [ + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "bytes-utils", + "futures-core", + "http 0.2.11", + "http-body", + "once_cell", + "percent-encoding", + "pin-project-lite", + "pin-utils", + "tracing", +] + +[[package]] +name = "aws-smithy-json" +version = "0.60.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd3898ca6518f9215f62678870064398f00031912390efd03f1f6ef56d83aa8e" +dependencies = [ + "aws-smithy-types", +] + +[[package]] +name = "aws-smithy-query" +version = "0.60.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bda4b1dfc9810e35fba8a620e900522cd1bd4f9578c446e82f49d1ce41d2e9f9" +dependencies = [ + "aws-smithy-types", + "urlencoding", +] + +[[package]] +name = "aws-smithy-runtime" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fafdab38f40ad7816e7da5dec279400dd505160780083759f01441af1bbb10ea" +dependencies = [ + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "fastrand", + "h2", + "http 0.2.11", + "http-body", + "hyper", + "hyper-rustls", + "once_cell", + "pin-project-lite", + "pin-utils", + "rustls", + "tokio", + "tracing", +] + +[[package]] +name = "aws-smithy-runtime-api" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c18276dd28852f34b3bf501f4f3719781f4999a51c7bff1a5c6dc8c4529adc29" +dependencies = [ + "aws-smithy-async", + "aws-smithy-types", + "bytes", + "http 0.2.11", + "pin-project-lite", + "tokio", + "tracing", + "zeroize", +] + +[[package]] +name = "aws-smithy-types" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb3e134004170d3303718baa2a4eb4ca64ee0a1c0a7041dca31b38be0fb414f3" +dependencies = [ + "base64-simd", + "bytes", + "bytes-utils", + "futures-core", + "http 0.2.11", + "http-body", + "itoa", + "num-integer", + "pin-project-lite", + "pin-utils", + "ryu", + "serde", + "time", + "tokio", + "tokio-util", +] + +[[package]] +name = "aws-smithy-xml" +version = "0.60.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8604a11b25e9ecaf32f9aa56b9fe253c5e2f606a3477f0071e96d3155a5ed218" +dependencies = [ + "xmlparser", +] + +[[package]] +name = "aws-types" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "789bbe008e65636fe1b6dbbb374c40c8960d1232b96af5ff4aec349f9c4accf4" +dependencies = [ + "aws-credential-types", + "aws-smithy-async", + "aws-smithy-runtime-api", + "aws-smithy-types", + "http 0.2.11", + "rustc_version", + "tracing", +] + [[package]] name = "backtrace" version = "0.3.69" @@ -188,12 +503,37 @@ version = "0.21.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" +[[package]] +name = "base64-simd" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "339abbe78e73178762e23bea9dfd08e697eb3f3301cd4be981c0f78ba5859195" +dependencies = [ + "outref", + "vsimd", +] + [[package]] name = "base64ct" version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" +[[package]] +name = "bit-set" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0700ddab506f33b20a03b13996eccd309a48e5ff77d0d95926aa0210fb4e95f1" +dependencies = [ + "bit-vec", +] + +[[package]] +name = "bit-vec" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "349f9b6a179ed607305526ca489b34ad0a41aed5f7980fa90eb03160b69598fb" + [[package]] name = "bitflags" version = "1.3.2" @@ -224,6 +564,26 @@ version = "3.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec" +[[package]] +name = "bytemuck" +version = "1.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed2490600f404f2b94c167e31d3ed1d5f3c225a0f3b80230053b3e0b7b962bd9" +dependencies = [ + "bytemuck_derive", +] + +[[package]] +name = "bytemuck_derive" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "965ab7eb5f8f97d2a083c799f3a1b994fc397b2fe2da5d1da1626ce15a39f2b1" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.48", +] + [[package]] name = "byteorder" version = "1.5.0" @@ -236,6 +596,16 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223" +[[package]] +name = "bytes-utils" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7dafe3a8757b027e2be6e4e5601ed563c55989fcf1546e933c66c8eb3a058d35" +dependencies = [ + "bytes", + "either", +] + [[package]] name = "cadence" version = "0.29.1" @@ -572,6 +942,16 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0688c2a7f92e427f44895cd63841bff7b29f8d7a1648b9e7e07a4a365b2e1257" +[[package]] +name = "docker-db" +version = "0.1.0" +source = "git+https://github.com/Dzejkop/docker-db?rev=ef9a4dfccd9cb9b4babeebfea0ba815e36afbafe#ef9a4dfccd9cb9b4babeebfea0ba815e36afbafe" +dependencies = [ + "test-case", + "thiserror", + "tokio", +] + [[package]] name = "dotenv" version = "0.15.0" @@ -657,6 +1037,12 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8fcfdc7a0362c9f4444381a9e697c79d435fe65b52a37466fc2c1184cee9edc6" +[[package]] +name = "float_eq" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28a80e3145d8ad11ba0995949bbcf48b9df2be62772b3d351ef017dff6ecb853" + [[package]] name = "flume" version = "0.11.0" @@ -698,6 +1084,21 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.30" @@ -771,6 +1172,7 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ + "futures-channel", "futures-core", "futures-io", "futures-macro", @@ -820,7 +1222,7 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http", + "http 0.2.11", "indexmap 2.1.0", "slab", "tokio", @@ -886,6 +1288,9 @@ name = "hex" version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +dependencies = [ + "serde", +] [[package]] name = "hkdf" @@ -925,6 +1330,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b32afd38673a8016f7c9ae69e5af41a58f81b1d31689040f2f1959594ce194ea" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "http-body" version = "0.4.6" @@ -932,7 +1348,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" dependencies = [ "bytes", - "http", + "http 0.2.11", "pin-project-lite", ] @@ -959,7 +1375,7 @@ dependencies = [ "futures-core", "futures-util", "h2", - "http", + "http 0.2.11", "http-body", "httparse", "httpdate", @@ -972,6 +1388,22 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-rustls" +version = "0.24.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" +dependencies = [ + "futures-util", + "http 0.2.11", + "hyper", + "log", + "rustls", + "rustls-native-certs", + "tokio", + "tokio-rustls", +] + [[package]] name = "iana-time-zone" version = "0.1.59" @@ -1188,6 +1620,16 @@ version = "2.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "523dc4f511e55ab87b694dc30d0f820d60906ef06413f93d4d7a1385599cc149" +[[package]] +name = "memmap" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6585fd95e7bb50d6cc31e20d4cf9afb4e2ba16c5846fc76793f11218da9c475b" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "metrics" version = "0.21.1" @@ -1257,13 +1699,26 @@ dependencies = [ name = "mpc" version = "0.1.0" dependencies = [ + "aws-config", + "aws-sdk-sqs", + "bytemuck", "clap", "config", "criterion", + "docker-db", "dotenv", "eyre", + "float_eq", + "futures", + "hex", + "itertools 0.12.0", + "memmap", "metrics", + "proptest", + "rand", + "rayon", "serde", + "serde_json", "sqlx", "telemetry-batteries", "tokio", @@ -1460,7 +1915,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5f4ecf595095d3b641dd2761a0c3d1f175d3d6c28f38e65418d8004ea3255dd" dependencies = [ "futures-core", - "http", + "http 0.2.11", "indexmap 1.9.3", "itertools 0.10.5", "once_cell", @@ -1481,7 +1936,7 @@ checksum = "c7594ec0e11d8e33faf03530a4c49af7064ebba81c1480e01be67d90b356508b" dependencies = [ "async-trait", "bytes", - "http", + "http 0.2.11", "opentelemetry_api", "reqwest", ] @@ -1558,6 +2013,12 @@ dependencies = [ "hashbrown 0.12.3", ] +[[package]] +name = "outref" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4030760ffd992bef45b0ae3f10ce1aba99e33464c90d14dd7c039884963ddc7a" + [[package]] name = "overload" version = "0.1.1" @@ -1753,6 +2214,32 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "proptest" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31b476131c3c86cb68032fdc5cb6d5a1045e3e42d96b69fa599fd77701e1f5bf" +dependencies = [ + "bit-set", + "bit-vec", + "bitflags 2.4.2", + "lazy_static", + "num-traits", + "rand", + "rand_chacha", + "rand_xorshift", + "regex-syntax 0.8.2", + "rusty-fork", + "tempfile", + "unarray", +] + +[[package]] +name = "quick-error" +version = "1.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" + [[package]] name = "quote" version = "1.0.35" @@ -1792,6 +2279,15 @@ dependencies = [ "getrandom", ] +[[package]] +name = "rand_xorshift" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d25bf25ec5ae4a3f1b92f929810509a2f53d7dca2f50b794ff57e3face536c8f" +dependencies = [ + "rand_core", +] + [[package]] name = "rayon" version = "1.8.1" @@ -1864,6 +2360,12 @@ dependencies = [ "regex-syntax 0.8.2", ] +[[package]] +name = "regex-lite" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30b661b2f27137bdbc16f00eda72866a92bb28af1753ffbd56744fb6e2e9cd8e" + [[package]] name = "regex-syntax" version = "0.6.29" @@ -1888,7 +2390,7 @@ dependencies = [ "futures-core", "futures-util", "h2", - "http", + "http 0.2.11", "http-body", "hyper", "ipnet", @@ -1911,6 +2413,20 @@ dependencies = [ "winreg", ] +[[package]] +name = "ring" +version = "0.17.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "688c63d65483050968b2a8937f7995f443e27041a0f7700aa59b0822aedebb74" +dependencies = [ + "cc", + "getrandom", + "libc", + "spin 0.9.8", + "untrusted", + "windows-sys 0.48.0", +] + [[package]] name = "rmp" version = "0.8.12" @@ -1969,6 +2485,15 @@ version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" +[[package]] +name = "rustc_version" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" +dependencies = [ + "semver", +] + [[package]] name = "rustix" version = "0.38.30" @@ -1982,6 +2507,61 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rustls" +version = "0.21.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9d5a6813c0759e4609cd494e8e725babae6a2ca7b62a5536a13daaec6fcb7ba" +dependencies = [ + "log", + "ring", + "rustls-webpki", + "sct", +] + +[[package]] +name = "rustls-native-certs" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" +dependencies = [ + "openssl-probe", + "rustls-pemfile", + "schannel", + "security-framework", +] + +[[package]] +name = "rustls-pemfile" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c" +dependencies = [ + "base64 0.21.7", +] + +[[package]] +name = "rustls-webpki" +version = "0.101.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" +dependencies = [ + "ring", + "untrusted", +] + +[[package]] +name = "rusty-fork" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb3dcc6e454c328bb824492db107ab7c0ae8fcffe4ad210136ef014458c1bc4f" +dependencies = [ + "fnv", + "quick-error", + "tempfile", + "wait-timeout", +] + [[package]] name = "ryu" version = "1.0.16" @@ -2012,6 +2592,16 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "sct" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "security-framework" version = "2.9.2" @@ -2035,6 +2625,12 @@ dependencies = [ "libc", ] +[[package]] +name = "semver" +version = "1.0.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b97ed7a9823b74f99c7742f5336af7be5ecd3eeafcb1507d1fa93347b1d589b0" + [[package]] name = "serde" version = "1.0.195" @@ -2109,6 +2705,15 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "signal-hook-registry" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1" +dependencies = [ + "libc", +] + [[package]] name = "signature" version = "2.2.0" @@ -2455,7 +3060,7 @@ source = "git+https://github.com/worldcoin/telemetry-batteries.git?rev=c68166244 dependencies = [ "chrono", "dirs", - "http", + "http 0.2.11", "metrics", "metrics-exporter-statsd", "opentelemetry", @@ -2485,6 +3090,39 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "test-case" +version = "3.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb2550dd13afcd286853192af8601920d959b14c401fcece38071d53bf0768a8" +dependencies = [ + "test-case-macros", +] + +[[package]] +name = "test-case-core" +version = "3.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "adcb7fd841cd518e279be3d5a3eb0636409487998a4aff22f3de87b81e88384f" +dependencies = [ + "cfg-if", + "proc-macro2", + "quote", + "syn 2.0.48", +] + +[[package]] +name = "test-case-macros" +version = "3.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c89e72a01ed4c579669add59014b9a524d609c0c88c6a585ce37485879f6ffb" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.48", + "test-case-core", +] + [[package]] name = "thiserror" version = "1.0.56" @@ -2580,7 +3218,9 @@ dependencies = [ "libc", "mio", "num_cpus", + "parking_lot", "pin-project-lite", + "signal-hook-registry", "socket2", "tokio-macros", "windows-sys 0.48.0", @@ -2597,6 +3237,16 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "tokio-rustls" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" +dependencies = [ + "rustls", + "tokio", +] + [[package]] name = "tokio-stream" version = "0.1.14" @@ -2769,6 +3419,12 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed646292ffc8188ef8ea4d1e0e0150fb15a5c2e12ad9b8fc191ae7a8a7f3c4b9" +[[package]] +name = "unarray" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eaea85b334db583fe3274d12b4cd1880032beab409c0d774be044d4480ab9a94" + [[package]] name = "unicode-bidi" version = "0.3.15" @@ -2802,6 +3458,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e" +[[package]] +name = "untrusted" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" + [[package]] name = "url" version = "2.5.0" @@ -2825,6 +3487,12 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" +[[package]] +name = "uuid" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f00cc9702ca12d3c81455259621e676d0f7251cec66a21e98fe2e9a37db93b2a" + [[package]] name = "valuable" version = "0.1.0" @@ -2843,6 +3511,21 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "vsimd" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c3082ca00d5a5ef149bb8b555a72ae84c9c59f7250f013ac822ac2e49b19c64" + +[[package]] +name = "wait-timeout" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f200f5b12eb75f8c1ed65abd4b2db8a6e1b138a20de009dacee265a2498f3f6" +dependencies = [ + "libc", +] + [[package]] name = "walkdir" version = "2.4.0" @@ -3132,6 +3815,12 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "xmlparser" +version = "0.13.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66fee0b777b0f5ac1c69bb06d361268faafa61cd4682ae064a171c16c433e9e4" + [[package]] name = "yaml-rust" version = "0.4.5" diff --git a/Cargo.toml b/Cargo.toml index 5bfe584..46ac0cd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,28 +4,48 @@ version = "0.1.0" edition = "2021" [dependencies] +aws-config = "1.1.4" +aws-sdk-sqs = "1.12.0" +bytemuck = { version = "1.14.1", features = ["derive"] } clap = { version = "4.4.18", features = ["derive", "env"] } config = "0.13.4" -dotenv = "0.15.0" criterion = "0.5.1" +dotenv = "0.15.0" eyre = "0.6.11" +futures = "0.3.30" +hex = { version = "0.4.3", features = ["serde"] } +itertools = "0.12.0" +memmap = "0.7.0" +metrics = "0.21.1" +rand = "0.8.5" +rayon = "1.8.1" +serde = { version = "1.0.195", features = ["derive"] } +serde_json = "1.0.111" sqlx = { version = "0.7.3", features = [ "runtime-tokio-native-tls", "any", "postgres", "chrono", ] } +docker-db = { git = "https://github.com/Dzejkop/docker-db", rev = "ef9a4dfccd9cb9b4babeebfea0ba815e36afbafe" } telemetry-batteries = { git = "https://github.com/worldcoin/telemetry-batteries.git", rev = "c6816624415ae194da5203a5161621a9e10ad3b0" } tokio = { version = "1.35.1", features = ["macros"] } tracing = "0.1.40" tracing-subscriber = "0.3.18" -metrics = "0.21.1" -serde = { version = "1.0.195", features = ["derive"] } + +[dev-dependencies] +float_eq = "1.0.1" +proptest = "1.4.0" [[bin]] -name = "mpc-node" -path = "bin/mpc_node.rs" +name = "mpc-participant" +path = "bin/mpc_participant.rs" + +[[bin]] +name = "mpc-coordinator" +path = "bin/mpc_coordinator.rs" [[bench]] name = "example" harness = false + diff --git a/bin/mpc_coordinator.rs b/bin/mpc_coordinator.rs new file mode 100644 index 0000000..201d13b --- /dev/null +++ b/bin/mpc_coordinator.rs @@ -0,0 +1,72 @@ +use std::path::PathBuf; + +use aws_config::BehaviorVersion; +use clap::Parser; +use mpc::config::CoordinatorConfig; +use mpc::coordinator::Coordinator; +use telemetry_batteries::metrics::batteries::StatsdBattery; +use telemetry_batteries::tracing::batteries::DatadogBattery; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; + +pub const SERVICE_NAME: &str = "mpc-coordinator"; + +pub const METRICS_HOST: &str = "localhost"; +pub const METRICS_PORT: u16 = 8125; +pub const METRICS_QUEUE_SIZE: usize = 5000; +pub const METRICS_BUFFER_SIZE: usize = 1024; +pub const METRICS_PREFIX: &str = "mpc-coordinator"; + +#[derive(Parser)] +#[clap(version)] +pub struct Args { + #[clap(short, long, env)] + telemetry: bool, + + #[clap(short, long, env)] + config: Option, +} + +#[tokio::main] +async fn main() -> eyre::Result<()> { + dotenv::dotenv().ok(); + + let args = Args::parse(); + + if args.telemetry { + DatadogBattery::init(None, SERVICE_NAME, None, true); + + StatsdBattery::init( + METRICS_HOST, + METRICS_PORT, + METRICS_QUEUE_SIZE, + METRICS_BUFFER_SIZE, + Some(METRICS_PREFIX), + )?; + } else { + tracing_subscriber::registry() + .with(tracing_subscriber::fmt::layer().pretty().compact()) + .with(tracing_subscriber::EnvFilter::from_default_env()) + .init(); + } + + let mut settings = config::Config::builder(); + + if let Some(path) = args.config { + settings = settings.add_source(config::File::from(path).required(true)); + } + + let settings = settings + .add_source(config::Environment::with_prefix("MPC").separator("__")) + .build()?; + + let config = settings.try_deserialize::()?; + + let coordinator = + Coordinator::new(vec![], "template_queue_url", "distance_queue_url") + .await?; + + coordinator.spawn().await?; + + Ok(()) +} diff --git a/bin/mpc_node.rs b/bin/mpc_participant.rs similarity index 79% rename from bin/mpc_node.rs rename to bin/mpc_participant.rs index 49cdf45..5e8e50f 100644 --- a/bin/mpc_node.rs +++ b/bin/mpc_participant.rs @@ -1,7 +1,6 @@ use std::path::PathBuf; use clap::Parser; -use mpc::config::Config; use telemetry_batteries::metrics::batteries::StatsdBattery; use telemetry_batteries::tracing::batteries::DatadogBattery; use tracing_subscriber::layer::SubscriberExt; @@ -60,22 +59,5 @@ async fn main() -> eyre::Result<()> { .add_source(config::Environment::with_prefix("MPC").separator("__")) .build()?; - let config = settings.try_deserialize::()?; - - let mut n = 0; - - loop { - foo(&config, n).await; - - n += 1; - - tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; - } -} - -#[tracing::instrument(skip(config))] -async fn foo(config: &Config, n: usize) { - tracing::info!(n, test = config.test.test, "Foo"); - - metrics::gauge!("foo", n as f64); + Ok(()) } diff --git a/migrations/coordinator/001_init.sql b/migrations/coordinator/001_init.sql new file mode 100644 index 0000000..d1d74d1 --- /dev/null +++ b/migrations/coordinator/001_init.sql @@ -0,0 +1,4 @@ +CREATE TABLE masks ( + id BIGINT PRIMARY KEY, + mask BYTEA NOT NULL +); diff --git a/migrations/participant/001_init.sql b/migrations/participant/001_init.sql new file mode 100644 index 0000000..ecd7af4 --- /dev/null +++ b/migrations/participant/001_init.sql @@ -0,0 +1,4 @@ +CREATE TABLE shares ( + id BIGINT PRIMARY KEY, + share BYTEA NOT NULL +); diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 0748762..31ee93e 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,3 +1,3 @@ [toolchain] -channel = "nightly-2024-01-26" +channel = "nightly-2024-01-25" components = ["rustc-dev", "rustc", "cargo", "rustfmt", "clippy"] diff --git a/src/arch/generic.rs b/src/arch/generic.rs new file mode 100644 index 0000000..ac1ee88 --- /dev/null +++ b/src/arch/generic.rs @@ -0,0 +1,100 @@ +#![allow(unused)] +use std::cmp::min; +use std::mem::swap; +use std::thread::JoinHandle; + +use rayon::prelude::*; + +use crate::distance::Bits; +use crate::encoded_bits::EncodedBits; + +pub fn distances<'a>( + query: &'a EncodedBits, + db: &'a [EncodedBits], +) -> impl Iterator + 'a { + const BATCH: usize = 10_000; + + // Prepare 31 rotations of query in advance + let rotations: Box<[_]> = (-15..=15).map(|r| query.rotated(r)).collect(); + + // Iterate over a batch of database entries + db.chunks(BATCH).flat_map(move |chunk| { + let mut results = [[0_u16; 31]; BATCH]; + + // Parallel computation over batch + results.par_iter_mut().zip(chunk.par_iter()).for_each( + |(result, entry)| { + // Compute dot product for each rotation + for (d, rotation) in result.iter_mut().zip(rotations.iter()) { + *d = rotation.dot(entry); + } + }, + ); + + // Sequentially output results + results.into_iter().take(chunk.len()) + }) +} + +pub fn denominators<'a>( + query: &'a Bits, + db: &'a [Bits], +) -> impl Iterator + 'a { + const BATCH: usize = 10_000; + + // Prepare 31 rotations of query in advance + let rotations: Box<[_]> = (-15..=15).map(|r| query.rotated(r)).collect(); + + // Iterate over a batch of database entries + db.chunks(BATCH).flat_map(move |chunk| { + // Parallel computation over batch + let results = chunk + .par_iter() + .map(|(entry)| { + let mut result = [0_u16; 31]; + // Compute dot product for each rotation + for (d, rotation) in result.iter_mut().zip(rotations.iter()) { + *d = rotation.dot(entry); + } + result + }) + .collect::>(); + + // Sequentially output results + results.into_iter().take(chunk.len()) + }) +} + +//TODO: move this to the benches file +#[cfg(feature = "bench")] +pub mod benches { + use core::hint::black_box; + + use criterion::Criterion; + use rand::{thread_rng, Rng}; + + use super::*; + + pub fn group(c: &mut Criterion) { + let mut rng = thread_rng(); + let mut g = c.benchmark_group("generic"); + + g.bench_function("distances 31x1000", |bench| { + let a: EncodedBits = rng.gen(); + let b: Box<[EncodedBits]> = (0..1000).map(|_| rng.gen()).collect(); + bench.iter(|| { + black_box(distances(black_box(&a), black_box(&b))) + .for_each(|_| {}) + }) + }); + + g.bench_function("denominators 31x1000", |bench| { + let a: Bits = rng.gen(); + let b: Box<[Bits]> = (0..1000).map(|_| rng.gen()).collect(); + bench.iter(|| { + black_box(denominators(black_box(&a), black_box(&b))) + .for_each(|_| {}) + }) + }); + } +} diff --git a/src/arch/mod.rs b/src/arch/mod.rs new file mode 100644 index 0000000..6cbe090 --- /dev/null +++ b/src/arch/mod.rs @@ -0,0 +1,22 @@ +mod generic; // Optimized generic implementation +mod neon; // Optimized aarch64 NEON implementation +mod reference; // Simple generic implementations + +pub use generic::{denominators, distances}; + +//TODO: move this to the benches file +#[cfg(feature = "bench")] +pub mod benches { + use criterion::Criterion; + + use super::*; + + pub fn group(c: &mut Criterion) { + reference::benches::group(c); + + generic::benches::group(c); + + #[cfg(target_feature = "neon")] + neon::benches::group(c); + } +} diff --git a/src/arch/neon.rs b/src/arch/neon.rs new file mode 100644 index 0000000..7c59a7c --- /dev/null +++ b/src/arch/neon.rs @@ -0,0 +1,20 @@ +#![cfg(target_feature = "neon")] +#![allow(unused)] + +// Rust + LLVM already generates good NEON code for the generic implementation. + +//TODO: move this to the benches file +#[cfg(feature = "bench")] +pub mod benches { + use core::hint::black_box; + + use criterion::Criterion; + use rand::{thread_rng, Rng}; + + use super::*; + + pub fn group(c: &mut Criterion) { + let mut g = c.benchmark_group("neon"); + let mut rng = thread_rng(); + } +} diff --git a/src/arch/reference.rs b/src/arch/reference.rs new file mode 100644 index 0000000..3aa7e66 --- /dev/null +++ b/src/arch/reference.rs @@ -0,0 +1,64 @@ +#![allow(unused)] + +use crate::distance::Bits; +use crate::encoded_bits::EncodedBits; + +pub fn distances<'a>( + query: &'a EncodedBits, + db: &'a [EncodedBits], +) -> impl Iterator + 'a { + db.iter().map(|entry| { + let mut result = [0_u16; 31]; + for (d, r) in result.iter_mut().zip(-15..=15) { + *d = query.rotated(r).dot(entry); + } + result + }) +} + +pub fn denominators<'a>( + query: &'a Bits, + db: &'a [Bits], +) -> impl Iterator + 'a { + db.iter().map(|entry| { + let mut result = [0_u16; 31]; + for (d, r) in result.iter_mut().zip(-15..=15) { + *d = query.rotated(r).dot(entry); + } + result + }) +} + +//TODO: move this to the benches file +#[cfg(feature = "bench")] +pub mod benches { + use core::hint::black_box; + + use criterion::Criterion; + use rand::{thread_rng, Rng}; + + use super::*; + + pub fn group(c: &mut Criterion) { + let mut rng = thread_rng(); + let mut g = c.benchmark_group("reference"); + + g.bench_function("distances 31x1000", |bench| { + let a: EncodedBits = rng.gen(); + let b: Box<[EncodedBits]> = (0..1000).map(|_| rng.gen()).collect(); + bench.iter(|| { + black_box(distances(black_box(&a), black_box(&b))) + .for_each(|_| {}) + }) + }); + + g.bench_function("denominators 31x1000", |bench| { + let a: Bits = rng.gen(); + let b: Box<[Bits]> = (0..1000).map(|_| rng.gen()).collect(); + bench.iter(|| { + black_box(denominators(black_box(&a), black_box(&b))) + .for_each(|_| {}) + }) + }); + } +} diff --git a/src/bits.rs b/src/bits.rs new file mode 100644 index 0000000..a4a35c6 --- /dev/null +++ b/src/bits.rs @@ -0,0 +1,261 @@ +use std::fmt::Debug; +use std::ops; +use std::ops::Index; + +use bytemuck::{ + bytes_of, cast_slice_mut, try_cast_slice, try_cast_slice_mut, Pod, Zeroable, +}; +use rand::distributions::{Distribution, Standard}; +use rand::Rng; +use serde::de::Error as _; +use serde::{Deserialize, Serialize}; + +pub const COLS: usize = 200; +pub const ROWS: usize = 4 * 16; +pub const BITS: usize = ROWS * COLS; +const LIMBS: usize = BITS / 64; +const BYTES_PER_COL: usize = COLS / 8; + +#[repr(transparent)] +#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +pub struct Bits(pub [u64; LIMBS]); + +impl Bits { + pub fn rotate(&mut self, amount: i32) { + let bytes: &mut [u8] = + try_cast_slice_mut(self.0.as_mut_slice()).unwrap(); + for chunk in bytes.chunks_exact_mut(BYTES_PER_COL) { + rotate_row(chunk.try_into().unwrap(), amount) + } + } + + pub fn rotated(&self, amount: i32) -> Self { + let mut copy = *self; + copy.rotate(amount); + copy + } + + pub fn count_ones(&self) -> u16 { + self.0.iter().map(|n| n.count_ones() as u16).sum() + } + + pub fn dot(&self, other: &Self) -> u16 { + self.0 + .iter() + .zip(other.0.iter()) + .map(|(&a, &b)| (a & b).count_ones() as u16) + .sum() + } +} + +unsafe impl Zeroable for Bits {} + +unsafe impl Pod for Bits {} + +impl Index for Bits { + type Output = bool; + + fn index(&self, index: usize) -> &Self::Output { + assert!(index < BITS); + let (limb, bit) = (index / 64, index % 64); + let b = self.0[limb] & (1_u64 << bit) != 0; + if b { + &true + } else { + &false + } + } +} + +impl Default for Bits { + fn default() -> Self { + Self([0; LIMBS]) + } +} + +impl Debug for Bits { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + for limb in self.0 { + write!(f, "{limb:016x}")?; + } + Ok(()) + } +} + +impl Serialize for Bits { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + hex::serialize(bytes_of(self), serializer) + } +} + +impl<'de> Deserialize<'de> for Bits { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let bytes: Vec = hex::deserialize(deserializer)?; + let limbs = + try_cast_slice(bytes.as_slice()).map_err(D::Error::custom)?; + let limbs = limbs.try_into().map_err(D::Error::custom)?; + Ok(Bits(limbs)) + } +} + +impl Distribution for Standard { + fn sample(&self, rng: &mut R) -> Bits { + let mut values = [0_u64; LIMBS]; + rng.fill_bytes(cast_slice_mut(values.as_mut_slice())); + Bits(values) + } +} + +impl ops::Not for &Bits { + type Output = Bits; + + fn not(self) -> Self::Output { + let mut result = Bits::default(); + for (r, s) in result.0.iter_mut().zip(self.0.iter()) { + *r = !s; + } + result + } +} + +impl ops::BitAnd for &Bits { + type Output = Bits; + + fn bitand(self, rhs: Self) -> Self::Output { + let mut result = *self; + result &= rhs; + result + } +} + +impl ops::BitAnd<&Bits> for Bits { + type Output = Bits; + + fn bitand(mut self, rhs: &Self) -> Self::Output { + self &= rhs; + self + } +} + +impl ops::BitOr for &Bits { + type Output = Bits; + + fn bitor(self, rhs: Self) -> Self::Output { + let mut result = *self; + result |= rhs; + result + } +} + +impl ops::BitXor for &Bits { + type Output = Bits; + + fn bitxor(self, rhs: Self) -> Self::Output { + let mut result = *self; + result ^= rhs; + result + } +} + +impl ops::BitAndAssign<&Bits> for Bits { + fn bitand_assign(&mut self, rhs: &Self) { + for (s, r) in self.0.iter_mut().zip(rhs.0.iter()) { + s.bitand_assign(r); + } + } +} + +impl ops::BitOrAssign<&Bits> for Bits { + fn bitor_assign(&mut self, rhs: &Self) { + for (s, r) in self.0.iter_mut().zip(rhs.0.iter()) { + s.bitor_assign(r); + } + } +} + +impl ops::BitXorAssign<&Bits> for Bits { + fn bitxor_assign(&mut self, rhs: &Self) { + for (s, r) in self.0.iter_mut().zip(rhs.0.iter()) { + s.bitxor_assign(r); + } + } +} + +fn rotate_row(a: &mut [u8; BYTES_PER_COL], mut amount: i32) { + if amount <= -8 { + a.rotate_left((amount.unsigned_abs() as usize) / 8); + amount %= 8; + } else if amount >= 8 { + a.rotate_right((amount as usize) / 8); + amount %= 8; + } + if amount < 0 { + let r = amount.abs(); + let l = 8 - r; + let mut carry = a[0] << l; + for b in a.iter_mut().rev() { + let old = *b; + *b = (old >> r) | carry; + carry = old << l; + } + } else if amount > 0 { + let l = amount.abs(); + let r = 8 - l; + let mut carry = a[24] >> r; + for b in a.iter_mut() { + let old = *b; + *b = (old << l) | carry; + carry = old >> r; + } + } +} + +#[cfg(test)] +mod tests { + use rand::{thread_rng, Rng}; + + use super::*; + + #[test] + fn limbs_exact() { + assert_eq!(LIMBS * 64, BITS); + assert_eq!(BYTES_PER_COL * 8, COLS); + } + + #[test] + fn test_index() { + let mut rng = thread_rng(); + for _ in 0..100 { + let bits: Bits = rng.gen(); + for location in 0..BITS { + let actual = bits[location]; + + let (byte, bit) = (location / 8, location % 8); + let expected = bytes_of(&bits)[byte] & (1_u8 << bit) != 0; + + assert_eq!(actual, expected); + } + } + } + + #[test] + fn test_rotated_inverse() { + let mut rng = thread_rng(); + for _ in 0..100 { + let bits: Bits = rng.gen(); + for amount in -15..=15 { + assert_eq!( + bits.rotated(amount).rotated(-amount), + bits, + "Rotation failed for {amount}" + ) + } + } + } +} diff --git a/src/config.rs b/src/config.rs index d2a6f67..c25c1f5 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,27 +1,4 @@ use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct Config { - #[serde(default)] - pub test: TestConfig, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct TestConfig { - #[serde(default = "default::test")] - pub test: String, -} - -impl Default for TestConfig { - fn default() -> Self { - Self { - test: default::test(), - } - } -} - -pub mod default { - pub fn test() -> String { - "default".to_string() - } -} +pub struct CoordinatorConfig {} diff --git a/src/coordinator.rs b/src/coordinator.rs new file mode 100644 index 0000000..01ee038 --- /dev/null +++ b/src/coordinator.rs @@ -0,0 +1,277 @@ +use std::io::ErrorKind; +use std::net::SocketAddr; +use std::panic::panic_any; +use std::sync::Arc; + +use aws_sdk_sqs::operation::receive_message::builders::ReceiveMessageFluentBuilder; +use aws_sdk_sqs::operation::send_message::builders::SendMessageFluentBuilder; +use bytemuck::bytes_of; +use eyre::{anyhow, Error}; +use futures::future; +use memmap::{Mmap, MmapOptions}; +use serde::{Deserialize, Serialize}; +use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader}; +use tokio::net::TcpStream; +use tokio::sync::mpsc::Receiver; +use tokio::sync::{self, mpsc}; +use tokio::task::JoinHandle; + +use crate::bits::Bits; +use crate::distance::{self, MasksEngine}; +use crate::encoded_bits::EncodedBits; +use crate::template::{self, Template}; + +const BATCH_SIZE: usize = 20_000; + +pub struct Coordinator { + aws_client: aws_sdk_sqs::Client, + shares_queue_url: String, + distances_queue_url: String, + participants: Vec>, +} + +impl Coordinator { + pub async fn new( + participants: Vec, + shares_queue_url: &str, + distances_queue_url: &str, + //TODO: Update error handling + ) -> eyre::Result { + let aws_config = + aws_config::load_defaults(aws_config::BehaviorVersion::latest()) + .await; + + let aws_client = aws_sdk_sqs::Client::new(&aws_config); + + let mut streams = vec![]; + for participant in participants { + let stream = BufReader::new(TcpStream::connect(participant).await?); + + streams.push(stream); + } + + Ok(Self { + aws_client, + shares_queue_url: shares_queue_url.to_string(), + distances_queue_url: distances_queue_url.to_string(), + participants: streams, + }) + } + + //TODO: update error handling + pub async fn spawn(mut self) -> eyre::Result<()> { + let mmap_db: Arc = Arc::new(self.initialize_mmap_db()); + + loop { + if let Some(messages) = self.dequeue_queries().await? { + for message in messages { + let template = serde_json::from_str::