mirror of
https://github.com/waku-org/nwaku.git
synced 2025-01-16 09:55:07 +00:00
29614e2e52
* Simplifying libwaku.nim by extracting config parser to config.nim * Adding json_base_event.nim * Starting to control-version the libwaku.h We are creating this libwaku.h inspired by the one that is automatically generated by the nim compiler when `make libwaku` is invoked. Therefore, the self-generated header is then placed in: nimcache/release/libwaku/libwaku.h * Better waku_example.c organization * libwaku.nim: better memory management We need to create a 'cstring' internally from the 'const char*' passed from outside the library. We invoke 'allocShared' in order to create the internal 'cstring', and invoke 'deallocShared' in order to manually free the memory.
410 lines
12 KiB
Nim
410 lines
12 KiB
Nim
|
|
import
|
|
std/[json,sequtils,times,strformat,options,atomics,strutils],
|
|
strutils,
|
|
os
|
|
import
|
|
chronicles,
|
|
chronos,
|
|
stew/shims/net
|
|
import
|
|
../../waku/common/enr/builder,
|
|
../../waku/v2/waku_enr/capabilities,
|
|
../../waku/v2/waku_enr/multiaddr,
|
|
../../waku/v2/waku_enr/sharding,
|
|
../../waku/v2/waku_core/message/message,
|
|
../../waku/v2/waku_core/topics/pubsub_topic,
|
|
../../waku/v2/node/peer_manager/peer_manager,
|
|
../../waku/v2/node/waku_node,
|
|
../../waku/v2/node/builder,
|
|
../../waku/v2/node/config,
|
|
../../waku/v2/waku_relay/protocol,
|
|
./events/[json_error_event,json_message_event,json_base_event],
|
|
./config
|
|
|
|
################################################################################
|
|
### Wrapper around the waku node
|
|
################################################################################
|
|
|
|
################################################################################
|
|
### Exported types
|
|
|
|
# Return codes handed back to the C caller by the exported procs of this file.
const RET_OK: cint = 0 # the call succeeded
const RET_ERR: cint = 1 # the call failed; details go to the error callback
const RET_MISSING_CALLBACK: cint = 2 # a required callback was nil / not set
|
|
|
|
### End of exported types
|
|
################################################################################
|
|
|
|
################################################################################
|
|
### Not-exported components
|
|
|
|
proc alloc(str: cstring): cstring =
  # Returns a NUL-terminated copy of `str` stored in shared memory.
  #
  # A nil `str` yields an empty, NUL-terminated buffer instead of reading
  # through a nil pointer.
  #
  # The caller owns the returned buffer: there should be the corresponding
  # manual deallocation with deallocShared !
  let size = if isNil(str): 0 else: len(str)
  # `allocShared0` zero-fills the block, so the byte at index `size` is
  # already the terminating NUL and only the payload has to be copied.
  let ret = cast[cstring](allocShared0(size + 1))
  if size > 0:
    copyMem(ret, str, size)
  return ret
|
|
|
|
type
  # Signature of the callbacks supplied by the user of the library.
  # The procs in this file call it with a pointer to the first character of
  # a text and with the length of that text.
  WakuCallBack = proc(msg: ptr cchar, len: csize_t) {.cdecl, gcsafe.}

# May keep a reference to a callback defined externally.
# It is assigned by `waku_set_relay_callback` and invoked from
# `relayEventCallback`.
var extRelayEventCallback: WakuCallBack = nil
|
|
|
|
proc relayEventCallback(pubsubTopic: string,
                        msg: WakuMessage):
                        Future[void] {.gcsafe, raises: [Defect].} =
  # Callback that handles the Waku Relay events. i.e. messages or errors.
  #
  # The event is turned into its JSON text and handed to the externally
  # registered `extRelayEventCallback`. Failures are only logged; the
  # returned future is always already completed.
  if isNil(extRelayEventCallback):
    error "extRelayEventCallback is nil"
  else:
    try:
      let jsonEvent = $JsonMessageEvent.new(pubsubTopic, msg)
      extRelayEventCallback(unsafeAddr jsonEvent[0],
                            cast[csize_t](jsonEvent.len))
    except Exception,CatchableError:
      error "Exception when calling 'eventCallBack': " &
            getCurrentExceptionMsg()

  result = newFuture[void]()
  result.complete()
