From 0ac07dd5a10cd547adb8d0a9579102ed6b86a1e2 Mon Sep 17 00:00:00 2001 From: Alex Williams Date: Mon, 22 Jun 2020 01:10:18 +0000 Subject: [PATCH] Add persistence and some new commands: * Stores commands in an AOF using WAL method * Saves and restores a DB from disk * Replays the AOF after restoring the DB * Add 'SAVE' and 'BGSAVE' commands * Add 'CLIENT' command * Add 'PING' command * Add 'EXISTS' command * Add 'GETSET' command * Add 'CONVERT' command * Remove 'LOLWUT' command * Run BGSAVE as a scheduled task * Add statistics for 'persistence' * Add option to save the DB in binary format * Add and fix unit tests * Add timestamp to output and cleanup error messages etc * Don't require IDENT to be sent by the client during auth * Fix problem where messages were lost, abort a client connection that takes too long (60s) * Major optimization, RPUSH with multiple elements is O(N) instead of O(2^N) * Other minor fixes and code cleanup --- .gitignore | 5 ++ CHANGELOG.md | 24 +++++++ README.md | 185 +++++++++++++++++++++++++++++++++++++++++-------- child.l | 54 +++++++++++++++ client.l | 41 ++++++----- commands.l | 106 ++++++++++++++++++++++------ libkv.l | 176 ++++++++-------------------------------------- libkvclient.l | 58 +++++++--------- lolwut.l | 33 --------- persistence.l | 177 ++++++++++++++++++++++++++++++++++++++++++++++ server.l | 18 +++-- sibling.l | 101 +++++++++++++++++++++++++++ test.l | 27 ++++++-- test/test_cs.l | 13 ++-- test/test_kv.l | 51 ++++++++++++-- 15 files changed, 768 insertions(+), 301 deletions(-) create mode 100644 child.l delete mode 100644 lolwut.l create mode 100644 persistence.l create mode 100644 sibling.l diff --git a/.gitignore b/.gitignore index 03ad577..e14d59f 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,6 @@ .modules/ +*.aof +*.db +*.lock +*.old +*.bin diff --git a/CHANGELOG.md b/CHANGELOG.md index b4c8fa8..7c10b4d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,29 @@ # Changelog +## 0.14.0 (2020-07-27) + + * Fix bug where client IP is not saved in client list + * Fix bug where client would not disconnect when sent a `kill` command + * Add persistence which writes commands in an AOF using WAL method + * Saves and restores a DB from disk + * Replays the AOF after restoring the DB + * Add 'SAVE' and 'BGSAVE' commands + * Add 'CLIENT' command + * Add 'PING' command + * Add 'EXISTS' command + * Add 'GETSET' command + * Add 'CONVERT' command + * Remove 'LOLWUT' command + * Run BGSAVE as a scheduled task + * Add statistics for 'persistence' + * Add option to save the DB in binary format + * Add and fix unit tests + * Add timestamp to output and cleanup error messages etc + * Don't require IDENT to be sent by the client during auth + * Fix problem where messages were lost, abort a client connection that takes too long (60s) + * Major optimization, RPUSH with multiple elements is O(N) instead of O(2^N) + * Other minor fixes and code cleanup + ## 0.13.0 (2020-06-27) * Fix issue where child process doesn't actually 'exit' when exiting (in a fork) diff --git a/README.md b/README.md index 4004b61..ead8be0 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # Redis-inspired key/value store written in PicoLisp -This program mimics functionality of a [Redis](https://redis.io) in-memory database, but is designed specifically for [PicoLisp](https://picolisp.com) applications without on-disk persistence. +This program mimics functionality of a [Redis](https://redis.io) in-memory database, but is designed specifically for [PicoLisp](https://picolisp.com) applications with optional on-disk persistence. The included `server.l` and `client.l` can be used to send and receive _"Redis-like"_ commands over TCP or UNIX named pipess. @@ -11,14 +11,15 @@ The included `server.l` and `client.l` can be used to send and receive _"Redis-l 3. [Usage](#usage) 4. [Note and Limitations](#notes-and-limitations) 5. [How it works](#how-it-works) - 6. [Testing](#testing) - 7. [Contributing](#contributing) - 8. [Changelog](#changelog) - 9. [License](#license) + 6. [Persistence](#persistence) + 7. [Testing](#testing) + 8. [Contributing](#contributing) + 9. [Changelog](#changelog) + 10. [License](#license) # Requirements - * PicoLisp 32-bit/64-bit `v17.12` to `v20.5.26` + * PicoLisp 32-bit/64-bit `v17.12` to `v20.6.29` * Linux or UNIX-like OS (with support for named pipes) # Getting Started @@ -47,8 +48,7 @@ That should return some interesting info about your server. See below for more e 1. Load the client library in your project: `(load "libkvclient.l")` 2. Set the server password `(setq *KV_pass "yourpass")` 3. Start the client listener with `(kv-start-client)` - 5. Optionally send your client's identity with key/value pairs `(kv-identify "location" "Tokyo" "building" "109")` - 6. Send your command and arguments with `(kv-send-data '("INFO" "server"))` + 4. Send your command and arguments with `(kv-send-data '("INFO" "server"))` Received data will be returned as-is (list, integer, string, etc). Wrap the result like: `(kv-print Result)` to send the output to `STDOUT`: @@ -59,8 +59,6 @@ Received data will be returned as-is (list, integer, string, etc). Wrap the resu -> "yourpass" : (kv-start-client) -> T -: (kv-identify "key1" "value2" "key2" "value3") --> "OK 35F2F81D" : (kv-send-data '("set" "mykey" 12345)) -> "OK" : (kv-send-data '("get" "mykey")) @@ -81,18 +79,20 @@ This section describes usage information for the CLI tools `server.l` and `clien ## Server -The server listens in the foreground for TCP connections on port `6378` by default. Only the `password`, `port`, and `verbosity` are configurable, and a `password` is required: +The server listens in the foreground for TCP connections on port `6378` by default. Only the `password`, `port`, `persistence`, and `verbosity` are configurable, and a `password` is required: ``` # server.l Usage: ./server.l --pass [options] -Example: ./server.l --pass foobared --port 6378 --verbose' +Example: ./server.l --pass foobared --port 6378 --verbose --persist 60 Options: --help Show this help message and exit +--binary Store data in binary format instead of text (default: plaintext) --pass Password used by clients to access the server (required) +--persist Number of seconds between database persists to disk (default: disabled) --port TCP listen port for communication with clients (default: 6378) --verbose Verbose flag (default: False) ``` @@ -122,12 +122,12 @@ The client handles authentication, identification, and sending of _"Redis-like"_ # client.l Usage: ./client.l --pass COMMAND [arguments] -Example: ./client.l --pass foobared --port 6378 INFO server' +Example: ./client.l --pass foobared --port 6378 INFO server Options: --help Show this help message and exit ---id Uniquely identifiable client ID (default: randomly generated) +--name Easily identifiable client name (default: randomly generated) --host Hostname or IP of the key/value server (default: localhost) --pass Password used to access the server (required) --poll Number of seconds for polling the key/value server (default: don't poll) @@ -136,23 +136,30 @@ Options: COMMAND LIST Commands are case-insensitive and don't always require arguments. Examples: - DEL key [key ..] DEL key1 key2 key3 - GET key GET key1 - INFO [section] INFO memory - LINDEX key index LINDEX mylist 0 - LLEN key LLEN mylist - LOLWUT number LOLWUT 5 - LPOP key LPOP mylist - LPOPRPUSH source destination LPOPRPUSH mylist myotherlist - RPUSH key element [element ..] RPUSH mylist task1 task2 task3 - SET key value SET mykey hello +BGSAVE BGSAVE +CLIENT ID|KILL|LIST id [id ..] CLIENT LIST +CONVERT CONVERT +DEL key [key ..] DEL key1 key2 key3 +EXISTS key [key ..] EXISTS key1 key2 key3 +GET key GET key1 +GETSET key value GETSET mykey hello +INFO [section] INFO memory +LINDEX key index LINDEX mylist 0 +LLEN key LLEN mylist +LPOP key LPOP mylist +LPOPRPUSH source destination LPOPRPUSH mylist myotherlist +PING [message] PING hello +RPUSH key element [element ..] RPUSH mylist task1 task2 task3 +SAVE SAVE +SET key value SET mykey hello ``` -The `COMMANDS` take the exact same arguments as their respective [Redis commands](https://redis.io/commands). +Most `COMMANDS` take the exact same arguments as their respective [Redis commands](https://redis.io/commands). ### Examples ``` +# Obtain information about the server ./client.l --pass yourpass INFO server OK 37D13779 @@ -166,14 +173,27 @@ uptime_in_seconds:1 uptime_in_days:0 executable:/usr/bin/picolisp +# Set a key ./client.l --pass yourpass SET mykey myvalue OK 53E02FC6 OK +# Get a key ./client.l --pass yourpass GET mykey OK 40E83305 myvalue +# Get a key, then set it +./client.l --pass yourpass GETSET mykey yourvalue +OK 69E88646 +myvalue + +# Check if a key exists +./client.l --pass yourpass EXISTS mykey +OK 43BFA2C +1 + +# Delete a key ./client.l --pass yourpass DEL mykey OK 4C2B6088 1 @@ -182,7 +202,12 @@ OK 4C2B6088 OK 11242B95 no data -./client.l --pass yourpass --id 11242B95 RPUSH mylist task1 task2 task3 +./client.l --pass yourpass EXISTS mykey +OK 5F1E8D78 +0 + +# Add multiple values to a key (a list) +./client.l --pass yourpass --name 11242B95 RPUSH mylist task1 task2 task3 OK 11242B95 3 @@ -190,25 +215,50 @@ OK 11242B95 OK 4E7E0FC3 5 +# Left pop a value from the head of a list ./client.l --pass yourpass LPOP mylist OK 258514BF task1 +# Check how many values are in a key (a list) ./client.l --pass yourpass LLEN mylist OK 107CF205 4 +# Left pop a value from the head of a list, push it to the tail of another list ./client.l --pass yourpass LPOPRPUSH mylist mynewlist OK 46028880 task2 +# Get the value of a key (a list) using a zero-based index ./client.l --pass yourpass LINDEX mynewlist -1 OK 129AE0F8 task2 -./client.l --pass yourpass LOLWUT 1 -OK 71FE650B -█▆▆▄▁▂▄▂▅▄▂█▁▄▂▅▃▆▂█▃▅▃▄▆▄▇█▇▇▃▃█▅▃▇▄▄▃▇▄▃▇▂▂▄▆▃██▂▄▄█▆▃▅▅▃▅▂▃▄▃▇▇▇▄▄▄▄▂▄▆▁▂▅▁▄▆ +# Ping the server +./client.l --pass yourpass PING +OK 6DCE69EB +PONG + +# Ping the server with a custom message +./client.l --pass yourpass PING "Hello" +OK 6F02D9DC +Hello + +# Save the database in the foreground (blocking) +./client.l --pass yourpass SAVE +OK 1F60EABE +OK + +# Save the database in the background (non-blocking) +./client.l --pass yourpass BGSAVE +OK 1270937D +Background saving started + +# Convert the database from plaintext to binary, or binary to plaintext +./client.l --pass yourpass CONVERT +OK 25E3B970 +OK ``` # Notes and limitations @@ -232,7 +282,7 @@ This section will explain some important technical details about the code, and l * Since PicoLisp is not _event-based_, each new TCP connection spawns a new process, which limits concurrency to the host's available resources. * Not all [Redis commands](https://redis.io/commands) are implemented, because I didn't have an immediate need for them. There are plans to slowly add new commands as the need arises. * Using the `client.l` on the command-line, all values are stored as strings. Please use the TCP socket or named pipe directly to store integers and lists. - * Unlike _Redis_, there is no on-disk persistence and **all keys will be lost** when the server is restarted. This library was originally designed to be used as a temporary FIFO queue, with no need to persist the data. Support for persistence can be added eventually, and I'm open to pull-requests. + * ~~Unlike _Redis_, there is no on-disk persistence and **all keys will be lost** when the server is restarted. This library was originally designed to be used as a temporary FIFO queue, with no need to persist the data. Support for persistence can be added eventually, and I'm open to pull-requests.~~ Support for persistence has been added, see [Persistence](#persistence) below. # How it works @@ -272,6 +322,81 @@ The forked child processes will each create their own named pipe, called `pipe_c The idea is to have the **sibling** be the holder of all the **keys**. Every _"Redis-like"_ command will have their data and statistics stored in the memory of the **sibling** process, and the **sibling** will handle receiving and sending its memory contents (keys/values) through named pipes to the respective **child** processes. +# Persistence + +Similar to [Redis](https://redis.io/topics/persistence), this database implements "snapshotting" (full memory dump to disk) and "AOF" (append-only log file), however both features are tightly coupled, which makes for a much better experience. + +* Persistence is disabled by default, but can be enabled with the `--persist N` parameter, where `N` is the number of seconds between each `BGSAVE` (background save to disk). +* The database is stored in plaintext by default, but can be stored in binary with the `--binary` parameter. Binary format (PLIO) loads and saves _much_ quicker than plaintext, but it becomes difficult to debug a corrupt entry. +* The AOF follows the _WAL_ approach, where each write command is first written to the AOF on disk, and then processed in the key/value memory store. +* The AOF only stores log entries since the previous `SAVE` or `BGSAVE`, so it technically shouldn't grow too large or unmanageable. +* The database snapshot on disk is the most complete and important data, and should be backed up regularly. +* _fsync_ is not managed by the database, so the server admin must ensure AOF log writes are actually persisted to disk. +* The AOF on-disk format is **always plaintext**, to allow easy debugging and repair of a corrupt entry. +* The AOF is opened for writing when the server is started, and closed only when the server is stopped (similar to web server log files). This lowers overhead of appending to the log, but requires care to avoid altering it while the server is running. +* The `SAVE` and `BGSAVE` commands can still be sent even if persistence is disabled. This will dump the in-memory data to disk as if persistence was enabled. + +## How persistence is implemented + +Here we'll assume persistence was previously enabled and data has already been written and saved to disk. + +1. On server start, some memory is pre-allocated according to the DB's file size. +2. The DB is then fully restored to memory +3. If the AOF contains some entries, it is fully replayed to memory +4. The DB is saved once more to disk and the AOF gets wiped +5. A timer is started to perform periodic background DB saves +6. The AOF is opened for writes, and every new client connection sends the command to the AOF +7. When a `BGSAVE` (non-blocking) command is received, a temporay copy of the AOF is made, the current AOF is wiped, and a background process is forked to save the DB to disk +8. When a `SAVE` (blocking) command is received, a the in-memory DB is saved to disk and the AOF is wiped. +9. A backup of the DB file is always made before overwriting the current DB file. +10. To help handle concurrency and persistence, temporary files are named `.kv.db.lock`, `.kv.db.tmp`, `.kv.aof.lock`, and `.kv.aof.tmp`. It's best not to modify or delete those files while the server is running. They can be safely removed while the server is stopped. + +## AOF format + +The AOF is stored by default in the `kv.aof` file as defined by `*KV_aof`. + +Here are two separate entries in a typical AOF: + +``` +("1596099036.281142829" 54042 ("RPUSH" "mytestlist" ("four" "five" "six"))) +("1596099059.683596840" 57240 ("RPUSH" "yourtestlist" ("seven" "eight" "nine"))) +``` + +Each line is a PicoLisp list with only 3 columns: + +* Column 1: `String` Unix timestamp with nanoseconds for when the entry was created +* Column 2: `Integer` Non-cryptographically secure hash (CRC) of the command and its arguments +* Column 3: `List` Command name, first argument, and subsequent arguments + +When replaying the AOF, the server will ensure the hash of command and arguments match, to guarantee the data is intact. Replaying an AOF can be slow, depending on the number of keys/values. + +> **Note:** Manually modifying the AOF will require recomputing and replacing the hash with the result from `(kv-hash)` or PicoLisp `(hash)`. + +``` +(hash '("RPUSH" "mytestlist" ("four" "five" "zero"))) +-> 61453 +``` + +## DB format + +The DB is stored by default in the `kv.db` file as defined by `*KV_db`. When backed up, it is named `.kv.db.old`. + +Here are two separate entries in a typical DB: + +``` +("smalldata" ("test1" "test2" "test3" "test4" "test5" "test6")) +("fooh_1000" "test data 1000") +``` + +Each line is a PicoLisp list with the key in the `(car)`, and values in the `(cadr)`. They are quickly replayed and stored in memory with a simple `(set)` command. + +## Differences from Redis + +* Unlike _Redis_, persistence only allows specifying a time interval between each `BGSAVE`. Since the AOF is **always enabled**, it's not necessary to "save after N changes", so the config is much simpler. +* Log rewriting is not something that "must be done", because chances are the AOF will never grow too large. Of course that depends on the number of changes occurring between each `BGSAVE`, but even then the AOF is wiped when a `BGSAVE` is initiated (and restored/rewritten if the DB happened to be locked). +* The DB snapshot is used to reconstruct the dataset in memory, not the AOF. The AOF is only used to replay the commands since the last DB save, which is much faster and more efficient, particularly when using `--binary`. +* There is no danger of _losing data_ when switching from `RDB` to `AOF`, because such a concept doesn't even exist. + # Testing This library comes with a large suite of [unit and integration tests](https://github.com/aw/picolisp-unit). To run the tests, type: diff --git a/child.l b/child.l new file mode 100644 index 0000000..c19a31b --- /dev/null +++ b/child.l @@ -0,0 +1,54 @@ +# Perform some tasks when the child exits +[de kv-child-exit () + (kv-output "[child]=" *Pid " exiting") + (kv-out-sibling "done") + (when (info *Pipe_child) (call 'rm "-f" *Pipe_child)) ] + +# Receive a message from a sibling over a named pipe and send it to the client +[de kv-listen-child () + (in *Pipe_child + (when (rd) (kv-out-client "message" (cdr @) ] + +# Send a message to the sibling over a named pipe +[de kv-out-sibling (Type . @) + (wait 1) # required or messages get lost + (out *Pipe_sibling + (pr (list Type *Pid (car (rest) ] + +# Receive a message from the client over the TCP socket +[de kv-listen-sock () + (in *Sock + (when (rd) + (let Msg @ + (kv-output "[msg] from client: (pid: " *Pid ") " *Adr " " (sym Msg)) + (kv-out-sibling "message" Msg) + (kv-listen-child) ] + +# Send a message to the client over a TCP socket +[de kv-out-client (Type Msg) + (out *Sock (pr (cons Type Msg) ] + +# non cryptographically secure hash, can be changed in the future +[de kv-hash (String) + (hash String) ] + +# Authenticate the client via handshake, and authorizate with a hashed password +[de kv-auth (Auth) + (and + (lst? Auth) + (= "AUTH" (car Auth)) + (= (kv-hash *KV_pass) (caddr Auth)) + (kv-out-client "AUTH" (kv-hash (pack (cadr Auth) *KV_uuid))) + (kv-out-sibling "message" (list "IDENT" (cons "name" (cadr Auth)) (cons "addr" *Adr) (cons "fd" *Sock))) + (kv-listen-child) ] + +# Receive the initial auth in a child process from the client over a TCP socket +[de kv-child () + (kv-output "[child]=" *Pid " [parent]=" *PPid) + (kv-mkfifo "child") + + (in *Sock + (if (kv-auth (rd)) + (kv-listen-sock) + (kv-out-sibling "error" "NOAUTH") # auth NOT OK, tell the sibling + (kv-out-client "error" "NOAUTH") ] # auth NOT OK, tell the client diff --git a/client.l b/client.l index e7d6199..9320283 100755 --- a/client.l +++ b/client.l @@ -7,10 +7,10 @@ [de APP_HELP ("usage" "./client.l --pass COMMAND [arguments]") - ("example" "./client.l --pass foobared --port 6378 INFO server'^J") + ("example" "./client.l --pass foobared --port 6378 INFO server^J") ("options" ("--help" "Show this help message and exit") () - ("--id " "Uniquely identifiable client ID (default: randomly generated)") + ("--name " "Easily identifiable client name (default: randomly generated)") ("--host " "Hostname or IP of the key/value server (default: localhost)") ("--pass " "Password used to access the server (required)") ("--poll " "Number of seconds for polling the key/value server (default: don't poll)") @@ -18,15 +18,21 @@ () ("COMMAND LIST" "Commands are case-insensitive and don't always require arguments.^J^I^I^I^I^IExamples:") () + (" BGSAVE" "^I^IBGSAVE") + (" CLIENT ID|KILL|LIST id [id ..]" "^ICLIENT LIST") + (" CONVERT" "^I^ICONVERT") (" DEL key [key ..]" "^I^IDEL key1 key2 key3") + (" EXISTS key [key ..]" "^I^IEXISTS key1 key2 key3") (" GET key" "^I^IGET key1") + (" GETSET key value" "^I^IGETSET mykey hello") (" INFO [section]" "^I^IINFO memory") (" LINDEX key index" "^I^ILINDEX mylist 0") (" LLEN key" "^I^ILLEN mylist") - (" LOLWUT number" "^I^ILOLWUT 5") (" LPOP key" "^I^ILPOP mylist") (" LPOPRPUSH source destination" "^ILPOPRPUSH mylist myotherlist") + (" PING [message]" "^I^IPING hello") (" RPUSH key element [element ..]" "^IRPUSH mylist task1 task2 task3") + (" SAVE" "^I^ISAVE") (" SET key value" "^I^ISET mykey hello") ] (chdir (car (file)) (load "libkvclient.l" "clihelpers.l")) @@ -36,22 +42,25 @@ (kv-show-help) (while (opt) (case @ - (--host (setq *KV_host (opt))) # default 'localhost' - (--port (setq *KV_port (opt))) # default '6378' - (--id (setq *KV_clientid (opt)))# default '' - (--poll (setq *KV_poll (opt))) # enable polling of command - (--pass (setq *KV_pass (opt))) # required - (T (queue '*Cmdargs @)) ) ) # save remaining cmdline arguments + (--help (kv-show-help) (bye 1)) + (--host (setq *KV_host (opt))) # default 'localhost' + (--port (setq *KV_port (opt))) # default '6378' + (--name (setq *KV_clientid (opt))) # default '' + (--poll (setq *KV_poll (opt))) # enable polling of command + (--pass (setq *KV_pass (opt))) # required password + (T (queue '*Cmdargs @)) ) ) # save remaining cmdline arguments - (finally (unless (=T @) (bye 1)) + (finally + (unless (=T @) (bye 1)) (catch 'kv-error (when (kv-start-client) - (setq *KV_server_ok (kv-identify)) - (kv-print *KV_server_ok) + (kv-print @) (if *KV_poll - (loop # loop indefinitely - (kv-print (kv-send-data *Cmdargs)) # send the data to the server (loop) - (wait (* 1000 (format *KV_poll))) ) # wait N seconds before looping again - (kv-print (kv-send-data *Cmdargs)) ] # send the data to the server once + (loop + (NIL (when (kv-send-data *Cmdargs) + (kv-print @) + (wait (* 1000 (format *KV_poll))) + T ) ) ) + (kv-print (kv-send-data *Cmdargs)) ] (bye) diff --git a/commands.l b/commands.l index 626a3d8..a97e58f 100644 --- a/commands.l +++ b/commands.l @@ -9,38 +9,77 @@ [de kv-process (Child Request) (let Key (when (cadr Request) (kv-name (cadr Request))) (case (uppc (car Request)) + ["BGSAVE" (kv-bgsave-db *Aof_desc) ] + ["CLIENT" (kv-cmd-client Child (cdr Request) ] + ["CONVERT" (kv-cmd-convert) ] ["DEL" (kv-cmd-del (cadr Request) (; Request 3) ] + ["EXISTS" (kv-cmd-exists (cadr Request) (; Request 3) ] ["GET" (kv-cmd-get (cadr Request) ] + ["GETSET" (kv-cmd-getset (cadr Request) (; Request 3) ] ["IDENT" (kv-cmd-ident Child (cdr Request) ] ["INFO" (kv-cmd-info (cadr Request) ] ["LINDEX" (kv-cmd-lindex Key (; Request 3) ] ["LLEN" (kv-cmd-llen Key) ] - ["LOLWUT" (kv-cmd-lolwut (cadr Request) ] ["LPOP" (kv-cmd-lpop Key) ] ["LPOPRPUSH" (kv-cmd-lpoprpush Key (; Request 3) ] + ["PING" (kv-cmd-ping (cadr Request) ] ["RPUSH" (kv-cmd-rpush (cadr Request) Key (; Request 3) ] + ["SAVE" (kv-save-db) ] ["SET" (kv-cmd-set (cadr Request) (; Request 3) ] [T "Error: Unknown command" ] ] # COMMANDS +[de kv-cmd-flushall () + (mapcar '((N) (off (kv-name N))) Keys) ] + +[de kv-cmd-client (Child Cmd) + (case (uppc (car Cmd)) + ("ID" (kv-cmd-client-id Child)) + ("KILL" (kv-cmd-client-kill (; Cmd 2) (; Cmd 3))) + ("LIST" (kv-cmd-client-list)) + (T "Error: Unknown client command") ] + +[de kv-cmd-client-id (Child) + (cdr (assoc "id" (cadr (assoc Child (get (kv-value "%stats%/connected_clients") ] + +[de kv-cmd-client-kill (Filter Arg) + (case (uppc Filter) + ["ID" (length (make (mapcar '((N) (when (= Arg (cdr (assoc "id" (cadr N)))) (kv-remove-client (car N)) (link T))) (get (kv-value "%stats%/connected_clients") ] + (T 0) ] + +[de kv-cmd-client-list () + (glue "^J" (mapcar '((N) (glue " " (mapcar '((S) (pack (car S) "=" (cdr S))) (cadr N)))) (get (kv-value "%stats%/connected_clients") ] + +# convert a database to/from plaintext<->binary +[de kv-cmd-convert () + (setq *KV_binary (onOff *KV_binary)) + (setq *KV_db (pack (dirname *KV_db) (glue "." (append (head -1 (split (chop *KV_db) ".")) (if *KV_binary '(("b" "i" "n")) '(("d" "b"))))))) + (kv-save-db) ] + [de kv-cmd-del (Key Elements) (if (and Key (not (pre? "%stats%/" Key))) (length (wipe (extract '((N) (unless (pre? "%stats%/" N) (kv-name N))) (conc (list Key) Elements)))) - 0 ] # return 0 if no key is specified + 0 ] # return 0 if no key is specified + +[de kv-cmd-exists (Key Elements) + (if (and Key (not (pre? "%stats%/" Key))) + (cnt '((N) (unless (pre? "%stats%/" N) (kv-value N))) (conc (list Key) Elements)) + 0 ] # return 0 if no key is specified [de kv-cmd-ident (Child Elements) - (when (and Child Elements (lst? Elements)) # NIL if the IDENT isn't a list - [push1 '*KV/%stats%/connected_clients # only add to the list if it's unique - (list Child (append Elements (list (cons "ip" *Adr) ] - (pack "OK " (cdr (assoc "id" Elements) ] + (when (and Child Elements (lst? Elements)) # NIL if the IDENT isn't a list + [push1 '*KV/%stats%/connected_clients # only add unique clients to the list + (list Child (append (list (cons "id" (inc '*KV/%stats%/last_client)) (cons "pid" Child)) Elements) ] + (pack "OK " (cdr (assoc "name" Elements) ] [de kv-cmd-info (Section) (case (lowc Section) - ["server" (kv-info-format "Server" (kv-info-server) ] - ["clients" (kv-info-format "Clients" (kv-info-clients) ] - ["memory" (kv-info-format "Memory" (kv-info-memory) ] - ["stats" (kv-info-format "Stats" (kv-info-stats) ] - [T (kv-info-default) ] ] # any other value returns *all* stats + ["server" (kv-info-format "Server" (kv-info-server) ] + ["clients" (kv-info-format "Clients" (kv-info-clients) ] + ["memory" (kv-info-format "Memory" (kv-info-memory) ] + ["persistence" (kv-info-format "Persistence" (kv-info-persistence) ] + ["stats" (kv-info-format "Stats" (kv-info-stats) ] + (T (kv-info-default) ] [de kv-cmd-lindex (Src Index) (when (and Src (num? Index)) @@ -53,8 +92,7 @@ (length (car Key) ] [de kv-cmd-lpop (Src) - (when (and Src (not (pre? "*KV/%stats%/" Src))) - (pop Src) ] + (when (and Src (not (pre? "*KV/%stats%/" Src))) (pop Src) ] [de kv-cmd-lpoprpush (Src Dst) (when (and Src Dst (not (pre? "*KV/%stats%/" Src)) (not (pre? "%stats%/" Dst)) (pop Src)) @@ -62,23 +100,30 @@ (kv-cmd-rpush Dst (kv-name Dst) (list Result)) Result ] +[de kv-cmd-ping (Msg) + (if Msg @ "PONG") ] + [de kv-cmd-rpush (Key Dst Elements) (when (and Dst Elements (lst? Elements) (not (pre? "%stats%/" Key))) - (push1 (kv-name "keys") Key) # keep a list of all the keys - (mapc '((N) (queue Dst N)) Elements) + (kv-cmd-set Key (append (kv-value Key) Elements)) (length (car Dst) ] [de kv-cmd-set (Key Value) (when (and Key Value (not (pre? "%stats%/" Key)) (set (kv-name Key) Value)) - (push1 (kv-name "keys") Key) # keep a list of all the keys + (push1 (kv-name "keys") Key) # keep a list of all the keys "OK" ] [de kv-cmd-get (Key) - (when Key (get (kv-value Key) ] + (kv-value Key) ] + +[de kv-cmd-getset (Key Value) + (let Result (kv-cmd-get Key) + (kv-cmd-set Key Value) + Result ] # COMMAND helpers [de kv-name (Name) - (car (str (pack "*KV/" Name) ] + (any (pack "*KV/" Name) ] [de kv-value (Name) (car (any (pack "*KV/" Name) ] @@ -118,6 +163,22 @@ (cons "total_system_memory" Total_memory) (cons "total_system_memory_human" (/ Total_memory 1024 1024 1024) "M") ] +[de kv-info-persistence () + (list + (cons "db_format" (if *KV_binary "binary" "plaintext")) + (cons "loading" *KV/%stats%/loading) + (cons "rdb_changes_since_last_save" (if (info *KV_aof) (lines *KV_aof) 0)) + (cons "rdb_bgsave_in_progress" (if (info *KV_db_lock) 1 0)) + (cons "rdb_last_save_time" *KV/%stats%/rdb_last_save_time) + (cons "rdb_last_bgsave_status" *KV/%stats%/rdb_last_bgsave_status) + (cons "rdb_last_cow_size" *KV/%stats%/rdb_last_cow_size) + (cons "aof_enabled" (if *KV_persist 1 0)) + (cons "aof_rewrite_in_progress" *KV/%stats%/aof_rewrite_in_progress) + (cons "aof_last_write_status" *KV/%stats%/aof_last_write_status) + (cons "aof_current_size" (if (info *KV_aof) (car @) 0)) + (cons "aof_base_size" *KV/%stats%/aof_base_size) + ] + [de kv-info-stats () (list (cons "total_connections_received" (length *KV/%stats%/total_connections_received)) @@ -130,7 +191,8 @@ [de kv-info-default () (pack - (kv-info-format "Server" (kv-info-server)) - (kv-info-format "Clients" (kv-info-clients)) - (kv-info-format "Memory" (kv-info-memory)) - (kv-info-format "Stats" (kv-info-stats)) ] + (kv-info-format "Server" (kv-info-server)) + (kv-info-format "Clients" (kv-info-clients)) + (kv-info-format "Memory" (kv-info-memory)) + (kv-info-format "Persistence" (kv-info-persistence)) + (kv-info-format "Stats" (kv-info-stats)) ] diff --git a/libkv.l b/libkv.l index 34cf64c..4f6d317 100644 --- a/libkv.l +++ b/libkv.l @@ -10,11 +10,11 @@ *KV_verbose NIL *KV_port 6378 *KV_pass NIL - *KV_uuid "7672FDB2-4D29-4F10-BA7C-8EAD0E29626E" # for client handshake + *KV_uuid "7672FDB2-4D29-4F10-BA7C-8EAD0E29626E" # for client handshake, do not change *KV_startup_memory (* (heap) 1024 1024) ) # INITIALIZE -# Every integer statistic must be zero'd first. Others are wiped first +# Every statistic must be zero'd or wiped first. (off *KV/%stats%/connected_clients) (zero *KV/%stats%/total_connections_received @@ -23,111 +23,19 @@ *KV/%stats%/total_net_output_bytes *KV/%stats%/rejected_connections *KV/%stats%/keyspace_hits - *KV/%stats%/keyspace_misses ) + *KV/%stats%/keyspace_misses + *KV/%stats%/last_client ) # LOAD -(load "module.l" "commands.l" "lolwut.l") - -# STATS -# Process the message and send the result to the child over the named pipe -[de kv-sibling-job (Pid Msg) - (let (Result (kv-process Pid Msg) # here's the magic, process the message - Pipe_child (pil "tmp/" *PPid "/pipe_child_" Pid) ) - - (wait 1) - (kv-output "[msg] to client: " (sym Result)) - (inc '*KV/%stats%/total_net_output_bytes (bytes Result)) - - (if Result - (inc '*KV/%stats%/keyspace_hits) - (inc '*KV/%stats%/keyspace_misses) ) - - (out Pipe_child # named pipe of the child process - (pr (cons "message" Result) ] - -# Remove the child's process ID from the list of connected clients -[de kv-remove-client (Pid) - (setq *KV/%stats%/connected_clients - (filter '((N) (unless (= (car N) Pid) N)) - *KV/%stats%/connected_clients) ) - NIL ] # NIL breaks from the (kv-sibling) loop - -# Increment some statistics counters for the INFO command when there's an error -[de kv-sibling-error (Pid Msg) - (inc '*KV/%stats%/rejected_connections) - (inc '*KV/%stats%/total_net_output_bytes (bytes Msg)) - (kv-remove-client Pid) # seriously, forget about the child - NIL ] # NIL breaks from the (kv-sibling) loop - -# Process the message depending on its type -[de kv-sibling-message (Type Pid Msg) - (case Type - ("error" (kv-sibling-error Pid Msg)) # things aren't working out - ("done" (kv-remove-client Pid)) # forget about the child, it's over - ("message" (kv-sibling-job Pid Msg) ]# do some work in the sibling - -# Increment some statistics counters for the INFO command -[de kv-stats-update (Pid Msg) - (inc '*KV/%stats%/total_commands_processed) - (inc '*KV/%stats%/total_net_input_bytes (bytes Msg)) - (push1 '*KV/%stats%/total_connections_received Pid) ] +(load "module.l" "commands.l" "persistence.l") # IPC -# Perform some tasks when the sibling exits -[de kv-sibling-exit () - (kv-output "[sibling]=" *Pid " exiting") ] - -# Receive a message in the sibling, from the child, over a named pipe, then -# process the message and send the reply back to the child -[de kv-listen-sibling () - (in *Pipe_sibling # named pipe - (when (rd) # expect one message from the child - (let Msg @ - (kv-output "[msg] from child : (pid: " (cadr Msg) ") " (sym Msg)) - (kv-stats-update (cadr Msg) (caddr Msg)) - (kv-sibling-message # process the child's message - (car Msg) # should be the 'type' of message - (cadr Msg) # should be the 'sender' of the message - (caddr Msg) ] # should be the actual message - -# Receive a message from a sibling over a named pipe and send it to the client -[de kv-listen-child () - (in *Pipe_child # named pipe - (when (rd) # expect one message from the sibling - (let Result @ - (kv-out-sock "message" # send the message to the client - (cdr Result) ] - -# Receive a message from the client over the TCP socket -[de kv-listen-sock () - (in *Sock # TCP socket - (while (rd) # get multiple messages from the client - (let Msg @ - (kv-output "[msg] from client: (pid: " *Pid ") " *Adr " " (sym Msg)) - (kv-out-sibling "message" Msg) # tell the sibling what the client said - (kv-listen-child) ] # listen for sibling messages - -# Send a message to the sibling over a named pipe -[de kv-out-sibling (Type . @) - (out *Pipe_sibling # named pipe - (pr (list Type *Pid (car (rest) ] +(load "child.l") +(load "sibling.l") -# Send a message to the client over a TCP socket -[de kv-out-sock (Type Msg) - (out *Sock (pr (cons Type Msg) ] # TCP socket - -[de kv-hash (String) - (hash String) ] # non cryptographically secure hash - -# Authenticate the client via handshake, and authorizate with a password -[de kv-auth (Auth) - (and - (lst? Auth) # is the client even sending a list? - (= "AUTH" (car Auth)) # handshake - (= (kv-hash *KV_pass) (caddr Auth)) # shared password - (kv-out-sock - "AUTH" - (kv-hash (pack (cadr Auth) *KV_uuid) ] # tell the client we're good +# Set the value of a statistic +[de kv-stat (Key Value) + (set (any (pack "*KV/%stats%/" Key)) Value) ] # Create named pipes in the tmp directory of the parent process [de kv-mkfifo (Type) @@ -136,47 +44,27 @@ (setq *Pipe_child (pil "tmp/" *PPid "/pipe_child_" *Pid)) (setq *Pipe_sibling (tmp "pipe_sibling")) ) - (unless (info Filename) - (call "mkfifo" Filename) ] - -# Send some output to the console -[de kv-output @ - (when *KV_verbose (prinl (rest) ] - -# Perform some tasks when the child exits -[de kv-child-exit () - (kv-output "[child]=" *Pid " exiting") - (kv-out-sibling "done") # tell the sibling the child is done - (when (info *Pipe_child) - (call 'rm *Pipe_child) ] # remove the temporary named pipe + (unless (info Filename) (call "mkfifo" Filename) ] -# Receive the initial auth in a child process from the client over a TCP socket -[de kv-child () - (kv-output "[child]=" *Pid " [parent]=" *PPid) - (kv-mkfifo "child") # make a named pipe for the child - - (in *Sock # TCP Socket - (if (kv-auth (rd)) # try to authenticate - (kv-listen-sock) # auth OK, listen for client messages - (kv-out-sibling "error" "NOAUTH")# auth NOT OK, tell the sibling - (kv-out-sock "AUTH" "NOAUTH") ] # auth NOT OK, tell the client - -# Fork another child process known as the 'sibling' which stores all the data -[de kv-sibling () - (kv-mkfifo "sibling") # make a named pipe for the sibling - (unless (fork) - (kv-output "[sibling]=" *Pid) - (finally - (kv-sibling-exit) # sibling exits - (loop (kv-listen-sibling) ] # loop breaks when the result is NIL +# Loop on a TCP socket listening for client connections +[de kv-listen-loop () + (loop + (setq *Sock (listen *Portsock)) + (NIL (fork) (close *Portsock)) + (close *Sock) ] # Cleanup child processes before exiting [de kv-cleanup-kids () (when (kids) - (kv-output "[parent]=" *Pid " ending child processes: " (kids)) - (tell 'bye) ) # tell all the child processes to exit + (kv-output "[parent]=" *Pid " ending child processes: " (glue "," (kids))) + (tell 'bye) ) (kv-output "[parent]=" *Pid " exiting") ] +# Send some output to the console +[de kv-output @ + (when *KV_verbose (prinl "[" (dat$ (date) "-") "T" (tim$ (time) T) "] " (rest) ] + +# START # Launch a TCP listener and process some messages asynchronously [de kv-listen () (unless *KV_pass @@ -187,15 +75,11 @@ (kv-output "Parent PID: " *Pid) (use (*Portsock *Sock *Pipe_sibling *Pipe_child) - (setq *Portsock (port *KV_port)) # open a TCP port + (setq *Portsock (port *KV_port)) (finally (kv-cleanup-kids) - (kv-sibling) # sibling process - (loop # loop for new clients - (setq *Sock (listen *Portsock)) # listen for TCP connections - (NIL (fork) (close *Portsock)) # fork each new client TCP connection - (close *Sock) ) # close the socket when we're done - - (kv-child) # child process - (kv-child-exit) # final cleanup before the child exits - (bye) ] + (kv-sibling) + (kv-listen-loop) + (finally + (kv-child-exit) + (kv-child) ] diff --git a/libkvclient.l b/libkvclient.l index 1620e40..b27d652 100644 --- a/libkvclient.l +++ b/libkvclient.l @@ -13,9 +13,10 @@ *KV_clientid (hex (abs (rand))) *KV_port 6378 *KV_pass NIL - *KV_uuid "7672FDB2-4D29-4F10-BA7C-8EAD0E29626E" ) # for server handshake + *KV_abort 60 # max time (in seconds) to wait for a message + *KV_uuid "7672FDB2-4D29-4F10-BA7C-8EAD0E29626E" ) # for server handshake, do not change -(off *KV_poll) # disable polling by default +(off *KV_poll) # LOAD (load "module.l") @@ -33,70 +34,63 @@ ((lst? Result) (prinl (glue "," Result))) (T (prinl Result) ] +# non cryptographically secure hash, can be changed in the future [de kv-hash (String) - (hash String) ] # non cryptographically secure hash + (hash String) ] # IPC # Send commands to the server on the TCP socket [de kv-send-commands (Cmdargs) (case (uppc (pop 'Cmdargs)) + ["BGSAVE" (out *Sock (pr (list "BGSAVE") ] + ["CLIENT" (out *Sock (pr (list "CLIENT" (pop 'Cmdargs) (pop 'Cmdargs) (format (pop 'Cmdargs) ] + ["CONVERT" (out *Sock (pr (list "CONVERT") ] ["DEL" (out *Sock (pr (list "DEL" (pop 'Cmdargs) Cmdargs) ] + ["EXISTS" (out *Sock (pr (list "EXISTS" (pop 'Cmdargs) Cmdargs) ] ["GET" (out *Sock (pr (list "GET" (pop 'Cmdargs) ] - ["IDENT" (out *Sock (pr (append (list "IDENT") Cmdargs) ] + ["GETSET" (out *Sock (pr (list "GETSET" (pop 'Cmdargs) (pop 'Cmdargs) ] ["INFO" (out *Sock (pr (list "INFO" (pop 'Cmdargs) ] ["LINDEX" (out *Sock (pr (list "LINDEX" (pop 'Cmdargs) (format (pop 'Cmdargs) ] ["LLEN" (out *Sock (pr (list "LLEN" (pop 'Cmdargs) ] - ["LOLWUT" (out *Sock (pr (list "LOLWUT" (format (pop 'Cmdargs) ] ["LPOP" (out *Sock (pr (list "LPOP" (pop 'Cmdargs) ] ["LPOPRPUSH" (out *Sock (pr (list "LPOPRPUSH" (pop 'Cmdargs) (pop 'Cmdargs) ] + ["PING" (out *Sock (pr (list "PING" (pop 'Cmdargs) ] ["RPUSH" (out *Sock (pr (list "RPUSH" (pop 'Cmdargs) Cmdargs) ] + ["SAVE" (out *Sock (pr (list "SAVE") ] ["SET" (out *Sock (pr (list "SET" (pop 'Cmdargs) (pop 'Cmdargs) ] - [T (setq *Bye '((msg "ERROR: unknown command"))) (bye 1) ] ) ] + (T (setq *Bye '((msg "ERROR: unknown command"))) (bye 1) ] # Send commands to the server and receive a response [de kv-send-data (Cmdargs) - (when (kv-send-commands Cmdargs) - (kv-receive-data) ] # receive data from the server + (when (kv-send-commands Cmdargs) (kv-receive-data) ] # Receive data from the server on the TCP socket, return the result or NIL [de kv-receive () - (in *Sock - (when (rd) - (let Result @ - (case (car Result) - ("message" (cdr Result)) ] # good response from the server + (in *Sock (when (rd) @) ] # Receive data from the server on the TCP socket, return the parsed result [de kv-receive-data () - (in *Sock - (when (rd) - (let Result @ - (if (cdr Result) - (case (car Result) - ("message" (cdr Result)) # good response from the server - (T "unknown data"))# bad response from the server - "no data" ) ] # NIL response from the server - -# Send client identity command to the server with a variable number of arguments -[de kv-identify @ - (kv-send-data - (append - (list "IDENT" - (cons "id" *KV_clientid) - (cons "hostname" (in '(hostname) (line T))) ) - (make (while (rest) (link (cons (next) (next) ] + (abort *KV_abort + (in *Sock + (when (rd) + (let Result @ + (if (cdr Result) + (case (car Result) + ("message" (cdr Result)) # good response from the server + (T "unknown data")) # bad response from the server + "no data" ) ] # NIL response from the server # Authenticate to the server by hashing the password and validating the response [de kv-authenticate () (out *Sock (pr (list "AUTH" *KV_clientid (kv-hash *KV_pass)))) (in *Sock (if (and (rd) (= (cdr @) (kv-hash (pack *KV_clientid *KV_uuid)))) - T + (kv-receive-data) (kv-throw "ERROR: bad auth") ] # Open a TCP socket and connect to the server [de kv-start-client () (setq *Sock (connect *KV_host *KV_port)) (if *Sock - (kv-authenticate) # perform initial handshake and auth + (kv-authenticate) (kv-throw "ERROR: could not connect to server") ] diff --git a/lolwut.l b/lolwut.l deleted file mode 100644 index 82964c3..0000000 --- a/lolwut.l +++ /dev/null @@ -1,33 +0,0 @@ -# picolisp-kv - https://github.com/aw/picolisp-kv -# -# LOLWUT, inspired by Redis - https://redis.io/commands/lolwut -# -# The MIT License (MIT) -# Copyright (c) 2020 Alexander Williams, On-Prem - -(seed (in "/dev/urandom" (rd 20))) - -(setq - *KV/%lolwut%/max_columns 80 - *KV/%lolwut%/max_lines 10 ) - -# LOLWUT command helpers -# sparkLine function borrowed from: https://rosettacode.org/wiki/Sparkline_in_unicode#PicoLisp -[de kv-lolwut-sparkline (Lst) - (let (Min (apply min Lst) - Max (apply max Lst) - Rng (- Max Min) ) - - (pack (mapcar '((N) (char (+ 9601 (*/ (- N Min) 7 Rng)))) Lst) ] - -# COMMANDS -[de kv-cmd-lolwut (Num) - (default Num *KV/%lolwut%/max_lines) - - (glue "^J" - (make - (do (cond - ((le0 Num) 1) - ((> Num 20) 10) - (T Num) ) - (link (kv-lolwut-sparkline (make (do *KV/%lolwut%/max_columns (link (rand 1 100) ] diff --git a/persistence.l b/persistence.l new file mode 100644 index 0000000..b707889 --- /dev/null +++ b/persistence.l @@ -0,0 +1,177 @@ +# picolisp-kv - https://github.com/aw/picolisp-kv +# +# Persistence similar to Redis AOF/Snapshot: https://redis.io/topics/persistence +# +# The MIT License (MIT) +# Copyright (c) 2020 Alexander Williams, On-Prem + +# CONSTANTS +(setq + *KV_persist NIL + *KV_binary NIL + *KV_aof "kv.aof" + *KV_db "kv.db" ) + +[de kv-tmpfile-set () + (setq + *KV_aof_lock (pack (dirname *KV_aof) "." (basename *KV_aof) ".lock") + *KV_aof_tmp (pack (dirname *KV_aof) "." (basename *KV_aof) ".tmp") + *KV_db_lock (pack (dirname *KV_db) "." (basename *KV_db) ".lock") + *KV_db_tmp (pack (dirname *KV_db) "." (basename *KV_db) ".tmp") ] + +(kv-tmpfile-set) + +# INITIALIZE +(off + *KV/%stats%/rdb_last_bgsave_status ) +(zero + *KV/%stats%/loading + *KV/%stats%/rdb_changes_since_last_save + *KV/%stats%/rdb_bgsave_in_progress + *KV/%stats%/rdb_last_save_time + *KV/%stats%/rdb_last_cow_size + *KV/%stats%/aof_rewrite_in_progress + *KV/%stats%/aof_current_size + *KV/%stats%/aof_base_size ) + +# PERSISTENCE +# Rewrite the AOF with new entries if they were added +[de kv-rewrite-aof () + (ctl *KV_aof_lock + (one *KV/%stats%/aof_rewrite_in_progress) + (when (info *KV_aof_tmp) + (kv-output "====== Rewriting AOF ======") + (out (pack "+" *KV_aof_tmp) (in *KV_aof (echo))) # Append the current AOF into the temporary AOF + (out *KV_aof (in *KV_aof_tmp (echo))) # Copy the temporary AOF into the current AOF + (call 'rm "-f" *KV_aof_tmp) + (kv-output "====== AOF saved ======") ) + (zero *KV/%stats%/aof_rewrite_in_progress) ] + +[de kv-remove-aof (Bg) + (unless Bg (out *KV_aof (rewind))) + (call 'rm "-f" *KV_aof_tmp) ] + +# Write the new DB to disk +[de kv-write-db () + (kv-stat "rdb_last_cow_size" (car (info *KV_db_tmp))) + (and + (if (info *KV_db) + (call 'cp *KV_db (pack (dirname *KV_db) "." (basename *KV_db) ".old")) + T ) + (or (kv-output "====== Writing DB ======") T) + (call 'mv *KV_db_tmp *KV_db) ) + (or (kv-output "====== DB saved ======") T) ] + +# Write data to the DB, then write the AOF (truncate or wipe) +[de kv-write-data (Bg) + (and (info *KV_db_tmp) (gt0 (car @)) (kv-write-db) (kv-remove-aof Bg) ] + +# Write the data in binary PLIO (pr) or plaintext (println) format +[de kv-save-data (Key) + (let Result (kv-value Key) + (when Result + (if *KV_binary + (pr (list Key Result)) + (println (list Key Result)) ] + +# Write all the known keys to a temporary DB file +[de kv-save-db-keys () + (out *KV_db_tmp + (mapcar kv-save-data (kv-cmd-get "keys") ] + +# Perform some maintenance tasks when save ends +[de kv-save-cleanup () + (call 'rm "-f" *KV_aof_lock *KV_db_lock) ] + +# Obtain a UNIX timestamp +[de kv-timestamp (Ns) + (in (list 'date (if Ns "+%s.%N" "+%s")) (line T) ] + +# Save the entire DB keyspace to a file +[de kv-save-db (Bg) + (if (kv-locked?) + (kv-rewrite-aof) # restore the AOF if the DB is locked + (out *KV_db_lock (prinl *Pid)) + (kv-output "[dbwriter]=" *Pid " Saving the DB to " *KV_db) + (kv-stat "rdb_last_save_time" (kv-timestamp)) + + (finally + (kv-save-cleanup) + (kv-save-db-keys) + (kv-write-data Bg) + (unless *PPid (bye)) + + (kv-stat "rdb_last_bgsave_status" "OK") ] + +# Check if the DB is locked for writing, and return the error message +[de kv-locked? () + (when (info *KV_db_lock) + (out 2 (prinl "^J======^JDB is locked for writing by Pid " (in *KV_db_lock (line T)) ", not saving^J======^J")) + (kv-stat "rdb_last_bgsave_status" "Error: DB is locked for writing") ] + +# Save the entire DB keyspace to a file in the background (fork) +[de kv-bgsave-db (Aof) + (if (kv-locked?) + @ + (kv-stat "rdb_last_save_time" (kv-timestamp)) + (call 'cp *KV_aof *KV_aof_tmp) # make a copy of the AOF before we dump the DB to disk + (out Aof (rewind)) # wipe the contents of the AOF + (unless (fork) (kv-save-db T) (bye)) + (kv-stat "rdb_last_bgsave_status" "Background saving started") ] + +# Restore the in-memory database from entries stored in the DB file +[de kv-restore-db (Filename) + (kv-stat "loading" 1) + (kv-stat "rdb_last_cow_size" (car (info Filename))) + + # TODO: currently allocating 5x more than DB filesize, must validate + (gc (+ 1 (* 5 (/ (kv-value "%stats%/rdb_last_cow_size") 1024 1024)))) # pre-allocate enough memory for the entire DB + + (in Filename + (while (if *KV_binary (rd) (read)) + (inc '*ERROR_LINE) + (let Result @ + (kv-cmd-set (car Result) (cadr Result)) ) ) ) + + (kv-stat "loading" 0) ] + +# Replay the append-only log file to re-load all the missing keys into the DB +[de kv-replay-aof (Filename) + (kv-stat "aof_base_size" (car (info Filename))) + (kv-stat "loading_aof" 1) + + (in Filename + (while (read) + (inc '*ERROR_LINE) + (let Log @ + (if (= (cadr Log) (kv-hash (caddr Log))) + (kv-process *Pid (caddr Log)) # replay the entry from the log + (quit "Mismatched AOF entry, incorrect hash") ) ) ) ) + + (kv-stat "loading_aof" 0) ] + +# Check if there was a read error, return the error message, and stop the parent +[de kv-read-error (Type Filename) + (when *Msg + (out 2 (prinl "^J======^JERROR: " Type " error on line " *ERROR_LINE " of " Filename ": " *Msg "^J======^J")) + (kill *PPid) + (bye 1) ] + +# Restore the DB or replay the AOF if its filesize is greater than 0 bytes +[de kv-restore (Type Filename) + (use *ERROR_LINE + (zero *ERROR_LINE) + (when (and (info Filename) (gt0 (car @))) + (catch '("EOF Overrun" "Mismatched" "List expected" "Bad input") + (finally + (kv-read-error Type Filename) + (if (= "AOF" Type) + (kv-replay-aof Filename) + (kv-restore-db Filename) ] + +# Save a write command to the append-only log file with a timestamp and hash of the data +[de kv-save-aof (Request Aof) # Aof is a file descriptor + (when (member (car Request) '("DEL" "GETSET" "LPOP" "LPOPRPUSH" "RPUSH" "SET")) + (ctl *KV_aof_lock # try to obtain an exclusive lock + (out Aof (println (list (kv-timestamp T) (kv-hash Request) Request))) + (kv-stat "aof_last_write_status" (if @ "OK" "FAILED") ] diff --git a/server.l b/server.l index e9bf86d..2a336eb 100755 --- a/server.l +++ b/server.l @@ -7,23 +7,33 @@ [de APP_HELP ("usage" "./server.l --pass [options]") - ("example" "./server.l --pass foobared --port 6378 --verbose'^J") + ("example" "./server.l --pass foobared --port 6378 --verbose --persist 60^J") ("options" ("--help" "Show this help message and exit") () + ("--binary" "Store data in binary format instead of text (default: plaintext)") ("--pass " "Password used by clients to access the server (required)") + ("--persist " "Number of seconds between database persists to disk (default: disabled)") ("--port " "TCP listen port for communication with clients (default: 6378)") ("--verbose" "Verbose flag (default: False)") ] (chdir (car (file)) (load "libkv.l" "clihelpers.l")) +# Enable storing the database in binary format (PLIO) +[de kv-enable-binary () + (on *KV_binary) + (setq *KV_db "kv.bin") + (kv-tmpfile-set) ] + # START (ifn (argv) (kv-show-help) (while (opt) (case @ - (--verbose (on *KV_verbose)) # default 'off' - (--port (setq *KV_port (format (opt)))) # default '6378' - (--pass (setq *KV_pass (opt))) # required + (--binary (kv-enable-binary)) # default 'off' + (--verbose (on *KV_verbose)) # default 'off' + (--port (setq *KV_port (format (opt)))) # default '6378' + (--persist (setq *KV_persist (format (opt)))) # default 'off' + (--pass (setq *KV_pass (opt))) # required (T (kv-show-help) (bye 1)) ) ) (kv-listen) ) diff --git a/sibling.l b/sibling.l new file mode 100644 index 0000000..ab080c0 --- /dev/null +++ b/sibling.l @@ -0,0 +1,101 @@ +# Process the message and send the result to the child over the named pipe +[de kv-sibling-job (Pid Msg) + (when *KV_persist (kv-save-aof Msg *Aof_desc)) # save the request to a log file first + + (let (Result (kv-process Pid Msg) + Pipe_child (pil "tmp/" *PPid "/pipe_child_" Pid) ) + + (wait 1) + (kv-output "[msg] to child: " (sym Result)) + (inc '*KV/%stats%/total_net_output_bytes (bytes Result)) + + (if Result + (inc '*KV/%stats%/keyspace_hits) + (inc '*KV/%stats%/keyspace_misses) ) + + (out Pipe_child (pr (cons "message" Result) ] + +# Remove the child's process ID from the list of connected clients +[de kv-remove-client (Pid) + (kv-stat "connected_clients" (filter '((N) (unless (= (car N) Pid) N)) *KV/%stats%/connected_clients)) + NIL ] # NIL breaks from (kv-sibling-loop) + +# Increment some statistics counters for the INFO command when there's an error +[de kv-sibling-error (Pid Msg) + (inc '*KV/%stats%/rejected_connections) + (inc '*KV/%stats%/total_net_output_bytes (bytes Msg)) + (kv-remove-client Pid) + NIL ] # NIL breaks from (kv-sibling-loop) + +# Process the message depending on its type +[de kv-sibling-message (Type Pid Msg) + (case Type + ("error" (kv-sibling-error Pid Msg)) + ("done" (kv-remove-client Pid)) + ("message" (kv-sibling-job Pid Msg) ] + +# Increment some statistics counters for the INFO command +[de kv-stats-update (Pid Msg) + (inc '*KV/%stats%/total_commands_processed) + (inc '*KV/%stats%/total_net_input_bytes (bytes Msg)) + (push1 '*KV/%stats%/total_connections_received Pid) ] + +# Receive a message in the sibling, from the child, over a named pipe, then +# process the message and send the reply back to the child +[de kv-listen-sibling () + [in *Pipe_sibling + (when (rd) + (let Msg @ + (kv-stats-update (cadr Msg) (caddr Msg)) + (kv-sibling-message + (car Msg) # should be the 'type' of message + (cadr Msg) # should be the sender's Pid + (caddr Msg) ] # should be the actual message + T ] + +# Timer to make a BGSAVE if necessary +[de kv-bgsave-timer () + (setq *Elapsed (- (time) *Start)) # how much time elapsed since timer started + (ifn (>= *Elapsed *KV_persist) + (abort (- *KV_persist *Elapsed) (kv-listen-sibling)) + (setq *Start (time)) # restart the timer because it expired + (kv-bgsave-db *Aof_desc) ] + +# Start the loop which listens for new messages +[de kv-sibling-loop () + (use (*Aof_desc *Start *Elapsed) + (setq *Aof_desc (open *KV_aof)) # obtain a file descriptor for the AOF + (setq *Start (time)) # start the clock for the bgsave timer + (loop + (if *KV_persist + (kv-bgsave-timer) + (kv-listen-sibling) ] + +# Restore the DB and AOF, then save it in the foreground (blocking) +[de kv-sibling-restore () + (when *KV_persist + (kv-restore "DB" *KV_db) + (kv-restore "AOF" *KV_aof) + (kv-save-db) ] + +# Remove a locked process with SIGKILL +[de kv-remove-locked () + (when (info *KV_db_lock) (kill (in *KV_db_lock (format (line T))) 9) ] + +# Perform some tasks when the sibling exits, such as removing locks on the DB and AOF +[de kv-sibling-exit () + (kv-output "[sibling]=" *Pid " exiting") + (when *KV_persist + (kv-remove-locked) + (call 'rm "-f" *KV_aof_lock *KV_db_lock) ) + (kill *PPid) ] + +# Fork another child process known as the 'sibling' which stores all the data +[de kv-sibling () + (kv-mkfifo "sibling") + (unless (fork) + (kv-output "[sibling]=" *Pid " started") + (finally + (kv-sibling-exit) + (kv-sibling-restore) + (kv-sibling-loop) ] diff --git a/test.l b/test.l index 1960190..c6e9817 100755 --- a/test.l +++ b/test.l @@ -6,13 +6,30 @@ (setq *KV_pass "testpasswordfortests" - *KV_port 46378 ) + *KV_persist 600 # 10 minutes + *KV_port (rand 40001 49999) ) -(unless (fork) - (exec './server.l "--pass" *KV_pass "--port" *KV_port) ) +(prinl "Starting server on port: " *KV_port) +# run key/value store tests with persistence (chdir (pack (car (file)) "test/") - (mapcar load (filter '((N) (sub? "test_" N)) (dir "."))) ) + (unless (fork) + (exec '../server.l "--pass" *KV_pass "--port" *KV_port "--persist" *KV_persist) ) + (mapcar load (filter '((N) (sub? "test_kv" N)) (dir "."))) ) + +(kill (car (kids))) # ensure the server is stopped + +(wait 1000) + +(setq *KV_port (rand 40001 49999)) + +# run the client/server tests without persistence +(chdir (pack (car (file)) "test/") + (call 'rm "-f" *KV_db (pack "." *KV_db ".old") *KV_aof *KV_aof_lock) # cleanup first + (unless (fork) + (exec '../server.l "--pass" *KV_pass "--port" *KV_port) ) + (mapcar load (filter '((N) (sub? "test_cs" N)) (dir "."))) ) + +(kill (car (kids))) # ensure the server is stopped -(kill (car (kids))) (report) diff --git a/test/test_cs.l b/test/test_cs.l index 66d7bbd..244a906 100644 --- a/test/test_cs.l +++ b/test/test_cs.l @@ -1,18 +1,19 @@ # Client/Server tests [de test-client-cmd @ - (append (list './client.l "--port" *KV_port "--id" *Client_id "--pass" *KV_pass) (rest) ] + (append (list './client.l "--port" *KV_port "--name" *Client_id "--pass" *KV_pass) (rest) ] [de tests-execute () - (call './client.l "--port" *KV_port "--id" *Client_id "--pass" *KV_pass "IDENT") - (assert-equal (pack "OK " *Client_id) (in (test-client-cmd "INFO" "server") (line T)) "[CLIENT/SERVER] Test IDENT command") - (assert-kind-of 'String (in (test-client-cmd "LOLWUT") (line) (line T)) "[CLIENT/SERVER] Test LOLWUT command") + (call './client.l "--port" *KV_port "--name" *Client_id "--pass" *KV_pass "INFO") + (assert-equal (pack "OK " *Client_id) (in (test-client-cmd "INFO" "server") (line T)) "[CLIENT/SERVER] Test INFO command") (assert-equal "OK" (in (test-client-cmd "SET" "testkey" "testdata") (line) (line T)) "[CLIENT/SERVER] Test SET command") (assert-equal "testdata" (in (test-client-cmd "GET" "testkey") (line) (line T)) "[CLIENT/SERVER] Test GET command") (assert-equal "no data" (in (test-client-cmd "GET" "nokey") (line) (line T)) "[CLIENT/SERVER] Test NO DATA result") - (assert-nil (call './client.l "--port" *KV_port "--id" *Client_id "--pass" *KV_pass "UNKNOWN") "[CLIENT/SERVER] Test UNKNOWN command") - (assert-nil (call './client.l "--port" *KV_port "--id" *Client_id "--pass" "badpass" "GET") "[CLIENT/SERVER] Test AUTH with bad password") + (assert-nil (call './client.l "--port" *KV_port "--name" *Client_id "--pass" *KV_pass "UNKNOWN") "[CLIENT/SERVER] Test UNKNOWN command") + (assert-nil (call './client.l "--port" *KV_port "--name" *Client_id "--pass" "badpass" "GET") "[CLIENT/SERVER] Test AUTH with bad password") (assert-kind-of 'Number (in (test-client-cmd "GET" "%stats%/keyspace_hits") (line) (format (line T))) "[CLIENT/SERVER] Test STATS command") + (assert-includes (pack "name=" *Client_id " addr=") (in (test-client-cmd "CLIENT" "LIST") (line) (line T)) "[CLIENT/SERVER] Test CLIENT LIST command") + (assert-kind-of 'Number (in (test-client-cmd "CLIENT" "ID") (line) (format (line T))) "[CLIENT/SERVER] Test CLIENT ID command") ] [de tests-client-server () diff --git a/test/test_kv.l b/test/test_kv.l index a17bfc5..730cbdf 100644 --- a/test/test_kv.l +++ b/test/test_kv.l @@ -7,6 +7,8 @@ (assert-nil (kv-cmd-get (kv-name "nonexistant")) "[GET] Should return NIL if the key doesn't exist") (assert-equal "OK" (kv-cmd-set "testkey" "testvalue") "[SET] Should set a key and return OK") (assert-equal "testvalue" (kv-cmd-get "testkey") "[GET] Should return the value of the key") + (assert-equal "testvalue" (kv-cmd-getset "testkey" "newvalue") "[GETSET] Should return the old value") + (assert-equal "newvalue" (kv-cmd-get "testkey") "[GETSET] Should return the new value") ] [de test-commands-rpush () @@ -49,15 +51,15 @@ (assert-nil (kv-cmd-ident) "[IDENT] Should return NIL if no child pid is provided") (assert-nil (kv-cmd-ident 12345) "[IDENT] Should return NIL if no elements are provided") (assert-nil (kv-cmd-ident 12345 "abc") "[IDENT] Should return NIL if elements aren't a list") - (assert-equal "OK abc" (kv-cmd-ident 12345 (list (cons "id" "abc"))) "[IDENT] Should return OK with the client ID") - (assert-equal '(12345 (("id" . "abc") ("ip"))) (kv-cmd-lindex (kv-name "%stats%/connected_clients") 0) "[LINDEX] Should return the info of the connected client") + (assert-equal "OK abc" (kv-cmd-ident 12345 (list (cons "name" "abc"))) "[IDENT] Should return OK with the client ID") + (assert-equal '12345 (cdr (assoc "pid" (cadr (kv-cmd-lindex (kv-name "%stats%/connected_clients") 0)))) "[LINDEX] Should return the info of the connected client") (assert-kind-of 'Number (kv-cmd-llen (kv-name "%stats%/connected_clients")) "[LLEN] Should show only 1 connected client") ] [de test-commands-info () - (assert-equal 46378 (cdr (assoc "tcp_port" (kv-info-server))) "[INFO-SERVER] Should return a list of server INFO") + (assert-equal *KV_port (cdr (assoc "tcp_port" (kv-info-server))) "[INFO-SERVER] Should return a list of server INFO") (assert-nil (cdr (assoc "connected_clients" (kv-info-server))) "[INFO-CLIENTS] Should return a list of clients INFO") - (assert-equal "1M" (cdr (assoc "used_memory_human" (kv-info-memory))) "[INFO-MEMORY] Should return a list of memory INFO") + (assert-kind-of 'String (cdr (assoc "used_memory_human" (kv-info-memory))) "[INFO-MEMORY] Should return a list of memory INFO") (assert-equal 1 (cdr (assoc "total_connections_received" (kv-info-stats))) "[INFO-STATS] Should return a list of stats INFO") (assert-kind-of 'String (kv-info-default) "[INFO] Should return a string of INFO") ] @@ -97,21 +99,53 @@ (assert-equal "OK" (kv-process 12345 (list "SET" "processkey" "processvalue")) "[PROCESS] Should process the SET command") (assert-equal "processvalue" (kv-process 12345 (list "GET" "processkey")) "[PROCESS] Should process the GET command") (assert-equal 3 (kv-process 12345 (list "RPUSH" "processkey" (11 22 33))) "[PROCESS] Should process the RPUSH command") - (assert-equal "OK 007" (kv-process 12345 (list "IDENT" (list "id" "007"))) "[PROCESS] Should process the IDENT command") + (assert-equal "OK 007" (kv-process 12345 (list "IDENT" (list "name" "007"))) "[PROCESS] Should process the IDENT command") (assert-kind-of 'String (kv-process 12345 (list "INFO")) "[PROCESS] Should process the INFO command") (assert-equal 22 (kv-process 12345 (list "LINDEX" "processkey" 1)) "[PROCESS] Should process the LINDEX command") (assert-equal 3 (kv-process 12345 (list "LLEN" "processkey")) "[PROCESS] Should process the LLEN command") - (assert-kind-of 'String (kv-process 12345 (list "LOLWUT" 1)) "[PROCESS] Should process the LOLWUT command") (assert-equal 11 (kv-process 12345 (list "LPOP" "processkey")) "[PROCESS] Should process the LPOP command") (assert-equal 22 (kv-process 12345 (list "LPOPRPUSH" "processkey" "processdest")) "[PROCESS] Should process the LPOPRPUSH command") (assert-kind-of 'String (kv-process 12345 (list "info" "server")) "[PROCESS] Should process a lowercase command") ] +[de test-commands-bgsave () + (kv-cmd-rpush "mylist" (kv-name "mylist") '("1" "2" "3" "4" "5" "6" "7" "8" "9" "10")) + (let KV_desc (open *KV_aof) + (kv-save-aof '("RPUSH" "mylist" ("1" "2" "3" "4" "5" "6" "7" "8" "9" "10")) KV_desc) + (close KV_desc) ) + (assert-equal "OK" (kv-cmd-get "%stats%/aof_last_write_status") "[BGSAVE] Should be OK for saving a valid AOF entry") + (kv-save-db-keys) + (kv-write-db) + (assert-equal 0 (kv-restore "DB" *KV_db) "[BGSAVE] Should return 0 when restoring the DB") + (assert-equal 0 (kv-restore "AOF" *KV_aof) "[BGSAVE] Should return 0 when restoring the AOF") + (out *KV_db_lock (prinl "12345")) + (assert-equal "Error: DB is locked for writing" (kv-locked?) "[BGSAVE] Should return an error if the DB is locked") + (call 'rm "-f" *KV_db_lock) + (assert-nil (kv-locked?) "[BGSAVE] Should return NIL if the DB is not locked") + (assert-kind-of 'String (kv-timestamp) "[BGSAVE] Should return a UNIX timestamp") + (kv-save-db-keys) + (assert-kind-of 'List (info *KV_db_tmp) "[BGSAVE] Should return a list if a temp DB exists after saving the keys") + (assert-t (kv-write-db) "[BGSAVE] Should write the DB to disk") + (assert-equal 0 (kv-rewrite-aof) "[BGSAVE] Should return 0 when writing the AOF to disk") + ] + +[de test-commands-exists () + (kv-cmd-rpush "existlist" (kv-name "existlist") '("1" "2" "3" "4" "5" "6" "7" "8" "9" "10")) + (kv-cmd-rpush "existlist2" (kv-name "existlist2") '("1" "2" "3" "4" "5" "6" "7" "8" "9" "10")) + (assert-equal 1 (kv-cmd-exists "existlist") "[EXISTS] Should return 1 if the key exists") + (assert-equal 0 (kv-cmd-exists "nonexists") "[EXISTS] Should return 0 if the key doesn't exist") + (assert-equal 2 (kv-cmd-exists "existlist" '("existlist2")) "[EXISTS] Should return 2 if the two keys exist") + (assert-equal 3 (kv-cmd-exists "existlist" '("existlist" "existlist2")) "[EXISTS] Should return 3 if the two keys exist and one is a duplicate") + ] + +[de test-commands-ping () + (assert-equal "PONG" (kv-cmd-ping) "[PING] Should return PONG") + (assert-equal "Hello" (kv-cmd-ping "Hello") "[PING] Should return Hello") + ] [execute (prinl "^J Testing Key/Value^J") '(assert-equal 0 (kv-cmd-llen (kv-name "nonexistant")) "[LLEN] Should return 0 if the key doesn't exist") - '(assert-kind-of 'String (kv-cmd-lolwut 1) "[LOLWUT] Should return a sparkline as a string") '(test-commands-del) '(test-commands-get-set) '(test-commands-ident) @@ -122,4 +156,7 @@ '(test-commands-rpush) '(test-commands-stats-failures) '(test-commands-process) + '(test-commands-bgsave) + '(test-commands-exists) + '(test-commands-ping) ]