From 54c1637c74e7941fbe9f152df70c0a25f8cd37f6 Mon Sep 17 00:00:00 2001 From: shikokuchuo <53399081+shikokuchuo@users.noreply.github.com> Date: Mon, 3 Jun 2024 16:00:59 +0100 Subject: [PATCH] nanonext 1.1.0 --- DESCRIPTION | 2 +- NEWS.md | 2 +- R/aio.R | 2 +- man/collect_aio.Rd | 2 +- vignettes/nanonext.Rmd | 82 ++++++++++++++++++------------------- vignettes/nanonext.Rmd.orig | 11 +++-- 6 files changed, 50 insertions(+), 51 deletions(-) diff --git a/DESCRIPTION b/DESCRIPTION index d734f2dbc..fff433418 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,7 +1,7 @@ Package: nanonext Type: Package Title: NNG (Nanomsg Next Gen) Lightweight Messaging Library -Version: 1.0.0.9022 +Version: 1.1.0 Description: R binding for NNG (Nanomsg Next Gen), a successor to ZeroMQ. NNG is a socket library implementing 'Scalability Protocols', a reliable, high-performance standard for common communications patterns including diff --git a/NEWS.md b/NEWS.md index 9ff869178..3e86b9626 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,4 +1,4 @@ -# nanonext 1.0.0.9022 (development) +# nanonext 1.1.0 #### New Features diff --git a/R/aio.R b/R/aio.R index ac59d896a..431e35c04 100644 --- a/R/aio.R +++ b/R/aio.R @@ -256,7 +256,7 @@ call_aio_ <- function(aio) invisible(.Call(rnng_wait_thread_create, aio)) #' collect_aio(res) #' #' msg <- recv_aio(s2, timeout = 100) -#' collect_aio_(res) +#' collect_aio_(msg) #' #' close(s1) #' close(s2) diff --git a/man/collect_aio.Rd b/man/collect_aio.Rd index 969477c01..e87ce4cde 100644 --- a/man/collect_aio.Rd +++ b/man/collect_aio.Rd @@ -36,7 +36,7 @@ res <- send_aio(s1, data.frame(a = 1, b = 2), timeout = 100) collect_aio(res) msg <- recv_aio(s2, timeout = 100) -collect_aio_(res) +collect_aio_(msg) close(s1) close(s2) diff --git a/vignettes/nanonext.Rmd b/vignettes/nanonext.Rmd index 2c32eb671..173bcb57a 100644 --- a/vignettes/nanonext.Rmd +++ b/vignettes/nanonext.Rmd @@ -37,7 +37,7 @@ One solution it provides is that of processing real-time data where computation Create socket in Python using the NNG binding 'pynng': -```python +``` python import numpy as np import pynng socket = pynng.Pair0(listen="ipc:///tmp/nanonext.socket") @@ -46,7 +46,7 @@ socket = pynng.Pair0(listen="ipc:///tmp/nanonext.socket") Create nano object in R using `nanonext`, then send a vector of 'doubles', specifying mode as 'raw': -```r +``` r library(nanonext) n <- nano("pair", dial = "ipc:///tmp/nanonext.socket") n$send(c(1.1, 2.2, 3.3, 4.4, 5.5), mode = "raw") @@ -56,7 +56,7 @@ n$send(c(1.1, 2.2, 3.3, 4.4, 5.5), mode = "raw") Receive in Python as a NumPy array of 'floats', and send back to R: -```python +``` python raw = socket.recv() array = np.frombuffer(raw) print(array) @@ -69,7 +69,7 @@ socket.send(msg) Receive in R, specifying the receive mode as 'double': -```r +``` r n$recv(mode = "double") #> [1] 1.1 2.2 3.3 4.4 5.5 ``` @@ -81,7 +81,7 @@ n$recv(mode = "double") `nanonext` implements true async send and receive, leveraging NNG as a massively-scaleable concurrency framework. -```r +``` r s1 <- socket("pair", listen = "inproc://nano") s2 <- socket("pair", dial = "inproc://nano") ``` @@ -91,7 +91,7 @@ s2 <- socket("pair", dial = "inproc://nano") An 'Aio' object returns an unresolved value whilst its asynchronous operation is ongoing, automatically resolving to a final value once complete. -```r +``` r # an async receive is requested, but no messages are waiting (yet to be sent) msg <- recv_aio(s2) msg @@ -103,7 +103,7 @@ msg$data For a 'sendAio' object, the result is stored at `$result`. -```r +``` r res <- send_aio(s1, data.frame(a = 1, b = 2)) res #> < sendAio | $result > @@ -115,7 +115,7 @@ res$result For a 'recvAio' object, the message is stored at `$data`. -```r +``` r # now that a message has been sent, the 'recvAio' resolves automatically msg$data #> a b @@ -125,7 +125,7 @@ msg$data Auxiliary function `unresolved()` may be used in control flow statements to perform actions which depend on resolution of the Aio, both before and after. This means there is no need to actually wait (block) for an Aio to resolve, as the example below demonstrates. -```r +``` r msg <- recv_aio(s2) # unresolved() queries for resolution itself so no need to use it again within the while loop @@ -144,14 +144,18 @@ msg$data The values may also be called explicitly using `call_aio()`. This will wait for completion of the Aio (blocking). -```r +``` r # will wait for completion then return the resolved Aio call_aio(msg) -# to access the resolved value directly (waiting if required) +# to access the resolved value (waiting if required): call_aio(msg)$data #> [1] "resolved" +# or directly: +collect_aio(msg) +#> [1] "resolved" + close(s1) close(s2) ``` @@ -167,7 +171,7 @@ Can be used to perform computationally-expensive calculations or I/O-bound opera [S] Server process: `reply()` will wait for a message and apply a function, in this case `rnorm()`, before sending back the result. This is started in a background 'mirai' process. -```r +``` r m <- mirai::mirai({ library(nanonext) rep <- socket("rep", listen = "tcp://127.0.0.1:6556") @@ -178,7 +182,7 @@ m <- mirai::mirai({ [C] Client process: `request()` performs an async send and receive request and returns immediately with a `recvAio` object. -```r +``` r library(nanonext) req <- socket("req", dial = "tcp://127.0.0.1:6556") aio <- request(context(req), data = 1e8, recv_mode = "double") @@ -187,7 +191,7 @@ aio <- request(context(req), data = 1e8, recv_mode = "double") At this point, the client can run additional code concurrent with the server processing the request. -```r +``` r # do more... ``` @@ -196,13 +200,9 @@ When the result of the server calculation is required, the `recvAio` may be call The return value from the server request is then retrieved and stored in the Aio as `$data`. -```r -call_aio(aio) - -aio -#> < recvAio | $data > -aio$data |> str() -#> num [1:100000000] 1.365 -0.842 -0.816 1.367 -0.813 ... +``` r +call_aio(aio)$data |> str() +#> num [1:100000000] 0.257 -0.413 0.946 0.545 0.071 ... ``` As `call_aio()` is blocking and will wait for completion, an alternative is to query `aio$data` directly. This will return an 'unresolved' logical NA value if the calculation is yet to complete. @@ -232,7 +232,7 @@ The following shows how condition variables and signalling work in practice. Example 1: set up a socket, and wait for the other side to connect: -```r +``` r sock <- socket("pair", listen = "inproc://nanopipe") cv <- cv() # create new condition variable @@ -265,7 +265,7 @@ close(sock) Example 2: wait until a message is received or connection is dropped: -```r +``` r sock <- socket("pair", listen = "inproc://nanosignal") sock2 <- socket("pair", dial = "inproc://nanosignal") @@ -317,11 +317,11 @@ A client configuration requires a PEM-encoded CA certificate (chain) used to ver Additionally, the convenience function `write_cert()` can automatically generate a 4096 bit RSA key pair and self-signed X.509 certificate in the format required by `tls_config()`. The 'cn' argument must be provided and match exactly the hostname / IP address of the URL that is being used, e.g. in the example below '127.0.0.1' must be used throughout, or alternatively 'localhost', but not a mixture of the two. -```r +``` r cert <- write_cert(cn = "127.0.0.1") str(cert) #> List of 2 -#> $ server: chr [1:2] "-----BEGIN CERTIFICATE-----\nMIIFOTCCAyGgAwIBAgIBATANBgkqhkiG9w0BAQsFADA0MRIwEAYDVQQDDAkxMjcu\nMC4wLjExETAPBgNV"| __truncated__ "-----BEGIN RSA PRIVATE KEY-----\nMIIJKAIBAAKCAgEAuMCkX3Rdm9ssjzAfpLbDndtuwvwceenNXQNO9R2/v99teHdn\nTsjeYb+gNNpP"| __truncated__ +#> $ server: chr [1:2] "-----BEGIN CERTIFICATE-----\nMIIFOTCCAyGgAwIBAgIBATANBgkqhkiG9w0BAQsFADA0MRIwEAYDVQQDDAkxMjcu\nMC4wLjExETAPBgNV"| __truncated__ "-----BEGIN RSA PRIVATE KEY-----\nMIIJKgIBAAKCAgEA3CPAXY45HOTzvo4z+U15qFP3jvrcATlNio/qO4HU4L0E82k+\nQ2P1aDuWUg7h"| __truncated__ #> $ client: chr [1:2] "-----BEGIN CERTIFICATE-----\nMIIFOTCCAyGgAwIBAgIBATANBgkqhkiG9w0BAQsFADA0MRIwEAYDVQQDDAkxMjcu\nMC4wLjExETAPBgNV"| __truncated__ "" ser <- tls_config(server = cert$server) @@ -348,7 +348,7 @@ close(s) `nanonext` fully implements NNG's pub/sub protocol as per the below example. A subscriber can subscribe to one or multiple topics broadcast by a publisher. -```r +``` r pub <- socket("pub", listen = "inproc://nanobroadcast") sub <- socket("sub", dial = "inproc://nanobroadcast") @@ -392,7 +392,7 @@ sub |> recv(mode = "character") The subscribed topic can be of any atomic type (not just character), allowing integer, double, logical, complex and raw vectors to be sent and received. -```r +``` r sub |> subscribe(topic = 1) pub |> send(c(1, 10, 10, 20), mode = "raw") #> [1] 0 @@ -416,7 +416,7 @@ This type of pattern is useful for applications such as service discovery. A surveyor sends a survey, which is broadcast to all peer respondents. Respondents are then able to reply, but are not obliged to. The survey itself is a timed event, and responses received after the timeout are discarded. -```r +``` r sur <- socket("surveyor", listen = "inproc://nanoservice") res1 <- socket("respondent", dial = "inproc://nanoservice") res2 <- socket("respondent", dial = "inproc://nanoservice") @@ -470,7 +470,7 @@ It can be seen that the final value resolves into a timeout, which is an integer For normal use, it takes just the URL. It can follow redirects. -```r +``` r ncurl("https://postman-echo.com/get") #> $status #> [1] 200 @@ -479,13 +479,13 @@ ncurl("https://postman-echo.com/get") #> NULL #> #> $data -#> [1] "{\n \"args\": {},\n \"headers\": {\n \"x-forwarded-proto\": \"https\",\n \"x-forwarded-port\": \"443\",\n \"host\": \"postman-echo.com\",\n \"x-amzn-trace-id\": \"Root=1-6634af61-2a7256825245680a005b73e7\"\n },\n \"url\": \"https://postman-echo.com/get\"\n}" +#> [1] "{\n \"args\": {},\n \"headers\": {\n \"x-forwarded-proto\": \"https\",\n \"x-forwarded-port\": \"443\",\n \"host\": \"postman-echo.com\",\n \"x-amzn-trace-id\": \"Root=1-665dccb1-6374c8162c8a5951767e475b\"\n },\n \"url\": \"https://postman-echo.com/get\"\n}" ``` For advanced use, supports additional HTTP methods such as POST or PUT. -```r +``` r res <- ncurl_aio("https://postman-echo.com/post", method = "POST", headers = c(`Content-Type` = "application/json", Authorization = "Bearer APIKEY"), @@ -496,10 +496,10 @@ res call_aio(res)$headers #> $date -#> [1] "Fri, 03 May 2024 09:33:21 GMT" +#> [1] "Mon, 03 Jun 2024 14:01:21 GMT" res$data -#> [1] "{\n \"args\": {},\n \"data\": {\n \"key\": \"value\"\n },\n \"files\": {},\n \"form\": {},\n \"headers\": {\n \"x-forwarded-proto\": \"https\",\n \"x-forwarded-port\": \"443\",\n \"host\": \"postman-echo.com\",\n \"x-amzn-trace-id\": \"Root=1-6634af61-7f19837d47b179ea131ef4e9\",\n \"content-length\": \"16\",\n \"content-type\": \"application/json\",\n \"authorization\": \"Bearer APIKEY\"\n },\n \"json\": {\n \"key\": \"value\"\n },\n \"url\": \"https://postman-echo.com/post\"\n}" +#> [1] "{\n \"args\": {},\n \"data\": {\n \"key\": \"value\"\n },\n \"files\": {},\n \"form\": {},\n \"headers\": {\n \"x-forwarded-proto\": \"https\",\n \"x-forwarded-port\": \"443\",\n \"host\": \"postman-echo.com\",\n \"x-amzn-trace-id\": \"Root=1-665dccb1-74cd2d7d4fe4282876bfb176\",\n \"content-length\": \"16\",\n \"content-type\": \"application/json\",\n \"authorization\": \"Bearer APIKEY\"\n },\n \"json\": {\n \"key\": \"value\"\n },\n \"url\": \"https://postman-echo.com/post\"\n}" ``` In this respect, it may be used as a performant and lightweight method for making REST API requests. @@ -509,7 +509,7 @@ In this respect, it may be used as a performant and lightweight method for makin By specifying `convert = FALSE`, the received binary data is made available as a raw vector. This may be fed into 'json' parsers which can operate directly on such data etc. -```r +``` r sess <- ncurl_session("https://postman-echo.com/get", convert = FALSE, headers = c(`Content-Type` = "application/json", Authorization = "Bearer APIKEY"), @@ -523,7 +523,7 @@ transact(sess) #> #> $headers #> $headers$Date -#> [1] "Fri, 03 May 2024 09:33:22 GMT" +#> [1] "Mon, 03 Jun 2024 14:01:22 GMT" #> #> $headers$`Content-Type` #> [1] "application/json; charset=utf-8" @@ -534,7 +534,7 @@ transact(sess) #> [40] 6f 72 77 61 72 64 65 64 2d 70 72 6f 74 6f 22 3a 20 22 68 74 74 70 73 22 2c 0a 20 20 20 20 22 78 2d 66 6f 72 77 61 72 #> [79] 64 65 64 2d 70 6f 72 74 22 3a 20 22 34 34 33 22 2c 0a 20 20 20 20 22 68 6f 73 74 22 3a 20 22 70 6f 73 74 6d 61 6e 2d #> [118] 65 63 68 6f 2e 63 6f 6d 22 2c 0a 20 20 20 20 22 78 2d 61 6d 7a 6e 2d 74 72 61 63 65 2d 69 64 22 3a 20 22 52 6f 6f 74 -#> [157] 3d 31 2d 36 36 33 34 61 66 36 32 2d 37 38 31 63 34 34 38 66 35 30 37 65 61 62 64 65 33 31 35 61 66 35 37 35 22 2c 0a +#> [157] 3d 31 2d 36 36 35 64 63 63 62 32 2d 31 39 34 62 62 66 39 35 30 64 30 32 64 65 36 33 36 30 32 38 66 32 34 31 22 2c 0a #> [196] 20 20 20 20 22 63 6f 6e 74 65 6e 74 2d 74 79 70 65 22 3a 20 22 61 70 70 6c 69 63 61 74 69 6f 6e 2f 6a 73 6f 6e 22 2c #> [235] 0a 20 20 20 20 22 61 75 74 68 6f 72 69 7a 61 74 69 6f 6e 22 3a 20 22 42 65 61 72 65 72 20 41 50 49 4b 45 59 22 0a 20 #> [274] 20 7d 2c 0a 20 20 22 75 72 6c 22 3a 20 22 68 74 74 70 73 3a 2f 2f 70 6f 73 74 6d 61 6e 2d 65 63 68 6f 2e 63 6f 6d 2f @@ -544,7 +544,7 @@ transact(sess) Optimised functions for base64 encoding and decoding from the 'Mbed TLS' library are also exposed as convenience utilities: -```r +``` r base64enc("hello world!") #> [1] "aGVsbG8gd29ybGQh" @@ -561,7 +561,7 @@ base64dec(base64enc("hello world!")) The stream interface can be used to communicate with (secure) websocket servers. The argument `textframes = TRUE` can be specified where the websocket server uses text rather than binary frames. -```r +``` r # connecting to an echo service s <- stream(dial = "wss://echo.websocket.events/", textframes = TRUE) s @@ -574,7 +574,7 @@ s `send()` and `recv()`, as well as their asynchronous counterparts `send_aio()` and `recv_aio()` can be used on Streams in the same way as Sockets. This affords a great deal of flexibility in ingesting and processing streaming data. -```r +``` r s |> recv() #> [1] "echo.websocket.events sponsored by Lob.com" @@ -612,7 +612,7 @@ See the function documentation page for a list of common options. Once a dialer or listener has started, it is not generally possible to change its configuration. In this case, the dialer or listener should be created specifying 'autostart = FALSE'. -```r +``` r s <- socket(listen = "inproc://options", autostart = FALSE) # no maximum message size @@ -635,7 +635,7 @@ This can be used on a Socket, Listener or Dialer to query useful statistics such See the function documentation page for available statistics. -```r +``` r s <- socket(listen = "inproc://stat") # no active connections (pipes) diff --git a/vignettes/nanonext.Rmd.orig b/vignettes/nanonext.Rmd.orig index f8f5a9e23..4e5ebd6ee 100644 --- a/vignettes/nanonext.Rmd.orig +++ b/vignettes/nanonext.Rmd.orig @@ -137,9 +137,12 @@ The values may also be called explicitly using `call_aio()`. This will wait for # will wait for completion then return the resolved Aio call_aio(msg) -# to access the resolved value directly (waiting if required) +# to access the resolved value (waiting if required): call_aio(msg)$data +# or directly: +collect_aio(msg) + close(s1) close(s2) @@ -184,11 +187,7 @@ When the result of the server calculation is required, the `recvAio` may be call The return value from the server request is then retrieved and stored in the Aio as `$data`. ```{r rpcclient3} -call_aio(aio) - -aio -aio$data |> str() - +call_aio(aio)$data |> str() ``` As `call_aio()` is blocking and will wait for completion, an alternative is to query `aio$data` directly. This will return an 'unresolved' logical NA value if the calculation is yet to complete.