|
|
|
|
# WakuNode instance.
# Declared `threadvar`, i.e. every thread has its own variable. It is assigned
# in `waku_new` and read by the other exported procs.
var node {.threadvar.}: WakuNode
|
|
|
|
### End of not-exported components
|
|
################################################################################
|
|
|
|
################################################################################
|
|
### Exported procs
|
|
|
|
# Every Nim library must have this function called - the name is derived from
# the `--nimMainPrefix` command line option
proc NimMain() {.importc.}

# Flipped to true by the first `waku_init_lib` call so that `NimMain` is not
# invoked again by later calls.
var initialized: Atomic[bool]
|
|
|
|
proc waku_init_lib() {.dynlib, exportc, cdecl.} =
  # Prepares the Nim runtime so that the other exported procs can be used.
  #
  # `exchange` stores `true` and returns the previous value, hence only the
  # first caller observes `false` and runs `NimMain`.
  if not initialized.exchange(true):
    NimMain() # Every Nim library needs to call `NimMain` once exactly
  # NOTE(review): the two `when` blocks are placed outside of the `if` above,
  # i.e. they run on every call - confirm against the upstream file.
  when declared(setupForeignThreadGc): setupForeignThreadGc()
  when declared(nimGC_setStackBottom):
    # The address of a local variable is handed to the GC as stack bottom.
    var locals {.volatile, noinit.}: pointer
    locals = addr(locals)
    nimGC_setStackBottom(locals)
