From faa231851e8c8290223b020adf27a8670541ed51 Mon Sep 17 00:00:00 2001 From: gabrielmer <101006718+gabrielmer@users.noreply.github.com> Date: Fri, 4 Apr 2025 12:10:25 +0300 Subject: [PATCH] feat: integrating libwaku's waku_get_connected_peers_info (#57) --- waku/common/peers.go | 97 +++++++++++++++++++++++++++++++++++ waku/nwaku.go | 31 +++++++++++ waku/peer_connections_test.go | 69 +++++++++++++++++++++++++ 3 files changed, 197 insertions(+) create mode 100644 waku/common/peers.go diff --git a/waku/common/peers.go b/waku/common/peers.go new file mode 100644 index 0000000..3a233ad --- /dev/null +++ b/waku/common/peers.go @@ -0,0 +1,97 @@ +package common + +import ( + "encoding/json" + "fmt" + "log" + + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/protocol" + "github.com/multiformats/go-multiaddr" +) + +type PeersData map[peer.ID]PeerInfo +type PeerInfo struct { + Protocols []protocol.ID `json:"protocols"` + Addresses []multiaddr.Multiaddr `json:"addresses"` +} + +func ParsePeerInfoFromJSON(jsonStr string) (PeersData, error) { + /* + We expect a JSON string with the format: + + { + : { + "protocols": [ + "protocol1", + "protocol2", + ... + ], + "addresses": [ + "address1", + "address2", + ... + ] + }, + : ... + } + */ + // Create a temporary map to unmarshal the JSON data + var rawMap map[string]struct { + Protocols []string `json:"protocols"` + Addresses []string `json:"addresses"` + } + + // Unmarshal the JSON string to our temporary map + if err := json.Unmarshal([]byte(jsonStr), &rawMap); err != nil { + return nil, fmt.Errorf("failed to unmarshal JSON: %w", err) + } + + // Create the result map + result := make(PeersData) + + // Process each peer entry + for peerIDStr, rawPeer := range rawMap { + // Parse the peer ID + peerID, err := peer.Decode(peerIDStr) + if err != nil { + return nil, fmt.Errorf("failed to decode peer ID %s: %w", peerIDStr, err) + } + + // Convert protocols to libp2pproto.ID + protocols := make([]protocol.ID, len(rawPeer.Protocols)) + for i, protoStr := range rawPeer.Protocols { + protocols[i] = protocol.ID(protoStr) + } + + // Convert addresses to multiaddr.Multiaddr + addresses := make([]multiaddr.Multiaddr, 0, len(rawPeer.Addresses)) + for _, addrStr := range rawPeer.Addresses { + addr, err := multiaddr.NewMultiaddr(addrStr) + if err != nil { + // Log the error but continue with other addresses + log.Printf("failed to parse multiaddress %s: %v", addrStr, err) + continue + } + addresses = append(addresses, addr) + } + + // Add the peer to the result map + result[peerID] = PeerInfo{ + Protocols: protocols, + Addresses: addresses, + } + } + + return result, nil +} + +// EncapsulatePeerID takes a peer.ID and adds a p2p component to all multiaddresses it receives +func EncapsulatePeerID(peerID peer.ID, addrs ...multiaddr.Multiaddr) []multiaddr.Multiaddr { + hostInfo, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", peerID.String())) + var result []multiaddr.Multiaddr + for _, addr := range addrs { + result = append(result, addr.Encapsulate(hostInfo)) + } + return result +} diff --git a/waku/nwaku.go b/waku/nwaku.go index 7dd6bd0..719fe3a 100644 --- a/waku/nwaku.go +++ b/waku/nwaku.go @@ -247,6 +247,10 @@ package waku waku_get_peerids_from_peerstore(wakuCtx, (WakuCallBack) GoCallback, resp); } + static void cGoWakuGetConnectedPeersInfo(void* wakuCtx, void* resp) { + waku_get_connected_peers_info(wakuCtx, (WakuCallBack) GoCallback, resp); + } + static void cGoWakuLightpushPublish(void* wakuCtx, const char* pubSubTopic, const char* jsonWakuMessage, @@ -1286,6 +1290,33 @@ func (n *WakuNode) GetPeerIDsFromPeerStore() (peer.IDSlice, error) { return nil, fmt.Errorf("GetPeerIdsFromPeerStore: %s", errMsg) } +func (n *WakuNode) GetConnectedPeersInfo() (common.PeersData, error) { + wg := sync.WaitGroup{} + + var resp = C.allocResp(unsafe.Pointer(&wg)) + defer C.freeResp(resp) + + wg.Add(1) + C.cGoWakuGetConnectedPeersInfo(n.wakuCtx, resp) + wg.Wait() + if C.getRet(resp) == C.RET_OK { + jsonStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + if jsonStr == "" { + return nil, nil + } + + peerData, err := common.ParsePeerInfoFromJSON(jsonStr) + + if err != nil { + return nil, fmt.Errorf("GetConnectedPeersInfo - failed parsing JSON: %w", err) + } + + return peerData, nil + } + errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return nil, fmt.Errorf("GetConnectedPeersInfo: %s", errMsg) +} + func (n *WakuNode) GetPeerIDsByProtocol(protocol libp2pproto.ID) (peer.IDSlice, error) { wg := sync.WaitGroup{} diff --git a/waku/peer_connections_test.go b/waku/peer_connections_test.go index 5a939a7..c18bcea 100644 --- a/waku/peer_connections_test.go +++ b/waku/peer_connections_test.go @@ -6,6 +6,7 @@ import ( "time" "github.com/stretchr/testify/require" + "github.com/waku-org/waku-go-bindings/waku/common" ) // Test node connect & disconnect peers @@ -144,6 +145,74 @@ func TestConnectUsingMultipleStaticPeers(t *testing.T) { Debug("Test passed: multiple nodes connected to a single node using Static Peers") } +func TestConnectedPeersInfo(t *testing.T) { + Debug("Starting TestPeerData") + + node1, err := StartWakuNode("node1", nil) + require.NoError(t, err, "Failed to start Node 1") + + node2, err := StartWakuNode("node2", nil) + require.NoError(t, err, "Failed to start Node 2") + + node3, err := StartWakuNode("node3", nil) + require.NoError(t, err, "Failed to start Node 3") + + addr1, err := node1.ListenAddresses() + require.NoError(t, err, "Failed to get listen addresses for Node 1") + + addr2, err := node2.ListenAddresses() + require.NoError(t, err, "Failed to get listen addresses for Node 2") + + addr3, err := node3.ListenAddresses() + require.NoError(t, err, "Failed to get listen addresses for Node 3") + + node4Config := DefaultWakuConfig + node4Config.Discv5Discovery = false + node4Config.Staticnodes = []string{addr1[0].String(), addr2[0].String(), addr3[0].String()} + + node4, err := StartWakuNode("node4", &node4Config) + require.NoError(t, err, "Failed to start Node 4") + + defer func() { + Debug("Stopping and destroying all Waku nodes") + node1.StopAndDestroy() + node2.StopAndDestroy() + node3.StopAndDestroy() + node4.StopAndDestroy() + }() + + Debug("Verifying connected peers for Node 4") + connectedPeers, err := node4.GetConnectedPeers() + require.NoError(t, err, "Failed to get connected peers for Node 4") + + node1PeerID, err := node1.PeerID() + require.NoError(t, err, "Failed to get PeerID for Node 1") + node2PeerID, err := node2.PeerID() + require.NoError(t, err, "Failed to get PeerID for Node 2") + node3PeerID, err := node3.PeerID() + require.NoError(t, err, "Failed to get PeerID for Node 3") + + require.True(t, slices.Contains(connectedPeers, node1PeerID), "Node 1 should be a peer of Node 4") + require.True(t, slices.Contains(connectedPeers, node2PeerID), "Node 2 should be a peer of Node 4") + require.True(t, slices.Contains(connectedPeers, node3PeerID), "Node 3 should be a peer of Node 4") + + peersInfo, err := node4.GetConnectedPeersInfo() + require.NoError(t, err, "Failed to get node 4's connected peers info") + + require.Equal(t, len(peersInfo), 3, "Expected Node 4's connected peers info to have 3 entries") + + node1DerivedAddr := common.EncapsulatePeerID(node1PeerID, peersInfo[node1PeerID].Addresses...) + require.Equal(t, node1DerivedAddr, addr1, "Expected Node1's derived address to equal its listen address") + + node2DerivedAddr := common.EncapsulatePeerID(node2PeerID, peersInfo[node2PeerID].Addresses...) + require.Equal(t, node2DerivedAddr, addr2, "Expected Node2's derived address to equal its listen address") + + node3DerivedAddr := common.EncapsulatePeerID(node3PeerID, peersInfo[node3PeerID].Addresses...) + require.Equal(t, node3DerivedAddr, addr3, "Expected Node3's derived address to equal its listen address") + + Debug("Test passed: peersInfoData is correct") +} + func TestDiscv5PeerMeshCount(t *testing.T) { Debug("Starting test to verify peer count in mesh using Discv5 after topic subscription")