chore(cbindings): Thread-safe libwaku. WakuNode instance created directly from the Waku Thread (#1957)

* libwaku: create the WakuNode in the Waku thread. Not in the main thread
* node_lifecycle_request.nim: moving hard-coded value to a const item
* libwaku.nim: start using 'isOkOr' instead of 'isErr()'-approach
* node_lifecycle_request.nim: better 'async' & 'await' usage. Not block the runtime
This commit is contained in:
Ivan Folgueira Bande 2023-09-01 08:37:02 +02:00 committed by GitHub
parent 0c40588aff
commit 68e8d9a79c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 118 additions and 105 deletions

View File

@ -127,7 +127,6 @@ void publish_message(char* pubsubTopic, const char* msg) {
WAKU_CALL( waku_relay_publish(pubsubTopic,
jsonWakuMsg,
10000 /*timeout ms*/,
handle_publish_ok,
handle_error) );
printf("waku relay response [%s]\n", publishResponse);

View File

@ -72,9 +72,18 @@ proc waku_new(configJson: cstring,
if isNil(onErrCb):
return RET_MISSING_CALLBACK
let createThRes = waku_thread.createWakuThread(configJson)
if createThRes.isErr():
let msg = "Error in createWakuThread: " & $createThRes.error
## Create the Waku thread that will keep waiting for req from the main thread.
waku_thread.createWakuThread().isOkOr:
let msg = "Error in createWakuThread: " & $error
onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg)))
return RET_ERR
let sendReqRes = waku_thread.sendRequestToWakuThread(
NodeLifecycleRequest.new(
NodeLifecycleMsgType.CREATE_NODE,
configJson))
if sendReqRes.isErr():
let msg = $sendReqRes.error
onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg)))
return RET_ERR

View File

@ -6,32 +6,123 @@ import
stew/results,
stew/shims/net
import
../../../waku/common/enr/builder,
../../../waku/waku_enr/capabilities,
../../../waku/waku_enr/multiaddr,
../../../waku/waku_enr/sharding,
../../../waku/waku_core/message/message,
../../../waku/waku_core/topics/pubsub_topic,
../../../waku/node/peer_manager/peer_manager,
../../../waku/waku_core,
../../../waku/node/waku_node,
../../../waku/node/builder,
../../../waku/node/config,
../../../waku/waku_relay/protocol,
../../events/[json_error_event,json_message_event,json_base_event],
../config,
./request
type
NodeLifecycleMsgType* = enum
CREATE_NODE
START_NODE
STOP_NODE
type
NodeLifecycleRequest* = ref object of InterThreadRequest
operation: NodeLifecycleMsgType
configJson: cstring ## Only used in 'CREATE_NODE' operation
proc new*(T: type NodeLifecycleRequest,
op: NodeLifecycleMsgType): T =
op: NodeLifecycleMsgType,
configJson: cstring = ""): T =
return NodeLifecycleRequest(operation: op)
return NodeLifecycleRequest(operation: op, configJson: configJson)
proc createNode(configJson: cstring):
Future[Result[WakuNode, string]] {.async.} =
var privateKey: PrivateKey
var netConfig = NetConfig.init(ValidIpAddress.init("127.0.0.1"),
Port(60000'u16)).value
var relay: bool
var topics = @[""]
var jsonResp: JsonEvent
if not parseConfig($configJson,
privateKey,
netConfig,
relay,
topics,
jsonResp):
return err($jsonResp)
var enrBuilder = EnrBuilder.init(privateKey)
enrBuilder.withIpAddressAndPorts(
netConfig.enrIp,
netConfig.enrPort,
netConfig.discv5UdpPort
)
if netConfig.wakuFlags.isSome():
enrBuilder.withWakuCapabilities(netConfig.wakuFlags.get())
enrBuilder.withMultiaddrs(netConfig.enrMultiaddrs)
let addShardedTopics = enrBuilder.withShardedTopics(topics)
if addShardedTopics.isErr():
let msg = "Error setting shared topics: " & $addShardedTopics.error
return err($JsonErrorEvent.new(msg))
let recordRes = enrBuilder.build()
let record =
if recordRes.isErr():
let msg = "Error building enr record: " & $recordRes.error
return err($JsonErrorEvent.new(msg))
else: recordRes.get()
## TODO: make the next const configurable from 'configJson'.
const MAX_CONNECTIONS = 50.int
var builder = WakuNodeBuilder.init()
builder.withRng(crypto.newRng())
builder.withNodeKey(privateKey)
builder.withRecord(record)
builder.withNetworkConfiguration(netConfig)
builder.withSwitchConfiguration(
maxConnections = some(MAX_CONNECTIONS)
)
let wakuNodeRes = builder.build()
if wakuNodeRes.isErr():
let errorMsg = "failed to create waku node instance: " & wakuNodeRes.error
return err($JsonErrorEvent.new(errorMsg))
var newNode = wakuNodeRes.get()
if relay:
await newNode.mountRelay()
newNode.peerManager.start()
return ok(newNode)
method process*(self: NodeLifecycleRequest,
node: WakuNode): Future[Result[string, string]] {.async.} =
node: ptr WakuNode): Future[Result[string, string]] {.async.} =
case self.operation:
of CREATE_NODE:
let newNodeRes = await createNode(self.configJson)
if newNodeRes.isErr():
return err(newNodeRes.error)
node[] = newNodeRes.get()
of START_NODE:
waitFor node.start()
await node[].start()
of STOP_NODE:
waitFor node.stop()
await node[].stop()
return ok("")

View File

@ -47,12 +47,12 @@ proc connectTo(node: WakuNode,
return ok()
method process*(self: PeerManagementRequest,
node: WakuNode): Future[Result[string, string]] {.async.} =
node: ptr WakuNode): Future[Result[string, string]] {.async.} =
case self.operation:
of CONNECT_TO:
let ret = node.connectTo(self.peerMultiAddr, self.dialTimeout)
let ret = node[].connectTo(self.peerMultiAddr, self.dialTimeout)
if ret.isErr():
return err(ret.error)

View File

@ -38,7 +38,7 @@ proc new*(T: type RelayRequest,
message: message)
method process*(self: RelayRequest,
node: WakuNode): Future[Result[string, string]] {.async.} =
node: ptr WakuNode): Future[Result[string, string]] {.async.} =
if node.wakuRelay.isNil():
return err("Operation not supported without Waku Relay enabled.")