|
|
|
|
proc waku_new(configJson: cstring,
              onErrCb: WakuCallback): cint
              {.dynlib, exportc, cdecl.} =
  # Creates a new instance of the WakuNode and keeps it in `node`.
  # Notice that the ConfigNode type is also exported and available for users.
  #
  # configJson: configuration text that is handed to `parseConfig`.
  # onErrCb: receives a description of the problem whenever RET_ERR is
  #          returned.
  #
  # Returns RET_MISSING_CALLBACK if `onErrCb` is nil, RET_ERR if any of the
  # steps fails and RET_OK otherwise.

  if isNil(onErrCb):
    return RET_MISSING_CALLBACK

  # Hands `text` to the error callback as pointer + length.
  template reportError(text: string) =
    let errText = text
    onErrCb(unsafeAddr errText[0], cast[csize_t](len(errText)))

  # Initial values handed to `parseConfig`.
  var
    privateKey: PrivateKey
    netConfig = NetConfig.init(ValidIpAddress.init("127.0.0.1"),
                               Port(60000'u16)).value
    relay: bool
    topics = @[""]
    jsonResp: JsonEvent

  # The configuration is parsed from an internal copy of the caller's text;
  # the copy is released as soon as the parsing is over.
  let cj = configJson.alloc()
  let configParsed = parseConfig($cj,
                                 privateKey,
                                 netConfig,
                                 relay,
                                 topics,
                                 jsonResp)
  deallocShared(cj)

  if not configParsed:
    reportError($jsonResp)
    return RET_ERR

  # Build the ENR record out of the parsed configuration.
  var enrBuilder = EnrBuilder.init(privateKey)
  enrBuilder.withIpAddressAndPorts(netConfig.enrIp,
                                   netConfig.enrPort,
                                   netConfig.discv5UdpPort)

  if netConfig.wakuFlags.isSome():
    enrBuilder.withWakuCapabilities(netConfig.wakuFlags.get())

  enrBuilder.withMultiaddrs(netConfig.enrMultiaddrs)

  let addShardedTopics = enrBuilder.withShardedTopics(topics)
  if addShardedTopics.isErr():
    reportError($addShardedTopics.error)
    return RET_ERR

  let recordRes = enrBuilder.build()
  if recordRes.isErr():
    reportError($recordRes.error)
    return RET_ERR
  let record = recordRes.get()

  # Build the node itself.
  var builder = WakuNodeBuilder.init()
  builder.withRng(crypto.newRng())
  builder.withNodeKey(privateKey)
  builder.withRecord(record)
  builder.withNetworkConfiguration(netConfig)
  builder.withSwitchConfiguration(maxConnections = some(50.int))

  let wakuNodeRes = builder.build()
  if wakuNodeRes.isErr():
    let errorMsg = "failed to create waku node instance: " & wakuNodeRes.error
    reportError($JsonErrorEvent.new(errorMsg))
    return RET_ERR

  node = wakuNodeRes.get()

  if relay:
    waitFor node.mountRelay()
    node.peerManager.start()

  return RET_OK
|
|
|
|
proc waku_version(onOkCb: WakuCallBack): cint {.dynlib, exportc.} =
  # Hands the version text of the node to `onOkCb`.
  #
  # Returns RET_MISSING_CALLBACK if `onOkCb` is nil and RET_OK otherwise.
  if isNil(onOkCb):
    return RET_MISSING_CALLBACK

  # The callback expects a pointer to the first character. That is obtained
  # with `unsafeAddr version[0]`, like in the other procs of this file.
  # A `cast` of a Nim `string` value to `ptr cchar` reinterprets the string
  # object itself and does not yield the address of its characters
  # (assumes `WakuNodeVersionString` is a Nim `string`, it is declared
  # outside of this file).
  let version = $WakuNodeVersionString
  onOkCb(unsafeAddr version[0], cast[csize_t](len(version)))

  return RET_OK
|
|
|
|
proc waku_set_relay_callback(callback: WakuCallBack) {.dynlib, exportc.} =
  # Registers the callback that `relayEventCallback` invokes with the JSON
  # text of every relay event. The previous value is overwritten; no check
  # for nil is made here.
  extRelayEventCallback = callback
|
|
|
|
proc waku_content_topic(appName: cstring,
                        appVersion: cuint,
                        contentTopicName: cstring,
                        encoding: cstring,
                        onOkCb: WakuCallBack): cint {.dynlib, exportc.} =
  # https://rfc.vac.dev/spec/36/#extern-char-waku_content_topicchar-applicationname-unsigned-int-applicationversion-char-contenttopicname-char-encoding
  #
  # Builds the text "/<appName>/<appVersion>/<contentTopicName>/<encoding>"
  # and hands it to `onOkCb`.
  #
  # Returns RET_MISSING_CALLBACK if `onOkCb` is nil and RET_OK otherwise.

  if isNil(onOkCb):
    return RET_MISSING_CALLBACK

  # Internal copies of the texts received from outside the library.
  let
    appStr = appName.alloc()
    ctnStr = contentTopicName.alloc()
    encodingStr = encoding.alloc()

  let contentTopic =
    "/" & $appStr & "/" & $appVersion & "/" & $ctnStr & "/" & $encodingStr

  # `contentTopic` owns its characters, the copies are not needed any longer.
  deallocShared(appStr)
  deallocShared(ctnStr)
  deallocShared(encodingStr)

  onOkCb(unsafeAddr contentTopic[0], cast[csize_t](contentTopic.len))

  return RET_OK
|
|
|
|
proc waku_pubsub_topic(topicName: cstring,
                       onOkCb: WakuCallBack): cint {.dynlib, exportc, cdecl.} =
  # https://rfc.vac.dev/spec/36/#extern-char-waku_pubsub_topicchar-name-char-encoding
  #
  # Builds the text "/waku/2/<topicName>" and hands it to `onOkCb`.
  #
  # Returns RET_MISSING_CALLBACK if `onOkCb` is nil and RET_OK otherwise.
  if isNil(onOkCb):
    return RET_MISSING_CALLBACK

  # Internal copy of the text received from outside the library.
  let topicNameStr = topicName.alloc()
  let outPubsubTopic = "/waku/2/" & $topicNameStr
  # `outPubsubTopic` owns its characters, the copy is not needed any longer.
  deallocShared(topicNameStr)

  onOkCb(unsafeAddr outPubsubTopic[0], cast[csize_t](outPubsubTopic.len))

  return RET_OK
|
|
|
|
proc waku_default_pubsub_topic(onOkCb: WakuCallBack): cint {.dynlib, exportc.} =
  # https://rfc.vac.dev/spec/36/#extern-char-waku_default_pubsub_topic
  #
  # Hands the default pubsub topic to `onOkCb`.
  #
  # Returns RET_MISSING_CALLBACK if `onOkCb` is nil and RET_OK otherwise.
  if isNil(onOkCb):
    return RET_MISSING_CALLBACK

  # The callback expects a pointer to the first character. That is obtained
  # with `unsafeAddr topic[0]`, like in the other procs of this file.
  # A `cast` of a Nim `string` value to `ptr cchar` reinterprets the string
  # object itself and does not yield the address of its characters.
  let topic = $DefaultPubsubTopic
  onOkCb(unsafeAddr topic[0], cast[csize_t](len(topic)))

  return RET_OK
|
|
|
|
proc waku_relay_publish(pubSubTopic: cstring,
                        jsonWakuMessage: cstring,
                        timeoutMs: cuint,
                        onOkCb: WakuCallBack,
                        onErrCb: WakuCallBack): cint
                        {.dynlib, exportc, cdecl.} =
  # https://rfc.vac.dev/spec/36/#extern-char-waku_relay_publishchar-messagejson-char-pubsubtopic-int-timeoutms
  #
  # Publishes a message through Waku Relay.
  #
  # pubSubTopic: topic to publish to. If empty, `DefaultPubsubTopic` is used.
  # jsonWakuMessage: JSON text with the keys "payload" and "content_topic"
  #                  and, optionally, "version".
  # timeoutMs: upper bound of the wait loop below.
  # onOkCb: receives the text sent back on success.
  # onErrCb: receives the description of the problem on failure.
  #
  # Returns RET_MISSING_CALLBACK if any callback is nil, RET_ERR on failure
  # and RET_OK on success.

  if isNil(onOkCb) or isNil(onErrCb):
    return RET_MISSING_CALLBACK

  # The message is parsed from an internal copy, which is released on both
  # the failure and the success path of the parsing.
  let jwm = jsonWakuMessage.alloc()
  var jsonContent:JsonNode
  try:
    jsonContent = parseJson($jwm)
  except JsonParsingError:
    deallocShared(jwm)
    let msg = fmt"Error parsing json message: {getCurrentExceptionMsg()}"
    onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg)))
    return RET_ERR

  deallocShared(jwm)

  var wakuMessage: WakuMessage
  try:
    var version = 0'u32
    if jsonContent.hasKey("version"):
      version = (uint32) jsonContent["version"].getInt()

    wakuMessage = WakuMessage(
        # Visit https://rfc.vac.dev/spec/14/ for further details
        payload: jsonContent["payload"].getStr().toSeq().mapIt(byte (it)),
        contentTopic: $jsonContent["content_topic"].getStr(),
        version: version,
        # NOTE(review): `toUnix` yields seconds - confirm the unit that the
        # `timestamp` field is expected to carry.
        timestamp: getTime().toUnix(),
        ephemeral: false
    )
  except KeyError:
    let msg = fmt"Problem building the WakuMessage: {getCurrentExceptionMsg()}"
    onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg)))
    return RET_ERR

  # The topic is copied into a Nim string and the shared buffer is released
  # right away, so that none of the return paths below leaks it.
  let pst = pubSubTopic.alloc()
  let targetPubSubTopic = if len(pst) == 0:
                            DefaultPubsubTopic
                          else:
                            $pst
  deallocShared(pst)

  if node.wakuRelay.isNil():
    let msg = "Can't publish. WakuRelay is not enabled."
    onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg)))
    return RET_ERR

  let pubMsgFut = node.wakuRelay.publish(targetPubSubTopic, wakuMessage)

  # With the next loop we convert an asynchronous call into a synchronous one
  # NOTE(review): `sleep` blocks the thread and nothing in this loop drives
  # the async event loop - confirm that the future can progress meanwhile.
  for i in 0 .. timeoutMs:
    if pubMsgFut.finished():
      break
    sleep(1)

  if pubMsgFut.finished():
    let numPeers = pubMsgFut.read()
    if numPeers == 0:
      let msg = "Message not sent because no peers found."
      onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg)))
      return RET_ERR
    elif numPeers > 0:
      # TODO: pending to return a valid message Id (response when all is correct)
      let msg = "hard-coded-message-id"
      onOkCb(unsafeAddr msg[0], cast[csize_t](len(msg)))
      return RET_OK
    # A negative `numPeers` matches none of the branches: no callback is
    # invoked and the proc ends with the default `result` (0).

  else:
    let msg = "Timeout expired"
    onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg)))
    return RET_ERR
