Skip to content

Commit

Permalink
rethinkdb.nimble
Browse files Browse the repository at this point in the history
  • Loading branch information
ba0f3 committed Jul 1, 2017
1 parent 6c6f54a commit d7e2d14
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 26 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
nimcache/
rethinkdb_data
tests/test.nim
tests/test*
118 changes: 94 additions & 24 deletions private/connection.nim
Original file line number Diff line number Diff line change
@@ -1,21 +1,26 @@
import asyncnet, strutils, logging, json, struct, tables
import scram/client

when not compileOption("threads"):
import asyncdispatch
else:
import net


import ql2, types, utils, datum

const
HANDSHAKE_FIRST_MESSAGE = "{\"protocol_version\": 0,\"authentication_method\": \"SCRAM-SHA-256\",\"authentication\": \"$#\"}\x0"
HANDSHAKE_FINAL_MESSAGE = "{\"authentication\": \"$#\"}\x0"

when not compileOption("threads"):
const
BUFFER_SIZE = 512
type
RethinkClient* = ref object of RootObj
address: string
port: Port
auth: string
username: string
password: string
options: TableRef[string, MutableDatum]
sock: AsyncSocket
sockConnected: bool
Expand All @@ -25,13 +30,15 @@ else:
RethinkClient* = ref object of RootObj
address: string
port: Port
auth: string
username: string
password: string
options: TableRef[string, MutableDatum]
sock: Socket
sockConnected: bool
queryToken: uint64

type
RqlAuthError* = object of SystemError
RqlDriverError* = object of SystemError
RqlClientError* = object of SystemError
RqlCompileError* = object of SystemError
Expand Down Expand Up @@ -101,13 +108,12 @@ proc use*(r: RethinkClient, s: string) =
term.args = @[newDatum(s)]
r.addOption("db", &term)

proc newRethinkClient*(address = "127.0.0.1", port = Port(28015), auth = "", db = ""): RethinkClient =
proc newRethinkClient*(address = "127.0.0.1", port = Port(28015), db: string = nil): RethinkClient =
## Init new client instance
assert address != ""
result = new(RethinkClient)
result.address = address
result.port = port
result.auth = auth
result.options = newTable[string, MutableDatum]()
when not compileOption("threads"):
result.sock = newAsyncSocket()
Expand All @@ -122,36 +128,92 @@ when not compileOption("threads"):
proc handshake(r: RethinkClient) {.async.} =
when defined(debug):
L.log(lvlDebug, "Preparing handshake...")
var data: string
if r.auth.len > 0:
data = pack("<ii$#si" % [$r.auth.len], HandshakeV0_4, r.auth.len.int32, r.auth, HandshakeJSON)
else:
data = pack("<iii", HandshakeV0_4, 0, HandshakeJSON)
var data = pack("<i", HandshakeV1_0)
await r.sock.send(data)
var buf = await r.sock.readUntil('\0')

if buf != "SUCCESS":
data = await r.sock.readUntil('\0')
if data[0] != '{':
raise newException(RqlDriverError, data)
when defined(debug):
L.log(lvlDebug, "Server response: ", data)

let
scramClient = newScramClient(SHA256)
firstMessage = scramClient.prepareFirstMessage(r.username)

data = HANDSHAKE_FIRST_MESSAGE % firstMessage
when defined(debug):
L.log(lvlDebug, "Sending first message: ", data)

await r.sock.send(data)
data = await r.sock.readUntil('\0')
when defined(debug):
L.log(lvlDebug, "Server response: ", data)
var response = parseJson(data)
if not response.hasKey("success") or not response["success"].bval:
raise newException(RqlAuthError, "Error code " & $response["error_code"].num & ": " & response["error"].str)

data = HANDSHAKE_FINAL_MESSAGE % scramClient.prepareFinalMessage(r.password, response["authentication"].str)
when defined(debug):
L.log(lvlDebug, "Sending final message: ", data)

await r.sock.send(data)
data = await r.sock.readUntil('\0')
when defined(debug):
L.log(lvlDebug, "Server response: ", data)
response = parseJson(data)
if not response.hasKey("success") or not response["success"].bval:
raise newException(RqlAuthError, "Error code " & $response["error_code"].num & ": " & response["error"].str)

