From db06f73007c7645527d36b7765dd3c876d3e0ffc Mon Sep 17 00:00:00 2001 From: gabrielmer <101006718+gabrielmer@users.noreply.github.com> Date: Fri, 4 Apr 2025 11:53:14 +0300 Subject: [PATCH] feat: integrating waku_relay_get_peers_in_mesh (#55) --- waku/nwaku.go | 54 +++++++++++++++++++++++- waku/peer_connections_test.go | 77 ++++++++++++++++++++++++++++++++++- 2 files changed, 128 insertions(+), 3 deletions(-) diff --git a/waku/nwaku.go b/waku/nwaku.go index b5f3eb6..695169d 100644 --- a/waku/nwaku.go +++ b/waku/nwaku.go @@ -223,6 +223,10 @@ package waku waku_ping_peer(ctx, peerAddr, timeoutMs, (WakuCallBack) GoCallback, resp); } + static void cGoWakuGetPeersInMesh(void* ctx, char* pubSubTopic, void* resp) { + waku_relay_get_peers_in_mesh(ctx, pubSubTopic, (WakuCallBack) GoCallback, resp); + } + static void cGoWakuGetNumPeersInMesh(void* ctx, char* pubSubTopic, void* resp) { waku_relay_get_num_peers_in_mesh(ctx, pubSubTopic, (WakuCallBack) GoCallback, resp); } @@ -595,6 +599,54 @@ func (n *WakuNode) GetConnectedPeers() (peer.IDSlice, error) { return nil, errors.New(errMsg) } +func (n *WakuNode) GetPeersInMesh(pubsubTopic string) (peer.IDSlice, error) { + if n == nil { + err := errors.New("waku node is nil") + Error("Failed to get peers in mesh: %v", err) + return nil, err + } + + Debug("Fetching peers in mesh peers for pubsubTopic: %v, node: %v", pubsubTopic, n.nodeName) + + wg := sync.WaitGroup{} + var resp = C.allocResp(unsafe.Pointer(&wg)) + defer C.freeResp(resp) + + var cPubsubTopic = C.CString(pubsubTopic) + defer C.free(unsafe.Pointer(cPubsubTopic)) + + wg.Add(1) + C.cGoWakuGetPeersInMesh(n.wakuCtx, cPubsubTopic, resp) + wg.Wait() + + if C.getRet(resp) == C.RET_OK { + peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + if peersStr == "" { + Debug("No peers in mesh found for pubsubTopic: %v, node: %v", pubsubTopic, n.nodeName) + return nil, nil + } + + peerIDs := strings.Split(peersStr, ",") + var peers peer.IDSlice + for _, peerID := range peerIDs { + id, err := peer.Decode(peerID) + if err != nil { + Error("Failed to decode peer ID for %v: %v", n.nodeName, err) + return nil, err + } + peers = append(peers, id) + } + + Debug("Successfully fetched mesh peers for pubsubTopic: %v, node: %v count: %v", pubsubTopic, n.nodeName, len(peers)) + return peers, nil + } + + errMsg := "error GetPeersInMesh: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + Error("Failed to get peers in mesh for pubsubTopic: %v:, node: %v. %v", pubsubTopic, n.nodeName, errMsg) + + return nil, errors.New(errMsg) +} + func (n *WakuNode) RelaySubscribe(pubsubTopic string) error { if pubsubTopic == "" { return errors.New("pubsub topic is empty") @@ -1242,7 +1294,7 @@ func (n *WakuNode) GetNumConnectedPeers() (int, error) { peers, err := n.GetConnectedPeers() if err != nil { - Error("Failed to fetch connected peers for %v %v ", n.nodeName, err) + Error("Failed to fetch connected peers for %v: %v ", n.nodeName, err) return 0, err } diff --git a/waku/peer_connections_test.go b/waku/peer_connections_test.go index 095a4ab..6d45c96 100644 --- a/waku/peer_connections_test.go +++ b/waku/peer_connections_test.go @@ -163,8 +163,6 @@ func TestDiscv5PeerMeshCount(t *testing.T) { node2, err := StartWakuNode("Node2", &node2Config) require.NoError(t, err, "Failed to start Node2") - require.NoError(t, err, "Failed to get ENR for Node2") - node3Config := DefaultWakuConfig node3Config.Discv5BootstrapNodes = []string{enrNode1.String()} node3Config.Relay = true @@ -212,6 +210,81 @@ func TestDiscv5PeerMeshCount(t *testing.T) { Debug("Test successfully verified peer count change after stopping Node3") } +func TestDiscv5PeerMeshIds(t *testing.T) { + Debug("Starting test to verify peers in mesh using Discv5 after topic subscription") + + node1Config := DefaultWakuConfig + node1Config.Relay = true + Debug("Creating Node1") + node1, err := StartWakuNode("Node1", &node1Config) + require.NoError(t, err, "Failed to start Node1") + + enrNode1, err := node1.ENR() + require.NoError(t, err, "Failed to get ENR for Node1") + + node2Config := DefaultWakuConfig + node2Config.Discv5BootstrapNodes = []string{enrNode1.String()} + node2Config.Relay = true + Debug("Creating Node2 with Node1 as Discv5 bootstrap") + node2, err := StartWakuNode("Node2", &node2Config) + require.NoError(t, err, "Failed to start Node2") + + node2PeerID, err := node2.PeerID() + require.NoError(t, err, "Failed to get PeerID for Node 2") + + node3Config := DefaultWakuConfig + node3Config.Discv5BootstrapNodes = []string{enrNode1.String()} + node3Config.Relay = true + + Debug("Creating Node3 with Node2 as Discv5 bootstrap") + node3, err := StartWakuNode("Node3", &node3Config) + require.NoError(t, err, "Failed to start Node3") + + node3PeerID, err := node3.PeerID() + require.NoError(t, err, "Failed to get PeerID for Node 3") + + defer func() { + Debug("Stopping and destroying all Waku nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + }() + + defaultPubsubTopic := DefaultPubsubTopic + Debug("Default pubsub topic retrieved: %s", defaultPubsubTopic) + + err = SubscribeNodesToTopic([]*WakuNode{node1, node2, node3}, defaultPubsubTopic) + require.NoError(t, err, "Failed to subscribe all nodes to the topic") + + Debug("Waiting for nodes to auto-connect via Discv5") + err = WaitForAutoConnection([]*WakuNode{node1, node2, node3}) + require.NoError(t, err, "Nodes did not auto-connect within timeout") + + Debug("Fetching number of peers in mesh for Node1 before stopping Node3") + peersBefore, err := node1.GetPeersInMesh(defaultPubsubTopic) + require.NoError(t, err, "Failed to get number of peers in mesh for Node1 before stopping Node3") + + Debug("Total number of peers in mesh for Node1 before stopping Node3: %d", len(peersBefore)) + require.Equal(t, 2, len(peersBefore), "Expected Node1 to have exactly 2 peers in the mesh before stopping Node3") + require.True(t, slices.Contains(peersBefore, node2PeerID), "Node 2 should be included in node 1's mesh") + require.True(t, slices.Contains(peersBefore, node3PeerID), "Node 3 should be included in node 1's mesh") + + Debug("Stopping Node3") + node3.StopAndDestroy() + + Debug("Waiting for network update after Node3 stops") + time.Sleep(10 * time.Second) + + Debug("Fetching number of peers in mesh for Node1 after stopping Node3") + peersAfter, err := node1.GetPeersInMesh(defaultPubsubTopic) + require.NoError(t, err, "Failed to get number of peers in mesh for Node1 after stopping Node3") + + Debug("Total number of peers in mesh for Node1 after stopping Node3: %d", len(peersAfter)) + require.Equal(t, 1, len(peersAfter), "Expected Node1 to have exactly 1 peer in the mesh after stopping Node3") + require.True(t, slices.Contains(peersBefore, node2PeerID), "Node 2 should be included in node 1's mesh") + + Debug("Test successfully verified peer count change after stopping Node3") +} + func TestDiscv5DisabledNoPeersConnected(t *testing.T) { Debug("Starting TestDiscv5DisabledNoPeersConnected")