feat: integrating waku_relay_get_peers_in_mesh (#55)

This commit is contained in:
gabrielmer 2025-04-04 11:53:14 +03:00 committed by GitHub
parent b4f68a640c
commit db06f73007
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 128 additions and 3 deletions

View File

@ -223,6 +223,10 @@ package waku
waku_ping_peer(ctx, peerAddr, timeoutMs, (WakuCallBack) GoCallback, resp);
}
static void cGoWakuGetPeersInMesh(void* ctx, char* pubSubTopic, void* resp) {
waku_relay_get_peers_in_mesh(ctx, pubSubTopic, (WakuCallBack) GoCallback, resp);
}
static void cGoWakuGetNumPeersInMesh(void* ctx, char* pubSubTopic, void* resp) {
waku_relay_get_num_peers_in_mesh(ctx, pubSubTopic, (WakuCallBack) GoCallback, resp);
}
@ -595,6 +599,54 @@ func (n *WakuNode) GetConnectedPeers() (peer.IDSlice, error) {
return nil, errors.New(errMsg)
}
func (n *WakuNode) GetPeersInMesh(pubsubTopic string) (peer.IDSlice, error) {
if n == nil {
err := errors.New("waku node is nil")
Error("Failed to get peers in mesh: %v", err)
return nil, err
}
Debug("Fetching peers in mesh peers for pubsubTopic: %v, node: %v", pubsubTopic, n.nodeName)
wg := sync.WaitGroup{}
var resp = C.allocResp(unsafe.Pointer(&wg))
defer C.freeResp(resp)
var cPubsubTopic = C.CString(pubsubTopic)
defer C.free(unsafe.Pointer(cPubsubTopic))
wg.Add(1)
C.cGoWakuGetPeersInMesh(n.wakuCtx, cPubsubTopic, resp)
wg.Wait()
if C.getRet(resp) == C.RET_OK {
peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
if peersStr == "" {
Debug("No peers in mesh found for pubsubTopic: %v, node: %v", pubsubTopic, n.nodeName)
return nil, nil
}
peerIDs := strings.Split(peersStr, ",")
var peers peer.IDSlice
for _, peerID := range peerIDs {
id, err := peer.Decode(peerID)
if err != nil {
Error("Failed to decode peer ID for %v: %v", n.nodeName, err)
return nil, err
}
peers = append(peers, id)
}
Debug("Successfully fetched mesh peers for pubsubTopic: %v, node: %v count: %v", pubsubTopic, n.nodeName, len(peers))
return peers, nil
}
errMsg := "error GetPeersInMesh: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
Error("Failed to get peers in mesh for pubsubTopic: %v:, node: %v. %v", pubsubTopic, n.nodeName, errMsg)
return nil, errors.New(errMsg)
}
func (n *WakuNode) RelaySubscribe(pubsubTopic string) error {
if pubsubTopic == "" {
return errors.New("pubsub topic is empty")
@ -1242,7 +1294,7 @@ func (n *WakuNode) GetNumConnectedPeers() (int, error) {
peers, err := n.GetConnectedPeers()
if err != nil {
Error("Failed to fetch connected peers for %v %v ", n.nodeName, err)
Error("Failed to fetch connected peers for %v: %v ", n.nodeName, err)
return 0, err
}

View File

@ -163,8 +163,6 @@ func TestDiscv5PeerMeshCount(t *testing.T) {
node2, err := StartWakuNode("Node2", &node2Config)
require.NoError(t, err, "Failed to start Node2")
require.NoError(t, err, "Failed to get ENR for Node2")
node3Config := DefaultWakuConfig
node3Config.Discv5BootstrapNodes = []string{enrNode1.String()}
node3Config.Relay = true
@ -212,6 +210,81 @@ func TestDiscv5PeerMeshCount(t *testing.T) {
Debug("Test successfully verified peer count change after stopping Node3")
}
func TestDiscv5PeerMeshIds(t *testing.T) {
Debug("Starting test to verify peers in mesh using Discv5 after topic subscription")
node1Config := DefaultWakuConfig
node1Config.Relay = true
Debug("Creating Node1")
node1, err := StartWakuNode("Node1", &node1Config)
require.NoError(t, err, "Failed to start Node1")
enrNode1, err := node1.ENR()
require.NoError(t, err, "Failed to get ENR for Node1")
node2Config := DefaultWakuConfig
node2Config.Discv5BootstrapNodes = []string{enrNode1.String()}
node2Config.Relay = true
Debug("Creating Node2 with Node1 as Discv5 bootstrap")
node2, err := StartWakuNode("Node2", &node2Config)
require.NoError(t, err, "Failed to start Node2")
node2PeerID, err := node2.PeerID()
require.NoError(t, err, "Failed to get PeerID for Node 2")
node3Config := DefaultWakuConfig
node3Config.Discv5BootstrapNodes = []string{enrNode1.String()}
node3Config.Relay = true
Debug("Creating Node3 with Node2 as Discv5 bootstrap")
node3, err := StartWakuNode("Node3", &node3Config)
require.NoError(t, err, "Failed to start Node3")
node3PeerID, err := node3.PeerID()
require.NoError(t, err, "Failed to get PeerID for Node 3")
defer func() {
Debug("Stopping and destroying all Waku nodes")
node1.StopAndDestroy()
node2.StopAndDestroy()
}()
defaultPubsubTopic := DefaultPubsubTopic
Debug("Default pubsub topic retrieved: %s", defaultPubsubTopic)
err = SubscribeNodesToTopic([]*WakuNode{node1, node2, node3}, defaultPubsubTopic)
require.NoError(t, err, "Failed to subscribe all nodes to the topic")
Debug("Waiting for nodes to auto-connect via Discv5")
err = WaitForAutoConnection([]*WakuNode{node1, node2, node3})
require.NoError(t, err, "Nodes did not auto-connect within timeout")
Debug("Fetching number of peers in mesh for Node1 before stopping Node3")
peersBefore, err := node1.GetPeersInMesh(defaultPubsubTopic)
require.NoError(t, err, "Failed to get number of peers in mesh for Node1 before stopping Node3")
Debug("Total number of peers in mesh for Node1 before stopping Node3: %d", len(peersBefore))
require.Equal(t, 2, len(peersBefore), "Expected Node1 to have exactly 2 peers in the mesh before stopping Node3")
require.True(t, slices.Contains(peersBefore, node2PeerID), "Node 2 should be included in node 1's mesh")
require.True(t, slices.Contains(peersBefore, node3PeerID), "Node 3 should be included in node 1's mesh")
Debug("Stopping Node3")
node3.StopAndDestroy()
Debug("Waiting for network update after Node3 stops")
time.Sleep(10 * time.Second)
Debug("Fetching number of peers in mesh for Node1 after stopping Node3")
peersAfter, err := node1.GetPeersInMesh(defaultPubsubTopic)
require.NoError(t, err, "Failed to get number of peers in mesh for Node1 after stopping Node3")
Debug("Total number of peers in mesh for Node1 after stopping Node3: %d", len(peersAfter))
require.Equal(t, 1, len(peersAfter), "Expected Node1 to have exactly 1 peer in the mesh after stopping Node3")
require.True(t, slices.Contains(peersBefore, node2PeerID), "Node 2 should be included in node 1's mesh")
Debug("Test successfully verified peer count change after stopping Node3")
}
func TestDiscv5DisabledNoPeersConnected(t *testing.T) {
Debug("Starting TestDiscv5DisabledNoPeersConnected")