From 8c6f7a3ca22ccc2892f09f2b78791f95a69be84b Mon Sep 17 00:00:00 2001 From: Matt Haggard Date: Wed, 3 Dec 2025 14:04:25 -0500 Subject: [PATCH 1/2] Notes must exist before you can fetch them, now. --- .../new-Notes-must-exist-20251203-140409.md | 1 + src/bucketsrelay/v2/objs.nim | 1 + src/bucketsrelay/v2/proto2.nim | 68 +++---------------- tests/tproto2.nim | 23 ++++--- 4 files changed, 25 insertions(+), 68 deletions(-) create mode 100644 changes/new-Notes-must-exist-20251203-140409.md diff --git a/changes/new-Notes-must-exist-20251203-140409.md b/changes/new-Notes-must-exist-20251203-140409.md new file mode 100644 index 0000000..f01702f --- /dev/null +++ b/changes/new-Notes-must-exist-20251203-140409.md @@ -0,0 +1 @@ +Notes must exist before you can fetch them, now. diff --git a/src/bucketsrelay/v2/objs.nim b/src/bucketsrelay/v2/objs.nim index f7714ec..272e3a8 100644 --- a/src/bucketsrelay/v2/objs.nim +++ b/src/bucketsrelay/v2/objs.nim @@ -46,6 +46,7 @@ type StorageLimitExceeded = 3 TransferLimitExceeeded = 4 InvalidParams = 5 + NotFound = 6 RelayMessage* = object resp_id*: int diff --git a/src/bucketsrelay/v2/proto2.nim b/src/bucketsrelay/v2/proto2.nim index 3be39f0..00bec58 100644 --- a/src/bucketsrelay/v2/proto2.nim +++ b/src/bucketsrelay/v2/proto2.nim @@ -240,14 +240,7 @@ proc updateSchema*(db: DbConn) = store INTEGER DEFAULT 0, PRIMARY KEY (period, ip, pubkey) )""") - - #----------- in-memory stuff - db.exec(sql"""CREATE TEMPORARY TABLE note_sub ( - topic TEXT PRIMARY KEY, - pubkey TEXT NOT NULL - )""") - db.exec(sql"CREATE INDEX note_sub_pubkey ON note_sub(pubkey)") - + #------------------------------------------------------------------- # Relay code @@ -318,7 +311,6 @@ proc initAuth*[T](relay: Relay[T], client: T): RelayConnection[T] = proc disconnect*[T](relay: Relay[T], conn: RelayConnection[T]) = if conn.pubkey.isSome: let pubkey = conn.pubkey.get() - relay.db.exec(sql"DELETE FROM note_sub WHERE pubkey=?", pubkey) relay.clients.del(pubkey) info &"[{conn.pubkey.abbr}] disconnected" @@ -416,21 +408,6 @@ proc delExpiredNotes(relay: Relay) = let offstring = &"{offset} seconds" relay.db.exec(sql"DELETE FROM note WHERE created <= datetime('now', ?)", offstring) -proc addNoteSub(relay: Relay, topic: string, pubkey: PublicKey) = - ## Record that a pubkey is subscribed to a topic - try: - relay.db.exec(sql"INSERT INTO note_sub (topic, pubkey) VALUES (?,?)", topic.DbBlob, pubkey) - info &"[{pubkey.abbr}] sub {topic}" - except CatchableError: - raise ValueError.newException("Topic already subscribed") - -proc getNoteSub(relay: Relay, topic: string): Option[PublicKey] = - ## Return a PublicKey who is listening for a note by topic. - relay.delExpiredNotes() - let orow = relay.db.getRow(sql"SELECT pubkey FROM note_sub WHERE topic = ?", topic.DbBlob) - if orow.isSome: - return some(PublicKey.fromDB(orow.get()[0].b)) - proc popNote(relay: Relay, topic: string): Option[string] = let db = relay.db relay.delExpiredNotes() @@ -449,10 +426,6 @@ proc popNote(relay: Relay, topic: string): Option[string] = warn &"[note] error " & getCurrentExceptionMsg() db.exec(sql"ROLLBACK") -proc delNoteSub(relay: Relay, topic: string) = - relay.db.exec(sql"DELETE FROM note_sub WHERE topic = ?", topic.DbBlob) - info &"[note] del {topic}" - proc noteCount(relay: Relay, pubkey: PublicKey): int = ## Return the number of notes currently published by this ip relay.db.getRow(sql"SELECT count(*) FROM note WHERE src = ?", pubkey).get()[0].i.int @@ -572,34 +545,15 @@ proc handleCommand*[T](relay: Relay[T], conn: var RelayConnection[T], cmd: Relay pubkey = pubkey, publish = 1, ) - let opubkey = relay.getNoteSub(cmd.pub_topic) - if opubkey.isSome: - # someone is waiting - var other_conn = relay.clients[opubkey.get()] - conn.sendOkay(cmd) - other_conn.sendMessage(RelayMessage( - kind: Note, - resp_id: 0, # Not triggered by other_conn's command - note_data: cmd.pub_data, - note_topic: cmd.pub_topic, - )) - relay.delNoteSub(cmd.pub_topic) - relay.db.record_transfer_stat( - ip = other_conn.ip, - pubkey = other_conn.pubkey.get(), - data_out = cmd.pub_data.len, + try: + relay.db.exec(sql"INSERT INTO note (topic, data, src) VALUES (?, ?, ?)", + cmd.pub_topic.DbBlob, + cmd.pub_data.DbBlob, + pubkey, ) - else: - # no one is waiting - try: - relay.db.exec(sql"INSERT INTO note (topic, data, src) VALUES (?, ?, ?)", - cmd.pub_topic.DbBlob, - cmd.pub_data.DbBlob, - pubkey, - ) - conn.sendOkay(cmd) - except: - conn.sendError(cmd, "Duplicate topic", Generic) + conn.sendOkay(cmd) + except: + conn.sendError(cmd, "Duplicate topic", Generic) of FetchNote: if cmd.fetch_topic.len > RELAY_MAX_TOPIC_SIZE: conn.sendError(cmd, "Topic too long", TooLarge) @@ -620,8 +574,8 @@ proc handleCommand*[T](relay: Relay[T], conn: var RelayConnection[T], cmd: Relay data_out = data.len, ) else: - # the note isn't here yet - relay.addNoteSub(cmd.fetch_topic, conn.pubkey.get()) + # the note doesn't exist + conn.sendError(cmd, "Topic not found", NotFound) of SendData: if cmd.send_val.len > RELAY_MAX_MESSAGE_SIZE: conn.sendError(cmd, "Data too long", TooLarge) diff --git a/tests/tproto2.nim b/tests/tproto2.nim index d5bcd37..801b909 100644 --- a/tests/tproto2.nim +++ b/tests/tproto2.nim @@ -247,15 +247,9 @@ suite "notes": kind: FetchNote, fetch_topic: "heyo", )) - relay.handleCommand(alice, RelayCommand( - kind: PublishNote, - pub_topic: "heyo", - pub_data: "foo", - )) - check alice.pop(Okay).ok_cmd == PublishNote - let note = alice.pop(Note) - check note.note_data == "foo" - check note.note_topic == "heyo" + let err = alice.pop(Error) + check err.err_cmd == FetchNote + check err.err_code == NotFound test "publish max size topic": let relay = testRelay() @@ -308,7 +302,9 @@ suite "notes": kind: FetchNote, fetch_topic: "topic", )) - check alice.msgCount == 0 + let err = alice.pop(Error) + check err.err_cmd == FetchNote + check err.err_code == NotFound test "fetch note again": let relay = testRelay() @@ -333,7 +329,9 @@ suite "notes": kind: FetchNote, fetch_topic: "sometopic", )) - check alice.msgCount == 0 + let err = alice.pop(Error) + check err.err_cmd == FetchNote + check err.err_code == NotFound test "sub then disconnect, the pub": let relay = testRelay() @@ -344,6 +342,9 @@ suite "notes": kind: FetchNote, fetch_topic: "foo", )) + let err = bob.pop(Error) + check err.err_cmd == FetchNote + check err.err_code == NotFound relay.disconnect(bob) relay.handleCommand(alice, RelayCommand( From 48c1eac32c06628165868674c10d02b59496037f Mon Sep 17 00:00:00 2001 From: Matt Haggard Date: Wed, 3 Dec 2025 14:07:08 -0500 Subject: [PATCH 2/2] Make sure fetch error has resp_id --- tests/tproto2.nim | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/tproto2.nim b/tests/tproto2.nim index 801b909..d08f1ab 100644 --- a/tests/tproto2.nim +++ b/tests/tproto2.nim @@ -245,11 +245,13 @@ suite "notes": var alice = relay.authenticatedConn() relay.handleCommand(alice, RelayCommand( kind: FetchNote, + resp_id: 34, fetch_topic: "heyo", )) let err = alice.pop(Error) check err.err_cmd == FetchNote check err.err_code == NotFound + check err.resp_id == 34 test "publish max size topic": let relay = testRelay()