diff --git a/.env.template b/.env.template index c9f3f7c6..f721e043 100644 --- a/.env.template +++ b/.env.template @@ -2,5 +2,4 @@ DATABASE_URL='postgres://matverseny:secret@127.0.0.1:5432/matverseny' IAM_JWT_RSA_PUBLIC='-----BEGIN PUBLIC KEY-----MIICITANBgkqhkiG9w0BAQEFAAOCAg4AMIICCQKCAgBdptOAsZZBpF7P0+79iKssZuYKq6UMVUYVHYFv2ClXAJmIMU+QXcued9oZA077BXhv5e7Lu8RXqylMNUd3hEEJjisXce3e33bcDrJZmGEljv/I3YCrNWK1LFpqd5YzossJpT+65TPxmeVipqJ65ZUpBCO3V82r3dDMx8d2CGMqOOygI1afeFxoZDVm/H1flR+uDusbB9EvYlKsiCfEGlQz/lnHiHI/bEmdeIzhpCLmRhtWxyyl6wqe07f718JdGpnPo6Aql8UtSMQRjtKcRW2G67hNXGgB7uS5y8qW8fj/fIcyePZGm8TUv3vL+wMUb6+05RN7i9BXt6Eurgok2NQGYvwforJHlCsj3aIzjTcfH3s6jkdZSj9Yho2BgtJi50qWxiWYxTTAmtVSDihjIty2h2NkzlXePWDF+iQW0bkbuYQQKzM6dLGNF0+z8t7ddlvKeqG8CI1+kZ3QR+XsKWSIhhGx+yncbxWUAzqMoLUUK9WlmQttoC8VenFSunNof2QxT+1BbYJt9ZFI5ZltTICR6K9kmRNQQ7qrdQacBsKi2SD+JVK7ESAARj9FZNvf0X78LM+H1NZACe4pT4tlObH4OwkHpl77oCmghNe49Q1CNv7d5QKesOS19kBoQYMYb+jKjKc/uj7iObwTuywX8I1d19gJeHD2XkZS9VVcHbYLUQIDAQAB-----END PUBLIC KEY-----' IAM_URL=http://127.0.0.1:3001 IAM_APP_SECRET= -KAFKA_BOOTSTRAP_SERVERS=127.0.0.1:9092 - +NATS_URL=127.0.0.1 diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 6a7eea4f..8ba635f6 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -31,6 +31,6 @@ jobs: IAM_URL: http://127.0.0.1:3001 IAM_JWT_RSA_PUBLIC: -----BEGIN PUBLIC KEY-----MIICITANBgkqhkiG9w0BAQEFAAOCAg4AMIICCQKCAgBdptOAsZZBpF7P0+79iKssZuYKq6UMVUYVHYFv2ClXAJmIMU+QXcued9oZA077BXhv5e7Lu8RXqylMNUd3hEEJjisXce3e33bcDrJZmGEljv/I3YCrNWK1LFpqd5YzossJpT+65TPxmeVipqJ65ZUpBCO3V82r3dDMx8d2CGMqOOygI1afeFxoZDVm/H1flR+uDusbB9EvYlKsiCfEGlQz/lnHiHI/bEmdeIzhpCLmRhtWxyyl6wqe07f718JdGpnPo6Aql8UtSMQRjtKcRW2G67hNXGgB7uS5y8qW8fj/fIcyePZGm8TUv3vL+wMUb6+05RN7i9BXt6Eurgok2NQGYvwforJHlCsj3aIzjTcfH3s6jkdZSj9Yho2BgtJi50qWxiWYxTTAmtVSDihjIty2h2NkzlXePWDF+iQW0bkbuYQQKzM6dLGNF0+z8t7ddlvKeqG8CI1+kZ3QR+XsKWSIhhGx+yncbxWUAzqMoLUUK9WlmQttoC8VenFSunNof2QxT+1BbYJt9ZFI5ZltTICR6K9kmRNQQ7qrdQacBsKi2SD+JVK7ESAARj9FZNvf0X78LM+H1NZACe4pT4tlObH4OwkHpl77oCmghNe49Q1CNv7d5QKesOS19kBoQYMYb+jKjKc/uj7iObwTuywX8I1d19gJeHD2XkZS9VVcHbYLUQIDAQAB-----END PUBLIC KEY----- IAM_APP_SECRET: QXBwSUQtNzAwNzk1N2QtN2Q5OC0xMWVlLTg4NTktNzY2NTY0MDAwMDAwOndhdDRUSGdwcEJJdWExa095Z3R5R3hpU3hpRkVSZFZ4 - KAFKA_BOOTSTRAP_SERVERS: 127.0.0.1:9092 + NATS_URL: 127.0.0.1 - name: cargo test --doc run: cargo test --locked --all-features --workspace --doc diff --git a/Cargo.lock b/Cargo.lock index 2338c4ef..3accfbca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -49,9 +49,9 @@ dependencies = [ [[package]] name = "aho-corasick" -version = "0.7.20" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc936419f96fa211c1b9166887b38e5e40b19958e5b895be7c1f93adec7071ac" +checksum = "8e60d3430d3a69478ad0993f19238d2df97c507009a52b3c10addcd7f6bcb916" dependencies = [ "memchr", ] @@ -184,6 +184,39 @@ dependencies = [ "zstd-safe", ] +[[package]] +name = "async-nats" +version = "0.35.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab8df97cb8fc4a884af29ab383e9292ea0939cfcdd7d2a17179086dc6c427e7f" +dependencies = [ + "base64 0.22.1", + "bytes", + "futures", + "memchr", + "nkeys", + "nuid", + "once_cell", + "portable-atomic", + "rand", + "regex", + "ring 0.17.5", + "rustls-native-certs", + "rustls-pemfile 2.1.2", + "rustls-webpki 0.102.4", + "serde", + "serde_json", + "serde_nanos", + "serde_repr", + "thiserror", + "time", + "tokio", + "tokio-rustls 0.26.0", + "tracing", + "tryhard", + "url", +] + [[package]] name = "async-stream" version = "0.3.4" @@ -343,6 +376,12 @@ version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4a4ddaa51a5bc52a6948f74c06d20aaaddb71924eab79b8c97a8c556e942d6a" +[[package]] +name = "base64" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" + [[package]] name = "base64ct" version = "1.6.0" @@ -444,6 +483,9 @@ name = "bytes" version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223" +dependencies = [ + "serde", +] [[package]] name = "cc" @@ -526,15 +568,6 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "702fc72eb24e5a1e48ce58027a675bc24edd52096d5397d4aea7c6dd9eca0bd1" -[[package]] -name = "cmake" -version = "0.1.49" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db34956e100b30725f2eb215f90d4871051239535632f84fea3bc92722c66b7c" -dependencies = [ - "cc", -] - [[package]] name = "cmds" version = "0.1.0" @@ -636,9 +669,9 @@ checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc" [[package]] name = "cpufeatures" -version = "0.2.5" +version = "0.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28d997bd5e24a5928dd43e46dc529867e207907fe0b239c3477d924f7f2ca320" +checksum = "53fe5e26ff1b7aef8bca9c6080520cfb8d9333c7568e1829cef191a9723e5504" dependencies = [ "libc", ] @@ -706,6 +739,32 @@ dependencies = [ "typenum", ] +[[package]] +name = "curve25519-dalek" +version = "4.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97fb8b7c4503de7d6ae7b42ab72a5a59857b4c937ec27a3d4539dba95b5ab2be" +dependencies = [ + "cfg-if", + "cpufeatures", + "curve25519-dalek-derive", + "digest", + "fiat-crypto", + "rustc_version", + "subtle", +] + +[[package]] +name = "curve25519-dalek-derive" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.39", +] + [[package]] name = "cxx" version = "1.0.91" @@ -826,6 +885,16 @@ dependencies = [ "zeroize", ] +[[package]] +name = "deranged" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4" +dependencies = [ + "powerfmt", + "serde", +] + [[package]] name = "derivative" version = "2.2.0" @@ -839,9 +908,9 @@ dependencies = [ [[package]] name = "digest" -version = "0.10.6" +version = "0.10.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8168378f4e5023e7218c89c891c0fd8ecdb5e5e4f18cb78f38cf245dd021e76f" +checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" dependencies = [ "block-buffer", "const-oid 0.9.5", @@ -855,6 +924,28 @@ version = "0.15.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" +[[package]] +name = "ed25519" +version = "2.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "115531babc129696a58c64a4fef0a8bf9e9698629fb97e9e40767d235cfbcd53" +dependencies = [ + "signature", +] + +[[package]] +name = "ed25519-dalek" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a3daa8e81a3963a60642bcc1f90a670680bd4a77535faa384e9d1c79d620871" +dependencies = [ + "curve25519-dalek", + "ed25519", + "sha2", + "signature", + "subtle", +] + [[package]] name = "either" version = "1.8.1" @@ -922,6 +1013,12 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5" +[[package]] +name = "fiat-crypto" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28dea519a9695b9977216879a3ebfddf92f1c08c05d984f8996aecd6ecdc811d" + [[package]] name = "flate2" version = "1.0.25" @@ -1576,18 +1673,6 @@ dependencies = [ "vcpkg", ] -[[package]] -name = "libz-sys" -version = "1.1.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9702761c3935f8cc2f101793272e202c72b99da8f4224a19ddcf1279a6450bbf" -dependencies = [ - "cc", - "libc", - "pkg-config", - "vcpkg", -] - [[package]] name = "link-cplusplus" version = "1.0.8" @@ -1637,7 +1722,7 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" dependencies = [ - "regex-automata", + "regex-automata 0.1.10", ] [[package]] @@ -1651,6 +1736,7 @@ name = "matverseny-backend" version = "0.1.0" dependencies = [ "assert-json-diff", + "async-nats", "axum", "bytes", "chrono", @@ -1669,7 +1755,6 @@ dependencies = [ "pin-project-lite", "rand", "rand_chacha", - "rdkafka", "reqwest", "sea-orm 0.12.7", "serde", @@ -1699,9 +1784,9 @@ dependencies = [ [[package]] name = "memchr" -version = "2.5.0" +version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" +checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" [[package]] name = "migration" @@ -1745,14 +1830,28 @@ dependencies = [ [[package]] name = "mio" -version = "0.8.6" +version = "0.8.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b9d9a46eff5b4ff64b45a9e316a6d1e0bc719ef429cbec4dc630684212bfdf9" +checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" dependencies = [ "libc", - "log", "wasi", - "windows-sys 0.45.0", + "windows-sys 0.48.0", +] + +[[package]] +name = "nkeys" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc522a19199a0795776406619aa6aa78e1e55690fbeb3181b8db5265fd0e89ce" +dependencies = [ + "data-encoding", + "ed25519", + "ed25519-dalek", + "getrandom", + "log", + "rand", + "signatory", ] [[package]] @@ -1775,6 +1874,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "nuid" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc895af95856f929163a0aa20c26a78d26bfdc839f51b9d5aa7a5b79e52b7e83" +dependencies = [ + "rand", +] + [[package]] name = "num-bigint" version = "0.4.3" @@ -1803,6 +1911,12 @@ dependencies = [ "zeroize", ] +[[package]] +name = "num-conv" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" + [[package]] name = "num-integer" version = "0.1.45" @@ -1844,27 +1958,6 @@ dependencies = [ "libc", ] -[[package]] -name = "num_enum" -version = "0.5.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1f646caf906c20226733ed5b1374287eb97e3c2a5c227ce668c1f2ce20ae57c9" -dependencies = [ - "num_enum_derive", -] - -[[package]] -name = "num_enum_derive" -version = "0.5.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dcbff9bc912032c62bf65ef1d5aea88983b420f4f839db1e9b0c281a25c9c799" -dependencies = [ - "proc-macro-crate", - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "object" version = "0.32.1" @@ -1883,6 +1976,12 @@ dependencies = [ "parking_lot_core 0.9.7", ] +[[package]] +name = "openssl-probe" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" + [[package]] name = "ordered-float" version = "3.9.2" @@ -2124,20 +2223,22 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6ac9a59f73473f1b8d852421e59e64809f025994837ef743615c6d0c5b305160" [[package]] -name = "ppv-lite86" -version = "0.2.17" +name = "portable-atomic" +version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" +checksum = "7170ef9988bc169ba16dd36a7fa041e5c4cbeb6a35b76d4c03daded371eae7c0" [[package]] -name = "proc-macro-crate" -version = "1.3.1" +name = "powerfmt" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f4c021e1093a56626774e81216a4ce732a735e5bad4868a03f3ed65ca0c3919" -dependencies = [ - "once_cell", - "toml_edit", -] +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + +[[package]] +name = "ppv-lite86" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" [[package]] name = "proc-macro-error" @@ -2211,38 +2312,6 @@ dependencies = [ "getrandom", ] -[[package]] -name = "rdkafka" -version = "0.35.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f16c17f411935214a5870e40aff9291f8b40a73e97bf8de29e5959c473d5ef33" -dependencies = [ - "futures-channel", - "futures-util", - "libc", - "log", - "rdkafka-sys", - "serde", - "serde_derive", - "serde_json", - "slab", - "tokio", - "tracing", -] - -[[package]] -name = "rdkafka-sys" -version = "4.7.0+2.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55e0d2f9ba6253f6ec72385e453294f8618e9e15c2c6aba2a5c01ccf9622d615" -dependencies = [ - "cmake", - "libc", - "libz-sys", - "num_enum", - "pkg-config", -] - [[package]] name = "redis" version = "0.22.3" @@ -2285,13 +2354,14 @@ dependencies = [ [[package]] name = "regex" -version = "1.7.1" +version = "1.10.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "48aaa5748ba571fb95cd2c85c09f629215d3a6ece942baa100950af03a34f733" +checksum = "b91213439dad192326a0d7c6ee3955910425f441d7038e0d6933b0aec5c4517f" dependencies = [ "aho-corasick", "memchr", - "regex-syntax", + "regex-automata 0.4.7", + "regex-syntax 0.8.4", ] [[package]] @@ -2300,7 +2370,18 @@ version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" dependencies = [ - "regex-syntax", + "regex-syntax 0.6.28", +] + +[[package]] +name = "regex-automata" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38caf58cc5ef2fed281f89292ef23f6365465ed9a41b7a7754eb4e26496c92df" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax 0.8.4", ] [[package]] @@ -2309,6 +2390,12 @@ version = "0.6.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "456c603be3e8d448b072f410900c09faf164fbce2d480456f50eea6e25f9c848" +[[package]] +name = "regex-syntax" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b" + [[package]] name = "reqwest" version = "0.11.22" @@ -2333,7 +2420,7 @@ dependencies = [ "percent-encoding", "pin-project-lite", "rustls 0.21.8", - "rustls-pemfile", + "rustls-pemfile 1.0.2", "serde", "serde_json", "serde_urlencoded", @@ -2435,6 +2522,15 @@ version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" +[[package]] +name = "rustc_version" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" +dependencies = [ + "semver", +] + [[package]] name = "rustix" version = "0.38.21" @@ -2468,10 +2564,37 @@ checksum = "446e14c5cda4f3f30fe71863c34ec70f5ac79d6087097ad0bb433e1be5edf04c" dependencies = [ "log", "ring 0.17.5", - "rustls-webpki", + "rustls-webpki 0.101.7", "sct", ] +[[package]] +name = "rustls" +version = "0.23.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05cff451f60db80f490f3c182b77c35260baace73209e9cdbbe526bfe3a4d402" +dependencies = [ + "once_cell", + "ring 0.17.5", + "rustls-pki-types", + "rustls-webpki 0.102.4", + "subtle", + "zeroize", +] + +[[package]] +name = "rustls-native-certs" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f1fb85efa936c42c6d5fc28d2629bb51e4b2f4b8a5211e297d599cc5a093792" +dependencies = [ + "openssl-probe", + "rustls-pemfile 2.1.2", + "rustls-pki-types", + "schannel", + "security-framework", +] + [[package]] name = "rustls-pemfile" version = "1.0.2" @@ -2481,6 +2604,22 @@ dependencies = [ "base64 0.21.0", ] +[[package]] +name = "rustls-pemfile" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29993a25686778eb88d4189742cd713c9bce943bc54251a33509dc63cbacf73d" +dependencies = [ + "base64 0.22.1", + "rustls-pki-types", +] + +[[package]] +name = "rustls-pki-types" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "976295e77ce332211c0d24d92c0e83e50f5c5f046d11082cea19f3df13a3562d" + [[package]] name = "rustls-webpki" version = "0.101.7" @@ -2491,6 +2630,17 @@ dependencies = [ "untrusted 0.9.0", ] +[[package]] +name = "rustls-webpki" +version = "0.102.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff448f7e92e913c4b7d4c6d8e4540a1724b319b4152b8aef6d4cf8339712b33e" +dependencies = [ + "ring 0.17.5", + "rustls-pki-types", + "untrusted 0.9.0", +] + [[package]] name = "rustversion" version = "1.0.11" @@ -2503,6 +2653,15 @@ version = "1.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f91339c0467de62360649f8d3e185ca8de4224ff281f66000de5eb2a77a79041" +[[package]] +name = "schannel" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbc91545643bcf3a0bbb6569265615222618bdf33ce4ffbbd13c4bbd4c093534" +dependencies = [ + "windows-sys 0.52.0", +] + [[package]] name = "scopeguard" version = "1.1.0" @@ -2765,6 +2924,35 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "security-framework" +version = "2.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "770452e37cad93e0a50d5abc3990d2bc351c36d0328f86cefec2f2fb206eaef6" +dependencies = [ + "bitflags 1.3.2", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "317936bbbd05227752583946b9e66d7ce3b489f84e11a94a510b4437fef407d7" +dependencies = [ + "core-foundation-sys", + "libc", +] + +[[package]] +name = "semver" +version = "1.0.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b" + [[package]] name = "serde" version = "1.0.193" @@ -2796,6 +2984,15 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_nanos" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a93142f0367a4cc53ae0fead1bcda39e85beccfad3dcd717656cacab94b12985" +dependencies = [ + "serde", +] + [[package]] name = "serde_path_to_error" version = "0.1.10" @@ -2928,6 +3125,18 @@ dependencies = [ "libc", ] +[[package]] +name = "signatory" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1e303f8205714074f6068773f0e29527e0453937fe837c9717d066635b65f31" +dependencies = [ + "pkcs8 0.10.2", + "rand_core", + "signature", + "zeroize", +] + [[package]] name = "signature" version = "2.1.0" @@ -3091,7 +3300,7 @@ dependencies = [ "rand", "rsa 0.6.1", "rustls 0.20.8", - "rustls-pemfile", + "rustls-pemfile 1.0.2", "sha1", "sha2", "smallvec", @@ -3134,7 +3343,7 @@ dependencies = [ "paste", "percent-encoding", "rustls 0.21.8", - "rustls-pemfile", + "rustls-pemfile 1.0.2", "serde", "serde_json", "sha2", @@ -3357,9 +3566,9 @@ checksum = "290d54ea6f91c969195bdbcd7442c8c2a2ba87da8bf60a7ee86a235d4bc1e125" [[package]] name = "subtle" -version = "2.4.1" +version = "2.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601" +checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" [[package]] name = "syn" @@ -3464,11 +3673,14 @@ dependencies = [ [[package]] name = "time" -version = "0.3.20" +version = "0.3.36" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd0cbfecb4d19b5ea75bb31ad904eb5b9fa13f21079c3b92017ebdf4999a5890" +checksum = "5dfd88e563464686c916c7e46e623e520ddc6d79fa6641390f2e3fa86e83e885" dependencies = [ + "deranged", "itoa", + "num-conv", + "powerfmt", "serde", "time-core", "time-macros", @@ -3476,16 +3688,17 @@ dependencies = [ [[package]] name = "time-core" -version = "0.1.0" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e153e1f1acaef8acc537e68b44906d2db6436e2b35ac2c6b42640fff91f00fd" +checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" [[package]] name = "time-macros" -version = "0.2.8" +version = "0.2.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd80a657e71da814b8e5d60d3374fc6d35045062245d80224748ae522dd76f36" +checksum = "3f252a68540fde3a3877aeea552b832b40ab9a69e318efd078774a01ddee1ccf" dependencies = [ + "num-conv", "time-core", ] @@ -3506,9 +3719,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.33.0" +version = "1.38.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f38200e3ef7995e5ef13baec2f432a6da0aa9ac495b2c0e8f3b7eec2c92d653" +checksum = "ba4f4a02a7a80d6f274636f0aa95c7e383b912d41fe721a31f29e29698585a4a" dependencies = [ "backtrace", "bytes", @@ -3525,9 +3738,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "2.1.0" +version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" +checksum = "5f5ae998a069d4b5aba8ee9dad856af7d520c3699e6159b185c2acd48155d39a" dependencies = [ "proc-macro2", "quote", @@ -3555,6 +3768,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-rustls" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4" +dependencies = [ + "rustls 0.23.10", + "rustls-pki-types", + "tokio", +] + [[package]] name = "tokio-stream" version = "0.1.12" @@ -3592,23 +3816,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "toml_datetime" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ab8ed2edee10b50132aed5f331333428b011c99402b5a534154ed15746f9622" - -[[package]] -name = "toml_edit" -version = "0.19.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a1eb0622d28f4b9c90adc4ea4b2b46b47663fde9ac5fafcb14a1369d5508825" -dependencies = [ - "indexmap 1.9.2", - "toml_datetime", - "winnow", -] - [[package]] name = "tower" version = "0.4.13" @@ -3729,6 +3936,17 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed" +[[package]] +name = "tryhard" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c9f0a709784e86923586cff0d872dba54cd2d2e116b3bc57587d15737cfce9d" +dependencies = [ + "futures", + "pin-project-lite", + "tokio", +] + [[package]] name = "tungstenite" version = "0.20.1" @@ -4023,7 +4241,7 @@ version = "0.24.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b291546d5d9d1eab74f069c77749f2cb8504a12caa20f0f2de93ddbf6f411888" dependencies = [ - "rustls-webpki", + "rustls-webpki 0.101.7", ] [[package]] @@ -4087,6 +4305,15 @@ dependencies = [ "windows-targets 0.48.5", ] +[[package]] +name = "windows-sys" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets 0.52.5", +] + [[package]] name = "windows-targets" version = "0.42.1" @@ -4117,6 +4344,22 @@ dependencies = [ "windows_x86_64_msvc 0.48.5", ] +[[package]] +name = "windows-targets" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f0713a46559409d202e70e28227288446bf7841d3211583a4b53e3f6d96e7eb" +dependencies = [ + "windows_aarch64_gnullvm 0.52.5", + "windows_aarch64_msvc 0.52.5", + "windows_i686_gnu 0.52.5", + "windows_i686_gnullvm", + "windows_i686_msvc 0.52.5", + "windows_x86_64_gnu 0.52.5", + "windows_x86_64_gnullvm 0.52.5", + "windows_x86_64_msvc 0.52.5", +] + [[package]] name = "windows_aarch64_gnullvm" version = "0.42.1" @@ -4129,6 +4372,12 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7088eed71e8b8dda258ecc8bac5fb1153c5cffaf2578fc8ff5d61e23578d3263" + [[package]] name = "windows_aarch64_msvc" version = "0.42.1" @@ -4141,6 +4390,12 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9985fd1504e250c615ca5f281c3f7a6da76213ebd5ccc9561496568a2752afb6" + [[package]] name = "windows_i686_gnu" version = "0.42.1" @@ -4153,6 +4408,18 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" +[[package]] +name = "windows_i686_gnu" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88ba073cf16d5372720ec942a8ccbf61626074c6d4dd2e745299726ce8b89670" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87f4261229030a858f36b459e748ae97545d6f1ec60e5e0d6a3d32e0dc232ee9" + [[package]] name = "windows_i686_msvc" version = "0.42.1" @@ -4165,6 +4432,12 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" +[[package]] +name = "windows_i686_msvc" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db3c2bf3d13d5b658be73463284eaf12830ac9a26a90c717b7f771dfe97487bf" + [[package]] name = "windows_x86_64_gnu" version = "0.42.1" @@ -4177,6 +4450,12 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e4246f76bdeff09eb48875a0fd3e2af6aada79d409d33011886d3e1581517d9" + [[package]] name = "windows_x86_64_gnullvm" version = "0.42.1" @@ -4189,6 +4468,12 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "852298e482cd67c356ddd9570386e2862b5673c85bd5f88df9ab6802b334c596" + [[package]] name = "windows_x86_64_msvc" version = "0.42.1" @@ -4202,13 +4487,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" [[package]] -name = "winnow" -version = "0.3.4" +name = "windows_x86_64_msvc" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c95fb4ff192527911dd18eb138ac30908e7165b8944e528b6af93aa4c842d345" -dependencies = [ - "memchr", -] +checksum = "bec47e5bfd1bff0eeaf6d8b485cc1074891a197ab4225d504cb7a1ab88b02bf0" [[package]] name = "winreg" @@ -4242,9 +4524,9 @@ dependencies = [ [[package]] name = "zeroize" -version = "1.5.7" +version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c394b5bd0c6f669e7275d9c20aa90ae064cb22e75a1cad54e1b34088034b149f" +checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde" [[package]] name = "zstd" diff --git a/Cargo.toml b/Cargo.toml index 7fde5aad..a199b205 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -77,7 +77,6 @@ jsonwebtoken = "9.1.0" once_cell = { version = "1.18.0", features = ["parking_lot"] } rand = "0.8.5" futures = "0.3.29" -rdkafka = { version = "0.35.0", features = ["tracing", "cmake-build"] } rand_chacha = "0.3.1" serde_with = "3.4.0" tokio-tungstenite = "0.20.1" @@ -92,6 +91,7 @@ pin-project-lite = "0.2.13" reqwest = { version = "0.11.22", default-features = false, features = ["json", "rustls-tls"] } libiam = { git = "https://github.com/Verseghy/iam", package = "libiam" } chrono = { version = "0.4.31", features = ["serde"] } +async-nats = "0.35.1" [workspace.dependencies] uuid = { version = "1.5.0", features = ["v4", "fast-rng", "serde"] } diff --git a/compose.yaml b/compose.yaml index 664ceba1..3e020bce 100644 --- a/compose.yaml +++ b/compose.yaml @@ -1,5 +1,3 @@ -version: '3' - services: db: image: postgres:16 @@ -10,28 +8,12 @@ services: ports: - 5432:5432 - kafka: - image: bitnami/kafka:latest - environment: - KAFKA_BROKER_ID: 1 - KAFKA_CFG_LISTENERS: PLAINTEXT://:9092 - KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://127.0.0.1:9092 - KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181 - ALLOW_PLAINTEXT_LISTENER: yes - networks: - - default - - kafka + nats: + image: docker.io/nats ports: - - 9092:9092 - depends_on: - - zookeeper - - zookeeper: - image: bitnami/zookeeper:latest - environment: - ALLOW_ANONYMOUS_LOGIN: yes - networks: - - kafka + - 4222:4222 + - 6222:6222 + - 8222:8222 iam: image: verseghy/iam @@ -71,6 +53,3 @@ services: depends_on: iam-database: condition: service_healthy - -networks: - kafka: diff --git a/src/error/constants.rs b/src/error/constants.rs index b58bf107..1a9049d0 100644 --- a/src/error/constants.rs +++ b/src/error/constants.rs @@ -128,11 +128,6 @@ const_error! { #[status(INTERNAL_SERVER_ERROR)] const JSON_DESERIALIZE; } -const_error! { - #[error("M026", "kafka error")] - #[status(INTERNAL_SERVER_ERROR)] - const KAFKA_ERROR; -} const_error! { #[error("M027", "websocket error")] const WEBSOCKET_ERROR; @@ -157,3 +152,8 @@ const_error! { #[status(BAD_REQUEST)] const TIME_SECONDS_OUT_OF_RANGE; } +const_error! { + #[error("M032", "NATS error")] + #[status(INTERNAL_SERVER_ERROR)] + const NATS_ERROR; +} diff --git a/src/error/mod.rs b/src/error/mod.rs index f1b02354..9b370d13 100644 --- a/src/error/mod.rs +++ b/src/error/mod.rs @@ -9,9 +9,9 @@ use axum::{ response::{IntoResponse, Response}, }; use bytes::{BufMut, Bytes, BytesMut}; -use rdkafka::{error::KafkaError, message::OwnedMessage}; use sea_orm::DbErr; use serde_json::json; +use std::fmt::{Debug, Display}; #[derive(Debug)] pub struct Error<'a> { @@ -104,19 +104,22 @@ impl From for Error<'_> { } } -impl From for Error<'_> { +impl From> for Error<'_> +where + K: PartialEq + Debug + Display + Clone, +{ #[inline] - fn from(error: KafkaError) -> Self { - error!("kafka error: {:?}", error); - constants::KAFKA_ERROR + fn from(value: async_nats::error::Error) -> Self { + error!("NATS error: {:?}", value); + constants::NATS_ERROR } } -impl From<(KafkaError, OwnedMessage)> for Error<'_> { +impl From for Error<'_> { #[inline] - fn from((error, _): (KafkaError, OwnedMessage)) -> Self { - error!("kafka error: {:?}", error); - constants::KAFKA_ERROR + fn from(value: async_nats::SubscribeError) -> Self { + error!("NATS error: {:?}", value); + constants::NATS_ERROR } } diff --git a/src/handlers/competition/solution.rs b/src/handlers/competition/solution.rs index b0c7c904..f9521937 100644 --- a/src/handlers/competition/solution.rs +++ b/src/handlers/competition/solution.rs @@ -3,11 +3,9 @@ use crate::utils::topics; use crate::{error, error::Result, iam::Claims, json::Json, StateTrait}; use axum::{extract::State, http::StatusCode}; use entity::{solutions_history, teams}; -use rdkafka::producer::FutureRecord; use sea_orm::ActiveValue::Set; use sea_orm::{EntityTrait, QuerySelect, TransactionTrait}; use serde::Deserialize; -use std::time::Duration; use uuid::Uuid; #[derive(Debug, Deserialize)] @@ -43,18 +41,15 @@ pub async fn set_solution( .await?; state - .kafka_producer() - .send( - FutureRecord::<(), String>::to(&topics::team_solutions(&team.id)) - .partition(0) - .payload( - &serde_json::to_string(&Event::SolutionSet { - problem: request.problem, - solution: request.solution, - }) - .unwrap(), - ), - Duration::from_secs(5), + .nats() + .publish( + topics::team_solutions(&team.id), + serde_json::to_vec(&Event::SolutionSet { + problem: request.problem, + solution: request.solution, + }) + .unwrap() + .into(), ) .await?; diff --git a/src/handlers/competition/time.rs b/src/handlers/competition/time.rs index 869e1f52..8232afa3 100644 --- a/src/handlers/competition/time.rs +++ b/src/handlers/competition/time.rs @@ -1,5 +1,3 @@ -use std::time::Duration; - use crate::{ error::{self, Result}, handlers::socket::Event, @@ -10,7 +8,6 @@ use crate::{ use axum::{extract::State, http::StatusCode}; use chrono::{NaiveDateTime, TimeZone, Utc}; use entity::times; -use rdkafka::producer::FutureRecord; use sea_orm::{ColumnTrait, Condition, EntityTrait, QueryFilter, Set, TransactionTrait}; use serde::{Deserialize, Serialize}; @@ -59,16 +56,15 @@ pub async fn set_time_patch( } state - .kafka_producer() - .send( - FutureRecord::<(), String>::to(topics::times()).payload( - &serde_json::to_string(&Event::UpdateTime { - start_time: req.start_time, - end_time: req.end_time, - }) - .unwrap(), - ), - Duration::from_secs(5), + .nats() + .publish( + topics::times(), + serde_json::to_vec(&Event::UpdateTime { + start_time: req.start_time, + end_time: req.end_time, + }) + .unwrap() + .into(), ) .await?; diff --git a/src/handlers/problem/delete.rs b/src/handlers/problem/delete.rs index a3359bff..fdc3adf3 100644 --- a/src/handlers/problem/delete.rs +++ b/src/handlers/problem/delete.rs @@ -1,5 +1,3 @@ -use std::time::Duration; - use crate::{ error::{self, Result}, handlers::socket::Event, @@ -11,7 +9,6 @@ use axum::{ http::StatusCode, }; use entity::{problems, problems_order}; -use rdkafka::producer::FutureRecord; use sea_orm::{EntityTrait, TransactionTrait}; use uuid::Uuid; @@ -34,12 +31,12 @@ pub async fn delete_problem( } state - .kafka_producer() - .send( - FutureRecord::<(), String>::to(topics::problems()) - .partition(0) - .payload(&serde_json::to_string(&Event::DeleteProblem { id }).unwrap()), - Duration::from_secs(5), + .nats() + .publish( + topics::problems(), + serde_json::to_vec(&Event::DeleteProblem { id }) + .unwrap() + .into(), ) .await?; diff --git a/src/handlers/problem/order.rs b/src/handlers/problem/order.rs index b078ff0b..20b85fee 100644 --- a/src/handlers/problem/order.rs +++ b/src/handlers/problem/order.rs @@ -1,5 +1,3 @@ -use std::time::Duration; - use crate::{ error::{self, DatabaseError, Result}, handlers::socket::Event, @@ -13,7 +11,6 @@ use entity::{ problems, problems_order::{self, constraints::*}, }; -use rdkafka::producer::FutureRecord; use sea_orm::{ sea_query::{CaseStatement, Query}, ActiveValue::NotSet, @@ -83,20 +80,17 @@ pub async fn change( let res = problems::Entity::find_by_id(id).one(&txn).await?.unwrap(); state - .kafka_producer() - .send( - FutureRecord::<(), String>::to(topics::problems()) - .partition(0) - .payload( - &serde_json::to_string(&Event::InsertProblem { - before: Some(before), - id: res.id, - body: res.body, - image: res.image, - }) - .unwrap(), - ), - Duration::from_secs(5), + .nats() + .publish( + topics::problems(), + serde_json::to_vec(&Event::InsertProblem { + before: Some(before), + id: res.id, + body: res.body, + image: res.image, + }) + .unwrap() + .into(), ) .await?; @@ -146,20 +140,17 @@ pub async fn change( let res = problems::Entity::find_by_id(id).one(&txn).await?.unwrap(); state - .kafka_producer() - .send( - FutureRecord::<(), String>::to(topics::problems()) - .partition(0) - .payload( - &serde_json::to_string(&Event::InsertProblem { - before: None, - id: res.id, - body: res.body, - image: res.image, - }) - .unwrap(), - ), - Duration::from_secs(5), + .nats() + .publish( + topics::problems(), + serde_json::to_vec(&Event::InsertProblem { + before: None, + id: res.id, + body: res.body, + image: res.image, + }) + .unwrap() + .into(), ) .await?; @@ -170,12 +161,12 @@ pub async fn change( delete_problem(&txn, id).await?; state - .kafka_producer() - .send( - FutureRecord::<(), String>::to(topics::problems()) - .partition(0) - .payload(&serde_json::to_string(&Event::DeleteProblem { id }).unwrap()), - Duration::from_secs(5), + .nats() + .publish( + topics::problems(), + serde_json::to_vec(&Event::DeleteProblem { id }) + .unwrap() + .into(), ) .await?; @@ -267,14 +258,12 @@ pub async fn change( .await?; state - .kafka_producer() - .send( - FutureRecord::<(), String>::to(topics::problems()) - .partition(0) - .payload( - &serde_json::to_string(&Event::SwapProblems { id1, id2 }).unwrap(), - ), - Duration::from_secs(5), + .nats() + .publish( + topics::problems(), + serde_json::to_vec(&Event::SwapProblems { id1, id2 }) + .unwrap() + .into(), ) .await?; diff --git a/src/handlers/problem/update.rs b/src/handlers/problem/update.rs index fcacb4e1..40e53472 100644 --- a/src/handlers/problem/update.rs +++ b/src/handlers/problem/update.rs @@ -1,5 +1,3 @@ -use std::time::Duration; - use crate::{ error::{self, Result}, handlers::socket::Event, @@ -9,7 +7,6 @@ use crate::{ }; use axum::{extract::State, http::StatusCode}; use entity::problems; -use rdkafka::producer::FutureRecord; use sea_orm::{DbErr, EntityTrait, Set, TransactionTrait}; use serde::Deserialize; use uuid::Uuid; @@ -32,7 +29,7 @@ pub async fn update_problem( return Ok(StatusCode::NO_CONTENT); } - let kafka_payload = serde_json::to_string(&Event::UpdateProblem { + let payload = serde_json::to_vec(&Event::UpdateProblem { id: request.id, body: request.body.clone(), image: request.image.clone(), @@ -56,13 +53,8 @@ pub async fn update_problem( }; state - .kafka_producer() - .send( - FutureRecord::<(), String>::to(topics::problems()) - .partition(0) - .payload(&kafka_payload), - Duration::from_secs(5), - ) + .nats() + .publish(topics::problems(), payload.into()) .await?; txn.commit().await?; diff --git a/src/handlers/socket.rs b/src/handlers/socket.rs index 755fa823..4ec8795f 100644 --- a/src/handlers/socket.rs +++ b/src/handlers/socket.rs @@ -18,11 +18,7 @@ use entity::{ solutions_history, teams, users::{self, Class}, }; -use futures::StreamExt; -use rdkafka::{ - consumer::{Consumer, StreamConsumer}, - ClientConfig, Message as _, TopicPartitionList, -}; +use futures::{Stream, StreamExt}; use sea_orm::{ColumnTrait, Condition, EntityTrait, QueryFilter, QueryOrder, QuerySelect}; use serde::{Deserialize, Serialize}; use std::{borrow::Cow, error::Error as _, mem::MaybeUninit, pin::pin, time::Duration}; @@ -137,7 +133,7 @@ async fn socket_handler(state: S, socket: &mut WebSocket) -> Resu let claims_span = info_span!("claims", user_id = claims.subject.to_string()); async move { - let consumer = create_consumer(&team.id).await?; + let mut consumer = std::pin::pin!(create_consumer(&state, &team.id).await?); socket .send(Message::Text( @@ -169,9 +165,6 @@ async fn socket_handler(state: S, socket: &mut WebSocket) -> Resu let problems = state.problems(); let mut problems_stream = ProblemStream::new_empty(); - - let mut kafka_stream = consumer.stream(); - loop { tokio::select! { _ = &mut sleep_until_start, if !has_sent_initial_problems => { @@ -203,29 +196,8 @@ async fn socket_handler(state: S, socket: &mut WebSocket) -> Resu break Err(error::WEBSOCKET_ERROR) } } - message = timeout(Duration::from_secs(5), kafka_stream.next()), if has_sent_initial_problems => { - let Ok(message) = message else { - continue; - }; - - let Some(message) = message else { - error!("kafka stream closed unexpectedly"); - break Err(error::INTERNAL) - }; - - let message = message?; - - debug!("kafka message: {:?}", message); - - let Some(payload) = message.payload() else { - warn!("got kafka message without payload"); - // This shouldn't happen so if somehow it still happens just ignore it - continue - }; - - // SAFETY: the backend will always send valid utf-8 - let payload = unsafe { std::str::from_utf8_unchecked(payload) }; - let event = serde_json::from_str(payload)?; + Ok(Some(event)) = timeout(Duration::from_secs(5), consumer.next()), if has_sent_initial_problems => { + let event_text = serde_json::to_string(&event).unwrap(); if matches!(event, Event::DisbandTeam) || matches!(event, Event::LeaveTeam { user } if user == claims.subject) @@ -233,7 +205,7 @@ async fn socket_handler(state: S, socket: &mut WebSocket) -> Resu { let _ = socket.send(Message::Close(Some(CloseFrame { code: close_code::NORMAL, - reason: Cow::Owned(payload.to_owned()), + reason: Cow::Owned(event_text), }))).await; socket.next().await; @@ -241,7 +213,7 @@ async fn socket_handler(state: S, socket: &mut WebSocket) -> Resu return Ok(()) } - if let Err(err) = socket.send(Message::Text(payload.to_owned())).await { + if let Err(err) = socket.send(Message::Text(event_text)).await { let tungstenite_error = err.source().unwrap().downcast_ref::().unwrap(); error!("websocket error: {:?}", tungstenite_error); break Err(error::WEBSOCKET_ERROR) @@ -335,31 +307,6 @@ async fn send_answers( Ok(()) } -// async fn wait_for_start(state: &S) -> Result<()> { -// loop { -// let res = times::Entity::find() -// .filter(times::Column::Name.eq("start_time")) -// .one(state.db()) -// .await?; -// -// let start_time = match res { -// None => { -// error!("start_time is not found in the database"); -// return Err(error::INTERNAL); -// } -// Some(time) => time.time, -// }; -// -// if start_time < chrono::Utc::now() { -// break; -// } -// -// sleep(Duration::from_secs(3)).await; -// } -// -// Ok(()) -// } - type TeamInfo = (teams::Model, Vec, Claims); #[derive(Serialize, Deserialize)] @@ -456,31 +403,19 @@ async fn socket_auth(state: &S, socket: &mut WebSocket) -> Result Ok((result, members, claims)) } -// TODO: create a global singleton consumer for performance reasons -async fn create_consumer(team_id: &Uuid) -> Result { - let bootstrap_servers = std::env::var("KAFKA_BOOTSTRAP_SERVERS") - .expect("environment variable KAFKA_BOOTSTRAP_SERVERS is not set"); - - let mut buf = [0u8; uuid::fmt::Simple::LENGTH]; - let id = uuid::Uuid::new_v4().as_simple().encode_lower(&mut buf); - - let consumer: StreamConsumer = ClientConfig::new() - .set("bootstrap.servers", bootstrap_servers) - .set("group.id", id) - .set("enable.partition.eof", "false") - .set("enable.auto.commit", "false") - .set("auto.offset.reset", "latest") - .create()?; - - consumer.assign(&{ - let mut list = TopicPartitionList::new(); - list.add_partition(&topics::team_info(team_id), 0); - list.add_partition(&topics::team_solutions(team_id), 0); - list.add_partition(topics::times(), 0); - list - })?; - - tokio::time::sleep(std::time::Duration::from_millis(200)).await; - - Ok(consumer) +async fn create_consumer<'a, 'b, S: StateTrait>( + state: &'a S, + team_id: &'a Uuid, +) -> Result + 'b> { + let nats = state.nats(); + + let combined = futures::stream::select_all([ + nats.subscribe(topics::team_info(team_id)).await?, + nats.subscribe(topics::team_solutions(team_id)).await?, + nats.subscribe(topics::times()).await?, + ]); + + Ok(combined.filter_map(|message| async move { + serde_json::from_slice::(&message.payload).ok() + })) } diff --git a/src/handlers/team/code.rs b/src/handlers/team/code.rs index 3c2521e5..7c7515d6 100644 --- a/src/handlers/team/code.rs +++ b/src/handlers/team/code.rs @@ -7,9 +7,7 @@ use crate::{ }; use axum::{extract::State, http::StatusCode}; use entity::teams::{self, constrains::*}; -use rdkafka::producer::FutureRecord; use sea_orm::{EntityTrait, IntoActiveModel, QuerySelect, Set, TransactionTrait}; -use std::time::Duration; pub async fn regenerate_code( State(state): State, @@ -31,7 +29,7 @@ pub async fn regenerate_code( return Err(error::LOCKED_TEAM); } - let kafka_topic = topics::team_info(&team.id); + let topic = topics::team_info(&team.id); let model = team.into_active_model(); for _ in 0..16 { @@ -47,7 +45,7 @@ pub async fn regenerate_code( r => r?, }; - let kafka_payload = serde_json::to_string(&Event::UpdateTeam { + let payload = serde_json::to_vec(&Event::UpdateTeam { name: None, owner: None, co_owner: None, @@ -56,15 +54,7 @@ pub async fn regenerate_code( }) .unwrap(); - state - .kafka_producer() - .send( - FutureRecord::<(), String>::to(&kafka_topic) - .partition(0) - .payload(&kafka_payload), - Duration::from_secs(5), - ) - .await?; + state.nats().publish(topic, payload.into()).await?; txn.commit().await?; diff --git a/src/handlers/team/create.rs b/src/handlers/team/create.rs index 32c95087..789798a7 100644 --- a/src/handlers/team/create.rs +++ b/src/handlers/team/create.rs @@ -1,7 +1,7 @@ use crate::{ error::{self, DatabaseError, Result}, iam::Claims, - utils::{generate_join_code, topics}, + utils::generate_join_code, StateTrait, ValidatedJson, }; use axum::{extract::State, http::StatusCode}; @@ -10,7 +10,6 @@ use entity::{ teams::{self, constrains::*}, users, }; -use rdkafka::admin::{AdminOptions, NewTopic, TopicReplication}; use sea_orm::{EntityTrait, QuerySelect, Set, TransactionTrait}; use serde::Deserialize; use uuid::Uuid; @@ -77,25 +76,6 @@ pub async fn create_team( r => r?, }; - state - .kafka_admin() - .create_topics( - &[ - NewTopic::new( - &topics::team_info(&result.last_insert_id), - 1, - TopicReplication::Fixed(1), - ), - NewTopic::new( - &topics::team_solutions(&result.last_insert_id), - 1, - TopicReplication::Fixed(1), - ), - ], - &AdminOptions::new(), - ) - .await?; - txn.commit().await?; return Ok(StatusCode::CREATED); diff --git a/src/handlers/team/disband.rs b/src/handlers/team/disband.rs index dce50c7f..ae76d777 100644 --- a/src/handlers/team/disband.rs +++ b/src/handlers/team/disband.rs @@ -7,9 +7,7 @@ use crate::{ }; use axum::{extract::State, http::StatusCode}; use entity::{team_members, teams}; -use rdkafka::producer::FutureRecord; use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QuerySelect, TransactionTrait}; -use std::time::Duration; pub async fn disband_team( State(state): State, @@ -39,12 +37,10 @@ pub async fn disband_team( teams::Entity::delete_by_id(team.id).exec(&txn).await?; state - .kafka_producer() - .send( - FutureRecord::<(), String>::to(&topics::team_info(&team.id)) - .partition(0) - .payload(&serde_json::to_string(&Event::DisbandTeam).unwrap()), - Duration::from_secs(5), + .nats() + .publish( + topics::team_info(&team.id), + serde_json::to_vec(&Event::DisbandTeam).unwrap().into(), ) .await?; diff --git a/src/handlers/team/join.rs b/src/handlers/team/join.rs index bfe693bc..ce86c63d 100644 --- a/src/handlers/team/join.rs +++ b/src/handlers/team/join.rs @@ -10,10 +10,8 @@ use entity::{ team_members::{self, constraints::*}, teams, users, }; -use rdkafka::producer::FutureRecord; use sea_orm::{EntityTrait, QuerySelect, Set, TransactionTrait}; use serde::Deserialize; -use std::time::Duration; #[derive(Deserialize)] pub struct Request { @@ -72,18 +70,15 @@ pub async fn join_team( })?; state - .kafka_producer() - .send( - FutureRecord::<(), String>::to(&topics::team_info(&team.id)) - .partition(0) - .payload( - &serde_json::to_string(&Event::JoinTeam { - user: user.id, - name: user_info.name, - }) - .unwrap(), - ), - Duration::from_secs(5), + .nats() + .publish( + topics::team_info(&team.id), + serde_json::to_vec(&Event::JoinTeam { + user: user.id, + name: user_info.name, + }) + .unwrap() + .into(), ) .await?; diff --git a/src/handlers/team/kick.rs b/src/handlers/team/kick.rs index 1c928cfd..2f53355c 100644 --- a/src/handlers/team/kick.rs +++ b/src/handlers/team/kick.rs @@ -8,10 +8,8 @@ use crate::{ }; use axum::{extract::State, http::StatusCode}; use entity::{team_members, teams, users}; -use rdkafka::producer::FutureRecord; use sea_orm::{EntityTrait, IntoActiveModel, QuerySelect, Set, TransactionTrait}; use serde::Deserialize; -use std::time::Duration; use uuid::Uuid; #[derive(Deserialize)] @@ -65,7 +63,7 @@ pub async fn kick_user( return Err(error::NO_SUCH_MEMBER); } - let kafka_topic = topics::team_info(&team.id); + let topic = topics::team_info(&team.id); if Some(request.user) == team.co_owner { let mut model = team.into_active_model(); @@ -74,12 +72,12 @@ pub async fn kick_user( } state - .kafka_producer() - .send( - FutureRecord::<(), String>::to(&kafka_topic) - .partition(0) - .payload(&serde_json::to_string(&Event::KickUser { user: request.user }).unwrap()), - Duration::from_secs(5), + .nats() + .publish( + topic, + serde_json::to_vec(&Event::KickUser { user: request.user }) + .unwrap() + .into(), ) .await?; diff --git a/src/handlers/team/leave.rs b/src/handlers/team/leave.rs index a7d2da8f..4471665f 100644 --- a/src/handlers/team/leave.rs +++ b/src/handlers/team/leave.rs @@ -1,9 +1,7 @@ use crate::{error, handlers::socket::Event, iam::Claims, utils::topics, Result, StateTrait}; use axum::{extract::State, http::StatusCode}; use entity::{team_members, teams, users}; -use rdkafka::producer::FutureRecord; use sea_orm::{EntityTrait, QuerySelect, TransactionTrait}; -use std::time::Duration; pub async fn leave_team( State(state): State, @@ -42,12 +40,12 @@ pub async fn leave_team( .await?; state - .kafka_producer() - .send( - FutureRecord::<(), String>::to(&topics::team_info(&team.id)) - .partition(0) - .payload(&serde_json::to_string(&Event::LeaveTeam { user: user.id }).unwrap()), - Duration::from_secs(5), + .nats() + .publish( + topics::team_info(&team.id), + serde_json::to_vec(&Event::LeaveTeam { user: user.id }) + .unwrap() + .into(), ) .await?; diff --git a/src/handlers/team/update.rs b/src/handlers/team/update.rs index 9c350a1d..5f849c5c 100644 --- a/src/handlers/team/update.rs +++ b/src/handlers/team/update.rs @@ -8,10 +8,8 @@ use crate::{ }; use axum::{extract::State, http::StatusCode}; use entity::teams; -use rdkafka::producer::FutureRecord; use sea_orm::{ConnectionTrait, EntityTrait, IntoActiveModel, QuerySelect, TransactionTrait}; use serde::Deserialize; -use std::time::Duration; use uuid::Uuid; use validator::Validate; @@ -73,7 +71,7 @@ pub async fn update_team( } } - let kafka_payload = serde_json::to_string(&Event::UpdateTeam { + let payload = serde_json::to_vec(&Event::UpdateTeam { name: request.name.clone(), owner: request.owner, co_owner: request.co_owner, @@ -82,7 +80,7 @@ pub async fn update_team( }) .unwrap(); - let kafka_topic = topics::team_info(&team.id); + let topic = topics::team_info(&team.id); let mut active_model = team.into_active_model(); active_model.name = set_option(request.name); @@ -92,15 +90,7 @@ pub async fn update_team( teams::Entity::update(active_model).exec(&txn).await?; - state - .kafka_producer() - .send( - FutureRecord::<(), String>::to(&kafka_topic) - .partition(0) - .payload(&kafka_payload), - Duration::from_secs(5), - ) - .await?; + state.nats().publish(topic, payload.into()).await?; txn.commit().await?; Ok(StatusCode::NO_CONTENT) diff --git a/src/state.rs b/src/state.rs index eca6ac77..7bfb105d 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,6 +1,6 @@ use crate::{ iam::{Iam, IamTrait}, - utils::{topics, Problems}, + utils::Problems, }; use libiam::App; use rand::{ @@ -8,12 +8,6 @@ use rand::{ Rng, SeedableRng, }; use rand_chacha::ChaCha20Core; -use rdkafka::{ - admin::{AdminClient, AdminOptions, NewTopic, TopicReplication}, - client::DefaultClientContext, - producer::FutureProducer, - ClientConfig, -}; use sea_orm::{ConnectOptions, ConnectionTrait, Database, DbConn, TransactionTrait}; use std::{env, sync::Arc}; use tracing::log::LevelFilter; @@ -27,20 +21,18 @@ pub trait StateTrait: Send + Sync + Clone + 'static { fn iam(&self) -> &Self::Iam; fn iam_app(&self) -> &App; fn rng(&self) -> Self::Rand; - fn kafka_producer(&self) -> &FutureProducer; - fn kafka_admin(&self) -> &AdminClient; fn app_secret(&self) -> &str; fn problems(&self) -> Arc; + fn nats(&self) -> async_nats::Client; } pub struct State { database: DbConn, iam: Iam, iam_app: App, - kafka_producer: FutureProducer, - kafka_admin: AdminClient, app_secret: String, problems: Arc, + nats: async_nats::Client, } impl State { @@ -49,55 +41,18 @@ impl State { } pub async fn with_database(iam_app: App, conn: DbConn) -> Arc { - let problems = Problems::new(&conn).await; + let nats = Self::connect_nats().await; + let problems = Problems::new(&conn, nats.clone()).await; Arc::new(Self { database: conn, iam: Iam::new(), iam_app, - kafka_producer: Self::create_kafka_producer(), - kafka_admin: Self::create_kafka_admin().await, app_secret: env::var("IAM_APP_SECRET").expect("IAM_APP_SECRET is not set"), problems: Arc::new(problems), + nats, }) } - fn create_kafka_producer() -> FutureProducer { - info!("Creating kafka producer"); - - let bootstrap_servers = - env::var("KAFKA_BOOTSTRAP_SERVERS").expect("KAFKA_BOOTSTRAP_SERVERS not set"); - - ClientConfig::new() - .set("bootstrap.servers", bootstrap_servers) - .create() - .expect("failed to create kafka producer") - } - - async fn create_kafka_admin() -> AdminClient { - info!("Creating kafka admin client"); - - let bootstrap_servers = - env::var("KAFKA_BOOTSTRAP_SERVERS").expect("KAFKA_BOOTSRAP_SERVERS not set"); - - let admin = ClientConfig::new() - .set("bootstrap.servers", bootstrap_servers) - .create::>() - .expect("failed to create kafka admin client"); - - admin - .create_topics( - &[ - NewTopic::new(topics::times(), 1, TopicReplication::Fixed(1)), - NewTopic::new(topics::problems(), 1, TopicReplication::Fixed(1)), - ], - &AdminOptions::new(), - ) - .await - .expect("failed to create times topic"); - - admin - } - async fn connect_database() -> DbConn { info!("Trying to connect to database"); @@ -111,6 +66,17 @@ impl State { db } + + async fn connect_nats() -> async_nats::Client { + info!("Trying to connect to NATS"); + + let url = env::var("NATS_URL").expect("NATS_URL is not set"); + let client = async_nats::connect(url).await.unwrap(); + + info!("Connected to NATS"); + + client + } } thread_local! { @@ -141,14 +107,6 @@ impl StateTrait for Arc { CHACHA_THREAD_RNG.with(|x| x.clone()) } - fn kafka_producer(&self) -> &FutureProducer { - &self.kafka_producer - } - - fn kafka_admin(&self) -> &AdminClient { - &self.kafka_admin - } - fn app_secret(&self) -> &str { &self.app_secret } @@ -156,4 +114,8 @@ impl StateTrait for Arc { fn problems(&self) -> Arc { Arc::clone(&self.problems) } + + fn nats(&self) -> async_nats::Client { + self.nats.clone() + } } diff --git a/src/utils/problems.rs b/src/utils/problems.rs index 0339f262..cf23e7e1 100644 --- a/src/utils/problems.rs +++ b/src/utils/problems.rs @@ -2,10 +2,6 @@ use crate::{handlers::socket::Event, utils::topics}; use entity::{problems, problems_order}; use futures::{Stream, StreamExt}; use pin_project_lite::pin_project; -use rdkafka::{ - consumer::{Consumer, StreamConsumer}, - ClientConfig, Message, TopicPartitionList, -}; use sea_orm::{ ConnectionTrait, DbConn, EntityName, EntityTrait, FromQueryResult, JoinType, QuerySelect, RelationTrait, TransactionTrait, @@ -19,7 +15,6 @@ use std::{ use tokio::{ sync::{broadcast, mpsc, RwLock}, task, - time::sleep, }; use uuid::Uuid; @@ -38,7 +33,7 @@ pub struct Problems { } impl Problems { - pub async fn new(db: &DbConn) -> Self { + pub async fn new(db: &DbConn, nats: async_nats::Client) -> Self { let txn = db.begin().await.expect("failed to start transaction"); txn.execute_unprepared(&format!( @@ -64,30 +59,7 @@ impl Problems { let problems = Arc::new(RwLock::new(sort_initial_problems(res))); - let bootstrap_servers = std::env::var("KAFKA_BOOTSTRAP_SERVERS") - .expect("environment variable KAFKA_BOOTSTRAP_SERVERS is not set"); - - let mut buf = [0u8; uuid::fmt::Simple::LENGTH]; - let id = uuid::Uuid::new_v4().as_simple().encode_lower(&mut buf); - - let consumer: StreamConsumer = ClientConfig::new() - .set("bootstrap.servers", bootstrap_servers) - .set("group.id", id) - .set("enable.partition.eof", "false") - .set("enable.auto.commit", "false") - .set("auto.offset.reset", "latest") - .create() - .expect("Failed to create kafka consumer"); - - consumer - .assign(&{ - let mut list = TopicPartitionList::new(); - list.add_partition(topics::problems(), 0); - list - }) - .expect("failed to assign topics"); - - sleep(std::time::Duration::from_millis(200)).await; + let mut subscription = nats.subscribe(topics::problems()).await.unwrap(); txn.commit().await.expect("failed to commit transaction"); @@ -97,22 +69,8 @@ impl Problems { let problems = Arc::clone(&problems); let tx = tx.clone(); async move { - let mut stream = consumer.stream(); - while let Some(message) = stream.next().await { - let message = match message { - Err(err) => { - error!("kafka error: {err:?}"); - break; - } - Ok(message) => message, - }; - - let Some(Ok(payload)) = message.payload_view::() else { - error!("payload is not utf-8 string"); - break; - }; - - let event: Event = serde_json::from_str(payload).unwrap(); + while let Some(message) = subscription.next().await { + let event: Event = serde_json::from_slice(&message.payload).unwrap(); debug!("problems message: {event:?}"); let mut guard = problems.write().await; @@ -164,10 +122,10 @@ impl Problems { if let Some(pos) = pos { if let Some(body) = body { - guard[pos].body = body.clone(); + guard[pos].body.clone_from(body); } if let Some(image) = image { - guard[pos].image = image.clone(); + guard[pos].image.clone_from(image); } } else { warn!("no problems with id: {}", id); diff --git a/tests/problem.rs b/tests/problem.rs index 63364148..eecd71ac 100644 --- a/tests/problem.rs +++ b/tests/problem.rs @@ -316,9 +316,9 @@ mod delete { #[parallel] async fn success() { let app = get_cached_app().await; - - let user = utils::iam::register_user().await; + let user = app.register_user().await; utils::iam::make_admin(&user).await; + let _team = app.create_team(&user).await; let res = app .post("/problem") @@ -338,6 +338,9 @@ mod delete { res.json::().await, ); + let mut socket = app.socket("/ws").start().await; + assert_team_info!(socket, user); + let body: Value = res.json().await; let id = body["id"].as_str().unwrap(); @@ -353,6 +356,18 @@ mod delete { "failed to delete problem: response={:#?}", res.json::().await, ); + + let message = utils::get_socket_message(socket.next().await); + + assert_json_eq!( + message, + json!({ + "event": "DELETE_PROBLEM", + "data": { + "id": id, + }, + }), + ); } #[tokio::test]