The hardware and bandwidth for this mirror is donated by dogado GmbH, the Webhosting and Full Service-Cloud Provider. Check out our Wordpress Tutorial.
If you wish to report a bug, or if you are interested in having us mirror your free-software or open-source project, please feel free to contact us at mirror[@]dogado.de.

nanonext - Quick Reference

Core Concepts

nanonext provides bindings to NNG (Nanomsg Next Gen), a high-performance messaging library for building distributed systems.

This is a cheatsheet. Refer to the other vignettes for detailed introductions:

Key Takeaways

1. Sockets and Connections

Create Sockets

library(nanonext)

# Functional interface
s <- socket("pair")
listen(s, "tcp://127.0.0.1:5555")
dial(s, "tcp://127.0.0.1:5555")

# Object-oriented interface
n <- nano("pair", listen = "tcp://127.0.0.1:5555")
n$dial("tcp://127.0.0.1:5556")

# Close when done
close(s)
n$close()

Protocols

Protocol Description Socket Types
Pair 1-to-1 bidirectional "pair"
Poly Polyamorous pair "poly"
Pipeline One-way data flow "push", "pull"
Req/Rep RPC pattern "req", "rep"
Pub/Sub Broadcast/subscribe "pub", "sub"
Survey Query all peers "surveyor", "respondent"
Bus Many-to-many mesh "bus"

Transports

URL Scheme Description
inproc://name In-process (fastest, same process)
ipc:///path Inter-process (Unix socket / named pipe)
tcp://host:port TCP/IP network
ws://host:port/path WebSocket
wss://host:port/path WebSocket over TLS
tls+tcp://host:port TLS encrypted TCP

2. Send and Receive

Synchronous

# Send R object (serialized)
send(s, data.frame(a = 1, b = 2))

# Receive R object
recv(s)

# Send raw bytes (for cross-language exchange)
send(s, c(1.1, 2.2, 3.3), mode = "raw")

# Receive as specific type
recv(s, mode = "double")
recv(s, mode = "character")
recv(s, mode = "raw")

Receive Modes

Mode Description
"serial" / 1 R serialization (default)
"character" / 2 Coerce to character
"complex" / 3 Coerce to complex
"double" / 4 Coerce to double
"integer" / 5 Coerce to integer
"logical" / 6 Coerce to logical
"numeric" / 7 Coerce to numeric
"raw" / 8 Raw bytes
"string" / 9 Fast option for length-1 character

3. Async I/O

Basic Async

# Async send - returns immediately
res <- send_aio(s, data)
res$result          # 0 = success, error code otherwise

# Async receive - returns immediately
msg <- recv_aio(s)
msg$data            # Value when resolved, 'unresolved' NA otherwise

# Check if resolved
unresolved(msg)     # TRUE while pending

# Wait for resolution
call_aio(msg)       # Blocks, returns Aio object
collect_aio(msg)    # Blocks, returns value directly
msg[]               # Blocks (user-interruptible), returns value

Non-blocking Patterns

# Poll while doing other work
while (unresolved(msg)) {
  # do other tasks
}
result <- msg$data

# Multiple async operations
msg1 <- recv_aio(s1)
msg2 <- recv_aio(s2)
# Both run concurrently

4. Condition Variables

Basics

# Create condition variable
cv <- cv()

# Check/signal
cv_value(cv)        # Get counter value
cv_signal(cv)       # Increment counter
cv_reset(cv)        # Reset to zero

# Wait (blocks until counter > 0, then decrements)
wait(cv)

# Wait with timeout (ms), returns FALSE on timeout
until(cv, 1000)

Pipe Notifications

# Signal on connection/disconnection
pipe_notify(socket, cv = cv, add = TRUE, remove = TRUE)

# Distinguish message vs disconnect with flag
pipe_notify(socket, cv = cv, remove = TRUE, flag = TRUE)
r <- recv_aio(socket, cv = cv)
wait(cv) || stop("disconnected")  # FALSE = pipe event

Async with CV

cv <- cv()
msg <- recv_aio(s, cv = cv)
wait(cv)            # Wake on receive completion
msg$data

5. Request/Reply (RPC)

Server

rep <- socket("rep", listen = "tcp://127.0.0.1:5555")
ctx <- context(rep)

# reply() blocks, waiting for request
reply(ctx, execute = my_function, send_mode = "raw")

close(rep)

Client

req <- socket("req", dial = "tcp://127.0.0.1:5555")
ctx <- context(req)

# request() returns immediately
aio <- request(ctx, data = args, recv_mode = "double")

# Do other work while server processes...

# Get result when needed
result <- aio[]

close(req)

6. Pub/Sub

