From bb9a7dda25ef74be8a8010a2d123fea6b06768e8 Mon Sep 17 00:00:00 2001 From: Martins Erts Date: Thu, 27 Nov 2025 15:48:20 +0200 Subject: [PATCH 1/3] Initial commit --- .env => .env.sample | 8 +- .gitignore | 2 + Cargo.lock | 429 +++++++++++------- README.md | 45 +- docs/architecture/payment-batch-lifecycle.md | 20 + docs/architecture/state-descriptions.md | 12 + docs/architecture/system-overview.md | 40 ++ minotari-client/src/apis/accounts_api.rs | 68 +++ minotari-client/src/models/api_error.rs | 1 + .../src/models/api_error_one_of_3.rs | 10 +- .../src/models/api_error_one_of_4.rs | 26 ++ .../src/models/lock_funds_request.rs | 61 +++ .../src/models/lock_funds_response.rs | 44 ++ minotari-client/src/models/mod.rs | 6 + minotari_payment_processor/Cargo.toml | 9 +- minotari_payment_processor/src/api/mod.rs | 7 +- .../src/api/payments.rs | 31 +- minotari_payment_processor/src/config.rs | 166 +++++++ minotari_payment_processor/src/db/payment.rs | 2 +- .../src/db/payment_batch.rs | 1 + minotari_payment_processor/src/lib.rs | 1 + minotari_payment_processor/src/main.rs | 76 +--- .../src/workers/batch_creator.rs | 80 +++- .../src/workers/broadcaster.rs | 103 ++++- .../src/workers/confirmation_checker.rs | 236 +++++++--- .../src/workers/transaction_signer.rs | 242 +++++++--- .../src/workers/unsigned_tx_creator.rs | 225 +++++++-- 27 files changed, 1475 insertions(+), 476 deletions(-) rename .env => .env.sample (65%) create mode 100644 docs/architecture/payment-batch-lifecycle.md create mode 100644 docs/architecture/state-descriptions.md create mode 100644 docs/architecture/system-overview.md create mode 100644 minotari-client/src/models/api_error_one_of_4.rs create mode 100644 minotari-client/src/models/lock_funds_request.rs create mode 100644 minotari-client/src/models/lock_funds_response.rs create mode 100644 minotari_payment_processor/src/config.rs diff --git a/.env b/.env.sample similarity index 65% rename from .env rename to .env.sample index 0f518cb..3f2a0ec 100644 --- a/.env +++ b/.env.sample @@ -2,11 +2,17 @@ DATABASE_URL="sqlite://data/payments.db" PAYMENT_RECEIVER="http://localhost:9000" BASE_NODE="https://rpc.esmeralda.tari.com" CONSOLE_WALLET_PATH="minotari_console_wallet" +CONSOLE_WALLET_BASE_PATH="." CONSOLE_WALLET_PASSWORD="password" LISTEN_IP="0.0.0.0" LISTEN_PORT="9145" -BATCH_CREATOR_SLEEP_SECS="600" +BATCH_CREATOR_SLEEP_SECS="15" UNSIGNED_TX_CREATOR_SLEEP_SECS="15" TRANSACTION_SIGNER_SLEEP_SECS="10" BROADCASTER_SLEEP_SECS="15" CONFIRMATION_CHECKER_SLEEP_SECS="60" +TARI_NETWORK=Esmeralda + +ACCOUNTS__DEFAULT__NAME="default" +ACCOUNTS__DEFAULT__VIEW_KEY="4b51..." +ACCOUNTS__DEFAULT__PUBLIC_SPEND_KEY="504b..." diff --git a/.gitignore b/.gitignore index e0fb2ea..e5fced1 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ /target /data .idea/ + +.env diff --git a/Cargo.lock b/Cargo.lock index f88ee12..bc91b6d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -51,56 +51,6 @@ dependencies = [ "libc", ] -[[package]] -name = "anstream" -version = "0.6.21" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43d5b281e737544384e969a5ccad3f1cdd24b48086a0fc1b2a5262a26b8f4f4a" -dependencies = [ - "anstyle", - "anstyle-parse", - "anstyle-query", - "anstyle-wincon", - "colorchoice", - "is_terminal_polyfill", - "utf8parse", -] - -[[package]] -name = "anstyle" -version = "1.0.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5192cca8006f1fd4f7237516f40fa183bb07f8fbdfedaa0036de5ea9b0b45e78" - -[[package]] -name = "anstyle-parse" -version = "0.2.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e7644824f0aa2c7b9384579234ef10eb7efb6a0deb83f9630a49594dd9c15c2" -dependencies = [ - "utf8parse", -] - -[[package]] -name = "anstyle-query" -version = "1.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e231f6134f61b71076a3eab506c379d4f36122f2af15a9ff04415ea4c3339e2" -dependencies = [ - "windows-sys 0.60.2", -] - -[[package]] -name = "anstyle-wincon" -version = "3.0.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e0633414522a32ffaac8ac6cc8f748e090c5717661fddeea04219e2344f5f2a" -dependencies = [ - "anstyle", - "once_cell_polyfill", - "windows-sys 0.60.2", -] - [[package]] name = "anyhow" version = "1.0.100" @@ -133,6 +83,12 @@ dependencies = [ "password-hash", ] +[[package]] +name = "arraydeque" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d902e3d592a523def97af8f317b08ce16b7ab854c1985a0c671e6f15cebc236" + [[package]] name = "arrayref" version = "0.3.9" @@ -521,78 +477,72 @@ dependencies = [ ] [[package]] -name = "clap" -version = "4.5.48" +name = "concurrent-queue" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2134bb3ea021b78629caa971416385309e0131b351b25e01dc16fb54e1b5fae" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" dependencies = [ - "clap_builder", - "clap_derive", + "crossbeam-utils", ] [[package]] -name = "clap_builder" -version = "4.5.48" +name = "config" +version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2ba64afa3c0a6df7fa517765e31314e983f51dda798ffba27b988194fb65dc9" +checksum = "68578f196d2a33ff61b27fae256c3164f65e36382648e30666dde05b8cc9dfdf" dependencies = [ - "anstream", - "anstyle", - "clap_lex", - "strsim", + "nom", + "pathdiff", + "serde", + "toml 0.8.23", ] [[package]] -name = "clap_derive" -version = "4.5.47" +name = "config" +version = "0.15.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbfd7eae0b0f1a6e63d4b13c9c478de77c2eb546fba158ad50b4203dc24b9f9c" +checksum = "b30fa8254caad766fc03cb0ccae691e14bf3bd72bfff27f72802ce729551b3d6" dependencies = [ - "heck 0.5.0", - "proc-macro2", - "quote", - "syn 2.0.106", + "async-trait", + "convert_case", + "json5", + "pathdiff", + "ron", + "rust-ini", + "serde-untagged", + "serde_core", + "serde_json", + "toml 0.9.8", + "winnow", + "yaml-rust2", ] [[package]] -name = "clap_lex" -version = "0.7.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b94f61472cee1439c0b966b47e3aca9ae07e45d070759512cd390ea2bebc6675" - -[[package]] -name = "colorchoice" -version = "1.0.4" +name = "const-oid" +version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" +checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" [[package]] -name = "concurrent-queue" -version = "2.5.0" +name = "const-random" +version = "0.1.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" +checksum = "87e00182fe74b066627d63b85fd550ac2998d4b0bd86bfed477a0ae4c7c71359" dependencies = [ - "crossbeam-utils", + "const-random-macro", ] [[package]] -name = "config" -version = "0.14.1" +name = "const-random-macro" +version = "0.1.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68578f196d2a33ff61b27fae256c3164f65e36382648e30666dde05b8cc9dfdf" +checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e" dependencies = [ - "nom", - "pathdiff", - "serde", - "toml 0.8.23", + "getrandom 0.2.16", + "once_cell", + "tiny-keccak", ] -[[package]] -name = "const-oid" -version = "0.9.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" - [[package]] name = "const_format" version = "0.2.35" @@ -613,6 +563,15 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "convert_case" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec182b0ca2f35d8fc196cf3404988fd8b8c739a4d270ff118a398feb0cbec1ca" +dependencies = [ + "unicode-segmentation", +] + [[package]] name = "core-foundation" version = "0.9.4" @@ -902,6 +861,15 @@ dependencies = [ "syn 2.0.106", ] +[[package]] +name = "dlv-list" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "442039f5147480ba31067cb00ada1adae6892028e40e45fc5de7b7df6dcc1b5f" +dependencies = [ + "const-random", +] + [[package]] name = "doc-comment" version = "0.3.3" @@ -950,6 +918,17 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" +[[package]] +name = "erased-serde" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89e8918065695684b2b0702da20382d5ae6065cf3327bc2d6436bd49a71ce9f3" +dependencies = [ + "serde", + "serde_core", + "typeid", +] + [[package]] name = "errno" version = "0.3.14" @@ -1265,6 +1244,12 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + [[package]] name = "hashbrown" version = "0.15.5" @@ -1710,12 +1695,6 @@ dependencies = [ "serde", ] -[[package]] -name = "is_terminal_polyfill" -version = "1.70.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" - [[package]] name = "itertools" version = "0.12.1" @@ -1750,6 +1729,17 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "json5" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96b0db21af676c1ce64250b5f40f3ce2cf27e4e47cb91ed91eb6fe9350b430c1" +dependencies = [ + "pest", + "pest_derive", + "serde", +] + [[package]] name = "keccak" version = "0.1.5" @@ -1948,16 +1938,17 @@ dependencies = [ [[package]] name = "minotari_ledger_wallet_common" -version = "5.1.0-rc.1" -source = "git+https://github.com/stringhandler/tari/?rev=6430bfffb53af71d80535243eec9165534af2b41#6430bfffb53af71d80535243eec9165534af2b41" +version = "5.2.0-pre.5" +source = "git+https://github.com/tari-project/tari/?rev=ed473c3c94c2219ac1eebb9345384b6a7245606c#ed473c3c94c2219ac1eebb9345384b6a7245606c" dependencies = [ "bs58 0.5.1", + "serde", ] [[package]] name = "minotari_node_wallet_client" -version = "5.1.0-rc.1" -source = "git+https://github.com/stringhandler/tari/?rev=6430bfffb53af71d80535243eec9165534af2b41#6430bfffb53af71d80535243eec9165534af2b41" +version = "5.2.0-pre.5" +source = "git+https://github.com/tari-project/tari/?rev=ed473c3c94c2219ac1eebb9345384b6a7245606c#ed473c3c94c2219ac1eebb9345384b6a7245606c" dependencies = [ "anyhow", "async-trait", @@ -1979,7 +1970,7 @@ dependencies = [ "anyhow", "axum", "chrono", - "clap 4.5.48", + "config 0.15.19", "dotenv", "hex", "minotari-client", @@ -1987,6 +1978,9 @@ dependencies = [ "serde", "serde_json", "sqlx", + "tari_common", + "tari_common_types", + "tari_crypto", "tari_transaction_components", "tari_utilities", "tempfile", @@ -2184,12 +2178,6 @@ dependencies = [ "portable-atomic", ] -[[package]] -name = "once_cell_polyfill" -version = "1.70.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4895175b425cb1f87721b59f0f286c2092bd4af812243672510e1ac53e2e0ad" - [[package]] name = "opaque-debug" version = "0.3.1" @@ -2249,6 +2237,16 @@ dependencies = [ "num-traits", ] +[[package]] +name = "ordered-multimap" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49203cdcae0030493bad186b28da2fa25645fa276a51b6fec8010d281e02ef79" +dependencies = [ + "dlv-list", + "hashbrown 0.14.5", +] + [[package]] name = "parity-scale-codec" version = "3.7.5" @@ -2350,6 +2348,49 @@ version = "2.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220" +[[package]] +name = "pest" +version = "2.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cbcfd20a6d4eeba40179f05735784ad32bdaef05ce8e8af05f180d45bb3e7e22" +dependencies = [ + "memchr", + "ucd-trie", +] + +[[package]] +name = "pest_derive" +version = "2.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51f72981ade67b1ca6adc26ec221be9f463f2b5839c7508998daa17c23d94d7f" +dependencies = [ + "pest", + "pest_generator", +] + +[[package]] +name = "pest_generator" +version = "2.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dee9efd8cdb50d719a80088b76f81aec7c41ed6d522ee750178f83883d271625" +dependencies = [ + "pest", + "pest_meta", + "proc-macro2", + "quote", + "syn 2.0.106", +] + +[[package]] +name = "pest_meta" +version = "2.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf1d70880e76bdc13ba52eafa6239ce793d85c8e43896507e43dd8984ff05b82" +dependencies = [ + "pest", + "sha2", +] + [[package]] name = "pin-project-lite" version = "0.2.16" @@ -2779,6 +2820,20 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "ron" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd490c5b18261893f14449cbd28cb9c0b637aebf161cd77900bfdedaff21ec32" +dependencies = [ + "bitflags 2.9.4", + "once_cell", + "serde", + "serde_derive", + "typeid", + "unicode-ident", +] + [[package]] name = "rsa" version = "0.9.8" @@ -2833,6 +2888,16 @@ dependencies = [ "walkdir", ] +[[package]] +name = "rust-ini" +version = "0.21.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "796e8d2b6696392a43bea58116b667fb4c29727dc5abd27d6acf338bb4f688c7" +dependencies = [ + "cfg-if", + "ordered-multimap", +] + [[package]] name = "rustc-demangle" version = "0.1.26" @@ -3011,6 +3076,18 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "serde-untagged" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9faf48a4a2d2693be24c6289dbe26552776eb7737074e6722891fadbe6c5058" +dependencies = [ + "erased-serde", + "serde", + "serde_core", + "typeid", +] + [[package]] name = "serde-value" version = "0.7.0" @@ -3085,6 +3162,15 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_spanned" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e24345aa0fe688594e73770a5f6d1b216508b4f93484c0026d521acd30134392" +dependencies = [ + "serde_core", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -3548,7 +3634,7 @@ version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c6b5c64445ba8094a6ab0c3cd2ad323e07171012d9c98b0b15651daf1787a10" dependencies = [ - "clap 2.34.0", + "clap", "lazy_static", "structopt-derive", ] @@ -3694,12 +3780,12 @@ dependencies = [ [[package]] name = "tari_common" -version = "5.1.0-rc.1" -source = "git+https://github.com/stringhandler/tari/?rev=6430bfffb53af71d80535243eec9165534af2b41#6430bfffb53af71d80535243eec9165534af2b41" +version = "5.2.0-pre.5" +source = "git+https://github.com/tari-project/tari/?rev=ed473c3c94c2219ac1eebb9345384b6a7245606c#ed473c3c94c2219ac1eebb9345384b6a7245606c" dependencies = [ "anyhow", "cargo_toml", - "config", + "config 0.14.1", "dirs-next", "log", "log4rs", @@ -3717,8 +3803,8 @@ dependencies = [ [[package]] name = "tari_common_types" -version = "5.1.0-rc.1" -source = "git+https://github.com/stringhandler/tari/?rev=6430bfffb53af71d80535243eec9165534af2b41#6430bfffb53af71d80535243eec9165534af2b41" +version = "5.2.0-pre.5" +source = "git+https://github.com/tari-project/tari/?rev=ed473c3c94c2219ac1eebb9345384b6a7245606c#ed473c3c94c2219ac1eebb9345384b6a7245606c" dependencies = [ "argon2", "base64 0.21.7", @@ -3732,7 +3818,6 @@ dependencies = [ "digest", "getrandom 0.2.16", "js-sys", - "minotari_ledger_wallet_common", "newtype-ops", "once_cell", "primitive-types", @@ -3777,13 +3862,13 @@ dependencies = [ [[package]] name = "tari_features" -version = "5.1.0-rc.1" -source = "git+https://github.com/stringhandler/tari/?rev=6430bfffb53af71d80535243eec9165534af2b41#6430bfffb53af71d80535243eec9165534af2b41" +version = "5.2.0-pre.5" +source = "git+https://github.com/tari-project/tari/?rev=ed473c3c94c2219ac1eebb9345384b6a7245606c#ed473c3c94c2219ac1eebb9345384b6a7245606c" [[package]] name = "tari_hashing" -version = "5.1.0-rc.1" -source = "git+https://github.com/stringhandler/tari/?rev=6430bfffb53af71d80535243eec9165534af2b41#6430bfffb53af71d80535243eec9165534af2b41" +version = "5.2.0-pre.5" +source = "git+https://github.com/tari-project/tari/?rev=ed473c3c94c2219ac1eebb9345384b6a7245606c#ed473c3c94c2219ac1eebb9345384b6a7245606c" dependencies = [ "blake2", "borsh", @@ -3793,8 +3878,8 @@ dependencies = [ [[package]] name = "tari_jellyfish" -version = "5.1.0-rc.1" -source = "git+https://github.com/stringhandler/tari/?rev=6430bfffb53af71d80535243eec9165534af2b41#6430bfffb53af71d80535243eec9165534af2b41" +version = "5.2.0-pre.5" +source = "git+https://github.com/tari-project/tari/?rev=ed473c3c94c2219ac1eebb9345384b6a7245606c#ed473c3c94c2219ac1eebb9345384b6a7245606c" dependencies = [ "borsh", "digest", @@ -3807,8 +3892,8 @@ dependencies = [ [[package]] name = "tari_max_size" -version = "5.1.0-rc.1" -source = "git+https://github.com/stringhandler/tari/?rev=6430bfffb53af71d80535243eec9165534af2b41#6430bfffb53af71d80535243eec9165534af2b41" +version = "5.2.0-pre.5" +source = "git+https://github.com/tari-project/tari/?rev=ed473c3c94c2219ac1eebb9345384b6a7245606c#ed473c3c94c2219ac1eebb9345384b6a7245606c" dependencies = [ "borsh", "serde", @@ -3818,8 +3903,8 @@ dependencies = [ [[package]] name = "tari_script" -version = "5.1.0-rc.1" -source = "git+https://github.com/stringhandler/tari/?rev=6430bfffb53af71d80535243eec9165534af2b41#6430bfffb53af71d80535243eec9165534af2b41" +version = "5.2.0-pre.5" +source = "git+https://github.com/tari-project/tari/?rev=ed473c3c94c2219ac1eebb9345384b6a7245606c#ed473c3c94c2219ac1eebb9345384b6a7245606c" dependencies = [ "blake2", "borsh", @@ -3834,33 +3919,18 @@ dependencies = [ "thiserror 2.0.17", ] -[[package]] -name = "tari_service_framework" -version = "5.1.0-rc.1" -source = "git+https://github.com/stringhandler/tari/?rev=6430bfffb53af71d80535243eec9165534af2b41#6430bfffb53af71d80535243eec9165534af2b41" -dependencies = [ - "anyhow", - "async-trait", - "futures", - "log", - "tari_shutdown", - "thiserror 2.0.17", - "tokio", - "tower-service", -] - [[package]] name = "tari_shutdown" -version = "5.1.0-rc.1" -source = "git+https://github.com/stringhandler/tari/?rev=6430bfffb53af71d80535243eec9165534af2b41#6430bfffb53af71d80535243eec9165534af2b41" +version = "5.2.0-pre.5" +source = "git+https://github.com/tari-project/tari/?rev=ed473c3c94c2219ac1eebb9345384b6a7245606c#ed473c3c94c2219ac1eebb9345384b6a7245606c" dependencies = [ "futures", ] [[package]] name = "tari_sidechain" -version = "5.1.0-rc.1" -source = "git+https://github.com/stringhandler/tari/?rev=6430bfffb53af71d80535243eec9165534af2b41#6430bfffb53af71d80535243eec9165534af2b41" +version = "5.2.0-pre.5" +source = "git+https://github.com/tari-project/tari/?rev=ed473c3c94c2219ac1eebb9345384b6a7245606c#ed473c3c94c2219ac1eebb9345384b6a7245606c" dependencies = [ "borsh", "hex", @@ -3876,11 +3946,10 @@ dependencies = [ [[package]] name = "tari_transaction_components" -version = "5.1.0-rc.1" -source = "git+https://github.com/stringhandler/tari/?rev=6430bfffb53af71d80535243eec9165534af2b41#6430bfffb53af71d80535243eec9165534af2b41" +version = "5.2.0-pre.5" +source = "git+https://github.com/tari-project/tari/?rev=ed473c3c94c2219ac1eebb9345384b6a7245606c#ed473c3c94c2219ac1eebb9345384b6a7245606c" dependencies = [ "anyhow", - "async-trait", "bitflags 2.9.4", "blake2", "borsh", @@ -3892,6 +3961,7 @@ dependencies = [ "digest", "integer-encoding", "log", + "minotari_ledger_wallet_common", "newtype-ops", "num-derive", "num-format", @@ -3903,7 +3973,6 @@ dependencies = [ "serde_json", "serde_repr", "serde_valid", - "strum", "strum_macros", "tari_common", "tari_common_types", @@ -3911,7 +3980,6 @@ dependencies = [ "tari_hashing", "tari_max_size", "tari_script", - "tari_service_framework", "tari_sidechain", "tari_utilities", "thiserror 2.0.17", @@ -4052,6 +4120,15 @@ dependencies = [ "time-core", ] +[[package]] +name = "tiny-keccak" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237" +dependencies = [ + "crunchy", +] + [[package]] name = "tinystr" version = "0.8.1" @@ -4168,11 +4245,24 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc1beb996b9d83529a9e75c17a1686767d148d70663143c7854d8b4a09ced362" dependencies = [ "serde", - "serde_spanned", + "serde_spanned 0.6.9", "toml_datetime 0.6.11", "toml_edit 0.22.27", ] +[[package]] +name = "toml" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0dc8b1fb61449e27716ec0e1bdf0f6b8f3e8f6b05391e8497b8b6d7804ea6d8" +dependencies = [ + "serde_core", + "serde_spanned 1.0.3", + "toml_datetime 0.7.3", + "toml_parser", + "winnow", +] + [[package]] name = "toml_datetime" version = "0.6.11" @@ -4199,7 +4289,7 @@ checksum = "41fe8c660ae4257887cf66394862d21dbca4a6ddd26f04a3560410406a2f819a" dependencies = [ "indexmap 2.11.4", "serde", - "serde_spanned", + "serde_spanned 0.6.9", "toml_datetime 0.6.11", "toml_write", "winnow", @@ -4316,6 +4406,12 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "typeid" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc7d623258602320d5c55d1bc22793b57daff0ec7efc270ea7d55ce1d5f5471c" + [[package]] name = "typemap-ors" version = "1.0.0" @@ -4331,6 +4427,12 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb" +[[package]] +name = "ucd-trie" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2896d95c02a80c6d6a5d6e953d479f5ddf2dfdb6a244441010e373ac0fb88971" + [[package]] name = "uint" version = "0.9.5" @@ -4449,12 +4551,6 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" -[[package]] -name = "utf8parse" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" - [[package]] name = "utoipa" version = "5.4.0" @@ -5065,6 +5161,17 @@ dependencies = [ "tap", ] +[[package]] +name = "yaml-rust2" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2462ea039c445496d8793d052e13787f2b90e750b833afee748e601c17621ed9" +dependencies = [ + "arraydeque", + "encoding_rs", + "hashlink", +] + [[package]] name = "yoke" version = "0.8.0" diff --git a/README.md b/README.md index 7743c0b..cc70ffb 100644 --- a/README.md +++ b/README.md @@ -46,32 +46,51 @@ make db-dump ## Configuration -The `minotari_payment_processor` is configured using environment variables. These variables can be set in a `.env` file in the project root or directly in your environment. +The `minotari_payment_processor` is configured using environment variables. These variables can be set in a `.env` file in the project root or directly in your system environment. -Below is a list of key environment variables: +Because the application uses structured configuration, hierarchical settings (like accounts) use double underscores (`__`) as separators. + +### Core Settings * **`DATABASE_URL`** (Mandatory): The URL for the SQLite database. * Example: `DATABASE_URL="sqlite://data/payments.db"` +* **`TARI_NETWORK`** (Optional): The Tari network to run on. Defaults to `MainNet`. + * Options: `MainNet`, `Esmeralda`, `NextNet`, `Igor`. + * Example: `TARI_NETWORK="Esmeralda"` * **`PAYMENT_RECEIVER`** (Mandatory): The URL of the Payment Receiver (PR) API. * Example: `PAYMENT_RECEIVER="http://localhost:9000"` * **`BASE_NODE`** (Mandatory): The URL of the Tari Base Node. * Example: `BASE_NODE="https://rpc.esmeralda.tari.com"` * **`CONSOLE_WALLET_PATH`** (Mandatory): The path to the `minotari_console_wallet` executable, used for signing transactions. - * Example: `CONSOLE_WALLET_PATH="minotari_console_wallet"` +* **`CONSOLE_WALLET_BASE_PATH`** (Mandatory): Wallet base path (--base-path). + * Example: `CONSOLE_WALLET_PATH="/usr/local/bin/minotari_console_wallet"` +* **`CONSOLE_WALLET_PASSWORD`** (Mandatory): The password for the console wallet. + * Example: `CONSOLE_WALLET_PASSWORD="my_secure_password"` * **`LISTEN_IP`** (Optional): The IP address the HTTP API server will listen on. Defaults to `0.0.0.0`. * Example: `LISTEN_IP="0.0.0.0"` * **`LISTEN_PORT`** (Optional): The port the HTTP API server will listen on. Defaults to `9145`. * Example: `LISTEN_PORT="9145"` -* **`BATCH_CREATOR_SLEEP_SECS`** (Optional): The sleep duration in seconds for the Batch Creator worker. - * Example: `BATCH_CREATOR_SLEEP_SECS="600"` (10 minutes) -* **`UNSIGNED_TX_CREATOR_SLEEP_SECS`** (Optional): The sleep duration in seconds for the Unsigned Transaction Creator worker. - * Example: `UNSIGNED_TX_CREATOR_SLEEP_SECS="15"` -* **`TRANSACTION_SIGNER_SLEEP_SECS`** (Optional): The sleep duration in seconds for the Transaction Signer worker. - * Example: `TRANSACTION_SIGNER_SLEEP_SECS="10"` -* **`BROADCASTER_SLEEP_SECS`** (Optional): The sleep duration in seconds for the Broadcaster worker. - * Example: `BROADCASTER_SLEEP_SECS="15"` -* **`CONFIRMATION_CHECKER_SLEEP_SECS`** (Optional): The sleep duration in seconds for the Confirmation Checker worker. - * Example: `CONFIRMATION_CHECKER_SLEEP_SECS="60"` + +### Account Configuration + +The application supports configuring multiple payment receiver accounts. Because the number of accounts is dynamic, the configuration uses a specific pattern with double underscores (`__`) to map environment variables to specific accounts. + +The format is: `ACCOUNTS____` + +Each account requires three fields: `NAME`, `VIEW_KEY` (Hex), and `PUBLIC_SPEND_KEY` (Hex). + +**Example configuration for two accounts ("Primary" and "Backup"):** + +```bash +# Account 1: Identifier 'default' +ACCOUNTS__DEFAULT__NAME="default" +ACCOUNTS__DEFAULT__VIEW_KEY="a1b2c3d4..." +ACCOUNTS__DEFAULT__PUBLIC_SPEND_KEY="e5f6g7h8..." + +# Account 2: Identifier 'backup' +ACCOUNTS__BACKUP__NAME="backup" +ACCOUNTS__BACKUP__VIEW_KEY="11223344..." +ACCOUNTS__BACKUP__PUBLIC_SPEND_KEY="55667788..." ## HTTP API diff --git a/docs/architecture/payment-batch-lifecycle.md b/docs/architecture/payment-batch-lifecycle.md new file mode 100644 index 0000000..e37a3d5 --- /dev/null +++ b/docs/architecture/payment-batch-lifecycle.md @@ -0,0 +1,20 @@ +```mermaid +stateDiagram-v2 + [*] --> PendingBatching: Batch Creator
Groups pending payments + + PendingBatching --> AwaitingSignature: Unsigned TX Creator
Fetches unsigned TX from API + PendingBatching --> Failed: Error + + AwaitingSignature --> SigningInProgress: TX Signer
Locks batch + SigningInProgress --> AwaitingBroadcast: TX Signer
Signs via Console Wallet + SigningInProgress --> Failed: Signing Error + + AwaitingBroadcast --> Broadcasting: Broadcaster
Locks batch + Broadcasting --> AwaitingConfirmation: Broadcaster
Submits to Base Node + Broadcasting --> Failed: Node Rejection / Max Retries + + AwaitingConfirmation --> Confirmed: Confirmation Checker
Wait for N blocks + + Confirmed --> [*] + Failed --> [*] +``` diff --git a/docs/architecture/state-descriptions.md b/docs/architecture/state-descriptions.md new file mode 100644 index 0000000..989aa06 --- /dev/null +++ b/docs/architecture/state-descriptions.md @@ -0,0 +1,12 @@ +#### State Descriptions + +| Status | Worker Responsible | Description | +| :--- | :--- | :--- | +| **PENDING_BATCHING** | `Batch Creator` | A new batch has been created from individual payments. It waits for the transaction structure to be built. | +| **AWAITING_SIGNATURE** | `Unsigned TX Creator` | The transaction structure (unsigned) has been retrieved from the Wallet API. It is ready to be signed. | +| **SIGNING_IN_PROGRESS** | `Transaction Signer` | A worker has picked up the batch and is currently calculating the signature (CPU intensive). | +| **AWAITING_BROADCAST** | `Transaction Signer` | The transaction is fully signed and stored in the DB, ready to be sent to the network. | +| **BROADCASTING** | `Broadcaster` | A worker is currently attempting to submit the transaction to the Base Node. | +| **AWAITING_CONFIRMATION** | `Broadcaster` | The transaction was accepted by the mempool. The system is now polling for block depth. | +| **CONFIRMED** | `Confirmation Checker` | The transaction has reached the required block depth (e.g., 10 blocks). | +| **FAILED** | All | A terminal state indicating a non-recoverable error (or max retries reached). | diff --git a/docs/architecture/system-overview.md b/docs/architecture/system-overview.md new file mode 100644 index 0000000..e914261 --- /dev/null +++ b/docs/architecture/system-overview.md @@ -0,0 +1,40 @@ +```mermaid +graph TD + subgraph "External" + Client[Client App] + ExtAPI[Wallet/Account API] + Console[Console Wallet CLI] + Node[Base Node] + end + + subgraph "Payment Processor" + API[Axum API] + DB[(SQLite DB)] + + subgraph "Workers" + W_Batch[Batch Creator] + W_Unsigned[Unsigned TX Creator] + W_Signer[TX Signer] + W_Broadcast[Broadcaster] + W_Confirm[Confirmation Checker] + end + end + + Client -->|POST /payments| API + API -->|Insert 'Pending'| DB + + W_Batch -->|Poll 'Pending'| DB + W_Batch -->|Group & Create Batch| DB + + W_Unsigned -->|Poll 'PendingBatching'| DB + W_Unsigned <-->|Req Unsigned TX| ExtAPI + + W_Signer -->|Poll 'AwaitingSignature'| DB + W_Signer <-->|Exec Process| Console + + W_Broadcast -->|Poll 'AwaitingBroadcast'| DB + W_Broadcast -->|Submit TX| Node + + W_Confirm -->|Poll 'AwaitingConfirmation'| DB + W_Confirm <-->|Check Depth| Node +``` diff --git a/minotari-client/src/apis/accounts_api.rs b/minotari-client/src/apis/accounts_api.rs index 2b77d5e..7b940ae 100644 --- a/minotari-client/src/apis/accounts_api.rs +++ b/minotari-client/src/apis/accounts_api.rs @@ -32,6 +32,16 @@ pub enum ApiGetBalanceError { UnknownValue(serde_json::Value), } +/// struct for typed errors of method [`api_lock_funds`] +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(untagged)] +pub enum ApiLockFundsError { + Status400(models::ApiError), + Status404(models::ApiError), + Status500(models::ApiError), + UnknownValue(serde_json::Value), +} + pub async fn api_create_unsigned_transaction( configuration: &configuration::Configuration, name: &str, @@ -144,3 +154,61 @@ pub async fn api_get_balance( })) } } + +pub async fn api_lock_funds( + configuration: &configuration::Configuration, + name: &str, + lock_funds_request: models::LockFundsRequest, +) -> Result> { + // add a prefix to parameters to efficiently prevent name collisions + let p_path_name = name; + let p_body_lock_funds_request = lock_funds_request; + + let uri_str = format!( + "{}/accounts/{name}/lock_funds", + configuration.base_path, + name = crate::apis::urlencode(p_path_name) + ); + let mut req_builder = configuration.client.request(reqwest::Method::POST, &uri_str); + + if let Some(ref user_agent) = configuration.user_agent { + req_builder = req_builder.header(reqwest::header::USER_AGENT, user_agent.clone()); + } + req_builder = req_builder.json(&p_body_lock_funds_request); + + let req = req_builder.build()?; + let resp = configuration.client.execute(req).await?; + + let status = resp.status(); + let content_type = resp + .headers() + .get("content-type") + .and_then(|v| v.to_str().ok()) + .unwrap_or("application/octet-stream"); + let content_type = super::ContentType::from(content_type); + + if !status.is_client_error() && !status.is_server_error() { + let content = resp.text().await?; + match content_type { + ContentType::Json => serde_json::from_str(&content).map_err(Error::from), + ContentType::Text => { + return Err(Error::from(serde_json::Error::custom( + "Received `text/plain` content type response that cannot be converted to `models::LockFundsResponse`", + ))); + }, + ContentType::Unsupported(unknown_type) => { + return Err(Error::from(serde_json::Error::custom(format!( + "Received `{unknown_type}` content type response that cannot be converted to `models::LockFundsResponse`" + )))); + }, + } + } else { + let content = resp.text().await?; + let entity: Option = serde_json::from_str(&content).ok(); + Err(Error::ResponseError(ResponseContent { + status, + content, + entity, + })) + } +} diff --git a/minotari-client/src/models/api_error.rs b/minotari-client/src/models/api_error.rs index 2c26ad9..5d49345 100644 --- a/minotari-client/src/models/api_error.rs +++ b/minotari-client/src/models/api_error.rs @@ -18,6 +18,7 @@ pub enum ApiError { ApiErrorOneOf1(Box), ApiErrorOneOf2(Box), ApiErrorOneOf3(Box), + ApiErrorOneOf4(Box), } impl Default for ApiError { diff --git a/minotari-client/src/models/api_error_one_of_3.rs b/minotari-client/src/models/api_error_one_of_3.rs index 5173fb7..dce0dbf 100644 --- a/minotari-client/src/models/api_error_one_of_3.rs +++ b/minotari-client/src/models/api_error_one_of_3.rs @@ -13,14 +13,12 @@ use serde::{Deserialize, Serialize}; #[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)] pub struct ApiErrorOneOf3 { - #[serde(rename = "FailedCreateUnsignedTx")] - pub failed_create_unsigned_tx: String, + #[serde(rename = "FailedToLockFunds")] + pub failed_to_lock_funds: String, } impl ApiErrorOneOf3 { - pub fn new(failed_create_unsigned_tx: String) -> ApiErrorOneOf3 { - ApiErrorOneOf3 { - failed_create_unsigned_tx, - } + pub fn new(failed_to_lock_funds: String) -> ApiErrorOneOf3 { + ApiErrorOneOf3 { failed_to_lock_funds } } } diff --git a/minotari-client/src/models/api_error_one_of_4.rs b/minotari-client/src/models/api_error_one_of_4.rs new file mode 100644 index 0000000..52cdadd --- /dev/null +++ b/minotari-client/src/models/api_error_one_of_4.rs @@ -0,0 +1,26 @@ +/* + * minotari + * + * No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator) + * + * The version of the OpenAPI document: 0.1.0 + * + * Generated by: https://openapi-generator.tech + */ + +use crate::models; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)] +pub struct ApiErrorOneOf4 { + #[serde(rename = "FailedCreateUnsignedTx")] + pub failed_create_unsigned_tx: String, +} + +impl ApiErrorOneOf4 { + pub fn new(failed_create_unsigned_tx: String) -> ApiErrorOneOf4 { + ApiErrorOneOf4 { + failed_create_unsigned_tx, + } + } +} diff --git a/minotari-client/src/models/lock_funds_request.rs b/minotari-client/src/models/lock_funds_request.rs new file mode 100644 index 0000000..fbfb87f --- /dev/null +++ b/minotari-client/src/models/lock_funds_request.rs @@ -0,0 +1,61 @@ +/* + * minotari + * + * No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator) + * + * The version of the OpenAPI document: 0.1.0 + * + * Generated by: https://openapi-generator.tech + */ + +use crate::models; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)] +pub struct LockFundsRequest { + #[serde(rename = "amount")] + pub amount: i64, + #[serde( + rename = "estimated_output_size", + default, + with = "::serde_with::rust::double_option", + skip_serializing_if = "Option::is_none" + )] + pub estimated_output_size: Option>, + #[serde(rename = "fee_per_gram", skip_serializing_if = "Option::is_none")] + pub fee_per_gram: Option, + #[serde( + rename = "idempotency_key", + default, + with = "::serde_with::rust::double_option", + skip_serializing_if = "Option::is_none" + )] + pub idempotency_key: Option>, + #[serde( + rename = "num_outputs", + default, + with = "::serde_with::rust::double_option", + skip_serializing_if = "Option::is_none" + )] + pub num_outputs: Option>, + #[serde( + rename = "seconds_to_lock_utxos", + default, + with = "::serde_with::rust::double_option", + skip_serializing_if = "Option::is_none" + )] + pub seconds_to_lock_utxos: Option>, +} + +impl LockFundsRequest { + pub fn new(amount: i64) -> LockFundsRequest { + LockFundsRequest { + amount, + estimated_output_size: None, + fee_per_gram: None, + idempotency_key: None, + num_outputs: None, + seconds_to_lock_utxos: None, + } + } +} diff --git a/minotari-client/src/models/lock_funds_response.rs b/minotari-client/src/models/lock_funds_response.rs new file mode 100644 index 0000000..2e2d241 --- /dev/null +++ b/minotari-client/src/models/lock_funds_response.rs @@ -0,0 +1,44 @@ +/* + * minotari + * + * No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator) + * + * The version of the OpenAPI document: 0.1.0 + * + * Generated by: https://openapi-generator.tech + */ + +use crate::models; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)] +pub struct LockFundsResponse { + #[serde(rename = "fee_with_change")] + pub fee_with_change: i64, + #[serde(rename = "fee_without_change")] + pub fee_without_change: i64, + #[serde(rename = "requires_change_output")] + pub requires_change_output: bool, + #[serde(rename = "total_value")] + pub total_value: i64, + #[serde(rename = "utxos")] + pub utxos: Vec, +} + +impl LockFundsResponse { + pub fn new( + fee_with_change: i64, + fee_without_change: i64, + requires_change_output: bool, + total_value: i64, + utxos: Vec, + ) -> LockFundsResponse { + LockFundsResponse { + fee_with_change, + fee_without_change, + requires_change_output, + total_value, + utxos, + } + } +} diff --git a/minotari-client/src/models/mod.rs b/minotari-client/src/models/mod.rs index 992062c..39c8c08 100644 --- a/minotari-client/src/models/mod.rs +++ b/minotari-client/src/models/mod.rs @@ -10,8 +10,14 @@ pub mod api_error_one_of_2; pub use self::api_error_one_of_2::ApiErrorOneOf2; pub mod api_error_one_of_3; pub use self::api_error_one_of_3::ApiErrorOneOf3; +pub mod api_error_one_of_4; +pub use self::api_error_one_of_4::ApiErrorOneOf4; pub mod create_transaction_request; pub use self::create_transaction_request::CreateTransactionRequest; +pub mod lock_funds_request; +pub use self::lock_funds_request::LockFundsRequest; +pub mod lock_funds_response; +pub use self::lock_funds_response::LockFundsResponse; pub mod recipient_request; pub use self::recipient_request::RecipientRequest; pub mod wallet_params; diff --git a/minotari_payment_processor/Cargo.toml b/minotari_payment_processor/Cargo.toml index 40fa9e4..58b2a86 100644 --- a/minotari_payment_processor/Cargo.toml +++ b/minotari_payment_processor/Cargo.toml @@ -7,7 +7,7 @@ edition = "2024" anyhow = "1.0.99" axum = { version = "0.8.6", features = ["default", "http2", "macros"] } chrono = "0.4.42" -clap = { version = "4.5.47", features = ["derive"] } +config = "0.15.19" tokio = { version = "1.47.1", features = ["full"] } serde = { workspace = true } serde_json = { workspace = true } @@ -25,8 +25,11 @@ minotari-client = { path = "../minotari-client" } uuid = { version = "1", features = ["v4"] } tempfile = "3.23.0" -minotari_node_wallet_client = { git = "https://github.com/stringhandler/tari/", rev = "6430bfffb53af71d80535243eec9165534af2b41" } -tari_transaction_components = { git = "https://github.com/stringhandler/tari/", rev = "6430bfffb53af71d80535243eec9165534af2b41" } +minotari_node_wallet_client = { git = "https://github.com/tari-project/tari/", rev = "ed473c3c94c2219ac1eebb9345384b6a7245606c" } +tari_common = { git = "https://github.com/tari-project/tari/", rev = "ed473c3c94c2219ac1eebb9345384b6a7245606c" } +tari_common_types = { git = "https://github.com/tari-project/tari/", rev = "ed473c3c94c2219ac1eebb9345384b6a7245606c" } +tari_crypto = { version = "0.22.1", features = ["borsh"] } +tari_transaction_components = { git = "https://github.com/tari-project/tari/", rev = "ed473c3c94c2219ac1eebb9345384b6a7245606c" } tari_utilities = { version = "0.8" } hex = "0.4.3" dotenv = "0.15.0" diff --git a/minotari_payment_processor/src/api/mod.rs b/minotari_payment_processor/src/api/mod.rs index 31f0346..2ccea96 100644 --- a/minotari_payment_processor/src/api/mod.rs +++ b/minotari_payment_processor/src/api/mod.rs @@ -7,6 +7,8 @@ use sqlx::SqlitePool; use utoipa::OpenApi; use utoipa_swagger_ui::SwaggerUi; +use crate::config::PaymentProcessorEnv; + mod error; mod payments; mod version; @@ -14,6 +16,7 @@ mod version; #[derive(Clone)] pub struct AppState { pub db_pool: SqlitePool, + pub env: PaymentProcessorEnv, } impl FromRef for SqlitePool { @@ -42,8 +45,8 @@ impl FromRef for SqlitePool { )] pub struct ApiDoc; -pub fn create_router(db_pool: SqlitePool) -> Router { - let app_state = AppState { db_pool }; +pub fn create_router(db_pool: SqlitePool, env: PaymentProcessorEnv) -> Router { + let app_state = AppState { db_pool, env }; Router::new() .merge(SwaggerUi::new("/swagger-ui").url("/openapi.json", ApiDoc::openapi())) diff --git a/minotari_payment_processor/src/api/payments.rs b/minotari_payment_processor/src/api/payments.rs index 58d4c32..6b311f4 100644 --- a/minotari_payment_processor/src/api/payments.rs +++ b/minotari_payment_processor/src/api/payments.rs @@ -10,18 +10,20 @@ use sqlx::SqlitePool; use utoipa::ToSchema; use crate::{ - api::error::ApiError, + api::{AppState, error::ApiError}, db::{ payment::{Payment, PaymentStatus}, payment_batch::PaymentBatch, }, }; + #[derive(Debug, Clone, Deserialize, ToSchema)] pub struct PaymentRequest { - pub client_id: String, + pub client_id: String, // Idempotency key pub account_name: String, pub recipient_address: String, pub amount: i64, + pub payment_id: Option, // Payment Memo } #[derive(Debug, Clone, Serialize, ToSchema)] @@ -82,17 +84,27 @@ impl From for PaymentResponse { responses( (status = 202, description = "Payment request accepted for processing", body = PaymentResponse), (status = 200, description = "Payment request already exists (idempotent)", body = PaymentResponse), - (status = 400, description = "Bad request", body = ApiError), + (status = 400, description = "Bad request (Invalid amount or Account not found)", body = ApiError), (status = 500, description = "Internal server error", body = ApiError) ) )] pub async fn api_create_payment( - State(db_pool): State, + State(state): State, Json(request): Json, ) -> Result { - let mut transaction = db_pool.begin().await?; + if !state.env.accounts.contains_key(&request.account_name.to_lowercase()) { + return Err(ApiError::BadRequest(format!( + "Account '{}' not found in configuration", + request.account_name + ))); + } + + if request.amount <= 0 { + return Err(ApiError::BadRequest("Amount must be positive".to_string())); + } + + let mut transaction = state.db_pool.begin().await?; - // Idempotency check if let Some(existing_payment) = Payment::get_by_client_id(&mut transaction, &request.client_id, &request.account_name).await? { @@ -100,18 +112,13 @@ pub async fn api_create_payment( return Ok((StatusCode::OK, Json(PaymentResponse::from(existing_payment)))); } - // Validate amount - if request.amount <= 0 { - return Err(ApiError::BadRequest("Amount must be positive".to_string())); - } - let new_payment = Payment::create( &mut transaction, &request.client_id, &request.account_name, &request.recipient_address, request.amount, - None, // payment_id is generated internally + request.payment_id, ) .await?; diff --git a/minotari_payment_processor/src/config.rs b/minotari_payment_processor/src/config.rs new file mode 100644 index 0000000..c2c181f --- /dev/null +++ b/minotari_payment_processor/src/config.rs @@ -0,0 +1,166 @@ +use anyhow::Context; +use config::{Config, Environment}; +use serde::Deserialize; +use std::{collections::HashMap, str::FromStr}; +use tari_common::configuration::Network; +use tari_common_types::{ + tari_address::{TariAddress, TariAddressFeatures}, + types::CompressedPublicKey, +}; +use tari_crypto::keys::PublicKey; +use tari_crypto::{ + compressed_key::CompressedKey, + ristretto::{RistrettoPublicKey, RistrettoSecretKey}, +}; +use tari_utilities::ByteArray; + +#[derive(Debug, Clone)] +pub struct PaymentReceiverAccount { + pub name: String, + pub view_key: RistrettoSecretKey, + pub public_spend_key: CompressedKey, + pub address: TariAddress, +} + +#[derive(Debug, Clone)] +pub struct PaymentProcessorEnv { + pub tari_network: Network, + pub database_url: String, + pub payment_receiver: String, + pub base_node: String, + pub console_wallet_path: String, + pub console_wallet_base_path: String, + pub console_wallet_password: String, + pub listen_ip: String, + pub listen_port: u16, + pub batch_creator_sleep_secs: Option, + pub unsigned_tx_creator_sleep_secs: Option, + pub transaction_signer_sleep_secs: Option, + pub broadcaster_sleep_secs: Option, + pub confirmation_checker_sleep_secs: Option, + pub accounts: HashMap, +} + +#[derive(Deserialize)] +struct RawAccount { + name: String, + view_key: String, + public_spend_key: String, +} + +#[derive(Deserialize)] +struct RawSettings { + #[serde(default = "default_network_str")] + tari_network: String, + database_url: String, + payment_receiver: String, + base_node: String, + console_wallet_path: String, + console_wallet_base_path: String, + console_wallet_password: String, + #[serde(default = "default_ip")] + listen_ip: String, + #[serde(default = "default_port")] + listen_port: u16, + batch_creator_sleep_secs: Option, + unsigned_tx_creator_sleep_secs: Option, + transaction_signer_sleep_secs: Option, + broadcaster_sleep_secs: Option, + confirmation_checker_sleep_secs: Option, + #[serde(default)] + accounts: HashMap, +} + +fn default_ip() -> String { + "0.0.0.0".to_string() +} +fn default_port() -> u16 { + 9145 +} +fn default_network_str() -> String { + "MainNet".to_string() +} + +impl PaymentProcessorEnv { + pub fn load() -> anyhow::Result { + // For nested HashMaps (accounts), it supports "ACCOUNTS__KEY__FIELD" syntax. + let s = Config::builder() + .add_source(Environment::default().separator("__")) + .build()?; + + let raw: RawSettings = s + .try_deserialize() + .context("Failed to read configuration from environment variables")?; + + Self::try_from(raw) + } +} + +impl TryFrom for PaymentProcessorEnv { + type Error = anyhow::Error; + + fn try_from(raw: RawSettings) -> Result { + let tari_network = Network::from_str(&raw.tari_network) + .context(format!("Failed to parse tari_network: {}", raw.tari_network))?; + + let mut accounts = HashMap::new(); + for (_key, raw_acc) in raw.accounts { + let view_key = parse_view_key(&raw_acc.view_key) + .context(format!("Failed to parse view_key for account '{}'", raw_acc.name))?; + + let public_spend_key = parse_public_spend_key(&raw_acc.public_spend_key).context(format!( + "Failed to parse public_spend_key for account '{}'", + raw_acc.name + ))?; + + let address = TariAddress::new_dual_address( + CompressedPublicKey::new_from_pk(RistrettoPublicKey::from_secret_key(&view_key)), + public_spend_key.clone(), + tari_network, + TariAddressFeatures::create_one_sided_only(), + None, + )?; + + accounts.insert( + raw_acc.name.clone().to_lowercase(), + PaymentReceiverAccount { + name: raw_acc.name, + view_key, + public_spend_key, + address, + }, + ); + } + + Ok(Self { + tari_network, + database_url: raw.database_url, + payment_receiver: raw.payment_receiver, + base_node: raw.base_node, + console_wallet_path: raw.console_wallet_path, + console_wallet_base_path: raw.console_wallet_base_path, + console_wallet_password: raw.console_wallet_password, + listen_ip: raw.listen_ip, + listen_port: raw.listen_port, + batch_creator_sleep_secs: raw.batch_creator_sleep_secs, + unsigned_tx_creator_sleep_secs: raw.unsigned_tx_creator_sleep_secs, + transaction_signer_sleep_secs: raw.transaction_signer_sleep_secs, + broadcaster_sleep_secs: raw.broadcaster_sleep_secs, + confirmation_checker_sleep_secs: raw.confirmation_checker_sleep_secs, + accounts, + }) + } +} + +fn parse_view_key(view_key_hex: &str) -> anyhow::Result { + let view_key_bytes = hex::decode(view_key_hex)?; + let view_key = RistrettoSecretKey::from_canonical_bytes(&view_key_bytes).map_err(|e| anyhow::anyhow!(e))?; + Ok(view_key) +} + +fn parse_public_spend_key(public_spend_key_hex: &str) -> anyhow::Result> { + let spend_key_bytes = hex::decode(public_spend_key_hex)?; + let spend_key = + CompressedKey::::from_canonical_bytes(&spend_key_bytes).map_err(|e| anyhow::anyhow!(e))?; + Ok(spend_key) +} diff --git a/minotari_payment_processor/src/db/payment.rs b/minotari_payment_processor/src/db/payment.rs index f0048d7..1851b7f 100644 --- a/minotari_payment_processor/src/db/payment.rs +++ b/minotari_payment_processor/src/db/payment.rs @@ -375,7 +375,7 @@ struct PaymentWithBatch { updated_at: DateTime, batch_id: Option, batch_account_name: Option, - batch_status: Option, // This will be converted to PaymentBatchStatus later + batch_status: Option, batch_pr_idempotency_key: Option, batch_unsigned_tx_json: Option, batch_signed_tx_json: Option, diff --git a/minotari_payment_processor/src/db/payment_batch.rs b/minotari_payment_processor/src/db/payment_batch.rs index 2531871..61bba42 100644 --- a/minotari_payment_processor/src/db/payment_batch.rs +++ b/minotari_payment_processor/src/db/payment_batch.rs @@ -193,6 +193,7 @@ impl PaymentBatch { updated_at as "updated_at: DateTime" FROM payment_batches WHERE status = ? + ORDER BY created_at "#, status ) diff --git a/minotari_payment_processor/src/lib.rs b/minotari_payment_processor/src/lib.rs index da37484..f9f56ef 100644 --- a/minotari_payment_processor/src/lib.rs +++ b/minotari_payment_processor/src/lib.rs @@ -1,3 +1,4 @@ pub mod api; +pub mod config; pub mod db; pub mod workers; diff --git a/minotari_payment_processor/src/main.rs b/minotari_payment_processor/src/main.rs index 4d24d09..58e9e0c 100644 --- a/minotari_payment_processor/src/main.rs +++ b/minotari_payment_processor/src/main.rs @@ -1,80 +1,16 @@ -use anyhow::anyhow; use dotenv::dotenv; use minotari_client::apis::configuration::Configuration as MinotariConfiguration; use minotari_node_wallet_client::http::Client as BaseNodeClient; -use minotari_payment_processor::{api, db, workers}; +use minotari_payment_processor::{api, config::PaymentProcessorEnv, db, workers}; use std::sync::Arc; use tokio::{net::TcpListener, signal}; use url::Url; -struct PaymentProcessorEnv { - pub database_url: String, - pub payment_receiver: String, - pub base_node: String, - pub console_wallet_path: String, - pub console_wallet_password: String, - pub listen_ip: String, - pub listen_port: u16, - pub batch_creator_sleep_secs: Option, - pub unsigned_tx_creator_sleep_secs: Option, - pub transaction_signer_sleep_secs: Option, - pub broadcaster_sleep_secs: Option, - pub confirmation_checker_sleep_secs: Option, -} - -impl PaymentProcessorEnv { - pub fn from_env() -> anyhow::Result { - let database_url = - std::env::var("DATABASE_URL").map_err(|_| anyhow!("DATABASE_URL environment variable not set"))?; - let payment_receiver = - std::env::var("PAYMENT_RECEIVER").map_err(|_| anyhow!("PAYMENT_RECEIVER environment variable not set"))?; - let base_node = std::env::var("BASE_NODE").map_err(|_| anyhow!("BASE_NODE environment variable not set"))?; - let console_wallet_path = std::env::var("CONSOLE_WALLET_PATH") - .map_err(|_| anyhow!("CONSOLE_WALLET_PATH environment variable not set"))?; - let console_wallet_password = std::env::var("CONSOLE_WALLET_PASSWORD") - .map_err(|_| anyhow!("CONSOLE_WALLET_PASSWORD environment variable not set"))?; - let listen_ip = std::env::var("LISTEN_IP").unwrap_or_else(|_| "0.0.0.0".to_string()); - let listen_port = std::env::var("LISTEN_PORT") - .unwrap_or_else(|_| "9145".to_string()) - .parse::()?; - - let batch_creator_sleep_secs = std::env::var("BATCH_CREATOR_SLEEP_SECS") - .ok() - .and_then(|s| s.parse::().ok()); - let unsigned_tx_creator_sleep_secs = std::env::var("UNSIGNED_TX_CREATOR_SLEEP_SECS") - .ok() - .and_then(|s| s.parse::().ok()); - let transaction_signer_sleep_secs = std::env::var("TRANSACTION_SIGNER_SLEEP_SECS") - .ok() - .and_then(|s| s.parse::().ok()); - let broadcaster_sleep_secs = std::env::var("BROADCASTER_SLEEP_SECS") - .ok() - .and_then(|s| s.parse::().ok()); - let confirmation_checker_sleep_secs = std::env::var("CONFIRMATION_CHECKER_SLEEP_SECS") - .ok() - .and_then(|s| s.parse::().ok()); - - Ok(Self { - database_url, - payment_receiver, - base_node, - console_wallet_path, - console_wallet_password, - listen_ip, - listen_port, - batch_creator_sleep_secs, - unsigned_tx_creator_sleep_secs, - transaction_signer_sleep_secs, - broadcaster_sleep_secs, - confirmation_checker_sleep_secs, - }) - } -} - #[tokio::main] async fn main() -> anyhow::Result<()> { dotenv().ok(); - let env = PaymentProcessorEnv::from_env()?; + let env = PaymentProcessorEnv::load()?; + let app_env = env.clone(); println!("Starting Minotari Payment Processor..."); @@ -97,11 +33,15 @@ async fn main() -> anyhow::Result<()> { tokio::spawn(workers::unsigned_tx_creator::run( db_pool.clone(), client_config.clone(), + env.tari_network, + env.accounts.clone(), env.unsigned_tx_creator_sleep_secs, )); tokio::spawn(workers::transaction_signer::run( db_pool.clone(), + env.tari_network, env.console_wallet_path.clone(), + env.console_wallet_base_path.clone(), env.console_wallet_password.clone(), env.transaction_signer_sleep_secs, )); @@ -118,7 +58,7 @@ async fn main() -> anyhow::Result<()> { println!("Minotari Payment Processor started. Press Ctrl+C to shut down."); // Create Axum API router - let app = api::create_router(db_pool.clone()); + let app = api::create_router(db_pool.clone(), app_env); let addr = format!("{}:{}", env.listen_ip, env.listen_port); let listener = TcpListener::bind(&addr).await?; println!("Axum API server listening on {}", addr); diff --git a/minotari_payment_processor/src/workers/batch_creator.rs b/minotari_payment_processor/src/workers/batch_creator.rs index ae57ad9..017ac10 100644 --- a/minotari_payment_processor/src/workers/batch_creator.rs +++ b/minotari_payment_processor/src/workers/batch_creator.rs @@ -1,3 +1,4 @@ +use anyhow::Context; use sqlx::SqlitePool; use std::collections::HashMap; use tokio::time::{self, Duration}; @@ -9,35 +10,42 @@ const DEFAULT_SLEEP_SECS: u64 = 10 * 60; // 10 minutes const MAX_BATCH_SIZE: i64 = 100; pub async fn run(db_pool: SqlitePool, sleep_secs: Option) { - let sleep_secs = sleep_secs.unwrap_or(DEFAULT_SLEEP_SECS); + let sleep_duration = Duration::from_secs(sleep_secs.unwrap_or(DEFAULT_SLEEP_SECS)); + + println!("Batch Creator worker started. Cycle interval: {:?}.", sleep_duration); + loop { - let mut should_sleep = true; - match process_batches(&db_pool).await { + match process_payment_cycle(&db_pool).await { Ok(more_batches_expected) => { - if more_batches_expected { - should_sleep = false; + if !more_batches_expected { + time::sleep(sleep_duration).await; + } else { + println!("INFO: Max batch size reached. Continuing to next cycle immediately."); } }, Err(e) => { - eprintln!("Batch Creator worker error: {:?}", e); + eprintln!("Batch Creator worker critical error: {:?}. Sleeping...", e); + time::sleep(sleep_duration).await; }, } - - if should_sleep { - time::sleep(Duration::from_secs(sleep_secs)).await; - } } } -async fn process_batches(db_pool: &SqlitePool) -> Result { - let mut conn = db_pool.acquire().await?; - let payments = Payment::find_receivable_payments(&mut conn, MAX_BATCH_SIZE).await?; +async fn process_payment_cycle(db_pool: &SqlitePool) -> Result { + let mut conn = db_pool.acquire().await.context("Failed to acquire DB connection")?; + + let payments = Payment::find_receivable_payments(&mut conn, MAX_BATCH_SIZE) + .await + .context("Failed to find receivable payments")?; + let payments_count = payments.len(); if payments.is_empty() { return Ok(false); } + println!("INFO: Found {} receivable payments to process.", payments_count); + let mut payments_by_account: HashMap> = HashMap::new(); for payment in payments { payments_by_account @@ -47,14 +55,48 @@ async fn process_batches(db_pool: &SqlitePool) -> Result { } for (account_name, account_payments) in payments_by_account { - let payment_ids: Vec = account_payments.iter().map(|p| p.id.clone()).collect(); - let pr_idempotency_key = Uuid::new_v4().to_string(); - if let Err(e) = - PaymentBatch::create_with_payments(&mut conn, &account_name, &pr_idempotency_key, &payment_ids).await - { - eprintln!("Failed to create batch for account {}: {:?}", account_name, e); + println!( + "INFO: Processing group for account '{}' with {} payments.", + account_name, + account_payments.len() + ); + + if let Err(e) = process_account_batch(db_pool, &account_name, &account_payments).await { + eprintln!("Failed to create batch for account '{}': {:?}", account_name, e); } } Ok(payments_count == MAX_BATCH_SIZE as usize) } + +async fn process_account_batch( + db_pool: &SqlitePool, + account_name: &str, + payments: &[Payment], +) -> Result<(), anyhow::Error> { + if payments.is_empty() { + return Ok(()); + } + + let payment_ids: Vec = payments.iter().map(|p| p.id.clone()).collect(); + let pr_idempotency_key = Uuid::new_v4().to_string(); + + println!( + "INFO: Creating batch for Account: '{}'. Idempotency Key: {}. Payment Count: {}", + account_name, + pr_idempotency_key, + payments.len() + ); + + let mut tx = db_pool.begin().await.context("Failed to start transaction")?; + + PaymentBatch::create_with_payments(&mut tx, account_name, &pr_idempotency_key, &payment_ids) + .await + .with_context(|| format!("Failed to create batch entry for account {}", account_name))?; + + tx.commit().await.context("Failed to commit batch transaction")?; + + println!("INFO: Successfully committed batch for Account: '{}'.", account_name); + + Ok(()) +} diff --git a/minotari_payment_processor/src/workers/broadcaster.rs b/minotari_payment_processor/src/workers/broadcaster.rs index 74d6ef2..e85ac52 100644 --- a/minotari_payment_processor/src/workers/broadcaster.rs +++ b/minotari_payment_processor/src/workers/broadcaster.rs @@ -1,6 +1,8 @@ +use anyhow::{Context, anyhow}; use minotari_node_wallet_client::{BaseNodeWalletClient, http::Client}; -use sqlx::SqlitePool; -use tari_transaction_components::offline_signing::models::{SignedOneSidedTransactionResult, TransactionResult}; +use sqlx::{SqliteConnection, SqlitePool}; +use tari_transaction_components::offline_signing::models::SignedOneSidedTransactionResult; +use tari_utilities::message_format::MessageFormat; use tokio::time::{self, Duration}; use crate::db::payment_batch::{PaymentBatch, PaymentBatchStatus}; @@ -9,7 +11,13 @@ const DEFAULT_SLEEP_SECS: u64 = 15; pub async fn run(db_pool: SqlitePool, base_node_client: Client, sleep_secs: Option) { let sleep_secs = sleep_secs.unwrap_or(DEFAULT_SLEEP_SECS); + println!( + "Transaction Broadcaster worker started. Polling every {} seconds.", + sleep_secs + ); + let mut interval = time::interval(Duration::from_secs(sleep_secs)); + loop { interval.tick().await; if let Err(e) = process_transactions_to_broadcast(&db_pool, &base_node_client).await { @@ -23,34 +31,81 @@ async fn process_transactions_to_broadcast( base_node_client: &Client, ) -> Result<(), anyhow::Error> { let mut conn = db_pool.acquire().await?; + let batches = PaymentBatch::find_by_status(&mut conn, PaymentBatchStatus::AwaitingBroadcast).await?; + if !batches.is_empty() { + println!("INFO: Found {} batches awaiting broadcast.", batches.len()); + } + for batch in batches { - // Update its status to `BROADCASTING`. - PaymentBatch::update_to_broadcasting(&mut conn, &batch.id).await?; - - let batch_id = batch.id.clone(); - let signed_tx_json = batch - .signed_tx_json - .clone() - .ok_or_else(|| anyhow::anyhow!("Batch {} has no signed_tx_json", batch_id))?; - let signed_tx = SignedOneSidedTransactionResult::from_json(&signed_tx_json)?; - - let response = base_node_client - .submit_transaction(signed_tx.signed_transaction.transaction) - .await?; - - if response.accepted { - PaymentBatch::update_to_awaiting_confirmation(&mut conn, &batch_id).await?; - } else { - let error_message = format!( - "Tari base node rejected transaction for batch {}: {}", - batch_id, response.rejection_reason + if let Err(e) = process_single_batch(&mut conn, base_node_client, &batch).await { + let error_message = e.to_string(); + eprintln!( + "Error broadcasting batch {}: {}. Incrementing retry count.", + batch.id, error_message ); - eprintln!("{}", error_message); - PaymentBatch::increment_retry_count(&mut conn, &batch_id, &error_message).await?; + + if let Err(db_err) = PaymentBatch::increment_retry_count(&mut conn, &batch.id, &error_message).await { + eprintln!( + "CRITICAL: Failed to update retry count for batch {}: {:?}", + batch.id, db_err + ); + } } } Ok(()) } + +async fn process_single_batch( + conn: &mut SqliteConnection, + base_node_client: &Client, + batch: &PaymentBatch, +) -> Result<(), anyhow::Error> { + let batch_id = &batch.id; + println!("INFO: Starting broadcast sequence for Batch ID: {}", batch_id); + + let signed_tx_json = batch + .signed_tx_json + .clone() + .ok_or_else(|| anyhow!("Batch {} has no signed_tx_json", batch_id))?; + + let signed_tx = SignedOneSidedTransactionResult::from_json(&signed_tx_json) + .map_err(|e| anyhow!("Failed to deserialize signed tx: {}", e))?; + + PaymentBatch::update_to_broadcasting(conn, batch_id) + .await + .context("Failed to set status to broadcasting")?; + + println!( + "INFO: Batch {}: Status updated to 'Broadcasting'. Submitting to Base Node...", + batch_id + ); + + let response = base_node_client + .submit_transaction(signed_tx.signed_transaction.transaction) + .await + .context("Network error submitting transaction to Base Node")?; + + if response.accepted { + println!("INFO: Batch {}: Transaction ACCEPTED by Base Node.", batch_id); + + PaymentBatch::update_to_awaiting_confirmation(conn, batch_id) + .await + .context("Failed to update status to AwaitingConfirmation")?; + + println!("INFO: Batch {}: Status updated to 'AwaitingConfirmation'.", batch_id); + } else { + println!( + "WARN: Batch {}: Transaction REJECTED by Base Node. Reason: {}", + batch_id, response.rejection_reason + ); + return Err(anyhow!( + "Tari base node rejected transaction: {}", + response.rejection_reason + )); + } + + Ok(()) +} diff --git a/minotari_payment_processor/src/workers/confirmation_checker.rs b/minotari_payment_processor/src/workers/confirmation_checker.rs index cb0296b..08ad22c 100644 --- a/minotari_payment_processor/src/workers/confirmation_checker.rs +++ b/minotari_payment_processor/src/workers/confirmation_checker.rs @@ -1,20 +1,27 @@ -use anyhow::anyhow; +use anyhow::{Context, anyhow}; use minotari_node_wallet_client::{BaseNodeWalletClient, http::Client}; -use sqlx::{Acquire, SqlitePool}; -use tari_transaction_components::offline_signing::models::{SignedOneSidedTransactionResult, TransactionResult}; +use sqlx::SqlitePool; +use tari_transaction_components::offline_signing::models::SignedOneSidedTransactionResult; +use tari_transaction_components::offline_signing::models::TransactionResult; use tari_transaction_components::rpc::models::TxLocation; use tari_utilities::byte_array::ByteArray; use tokio::time::{self, Duration}; -use crate::db::payment_batch::PaymentBatchStatus; -use crate::db::{payment::Payment, payment_batch::PaymentBatch}; +use crate::db::payment::Payment; +use crate::db::payment_batch::{PaymentBatch, PaymentBatchStatus}; const DEFAULT_SLEEP_SECS: u64 = 60; const REQUIRED_CONFIRMATIONS: u64 = 10; pub async fn run(db_pool: SqlitePool, base_node_client: Client, sleep_secs: Option) { let sleep_secs = sleep_secs.unwrap_or(DEFAULT_SLEEP_SECS); + println!( + "Confirmation Checker worker started. Polling every {} seconds. Required Confirmations: {}", + sleep_secs, REQUIRED_CONFIRMATIONS + ); + let mut interval = time::interval(Duration::from_secs(sleep_secs)); + loop { interval.tick().await; if let Err(e) = check_transaction_confirmations(&db_pool, &base_node_client).await { @@ -25,74 +32,169 @@ pub async fn run(db_pool: SqlitePool, base_node_client: Client, sleep_secs: Opti async fn check_transaction_confirmations(db_pool: &SqlitePool, base_node_client: &Client) -> Result<(), anyhow::Error> { let mut conn = db_pool.acquire().await?; + let batches = PaymentBatch::find_by_status(&mut conn, PaymentBatchStatus::AwaitingConfirmation).await?; + if !batches.is_empty() { + println!("INFO: Found {} batches awaiting confirmation.", batches.len()); + } + for batch in batches { - let batch_id = batch.id.clone(); + if let Err(e) = process_single_batch(db_pool, base_node_client, &batch).await { + let error_message = e.to_string(); + eprintln!( + "Error checking confirmation for batch {}: {}. Incrementing retry count.", + batch.id, error_message + ); - let signed_tx_json = batch - .signed_tx_json - .clone() - .ok_or_else(|| anyhow::anyhow!("Batch {} has no signed_tx_json", batch_id))?; - let signed_tx = SignedOneSidedTransactionResult::from_json(&signed_tx_json)?; - let sig = &signed_tx.signed_transaction.transaction.body.kernels()[0].excess_sig; - let excess_sig_nonce = sig.get_compressed_public_nonce().to_vec(); - let excess_sig_sig = sig.get_signature().to_vec(); - let tx_query_response = base_node_client - .transaction_query(excess_sig_nonce, excess_sig_sig) - .await?; - - match tx_query_response.location { - TxLocation::Mined => { - let mined_height = tx_query_response - .mined_height - .ok_or_else(|| anyhow!("Mined transaction has no mined_height"))?; - - let tip_info = base_node_client.get_tip_info().await?; - let best_block_height = tip_info - .metadata - .ok_or_else(|| anyhow!("Tip info has no metadata"))? - .best_block_height(); - - let confirmations = best_block_height.saturating_sub(mined_height) + 1; - - if confirmations >= REQUIRED_CONFIRMATIONS { - let mined_header_hash = tx_query_response - .mined_header_hash - .ok_or_else(|| anyhow!("Mined transaction has no mined_header_hash"))?; - let mined_timestamp = tx_query_response - .mined_timestamp - .ok_or_else(|| anyhow!("Mined transaction has no mined_timestamp"))?; - - let mut tx = conn.begin().await?; - PaymentBatch::update_to_confirmed( - &mut tx, - &batch_id, - mined_height, - mined_header_hash, - mined_timestamp, - ) - .await?; - let associated_payments = Payment::find_by_batch_id(&mut tx, &batch_id).await?; - let payment_ids: Vec = associated_payments.iter().map(|p| p.id.clone()).collect(); - Payment::update_payments_to_confirmed(&mut tx, &payment_ids).await?; - tx.commit().await?; - println!("Batch {} confirmed successfully.", batch_id); - } else { - println!( - "Batch {} awaiting more confirmations ({} received).", - batch_id, confirmations - ); - } - }, - TxLocation::InMempool => { - println!("Batch {} is in mempool, awaiting mining.", batch_id); - }, - TxLocation::None | TxLocation::NotStored => { - eprintln!("Batch {} transaction not found on base node or mempool.", batch_id); - }, + if let Err(db_err) = PaymentBatch::increment_retry_count(&mut conn, &batch.id, &error_message).await { + eprintln!( + "CRITICAL: Failed to update retry count for batch {}: {:?}", + batch.id, db_err + ); + } } } Ok(()) } + +async fn process_single_batch( + db_pool: &SqlitePool, + base_node_client: &Client, + batch: &PaymentBatch, +) -> Result<(), anyhow::Error> { + let batch_id = &batch.id; + + println!("INFO: Checking status for Batch ID: {}", batch_id); + + let signed_tx_json = batch + .signed_tx_json + .clone() + .ok_or_else(|| anyhow!("Batch {} has no signed_tx_json", batch_id))?; + + let signed_tx = SignedOneSidedTransactionResult::from_json(&signed_tx_json)?; + + let kernel = signed_tx + .signed_transaction + .transaction + .body + .kernels() + .first() + .ok_or_else(|| anyhow!("Transaction has no kernels"))?; + + let excess_sig_nonce = kernel.excess_sig.get_compressed_public_nonce().to_vec(); + let excess_sig_sig = kernel.excess_sig.get_signature().to_vec(); + + println!( + "DEBUG: Batch {}: Querying Base Node for Kernel Signature (Nonce start: {:?})", + batch_id, + &excess_sig_nonce[0..4] + ); + + let tx_query_response = base_node_client + .transaction_query(excess_sig_nonce, excess_sig_sig) + .await + .context("Failed to query transaction from Base Node")?; + + match tx_query_response.location { + TxLocation::Mined => { + println!( + "INFO: Batch {}: Location 'Mined'. Processing confirmations...", + batch_id + ); + handle_mined_transaction(db_pool, base_node_client, batch_id, &tx_query_response).await? + }, + TxLocation::InMempool => { + println!("INFO: Batch {} is currently in the mempool, awaiting mining.", batch_id); + }, + TxLocation::None | TxLocation::NotStored => { + println!( + "WARN: Batch {} location returned as '{:?}'.", + batch_id, tx_query_response.location + ); + return Err(anyhow!( + "Transaction not found on Base Node (Location: {:?}). It may have been dropped or reorged.", + tx_query_response.location + )); + }, + } + + Ok(()) +} + +async fn handle_mined_transaction( + db_pool: &SqlitePool, + base_node_client: &Client, + batch_id: &str, + tx_query_response: &tari_transaction_components::rpc::models::TxQueryResponse, +) -> Result<(), anyhow::Error> { + let mined_height = tx_query_response + .mined_height + .ok_or_else(|| anyhow!("Mined transaction missing mined_height"))?; + + let tip_info = base_node_client + .get_tip_info() + .await + .context("Failed to get tip info from Base Node")?; + + let best_block_height = tip_info + .metadata + .ok_or_else(|| anyhow!("Tip info missing metadata"))? + .best_block_height(); + + let confirmations = best_block_height.saturating_sub(mined_height) + 1; + + println!( + "INFO: Batch {}: Mined Height: {}, Tip Height: {}, Confirmations: {}/{}", + batch_id, mined_height, best_block_height, confirmations, REQUIRED_CONFIRMATIONS + ); + + if confirmations >= REQUIRED_CONFIRMATIONS { + println!( + "INFO: Batch {}: Confirmation threshold reached. Finalizing...", + batch_id + ); + + let mined_header_hash = tx_query_response + .mined_header_hash + .clone() + .ok_or_else(|| anyhow!("Mined transaction missing mined_header_hash"))?; + let mined_timestamp = tx_query_response + .mined_timestamp + .ok_or_else(|| anyhow!("Mined transaction missing mined_timestamp"))?; + + let mut tx = db_pool.begin().await.context("Failed to begin DB transaction")?; + + PaymentBatch::update_to_confirmed(&mut tx, batch_id, mined_height, mined_header_hash, mined_timestamp) + .await + .context("Failed to update batch to Confirmed")?; + + let associated_payments = Payment::find_by_batch_id(&mut tx, batch_id) + .await + .context("Failed to fetch associated payments")?; + + println!( + "INFO: Batch {}: Marking {} associated payments as confirmed.", + batch_id, + associated_payments.len() + ); + + let payment_ids: Vec = associated_payments.iter().map(|p| p.id.clone()).collect(); + + Payment::update_payments_to_confirmed(&mut tx, &payment_ids) + .await + .context("Failed to update payments to Confirmed")?; + + tx.commit().await.context("Failed to commit DB transaction")?; + + println!("INFO: Batch {} confirmed successfully and DB updated.", batch_id); + } else { + println!( + "INFO: Batch {} awaiting more confirmations. (Current: {}, Required: {})", + batch_id, confirmations, REQUIRED_CONFIRMATIONS + ); + } + + Ok(()) +} diff --git a/minotari_payment_processor/src/workers/transaction_signer.rs b/minotari_payment_processor/src/workers/transaction_signer.rs index b49ce00..a8ad71c 100644 --- a/minotari_payment_processor/src/workers/transaction_signer.rs +++ b/minotari_payment_processor/src/workers/transaction_signer.rs @@ -1,7 +1,10 @@ -use sqlx::SqlitePool; +use anyhow::{Context, anyhow}; +use sqlx::{SqliteConnection, SqlitePool}; use std::io::Write; +use tari_common::configuration::Network; use tempfile::NamedTempFile; use tokio::fs; +use tokio::process::Command; use tokio::time::{self, Duration}; use crate::db::payment_batch::{PaymentBatch, PaymentBatchStatus}; @@ -10,15 +13,31 @@ const DEFAULT_SLEEP_SECS: u64 = 10; pub async fn run( db_pool: SqlitePool, + network: Network, console_wallet_path: String, + console_wallet_base_path: String, console_wallet_password: String, sleep_secs: Option, ) { let sleep_secs = sleep_secs.unwrap_or(DEFAULT_SLEEP_SECS); + println!( + "Transaction Signer worker started. Polling every {} seconds.", + sleep_secs + ); + let mut interval = time::interval(Duration::from_secs(sleep_secs)); + loop { interval.tick().await; - if let Err(e) = process_transactions_to_sign(&db_pool, &console_wallet_path, &console_wallet_password).await { + if let Err(e) = process_transactions_to_sign( + &db_pool, + network, + &console_wallet_path, + &console_wallet_base_path, + &console_wallet_password, + ) + .await + { eprintln!("Transaction Signer worker error: {:?}", e); } } @@ -26,70 +45,179 @@ pub async fn run( async fn process_transactions_to_sign( db_pool: &SqlitePool, + network: Network, console_wallet_path: &str, + console_wallet_base_path: &str, console_wallet_password: &str, ) -> Result<(), anyhow::Error> { let mut conn = db_pool.acquire().await?; + let batches = PaymentBatch::find_by_status(&mut conn, PaymentBatchStatus::AwaitingSignature).await?; + if !batches.is_empty() { + println!("INFO: Found {} batches awaiting signature.", batches.len()); + } + for batch in batches { - // Update its status to `SIGNING_IN_PROGRESS` to prevent other workers from picking it up. - PaymentBatch::update_to_signing_in_progress(&mut conn, &batch.id).await?; - - let batch_id = batch.id.clone(); - let unsigned_tx_json = batch - .unsigned_tx_json - .clone() - .ok_or_else(|| anyhow::anyhow!("Batch {} has no unsigned_tx_json", batch_id))?; - - // Create temporary input file - let mut input_file = NamedTempFile::with_prefix("unsigned-tx-")?; - input_file.write_all(unsigned_tx_json.as_bytes())?; - let input_file_path = input_file.path().to_path_buf(); - - // Create temporary output file - let output_file = NamedTempFile::with_prefix("signed-tx-")?; - let output_file_path = output_file.path().to_path_buf(); - - let batch_id_clone = batch_id.clone(); - let input_path_clone = input_file_path.clone(); - let output_path_clone = output_file_path.clone(); - - let console_wallet_path: String = console_wallet_path.to_string().clone(); - let console_wallet_password = console_wallet_password.to_string().clone(); - let signing_result = tokio::task::spawn_blocking(move || { - std::process::Command::new(console_wallet_path) - .env("MINOTARI_WALLET_PASSWORD", console_wallet_password) - .arg("sign-one-sided-transaction") - .arg("--input-file") - .arg(&input_path_clone) - .arg("--output-file") - .arg(&output_path_clone) - .output() - }) - .await?; - - match signing_result { - Ok(output) => { - if output.status.success() { - // On CLI Success (exit code 0) - let signed_tx_json = fs::read_to_string(&output_file_path).await?; - PaymentBatch::update_to_awaiting_broadcast(&mut conn, &batch_id_clone, &signed_tx_json).await?; - } else { - // On CLI Failure (non-zero exit code) - let error_message = String::from_utf8_lossy(&output.stderr).to_string(); - eprintln!("CLI signing failed for batch {}: {}", batch_id_clone, error_message); - PaymentBatch::update_to_failed(&mut conn, &batch_id_clone, &error_message).await?; - } - }, - Err(e) => { + if let Err(e) = process_single_batch( + &mut conn, + network, + console_wallet_path, + console_wallet_base_path, + console_wallet_password, + &batch, + ) + .await + { + let error_message = format!("{:#}", e); + eprintln!( + "Error signing batch {}: {}. Attempting to revert status...", + batch.id, error_message + ); + + let revert_result = if let Some(json) = &batch.unsigned_tx_json { + PaymentBatch::update_to_awaiting_signature(&mut conn, &batch.id, json).await + } else { + Err(anyhow::anyhow!("Cannot revert: Batch missing unsigned_tx_json"))? + }; + + match revert_result { + Ok(_) => println!("INFO: Batch {} reverted to 'AwaitingSignature'.", batch.id), + Err(revert_e) => eprintln!("CRITICAL: Failed to revert batch {} status: {:?}", batch.id, revert_e), + } + + if let Err(db_err) = PaymentBatch::increment_retry_count(&mut conn, &batch.id, &error_message).await { eprintln!( - "Failed to execute minotari_console_wallet for batch {}: {:?}", - batch_id_clone, e + "CRITICAL: Failed to update retry count for batch {}: {:?}", + batch.id, db_err ); - PaymentBatch::update_to_failed(&mut conn, &batch_id_clone, &format!("CLI execution error: {:?}", e)) - .await?; - }, + } + } + } + + Ok(()) +} + +async fn process_single_batch( + conn: &mut SqliteConnection, + network: Network, + console_wallet_path: &str, + console_wallet_base_path: &str, + console_wallet_password: &str, + batch: &PaymentBatch, +) -> Result<(), anyhow::Error> { + let batch_id = &batch.id; + println!("INFO: Starting processing for Batch ID: {}", batch_id); + + PaymentBatch::update_to_signing_in_progress(conn, batch_id) + .await + .context("Failed to update status to SigningInProgress")?; + + println!("INFO: Batch {}: Status updated to 'SigningInProgress'.", batch_id); + + let unsigned_tx_json = batch + .unsigned_tx_json + .clone() + .ok_or_else(|| anyhow!("Batch {} has no unsigned_tx_json", batch_id))?; + + let mut input_file = NamedTempFile::with_prefix("unsigned-tx-").context("Failed to create temp input file")?; + let input_path = input_file.path().to_path_buf(); + + input_file + .write_all(unsigned_tx_json.as_bytes()) + .context("Failed to write unsigned tx to temp file")?; + input_file.flush().context("Failed to flush input file")?; + + let output_file = NamedTempFile::with_prefix("signed-tx-").context("Failed to create temp output file")?; + let output_path = output_file.path().to_path_buf(); + + println!( + "INFO: Batch {}: Temp files prepared. Input: {:?}, Output: {:?}", + batch_id, input_path, output_path + ); + + println!("INFO: Batch {}: Invoking external signer CLI...", batch_id); + + sign_with_cli( + network, + console_wallet_path, + console_wallet_password, + console_wallet_base_path, + &input_path, + &output_path, + ) + .await + .context("External signing process failed")?; + + println!("INFO: Batch {}: External signer CLI finished successfully.", batch_id); + + let signed_tx_json = fs::read_to_string(&output_path) + .await + .context("Failed to read signed transaction from output file")?; + + PaymentBatch::update_to_awaiting_broadcast(conn, batch_id, &signed_tx_json) + .await + .context("Failed to update status to AwaitingBroadcast")?; + + println!( + "INFO: Batch {}: Status updated to 'AwaitingBroadcast'. Processing complete.", + batch_id + ); + + Ok(()) +} + +/// Executes the Minotari Console Wallet. +async fn sign_with_cli( + network: Network, + executable_path: &str, + password: &str, + base_path: &str, + input_path: &std::path::Path, + output_path: &std::path::Path, +) -> Result<(), anyhow::Error> { + let mut cmd = Command::new(executable_path); + cmd.current_dir(base_path) + .env("MINOTARI_WALLET_PASSWORD", password) + .arg("--command-mode-auto-exit") + .arg("--base-path") + .arg(base_path) + .arg("--network") + .arg(network.to_string()) + .arg("--skip-recovery") + .arg("sign-one-sided-transaction") + .arg("--input-file") + .arg(input_path) + .arg("--output-file") + .arg(output_path); + + let command_string = format!( + "MINOTARI_WALLET_PASSWORD=*** {} {}", + cmd.as_std().get_program().to_string_lossy(), + cmd.as_std() + .get_args() + .map(|arg| arg.to_string_lossy()) + .collect::>() + .join(" ") + ); + + println!("DEBUG: Executing Command: {}", command_string); + + let cmd_output = cmd.output().await.context("Failed to execute console wallet command")?; + + if !cmd_output.status.success() { + let stderr = String::from_utf8_lossy(&cmd_output.stderr); + let stdout = String::from_utf8_lossy(&cmd_output.stdout); + return Err(anyhow!( + "CLI exited with error code: {}.\nStderr: {}\nStdout: {}", + cmd_output.status, + stderr, + stdout + )); + } else { + let stdout = String::from_utf8_lossy(&cmd_output.stdout); + if !stdout.trim().is_empty() { + println!("DEBUG: CLI Stdout: {}", stdout); } } diff --git a/minotari_payment_processor/src/workers/unsigned_tx_creator.rs b/minotari_payment_processor/src/workers/unsigned_tx_creator.rs index 2a914c3..7041995 100644 --- a/minotari_payment_processor/src/workers/unsigned_tx_creator.rs +++ b/minotari_payment_processor/src/workers/unsigned_tx_creator.rs @@ -1,21 +1,50 @@ +use anyhow::{Context, anyhow}; use minotari_client::apis::{Error as ApiError, accounts_api, configuration::Configuration}; -use minotari_client::models::{CreateTransactionRequest, RecipientRequest}; -use sqlx::SqlitePool; +use minotari_client::models::LockFundsRequest; +use sqlx::{SqliteConnection, SqlitePool}; +use std::collections::HashMap; use std::sync::Arc; +use tari_common::configuration::Network; +use tari_common_types::tari_address::TariAddress; +use tari_common_types::transaction::TxId; +use tari_transaction_components::consensus::ConsensusConstantsBuilder; +use tari_transaction_components::key_manager::{ + KeyManager, + wallet_types::{ViewWallet, WalletType}, +}; +use tari_transaction_components::offline_signing::{PaymentRecipient, prepare_one_sided_transaction_for_signing}; +use tari_transaction_components::{ + TransactionBuilder, + // offline_signing::offline_signer::OfflineSigner, + tari_amount::MicroMinotari, + transaction_components::{MemoField, OutputFeatures, WalletOutput, memo_field::TxType}, +}; use tokio::time::{self, Duration}; -use crate::db::payment_batch::PaymentBatchStatus; -use crate::db::{payment::Payment, payment_batch::PaymentBatch}; +use crate::config::PaymentReceiverAccount; +use crate::db::payment::Payment; +use crate::db::payment_batch::{PaymentBatch, PaymentBatchStatus}; const DEFAULT_SLEEP_SECS: u64 = 15; -pub async fn run(db_pool: SqlitePool, client_config: Arc, sleep_secs: Option) { +pub async fn run( + db_pool: SqlitePool, + client_config: Arc, + network: Network, + accounts: HashMap, + sleep_secs: Option, +) { let sleep_secs = sleep_secs.unwrap_or(DEFAULT_SLEEP_SECS); + println!( + "Unsigned Transaction Creator worker started. Polling every {} seconds.", + sleep_secs + ); + let mut interval = time::interval(Duration::from_secs(sleep_secs)); loop { interval.tick().await; - if let Err(e) = process_unsigned_transactions(&db_pool, &client_config).await { + if let Err(e) = process_unsigned_transactions(&db_pool, &client_config, network, &accounts).await { eprintln!("Unsigned Transaction Creator worker error: {:?}", e); } } @@ -24,50 +53,162 @@ pub async fn run(db_pool: SqlitePool, client_config: Arc, sleep_s async fn process_unsigned_transactions( db_pool: &SqlitePool, client_config: &Configuration, + network: Network, + accounts: &HashMap, ) -> Result<(), anyhow::Error> { let mut conn = db_pool.acquire().await?; + let batches = PaymentBatch::find_by_status(&mut conn, PaymentBatchStatus::PendingBatching).await?; + if !batches.is_empty() { + println!( + "INFO: Found {} batches pending unsigned transaction creation.", + batches.len() + ); + } + for batch in batches { - let associated_payments = Payment::find_by_batch_id(&mut conn, &batch.id).await?; - let recipients: Vec = associated_payments - .into_iter() - .map(|p| RecipientRequest { - address: p.recipient_address, - amount: p.amount, - payment_id: p.payment_id.map(Some), - }) - .collect(); - - let request_body = CreateTransactionRequest { - idempotency_key: Some(Some(batch.pr_idempotency_key)), - recipients, - seconds_to_lock_utxos: None, // Use default lock timeout - }; - - match accounts_api::api_create_unsigned_transaction(client_config, &batch.account_name, request_body).await { - Ok(response) => { - let response_text = serde_json::to_string(&response)?; - PaymentBatch::update_to_awaiting_signature(&mut conn, &batch.id, &response_text).await?; - }, - Err(ApiError::ResponseError(response_content)) => { - let status = response_content.status; - let response_text = response_content.content; - - let error_message = format!( - "PR API returned unexpected status for batch {}: {} - {}", - batch.id, status, response_text + if let Err(e) = process_single_batch(&mut conn, client_config, network, accounts, &batch).await { + let error_message = e.to_string(); + eprintln!( + "Error processing batch {}: {}. Incrementing retry count.", + batch.id, error_message + ); + + if let Err(db_err) = PaymentBatch::increment_retry_count(&mut conn, &batch.id, &error_message).await { + eprintln!( + "CRITICAL: Failed to update retry count for batch {}: {:?}", + batch.id, db_err ); - eprintln!("{}", error_message); - PaymentBatch::increment_retry_count(&mut conn, &batch.id, &error_message).await?; - }, - Err(e) => { - let error_message = format!("Network error calling PR API for batch {}: {:?}", batch.id, e); - eprintln!("{}", error_message); - PaymentBatch::increment_retry_count(&mut conn, &batch.id, &error_message).await?; - }, + } } } Ok(()) } + +async fn process_single_batch( + conn: &mut SqliteConnection, + client_config: &Configuration, + network: Network, + accounts: &HashMap, + batch: &PaymentBatch, +) -> Result<(), anyhow::Error> { + let batch_id = &batch.id; + println!("INFO: Starting processing for Batch ID: {}", batch_id); + + let associated_payments = Payment::find_by_batch_id(conn, batch_id) + .await + .context("Failed to fetch payments for batch")?; + + if associated_payments.is_empty() { + return Err(anyhow!("Batch {} has no associated payments", batch_id)); + } + + let total_amount: i64 = associated_payments.iter().map(|p| p.amount).sum(); + let account_name = &batch.account_name; + + println!( + "INFO: Batch {}: Locking funds via API. Account: '{}', Total Amount: {}", + batch_id, account_name, total_amount + ); + + let lock_funds_request = LockFundsRequest { + amount: total_amount, + idempotency_key: Some(Some(batch.pr_idempotency_key.clone())), + ..Default::default() + }; + + let locked_funds = match accounts_api::api_lock_funds(client_config, account_name, lock_funds_request).await { + Ok(response) => { + println!( + "INFO: Batch {}: Funds locked successfully. Received {} inputs (UTXOs).", + batch_id, + response.utxos.len() + ); + response + }, + Err(ApiError::ResponseError(response_content)) => { + return Err(anyhow!( + "PR API returned unexpected status: {} - {}", + response_content.status, + response_content.content + )); + }, + Err(e) => return Err(anyhow!("Network error calling PR API: {:?}", e)), + }; + + println!("INFO: Batch {}: Preparing local offline signer...", batch_id); + + let first_recipient = &associated_payments[0]; + let account = accounts + .get(&first_recipient.account_name.to_lowercase()) + .ok_or_else(|| { + anyhow!( + "Account '{}' not found in local configuration", + first_recipient.account_name + ) + })?; + + let view_wallet = ViewWallet::new(account.public_spend_key.clone(), account.view_key.clone(), None); + let key_manager = KeyManager::new(WalletType::ViewWallet(view_wallet)).context("Failed to create KeyManager")?; + + let consensus_constants = ConsensusConstantsBuilder::new(network).build(); + let mut tx_builder = TransactionBuilder::new(consensus_constants, key_manager.clone(), network) + .context("Failed to create TransactionBuilder")?; + + tx_builder.with_fee_per_gram(MicroMinotari(5)); + + for utxo_value in &locked_funds.utxos { + let utxo: WalletOutput = + serde_json::from_value(utxo_value.clone()).map_err(|e| anyhow!("Failed to deserialize utxo: {}", e))?; + tx_builder + .with_input(utxo) + .context("Failed to add input to transaction")?; + } + + let output_features = OutputFeatures::default(); + let tx_id = TxId::new_random(); + let recipients: Vec = associated_payments + .iter() + .map(|p| -> Result { + let payment_id = match &p.payment_id { + Some(s) => MemoField::new_open_from_string(s, TxType::PaymentToOther) + .map_err(|e| anyhow!(e)) + .context("Failed to create payment ID memo")?, + None => MemoField::new_empty(), + }; + + let recipient_address = TariAddress::from_base58(&p.recipient_address) + .map_err(|e| anyhow!(e.to_string())) + .context("Invalid recipient address")?; + + Ok(PaymentRecipient { + amount: MicroMinotari(p.amount as u64), + output_features: output_features.clone(), + address: recipient_address, + payment_id, + }) + }) + .collect::, anyhow::Error>>()?; + + println!("INFO: Batch {}: Building transaction outputs...", batch_id); + + let payment_id = MemoField::new_empty(); + let result = + prepare_one_sided_transaction_for_signing(tx_id, tx_builder, &recipients, payment_id, account.address.clone()) + .context("Failed to prepare one-sided transaction")?; + + let response_text = serde_json::to_string(&result).context("Failed to serialize transaction result")?; + + PaymentBatch::update_to_awaiting_signature(conn, batch_id, &response_text) + .await + .context("Failed to update batch status to AwaitingSignature")?; + + println!( + "INFO: Batch {}: Unsigned transaction created. Status updated to 'AwaitingSignature'.", + batch_id + ); + + Ok(()) +} From 2f0fcd9e23c566b37554ebe767d6b8ae3953ed05 Mon Sep 17 00:00:00 2001 From: Martins Erts Date: Thu, 27 Nov 2025 16:03:28 +0200 Subject: [PATCH 2/3] Make confirmations configurable --- .env.sample | 1 + README.md | 2 ++ minotari_payment_processor/src/config.rs | 3 ++ minotari_payment_processor/src/main.rs | 1 + .../src/workers/confirmation_checker.rs | 32 +++++++++++++------ 5 files changed, 29 insertions(+), 10 deletions(-) diff --git a/.env.sample b/.env.sample index 3f2a0ec..f787602 100644 --- a/.env.sample +++ b/.env.sample @@ -11,6 +11,7 @@ UNSIGNED_TX_CREATOR_SLEEP_SECS="15" TRANSACTION_SIGNER_SLEEP_SECS="10" BROADCASTER_SLEEP_SECS="15" CONFIRMATION_CHECKER_SLEEP_SECS="60" +CONFIRMATION_CHECKER_REQUIRED_CONFIRMATIONS="10" TARI_NETWORK=Esmeralda ACCOUNTS__DEFAULT__NAME="default" diff --git a/README.md b/README.md index cc70ffb..1879bac 100644 --- a/README.md +++ b/README.md @@ -70,6 +70,8 @@ Because the application uses structured configuration, hierarchical settings (li * Example: `LISTEN_IP="0.0.0.0"` * **`LISTEN_PORT`** (Optional): The port the HTTP API server will listen on. Defaults to `9145`. * Example: `LISTEN_PORT="9145"` +* **`CONFIRMATION_CHECKER_REQUIRED_CONFIRMATIONS`** (Optional): The number of confirmations required before a transaction is considered final. Defaults to `10`. + * Example: `CONFIRMATION_CHECKER_REQUIRED_CONFIRMATIONS="10"` ### Account Configuration diff --git a/minotari_payment_processor/src/config.rs b/minotari_payment_processor/src/config.rs index c2c181f..45b7cb4 100644 --- a/minotari_payment_processor/src/config.rs +++ b/minotari_payment_processor/src/config.rs @@ -38,6 +38,7 @@ pub struct PaymentProcessorEnv { pub transaction_signer_sleep_secs: Option, pub broadcaster_sleep_secs: Option, pub confirmation_checker_sleep_secs: Option, + pub confirmation_checker_required_confirmations: Option, pub accounts: HashMap, } @@ -67,6 +68,7 @@ struct RawSettings { transaction_signer_sleep_secs: Option, broadcaster_sleep_secs: Option, confirmation_checker_sleep_secs: Option, + confirmation_checker_required_confirmations: Option, #[serde(default)] accounts: HashMap, } @@ -147,6 +149,7 @@ impl TryFrom for PaymentProcessorEnv { transaction_signer_sleep_secs: raw.transaction_signer_sleep_secs, broadcaster_sleep_secs: raw.broadcaster_sleep_secs, confirmation_checker_sleep_secs: raw.confirmation_checker_sleep_secs, + confirmation_checker_required_confirmations: raw.confirmation_checker_required_confirmations, accounts, }) } diff --git a/minotari_payment_processor/src/main.rs b/minotari_payment_processor/src/main.rs index 58e9e0c..b40e2be 100644 --- a/minotari_payment_processor/src/main.rs +++ b/minotari_payment_processor/src/main.rs @@ -54,6 +54,7 @@ async fn main() -> anyhow::Result<()> { db_pool.clone(), base_node_client.clone(), env.confirmation_checker_sleep_secs, + env.confirmation_checker_required_confirmations.unwrap_or(10), )); println!("Minotari Payment Processor started. Press Ctrl+C to shut down."); diff --git a/minotari_payment_processor/src/workers/confirmation_checker.rs b/minotari_payment_processor/src/workers/confirmation_checker.rs index 08ad22c..c0554b7 100644 --- a/minotari_payment_processor/src/workers/confirmation_checker.rs +++ b/minotari_payment_processor/src/workers/confirmation_checker.rs @@ -11,26 +11,29 @@ use crate::db::payment::Payment; use crate::db::payment_batch::{PaymentBatch, PaymentBatchStatus}; const DEFAULT_SLEEP_SECS: u64 = 60; -const REQUIRED_CONFIRMATIONS: u64 = 10; -pub async fn run(db_pool: SqlitePool, base_node_client: Client, sleep_secs: Option) { +pub async fn run(db_pool: SqlitePool, base_node_client: Client, sleep_secs: Option, required_confirmations: u64) { let sleep_secs = sleep_secs.unwrap_or(DEFAULT_SLEEP_SECS); println!( "Confirmation Checker worker started. Polling every {} seconds. Required Confirmations: {}", - sleep_secs, REQUIRED_CONFIRMATIONS + sleep_secs, required_confirmations ); let mut interval = time::interval(Duration::from_secs(sleep_secs)); loop { interval.tick().await; - if let Err(e) = check_transaction_confirmations(&db_pool, &base_node_client).await { + if let Err(e) = check_transaction_confirmations(&db_pool, &base_node_client, required_confirmations).await { eprintln!("Confirmation Checker worker error: {:?}", e); } } } -async fn check_transaction_confirmations(db_pool: &SqlitePool, base_node_client: &Client) -> Result<(), anyhow::Error> { +async fn check_transaction_confirmations( + db_pool: &SqlitePool, + base_node_client: &Client, + required_confirmations: u64, +) -> Result<(), anyhow::Error> { let mut conn = db_pool.acquire().await?; let batches = PaymentBatch::find_by_status(&mut conn, PaymentBatchStatus::AwaitingConfirmation).await?; @@ -40,7 +43,7 @@ async fn check_transaction_confirmations(db_pool: &SqlitePool, base_node_client: } for batch in batches { - if let Err(e) = process_single_batch(db_pool, base_node_client, &batch).await { + if let Err(e) = process_single_batch(db_pool, base_node_client, &batch, required_confirmations).await { let error_message = e.to_string(); eprintln!( "Error checking confirmation for batch {}: {}. Incrementing retry count.", @@ -63,6 +66,7 @@ async fn process_single_batch( db_pool: &SqlitePool, base_node_client: &Client, batch: &PaymentBatch, + required_confirmations: u64, ) -> Result<(), anyhow::Error> { let batch_id = &batch.id; @@ -103,7 +107,14 @@ async fn process_single_batch( "INFO: Batch {}: Location 'Mined'. Processing confirmations...", batch_id ); - handle_mined_transaction(db_pool, base_node_client, batch_id, &tx_query_response).await? + handle_mined_transaction( + db_pool, + base_node_client, + batch_id, + &tx_query_response, + required_confirmations, + ) + .await? }, TxLocation::InMempool => { println!("INFO: Batch {} is currently in the mempool, awaiting mining.", batch_id); @@ -128,6 +139,7 @@ async fn handle_mined_transaction( base_node_client: &Client, batch_id: &str, tx_query_response: &tari_transaction_components::rpc::models::TxQueryResponse, + required_confirmations: u64, ) -> Result<(), anyhow::Error> { let mined_height = tx_query_response .mined_height @@ -147,10 +159,10 @@ async fn handle_mined_transaction( println!( "INFO: Batch {}: Mined Height: {}, Tip Height: {}, Confirmations: {}/{}", - batch_id, mined_height, best_block_height, confirmations, REQUIRED_CONFIRMATIONS + batch_id, mined_height, best_block_height, confirmations, required_confirmations ); - if confirmations >= REQUIRED_CONFIRMATIONS { + if confirmations >= required_confirmations { println!( "INFO: Batch {}: Confirmation threshold reached. Finalizing...", batch_id @@ -192,7 +204,7 @@ async fn handle_mined_transaction( } else { println!( "INFO: Batch {} awaiting more confirmations. (Current: {}, Required: {})", - batch_id, confirmations, REQUIRED_CONFIRMATIONS + batch_id, confirmations, required_confirmations ); } From 8c0a3b9e3cadcfd3f5564d0f2d3b06feffaad389 Mon Sep 17 00:00:00 2001 From: SW van Heerden Date: Fri, 28 Nov 2025 12:03:12 +0200 Subject: [PATCH 3/3] Update minotari_payment_processor/src/workers/unsigned_tx_creator.rs --- minotari_payment_processor/src/workers/unsigned_tx_creator.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/minotari_payment_processor/src/workers/unsigned_tx_creator.rs b/minotari_payment_processor/src/workers/unsigned_tx_creator.rs index 7041995..245442c 100644 --- a/minotari_payment_processor/src/workers/unsigned_tx_creator.rs +++ b/minotari_payment_processor/src/workers/unsigned_tx_creator.rs @@ -15,7 +15,6 @@ use tari_transaction_components::key_manager::{ use tari_transaction_components::offline_signing::{PaymentRecipient, prepare_one_sided_transaction_for_signing}; use tari_transaction_components::{ TransactionBuilder, - // offline_signing::offline_signer::OfflineSigner, tari_amount::MicroMinotari, transaction_components::{MemoField, OutputFeatures, WalletOutput, memo_field::TxType}, };