chore: libwaku - extending the library with peer_manager and peer_exchange features (#3026)

* libwaku: get peerids by protocol and peer exchange request
This commit is contained in:
Ivan FB 2024-09-11 10:13:54 +02:00 committed by GitHub
parent 723b009b20
commit 004b56e422
6 changed files with 145 additions and 10 deletions

View File

@ -97,7 +97,7 @@ void event_handler(int callerRet, const char* msg, size_t len, void* userData) {
exit(1);
}
else if (callerRet == RET_OK) {
printf("Receiving message %s\n", msg);
printf("Receiving event: %s\n", msg);
}
}
@ -326,6 +326,10 @@ int main(int argc, char** argv) {
event_handler,
userData) );
WAKU_CALL( waku_get_peerids_from_peerstore(ctx,
event_handler,
userData) );
show_main_menu();
while(1) {
handle_user_input();

View File

@ -7,6 +7,7 @@
#define __libwaku__
#include <stddef.h>
#include <stdint.h>
// The possible returned values for the functions that return int
#define RET_OK 0
@ -114,6 +115,15 @@ int waku_connect(void* ctx,
WakuCallBack callback,
void* userData);
int waku_get_peerids_from_peerstore(void* ctx,
WakuCallBack callback,
void* userData);
int waku_get_peerids_by_protocol(void* ctx,
const char* protocol,
WakuCallBack callback,
void* userData);
int waku_listen_addresses(void* ctx,
WakuCallBack callback,
void* userData);
@ -150,6 +160,11 @@ int waku_get_my_enr(void* ctx,
WakuCallBack callback,
void* userData);
int waku_peer_exchange_request(void* ctx,
int numPeers,
WakuCallBack callback,
void* userData);
#ifdef __cplusplus
}
#endif

View File

