feat: integrating get_connected_relay_peers (#56)

This commit is contained in:
gabrielmer 2025-04-04 11:53:36 +03:00 committed by GitHub
parent db06f73007
commit ca59aaf856
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 158 additions and 1 deletions

View File

@ -235,6 +235,10 @@ package waku
waku_relay_get_num_connected_peers(ctx, pubSubTopic, (WakuCallBack) GoCallback, resp);
}
static void cGoWakuGetConnectedRelayPeers(void* ctx, char* pubSubTopic, void* resp) {
waku_relay_get_connected_peers(ctx, pubSubTopic, (WakuCallBack) GoCallback, resp);
}
static void cGoWakuGetConnectedPeers(void* wakuCtx, void* resp) {
waku_get_connected_peers(wakuCtx, (WakuCallBack) GoCallback, resp);
}
@ -535,6 +539,60 @@ func (n *WakuNode) GetNumConnectedRelayPeers(optPubsubTopic ...string) (int, err
return 0, errors.New(errMsg)
}
func (n *WakuNode) GetConnectedRelayPeers(optPubsubTopic ...string) (peer.IDSlice, error) {
pubsubTopic := ""
if len(optPubsubTopic) > 0 {
pubsubTopic = optPubsubTopic[0]
}
if n == nil {
err := errors.New("waku node is nil")
Error("Failed to get connected relay peers: %v", err)
return nil, err
}
Debug("Fetching connected relay peers for pubsubTopic: %v, node: %v", pubsubTopic, n.nodeName)
wg := sync.WaitGroup{}
var resp = C.allocResp(unsafe.Pointer(&wg))
defer C.freeResp(resp)
var cPubsubTopic = C.CString(pubsubTopic)
defer C.free(unsafe.Pointer(cPubsubTopic))
wg.Add(1)
C.cGoWakuGetConnectedRelayPeers(n.wakuCtx, cPubsubTopic, resp)
wg.Wait()
if C.getRet(resp) == C.RET_OK {
peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
if peersStr == "" {
Debug("No connected relay peers found for pubsubTopic: %v, node: %v", pubsubTopic, n.nodeName)
return nil, nil
}
peerIDs := strings.Split(peersStr, ",")
var peers peer.IDSlice
for _, peerID := range peerIDs {
id, err := peer.Decode(peerID)
if err != nil {
Error("Failed to decode peer ID for %v: %v", n.nodeName, err)
return nil, err
}
peers = append(peers, id)
}
Debug("Successfully fetched connected relay peers for pubsubTopic: %v, node: %v count: %v", pubsubTopic, n.nodeName, len(peers))
return peers, nil
}
errMsg := "error GetConnectedRelayPeers: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
Error("Failed to get connected relay peers for pubsubTopic: %v:, node: %v. %v", pubsubTopic, n.nodeName, errMsg)
return nil, errors.New(errMsg)
}
func (n *WakuNode) DisconnectPeerByID(peerID peer.ID) error {
wg := sync.WaitGroup{}

View File

@ -2,6 +2,7 @@ package waku
import (
"fmt"
"slices"
"testing"
"time"
@ -66,6 +67,104 @@ func TestVerifyNumConnectedRelayPeers(t *testing.T) {
t.Logf("Successfully connected node2 and node3 to node1. Relay Peers: %d, Total Peers: %d", numRelayPeers, len(connectedPeersNode1))
}
func TestVerifyConnectedRelayPeers(t *testing.T) {
customShard := uint16(65)
customPubsubTopic := FormatWakuRelayTopic(DEFAULT_CLUSTER_ID, customShard)
node1Cfg := DefaultWakuConfig
node1Cfg.Relay = true
node1Cfg.Shards = []uint16{64, customShard}
node1, err := StartWakuNode("node1", &node1Cfg)
if err != nil {
t.Fatalf("Failed to start node1: %v", err)
}
node2Cfg := DefaultWakuConfig
node2Cfg.Relay = true
node2, err := StartWakuNode("node2", &node2Cfg)
if err != nil {
t.Fatalf("Failed to start node2: %v", err)
}
node2PeerID, err := node2.PeerID()
require.NoError(t, err, "Failed to get PeerID for Node 2")
node3, err := StartWakuNode("node3", nil)
if err != nil {
t.Fatalf("Failed to start node3: %v", err)
}
node4Cfg := DefaultWakuConfig
node4Cfg.Relay = true
node4Cfg.Shards = []uint16{customShard}
node4, err := StartWakuNode("node4", &node4Cfg)
if err != nil {
t.Fatalf("Failed to start node4: %v", err)
}
node4PeerID, err := node4.PeerID()
require.NoError(t, err, "Failed to get PeerID for Node 4")
defer func() {
node1.StopAndDestroy()
node2.StopAndDestroy()
node3.StopAndDestroy()
node4.StopAndDestroy()
}()
err = node2.ConnectPeer(node1)
if err != nil {
t.Fatalf("Failed to connect node2 to node1: %v", err)
}
err = node3.ConnectPeer(node1)
if err != nil {
t.Fatalf("Failed to connect node3 to node1: %v", err)
}
err = node4.ConnectPeer(node1)
if err != nil {
t.Fatalf("Failed to connect node4 to node1: %v", err)
}
connectedPeersNode1, err := node1.GetConnectedPeers()
if err != nil {
t.Fatalf("Failed to get connected peers for node1: %v", err)
}
if len(connectedPeersNode1) != 3 {
t.Fatalf("Expected 2 connected peers on node1, but got %d", len(connectedPeersNode1))
}
relayPeers, err := node1.GetConnectedRelayPeers()
if err != nil {
t.Fatalf("Failed to get connected relay peers for node1: %v", err)
}
if len(relayPeers) != 2 {
t.Fatalf("Expected 2 relay peers on node1, but got %d", len(relayPeers))
}
require.True(t, slices.Contains(relayPeers, node2PeerID), "Node 2 should be included in node 1's connected relay peers")
require.True(t, slices.Contains(relayPeers, node4PeerID), "Node 4 should be included in node 1's connected relay peers")
relayPeersDefaultPubsub, err := node1.GetConnectedRelayPeers(DefaultPubsubTopic)
if err != nil {
t.Fatalf("Failed to get connected relay peers for node1 and default pubsub topic: %v", err)
}
if len(relayPeersDefaultPubsub) != 1 {
t.Fatalf("Expected 1 relay peers on node1 for default pubsub topic, but got %d", len(relayPeersDefaultPubsub))
}
require.True(t, slices.Contains(relayPeersDefaultPubsub, node2PeerID), "Node 2 should be included in node 1's connected relay peers for the default pubsub topic")
relayPeersCustomPubsub, err := node1.GetConnectedRelayPeers(customPubsubTopic)
if err != nil {
t.Fatalf("Failed to get connected relay peers for node1 and custom pubsub topic: %v", err)
}
if len(relayPeersCustomPubsub) != 1 {
t.Fatalf("Expected 1 relay peers on node1 for custom pubsub topic, but got %d", len(relayPeersCustomPubsub))
}
require.True(t, slices.Contains(relayPeersCustomPubsub, node4PeerID), "Node 4 should be included in node 1's connected relay peers for the custom pubsub topic")
t.Logf("Successfully connected node2, node3 and node4 to node1. Relay Peers: %d, Total Peers: %d", len(relayPeers), len(connectedPeersNode1))
}
func TestRelayMessageTransmission(t *testing.T) {
Debug("Starting TestRelayMessageTransmission")

View File

@ -10,7 +10,7 @@ import (
var DefaultWakuConfig common.WakuConfig
var DefaultStoreQueryRequest common.StoreQueryRequest
var DEFAULT_CLUSTER_ID = 16
var DEFAULT_CLUSTER_ID = uint16(16)
func init() {