From 004b56e422a5c1cbccf925beefefc7accde52231 Mon Sep 17 00:00:00 2001 From: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> Date: Wed, 11 Sep 2024 10:13:54 +0200 Subject: [PATCH] chore: libwaku - extending the library with peer_manager and peer_exchange features (#3026) * libwaku: get peerids by protocol and peer exchange request --- examples/cbindings/waku_example.c | 6 +- library/libwaku.h | 15 +++++ library/libwaku.nim | 56 +++++++++++++++++++ .../requests/discovery_request.nim | 32 +++++++++-- .../requests/peer_manager_request.nim | 39 +++++++++++-- waku/waku_relay/protocol.nim | 7 +++ 6 files changed, 145 insertions(+), 10 deletions(-) diff --git a/examples/cbindings/waku_example.c b/examples/cbindings/waku_example.c index d1987973f..39ee89f27 100644 --- a/examples/cbindings/waku_example.c +++ b/examples/cbindings/waku_example.c @@ -97,7 +97,7 @@ void event_handler(int callerRet, const char* msg, size_t len, void* userData) { exit(1); } else if (callerRet == RET_OK) { - printf("Receiving message %s\n", msg); + printf("Receiving event: %s\n", msg); } } @@ -326,6 +326,10 @@ int main(int argc, char** argv) { event_handler, userData) ); + WAKU_CALL( waku_get_peerids_from_peerstore(ctx, + event_handler, + userData) ); + show_main_menu(); while(1) { handle_user_input(); diff --git a/library/libwaku.h b/library/libwaku.h index 0024dcd96..edd69757d 100644 --- a/library/libwaku.h +++ b/library/libwaku.h @@ -7,6 +7,7 @@ #define __libwaku__ #include +#include // The possible returned values for the functions that return int #define RET_OK 0 @@ -114,6 +115,15 @@ int waku_connect(void* ctx, WakuCallBack callback, void* userData); +int waku_get_peerids_from_peerstore(void* ctx, + WakuCallBack callback, + void* userData); + +int waku_get_peerids_by_protocol(void* ctx, + const char* protocol, + WakuCallBack callback, + void* userData); + int waku_listen_addresses(void* ctx, WakuCallBack callback, void* userData); @@ -150,6 +160,11 @@ int waku_get_my_enr(void* ctx, WakuCallBack callback, void* userData); +int waku_peer_exchange_request(void* ctx, + int numPeers, + WakuCallBack callback, + void* userData); + #ifdef __cplusplus } #endif diff --git a/library/libwaku.nim b/library/libwaku.nim index 1b305dbc4..1400e37bb 100644 --- a/library/libwaku.nim +++ b/library/libwaku.nim @@ -514,6 +514,46 @@ proc waku_connect( return RET_OK +proc waku_get_peerids_from_peerstore( + ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer +): cint {.dynlib, exportc.} = + checkLibwakuParams(ctx, callback, userData) + + let connRes = waku_thread.sendRequestToWakuThread( + ctx, + RequestType.PEER_MANAGER, + PeerManagementRequest.createShared( + PeerManagementMsgType.GET_ALL_PEER_IDS + ), + ) + if connRes.isErr(): + let msg = $connRes.error + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + return RET_ERR + + let msg = $connRes.value + callback(RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + return RET_OK + +proc waku_get_peerids_by_protocol( + ctx: ptr WakuContext, protocol: cstring, callback: WakuCallBack, userData: pointer +): cint {.dynlib, exportc.} = + checkLibwakuParams(ctx, callback, userData) + + let connRes = waku_thread.sendRequestToWakuThread( + ctx, + RequestType.PEER_MANAGER, + PeerManagementRequest.createGetPeerIdsByProtocolRequest($protocol), + ) + if connRes.isErr(): + let msg = $connRes.error + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + return RET_ERR + + let msg = $connRes.value + callback(RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + return RET_OK + proc waku_store_query( ctx: ptr WakuContext, jsonQuery: cstring, @@ -658,5 +698,21 @@ proc waku_stop_discv5( return RET_OK +proc waku_peer_exchange_request( + ctx: ptr WakuContext, numPeers: uint64, callback: WakuCallBack, userData: pointer +): cint {.dynlib, exportc.} = + checkLibwakuParams(ctx, callback, userData) + + let discoveredPeers = waku_thread.sendRequestToWakuThread( + ctx, RequestType.DISCOVERY, DiscoveryRequest.createPeerExchangeRequest(numPeers) + ).valueOr: + let msg = $error + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + return RET_ERR + + let msg = $discoveredPeers + callback(RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + return RET_OK + ### End of exported procs ################################################################################ diff --git a/library/waku_thread/inter_thread_communication/requests/discovery_request.nim b/library/waku_thread/inter_thread_communication/requests/discovery_request.nim index 6741d5f47..3cbf1de7c 100644 --- a/library/waku_thread/inter_thread_communication/requests/discovery_request.nim +++ b/library/waku_thread/inter_thread_communication/requests/discovery_request.nim @@ -4,7 +4,9 @@ import ../../../../waku/factory/waku, ../../../../waku/discovery/waku_dnsdisc, ../../../../waku/discovery/waku_discv5, + ../../../../waku/waku_peer_exchange, ../../../../waku/waku_core/peers, + ../../../../waku/node/waku_node, ../../../alloc type DiscoveryMsgType* = enum @@ -12,6 +14,7 @@ type DiscoveryMsgType* = enum UPDATE_DISCV5_BOOTSTRAP_NODES START_DISCV5 STOP_DISCV5 + PEER_EXCHANGE type DiscoveryRequest* = object operation: DiscoveryMsgType @@ -24,6 +27,9 @@ type DiscoveryRequest* = object ## used in UPDATE_DISCV5_BOOTSTRAP_NODES nodes: cstring + ## used in PEER_EXCHANGE + numPeers: uint64 + proc createShared( T: type DiscoveryRequest, op: DiscoveryMsgType, @@ -31,6 +37,7 @@ proc createShared( nameDnsServer: cstring, timeoutMs: cint, nodes: cstring, + numPeers: uint64, ): ptr type T = var ret = createShared(T) ret[].operation = op @@ -38,6 +45,7 @@ proc createShared( ret[].nameDnsServer = nameDnsServer.alloc() ret[].timeoutMs = timeoutMs ret[].nodes = nodes.alloc() + ret[].numPeers = numPeers return ret proc createRetrieveBootstrapNodesRequest*( @@ -47,22 +55,28 @@ proc createRetrieveBootstrapNodesRequest*( nameDnsServer: cstring, timeoutMs: cint, ): ptr type T = - return T.createShared(op, enrTreeUrl, nameDnsServer, timeoutMs, "") + return T.createShared(op, enrTreeUrl, nameDnsServer, timeoutMs, "", 0) proc createUpdateBootstrapNodesRequest*( T: type DiscoveryRequest, op: DiscoveryMsgType, nodes: cstring ): ptr type T = - return T.createShared(op, "", "", 0, nodes) + return T.createShared(op, "", "", 0, nodes, 0) proc createDiscV5StartRequest*(T: type DiscoveryRequest): ptr type T = - return T.createShared(START_DISCV5, "", "", 0, "") + return T.createShared(START_DISCV5, "", "", 0, "", 0) proc createDiscV5StopRequest*(T: type DiscoveryRequest): ptr type T = - return T.createShared(STOP_DISCV5, "", "", 0, "") + return T.createShared(STOP_DISCV5, "", "", 0, "", 0) + +proc createPeerExchangeRequest*( + T: type DiscoveryRequest, numPeers: uint64 +): ptr type T = + return T.createShared(PEER_EXCHANGE, "", "", 0, "", numPeers) proc destroyShared(self: ptr DiscoveryRequest) = deallocShared(self[].enrTreeUrl) deallocShared(self[].nameDnsServer) + deallocShared(self[].nodes) deallocShared(self) proc retrieveBootstrapNodes( @@ -87,6 +101,11 @@ proc updateDiscv5BootstrapNodes(nodes: string, waku: ptr Waku): Result[void, str return err("error in updateDiscv5BootstrapNodes: " & $error) return ok() +proc performPeerExchangeRequestTo( + numPeers: uint64, waku: ptr Waku +): Future[Result[int, string]] {.async.} = + return await waku.node.fetchPeerExchangePeers(numPeers) + proc process*( self: ptr DiscoveryRequest, waku: ptr Waku ): Future[Result[string, string]] {.async.} = @@ -112,6 +131,11 @@ proc process*( of UPDATE_DISCV5_BOOTSTRAP_NODES: updateDiscv5BootstrapNodes($self[].nodes, waku).isOkOr: return err($error) + return ok("discovery request processed correctly") + of PEER_EXCHANGE: + let numValidPeers = (await performPeerExchangeRequestTo(self[].numPeers, waku)).valueOr: + return err("error calling performPeerExchangeRequestTo: " & $error) + return ok($numValidPeers) return err("discovery request not handled") diff --git a/library/waku_thread/inter_thread_communication/requests/peer_manager_request.nim b/library/waku_thread/inter_thread_communication/requests/peer_manager_request.nim index 0c4b60b97..439859c6f 100644 --- a/library/waku_thread/inter_thread_communication/requests/peer_manager_request.nim +++ b/library/waku_thread/inter_thread_communication/requests/peer_manager_request.nim @@ -1,20 +1,27 @@ import std/[sequtils, strutils] import chronicles, chronos, results -import ../../../../waku/factory/waku, ../../../../waku/node/waku_node, ../../../alloc +import + ../../../../waku/factory/waku, + ../../../../waku/node/waku_node, + ../../../alloc, + ../../../../waku/node/peer_manager -type PeerManagementMsgType* = enum +type PeerManagementMsgType* {.pure.} = enum CONNECT_TO + GET_ALL_PEER_IDS + GET_PEER_IDS_BY_PROTOCOL type PeerManagementRequest* = object operation: PeerManagementMsgType peerMultiAddr: cstring dialTimeout: Duration + protocol: cstring proc createShared*( T: type PeerManagementRequest, op: PeerManagementMsgType, - peerMultiAddr: string, - dialTimeout: Duration, + peerMultiAddr = "", + dialTimeout = chronos.milliseconds(0), ## arbitrary Duration as not all ops needs dialTimeout ): ptr type T = var ret = createShared(T) ret[].operation = op @@ -22,8 +29,21 @@ proc createShared*( ret[].dialTimeout = dialTimeout return ret +proc createGetPeerIdsByProtocolRequest*( + T: type PeerManagementRequest, protocol = "" +): ptr type T = + var ret = createShared(T) + ret[].operation = PeerManagementMsgType.GET_PEER_IDS_BY_PROTOCOL + ret[].protocol = protocol.alloc() + return ret + proc destroyShared(self: ptr PeerManagementRequest) = - deallocShared(self[].peerMultiAddr) + if not isNil(self[].peerMultiAddr): + deallocShared(self[].peerMultiAddr) + + if not isNil(self[].protocol): + deallocShared(self[].protocol) + deallocShared(self) proc connectTo( @@ -53,5 +73,14 @@ proc process*( let ret = waku.node.connectTo($self[].peerMultiAddr, self[].dialTimeout) if ret.isErr(): return err(ret.error) + of GET_ALL_PEER_IDS: + ## returns a comma-separated string of peerIDs + let peerIDs = waku.node.peerManager.peerStore.peers().mapIt($it.peerId).join(",") + return ok(peerIDs) + of GET_PEER_IDS_BY_PROTOCOL: + ## returns a comma-separated string of peerIDs that mount the given protocol + let (inPeers, outPeers) = waku.node.peerManager.connectedPeers($self[].protocol) + let allPeerIDs = inPeers & outPeers + return ok(allPeerIDs.mapIt(it.hex()).join(",")) return ok("") diff --git a/waku/waku_relay/protocol.nim b/waku/waku_relay/protocol.nim index e4d178281..fc8aa4172 100644 --- a/waku/waku_relay/protocol.nim +++ b/waku/waku_relay/protocol.nim @@ -479,6 +479,13 @@ proc getNumConnectedPeers*( ## Returns the number of connected peers and subscribed to the passed pubsub topic. ## The 'gossipsub' atribute is defined in the GossipSub ref object. + if pubsubTopic == "": + ## Return all the connected peers + var numConnPeers = 0 + for k, v in w.gossipsub: + numConnPeers.inc(v.len) + return ok(numConnPeers) + if not w.gossipsub.hasKey(pubsubTopic): return err( "getNumConnectedPeers - there is no gossipsub peer for the given pubsub topic: " &