From 72f90663cd1dd3dedd6d974fdf654454c18564dc Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande <128452529+Ivansete-status@users.noreply.github.com> Date: Mon, 18 Sep 2023 09:21:50 +0200 Subject: [PATCH] refactor(cbindings): Thread-safe communication between the main thread and the Waku Thread (#1978) * Thread-safe comms between main thread & Waku Thread - ChannelSPSCSingle. * Renaming procs from 'new' to 'createShared'. They use the shared allocator. * peer_manager_request: no need to use ptr WakuNode. * waku_thread: moving the 'waitFor' to upper layer. * waku_thread: `poll()` -> `waitFor sleepAsync(1)` to avoid risk of blocking. * libwaku: thread-safe "sub-objects" in an inter-thread requests. When two threads send data each other, that data cannot contain any GC'ed type (string, seq, ref, closures) at any level. * Allocating the 'configJson' in main thread and deallocating in Waku Thread. --- .gitmodules | 5 + library/alloc.nim | 29 +++++ library/libwaku.nim | 52 ++++---- .../protocols/relay_request.nim | 64 ---------- .../inter_thread_communication/request.nim | 21 ---- .../{ => requests}/node_lifecycle_request.nim | 55 +++++---- .../{ => requests}/peer_manager_request.nim | 36 +++--- .../requests/protocols/relay_request.nim | 114 ++++++++++++++++++ .../waku_thread_request.nim | 57 +++++++++ .../waku_thread_response.nim | 53 ++++++++ library/waku_thread/waku_thread.nim | 57 +++++---- vendor/nim-taskpools | 1 + 12 files changed, 378 insertions(+), 166 deletions(-) delete mode 100644 library/waku_thread/inter_thread_communication/protocols/relay_request.nim delete mode 100644 library/waku_thread/inter_thread_communication/request.nim rename library/waku_thread/inter_thread_communication/{ => requests}/node_lifecycle_request.nim (67%) rename library/waku_thread/inter_thread_communication/{ => requests}/peer_manager_request.nim (50%) create mode 100644 library/waku_thread/inter_thread_communication/requests/protocols/relay_request.nim create mode 100644 library/waku_thread/inter_thread_communication/waku_thread_request.nim create mode 100644 library/waku_thread/inter_thread_communication/waku_thread_response.nim create mode 160000 vendor/nim-taskpools diff --git a/.gitmodules b/.gitmodules index 5c7cb0376..9d69d0723 100644 --- a/.gitmodules +++ b/.gitmodules @@ -154,3 +154,8 @@ url = https://github.com/nitely/nim-unicodedb.git ignore = untracked branch = master +[submodule "vendor/nim-taskpools"] + path = vendor/nim-taskpools + url = https://github.com/status-im/nim-taskpools.git + ignore = untracked + branch = stable diff --git a/library/alloc.nim b/library/alloc.nim index 44d5e67ea..a9aee0a8b 100644 --- a/library/alloc.nim +++ b/library/alloc.nim @@ -1,3 +1,5 @@ +## Can be shared safely between threads +type SharedSeq*[T] = tuple[data: ptr UncheckedArray[T], len: int] proc alloc*(str: cstring): cstring = # Byte allocation from the given address. @@ -5,3 +7,30 @@ proc alloc*(str: cstring): cstring = let ret = cast[cstring](allocShared(len(str) + 1)) copyMem(ret, str, len(str) + 1) return ret + +proc alloc*(str: string): cstring = + ## Byte allocation from the given address. + ## There should be the corresponding manual deallocation with deallocShared ! + var ret = cast[cstring](allocShared(str.len + 1)) + let s = cast[seq[char]](str) + for i in 0.. 0: - # TODO: pending to return a valid message Id - return ok("hard-coded-message-id") - - return ok("") diff --git a/library/waku_thread/inter_thread_communication/request.nim b/library/waku_thread/inter_thread_communication/request.nim deleted file mode 100644 index 6d3060de5..000000000 --- a/library/waku_thread/inter_thread_communication/request.nim +++ /dev/null @@ -1,21 +0,0 @@ - -# This file contains the base message request type that will be handled -# by the Waku Node thread. - -import - std/json, - stew/results -import - chronos -import - ../../../waku/node/waku_node, - ../waku_thread - -type - InterThreadRequest* = ref object of RootObj - -method process*(self: InterThreadRequest, node: ptr WakuNode): - Future[Result[string, string]] {.base.} = discard - -proc `$`*(self: InterThreadRequest): string = - return $( %* self ) diff --git a/library/waku_thread/inter_thread_communication/node_lifecycle_request.nim b/library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim similarity index 67% rename from library/waku_thread/inter_thread_communication/node_lifecycle_request.nim rename to library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim index 0ce14088d..cf134ceeb 100644 --- a/library/waku_thread/inter_thread_communication/node_lifecycle_request.nim +++ b/library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim @@ -6,21 +6,21 @@ import stew/results, stew/shims/net import - ../../../waku/common/enr/builder, - ../../../waku/waku_enr/capabilities, - ../../../waku/waku_enr/multiaddr, - ../../../waku/waku_enr/sharding, - ../../../waku/waku_core/message/message, - ../../../waku/waku_core/topics/pubsub_topic, - ../../../waku/node/peer_manager/peer_manager, - ../../../waku/waku_core, - ../../../waku/node/waku_node, - ../../../waku/node/builder, - ../../../waku/node/config, - ../../../waku/waku_relay/protocol, - ../../events/[json_error_event,json_message_event,json_base_event], - ../config, - ./request + ../../../../waku/common/enr/builder, + ../../../../waku/waku_enr/capabilities, + ../../../../waku/waku_enr/multiaddr, + ../../../../waku/waku_enr/sharding, + ../../../../waku/waku_core/message/message, + ../../../../waku/waku_core/topics/pubsub_topic, + ../../../../waku/node/peer_manager/peer_manager, + ../../../../waku/waku_core, + ../../../../waku/node/waku_node, + ../../../../waku/node/builder, + ../../../../waku/node/config, + ../../../../waku/waku_relay/protocol, + ../../../events/[json_error_event,json_message_event,json_base_event], + ../../../alloc, + ../../config type NodeLifecycleMsgType* = enum @@ -29,18 +29,25 @@ type STOP_NODE type - NodeLifecycleRequest* = ref object of InterThreadRequest + NodeLifecycleRequest* = object operation: NodeLifecycleMsgType configJson: cstring ## Only used in 'CREATE_NODE' operation -proc new*(T: type NodeLifecycleRequest, - op: NodeLifecycleMsgType, - configJson: cstring = ""): T = +proc createShared*(T: type NodeLifecycleRequest, + op: NodeLifecycleMsgType, + configJson: cstring = ""): ptr type T = - return NodeLifecycleRequest(operation: op, configJson: configJson) + var ret = createShared(T) + ret[].operation = op + ret[].configJson = configJson.alloc() + return ret + +proc destroyShared(self: ptr NodeLifecycleRequest) = + deallocShared(self[].configJson) + deallocShared(self) proc createNode(configJson: cstring): - Future[Result[WakuNode, string]] {.async.} = + Future[Result[WakuNode, string]] {.async.} = var privateKey: PrivateKey var netConfig = NetConfig.init(ValidIpAddress.init("127.0.0.1"), @@ -108,8 +115,10 @@ proc createNode(configJson: cstring): return ok(newNode) -method process*(self: NodeLifecycleRequest, - node: ptr WakuNode): Future[Result[string, string]] {.async.} = +proc process*(self: ptr NodeLifecycleRequest, + node: ptr WakuNode): Future[Result[string, string]] {.async.} = + + defer: destroyShared(self) case self.operation: of CREATE_NODE: diff --git a/library/waku_thread/inter_thread_communication/peer_manager_request.nim b/library/waku_thread/inter_thread_communication/requests/peer_manager_request.nim similarity index 50% rename from library/waku_thread/inter_thread_communication/peer_manager_request.nim rename to library/waku_thread/inter_thread_communication/requests/peer_manager_request.nim index a76b1ddf8..ddace3d89 100644 --- a/library/waku_thread/inter_thread_communication/peer_manager_request.nim +++ b/library/waku_thread/inter_thread_communication/requests/peer_manager_request.nim @@ -7,27 +7,33 @@ import stew/results, stew/shims/net import - ../../../waku/node/waku_node, - ./request + ../../../../waku/node/waku_node, + ../../../alloc type PeerManagementMsgType* = enum CONNECT_TO type - PeerManagementRequest* = ref object of InterThreadRequest + PeerManagementRequest* = object operation: PeerManagementMsgType - peerMultiAddr: string + peerMultiAddr: cstring dialTimeout: Duration -proc new*(T: type PeerManagementRequest, - op: PeerManagementMsgType, - peerMultiAddr: string, - dialTimeout: Duration): T = +proc createShared*(T: type PeerManagementRequest, + op: PeerManagementMsgType, + peerMultiAddr: string, + dialTimeout: Duration): ptr type T = - return PeerManagementRequest(operation: op, - peerMultiAddr: peerMultiAddr, - dialTimeout: dialTimeout) + var ret = createShared(T) + ret[].operation = op + ret[].peerMultiAddr = peerMultiAddr.alloc() + ret[].dialTimeout = dialTimeout + return ret + +proc destroyShared(self: ptr PeerManagementRequest) = + deallocShared(self[].peerMultiAddr) + deallocShared(self) proc connectTo(node: WakuNode, peerMultiAddr: string, @@ -46,13 +52,15 @@ proc connectTo(node: WakuNode, return ok() -method process*(self: PeerManagementRequest, - node: ptr WakuNode): Future[Result[string, string]] {.async.} = +proc process*(self: ptr PeerManagementRequest, + node: WakuNode): Future[Result[string, string]] {.async.} = + + defer: destroyShared(self) case self.operation: of CONNECT_TO: - let ret = node[].connectTo(self.peerMultiAddr, self.dialTimeout) + let ret = node.connectTo($self[].peerMultiAddr, self[].dialTimeout) if ret.isErr(): return err(ret.error) diff --git a/library/waku_thread/inter_thread_communication/requests/protocols/relay_request.nim b/library/waku_thread/inter_thread_communication/requests/protocols/relay_request.nim new file mode 100644 index 000000000..dfa2fd7e4 --- /dev/null +++ b/library/waku_thread/inter_thread_communication/requests/protocols/relay_request.nim @@ -0,0 +1,114 @@ + +import + std/[options,sequtils,strutils] +import + chronicles, + chronos, + stew/results, + stew/shims/net +import + ../../../../../waku/waku_core/message/message, + ../../../../../waku/node/waku_node, + ../../../../../waku/waku_core/time, # Timestamp + ../../../../../waku/waku_core/topics/pubsub_topic, + ../../../../../waku/waku_relay/protocol, + ../../../../alloc + +type + RelayMsgType* = enum + SUBSCRIBE + UNSUBSCRIBE + PUBLISH + +type + ThreadSafeWakuMessage* = object + payload: SharedSeq[byte] + contentTopic: cstring + meta: SharedSeq[byte] + version: uint32 + timestamp: Timestamp + ephemeral: bool + when defined(rln): + proof: SharedSeq[byte] + +type + RelayRequest* = object + operation: RelayMsgType + pubsubTopic: cstring + relayEventCallback: WakuRelayHandler # not used in 'PUBLISH' requests + message: ThreadSafeWakuMessage # only used in 'PUBLISH' requests + +proc createShared*(T: type RelayRequest, + op: RelayMsgType, + pubsubTopic: PubsubTopic, + relayEventCallback: WakuRelayHandler = nil, + m = WakuMessage()): ptr type T = + + var ret = createShared(T) + ret[].operation = op + ret[].pubsubTopic = pubsubTopic.alloc() + ret[].relayEventCallback = relayEventCallback + ret[].message = ThreadSafeWakuMessage( + payload: allocSharedSeq(m.payload), + contentTopic: m.contentTopic.alloc(), + meta: allocSharedSeq(m.meta), + version: m.version, + timestamp: m.timestamp, + ephemeral: m.ephemeral, + ) + when defined(rln): + ret[].message.proof = allocSharedSeq(m.proof) + + return ret + +proc destroyShared(self: ptr RelayRequest) = + deallocSharedSeq(self[].message.payload) + deallocShared(self[].message.contentTopic) + deallocSharedSeq(self[].message.meta) + when defined(rln): + deallocSharedSeq(self[].message.proof) + + deallocShared(self) + +proc toWakuMessage(m: ThreadSafeWakuMessage): WakuMessage = + var wakuMessage = WakuMessage() + + wakuMessage.payload = m.payload.toSeq() + wakuMessage.contentTopic = $m.contentTopic + wakuMessage.meta = m.meta.toSeq() + wakuMessage.version = m.version + wakuMessage.timestamp = m.timestamp + wakuMessage.ephemeral = m.ephemeral + + when defined(rln): + wakuMessage.proof = m.proof + + return wakuMessage + +proc process*(self: ptr RelayRequest, + node: ptr WakuNode): Future[Result[string, string]] {.async.} = + + defer: destroyShared(self) + + if node.wakuRelay.isNil(): + return err("Operation not supported without Waku Relay enabled.") + + case self.operation: + + of SUBSCRIBE: + node.wakuRelay.subscribe($self.pubsubTopic, self.relayEventCallback) + + of UNSUBSCRIBE: + node.wakuRelay.unsubscribe($self.pubsubTopic) + + of PUBLISH: + let numPeers = await node.wakuRelay.publish($self.pubsubTopic, + self.message.toWakuMessage()) + if numPeers == 0: + return err("Message not sent because no peers found.") + + elif numPeers > 0: + # TODO: pending to return a valid message Id + return ok("hard-coded-message-id") + + return ok("") diff --git a/library/waku_thread/inter_thread_communication/waku_thread_request.nim b/library/waku_thread/inter_thread_communication/waku_thread_request.nim new file mode 100644 index 000000000..79ba7797d --- /dev/null +++ b/library/waku_thread/inter_thread_communication/waku_thread_request.nim @@ -0,0 +1,57 @@ + +## This file contains the base message request type that will be handled. +## The requests are created by the main thread and processed by +## the Waku Thread. + +import + std/json, + stew/results +import + chronos +import + ../../../waku/node/waku_node, + ./requests/node_lifecycle_request, + ./requests/peer_manager_request, + ./requests/protocols/relay_request + +type + RequestType* {.pure.} = enum + LIFECYCLE, + PEER_MANAGER, + RELAY, + +type + InterThreadRequest* = object + reqType: RequestType + reqContent: pointer + +proc createShared*(T: type InterThreadRequest, + reqType: RequestType, + reqContent: pointer): ptr type T = + var ret = createShared(T) + ret[].reqType = reqType + ret[].reqContent = reqContent + return ret + +proc process*(T: type InterThreadRequest, + request: ptr InterThreadRequest, + node: ptr WakuNode): + Future[Result[string, string]] {.async.} = + ## Processes the request and deallocates its memory + defer: deallocShared(request) + + echo "Request received: " & $request[].reqType + + let retFut = + case request[].reqType + of LIFECYCLE: + cast[ptr NodeLifecycleRequest](request[].reqContent).process(node) + of PEER_MANAGER: + cast[ptr PeerManagementRequest](request[].reqContent).process(node[]) + of RELAY: + cast[ptr RelayRequest](request[].reqContent).process(node) + + return await retFut + +proc `$`*(self: InterThreadRequest): string = + return $self.reqType \ No newline at end of file diff --git a/library/waku_thread/inter_thread_communication/waku_thread_response.nim b/library/waku_thread/inter_thread_communication/waku_thread_response.nim new file mode 100644 index 000000000..c3005f4d7 --- /dev/null +++ b/library/waku_thread/inter_thread_communication/waku_thread_response.nim @@ -0,0 +1,53 @@ + +## This file contains the base message response type that will be handled. +## The response will be created from the Waku Thread and processed in +## the main thread. + +import + std/json, + stew/results + +type + ResponseType {.pure.} = enum + OK, + ERR, + +type + InterThreadResponse* = object + respType: ResponseType + content: cstring + +proc createShared*(T: type InterThreadResponse, + res: Result[string, string]): ptr type T = + ## Converts a `Result[string, string]` into a `ptr InterThreadResponse` + ## so that it can be transfered to another thread in a safe way. + + var ret = createShared(T) + if res.isOk(): + let value = res.get() + ret[].respType = ResponseType.OK + ret[].content = cast[cstring](allocShared0(value.len + 1)) + copyMem(ret[].content, unsafeAddr value, value.len + 1) + else: + let error = res.error + ret[].respType = ResponseType.ERR + ret[].content = cast[cstring](allocShared0(error.len + 1)) + copyMem(ret[].content, unsafeAddr error, error.len + 1) + return ret + +proc process*(T: type InterThreadResponse, + resp: ptr InterThreadResponse): + Result[string, string] = + ## Converts the received `ptr InterThreadResponse` into a + ## `Result[string, string]`. Notice that the response is expected to be + ## allocated from the Waku Thread and deallocated by the main thread. + + defer: + deallocShared(resp[].content) + deallocShared(resp) + + case resp[].respType + of OK: + return ok($resp[].content) + of ERR: + return err($resp[].content) diff --git a/library/waku_thread/waku_thread.nim b/library/waku_thread/waku_thread.nim index 8fd1253c8..c9a405117 100644 --- a/library/waku_thread/waku_thread.nim +++ b/library/waku_thread/waku_thread.nim @@ -9,18 +9,19 @@ import chronicles, chronos, stew/results, - stew/shims/net + stew/shims/net, + taskpools/channels_spsc_single import ../../../waku/node/waku_node, ../events/[json_error_event,json_message_event,json_base_event], - ./inter_thread_communication/request + ./inter_thread_communication/waku_thread_request, + ./inter_thread_communication/waku_thread_response type Context* = object thread: Thread[(ptr Context)] - reqChannel: Channel[InterThreadRequest] - respChannel: Channel[Result[string, string]] - node: WakuNode + reqChannel: ChannelSPSCSingle[ptr InterThreadRequest] + respChannel: ChannelSPSCSingle[ptr InterThreadResponse] var ctx {.threadvar.}: ptr Context @@ -49,12 +50,20 @@ proc run(ctx: ptr Context) {.thread.} = while running.load == true: ## Trying to get a request from the libwaku main thread - let req = ctx.reqChannel.tryRecv() - if req[0] == true: - let response = waitFor req[1].process(addr node) - ctx.respChannel.send( response ) - poll() + var request: ptr InterThreadRequest + let recvOk = ctx.reqChannel.tryRecv(request) + if recvOk == true: + let resultResponse = + waitFor InterThreadRequest.process(request, addr node) + + ## Converting a `Result` into a thread-safe transferable response type + let threadSafeResp = InterThreadResponse.createShared(resultResponse) + + ## The error-handling is performed in the main thread + discard ctx.respChannel.trySend( threadSafeResp ) + + waitFor sleepAsync(1) tearDownForeignThreadGc() @@ -65,8 +74,6 @@ proc createWakuThread*(): Result[void, string] = waku_init() ctx = createShared(Context, 1) - ctx.reqChannel.open() - ctx.respChannel.open() running.store(true) @@ -83,20 +90,24 @@ proc createWakuThread*(): Result[void, string] = proc stopWakuNodeThread*() = running.store(false) joinThread(ctx.thread) - - ctx.reqChannel.close() - ctx.respChannel.close() - freeShared(ctx) -proc sendRequestToWakuThread*(req: InterThreadRequest): Result[string, string] = +proc sendRequestToWakuThread*(reqType: RequestType, + reqContent: pointer): Result[string, string] = - ctx.reqChannel.send(req) + let req = InterThreadRequest.createShared(reqType, reqContent) - var resp = ctx.respChannel.tryRecv() - while resp[0] == false: - resp = ctx.respChannel.tryRecv() + ## Sending the request + let sentOk = ctx.reqChannel.trySend(req) + if not sentOk: + return err("Couldn't send a request to the waku thread: " & $req[]) + + ## Waiting for the response + var response: ptr InterThreadResponse + var recvOk = ctx.respChannel.tryRecv(response) + while recvOk == false: + recvOk = ctx.respChannel.tryRecv(response) os.sleep(1) - return resp[1] - + ## Converting the thread-safe response into a managed/CG'ed `Result` + return InterThreadResponse.process(response) diff --git a/vendor/nim-taskpools b/vendor/nim-taskpools new file mode 160000 index 000000000..15e23ef1c --- /dev/null +++ b/vendor/nim-taskpools @@ -0,0 +1 @@ +Subproject commit 15e23ef1cf0860330dcc32f50fcce5f840031e28