From 9f46c3c12396178cd27444112a2a7dfb8a52e576 Mon Sep 17 00:00:00 2001 From: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> Date: Fri, 3 Jan 2025 12:26:46 +0100 Subject: [PATCH] chore: enhance libwaku store protocol and more (#3223) * json_message_event: avoid converting a WakuMessageHash into 0x... * waku_thread: wait until the waku thread completely received the request * waku_thread: add missing deallocShared * libwaku avoid nonsense onReceivedMessage cb in waku_relay_publish --- library/events/json_message_event.nim | 3 - library/libwaku.nim | 9 +-- .../requests/protocols/store_request.nim | 55 ++++++++++--------- library/waku_thread/waku_thread.nim | 18 ++++++ 4 files changed, 48 insertions(+), 37 deletions(-) diff --git a/library/events/json_message_event.nim b/library/events/json_message_event.nim index ffb6065f5..2d066e7b3 100644 --- a/library/events/json_message_event.nim +++ b/library/events/json_message_event.nim @@ -57,9 +57,6 @@ proc toWakuMessage*(self: JsonMessage): Result[WakuMessage, string] = proc `%`*(value: Base64String): JsonNode = %(value.string) -proc `%`*(value: WakuMessageHash): JsonNode = - %(to0xHex(value)) - type JsonMessageEvent* = ref object of JsonEvent pubsubTopic*: string messageHash*: WakuMessageHash diff --git a/library/libwaku.nim b/library/libwaku.nim index d617c00bb..51e6db7b9 100644 --- a/library/libwaku.nim +++ b/library/libwaku.nim @@ -328,12 +328,7 @@ proc waku_relay_publish( handleRequest( ctx, RequestType.RELAY, - RelayRequest.createShared( - RelayMsgType.PUBLISH, - PubsubTopic($pst), - WakuRelayHandler(onReceivedMessage(ctx)), - wakuMessage, - ), + RelayRequest.createShared(RelayMsgType.PUBLISH, PubsubTopic($pst), nil, wakuMessage), callback, userData, ) @@ -695,7 +690,7 @@ proc waku_store_query( handleRequest( ctx, RequestType.STORE, - JsonStoreQueryRequest.createShared(jsonQuery, peerAddr, timeoutMs), + StoreRequest.createShared(StoreReqType.REMOTE_QUERY, jsonQuery, peerAddr, timeoutMs), callback, userData, ) diff --git a/library/waku_thread/inter_thread_communication/requests/protocols/store_request.nim b/library/waku_thread/inter_thread_communication/requests/protocols/store_request.nim index 3e2523fec..9ccf8bced 100644 --- a/library/waku_thread/inter_thread_communication/requests/protocols/store_request.nim +++ b/library/waku_thread/inter_thread_communication/requests/protocols/store_request.nim @@ -14,31 +14,29 @@ import type StoreReqType* = enum REMOTE_QUERY ## to perform a query to another Store node -type JsonStoreQueryRequest* = object +type StoreRequest* = object + operation: StoreReqType jsonQuery: cstring peerAddr: cstring timeoutMs: cint -type StoreRequest* = object - operation: StoreReqType - storeReq: pointer - func fromJsonNode( - T: type JsonStoreQueryRequest, jsonContent: JsonNode + T: type StoreRequest, jsonContent: JsonNode ): Result[StoreQueryRequest, string] = let contentTopics = collect(newSeq): for cTopic in jsonContent["content_topics"].getElems(): cTopic.getStr() let msgHashes = collect(newSeq): - for hashJsonObj in jsonContent["message_hashes"].getElems(): - var hash: WakuMessageHash - var count: int = 0 - for byteValue in hashJsonObj.getElems(): - hash[count] = byteValue.getInt().byte - count.inc() + if jsonContent.contains("message_hashes"): + for hashJsonObj in jsonContent["message_hashes"].getElems(): + var hash: WakuMessageHash + var count: int = 0 + for byteValue in hashJsonObj.getElems(): + hash[count] = byteValue.getInt().byte + count.inc() - hash + hash let pubsubTopic = if jsonContent.contains("pubsub_topic"): @@ -68,14 +66,17 @@ func fromJsonNode( else: none(uint64) + let startTime = ?jsonContent.getProtoInt64("time_start") + let endTime = ?jsonContent.getProtoInt64("time_end") + return ok( StoreQueryRequest( requestId: jsonContent["request_id"].getStr(), includeData: jsonContent["include_data"].getBool(), pubsubTopic: pubsubTopic, contentTopics: contentTopics, - startTime: ?jsonContent.getProtoInt64("time_start"), - endTime: ?jsonContent.getProtoInt64("time_end"), + startTime: startTime, + endTime: endTime, messageHashes: msgHashes, paginationCursor: paginationCursor, paginationForward: paginationForward, @@ -84,24 +85,26 @@ func fromJsonNode( ) proc createShared*( - T: type JsonStoreQueryRequest, + T: type StoreRequest, + op: StoreReqType, jsonQuery: cstring, peerAddr: cstring, timeoutMs: cint, ): ptr type T = var ret = createShared(T) + ret[].operation = op ret[].timeoutMs = timeoutMs ret[].jsonQuery = jsonQuery.alloc() ret[].peerAddr = peerAddr.alloc() return ret -proc destroyShared(self: ptr JsonStoreQueryRequest) = +proc destroyShared(self: ptr StoreRequest) = deallocShared(self[].jsonQuery) deallocShared(self[].peerAddr) deallocShared(self) -proc process( - self: ptr JsonStoreQueryRequest, waku: ptr Waku +proc process_remote_query( + self: ptr StoreRequest, waku: ptr Waku ): Future[Result[string, string]] {.async.} = defer: destroyShared(self) @@ -110,17 +113,15 @@ proc process( parseJson($self[].jsonQuery) if jsonContentRes.isErr(): - return err( - "JsonStoreQueryRequest failed parsing store request: " & jsonContentRes.error.msg - ) + return err("StoreRequest failed parsing store request: " & jsonContentRes.error.msg) - let storeQueryRequest = JsonStoreQueryRequest.fromJsonNode(jsonContentRes.get()) + let storeQueryRequest = ?StoreRequest.fromJsonNode(jsonContentRes.get()) let peer = peers.parsePeerInfo(($self[].peerAddr).split(",")).valueOr: - return err("JsonStoreQueryRequest failed to parse peer addr: " & $error) + return err("StoreRequest failed to parse peer addr: " & $error) - let queryResponse = (await waku.node.wakuStoreClient.query(?storeQueryRequest, peer)).valueOr: - return err("JsonStoreQueryRequest failed store query: " & $error) + let queryResponse = (await waku.node.wakuStoreClient.query(storeQueryRequest, peer)).valueOr: + return err("StoreRequest failed store query: " & $error) return ok($(%*queryResponse)) ## returning the response in json format @@ -132,7 +133,7 @@ proc process*( case self.operation of REMOTE_QUERY: - return await cast[ptr JsonStoreQueryRequest](self[].storeReq).process(waku) + return await self.process_remote_query(waku) error "store request not handled at all" return err("store request not handled at all") diff --git a/library/waku_thread/waku_thread.nim b/library/waku_thread/waku_thread.nim index 1c7b87536..4e8019b08 100644 --- a/library/waku_thread/waku_thread.nim +++ b/library/waku_thread/waku_thread.nim @@ -10,6 +10,9 @@ type WakuContext* = object thread: Thread[(ptr WakuContext)] reqChannel: ChannelSPSCSingle[ptr WakuThreadRequest] reqSignal: ThreadSignalPtr + # to inform The Waku Thread (a.k.a TWT) that a new request is sent + reqReceivedSignal: ThreadSignalPtr + # to inform the main thread that the request is rx by TWT userData*: pointer eventCallback*: pointer eventUserdata*: pointer @@ -37,6 +40,10 @@ proc runWaku(ctx: ptr WakuContext) {.async.} = error "waku thread could not receive a request" continue + let fireRes = ctx.reqReceivedSignal.fireSync() + if fireRes.isErr(): + error "could not fireSync back to requester thread", error = fireRes.error + ## Handle the request asyncSpawn WakuThreadRequest.process(request, addr waku) @@ -50,6 +57,8 @@ proc createWakuThread*(): Result[ptr WakuContext, string] = var ctx = createShared(WakuContext, 1) ctx.reqSignal = ThreadSignalPtr.new().valueOr: return err("couldn't create reqSignal ThreadSignalPtr") + ctx.reqReceivedSignal = ThreadSignalPtr.new().valueOr: + return err("couldn't create reqReceivedSignal ThreadSignalPtr") ctx.running.store(true) @@ -73,6 +82,7 @@ proc destroyWakuThread*(ctx: ptr WakuContext): Result[void, string] = joinThread(ctx.thread) ?ctx.reqSignal.close() + ?ctx.reqReceivedSignal.close() freeShared(ctx) return ok() @@ -100,4 +110,12 @@ proc sendRequestToWakuThread*( deallocShared(req) return err("Couldn't fireSync in time") + ## wait until the Waku Thread properly received the request + let res = ctx.reqReceivedSignal.waitSync() + if res.isErr(): + deallocShared(req) + return err("Couldn't receive reqReceivedSignal signal") + + ## Notice that in case of "ok", the deallocShared(req) is performed by the Waku Thread in the + ## process proc. ok()