refactor(cbindings): libwaku - run waku node in a secondary working thread (#1865)

* Refactoring to have a working waku_thread
This commit is contained in:
Ivan Folgueira Bande 2023-07-31 09:52:04 +02:00 committed by GitHub
parent d2b6075bdc
commit 069c1ad2a5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 528 additions and 226 deletions

View File

@ -0,0 +1,58 @@
#include "base64.h"
// Base64 encoding
// source: https://nachtimwald.com/2017/11/18/base64-encode-and-decode-in-c/
size_t b64_encoded_size(size_t inlen)
{
size_t ret;
ret = inlen;
if (inlen % 3 != 0)
ret += 3 - (inlen % 3);
ret /= 3;
ret *= 4;
return ret;
}
const char b64chars[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
char *b64_encode(const unsigned char *in, size_t len)
{
char *out;
size_t elen;
size_t i;
size_t j;
size_t v;
if (in == NULL || len == 0)
return NULL;
elen = b64_encoded_size(len);
out = malloc(elen+1);
out[elen] = '\0';
for (i=0, j=0; i<len; i+=3, j+=4) {
v = in[i];
v = i+1 < len ? v << 8 | in[i+1] : v << 8;
v = i+2 < len ? v << 8 | in[i+2] : v << 8;
out[j] = b64chars[(v >> 18) & 0x3F];
out[j+1] = b64chars[(v >> 12) & 0x3F];
if (i+1 < len) {
out[j+2] = b64chars[(v >> 6) & 0x3F];
} else {
out[j+2] = '=';
}
if (i+2 < len) {
out[j+3] = b64chars[v & 0x3F];
} else {
out[j+3] = '=';
}
}
return out;
}
// End of Base64 encoding

View File

@ -0,0 +1,11 @@
#ifndef _BASE64_H_
#define _BASE64_H_
#include <stdlib.h>
size_t b64_encoded_size(size_t inlen);
char *b64_encode(const unsigned char *in, size_t len);
#endif

View File

@ -79,7 +79,7 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
static struct argp argp = { options, parse_opt, args_doc, doc, 0, 0, 0 };
char* contentTopic = NULL;
void handle_content_topic(char* msg, size_t len) {
void handle_content_topic(const char* msg, size_t len) {
if (contentTopic != NULL) {
free(contentTopic);
}
@ -89,7 +89,7 @@ void handle_content_topic(char* msg, size_t len) {
}
char* publishResponse = NULL;
void handle_publish_ok(char* msg, size_t len) {
void handle_publish_ok(const char* msg, size_t len) {
printf("Publish Ok: %s %lu\n", msg, len);
if (publishResponse != NULL) {
@ -100,14 +100,14 @@ void handle_publish_ok(char* msg, size_t len) {
strcpy(publishResponse, msg);
}
void handle_error(char* msg, size_t len) {
void handle_error(const char* msg, size_t len) {
printf("Error: %s\n", msg);
exit(1);
}
#define MAX_MSG_SIZE 65535
void publish_message(char* pubsubTopic, char* msg) {
void publish_message(char* pubsubTopic, const char* msg) {
char jsonWakuMsg[MAX_MSG_SIZE];
char *msgPayload = b64_encode(msg, strlen(msg));
@ -138,15 +138,15 @@ void show_help_and_exit() {
exit(1);
}
void event_handler(char* msg, size_t len) {
void event_handler(const char* msg, size_t len) {
printf("Receiving message %s\n", msg);
}
void print_default_pubsub_topic(char* msg, size_t len) {
void print_default_pubsub_topic(const char* msg, size_t len) {
printf("Default pubsub topic: %s\n", msg);
}
void print_waku_version(char* msg, size_t len) {
void print_waku_version(const char* msg, size_t len) {
printf("Git Version: %s\n", msg);
}
@ -243,8 +243,6 @@ void handle_user_input() {
int main(int argc, char** argv) {
waku_init_lib();
struct ConfigNode cfgNode;
// default values
snprintf(cfgNode.host, 128, "0.0.0.0");
@ -272,12 +270,13 @@ int main(int argc, char** argv) {
WAKU_CALL( waku_default_pubsub_topic(print_default_pubsub_topic) );
WAKU_CALL( waku_version(print_waku_version) );
printf("Bind addr: %s:%u\n", cfgNode.host, cfgNode.port);
printf("Waku Relay enabled: %s\n", cfgNode.relay == 1 ? "YES": "NO");
WAKU_CALL( waku_new(jsonConfig, handle_error) );
waku_set_relay_callback(event_handler);
waku_set_event_callback(event_handler);
waku_start();
printf("Establishing connection with: %s\n", cfgNode.peers);
@ -291,6 +290,6 @@ int main(int argc, char** argv) {
show_main_menu();
while(1) {
handle_user_input();
waku_poll();
// waku_poll();
}
}

7
library/alloc.nim Normal file
View File

@ -0,0 +1,7 @@
proc alloc*(str: cstring): cstring =
# Byte allocation from the given address.
# There should be the corresponding manual deallocation with deallocShared !
let ret = cast[cstring](allocShared(len(str) + 1))
copyMem(ret, str, len(str) + 1)
return ret

View File

@ -11,11 +11,7 @@
#define RET_ERR 1
#define RET_MISSING_CALLBACK 2
typedef void (*WakuCallBack) (char* msg, size_t len_0);
// This should only be called once.
// It initializes the nim runtime and GC.
void waku_init_lib(void);
typedef void (*WakuCallBack) (const char* msg, size_t len_0);
// Creates a new instance of the waku node.
// Sets up the waku node from the given configuration.
@ -27,7 +23,7 @@ void waku_stop(void);
int waku_version(WakuCallBack onOkCb);
void waku_set_relay_callback(WakuCallBack callback);
void waku_set_event_callback(WakuCallBack callback);
int waku_content_topic(const char* appName,
unsigned int appVersion,
@ -56,6 +52,4 @@ int waku_connect(const char* peerMultiAddr,
unsigned int timeoutMs,
WakuCallBack onErrCb);
void waku_poll(void);
#endif /* __libwaku__ */

View File

@ -1,26 +1,25 @@
{.pragma: exported, exportc, cdecl, raises: [].}
{.pragma: callback, cdecl, raises: [], gcsafe.}
{.passc: "-fPIC".}
import
std/[json,sequtils,times,strformat,options,atomics,strutils],
strutils,
os
strutils
import
chronicles,
chronos,
stew/shims/net
chronos
import
../../waku/common/enr/builder,
../../waku/v2/waku_enr/capabilities,
../../waku/v2/waku_enr/multiaddr,
../../waku/v2/waku_enr/sharding,
../../waku/v2/waku_core/message/message,
../../waku/v2/waku_core/topics/pubsub_topic,
../../waku/v2/node/peer_manager/peer_manager,
../../waku/v2/node/waku_node,
../../waku/v2/node/builder,
../../waku/v2/node/config,
../../waku/v2/waku_relay/protocol,
./events/[json_error_event,json_message_event,json_base_event],
./config
../../waku/v2/waku_core/topics/pubsub_topic,
../../../waku/v2/waku_relay/protocol,
./events/json_message_event,
./waku_thread/waku_thread as waku_thread_module,
./waku_thread/inter_thread_communication/node_lifecycle_request,
./waku_thread/inter_thread_communication/peer_manager_request,
./waku_thread/inter_thread_communication/protocols/relay_request,
./alloc
################################################################################
### Wrapper around the waku node
@ -33,45 +32,30 @@ const RET_OK: cint = 0
const RET_ERR: cint = 1
const RET_MISSING_CALLBACK: cint = 2
type
WakuCallBack* = proc(msg: ptr cchar, len: csize_t) {.cdecl, gcsafe.}
### End of exported types
################################################################################
################################################################################
### Not-exported components
proc alloc(str: cstring): cstring =
# Byte allocation from the given address.
# There should be the corresponding manual deallocation with deallocShared !
let ret = cast[cstring](allocShared(len(str) + 1))
copyMem(ret, str, len(str) + 1)
return ret
type
WakuCallBack = proc(msg: ptr cchar, len: csize_t) {.cdecl, gcsafe.}
# May keep a reference to a callback defined externally
var extRelayEventCallback: WakuCallBack = nil
var extEventCallback*: WakuCallBack = nil
proc relayEventCallback(pubsubTopic: string,
msg: WakuMessage):
Future[void] {.gcsafe, raises: [Defect].} =
proc relayEventCallback(pubsubTopic: PubsubTopic,
msg: WakuMessage): Future[void] {.async, gcsafe.} =
# Callback that hadles the Waku Relay events. i.e. messages or errors.
if not isNil(extRelayEventCallback):
if not isNil(extEventCallback):
try:
let event = $JsonMessageEvent.new(pubsubTopic, msg)
extRelayEventCallback(unsafeAddr event[0], cast[csize_t](len(event)))
extEventCallback(unsafeAddr event[0], cast[csize_t](len(event)))
except Exception,CatchableError:
error "Exception when calling 'eventCallBack': " &
getCurrentExceptionMsg()
else:
error "extRelayEventCallback is nil"
var retFut = newFuture[void]()
retFut.complete()
return retFut
# WakuNode instance
var node {.threadvar.}: WakuNode
error "extEventCallback is nil"
### End of not-exported components
################################################################################
@ -79,21 +63,6 @@ var node {.threadvar.}: WakuNode
################################################################################
### Exported procs
# Every Nim library must have this function called - the name is derived from
# the `--nimMainPrefix` command line option
proc NimMain() {.importc.}
var initialized: Atomic[bool]
proc waku_init_lib() {.dynlib, exportc, cdecl.} =
if not initialized.exchange(true):
NimMain() # Every Nim library needs to call `NimMain` once exactly
when declared(setupForeignThreadGc): setupForeignThreadGc()
when declared(nimGC_setStackBottom):
var locals {.volatile, noinit.}: pointer
locals = addr(locals)
nimGC_setStackBottom(locals)
proc waku_new(configJson: cstring,
onErrCb: WakuCallback): cint
{.dynlib, exportc, cdecl.} =
@ -103,78 +72,12 @@ proc waku_new(configJson: cstring,
if isNil(onErrCb):
return RET_MISSING_CALLBACK
var privateKey: PrivateKey
var netConfig = NetConfig.init(ValidIpAddress.init("127.0.0.1"),
Port(60000'u16)).value
var relay: bool
var topics = @[""]
var jsonResp: JsonEvent
let cj = configJson.alloc()
if not parseConfig($cj,
privateKey,
netConfig,
relay,
topics,
jsonResp):
deallocShared(cj)
let resp = $jsonResp
onErrCb(unsafeAddr resp[0], cast[csize_t](len(resp)))
let createThRes = waku_thread_module.createWakuThread(configJson)
if createThRes.isErr():
let msg = "Error in createWakuThread: " & $createThRes.error
onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg)))
return RET_ERR
deallocShared(cj)
var enrBuilder = EnrBuilder.init(privateKey)
enrBuilder.withIpAddressAndPorts(
netConfig.enrIp,
netConfig.enrPort,
netConfig.discv5UdpPort
)
if netConfig.wakuFlags.isSome():
enrBuilder.withWakuCapabilities(netConfig.wakuFlags.get())
enrBuilder.withMultiaddrs(netConfig.enrMultiaddrs)
let addShardedTopics = enrBuilder.withShardedTopics(topics)
if addShardedTopics.isErr():
let resp = $addShardedTopics.error
onErrCb(unsafeAddr resp[0], cast[csize_t](len(resp)))
return RET_ERR
let recordRes = enrBuilder.build()
let record =
if recordRes.isErr():
let resp = $recordRes.error
onErrCb(unsafeAddr resp[0], cast[csize_t](len(resp)))
return RET_ERR
else: recordRes.get()
var builder = WakuNodeBuilder.init()
builder.withRng(crypto.newRng())
builder.withNodeKey(privateKey)
builder.withRecord(record)
builder.withNetworkConfiguration(netConfig)
builder.withSwitchConfiguration(
maxConnections = some(50.int)
)
let wakuNodeRes = builder.build()
if wakuNodeRes.isErr():
let errorMsg = "failed to create waku node instance: " & wakuNodeRes.error
let jsonErrEvent = $JsonErrorEvent.new(errorMsg)
onErrCb(unsafeAddr jsonErrEvent[0], cast[csize_t](len(jsonErrEvent)))
return RET_ERR
node = wakuNodeRes.get()
if relay:
waitFor node.mountRelay()
node.peerManager.start()
return RET_OK
proc waku_version(onOkCb: WakuCallBack): cint {.dynlib, exportc.} =
@ -186,8 +89,8 @@ proc waku_version(onOkCb: WakuCallBack): cint {.dynlib, exportc.} =
return RET_OK
proc waku_set_relay_callback(callback: WakuCallBack) {.dynlib, exportc.} =
extRelayEventCallback = callback
proc waku_set_event_callback(callback: WakuCallBack) {.dynlib, exportc.} =
extEventCallback = callback
proc waku_content_topic(appName: cstring,
appVersion: cuint,
@ -287,123 +190,84 @@ proc waku_relay_publish(pubSubTopic: cstring,
else:
$pst
if node.wakuRelay.isNil():
let msg = "Can't publish. WakuRelay is not enabled."
let sendReqRes = waku_thread_module.sendRequestToWakuThread(
RelayRequest.new(RelayMsgType.PUBLISH,
PubsubTopic($pst),
WakuRelayHandler(relayEventCallback),
wakuMessage))
deallocShared(pst)
if sendReqRes.isErr():
let msg = $sendReqRes.error
onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg)))
return RET_ERR
let pubMsgFut = node.wakuRelay.publish(targetPubSubTopic, wakuMessage)
# With the next loop we convert an asynchronous call into a synchronous one
for i in 0 .. timeoutMs:
if pubMsgFut.finished():
break
sleep(1)
if pubMsgFut.finished():
let numPeers = pubMsgFut.read()
if numPeers == 0:
let msg = "Message not sent because no peers found."
onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg)))
return RET_ERR
elif numPeers > 0:
# TODO: pending to return a valid message Id (response when all is correct)
let msg = "hard-coded-message-id"
onOkCb(unsafeAddr msg[0], cast[csize_t](len(msg)))
return RET_OK
else:
let msg = "Timeout expired"
onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg)))
return RET_ERR
return RET_OK
proc waku_start() {.dynlib, exportc.} =
waitFor node.start()
discard waku_thread_module.sendRequestToWakuThread(
NodeLifecycleRequest.new(
NodeLifecycleMsgType.START_NODE))
proc waku_stop() {.dynlib, exportc.} =
waitFor node.stop()
discard waku_thread_module.sendRequestToWakuThread(
NodeLifecycleRequest.new(
NodeLifecycleMsgType.STOP_NODE))
proc waku_relay_subscribe(
pubSubTopic: cstring,
onErrCb: WakuCallBack): cint
{.dynlib, exportc.} =
# @params
# topic: Pubsub topic to subscribe to. If empty, it subscribes to the default pubsub topic.
if isNil(onErrCb):
return RET_MISSING_CALLBACK
if isNil(extRelayEventCallback):
let msg = $"""Cannot subscribe without a callback.
# Kindly set it with the 'waku_set_relay_callback' function"""
onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg)))
return RET_MISSING_CALLBACK
if node.wakuRelay.isNil():
let msg = $"Cannot subscribe without Waku Relay enabled."
onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg)))
return RET_ERR
let pst = pubSubTopic.alloc()
node.wakuRelay.subscribe(PubsubTopic($pst),
WakuRelayHandler(relayEventCallback))
let sendReqRes = waku_thread_module.sendRequestToWakuThread(
RelayRequest.new(RelayMsgType.SUBSCRIBE,
PubsubTopic($pst),
WakuRelayHandler(relayEventCallback)))
deallocShared(pst)
if sendReqRes.isErr():
let msg = $sendReqRes.error
onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg)))
return RET_ERR
return RET_OK
proc waku_relay_unsubscribe(
pubSubTopic: cstring,
onErrCb: WakuCallBack): cint
{.dynlib, exportc.} =
# @params
# topic: Pubsub topic to subscribe to. If empty, it unsubscribes to the default pubsub topic.
if isNil(onErrCb):
return RET_MISSING_CALLBACK
if isNil(extRelayEventCallback):
let msg = """Cannot unsubscribe without a callback.
# Kindly set it with the 'waku_set_relay_callback' function"""
onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg)))
return RET_MISSING_CALLBACK
if node.wakuRelay.isNil():
let msg = "Cannot unsubscribe without Waku Relay enabled."
onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg)))
return RET_ERR
let pst = pubSubTopic.alloc()
node.wakuRelay.unsubscribe(PubsubTopic($pst))
let sendReqRes = waku_thread_module.sendRequestToWakuThread(
RelayRequest.new(RelayMsgType.UNSUBSCRIBE,
PubsubTopic($pst),
WakuRelayHandler(relayEventCallback)))
deallocShared(pst)
if sendReqRes.isErr():
let msg = $sendReqRes.error
onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg)))
return RET_ERR
return RET_OK
proc waku_connect(peerMultiAddr: cstring,
timeoutMs: cuint,
onErrCb: WakuCallBack): cint
{.dynlib, exportc.} =
# peerMultiAddr: comma-separated list of fully-qualified multiaddresses.
# var ret = newString(len + 1)
# if len > 0:
# copyMem(addr ret[0], str, len + 1)
let address = peerMultiAddr.alloc()
let peers = ($address).split(",").mapIt(strip(it))
# TODO: the timeoutMs is not being used at all!
let connectFut = node.connectToNodes(peers, source="static")
while not connectFut.finished():
poll()
deallocShared(address)
if not connectFut.completed():
let msg = "Timeout expired."
let connRes = waku_thread_module.sendRequestToWakuThread(
PeerManagementRequest.new(
PeerManagementMsgType.CONNECT_TO,
$peerMultiAddr,
chronos.milliseconds(timeoutMs)))
if connRes.isErr():
let msg = $connRes.error
onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg)))
return RET_ERR
return RET_OK
proc waku_poll() {.dynlib, exportc, gcsafe.} =
poll()
### End of exported procs
################################################################################

