From 9315de8d8aee5f24df35ff2feb72faf7d8fc294e Mon Sep 17 00:00:00 2001
From: Prem Chaitanya Prathi
Date: Tue, 7 Nov 2023 17:59:02 +0530
Subject: [PATCH 1/3] feat: discv5 filter out nodes that have empty waku capabilities (#865)

---
 waku/v2/discv5/discover.go | 17 ++++++++---------
 waku/v2/discv5/discover_test.go | 20 ++++++++++++++++++++
 2 files changed, 28 insertions(+), 9 deletions(-)

diff --git a/waku/v2/discv5/discover.go b/waku/v2/discv5/discover.go
index 2a5162db..ba0a7c36 100644
--- a/waku/v2/discv5/discover.go
+++ b/waku/v2/discv5/discover.go
@@ -21,6 +21,7 @@ import (
 	"go.uber.org/zap"

 	"github.com/ethereum/go-ethereum/p2p/enode"
+	"github.com/ethereum/go-ethereum/p2p/enr"
 	"github.com/ethereum/go-ethereum/p2p/nat"
 )

@@ -250,23 +251,21 @@ func (d *DiscoveryV5) Stop() {
 	})
 }

-/*
 func isWakuNode(node *enode.Node) bool {
-	enrField := new(utils.WakuEnrBitfield)
-	if err := node.Record().Load(enr.WithEntry(utils.WakuENRField, &enrField)); err != nil {
+	enrField := new(wenr.WakuEnrBitfield)
+	if err := node.Record().Load(enr.WithEntry(wenr.WakuENRField, &enrField)); err != nil {
 		if !enr.IsNotFound(err) {
-			utils.Logger().Named("discv5").Error("could not retrieve port for enr ", zap.Any("node", node))
+			utils.Logger().Named("discv5").Error("could not retrieve waku2 ENR field for enr", zap.Any("node", node))
 		}
 		return false
 	}

 	if enrField != nil {
-		return *enrField != uint8(0)
+		return *enrField != uint8(0) // RFC 31: a waku node must advertise a non-zero waku2 bitfield
 	}
 	return false
 }
-*/

 func (d *DiscoveryV5) evaluateNode() func(node *enode.Node) bool {
 	return func(node *enode.Node) bool {
@@ -274,10 +273,10 @@ func (d *DiscoveryV5) evaluateNode() func(node *enode.Node) bool {
 			return false
 		}

-		// TODO: consider node filtering based on ENR; we do not filter based on ENR in the first waku discv5 beta stage
-		/*if !isWakuNode(node) {
+		// filter nodes based on their ENR; only nodes advertising waku capabilities are admitted
+		if !isWakuNode(node) {
 			return false
-		}*/
+		}

 		_, err := wenr.EnodeToPeerInfo(node)

diff --git a/waku/v2/discv5/discover_test.go b/waku/v2/discv5/discover_test.go
index 0759b699..ba906b03 100644
--- a/waku/v2/discv5/discover_test.go
+++ b/waku/v2/discv5/discover_test.go
@@ -101,6 +101,7 @@ func extractIP(addr multiaddr.Multiaddr) (*net.TCPAddr, error) {

 func TestDiscV5(t *testing.T) {
 	// Host1 <-> Host2 <-> Host3
+	// Host4 (no waku capabilities) <-> Host2

 	// H1
 	host1, _, prvKey1 := createHost(t)
@@ -138,9 +139,22 @@ func TestDiscV5(t *testing.T) {
 	require.NoError(t, err)
 	d3.SetHost(host3)

+	// H4 doesn't have any Waku capabilities
+	host4, _, prvKey4 := createHost(t)
+	ip4, _ := extractIP(host2.Addrs()[0])
+	udpPort4, err := tests.FindFreeUDPPort(t, "127.0.0.1", 3)
+	require.NoError(t, err)
+	l4, err := newLocalnode(prvKey4, ip4, udpPort4, 0, nil, utils.Logger())
+	require.NoError(t, err)
+	peerconn4 := peermanager.NewTestPeerDiscoverer()
+	d4, err := NewDiscoveryV5(prvKey4, l4, peerconn4, prometheus.DefaultRegisterer, utils.Logger(), WithUDPPort(uint(udpPort4)), WithBootnodes([]*enode.Node{d2.localnode.Node()}))
+	require.NoError(t, err)
+	d2.SetHost(host2)
+
 	defer d1.Stop()
 	defer d2.Stop()
 	defer d3.Stop()
+	defer d4.Stop()

 	err = d1.Start(context.Background())
 	require.NoError(t, err)
@@ -151,9 +165,13 @@ func TestDiscV5(t *testing.T) {
 	err = d3.Start(context.Background())
 	require.NoError(t, err)

+	err = d4.Start(context.Background())
+	require.NoError(t, err)
+
 	time.Sleep(2 * time.Second) // Wait for nodes to be discovered

 	require.True(t, peerconn3.HasPeer(host1.ID()) && 
peerconn3.HasPeer(host2.ID()))
+	require.False(t, peerconn3.HasPeer(host4.ID())) // host4 should not be discovered; it is filtered out

 	d3.Stop()
 	peerconn3.Clear()
@@ -165,4 +183,6 @@ func TestDiscV5(t *testing.T) {
 	time.Sleep(2 * time.Second) // Wait for nodes to be discovered

 	require.True(t, peerconn3.HasPeer(host1.ID()) && peerconn3.HasPeer(host2.ID()))
+	require.False(t, peerconn3.HasPeer(host4.ID())) // host4 should not be discovered; it is filtered out
+
 }

From 2616d43c9d521f1913cdace8ac2437d82237b46c Mon Sep 17 00:00:00 2001
From: Prem Chaitanya Prathi
Date: Tue, 7 Nov 2023 20:26:48 +0530
Subject: [PATCH 2/3] chore: update relay REST and RPC APIs and fix unit tests (#866)

* update relay REST APIs to remove the duplicate message cache, fix relay tests and admin test
* chore: enable REST and RPC unit tests
* update lightpush REST API to match yaml
* fix: filter REST unit test failures
* skip legacy filter tests
* chore: add unit tests for autosharding relay REST API, fix success response (#868)
---
 Makefile | 4 +-
 cmd/waku/node.go | 9 +-
 cmd/waku/server/rest/filter.go | 10 +-
 cmd/waku/server/rest/filter_test.go | 2 +-
 cmd/waku/server/rest/lightpush_rest.go | 2 +-
 cmd/waku/server/rest/relay.go | 130 ++++-------------
 cmd/waku/server/rest/relay_test.go | 188 +++++++++++++++++++++----
 cmd/waku/server/rest/waku_rest.go | 27 ++--
 cmd/waku/server/rest/waku_rest_test.go | 2 +-
 cmd/waku/server/rpc/admin_test.go | 3 +-
 cmd/waku/server/rpc/filter_test.go | 3 +
 cmd/waku/server/rpc/relay.go | 69 ++-------
 cmd/waku/server/rpc/relay_test.go | 3 +-
 waku/v2/protocol/relay/config.go | 8 ++
 waku/v2/protocol/relay/waku_relay.go | 29 +++-
 15 files changed, 271 insertions(+), 218 deletions(-)

diff --git a/Makefile b/Makefile
index e68348b1..8d64a429 100644
--- a/Makefile
+++ b/Makefile
@@ -77,10 +77,10 @@ lint-full:
 	@golangci-lint run ./... --config=./.golangci.full.yaml --deadline=5m

 test-with-race:
-	${GOBIN} test -race -timeout 300s ./waku/...
+	${GOBIN} test -race -timeout 300s ./waku/... ./cmd/waku/server/...

 test:
-	${GOBIN} test -timeout 300s ./waku/... -coverprofile=${GO_TEST_OUTFILE}.tmp -coverpkg ./...
+	${GOBIN} test -timeout 300s ./waku/... ./cmd/waku/server/... -coverprofile=${GO_TEST_OUTFILE}.tmp -coverpkg ./... 
cat ${GO_TEST_OUTFILE}.tmp | grep -v ".pb.go" > ${GO_TEST_OUTFILE} ${GOBIN} tool cover -html=${GO_TEST_OUTFILE} -o ${GO_HTML_COV} diff --git a/cmd/waku/node.go b/cmd/waku/node.go index 2129a1e5..56e3f83a 100644 --- a/cmd/waku/node.go +++ b/cmd/waku/node.go @@ -444,7 +444,14 @@ func Execute(options NodeOptions) error { var restServer *rest.WakuRest if options.RESTServer.Enable { wg.Add(1) - restServer = rest.NewWakuRest(wakuNode, options.RESTServer.Address, options.RESTServer.Port, options.PProf, options.RESTServer.Admin, options.RESTServer.RelayCacheCapacity, options.RESTServer.FilterCacheCapacity, logger) + restConfig := rest.RestConfig{Address: options.RESTServer.Address, + Port: uint(options.RESTServer.Port), + EnablePProf: options.PProf, + EnableAdmin: options.RESTServer.Admin, + RelayCacheCapacity: uint(options.RESTServer.RelayCacheCapacity), + FilterCacheCapacity: uint(options.RESTServer.FilterCacheCapacity)} + + restServer = rest.NewWakuRest(wakuNode, restConfig, logger) restServer.Start(ctx, &wg) } diff --git a/cmd/waku/server/rest/filter.go b/cmd/waku/server/rest/filter.go index d6719e03..02e1e681 100644 --- a/cmd/waku/server/rest/filter.go +++ b/cmd/waku/server/rest/filter.go @@ -240,11 +240,13 @@ func (s *FilterService) unsubscribeGetMessage(result *filter.WakuFilterPushResul var peerIds string ind := 0 for _, entry := range result.Errors() { - s.log.Error("can't unsubscribe for ", zap.String("peer", entry.PeerID.String()), zap.Error(entry.Err)) - if ind != 0 { - peerIds += ", " + if entry.Err != nil { + s.log.Error("can't unsubscribe for ", zap.String("peer", entry.PeerID.String()), zap.Error(entry.Err)) + if ind != 0 { + peerIds += ", " + } + peerIds += entry.PeerID.String() } - peerIds += entry.PeerID.String() ind++ } if peerIds != "" { diff --git a/cmd/waku/server/rest/filter_test.go b/cmd/waku/server/rest/filter_test.go index 52a5af3a..09d873e2 100644 --- a/cmd/waku/server/rest/filter_test.go +++ b/cmd/waku/server/rest/filter_test.go @@ -365,7 +365,7 @@ func TestFilterGetMessages(t *testing.T) { router.ServeHTTP(rr, req) require.Equal(t, http.StatusNotFound, rr.Code) require.Equal(t, - fmt.Sprintf("Not subscribed to pubsubTopic:%s contentTopic: %s", notSubscibredPubsubTopic, contentTopic), + fmt.Sprintf("not subscribed to pubsubTopic:%s contentTopic: %s", notSubscibredPubsubTopic, contentTopic), rr.Body.String(), ) } diff --git a/cmd/waku/server/rest/lightpush_rest.go b/cmd/waku/server/rest/lightpush_rest.go index 8500df4a..e33f4b7a 100644 --- a/cmd/waku/server/rest/lightpush_rest.go +++ b/cmd/waku/server/rest/lightpush_rest.go @@ -71,6 +71,6 @@ func (serv *LightpushService) postMessagev1(w http.ResponseWriter, req *http.Req _, err = w.Write([]byte(err.Error())) serv.log.Error("writing response", zap.Error(err)) } else { - w.WriteHeader(http.StatusOK) + writeErrOrResponse(w, err, true) } } diff --git a/cmd/waku/server/rest/relay.go b/cmd/waku/server/rest/relay.go index 968ed338..d80e0073 100644 --- a/cmd/waku/server/rest/relay.go +++ b/cmd/waku/server/rest/relay.go @@ -1,13 +1,9 @@ package rest import ( - "context" "encoding/json" - "errors" "net/http" - "net/url" "strings" - "sync" "github.com/go-chi/chi/v5" "github.com/waku-org/go-waku/cmd/waku/server" @@ -26,29 +22,21 @@ const routeRelayV1AutoMessages = "/relay/v1/auto/messages" // RelayService represents the REST service for WakuRelay type RelayService struct { - node *node.WakuNode - cancel context.CancelFunc + node *node.WakuNode log *zap.Logger - messages map[string][]*pb.WakuMessage - cacheCapacity int - 
messagesMutex sync.RWMutex - - runner *runnerService + cacheCapacity uint } // NewRelayService returns an instance of RelayService -func NewRelayService(node *node.WakuNode, m *chi.Mux, cacheCapacity int, log *zap.Logger) *RelayService { +func NewRelayService(node *node.WakuNode, m *chi.Mux, cacheCapacity uint, log *zap.Logger) *RelayService { s := &RelayService{ node: node, log: log.Named("relay"), cacheCapacity: cacheCapacity, - messages: make(map[string][]*pb.WakuMessage), } - s.runner = newRunnerService(node.Broadcaster(), s.addEnvelope) - m.Post(routeRelayV1Subscriptions, s.postV1Subscriptions) m.Delete(routeRelayV1Subscriptions, s.deleteV1Subscriptions) m.Get(routeRelayV1Messages, s.getV1Messages) @@ -65,46 +53,6 @@ func NewRelayService(node *node.WakuNode, m *chi.Mux, cacheCapacity int, log *za return s } -func (r *RelayService) addEnvelope(envelope *protocol.Envelope) { - r.messagesMutex.Lock() - defer r.messagesMutex.Unlock() - - if _, ok := r.messages[envelope.PubsubTopic()]; !ok { - return - } - - // Keep a specific max number of messages per topic - if len(r.messages[envelope.PubsubTopic()]) >= r.cacheCapacity { - r.messages[envelope.PubsubTopic()] = r.messages[envelope.PubsubTopic()][1:] - } - - r.messages[envelope.PubsubTopic()] = append(r.messages[envelope.PubsubTopic()], envelope.Message()) -} - -// Start starts the RelayService -func (r *RelayService) Start(ctx context.Context) { - ctx, cancel := context.WithCancel(ctx) - r.cancel = cancel - - r.messagesMutex.Lock() - // Node may already be subscribed to some topics when Relay API handlers are installed. Let's add these - for _, topic := range r.node.Relay().Topics() { - r.log.Info("adding topic handler for existing subscription", zap.String("topic", topic)) - r.messages[topic] = []*pb.WakuMessage{} - } - r.messagesMutex.Unlock() - - r.runner.Start(ctx) -} - -// Stop stops the RelayService -func (r *RelayService) Stop() { - if r.cancel == nil { - return - } - r.cancel() -} - func (r *RelayService) deleteV1Subscriptions(w http.ResponseWriter, req *http.Request) { var topics []string decoder := json.NewDecoder(req.Body) @@ -114,16 +62,11 @@ func (r *RelayService) deleteV1Subscriptions(w http.ResponseWriter, req *http.Re } defer req.Body.Close() - r.messagesMutex.Lock() - defer r.messagesMutex.Unlock() - var err error for _, topic := range topics { err = r.node.Relay().Unsubscribe(req.Context(), protocol.NewContentFilter(topic)) if err != nil { r.log.Error("unsubscribing from topic", zap.String("topic", strings.Replace(strings.Replace(topic, "\n", "", -1), "\r", "", -1)), zap.Error(err)) - } else { - delete(r.messages, topic) } } @@ -140,26 +83,29 @@ func (r *RelayService) postV1Subscriptions(w http.ResponseWriter, req *http.Requ defer req.Body.Close() var err error - var sub *relay.Subscription - var subs []*relay.Subscription + var successCnt int var topicToSubscribe string for _, topic := range topics { if topic == "" { - subs, err = r.node.Relay().Subscribe(req.Context(), protocol.NewContentFilter(relay.DefaultWakuTopic)) topicToSubscribe = relay.DefaultWakuTopic } else { - subs, err = r.node.Relay().Subscribe(req.Context(), protocol.NewContentFilter(topic)) topicToSubscribe = topic } + _, err = r.node.Relay().Subscribe(req.Context(), protocol.NewContentFilter(topicToSubscribe), relay.WithCacheSize(r.cacheCapacity)) + if err != nil { r.log.Error("subscribing to topic", zap.String("topic", strings.Replace(topicToSubscribe, "\n", "", -1)), zap.Error(err)) - } else { - sub = subs[0] - sub.Unsubscribe() - 
r.messagesMutex.Lock() - r.messages[topic] = []*pb.WakuMessage{} - r.messagesMutex.Unlock() + continue } + successCnt++ + } + + // on partial subscribe failure + if successCnt > 0 && err != nil { + r.log.Error("partial subscribe failed", zap.Error(err)) + // on partial failure + writeResponse(w, err, http.StatusOK) + return } writeErrOrResponse(w, err, true) @@ -170,20 +116,22 @@ func (r *RelayService) getV1Messages(w http.ResponseWriter, req *http.Request) { if topic == "" { return } - - r.messagesMutex.Lock() - defer r.messagesMutex.Unlock() - - if _, ok := r.messages[topic]; !ok { + //TODO: Update the API to also take a contentTopic since relay now supports filtering based on contentTopic as well. + sub, err := r.node.Relay().GetSubscriptionWithPubsubTopic(topic, "") + if err != nil { w.WriteHeader(http.StatusNotFound) - _, err := w.Write([]byte("not subscribed to topic")) + _, err = w.Write([]byte("not subscribed to topic")) r.log.Error("writing response", zap.Error(err)) return } + var response []*pb.WakuMessage + select { + case msg := <-sub.Ch: + response = append(response, msg.Message()) + default: + break + } - response := r.messages[topic] - - r.messages[topic] = []*pb.WakuMessage{} writeErrOrResponse(w, nil, response) } @@ -205,11 +153,6 @@ func (r *RelayService) postV1Message(w http.ResponseWriter, req *http.Request) { topic = relay.DefaultWakuTopic } - if !r.node.Relay().IsSubscribed(topic) { - writeErrOrResponse(w, errors.New("not subscribed to pubsubTopic"), nil) - return - } - if err := server.AppendRLNProof(r.node, message); err != nil { writeErrOrResponse(w, err, nil) return @@ -250,7 +193,7 @@ func (r *RelayService) postV1AutoSubscriptions(w http.ResponseWriter, req *http. defer req.Body.Close() var err error - _, err = r.node.Relay().Subscribe(r.node.Relay().Context(), protocol.NewContentFilter("", cTopics...)) + _, err = r.node.Relay().Subscribe(r.node.Relay().Context(), protocol.NewContentFilter("", cTopics...), relay.WithCacheSize(r.cacheCapacity)) if err != nil { r.log.Error("subscribing to topics", zap.Strings("contentTopics", cTopics), zap.Error(err)) } @@ -260,27 +203,14 @@ func (r *RelayService) postV1AutoSubscriptions(w http.ResponseWriter, req *http. 
_, err := w.Write([]byte(err.Error())) r.log.Error("writing response", zap.Error(err)) } else { - w.WriteHeader(http.StatusOK) + writeErrOrResponse(w, err, true) } } func (r *RelayService) getV1AutoMessages(w http.ResponseWriter, req *http.Request) { - cTopic := chi.URLParam(req, "contentTopic") - if cTopic == "" { - w.WriteHeader(http.StatusBadRequest) - _, err := w.Write([]byte("contentTopic is required")) - r.log.Error("writing response", zap.Error(err)) - return - } - cTopic, err := url.QueryUnescape(cTopic) - if err != nil { - w.WriteHeader(http.StatusBadRequest) - _, err = w.Write([]byte("invalid contentTopic format")) - r.log.Error("writing response", zap.Error(err)) - return - } + cTopic := topicFromPath(w, req, "contentTopic", r.log) sub, err := r.node.Relay().GetSubscription(cTopic) if err != nil { w.WriteHeader(http.StatusNotFound) diff --git a/cmd/waku/server/rest/relay_test.go b/cmd/waku/server/rest/relay_test.go index 3fe994b9..7f10c930 100644 --- a/cmd/waku/server/rest/relay_test.go +++ b/cmd/waku/server/rest/relay_test.go @@ -7,6 +7,7 @@ import ( "fmt" "net/http" "net/http/httptest" + "net/url" "testing" "time" @@ -15,8 +16,8 @@ import ( "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/tests" "github.com/waku-org/go-waku/waku/v2/node" - "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/utils" ) @@ -34,7 +35,6 @@ func TestPostV1Message(t *testing.T) { router := chi.NewRouter() _ = makeRelayService(t, router) - msg := &pb.WakuMessage{ Payload: []byte{1, 2, 3}, ContentTopic: "abc", @@ -54,10 +54,7 @@ func TestPostV1Message(t *testing.T) { func TestRelaySubscription(t *testing.T) { router := chi.NewRouter() - d := makeRelayService(t, router) - - go d.Start(context.Background()) - defer d.Stop() + r := makeRelayService(t, router) // Wait for node to start time.Sleep(500 * time.Millisecond) @@ -67,48 +64,42 @@ func TestRelaySubscription(t *testing.T) { require.NoError(t, err) rr := httptest.NewRecorder() - req, _ := http.NewRequest(http.MethodPost, "/relay/v1/subscriptions", bytes.NewReader(topicsJSONBytes)) + req, _ := http.NewRequest(http.MethodPost, routeRelayV1Subscriptions, bytes.NewReader(topicsJSONBytes)) router.ServeHTTP(rr, req) require.Equal(t, http.StatusOK, rr.Code) require.Equal(t, "true", rr.Body.String()) // Test max messages in subscription now := utils.GetUnixEpoch() - d.runner.broadcaster.Submit(protocol.NewEnvelope(tests.CreateWakuMessage("test", now+1), now, "test")) - d.runner.broadcaster.Submit(protocol.NewEnvelope(tests.CreateWakuMessage("test", now+2), now, "test")) - d.runner.broadcaster.Submit(protocol.NewEnvelope(tests.CreateWakuMessage("test", now+3), now, "test")) + _, err = r.node.Relay().Publish(context.Background(), + tests.CreateWakuMessage("test", now+1), relay.WithPubSubTopic("test")) + require.NoError(t, err) + _, err = r.node.Relay().Publish(context.Background(), + tests.CreateWakuMessage("test", now+2), relay.WithPubSubTopic("test")) + require.NoError(t, err) + + _, err = r.node.Relay().Publish(context.Background(), + tests.CreateWakuMessage("test", now+3), relay.WithPubSubTopic("test")) + require.NoError(t, err) // Wait for the messages to be processed - time.Sleep(500 * time.Millisecond) - - require.Len(t, d.messages["test"], 3) - - d.runner.broadcaster.Submit(protocol.NewEnvelope(tests.CreateWakuMessage("test", now+4), now+4, "test")) - - time.Sleep(500 * time.Millisecond) - - // 
Should only have 3 messages - require.Len(t, d.messages["test"], 3) + time.Sleep(5 * time.Millisecond) // Test deletion rr = httptest.NewRecorder() - req, _ = http.NewRequest(http.MethodDelete, "/relay/v1/subscriptions", bytes.NewReader(topicsJSONBytes)) + req, _ = http.NewRequest(http.MethodDelete, routeRelayV1Subscriptions, bytes.NewReader(topicsJSONBytes)) router.ServeHTTP(rr, req) require.Equal(t, http.StatusOK, rr.Code) require.Equal(t, "true", rr.Body.String()) - require.Len(t, d.messages["test"], 0) - } func TestRelayGetV1Messages(t *testing.T) { router := chi.NewRouter() + router1 := chi.NewRouter() serviceA := makeRelayService(t, router) - go serviceA.Start(context.Background()) - defer serviceA.Stop() - serviceB := makeRelayService(t, router) - go serviceB.Start(context.Background()) - defer serviceB.Stop() + + serviceB := makeRelayService(t, router1) hostInfo, err := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", serviceB.node.Host().ID().Pretty())) require.NoError(t, err) @@ -129,7 +120,7 @@ func TestRelayGetV1Messages(t *testing.T) { require.NoError(t, err) rr := httptest.NewRecorder() - req, _ := http.NewRequest(http.MethodPost, "/relay/v1/subscriptions", bytes.NewReader(topicsJSONBytes)) + req, _ := http.NewRequest(http.MethodPost, routeRelayV1Subscriptions, bytes.NewReader(topicsJSONBytes)) router.ServeHTTP(rr, req) require.Equal(t, http.StatusOK, rr.Code) @@ -165,9 +156,144 @@ func TestRelayGetV1Messages(t *testing.T) { rr = httptest.NewRecorder() req, _ = http.NewRequest(http.MethodGet, "/relay/v1/messages/test", bytes.NewReader([]byte{})) - router.ServeHTTP(rr, req) + router1.ServeHTTP(rr, req) + require.Equal(t, http.StatusNotFound, rr.Code) +} + +func TestPostAutoV1Message(t *testing.T) { + router := chi.NewRouter() + + _ = makeRelayService(t, router) + msg := &pb.WakuMessage{ + Payload: []byte{1, 2, 3}, + ContentTopic: "/toychat/1/huilong/proto", + Version: 0, + Timestamp: utils.GetUnixEpoch(), + } + msgJSONBytes, err := json.Marshal(msg) + require.NoError(t, err) + + rr := httptest.NewRecorder() + req, _ := http.NewRequest(http.MethodPost, routeRelayV1AutoMessages, bytes.NewReader(msgJSONBytes)) + router.ServeHTTP(rr, req) + require.Equal(t, http.StatusOK, rr.Code) +} + +func TestRelayAutoSubUnsub(t *testing.T) { + router := chi.NewRouter() + + r := makeRelayService(t, router) + + // Wait for node to start + time.Sleep(500 * time.Millisecond) + + cTopic1 := "/toychat/1/huilong/proto" + + cTopics := []string{cTopic1} + topicsJSONBytes, err := json.Marshal(cTopics) + require.NoError(t, err) + + rr := httptest.NewRecorder() + req, _ := http.NewRequest(http.MethodPost, routeRelayV1AutoSubscriptions, bytes.NewReader(topicsJSONBytes)) + router.ServeHTTP(rr, req) + require.Equal(t, http.StatusOK, rr.Code) + require.Equal(t, "true", rr.Body.String()) + + // Test publishing messages after subscription + now := utils.GetUnixEpoch() + _, err = r.node.Relay().Publish(context.Background(), + tests.CreateWakuMessage(cTopic1, now+1)) + require.NoError(t, err) + + // Wait for the messages to be processed + time.Sleep(5 * time.Millisecond) + + // Test deletion + rr = httptest.NewRecorder() + req, _ = http.NewRequest(http.MethodDelete, routeRelayV1AutoSubscriptions, bytes.NewReader(topicsJSONBytes)) + router.ServeHTTP(rr, req) + require.Equal(t, http.StatusOK, rr.Code) + require.Equal(t, "true", rr.Body.String()) + + cTopics = append(cTopics, "test") + topicsJSONBytes, err = json.Marshal(cTopics) + require.NoError(t, err) + + rr = httptest.NewRecorder() + req, _ = 
http.NewRequest(http.MethodPost, routeRelayV1AutoSubscriptions, bytes.NewReader(topicsJSONBytes)) + router.ServeHTTP(rr, req) + require.Equal(t, http.StatusBadRequest, rr.Code) + +} + +func TestRelayGetV1AutoMessages(t *testing.T) { + router := chi.NewRouter() + router1 := chi.NewRouter() + + serviceA := makeRelayService(t, router) + + serviceB := makeRelayService(t, router1) + + hostInfo, err := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", serviceB.node.Host().ID().Pretty())) + require.NoError(t, err) + + var addr multiaddr.Multiaddr + for _, a := range serviceB.node.Host().Addrs() { + addr = a.Encapsulate(hostInfo) + break + } + err = serviceA.node.DialPeerWithMultiAddress(context.Background(), addr) + require.NoError(t, err) + + // Wait for the dial to complete + time.Sleep(1 * time.Second) + + cTopic1 := "/toychat/1/huilong/proto" + + cTopics := []string{cTopic1} + topicsJSONBytes, err := json.Marshal(cTopics) + require.NoError(t, err) + + rr := httptest.NewRecorder() + req, _ := http.NewRequest(http.MethodPost, routeRelayV1AutoSubscriptions, bytes.NewReader(topicsJSONBytes)) + router.ServeHTTP(rr, req) + require.Equal(t, http.StatusOK, rr.Code) + require.Equal(t, "true", rr.Body.String()) + + // Wait for the subscription to be started + time.Sleep(1 * time.Second) + + msg := &pb.WakuMessage{ + Payload: []byte{1, 2, 3}, + ContentTopic: cTopic1, + Version: 0, + Timestamp: utils.GetUnixEpoch(), + } + msgJsonBytes, err := json.Marshal(msg) + require.NoError(t, err) + + rr = httptest.NewRecorder() + req, _ = http.NewRequest(http.MethodPost, routeRelayV1AutoMessages, bytes.NewReader(msgJsonBytes)) + router.ServeHTTP(rr, req) + require.Equal(t, http.StatusOK, rr.Code) + + // Wait for the message to be received + time.Sleep(1 * time.Second) + + rr = httptest.NewRecorder() + req, _ = http.NewRequest(http.MethodGet, fmt.Sprintf("%s/%s", routeRelayV1AutoMessages, url.QueryEscape(cTopic1)), bytes.NewReader([]byte{})) + router.ServeHTTP(rr, req) + require.Equal(t, http.StatusOK, rr.Code) + + var messages []*pb.WakuMessage err = json.Unmarshal(rr.Body.Bytes(), &messages) require.NoError(t, err) - require.Len(t, messages, 0) + require.Len(t, messages, 1) + + rr = httptest.NewRecorder() + req, _ = http.NewRequest(http.MethodGet, fmt.Sprintf("%s/%s", routeRelayV1AutoMessages, url.QueryEscape(cTopic1)), bytes.NewReader([]byte{})) + router1.ServeHTTP(rr, req) + require.Equal(t, http.StatusNotFound, rr.Code) + } diff --git a/cmd/waku/server/rest/waku_rest.go b/cmd/waku/server/rest/waku_rest.go index 98c5437c..3126f37e 100644 --- a/cmd/waku/server/rest/waku_rest.go +++ b/cmd/waku/server/rest/waku_rest.go @@ -22,7 +22,16 @@ type WakuRest struct { filterService *FilterService } -func NewWakuRest(node *node.WakuNode, address string, port int, enablePProf bool, enableAdmin bool, relayCacheCapacity, filterCacheCapacity int, log *zap.Logger) *WakuRest { +type RestConfig struct { + Address string + Port uint + EnablePProf bool + EnableAdmin bool + RelayCacheCapacity uint + FilterCacheCapacity uint +} + +func NewWakuRest(node *node.WakuNode, config RestConfig, log *zap.Logger) *WakuRest { wrpc := new(WakuRest) wrpc.log = log.Named("rest") @@ -30,7 +39,7 @@ func NewWakuRest(node *node.WakuNode, address string, port int, enablePProf bool mux.Use(middleware.Logger) mux.Use(middleware.NoCache) - if enablePProf { + if config.EnablePProf { mux.Mount("/debug", middleware.Profiler()) } @@ -39,7 +48,7 @@ func NewWakuRest(node *node.WakuNode, address string, port int, enablePProf bool _ = NewStoreService(node, mux) _ = 
NewLightpushService(node, mux, log) - listenAddr := fmt.Sprintf("%s:%d", address, port) + listenAddr := fmt.Sprintf("%s:%d", config.Address, config.Port) server := &http.Server{ Addr: listenAddr, @@ -50,19 +59,16 @@ func NewWakuRest(node *node.WakuNode, address string, port int, enablePProf bool wrpc.server = server if node.Relay() != nil { - relayService := NewRelayService(node, mux, relayCacheCapacity, log) - server.RegisterOnShutdown(func() { - relayService.Stop() - }) + relayService := NewRelayService(node, mux, config.RelayCacheCapacity, log) wrpc.relayService = relayService } - if enableAdmin { + if config.EnableAdmin { _ = NewAdminService(node, mux, wrpc.log) } if node.FilterLightnode() != nil { - filterService := NewFilterService(node, mux, filterCacheCapacity, log) + filterService := NewFilterService(node, mux, int(config.FilterCacheCapacity), log) server.RegisterOnShutdown(func() { filterService.Stop() }) @@ -75,9 +81,6 @@ func NewWakuRest(node *node.WakuNode, address string, port int, enablePProf bool func (r *WakuRest) Start(ctx context.Context, wg *sync.WaitGroup) { defer wg.Done() - if r.node.Relay() != nil { - go r.relayService.Start(ctx) - } if r.node.FilterLightnode() != nil { go r.filterService.Start(ctx) } diff --git a/cmd/waku/server/rest/waku_rest_test.go b/cmd/waku/server/rest/waku_rest_test.go index d27810b4..0d62c22e 100644 --- a/cmd/waku/server/rest/waku_rest_test.go +++ b/cmd/waku/server/rest/waku_rest_test.go @@ -13,7 +13,7 @@ func TestWakuRest(t *testing.T) { n, err := node.New(options) require.NoError(t, err) - rpc := NewWakuRest(n, "127.0.0.1", 8080, false, false, 10, 0, utils.Logger()) + rpc := NewWakuRest(n, RestConfig{Address: "127.0.0.1", Port: 8080, EnablePProf: false, EnableAdmin: false, RelayCacheCapacity: 10}, utils.Logger()) require.NotNil(t, rpc.server) require.Equal(t, rpc.server.Addr, "127.0.0.1:8080") } diff --git a/cmd/waku/server/rpc/admin_test.go b/cmd/waku/server/rpc/admin_test.go index 41966e6f..a17979e1 100644 --- a/cmd/waku/server/rpc/admin_test.go +++ b/cmd/waku/server/rpc/admin_test.go @@ -35,7 +35,8 @@ func TestV1Peers(t *testing.T) { host, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - relay := relay.NewWakuRelay(nil, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) + bcast := relay.NewBroadcaster(10) + relay := relay.NewWakuRelay(bcast, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) relay.SetHost(host) err = relay.Start(context.Background()) require.NoError(t, err) diff --git a/cmd/waku/server/rpc/filter_test.go b/cmd/waku/server/rpc/filter_test.go index e6c7d698..160afca7 100644 --- a/cmd/waku/server/rpc/filter_test.go +++ b/cmd/waku/server/rpc/filter_test.go @@ -50,6 +50,7 @@ func makeFilterService(t *testing.T, isFullNode bool) *FilterService { } func TestFilterSubscription(t *testing.T) { + t.Skip("skipping since it is legacy filter") port, err := tests.FindFreePort(t, "", 5) require.NoError(t, err) @@ -110,6 +111,8 @@ func TestFilterSubscription(t *testing.T) { } func TestFilterGetV1Messages(t *testing.T) { + t.Skip("skipping since it is legacy filter") + serviceA := makeFilterService(t, true) var reply SuccessReply diff --git a/cmd/waku/server/rpc/relay.go b/cmd/waku/server/rpc/relay.go index 5f431046..4994706b 100644 --- a/cmd/waku/server/rpc/relay.go +++ b/cmd/waku/server/rpc/relay.go @@ -1,15 +1,12 @@ package rpc import ( - "errors" "fmt" "net/http" - "sync" "github.com/waku-org/go-waku/cmd/waku/server" 
"github.com/waku-org/go-waku/waku/v2/node" "github.com/waku-org/go-waku/waku/v2/protocol" - "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "go.uber.org/zap" ) @@ -20,11 +17,7 @@ type RelayService struct { log *zap.Logger - messages map[string][]*pb.WakuMessage cacheCapacity int - messagesMutex sync.RWMutex - - runner *runnerService } // RelayMessageArgs represents the requests used for posting messages @@ -54,46 +47,17 @@ func NewRelayService(node *node.WakuNode, cacheCapacity int, log *zap.Logger) *R node: node, cacheCapacity: cacheCapacity, log: log.Named("relay"), - messages: make(map[string][]*pb.WakuMessage), } - s.runner = newRunnerService(node.Broadcaster(), s.addEnvelope) - return s } -func (r *RelayService) addEnvelope(envelope *protocol.Envelope) { - r.messagesMutex.Lock() - defer r.messagesMutex.Unlock() - - if _, ok := r.messages[envelope.PubsubTopic()]; !ok { - return - } - - // Keep a specific max number of messages per topic - if len(r.messages[envelope.PubsubTopic()]) >= r.cacheCapacity { - r.messages[envelope.PubsubTopic()] = r.messages[envelope.PubsubTopic()][1:] - } - - r.messages[envelope.PubsubTopic()] = append(r.messages[envelope.PubsubTopic()], envelope.Message()) -} - // Start starts the RelayService func (r *RelayService) Start() { - r.messagesMutex.Lock() - // Node may already be subscribed to some topics when Relay API handlers are installed. Let's add these - for _, topic := range r.node.Relay().Topics() { - r.log.Info("adding topic handler for existing subscription", zap.String("topic", topic)) - r.messages[topic] = make([]*pb.WakuMessage, 0) - } - r.messagesMutex.Unlock() - - r.runner.Start() } // Stop stops the RelayService func (r *RelayService) Stop() { - r.runner.Stop() } // PostV1Message is invoked when the json rpc request uses the post_waku_v2_relay_v1_message method @@ -105,10 +69,6 @@ func (r *RelayService) PostV1Message(req *http.Request, args *RelayMessageArgs, topic = args.Topic } - if !r.node.Relay().IsSubscribed(topic) { - return errors.New("not subscribed to pubsubTopic") - } - msg := args.Message.toProto() if err = server.AppendRLNProof(r.node, msg); err != nil { @@ -204,17 +164,12 @@ func (r *RelayService) PostV1Subscription(req *http.Request, args *TopicsArgs, r if topic == "" { topic = relay.DefaultWakuTopic } - var sub *relay.Subscription - subs, err := r.node.Relay().Subscribe(ctx, protocol.NewContentFilter(topic)) + _, err = r.node.Relay().Subscribe(ctx, protocol.NewContentFilter(topic)) if err != nil { r.log.Error("subscribing to topic", zap.String("topic", topic), zap.Error(err)) return err } - sub = subs[0] - sub.Unsubscribe() - r.messagesMutex.Lock() - r.messages[topic] = make([]*pb.WakuMessage, 0) - r.messagesMutex.Unlock() + } *reply = true @@ -230,8 +185,6 @@ func (r *RelayService) DeleteV1Subscription(req *http.Request, args *TopicsArgs, r.log.Error("unsubscribing from topic", zap.String("topic", topic), zap.Error(err)) return err } - - delete(r.messages, topic) } *reply = true @@ -240,18 +193,16 @@ func (r *RelayService) DeleteV1Subscription(req *http.Request, args *TopicsArgs, // GetV1Messages is invoked when the json rpc request uses the get_waku_v2_relay_v1_messages method func (r *RelayService) GetV1Messages(req *http.Request, args *TopicArgs, reply *MessagesReply) error { - r.messagesMutex.Lock() - defer r.messagesMutex.Unlock() - if _, ok := r.messages[args.Topic]; !ok { - return fmt.Errorf("topic %s not subscribed", args.Topic) + sub, err := 
r.node.Relay().GetSubscriptionWithPubsubTopic(args.Topic, "") + if err != nil { + return err } - - for i := range r.messages[args.Topic] { - *reply = append(*reply, ProtoToRPC(r.messages[args.Topic][i])) + select { + case msg := <-sub.Ch: + *reply = append(*reply, ProtoToRPC(msg.Message())) + default: + break } - - r.messages[args.Topic] = make([]*pb.WakuMessage, 0) - return nil } diff --git a/cmd/waku/server/rpc/relay_test.go b/cmd/waku/server/rpc/relay_test.go index 61427faf..cd9a408f 100644 --- a/cmd/waku/server/rpc/relay_test.go +++ b/cmd/waku/server/rpc/relay_test.go @@ -111,7 +111,8 @@ func TestRelayGetV1Messages(t *testing.T) { &RelayMessageArgs{ Topic: "test", Message: ProtoToRPC(&pb.WakuMessage{ - Payload: []byte("test"), + Payload: []byte("test"), + ContentTopic: "testContentTopic", }), }, &reply, diff --git a/waku/v2/protocol/relay/config.go b/waku/v2/protocol/relay/config.go index d343edd5..16799863 100644 --- a/waku/v2/protocol/relay/config.go +++ b/waku/v2/protocol/relay/config.go @@ -15,6 +15,7 @@ var DefaultRelaySubscriptionBufferSize int = 1024 type RelaySubscribeParameters struct { dontConsume bool + cacheSize uint } type RelaySubscribeOption func(*RelaySubscribeParameters) error @@ -28,6 +29,13 @@ func WithoutConsumer() RelaySubscribeOption { } } +func WithCacheSize(size uint) RelaySubscribeOption { + return func(params *RelaySubscribeParameters) error { + params.cacheSize = size + return nil + } +} + func msgIDFn(pmsg *pubsub_pb.Message) string { return string(hash.SHA256(pmsg.Data)) } diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index f55eba1e..c195ae07 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -279,13 +279,31 @@ func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, opts . 
 	return hash, nil
 }

+// GetSubscriptionWithPubsubTopic fetches a subscription matching the given pubsub topic and contentTopic
+func (w *WakuRelay) GetSubscriptionWithPubsubTopic(pubsubTopic string, contentTopic string) (*Subscription, error) {
+	var contentFilter waku_proto.ContentFilter
+	if contentTopic != "" {
+		contentFilter = waku_proto.NewContentFilter(pubsubTopic, contentTopic)
+	} else {
+		contentFilter = waku_proto.NewContentFilter(pubsubTopic)
+	}
+	cSubs := w.contentSubs[pubsubTopic]
+	for _, sub := range cSubs {
+		if sub.contentFilter.Equals(contentFilter) {
+			return sub, nil
+		}
+	}
+	return nil, errors.New("no subscription found matching the given topics")
+}
+
+// GetSubscription fetches a subscription matching a contentTopic (via autosharding)
 func (w *WakuRelay) GetSubscription(contentTopic string) (*Subscription, error) {
-	pubSubTopic, err := waku_proto.GetPubSubTopicFromContentTopic(contentTopic)
+	pubsubTopic, err := waku_proto.GetPubSubTopicFromContentTopic(contentTopic)
 	if err != nil {
 		return nil, err
 	}
-	contentFilter := waku_proto.NewContentFilter(pubSubTopic, contentTopic)
-	cSubs := w.contentSubs[pubSubTopic]
+	contentFilter := waku_proto.NewContentFilter(pubsubTopic, contentTopic)
+	cSubs := w.contentSubs[pubsubTopic]
 	for _, sub := range cSubs {
 		if sub.contentFilter.Equals(contentFilter) {
 			return sub, nil
@@ -331,6 +349,9 @@ func (w *WakuRelay) subscribe(ctx context.Context, contentFilter waku_proto.Cont
 			return nil, err
 		}
 	}
+	if params.cacheSize == 0 {
+		params.cacheSize = uint(DefaultRelaySubscriptionBufferSize)
+	}

 	for pubSubTopic, cTopics := range pubSubTopicMap {
 		w.log.Info("subscribing to", zap.String("pubsubTopic", pubSubTopic), zap.Strings("contenTopics", cTopics))
@@ -347,7 +368,7 @@ func (w *WakuRelay) subscribe(ctx context.Context, contentFilter waku_proto.Cont
 			}
 		}

-		subscription := w.bcaster.Register(cFilter, WithBufferSize(DefaultRelaySubscriptionBufferSize),
+		subscription := w.bcaster.Register(cFilter, WithBufferSize(int(params.cacheSize)),
 			WithConsumerOption(params.dontConsume))

 		// Create Content subscription

From 3226def4cf89f4ebe38c54569ce0c4f31f2698a9 Mon Sep 17 00:00:00 2001
From: Prem Chaitanya Prathi
Date: Tue, 7 Nov 2023 22:43:19 +0530
Subject: [PATCH 3/3] feat: On Demand Peer Discovery based on shard and service (#834)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* refactor discovery and common service to separate package to remove package inter-dependencies
* relay on-demand discovery, use proto-to-ENR field mapping
* chore: no need to dial discovered peers as peermanager already does that
* on demand discovery for service peers during peer selection
* identify supported protocols for discovered peers and add to service slots
* fix: tests to use proper static sharding topics
* fix: random selection with default pubsubTopic
---------
Co-authored-by: richΛrd
---
 cmd/waku/node.go | 14 -
 tests/utils.go | 83 +++++
 waku/v2/discv5/discover.go | 10 +-
 waku/v2/discv5/discover_test.go | 118 +------
 .../mock_peer_discoverer.go | 5 +-
 waku/v2/node/wakunode2.go | 8 +-
 waku/v2/peermanager/peer_connector.go | 9 +-
 waku/v2/peermanager/peer_discovery.go | 117 +++++++
 waku/v2/peermanager/peer_manager.go | 321 +++++-------------
 waku/v2/peermanager/peer_manager_test.go | 108 +++++-
 waku/v2/peermanager/peer_selection.go | 227 +++++++++++++
 waku/v2/protocol/enr/enr.go | 4 +-
 waku/v2/protocol/filter/client.go | 5 +-
 waku/v2/protocol/filter/filter_test.go | 5 +-
 waku/v2/protocol/filter/options.go | 7 +
 waku/v2/protocol/filter/server.go | 11 +- 
waku/v2/protocol/legacy_filter/waku_filter.go | 5 +- waku/v2/protocol/lightpush/waku_lightpush.go | 5 + waku/v2/protocol/peer_exchange/client.go | 5 +- waku/v2/protocol/peer_exchange/protocol.go | 7 +- .../peer_exchange/waku_peer_exchange_test.go | 107 +----- waku/v2/protocol/relay/waku_relay.go | 6 +- waku/v2/protocol/store/waku_store_common.go | 4 + waku/v2/rendezvous/rendezvous.go | 10 +- waku/v2/rendezvous/rendezvous_test.go | 6 +- .../common_discovery_service.go | 7 +- .../{protocol => service}/common_service.go | 2 +- .../common_service_test.go | 2 +- 28 files changed, 715 insertions(+), 503 deletions(-) rename waku/v2/{peermanager => discv5}/mock_peer_discoverer.go (93%) create mode 100644 waku/v2/peermanager/peer_discovery.go create mode 100644 waku/v2/peermanager/peer_selection.go rename waku/v2/{peermanager => service}/common_discovery_service.go (93%) rename waku/v2/{protocol => service}/common_service.go (99%) rename waku/v2/{protocol => service}/common_service_test.go (96%) diff --git a/cmd/waku/node.go b/cmd/waku/node.go index 56e3f83a..534b1e00 100644 --- a/cmd/waku/node.go +++ b/cmd/waku/node.go @@ -421,20 +421,6 @@ func Execute(options NodeOptions) error { } } - if len(discoveredNodes) != 0 { - for _, n := range discoveredNodes { - go func(ctx context.Context, info peer.AddrInfo) { - ctx, cancel := context.WithTimeout(ctx, dialTimeout) - defer cancel() - err = wakuNode.DialPeerWithInfo(ctx, info) - if err != nil { - logger.Error("dialing peer", logging.HostID("peer", info.ID), zap.Error(err)) - } - }(ctx, n.PeerInfo) - - } - } - var rpcServer *rpc.WakuRPC if options.RPCServer.Enable { rpcServer = rpc.NewWakuRPC(wakuNode, options.RPCServer.Address, options.RPCServer.Port, options.RPCServer.Admin, options.PProf, options.RPCServer.RelayCacheCapacity, logger) diff --git a/tests/utils.go b/tests/utils.go index ae54a594..01100b77 100644 --- a/tests/utils.go +++ b/tests/utils.go @@ -2,21 +2,33 @@ package tests import ( "context" + "crypto/ecdsa" "crypto/rand" "encoding/hex" "fmt" "io" + "math" "net" + "strconv" "testing" + gcrypto "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/p2p/enr" "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p/config" "github.com/libp2p/go-libp2p/core/crypto" + libp2pcrypto "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem" "github.com/multiformats/go-multiaddr" + "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/waku/v2/peerstore" + wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr" "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/utils" + "go.uber.org/zap" ) // GetHostAddress returns the first listen address used by a host @@ -137,3 +149,74 @@ func RandomHex(n int) (string, error) { } return hex.EncodeToString(bytes), nil } + +func NewLocalnode(priv *ecdsa.PrivateKey, ipAddr *net.TCPAddr, udpPort int, wakuFlags wenr.WakuEnrBitfield, advertiseAddr *net.IP, log *zap.Logger) (*enode.LocalNode, error) { + db, err := enode.OpenDB("") + if err != nil { + return nil, err + } + localnode := enode.NewLocalNode(db, priv) + localnode.SetFallbackUDP(udpPort) + localnode.Set(enr.WithEntry(wenr.WakuENRField, wakuFlags)) + localnode.SetFallbackIP(net.IP{127, 0, 0, 1}) + localnode.SetStaticIP(ipAddr.IP) + + if udpPort > 0 && udpPort <= math.MaxUint16 { + localnode.Set(enr.UDP(uint16(udpPort))) // 
lgtm [go/incorrect-integer-conversion] + } else { + log.Error("setting udpPort", zap.Int("port", udpPort)) + } + + if ipAddr.Port > 0 && ipAddr.Port <= math.MaxUint16 { + localnode.Set(enr.TCP(uint16(ipAddr.Port))) // lgtm [go/incorrect-integer-conversion] + } else { + log.Error("setting tcpPort", zap.Int("port", ipAddr.Port)) + } + + if advertiseAddr != nil { + localnode.SetStaticIP(*advertiseAddr) + } + + return localnode, nil +} + +func CreateHost(t *testing.T, opts ...config.Option) (host.Host, int, *ecdsa.PrivateKey) { + privKey, err := gcrypto.GenerateKey() + require.NoError(t, err) + + sPrivKey := libp2pcrypto.PrivKey(utils.EcdsaPrivKeyToSecp256k1PrivKey(privKey)) + + port, err := FindFreePort(t, "127.0.0.1", 3) + require.NoError(t, err) + + sourceMultiAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", port)) + require.NoError(t, err) + + opts = append(opts, libp2p.ListenAddrs(sourceMultiAddr), + libp2p.Identity(sPrivKey)) + + host, err := libp2p.New(opts...) + require.NoError(t, err) + + return host, port, privKey +} + +func ExtractIP(addr multiaddr.Multiaddr) (*net.TCPAddr, error) { + ipStr, err := addr.ValueForProtocol(multiaddr.P_IP4) + if err != nil { + return nil, err + } + + portStr, err := addr.ValueForProtocol(multiaddr.P_TCP) + if err != nil { + return nil, err + } + port, err := strconv.Atoi(portStr) + if err != nil { + return nil, err + } + return &net.TCPAddr{ + IP: net.ParseIP(ipStr), + Port: port, + }, nil +} diff --git a/waku/v2/discv5/discover.go b/waku/v2/discv5/discover.go index ba0a7c36..ebcecedf 100644 --- a/waku/v2/discv5/discover.go +++ b/waku/v2/discv5/discover.go @@ -14,9 +14,9 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/waku-org/go-discover/discover" "github.com/waku-org/go-waku/logging" - "github.com/waku-org/go-waku/waku/v2/peermanager" "github.com/waku-org/go-waku/waku/v2/peerstore" wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr" + "github.com/waku-org/go-waku/waku/v2/service" "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" @@ -29,7 +29,7 @@ var ErrNoDiscV5Listener = errors.New("no discv5 listener") // PeerConnector will subscribe to a channel containing the information for all peers found by this discovery protocol type PeerConnector interface { - Subscribe(context.Context, <-chan peermanager.PeerData) + Subscribe(context.Context, <-chan service.PeerData) } type DiscoveryV5 struct { @@ -46,7 +46,7 @@ type DiscoveryV5 struct { log *zap.Logger - *peermanager.CommonDiscoveryService + *service.CommonDiscoveryService } type discV5Parameters struct { @@ -139,7 +139,7 @@ func NewDiscoveryV5(priv *ecdsa.PrivateKey, localnode *enode.LocalNode, peerConn params: params, peerConnector: peerConnector, NAT: NAT, - CommonDiscoveryService: peermanager.NewCommonDiscoveryService(), + CommonDiscoveryService: service.NewCommonDiscoveryService(), localnode: localnode, metrics: newMetrics(reg), config: discover.Config{ @@ -438,7 +438,7 @@ func (d *DiscoveryV5) peerLoop(ctx context.Context) error { defer iterator.Close() d.Iterate(ctx, iterator, func(n *enode.Node, p peer.AddrInfo) error { - peer := peermanager.PeerData{ + peer := service.PeerData{ Origin: peerstore.Discv5, AddrInfo: p, ENR: n, diff --git a/waku/v2/discv5/discover_test.go b/waku/v2/discv5/discover_test.go index ba906b03..8e55536d 100644 --- a/waku/v2/discv5/discover_test.go +++ b/waku/v2/discv5/discover_test.go @@ -2,151 +2,67 @@ package discv5 import ( "context" - "crypto/ecdsa" - "fmt" - "math" - "net" - "strconv" "testing" "time" - 
gcrypto "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/p2p/enode" - "github.com/ethereum/go-ethereum/p2p/enr" "github.com/prometheus/client_golang/prometheus" - "github.com/waku-org/go-waku/waku/v2/peermanager" + wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr" - "github.com/multiformats/go-multiaddr" "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/tests" "github.com/waku-org/go-waku/waku/v2/utils" - "go.uber.org/zap" - - "github.com/libp2p/go-libp2p" - libp2pcrypto "github.com/libp2p/go-libp2p/core/crypto" - "github.com/libp2p/go-libp2p/core/host" ) -func createHost(t *testing.T) (host.Host, int, *ecdsa.PrivateKey) { - privKey, err := gcrypto.GenerateKey() - require.NoError(t, err) - - sPrivKey := libp2pcrypto.PrivKey(utils.EcdsaPrivKeyToSecp256k1PrivKey(privKey)) - - port, err := tests.FindFreePort(t, "127.0.0.1", 3) - require.NoError(t, err) - - sourceMultiAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", port)) - require.NoError(t, err) - - host, err := libp2p.New( - libp2p.ListenAddrs(sourceMultiAddr), - libp2p.Identity(sPrivKey), - ) - require.NoError(t, err) - - return host, port, privKey -} - -func newLocalnode(priv *ecdsa.PrivateKey, ipAddr *net.TCPAddr, udpPort int, wakuFlags wenr.WakuEnrBitfield, advertiseAddr *net.IP, log *zap.Logger) (*enode.LocalNode, error) { - db, err := enode.OpenDB("") - if err != nil { - return nil, err - } - localnode := enode.NewLocalNode(db, priv) - localnode.SetFallbackUDP(udpPort) - localnode.Set(enr.WithEntry(wenr.WakuENRField, wakuFlags)) - localnode.SetFallbackIP(net.IP{127, 0, 0, 1}) - localnode.SetStaticIP(ipAddr.IP) - - if udpPort > 0 && udpPort <= math.MaxUint16 { - localnode.Set(enr.UDP(uint16(udpPort))) // lgtm [go/incorrect-integer-conversion] - } else { - log.Error("setting udpPort", zap.Int("port", udpPort)) - } - - if ipAddr.Port > 0 && ipAddr.Port <= math.MaxUint16 { - localnode.Set(enr.TCP(uint16(ipAddr.Port))) // lgtm [go/incorrect-integer-conversion] - } else { - log.Error("setting tcpPort", zap.Int("port", ipAddr.Port)) - } - - if advertiseAddr != nil { - localnode.SetStaticIP(*advertiseAddr) - } - - return localnode, nil -} - -func extractIP(addr multiaddr.Multiaddr) (*net.TCPAddr, error) { - ipStr, err := addr.ValueForProtocol(multiaddr.P_IP4) - if err != nil { - return nil, err - } - - portStr, err := addr.ValueForProtocol(multiaddr.P_TCP) - if err != nil { - return nil, err - } - port, err := strconv.Atoi(portStr) - if err != nil { - return nil, err - } - return &net.TCPAddr{ - IP: net.ParseIP(ipStr), - Port: port, - }, nil -} - func TestDiscV5(t *testing.T) { // Host1 <-> Host2 <-> Host3 // Host4(No waku capabilities) <-> Host2 // H1 - host1, _, prvKey1 := createHost(t) + host1, _, prvKey1 := tests.CreateHost(t) udpPort1, err := tests.FindFreeUDPPort(t, "127.0.0.1", 3) require.NoError(t, err) - ip1, _ := extractIP(host1.Addrs()[0]) - l1, err := newLocalnode(prvKey1, ip1, udpPort1, wenr.NewWakuEnrBitfield(true, true, true, true), nil, utils.Logger()) + ip1, _ := tests.ExtractIP(host1.Addrs()[0]) + l1, err := tests.NewLocalnode(prvKey1, ip1, udpPort1, wenr.NewWakuEnrBitfield(true, true, true, true), nil, utils.Logger()) require.NoError(t, err) - peerconn1 := peermanager.NewTestPeerDiscoverer() + peerconn1 := NewTestPeerDiscoverer() d1, err := NewDiscoveryV5(prvKey1, l1, peerconn1, prometheus.DefaultRegisterer, utils.Logger(), WithUDPPort(uint(udpPort1))) require.NoError(t, err) d1.SetHost(host1) // H2 - host2, _, prvKey2 := createHost(t) - ip2, _ := 
extractIP(host2.Addrs()[0]) + host2, _, prvKey2 := tests.CreateHost(t) + ip2, _ := tests.ExtractIP(host2.Addrs()[0]) udpPort2, err := tests.FindFreeUDPPort(t, "127.0.0.1", 3) require.NoError(t, err) - l2, err := newLocalnode(prvKey2, ip2, udpPort2, wenr.NewWakuEnrBitfield(true, true, true, true), nil, utils.Logger()) + l2, err := tests.NewLocalnode(prvKey2, ip2, udpPort2, wenr.NewWakuEnrBitfield(true, true, true, true), nil, utils.Logger()) require.NoError(t, err) - peerconn2 := peermanager.NewTestPeerDiscoverer() + peerconn2 := NewTestPeerDiscoverer() d2, err := NewDiscoveryV5(prvKey2, l2, peerconn2, prometheus.DefaultRegisterer, utils.Logger(), WithUDPPort(uint(udpPort2)), WithBootnodes([]*enode.Node{d1.localnode.Node()})) require.NoError(t, err) d2.SetHost(host2) // H3 - host3, _, prvKey3 := createHost(t) - ip3, _ := extractIP(host3.Addrs()[0]) + host3, _, prvKey3 := tests.CreateHost(t) + ip3, _ := tests.ExtractIP(host3.Addrs()[0]) udpPort3, err := tests.FindFreeUDPPort(t, "127.0.0.1", 3) require.NoError(t, err) - l3, err := newLocalnode(prvKey3, ip3, udpPort3, wenr.NewWakuEnrBitfield(true, true, true, true), nil, utils.Logger()) + l3, err := tests.NewLocalnode(prvKey3, ip3, udpPort3, wenr.NewWakuEnrBitfield(true, true, true, true), nil, utils.Logger()) require.NoError(t, err) - peerconn3 := peermanager.NewTestPeerDiscoverer() + peerconn3 := NewTestPeerDiscoverer() d3, err := NewDiscoveryV5(prvKey3, l3, peerconn3, prometheus.DefaultRegisterer, utils.Logger(), WithUDPPort(uint(udpPort3)), WithBootnodes([]*enode.Node{d2.localnode.Node()})) require.NoError(t, err) d3.SetHost(host3) // H4 doesn't have any Waku capabilities - host4, _, prvKey4 := createHost(t) - ip4, _ := extractIP(host2.Addrs()[0]) + host4, _, prvKey4 := tests.CreateHost(t) + ip4, _ := tests.ExtractIP(host2.Addrs()[0]) udpPort4, err := tests.FindFreeUDPPort(t, "127.0.0.1", 3) require.NoError(t, err) - l4, err := newLocalnode(prvKey4, ip4, udpPort4, 0, nil, utils.Logger()) + l4, err := tests.NewLocalnode(prvKey4, ip4, udpPort4, 0, nil, utils.Logger()) require.NoError(t, err) - peerconn4 := peermanager.NewTestPeerDiscoverer() + peerconn4 := NewTestPeerDiscoverer() d4, err := NewDiscoveryV5(prvKey4, l4, peerconn4, prometheus.DefaultRegisterer, utils.Logger(), WithUDPPort(uint(udpPort4)), WithBootnodes([]*enode.Node{d2.localnode.Node()})) require.NoError(t, err) d2.SetHost(host2) diff --git a/waku/v2/peermanager/mock_peer_discoverer.go b/waku/v2/discv5/mock_peer_discoverer.go similarity index 93% rename from waku/v2/peermanager/mock_peer_discoverer.go rename to waku/v2/discv5/mock_peer_discoverer.go index f7ea5138..5bef8542 100644 --- a/waku/v2/peermanager/mock_peer_discoverer.go +++ b/waku/v2/discv5/mock_peer_discoverer.go @@ -1,10 +1,11 @@ -package peermanager +package discv5 import ( "context" "sync" "github.com/libp2p/go-libp2p/core/peer" + "github.com/waku-org/go-waku/waku/v2/service" ) // TestPeerDiscoverer is mock peer discoverer for testing @@ -23,7 +24,7 @@ func NewTestPeerDiscoverer() *TestPeerDiscoverer { } // Subscribe is for subscribing to peer discoverer -func (t *TestPeerDiscoverer) Subscribe(ctx context.Context, ch <-chan PeerData) { +func (t *TestPeerDiscoverer) Subscribe(ctx context.Context, ch <-chan service.PeerData) { go func() { for { select { diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 8fc659c2..0bb611f7 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -43,6 +43,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/relay" 
"github.com/waku-org/go-waku/waku/v2/protocol/store" "github.com/waku-org/go-waku/waku/v2/rendezvous" + "github.com/waku-org/go-waku/waku/v2/service" "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" @@ -289,6 +290,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { } w.opts.legacyFilterOpts = append(w.opts.legacyFilterOpts, legacy_filter.WithPeerManager(w.peermanager)) + w.opts.filterOpts = append(w.opts.filterOpts, filter.WithPeerManager(w.peermanager)) w.legacyFilter = legacy_filter.NewWakuFilter(w.bcaster, w.opts.isLegacyFilterFullNode, w.timesource, w.opts.prometheusReg, w.log, w.opts.legacyFilterOpts...) w.filterFullNode = filter.NewWakuFilterFullNode(w.timesource, w.opts.prometheusReg, w.log, w.opts.filterOpts...) @@ -691,7 +693,9 @@ func (w *WakuNode) mountDiscV5() error { } var err error - w.discoveryV5, err = discv5.NewDiscoveryV5(w.opts.privKey, w.localNode, w.peerConnector, w.opts.prometheusReg, w.log, discV5Options...) + discv5Inst, err := discv5.NewDiscoveryV5(w.opts.privKey, w.localNode, w.peerConnector, w.opts.prometheusReg, w.log, discV5Options...) + w.discoveryV5 = discv5Inst + w.peermanager.SetDiscv5(discv5Inst) return err } @@ -714,7 +718,7 @@ func (w *WakuNode) AddPeer(address ma.Multiaddr, origin wps.Origin, pubSubTopics // AddDiscoveredPeer to add a discovered peer to the node peerStore func (w *WakuNode) AddDiscoveredPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Origin, pubsubTopics []string, connectNow bool) { - p := peermanager.PeerData{ + p := service.PeerData{ Origin: origin, AddrInfo: peer.AddrInfo{ ID: ID, diff --git a/waku/v2/peermanager/peer_connector.go b/waku/v2/peermanager/peer_connector.go index a4dbd2c6..553313af 100644 --- a/waku/v2/peermanager/peer_connector.go +++ b/waku/v2/peermanager/peer_connector.go @@ -18,6 +18,7 @@ import ( "github.com/waku-org/go-waku/logging" wps "github.com/waku-org/go-waku/waku/v2/peerstore" waku_proto "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/service" "go.uber.org/zap" @@ -34,7 +35,7 @@ type PeerConnectionStrategy struct { paused atomic.Bool dialTimeout time.Duration - *CommonDiscoveryService + *service.CommonDiscoveryService subscriptions []subscription backoff backoff.BackoffFactory @@ -43,7 +44,7 @@ type PeerConnectionStrategy struct { type subscription struct { ctx context.Context - ch <-chan PeerData + ch <-chan service.PeerData } // backoff describes the strategy used to decide how long to backoff after previously attempting to connect to a peer @@ -71,7 +72,7 @@ func NewPeerConnectionStrategy(pm *PeerManager, pc := &PeerConnectionStrategy{ cache: cache, dialTimeout: dialTimeout, - CommonDiscoveryService: NewCommonDiscoveryService(), + CommonDiscoveryService: service.NewCommonDiscoveryService(), pm: pm, backoff: getBackOff(), logger: logger.Named("discovery-connector"), @@ -86,7 +87,7 @@ type connCacheData struct { } // Subscribe receives channels on which discovered peers should be pushed -func (c *PeerConnectionStrategy) Subscribe(ctx context.Context, ch <-chan PeerData) { +func (c *PeerConnectionStrategy) Subscribe(ctx context.Context, ch <-chan service.PeerData) { // if not running yet, store the subscription and return if err := c.ErrOnNotRunning(); err != nil { c.mux.Lock() diff --git a/waku/v2/peermanager/peer_discovery.go b/waku/v2/peermanager/peer_discovery.go new file mode 100644 index 00000000..72ee3077 --- /dev/null +++ b/waku/v2/peermanager/peer_discovery.go @@ -0,0 +1,117 @@ +package peermanager + +import ( 
+ "context" + "errors" + + "github.com/libp2p/go-libp2p/core/protocol" + "github.com/waku-org/go-waku/waku/v2/discv5" + wps "github.com/waku-org/go-waku/waku/v2/peerstore" + waku_proto "github.com/waku-org/go-waku/waku/v2/protocol" + wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" + "github.com/waku-org/go-waku/waku/v2/service" + "go.uber.org/zap" +) + +// DiscoverAndConnectToPeers discovers peers using discoveryv5 and connects to the peers. +// It discovers peers till maxCount peers are found for the cluster,shard and protocol or the context passed expires. +func (pm *PeerManager) DiscoverAndConnectToPeers(ctx context.Context, cluster uint16, + shard uint16, serviceProtocol protocol.ID, maxCount int) error { + if pm.discoveryService == nil { + return nil + } + peers, err := pm.discoverOnDemand(cluster, shard, serviceProtocol, ctx, maxCount) + if err != nil { + return err + } + + pm.logger.Debug("discovered peers on demand ", zap.Int("noOfPeers", len(peers))) + connectNow := false + //Add discovered peers to peerStore and connect to them + for idx, p := range peers { + if serviceProtocol != relay.WakuRelayID_v200 && idx <= maxCount { + //how many connections to initiate? Maybe this could be a config exposed to client API. + //For now just going ahead with initiating connections with 2 nodes in case of non-relay service peers + //In case of relay let it go through connectivityLoop + connectNow = true + } + pm.AddDiscoveredPeer(p, connectNow) + } + return nil +} + +// RegisterWakuProtocol to be used by Waku protocols that could be used for peer discovery +// Which means protoocl should be as defined in waku2 ENR key in https://rfc.vac.dev/spec/31/. +func (pm *PeerManager) RegisterWakuProtocol(proto protocol.ID, bitField uint8) { + pm.wakuprotoToENRFieldMap[proto] = WakuProtoInfo{waku2ENRBitField: bitField} +} + +// OnDemandPeerDiscovery initiates an on demand peer discovery and +// filters peers based on cluster,shard and any wakuservice protocols specified +func (pm *PeerManager) discoverOnDemand(cluster uint16, + shard uint16, wakuProtocol protocol.ID, ctx context.Context, maxCount int) ([]service.PeerData, error) { + var peers []service.PeerData + + wakuProtoInfo, ok := pm.wakuprotoToENRFieldMap[wakuProtocol] + if !ok { + pm.logger.Error("cannot do on demand discovery for non-waku protocol", zap.String("protocol", string(wakuProtocol))) + return nil, errors.New("cannot do on demand discovery for non-waku protocol") + } + iterator, err := pm.discoveryService.PeerIterator( + discv5.FilterShard(cluster, shard), + discv5.FilterCapabilities(wakuProtoInfo.waku2ENRBitField), + ) + if err != nil { + pm.logger.Error("failed to find peers for shard and services", zap.Uint16("cluster", cluster), + zap.Uint16("shard", shard), zap.String("service", string(wakuProtocol)), zap.Error(err)) + return peers, err + } + + //Iterate and fill peers. 
+ defer iterator.Close() + + for iterator.Next() { + + pInfo, err := wenr.EnodeToPeerInfo(iterator.Node()) + if err != nil { + continue + } + pData := service.PeerData{ + Origin: wps.Discv5, + ENR: iterator.Node(), + AddrInfo: *pInfo, + } + peers = append(peers, pData) + + if len(peers) >= maxCount { + pm.logger.Debug("found required number of nodes, stopping on demand discovery", zap.Uint16("cluster", cluster), + zap.Uint16("shard", shard), zap.Int("required-nodes", maxCount)) + break + } + + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + + } + return peers, nil +} + +func (pm *PeerManager) discoverPeersByPubsubTopic(pubsubTopic string, proto protocol.ID, ctx context.Context, maxCount int) { + shardInfo, err := waku_proto.TopicsToRelayShards(pubsubTopic) + if err != nil { + pm.logger.Error("failed to convert pubsub topic to shard", zap.String("topic", pubsubTopic), zap.Error(err)) + return + } + if len(shardInfo) > 0 { + err = pm.DiscoverAndConnectToPeers(ctx, shardInfo[0].ClusterID, shardInfo[0].ShardIDs[0], proto, maxCount) + if err != nil { + pm.logger.Error("failed to discover and connect to peers", zap.Error(err)) + } + } else { + pm.logger.Debug("not discovering peers as pubsub topic is a named topic with no shard info", zap.String("topic", pubsubTopic)) + } +} diff --git a/waku/v2/peermanager/peer_manager.go b/waku/v2/peermanager/peer_manager.go index d39ae51a..f309b008 100644 --- a/waku/v2/peermanager/peer_manager.go +++ b/waku/v2/peermanager/peer_manager.go @@ -3,10 +3,10 @@ package peermanager import ( "context" "errors" - "math/rand" "sync" "time" + "github.com/ethereum/go-ethereum/p2p/enr" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/event" "github.com/libp2p/go-libp2p/core/host" @@ -14,13 +14,14 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/core/protocol" - "github.com/libp2p/go-libp2p/p2p/protocol/ping" ma "github.com/multiformats/go-multiaddr" "github.com/waku-org/go-waku/logging" + "github.com/waku-org/go-waku/waku/v2/discv5" wps "github.com/waku-org/go-waku/waku/v2/peerstore" waku_proto "github.com/waku-org/go-waku/waku/v2/protocol" wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr" "github.com/waku-org/go-waku/waku/v2/protocol/relay" + "github.com/waku-org/go-waku/waku/v2/service" "go.uber.org/zap" ) @@ -30,20 +31,29 @@ type NodeTopicDetails struct { topic *pubsub.Topic } +// WakuProtoInfo holds protocol specific info, +// to be used at a later stage to set various configs (such as peer-management criteria) specific to each Waku protocol. +// This should make the peer manager protocol-agnostic. +type WakuProtoInfo struct { + waku2ENRBitField uint8 +} + // PeerManager applies various controls and manage connections towards peers.
type PeerManager struct { - peerConnector *PeerConnectionStrategy - maxPeers int - maxRelayPeers int - logger *zap.Logger - InRelayPeersTarget int - OutRelayPeersTarget int - host host.Host - serviceSlots *ServiceSlots - ctx context.Context - sub event.Subscription - topicMutex sync.RWMutex - subRelayTopics map[string]*NodeTopicDetails + peerConnector *PeerConnectionStrategy + maxPeers int + maxRelayPeers int + logger *zap.Logger + InRelayPeersTarget int + OutRelayPeersTarget int + host host.Host + serviceSlots *ServiceSlots + ctx context.Context + sub event.Subscription + topicMutex sync.RWMutex + subRelayTopics map[string]*NodeTopicDetails + discoveryService *discv5.DiscoveryV5 + wakuprotoToENRFieldMap map[protocol.ID]WakuProtoInfo } // PeerSelection provides various options based on which Peer is selected from a list of peers. @@ -88,13 +98,14 @@ func NewPeerManager(maxConnections int, maxPeers int, logger *zap.Logger) *PeerM } pm := &PeerManager{ - logger: logger.Named("peer-manager"), - maxRelayPeers: maxRelayPeers, - InRelayPeersTarget: inRelayPeersTarget, - OutRelayPeersTarget: outRelayPeersTarget, - serviceSlots: NewServiceSlot(), - subRelayTopics: make(map[string]*NodeTopicDetails), - maxPeers: maxPeers, + logger: logger.Named("peer-manager"), + maxRelayPeers: maxRelayPeers, + InRelayPeersTarget: inRelayPeersTarget, + OutRelayPeersTarget: outRelayPeersTarget, + serviceSlots: NewServiceSlot(), + subRelayTopics: make(map[string]*NodeTopicDetails), + maxPeers: maxPeers, + wakuprotoToENRFieldMap: map[protocol.ID]WakuProtoInfo{}, } logger.Info("PeerManager init values", zap.Int("maxConnections", maxConnections), zap.Int("maxRelayPeers", maxRelayPeers), @@ -105,6 +116,11 @@ func NewPeerManager(maxConnections int, maxPeers int, logger *zap.Logger) *PeerM return pm } +// SetDiscv5 sets the discoveryv5 service to be used for peer discovery. +func (pm *PeerManager) SetDiscv5(discv5 *discv5.DiscoveryV5) { + pm.discoveryService = discv5 +} + // SetHost sets the host to be used in order to access the peerStore. func (pm *PeerManager) SetHost(host host.Host) { pm.host = host @@ -117,6 +133,9 @@ func (pm *PeerManager) SetPeerConnector(pc *PeerConnectionStrategy) { // Start starts the processing to be done by peer manager. func (pm *PeerManager) Start(ctx context.Context) { + + pm.RegisterWakuProtocol(relay.WakuRelayID_v200, relay.WakuRelayENRField) + pm.ctx = ctx if pm.sub != nil { go pm.peerEventLoop(ctx) @@ -198,7 +217,7 @@ func (pm *PeerManager) ensureMinRelayConnsPerTopic() { //Find not connected peers. notConnectedPeers := pm.getNotConnectedPers(topicStr) if notConnectedPeers.Len() == 0 { - //TODO: Trigger on-demand discovery for this topic. + pm.discoverPeersByPubsubTopic(topicStr, relay.WakuRelayID_v200, pm.ctx, 2) continue } //Connect to eligible peers. @@ -231,14 +250,14 @@ func (pm *PeerManager) connectToRelayPeers() { // addrInfoToPeerData returns addressinfo for a peer // If addresses are expired, it removes the peer from host peerStore and returns nil. 
-func addrInfoToPeerData(origin wps.Origin, peerID peer.ID, host host.Host) *PeerData { +func addrInfoToPeerData(origin wps.Origin, peerID peer.ID, host host.Host) *service.PeerData { addrs := host.Peerstore().Addrs(peerID) if len(addrs) == 0 { //Addresses expired, remove peer from peerStore host.Peerstore().RemovePeer(peerID) return nil } - return &PeerData{ + return &service.PeerData{ Origin: origin, AddrInfo: peer.AddrInfo{ ID: peerID, @@ -295,10 +314,42 @@ func (pm *PeerManager) pruneInRelayConns(inRelayPeers peer.IDSlice) { } } +func (pm *PeerManager) processPeerENR(p *service.PeerData) []protocol.ID { + shards, err := wenr.RelaySharding(p.ENR.Record()) + if err != nil { + pm.logger.Error("could not derive relayShards from ENR", zap.Error(err), + logging.HostID("peer", p.AddrInfo.ID), zap.String("enr", p.ENR.String())) + } else { + if shards != nil { + p.PubSubTopics = make([]string, 0) + topics := shards.Topics() + for _, topic := range topics { + topicStr := topic.String() + p.PubSubTopics = append(p.PubSubTopics, topicStr) + } + } else { + pm.logger.Debug("ENR doesn't have relay shards", logging.HostID("peer", p.AddrInfo.ID)) + } + } + supportedProtos := []protocol.ID{} + //Identify and specify protocols supported by the peer based on the discovered peer's ENR + var enrField wenr.WakuEnrBitfield + if err := p.ENR.Record().Load(enr.WithEntry(wenr.WakuENRField, &enrField)); err == nil { + for proto, protoENR := range pm.wakuprotoToENRFieldMap { + protoENRField := protoENR.waku2ENRBitField + if protoENRField&enrField != 0 { + supportedProtos = append(supportedProtos, proto) + //Add Service peers to serviceSlots. + pm.addPeerToServiceSlot(proto, p.AddrInfo.ID) + } + } + } + return supportedProtos +} + // AddDiscoveredPeer to add dynamically discovered peers. // Note that these peers will not be set in service-slots. -// TODO: It maybe good to set in service-slots based on services supported in the ENR -func (pm *PeerManager) AddDiscoveredPeer(p PeerData, connectNow bool) { +func (pm *PeerManager) AddDiscoveredPeer(p service.PeerData, connectNow bool) { //Doing this check again inside addPeer, in order to avoid additional complexity of rollingBack other changes. if pm.maxPeers <= pm.host.Peerstore().Peers().Len() { return @@ -309,27 +360,13 @@ func (pm *PeerManager) AddDiscoveredPeer(p PeerData, connectNow bool) { pm.logger.Debug("Found discovered peer already in peerStore", logging.HostID("peer", p.AddrInfo.ID)) return } - // Try to fetch shard info from ENR to arrive at pubSub topics. + supportedProtos := []protocol.ID{} if len(p.PubSubTopics) == 0 && p.ENR != nil { - shards, err := wenr.RelaySharding(p.ENR.Record()) - if err != nil { - pm.logger.Error("Could not derive relayShards from ENR", zap.Error(err), - logging.HostID("peer", p.AddrInfo.ID), zap.String("enr", p.ENR.String())) - } else { - if shards != nil { - p.PubSubTopics = make([]string, 0) - topics := shards.Topics() - for _, topic := range topics { - topicStr := topic.String() - p.PubSubTopics = append(p.PubSubTopics, topicStr) - } - } else { - pm.logger.Debug("ENR doesn't have relay shards", logging.HostID("peer", p.AddrInfo.ID)) - } - } + // Try to fetch shard info and supported protocols from ENR to arrive at pubSub topics. + supportedProtos = pm.processPeerENR(&p) } - _ = pm.addPeer(p.AddrInfo.ID, p.AddrInfo.Addrs, p.Origin, p.PubSubTopics) + _ = pm.addPeer(p.AddrInfo.ID, p.AddrInfo.Addrs, p.Origin, p.PubSubTopics, supportedProtos...) 
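+ //processPeerENR above has already slotted the peer for each matching waku protocol; supportedProtos is passed to addPeer so the protocols can also be recorded against the peer.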
if p.ENR != nil { err := pm.host.Peerstore().(wps.WakuPeerstore).SetENR(p.AddrInfo.ID, p.ENR) @@ -429,199 +466,3 @@ func (pm *PeerManager) addPeerToServiceSlot(proto protocol.ID, peerID peer.ID) { // getPeers returns nil for WakuRelayIDv200 protocol, but we don't run this ServiceSlot code for WakuRelayIDv200 protocol pm.serviceSlots.getPeers(proto).add(peerID) } - -// SelectPeerByContentTopic is used to return a random peer that supports a given protocol for given contentTopic. -// If a list of specific peers is passed, the peer will be chosen from that list assuming -// it supports the chosen protocol and contentTopic, otherwise it will chose a peer from the service slot. -// If a peer cannot be found in the service slot, a peer will be selected from node peerstore -func (pm *PeerManager) SelectPeerByContentTopic(proto protocol.ID, contentTopic string, specificPeers ...peer.ID) (peer.ID, error) { - pubsubTopic, err := waku_proto.GetPubSubTopicFromContentTopic(contentTopic) - if err != nil { - return "", err - } - return pm.SelectPeer(PeerSelectionCriteria{PubsubTopic: pubsubTopic, Proto: proto, SpecificPeers: specificPeers}) -} - -// SelectRandomPeer is used to return a random peer that supports a given protocol. -// If a list of specific peers is passed, the peer will be chosen from that list assuming -// it supports the chosen protocol, otherwise it will chose a peer from the service slot. -// If a peer cannot be found in the service slot, a peer will be selected from node peerstore -// if pubSubTopic is specified, peer is selected from list that support the pubSubTopic -func (pm *PeerManager) SelectRandomPeer(criteria PeerSelectionCriteria) (peer.ID, error) { - // @TODO We need to be more strategic about which peers we dial. Right now we just set one on the service. - // Ideally depending on the query and our set of peers we take a subset of ideal peers. - // This will require us to check for various factors such as: - // - which topics they track - // - latency? - - peerID, err := pm.selectServicePeer(criteria.Proto, criteria.PubsubTopic, criteria.SpecificPeers...) - if err == nil { - return peerID, nil - } else if !errors.Is(err, ErrNoPeersAvailable) { - pm.logger.Debug("could not retrieve random peer from slot", zap.String("protocol", string(criteria.Proto)), zap.String("pubsubTopic", criteria.PubsubTopic), zap.Error(err)) - return "", err - } - - // if not found in serviceSlots or proto == WakuRelayIDv200 - filteredPeers, err := pm.FilterPeersByProto(criteria.SpecificPeers, criteria.Proto) - if err != nil { - return "", err - } - if criteria.PubsubTopic != "" { - filteredPeers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopic(criteria.PubsubTopic, filteredPeers...) - } - return selectRandomPeer(filteredPeers, pm.logger) -} - -func (pm *PeerManager) selectServicePeer(proto protocol.ID, pubSubTopic string, specificPeers ...peer.ID) (peer.ID, error) { - //Try to fetch from serviceSlot - if slot := pm.serviceSlots.getPeers(proto); slot != nil { - if pubSubTopic == "" { - return slot.getRandom() - } else { //PubsubTopic based selection - keys := make([]peer.ID, 0, len(slot.m)) - for i := range slot.m { - keys = append(keys, i) - } - selectedPeers := pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopic(pubSubTopic, keys...) - return selectRandomPeer(selectedPeers, pm.logger) - } - } - - return "", ErrNoPeersAvailable -} - -// PeerSelectionCriteria is the selection Criteria that is used by PeerManager to select peers. 
-type PeerSelectionCriteria struct { - SelectionType PeerSelection - Proto protocol.ID - PubsubTopic string - SpecificPeers peer.IDSlice - Ctx context.Context -} - -// SelectPeer selects a peer based on selectionType specified. -// Context is required only in case of selectionType set to LowestRTT -func (pm *PeerManager) SelectPeer(criteria PeerSelectionCriteria) (peer.ID, error) { - - switch criteria.SelectionType { - case Automatic: - return pm.SelectRandomPeer(criteria) - case LowestRTT: - if criteria.Ctx == nil { - criteria.Ctx = context.Background() - pm.logger.Warn("context is not passed for peerSelectionwithRTT, using background context") - } - return pm.SelectPeerWithLowestRTT(criteria) - default: - return "", errors.New("unknown peer selection type specified") - } -} - -type pingResult struct { - p peer.ID - rtt time.Duration -} - -// SelectPeerWithLowestRTT will select a peer that supports a specific protocol with the lowest reply time -// If a list of specific peers is passed, the peer will be chosen from that list assuming -// it supports the chosen protocol, otherwise it will chose a peer from the node peerstore -// TO OPTIMIZE: As of now the peer with lowest RTT is identified when select is called, this should be optimized -// to maintain the RTT as part of peer-scoring and just select based on that. -func (pm *PeerManager) SelectPeerWithLowestRTT(criteria PeerSelectionCriteria) (peer.ID, error) { - var peers peer.IDSlice - var err error - if criteria.Ctx == nil { - criteria.Ctx = context.Background() - } - - if criteria.PubsubTopic != "" { - peers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopic(criteria.PubsubTopic, criteria.SpecificPeers...) - } - - peers, err = pm.FilterPeersByProto(peers, criteria.Proto) - if err != nil { - return "", err - } - wg := sync.WaitGroup{} - waitCh := make(chan struct{}) - pingCh := make(chan pingResult, 1000) - - wg.Add(len(peers)) - - go func() { - for _, p := range peers { - go func(p peer.ID) { - defer wg.Done() - ctx, cancel := context.WithTimeout(criteria.Ctx, 3*time.Second) - defer cancel() - result := <-ping.Ping(ctx, pm.host, p) - if result.Error == nil { - pingCh <- pingResult{ - p: p, - rtt: result.RTT, - } - } else { - pm.logger.Debug("could not ping", logging.HostID("peer", p), zap.Error(result.Error)) - } - }(p) - } - wg.Wait() - close(waitCh) - close(pingCh) - }() - - select { - case <-waitCh: - var min *pingResult - for p := range pingCh { - if min == nil { - min = &p - } else { - if p.rtt < min.rtt { - min = &p - } - } - } - if min == nil { - return "", ErrNoPeersAvailable - } - - return min.p, nil - case <-criteria.Ctx.Done(): - return "", ErrNoPeersAvailable - } -} - -// selectRandomPeer selects randomly a peer from the list of peers passed. -func selectRandomPeer(peers peer.IDSlice, log *zap.Logger) (peer.ID, error) { - if len(peers) >= 1 { - peerID := peers[rand.Intn(len(peers))] - // TODO: proper heuristic here that compares peer scores and selects "best" one. For now a random peer for the given protocol is returned - return peerID, nil // nolint: gosec - } - - return "", ErrNoPeersAvailable -} - -// FilterPeersByProto filters list of peers that support specified protocols. -// If specificPeers is nil, all peers in the host's peerStore are considered for filtering. 
-func (pm *PeerManager) FilterPeersByProto(specificPeers peer.IDSlice, proto ...protocol.ID) (peer.IDSlice, error) { - peerSet := specificPeers - if len(peerSet) == 0 { - peerSet = pm.host.Peerstore().Peers() - } - - var peers peer.IDSlice - for _, peer := range peerSet { - protocols, err := pm.host.Peerstore().SupportsProtocols(peer, proto...) - if err != nil { - return nil, err - } - - if len(protocols) > 0 { - peers = append(peers, peer) - } - } - return peers, nil -} diff --git a/waku/v2/peermanager/peer_manager_test.go b/waku/v2/peermanager/peer_manager_test.go index 385c6f0a..8d90f87c 100644 --- a/waku/v2/peermanager/peer_manager_test.go +++ b/waku/v2/peermanager/peer_manager_test.go @@ -8,13 +8,19 @@ import ( "testing" "time" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p/core/host" libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol" + "github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem" "github.com/multiformats/go-multiaddr" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/tests" + "github.com/waku-org/go-waku/waku/v2/discv5" wps "github.com/waku-org/go-waku/waku/v2/peerstore" wakuproto "github.com/waku-org/go-waku/waku/v2/protocol" + wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/utils" ) @@ -128,27 +134,27 @@ func TestPeerSelection(t *testing.T) { defer h3.Close() protocol := libp2pProtocol.ID("test/protocol") - _, err = pm.AddPeer(getAddr(h2), wps.Static, []string{"/waku/rs/2/1", "/waku/rs/2/2"}, libp2pProtocol.ID(protocol)) + _, err = pm.AddPeer(getAddr(h2), wps.Static, []string{"/waku/2/rs/2/1", "/waku/2/rs/2/2"}, libp2pProtocol.ID(protocol)) require.NoError(t, err) - _, err = pm.AddPeer(getAddr(h3), wps.Static, []string{"/waku/rs/2/1"}, libp2pProtocol.ID(protocol)) + _, err = pm.AddPeer(getAddr(h3), wps.Static, []string{"/waku/2/rs/2/1"}, libp2pProtocol.ID(protocol)) require.NoError(t, err) _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol}) require.NoError(t, err) - peerID, err := pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopic: "/waku/rs/2/2"}) + peerID, err := pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopic: "/waku/2/rs/2/2"}) require.NoError(t, err) require.Equal(t, h2.ID(), peerID) - _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopic: "/waku/rs/2/3"}) + _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopic: "/waku/2/rs/2/3"}) require.Error(t, ErrNoPeersAvailable, err) - _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopic: "/waku/rs/2/1"}) + _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopic: "/waku/2/rs/2/1"}) require.NoError(t, err) //Test for selectWithLowestRTT - _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: LowestRTT, Proto: protocol, PubsubTopic: "/waku/rs/2/1"}) + _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: LowestRTT, Proto: protocol, PubsubTopic: "/waku/2/rs/2/1"}) require.NoError(t, err) } @@ -208,7 +214,6 @@ func TestConnectToRelayPeers(t *testing.T) { ctx, pm, deferFn := initTest(t) pc, err := NewPeerConnectionStrategy(pm, 120*time.Second, pm.logger) require.NoError(t, err) - 
pm.SetPeerConnector(pc) err = pc.Start(ctx) require.NoError(t, err) pm.Start(ctx) @@ -218,3 +223,92 @@ func TestConnectToRelayPeers(t *testing.T) { pm.connectToRelayPeers() } + +func createHostWithDiscv5AndPM(t *testing.T, hostName string, topic string, enrField uint8, bootnode ...*enode.Node) (host.Host, *PeerManager, *discv5.DiscoveryV5) { + ps, err := pstoremem.NewPeerstore() + require.NoError(t, err) + wakuPeerStore := wps.NewWakuPeerstore(ps) + + host, _, prvKey1 := tests.CreateHost(t, libp2p.Peerstore(wakuPeerStore)) + + logger := utils.Logger().Named(hostName) + + udpPort, err := tests.FindFreeUDPPort(t, "127.0.0.1", 3) + require.NoError(t, err) + ip1, _ := tests.ExtractIP(host.Addrs()[0]) + localNode, err := tests.NewLocalnode(prvKey1, ip1, udpPort, enrField, nil, logger) + require.NoError(t, err) + + rs, err := wakuproto.TopicsToRelayShards(topic) + require.NoError(t, err) + + err = wenr.Update(localNode, wenr.WithWakuRelaySharding(rs[0])) + require.NoError(t, err) + pm := NewPeerManager(10, 20, logger) + pm.SetHost(host) + peerconn, err := NewPeerConnectionStrategy(pm, 30*time.Second, logger) + require.NoError(t, err) + discv5, err := discv5.NewDiscoveryV5(prvKey1, localNode, peerconn, prometheus.DefaultRegisterer, logger, discv5.WithUDPPort(uint(udpPort)), discv5.WithBootnodes(bootnode)) + require.NoError(t, err) + discv5.SetHost(host) + pm.SetDiscv5(discv5) + pm.SetPeerConnector(peerconn) + + return host, pm, discv5 +} + +func TestOnDemandPeerDiscovery(t *testing.T) { + topic := "/waku/2/rs/1/1" + + // Host1 <-> Host2 <-> Host3 + host1, _, d1 := createHostWithDiscv5AndPM(t, "host1", topic, wenr.NewWakuEnrBitfield(true, true, false, true)) + + host2, _, d2 := createHostWithDiscv5AndPM(t, "host2", topic, wenr.NewWakuEnrBitfield(false, true, true, true), d1.Node()) + host3, pm3, d3 := createHostWithDiscv5AndPM(t, "host3", topic, wenr.NewWakuEnrBitfield(true, true, true, true), d2.Node()) + + defer d1.Stop() + defer d2.Stop() + defer d3.Stop() + + defer host1.Close() + defer host2.Close() + defer host3.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + err := d1.Start(ctx) + require.NoError(t, err) + + err = d2.Start(ctx) + require.NoError(t, err) + + err = d3.Start(ctx) + require.NoError(t, err) + + //Discovery should fail for non-waku protocol + _, err = pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, PubsubTopic: topic, Proto: "/test"}) + require.Error(t, err) + + _, err = pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: "/test"}) + require.Error(t, err) + + ctx, cancel = context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + var enrField uint8 + enrField |= (1 << 1) + pm3.RegisterWakuProtocol("/vac/waku/store/2.0.0-beta4", enrField) + peerID, err := pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, PubsubTopic: topic, Proto: "/vac/waku/store/2.0.0-beta4", Ctx: ctx}) + require.NoError(t, err) + require.Equal(t, peerID, host2.ID()) + + var enrField1 uint8 + + enrField1 |= (1 << 3) + pm3.RegisterWakuProtocol("/vac/waku/lightpush/2.0.0-beta1", enrField1) + peerID, err = pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, PubsubTopic: topic, Proto: "/vac/waku/lightpush/2.0.0-beta1", Ctx: ctx}) + require.NoError(t, err) + require.Equal(t, peerID, host1.ID()) + +} diff --git a/waku/v2/peermanager/peer_selection.go b/waku/v2/peermanager/peer_selection.go new file mode 100644 index 00000000..e785c177 --- /dev/null +++ 
b/waku/v2/peermanager/peer_selection.go @@ -0,0 +1,227 @@ +package peermanager + +import ( + "context" + "errors" + "math/rand" + "sync" + "time" + + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/protocol" + "github.com/libp2p/go-libp2p/p2p/protocol/ping" + "github.com/waku-org/go-waku/logging" + wps "github.com/waku-org/go-waku/waku/v2/peerstore" + waku_proto "github.com/waku-org/go-waku/waku/v2/protocol" + "go.uber.org/zap" +) + +// SelectPeerByContentTopic is used to return a random peer that supports a given protocol for a given contentTopic. +// If a list of specific peers is passed, the peer will be chosen from that list assuming +// it supports the chosen protocol and contentTopic, otherwise it will choose a peer from the service slot. +// If a peer cannot be found in the service slot, a peer will be selected from the node peerstore. +func (pm *PeerManager) SelectPeerByContentTopic(proto protocol.ID, contentTopic string, specificPeers ...peer.ID) (peer.ID, error) { + pubsubTopic, err := waku_proto.GetPubSubTopicFromContentTopic(contentTopic) + if err != nil { + return "", err + } + return pm.SelectPeer(PeerSelectionCriteria{PubsubTopic: pubsubTopic, Proto: proto, SpecificPeers: specificPeers}) +} + +// SelectRandomPeer is used to return a random peer that supports a given protocol. +// If a list of specific peers is passed, the peer will be chosen from that list assuming +// it supports the chosen protocol, otherwise it will choose a peer from the service slot. +// If a peer cannot be found in the service slot, a peer will be selected from the node peerstore. +// If pubSubTopic is specified, the peer is selected from the list of peers that support the pubSubTopic. +func (pm *PeerManager) SelectRandomPeer(criteria PeerSelectionCriteria) (peer.ID, error) { + // @TODO We need to be more strategic about which peers we dial. Right now we just set one on the service. + // Ideally depending on the query and our set of peers we take a subset of ideal peers. + // This will require us to check for various factors such as: + // - which topics they track + // - latency? + + peerID, err := pm.selectServicePeer(criteria.Proto, criteria.PubsubTopic, criteria.Ctx, criteria.SpecificPeers...) + if err == nil { + return peerID, nil + } else if !errors.Is(err, ErrNoPeersAvailable) { + pm.logger.Debug("could not retrieve random peer from slot", zap.String("protocol", string(criteria.Proto)), + zap.String("pubsubTopic", criteria.PubsubTopic), zap.Error(err)) + return "", err + } + + // if not found in serviceSlots or proto == WakuRelayIDv200 + filteredPeers, err := pm.FilterPeersByProto(criteria.SpecificPeers, criteria.Proto) + if err != nil { + return "", err + } + if criteria.PubsubTopic != "" { + filteredPeers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopic(criteria.PubsubTopic, filteredPeers...) + } + return selectRandomPeer(filteredPeers, pm.logger) +} + +func (pm *PeerManager) selectServicePeer(proto protocol.ID, pubsubTopic string, ctx context.Context, specificPeers ...peer.ID) (peer.ID, error) { + var peerID peer.ID + var err error + for retryCnt := 0; retryCnt < 1; retryCnt++ { + //Try to fetch from serviceSlot + if slot := pm.serviceSlots.getPeers(proto); slot != nil { + if pubsubTopic == "" { + return slot.getRandom() + } else { //PubsubTopic based selection + keys := make([]peer.ID, 0, len(slot.m)) + for i := range slot.m { + keys = append(keys, i) + } + selectedPeers := pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopic(pubsubTopic, keys...)
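+ //PeersByPubSubTopic narrows the slot's peers down to those whose peerstore metadata lists this pubsub topic before a random one is picked.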
+ peerID, err = selectRandomPeer(selectedPeers, pm.logger) + if err == nil { + return peerID, nil + } else { + pm.logger.Debug("Discovering peers by pubsubTopic", zap.String("pubsubTopic", pubsubTopic)) + //Trigger on-demand discovery for this topic and connect to peer immediately. + //For now discover at least 1 peer for the criteria + pm.discoverPeersByPubsubTopic(pubsubTopic, proto, ctx, 1) + //Try to fetch peers again. + continue + } + } + } + } + if peerID == "" { + pm.logger.Debug("could not retrieve random peer from slot", zap.Error(err)) + } + return "", ErrNoPeersAvailable +} + +// PeerSelectionCriteria is the selection criteria that is used by PeerManager to select peers. +type PeerSelectionCriteria struct { + SelectionType PeerSelection + Proto protocol.ID + PubsubTopic string + SpecificPeers peer.IDSlice + Ctx context.Context +} + +// SelectPeer selects a peer based on selectionType specified. +// Context is required only in case of selectionType set to LowestRTT +func (pm *PeerManager) SelectPeer(criteria PeerSelectionCriteria) (peer.ID, error) { + + switch criteria.SelectionType { + case Automatic: + return pm.SelectRandomPeer(criteria) + case LowestRTT: + return pm.SelectPeerWithLowestRTT(criteria) + default: + return "", errors.New("unknown peer selection type specified") + } +} + +type pingResult struct { + p peer.ID + rtt time.Duration +} + +// SelectPeerWithLowestRTT will select a peer that supports a specific protocol with the lowest reply time +// If a list of specific peers is passed, the peer will be chosen from that list assuming +// it supports the chosen protocol, otherwise it will choose a peer from the node peerstore +// TO OPTIMIZE: As of now the peer with lowest RTT is identified when select is called, this should be optimized +// to maintain the RTT as part of peer-scoring and just select based on that. +func (pm *PeerManager) SelectPeerWithLowestRTT(criteria PeerSelectionCriteria) (peer.ID, error) { + var peers peer.IDSlice + var err error + if criteria.Ctx == nil { + pm.logger.Warn("context is not passed for SelectPeerWithLowestRTT, using background context") + criteria.Ctx = context.Background() + } + + if criteria.PubsubTopic != "" { + peers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopic(criteria.PubsubTopic, criteria.SpecificPeers...) + } + + peers, err = pm.FilterPeersByProto(peers, criteria.Proto) + if err != nil { + return "", err + } + wg := sync.WaitGroup{} + waitCh := make(chan struct{}) + pingCh := make(chan pingResult, 1000) + + wg.Add(len(peers)) + + go func() { + for _, p := range peers { + go func(p peer.ID) { + defer wg.Done() + ctx, cancel := context.WithTimeout(criteria.Ctx, 3*time.Second) + defer cancel() + result := <-ping.Ping(ctx, pm.host, p) + if result.Error == nil { + pingCh <- pingResult{ + p: p, + rtt: result.RTT, + } + } else { + pm.logger.Debug("could not ping", logging.HostID("peer", p), zap.Error(result.Error)) + } + }(p) + } + wg.Wait() + close(waitCh) + close(pingCh) + }() + + select { + case <-waitCh: + var min *pingResult + for p := range pingCh { + if min == nil { + min = &p + } else { + if p.rtt < min.rtt { + min = &p + } + } + } + if min == nil { + return "", ErrNoPeersAvailable + } + + return min.p, nil + case <-criteria.Ctx.Done(): + return "", ErrNoPeersAvailable + } +} + +// selectRandomPeer randomly selects a peer from the list of peers passed.
+func selectRandomPeer(peers peer.IDSlice, log *zap.Logger) (peer.ID, error) { + if len(peers) >= 1 { + peerID := peers[rand.Intn(len(peers))] + // TODO: proper heuristic here that compares peer scores and selects "best" one. For now a random peer for the given protocol is returned + return peerID, nil // nolint: gosec + } + + return "", ErrNoPeersAvailable +} + +// FilterPeersByProto filters list of peers that support specified protocols. +// If specificPeers is nil, all peers in the host's peerStore are considered for filtering. +func (pm *PeerManager) FilterPeersByProto(specificPeers peer.IDSlice, proto ...protocol.ID) (peer.IDSlice, error) { + peerSet := specificPeers + if len(peerSet) == 0 { + peerSet = pm.host.Peerstore().Peers() + } + + var peers peer.IDSlice + for _, peer := range peerSet { + protocols, err := pm.host.Peerstore().SupportsProtocols(peer, proto...) + if err != nil { + return nil, err + } + + if len(protocols) > 0 { + peers = append(peers, peer) + } + } + return peers, nil +} diff --git a/waku/v2/protocol/enr/enr.go b/waku/v2/protocol/enr/enr.go index 8e330a96..7f8d9e56 100644 --- a/waku/v2/protocol/enr/enr.go +++ b/waku/v2/protocol/enr/enr.go @@ -146,6 +146,8 @@ func EnodeToPeerInfo(node *enode.Node) (*peer.AddrInfo, error) { if err != nil { return nil, err } - + if len(res) == 0 { + return nil, errors.New("could not retrieve peer addresses from enr") + } return &res[0], nil } diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index 1dfb6e72..eaefeafa 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -23,6 +23,7 @@ import ( wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/protocol/subscription" + "github.com/waku-org/go-waku/waku/v2/service" "github.com/waku-org/go-waku/waku/v2/timesource" "go.uber.org/zap" "golang.org/x/exp/slices" @@ -37,7 +38,7 @@ var ( ) type WakuFilterLightNode struct { - *protocol.CommonService + *service.CommonService h host.Host broadcaster relay.Broadcaster //TODO: Move the broadcast functionality outside of relay client to a higher SDK layer.s timesource timesource.Timesource @@ -79,7 +80,7 @@ func NewWakuFilterLightNode(broadcaster relay.Broadcaster, pm *peermanager.PeerM wf.broadcaster = broadcaster wf.timesource = timesource wf.pm = pm - wf.CommonService = protocol.NewCommonService() + wf.CommonService = service.NewCommonService() wf.metrics = newMetrics(reg) return wf diff --git a/waku/v2/protocol/filter/filter_test.go b/waku/v2/protocol/filter/filter_test.go index 334e9f8b..2ff583af 100644 --- a/waku/v2/protocol/filter/filter_test.go +++ b/waku/v2/protocol/filter/filter_test.go @@ -19,6 +19,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/protocol/subscription" + "github.com/waku-org/go-waku/waku/v2/service" "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" @@ -408,7 +409,7 @@ func (s *FilterTestSuite) TestRunningGuard() { _, err := s.lightNode.Subscribe(s.ctx, contentFilter, WithPeer(s.fullNodeHost.ID())) - s.Require().ErrorIs(err, protocol.ErrNotStarted) + s.Require().ErrorIs(err, service.ErrNotStarted) err = s.lightNode.Start(s.ctx) s.Require().NoError(err) @@ -454,7 +455,7 @@ func (s *FilterTestSuite) TestStartStop() { startNode := func() { for i := 0; i < 100; i++ { err := 
s.lightNode.Start(context.Background()) - if errors.Is(err, protocol.ErrAlreadyStarted) { + if errors.Is(err, service.ErrAlreadyStarted) { continue } s.Require().NoError(err) diff --git a/waku/v2/protocol/filter/options.go b/waku/v2/protocol/filter/options.go index f7f46e92..9495ee35 100644 --- a/waku/v2/protocol/filter/options.go +++ b/waku/v2/protocol/filter/options.go @@ -51,6 +51,7 @@ type ( FilterParameters struct { Timeout time.Duration MaxSubscribers int + pm *peermanager.PeerManager } Option func(*FilterParameters) @@ -156,6 +157,12 @@ func WithMaxSubscribers(maxSubscribers int) Option { } } +func WithPeerManager(pm *peermanager.PeerManager) Option { + return func(params *FilterParameters) { + params.pm = pm + } +} + func DefaultOptions() []Option { return []Option{ WithTimeout(24 * time.Hour), diff --git a/waku/v2/protocol/filter/server.go b/waku/v2/protocol/filter/server.go index bed0503d..2352a54c 100644 --- a/waku/v2/protocol/filter/server.go +++ b/waku/v2/protocol/filter/server.go @@ -17,6 +17,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/filter/pb" "github.com/waku-org/go-waku/waku/v2/protocol/relay" + "github.com/waku-org/go-waku/waku/v2/service" "github.com/waku-org/go-waku/waku/v2/timesource" "go.uber.org/zap" ) @@ -24,7 +25,7 @@ import ( // FilterSubscribeID_v20beta1 is the current Waku Filter protocol identifier for servers to // allow filter clients to subscribe, modify, refresh and unsubscribe a desired set of filter criteria const FilterSubscribeID_v20beta1 = libp2pProtocol.ID("/vac/waku/filter-subscribe/2.0.0-beta1") - +const FilterSubscribeENRField = uint8(1 << 2) const peerHasNoSubscription = "peer has no subscriptions" type ( @@ -33,7 +34,7 @@ type ( msgSub *relay.Subscription metrics Metrics log *zap.Logger - *protocol.CommonService + *service.CommonService subscriptions *SubscribersMap maxSubscriptions int @@ -52,11 +53,13 @@ func NewWakuFilterFullNode(timesource timesource.Timesource, reg prometheus.Regi opt(params) } - wf.CommonService = protocol.NewCommonService() + wf.CommonService = service.NewCommonService() wf.metrics = newMetrics(reg) wf.subscriptions = NewSubscribersMap(params.Timeout) wf.maxSubscriptions = params.MaxSubscribers - + if params.pm != nil { + params.pm.RegisterWakuProtocol(FilterSubscribeID_v20beta1, FilterSubscribeENRField) + } return wf } diff --git a/waku/v2/protocol/legacy_filter/waku_filter.go b/waku/v2/protocol/legacy_filter/waku_filter.go index 1e5d92d5..cb8aa6fe 100644 --- a/waku/v2/protocol/legacy_filter/waku_filter.go +++ b/waku/v2/protocol/legacy_filter/waku_filter.go @@ -18,6 +18,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter/pb" wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/relay" + "github.com/waku-org/go-waku/waku/v2/service" "github.com/waku-org/go-waku/waku/v2/timesource" "go.uber.org/zap" "golang.org/x/sync/errgroup" @@ -47,7 +48,7 @@ type ( } WakuFilter struct { - *protocol.CommonService + *service.CommonService h host.Host pm *peermanager.PeerManager isFullNode bool @@ -76,7 +77,7 @@ func NewWakuFilter(broadcaster relay.Broadcaster, isFullNode bool, timesource ti } wf.isFullNode = isFullNode - wf.CommonService = protocol.NewCommonService() + wf.CommonService = service.NewCommonService() wf.filters = NewFilterMap(broadcaster, timesource) wf.subscribers = NewSubscribers(params.Timeout) wf.metrics = newMetrics(reg) diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go 
b/waku/v2/protocol/lightpush/waku_lightpush.go index 3439c17b..efd1ced4 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -23,6 +23,7 @@ import ( // LightPushID_v20beta1 is the current Waku LightPush protocol identifier const LightPushID_v20beta1 = libp2pProtocol.ID("/vac/waku/lightpush/2.0.0-beta1") +const LightPushENRField = uint8(1 << 3) var ( ErrNoPeersAvailable = errors.New("no suitable remote peers") @@ -49,6 +50,7 @@ func NewWakuLightPush(relay *relay.WakuRelay, pm *peermanager.PeerManager, reg p wakuLP.log = log.Named("lightpush") wakuLP.pm = pm wakuLP.metrics = newMetrics(reg) + return wakuLP } @@ -69,6 +71,9 @@ func (wakuLP *WakuLightPush) Start(ctx context.Context) error { wakuLP.h.SetStreamHandlerMatch(LightPushID_v20beta1, protocol.PrefixTextMatch(string(LightPushID_v20beta1)), wakuLP.onRequest(ctx)) wakuLP.log.Info("Light Push protocol started") + if wakuLP.pm != nil { + wakuLP.pm.RegisterWakuProtocol(LightPushID_v20beta1, LightPushENRField) + } return nil } diff --git a/waku/v2/protocol/peer_exchange/client.go b/waku/v2/protocol/peer_exchange/client.go index f1ba7c39..111dc9fd 100644 --- a/waku/v2/protocol/peer_exchange/client.go +++ b/waku/v2/protocol/peer_exchange/client.go @@ -14,6 +14,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/peerstore" wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr" "github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/pb" + "github.com/waku-org/go-waku/waku/v2/service" "go.uber.org/zap" ) @@ -124,11 +125,11 @@ func (wakuPX *WakuPeerExchange) handleResponse(ctx context.Context, response *pb go func() { defer wakuPX.WaitGroup().Done() - peerCh := make(chan peermanager.PeerData) + peerCh := make(chan service.PeerData) defer close(peerCh) wakuPX.peerConnector.Subscribe(ctx, peerCh) for _, p := range discoveredPeers { - peer := peermanager.PeerData{ + peer := service.PeerData{ Origin: peerstore.PeerExchange, AddrInfo: p.addrInfo, ENR: p.enr, diff --git a/waku/v2/protocol/peer_exchange/protocol.go b/waku/v2/protocol/peer_exchange/protocol.go index 8230abaa..5b1834f1 100644 --- a/waku/v2/protocol/peer_exchange/protocol.go +++ b/waku/v2/protocol/peer_exchange/protocol.go @@ -18,6 +18,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/enr" "github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/pb" + "github.com/waku-org/go-waku/waku/v2/service" "go.uber.org/zap" ) @@ -32,7 +33,7 @@ var ( // PeerConnector will subscribe to a channel containing the information for all peers found by this discovery protocol type PeerConnector interface { - Subscribe(context.Context, <-chan peermanager.PeerData) + Subscribe(context.Context, <-chan service.PeerData) } type WakuPeerExchange struct { @@ -42,7 +43,7 @@ type WakuPeerExchange struct { metrics Metrics log *zap.Logger - *protocol.CommonService + *service.CommonService peerConnector PeerConnector enrCache *enrCache @@ -63,7 +64,7 @@ func NewWakuPeerExchange(disc *discv5.DiscoveryV5, peerConnector PeerConnector, wakuPX.enrCache = newEnrCache wakuPX.peerConnector = peerConnector wakuPX.pm = pm - wakuPX.CommonService = protocol.NewCommonService() + wakuPX.CommonService = service.NewCommonService() return wakuPX, nil } diff --git a/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go b/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go index ca64b55f..5d2fbaa1 100644 --- a/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go +++ 
b/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go @@ -2,131 +2,46 @@ package peer_exchange import ( "context" - "crypto/ecdsa" - "fmt" - "math" - "net" - "strconv" "testing" "time" - gcrypto "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/p2p/enode" - "github.com/ethereum/go-ethereum/p2p/enr" - "github.com/libp2p/go-libp2p" - "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peerstore" - "github.com/multiformats/go-multiaddr" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/tests" "github.com/waku-org/go-waku/waku/v2/discv5" - "github.com/waku-org/go-waku/waku/v2/peermanager" wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr" "github.com/waku-org/go-waku/waku/v2/utils" - "go.uber.org/zap" - - libp2pcrypto "github.com/libp2p/go-libp2p/core/crypto" ) -func createHost(t *testing.T) (host.Host, int, *ecdsa.PrivateKey) { - privKey, err := gcrypto.GenerateKey() - require.NoError(t, err) - - sPrivKey := libp2pcrypto.PrivKey(utils.EcdsaPrivKeyToSecp256k1PrivKey(privKey)) - - port, err := tests.FindFreePort(t, "127.0.0.1", 3) - require.NoError(t, err) - - sourceMultiAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", port)) - require.NoError(t, err) - - host, err := libp2p.New( - libp2p.ListenAddrs(sourceMultiAddr), - libp2p.Identity(sPrivKey), - ) - require.NoError(t, err) - - return host, port, privKey -} - -func extractIP(addr multiaddr.Multiaddr) (*net.TCPAddr, error) { - ipStr, err := addr.ValueForProtocol(multiaddr.P_IP4) - if err != nil { - return nil, err - } - - portStr, err := addr.ValueForProtocol(multiaddr.P_TCP) - if err != nil { - return nil, err - } - port, err := strconv.Atoi(portStr) - if err != nil { - return nil, err - } - return &net.TCPAddr{ - IP: net.ParseIP(ipStr), - Port: port, - }, nil -} - -func newLocalnode(priv *ecdsa.PrivateKey, ipAddr *net.TCPAddr, udpPort int, wakuFlags wenr.WakuEnrBitfield, advertiseAddr *net.IP, log *zap.Logger) (*enode.LocalNode, error) { - db, err := enode.OpenDB("") - if err != nil { - return nil, err - } - localnode := enode.NewLocalNode(db, priv) - localnode.SetFallbackUDP(udpPort) - localnode.Set(enr.WithEntry(wenr.WakuENRField, wakuFlags)) - localnode.SetFallbackIP(net.IP{127, 0, 0, 1}) - localnode.SetStaticIP(ipAddr.IP) - - if udpPort > 0 && udpPort <= math.MaxUint16 { - localnode.Set(enr.UDP(uint16(udpPort))) // lgtm [go/incorrect-integer-conversion] - } else { - log.Error("setting udpPort", zap.Int("port", udpPort)) - } - - if ipAddr.Port > 0 && ipAddr.Port <= math.MaxUint16 { - localnode.Set(enr.TCP(uint16(ipAddr.Port))) // lgtm [go/incorrect-integer-conversion] - } else { - log.Error("setting tcpPort", zap.Int("port", ipAddr.Port)) - } - - if advertiseAddr != nil { - localnode.SetStaticIP(*advertiseAddr) - } - - return localnode, nil -} - func TestRetrieveProvidePeerExchangePeers(t *testing.T) { // H1 - host1, _, prvKey1 := createHost(t) + host1, _, prvKey1 := tests.CreateHost(t) udpPort1, err := tests.FindFreePort(t, "127.0.0.1", 3) require.NoError(t, err) - ip1, _ := extractIP(host1.Addrs()[0]) - l1, err := newLocalnode(prvKey1, ip1, udpPort1, wenr.NewWakuEnrBitfield(false, false, false, true), nil, utils.Logger()) + ip1, _ := tests.ExtractIP(host1.Addrs()[0]) + l1, err := tests.NewLocalnode(prvKey1, ip1, udpPort1, wenr.NewWakuEnrBitfield(false, false, false, true), nil, utils.Logger()) require.NoError(t, err) - discv5PeerConn1 := peermanager.NewTestPeerDiscoverer() + discv5PeerConn1 
:= discv5.NewTestPeerDiscoverer() d1, err := discv5.NewDiscoveryV5(prvKey1, l1, discv5PeerConn1, prometheus.DefaultRegisterer, utils.Logger(), discv5.WithUDPPort(uint(udpPort1))) require.NoError(t, err) d1.SetHost(host1) // H2 - host2, _, prvKey2 := createHost(t) - ip2, _ := extractIP(host2.Addrs()[0]) + host2, _, prvKey2 := tests.CreateHost(t) + ip2, _ := tests.ExtractIP(host2.Addrs()[0]) udpPort2, err := tests.FindFreePort(t, "127.0.0.1", 3) require.NoError(t, err) - l2, err := newLocalnode(prvKey2, ip2, udpPort2, wenr.NewWakuEnrBitfield(false, false, false, true), nil, utils.Logger()) + l2, err := tests.NewLocalnode(prvKey2, ip2, udpPort2, wenr.NewWakuEnrBitfield(false, false, false, true), nil, utils.Logger()) require.NoError(t, err) - discv5PeerConn2 := peermanager.NewTestPeerDiscoverer() + discv5PeerConn2 := discv5.NewTestPeerDiscoverer() d2, err := discv5.NewDiscoveryV5(prvKey2, l2, discv5PeerConn2, prometheus.DefaultRegisterer, utils.Logger(), discv5.WithUDPPort(uint(udpPort2)), discv5.WithBootnodes([]*enode.Node{d1.Node()})) require.NoError(t, err) d2.SetHost(host2) // H3 - host3, _, _ := createHost(t) + host3, _, _ := tests.CreateHost(t) defer d1.Stop() defer d2.Stop() @@ -143,12 +58,12 @@ func TestRetrieveProvidePeerExchangePeers(t *testing.T) { time.Sleep(3 * time.Second) // Wait some time for peers to be discovered // mount peer exchange - pxPeerConn1 := peermanager.NewTestPeerDiscoverer() + pxPeerConn1 := discv5.NewTestPeerDiscoverer() px1, err := NewWakuPeerExchange(d1, pxPeerConn1, nil, prometheus.DefaultRegisterer, utils.Logger()) require.NoError(t, err) px1.SetHost(host1) - pxPeerConn3 := peermanager.NewTestPeerDiscoverer() + pxPeerConn3 := discv5.NewTestPeerDiscoverer() px3, err := NewWakuPeerExchange(nil, pxPeerConn3, nil, prometheus.DefaultRegisterer, utils.Logger()) require.NoError(t, err) px3.SetHost(host3) diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index c195ae07..d9047d8b 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -18,11 +18,13 @@ import ( "github.com/waku-org/go-waku/logging" waku_proto "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/service" "github.com/waku-org/go-waku/waku/v2/timesource" ) // WakuRelayID_v200 is the current protocol ID used for WakuRelay const WakuRelayID_v200 = protocol.ID("/vac/waku/relay/2.0.0") +const WakuRelayENRField = uint8(1 << 0) // DefaultWakuTopic is the default pubsub topic used across all Waku protocols var DefaultWakuTopic string = waku_proto.DefaultPubsubTopic{}.String() @@ -62,7 +64,7 @@ type WakuRelay struct { EvtPeerTopic event.Emitter } contentSubs map[string]map[int]*Subscription - *waku_proto.CommonService + *service.CommonService } // NewWakuRelay returns a new instance of a WakuRelay struct @@ -76,7 +78,7 @@ func NewWakuRelay(bcaster Broadcaster, minPeersToPublish int, timesource timesou w.topicValidators = make(map[string][]validatorFn) w.bcaster = bcaster w.minPeersToPublish = minPeersToPublish - w.CommonService = waku_proto.NewCommonService() + w.CommonService = service.NewCommonService() w.log = log.Named("relay") w.events = eventbus.NewBus() w.metrics = newMetrics(reg, w.log) diff --git a/waku/v2/protocol/store/waku_store_common.go b/waku/v2/protocol/store/waku_store_common.go index 33d7a6a4..1781d3e3 100644 --- a/waku/v2/protocol/store/waku_store_common.go +++ b/waku/v2/protocol/store/waku_store_common.go @@ -16,6 +16,7 @@ import ( 
// StoreID_v20beta4 is the current Waku Store protocol identifier const StoreID_v20beta4 = libp2pProtocol.ID("/vac/waku/store/2.0.0-beta4") +const StoreENRField = uint8(1 << 1) // MaxPageSize is the maximum number of waku messages to return per page const MaxPageSize = 20 @@ -64,5 +65,8 @@ func NewWakuStore(p MessageProvider, pm *peermanager.PeerManager, timesource tim wakuStore.pm = pm wakuStore.metrics = newMetrics(reg) + if pm != nil { + pm.RegisterWakuProtocol(StoreID_v20beta4, StoreENRField) + } return wakuStore } diff --git a/waku/v2/rendezvous/rendezvous.go b/waku/v2/rendezvous/rendezvous.go index db48faec..a53a74a4 100644 --- a/waku/v2/rendezvous/rendezvous.go +++ b/waku/v2/rendezvous/rendezvous.go @@ -8,9 +8,9 @@ import ( "github.com/libp2p/go-libp2p/core/host" rvs "github.com/waku-org/go-libp2p-rendezvous" - "github.com/waku-org/go-waku/waku/v2/peermanager" "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/service" "go.uber.org/zap" ) @@ -31,12 +31,12 @@ type Rendezvous struct { peerConnector PeerConnector log *zap.Logger - *peermanager.CommonDiscoveryService + *service.CommonDiscoveryService } // PeerConnector will subscribe to a channel containing the information for all peers found by this discovery protocol type PeerConnector interface { - Subscribe(context.Context, <-chan peermanager.PeerData) + Subscribe(context.Context, <-chan service.PeerData) } // NewRendezvous creates an instance of Rendezvous struct @@ -46,7 +46,7 @@ func NewRendezvous(db *DB, peerConnector PeerConnector, log *zap.Logger) *Rendez db: db, peerConnector: peerConnector, log: logger, - CommonDiscoveryService: peermanager.NewCommonDiscoveryService(), + CommonDiscoveryService: service.NewCommonDiscoveryService(), } } @@ -104,7 +104,7 @@ func (r *Rendezvous) DiscoverWithNamespace(ctx context.Context, namespace string rp.SetSuccess(cookie) for _, p := range addrInfo { - peer := peermanager.PeerData{ + peer := service.PeerData{ Origin: peerstore.Rendezvous, AddrInfo: p, PubSubTopics: []string{namespace}, diff --git a/waku/v2/rendezvous/rendezvous_test.go b/waku/v2/rendezvous/rendezvous_test.go index a2b1d881..3aeb6685 100644 --- a/waku/v2/rendezvous/rendezvous_test.go +++ b/waku/v2/rendezvous/rendezvous_test.go @@ -14,16 +14,16 @@ import ( "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/tests" "github.com/waku-org/go-waku/waku/persistence/sqlite" - "github.com/waku-org/go-waku/waku/v2/peermanager" + "github.com/waku-org/go-waku/waku/v2/service" "github.com/waku-org/go-waku/waku/v2/utils" ) type PeerConn struct { sync.RWMutex - ch <-chan peermanager.PeerData + ch <-chan service.PeerData } -func (p *PeerConn) Subscribe(ctx context.Context, ch <-chan peermanager.PeerData) { +func (p *PeerConn) Subscribe(ctx context.Context, ch <-chan service.PeerData) { p.Lock() p.ch = ch p.Unlock() diff --git a/waku/v2/peermanager/common_discovery_service.go b/waku/v2/service/common_discovery_service.go similarity index 93% rename from waku/v2/peermanager/common_discovery_service.go rename to waku/v2/service/common_discovery_service.go index 0fae5fb5..72bf96f1 100644 --- a/waku/v2/peermanager/common_discovery_service.go +++ b/waku/v2/service/common_discovery_service.go @@ -1,4 +1,4 @@ -package peermanager +package service import ( "context" @@ -7,7 +7,6 @@ import ( "github.com/ethereum/go-ethereum/p2p/enode" "github.com/libp2p/go-libp2p/core/peer" wps "github.com/waku-org/go-waku/waku/v2/peerstore" - 
"github.com/waku-org/go-waku/waku/v2/protocol" ) // PeerData contains information about a peer useful in establishing connections with it. @@ -19,13 +18,13 @@ type PeerData struct { } type CommonDiscoveryService struct { - commonService *protocol.CommonService + commonService *CommonService channel chan PeerData } func NewCommonDiscoveryService() *CommonDiscoveryService { return &CommonDiscoveryService{ - commonService: protocol.NewCommonService(), + commonService: NewCommonService(), } } diff --git a/waku/v2/protocol/common_service.go b/waku/v2/service/common_service.go similarity index 99% rename from waku/v2/protocol/common_service.go rename to waku/v2/service/common_service.go index 65746961..9bf3ea12 100644 --- a/waku/v2/protocol/common_service.go +++ b/waku/v2/service/common_service.go @@ -1,4 +1,4 @@ -package protocol +package service import ( "context" diff --git a/waku/v2/protocol/common_service_test.go b/waku/v2/service/common_service_test.go similarity index 96% rename from waku/v2/protocol/common_service_test.go rename to waku/v2/service/common_service_test.go index cd707e11..db81043b 100644 --- a/waku/v2/protocol/common_service_test.go +++ b/waku/v2/service/common_service_test.go @@ -1,4 +1,4 @@ -package protocol +package service import ( "context"