refactor(cbindings): Thread-safe communication between the main thread and the Waku Thread (#1978)

* Thread-safe comms between main thread & Waku Thread - ChannelSPSCSingle.
* Renaming procs from 'new' to 'createShared'. They use the shared allocator.
* peer_manager_request: no need to use ptr WakuNode.
* waku_thread: moving the 'waitFor' to upper layer.
* waku_thread: `poll()` -> `waitFor sleepAsync(1)` to avoid risk of blocking.
* libwaku: thread-safe "sub-objects" in inter-thread requests.
  When two threads send data to each other, that data cannot contain any
  GC'ed type (string, seq, ref, closures) at any level.
* Allocating the 'configJson' in main thread and deallocating in Waku Thread.
This commit is contained in:
Ivan Folgueira Bande 2023-09-18 09:21:50 +02:00 committed by GitHub
parent 2e515a06ed
commit 72f90663cd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 378 additions and 166 deletions

5
.gitmodules vendored
View File

@ -154,3 +154,8 @@
url = https://github.com/nitely/nim-unicodedb.git
ignore = untracked
branch = master
[submodule "vendor/nim-taskpools"]
path = vendor/nim-taskpools
url = https://github.com/status-im/nim-taskpools.git
ignore = untracked
branch = stable

View File

@ -1,3 +1,5 @@
## Can be shared safely between threads
## `data` is a raw pointer to the elements and `len` is the element count.
## In this file the buffer is obtained by `allocSharedSeq` (via `allocShared`)
## and released by `deallocSharedSeq`; it is not tracked by the GC.
type SharedSeq*[T] = tuple[data: ptr UncheckedArray[T], len: int]
proc alloc*(str: cstring): cstring =
# Byte allocation from the given address.
@ -5,3 +7,30 @@ proc alloc*(str: cstring): cstring =
let ret = cast[cstring](allocShared(len(str) + 1))
copyMem(ret, str, len(str) + 1)
return ret
proc alloc*(str: string): cstring =
  ## Copies `str` into a freshly allocated, NUL-terminated buffer obtained
  ## with `allocShared` and returns it as a `cstring`.
  ## There should be the corresponding manual deallocation with deallocShared !
  let size = str.len
  var ret = cast[cstring](allocShared(size + 1))
  if size > 0:
    # Copy the characters in one go. `str[0]` is only valid for a non-empty
    # string, hence the guard.
    copyMem(ret, unsafeAddr str[0], size)
  ret[size] = '\0'
  return ret
proc allocSharedSeq*[T](s: seq[T]): SharedSeq[T] =
  ## Copies the elements of `s` into a buffer obtained with `allocShared` and
  ## returns it together with the element count.
  ## The buffer must be released with `deallocSharedSeq`.
  ## NOTE(review): the elements are copied byte-wise, so `T` is assumed to
  ## contain no GC-managed fields - confirm for any new instantiation.
  let byteLen = s.len * sizeof(T)
  # A buffer is allocated even for an empty seq so that `deallocSharedSeq`
  # always receives a pointer that came from `allocShared`.
  let data = cast[ptr UncheckedArray[T]](allocShared(byteLen))
  if s.len > 0:
    # Copy from the first element, not from the address of the seq variable.
    copyMem(data, unsafeAddr s[0], byteLen)
  return (data, s.len)
proc deallocSharedSeq*[T](s: var SharedSeq[T]) =
  ## Releases the buffer referenced by `s` with `deallocShared` and resets `s`
  ## to an empty state. Calling it again on the same value is a no-op.
  if not s.data.isNil:
    deallocShared(s.data)
    # Drop the dangling pointer so a second call cannot double-free.
    s.data = nil
  s.len = 0
proc toSeq*[T](s: SharedSeq[T]): seq[T] =
  ## Creates a seq[T] from a SharedSeq[T]. No explicit dealloc is required
  ## as seq[T] is a GC managed type.
  ## The source `SharedSeq` is left untouched and must still be released by
  ## its owner.
  # Reserve the final capacity up front to avoid repeated reallocations.
  var ret = newSeqOfCap[T](max(s.len, 0))
  for i in 0..<s.len:
    ret.add(s.data[i])
  return ret

View File

@ -16,9 +16,10 @@ import
../../../waku/waku_relay/protocol,
./events/json_message_event,
./waku_thread/waku_thread,
./waku_thread/inter_thread_communication/node_lifecycle_request,
./waku_thread/inter_thread_communication/peer_manager_request,
./waku_thread/inter_thread_communication/protocols/relay_request,
./waku_thread/inter_thread_communication/requests/node_lifecycle_request,
./waku_thread/inter_thread_communication/requests/peer_manager_request,
./waku_thread/inter_thread_communication/requests/protocols/relay_request,
./waku_thread/inter_thread_communication/waku_thread_request,
./alloc
################################################################################
@ -79,7 +80,8 @@ proc waku_new(configJson: cstring,
return RET_ERR
let sendReqRes = waku_thread.sendRequestToWakuThread(
NodeLifecycleRequest.new(
RequestType.LIFECYCLE,
NodeLifecycleRequest.createShared(
NodeLifecycleMsgType.CREATE_NODE,
configJson))
if sendReqRes.isErr():
@ -199,7 +201,8 @@ proc waku_relay_publish(pubSubTopic: cstring,
$pst
let sendReqRes = waku_thread.sendRequestToWakuThread(
RelayRequest.new(RelayMsgType.PUBLISH,
RequestType.RELAY,
RelayRequest.createShared(RelayMsgType.PUBLISH,
PubsubTopic($pst),
WakuRelayHandler(relayEventCallback),
wakuMessage))
@ -214,12 +217,14 @@ proc waku_relay_publish(pubSubTopic: cstring,
proc waku_start() {.dynlib, exportc.} =
discard waku_thread.sendRequestToWakuThread(
NodeLifecycleRequest.new(
RequestType.LIFECYCLE,
NodeLifecycleRequest.createShared(
NodeLifecycleMsgType.START_NODE))
proc waku_stop() {.dynlib, exportc.} =
discard waku_thread.sendRequestToWakuThread(
NodeLifecycleRequest.new(
RequestType.LIFECYCLE,
NodeLifecycleRequest.createShared(
NodeLifecycleMsgType.STOP_NODE))
proc waku_relay_subscribe(
@ -228,8 +233,10 @@ proc waku_relay_subscribe(
{.dynlib, exportc.} =
let pst = pubSubTopic.alloc()
let sendReqRes = waku_thread.sendRequestToWakuThread(
RelayRequest.new(RelayMsgType.SUBSCRIBE,
RequestType.RELAY,
RelayRequest.createShared(RelayMsgType.SUBSCRIBE,
PubsubTopic($pst),
WakuRelayHandler(relayEventCallback)))
deallocShared(pst)
@ -247,8 +254,10 @@ proc waku_relay_unsubscribe(
{.dynlib, exportc.} =
let pst = pubSubTopic.alloc()
let sendReqRes = waku_thread.sendRequestToWakuThread(
RelayRequest.new(RelayMsgType.UNSUBSCRIBE,
RequestType.RELAY,
RelayRequest.createShared(RelayMsgType.UNSUBSCRIBE,
PubsubTopic($pst),
WakuRelayHandler(relayEventCallback)))
deallocShared(pst)
@ -266,7 +275,8 @@ proc waku_connect(peerMultiAddr: cstring,
{.dynlib, exportc.} =
let connRes = waku_thread.sendRequestToWakuThread(
PeerManagementRequest.new(
RequestType.PEER_MANAGER,
PeerManagementRequest.createShared(
PeerManagementMsgType.CONNECT_TO,
$peerMultiAddr,
chronos.milliseconds(timeoutMs)))

View File

@ -1,64 +0,0 @@
import
std/[options,sequtils,strutils]
import
chronicles,
chronos,
stew/results,
stew/shims/net
import
../../../../waku/waku_core/message/message,
../../../../waku/node/waku_node,
../../../../waku/waku_core/topics/pubsub_topic,
../../../../waku/waku_relay/protocol,
../request
type
RelayMsgType* = enum
SUBSCRIBE
UNSUBSCRIBE
PUBLISH
type
RelayRequest* = ref object of InterThreadRequest
operation: RelayMsgType
pubsubTopic: PubsubTopic
relayEventCallback: WakuRelayHandler # not used in 'PUBLISH' requests
message: WakuMessage # this field is only used in 'PUBLISH' requests
proc new*(T: type RelayRequest,
op: RelayMsgType,
pubsubTopic: PubsubTopic,
relayEventCallback: WakuRelayHandler = nil,
message = WakuMessage()): T =
return RelayRequest(operation: op,
pubsubTopic: pubsubTopic,
relayEventCallback: relayEventCallback,
message: message)
method process*(self: RelayRequest,
node: ptr WakuNode): Future[Result[string, string]] {.async.} =
if node.wakuRelay.isNil():
return err("Operation not supported without Waku Relay enabled.")
case self.operation:
of SUBSCRIBE:
node.wakuRelay.subscribe(self.pubsubTopic, self.relayEventCallback)
of UNSUBSCRIBE:
node.wakuRelay.unsubscribe(self.pubsubTopic)
of PUBLISH:
let numPeers = await node.wakuRelay.publish(self.pubsubTopic,
self.message)
if numPeers == 0:
return err("Message not sent because no peers found.")
elif numPeers > 0:
# TODO: pending to return a valid message Id
return ok("hard-coded-message-id")
return ok("")

View File

@ -1,21 +0,0 @@
# This file contains the base message request type that will be handled
# by the Waku Node thread.
import
std/json,
stew/results
import
chronos
import
../../../waku/node/waku_node,
../waku_thread
type
InterThreadRequest* = ref object of RootObj
method process*(self: InterThreadRequest, node: ptr WakuNode):
Future[Result[string, string]] {.base.} = discard
proc `$`*(self: InterThreadRequest): string =
return $( %* self )

View File

@ -6,21 +6,21 @@ import
stew/results,
stew/shims/net
import
../../../waku/common/enr/builder,
../../../waku/waku_enr/capabilities,
../../../waku/waku_enr/multiaddr,
../../../waku/waku_enr/sharding,
../../../waku/waku_core/message/message,
../../../waku/waku_core/topics/pubsub_topic,
../../../waku/node/peer_manager/peer_manager,
../../../waku/waku_core,
../../../waku/node/waku_node,
../../../waku/node/builder,
../../../waku/node/config,
../../../waku/waku_relay/protocol,
../../events/[json_error_event,json_message_event,json_base_event],
../config,
./request
../../../../waku/common/enr/builder,
../../../../waku/waku_enr/capabilities,
../../../../waku/waku_enr/multiaddr,
../../../../waku/waku_enr/sharding,
../../../../waku/waku_core/message/message,
../../../../waku/waku_core/topics/pubsub_topic,
../../../../waku/node/peer_manager/peer_manager,
../../../../waku/waku_core,
../../../../waku/node/waku_node,
../../../../waku/node/builder,
../../../../waku/node/config,
../../../../waku/waku_relay/protocol,
../../../events/[json_error_event,json_message_event,json_base_event],
../../../alloc,
../../config
type
NodeLifecycleMsgType* = enum
@ -29,15 +29,22 @@ type
STOP_NODE
type
NodeLifecycleRequest* = ref object of InterThreadRequest
NodeLifecycleRequest* = object
operation: NodeLifecycleMsgType
configJson: cstring ## Only used in 'CREATE_NODE' operation
proc new*(T: type NodeLifecycleRequest,
proc createShared*(T: type NodeLifecycleRequest,
op: NodeLifecycleMsgType,
configJson: cstring = ""): T =
configJson: cstring = ""): ptr type T =
return NodeLifecycleRequest(operation: op, configJson: configJson)
var ret = createShared(T)
ret[].operation = op
ret[].configJson = configJson.alloc()
return ret
proc destroyShared(self: ptr NodeLifecycleRequest) =
deallocShared(self[].configJson)
deallocShared(self)
proc createNode(configJson: cstring):
Future[Result[WakuNode, string]] {.async.} =
@ -108,9 +115,11 @@ proc createNode(configJson: cstring):
return ok(newNode)
method process*(self: NodeLifecycleRequest,
proc process*(self: ptr NodeLifecycleRequest,
node: ptr WakuNode): Future[Result[string, string]] {.async.} =
defer: destroyShared(self)
case self.operation:
of CREATE_NODE:
let newNodeRes = await createNode(self.configJson)

View File

@ -7,27 +7,33 @@ import
stew/results,
stew/shims/net
import
../../../waku/node/waku_node,
./request
../../../../waku/node/waku_node,
../../../alloc
type
PeerManagementMsgType* = enum
CONNECT_TO
type
PeerManagementRequest* = ref object of InterThreadRequest
PeerManagementRequest* = object
operation: PeerManagementMsgType
peerMultiAddr: string
peerMultiAddr: cstring
dialTimeout: Duration
proc new*(T: type PeerManagementRequest,
proc createShared*(T: type PeerManagementRequest,
op: PeerManagementMsgType,
peerMultiAddr: string,
dialTimeout: Duration): T =
dialTimeout: Duration): ptr type T =
return PeerManagementRequest(operation: op,
peerMultiAddr: peerMultiAddr,
dialTimeout: dialTimeout)
var ret = createShared(T)
ret[].operation = op
ret[].peerMultiAddr = peerMultiAddr.alloc()
ret[].dialTimeout = dialTimeout
return ret
proc destroyShared(self: ptr PeerManagementRequest) =
deallocShared(self[].peerMultiAddr)
deallocShared(self)
proc connectTo(node: WakuNode,
peerMultiAddr: string,
@ -46,13 +52,15 @@ proc connectTo(node: WakuNode,
return ok()
method process*(self: PeerManagementRequest,
node: ptr WakuNode): Future[Result[string, string]] {.async.} =
proc process*(self: ptr PeerManagementRequest,
node: WakuNode): Future[Result[string, string]] {.async.} =
defer: destroyShared(self)
case self.operation:
of CONNECT_TO:
let ret = node[].connectTo(self.peerMultiAddr, self.dialTimeout)
let ret = node.connectTo($self[].peerMultiAddr, self[].dialTimeout)
if ret.isErr():
return err(ret.error)

View File

@ -0,0 +1,114 @@
import
std/[options,sequtils,strutils]
import
chronicles,
chronos,
stew/results,
stew/shims/net
import
../../../../../waku/waku_core/message/message,
../../../../../waku/node/waku_node,
../../../../../waku/waku_core/time, # Timestamp
../../../../../waku/waku_core/topics/pubsub_topic,
../../../../../waku/waku_relay/protocol,
../../../../alloc
# Kind of relay operation carried by a `RelayRequest`; `process` dispatches
# on this value.
type
RelayMsgType* = enum
SUBSCRIBE
UNSUBSCRIBE
PUBLISH
# Copy of a `WakuMessage` whose variable-length fields are held as
# `SharedSeq[byte]` / `cstring` instead of `seq` / `string`. It is filled in
# by `createShared` and converted back by `toWakuMessage`.
type
ThreadSafeWakuMessage* = object
payload: SharedSeq[byte]
contentTopic: cstring
meta: SharedSeq[byte]
version: uint32
timestamp: Timestamp
ephemeral: bool
# The `proof` field only exists when compiling with `-d:rln`.
when defined(rln):
proof: SharedSeq[byte]
# Request payload built by `createShared` and consumed (and released) by
# `process`.
type
RelayRequest* = object
operation: RelayMsgType
pubsubTopic: cstring
relayEventCallback: WakuRelayHandler # not used in 'PUBLISH' requests
message: ThreadSafeWakuMessage # only used in 'PUBLISH' requests
proc createShared*(T: type RelayRequest,
                   op: RelayMsgType,
                   pubsubTopic: PubsubTopic,
                   relayEventCallback: WakuRelayHandler = nil,
                   m = WakuMessage()): ptr type T =
  ## Builds a `RelayRequest` in shared memory. The pubsub topic and every
  ## variable-length field of `m` are copied into manually allocated buffers;
  ## `destroyShared` is the counterpart that releases the request.
  var threadSafeMsg = ThreadSafeWakuMessage(
    payload: allocSharedSeq(m.payload),
    contentTopic: m.contentTopic.alloc(),
    meta: allocSharedSeq(m.meta),
    version: m.version,
    timestamp: m.timestamp,
    ephemeral: m.ephemeral,
  )
  when defined(rln):
    threadSafeMsg.proof = allocSharedSeq(m.proof)

  var req = createShared(T)
  req[].operation = op
  req[].pubsubTopic = pubsubTopic.alloc()
  req[].relayEventCallback = relayEventCallback
  req[].message = threadSafeMsg
  return req
proc destroyShared(self: ptr RelayRequest) =
  ## Releases every buffer owned by the request and then the request itself.
  ## Must mirror the allocations performed in `createShared`.
  # `pubsubTopic` is allocated with `alloc()` in `createShared`, so it has to
  # be freed here as well; otherwise every request leaks its topic string.
  deallocShared(self[].pubsubTopic)
  deallocSharedSeq(self[].message.payload)
  deallocShared(self[].message.contentTopic)
  deallocSharedSeq(self[].message.meta)
  when defined(rln):
    deallocSharedSeq(self[].message.proof)
  deallocShared(self)
proc toWakuMessage(m: ThreadSafeWakuMessage): WakuMessage =
  ## Converts the thread-safe representation back into a `WakuMessage`.
  ## All buffers are copied, so `m` can be released afterwards.
  var wakuMessage = WakuMessage()
  wakuMessage.payload = m.payload.toSeq()
  wakuMessage.contentTopic = $m.contentTopic
  wakuMessage.meta = m.meta.toSeq()
  wakuMessage.version = m.version
  wakuMessage.timestamp = m.timestamp
  wakuMessage.ephemeral = m.ephemeral
  when defined(rln):
    # `m.proof` is a SharedSeq[byte]; it must be converted like the other
    # byte buffers instead of being assigned directly.
    wakuMessage.proof = m.proof.toSeq()
  return wakuMessage
proc process*(self: ptr RelayRequest,
              node: ptr WakuNode): Future[Result[string, string]] {.async.} =
  ## Executes the relay operation described by `self` on `node`.
  ## The request is released through `destroyShared` when the proc exits,
  ## whatever the outcome.
  defer: destroyShared(self)

  if node.wakuRelay.isNil():
    return err("Operation not supported without Waku Relay enabled.")

  let topic = $self.pubsubTopic

  case self.operation
  of SUBSCRIBE:
    node.wakuRelay.subscribe(topic, self.relayEventCallback)

  of UNSUBSCRIBE:
    node.wakuRelay.unsubscribe(topic)

  of PUBLISH:
    let wakuMsg = self.message.toWakuMessage()
    let peerCount = await node.wakuRelay.publish(topic, wakuMsg)
    if peerCount == 0:
      return err("Message not sent because no peers found.")
    if peerCount > 0:
      # TODO: pending to return a valid message Id
      return ok("hard-coded-message-id")

  return ok("")

View File

@ -0,0 +1,57 @@
## This file contains the base message request type that will be handled.
## The requests are created by the main thread and processed by
## the Waku Thread.
import
std/json,
stew/results
import
chronos
import
../../../waku/node/waku_node,
./requests/node_lifecycle_request,
./requests/peer_manager_request,
./requests/protocols/relay_request
# Discriminates which concrete request type `reqContent` points to; see the
# `case` in `process` for the mapping.
type
RequestType* {.pure.} = enum
LIFECYCLE,
PEER_MANAGER,
RELAY,
# Envelope for a request: a type tag plus an untyped pointer to the concrete
# request, which `process` casts back according to `reqType`.
type
InterThreadRequest* = object
reqType: RequestType
reqContent: pointer
proc createShared*(T: type InterThreadRequest,
                   reqType: RequestType,
                   reqContent: pointer): ptr type T =
  ## Allocates an `InterThreadRequest` envelope with the system
  ## `createShared` and stores the type tag and content pointer in it.
  ## The envelope is released by `process`.
  result = createShared(T)
  result[].reqType = reqType
  result[].reqContent = reqContent
proc process*(T: type InterThreadRequest,
request: ptr InterThreadRequest,
node: ptr WakuNode):
Future[Result[string, string]] {.async.} =
## Processes the request and deallocates its memory
## The envelope (`request`) is freed here on exit. The concrete request
## behind `reqContent` is handed to the matching `process` overload.
defer: deallocShared(request)
# NOTE(review): plain `echo` writes to stdout from library code - consider
# replacing it with the project's logging facility.
echo "Request received: " & $request[].reqType
# `reqContent` is an untyped pointer; `reqType` tells which concrete request
# type it must be cast back to.
let retFut =
case request[].reqType
of LIFECYCLE:
cast[ptr NodeLifecycleRequest](request[].reqContent).process(node)
of PEER_MANAGER:
# This overload is called with the dereferenced node rather than the pointer.
cast[ptr PeerManagementRequest](request[].reqContent).process(node[])
of RELAY:
cast[ptr RelayRequest](request[].reqContent).process(node)
return await retFut
proc `$`*(self: InterThreadRequest): string =
  ## Renders the request as the name of its `reqType`; the content pointer
  ## is not included.
  $self.reqType

View File

@ -0,0 +1,53 @@
## This file contains the base message response type that will be handled.
## The response will be created from the Waku Thread and processed in
## the main thread.
import
std/json,
stew/results
# Outcome tag of a response: `OK` maps to `ok(...)` and `ERR` to `err(...)`
# in `process`.
type
ResponseType {.pure.} = enum
OK,
ERR,
# Response envelope: the outcome tag plus the value or error text, held as a
# manually allocated `cstring` (allocated in `createShared`, freed in
# `process`).
type
InterThreadResponse* = object
respType: ResponseType
content: cstring
proc createShared*(T: type InterThreadResponse,
                   res: Result[string, string]): ptr type T =
  ## Converts a `Result[string, string]` into a `ptr InterThreadResponse`
  ## so that it can be transferred to another thread in a safe way.
  ## The value (or the error text) is copied into a NUL-terminated buffer
  ## allocated with `allocShared0`; `process` releases both the buffer and
  ## the response.
  var ret = createShared(T)

  let succeeded = res.isOk()
  let text = if succeeded: res.get() else: res.error
  ret[].respType = if succeeded: ResponseType.OK else: ResponseType.ERR

  # `allocShared0` zero-fills the buffer, so the terminating NUL is already
  # in place and an empty text needs no copy at all.
  ret[].content = cast[cstring](allocShared0(text.len + 1))
  if text.len > 0:
    # Copy from the first character. The address of the string variable
    # itself does not point at the characters.
    copyMem(ret[].content, unsafeAddr text[0], text.len)

  return ret
proc process*(T: type InterThreadResponse,
              resp: ptr InterThreadResponse):
              Result[string, string] =
  ## Turns the received `ptr InterThreadResponse` back into a
  ## `Result[string, string]` and frees the response together with its
  ## content buffer. The response is expected to have been allocated by the
  ## Waku Thread and is released here, on the main thread.
  let kind = resp[].respType
  # Copy the text into a managed string before the buffer is released.
  let text = $resp[].content

  deallocShared(resp[].content)
  deallocShared(resp)

  case kind
  of ResponseType.OK:
    return ok(text)
  of ResponseType.ERR:
    return err(text)

View File

@ -9,18 +9,19 @@ import
chronicles,
chronos,
stew/results,
stew/shims/net
stew/shims/net,
taskpools/channels_spsc_single
import
../../../waku/node/waku_node,
../events/[json_error_event,json_message_event,json_base_event],
./inter_thread_communication/request
./inter_thread_communication/waku_thread_request,
./inter_thread_communication/waku_thread_response
type
Context* = object
thread: Thread[(ptr Context)]
reqChannel: Channel[InterThreadRequest]
respChannel: Channel[Result[string, string]]
node: WakuNode
reqChannel: ChannelSPSCSingle[ptr InterThreadRequest]
respChannel: ChannelSPSCSingle[ptr InterThreadResponse]
var ctx {.threadvar.}: ptr Context
@ -49,12 +50,20 @@ proc run(ctx: ptr Context) {.thread.} =
while running.load == true:
## Trying to get a request from the libwaku main thread
let req = ctx.reqChannel.tryRecv()
if req[0] == true:
let response = waitFor req[1].process(addr node)
ctx.respChannel.send( response )
poll()
var request: ptr InterThreadRequest
let recvOk = ctx.reqChannel.tryRecv(request)
if recvOk == true:
let resultResponse =
waitFor InterThreadRequest.process(request, addr node)
## Converting a `Result` into a thread-safe transferable response type
let threadSafeResp = InterThreadResponse.createShared(resultResponse)
## The error-handling is performed in the main thread
discard ctx.respChannel.trySend( threadSafeResp )
waitFor sleepAsync(1)
tearDownForeignThreadGc()
@ -65,8 +74,6 @@ proc createWakuThread*(): Result[void, string] =
waku_init()
ctx = createShared(Context, 1)
ctx.reqChannel.open()
ctx.respChannel.open()
running.store(true)
@ -83,20 +90,24 @@ proc createWakuThread*(): Result[void, string] =
proc stopWakuNodeThread*() =
running.store(false)
joinThread(ctx.thread)
ctx.reqChannel.close()
ctx.respChannel.close()
freeShared(ctx)
proc sendRequestToWakuThread*(req: InterThreadRequest): Result[string, string] =
proc sendRequestToWakuThread*(reqType: RequestType,
reqContent: pointer): Result[string, string] =
ctx.reqChannel.send(req)
let req = InterThreadRequest.createShared(reqType, reqContent)
var resp = ctx.respChannel.tryRecv()
while resp[0] == false:
resp = ctx.respChannel.tryRecv()
## Sending the request
let sentOk = ctx.reqChannel.trySend(req)
if not sentOk:
return err("Couldn't send a request to the waku thread: " & $req[])
## Waiting for the response
var response: ptr InterThreadResponse
var recvOk = ctx.respChannel.tryRecv(response)
while recvOk == false:
recvOk = ctx.respChannel.tryRecv(response)
os.sleep(1)
return resp[1]
## Converting the thread-safe response into a managed/GC'ed `Result`
return InterThreadResponse.process(response)

1
vendor/nim-taskpools vendored Submodule

@ -0,0 +1 @@
Subproject commit 15e23ef1cf0860330dcc32f50fcce5f840031e28