chore: enhance libwaku store protocol and more (#3223)

* json_message_event: avoid converting a WakuMessageHash into 0x...
* waku_thread: wait until the waku thread completely received the request
* waku_thread: add missing deallocShared
* libwaku avoid nonsense onReceivedMessage cb in waku_relay_publish
This commit is contained in:
Ivan FB 2025-01-03 12:26:46 +01:00 committed by GitHub
parent 04df6849d4
commit 22ce9ee872
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 48 additions and 37 deletions

View File

@ -57,9 +57,6 @@ proc toWakuMessage*(self: JsonMessage): Result[WakuMessage, string] =
proc `%`*(value: Base64String): JsonNode =
%(value.string)
proc `%`*(value: WakuMessageHash): JsonNode =
%(to0xHex(value))
type JsonMessageEvent* = ref object of JsonEvent
pubsubTopic*: string
messageHash*: WakuMessageHash

View File

@ -328,12 +328,7 @@ proc waku_relay_publish(
handleRequest(
ctx,
RequestType.RELAY,
RelayRequest.createShared(
RelayMsgType.PUBLISH,
PubsubTopic($pst),
WakuRelayHandler(onReceivedMessage(ctx)),
wakuMessage,
),
RelayRequest.createShared(RelayMsgType.PUBLISH, PubsubTopic($pst), nil, wakuMessage),
callback,
userData,
)
@ -695,7 +690,7 @@ proc waku_store_query(
handleRequest(
ctx,
RequestType.STORE,
JsonStoreQueryRequest.createShared(jsonQuery, peerAddr, timeoutMs),
StoreRequest.createShared(StoreReqType.REMOTE_QUERY, jsonQuery, peerAddr, timeoutMs),
callback,
userData,
)

View File

@ -14,31 +14,29 @@ import
type StoreReqType* = enum
REMOTE_QUERY ## to perform a query to another Store node
type JsonStoreQueryRequest* = object
type StoreRequest* = object
operation: StoreReqType
jsonQuery: cstring
peerAddr: cstring
timeoutMs: cint
type StoreRequest* = object
operation: StoreReqType
storeReq: pointer
func fromJsonNode(
T: type JsonStoreQueryRequest, jsonContent: JsonNode
T: type StoreRequest, jsonContent: JsonNode
): Result[StoreQueryRequest, string] =
let contentTopics = collect(newSeq):
for cTopic in jsonContent["content_topics"].getElems():
cTopic.getStr()
let msgHashes = collect(newSeq):
for hashJsonObj in jsonContent["message_hashes"].getElems():
var hash: WakuMessageHash
var count: int = 0
for byteValue in hashJsonObj.getElems():
hash[count] = byteValue.getInt().byte
count.inc()
if jsonContent.contains("message_hashes"):
for hashJsonObj in jsonContent["message_hashes"].getElems():
var hash: WakuMessageHash
var count: int = 0
for byteValue in hashJsonObj.getElems():
hash[count] = byteValue.getInt().byte
count.inc()
hash
hash
let pubsubTopic =
if jsonContent.contains("pubsub_topic"):
@ -68,14 +66,17 @@ func fromJsonNode(
else:
none(uint64)
let startTime = ?jsonContent.getProtoInt64("time_start")
let endTime = ?jsonContent.getProtoInt64("time_end")
return ok(
StoreQueryRequest(
requestId: jsonContent["request_id"].getStr(),
includeData: jsonContent["include_data"].getBool(),
pubsubTopic: pubsubTopic,
contentTopics: contentTopics,
startTime: ?jsonContent.getProtoInt64("time_start"),
endTime: ?jsonContent.getProtoInt64("time_end"),
startTime: startTime,
endTime: endTime,
messageHashes: msgHashes,
paginationCursor: paginationCursor,
paginationForward: paginationForward,
@ -84,24 +85,26 @@ func fromJsonNode(
)
proc createShared*(
T: type JsonStoreQueryRequest,
T: type StoreRequest,
op: StoreReqType,
jsonQuery: cstring,
peerAddr: cstring,
timeoutMs: cint,
): ptr type T =
var ret = createShared(T)
ret[].operation = op
ret[].timeoutMs = timeoutMs
ret[].jsonQuery = jsonQuery.alloc()
ret[].peerAddr = peerAddr.alloc()
return ret
proc destroyShared(self: ptr JsonStoreQueryRequest) =
proc destroyShared(self: ptr StoreRequest) =
deallocShared(self[].jsonQuery)
deallocShared(self[].peerAddr)
deallocShared(self)
proc process(
self: ptr JsonStoreQueryRequest, waku: ptr Waku
proc process_remote_query(
self: ptr StoreRequest, waku: ptr Waku
): Future[Result[string, string]] {.async.} =
defer:
destroyShared(self)
@ -110,17 +113,15 @@ proc process(
parseJson($self[].jsonQuery)
if jsonContentRes.isErr():
return err(
"JsonStoreQueryRequest failed parsing store request: " & jsonContentRes.error.msg
)
return err("StoreRequest failed parsing store request: " & jsonContentRes.error.msg)
let storeQueryRequest = JsonStoreQueryRequest.fromJsonNode(jsonContentRes.get())
let storeQueryRequest = ?StoreRequest.fromJsonNode(jsonContentRes.get())
let peer = peers.parsePeerInfo(($self[].peerAddr).split(",")).valueOr:
return err("JsonStoreQueryRequest failed to parse peer addr: " & $error)
return err("StoreRequest failed to parse peer addr: " & $error)
let queryResponse = (await waku.node.wakuStoreClient.query(?storeQueryRequest, peer)).valueOr:
return err("JsonStoreQueryRequest failed store query: " & $error)
let queryResponse = (await waku.node.wakuStoreClient.query(storeQueryRequest, peer)).valueOr:
return err("StoreRequest failed store query: " & $error)
return ok($(%*queryResponse)) ## returning the response in json format
@ -132,7 +133,7 @@ proc process*(
case self.operation
of REMOTE_QUERY:
return await cast[ptr JsonStoreQueryRequest](self[].storeReq).process(waku)
return await self.process_remote_query(waku)
error "store request not handled at all"
return err("store request not handled at all")

View File

@ -10,6 +10,9 @@ type WakuContext* = object
thread: Thread[(ptr WakuContext)]
reqChannel: ChannelSPSCSingle[ptr WakuThreadRequest]
reqSignal: ThreadSignalPtr
# to inform The Waku Thread (a.k.a TWT) that a new request is sent
reqReceivedSignal: ThreadSignalPtr
# to inform the main thread that the request is rx by TWT
userData*: pointer
eventCallback*: pointer
eventUserdata*: pointer
@ -37,6 +40,10 @@ proc runWaku(ctx: ptr WakuContext) {.async.} =
error "waku thread could not receive a request"
continue
let fireRes = ctx.reqReceivedSignal.fireSync()
if fireRes.isErr():
error "could not fireSync back to requester thread", error = fireRes.error
## Handle the request
asyncSpawn WakuThreadRequest.process(request, addr waku)
@ -50,6 +57,8 @@ proc createWakuThread*(): Result[ptr WakuContext, string] =
var ctx = createShared(WakuContext, 1)
ctx.reqSignal = ThreadSignalPtr.new().valueOr:
return err("couldn't create reqSignal ThreadSignalPtr")
ctx.reqReceivedSignal = ThreadSignalPtr.new().valueOr:
return err("couldn't create reqReceivedSignal ThreadSignalPtr")
ctx.running.store(true)
@ -73,6 +82,7 @@ proc destroyWakuThread*(ctx: ptr WakuContext): Result[void, string] =
joinThread(ctx.thread)
?ctx.reqSignal.close()
?ctx.reqReceivedSignal.close()
freeShared(ctx)
return ok()
@ -100,4 +110,12 @@ proc sendRequestToWakuThread*(
deallocShared(req)
return err("Couldn't fireSync in time")
## wait until the Waku Thread properly received the request
let res = ctx.reqReceivedSignal.waitSync()
if res.isErr():
deallocShared(req)
return err("Couldn't receive reqReceivedSignal signal")
## Notice that in case of "ok", the deallocShared(req) is performed by the Waku Thread in the
## process proc.
ok()