diff --git a/.DS_Store b/.DS_Store
index 9d67666..dce042e 100755
Binary files a/.DS_Store and b/.DS_Store differ
diff --git a/.github/workflows/ui.yml b/.github/workflows/ui.yml
index 9c78525..81123be 100644
--- a/.github/workflows/ui.yml
+++ b/.github/workflows/ui.yml
@@ -7,21 +7,76 @@ on:
     branches: [main]
 
 jobs:
-  build:
+  setup:
     runs-on: ubuntu-latest
     steps:
-      - name: Deploy & Build
+      # Step 1: Install NVM and Node.js on the remote server
+      - name: Install NVM and Node.js
+        uses: appleboy/ssh-action@master
+        with:
+          host: ${{ secrets.SERVER_HOST }}
+          username: ${{ secrets.SERVER_USER }}
+          password: ${{ secrets.SERVER_PASSWORD }}
+          port: 22
+          script: |
+            # Install NVM
+            curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/v0.40.1/install.sh | bash
+            export NVM_DIR="$HOME/.nvm"
+            [ -s "$NVM_DIR/nvm.sh" ] && \. "$NVM_DIR/nvm.sh" # Load nvm
+            [ -s "$NVM_DIR/bash_completion" ] && \. "$NVM_DIR/bash_completion" # Load nvm bash_completion
+
+            # Install Node.js using NVM
+            nvm install --lts
+            nvm use --lts
+            node -v # Check the installed version of Node.js
+            npm -v  # Check the installed version of npm
+
+      # Step 2: Clone the repository via SSH and navigate to the correct directory
+      - name: Deploy & Clone Repository
         uses: appleboy/ssh-action@master
         with:
           host: ${{ secrets.SERVER_HOST }}
           username: ${{ secrets.SERVER_USER }}
           password: ${{ secrets.SERVER_PASSWORD }}
           port: 22
-        script: | # cd SomeUiRepo directly cause we're already in root
-            if [ ! -d "/root/ui" ]; then
-            git clone https://wildonion:${{ secrets.ACCESS_TOKEN }}@github.com/wildonion/SomeUiRepo.git
+          script: |
+            if [ ! -d "/root/GOPAT" ]; then
+              # clone into /root/GOPAT explicitly so the directory check above and the cd below both match
+              git clone https://wildonion:${{ secrets.ACCESS_TOKEN }}@github.com/wildonion/SomeUiRepo.git /root/GOPAT
             fi
-            cd SomeUiRepo
+            cd /root/GOPAT
+            git stash
             git pull origin main
-            npm install
+            pwd
+
+      # Step 3: Run npm install to set up the dependencies
+      - name: Install NPM Dependencies
+        uses: appleboy/ssh-action@master
+        with:
+          host: ${{ secrets.SERVER_HOST }}
+          username: ${{ secrets.SERVER_USER }}
+          password: ${{ secrets.SERVER_PASSWORD }}
+          port: 22
+          script: |
+            export NVM_DIR="$HOME/.nvm"
+            [ -s "$NVM_DIR/nvm.sh" ] && \. "$NVM_DIR/nvm.sh" # Load nvm
+            nvm use --lts
+            cd /root/GOPAT
+            npm install # Install npm dependencies
+
+  build:
+    runs-on: ubuntu-latest
+    needs: setup
+    steps:
+      # Step 4: Run npm build after dependencies are installed
+      - name: Build Project
+        uses: appleboy/ssh-action@master
+        with:
+          host: ${{ secrets.SERVER_HOST }}
+          username: ${{ secrets.SERVER_USER }}
+          password: ${{ secrets.SERVER_PASSWORD }}
+          port: 22
+          script: |
+            export NVM_DIR="$HOME/.nvm"
+            [ -s "$NVM_DIR/nvm.sh" ] && \. "$NVM_DIR/nvm.sh" # Load nvm
+            nvm use --lts
+            cd /root/GOPAT
             npm run build
\ No newline at end of file
diff --git a/README.md b/README.md
index c91c786..f2b0369 100644
--- a/README.md
+++ b/README.md
@@ -93,7 +93,7 @@ i'm hoopoe, a realtime social event platform allows your hoop get heard!
 ```bash
 # -----------------------
 # ---- read/write access
-sudo chmod -R 777 .
+sudo chmod -R 777 . && sudo chmod +x /root/
 ```
 
 #### step0) install necessary packages on Linux:
diff --git a/src/server/mod.rs b/src/server/mod.rs
index 459cd83..175633e 100644
--- a/src/server/mod.rs
+++ b/src/server/mod.rs
@@ -347,7 +347,7 @@ impl HoopoeWsServerActor{
             // thread of tokio runtime.
             scheduler::runInterval(move || async move{
                 log::info!("websocket session for user#{} is alive", user_id);
-            }, 10).await;
+            }, 10, 0).await; // 0 retries means keep ticking forever
 
             /* -------------------------
                 can't move pointers that don't live long enough into
diff --git a/src/tests/tx.rs b/src/tests/tx.rs
index beba12f..14b744c 100644
--- a/src/tests/tx.rs
+++ b/src/tests/tx.rs
@@ -44,6 +44,9 @@ pub static WALLET: Lazy
     publish to rmq
+    receive tx in txpool -> commit tx -> execute tx -> update records -> record in treasury
+    all or none, to avoid double spending: sending the same amount to two destinations
+    while getting charged only once
     once a tx object is made publish it to the rmq exchange so consumer can consume it
     for committing and executing all tx objects finally
diff --git a/src/workers/notif/mod.rs b/src/workers/notif/mod.rs
index 10ea97e..194e065 100644
--- a/src/workers/notif/mod.rs
+++ b/src/workers/notif/mod.rs
@@ -17,6 +17,17 @@
     through producing messages to the broker. we can send either a producing or a consuming
     message to this actor to start producing or consuming in the background.
 
+    how does capnp rpc work?
+    in classic RPC every method call costs a network round trip; capnp packs all the
+    calls together into a single round trip using its promise pipelining feature.
+    every call returns a promise (a future object) that acts as a placeholder for
+    the call's result, so we can chain further calls on results that haven't even
+    arrived back from the server yet; a chain like `foo().bar().end()` therefore
+    takes only one round trip. once we await the final promise, the results of all
+    the pipelined calls come back together in that single round trip.
+
     ************************************************************************************
     it's notable that for realtime push notif streaming we MUST start consuming from
     the specified broker passed in to the message structure when talking with actor, in
@@ -112,7 +123,7 @@ use crate::interfaces::crypter::Crypter;
                                                        ----------> partition-key1 queue(m1, m2, m3, ...) - all messages with key1
      ---------                      ------------       |
     |consumer1| <-----consume----->|Kafka Broker|<-----topic-----> partition-key3 queue(m1, m2, m3, ...) - all messages with key3
-     ---------                      ------------       |
+     ---------                      ------------       |               |
          |_______partition1&2______________|           |
          |----------> partition-key2 queue(m1, m2, m3, ...) - all messages with key2
      ---------         |                               |
     |consumer2|        |                               |
          ----------> partition-key4 queue(m1, m2, m3, ...) - all messages with key4
@@ -153,11 +164,19 @@ use crate::interfaces::crypter::Crypter;
 ========================================================================================
 */
 
+#[derive(Message, Clone, Serialize, Deserialize, Debug, Default, ToSchema)]
+#[rtype(result = "()")]
+pub struct SendRpcMessage{ // used to send an rpc request through an rmq queue; good for direct actor communication over rpc backed by rmq
+    pub reqQueue: String,
+    pub repQueue: String,
+    pub payload: String, // json string maybe! we'll convert it to u8 bytes eventually
+}
+
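A quick sketch of how this message type might be fed to the actor (hedged: `send_rpc_over_rmq`, the queue names, and the payload are hypothetical, and it assumes the actor side implements a `Handler<SendRpcMessage>`):

```rust
use actix::prelude::*;

// hypothetical helper: ask a running NotifBrokerActor to fire an rpc request
// over rmq; the queue names and payload below are made up for this sketch
async fn send_rpc_over_rmq(notif_actor: Addr<NotifBrokerActor>){
    let msg = SendRpcMessage{
        reqQueue: "rpc.requests".to_string(), // hypothetical request queue
        repQueue: "rpc.replies".to_string(),  // hypothetical reply queue
        payload: r#"{"method":"ping"}"#.to_string(), // json string for now
    };
    notif_actor.do_send(msg); // fire-and-forget, since rtype = "()"
}
```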
 #[derive(Message, Clone, Serialize, Deserialize, Debug, Default, ToSchema)]
 #[rtype(result = "()")]
 pub struct PublishNotifToKafka{
     pub topic: String,
-    pub brokers: String,
+    pub brokers: String, // kafka servers separated by comma
     pub partitions: u64,
     pub headers: Vec,
     pub local_spawn: bool,
@@ -176,7 +195,7 @@ pub struct KafkaHeader{
 pub struct ConsumeNotifFromKafka{ // we must build a unique consumer per each consuming process
     pub topics: Vec,
     pub consumerGroupId: String,
-    pub brokers: String,
+    pub brokers: String, // kafka servers separated by comma
     pub redis_cache_exp: u64,
     pub decryptionConfig: Option
 }
@@ -295,7 +314,7 @@ pub struct ConsumeNotifFromRmq{ // we'll create a channel then start consuming b
 
 #[derive(Clone)]
 pub struct NotifBrokerActor{
-    pub notif_broker_sender: tokio::sync::mpsc::Sender, // use to send notif data to mpsc channel for ws
+    pub notif_broker_ws_sender: tokio::sync::mpsc::Sender, // used to send notif data over the mpsc channel to the ws server
     pub app_storage: std::option::Option>, // REQUIRED: communicating with third party storage
     pub notif_mutator_actor: Addr, // REQUIRED: communicating with mutator actor to write into redis and db
     pub zerlog_producer_actor: Addr // REQUIRED: send any error log to the zerlog rmq queue
@@ -307,13 +326,16 @@ impl Actor for NotifBrokerActor{
 
     fn started(&mut self, ctx: &mut Self::Context) {
-        log::info!("🎬 NotifBrokerActor has started");
+        log::info!("🎬 NotifBrokerActor has started");
 
         ctx.run_interval(PING_INTERVAL, |actor, ctx|{
 
             let this = actor.clone();
             let addr = ctx.address();
 
+            let actorState = ctx.state();
+            log::info!("NotifBrokerActor state is: {:#?}", actorState);
+
             tokio::spawn(async move{
 
                 // check something constantly, schedule to be executed
@@ -325,6 +347,15 @@
             });
     }
 
+    fn stopped(&mut self, ctx: &mut Self::Context) {
+
+        // stop internal states here
+        // since the db is only wrapped in an Arc we can't mutate it; we'd need to
+        // wrap it in a Mutex too, cause Arc requires the inner data to be mutexed for mutation
+        // ...
+
+    }
+
 }
 
 impl NotifBrokerActor{
@@ -341,7 +372,7 @@
         let zerlog_producer_actor = self.clone().zerlog_producer_actor;
         // will be used to send received notif data from the broker to ws mpsc channel,
         // later we can receive the notif in ws server setup and send it to the owner
-        let notif_data_sender = self.notif_broker_sender.clone();
+        let notif_data_sender = self.notif_broker_ws_sender.clone();
 
         match redis_pool.get().await{
             Ok(mut redis_conn) => {
@@ -372,8 +403,9 @@
                 } else{
                     dataString
                 };
-
-                // we should keep sending until a consumer receive the data!
+
+                // we should keep sending until a consumer receives the data!
+                // do this constantly in a background task
                 tokio::spawn(async move{
                     let mut int = tokio::time::interval(tokio::time::Duration::from_secs(1));
                     loop{
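The hunk above keeps re-publishing until someone is listening. A minimal sketch of that pattern, assuming the `redis` crate's async connection (the function and parameter names here are made up): `PUBLISH` returns the number of subscribers that received the message, so we can stop once it's non-zero.

```rust
use redis::AsyncCommands;

// minimal sketch of "keep sending until a consumer receives the data":
// tick every second and re-publish until PUBLISH reports at least one receiver
async fn publish_until_received(
    mut conn: redis::aio::MultiplexedConnection,
    channel: String,
    payload: String,
){
    let mut int = tokio::time::interval(tokio::time::Duration::from_secs(1));
    loop{
        int.tick().await;
        // PUBLISH returns how many subscribers got the message
        let receivers: i32 = conn.publish(&channel, &payload).await.unwrap_or(0);
        if receivers > 0{
            break; // a consumer received the data, stop re-sending
        }
    }
}
```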
@@ -419,7 +451,7 @@
         let zerlog_producer_actor = self.clone().zerlog_producer_actor;
         // will be used to send received notif data from the broker to ws mpsc channel,
         // later we can receive the notif in ws server setup and send it to the owner
-        let notif_data_sender = self.notif_broker_sender.clone();
+        let notif_data_sender = self.notif_broker_ws_sender.clone();
 
         // first things first: check that redis is up!
         match redis_pool.get().await{
@@ -731,7 +763,7 @@
         let zerlog_producer_actor = self.clone().zerlog_producer_actor;
         // will be used to send received notif data from the broker to ws mpsc channel,
         // later we can receive the notif in ws server setup and send it to the owner
-        let notif_data_sender = self.notif_broker_sender.clone();
+        let notif_data_sender = self.notif_broker_ws_sender.clone();
 
         // since redis is important, we can't move forward without it
         match redis_pool.get().await{
@@ -920,7 +952,7 @@
         let zerlog_producer_actor = self.clone().zerlog_producer_actor;
         // will be used to send received notif data from the broker to ws mpsc channel,
         // later we can receive the notif in ws server setup and send it to the owner
-        let notif_data_sender = self.notif_broker_sender.clone();
+        let notif_data_sender = self.notif_broker_ws_sender.clone();
 
         // since redis is important, we can't move forward without it
         match redis_pool.get().await{
@@ -1272,7 +1304,7 @@
         let redis_pool = storage.unwrap().get_redis_pool().await.unwrap();
         let notif_mutator_actor = self.notif_mutator_actor.clone();
         let zerlog_producer_actor = self.clone().zerlog_producer_actor;
-        let notif_data_sender = self.notif_broker_sender.clone();
+        let notif_data_sender = self.notif_broker_ws_sender.clone();
 
         match rmq_pool.get().await{
             Ok(conn) => {
@@ -1902,8 +1934,8 @@
     pub fn new(app_storage: std::option::Option>,
         notif_mutator_actor: Addr,
         zerlog_producer_actor: Addr,
-        notif_broker_sender: tokio::sync::mpsc::Sender) -> Self{
-        Self { app_storage, notif_mutator_actor, zerlog_producer_actor, notif_broker_sender }
+        notif_broker_ws_sender: tokio::sync::mpsc::Sender) -> Self{
+        Self { app_storage, notif_mutator_actor, zerlog_producer_actor, notif_broker_ws_sender }
     }
 }
diff --git a/src/workers/scheduler/mod.rs b/src/workers/scheduler/mod.rs
index 15fd00a..625675d 100644
--- a/src/workers/scheduler/mod.rs
+++ b/src/workers/scheduler/mod.rs
@@ -36,7 +36,7 @@ pub async fn runInterval1(task: std::sync::Arc R + Send + S
 }
 
 // task is a closure that returns a future object
-pub async fn runInterval<M, R>(task: M, period: u64)
+pub async fn runInterval<M, R>(task: M, period: u64, retries: u8)
     where M: Fn() -> R + Send + Sync + 'static,
     R: std::future::Future + Send + Sync + 'static,
 {
@@ -44,7 +44,15 @@ pub async fn runInterval(task: M, period: u64)
     let mut int = tokio::time::interval(tokio::time::Duration::from_secs(period));
     int.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
 
+    let mut attempts = 0u8;
     loop{
+        // retries == 0 means tick forever (e.g. the ws keepalive in server/mod.rs)
+        if retries != 0{
+            if attempts >= retries{
+                break; // the task already ran `retries` times
+            }
+            attempts += 1;
+        }
         int.tick().await;
         task().await;
     }
 }
@@ -126,6 +134,7 @@ impl CronScheduler{
             let cloned_task = task.clone();
             loop{
                 interval.tick().await;
+                println!("ticking...");
                 tokio::spawn(cloned_task()); // the closure however returns a future
             }
         });
@@ -143,7 +152,7 @@ impl CronScheduler{
         tokio::spawn(async move{
             runInterval(|| async move{
                 println!("i'm executing intervally in the background thread ...");
-            }, period)
+            }, period, 10)
             .await;
         });
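For clarity, a minimal usage sketch of the updated `runInterval` signature (the heartbeat task is hypothetical; with the semantics above, passing `0` retries means run forever):

```rust
// usage sketch: assumes this lives next to runInterval in the scheduler module
#[tokio::main]
async fn main(){
    // run the task every 2 seconds, at most 3 times, then stop
    runInterval(|| async move{
        println!("heartbeat");
    }, 2, 3).await;

    // passing 0 would keep the task ticking forever:
    // runInterval(|| async move{ println!("keepalive"); }, 10, 0).await;
}
```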