From 165294483c6201c2d8988920c0f45d1094f781cf Mon Sep 17 00:00:00 2001 From: Matt Haggard Date: Fri, 12 Dec 2025 12:18:52 -0500 Subject: [PATCH 1/3] Improved data storage efficiency of multi-recipient messages --- src/bucketsrelay/v2/objs.nim | 2 +- src/bucketsrelay/v2/proto2.nim | 103 +++++++++++++++++++++++---------- tests/tproto2.nim | 41 ++++++++++++- 3 files changed, 112 insertions(+), 34 deletions(-) diff --git a/src/bucketsrelay/v2/objs.nim b/src/bucketsrelay/v2/objs.nim index 70e6bea..ecc306b 100644 --- a/src/bucketsrelay/v2/objs.nim +++ b/src/bucketsrelay/v2/objs.nim @@ -93,7 +93,7 @@ const RELAY_MAX_NOTES* = 1000 RELAY_NOTE_DURATION* = 5 * 24 * 60 * 60 RELAY_MAX_MESSAGE_SIZE* = 65536 * 2 - RELAY_MAX_KEY_SIZE* = 4096 + RELAY_MAX_KEY_SIZE* = 512 RELAY_MESSAGE_DURATION* = 30 * 24 * 60 * 60 const diff --git a/src/bucketsrelay/v2/proto2.nim b/src/bucketsrelay/v2/proto2.nim index fe97346..a5320a1 100644 --- a/src/bucketsrelay/v2/proto2.nim +++ b/src/bucketsrelay/v2/proto2.nim @@ -192,19 +192,42 @@ proc updateSchema*(db: DbConn) = db.exec(sql"CREATE INDEX note_created ON note(created)") db.exec(sql"CREATE INDEX note_src ON note(src)") - # message - db.exec(sql"""CREATE TABLE message ( + # message - stores message data once + db.exec(sql"""CREATE TABLE message_data ( id INTEGER PRIMARY KEY AUTOINCREMENT, created TIMESTAMP DEFAULT CURRENT_TIMESTAMP, key BLOB NOT NULL, - src TEXT NOT NULL, - dst TEXT NOT NULL, + src BLOB NOT NULL, data BLOB NOT NULL )""") - db.exec(sql"CREATE INDEX message_created ON message(created)") - db.exec(sql"""CREATE UNIQUE INDEX message_dst_key - ON message(dst, key) - WHERE key IS NOT x'' + db.exec(sql"CREATE INDEX message_data_created ON message_data(created)") + + # message recipients - tracks who needs to receive each message + db.exec(sql"""CREATE TABLE message_recipient ( + message_id INTEGER NOT NULL, + dst BLOB NOT NULL, + src BLOB NOT NULL, + key BLOB NOT NULL, + PRIMARY KEY (message_id, dst), + FOREIGN KEY (message_id) REFERENCES message_data(id) ON DELETE CASCADE + )""") + db.exec(sql"CREATE INDEX message_recipient_dst ON message_recipient(dst)") + # Ensure only one message per (dst, src, key) when key is not empty + # Different senders can send messages with the same key to the same recipient + db.exec(sql"""CREATE UNIQUE INDEX message_recipient_dst_src_key + ON message_recipient(dst, src, key) + WHERE key != x'' + """) + + # Automatically delete message_data when last recipient is removed + db.exec(sql"""CREATE TRIGGER cleanup_message_data + AFTER DELETE ON message_recipient + WHEN NOT EXISTS ( + SELECT 1 FROM message_recipient WHERE message_id = OLD.message_id + ) + BEGIN + DELETE FROM message_data WHERE id = OLD.message_id; + END """) # stats @@ -419,20 +442,23 @@ proc delExpiredMessages(relay: Relay) = else: -RELAY_MESSAGE_DURATION let offstring = &"{offset} seconds" - relay.db.exec(sql"DELETE FROM message WHERE created <= datetime('now', ?)", offstring) + # Delete expired message data; CASCADE will automatically delete associated recipients + relay.db.exec(sql"DELETE FROM message_data WHERE created <= datetime('now', ?)", offstring) proc nextMessage(relay: Relay, dst: SignPublicKey): Option[RelayMessage] = let orow = relay.db.getRow(sql""" - SELECT key, src, data, id - FROM message + SELECT md.key, md.src, md.data, md.id + FROM message_recipient mr + JOIN message_data md ON mr.message_id = md.id WHERE - dst = ? + mr.dst = ? ORDER BY - created ASC, - id ASC + md.created ASC, + md.id ASC LIMIT 1""", dst) if orow.isSome: let row = orow.get() + let message_id = row[3].i result = some(RelayMessage( kind: Data, resp_id: 0, # Data messages are not triggered by recipient's command @@ -440,16 +466,9 @@ proc nextMessage(relay: Relay, dst: SignPublicKey): Option[RelayMessage] = data_src: SignPublicKey.fromDB(row[1].b), data_val: row[2].b.string, )) - relay.db.exec(sql"DELETE FROM message WHERE id=?", row[3].i) - -proc delExpiredChunks(relay: Relay) = - let offset = when TESTMODE: - -RELAY_MESSAGE_DURATION + TIME_SKEW - else: - -RELAY_MESSAGE_DURATION - let offstring = &"{offset} seconds" - relay.db.exec(sql"DELETE FROM chunk WHERE last_used <= datetime('now', ?)", offstring) + # Delete this recipient entry (trigger will auto-cleanup message_data if this was the last recipient) + relay.db.exec(sql"DELETE FROM message_recipient WHERE message_id = ? AND dst = ?", message_id, dst) #------------------------------------------------------------------- # relay command handling @@ -532,7 +551,7 @@ proc handleCommand*[T](relay: Relay[T], conn: var RelayConnection[T], cmd: Relay pubkey, ) conn.sendOkay(cmd) - except: + except CatchableError: conn.sendError(cmd, "Duplicate topic", Generic) of FetchNote: if cmd.fetch_topic.len > RELAY_MAX_TOPIC_SIZE: @@ -578,9 +597,12 @@ proc handleCommand*[T](relay: Relay[T], conn: var RelayConnection[T], cmd: Relay pubkey = pubkey, send = 1, ) + + # Collect offline recipients + var offline_dsts: seq[SignPublicKey] for dst_pubkey in cmd.send_dst: if relay.clients.hasKey(dst_pubkey): - # dst is online + # dst is online - send directly var other_conn = relay.clients[dst_pubkey] other_conn.sendMessage(RelayMessage( kind: Data, @@ -595,9 +617,28 @@ proc handleCommand*[T](relay: Relay[T], conn: var RelayConnection[T], cmd: Relay data_out = cmd.send_val.len + cmd.send_key.len, ) else: - # dst is offline - relay.db.exec(sql""" - INSERT OR REPLACE INTO message - (key, src, dst, data) - VALUES (?, ?, ?, ?)""", - cmd.send_key.DbBlob, pubkey, dst_pubkey, cmd.send_val.DbBlob) + # dst is offline - collect for batch storage + offline_dsts.add(dst_pubkey) + + # Store message data once for all offline recipients + if offline_dsts.len > 0: + relay.db.exec(sql"BEGIN") + try: + # Insert message data once and get the ID + let message_id = relay.db.insertID(sql""" + INSERT INTO message_data (key, src, data) + VALUES (?, ?, ?)""", + cmd.send_key.DbBlob, pubkey, cmd.send_val.DbBlob) + + # Add recipient entries for each offline destination + # Use INSERT OR REPLACE to handle duplicate (dst, src, key) - database-enforced + for dst_pubkey in offline_dsts: + relay.db.exec(sql""" + INSERT OR REPLACE INTO message_recipient (message_id, dst, src, key) + VALUES (?, ?, ?, ?)""", + message_id, dst_pubkey, pubkey, cmd.send_key.DbBlob) + + relay.db.exec(sql"COMMIT") + except CatchableError: + relay.db.exec(sql"ROLLBACK") + raise diff --git a/tests/tproto2.nim b/tests/tproto2.nim index bfdf2b7..c8d7c81 100644 --- a/tests/tproto2.nim +++ b/tests/tproto2.nim @@ -676,6 +676,8 @@ suite "data": check carl2.pop(Data).data_val == "cider" test "drop recipient": + # When updating a keyed message to a subset of recipients, + # recipients not in the new send still get their old pending message let relay = testRelay() var alice = relay.authenticatedConn() var bob = relay.authenticatedConn() @@ -692,7 +694,7 @@ suite "data": relay.handleCommand(alice, RelayCommand( kind: SendData, send_key: "apple", - send_dst: @[bob.pk], + send_dst: @[bob.pk], # Carl not included in update send_val: "cider", )) block: @@ -700,7 +702,42 @@ suite "data": check bob2.pop(Data).data_val == "cider" block: var carl2 = relay.authenticatedConn(carl.keys) - check carl2.pop(Data).data_val == "core" + # Carl still has his original pending message + check carl2.pop(Data).data_val == "core" + + test "multiple senders same key": + # Different senders can send messages with the same key to the same recipient + let relay = testRelay() + var alice = relay.authenticatedConn() + var bob = relay.authenticatedConn() + var carl = relay.authenticatedConn() + relay.disconnect(carl) + + relay.handleCommand(alice, RelayCommand( + kind: SendData, + send_key: "status", + send_dst: @[carl.pk], + send_val: "alice_v1", + )) + relay.handleCommand(bob, RelayCommand( + kind: SendData, + send_key: "status", + send_dst: @[carl.pk], + send_val: "bob_v1", + )) + + var carl2 = relay.authenticatedConn(carl.keys) + # Carl should receive both messages, one from alice and one from bob + block: + let msg1 = carl2.pop(Data) + let msg2 = carl2.pop(Data) + # Order might vary, so check both possibilities + check ( + (msg1.data_src == alice.pk and msg1.data_val == "alice_v1" and + msg2.data_src == bob.pk and msg2.data_val == "bob_v1") or + (msg1.data_src == bob.pk and msg1.data_val == "bob_v1" and + msg2.data_src == alice.pk and msg2.data_val == "alice_v1") + ) suite "anon": From 56b91ede275c860e693318a8a64f18df6ab52643 Mon Sep 17 00:00:00 2001 From: Matt Haggard Date: Fri, 12 Dec 2025 13:01:13 -0500 Subject: [PATCH 2/3] Fix stats --- src/bucketsrelay/v2/server2.nim | 32 ++++++++++++++++++----- src/bucketsrelay/v2/templates/stats.nimja | 4 +-- 2 files changed, 27 insertions(+), 9 deletions(-) diff --git a/src/bucketsrelay/v2/server2.nim b/src/bucketsrelay/v2/server2.nim index f5b922e..a8cd936 100644 --- a/src/bucketsrelay/v2/server2.nim +++ b/src/bucketsrelay/v2/server2.nim @@ -200,10 +200,14 @@ router myrouter: # total stored let total_stored_note = relay.db.getRow(sql"SELECT coalesce(sum(length(data)), 0) FROM note").get()[0].i - let total_stored_message = relay.db.getRow(sql"SELECT sum(coalesce(length(data), 0) + coalesce(length(key), 0)) FROM message").get()[0].i + let total_stored_message = relay.db.getRow(sql""" + SELECT + COALESCE(SUM(LENGTH(md.data) + LENGTH(md.key)), 0) + FROM message_data md + """).get()[0].i let total_stored = total_stored_note + total_stored_message let num_note = relay.db.getRow(sql"SELECT coalesce(count(*), 0) FROM note").get()[0].i - let num_message = relay.db.getRow(sql"SELECT coalesce(count(*), 0) FROM message").get()[0].i + let num_message = relay.db.getRow(sql"SELECT coalesce(count(*), 0) FROM message_recipient").get()[0].i # top traffic by ip var traffic_by_ip: seq[TransferTotal] @@ -259,11 +263,25 @@ router myrouter: for row in relay.db.getAllRows(sql""" SELECT src, - SUM(COALESCE(LENGTH(data), 0) + COALESCE(LENGTH(key), 0)) AS msg_bytes - FROM - message - GROUP BY - src + SUM(total_bytes) AS total_bytes + FROM ( + -- Note storage + SELECT + src, + SUM(LENGTH(data)) AS total_bytes + FROM note + GROUP BY src + + UNION ALL + + -- Message data storage + SELECT + md.src, + SUM(LENGTH(md.data) + LENGTH(md.key)) AS total_bytes + FROM message_data md + GROUP BY md.src + ) + GROUP BY src ORDER BY 2 DESC LIMIT 10 """): diff --git a/src/bucketsrelay/v2/templates/stats.nimja b/src/bucketsrelay/v2/templates/stats.nimja index 518efe1..825def6 100644 --- a/src/bucketsrelay/v2/templates/stats.nimja +++ b/src/bucketsrelay/v2/templates/stats.nimja @@ -112,7 +112,7 @@ {% endfor %} -