|
|
|
|
proc waku_start() {.dynlib, exportc.} =
  # Starts `node` and blocks until the future returned by `node.start()`
  # has finished. `node` must have been created with `waku_new` before.
  waitFor node.start()
|
|
|
|
proc waku_stop() {.dynlib, exportc.} =
  # Stops `node` and blocks until the future returned by `node.stop()`
  # has finished.
  waitFor node.stop()
|
|
|
|
proc waku_relay_subscribe(
                pubSubTopic: cstring,
                onErrCb: WakuCallBack): cint
                {.dynlib, exportc.} =
  # Subscribes `relayEventCallback` to a pubsub topic of Waku Relay.
  #
  # @params
  #  pubSubTopic: Pubsub topic to subscribe to. If empty, it subscribes to
  #               the default pubsub topic.
  #  onErrCb: receives the description of the problem on failure.
  #
  # Returns RET_MISSING_CALLBACK if `onErrCb` is nil or if no relay callback
  # was set, RET_ERR if Waku Relay is not enabled and RET_OK otherwise.
  if isNil(onErrCb):
    return RET_MISSING_CALLBACK

  if isNil(extRelayEventCallback):
    let msg = """Cannot subscribe without a callback.
# Kindly set it with the 'waku_set_relay_callback' function"""
    onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg)))
    return RET_MISSING_CALLBACK

  if node.wakuRelay.isNil():
    let msg = "Cannot subscribe without Waku Relay enabled."
    onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg)))
    return RET_ERR

  # An empty topic selects the default one, as documented above and as done
  # by `waku_relay_publish`. The internal copy is released once the Nim
  # string has been built.
  let pst = pubSubTopic.alloc()
  let targetPubSubTopic = if len(pst) == 0:
                            DefaultPubsubTopic
                          else:
                            $pst
  deallocShared(pst)

  node.wakuRelay.subscribe(PubsubTopic(targetPubSubTopic),
                           WakuRelayHandler(relayEventCallback))

  return RET_OK
