diff --git a/README.md b/README.md index 383ac09..0447336 100644 --- a/README.md +++ b/README.md @@ -59,10 +59,6 @@ Clients send the following commands: | `PublishNote` | Send a few bytes to another client addressed by topic (good for key exchange) | | `FetchNote` | Request a note addressed by topic | | `SendData` | Store/forward bytes to other clients, addressed by relay-authenticated public keys | -| `StoreChunk` | Store bytes for other clients to fetch addressed by key and public key. | -| `GetChunk` | Request stored chunk | -| `ChunksPresent` | Ask which chunks exist | - ### Server Events @@ -75,8 +71,6 @@ The relay server sends the following events: | `Who` | Challenge for authenticating a client's public/private keys and spam mitigation | | `Note` | Data payload of a note requested by `FetchNote` | | `Data` | Data payload from another client, addressed by relay-authenticated public key | -| `Chunk` | Data payload response to `GetChunk` request | -| `ChunkStatus` | Response to `ChunksPresent` indicating which chunks exist/don't | ### Authentication @@ -102,13 +96,12 @@ Client Relay ### Data -There are 3 ways clients can exchange data: +There are 2 ways clients can exchange data: 1. Notes - public notes that are accessed by knowing the note *topic*. Notes are a good way to do key exchange. Notes expire after a short time. -2. Messages - ordered, stored-and-forwarded messages sent from one client to another client. These are automatically sent to a client upon connection, and deleted when sent. Messages expire after a while. -3. Chunks - clients store chunks with a string *key* and choose which clients (by their public key) are allowed to fetch uploaded chunks. Chunks may be overwritten. Chunks expire a while after their last update. +2. Messages - ordered, stored-and-forwarded messages sent from one client to another client. These are automatically sent to a client upon connection, and deleted when sent. Messages expire after a while. Messages with a non-blank key will overwrite undelivered messages with the same key. -All forms of exchanging data are unreliable. Build with that in mind. +Build relay clients with the understanding that all forms of exchanging data through this relay are unreliable. #### Notes @@ -146,6 +139,33 @@ Alice Relay Bob │ │ │ ``` +#### Messages with keys + +1. Alice sends `SendData(dst=BOBPK, key=apple, data=core)` +2. Alice sends `SendData(dst=BOBPK, key=banana, data=boat)` +3. Alice sends `SendData(dst=BOBPK, key=apple, data=pie)` (replacing prior `key=apple` message) +4. Bob connects +5. Server sends to Bob `Data(src=ALICEPK, key=banana, data=boat)` +6. Server sends to Bob `Data(src=ALICEPK, key=apple, data=pie)` + +``` +Alice Relay Bob + │ │ │ + ├───Authenticated─┤ │ + │ │ │ + │ SendData(Bob,1) │ │ + ├────────────────►│ │ + │ SendData(Bob,2) │ │ + ├────────────────►│ │ + │ SendData(Bob,1) │ │ + ├────────────────►│ │ + │ │ Data(Alice, 2) │ + │ ├────────────────►│ + │ │ Data(Alice, 1) │ + │ ├────────────────►│ + │ │ │ +``` + #### Chunks 1. 
Alice sends `StoreChunk(dst=[BOBPK], key=apple, val=seed)` diff --git a/changes/new-Chunks-are-gone-20251210-102219.md b/changes/new-Chunks-are-gone-20251210-102219.md new file mode 100644 index 0000000..f53efcb --- /dev/null +++ b/changes/new-Chunks-are-gone-20251210-102219.md @@ -0,0 +1 @@ +Chunks are gone in favor of overwriteable Data messages diff --git a/src/bucketsrelay/v2/cli.nim b/src/bucketsrelay/v2/cli.nim index 559a5cf..057e01e 100644 --- a/src/bucketsrelay/v2/cli.nim +++ b/src/bucketsrelay/v2/cli.nim @@ -115,48 +115,22 @@ proc doCommand(client: NetstringClient, full: seq[string], ctx: var CmdContext) if dst.string == "": dst = SignPublicKey.deserialize(i.use(args)) let val = i.use(args) - waitFor client.sendData(dst, val) + let key = if i < args.len: + i.use(args) + else: + "" + waitFor client.sendData(@[dst], val, key) of "recv": let data = waitFor client.getData() echo data - of "store": - var dst = ctx.dst - if dst.string == "" or args.len >= 3: - dst = SignPublicKey.deserialize(i.use(args)) - echo "Using key=" & dst.nice - let key = i.use(args) - let val = i.use(args) - waitFor client.storeChunk(@[dst], key, val) - of "get": - var src = ctx.dst - if src.string == "" or args.len >= 2: - src = SignPublicKey.deserialize(i.use(args)) - echo "Using key=" & src.serialize - let key = i.use(args) - let odata = waitFor client.getChunk(src, key) - if odata.isSome: - echo odata.get() - else: - echo "(none)" - of "has": - var src = ctx.dst - if src.string == "" or args.len >= 2: - src = SignPublicKey.deserialize(i.use(args)) - echo "Using key=" & src.serialize - let key = i.use(args) - let res = waitFor client.hasChunk(src, key) - echo $res of "help": echo """ post TOPIC DATA fetch TOPIC dst PUBKEY - Set the destination PUBKEY for future commands - send [PUBKEY] DATA + Set the destination PUBKEY for future sends + send [PUBKEY] DATA [KEY] recv - store [PUBKEY] KEY VAL - get [PUBKEY] KEY - has [PUBKEY] KEY help """ else: diff --git a/src/bucketsrelay/v2/objs.nim b/src/bucketsrelay/v2/objs.nim index e801027..70e6bea 100644 --- a/src/bucketsrelay/v2/objs.nim +++ b/src/bucketsrelay/v2/objs.nim @@ -7,7 +7,7 @@ ## This file should be kept free of dependencies other than the stdlib ## and should not include async stuff ## as it's meant to be referenced by outside libraries that may -## want to do things there own way. +## want to do things their own way. 
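# Illustration: a minimal sketch of the keyed SendData flow from the README's
# "Messages with keys" example, written against the sampleclient.nim procs as
# changed in this diff (`sendData(dst, val, key)` and `getData()` returning a
# (key, val) tuple). It assumes `alice` and `bob` are already-connected,
# authenticated NetstringClient values and that `bobPk` is Bob's
# relay-authenticated public key; the import path and the connection/auth
# setup are assumptions, not part of this change.
import std/asyncdispatch
import ./sampleclient

proc aliceSends(alice: NetstringClient, bobPk: SignPublicKey) {.async.} =
  # A later send with the same non-blank key replaces any undelivered message
  # with that key, so Bob only ever receives the latest "apple".
  await alice.sendData(@[bobPk], "core", "apple")
  await alice.sendData(@[bobPk], "boat", "banana")
  await alice.sendData(@[bobPk], "pie", "apple")   # overwrites the first send

proc bobReceives(bob: NetstringClient) {.async.} =
  # Queued Data messages are delivered in id order once Bob connects:
  # banana/boat first, then the replacement apple/pie.
  for _ in 0 ..< 2:
    let (key, val) = await bob.getData()
    echo key, " = ", val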
import std/hashes import std/options @@ -36,8 +36,6 @@ type Error Note Data - Chunk - ChunkStatus ErrorCode* = enum Generic = 0 @@ -63,25 +61,15 @@ type note_topic*: string note_data*: string of Data: + data_key*: string data_src*: SignPublicKey data_val*: string - of Chunk: - chunk_src*: SignPublicKey - chunk_key*: string - chunk_val*: Option[string] - of ChunkStatus: - status_src*: SignPublicKey - present*: seq[string] - absent*: seq[string] CommandKind* = enum Iam PublishNote FetchNote SendData - StoreChunk - GetChunks - HasChunks RelayCommand* = object resp_id*: int @@ -95,30 +83,18 @@ type of FetchNote: fetch_topic*: string of SendData: - send_dst*: SignPublicKey + send_key*: string + send_dst*: seq[SignPublicKey] send_val*: string - of StoreChunk: - chunk_dst*: seq[SignPublicKey] - chunk_key*: string - chunk_val*: string - of GetChunks: - chunk_src*: SignPublicKey - chunk_keys*: seq[string] - of HasChunks: - has_src*: SignPublicKey - has_keys*: seq[string] const RELAY_MAX_TOPIC_SIZE* = 512 - RELAY_MAX_NOTE_SIZE* = 4096 + RELAY_MAX_NOTE_SIZE* = 4096 * 2 RELAY_MAX_NOTES* = 1000 RELAY_NOTE_DURATION* = 5 * 24 * 60 * 60 - RELAY_MAX_MESSAGE_SIZE* = 4096 - RELAY_MAX_CHUNK_KEY_SIZE* = 4096 - RELAY_MAX_CHUNK_SIZE* = 65536 - RELAY_MAX_CHUNK_DSTS* = 32 + RELAY_MAX_MESSAGE_SIZE* = 65536 * 2 + RELAY_MAX_KEY_SIZE* = 4096 RELAY_MESSAGE_DURATION* = 30 * 24 * 60 * 60 - RELAY_PUBKEY_MEMORY_SECONDS* = 60 * 24 * 60 * 60 const nicestart = 'a' # '!' @@ -175,17 +151,9 @@ proc `$`*(msg: RelayMessage): string = of Error: result.add &"cmd={msg.err_cmd} code={msg.err_code} msg={msg.err_message.nice}" of Note: - result.add &"'{msg.note_topic.nice}' val={msg.note_data.nicelong}" + result.add &"'{msg.note_topic.nice.abbr}' val={msg.note_data.nicelong}" of Data: - result.add &"{msg.data_src.nice.abbr} val={msg.data_val.nicelong}" - of Chunk: - result.add &"{msg.chunk_src.nice.abbr} {msg.chunk_key.nice.abbr}={msg.chunk_val.nicelong}" - of ChunkStatus: - result.add &"{msg.status_src.nice.abbr} present=[" - result.add msg.present.mapIt(it.nice.abbr).join(", ") - result.add "] absent=[" - result.add msg.absent.mapIt(it.nice.abbr).join(", ") - result.add "]" + result.add &"'{msg.data_key.nice.abbr}' src={msg.data_src.nice.abbr} val={msg.data_val.nicelong}" result.add ")" proc `==`*(a, b: RelayMessage): bool = @@ -202,11 +170,7 @@ proc `==`*(a, b: RelayMessage): bool = of Note: return a.note_data == b.note_data and a.note_topic == b.note_topic of Data: - return a.data_src == b.data_src and a.data_val == b.data_val - of Chunk: - return a.chunk_src == b.chunk_src and a.chunk_key == b.chunk_key and a.chunk_val == b.chunk_val - of ChunkStatus: - return a.status_src == b.status_src and a.present == b.present and a.absent == b.absent + return a.data_src == b.data_src and a.data_val == b.data_val and a.data_key == b.data_key proc `$`*(cmd: RelayCommand): string = result.add $cmd.kind & "(" @@ -218,19 +182,8 @@ proc `$`*(cmd: RelayCommand): string = of FetchNote: result.add &"'{cmd.fetch_topic.nice.abbr}'" of SendData: - result.add &"{cmd.send_dst.nice.abbr} val={cmd.send_val.nicelong}" - of StoreChunk: - result.add &"{cmd.chunk_key.nice.abbr}={cmd.chunk_val.nicelong} dst=[" - result.add cmd.chunk_dst.mapIt(it.nice.abbr).join(", ") - result.add "]" - of GetChunks: - result.add &"{cmd.chunk_src.nice.abbr} keys=[" - result.add cmd.chunk_keys.mapIt(it.nice.abbr).join(", ") - result.add "]" - of HasChunks: - result.add &"{cmd.has_src.nice.abbr} keys=[" - result.add cmd.has_keys.mapIt(it.nice.abbr).join(", ") - result.add "]" + 
result.add &"key={cmd.send_key.nice.abbr} val={cmd.send_val.nicelong} " + result.add cmd.send_dst.mapIt(it.nice.abbr).join(", ") result.add ")" proc `==`*(a, b: RelayCommand): bool = @@ -245,13 +198,7 @@ proc `==`*(a, b: RelayCommand): bool = of FetchNote: return a.fetch_topic == b.fetch_topic of SendData: - return a.send_dst == b.send_dst and a.send_val == b.send_val - of StoreChunk: - return a.chunk_dst == b.chunk_dst and a.chunk_key == b.chunk_key and a.chunk_val == b.chunk_val - of GetChunks: - return a.chunk_src == b.chunk_src and a.chunk_keys == b.chunk_keys - of HasChunks: - return a.has_src == b.has_src and a.has_keys == b.has_keys + return a.send_dst == b.send_dst and a.send_val == b.send_val and a.send_key == b.send_key #-------------------------------------------------------------- # serialization @@ -331,8 +278,6 @@ proc serialize*(kind: MessageKind): char = of Error: '-' of Note: 'n' of Data: 'd' - of Chunk: 'k' - of ChunkStatus: 's' proc deserialize*(kind: typedesc[MessageKind], val: char): MessageKind = case val @@ -341,8 +286,6 @@ proc deserialize*(kind: typedesc[MessageKind], val: char): MessageKind = of '-': Error of 'n': Note of 'd': Data - of 'k': Chunk - of 's': ChunkStatus else: raise ValueError.newException("Unknown MessageKind: " & val) proc serialize*(kind: CommandKind): char = @@ -351,9 +294,6 @@ proc serialize*(kind: CommandKind): char = of PublishNote: 'p' of FetchNote: 'f' of SendData: 's' - of StoreChunk: 'c' - of GetChunks: 'g' - of HasChunks: 't' proc deserialize*(kind: typedesc[CommandKind], val: char): CommandKind = case val: @@ -361,9 +301,6 @@ proc deserialize*(kind: typedesc[CommandKind], val: char): CommandKind = of 'p': PublishNote of 'f': FetchNote of 's': SendData - of 'c': StoreChunk - of 'g': GetChunks - of 't': HasChunks else: raise ValueError.newException("Unknown CommandKind: " & val) proc serialize*(err: ErrorCode): char = @@ -438,18 +375,9 @@ proc serialize*(msg: RelayMessage): string = result &= msg.note_topic.nsencode result &= msg.note_data.nsencode of Data: + result &= msg.data_key.nsencode result &= msg.data_src.string.nsencode result &= msg.data_val.nsencode - of Chunk: - result &= msg.chunk_src.string.nsencode - result &= msg.chunk_key.nsencode - if msg.chunk_val.isSome: - result &= msg.chunk_val.get().nsencode - of ChunkStatus: - result &= msg.status_src.string.nsencode - result &= nsencode(msg.present.serialize()) - result &= nsencode(msg.absent.serialize()) - proc deserialize*(typ: typedesc[RelayMessage], s: string): RelayMessage = if s.len == 0: @@ -486,28 +414,10 @@ proc deserialize*(typ: typedesc[RelayMessage], s: string): RelayMessage = return RelayMessage( kind: Data, resp_id: resp_id, + data_key: s.nsdecode(idx), data_src: s.nsdecode(idx).SignPublicKey, data_val: s.nsdecode(idx), ) - of Chunk: - return RelayMessage( - kind: Chunk, - resp_id: resp_id, - chunk_src: s.nsdecode(idx).SignPublicKey, - chunk_key: s.nsdecode(idx), - chunk_val: if idx >= s.len: - none[string]() - else: - some(s.nsdecode(idx)), - ) - of ChunkStatus: - return RelayMessage( - kind: ChunkStatus, - resp_id: resp_id, - status_src: s.nsdecode(idx).SignPublicKey, - present: deserialize(seq[string], s.nsdecode(idx)), - absent: deserialize(seq[string], s.nsdecode(idx)), - ) proc serialize*(cmd: RelayCommand): string = result &= cmd.kind.serialize @@ -522,18 +432,9 @@ proc serialize*(cmd: RelayCommand): string = of FetchNote: result &= cmd.fetch_topic.nsencode of SendData: - result &= cmd.send_dst.string.nsencode + result &= cmd.send_key.nsencode + result &= 
nsencode(cmd.send_dst.serialize()) result &= cmd.send_val.nsencode - of StoreChunk: - result &= nsencode(cmd.chunk_dst.serialize()) - result &= cmd.chunk_key.nsencode - result &= cmd.chunk_val.nsencode - of GetChunks: - result &= cmd.chunk_src.string.nsencode - result &= nsencode(cmd.chunk_keys.serialize()) - of HasChunks: - result &= cmd.has_src.string.nsencode - result &= nsencode(cmd.has_keys.serialize()) proc deserialize*(typ: typedesc[RelayCommand], s: string): RelayCommand = if s.len == 0: @@ -567,28 +468,7 @@ proc deserialize*(typ: typedesc[RelayCommand], s: string): RelayCommand = return RelayCommand( kind: SendData, resp_id: resp_id, - send_dst: s.nsdecode(idx).SignPublicKey, + send_key: s.nsdecode(idx), + send_dst: deserializePubKeys(s.nsdecode(idx)), send_val: s.nsdecode(idx), ) - of StoreChunk: - return RelayCommand( - kind: StoreChunk, - resp_id: resp_id, - chunk_dst: deserializePubKeys(s.nsdecode(idx)), - chunk_key: s.nsdecode(idx), - chunk_val: s.nsdecode(idx), - ) - of GetChunks: - return RelayCommand( - kind: GetChunks, - resp_id: resp_id, - chunk_src: s.nsdecode(idx).SignPublicKey, - chunk_keys: deserialize(seq[string], s.nsdecode(idx)), - ) - of HasChunks: - return RelayCommand( - kind: HasChunks, - resp_id: resp_id, - has_src: s.nsdecode(idx).SignPublicKey, - has_keys: deserialize(seq[string], s.nsdecode(idx)), - ) diff --git a/src/bucketsrelay/v2/proto2.nim b/src/bucketsrelay/v2/proto2.nim index e839a9e..fe97346 100644 --- a/src/bucketsrelay/v2/proto2.nim +++ b/src/bucketsrelay/v2/proto2.nim @@ -17,7 +17,7 @@ import libsodium/sodium_sizes import ./objs; export objs -const LOG_COMMS* = not defined(release) +const LOG_COMMS* = not defined(release) or defined(relaynologcomms) const TESTMODE = defined(testmode) and not defined(release) type @@ -196,29 +196,16 @@ proc updateSchema*(db: DbConn) = db.exec(sql"""CREATE TABLE message ( id INTEGER PRIMARY KEY AUTOINCREMENT, created TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + key BLOB NOT NULL, src TEXT NOT NULL, dst TEXT NOT NULL, data BLOB NOT NULL )""") db.exec(sql"CREATE INDEX message_created ON message(created)") - db.exec(sql"CREATE INDEX message_dst ON message(dst)") - - # chunks - db.exec(sql"""CREATE TABLE chunk ( - src TEXT NOT NULL, - key TEXT NOT NULL, - last_used TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - val BLOB NOT NULL, - PRIMARY KEY (src, key) - )""") - db.exec(sql"CREATE INDEX chunk_last_used ON chunk(last_used)") - db.exec(sql"""CREATE TABLE chunk_dst ( - src TEXT NOT NULL, - key TEXT NOT NULL, - dst TEXT NOT NULL, - PRIMARY KEY (src, key, dst), - FOREIGN KEY (src, key) REFERENCES chunk(src, key) ON DELETE CASCADE - )""") + db.exec(sql"""CREATE UNIQUE INDEX message_dst_key + ON message(dst, key) + WHERE key IS NOT x'' + """) # stats db.exec(sql"""CREATE TABLE stats_transfer ( @@ -237,7 +224,6 @@ proc updateSchema*(db: DbConn) = connect INTEGER DEFAULT 0, publish INTEGER DEFAULT 0, send INTEGER DEFAULT 0, - store INTEGER DEFAULT 0, PRIMARY KEY (period, ip, pubkey) )""") @@ -348,22 +334,15 @@ when TESTMODE: data_out = data_out + excluded.data_out; """, ip, pubkey, period, data_in, data_out) -proc record_event_stat*(db: DbConn, ip: string, pubkey: SignPublicKey, connect = 0, publish = 0, send = 0, store = 0) = +proc record_event_stat*(db: DbConn, ip: string, pubkey: SignPublicKey, connect = 0, publish = 0, send = 0) = db.exec(sql""" - INSERT INTO stats_event (ip, pubkey, connect, publish, send, store) - VALUES (?, ?, ?, ?, ?, ?) + INSERT INTO stats_event (ip, pubkey, connect, publish, send) + VALUES (?, ?, ?, ?, ?) 
ON CONFLICT(period, ip, pubkey) DO UPDATE SET connect = connect + excluded.connect, publish = publish + excluded.publish, - send = send + excluded.send, - store = store + excluded.store - """, ip, pubkey, connect, publish, send, store) - -proc chunk_space_used*(db: DbConn, pubkey: SignPublicKey): int = - ## Return the amount of space being used by the given public key - db.getRow(sql""" - SELECT coalesce(sum(length(val)), 0) FROM chunk WHERE src = ? - """, pubkey).get()[0].i.int + send = send + excluded.send + """, ip, pubkey, connect, publish, send) proc current_data_in*(db: DbConn, pubkey: SignPublicKey): int = ## Return the amount of data that has been transferred in by the given @@ -444,7 +423,7 @@ proc delExpiredMessages(relay: Relay) = proc nextMessage(relay: Relay, dst: SignPublicKey): Option[RelayMessage] = let orow = relay.db.getRow(sql""" - SELECT src, data, id + SELECT key, src, data, id FROM message WHERE dst = ? @@ -457,10 +436,11 @@ proc nextMessage(relay: Relay, dst: SignPublicKey): Option[RelayMessage] = result = some(RelayMessage( kind: Data, resp_id: 0, # Data messages are not triggered by recipient's command - data_src: SignPublicKey.fromDB(row[0].b), - data_val: row[1].b.string, + data_key: row[0].b.string, + data_src: SignPublicKey.fromDB(row[1].b), + data_val: row[2].b.string, )) - relay.db.exec(sql"DELETE FROM message WHERE id=?", row[2].i) + relay.db.exec(sql"DELETE FROM message WHERE id=?", row[3].i) proc delExpiredChunks(relay: Relay) = let offset = when TESTMODE: @@ -521,7 +501,7 @@ proc handleCommand*[T](relay: Relay[T], conn: var RelayConnection[T], cmd: Relay relay.db.record_transfer_stat( ip = conn.ip, pubkey = pubkey, - data_out = msg.data_val.len, + data_out = msg.data_val.len + msg.data_key.len, ) else: break @@ -579,7 +559,9 @@ proc handleCommand*[T](relay: Relay[T], conn: var RelayConnection[T], cmd: Relay of SendData: if cmd.send_val.len > RELAY_MAX_MESSAGE_SIZE: conn.sendError(cmd, "Data too long", TooLarge) - elif not cmd.send_dst.is_valid(): + elif cmd.send_key.len > RELAY_MAX_KEY_SIZE: + conn.sendError(cmd, "Key too long", TooLarge) + elif cmd.send_dst.any_invalid(): conn.sendError(cmd, "Invalid pubkey", InvalidParams) else: let pubkey = conn.pubkey.get() @@ -589,150 +571,33 @@ proc handleCommand*[T](relay: Relay[T], conn: var RelayConnection[T], cmd: Relay relay.db.record_transfer_stat( ip = conn.ip, pubkey = pubkey, - data_in = cmd.send_val.len, + data_in = cmd.send_val.len + cmd.send_key.len, ) relay.db.record_event_stat( ip = conn.ip, pubkey = pubkey, send = 1, ) - if relay.clients.hasKey(cmd.send_dst): - # dst is online - var other_conn = relay.clients[cmd.send_dst] - other_conn.sendMessage(RelayMessage( - kind: Data, - resp_id: 0, # Not triggered by other_conn's command - data_src: pubkey, - data_val: cmd.send_val, - )) - relay.db.record_transfer_stat( - ip = other_conn.ip, - pubkey = other_conn.pubkey.get(), - data_in = cmd.send_val.len, - ) - else: - # dst is offline - relay.db.exec(sql"INSERT INTO message (src, dst, data) VALUES (?, ?, ?)", - pubkey, cmd.send_dst, cmd.send_val.DbBlob) - of StoreChunk: - if cmd.chunk_key.len > RELAY_MAX_CHUNK_KEY_SIZE: - conn.sendError(cmd, "Key too long", TooLarge) - elif cmd.chunk_val.len > RELAY_MAX_CHUNK_SIZE: - conn.sendError(cmd, "Value too long", TooLarge) - elif cmd.chunk_dst.len > RELAY_MAX_CHUNK_DSTS: - conn.sendError(cmd, "Too many recipients", TooLarge) - elif cmd.chunk_dst.any_invalid(): - conn.sendError(cmd, "Invalid pubkey", InvalidParams) - else: - let pubkey = conn.pubkey.get() - if 
relay.max_chunk_space > 0 and relay.db.chunk_space_used(pubkey) > relay.max_chunk_space: - conn.sendError(cmd, "Too much chunk data", StorageLimitExceeded) - else: - relay.db.record_event_stat( - ip = conn.ip, - pubkey = pubkey, - store = 1, - ) - relay.db.exec(sql"BEGIN") - try: - relay.db.exec(sql"DELETE FROM chunk_dst WHERE src=? AND key=?", pubkey, cmd.chunk_key.DbBlob) - let offset = when TESTMODE: - $TIME_SKEW & " seconds" - else: - "0 seconds" - relay.db.exec(sql""" - INSERT OR REPLACE INTO chunk (last_used, src, key, val) - VALUES (datetime('now', ?), ?, ?, ?) - """, offset, pubkey, cmd.chunk_key.DbBlob, cmd.chunk_val.DbBlob) - var dsts: seq[SignPublicKey] - dsts.add(cmd.chunk_dst) - if pubkey notin dsts: - dsts.add(pubkey) - for dst in dsts: - relay.db.exec(sql"INSERT INTO chunk_dst (src, key, dst) VALUES (?, ?, ?)", - pubkey, cmd.chunk_key.DbBlob, dst) - relay.db.exec(sql"COMMIT") - except CatchableError: - relay.db.exec(sql"ROLLBACK") - of GetChunks: - for key in cmd.chunk_keys: - if key.len > RELAY_MAX_CHUNK_KEY_SIZE: - conn.sendError(cmd, "Key too long", TooLarge) - return - relay.delExpiredChunks() - let pubkey = conn.pubkey.get() - for key in cmd.chunk_keys: - let orow = relay.db.getRow(sql""" - SELECT - c.val - FROM - chunk_dst AS d - JOIN chunk AS c - ON d.src = c.src - AND d.key = c.key - WHERE - d.src = ? - AND d.key = ? - AND d.dst = ? - """, cmd.chunk_src, key.DbBlob, pubkey) - if orow.isSome: - let row = orow.get() - conn.sendMessage(RelayMessage( - kind: Chunk, - resp_id: cmd.resp_id, - chunk_src: cmd.chunk_src, - chunk_key: key, - chunk_val: some(row[0].b.string), - )) - else: - conn.sendMessage(RelayMessage( - kind: Chunk, - resp_id: cmd.resp_id, - chunk_src: cmd.chunk_src, - chunk_key: key, - chunk_val: none[string](), - )) - of HasChunks: - for key in cmd.has_keys: - if key.len > RELAY_MAX_CHUNK_KEY_SIZE: - conn.sendError(cmd, "Key too long", TooLarge) - return - relay.delExpiredChunks() - let pubkey = conn.pubkey.get() - var present: seq[string] - var absent: seq[string] - for key in cmd.has_keys: - let orow = relay.db.getRow(sql""" - SELECT - 1 - FROM - chunk_dst AS d - JOIN chunk AS c - ON d.src = c.src - AND d.key = c.key - WHERE - d.src = ? - AND d.key = ? - AND d.dst = ? - """, cmd.has_src, key.DbBlob, pubkey) - if orow.isSome: - present.add(key) - if cmd.has_src == pubkey: - # reset the expiration of the chunk, since the owner - # is touching it - let offset = when TESTMODE: - $TIME_SKEW & " seconds" - else: - "0 seconds" - relay.db.exec(sql""" - UPDATE chunk SET last_used = datetime('now', ?) WHERE src = ? AND key = ? 
- """, offset, cmd.has_src, key.DbBlob) - else: - absent.add(key) - conn.sendMessage(RelayMessage( - kind: ChunkStatus, - resp_id: cmd.resp_id, - status_src: cmd.has_src, - present: present, - absent: absent, - )) + for dst_pubkey in cmd.send_dst: + if relay.clients.hasKey(dst_pubkey): + # dst is online + var other_conn = relay.clients[dst_pubkey] + other_conn.sendMessage(RelayMessage( + kind: Data, + resp_id: 0, # Not triggered by other_conn's command + data_key: cmd.send_key, + data_src: pubkey, + data_val: cmd.send_val, + )) + relay.db.record_transfer_stat( + ip = other_conn.ip, + pubkey = other_conn.pubkey.get(), + data_out = cmd.send_val.len + cmd.send_key.len, + ) + else: + # dst is offline + relay.db.exec(sql""" + INSERT OR REPLACE INTO message + (key, src, dst, data) + VALUES (?, ?, ?, ?)""", + cmd.send_key.DbBlob, pubkey, dst_pubkey, cmd.send_val.DbBlob) diff --git a/src/bucketsrelay/v2/sampleclient.nim b/src/bucketsrelay/v2/sampleclient.nim index f3e3f3a..4589901 100644 --- a/src/bucketsrelay/v2/sampleclient.nim +++ b/src/bucketsrelay/v2/sampleclient.nim @@ -76,48 +76,17 @@ proc fetchNote*(ns: NetstringClient, topic: string): Future[string] {.async.} = else: raise ValueError.newException("No such note: " & topic) -proc sendData*(ns: NetstringClient, dst: SignPublicKey, val: string) {.async.} = +proc sendData*(ns: NetstringClient, dst: seq[SignPublicKey], val: string, key: string) {.async.} = await ns.sendCommand(RelayCommand( kind: SendData, + send_key: key, send_dst: dst, send_val: val, )) -proc getData*(ns: NetstringClient): Future[string] {.async.} = +proc getData*(ns: NetstringClient): Future[tuple[key: string, val: string]] {.async.} = let res = await ns.receiveMessage() if res.kind == Data: - return res.data_val + return (res.data_key, res.data_val) else: raise ValueError.newException("Expecting Data but got: " & $res) - -proc storeChunk*(ns: NetstringClient, dsts: seq[SignPublicKey], key: string, val: string) {.async.} = - await ns.sendCommand(RelayCommand( - kind: StoreChunk, - chunk_dst: dsts, - chunk_key: key, - chunk_val: val, - )) - -proc getChunk*(ns: NetstringClient, src: SignPublicKey, key: string): Future[Option[string]] {.async.} = - await ns.sendCommand(RelayCommand( - kind: GetChunks, - chunk_src: src, - chunk_keys: @[key], - )) - let res = await ns.receiveMessage() - if res.kind == Chunk: - return res.chunk_val - else: - raise ValueError.newException("Expecting Chunk but got: " & $res) - -proc hasChunk*(ns: NetstringClient, src: SignPublicKey, key: string): Future[bool] {.async.} = - await ns.sendCommand(RelayCommand( - kind: HasChunks, - has_src: src, - has_keys: @[key], - )) - let res = await ns.receiveMessage() - if res.kind == ChunkStatus: - return key in res.present - else: - raise ValueError.newException("Expecting ChunkStatus but got: " & $res) diff --git a/src/bucketsrelay/v2/server2.nim b/src/bucketsrelay/v2/server2.nim index 60f4a59..f5b922e 100644 --- a/src/bucketsrelay/v2/server2.nim +++ b/src/bucketsrelay/v2/server2.nim @@ -29,10 +29,12 @@ type msg: RelayMessage const - VERSION = slurp"../CHANGELOG.md".split(" ")[1] + RELAY_VERSION = slurp"../../../CHANGELOG.md".split(" ")[1] logo_png = slurp"./static/logo.png" favicon_png = slurp"./static/favicon.png" +echo "RELAY_VERSION: ", RELAY_VERSION + let ADMIN_USERNAME = getEnv("ADMIN_USERNAME", "admin") let ADMIN_PWHASH = when defined(release): getEnv("ADMIN_PWHASH", "") @@ -141,9 +143,7 @@ proc handleWebsocket(req: Request) {.async, gcsafe.} = type StorageStat = tuple pubkey: SignPublicKey - 
message_size: int - chunk_size: int - total_size: int + size: int PubkeyEventStat = tuple pubkey: SignPublicKey @@ -200,12 +200,10 @@ router myrouter: # total stored let total_stored_note = relay.db.getRow(sql"SELECT coalesce(sum(length(data)), 0) FROM note").get()[0].i - let total_stored_message = relay.db.getRow(sql"SELECT coalesce(sum(length(data)), 0) FROM message").get()[0].i - let total_stored_chunk = relay.db.getRow(sql"SELECT coalesce(sum(length(val)), 0) FROM chunk").get()[0].i - let total_stored = total_stored_note + total_stored_message + total_stored_chunk + let total_stored_message = relay.db.getRow(sql"SELECT sum(coalesce(length(data), 0) + coalesce(length(key), 0)) FROM message").get()[0].i + let total_stored = total_stored_note + total_stored_message let num_note = relay.db.getRow(sql"SELECT coalesce(count(*), 0) FROM note").get()[0].i let num_message = relay.db.getRow(sql"SELECT coalesce(count(*), 0) FROM message").get()[0].i - let num_chunk = relay.db.getRow(sql"SELECT coalesce(count(*), 0) FROM chunk").get()[0].i # top traffic by ip var traffic_by_ip: seq[TransferTotal] @@ -259,32 +257,19 @@ router myrouter: # top storage by pubkey var storage_by_pubkey: seq[StorageStat] for row in relay.db.getAllRows(sql""" - WITH msg AS ( - SELECT src, SUM(coalesce(LENGTH(data), 0)) AS msg_bytes - FROM message - GROUP BY src - ), - chunksize AS ( - SELECT src, SUM(coalesce(LENGTH(val), 0)) AS chunk_bytes - FROM chunk - GROUP BY src - ) SELECT - COALESCE(m.src, c.src) AS src, - COALESCE(m.msg_bytes, 0) AS msg_bytes, - COALESCE(c.chunk_bytes, 0) AS chunk_bytes, - COALESCE(m.msg_bytes, 0) + COALESCE(c.chunk_bytes, 0) AS total_bytes - FROM msg AS m - FULL OUTER JOIN chunksize AS c - ON m.src = c.src - ORDER BY total_bytes DESC - LIMIT 10; + src, + SUM(COALESCE(LENGTH(data), 0) + COALESCE(LENGTH(key), 0)) AS msg_bytes + FROM + message + GROUP BY + src + ORDER BY 2 DESC + LIMIT 10 """): storage_by_pubkey.add(( pubkey: SignPublicKey.fromDb(row[0].b), - message_size: row[1].i.int, - chunk_size: row[2].i.int, - total_size: row[3].i.int, + size: row[1].i.int, )) # top events by pubkey @@ -344,25 +329,6 @@ router myrouter: pubkey: SignPublicKey.fromDb(row[0].b), count: row[1].i.int, )) - - var store_by_pubkey: seq[PubkeyEventStat] - for row in relay.db.getAllRows(sql""" - SELECT - pubkey, - COALESCE(SUM(store), 0) - FROM - stats_event - WHERE - period >= ? - AND pubkey <> '' - GROUP BY 1 - ORDER BY 2 DESC - LIMIT 10 - """, datarange.a): - store_by_pubkey.add(( - pubkey: SignPublicKey.fromDb(row[0].b), - count: row[1].i.int, - )) var html = "" compileTemplateFile("stats.nimja", baseDir = getScriptDir() / "templates", autoEscape = true, varname = "html") diff --git a/src/bucketsrelay/v2/templates/index.nimja b/src/bucketsrelay/v2/templates/index.nimja index 56c949c..3f9ff1f 100644 --- a/src/bucketsrelay/v2/templates/index.nimja +++ b/src/bucketsrelay/v2/templates/index.nimja @@ -41,11 +41,11 @@ Buckets Relay
- If you use Buckets, this relay lets you securely share your budget among your devices. Data is stored on this relay for a time, but is removed after periods of inactivity. All data passing through this relay is encrypted end-to-end. + If you use Buckets, this relay lets you securely share your budget among your devices. Data is stored until recipients receive it or it expires. All data passing through this relay is encrypted end-to-end.
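For reference, the overwrite behaviour of keyed messages comes entirely from the new `message` schema above: the partial unique index only covers non-blank keys, so `INSERT OR REPLACE` collapses keyed messages per `(dst, key)` while blank-key messages keep queueing, and a replacement row gets a fresh autoincrement id, which is why the README example delivers `banana` before the rewritten `apple`. The sketch below reproduces that against an in-memory SQLite database and then runs the simplified per-sender storage query from server2.nim. It is a simplification: it binds keys as text and filters on `key <> ''`, whereas the relay binds blobs and filters on `key IS NOT x''`, and it assumes `std/db_sqlite` (`db_connector/db_sqlite` on Nim 2.x) rather than the relay's own DB layer.

```nim
import std/db_sqlite

let db = open(":memory:", "", "", "")
db.exec(sql"""CREATE TABLE message (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  key TEXT NOT NULL,
  src TEXT NOT NULL,
  dst TEXT NOT NULL,
  data TEXT NOT NULL
)""")
# Only non-blank keys take part in the uniqueness constraint.
db.exec(sql"""CREATE UNIQUE INDEX message_dst_key
  ON message(dst, key)
  WHERE key <> ''""")

# Keyed sends: the second "apple" replaces the first and gets a new id.
for (key, data) in [("apple", "core"), ("banana", "boat"), ("apple", "pie")]:
  db.exec(sql"INSERT OR REPLACE INTO message (key, src, dst, data) VALUES (?, ?, ?, ?)",
          key, "ALICE", "BOB", data)

# Blank-key sends fall outside the index, so they never replace anything.
for data in ["hi", "yo"]:
  db.exec(sql"INSERT OR REPLACE INTO message (key, src, dst, data) VALUES (?, ?, ?, ?)",
          "", "CAROL", "BOB", data)

# Delivery order follows id: banana/boat, apple/pie, then both blank-key rows.
for row in db.getAllRows(sql"SELECT id, key, data FROM message ORDER BY id"):
  echo row

# Per-sender storage, charging key bytes as well as payload bytes
# (the query now used for the stats page).
for row in db.getAllRows(sql"""
    SELECT
      src,
      SUM(COALESCE(LENGTH(data), 0) + COALESCE(LENGTH(key), 0)) AS msg_bytes
    FROM message
    GROUP BY src
    ORDER BY 2 DESC
    LIMIT 10"""):
  echo row[0], ": ", row[1], " bytes"   # ALICE: 18 bytes, then CAROL: 4 bytes
db.close()
```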
diff --git a/src/bucketsrelay/v2/templates/stats.nimja b/src/bucketsrelay/v2/templates/stats.nimja index ddde498..518efe1 100644 --- a/src/bucketsrelay/v2/templates/stats.nimja +++ b/src/bucketsrelay/v2/templates/stats.nimja @@ -60,21 +60,18 @@
| Pubkey | -Message bytes | -Chunk bytes | +Bytes |
|---|---|---|---|
| {{ tot.pubkey.abbr }} | -{{ tot.message_size.wcommas }} | -{{ tot.chunk_size.wcommas }} | +{{ tot.size.wcommas }} |
| Pubkey | -Store | -
|---|---|
| {{ st.pubkey.abbr }} | -{{ st.count.wcommas }} | -