2 changes: 1 addition & 1 deletion src/bucketsrelay/v2/objs.nim
@@ -93,7 +93,7 @@ const
RELAY_MAX_NOTES* = 1000
RELAY_NOTE_DURATION* = 5 * 24 * 60 * 60
RELAY_MAX_MESSAGE_SIZE* = 65536 * 2
RELAY_MAX_KEY_SIZE* = 4096
RELAY_MAX_KEY_SIZE* = 512
RELAY_MESSAGE_DURATION* = 30 * 24 * 60 * 60

const
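The lower limit only changes the constant; where it is enforced is not shown in this diff. A minimal sketch of the kind of guard the new value implies, modeled on the existing RELAY_MAX_TOPIC_SIZE check in proto2.nim (the error message and exact placement are assumptions):

# Hypothetical guard in the SendData handler (not part of this diff):
# reject keys larger than the new 512-byte limit before touching the database.
if cmd.send_key.len > RELAY_MAX_KEY_SIZE:
  conn.sendError(cmd, "Key too large", Generic)
  return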
103 changes: 72 additions & 31 deletions src/bucketsrelay/v2/proto2.nim
@@ -192,19 +192,42 @@ proc updateSchema*(db: DbConn) =
db.exec(sql"CREATE INDEX note_created ON note(created)")
db.exec(sql"CREATE INDEX note_src ON note(src)")

# message
db.exec(sql"""CREATE TABLE message (
# message - stores message data once
db.exec(sql"""CREATE TABLE message_data (
id INTEGER PRIMARY KEY AUTOINCREMENT,
created TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
key BLOB NOT NULL,
src TEXT NOT NULL,
dst TEXT NOT NULL,
src BLOB NOT NULL,
data BLOB NOT NULL
)""")
db.exec(sql"CREATE INDEX message_created ON message(created)")
db.exec(sql"""CREATE UNIQUE INDEX message_dst_key
ON message(dst, key)
WHERE key IS NOT x''
db.exec(sql"CREATE INDEX message_data_created ON message_data(created)")

# message recipients - tracks who needs to receive each message
db.exec(sql"""CREATE TABLE message_recipient (
message_id INTEGER NOT NULL,
dst BLOB NOT NULL,
src BLOB NOT NULL,
key BLOB NOT NULL,
PRIMARY KEY (message_id, dst),
FOREIGN KEY (message_id) REFERENCES message_data(id) ON DELETE CASCADE
)""")
db.exec(sql"CREATE INDEX message_recipient_dst ON message_recipient(dst)")
# Ensure only one message per (dst, src, key) when key is not empty
# Different senders can send messages with the same key to the same recipient
db.exec(sql"""CREATE UNIQUE INDEX message_recipient_dst_src_key
ON message_recipient(dst, src, key)
WHERE key != x''
""")

# Automatically delete message_data when last recipient is removed
db.exec(sql"""CREATE TRIGGER cleanup_message_data
AFTER DELETE ON message_recipient
WHEN NOT EXISTS (
SELECT 1 FROM message_recipient WHERE message_id = OLD.message_id
)
BEGIN
DELETE FROM message_data WHERE id = OLD.message_id;
END
""")

# stats
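Taken together, message_recipient, the partial unique index, and the trigger give reference-counted storage: a payload lives once in message_data and disappears when its last recipient row does. A small illustrative walkthrough, assuming the same db handle and helpers used above (the blob values are placeholders, not real keys or pubkeys):

# Illustrative only (not part of this diff): one payload shared by two recipients.
let mid = db.insertID(sql"""
  INSERT INTO message_data (key, src, data)
  VALUES (?, ?, ?)""",
  "k".DbBlob, "alice".DbBlob, "payload".DbBlob)
for dst in ["bob", "carl"]:
  db.exec(sql"""
    INSERT INTO message_recipient (message_id, dst, src, key)
    VALUES (?, ?, ?, ?)""",
    mid, dst.DbBlob, "alice".DbBlob, "k".DbBlob)
# Deleting bob's row leaves message_data untouched; deleting carl's row fires
# cleanup_message_data and removes the now-unreferenced payload as well.
db.exec(sql"DELETE FROM message_recipient WHERE message_id = ? AND dst = ?", mid, "bob".DbBlob)
db.exec(sql"DELETE FROM message_recipient WHERE message_id = ? AND dst = ?", mid, "carl".DbBlob)

Note that the ON DELETE CASCADE path (an expired message_data row removing its recipient rows) only fires when SQLite's foreign_keys pragma is enabled, which is assumed to be configured in the connection setup outside this diff.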
@@ -419,37 +442,33 @@ proc delExpiredMessages(relay: Relay) =
else:
-RELAY_MESSAGE_DURATION
let offstring = &"{offset} seconds"
relay.db.exec(sql"DELETE FROM message WHERE created <= datetime('now', ?)", offstring)
# Delete expired message data; CASCADE will automatically delete associated recipients
relay.db.exec(sql"DELETE FROM message_data WHERE created <= datetime('now', ?)", offstring)

proc nextMessage(relay: Relay, dst: SignPublicKey): Option[RelayMessage] =
let orow = relay.db.getRow(sql"""
SELECT key, src, data, id
FROM message
SELECT md.key, md.src, md.data, md.id
FROM message_recipient mr
JOIN message_data md ON mr.message_id = md.id
WHERE
dst = ?
mr.dst = ?
ORDER BY
created ASC,
id ASC
md.created ASC,
md.id ASC
LIMIT 1""", dst)
if orow.isSome:
let row = orow.get()
let message_id = row[3].i
result = some(RelayMessage(
kind: Data,
resp_id: 0, # Data messages are not triggered by recipient's command
data_key: row[0].b.string,
data_src: SignPublicKey.fromDB(row[1].b),
data_val: row[2].b.string,
))
relay.db.exec(sql"DELETE FROM message WHERE id=?", row[3].i)

proc delExpiredChunks(relay: Relay) =
let offset = when TESTMODE:
-RELAY_MESSAGE_DURATION + TIME_SKEW
else:
-RELAY_MESSAGE_DURATION
let offstring = &"{offset} seconds"
relay.db.exec(sql"DELETE FROM chunk WHERE last_used <= datetime('now', ?)", offstring)

# Delete this recipient entry (the trigger automatically cleans up message_data if this was the last recipient)
relay.db.exec(sql"DELETE FROM message_recipient WHERE message_id = ? AND dst = ?", message_id, dst)

#-------------------------------------------------------------------
# relay command handling
@@ -532,7 +551,7 @@ proc handleCommand*[T](relay: Relay[T], conn: var RelayConnection[T], cmd: Relay
pubkey,
)
conn.sendOkay(cmd)
except:
except CatchableError:
conn.sendError(cmd, "Duplicate topic", Generic)
of FetchNote:
if cmd.fetch_topic.len > RELAY_MAX_TOPIC_SIZE:
Expand Down Expand Up @@ -578,9 +597,12 @@ proc handleCommand*[T](relay: Relay[T], conn: var RelayConnection[T], cmd: Relay
pubkey = pubkey,
send = 1,
)

# Collect offline recipients
var offline_dsts: seq[SignPublicKey]
for dst_pubkey in cmd.send_dst:
if relay.clients.hasKey(dst_pubkey):
# dst is online
# dst is online - send directly
var other_conn = relay.clients[dst_pubkey]
other_conn.sendMessage(RelayMessage(
kind: Data,
@@ -595,9 +617,28 @@ proc handleCommand*[T](relay: Relay[T], conn: var RelayConnection[T], cmd: Relay
data_out = cmd.send_val.len + cmd.send_key.len,
)
else:
# dst is offline
relay.db.exec(sql"""
INSERT OR REPLACE INTO message
(key, src, dst, data)
VALUES (?, ?, ?, ?)""",
cmd.send_key.DbBlob, pubkey, dst_pubkey, cmd.send_val.DbBlob)
# dst is offline - collect for batch storage
offline_dsts.add(dst_pubkey)

# Store message data once for all offline recipients
if offline_dsts.len > 0:
relay.db.exec(sql"BEGIN")
try:
# Insert message data once and get the ID
let message_id = relay.db.insertID(sql"""
INSERT INTO message_data (key, src, data)
VALUES (?, ?, ?)""",
cmd.send_key.DbBlob, pubkey, cmd.send_val.DbBlob)

# Add recipient entries for each offline destination
# Use INSERT OR REPLACE so a duplicate (dst, src, key) replaces the earlier entry; the partial unique index enforces this at the database level
for dst_pubkey in offline_dsts:
relay.db.exec(sql"""
INSERT OR REPLACE INTO message_recipient (message_id, dst, src, key)
VALUES (?, ?, ?, ?)""",
message_id, dst_pubkey, pubkey, cmd.send_key.DbBlob)

relay.db.exec(sql"COMMIT")
except CatchableError:
relay.db.exec(sql"ROLLBACK")
raise
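The effect of the batched path is easiest to see in the style of tests/tproto2.nim further down (helper names taken from there; the row counts in the final comment follow from the schema and are not asserted in this diff):

# Sketch: one keyed send to two offline recipients stores the payload once.
let relay = testRelay()
var alice = relay.authenticatedConn()
var bob = relay.authenticatedConn()
var carl = relay.authenticatedConn()
relay.disconnect(bob)
relay.disconnect(carl)
relay.handleCommand(alice, RelayCommand(
  kind: SendData,
  send_key: "greeting",
  send_dst: @[bob.pk, carl.pk],
  send_val: "hello",
))
# Expected: one row in message_data, two rows in message_recipient.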
32 changes: 25 additions & 7 deletions src/bucketsrelay/v2/server2.nim
@@ -200,10 +200,14 @@ router myrouter:

# total stored
let total_stored_note = relay.db.getRow(sql"SELECT coalesce(sum(length(data)), 0) FROM note").get()[0].i
let total_stored_message = relay.db.getRow(sql"SELECT sum(coalesce(length(data), 0) + coalesce(length(key), 0)) FROM message").get()[0].i
let total_stored_message = relay.db.getRow(sql"""
SELECT
COALESCE(SUM(LENGTH(md.data) + LENGTH(md.key)), 0)
FROM message_data md
""").get()[0].i
let total_stored = total_stored_note + total_stored_message
let num_note = relay.db.getRow(sql"SELECT coalesce(count(*), 0) FROM note").get()[0].i
let num_message = relay.db.getRow(sql"SELECT coalesce(count(*), 0) FROM message").get()[0].i
let num_message = relay.db.getRow(sql"SELECT coalesce(count(*), 0) FROM message_recipient").get()[0].i

# top traffic by ip
var traffic_by_ip: seq[TransferTotal]
@@ -259,11 +263,25 @@ router myrouter:
for row in relay.db.getAllRows(sql"""
SELECT
src,
SUM(COALESCE(LENGTH(data), 0) + COALESCE(LENGTH(key), 0)) AS msg_bytes
FROM
message
GROUP BY
src
SUM(total_bytes) AS total_bytes
FROM (
-- Note storage
SELECT
src,
SUM(LENGTH(data)) AS total_bytes
FROM note
GROUP BY src

UNION ALL

-- Message data storage
SELECT
md.src,
SUM(LENGTH(md.data) + LENGTH(md.key)) AS total_bytes
FROM message_data md
GROUP BY md.src
)
GROUP BY src
ORDER BY 2 DESC
LIMIT 10
"""):
4 changes: 2 additions & 2 deletions src/bucketsrelay/v2/templates/stats.nimja
@@ -112,7 +112,7 @@
{% endfor %}
</table>

<h2>Top storage by Pubkey</h2>
<h2>Top Storage by Pubkey</h2>
<table>
<tr>
<th>Pubkey</th>
@@ -126,7 +126,7 @@
{% endfor %}
</table>

<h2>Top events by Pubkey</h2>
<h2>Top Events by Pubkey</h2>
<div class="rows">
<div>
<table>
40 changes: 38 additions & 2 deletions tests/tproto2.nim
@@ -676,6 +676,8 @@ suite "data":
check carl2.pop(Data).data_val == "cider"

test "drop recipient":
# When updating a keyed message to a subset of recipients,
# recipients not in the new send still get their old pending message
let relay = testRelay()
var alice = relay.authenticatedConn()
var bob = relay.authenticatedConn()
@@ -692,15 +694,49 @@ suite "data":
relay.handleCommand(alice, RelayCommand(
kind: SendData,
send_key: "apple",
send_dst: @[bob.pk],
send_dst: @[bob.pk], # Carl not included in update
send_val: "cider",
))
block:
var bob2 = relay.authenticatedConn(bob.keys)
check bob2.pop(Data).data_val == "cider"
block:
var carl2 = relay.authenticatedConn(carl.keys)
check carl2.pop(Data).data_val == "core"
# Carl still has his original pending message
check carl2.pop(Data).data_val == "core"

test "multiple senders same key":
# Different senders can send messages with the same key to the same recipient
let relay = testRelay()
var alice = relay.authenticatedConn()
var bob = relay.authenticatedConn()
var carl = relay.authenticatedConn()
relay.disconnect(carl)

relay.handleCommand(alice, RelayCommand(
kind: SendData,
send_key: "status",
send_dst: @[carl.pk],
send_val: "alice_v1",
))
relay.handleCommand(bob, RelayCommand(
kind: SendData,
send_key: "status",
send_dst: @[carl.pk],
send_val: "bob_v1",
))

var carl2 = relay.authenticatedConn(carl.keys)
block:
let msg = carl2.pop(Data)
check msg.data_key == "status"
check msg.data_src == alice.pk
check msg.data_val == "alice_v1"
block:
let msg = carl2.pop(Data)
check msg.data_key == "status"
check msg.data_src == bob.pk
check msg.data_val == "bob_v1"

suite "anon":
