diff --git a/Cargo.lock b/Cargo.lock index 7d8fdad65..3fd4e7910 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -38,6 +38,161 @@ version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f093eed78becd229346bf859eec0aa4dd7ddde0757287b2b4107a1f09c80002" +[[package]] +name = "async-channel" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" +dependencies = [ + "concurrent-queue", + "event-listener 2.5.3", + "futures-core", +] + +[[package]] +name = "async-channel" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f28243a43d821d11341ab73c80bed182dc015c514b951616cf79bd4af39af0c3" +dependencies = [ + "concurrent-queue", + "event-listener 5.0.0", + "event-listener-strategy 0.5.0", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-executor" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17ae5ebefcc48e7452b4987947920dac9450be1110cadf34d1b8c116bdbaf97c" +dependencies = [ + "async-lock 3.3.0", + "async-task", + "concurrent-queue", + "fastrand 2.0.1", + "futures-lite 2.2.0", + "slab", +] + +[[package]] +name = "async-global-executor" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05b1b633a2115cd122d73b955eadd9916c18c8f510ec9cd1686404c60ad1c29c" +dependencies = [ + "async-channel 2.2.0", + "async-executor", + "async-io 2.3.1", + "async-lock 3.3.0", + "blocking", + "futures-lite 2.2.0", + "once_cell", +] + +[[package]] +name = "async-io" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fc5b45d93ef0529756f812ca52e44c221b35341892d3dcc34132ac02f3dd2af" +dependencies = [ + "async-lock 2.8.0", + "autocfg", + "cfg-if", + "concurrent-queue", + "futures-lite 1.13.0", + "log", + "parking", + "polling 2.8.0", + "rustix 0.37.27", + "slab", + "socket2 0.4.10", + "waker-fn", +] + +[[package]] +name = "async-io" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f97ab0c5b00a7cdbe5a371b9a782ee7be1316095885c8a4ea1daf490eb0ef65" +dependencies = [ + "async-lock 3.3.0", + "cfg-if", + "concurrent-queue", + "futures-io", + "futures-lite 2.2.0", + "parking", + "polling 3.4.0", + "rustix 0.38.28", + "slab", + "tracing", + "windows-sys 0.52.0", +] + +[[package]] +name = "async-lock" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "287272293e9d8c41773cec55e365490fe034813a2f172f502d6ddcf75b2f582b" +dependencies = [ + "event-listener 2.5.3", +] + +[[package]] +name = "async-lock" +version = "3.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d034b430882f8381900d3fe6f0aaa3ad94f2cb4ac519b429692a1bc2dda4ae7b" +dependencies = [ + "event-listener 4.0.3", + "event-listener-strategy 0.4.0", + "pin-project-lite", +] + +[[package]] +name = "async-std" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62565bb4402e926b29953c785397c6dc0391b7b446e45008b0049eb43cec6f5d" +dependencies = [ + "async-channel 1.9.0", + "async-global-executor", + "async-io 1.13.0", + "async-lock 2.8.0", + "crossbeam-utils", + "futures-channel", + "futures-core", + "futures-io", + "futures-lite 1.13.0", + "gloo-timers", + "kv-log-macro", + "log", + "memchr", + "once_cell", + "pin-project-lite", + "pin-utils", + "slab", + "wasm-bindgen-futures", +] + +[[package]] +name = "async-task" +version = "4.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbb36e985947064623dbd357f727af08ffd077f93d696782f3c56365fa2e2799" + +[[package]] +name = "async-trait" +version = "0.1.76" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "531b97fb4cd3dfdce92c35dedbfdc1f0b9d8091c8ca943d6dae340ef5012d514" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.42", +] + [[package]] name = "atom_syndication" version = "0.12.2" @@ -51,6 +206,12 @@ dependencies = [ "quick-xml", ] +[[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" + [[package]] name = "autocfg" version = "1.1.0" @@ -99,6 +260,22 @@ dependencies = [ "generic-array", ] +[[package]] +name = "blocking" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a37913e8dc4ddcc604f0c6d3bf2887c995153af3611de9e23c352b44c1b9118" +dependencies = [ + "async-channel 2.2.0", + "async-lock 3.3.0", + "async-task", + "fastrand 2.0.1", + "futures-io", + "futures-lite 2.2.0", + "piper", + "tracing", +] + [[package]] name = "bumpalo" version = "3.14.0" @@ -141,6 +318,29 @@ dependencies = [ "num-traits", ] +[[package]] +name = "combine" +version = "4.6.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35ed6e9d84f0b51a7f52daf1c7d71dd136fd7a3f41a8462b8cdb8c78d920fad4" +dependencies = [ + "bytes", + "futures-core", + "memchr", + "pin-project-lite", + "tokio", + "tokio-util", +] + +[[package]] +name = "concurrent-queue" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d16048cd947b08fa32c24458a22f5dc5e835264f689f4f5653210c69fd107363" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "contrast" version = "0.1.0" @@ -176,6 +376,12 @@ version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f" +[[package]] +name = "crossbeam-utils" +version = "0.8.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345" + [[package]] name = "crypto-common" version = "0.1.6" @@ -306,6 +512,63 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "event-listener" +version = "2.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" + +[[package]] +name = "event-listener" +version = "4.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67b215c49b2b248c855fb73579eb1f4f26c38ffdc12973e20e07b91d78d5646e" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener" +version = "5.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b72557800024fabbaa2449dd4bf24e37b93702d457a4d4f2b0dd1f0f039f20c1" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "958e4d70b6d5e81971bebec42271ec641e7ff4e170a6fa605f2b8a8b65cb97d3" +dependencies = [ + "event-listener 4.0.3", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "feedafcaa9b749175d5ac357452a9d41ea2911da598fde46ce1fe02c37751291" +dependencies = [ + "event-listener 5.0.0", + "pin-project-lite", +] + +[[package]] +name = "fastrand" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be" +dependencies = [ + "instant", +] + [[package]] name = "fastrand" version = "2.0.1" @@ -352,43 +615,121 @@ dependencies = [ "new_debug_unreachable", ] +[[package]] +name = "futures" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff4dd66668b557604244583e3e1e1eada8c5c2e96a6d0d6653ede395b78bbacb" +checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" dependencies = [ "futures-core", + "futures-sink", ] [[package]] name = "futures-core" -version = "0.3.29" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" + +[[package]] +name = "futures-executor" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb1d22c66e66d9d72e1758f0bd7d4fd0bee04cad842ee34587d68c07e45d088c" +checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" + +[[package]] +name = "futures-lite" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce" +dependencies = [ + "fastrand 1.9.0", + "futures-core", + "futures-io", + "memchr", + "parking", + "pin-project-lite", + "waker-fn", +] + +[[package]] +name = "futures-lite" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "445ba825b27408685aaecefd65178908c36c6e96aaf6d8599419d46e624192ba" +dependencies = [ + "fastrand 2.0.1", + "futures-core", + "futures-io", + "parking", + "pin-project-lite", +] + +[[package]] +name = "futures-macro" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.42", +] [[package]] name = "futures-sink" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e36d3378ee38c2a36ad710c5d30c2911d752cb941c00c72dbabfb786a7970817" +checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" [[package]] name = "futures-task" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "efd193069b0ddadc69c46389b740bbccdd97203899b48d09c5f7969591d6bae2" +checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" [[package]] name = "futures-util" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a19526d624e703a3179b3d322efec918b6246ea0fa51d41124525f00f1cc8104" +checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ + "futures-channel", "futures-core", + "futures-io", + "futures-macro", + "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", + "slab", ] [[package]] @@ -418,6 +759,18 @@ version = "0.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" +[[package]] +name = "gloo-timers" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b995a66bb87bebce9a0f4a95aed01daca4872c050bfcb21653361c03bc35e5c" +dependencies = [ + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", +] + [[package]] name = "h2" version = "0.3.24" @@ -520,7 +873,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2", + "socket2 0.5.5", "tokio", "tower-service", "tracing", @@ -567,6 +920,26 @@ dependencies = [ "serde", ] +[[package]] +name = "instant" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "io-lifetimes" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2" +dependencies = [ + "hermit-abi", + "libc", + "windows-sys 0.48.0", +] + [[package]] name = "ipnet" version = "2.9.0" @@ -606,6 +979,15 @@ dependencies = [ "serde", ] +[[package]] +name = "kv-log-macro" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f" +dependencies = [ + "log", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -628,6 +1010,12 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "linux-raw-sys" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519" + [[package]] name = "linux-raw-sys" version = "0.4.12" @@ -649,6 +1037,9 @@ name = "log" version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +dependencies = [ + "value-bag", +] [[package]] name = "mac" @@ -674,13 +1065,17 @@ dependencies = [ name = "matrix-hookshot" version = "5.1.2" dependencies = [ + "async-std", "atom_syndication", "contrast", + "futures", "hex", "md-5", "napi", "napi-build", "napi-derive", + "rand", + "redis", "reqwest", "rgb", "rss", @@ -689,6 +1084,7 @@ dependencies = [ "serde_derive", "serde_json", "url", + "uuid", ] [[package]] @@ -901,6 +1297,12 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "parking" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae" + [[package]] name = "parking_lot" version = "0.12.1" @@ -1022,12 +1424,53 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "piper" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "668d31b1c4eba19242f2088b2bf3316b82ca31082a8335764db4e083db7485d4" +dependencies = [ + "atomic-waker", + "fastrand 2.0.1", + "futures-io", +] + [[package]] name = "pkg-config" version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69d3587f8a9e599cc7ec2c00e331f71c4e69a5f9a4b8a6efd5b07466b9736f9a" +[[package]] +name = "polling" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b2d323e8ca7996b3e23126511a523f7e62924d93ecd5ae73b333815b0eb3dce" +dependencies = [ + "autocfg", + "bitflags 1.3.2", + "cfg-if", + "concurrent-queue", + "libc", + "log", + "pin-project-lite", + "windows-sys 0.48.0", +] + +[[package]] +name = "polling" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30054e72317ab98eddd8561db0f6524df3367636884b7b21b703e4b280a84a14" +dependencies = [ + "cfg-if", + "concurrent-queue", + "pin-project-lite", + "rustix 0.38.28", + "tracing", + "windows-sys 0.52.0", +] + [[package]] name = "ppv-lite86" version = "0.2.17" @@ -1108,6 +1551,27 @@ dependencies = [ "getrandom", ] +[[package]] +name = "redis" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c580d9cbbe1d1b479e8d67cf9daf6a62c957e6846048408b80b43ac3f6af84cd" +dependencies = [ + "async-trait", + "bytes", + "combine", + "futures-util", + "itoa", + "percent-encoding", + "pin-project-lite", + "ryu", + "sha1_smol", + "socket2 0.4.10", + "tokio", + "tokio-util", + "url", +] + [[package]] name = "redox_syscall" version = "0.4.1" @@ -1313,6 +1777,20 @@ version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" +[[package]] +name = "rustix" +version = "0.37.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fea8ca367a3a01fe35e6943c400addf443c0f57670e6ec51196f71a4b8762dd2" +dependencies = [ + "bitflags 1.3.2", + "errno", + "io-lifetimes", + "libc", + "linux-raw-sys 0.3.8", + "windows-sys 0.48.0", +] + [[package]] name = "rustix" version = "0.38.28" @@ -1322,7 +1800,7 @@ dependencies = [ "bitflags 2.4.1", "errno", "libc", - "linux-raw-sys", + "linux-raw-sys 0.4.12", "windows-sys 0.52.0", ] @@ -1441,6 +1919,12 @@ dependencies = [ "serde", ] +[[package]] +name = "sha1_smol" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae1a47186c03a32177042e55dbc5fd5aee900b8e0069a8d70fba96a9375cd012" + [[package]] name = "siphasher" version = "0.3.11" @@ -1462,6 +1946,16 @@ version = "1.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4dccd0940a2dcdf68d092b8cbab7dc0ad8fa938bf95787e1b916b0e3d0e8e970" +[[package]] +name = "socket2" +version = "0.4.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f7916fc008ca5542385b89a3d3ce689953c143e9304a9bf8beec1de48994c0d" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "socket2" version = "0.5.5" @@ -1554,9 +2048,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ef1adac450ad7f4b3c28589471ade84f25f731a7a0fe30d71dfa9f60fd808e5" dependencies = [ "cfg-if", - "fastrand", + "fastrand 2.0.1", "redox_syscall", - "rustix", + "rustix 0.38.28", "windows-sys 0.48.0", ] @@ -1618,7 +2112,7 @@ dependencies = [ "mio", "num_cpus", "pin-project-lite", - "socket2", + "socket2 0.5.5", "windows-sys 0.48.0", ] @@ -1773,6 +2267,21 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" +[[package]] +name = "uuid" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f00cc9702ca12d3c81455259621e676d0f7251cec66a21e98fe2e9a37db93b2a" +dependencies = [ + "getrandom", +] + +[[package]] +name = "value-bag" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "126e423afe2dd9ac52142e7e9d5ce4135d7e13776c529d27fd6bc49f19e3280b" + [[package]] name = "vcpkg" version = "0.2.15" @@ -1785,6 +2294,12 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "waker-fn" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3c4517f54858c779bbcbf228f4fca63d121bf85fbecb2dc578cdf4a39395690" + [[package]] name = "want" version = "0.3.1" @@ -1882,6 +2397,28 @@ version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ffa44a4268d649eba546544ed45fd9591059d9653a0e584efe030b56d8172b58" +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + [[package]] name = "windows-sys" version = "0.48.0" diff --git a/Cargo.toml b/Cargo.toml index 4d75e4089..76273c94a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,11 @@ rss = "2.0" atom_syndication = "0.12" ruma = { version = "0.9", features = ["events", "html"] } reqwest = "0.11" +rand = "0.8.5" +uuid = { version = "1.7.0", features = ["v4"] } +async-std = "1.12.0" +redis = { version = "0.24.0", features = ["aio", "tokio-comp"] } +futures = "0.3.30" [build-dependencies] napi-build = "2" diff --git a/changelog.d/890.misc b/changelog.d/890.misc new file mode 100644 index 000000000..23b8fb1c1 --- /dev/null +++ b/changelog.d/890.misc @@ -0,0 +1 @@ +Failing RSS/atom feeds are now backed off before being retried. This should result in a speedup for large public deployments where failing feeds may result in a slowdown. \ No newline at end of file diff --git a/spec/github.spec.ts b/spec/github.spec.ts index 57a431c5b..132674298 100644 --- a/spec/github.spec.ts +++ b/spec/github.spec.ts @@ -58,7 +58,7 @@ describe('GitHub', () => { return testEnv?.tearDown(); }); - it.only('should be able to handle a GitHub event', async () => { + it('should be able to handle a GitHub event', async () => { const user = testEnv.getUser('user'); const bridgeApi = await getBridgeApi(testEnv.opts.config?.widgets?.publicUrl!, user); const testRoomId = await user.createRoom({ name: 'Test room', invite:[testEnv.botMxid] }); diff --git a/src/App/ResetCryptoStore.ts b/src/App/ResetCryptoStore.ts index 1fddf1458..b9b5c77c5 100644 --- a/src/App/ResetCryptoStore.ts +++ b/src/App/ResetCryptoStore.ts @@ -5,7 +5,7 @@ import { Logger } from "matrix-appservice-bridge"; import { LogService, MatrixClient } from "matrix-bot-sdk"; import { getAppservice } from "../appservice"; import BotUsersManager from "../Managers/BotUsersManager"; -import { IBridgeStorageProvider } from "../Stores/StorageProvider"; +import { IBridgeStorageProvider } from "../stores/StorageProvider"; const log = new Logger("ResetCryptoStore"); diff --git a/src/Bridge.ts b/src/Bridge.ts index c92517458..79a1e5a73 100644 --- a/src/Bridge.ts +++ b/src/Bridge.ts @@ -8,7 +8,7 @@ import { CommentProcessor } from "./CommentProcessor"; import { ConnectionManager } from "./ConnectionManager"; import { GetIssueResponse, GetIssueOpts } from "./Gitlab/Types" import { GithubInstance } from "./github/GithubInstance"; -import { IBridgeStorageProvider } from "./Stores/StorageProvider"; +import { IBridgeStorageProvider } from "./stores/StorageProvider"; import { IConnection, GitHubDiscussionSpace, GitHubDiscussionConnection, GitHubUserSpace, JiraProjectConnection, GitLabRepoConnection, GitHubIssueConnection, GitHubProjectConnection, GitHubRepoConnection, GitLabIssueConnection, FigmaFileConnection, FeedConnection, GenericHookConnection, WebhookResponse } from "./Connections"; import { IGitLabWebhookIssueStateEvent, IGitLabWebhookMREvent, IGitLabWebhookNoteEvent, IGitLabWebhookPushEvent, IGitLabWebhookReleaseEvent, IGitLabWebhookTagPushEvent, IGitLabWebhookWikiPageEvent } from "./Gitlab/WebhookTypes"; diff --git a/src/ConnectionManager.ts b/src/ConnectionManager.ts index c475aa778..aac180fc7 100644 --- a/src/ConnectionManager.ts +++ b/src/ConnectionManager.ts @@ -13,7 +13,7 @@ import { FigmaFileConnection, FeedConnection } from "./Connections"; import { GetConnectionTypeResponseItem } from "./provisioning/api"; import { GitLabClient } from "./Gitlab/Client"; import { GithubInstance } from "./github/GithubInstance"; -import { IBridgeStorageProvider } from "./Stores/StorageProvider"; +import { IBridgeStorageProvider } from "./stores/StorageProvider"; import { JiraProject, JiraVersion } from "./jira/Types"; import { Logger } from "matrix-appservice-bridge"; import { MessageSenderClient } from "./MatrixSender"; diff --git a/src/Connections/FeedConnection.ts b/src/Connections/FeedConnection.ts index d1aa9b248..26f427f38 100644 --- a/src/Connections/FeedConnection.ts +++ b/src/Connections/FeedConnection.ts @@ -220,15 +220,16 @@ export class FeedConnection extends BaseConnection implements IConnection { // We want to retry these sends, because sometimes the network / HS // craps out. + const content = { + msgtype: 'm.notice', + format: "org.matrix.custom.html", + formatted_body: md.renderInline(message), + body: message, + external_url: entry.link ?? undefined, + "uk.half-shot.matrix-hookshot.feeds.item": entry, + }; await retry( - () => this.intent.sendEvent(this.roomId, { - msgtype: 'm.notice', - format: "org.matrix.custom.html", - formatted_body: md.renderInline(message), - body: message, - external_url: entry.link ?? undefined, - "uk.half-shot.matrix-hookshot.feeds.item": entry, - }), + () => this.intent.sendEvent(this.roomId, content), SEND_EVENT_MAX_ATTEMPTS, SEND_EVENT_INTERVAL_MS, // Filter for showstopper errors like 4XX errors, but otherwise diff --git a/src/Connections/FigmaFileConnection.ts b/src/Connections/FigmaFileConnection.ts index 1a662dede..526404456 100644 --- a/src/Connections/FigmaFileConnection.ts +++ b/src/Connections/FigmaFileConnection.ts @@ -4,7 +4,7 @@ import { FigmaPayload } from "../figma/types"; import { BaseConnection } from "./BaseConnection"; import { IConnection, IConnectionState } from "."; import { Logger } from "matrix-appservice-bridge"; -import { IBridgeStorageProvider } from "../Stores/StorageProvider"; +import { IBridgeStorageProvider } from "../stores/StorageProvider"; import { BridgeConfig } from "../config/Config"; import { Connection, InstantiateConnectionOpts, ProvisionConnectionOpts } from "./IConnection"; import { ConfigGrantChecker, GrantChecker } from "../grants/GrantCheck"; diff --git a/src/Connections/GitlabRepo.ts b/src/Connections/GitlabRepo.ts index c3936a4c5..5115c0a83 100644 --- a/src/Connections/GitlabRepo.ts +++ b/src/Connections/GitlabRepo.ts @@ -16,7 +16,7 @@ import { CommandError } from "../errors"; import QuickLRU from "@alloc/quick-lru"; import { HookFilter } from "../HookFilter"; import { GitLabClient } from "../Gitlab/Client"; -import { IBridgeStorageProvider } from "../Stores/StorageProvider"; +import { IBridgeStorageProvider } from "../stores/StorageProvider"; import axios from "axios"; import { GitLabGrantChecker } from "../Gitlab/GrantChecker"; diff --git a/src/Connections/IConnection.ts b/src/Connections/IConnection.ts index 9fe63a9a3..c69247cb8 100644 --- a/src/Connections/IConnection.ts +++ b/src/Connections/IConnection.ts @@ -6,7 +6,7 @@ import { BridgeConfig, BridgePermissionLevel } from "../config/Config"; import { UserTokenStore } from "../UserTokenStore"; import { CommentProcessor } from "../CommentProcessor"; import { MessageSenderClient } from "../MatrixSender"; -import { IBridgeStorageProvider } from "../Stores/StorageProvider"; +import { IBridgeStorageProvider } from "../stores/StorageProvider"; import { GithubInstance } from "../github/GithubInstance"; import "reflect-metadata"; diff --git a/src/NotificationsProcessor.ts b/src/NotificationsProcessor.ts index 8b4ea8013..eced03553 100644 --- a/src/NotificationsProcessor.ts +++ b/src/NotificationsProcessor.ts @@ -1,5 +1,5 @@ import { MessageSenderClient } from "./MatrixSender"; -import { IBridgeStorageProvider } from "./Stores/StorageProvider"; +import { IBridgeStorageProvider } from "./stores/StorageProvider"; import { UserNotificationsEvent } from "./Notifications/UserNotificationWatcher"; import { Logger } from "matrix-appservice-bridge"; import { AdminRoom } from "./AdminRoom"; diff --git a/src/Widgets/BridgeWidgetApi.ts b/src/Widgets/BridgeWidgetApi.ts index 385557890..6cec32246 100644 --- a/src/Widgets/BridgeWidgetApi.ts +++ b/src/Widgets/BridgeWidgetApi.ts @@ -5,7 +5,7 @@ import { ApiError, ErrCode } from "../api"; import { BridgeConfig } from "../config/Config"; import { GetAuthPollResponse, GetAuthResponse, GetConnectionsForServiceResponse } from "./BridgeWidgetInterface"; import { ProvisioningApi, ProvisioningRequest } from "matrix-appservice-bridge"; -import { IBridgeStorageProvider } from "../Stores/StorageProvider"; +import { IBridgeStorageProvider } from "../stores/StorageProvider"; import { ConnectionManager } from "../ConnectionManager"; import BotUsersManager, {BotUser} from "../Managers/BotUsersManager"; import { assertUserPermissionsInRoom, GetConnectionsResponseItem } from "../provisioning/api"; diff --git a/src/appservice.ts b/src/appservice.ts index 2bd97742d..5134989cc 100644 --- a/src/appservice.ts +++ b/src/appservice.ts @@ -2,9 +2,9 @@ import { Logger } from "matrix-appservice-bridge"; import { Appservice, IAppserviceCryptoStorageProvider, IAppserviceRegistration, RustSdkAppserviceCryptoStorageProvider, RustSdkCryptoStoreType } from "matrix-bot-sdk"; import { BridgeConfig } from "./config/Config"; import Metrics from "./Metrics"; -import { MemoryStorageProvider } from "./Stores/MemoryStorageProvider"; -import { RedisStorageProvider } from "./Stores/RedisStorageProvider"; -import { IBridgeStorageProvider } from "./Stores/StorageProvider"; +import { MemoryStorageProvider } from "./stores/MemoryStorageProvider"; +import { RedisStorageProvider } from "./stores/RedisStorageProvider"; +import { IBridgeStorageProvider } from "./stores/StorageProvider"; const log = new Logger("Appservice"); export function getAppservice(config: BridgeConfig, registration: IAppserviceRegistration) { diff --git a/src/feeds/FeedReader.ts b/src/feeds/FeedReader.ts index 46ccadb2c..69e1df5c9 100644 --- a/src/feeds/FeedReader.ts +++ b/src/feeds/FeedReader.ts @@ -7,10 +7,16 @@ import axios from "axios"; import Metrics from "../Metrics"; import { randomUUID } from "crypto"; import { readFeed } from "../libRs"; -import { IBridgeStorageProvider } from "../Stores/StorageProvider"; +import { IBridgeStorageProvider } from "../stores/StorageProvider"; import UserAgent from "../UserAgent"; +import { QueueWithBackoff } from "../libRs"; const log = new Logger("FeedReader"); + +const BACKOFF_TIME_MAX_MS = 24 * 60 * 60 * 1000; +const BACKOFF_POW = 1.05; +const BACKOFF_TIME_MS = 5 * 1000; + export class FeedError extends Error { constructor( public url: string, @@ -73,21 +79,11 @@ function normalizeUrl(input: string): string { return url.toString(); } -function shuffle(array: T[]): T[] { - for (let i = array.length - 1; i > 0; i--) { - const j = Math.floor(Math.random() * (i + 1)); - [array[i], array[j]] = [array[j], array[i]]; - } - return array; -} - export class FeedReader { private connections: FeedConnection[]; - // ts should notice that we do in fact initialize it in constructor, but it doesn't (in this version) - private observedFeedUrls: Set = new Set(); - private feedQueue: string[] = []; + private feedQueue = new QueueWithBackoff(BACKOFF_TIME_MS, BACKOFF_POW, BACKOFF_TIME_MAX_MS); // A set of last modified times for each url. private cacheTimes: Map = new Map(); @@ -104,7 +100,7 @@ export class FeedReader { get sleepingInterval() { return ( // Calculate the number of MS to wait in between feeds. - (this.config.pollIntervalSeconds * 1000) / (this.feedQueue.length || 1) + (this.config.pollIntervalSeconds * 1000) / (this.feedQueue.length() || 1) // And multiply by the number of concurrent readers ) * this.config.pollConcurrency; } @@ -120,7 +116,7 @@ export class FeedReader { this.timeouts.fill(undefined); Object.seal(this.timeouts); this.connections = this.connectionManager.getAllConnectionsOfType(FeedConnection); - this.calculateFeedUrls(); + const initialFeeds = this.calculateFeedUrls(); connectionManager.on('new-connection', c => { if (c instanceof FeedConnection) { log.debug('New connection tracked:', c.connectionId); @@ -135,7 +131,7 @@ export class FeedReader { } }); - log.debug('Loaded feed URLs:', this.observedFeedUrls); + log.debug('Loaded feed URLs:', [...initialFeeds].join(', ')); for (let i = 0; i < config.pollConcurrency; i++) { void this.pollFeeds(i); @@ -147,21 +143,21 @@ export class FeedReader { this.timeouts.forEach(t => clearTimeout(t)); } - private calculateFeedUrls(): void { + private calculateFeedUrls(): Set { // just in case we got an invalid URL somehow - const normalizedUrls = []; + const observedFeedUrls = new Set(); for (const conn of this.connections) { try { - normalizedUrls.push(normalizeUrl(conn.feedUrl)); + observedFeedUrls.add(normalizeUrl(conn.feedUrl)); } catch (err: unknown) { log.error(`Invalid feedUrl for connection ${conn.connectionId}: ${conn.feedUrl}. It will not be tracked`); } } - this.observedFeedUrls = new Set(normalizedUrls); - this.feedQueue = shuffle([...this.observedFeedUrls.values()]); - - Metrics.feedsCount.set(this.observedFeedUrls.size); - Metrics.feedsCountDeprecated.set(this.observedFeedUrls.size); + observedFeedUrls.forEach(url => this.feedQueue.push(url)); + this.feedQueue.shuffle(); + Metrics.feedsCount.set(observedFeedUrls.size); + Metrics.feedsCountDeprecated.set(observedFeedUrls.size); + return observedFeedUrls; } /** @@ -203,22 +199,20 @@ export class FeedReader { if (feed) { // If undefined, we got a not-modified. log.debug(`Found ${feed.items.length} entries in ${url}`); - + const seenItems = await this.storage.hasSeenFeedGuids(url, ...feed.items.filter(item => !!item.hashId).map(item => item.hashId!)) for (const item of feed.items) { // Some feeds have a nasty habit of leading a empty tag there, making us parse it as garbage. if (!item.hashId) { log.error(`Could not determine guid for entry in ${url}, skipping`); continue; } - const hashId = `md5:${item.hashId}`; - newGuids.push(hashId); - - if (initialSync) { - log.debug(`Skipping entry ${item.id ?? hashId} since we're performing an initial sync`); + if (seenItems.includes(item.hashId)) { continue; } - if (await this.storage.hasSeenFeedGuid(url, hashId)) { - log.debug('Skipping already seen entry', item.id ?? hashId); + newGuids.push(item.hashId); + + if (initialSync) { + log.debug(`Skipping entry ${item.id ?? item.hashId} since we're performing an initial sync`); continue; } const entry = { @@ -243,12 +237,12 @@ export class FeedReader { if (seenEntriesChanged && newGuids.length) { await this.storage.storeFeedGuids(url, ...newGuids); } - } this.queue.push({ eventName: 'feed.success', sender: 'FeedReader', data: { url } }); // Clear any feed failures this.feedsFailingHttp.delete(url); this.feedsFailingParsing.delete(url); + this.feedQueue.push(url); } catch (err: unknown) { // TODO: Proper Rust Type error. if ((err as Error).message.includes('Failed to fetch feed due to HTTP')) { @@ -256,12 +250,11 @@ export class FeedReader { } else { this.feedsFailingParsing.add(url); } + const backoffDuration = this.feedQueue.backoff(url); const error = err instanceof Error ? err : new Error(`Unknown error ${err}`); const feedError = new FeedError(url.toString(), error, fetchKey); - log.error("Unable to read feed:", feedError.message); + log.error("Unable to read feed:", feedError.message, `backing off for ${backoffDuration}ms`); this.queue.push({ eventName: 'feed.error', sender: 'FeedReader', data: feedError}); - } finally { - this.feedQueue.push(url); } return seenEntriesChanged; } @@ -277,11 +270,11 @@ export class FeedReader { Metrics.feedsFailingDeprecated.set({ reason: "http" }, this.feedsFailingHttp.size ); Metrics.feedsFailingDeprecated.set({ reason: "parsing" }, this.feedsFailingParsing.size); - log.debug(`Checking for updates in ${this.observedFeedUrls.size} RSS/Atom feeds (worker: ${workerId})`); + log.debug(`Checking for updates in ${this.feedQueue.length()} RSS/Atom feeds (worker: ${workerId})`); const fetchingStarted = Date.now(); - const [ url ] = this.feedQueue.splice(0, 1); + const url = this.feedQueue.pop(); let sleepFor = this.sleepingInterval; if (url) { @@ -298,7 +291,7 @@ export class FeedReader { log.warn(`It took us longer to update the feeds than the configured pool interval`); } } else { - // It may be possible that we have more workers than feeds. This will cause the worker to just sleep. + // It is possible that we have more workers than feeds. This will cause the worker to just sleep. log.debug(`No feeds available to poll for worker ${workerId}`); } diff --git a/src/feeds/mod.rs b/src/feeds/mod.rs index 67c567fa0..ca527719c 100644 --- a/src/feeds/mod.rs +++ b/src/feeds/mod.rs @@ -1 +1,2 @@ pub mod parser; +pub mod reader; diff --git a/src/feeds/parser.rs b/src/feeds/parser.rs index 0dcbc7d16..b0c4d1e09 100644 --- a/src/feeds/parser.rs +++ b/src/feeds/parser.rs @@ -20,7 +20,7 @@ pub struct FeedItem { pub pubdate: Option, pub summary: Option, pub author: Option, - pub hash_id: Option, + pub hash_id: String, } #[derive(Serialize, Debug, Deserialize)] @@ -70,7 +70,9 @@ fn parse_channel_to_js_result(channel: &Channel) -> JsRssChannel { .map(|f| f.value) .or(item.link.clone()) .or(item.title.clone()) - .and_then(|f| hash_id(f).ok()), + .and_then(|f| hash_id(f).ok()) + .and_then(|f| Some(format!("md5:{}", f))) + .unwrap(), // TODO: Handle error }) .collect(), } @@ -117,7 +119,7 @@ fn parse_feed_to_js_result(feed: &Feed) -> JsRssChannel { .map(|date| date.to_rfc2822()), summary: item.summary().map(|v| v.value.clone()), author: authors_to_string(item.authors()), - hash_id: hash_id(item.id.clone()).ok(), + hash_id: hash_id(item.id.clone()).unwrap(), }) .collect(), } diff --git a/src/feeds/reader.rs b/src/feeds/reader.rs new file mode 100644 index 000000000..148f38e49 --- /dev/null +++ b/src/feeds/reader.rs @@ -0,0 +1,212 @@ +use std::collections::{HashMap, HashSet}; +use std::future; +use crate::util::QueueWithBackoff; +use std::time::{Duration, Instant}; +use napi::bindgen_prelude::Error as JsError; +use napi::tokio::sync::RwLock; +use std::sync::Arc; +use uuid::Uuid; +use futures::future::{Future, select_all}; +use crate::feeds::parser::{js_read_feed, ReadFeedOptions}; +use crate::stores::memory::MemoryStorageProvider; +use crate::stores::traits::StorageProvider; + +const BACKOFF_TIME_MAX_MS: f64 = 24f64 * 60f64 * 60f64 * 1000f64; +const BACKOFF_POW: f64 = 1.05f64; +const BACKOFF_TIME_MS: f64 = 5f64 * 1000f64; + +#[derive(Serialize, Debug, Deserialize)] +#[napi(object)] +struct HookshotFeedInfo { + pub title: String, + pub url: String, + pub entries: Vec, + pub fetch_key: String, +} + +#[derive(Serialize, Debug, Deserialize)] +#[napi(object)] +struct HookshotFeedEntry { + pub title: Option, + pub pubdate: Option, + pub summary: Option, + pub author: Option, + pub link: Option, +} + +struct CacheTime { + etag: Option, + last_modified: Option, +} + +impl CacheTime { + fn new() -> Self { + CacheTime { + etag: None, + last_modified: None, + } + } +} + +#[napi] +pub struct FeedReader { + queue: QueueWithBackoff, + feeds_to_retain: HashSet, + cache_times: Arc>>, + poll_interval_seconds: f64, + poll_concurrency: u8, + poll_timeout_seconds: i64, +} + +#[napi] +pub struct FeedReaderMetrics { + feeds_failing_http: usize, + feeds_failing_parsing: usize, +} + +#[napi] + +impl FeedReader { + #[napi(constructor)] + pub fn new(poll_interval_seconds: f64, poll_concurrency: u8, poll_timeout_seconds: i64) -> Self { + let mut cache_times: HashMap = HashMap::new(); + let mut lock = Arc::new(RwLock::new(cache_times)); + FeedReader { + queue: QueueWithBackoff::new( + BACKOFF_TIME_MS, + BACKOFF_POW, + BACKOFF_TIME_MAX_MS, + ), + feeds_to_retain: HashSet::new(), + poll_interval_seconds, + poll_concurrency, + poll_timeout_seconds, + cache_times: lock, + } + } + + #[napi] + pub fn get_metrics(&self) -> FeedReaderMetrics { + FeedReaderMetrics { + feeds_failing_http: 0, + feeds_failing_parsing: 0, + } + } + + + #[napi] + pub fn on_new_url(&mut self, url: String) { + self.queue.push(url); + } + + #[napi] + pub fn on_removed_url(&mut self) { + + } + + async fn poll_feed(&self, url: &String, cache_times: Arc>>, mut storage: &impl StorageProvider) -> Result, JsError> { + let seen_entries_changed = false; + let fetch_key = Uuid::new_v4().to_string(); + + let c_t = cache_times.read().await; + let cache_time = c_t.get(url); + let etag = cache_time.and_then(|c| c.etag.clone()).or(None); + let last_modified = cache_time.and_then(|c| c.last_modified.clone()).or(None); + drop(c_t); + + if let Ok(result) = js_read_feed(url.clone(), ReadFeedOptions { + poll_timeout_seconds: self.poll_timeout_seconds, + etag, + last_modified, + user_agent: "faked user agent".to_string(), + }).await { + let mut c_t_w = cache_times.write().await; + c_t_w.insert(url.clone(), CacheTime { + etag: result.etag, + last_modified: result.last_modified, + }); + drop(c_t_w); + + let initial_sync = storage.has_seen_feed(url).await; + let mut new_guids: Vec = Vec::new(); + let new_entries: Vec = Vec::new(); + + + if let Some(feed) = result.feed { + let items = feed.items.iter().map(|x| x.hash_id.clone()).collect::>(); + let seen_items = storage.has_seen_feed_guids( + url, + &items, + ).await; + println!("Got feed result!"); + let mut feed_info = HookshotFeedInfo { + title: feed.title, + url: url.clone(), + entries: vec![], + fetch_key, + }; + for item in feed.items { + println!("Got feed result! {:?}", item); + if seen_items.contains(&item.hash_id) { + continue; + } + + new_guids.push(item.hash_id.clone()); + + if initial_sync { + // Skip. + continue; + } + feed_info.entries.push(HookshotFeedEntry { + title: item.title, + pubdate: item.pubdate, + summary: item.summary, + author: item.author, + link: item.link, + }); + } + storage.store_feed_guids(&url, &new_guids).await; + return Ok(Some(feed_info)); + } else { + // TODO: Implement + } + + + } // TODO: Handle error. + Ok(None) + } + + fn sleeping_interval(&self) -> f64 { + return (self.poll_interval_seconds * 1000.0) / self.queue.length() as f64; + } + + async unsafe fn poll_feed_int(&mut self, url: &String, mut storage_provider: &impl StorageProvider) { + let mut sleep_for = self.sleeping_interval(); + self.feeds_to_retain.insert(url.clone()); + let now = Instant::now(); + let result = self.poll_feed(url, self.cache_times.clone(), &storage_provider).await; + self.feeds_to_retain.remove(url); + let elapsed = now.elapsed(); + sleep_for = (sleep_for - (elapsed.as_millis() as f64)).max(0.0); + async_std::task::sleep(Duration::from_millis(sleep_for as u64)).await; + } + + #[napi] + pub async unsafe fn poll_feeds(&mut self) -> Result<(), JsError> { + let concurrency = self.poll_concurrency as usize; + let mut storage_provider = MemoryStorageProvider::new(); + let mut future_set: Vec<_> = Vec::new(); + loop { + if let Some(url) = self.queue.pop() { + let result = Box::pin(self.poll_feed_int(&url, &storage_provider)); + future_set.push(result); + } else { + async_std::task::sleep(Duration::from_millis(self.sleeping_interval() as u64)).await; + } + if future_set.len() >= concurrency { + let (item_resolved, ready_future_index, _remaining_futures) = + select_all(future_set).await; + } + } + } +} \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index 1d03f680a..81e1cf4b7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,6 +3,9 @@ pub mod feeds; pub mod format_util; pub mod github; pub mod jira; +pub mod util; + +mod stores; #[macro_use] extern crate napi_derive; diff --git a/src/Stores/MemoryStorageProvider.ts b/src/stores/MemoryStorageProvider.ts similarity index 94% rename from src/Stores/MemoryStorageProvider.ts rename to src/stores/MemoryStorageProvider.ts index 257c1da3c..52b5bf545 100644 --- a/src/Stores/MemoryStorageProvider.ts +++ b/src/stores/MemoryStorageProvider.ts @@ -35,8 +35,9 @@ export class MemoryStorageProvider extends MSP implements IBridgeStorageProvider return this.feedGuids.has(url); } - async hasSeenFeedGuid(url: string, guid: string): Promise { - return this.feedGuids.get(url)?.includes(guid) ?? false; + async hasSeenFeedGuids(url: string, ...guids: string[]): Promise { + const existing = this.feedGuids.get(url); + return existing ? guids.filter((existingGuid) => existing.includes(existingGuid)) : []; } public async setGithubIssue(repo: string, issueNumber: string, data: IssuesGetResponseData, scope = "") { diff --git a/src/Stores/RedisStorageProvider.ts b/src/stores/RedisStorageProvider.ts similarity index 93% rename from src/Stores/RedisStorageProvider.ts rename to src/stores/RedisStorageProvider.ts index 4f2343ce5..e1dc77433 100644 --- a/src/Stores/RedisStorageProvider.ts +++ b/src/stores/RedisStorageProvider.ts @@ -29,11 +29,10 @@ const WIDGET_USER_TOKENS = "widgets.user-tokens."; const FEED_GUIDS = "feeds.guids."; - - const log = new Logger("RedisASProvider"); export class RedisStorageContextualProvider implements IStorageProvider { + constructor(protected readonly redis: Redis, protected readonly contextSuffix = '') { } public setSyncToken(token: string|null){ @@ -216,9 +215,9 @@ export class RedisStorageProvider extends RedisStorageContextualProvider impleme await this.redis.set(key, JSON.stringify(value)); } - public async storeFeedGuids(url: string, ...guid: string[]): Promise { + public async storeFeedGuids(url: string, ...guids: string[]): Promise { const feedKey = `${FEED_GUIDS}${url}`; - await this.redis.lpush(feedKey, ...guid); + await this.redis.lpush(feedKey, ...guids); await this.redis.ltrim(feedKey, 0, MAX_FEED_ITEMS); } @@ -226,7 +225,16 @@ export class RedisStorageProvider extends RedisStorageContextualProvider impleme return (await this.redis.exists(`${FEED_GUIDS}${url}`)) === 1; } - public async hasSeenFeedGuid(url: string, guid: string): Promise { - return (await this.redis.lpos(`${FEED_GUIDS}${url}`, guid)) != null; + public async hasSeenFeedGuids(url: string, ...guids: string[]): Promise { + let multi = this.redis.multi(); + for (const guid of guids) { + multi = multi.lpos(`${FEED_GUIDS}${url}`, guid); + } + const res = await multi.exec(); + if (res === null) { + // Just assume we've seen none. + return []; + } + return guids.filter((_guid, index) => res[index][1] !== null); } } diff --git a/src/Stores/StorageProvider.ts b/src/stores/StorageProvider.ts similarity index 91% rename from src/Stores/StorageProvider.ts rename to src/stores/StorageProvider.ts index 73790ff95..50175d75e 100644 --- a/src/Stores/StorageProvider.ts +++ b/src/stores/StorageProvider.ts @@ -25,7 +25,7 @@ export interface IBridgeStorageProvider extends IAppserviceStorageProvider, ISto setStoredTempFile(key: string, value: string): Promise; getGitlabDiscussionThreads(connectionId: string): Promise; setGitlabDiscussionThreads(connectionId: string, value: SerializedGitlabDiscussionThreads): Promise; - storeFeedGuids(url: string, ...guid: string[]): Promise; - hasSeenFeed(url: string, ...guid: string[]): Promise; - hasSeenFeedGuid(url: string, guid: string): Promise; + storeFeedGuids(url: string, ...guids: string[]): Promise; + hasSeenFeed(url: string): Promise; + hasSeenFeedGuids(url: string, ...guids: string[]): Promise; } \ No newline at end of file diff --git a/src/stores/memory.rs b/src/stores/memory.rs new file mode 100644 index 000000000..1293d00f6 --- /dev/null +++ b/src/stores/memory.rs @@ -0,0 +1,40 @@ +use std::collections::{HashMap, HashSet}; +use crate::stores::traits::StorageProvider; + +pub struct MemoryStorageProvider { + guids: HashMap>, +} + +impl MemoryStorageProvider { + pub fn new() -> Self { + MemoryStorageProvider { + guids: HashMap::new(), + } + } +} + +impl StorageProvider for MemoryStorageProvider { + async fn store_feed_guids(&mut self, url: &String, guids: &Vec) { + let guid_set = self.guids.entry(url.clone()).or_insert(HashSet::default()); + + for guid in guids { + guid_set.insert(guid.clone()); + } + } + + async fn has_seen_feed(&self, url: &String) -> bool { + self.guids.contains_key(url) + } + + async fn has_seen_feed_guids(&self,url: &String, guids: &Vec) -> Vec { + let mut seen_guids = Vec::default(); + if let Some(existing_guids) = self.guids.get(url) { + for guid in guids { + if existing_guids.contains(guid) { + seen_guids.push(guid.clone()); + } + } + } + seen_guids + } +} \ No newline at end of file diff --git a/src/stores/mod.rs b/src/stores/mod.rs new file mode 100644 index 000000000..2f649380a --- /dev/null +++ b/src/stores/mod.rs @@ -0,0 +1,3 @@ +pub mod traits; +// pub mod redis; +pub mod memory; \ No newline at end of file diff --git a/src/stores/redis.rs b/src/stores/redis.rs new file mode 100644 index 000000000..df28d3f6b --- /dev/null +++ b/src/stores/redis.rs @@ -0,0 +1,20 @@ +use redis::{Commands, ConnectionInfo}; +use crate::stores::traits::StorageProvider; + +pub struct RedisStorageProvider { + client: redis::Client, +} + +impl RedisStorageProvider { + pub fn new(self, host: String, port: u16) -> Self { + let client = redis::Client::open((host, port))?; + + RedisStorageProvider { + client, + } + } +} + +impl StorageProvider for RedisStorageProvider { + +} \ No newline at end of file diff --git a/src/stores/traits.rs b/src/stores/traits.rs new file mode 100644 index 000000000..f44fcc818 --- /dev/null +++ b/src/stores/traits.rs @@ -0,0 +1,5 @@ +pub trait StorageProvider { + async fn store_feed_guids(&mut self, url: &String, guids: &Vec); + async fn has_seen_feed(&self, url: &String) -> bool; + async fn has_seen_feed_guids(&self, url: &String, guids: &Vec) -> Vec; +} \ No newline at end of file diff --git a/src/util/mod.rs b/src/util/mod.rs new file mode 100644 index 000000000..215600ad7 --- /dev/null +++ b/src/util/mod.rs @@ -0,0 +1,115 @@ +use rand::prelude::*; +use std::collections::BTreeMap; +use std::collections::HashMap; +use std::collections::VecDeque; +use std::time::{SystemTime, UNIX_EPOCH}; + +const DEFAULT_BACKOFF_TIME_MAX_MS: f64 = 24f64 * 60f64 * 60f64 * 1000f64; +const DEFAULT_BACKOFF_POW: f64 = 1.05f64; +const DEFAULT_BACKOFF_TIME_MS: f64 = 5f64 * 1000f64; + +#[napi] + +pub struct QueueWithBackoff { + queue: VecDeque, + /** + * A map of absolute backoff timestamps mapped to the value. + */ + backoff: BTreeMap, + /** + * The last duration applied when a value was backed off. + */ + last_backoff_duration: HashMap, + + backoff_time: f64, + backoff_exponent: f64, + backoff_max: f64, +} + +impl Default for QueueWithBackoff { + fn default() -> Self { + Self::new( + DEFAULT_BACKOFF_TIME_MS, + DEFAULT_BACKOFF_POW, + DEFAULT_BACKOFF_TIME_MAX_MS, + ) + } +} +#[napi] + +impl QueueWithBackoff { + #[napi(constructor)] + pub fn new(backoff_time: f64, backoff_exponent: f64, backoff_max: f64) -> Self { + QueueWithBackoff { + queue: VecDeque::new(), + backoff: BTreeMap::new(), + last_backoff_duration: HashMap::new(), + backoff_time, + backoff_exponent, + backoff_max, + } + } + + #[napi] + pub fn pop(&mut self) -> Option { + let start = SystemTime::now(); + let since_the_epoch = start.duration_since(UNIX_EPOCH).unwrap().as_millis() as u64; + + // We only need to check this once, as we won't be adding to the backoff queue + // as often as we pull from it. + if let Some(item) = self.backoff.first_entry() { + if *item.key() < since_the_epoch { + let v = item.remove(); + self.queue.push_back(v); + } + } + + self.queue.pop_front() + } + + #[napi] + pub fn push(&mut self, item: String) { + self.last_backoff_duration.remove(&item); + self.queue.push_back(item); + } + + #[napi] + pub fn backoff(&mut self, item: String) -> u32 { + let last_backoff = (*self.last_backoff_duration.get(&item).unwrap_or(&0)) as f64; + + let mut rng = rand::thread_rng(); + let y: f64 = rng.gen::() + 0.5f64; // generates a float between 0.5 and 1.1 + + let backoff_duration = ((y * self.backoff_time) + last_backoff.powf(self.backoff_exponent)) + .min(self.backoff_max) as u32; + let backoff_item = item.clone(); + self.last_backoff_duration.insert(item, backoff_duration); + + let start = SystemTime::now(); + let since_the_epoch = start.duration_since(UNIX_EPOCH).unwrap(); + + let mut time = since_the_epoch.as_millis() as u64 + backoff_duration as u64; + + // If the backoff queue contains this time (likely) + // then we want to increase the backoff time slightly + // to allow for it. + let incr: f64 = (rng.gen::() * 2f64) + 2f64; + while self.backoff.contains_key(&time) { + time += (incr * self.backoff_time) as u64; + } + + self.backoff.insert(time, backoff_item); + backoff_duration + } + + #[napi] + pub fn length(&self) -> u32 { + self.queue.len() as u32 + } + + #[napi] + pub fn shuffle(&mut self) { + let mut rng = rand::thread_rng(); + self.queue.make_contiguous().shuffle(&mut rng); + } +} diff --git a/tests/FeedReader.spec.ts b/tests/FeedReader.spec.ts index 9b2de2fb4..e30613b51 100644 --- a/tests/FeedReader.spec.ts +++ b/tests/FeedReader.spec.ts @@ -5,7 +5,7 @@ import { ConnectionManager } from "../src/ConnectionManager"; import { IConnection } from "../src/Connections"; import { FeedEntry, FeedReader } from "../src/feeds/FeedReader"; import { MessageQueue, MessageQueueMessage } from "../src/MessageQueue"; -import { MemoryStorageProvider } from "../src/Stores/MemoryStorageProvider"; +import { MemoryStorageProvider } from "../src/stores/MemoryStorageProvider"; import { Server, createServer } from 'http'; import { AddressInfo } from "net"; diff --git a/tests/connections/GithubRepoTest.ts b/tests/connections/GithubRepoTest.ts index 85af40c62..e686f2a5c 100644 --- a/tests/connections/GithubRepoTest.ts +++ b/tests/connections/GithubRepoTest.ts @@ -23,6 +23,7 @@ const GITHUB_ISSUE = { }, html_url: `https://github.com/${GITHUB_ORG_REPO.org}/${GITHUB_ORG_REPO.repo}/issues/1234`, title: "My issue", + assignees: [] }; const GITHUB_ISSUE_CREATED_PAYLOAD = { @@ -137,7 +138,7 @@ describe("GitHubRepoConnection", () => { intent.expectEventBodyContains(GITHUB_ISSUE_CREATED_PAYLOAD.issue.html_url, 0); intent.expectEventBodyContains(GITHUB_ISSUE_CREATED_PAYLOAD.issue.title, 0); }); - it.only("will handle assignees on issue creation", async () => { + it("will handle assignees on issue creation", async () => { const { connection, intent } = createConnection(); await connection.onIssueCreated({ ...GITHUB_ISSUE_CREATED_PAYLOAD, diff --git a/tests/connections/GitlabRepoTest.ts b/tests/connections/GitlabRepoTest.ts index ee87d7658..e56e1265a 100644 --- a/tests/connections/GitlabRepoTest.ts +++ b/tests/connections/GitlabRepoTest.ts @@ -5,7 +5,7 @@ import { ApiError, ErrCode, ValidatorApiError } from "../../src/api"; import { GitLabRepoConnection, GitLabRepoConnectionState } from "../../src/Connections"; import { expect } from "chai"; import { BridgeConfigGitLab } from "../../src/config/Config"; -import { IBridgeStorageProvider } from "../../src/Stores/StorageProvider"; +import { IBridgeStorageProvider } from "../../src/stores/StorageProvider"; import { IntentMock } from "../utils/IntentMock"; const ROOM_ID = "!foo:bar";