diff --git a/library/libwaku.h b/library/libwaku.h index a189937dc..0024dcd96 100644 --- a/library/libwaku.h +++ b/library/libwaku.h @@ -75,6 +75,12 @@ int waku_relay_publish(void* ctx, WakuCallBack callback, void* userData); +int waku_lightpush_publish(void* ctx, + const char* pubSubTopic, + const char* jsonWakuMessage, + WakuCallBack callback, + void* userData); + int waku_relay_subscribe(void* ctx, const char* pubSubTopic, WakuCallBack callback, @@ -85,6 +91,23 @@ int waku_relay_unsubscribe(void* ctx, WakuCallBack callback, void* userData); +int waku_relay_get_num_connected_peers(void* ctx, + const char* pubSubTopic, + WakuCallBack callback, + void* userData); + +int waku_relay_get_num_peers_in_mesh(void* ctx, + const char* pubSubTopic, + WakuCallBack callback, + void* userData); + +int waku_store_query(void* ctx, + const char* jsonQuery, + const char* peerAddr, + int timeoutMs, + WakuCallBack callback, + void* userData); + int waku_connect(void* ctx, const char* peerMultiAddr, unsigned int timeoutMs, @@ -114,6 +137,14 @@ int waku_discv5_update_bootnodes(void* ctx, WakuCallBack callback, void* userData); +int waku_start_discv5(void* ctx, + WakuCallBack callback, + void* userData); + +int waku_stop_discv5(void* ctx, + WakuCallBack callback, + void* userData); + // Retrieves the ENR information int waku_get_my_enr(void* ctx, WakuCallBack callback, diff --git a/library/libwaku.nim b/library/libwaku.nim index 504736ce1..4fce6509f 100644 --- a/library/libwaku.nim +++ b/library/libwaku.nim @@ -19,6 +19,7 @@ import ./waku_thread/inter_thread_communication/requests/peer_manager_request, ./waku_thread/inter_thread_communication/requests/protocols/relay_request, ./waku_thread/inter_thread_communication/requests/protocols/store_request, + ./waku_thread/inter_thread_communication/requests/protocols/lightpush_request, ./waku_thread/inter_thread_communication/requests/debug_node_request, ./waku_thread/inter_thread_communication/requests/discovery_request, ./waku_thread/inter_thread_communication/waku_thread_request, @@ -383,6 +384,119 @@ proc waku_relay_unsubscribe( return RET_OK +proc waku_relay_get_num_connected_peers( + ctx: ptr Context, pubSubTopic: cstring, callback: WakuCallBack, userData: pointer +): cint {.dynlib, exportc.} = + ctx[].userData = userData + + let pst = pubSubTopic.alloc() + defer: + deallocShared(pst) + + let numConnPeersRes = waku_thread.sendRequestToWakuThread( + ctx, + RequestType.RELAY, + RelayRequest.createShared(RelayMsgType.LIST_CONNECTED_PEERS, PubsubTopic($pst)), + ) + + if numConnPeersRes.isErr(): + foreignThreadGc: + let msg = "Error in waku_relay_get_num_connected_peers: " & $numConnPeersRes.error + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + return RET_ERR + + let numConnPeers = numConnPeersRes.get() + foreignThreadGc: + callback( + RET_OK, unsafeAddr numConnPeers[0], cast[csize_t](len(numConnPeers)), userData + ) + + return RET_OK + +proc waku_relay_get_num_peers_in_mesh( + ctx: ptr Context, pubSubTopic: cstring, callback: WakuCallBack, userData: pointer +): cint {.dynlib, exportc.} = + ctx[].userData = userData + + let pst = pubSubTopic.alloc() + defer: + deallocShared(pst) + + let numPeersInMeshRes = waku_thread.sendRequestToWakuThread( + ctx, + RequestType.RELAY, + RelayRequest.createShared(RelayMsgType.LIST_MESH_PEERS, PubsubTopic($pst)), + ) + + if numPeersInMeshRes.isErr(): + foreignThreadGc: + let msg = "Error in waku_relay_get_num_peers_in_mesh: " & $numPeersInMeshRes.error + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + return RET_ERR + + let numPeersInMesh = numPeersInMeshRes.get() + foreignThreadGc: + callback( + RET_OK, unsafeAddr numPeersInMesh[0], cast[csize_t](len(numPeersInMesh)), userData + ) + + return RET_OK + +proc waku_lightpush_publish( + ctx: ptr Context, + pubSubTopic: cstring, + jsonWakuMessage: cstring, + callback: WakuCallBack, + userData: pointer, +): cint {.dynlib, exportc, cdecl.} = + ctx[].userData = userData + + if isNil(callback): + return RET_MISSING_CALLBACK + + let jwm = jsonWakuMessage.alloc() + let pst = pubSubTopic.alloc() + defer: + deallocShared(jwm) + deallocShared(pst) + + var jsonMessage: JsonMessage + try: + let jsonContent = parseJson($jwm) + jsonMessage = JsonMessage.fromJsonNode(jsonContent) + except JsonParsingError: + let msg = fmt"Error parsing json message: {getCurrentExceptionMsg()}" + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + return RET_ERR + + let wakuMessage = jsonMessage.toWakuMessage().valueOr: + let msg = fmt"Problem building the WakuMessage: {error}" + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + return RET_ERR + + let targetPubSubTopic = + if len(pst) == 0: + DefaultPubsubTopic + else: + $pst + + let sendReqRes = waku_thread.sendRequestToWakuThread( + ctx, + RequestType.LIGHTPUSH, + LightpushRequest.createShared( + LightpushMsgType.PUBLISH, PubsubTopic($pst), wakuMessage + ), + ) + + if sendReqRes.isErr(): + let msg = $sendReqRes.error + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + return RET_ERR + + let msg = $sendReqRes.value + callback(RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + return RET_OK + proc waku_connect( ctx: ptr Context, peerMultiAddr: cstring, @@ -408,21 +522,30 @@ proc waku_connect( proc waku_store_query( ctx: ptr Context, - queryJson: cstring, - peerId: cstring, + jsonQuery: cstring, + peerAddr: cstring, timeoutMs: cint, callback: WakuCallBack, userData: pointer, ): cint {.dynlib, exportc.} = ctx[].userData = userData - ## TODO: implement the logic that make the "self" node to act as a Store client + if isNil(callback): + return RET_MISSING_CALLBACK - # if sendReqRes.isErr(): - # let msg = $sendReqRes.error - # callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg))) - # return RET_ERR + let sendReqRes = waku_thread.sendRequestToWakuThread( + ctx, + RequestType.STORE, + JsonStoreQueryRequest.createShared(jsonQuery, peerAddr, timeoutMs, callback), + ) + if sendReqRes.isErr(): + let msg = $sendReqRes.error + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + return RET_ERR + + let msg = $sendReqRes.value + callback(RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) return RET_OK proc waku_listen_addresses( @@ -510,5 +633,39 @@ proc waku_get_my_enr( callback(RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) return RET_OK +proc waku_start_discv5( + ctx: ptr Context, callback: WakuCallBack, userData: pointer +): cint {.dynlib, exportc.} = + ctx[].userData = userData + + let resp = waku_thread.sendRequestToWakuThread( + ctx, RequestType.DISCOVERY, DiscoveryRequest.createDiscV5StartRequest() + ).valueOr: + let msg = $error + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + return RET_ERR + + let msg = $resp + callback(RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + + return RET_OK + +proc waku_stop_discv5( + ctx: ptr Context, callback: WakuCallBack, userData: pointer +): cint {.dynlib, exportc.} = + ctx[].userData = userData + + let resp = waku_thread.sendRequestToWakuThread( + ctx, RequestType.DISCOVERY, DiscoveryRequest.createDiscV5StopRequest() + ).valueOr: + let msg = $error + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + return RET_ERR + + let msg = $resp + callback(RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + + return RET_OK + ### End of exported procs ################################################################################ diff --git a/library/waku_thread/inter_thread_communication/requests/discovery_request.nim b/library/waku_thread/inter_thread_communication/requests/discovery_request.nim index d7c58835f..6741d5f47 100644 --- a/library/waku_thread/inter_thread_communication/requests/discovery_request.nim +++ b/library/waku_thread/inter_thread_communication/requests/discovery_request.nim @@ -10,6 +10,8 @@ import type DiscoveryMsgType* = enum GET_BOOTSTRAP_NODES UPDATE_DISCV5_BOOTSTRAP_NODES + START_DISCV5 + STOP_DISCV5 type DiscoveryRequest* = object operation: DiscoveryMsgType @@ -52,6 +54,12 @@ proc createUpdateBootstrapNodesRequest*( ): ptr type T = return T.createShared(op, "", "", 0, nodes) +proc createDiscV5StartRequest*(T: type DiscoveryRequest): ptr type T = + return T.createShared(START_DISCV5, "", "", 0, "") + +proc createDiscV5StopRequest*(T: type DiscoveryRequest): ptr type T = + return T.createShared(STOP_DISCV5, "", "", 0, "") + proc destroyShared(self: ptr DiscoveryRequest) = deallocShared(self[].enrTreeUrl) deallocShared(self[].nameDnsServer) @@ -86,6 +94,16 @@ proc process*( destroyShared(self) case self.operation + of START_DISCV5: + let res = await waku.wakuDiscv5.start() + res.isOkOr: + return err($error) + + return ok("discv5 started correctly") + of STOP_DISCV5: + await waku.wakuDiscv5.stop() + + return ok("discv5 stopped correctly") of GET_BOOTSTRAP_NODES: let nodes = retrieveBootstrapNodes($self[].enrTreeUrl, $self[].nameDnsServer).valueOr: return err($error) diff --git a/library/waku_thread/inter_thread_communication/requests/protocols/lightpush_request.nim b/library/waku_thread/inter_thread_communication/requests/protocols/lightpush_request.nim new file mode 100644 index 000000000..810e93836 --- /dev/null +++ b/library/waku_thread/inter_thread_communication/requests/protocols/lightpush_request.nim @@ -0,0 +1,103 @@ +import std/net, options +import chronicles, chronos, stew/byteutils, results +import + ../../../../../waku/waku_core/message/message, + ../../../../../waku/factory/waku, + ../../../../../waku/waku_core/message, + ../../../../../waku/waku_core/time, # Timestamp + ../../../../../waku/waku_core/topics/pubsub_topic, + ../../../../../waku/waku_lightpush/client, + ../../../../../waku/waku_lightpush/common, + ../../../../../waku/node/peer_manager/peer_manager, + ../../../../alloc + +type LightpushMsgType* = enum + PUBLISH + +type ThreadSafeWakuMessage* = object + payload: SharedSeq[byte] + contentTopic: cstring + meta: SharedSeq[byte] + version: uint32 + timestamp: Timestamp + ephemeral: bool + when defined(rln): + proof: SharedSeq[byte] + +type LightpushRequest* = object + operation: LightpushMsgType + pubsubTopic: cstring + message: ThreadSafeWakuMessage # only used in 'PUBLISH' requests + +proc createShared*( + T: type LightpushRequest, + op: LightpushMsgType, + pubsubTopic: PubsubTopic, + m = WakuMessage(), +): ptr type T = + var ret = createShared(T) + ret[].operation = op + ret[].pubsubTopic = pubsubTopic.alloc() + ret[].message = ThreadSafeWakuMessage( + payload: allocSharedSeq(m.payload), + contentTopic: m.contentTopic.alloc(), + meta: allocSharedSeq(m.meta), + version: m.version, + timestamp: m.timestamp, + ephemeral: m.ephemeral, + ) + when defined(rln): + ret[].message.proof = allocSharedSeq(m.proof) + + return ret + +proc destroyShared(self: ptr LightpushRequest) = + deallocSharedSeq(self[].message.payload) + deallocShared(self[].message.contentTopic) + deallocSharedSeq(self[].message.meta) + when defined(rln): + deallocSharedSeq(self[].message.proof) + + deallocShared(self) + +proc toWakuMessage(m: ThreadSafeWakuMessage): WakuMessage = + var wakuMessage = WakuMessage() + + wakuMessage.payload = m.payload.toSeq() + wakuMessage.contentTopic = $m.contentTopic + wakuMessage.meta = m.meta.toSeq() + wakuMessage.version = m.version + wakuMessage.timestamp = m.timestamp + wakuMessage.ephemeral = m.ephemeral + + when defined(rln): + wakuMessage.proof = m.proof + + return wakuMessage + +proc process*( + self: ptr LightpushRequest, waku: ptr Waku +): Future[Result[string, string]] {.async.} = + defer: + destroyShared(self) + + case self.operation + of PUBLISH: + let msg = self.message.toWakuMessage() + let pubsubTopic = $self.pubsubTopic + + if waku.node.wakuLightpushClient.isNil(): + return err("LightpushRequest waku.node.wakuLightpushClient is nil") + + let peerOpt = waku.node.peerManager.selectPeer(WakuLightPushCodec) + if peerOpt.isNone(): + return err("failed to lightpublish message, no suitable remote peers") + + ( + await waku.node.wakuLightpushClient.publish( + pubsubTopic, msg, peer = peerOpt.get() + ) + ).isOkOr: + return err("LightpushRequest error publishing: " & $error) + + return ok("") diff --git a/library/waku_thread/inter_thread_communication/requests/protocols/relay_request.nim b/library/waku_thread/inter_thread_communication/requests/protocols/relay_request.nim index b560d1d76..ca748889e 100644 --- a/library/waku_thread/inter_thread_communication/requests/protocols/relay_request.nim +++ b/library/waku_thread/inter_thread_communication/requests/protocols/relay_request.nim @@ -13,6 +13,10 @@ type RelayMsgType* = enum SUBSCRIBE UNSUBSCRIBE PUBLISH + LIST_CONNECTED_PEERS + ## to return the list of all connected peers to an specific pubsub topic + LIST_MESH_PEERS + ## to return the list of only the peers that conform the mesh for a particular pubsub topic type ThreadSafeWakuMessage* = object payload: SharedSeq[byte] @@ -104,5 +108,13 @@ proc process*( elif numPeers > 0: let msgHash = computeMessageHash(pubSubTopic, msg).to0xHex return ok(msgHash) + of LIST_CONNECTED_PEERS: + let numConnPeers = waku.node.wakuRelay.getNumConnectedPeers($self.pubsubTopic).valueOr: + return err($error) + return ok($numConnPeers) + of LIST_MESH_PEERS: + let numPeersInMesh = waku.node.wakuRelay.getNumPeersInMesh($self.pubsubTopic).valueOr: + return err($error) + return ok($numPeersInMesh) return ok("") diff --git a/library/waku_thread/inter_thread_communication/requests/protocols/store_request.nim b/library/waku_thread/inter_thread_communication/requests/protocols/store_request.nim index da611915f..9a84dacbb 100644 --- a/library/waku_thread/inter_thread_communication/requests/protocols/store_request.nim +++ b/library/waku_thread/inter_thread_communication/requests/protocols/store_request.nim @@ -1,12 +1,21 @@ +import std/[json, sugar, options] import chronos, results -import ../../../../../waku/factory/waku, ../../../../alloc, ../../../../callback +import + ../../../../../waku/factory/waku, + ../../../../alloc, + ../../../../callback, + ../../../../../waku/waku_core/peers, + ../../../../../waku/waku_core/time, + ../../../../../waku/waku_core/message/digest, + ../../../../../waku/waku_store/common, + ../../../../../waku/waku_store/client, + ../../../../../waku/common/paging type StoreReqType* = enum REMOTE_QUERY ## to perform a query to another Store node - LOCAL_QUERY ## to retrieve the data from 'self' node -type StoreQueryRequest* = object - queryJson: cstring +type JsonStoreQueryRequest* = object + jsonQuery: cstring peerAddr: cstring timeoutMs: cint storeCallback: WakuCallBack @@ -15,38 +24,119 @@ type StoreRequest* = object operation: StoreReqType storeReq: pointer -proc createShared*( - T: type StoreRequest, operation: StoreReqType, request: pointer -): ptr type T = - var ret = createShared(T) - ret[].request = request - return ret +func fromJsonNode( + T: type JsonStoreQueryRequest, jsonContent: JsonNode +): StoreQueryRequest = + let contentTopics = collect(newSeq): + for cTopic in jsonContent["content_topics"].getElems(): + cTopic.getStr() + + let msgHashes = collect(newSeq): + for hashJsonObj in jsonContent["message_hashes"].getElems(): + var hash: WakuMessageHash + var count: int = 0 + for byteValue in hashJsonObj.getElems(): + hash[count] = byteValue.getInt().byte + count.inc() + + hash + + let pubsubTopic = + if jsonContent.contains("pubsub_topic"): + some(jsonContent["pubsub_topic"].getStr()) + else: + none(string) + + let startTime = + if jsonContent.contains("time_start"): + some(Timestamp(jsonContent["time_start"].getInt())) + else: + none(Timestamp) + + let endTime = + if jsonContent.contains("time_end"): + some(Timestamp(jsonContent["time_end"].getInt())) + else: + none(Timestamp) + + let paginationCursor = + if jsonContent.contains("pagination_cursor"): + var hash: WakuMessageHash + var count: int = 0 + for byteValue in jsonContent["pagination_cursor"].getElems(): + hash[count] = byteValue.getInt().byte + count.inc() + + some(hash) + else: + none(WakuMessageHash) + + let paginationForwardBool = jsonContent["pagination_forward"].getBool() + let paginationForward = + if paginationForwardBool: PagingDirection.FORWARD else: PagingDirection.BACKWARD + + let paginationLimit = + if jsonContent.contains("pagination_limit"): + some(uint64(jsonContent["pagination_limit"].getInt())) + else: + none(uint64) + + return StoreQueryRequest( + requestId: jsonContent["request_id"].getStr(), + includeData: jsonContent["include_data"].getBool(), + pubsubTopic: pubsubTopic, + contentTopics: contentTopics, + startTime: startTime, + endTime: endTime, + messageHashes: msgHashes, + paginationCursor: paginationCursor, + paginationForward: paginationForward, + paginationLimit: paginationLimit, + ) proc createShared*( - T: type StoreQueryRequest, - queryJson: cstring, + T: type JsonStoreQueryRequest, + jsonQuery: cstring, peerAddr: cstring, timeoutMs: cint, storeCallback: WakuCallBack = nil, ): ptr type T = var ret = createShared(T) ret[].timeoutMs = timeoutMs - ret[].queryJson = queryJson.alloc() + ret[].jsonQuery = jsonQuery.alloc() ret[].peerAddr = peerAddr.alloc() ret[].storeCallback = storeCallback return ret -proc destroyShared(self: ptr StoreQueryRequest) = - deallocShared(self[].queryJson) +proc destroyShared(self: ptr JsonStoreQueryRequest) = + deallocShared(self[].jsonQuery) deallocShared(self[].peerAddr) deallocShared(self) proc process( - self: ptr StoreQueryRequest, waku: ptr Waku + self: ptr JsonStoreQueryRequest, waku: ptr Waku ): Future[Result[string, string]] {.async.} = defer: destroyShared(self) + let jsonContentRes = catch: + parseJson($self[].jsonQuery) + + if jsonContentRes.isErr(): + return err( + "JsonStoreQueryRequest failed parsing store request: " & jsonContentRes.error.msg + ) + + let storeQueryRequest = JsonStoreQueryRequest.fromJsonNode(jsonContentRes.get()) + + let peer = peers.parsePeerInfo($self[].peerAddr).valueOr: + return err("JsonStoreQueryRequest failed to parse peer addr: " & $error) + + let queryResponse = (await waku.node.wakuStoreClient.query(storeQueryRequest, peer)).valueOr: + return err("JsonStoreQueryRequest failed store query: " & $error) + + return ok($(%*queryResponse)) ## returning the response in json format + proc process*( self: ptr StoreRequest, waku: ptr Waku ): Future[Result[string, string]] {.async.} = @@ -55,9 +145,6 @@ proc process*( case self.operation of REMOTE_QUERY: - return await cast[ptr StoreQueryRequest](self[].storeReq).process(waku) - of LOCAL_QUERY: - discard - # cast[ptr StoreQueryRequest](request[].reqContent).process(node) + return await cast[ptr JsonStoreQueryRequest](self[].storeReq).process(waku) - return ok("") + return err("store request not handled at all") diff --git a/library/waku_thread/inter_thread_communication/waku_thread_request.nim b/library/waku_thread/inter_thread_communication/waku_thread_request.nim index 749eac1e9..4f1733047 100644 --- a/library/waku_thread/inter_thread_communication/waku_thread_request.nim +++ b/library/waku_thread/inter_thread_communication/waku_thread_request.nim @@ -10,6 +10,7 @@ import ./requests/peer_manager_request, ./requests/protocols/relay_request, ./requests/protocols/store_request, + ./requests/protocols/lightpush_request, ./requests/debug_node_request, ./requests/discovery_request @@ -20,6 +21,7 @@ type RequestType* {.pure.} = enum STORE DEBUG DISCOVERY + LIGHTPUSH type InterThreadRequest* = object reqType: RequestType @@ -56,6 +58,8 @@ proc process*( cast[ptr DebugNodeRequest](request[].reqContent).process(waku[]) of DISCOVERY: cast[ptr DiscoveryRequest](request[].reqContent).process(waku) + of LIGHTPUSH: + cast[ptr LightpushRequest](request[].reqContent).process(waku) return await retFut diff --git a/waku/waku_relay/protocol.nim b/waku/waku_relay/protocol.nim index 14532b1c0..e4d178281 100644 --- a/waku/waku_relay/protocol.nim +++ b/waku/waku_relay/protocol.nim @@ -453,3 +453,44 @@ proc publish*( obs.onMessagePublished(pubSubTopic, message) return relayedPeerCount + +proc getNumPeersInMesh*(w: WakuRelay, pubsubTopic: PubsubTopic): Result[int, string] = + ## Returns the number of peers in a mesh defined by the passed pubsub topic. + ## The 'mesh' atribute is defined in the GossipSub ref object. + + if not w.mesh.hasKey(pubsubTopic): + return err( + "getNumPeersInMesh - there is no mesh peer for the given pubsub topic: " & + pubsubTopic + ) + + let peersRes = catch: + w.mesh[pubsubTopic] + + let peers: HashSet[PubSubPeer] = peersRes.valueOr: + return + err("getNumPeersInMesh - exception accessing " & pubsubTopic & ": " & error.msg) + + return ok(peers.len) + +proc getNumConnectedPeers*( + w: WakuRelay, pubsubTopic: PubsubTopic +): Result[int, string] = + ## Returns the number of connected peers and subscribed to the passed pubsub topic. + ## The 'gossipsub' atribute is defined in the GossipSub ref object. + + if not w.gossipsub.hasKey(pubsubTopic): + return err( + "getNumConnectedPeers - there is no gossipsub peer for the given pubsub topic: " & + pubsubTopic + ) + + let peersRes = catch: + w.gossipsub[pubsubTopic] + + let peers: HashSet[PubSubPeer] = peersRes.valueOr: + return err( + "getNumConnectedPeers - exception accessing " & pubsubTopic & ": " & error.msg + ) + + return ok(peers.len)