Skip to content

Commit

Permalink
add cron job
Browse files Browse the repository at this point in the history
  • Loading branch information
Alw3ys committed Dec 13, 2023
1 parent 9dbf5bd commit 0968ab7
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 12 deletions.
13 changes: 9 additions & 4 deletions dosei/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,24 @@ mod cron;
use sqlx::postgres::Postgres;
use sqlx::Pool;
use std::env;
use std::sync::Arc;

use crate::config::Config;
use axum::{routing, Router};
use axum::{routing, Extension, Router};
use log::info;

pub async fn start_server(config: &'static Config) {
let pool = Pool::<Postgres>::connect(&env::var("DATABASE_URL").unwrap())
.await
.unwrap();
cluster::start_node(config);
cron::start_job_manager(config, &pool);
let shared_pool = Arc::new(pool);
info!("Successfully connected to Postgres");
let app = Router::new().route("/", routing::get(move || cron::get_cron_jobs(pool.clone())));
cluster::start_node(config);
cron::start_job_manager(config, Arc::clone(&shared_pool));
let app = Router::new()
.route("/cron-jobs", routing::post(cron::api_create_job))
.route("/cron-jobs", routing::get(cron::api_get_cron_jobs))
.layer(Extension(Arc::clone(&shared_pool)));
let address = config.address.to_string();
info!("Dosei running on http://{} (Press CTRL+C to quit", address);
axum::Server::bind(&address.parse().unwrap())
Expand Down
60 changes: 52 additions & 8 deletions dosei/src/server/cron.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,24 @@
use crate::config;
use crate::config::Config;
use crate::schema::CronJob;
use axum::Json;
use axum::{Extension, Json};
use bollard::container::CreateContainerOptions;
use bollard::Docker;
use chrono::Utc;
use cron::Schedule;
use dosei_proto::{node_info, ProtoChannel};
use log::info;
use prost::Message;
use serde::Deserialize;
use sqlx::{Pool, Postgres};
use std::error::Error;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
use tokio::time::sleep;
use uuid::Uuid;

async fn update_status(config: &'static Config) -> Result<(), Box<dyn Error>> {
let node_info = node_info::NodeInfo {
Expand All @@ -40,9 +43,52 @@ async fn update_status(config: &'static Config) -> Result<(), Box<dyn Error>> {
Ok(())
}

pub async fn get_cron_jobs(pool: Pool<Postgres>) -> Json<Vec<CronJob>> {
#[derive(Deserialize)]
pub struct CreateJobBody {
schedule: String,
entrypoint: String,
deployment_id: Uuid,
owner_id: Uuid,
}

pub async fn api_create_job(
pool: Extension<Arc<Pool<Postgres>>>,
Json(body): Json<CreateJobBody>,
) -> Json<CronJob> {
let cron_job = CronJob {
uuid: Uuid::new_v4(),
schedule: body.schedule,
entrypoint: body.entrypoint,
owner_id: body.owner_id,
deployment_id: body.deployment_id,
updated_at: Default::default(),
created_at: Default::default(),
};
let rec = sqlx::query_as!(
CronJob,
r#"
INSERT INTO cron_jobs (uuid, schedule, entrypoint, owner_id, deployment_id, updated_at, created_at)
VALUES ($1, $2, $3, $4, $5, $6, $7)
RETURNING *
"#,
cron_job.uuid,
cron_job.schedule,
cron_job.entrypoint,
cron_job.owner_id,
cron_job.deployment_id,
cron_job.updated_at,
cron_job.created_at
).fetch_one(&**pool).await.unwrap();
Json(rec)
}

pub async fn api_get_cron_jobs(pool: Extension<Arc<Pool<Postgres>>>) -> Json<Vec<CronJob>> {
get_cron_jobs(pool.0).await
}

async fn get_cron_jobs(pool: Arc<Pool<Postgres>>) -> Json<Vec<CronJob>> {
let recs = sqlx::query_as!(CronJob, "SELECT * from cron_jobs")
.fetch_all(&pool)
.fetch_all(&*pool)
.await
.unwrap();
Json(recs)
Expand Down Expand Up @@ -71,7 +117,7 @@ async fn run_job(cron_job: CronJob) {
.unwrap();
}

async fn run_jobs(pool: Pool<Postgres>) {
async fn run_jobs(pool: Arc<Pool<Postgres>>) {
let cron_jobs = get_cron_jobs(pool).await;
let now = Utc::now();
for job in cron_jobs.0 {
Expand All @@ -93,7 +139,7 @@ async fn run_jobs(pool: Pool<Postgres>) {
}
}

pub fn start_job_manager(config: &'static Config, pool: &Pool<Postgres>) {
pub fn start_job_manager(config: &'static Config, pool: Arc<Pool<Postgres>>) {
tokio::spawn(async move {
loop {
sleep(Duration::from_secs(1)).await;
Expand All @@ -102,11 +148,9 @@ pub fn start_job_manager(config: &'static Config, pool: &Pool<Postgres>) {
}
}
});
let pool = pool.clone();
tokio::spawn(async move {
loop {
let pool = pool.clone();
run_jobs(pool).await;
run_jobs(Arc::clone(&pool)).await;
sleep(Duration::from_secs(60)).await;
}
});
Expand Down
Binary file added plugins/example/__pycache__/__init__.cpython-311.pyc
Binary file not shown.
Binary file added plugins/example/__pycache__/main.cpython-311.pyc
Binary file not shown.

0 comments on commit 0968ab7

Please sign in to comment.