pub <- socket("pub", listen = "inproc://pubsub")
sub <- socket("sub", dial = "inproc://pubsub")

# Subscribe to topic (prefix matching)
subscribe(sub, topic = "news")
subscribe(sub, topic = NULL)      # All topics

# Unsubscribe
unsubscribe(sub, topic = "news")

# Publish (topic is message prefix)
send(pub, c("news", "headline"), mode = "raw")

# Receive (includes topic)
recv(sub, mode = "character")

close(pub)
close(sub)

7. Surveyor/Respondent

sur <- socket("surveyor", listen = "inproc://survey")
res1 <- socket("respondent", dial = "inproc://survey")
res2 <- socket("respondent", dial = "inproc://survey")

# Set survey timeout (ms)
survey_time(sur, 500)

# Broadcast survey
send(sur, "ping")

# Collect responses (async)
aio1 <- recv_aio(sur)
aio2 <- recv_aio(sur)

# Respondents reply
recv(res1)
send(res1, "pong1")

# Late/missing responses timeout (errorValue 5)
msleep(500)
aio2$data           # errorValue if no response

close(sur)
close(res1)
close(res2)

8. TLS Secure Connections

Self-signed Certificates

# Generate certificate (cn must match URL host exactly)
cert <- write_cert(cn = "127.0.0.1")

# Create TLS configs
server_tls <- tls_config(server = cert$server)
client_tls <- tls_config(client = cert$client)

# Use with tls+tcp:// or wss://
s1 <- socket(listen = "tls+tcp://127.0.0.1:5555", tls = server_tls)
s2 <- socket(dial = "tls+tcp://127.0.0.1:5555", tls = client_tls)

CA Certificates

# Client with CA cert file
client_tls <- tls_config(client = "/path/to/ca-cert.pem")

# Server with cert + key
server_tls <- tls_config(server = c("/path/to/cert.pem", "/path/to/key.pem"))

9. Options and Statistics

Get/Set Options

# Delayed start for configuration
s <- socket(listen = "tcp://127.0.0.1:5555", autostart = FALSE)

# Get option
opt(s$listener[[1]], "recv-size-max")

# Set option
opt(s$listener[[1]], "recv-size-max") <- 8192L

# Start after configuration
start(s$listener[[1]])

Common Options

Option Description
"recv-size-max" Max message size (0 = unlimited)
"send-timeout" Send timeout (ms)
"recv-timeout" Receive timeout (ms)
"reconnect-time-min" Min reconnect interval (ms)
"reconnect-time-max" Max reconnect interval (ms)
"req:resend-time" Request retry interval
"sub:prefnew" Prefer newer messages

Custom Serialization

# Register custom serializer for a class
serial <- serial_config(
  "class_name",
  function(x) serialize(x, NULL),  # serialize
  unserialize                      # unserialize
)
opt(socket, "serial") <- serial

Statistics

stat(socket, "pipes")      # Active connections
stat(listener, "accept")   # Connection attempts
stat(dialer, "reject")     # Rejected connections

10. Contexts

Contexts enable concurrent operations on a single socket (for req/rep, surveyor/respondent).

s <- socket("req", dial = "tcp://127.0.0.1:5555")

# Create independent contexts
ctx1 <- context(s)
ctx2 <- context(s)

# Concurrent requests
aio1 <- request(ctx1, data1)
aio2 <- request(ctx2, data2)

# Close contexts (or they close with socket)
close(ctx1)
close(ctx2)
close(s)

11. Cross-language Exchange

R to Python (NumPy)

# R: send raw doubles
n <- nano("pair", dial = "ipc:///tmp/nanonext")
n$send(c(1.1, 2.2, 3.3), mode = "raw")
result <- n$recv(mode = "double")
# Python: receive as NumPy array
import numpy as np
import pynng
socket = pynng.Pair0(listen="ipc:///tmp/nanonext")
array = np.frombuffer(socket.recv())
socket.send(array.tobytes())

12. Error Handling

# Errors return as 'errorValue' class
result <- recv(s, block = FALSE)

# Check for errors
is_error_value(result)

# Error codes
# 5  = Timed out
# 6  = Connection refused
# 8  = Try again (non-blocking, no message)

# Get error message
nng_error(5)        # "Timed out"

13. Utilities

# Sleep (uninterruptible, ms)
msleep(100)

# Random bytes
random(8)                          # 8 random bytes as hex string
random(8, convert = FALSE)         # As raw vector

# Parse URL
parse_url("tcp://127.0.0.1:5555")

These binaries (installable software) and packages are in development.
They may not be fully stable and should be used with caution. We make no claims about them.
Health stats visible at Monitor.