diff --git a/waku/nwaku.go b/waku/nwaku.go index 695169d..7dd6bd0 100644 --- a/waku/nwaku.go +++ b/waku/nwaku.go @@ -235,6 +235,10 @@ package waku waku_relay_get_num_connected_peers(ctx, pubSubTopic, (WakuCallBack) GoCallback, resp); } + static void cGoWakuGetConnectedRelayPeers(void* ctx, char* pubSubTopic, void* resp) { + waku_relay_get_connected_peers(ctx, pubSubTopic, (WakuCallBack) GoCallback, resp); + } + static void cGoWakuGetConnectedPeers(void* wakuCtx, void* resp) { waku_get_connected_peers(wakuCtx, (WakuCallBack) GoCallback, resp); } @@ -535,6 +539,60 @@ func (n *WakuNode) GetNumConnectedRelayPeers(optPubsubTopic ...string) (int, err return 0, errors.New(errMsg) } +func (n *WakuNode) GetConnectedRelayPeers(optPubsubTopic ...string) (peer.IDSlice, error) { + + pubsubTopic := "" + if len(optPubsubTopic) > 0 { + pubsubTopic = optPubsubTopic[0] + } + + if n == nil { + err := errors.New("waku node is nil") + Error("Failed to get connected relay peers: %v", err) + return nil, err + } + + Debug("Fetching connected relay peers for pubsubTopic: %v, node: %v", pubsubTopic, n.nodeName) + + wg := sync.WaitGroup{} + var resp = C.allocResp(unsafe.Pointer(&wg)) + defer C.freeResp(resp) + + var cPubsubTopic = C.CString(pubsubTopic) + defer C.free(unsafe.Pointer(cPubsubTopic)) + + wg.Add(1) + C.cGoWakuGetConnectedRelayPeers(n.wakuCtx, cPubsubTopic, resp) + wg.Wait() + + if C.getRet(resp) == C.RET_OK { + peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + if peersStr == "" { + Debug("No connected relay peers found for pubsubTopic: %v, node: %v", pubsubTopic, n.nodeName) + return nil, nil + } + + peerIDs := strings.Split(peersStr, ",") + var peers peer.IDSlice + for _, peerID := range peerIDs { + id, err := peer.Decode(peerID) + if err != nil { + Error("Failed to decode peer ID for %v: %v", n.nodeName, err) + return nil, err + } + peers = append(peers, id) + } + + Debug("Successfully fetched connected relay peers for pubsubTopic: %v, node: %v count: %v", pubsubTopic, n.nodeName, len(peers)) + return peers, nil + } + + errMsg := "error GetConnectedRelayPeers: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + Error("Failed to get connected relay peers for pubsubTopic: %v:, node: %v. %v", pubsubTopic, n.nodeName, errMsg) + + return nil, errors.New(errMsg) +} + func (n *WakuNode) DisconnectPeerByID(peerID peer.ID) error { wg := sync.WaitGroup{} diff --git a/waku/relay_test.go b/waku/relay_test.go index 0315029..1a75209 100644 --- a/waku/relay_test.go +++ b/waku/relay_test.go @@ -2,6 +2,7 @@ package waku import ( "fmt" + "slices" "testing" "time" @@ -66,6 +67,104 @@ func TestVerifyNumConnectedRelayPeers(t *testing.T) { t.Logf("Successfully connected node2 and node3 to node1. Relay Peers: %d, Total Peers: %d", numRelayPeers, len(connectedPeersNode1)) } +func TestVerifyConnectedRelayPeers(t *testing.T) { + + customShard := uint16(65) + customPubsubTopic := FormatWakuRelayTopic(DEFAULT_CLUSTER_ID, customShard) + node1Cfg := DefaultWakuConfig + node1Cfg.Relay = true + node1Cfg.Shards = []uint16{64, customShard} + node1, err := StartWakuNode("node1", &node1Cfg) + if err != nil { + t.Fatalf("Failed to start node1: %v", err) + } + node2Cfg := DefaultWakuConfig + node2Cfg.Relay = true + node2, err := StartWakuNode("node2", &node2Cfg) + if err != nil { + t.Fatalf("Failed to start node2: %v", err) + } + + node2PeerID, err := node2.PeerID() + require.NoError(t, err, "Failed to get PeerID for Node 2") + + node3, err := StartWakuNode("node3", nil) + if err != nil { + t.Fatalf("Failed to start node3: %v", err) + } + + node4Cfg := DefaultWakuConfig + node4Cfg.Relay = true + node4Cfg.Shards = []uint16{customShard} + node4, err := StartWakuNode("node4", &node4Cfg) + if err != nil { + t.Fatalf("Failed to start node4: %v", err) + } + + node4PeerID, err := node4.PeerID() + require.NoError(t, err, "Failed to get PeerID for Node 4") + + defer func() { + node1.StopAndDestroy() + node2.StopAndDestroy() + node3.StopAndDestroy() + node4.StopAndDestroy() + }() + + err = node2.ConnectPeer(node1) + if err != nil { + t.Fatalf("Failed to connect node2 to node1: %v", err) + } + + err = node3.ConnectPeer(node1) + if err != nil { + t.Fatalf("Failed to connect node3 to node1: %v", err) + } + + err = node4.ConnectPeer(node1) + if err != nil { + t.Fatalf("Failed to connect node4 to node1: %v", err) + } + + connectedPeersNode1, err := node1.GetConnectedPeers() + if err != nil { + t.Fatalf("Failed to get connected peers for node1: %v", err) + } + if len(connectedPeersNode1) != 3 { + t.Fatalf("Expected 2 connected peers on node1, but got %d", len(connectedPeersNode1)) + } + + relayPeers, err := node1.GetConnectedRelayPeers() + if err != nil { + t.Fatalf("Failed to get connected relay peers for node1: %v", err) + } + if len(relayPeers) != 2 { + t.Fatalf("Expected 2 relay peers on node1, but got %d", len(relayPeers)) + } + require.True(t, slices.Contains(relayPeers, node2PeerID), "Node 2 should be included in node 1's connected relay peers") + require.True(t, slices.Contains(relayPeers, node4PeerID), "Node 4 should be included in node 1's connected relay peers") + + relayPeersDefaultPubsub, err := node1.GetConnectedRelayPeers(DefaultPubsubTopic) + if err != nil { + t.Fatalf("Failed to get connected relay peers for node1 and default pubsub topic: %v", err) + } + if len(relayPeersDefaultPubsub) != 1 { + t.Fatalf("Expected 1 relay peers on node1 for default pubsub topic, but got %d", len(relayPeersDefaultPubsub)) + } + require.True(t, slices.Contains(relayPeersDefaultPubsub, node2PeerID), "Node 2 should be included in node 1's connected relay peers for the default pubsub topic") + + relayPeersCustomPubsub, err := node1.GetConnectedRelayPeers(customPubsubTopic) + if err != nil { + t.Fatalf("Failed to get connected relay peers for node1 and custom pubsub topic: %v", err) + } + if len(relayPeersCustomPubsub) != 1 { + t.Fatalf("Expected 1 relay peers on node1 for custom pubsub topic, but got %d", len(relayPeersCustomPubsub)) + } + require.True(t, slices.Contains(relayPeersCustomPubsub, node4PeerID), "Node 4 should be included in node 1's connected relay peers for the custom pubsub topic") + + t.Logf("Successfully connected node2, node3 and node4 to node1. Relay Peers: %d, Total Peers: %d", len(relayPeers), len(connectedPeersNode1)) +} + func TestRelayMessageTransmission(t *testing.T) { Debug("Starting TestRelayMessageTransmission") diff --git a/waku/test_data.go b/waku/test_data.go index 6f62400..920713c 100644 --- a/waku/test_data.go +++ b/waku/test_data.go @@ -10,7 +10,7 @@ import ( var DefaultWakuConfig common.WakuConfig var DefaultStoreQueryRequest common.StoreQueryRequest -var DEFAULT_CLUSTER_ID = 16 +var DEFAULT_CLUSTER_ID = uint16(16) func init() {