From da3aa73c8f1ceb28deaca8a8dc17496717245b06 Mon Sep 17 00:00:00 2001 From: obaraelijah Date: Sun, 28 Jul 2024 17:07:00 +0300 Subject: [PATCH] feat: Add scripting support --- scripts/semaphore.x9 | 15 ++++++++ src/scripting.rs | 86 +++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 100 insertions(+), 1 deletion(-) create mode 100644 scripts/semaphore.x9 diff --git a/scripts/semaphore.x9 b/scripts/semaphore.x9 new file mode 100644 index 0000000..9b90791 --- /dev/null +++ b/scripts/semaphore.x9 @@ -0,0 +1,15 @@ +(def-redis-fn sum-list + (li) + (apply + (map int (redis "lrange" li 0 -1)))) + +(defn any? + (p l) + (reduce (fn (a b) (or a b)) (map p l))) + +(def-redis-fn is-prime? + (key) + (bind + (n (int (redis "get" key))) + (not + (any? (fn (x) (= 0 (% n x))) + (range 2 n))))) \ No newline at end of file diff --git a/src/scripting.rs b/src/scripting.rs index 247c288..44be60c 100644 --- a/src/scripting.rs +++ b/src/scripting.rs @@ -1,12 +1,19 @@ use crate::server::process_command; -use crate::types::RedisValueRef; use num_traits::cast::ToPrimitive; use std::{error::Error, sync::Arc}; use tokio::sync::mpsc::{Receiver, Sender}; +use crate::startup::Config; +use crate::types::DumpFile; +use crate::types::RedisValueRef; +use crate::{logger::LOGGER, types::StateStoreRef}; use x9::ast::Expr; use x9::ffi::{ForeignData, IntoX9Function, Variadic, X9Interpreter}; +fn bytes_to_string(s: &[u8]) -> String { + String::from_utf8_lossy(s).to_string() +} + struct FFIError { reason: String, } @@ -78,3 +85,80 @@ impl ForeignData for RedisValueRef { Ok(res) } } +#[derive(Debug)] +pub enum Program { + String(String), + Function(String, Vec), +} + +pub struct ScriptingEngine { + interpreter: X9Interpreter, + #[allow(clippy::type_complexity)] + prog_revc: Receiver<( + Program, + OneShotSender>>, + )>, + // prog_send: Sender>>, + cmd_send: Arc, OneShotSender)>>, +} + +impl ScriptingEngine { + pub fn new( + prog_revc: Receiver<( + Program, + OneShotSender>>, + )>, + cmd_send: Sender<(Vec, OneShotSender)>, + state_store: StateStoreRef, + opts: &Config, + ) -> Result> { + let res = Self { + interpreter: X9Interpreter::new(), + prog_revc, + cmd_send: Arc::new(cmd_send), + }; + res.setup_interpreter(state_store); + res.load_scripts_dir(opts)?; + Ok(res) + } + + fn load_scripts_dir(&self, opts: &Config) -> Result<(), Box> { + if let Some(path) = &opts.scripts_dir { + info!(LOGGER, "Loading scripts in {:?}", path); + self.interpreter.load_lib_dir(path) + } else { + Ok(()) + } + } + + fn add_redis_fn(&self) { + let send_clone = self.cmd_send.clone(); + let send_fn = move |args: Variadic| { + let args = args.into_vec(); + let (sx, mut rx) = oneshot_channel(); + if let Err(e) = send_clone.blocking_send((args, sx)) { + return Err(FFIError::boxed(format!( + "redis-proto failed to send the command: {}", + e + ))); + } + loop { + match rx.try_recv() { + Ok(ret_value) => return Ok(ret_value), + Err(TryRecvError::Empty) => continue, + Err(TryRecvError::Closed) => { + return Err(FFIError::boxed( + "redix-proto failed to return a value!".into(), + )) + } + } + } + }; + self.interpreter.add_function("redis", send_fn.to_x9_fn()); + } + + fn setup_interpreter(&self, state_store: StateStoreRef) { + // "redis" + self.add_redis_fn(); + } +}