From 6954612f540915a2a9e6b08b6c2e948b3c260f06 Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Sun, 8 Sep 2024 20:38:52 +0200 Subject: [PATCH] some minor progress to add nwaku in status-go --- cmd/ping-community/main.go | 10 +-- wakuv2/api_test.go | 3 +- wakuv2/nwaku.go | 148 ++++++++++++++++++++++++++++++++++++- wakuv2/waku_test.go | 43 ++++++----- 4 files changed, 179 insertions(+), 25 deletions(-) diff --git a/cmd/ping-community/main.go b/cmd/ping-community/main.go index 1b7b20593..7c4776a81 100644 --- a/cmd/ping-community/main.go +++ b/cmd/ping-community/main.go @@ -24,12 +24,12 @@ import ( "github.com/status-im/status-go/multiaccounts" "github.com/status-im/status-go/multiaccounts/accounts" "github.com/status-im/status-go/multiaccounts/settings" + "github.com/status-im/status-go/wakuv2" "github.com/status-im/status-go/logutils" "github.com/status-im/status-go/params" "github.com/status-im/status-go/protocol" "github.com/status-im/status-go/protocol/common" - "github.com/status-im/status-go/protocol/common/shard" "github.com/status-im/status-go/protocol/identity/alias" "github.com/status-im/status-go/protocol/protobuf" wakuextn "github.com/status-im/status-go/services/wakuext" @@ -48,8 +48,8 @@ var ( seedPhrase = flag.String("seed-phrase", "", "Seed phrase") version = flag.Bool("version", false, "Print version and dump configuration") communityID = flag.String("community-id", "", "The id of the community") - shardCluster = flag.Int("shard-cluster", shard.MainStatusShardCluster, "The shard cluster in which the of the community is published") - shardIndex = flag.Int("shard-index", shard.DefaultShardIndex, "The shard index in which the community is published") + shardCluster = flag.Int("shard-cluster", wakuv2.MainStatusShardCluster, "The shard cluster in which the of the community is published") + shardIndex = flag.Int("shard-index", wakuv2.DefaultShardIndex, "The shard index in which the community is published") chatID = flag.String("chat-id", "", "The id of the chat") dataDir = flag.String("dir", getDefaultDataDir(), "Directory used by node to store data") @@ -148,9 +148,9 @@ func main() { messenger := wakuextservice.Messenger() - var s *shard.Shard = nil + var s *wakuv2.Shard = nil if shardCluster != nil && shardIndex != nil { - s = &shard.Shard{ + s = &wakuv2.Shard{ Cluster: uint16(*shardCluster), Index: uint16(*shardIndex), } diff --git a/wakuv2/api_test.go b/wakuv2/api_test.go index ef8c7ab5e..7a060bf5f 100644 --- a/wakuv2/api_test.go +++ b/wakuv2/api_test.go @@ -24,7 +24,6 @@ import ( "golang.org/x/exp/maps" - "github.com/status-im/status-go/protocol/common/shard" "github.com/status-im/status-go/wakuv2/common" ) @@ -57,7 +56,7 @@ func TestMultipleTopicCopyInNewMessageFilter(t *testing.T) { } found := false - candidates := w.filters.GetWatchersByTopic(shard.DefaultShardPubsubTopic(), t1) + candidates := w.filters.GetWatchersByTopic(DefaultShardPubsubTopic(), t1) for _, f := range candidates { if maps.Equal(f.ContentTopics, common.NewTopicSet(crit.ContentTopics)) { found = true diff --git a/wakuv2/nwaku.go b/wakuv2/nwaku.go index c536e51fe..a79f740fc 100644 --- a/wakuv2/nwaku.go +++ b/wakuv2/nwaku.go @@ -188,6 +188,10 @@ package wakuv2 WAKU_CALL (waku_relay_get_num_peers_in_mesh(ctx, pubSubTopic, (WakuCallBack) callback, resp) ); } + void cGoWakuGetNumConnectedPeers(void* ctx, char* pubSubTopic, void* resp) { + WAKU_CALL (waku_relay_get_num_connected_peers(ctx, pubSubTopic, (WakuCallBack) callback, resp) ); + } + void cGoWakuLightpushPublish(void* wakuCtx, const char* pubSubTopic, const char* jsonWakuMessage, @@ -214,6 +218,26 @@ package wakuv2 resp)); } + void cGoWakuPeerExchangeQuery(void* wakuCtx, + uint64_t numPeers, + void* resp) { + + WAKU_CALL (waku_peer_exchange_request(wakuCtx, + numPeers, + (WakuCallBack) callback, + resp)); + } + + void cGoWakuGetPeerIdsByProtocol(void* wakuCtx, + const char* protocol, + void* resp) { + + WAKU_CALL (waku_get_peerids_by_protocol(wakuCtx, + protocol, + (WakuCallBack) callback, + resp)); + } + */ import "C" @@ -226,6 +250,8 @@ import ( "encoding/json" "errors" "fmt" + "io" + "net/http" "os" "os/signal" "runtime" @@ -246,6 +272,7 @@ import ( "github.com/jellydator/ttlcache/v3" "github.com/libp2p/go-libp2p/core/metrics" "github.com/libp2p/go-libp2p/core/peer" + peermod "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr" @@ -2092,6 +2119,20 @@ func (self *NWaku) wakuStoreQuery( return "", errors.New(errMsg) } +func (self *NWaku) WakuPeerExchangeRequest(numPeers uint64) (string, error) { + var resp = C.allocResp() + defer C.freeResp(resp) + + C.cGoWakuPeerExchangeQuery(self.wakuCtx, C.uint64_t(numPeers), resp) + if C.getRet(resp) == C.RET_OK { + msg := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return msg, nil + } + errMsg := "error WakuPeerExchangeRequest: " + + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return "", errors.New(errMsg) +} + func (self *NWaku) WakuConnect(peerMultiAddr string, timeoutMs int) error { var resp = C.allocResp() var cPeerMultiAddr = C.CString(peerMultiAddr) @@ -2144,7 +2185,11 @@ func (self *NWaku) ENR() (*enode.Node, error) { if C.getRet(resp) == C.RET_OK { enrStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) - return enode.Parse(enode.ValidSchemes, enrStr) + n, err := enode.Parse(enode.ValidSchemes, enrStr) + if err != nil { + return nil, err + } + return n, nil } errMsg := "error WakuGetMyENR: " + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) @@ -2174,6 +2219,59 @@ func (self *NWaku) ListPeersInMesh(pubsubTopic string) (int, error) { return 0, errors.New(errMsg) } +func (self *NWaku) GetNumConnectedPeers(pubsubTopic string) (int, error) { + var resp = C.allocResp() + var cPubsubTopic = C.CString(pubsubTopic) + defer C.freeResp(resp) + defer C.free(unsafe.Pointer(cPubsubTopic)) + + C.cGoWakuGetNumConnectedPeers(self.wakuCtx, cPubsubTopic, resp) + + if C.getRet(resp) == C.RET_OK { + numPeersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + numPeers, err := strconv.Atoi(numPeersStr) + if err != nil { + fmt.Println(":", err) + errMsg := "ListPeersInMesh - error converting string to int: " + err.Error() + return 0, errors.New(errMsg) + } + return numPeers, nil + } + errMsg := "error ListPeersInMesh: " + + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return 0, errors.New(errMsg) +} + +func (self *NWaku) GetPeerIdsByProtocol(protocol string) (peer.IDSlice, error) { + var resp = C.allocResp() + var cProtocol = C.CString(protocol) + defer C.freeResp(resp) + defer C.free(unsafe.Pointer(cProtocol)) + + C.cGoWakuGetPeerIdsByProtocol(self.wakuCtx, cProtocol, resp) + + if C.getRet(resp) == C.RET_OK { + peersStr := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + // peersStr contains a comma-separated list of peer ids + itemsPeerIds := strings.Split(peersStr, ",") + + var peers peer.IDSlice + for _, peer := range itemsPeerIds { + id, err := peermod.Decode(peer) + if err != nil { + errMsg := "ListPeersInMesh - error converting string to int: " + err.Error() + return nil, errors.New(errMsg) + } + peers = append(peers, id) + } + + return peers, nil + } + errMsg := "error GetPeerIdsByProtocol: " + + C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp))) + return nil, errors.New(errMsg) +} + // func main() { // config := WakuConfig{ @@ -2424,3 +2522,51 @@ func New(nodeKey *ecdsa.PrivateKey, // return waku, nil } + +type NwakuInfo struct { + ListenAddresses []string `json:"listenAddresses"` + EnrUri string `json:"enrUri"` +} + +func GetNwakuInfo(host *string, port *int) (NwakuInfo, error) { + nwakuRestPort := 8645 + if port != nil { + nwakuRestPort = *port + } + envNwakuRestPort := os.Getenv("NWAKU_REST_PORT") + if envNwakuRestPort != "" { + v, err := strconv.Atoi(envNwakuRestPort) + if err != nil { + return NwakuInfo{}, err + } + nwakuRestPort = v + } + + nwakuRestHost := "localhost" + if host != nil { + nwakuRestHost = *host + } + envNwakuRestHost := os.Getenv("NWAKU_REST_HOST") + if envNwakuRestHost != "" { + nwakuRestHost = envNwakuRestHost + } + + resp, err := http.Get(fmt.Sprintf("http://%s:%d/debug/v1/info", nwakuRestHost, nwakuRestPort)) + if err != nil { + return NwakuInfo{}, err + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return NwakuInfo{}, err + } + + var data NwakuInfo + err = json.Unmarshal(body, &data) + if err != nil { + return NwakuInfo{}, err + } + + return data, nil +} diff --git a/wakuv2/waku_test.go b/wakuv2/waku_test.go index 9792e5082..1495d1beb 100644 --- a/wakuv2/waku_test.go +++ b/wakuv2/waku_test.go @@ -352,7 +352,10 @@ func TestPeerExchange(t *testing.T) { config.EnableDiscV5 = true config.EnablePeerExchangeServer = false config.EnablePeerExchangeClient = false - config.DiscV5BootstrapNodes = []string{pxServerNode.node.ENR().String()} + enr, err := pxServerNode.ENR() + require.NoError(t, err) + + config.DiscV5BootstrapNodes = []string{enr.String()} discV5Node, err := New(nil, "", config, logger.Named("discV5Node"), nil, nil, nil, nil) require.NoError(t, err) require.NoError(t, discV5Node.Start()) @@ -360,7 +363,7 @@ func TestPeerExchange(t *testing.T) { time.Sleep(1 * time.Second) // start light node which use PeerExchange to discover peers - enrNodes := []*enode.Node{pxServerNode.node.ENR()} + enrNodes := []*enode.Node{enr} tree, url := makeTestTree("n", enrNodes, nil) resolver := mapResolver(tree.ToTXT("n")) @@ -385,17 +388,23 @@ func TestPeerExchange(t *testing.T) { // in light client mode,the peer will be closed via `w.node.Host().Network().ClosePeer(peerInfo.ID)` // after invoking identifyAndConnect, instead, we should check the peerStore, peers from peerStore // won't get deleted especially if they are statically added. - if len(lightNode.node.Host().Peerstore().Peers()) == 2 { + numConnected, err := lightNode.GetNumConnectedPeers() + if err != nil { + return err + } + if numConnected == 2 { return nil } return errors.New("no peers discovered") }, options) require.NoError(t, err) - ctx, cancel := context.WithCancel(context.Background()) + _, cancel := context.WithCancel(context.Background()) defer cancel() - require.NoError(t, discV5Node.node.PeerExchange().Request(ctx, 1)) - require.Error(t, discV5Node.node.PeerExchange().Request(ctx, 1)) //should fail due to rate limit + _, err = discV5Node.WakuPeerExchangeRequest(1) + require.NoError(t, err) + _, err = discV5Node.WakuPeerExchangeRequest(1) + require.Error(t, err) //should fail due to rate limit require.NoError(t, lightNode.Stop()) require.NoError(t, pxServerNode.Stop()) @@ -430,7 +439,7 @@ func TestWakuV2Filter(t *testing.T) { // Sanity check, not great, but it's probably helpful err = tt.RetryWithBackOff(func() error { - peers, err := w.node.PeerManager().FilterPeersByProto(nil, nil, filter.FilterSubscribeID_v20beta1) + peers, err := w.GetPeerIdsByProtocol(string(filter.FilterSubscribeID_v20beta1)) if err != nil { return err } @@ -466,20 +475,20 @@ func TestWakuV2Filter(t *testing.T) { time.Sleep(5 * time.Second) // Ensure there is at least 1 active filter subscription - subscriptions := w.node.FilterLightnode().Subscriptions() + subscriptions := w.FilterLightnode().Subscriptions() require.Greater(t, len(subscriptions), 0) messages := filter.Retrieve() require.Len(t, messages, 1) // Mock peers going down - _, err = w.node.FilterLightnode().UnsubscribeWithSubscription(w.ctx, subscriptions[0]) + _, err = w.FilterLightnode().UnsubscribeWithSubscription(w.ctx, subscriptions[0]) require.NoError(t, err) time.Sleep(10 * time.Second) // Ensure there is at least 1 active filter subscription - subscriptions = w.node.FilterLightnode().Subscriptions() + subscriptions = w.FilterLightnode().Subscriptions() require.Greater(t, len(subscriptions), 0) // Ensure that messages are retrieved with a fresh sub @@ -551,11 +560,11 @@ func TestWakuV2Store(t *testing.T) { }() // Connect the two nodes directly - peer2Addr := w2.node.ListenAddresses()[0].String() - err = w1.node.DialPeer(context.Background(), peer2Addr) + peer2Addr := w2.ListenAddresses()[0].String() + err = w1.DialPeer(context.Background(), peer2Addr) require.NoError(t, err) - waitForPeerConnection(t, w2.node.Host().ID(), w1PeersCh) + waitForPeerConnection(t, w2.Host().ID(), w1PeersCh) // Create a filter for the second node to catch messages filter := &common.Filter{ @@ -591,7 +600,7 @@ func TestWakuV2Store(t *testing.T) { // Query the second node's store for the message _, envelopeCount, err := w1.Query( context.Background(), - w2.node.Host().ID(), + w2.Host().ID(), store.FilterCriteria{ TimeStart: proto.Int64((timestampInSeconds - int64(marginInSeconds)) * int64(time.Second)), TimeEnd: proto.Int64((timestampInSeconds + int64(marginInSeconds)) * int64(time.Second)), @@ -729,7 +738,7 @@ func TestLightpushRateLimit(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() //Connect the relay peer and full node - err = w1.node.DialPeer(ctx, w0.node.ListenAddresses()[0].String()) + err = w1.DialPeer(ctx, w0.ListenAddresses()[0].String()) require.NoError(t, err) err = tt.RetryWithBackOff(func() error { @@ -756,9 +765,9 @@ func TestLightpushRateLimit(t *testing.T) { }() //Use this instead of DialPeer to make sure the peer is added to PeerStore and can be selected for Lighpush - w2.node.AddDiscoveredPeer(w1.PeerID(), w1.node.ListenAddresses(), wps.Static, w1.cfg.DefaultShardedPubsubTopics, w1.node.ENR(), true) + w2.AddDiscoveredPeer(w1.PeerID(), w1.ListenAddresses(), wps.Static, w1.cfg.DefaultShardedPubsubTopics, w1.node.ENR(), true) - waitForPeerConnectionWithTimeout(t, w2.node.Host().ID(), w1PeersCh, 5*time.Second) + waitForPeerConnectionWithTimeout(t, w2.Host().ID(), w1PeersCh, 5*time.Second) event := make(chan common.EnvelopeEvent, 10) w2.SubscribeEnvelopeEvents(event)