View File

@ -10,7 +10,7 @@ import
../../waku/common/utils/nat,
../../waku/v2/node/waku_node,
../../waku/v2/node/config,
./events/[json_error_event,json_base_event]
../events/[json_error_event,json_base_event]
proc parsePrivateKey(jsonNode: JsonNode,
privateKey: var PrivateKey,

View File

@ -0,0 +1,37 @@
import
std/options
import
chronos,
stew/results,
stew/shims/net
import
../../../waku/v2/node/waku_node,
./request
type
NodeLifecycleMsgType* = enum
START_NODE
STOP_NODE
type
NodeLifecycleRequest* = ref object of InterThreadRequest
operation: NodeLifecycleMsgType
proc new*(T: type NodeLifecycleRequest,
op: NodeLifecycleMsgType): T =
return NodeLifecycleRequest(operation: op)
method process*(self: NodeLifecycleRequest,
node: WakuNode): Future[Result[string, string]] {.async.} =
case self.operation:
of START_NODE:
waitFor node.start()
of STOP_NODE:
waitFor node.stop()
return ok("")

View File

@ -0,0 +1,59 @@
import
std/[options,sequtils,strutils]
import
chronicles,
chronos,
stew/results,
stew/shims/net
import
../../../waku/v2/node/waku_node,
./request
type
PeerManagementMsgType* = enum
CONNECT_TO
type
PeerManagementRequest* = ref object of InterThreadRequest
operation: PeerManagementMsgType
peerMultiAddr: string
dialTimeout: Duration
proc new*(T: type PeerManagementRequest,
op: PeerManagementMsgType,
peerMultiAddr: string,
dialTimeout: Duration): T =
return PeerManagementRequest(operation: op,
peerMultiAddr: peerMultiAddr,
dialTimeout: dialTimeout)
proc connectTo(node: WakuNode,
peerMultiAddr: string,
dialTimeout: Duration): Result[void, string] =
let peers = (peerMultiAddr).split(",").mapIt(strip(it))
# TODO: the dialTimeout is not being used at all!
let connectFut = node.connectToNodes(peers, source="static")
while not connectFut.finished():
poll()
if not connectFut.completed():
let msg = "Timeout expired."
return err(msg)
return ok()
method process*(self: PeerManagementRequest,
node: WakuNode): Future[Result[string, string]] {.async.} =
case self.operation:
of CONNECT_TO:
let ret = node.connectTo(self.peerMultiAddr, self.dialTimeout)
if ret.isErr():
return err(ret.error)
return ok("")

View File

@ -0,0 +1,64 @@
import
std/[options,sequtils,strutils]
import
chronicles,
chronos,
stew/results,
stew/shims/net
import
../../../../waku/v2/waku_core/message/message,
../../../../waku/v2/node/waku_node,
../../../../waku/v2/waku_core/topics/pubsub_topic,
../../../../waku/v2/waku_relay/protocol,
../request
type
RelayMsgType* = enum
SUBSCRIBE
UNSUBSCRIBE
PUBLISH
type
RelayRequest* = ref object of InterThreadRequest
operation: RelayMsgType
pubsubTopic: PubsubTopic
relayEventCallback: WakuRelayHandler # not used in 'PUBLISH' requests
message: WakuMessage # this field is only used in 'PUBLISH' requests
proc new*(T: type RelayRequest,
op: RelayMsgType,
pubsubTopic: PubsubTopic,
relayEventCallback: WakuRelayHandler = nil,
message = WakuMessage()): T =
return RelayRequest(operation: op,
pubsubTopic: pubsubTopic,
relayEventCallback: relayEventCallback,
message: message)
method process*(self: RelayRequest,
node: WakuNode): Future[Result[string, string]] {.async.} =
if node.wakuRelay.isNil():
return err("Operation not supported without Waku Relay enabled.")
case self.operation:
of SUBSCRIBE:
node.wakuRelay.subscribe(self.pubsubTopic, self.relayEventCallback)
of UNSUBSCRIBE:
node.wakuRelay.unsubscribe(self.pubsubTopic)
of PUBLISH:
let numPeers = await node.wakuRelay.publish(self.pubsubTopic,
self.message)
if numPeers == 0:
return err("Message not sent because no peers found.")
elif numPeers > 0:
# TODO: pending to return a valid message Id
return ok("hard-coded-message-id")
return ok("")

View File

@ -0,0 +1,21 @@
# This file contains the base message request type that will be handled
# by the Waku Node thread.
import
std/json,
stew/results
import
chronos
import
../../../waku/v2/node/waku_node,
../waku_thread
type
InterThreadRequest* = ref object of RootObj
method process*(self: InterThreadRequest, node: WakuNode):
Future[Result[string, string]] {.base.} = discard
proc `$`*(self: InterThreadRequest): string =
return $( %* self )

View File

@ -0,0 +1,188 @@
{.pragma: exported, exportc, cdecl, raises: [].}
{.pragma: callback, cdecl, raises: [], gcsafe.}
{.passc: "-fPIC".}
import
std/[json,sequtils,times,strformat,options,atomics,strutils,os]
import
chronicles,
chronos,
stew/results,
stew/shims/net
import
../../../waku/common/enr/builder,
../../../waku/v2/waku_enr/capabilities,
../../../waku/v2/waku_enr/multiaddr,
../../../waku/v2/waku_enr/sharding,
../../../waku/v2/waku_core/message/message,
../../../waku/v2/waku_core/topics/pubsub_topic,
../../../waku/v2/node/peer_manager/peer_manager,
../../../waku/v2/waku_core,
../../../waku/v2/node/waku_node,
../../../waku/v2/node/builder,
../../../waku/v2/node/config,
../../../waku/v2/waku_relay/protocol,
../events/[json_error_event,json_message_event,json_base_event],
../alloc,
./config,
./inter_thread_communication/request
type
Context* = object
thread: Thread[(ptr Context)]
reqChannel: Channel[InterThreadRequest]
respChannel: Channel[Result[string, string]]
node: WakuNode
var ctx {.threadvar.}: ptr Context
# To control when the thread is running
var running: Atomic[bool]
# Every Nim library must have this function called - the name is derived from
# the `--nimMainPrefix` command line option
proc NimMain() {.importc.}
var initialized: Atomic[bool]
proc waku_init() =
if not initialized.exchange(true):
NimMain() # Every Nim library needs to call `NimMain` once exactly
when declared(setupForeignThreadGc): setupForeignThreadGc()
when declared(nimGC_setStackBottom):
var locals {.volatile, noinit.}: pointer
locals = addr(locals)
nimGC_setStackBottom(locals)
proc createNode(configJson: cstring): Result[WakuNode, string] =
var privateKey: PrivateKey
var netConfig = NetConfig.init(ValidIpAddress.init("127.0.0.1"),
Port(60000'u16)).value
var relay: bool
var topics = @[""]
var jsonResp: JsonEvent
let cj = configJson.alloc()
if not parseConfig($cj,
privateKey,
netConfig,
relay,
topics,
jsonResp):
deallocShared(cj)
return err($jsonResp)
deallocShared(cj)
var enrBuilder = EnrBuilder.init(privateKey)
enrBuilder.withIpAddressAndPorts(
netConfig.enrIp,
netConfig.enrPort,
netConfig.discv5UdpPort
)
if netConfig.wakuFlags.isSome():
enrBuilder.withWakuCapabilities(netConfig.wakuFlags.get())
enrBuilder.withMultiaddrs(netConfig.enrMultiaddrs)
let addShardedTopics = enrBuilder.withShardedTopics(topics)
if addShardedTopics.isErr():
let msg = "Error setting shared topics: " & $addShardedTopics.error
return err($JsonErrorEvent.new(msg))
let recordRes = enrBuilder.build()
let record =
if recordRes.isErr():
let msg = "Error building enr record: " & $recordRes.error
return err($JsonErrorEvent.new(msg))
else: recordRes.get()
var builder = WakuNodeBuilder.init()
builder.withRng(crypto.newRng())
builder.withNodeKey(privateKey)
builder.withRecord(record)
builder.withNetworkConfiguration(netConfig)
builder.withSwitchConfiguration(
maxConnections = some(50.int)
)
let wakuNodeRes = builder.build()
if wakuNodeRes.isErr():
let errorMsg = "failed to create waku node instance: " & wakuNodeRes.error
return err($JsonErrorEvent.new(errorMsg))
var newNode = wakuNodeRes.get()
if relay:
waitFor newNode.mountRelay()
newNode.peerManager.start()
return ok(newNode)
proc run(ctx: ptr Context) {.thread.} =
## This is the worker thread body. This thread runs the Waku node
## and attends library user requests (stop, connect_to, etc.)
while running.load == true:
## Trying to get a request from the libwaku main thread
let req = ctx.reqChannel.tryRecv()
if req[0] == true:
let response = waitFor req[1].process(ctx.node)
ctx.respChannel.send( response )
poll()
tearDownForeignThreadGc()
proc createWakuThread*(configJson: cstring): Result[void, string] =
## This proc is called from the main thread and it creates
## the Waku working thread.
waku_init()
ctx = createShared(Context, 1)
ctx.reqChannel.open()
ctx.respChannel.open()
let newNodeRes = createNode(configJson)
if newNodeRes.isErr():
return err(newNodeRes.error)
ctx.node = newNodeRes.get()
running.store(true)
try:
createThread(ctx.thread, run, ctx)
except ResourceExhaustedError:
# and freeShared for typed allocations!
freeShared(ctx)
return err("failed to create a thread: " & getCurrentExceptionMsg())
return ok()
proc stopWakuNodeThread*() =
running.store(false)
joinThread(ctx.thread)
ctx.reqChannel.close()
ctx.respChannel.close()
freeShared(ctx)
proc sendRequestToWakuThread*(req: InterThreadRequest): Result[string, string] =
ctx.reqChannel.send(req)
var resp = ctx.respChannel.tryRecv()
while resp[0] == false:
resp = ctx.respChannel.tryRecv()
os.sleep(1)
return resp[1]