|
|
|
|
proc waku_relay_unsubscribe(
                pubSubTopic: cstring,
                onErrCb: WakuCallBack): cint
                {.dynlib, exportc.} =
  # Unsubscribes from a pubsub topic of Waku Relay.
  #
  # @params
  #  pubSubTopic: Pubsub topic to unsubscribe from. If empty, it unsubscribes
  #               from the default pubsub topic.
  #  onErrCb: receives the description of the problem on failure.
  #
  # Returns RET_MISSING_CALLBACK if `onErrCb` is nil or if no relay callback
  # was set, RET_ERR if Waku Relay is not enabled and RET_OK otherwise.
  if isNil(onErrCb):
    return RET_MISSING_CALLBACK

  if isNil(extRelayEventCallback):
    let msg = """Cannot unsubscribe without a callback.
# Kindly set it with the 'waku_set_relay_callback' function"""
    onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg)))
    return RET_MISSING_CALLBACK

  if node.wakuRelay.isNil():
    let msg = "Cannot unsubscribe without Waku Relay enabled."
    onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg)))
    return RET_ERR

  # An empty topic selects the default one, as documented above and as done
  # by `waku_relay_publish`. The internal copy is released once the Nim
  # string has been built.
  let pst = pubSubTopic.alloc()
  let targetPubSubTopic = if len(pst) == 0:
                            DefaultPubsubTopic
                          else:
                            $pst
  deallocShared(pst)

  node.wakuRelay.unsubscribe(PubsubTopic(targetPubSubTopic))

  return RET_OK
|
|
|
|
proc waku_connect(peerMultiAddr: cstring,
                  timeoutMs: cuint,
                  onErrCb: WakuCallBack): cint
                  {.dynlib, exportc.} =
  # Connects `node` to the given peers and waits until the attempt is over.
  #
  # peerMultiAddr: comma-separated list of fully-qualified multiaddresses.
  # timeoutMs: currently ignored, see the TODO below.
  # onErrCb: receives the description of the problem on failure.
  #
  # Returns RET_MISSING_CALLBACK if `onErrCb` is nil, RET_ERR if the
  # connection future did not complete successfully and RET_OK otherwise.

  # Same contract as the other exported procs: without this check a failed
  # connection would end up calling a nil callback.
  if isNil(onErrCb):
    return RET_MISSING_CALLBACK

  # `peers` holds Nim strings with their own characters, hence the internal
  # copy can be released before entering the wait loop.
  let address = peerMultiAddr.alloc()
  let peers = ($address).split(",").mapIt(strip(it))
  deallocShared(address)

  # TODO: the timeoutMs is not being used at all!
  let connectFut = node.connectToNodes(peers, source="static")
  while not connectFut.finished():
    poll()

  # NOTE(review): as no timeout is applied, this branch is reached when the
  # future failed or was cancelled - the message may be misleading.
  if not connectFut.completed():
    let msg = "Timeout expired."
    onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg)))
    return RET_ERR

  return RET_OK
|
|
|
|
proc waku_poll() {.dynlib, exportc, gcsafe.} =
  # Calls `poll()` once (presumably the one of the async event loop - confirm),
  # which lets the user of the library drive pending asynchronous work.
  poll()
|
|
|
|
### End of exported procs
|
|
################################################################################
|