nanonext provides bindings to NNG (Nanomsg Next Gen), a high-performance messaging library for building distributed systems.
This is a cheatsheet. Refer to the other vignettes for detailed introductions.
- inproc:// (in-process), ipc:// (inter-process), tcp://, ws://, wss://, tls+tcp://
- send_aio() / recv_aio() return immediately; access results via $data or $result
- "serial" (R objects), "raw" (bytes), "double", "integer", "character", etc.
- cv() for zero-latency event synchronisation

library(nanonext)
# Functional interface
s <- socket("pair")
listen(s, "tcp://127.0.0.1:5555")
dial(s, "tcp://127.0.0.1:5555")
# Object-oriented interface
n <- nano("pair", listen = "tcp://127.0.0.1:5555")
n$dial("tcp://127.0.0.1:5556")
# Close when done
close(s)
n$close()
| Protocol | Description | Socket Types |
|---|---|---|
| Pair | 1-to-1 bidirectional | "pair" |
| Poly | Polyamorous pair | "poly" |
| Pipeline | One-way data flow | "push", "pull" |
| Req/Rep | RPC pattern | "req", "rep" |
| Pub/Sub | Broadcast/subscribe | "pub", "sub" |
| Survey | Query all peers | "surveyor", "respondent" |
| Bus | Many-to-many mesh | "bus" |
| URL Scheme | Description |
|---|---|
| inproc://name | In-process (fastest, same process) |
| ipc:///path | Inter-process (Unix socket / named pipe) |
| tcp://host:port | TCP/IP network |
| ws://host:port/path | WebSocket |
| wss://host:port/path | WebSocket over TLS |
| tls+tcp://host:port | TLS encrypted TCP |
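As a minimal sketch of how a URL scheme from the table above is used, the following connects two "pair" sockets over the in-process transport and passes one message. The endpoint name `inproc://cheatsheet-transport` and the 1000 ms blocking limit are assumptions made for this illustration, not values from the cheatsheet.

```r
library(nanonext)

# Hypothetical endpoint name, chosen only for this sketch
url <- "inproc://cheatsheet-transport"

s1 <- socket("pair", listen = url)
s2 <- socket("pair", dial = url)

# Allow up to 1000 ms for the connection to form and the message to move
send(s1, "hello", block = 1000)
msg <- recv(s2, block = 1000)
msg

close(s1)
close(s2)
```

Swapping the URL for a `tcp://host:port` or `ipc:///path` address should leave the rest of the code unchanged, since the transport is selected by the scheme alone.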
# Send R object (serialized)
send(s, data.frame(a = 1, b = 2))
# Receive R object
recv(s)
# Send raw bytes (for cross-language exchange)
send(s, c(1.1, 2.2, 3.3), mode = "raw")
# Receive as specific type
recv(s, mode = "double")
recv(s, mode = "character")
recv(s, mode = "raw")
| Mode | Description |
|---|---|
| "serial" / 1 | R serialization (default) |
| "character" / 2 | Coerce to character |
| "complex" / 3 | Coerce to complex |
| "double" / 4 | Coerce to double |
| "integer" / 5 | Coerce to integer |
| "logical" / 6 | Coerce to logical |
| "numeric" / 7 | Coerce to numeric |
| "raw" / 8 | Raw bytes |
| "string" / 9 | Fast option for length-1 character |
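To make the difference between the modes above concrete, here is a small round trip over a pair of in-process sockets. The endpoint name and the `block = 1000` limits are assumptions for this sketch; the same double vector is sent twice in "raw" mode so it can be read back once as doubles and once as bytes.

```r
library(nanonext)

# Hypothetical endpoint name for this sketch
s1 <- socket("pair", listen = "inproc://cheatsheet-modes")
s2 <- socket("pair", dial = "inproc://cheatsheet-modes")

# Default "serial" mode: an arbitrary R object survives the round trip
send(s1, data.frame(a = 1, b = 2), block = 1000)
df <- recv(s2, block = 1000)

# "raw" mode sends the underlying bytes; the receiver picks the type
send(s1, c(1.1, 2.2, 3.3), mode = "raw", block = 1000)
dbl <- recv(s2, mode = "double", block = 1000)

# The same payload read back as a raw vector: 3 doubles x 8 bytes each
send(s1, c(1.1, 2.2, 3.3), mode = "raw", block = 1000)
bytes <- recv(s2, mode = "raw", block = 1000)

close(s1)
close(s2)
```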
# Async send - returns immediately
res <- send_aio(s, data)
res$result # 0 = success, error code otherwise
# Async receive - returns immediately
msg <- recv_aio(s)
msg$data # Value when resolved, 'unresolved' NA otherwise
# Check if resolved
unresolved(msg) # TRUE while pending
# Wait for resolution
call_aio(msg) # Blocks, returns Aio object
collect_aio(msg) # Blocks, returns value directly
msg[] # Blocks (user-interruptible), returns value
# Poll while doing other work
while (unresolved(msg)) {
# do other tasks
}
result <- msg$data
# Multiple async operations
msg1 <- recv_aio(s1)
msg2 <- recv_aio(s2)
# Both run concurrently
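The async snippets above use placeholder sockets and data. A self-contained sketch, assuming two in-process "pair" sockets and a 2000 ms timeout (both chosen for illustration), might look like this:

```r
library(nanonext)

# Hypothetical endpoint name for this sketch
s1 <- socket("pair", listen = "inproc://cheatsheet-aio")
s2 <- socket("pair", dial = "inproc://cheatsheet-aio")

# Post the receive first; it stays pending until a message arrives
msg <- recv_aio(s2, timeout = 2000)
pending_before <- unresolved(msg)

# Async send returns immediately
res <- send_aio(s1, list(id = 1L, payload = "abc"), timeout = 2000)

# Block until each operation has completed
send_status <- call_aio(res)$result   # 0 on success
value <- collect_aio(msg)             # the received R object
pending_after <- unresolved(msg)

close(s1)
close(s2)
```

Posting the receive before the send is not required; it is done here only so that the pending state can be observed.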
# Create condition variable
cv <- cv()
# Check/signal
cv_value(cv) # Get counter value
cv_signal(cv) # Increment counter
cv_reset(cv) # Reset to zero
# Wait (blocks until counter > 0, then decrements)
wait(cv)
# Wait with timeout (ms), returns FALSE on timeout
until(cv, 1000)
# Signal on connection/disconnection
pipe_notify(socket, cv = cv, add = TRUE, remove = TRUE)
# Distinguish message vs disconnect with flag
pipe_notify(socket, cv = cv, remove = TRUE, flag = TRUE)
r <- recv_aio(socket, cv = cv)
wait(cv) || stop("disconnected") # FALSE = pipe event
cv <- cv()
msg <- recv_aio(s, cv = cv)
wait(cv) # Wake on receive completion
msg$data
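The following sketch ties the condition variable calls above to an actual message. It assumes two in-process "pair" sockets and illustrative timeouts: `until()` first times out because nothing has been sent, then returns once the async receive completes.

```r
library(nanonext)

# Hypothetical endpoint name for this sketch
s1 <- socket("pair", listen = "inproc://cheatsheet-cv")
s2 <- socket("pair", dial = "inproc://cheatsheet-cv")

cv <- cv()
msg <- recv_aio(s2, cv = cv, timeout = 2000)

# Nothing has been sent yet, so until() gives up after 50 ms
timed_out <- !until(cv, 50)

send(s1, "wake up", block = 1000)

# Completion of the receive signals the condition variable
signalled <- until(cv, 2000)
value <- msg$data

# A successful wait decrements the counter back to zero
count_after <- cv_value(cv)

close(s1)
close(s2)
```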
rep <- socket("rep", listen = "tcp://127.0.0.1:5555")
ctx <- context(rep)
# reply() blocks, waiting for request
reply(ctx, execute = my_function, send_mode = "raw")
close(rep)
req <- socket("req", dial = "tcp://127.0.0.1:5555")
ctx <- context(req)
# request() returns immediately
aio <- request(ctx, data = args, recv_mode = "double")
# Do other work while server processes...
# Get result when needed
result <- aio[]
close(req)
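The server and client halves above normally run in separate processes. Because `request()` returns immediately, both halves can also be exercised in a single session, as in this sketch. The endpoint name, the names `server` and `client`, the summing function and the timeouts are all assumptions made for illustration.

```r
library(nanonext)

# Hypothetical endpoint name for this sketch
server <- socket("rep", listen = "inproc://cheatsheet-rpc")
client <- socket("req", dial = "inproc://cheatsheet-rpc")
ctx_rep <- context(server)
ctx_req <- context(client)

# Client: fire the request without blocking
aio <- request(ctx_req, data = c(1, 2, 3), recv_mode = "double", timeout = 2000)

# Server: receive one request, apply the function, send the result as raw bytes
reply(ctx_rep, execute = function(x) sum(x), send_mode = "raw", timeout = 2000)

# Client: collect the reply
result <- aio[]
result

close(client)
close(server)
```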
pub <- socket("pub", listen = "inproc://pubsub")
sub <- socket("sub", dial = "inproc://pubsub")
# Subscribe to topic (prefix matching)
subscribe(sub, topic = "news")
subscribe(sub, topic = NULL) # All topics
# Unsubscribe
unsubscribe(sub, topic = "news")
# Publish (topic is message prefix)
send(pub, c("news", "headline"), mode = "raw")
# Receive (includes topic)
recv(sub, mode = "character")
close(pub)
close(sub)
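A short sketch of the prefix filtering described above: two messages are published, but only the one whose leading bytes match the subscribed topic is delivered. The endpoint name, the 100 ms pause (to let the dial complete, since a publisher does not queue for absent subscribers) and the receive timeouts are assumptions for this illustration.

```r
library(nanonext)

# Hypothetical endpoint name for this sketch
pub <- socket("pub", listen = "inproc://cheatsheet-pubsub")
sub <- socket("sub", dial = "inproc://cheatsheet-pubsub")
subscribe(sub, topic = "news")

# Give the connection time to establish before publishing
msleep(100)

send(pub, c("sport", "ignored"), mode = "raw")   # filtered out by the subscriber
send(pub, c("news", "headline"), mode = "raw")   # matches the "news" prefix

first <- recv(sub, mode = "character", block = 1000)
second <- recv(sub, mode = "character", block = 200)   # nothing left: errorValue

close(pub)
close(sub)
```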
sur <- socket("surveyor", listen = "inproc://survey")
res1 <- socket("respondent", dial = "inproc://survey")
res2 <- socket("respondent", dial = "inproc://survey")
# Set survey timeout (ms)
survey_time(sur, 500)
# Broadcast survey
send(sur, "ping")
# Collect responses (async)
aio1 <- recv_aio(sur)
aio2 <- recv_aio(sur)
# Respondents reply
recv(res1)
send(res1, "pong1")
# Late/missing responses timeout (errorValue 5)
msleep(500)
aio2$data # errorValue if no response
close(sur)
close(res1)
close(res2)
# Generate certificate (cn must match URL host exactly)
cert <- write_cert(cn = "127.0.0.1")
# Create TLS configs
server_tls <- tls_config(server = cert$server)
client_tls <- tls_config(client = cert$client)
# Use with tls+tcp:// or wss://
s1 <- socket(listen = "tls+tcp://127.0.0.1:5555", tls = server_tls)
s2 <- socket(dial = "tls+tcp://127.0.0.1:5555", tls = client_tls)
# Client with CA cert file
client_tls <- tls_config(client = "/path/to/ca-cert.pem")
# Server with cert + key
server_tls <- tls_config(server = c("/path/to/cert.pem", "/path/to/key.pem"))
# Delayed start for configuration
s <- socket(listen = "tcp://127.0.0.1:5555", autostart = FALSE)
# Get option
opt(s$listener[[1]], "recv-size-max")
# Set option
opt(s$listener[[1]], "recv-size-max") <- 8192L
# Start after configuration
start(s$listener[[1]])
| Option | Description |
|---|---|
| "recv-size-max" | Max message size (0 = unlimited) |
| "send-timeout" | Send timeout (ms) |
| "recv-timeout" | Receive timeout (ms) |
| "reconnect-time-min" | Min reconnect interval (ms) |
| "reconnect-time-max" | Max reconnect interval (ms) |
| "req:resend-time" | Request retry interval |
| "sub:prefnew" | Prefer newer messages |
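As a hedged sketch of reading options back after setting them, the example below sets one option on the socket itself and one on a listener that has not yet been started. The endpoint name and the chosen values are assumptions for illustration, and the sketch also assumes that integer values are accepted for both options, as in the listener example above.

```r
library(nanonext)

# Hypothetical endpoint name; the listener is created but not started
s <- socket("pair", listen = "inproc://cheatsheet-opts", autostart = FALSE)

# Socket-level option
opt(s, "recv-timeout") <- 2000L

# Listener-level option, set before the listener starts
opt(s$listener[[1]], "recv-size-max") <- 8192L

timeout_ms <- opt(s, "recv-timeout")
max_size <- opt(s$listener[[1]], "recv-size-max")

start(s$listener[[1]])
close(s)
```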
# Register custom serializer for a class
serial <- serial_config(
"class_name",
function(x) serialize(x, NULL), # serialize
unserialize # unserialize
)
opt(socket, "serial") <- serial
stat(socket, "pipes") # Active connections
stat(listener, "accept") # Connection attempts
stat(dialer, "reject") # Rejected connections
Contexts enable concurrent operations on a single socket (for req/rep, surveyor/respondent).
s <- socket("req", dial = "tcp://127.0.0.1:5555")
# Create independent contexts
ctx1 <- context(s)
ctx2 <- context(s)
# Concurrent requests
aio1 <- request(ctx1, data1)
aio2 <- request(ctx2, data2)
# Close contexts (or they close with socket)
close(ctx1)
close(ctx2)
close(s)
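The client-side snippet above needs a server to answer it. A single-session sketch, assuming an in-process endpoint, a doubling function and 2000 ms timeouts (all chosen for illustration), shows two requests in flight on one socket with each reply routed back to the context that issued it:

```r
library(nanonext)

# Hypothetical endpoint name for this sketch
server <- socket("rep", listen = "inproc://cheatsheet-ctx")
client <- socket("req", dial = "inproc://cheatsheet-ctx")

# Two independent client contexts, two concurrent requests
ctx1 <- context(client)
ctx2 <- context(client)
aio1 <- request(ctx1, data = 10, timeout = 2000)
aio2 <- request(ctx2, data = 20, timeout = 2000)

# Server side: one context per in-flight request
sctx1 <- context(server)
sctx2 <- context(server)
reply(sctx1, execute = function(x) x * 2, timeout = 2000)
reply(sctx2, execute = function(x) x * 2, timeout = 2000)

# Each result matches its own request, whichever was served first
r1 <- aio1[]
r2 <- aio2[]

close(client)
close(server)
```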
# R: send raw doubles
n <- nano("pair", dial = "ipc:///tmp/nanonext")
n$send(c(1.1, 2.2, 3.3), mode = "raw")
result <- n$recv(mode = "double")
# Python: receive as NumPy array
import numpy as np
import pynng
socket = pynng.Pair0(listen="ipc:///tmp/nanonext")
array = np.frombuffer(socket.recv())
socket.send(array.tobytes())
# Errors return as 'errorValue' class
result <- recv(s, block = FALSE)
# Check for errors
is_error_value(result)
# Error codes
# 5 = Timed out
# 6 = Connection refused
# 8 = Try again (non-blocking, no message)
# Get error message
nng_error(5) # "Timed out"
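A runnable sketch of the error-handling pattern above: a non-blocking receive on a socket with no peer has nothing to return, so it yields an 'errorValue' rather than raising an R error. The unconnected "pair" socket is an assumption chosen to trigger that case deterministically.

```r
library(nanonext)

# A socket with no listener or dialer: nothing can ever arrive
s <- socket("pair")

res <- recv(s, block = FALSE)

is_err <- is_error_value(res)   # TRUE: the call returned an error value
code <- as.integer(res)         # numeric error code, 8 for "Try again"
msg <- nng_error(code)          # human-readable description of the code

close(s)
```

Checking `is_error_value()` before using a result avoids mistaking an error code for received data.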
# Sleep (uninterruptible, ms)
msleep(100)
# Random bytes
random(8) # 8 random bytes as hex string
random(8, convert = FALSE) # As raw vector
# Parse URL
parse_url("tcp://127.0.0.1:5555")
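The utilities above can be checked with a few lines. In this sketch the field names `scheme`, `hostname` and `port` are assumed to be among those returned by `parse_url()`, and `mclock()` (a millisecond clock in nanonext that the cheatsheet does not list) is used only to time the sleep.

```r
library(nanonext)

# parse_url() splits a URL into named components
u <- parse_url("tcp://127.0.0.1:5555")
scheme <- u[["scheme"]]
host <- u[["hostname"]]
port <- u[["port"]]

# 8 random bytes: as a hex string (2 characters per byte) or as a raw vector
hex <- random(8)
bytes <- random(8, convert = FALSE)

# Time a 100 ms sleep with the millisecond clock
t0 <- mclock()
msleep(100)
elapsed <- mclock() - t0
```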