Top storage by Pubkey

+

Top Storage by Pubkey

@@ -126,7 +126,7 @@ {% endfor %}
Pubkey
-

Top events by Pubkey

+

Top Events by Pubkey

From 69d868f1284c3ba3aecf50b2aca95800d934a7bb Mon Sep 17 00:00:00 2001 From: Matt Haggard Date: Fri, 12 Dec 2025 13:14:18 -0500 Subject: [PATCH 3/3] Fix test assertions --- tests/tproto2.nim | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/tests/tproto2.nim b/tests/tproto2.nim index c8d7c81..4fca179 100644 --- a/tests/tproto2.nim +++ b/tests/tproto2.nim @@ -727,17 +727,16 @@ suite "data": )) var carl2 = relay.authenticatedConn(carl.keys) - # Carl should receive both messages, one from alice and one from bob block: - let msg1 = carl2.pop(Data) - let msg2 = carl2.pop(Data) - # Order might vary, so check both possibilities - check ( - (msg1.data_src == alice.pk and msg1.data_val == "alice_v1" and - msg2.data_src == bob.pk and msg2.data_val == "bob_v1") or - (msg1.data_src == bob.pk and msg1.data_val == "bob_v1" and - msg2.data_src == alice.pk and msg2.data_val == "alice_v1") - ) + let msg = carl2.pop(Data) + check msg.data_key == "status" + check msg.data_src == alice.pk + check msg.data_val == "alice_v1" + block: + let msg = carl2.pop(Data) + check msg.data_key == "status" + check msg.data_src == bob.pk + check msg.data_val == "bob_v1" suite "anon":