Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/new-Notes-must-exist-20251203-140409.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Notes must exist before you can fetch them, now.
1 change: 1 addition & 0 deletions src/bucketsrelay/v2/objs.nim
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type
StorageLimitExceeded = 3
TransferLimitExceeeded = 4
InvalidParams = 5
NotFound = 6

RelayMessage* = object
resp_id*: int
Expand Down
68 changes: 11 additions & 57 deletions src/bucketsrelay/v2/proto2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -240,14 +240,7 @@ proc updateSchema*(db: DbConn) =
store INTEGER DEFAULT 0,
PRIMARY KEY (period, ip, pubkey)
)""")

#----------- in-memory stuff
db.exec(sql"""CREATE TEMPORARY TABLE note_sub (
topic TEXT PRIMARY KEY,
pubkey TEXT NOT NULL
)""")
db.exec(sql"CREATE INDEX note_sub_pubkey ON note_sub(pubkey)")



#-------------------------------------------------------------------
# Relay code
Expand Down Expand Up @@ -318,7 +311,6 @@ proc initAuth*[T](relay: Relay[T], client: T): RelayConnection[T] =
proc disconnect*[T](relay: Relay[T], conn: RelayConnection[T]) =
if conn.pubkey.isSome:
let pubkey = conn.pubkey.get()
relay.db.exec(sql"DELETE FROM note_sub WHERE pubkey=?", pubkey)
relay.clients.del(pubkey)
info &"[{conn.pubkey.abbr}] disconnected"

Expand Down Expand Up @@ -416,21 +408,6 @@ proc delExpiredNotes(relay: Relay) =
let offstring = &"{offset} seconds"
relay.db.exec(sql"DELETE FROM note WHERE created <= datetime('now', ?)", offstring)

proc addNoteSub(relay: Relay, topic: string, pubkey: PublicKey) =
## Record that a pubkey is subscribed to a topic
try:
relay.db.exec(sql"INSERT INTO note_sub (topic, pubkey) VALUES (?,?)", topic.DbBlob, pubkey)
info &"[{pubkey.abbr}] sub {topic}"
except CatchableError:
raise ValueError.newException("Topic already subscribed")

proc getNoteSub(relay: Relay, topic: string): Option[PublicKey] =
## Return a PublicKey who is listening for a note by topic.
relay.delExpiredNotes()
let orow = relay.db.getRow(sql"SELECT pubkey FROM note_sub WHERE topic = ?", topic.DbBlob)
if orow.isSome:
return some(PublicKey.fromDB(orow.get()[0].b))

proc popNote(relay: Relay, topic: string): Option[string] =
let db = relay.db
relay.delExpiredNotes()
Expand All @@ -449,10 +426,6 @@ proc popNote(relay: Relay, topic: string): Option[string] =
warn &"[note] error " & getCurrentExceptionMsg()
db.exec(sql"ROLLBACK")

proc delNoteSub(relay: Relay, topic: string) =
relay.db.exec(sql"DELETE FROM note_sub WHERE topic = ?", topic.DbBlob)
info &"[note] del {topic}"

proc noteCount(relay: Relay, pubkey: PublicKey): int =
## Return the number of notes currently published by this ip
relay.db.getRow(sql"SELECT count(*) FROM note WHERE src = ?", pubkey).get()[0].i.int
Expand Down Expand Up @@ -572,34 +545,15 @@ proc handleCommand*[T](relay: Relay[T], conn: var RelayConnection[T], cmd: Relay
pubkey = pubkey,
publish = 1,
)
let opubkey = relay.getNoteSub(cmd.pub_topic)
if opubkey.isSome:
# someone is waiting
var other_conn = relay.clients[opubkey.get()]
conn.sendOkay(cmd)
other_conn.sendMessage(RelayMessage(
kind: Note,
resp_id: 0, # Not triggered by other_conn's command
note_data: cmd.pub_data,
note_topic: cmd.pub_topic,
))
relay.delNoteSub(cmd.pub_topic)
relay.db.record_transfer_stat(
ip = other_conn.ip,
pubkey = other_conn.pubkey.get(),
data_out = cmd.pub_data.len,
try:
relay.db.exec(sql"INSERT INTO note (topic, data, src) VALUES (?, ?, ?)",
cmd.pub_topic.DbBlob,
cmd.pub_data.DbBlob,
pubkey,
)
else:
# no one is waiting
try:
relay.db.exec(sql"INSERT INTO note (topic, data, src) VALUES (?, ?, ?)",
cmd.pub_topic.DbBlob,
cmd.pub_data.DbBlob,
pubkey,
)
conn.sendOkay(cmd)
except:
conn.sendError(cmd, "Duplicate topic", Generic)
conn.sendOkay(cmd)
except:
conn.sendError(cmd, "Duplicate topic", Generic)
of FetchNote:
if cmd.fetch_topic.len > RELAY_MAX_TOPIC_SIZE:
conn.sendError(cmd, "Topic too long", TooLarge)
Expand All @@ -620,8 +574,8 @@ proc handleCommand*[T](relay: Relay[T], conn: var RelayConnection[T], cmd: Relay
data_out = data.len,
)
else:
# the note isn't here yet
relay.addNoteSub(cmd.fetch_topic, conn.pubkey.get())
# the note doesn't exist
conn.sendError(cmd, "Topic not found", NotFound)
of SendData:
if cmd.send_val.len > RELAY_MAX_MESSAGE_SIZE:
conn.sendError(cmd, "Data too long", TooLarge)
Expand Down
25 changes: 14 additions & 11 deletions tests/tproto2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -245,17 +245,13 @@ suite "notes":
var alice = relay.authenticatedConn()
relay.handleCommand(alice, RelayCommand(
kind: FetchNote,
resp_id: 34,
fetch_topic: "heyo",
))
relay.handleCommand(alice, RelayCommand(
kind: PublishNote,
pub_topic: "heyo",
pub_data: "foo",
))
check alice.pop(Okay).ok_cmd == PublishNote
let note = alice.pop(Note)
check note.note_data == "foo"
check note.note_topic == "heyo"
let err = alice.pop(Error)
check err.err_cmd == FetchNote
check err.err_code == NotFound
check err.resp_id == 34

test "publish max size topic":
let relay = testRelay()
Expand Down Expand Up @@ -308,7 +304,9 @@ suite "notes":
kind: FetchNote,
fetch_topic: "topic",
))
check alice.msgCount == 0
let err = alice.pop(Error)
check err.err_cmd == FetchNote
check err.err_code == NotFound

test "fetch note again":
let relay = testRelay()
Expand All @@ -333,7 +331,9 @@ suite "notes":
kind: FetchNote,
fetch_topic: "sometopic",
))
check alice.msgCount == 0
let err = alice.pop(Error)
check err.err_cmd == FetchNote
check err.err_code == NotFound

test "sub then disconnect, the pub":
let relay = testRelay()
Expand All @@ -344,6 +344,9 @@ suite "notes":
kind: FetchNote,
fetch_topic: "foo",
))
let err = bob.pop(Error)
check err.err_cmd == FetchNote
check err.err_code == NotFound
relay.disconnect(bob)

relay.handleCommand(alice, RelayCommand(
Expand Down