@ -514,6 +514,46 @@ proc waku_connect(
return RET_OK
proc waku_get_peerids_from_peerstore(
ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer
): cint {.dynlib, exportc.} =
checkLibwakuParams(ctx, callback, userData)
let connRes = waku_thread.sendRequestToWakuThread(
ctx,
RequestType.PEER_MANAGER,
PeerManagementRequest.createShared(
PeerManagementMsgType.GET_ALL_PEER_IDS
),
)
if connRes.isErr():
let msg = $connRes.error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR
let msg = $connRes.value
callback(RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_OK
proc waku_get_peerids_by_protocol(
ctx: ptr WakuContext, protocol: cstring, callback: WakuCallBack, userData: pointer
): cint {.dynlib, exportc.} =
checkLibwakuParams(ctx, callback, userData)
let connRes = waku_thread.sendRequestToWakuThread(
ctx,
RequestType.PEER_MANAGER,
PeerManagementRequest.createGetPeerIdsByProtocolRequest($protocol),
)
if connRes.isErr():
let msg = $connRes.error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR
let msg = $connRes.value
callback(RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_OK
proc waku_store_query(
ctx: ptr WakuContext,
jsonQuery: cstring,
@ -658,5 +698,21 @@ proc waku_stop_discv5(
return RET_OK
proc waku_peer_exchange_request(
ctx: ptr WakuContext, numPeers: uint64, callback: WakuCallBack, userData: pointer
): cint {.dynlib, exportc.} =
checkLibwakuParams(ctx, callback, userData)
let discoveredPeers = waku_thread.sendRequestToWakuThread(
ctx, RequestType.DISCOVERY, DiscoveryRequest.createPeerExchangeRequest(numPeers)
).valueOr:
let msg = $error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR
let msg = $discoveredPeers
callback(RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_OK
### End of exported procs
################################################################################

View File

@ -4,7 +4,9 @@ import
../../../../waku/factory/waku,
../../../../waku/discovery/waku_dnsdisc,
../../../../waku/discovery/waku_discv5,
../../../../waku/waku_peer_exchange,
../../../../waku/waku_core/peers,
../../../../waku/node/waku_node,
../../../alloc
type DiscoveryMsgType* = enum
@ -12,6 +14,7 @@ type DiscoveryMsgType* = enum
UPDATE_DISCV5_BOOTSTRAP_NODES
START_DISCV5
STOP_DISCV5
PEER_EXCHANGE
type DiscoveryRequest* = object
operation: DiscoveryMsgType
@ -24,6 +27,9 @@ type DiscoveryRequest* = object
## used in UPDATE_DISCV5_BOOTSTRAP_NODES
nodes: cstring
## used in PEER_EXCHANGE
numPeers: uint64
proc createShared(
T: type DiscoveryRequest,
op: DiscoveryMsgType,
@ -31,6 +37,7 @@ proc createShared(
nameDnsServer: cstring,
timeoutMs: cint,
nodes: cstring,
numPeers: uint64,
): ptr type T =
var ret = createShared(T)
ret[].operation = op
@ -38,6 +45,7 @@ proc createShared(
ret[].nameDnsServer = nameDnsServer.alloc()
ret[].timeoutMs = timeoutMs
ret[].nodes = nodes.alloc()
ret[].numPeers = numPeers
return ret
proc createRetrieveBootstrapNodesRequest*(
@ -47,22 +55,28 @@ proc createRetrieveBootstrapNodesRequest*(
nameDnsServer: cstring,
timeoutMs: cint,
): ptr type T =
return T.createShared(op, enrTreeUrl, nameDnsServer, timeoutMs, "")
return T.createShared(op, enrTreeUrl, nameDnsServer, timeoutMs, "", 0)
proc createUpdateBootstrapNodesRequest*(
T: type DiscoveryRequest, op: DiscoveryMsgType, nodes: cstring
): ptr type T =
return T.createShared(op, "", "", 0, nodes)
return T.createShared(op, "", "", 0, nodes, 0)
proc createDiscV5StartRequest*(T: type DiscoveryRequest): ptr type T =
return T.createShared(START_DISCV5, "", "", 0, "")
return T.createShared(START_DISCV5, "", "", 0, "", 0)
proc createDiscV5StopRequest*(T: type DiscoveryRequest): ptr type T =
return T.createShared(STOP_DISCV5, "", "", 0, "")
return T.createShared(STOP_DISCV5, "", "", 0, "", 0)
proc createPeerExchangeRequest*(
T: type DiscoveryRequest, numPeers: uint64
): ptr type T =
return T.createShared(PEER_EXCHANGE, "", "", 0, "", numPeers)
proc destroyShared(self: ptr DiscoveryRequest) =
deallocShared(self[].enrTreeUrl)
deallocShared(self[].nameDnsServer)
deallocShared(self[].nodes)
deallocShared(self)
proc retrieveBootstrapNodes(
@ -87,6 +101,11 @@ proc updateDiscv5BootstrapNodes(nodes: string, waku: ptr Waku): Result[void, str
return err("error in updateDiscv5BootstrapNodes: " & $error)
return ok()
proc performPeerExchangeRequestTo(
numPeers: uint64, waku: ptr Waku
): Future[Result[int, string]] {.async.} =
return await waku.node.fetchPeerExchangePeers(numPeers)
proc process*(
self: ptr DiscoveryRequest, waku: ptr Waku
): Future[Result[string, string]] {.async.} =
@ -112,6 +131,11 @@ proc process*(
of UPDATE_DISCV5_BOOTSTRAP_NODES:
updateDiscv5BootstrapNodes($self[].nodes, waku).isOkOr:
return err($error)
return ok("discovery request processed correctly")
of PEER_EXCHANGE:
let numValidPeers = (await performPeerExchangeRequestTo(self[].numPeers, waku)).valueOr:
return err("error calling performPeerExchangeRequestTo: " & $error)
return ok($numValidPeers)
return err("discovery request not handled")

View File

@ -1,20 +1,27 @@
import std/[sequtils, strutils]
import chronicles, chronos, results
import ../../../../waku/factory/waku, ../../../../waku/node/waku_node, ../../../alloc
import
../../../../waku/factory/waku,
../../../../waku/node/waku_node,
../../../alloc,
../../../../waku/node/peer_manager
type PeerManagementMsgType* = enum
type PeerManagementMsgType* {.pure.} = enum
CONNECT_TO
GET_ALL_PEER_IDS
GET_PEER_IDS_BY_PROTOCOL
type PeerManagementRequest* = object
operation: PeerManagementMsgType
peerMultiAddr: cstring
dialTimeout: Duration
protocol: cstring
proc createShared*(
T: type PeerManagementRequest,
op: PeerManagementMsgType,
peerMultiAddr: string,
dialTimeout: Duration,
peerMultiAddr = "",
dialTimeout = chronos.milliseconds(0), ## arbitrary Duration as not all ops needs dialTimeout
): ptr type T =
var ret = createShared(T)
ret[].operation = op
@ -22,8 +29,21 @@ proc createShared*(
ret[].dialTimeout = dialTimeout
return ret
proc createGetPeerIdsByProtocolRequest*(
T: type PeerManagementRequest, protocol = ""
): ptr type T =
var ret = createShared(T)
ret[].operation = PeerManagementMsgType.GET_PEER_IDS_BY_PROTOCOL
ret[].protocol = protocol.alloc()
return ret
proc destroyShared(self: ptr PeerManagementRequest) =
deallocShared(self[].peerMultiAddr)
if not isNil(self[].peerMultiAddr):
deallocShared(self[].peerMultiAddr)
if not isNil(self[].protocol):
deallocShared(self[].protocol)
deallocShared(self)
proc connectTo(
@ -53,5 +73,14 @@ proc process*(
let ret = waku.node.connectTo($self[].peerMultiAddr, self[].dialTimeout)
if ret.isErr():
return err(ret.error)
of GET_ALL_PEER_IDS:
## returns a comma-separated string of peerIDs
let peerIDs = waku.node.peerManager.peerStore.peers().mapIt($it.peerId).join(",")
return ok(peerIDs)
of GET_PEER_IDS_BY_PROTOCOL:
## returns a comma-separated string of peerIDs that mount the given protocol
let (inPeers, outPeers) = waku.node.peerManager.connectedPeers($self[].protocol)
let allPeerIDs = inPeers & outPeers
return ok(allPeerIDs.mapIt(it.hex()).join(","))
return ok("")

View File

@ -479,6 +479,13 @@ proc getNumConnectedPeers*(
## Returns the number of connected peers and subscribed to the passed pubsub topic.
## The 'gossipsub' atribute is defined in the GossipSub ref object.
if pubsubTopic == "":
## Return all the connected peers
var numConnPeers = 0
for k, v in w.gossipsub:
numConnPeers.inc(v.len)
return ok(numConnPeers)
if not w.gossipsub.hasKey(pubsubTopic):
return err(
"getNumConnectedPeers - there is no gossipsub peer for the given pubsub topic: " &