View File

@ -14,7 +14,7 @@ import
type
InterThreadRequest* = ref object of RootObj
method process*(self: InterThreadRequest, node: WakuNode):
method process*(self: InterThreadRequest, node: ptr WakuNode):
Future[Result[string, string]] {.base.} = discard
proc `$`*(self: InterThreadRequest): string =

View File

@ -11,21 +11,8 @@ import
stew/results,
stew/shims/net
import
../../../waku/common/enr/builder,
../../../waku/waku_enr/capabilities,
../../../waku/waku_enr/multiaddr,
../../../waku/waku_enr/sharding,
../../../waku/waku_core/message/message,
../../../waku/waku_core/topics/pubsub_topic,
../../../waku/node/peer_manager/peer_manager,
../../../waku/waku_core,
../../../waku/node/waku_node,
../../../waku/node/builder,
../../../waku/node/config,
../../../waku/waku_relay/protocol,
../events/[json_error_event,json_message_event,json_base_event],
../alloc,
./config,
./inter_thread_communication/request
type
@ -54,91 +41,24 @@ proc waku_init() =
locals = addr(locals)
nimGC_setStackBottom(locals)
proc createNode(configJson: cstring): Result[WakuNode, string] =
var privateKey: PrivateKey
var netConfig = NetConfig.init(ValidIpAddress.init("127.0.0.1"),
Port(60000'u16)).value
var relay: bool
var topics = @[""]
var jsonResp: JsonEvent
let cj = configJson.alloc()
if not parseConfig($cj,
privateKey,
netConfig,
relay,
topics,
jsonResp):
deallocShared(cj)
return err($jsonResp)
deallocShared(cj)
var enrBuilder = EnrBuilder.init(privateKey)
enrBuilder.withIpAddressAndPorts(
netConfig.enrIp,
netConfig.enrPort,
netConfig.discv5UdpPort
)
if netConfig.wakuFlags.isSome():
enrBuilder.withWakuCapabilities(netConfig.wakuFlags.get())
enrBuilder.withMultiaddrs(netConfig.enrMultiaddrs)
let addShardedTopics = enrBuilder.withShardedTopics(topics)
if addShardedTopics.isErr():
let msg = "Error setting shared topics: " & $addShardedTopics.error
return err($JsonErrorEvent.new(msg))
let recordRes = enrBuilder.build()
let record =
if recordRes.isErr():
let msg = "Error building enr record: " & $recordRes.error
return err($JsonErrorEvent.new(msg))
else: recordRes.get()
var builder = WakuNodeBuilder.init()
builder.withRng(crypto.newRng())
builder.withNodeKey(privateKey)
builder.withRecord(record)
builder.withNetworkConfiguration(netConfig)
builder.withSwitchConfiguration(
maxConnections = some(50.int)
)
let wakuNodeRes = builder.build()
if wakuNodeRes.isErr():
let errorMsg = "failed to create waku node instance: " & wakuNodeRes.error
return err($JsonErrorEvent.new(errorMsg))
var newNode = wakuNodeRes.get()
if relay:
waitFor newNode.mountRelay()
newNode.peerManager.start()
return ok(newNode)
proc run(ctx: ptr Context) {.thread.} =
## This is the worker thread body. This thread runs the Waku node
## and attends library user requests (stop, connect_to, etc.)
var node: WakuNode
while running.load == true:
## Trying to get a request from the libwaku main thread
let req = ctx.reqChannel.tryRecv()
if req[0] == true:
let response = waitFor req[1].process(ctx.node)
let response = waitFor req[1].process(addr node)
ctx.respChannel.send( response )
poll()
tearDownForeignThreadGc()
proc createWakuThread*(configJson: cstring): Result[void, string] =
proc createWakuThread*(): Result[void, string] =
## This proc is called from the main thread and it creates
## the Waku working thread.
@ -148,21 +68,15 @@ proc createWakuThread*(configJson: cstring): Result[void, string] =
ctx.reqChannel.open()
ctx.respChannel.open()
let newNodeRes = createNode(configJson)
if newNodeRes.isErr():
return err(newNodeRes.error)
ctx.node = newNodeRes.get()
running.store(true)
try:
createThread(ctx.thread, run, ctx)
except ResourceExhaustedError:
except ValueError, ResourceExhaustedError:
# and freeShared for typed allocations!
freeShared(ctx)
return err("failed to create a thread: " & getCurrentExceptionMsg())
return err("failed to create the Waku thread: " & getCurrentExceptionMsg())
return ok()