From 673b4ac4ab19e824cf99e1ed9c5f53f309a2316f Mon Sep 17 00:00:00 2001 From: gabrielmer <101006718+gabrielmer@users.noreply.github.com> Date: Fri, 25 Oct 2024 11:03:07 +0300 Subject: [PATCH] feat_: supporting peer exchange with nwaku (#5983) --- third_party/nwaku | 2 +- wakuv2/nwaku.go | 87 ++++++++++++++++++++----- wakuv2/nwaku_test.go | 151 +++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 220 insertions(+), 20 deletions(-) diff --git a/third_party/nwaku b/third_party/nwaku index c5a825e20..de11e576f 160000 --- a/third_party/nwaku +++ b/third_party/nwaku @@ -1 +1 @@ -Subproject commit c5a825e206c1cb3e6e0cf8c01410527804cb76c4 +Subproject commit de11e576f4b69b63b4135cfb9549ef15cdc1ad34 diff --git a/wakuv2/nwaku.go b/wakuv2/nwaku.go index baa4e2e49..bcb32460a 100644 --- a/wakuv2/nwaku.go +++ b/wakuv2/nwaku.go @@ -208,6 +208,10 @@ package wakuv2 WAKU_CALL (waku_get_my_enr(ctx, (WakuCallBack) callback, resp) ); } + static void cGoWakuGetMyPeerId(void* ctx, void* resp) { + WAKU_CALL (waku_get_my_peerid(ctx, (WakuCallBack) callback, resp) ); + } + static void cGoWakuListPeersInMesh(void* ctx, char* pubSubTopic, void* resp) { WAKU_CALL (waku_relay_get_num_peers_in_mesh(ctx, pubSubTopic, (WakuCallBack) callback, resp) ); } @@ -216,6 +220,10 @@ package wakuv2 WAKU_CALL (waku_relay_get_num_connected_peers(ctx, pubSubTopic, (WakuCallBack) callback, resp) ); } + static void cGoWakuGetPeerIdsFromPeerStore(void* wakuCtx, void* resp) { + WAKU_CALL (waku_get_peerids_from_peerstore(wakuCtx, (WakuCallBack) callback, resp) ); + } + static void cGoWakuLightpushPublish(void* wakuCtx, const char* pubSubTopic, const char* jsonWakuMessage, @@ -380,8 +388,12 @@ type WakuConfig struct { Staticnodes []string `json:"staticnodes,omitempty"` Discv5BootstrapNodes []string `json:"discv5BootstrapNodes,omitempty"` Discv5Discovery bool `json:"discv5Discovery,omitempty"` + Discv5UdpPort uint16 `json:"discv5UdpPort,omitempty"` ClusterID uint16 `json:"clusterId,omitempty"` Shards []uint16 `json:"shards,omitempty"` + PeerExchange bool `json:"peerExchange,omitempty"` + PeerExchangeNode string `json:"peerExchangeNode,omitempty"` + TcpPort uint16 `json:"tcpPort,omitempty"` } // Waku represents a dark communication interface through the Ethereum @@ -497,9 +509,11 @@ func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *Config, nwakuCfg *WakuCon return nil, err } - err = node.WakuRelaySubscribe(defaultPubsubTopic) - if err != nil { - return nil, err + if nwakuCfg.EnableRelay { + err = node.WakuRelaySubscribe(defaultPubsubTopic) + if err != nil { + return nil, err + } } node.WakuSetEventCallback() @@ -2191,10 +2205,23 @@ func (w *Waku) Clean() error { return nil } -// TODO-nwaku -func (w *Waku) PeerID() peer.ID { - // return w.node.Host().ID() - return "" +func (w *Waku) PeerID() (peer.ID, error) { + var resp = C.allocResp() + defer C.freeResp(resp) + C.cGoWakuGetMyPeerId(w.wakuCtx, resp) + + if C.getRet(resp) == C.RET_OK { + + peerIdStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + id, err := peer.Decode(peerIdStr) + if err != nil { + errMsg := "WakuGetMyPeerId - decoding peerId: %w" + return "", fmt.Errorf(errMsg, err) + } + return id, nil + } + errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return "", fmt.Errorf("WakuGetMyPeerId: %s", errMsg) } // validatePrivateKey checks the format of the given private key. @@ -2609,18 +2636,21 @@ func wakuStoreQuery( return "", errors.New(errMsg) } -func (self *Waku) WakuPeerExchangeRequest(numPeers uint64) (string, error) { +func (self *Waku) WakuPeerExchangeRequest(numPeers uint64) (uint64, error) { var resp = C.allocResp() defer C.freeResp(resp) C.cGoWakuPeerExchangeQuery(self.wakuCtx, C.uint64_t(numPeers), resp) if C.getRet(resp) == C.RET_OK { - msg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return msg, nil + numRecvPeersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + numRecvPeers, err := strconv.ParseUint(numRecvPeersStr, 10, 64) + if err != nil { + return 0, err + } + return numRecvPeers, nil } - errMsg := "error WakuPeerExchangeRequest: " + - C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return "", errors.New(errMsg) + errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return 0, fmt.Errorf("WakuPeerExchangeRequest: %s", errMsg) } func (self *Waku) WakuConnect(peerMultiAddr string, timeoutMs int) error { @@ -2753,6 +2783,34 @@ func (self *Waku) GetNumConnectedPeers(paramPubsubTopic ...string) (int, error) return 0, errors.New(errMsg) } +func (self *Waku) GetPeerIdsFromPeerStore() (peer.IDSlice, error) { + var resp = C.allocResp() + defer C.freeResp(resp) + C.cGoWakuGetPeerIdsFromPeerStore(self.wakuCtx, resp) + + if C.getRet(resp) == C.RET_OK { + peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + if peersStr == "" { + return peer.IDSlice{}, nil + } + // peersStr contains a comma-separated list of peer ids + itemsPeerIds := strings.Split(peersStr, ",") + + var peers peer.IDSlice + for _, peerId := range itemsPeerIds { + id, err := peer.Decode(peerId) + if err != nil { + return nil, fmt.Errorf("GetPeerIdsFromPeerStore - decoding peerId: %w", err) + } + peers = append(peers, id) + } + + return peers, nil + } + errMsg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return nil, fmt.Errorf("GetPeerIdsFromPeerStore: %s", errMsg) +} + func (self *Waku) GetPeerIdsByProtocol(protocol string) (peer.IDSlice, error) { var resp = C.allocResp() var cProtocol = C.CString(protocol) @@ -2773,8 +2831,7 @@ func (self *Waku) GetPeerIdsByProtocol(protocol string) (peer.IDSlice, error) { for _, p := range itemsPeerIds { id, err := peer.Decode(p) if err != nil { - errMsg := "GetPeerIdsByProtocol - error converting string to int: " + err.Error() - return nil, errors.New(errMsg) + return nil, fmt.Errorf("GetPeerIdsByProtocol - decoding peerId: %w", err) } peers = append(peers, id) } diff --git a/wakuv2/nwaku_test.go b/wakuv2/nwaku_test.go index fdf6f5458..417348e02 100644 --- a/wakuv2/nwaku_test.go +++ b/wakuv2/nwaku_test.go @@ -13,6 +13,7 @@ import ( "github.com/cenkalti/backoff/v3" "github.com/libp2p/go-libp2p/core/peer" "github.com/waku-org/go-waku/waku/v2/protocol/store" + "go.uber.org/zap" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/crypto" @@ -155,7 +156,7 @@ func parseNodes(rec []string) []*enode.Node { // IP_ADDRESS=$(hostname -I | awk '{print $1}'); // docker run \ // -p 61000:61000/tcp -p 8000:8000/udp -p 8646:8646/tcp harbor.status.im/wakuorg/nwaku:v0.33.0 \ -// --discv5-discovery=true --cluster-id=16 --log-level=DEBUG \ +// --discv5-discovery=true --cluster-id=16 --log-level=DEBUG --shard=64 --tcp-port=61000 \ // --nat=extip:${IP_ADDRESS} --discv5-udp-port=8000 --rest-address=0.0.0.0 --store --rest-port=8646 \ func TestBasicWakuV2(t *testing.T) { @@ -195,7 +196,6 @@ func TestBasicWakuV2(t *testing.T) { // Sanity check, not great, but it's probably helpful err = tt.RetryWithBackOff(func() error { - numConnected, err := w.GetNumConnectedPeers() if err != nil { return err @@ -323,10 +323,151 @@ func makeTestTree(domain string, nodes []*enode.Node, links []string) (*ethdnsdi return tree, url } -/* func TestPeerExchange(t *testing.T) { logger, err := zap.NewDevelopment() require.NoError(t, err) + + discV5NodeConfig := Config{ + UseThrottledPublish: true, + ClusterID: 16, + } + + // start node that will be discovered by PeerExchange + discV5NodeWakuConfig := WakuConfig{ + EnableRelay: true, + LogLevel: "DEBUG", + Discv5Discovery: true, + ClusterID: 16, + Shards: []uint16{64}, + PeerExchange: false, + Discv5UdpPort: 9001, + TcpPort: 60010, + } + + discV5Node, err := New(nil, "", &discV5NodeConfig, &discV5NodeWakuConfig, logger.Named("discV5Node"), nil, nil, nil, nil) + require.NoError(t, err) + require.NoError(t, discV5Node.Start()) + + time.Sleep(1 * time.Second) + + discV5NodePeerId, err := discV5Node.PeerID() + require.NoError(t, err) + + discv5NodeEnr, err := discV5Node.ENR() + require.NoError(t, err) + + pxServerConfig := Config{ + UseThrottledPublish: true, + ClusterID: 16, + } + + // start node which serves as PeerExchange server + pxServerWakuConfig := WakuConfig{ + EnableRelay: true, + LogLevel: "DEBUG", + Discv5Discovery: true, + ClusterID: 16, + Shards: []uint16{64}, + PeerExchange: true, + Discv5UdpPort: 9000, + Discv5BootstrapNodes: []string{discv5NodeEnr.String()}, + TcpPort: 60011, + } + + pxServerNode, err := New(nil, "", &pxServerConfig, &pxServerWakuConfig, logger.Named("pxServerNode"), nil, nil, nil, nil) + require.NoError(t, err) + require.NoError(t, pxServerNode.Start()) + + // Adding an extra second to make sure PX cache is not empty + time.Sleep(2 * time.Second) + + serverNodeMa, err := pxServerNode.ListenAddresses() + require.NoError(t, err) + require.NotNil(t, serverNodeMa) + + // Sanity check, not great, but it's probably helpful + options := func(b *backoff.ExponentialBackOff) { + b.MaxElapsedTime = 30 * time.Second + } + + // Check that pxServerNode has discV5Node in its Peer Store + err = tt.RetryWithBackOff(func() error { + peers, err := pxServerNode.GetPeerIdsFromPeerStore() + + if err != nil { + return err + } + + if slices.Contains(peers, discV5NodePeerId) { + return nil + } + + return errors.New("pxServer is missing the discv5 node in its peer store") + }, options) + require.NoError(t, err) + + pxClientConfig := Config{ + UseThrottledPublish: true, + ClusterID: 16, + } + + // start light node which uses PeerExchange to discover peers + pxClientWakuConfig := WakuConfig{ + EnableRelay: false, + LogLevel: "DEBUG", + Discv5Discovery: false, + ClusterID: 16, + Shards: []uint16{64}, + PeerExchange: true, + Discv5UdpPort: 9002, + TcpPort: 60012, + PeerExchangeNode: serverNodeMa[0].String(), + } + + lightNode, err := New(nil, "", &pxClientConfig, &pxClientWakuConfig, logger.Named("lightNode"), nil, nil, nil, nil) + require.NoError(t, err) + require.NoError(t, lightNode.Start()) + + time.Sleep(1 * time.Second) + + pxServerPeerId, err := pxServerNode.PeerID() + require.NoError(t, err) + + // Check that the light node discovered the discV5Node and has both nodes in its peer store + err = tt.RetryWithBackOff(func() error { + peers, err := lightNode.GetPeerIdsFromPeerStore() + if err != nil { + return err + } + + if slices.Contains(peers, discV5NodePeerId) && slices.Contains(peers, pxServerPeerId) { + return nil + } + return errors.New("lightnode is missing peers") + }, options) + require.NoError(t, err) + + // Now perform the PX request manually to see if it also works + err = tt.RetryWithBackOff(func() error { + numPeersReceived, err := lightNode.WakuPeerExchangeRequest(1) + if err != nil { + return err + } + + if numPeersReceived == 1 { + return nil + } + return errors.New("Peer Exchange is not returning peers") + }, options) + require.NoError(t, err) + + // Stop nodes + require.NoError(t, lightNode.Stop()) + require.NoError(t, pxServerNode.Stop()) + require.NoError(t, discV5Node.Stop()) + + /* logger, err := zap.NewDevelopment() + require.NoError(t, err) // start node which serve as PeerExchange server config := &Config{} config.ClusterID = 16 @@ -401,9 +542,11 @@ func TestPeerExchange(t *testing.T) { require.NoError(t, lightNode.Stop()) require.NoError(t, pxServerNode.Stop()) - require.NoError(t, discV5Node.Stop()) + require.NoError(t, discV5Node.Stop()) */ } +/* + func TestWakuV2Filter(t *testing.T) { t.Skip("flaky test")