mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-03 22:43:09 +00:00
chore: libwaku - extending the library with peer_manager and peer_exchange features (#3026)
* libwaku: get peerids by protocol and peer exchange request
This commit is contained in:
parent
ce9a8c468a
commit
5ea1cf0cf3
@ -97,7 +97,7 @@ void event_handler(int callerRet, const char* msg, size_t len, void* userData) {
|
|||||||
exit(1);
|
exit(1);
|
||||||
}
|
}
|
||||||
else if (callerRet == RET_OK) {
|
else if (callerRet == RET_OK) {
|
||||||
printf("Receiving message %s\n", msg);
|
printf("Receiving event: %s\n", msg);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -326,6 +326,10 @@ int main(int argc, char** argv) {
|
|||||||
event_handler,
|
event_handler,
|
||||||
userData) );
|
userData) );
|
||||||
|
|
||||||
|
WAKU_CALL( waku_get_peerids_from_peerstore(ctx,
|
||||||
|
event_handler,
|
||||||
|
userData) );
|
||||||
|
|
||||||
show_main_menu();
|
show_main_menu();
|
||||||
while(1) {
|
while(1) {
|
||||||
handle_user_input();
|
handle_user_input();
|
||||||
|
|||||||
@ -7,6 +7,7 @@
|
|||||||
#define __libwaku__
|
#define __libwaku__
|
||||||
|
|
||||||
#include <stddef.h>
|
#include <stddef.h>
|
||||||
|
#include <stdint.h>
|
||||||
|
|
||||||
// The possible returned values for the functions that return int
|
// The possible returned values for the functions that return int
|
||||||
#define RET_OK 0
|
#define RET_OK 0
|
||||||
@ -114,6 +115,15 @@ int waku_connect(void* ctx,
|
|||||||
WakuCallBack callback,
|
WakuCallBack callback,
|
||||||
void* userData);
|
void* userData);
|
||||||
|
|
||||||
|
int waku_get_peerids_from_peerstore(void* ctx,
|
||||||
|
WakuCallBack callback,
|
||||||
|
void* userData);
|
||||||
|
|
||||||
|
int waku_get_peerids_by_protocol(void* ctx,
|
||||||
|
const char* protocol,
|
||||||
|
WakuCallBack callback,
|
||||||
|
void* userData);
|
||||||
|
|
||||||
int waku_listen_addresses(void* ctx,
|
int waku_listen_addresses(void* ctx,
|
||||||
WakuCallBack callback,
|
WakuCallBack callback,
|
||||||
void* userData);
|
void* userData);
|
||||||
@ -150,6 +160,11 @@ int waku_get_my_enr(void* ctx,
|
|||||||
WakuCallBack callback,
|
WakuCallBack callback,
|
||||||
void* userData);
|
void* userData);
|
||||||
|
|
||||||
|
int waku_peer_exchange_request(void* ctx,
|
||||||
|
int numPeers,
|
||||||
|
WakuCallBack callback,
|
||||||
|
void* userData);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|||||||
@ -514,6 +514,46 @@ proc waku_connect(
|
|||||||
|
|
||||||
return RET_OK
|
return RET_OK
|
||||||
|
|
||||||
|
proc waku_get_peerids_from_peerstore(
|
||||||
|
ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer
|
||||||
|
): cint {.dynlib, exportc.} =
|
||||||
|
checkLibwakuParams(ctx, callback, userData)
|
||||||
|
|
||||||
|
let connRes = waku_thread.sendRequestToWakuThread(
|
||||||
|
ctx,
|
||||||
|
RequestType.PEER_MANAGER,
|
||||||
|
PeerManagementRequest.createShared(
|
||||||
|
PeerManagementMsgType.GET_ALL_PEER_IDS
|
||||||
|
),
|
||||||
|
)
|
||||||
|
if connRes.isErr():
|
||||||
|
let msg = $connRes.error
|
||||||
|
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
|
||||||
|
return RET_ERR
|
||||||
|
|
||||||
|
let msg = $connRes.value
|
||||||
|
callback(RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
|
||||||
|
return RET_OK
|
||||||
|
|
||||||
|
proc waku_get_peerids_by_protocol(
|
||||||
|
ctx: ptr WakuContext, protocol: cstring, callback: WakuCallBack, userData: pointer
|
||||||
|
): cint {.dynlib, exportc.} =
|
||||||
|
checkLibwakuParams(ctx, callback, userData)
|
||||||
|
|
||||||
|
let connRes = waku_thread.sendRequestToWakuThread(
|
||||||
|
ctx,
|
||||||
|
RequestType.PEER_MANAGER,
|
||||||
|
PeerManagementRequest.createGetPeerIdsByProtocolRequest($protocol),
|
||||||
|
)
|
||||||
|
if connRes.isErr():
|
||||||
|
let msg = $connRes.error
|
||||||
|
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
|
||||||
|
return RET_ERR
|
||||||
|
|
||||||
|
let msg = $connRes.value
|
||||||
|
callback(RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
|
||||||
|
return RET_OK
|
||||||
|
|
||||||
proc waku_store_query(
|
proc waku_store_query(
|
||||||
ctx: ptr WakuContext,
|
ctx: ptr WakuContext,
|
||||||
jsonQuery: cstring,
|
jsonQuery: cstring,
|
||||||
@ -658,5 +698,21 @@ proc waku_stop_discv5(
|
|||||||
|
|
||||||
return RET_OK
|
return RET_OK
|
||||||
|
|
||||||
|
proc waku_peer_exchange_request(
|
||||||
|
ctx: ptr WakuContext, numPeers: uint64, callback: WakuCallBack, userData: pointer
|
||||||
|
): cint {.dynlib, exportc.} =
|
||||||
|
checkLibwakuParams(ctx, callback, userData)
|
||||||
|
|
||||||
|
let discoveredPeers = waku_thread.sendRequestToWakuThread(
|
||||||
|
ctx, RequestType.DISCOVERY, DiscoveryRequest.createPeerExchangeRequest(numPeers)
|
||||||
|
).valueOr:
|
||||||
|
let msg = $error
|
||||||
|
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
|
||||||
|
return RET_ERR
|
||||||
|
|
||||||
|
let msg = $discoveredPeers
|
||||||
|
callback(RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
|
||||||
|
return RET_OK
|
||||||
|
|
||||||
### End of exported procs
|
### End of exported procs
|
||||||
################################################################################
|
################################################################################
|
||||||
|
|||||||
@ -4,7 +4,9 @@ import
|
|||||||
../../../../waku/factory/waku,
|
../../../../waku/factory/waku,
|
||||||
../../../../waku/discovery/waku_dnsdisc,
|
../../../../waku/discovery/waku_dnsdisc,
|
||||||
../../../../waku/discovery/waku_discv5,
|
../../../../waku/discovery/waku_discv5,
|
||||||
|
../../../../waku/waku_peer_exchange,
|
||||||
../../../../waku/waku_core/peers,
|
../../../../waku/waku_core/peers,
|
||||||
|
../../../../waku/node/waku_node,
|
||||||
../../../alloc
|
../../../alloc
|
||||||
|
|
||||||
type DiscoveryMsgType* = enum
|
type DiscoveryMsgType* = enum
|
||||||
@ -12,6 +14,7 @@ type DiscoveryMsgType* = enum
|
|||||||
UPDATE_DISCV5_BOOTSTRAP_NODES
|
UPDATE_DISCV5_BOOTSTRAP_NODES
|
||||||
START_DISCV5
|
START_DISCV5
|
||||||
STOP_DISCV5
|
STOP_DISCV5
|
||||||
|
PEER_EXCHANGE
|
||||||
|
|
||||||
type DiscoveryRequest* = object
|
type DiscoveryRequest* = object
|
||||||
operation: DiscoveryMsgType
|
operation: DiscoveryMsgType
|
||||||
@ -24,6 +27,9 @@ type DiscoveryRequest* = object
|
|||||||
## used in UPDATE_DISCV5_BOOTSTRAP_NODES
|
## used in UPDATE_DISCV5_BOOTSTRAP_NODES
|
||||||
nodes: cstring
|
nodes: cstring
|
||||||
|
|
||||||
|
## used in PEER_EXCHANGE
|
||||||
|
numPeers: uint64
|
||||||
|
|
||||||
proc createShared(
|
proc createShared(
|
||||||
T: type DiscoveryRequest,
|
T: type DiscoveryRequest,
|
||||||
op: DiscoveryMsgType,
|
op: DiscoveryMsgType,
|
||||||
@ -31,6 +37,7 @@ proc createShared(
|
|||||||
nameDnsServer: cstring,
|
nameDnsServer: cstring,
|
||||||
timeoutMs: cint,
|
timeoutMs: cint,
|
||||||
nodes: cstring,
|
nodes: cstring,
|
||||||
|
numPeers: uint64,
|
||||||
): ptr type T =
|
): ptr type T =
|
||||||
var ret = createShared(T)
|
var ret = createShared(T)
|
||||||
ret[].operation = op
|
ret[].operation = op
|
||||||
@ -38,6 +45,7 @@ proc createShared(
|
|||||||
ret[].nameDnsServer = nameDnsServer.alloc()
|
ret[].nameDnsServer = nameDnsServer.alloc()
|
||||||
ret[].timeoutMs = timeoutMs
|
ret[].timeoutMs = timeoutMs
|
||||||
ret[].nodes = nodes.alloc()
|
ret[].nodes = nodes.alloc()
|
||||||
|
ret[].numPeers = numPeers
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
proc createRetrieveBootstrapNodesRequest*(
|
proc createRetrieveBootstrapNodesRequest*(
|
||||||
@ -47,22 +55,28 @@ proc createRetrieveBootstrapNodesRequest*(
|
|||||||
nameDnsServer: cstring,
|
nameDnsServer: cstring,
|
||||||
timeoutMs: cint,
|
timeoutMs: cint,
|
||||||
): ptr type T =
|
): ptr type T =
|
||||||
return T.createShared(op, enrTreeUrl, nameDnsServer, timeoutMs, "")
|
return T.createShared(op, enrTreeUrl, nameDnsServer, timeoutMs, "", 0)
|
||||||
|
|
||||||
proc createUpdateBootstrapNodesRequest*(
|
proc createUpdateBootstrapNodesRequest*(
|
||||||
T: type DiscoveryRequest, op: DiscoveryMsgType, nodes: cstring
|
T: type DiscoveryRequest, op: DiscoveryMsgType, nodes: cstring
|
||||||
): ptr type T =
|
): ptr type T =
|
||||||
return T.createShared(op, "", "", 0, nodes)
|
return T.createShared(op, "", "", 0, nodes, 0)
|
||||||
|
|
||||||
proc createDiscV5StartRequest*(T: type DiscoveryRequest): ptr type T =
|
proc createDiscV5StartRequest*(T: type DiscoveryRequest): ptr type T =
|
||||||
return T.createShared(START_DISCV5, "", "", 0, "")
|
return T.createShared(START_DISCV5, "", "", 0, "", 0)
|
||||||
|
|
||||||
proc createDiscV5StopRequest*(T: type DiscoveryRequest): ptr type T =
|
proc createDiscV5StopRequest*(T: type DiscoveryRequest): ptr type T =
|
||||||
return T.createShared(STOP_DISCV5, "", "", 0, "")
|
return T.createShared(STOP_DISCV5, "", "", 0, "", 0)
|
||||||
|
|
||||||
|
proc createPeerExchangeRequest*(
|
||||||
|
T: type DiscoveryRequest, numPeers: uint64
|
||||||
|
): ptr type T =
|
||||||
|
return T.createShared(PEER_EXCHANGE, "", "", 0, "", numPeers)
|
||||||
|
|
||||||
proc destroyShared(self: ptr DiscoveryRequest) =
|
proc destroyShared(self: ptr DiscoveryRequest) =
|
||||||
deallocShared(self[].enrTreeUrl)
|
deallocShared(self[].enrTreeUrl)
|
||||||
deallocShared(self[].nameDnsServer)
|
deallocShared(self[].nameDnsServer)
|
||||||
|
deallocShared(self[].nodes)
|
||||||
deallocShared(self)
|
deallocShared(self)
|
||||||
|
|
||||||
proc retrieveBootstrapNodes(
|
proc retrieveBootstrapNodes(
|
||||||
@ -87,6 +101,11 @@ proc updateDiscv5BootstrapNodes(nodes: string, waku: ptr Waku): Result[void, str
|
|||||||
return err("error in updateDiscv5BootstrapNodes: " & $error)
|
return err("error in updateDiscv5BootstrapNodes: " & $error)
|
||||||
return ok()
|
return ok()
|
||||||
|
|
||||||
|
proc performPeerExchangeRequestTo(
|
||||||
|
numPeers: uint64, waku: ptr Waku
|
||||||
|
): Future[Result[int, string]] {.async.} =
|
||||||
|
return await waku.node.fetchPeerExchangePeers(numPeers)
|
||||||
|
|
||||||
proc process*(
|
proc process*(
|
||||||
self: ptr DiscoveryRequest, waku: ptr Waku
|
self: ptr DiscoveryRequest, waku: ptr Waku
|
||||||
): Future[Result[string, string]] {.async.} =
|
): Future[Result[string, string]] {.async.} =
|
||||||
@ -112,6 +131,11 @@ proc process*(
|
|||||||
of UPDATE_DISCV5_BOOTSTRAP_NODES:
|
of UPDATE_DISCV5_BOOTSTRAP_NODES:
|
||||||
updateDiscv5BootstrapNodes($self[].nodes, waku).isOkOr:
|
updateDiscv5BootstrapNodes($self[].nodes, waku).isOkOr:
|
||||||
return err($error)
|
return err($error)
|
||||||
|
|
||||||
return ok("discovery request processed correctly")
|
return ok("discovery request processed correctly")
|
||||||
|
of PEER_EXCHANGE:
|
||||||
|
let numValidPeers = (await performPeerExchangeRequestTo(self[].numPeers, waku)).valueOr:
|
||||||
|
return err("error calling performPeerExchangeRequestTo: " & $error)
|
||||||
|
return ok($numValidPeers)
|
||||||
|
|
||||||
return err("discovery request not handled")
|
return err("discovery request not handled")
|
||||||
|
|||||||
@ -1,20 +1,27 @@
|
|||||||
import std/[sequtils, strutils]
|
import std/[sequtils, strutils]
|
||||||
import chronicles, chronos, results
|
import chronicles, chronos, results
|
||||||
import ../../../../waku/factory/waku, ../../../../waku/node/waku_node, ../../../alloc
|
import
|
||||||
|
../../../../waku/factory/waku,
|
||||||
|
../../../../waku/node/waku_node,
|
||||||
|
../../../alloc,
|
||||||
|
../../../../waku/node/peer_manager
|
||||||
|
|
||||||
type PeerManagementMsgType* = enum
|
type PeerManagementMsgType* {.pure.} = enum
|
||||||
CONNECT_TO
|
CONNECT_TO
|
||||||
|
GET_ALL_PEER_IDS
|
||||||
|
GET_PEER_IDS_BY_PROTOCOL
|
||||||
|
|
||||||
type PeerManagementRequest* = object
|
type PeerManagementRequest* = object
|
||||||
operation: PeerManagementMsgType
|
operation: PeerManagementMsgType
|
||||||
peerMultiAddr: cstring
|
peerMultiAddr: cstring
|
||||||
dialTimeout: Duration
|
dialTimeout: Duration
|
||||||
|
protocol: cstring
|
||||||
|
|
||||||
proc createShared*(
|
proc createShared*(
|
||||||
T: type PeerManagementRequest,
|
T: type PeerManagementRequest,
|
||||||
op: PeerManagementMsgType,
|
op: PeerManagementMsgType,
|
||||||
peerMultiAddr: string,
|
peerMultiAddr = "",
|
||||||
dialTimeout: Duration,
|
dialTimeout = chronos.milliseconds(0), ## arbitrary Duration as not all ops needs dialTimeout
|
||||||
): ptr type T =
|
): ptr type T =
|
||||||
var ret = createShared(T)
|
var ret = createShared(T)
|
||||||
ret[].operation = op
|
ret[].operation = op
|
||||||
@ -22,8 +29,21 @@ proc createShared*(
|
|||||||
ret[].dialTimeout = dialTimeout
|
ret[].dialTimeout = dialTimeout
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
|
proc createGetPeerIdsByProtocolRequest*(
|
||||||
|
T: type PeerManagementRequest, protocol = ""
|
||||||
|
): ptr type T =
|
||||||
|
var ret = createShared(T)
|
||||||
|
ret[].operation = PeerManagementMsgType.GET_PEER_IDS_BY_PROTOCOL
|
||||||
|
ret[].protocol = protocol.alloc()
|
||||||
|
return ret
|
||||||
|
|
||||||
proc destroyShared(self: ptr PeerManagementRequest) =
|
proc destroyShared(self: ptr PeerManagementRequest) =
|
||||||
deallocShared(self[].peerMultiAddr)
|
if not isNil(self[].peerMultiAddr):
|
||||||
|
deallocShared(self[].peerMultiAddr)
|
||||||
|
|
||||||
|
if not isNil(self[].protocol):
|
||||||
|
deallocShared(self[].protocol)
|
||||||
|
|
||||||
deallocShared(self)
|
deallocShared(self)
|
||||||
|
|
||||||
proc connectTo(
|
proc connectTo(
|
||||||
@ -53,5 +73,14 @@ proc process*(
|
|||||||
let ret = waku.node.connectTo($self[].peerMultiAddr, self[].dialTimeout)
|
let ret = waku.node.connectTo($self[].peerMultiAddr, self[].dialTimeout)
|
||||||
if ret.isErr():
|
if ret.isErr():
|
||||||
return err(ret.error)
|
return err(ret.error)
|
||||||
|
of GET_ALL_PEER_IDS:
|
||||||
|
## returns a comma-separated string of peerIDs
|
||||||
|
let peerIDs = waku.node.peerManager.peerStore.peers().mapIt($it.peerId).join(",")
|
||||||
|
return ok(peerIDs)
|
||||||
|
of GET_PEER_IDS_BY_PROTOCOL:
|
||||||
|
## returns a comma-separated string of peerIDs that mount the given protocol
|
||||||
|
let (inPeers, outPeers) = waku.node.peerManager.connectedPeers($self[].protocol)
|
||||||
|
let allPeerIDs = inPeers & outPeers
|
||||||
|
return ok(allPeerIDs.mapIt(it.hex()).join(","))
|
||||||
|
|
||||||
return ok("")
|
return ok("")
|
||||||
|
|||||||
@ -479,6 +479,13 @@ proc getNumConnectedPeers*(
|
|||||||
## Returns the number of connected peers and subscribed to the passed pubsub topic.
|
## Returns the number of connected peers and subscribed to the passed pubsub topic.
|
||||||
## The 'gossipsub' atribute is defined in the GossipSub ref object.
|
## The 'gossipsub' atribute is defined in the GossipSub ref object.
|
||||||
|
|
||||||
|
if pubsubTopic == "":
|
||||||
|
## Return all the connected peers
|
||||||
|
var numConnPeers = 0
|
||||||
|
for k, v in w.gossipsub:
|
||||||
|
numConnPeers.inc(v.len)
|
||||||
|
return ok(numConnPeers)
|
||||||
|
|
||||||
if not w.gossipsub.hasKey(pubsubTopic):
|
if not w.gossipsub.hasKey(pubsubTopic):
|
||||||
return err(
|
return err(
|
||||||
"getNumConnectedPeers - there is no gossipsub peer for the given pubsub topic: " &
|
"getNumConnectedPeers - there is no gossipsub peer for the given pubsub topic: " &
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user