if not scramClient.verifyServerFinalMessage(response["authentication"].str):
raise newException(RqlAuthError, "Verification of server final message failed")

when defined(debug):
L.log(lvlDebug, "Handshake success...")
else:
proc handshake(r: RethinkClient) =
when defined(debug):
L.log(lvlDebug, "Preparing handshake...")
var data: string
if r.auth.len > 0:
data = pack("<ii$#si" % [$r.auth.len], HandshakeV0_4, r.auth.len.int32, r.auth, HandshakeJSON)
else:
data = pack("<iii", HandshakeV0_4, 0, HandshakeJSON)
var data = pack("<i", HandshakeV1_0)
r.sock.send(data)
data = r.sock.readUntil('\0')
if data[0] != '{':
raise newException(RqlDriverError, data)
when defined(debug):
L.log(lvlDebug, "Server response: ", data)

var buf = r.sock.readUntil('\0')
let
scramClient = newScramClient(SHA256)
firstMessage = scramClient.prepareFirstMessage(r.username)

if buf[0..6] != "SUCCESS":
raise newException(RqlDriverError, buf)
data = HANDSHAKE_FIRST_MESSAGE % firstMessage
when defined(debug):
L.log(lvlDebug, "Handshake success...")
L.log(lvlDebug, "Sending first message: ", data)

r.sock.send(data)
data = r.sock.readUntil('\0')
when defined(debug):
L.log(lvlDebug, "Server response: ", data)
var response = parseJson(data)
if not response.hasKey("success") or not response["success"].bval:
raise newException(RqlAuthError, "Error code " & $response["error_code"].num & ": " & response["error"].str)

data = HANDSHAKE_FINAL_MESSAGE % scramClient.prepareFinalMessage(r.password, response["authentication"].str)
when defined(debug):
L.log(lvlDebug, "Sending final message: ", data)

r.sock.send(data)
data = r.sock.readUntil('\0')
when defined(debug):
L.log(lvlDebug, "Server response: ", data)
response = parseJson(data)
if not response.hasKey("success") or not response["success"].bval:
raise newException(RqlAuthError, "Error code " & $response["error_code"].num & ": " & response["error"].str)

if not scramClient.verifyServerFinalMessage(response["authentication"].str):
raise newException(RqlAuthError, "Verification of server final message failed")

when defined(debug):
L.log(lvlDebug, "Handshake success...")
proc close*(r: RethinkClient) =
## Close an open connection
r.sock.close()
Expand Down Expand Up @@ -212,9 +274,13 @@ when not compileOption("threads"):

result = newResponse(buf, token)

proc connect*(r: RethinkClient): Future[void] {.async.} =
proc connect*(r: RethinkClient, username = "admin", password: string = ""): Future[void] {.async.} =
## Create a new connection to the database server
if not r.isConnected:
if r.username != username:
r.username = username
if not password.isNilOrEmpty and r.password != password:
r.password = password
when defined(debug):
L.log(lvlDebug, "Connecting to server at $#:$#..." % [r.address, $r.port.int])
await r.sock.connect(r.address, r.port)
Expand Down Expand Up @@ -280,9 +346,13 @@ else:

newResponse(buf, token)

proc connect*(r: RethinkClient) =
proc connect*(r: RethinkClient, username = "admin", password: string = "") =
## Create a new connection to the database server
if not r.isConnected:
if r.username != username:
r.username = username
if not password.isNilOrEmpty and r.password != password:
r.password = password
when defined(debug):
L.log(lvlDebug, "Connecting to server at $#:$#..." % [r.address, $r.port])
r.sock.connect(r.address, r.port)
Expand Down
2 changes: 1 addition & 1 deletion private/rql.nim
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

## This module provides all high-level API for query and manipulate data
import strutils, json, tables, future
import ql2, datum, connection, utils, types
import ql2, datum, connection, utils, types


when not compileOption("threads"):
Expand Down

0 comments on commit d7e2d14

Please sign in to comment.