diff --git a/Makefile b/Makefile index e68348b1..8d64a429 100644 --- a/Makefile +++ b/Makefile @@ -77,10 +77,10 @@ lint-full: @golangci-lint run ./... --config=./.golangci.full.yaml --deadline=5m test-with-race: - ${GOBIN} test -race -timeout 300s ./waku/... + ${GOBIN} test -race -timeout 300s ./waku/... ./cmd/waku/server/... test: - ${GOBIN} test -timeout 300s ./waku/... -coverprofile=${GO_TEST_OUTFILE}.tmp -coverpkg ./... + ${GOBIN} test -timeout 300s ./waku/... ./cmd/waku/server/... -coverprofile=${GO_TEST_OUTFILE}.tmp -coverpkg ./... cat ${GO_TEST_OUTFILE}.tmp | grep -v ".pb.go" > ${GO_TEST_OUTFILE} ${GOBIN} tool cover -html=${GO_TEST_OUTFILE} -o ${GO_HTML_COV} diff --git a/cmd/waku/node.go b/cmd/waku/node.go index 2129a1e5..56e3f83a 100644 --- a/cmd/waku/node.go +++ b/cmd/waku/node.go @@ -444,7 +444,14 @@ func Execute(options NodeOptions) error { var restServer *rest.WakuRest if options.RESTServer.Enable { wg.Add(1) - restServer = rest.NewWakuRest(wakuNode, options.RESTServer.Address, options.RESTServer.Port, options.PProf, options.RESTServer.Admin, options.RESTServer.RelayCacheCapacity, options.RESTServer.FilterCacheCapacity, logger) + restConfig := rest.RestConfig{Address: options.RESTServer.Address, + Port: uint(options.RESTServer.Port), + EnablePProf: options.PProf, + EnableAdmin: options.RESTServer.Admin, + RelayCacheCapacity: uint(options.RESTServer.RelayCacheCapacity), + FilterCacheCapacity: uint(options.RESTServer.FilterCacheCapacity)} + + restServer = rest.NewWakuRest(wakuNode, restConfig, logger) restServer.Start(ctx, &wg) } diff --git a/cmd/waku/server/rest/filter.go b/cmd/waku/server/rest/filter.go index d6719e03..02e1e681 100644 --- a/cmd/waku/server/rest/filter.go +++ b/cmd/waku/server/rest/filter.go @@ -240,11 +240,13 @@ func (s *FilterService) unsubscribeGetMessage(result *filter.WakuFilterPushResul var peerIds string ind := 0 for _, entry := range result.Errors() { - s.log.Error("can't unsubscribe for ", zap.String("peer", entry.PeerID.String()), zap.Error(entry.Err)) - if ind != 0 { - peerIds += ", " + if entry.Err != nil { + s.log.Error("can't unsubscribe for ", zap.String("peer", entry.PeerID.String()), zap.Error(entry.Err)) + if ind != 0 { + peerIds += ", " + } + peerIds += entry.PeerID.String() } - peerIds += entry.PeerID.String() ind++ } if peerIds != "" { diff --git a/cmd/waku/server/rest/filter_test.go b/cmd/waku/server/rest/filter_test.go index 52a5af3a..09d873e2 100644 --- a/cmd/waku/server/rest/filter_test.go +++ b/cmd/waku/server/rest/filter_test.go @@ -365,7 +365,7 @@ func TestFilterGetMessages(t *testing.T) { router.ServeHTTP(rr, req) require.Equal(t, http.StatusNotFound, rr.Code) require.Equal(t, - fmt.Sprintf("Not subscribed to pubsubTopic:%s contentTopic: %s", notSubscibredPubsubTopic, contentTopic), + fmt.Sprintf("not subscribed to pubsubTopic:%s contentTopic: %s", notSubscibredPubsubTopic, contentTopic), rr.Body.String(), ) } diff --git a/cmd/waku/server/rest/lightpush_rest.go b/cmd/waku/server/rest/lightpush_rest.go index 8500df4a..e33f4b7a 100644 --- a/cmd/waku/server/rest/lightpush_rest.go +++ b/cmd/waku/server/rest/lightpush_rest.go @@ -71,6 +71,6 @@ func (serv *LightpushService) postMessagev1(w http.ResponseWriter, req *http.Req _, err = w.Write([]byte(err.Error())) serv.log.Error("writing response", zap.Error(err)) } else { - w.WriteHeader(http.StatusOK) + writeErrOrResponse(w, err, true) } } diff --git a/cmd/waku/server/rest/relay.go b/cmd/waku/server/rest/relay.go index 968ed338..d80e0073 100644 --- a/cmd/waku/server/rest/relay.go +++ b/cmd/waku/server/rest/relay.go @@ -1,13 +1,9 @@ package rest import ( - "context" "encoding/json" - "errors" "net/http" - "net/url" "strings" - "sync" "github.com/go-chi/chi/v5" "github.com/waku-org/go-waku/cmd/waku/server" @@ -26,29 +22,21 @@ const routeRelayV1AutoMessages = "/relay/v1/auto/messages" // RelayService represents the REST service for WakuRelay type RelayService struct { - node *node.WakuNode - cancel context.CancelFunc + node *node.WakuNode log *zap.Logger - messages map[string][]*pb.WakuMessage - cacheCapacity int - messagesMutex sync.RWMutex - - runner *runnerService + cacheCapacity uint } // NewRelayService returns an instance of RelayService -func NewRelayService(node *node.WakuNode, m *chi.Mux, cacheCapacity int, log *zap.Logger) *RelayService { +func NewRelayService(node *node.WakuNode, m *chi.Mux, cacheCapacity uint, log *zap.Logger) *RelayService { s := &RelayService{ node: node, log: log.Named("relay"), cacheCapacity: cacheCapacity, - messages: make(map[string][]*pb.WakuMessage), } - s.runner = newRunnerService(node.Broadcaster(), s.addEnvelope) - m.Post(routeRelayV1Subscriptions, s.postV1Subscriptions) m.Delete(routeRelayV1Subscriptions, s.deleteV1Subscriptions) m.Get(routeRelayV1Messages, s.getV1Messages) @@ -65,46 +53,6 @@ func NewRelayService(node *node.WakuNode, m *chi.Mux, cacheCapacity int, log *za return s } -func (r *RelayService) addEnvelope(envelope *protocol.Envelope) { - r.messagesMutex.Lock() - defer r.messagesMutex.Unlock() - - if _, ok := r.messages[envelope.PubsubTopic()]; !ok { - return - } - - // Keep a specific max number of messages per topic - if len(r.messages[envelope.PubsubTopic()]) >= r.cacheCapacity { - r.messages[envelope.PubsubTopic()] = r.messages[envelope.PubsubTopic()][1:] - } - - r.messages[envelope.PubsubTopic()] = append(r.messages[envelope.PubsubTopic()], envelope.Message()) -} - -// Start starts the RelayService -func (r *RelayService) Start(ctx context.Context) { - ctx, cancel := context.WithCancel(ctx) - r.cancel = cancel - - r.messagesMutex.Lock() - // Node may already be subscribed to some topics when Relay API handlers are installed. Let's add these - for _, topic := range r.node.Relay().Topics() { - r.log.Info("adding topic handler for existing subscription", zap.String("topic", topic)) - r.messages[topic] = []*pb.WakuMessage{} - } - r.messagesMutex.Unlock() - - r.runner.Start(ctx) -} - -// Stop stops the RelayService -func (r *RelayService) Stop() { - if r.cancel == nil { - return - } - r.cancel() -} - func (r *RelayService) deleteV1Subscriptions(w http.ResponseWriter, req *http.Request) { var topics []string decoder := json.NewDecoder(req.Body) @@ -114,16 +62,11 @@ func (r *RelayService) deleteV1Subscriptions(w http.ResponseWriter, req *http.Re } defer req.Body.Close() - r.messagesMutex.Lock() - defer r.messagesMutex.Unlock() - var err error for _, topic := range topics { err = r.node.Relay().Unsubscribe(req.Context(), protocol.NewContentFilter(topic)) if err != nil { r.log.Error("unsubscribing from topic", zap.String("topic", strings.Replace(strings.Replace(topic, "\n", "", -1), "\r", "", -1)), zap.Error(err)) - } else { - delete(r.messages, topic) } } @@ -140,26 +83,29 @@ func (r *RelayService) postV1Subscriptions(w http.ResponseWriter, req *http.Requ defer req.Body.Close() var err error - var sub *relay.Subscription - var subs []*relay.Subscription + var successCnt int var topicToSubscribe string for _, topic := range topics { if topic == "" { - subs, err = r.node.Relay().Subscribe(req.Context(), protocol.NewContentFilter(relay.DefaultWakuTopic)) topicToSubscribe = relay.DefaultWakuTopic } else { - subs, err = r.node.Relay().Subscribe(req.Context(), protocol.NewContentFilter(topic)) topicToSubscribe = topic } + _, err = r.node.Relay().Subscribe(req.Context(), protocol.NewContentFilter(topicToSubscribe), relay.WithCacheSize(r.cacheCapacity)) + if err != nil { r.log.Error("subscribing to topic", zap.String("topic", strings.Replace(topicToSubscribe, "\n", "", -1)), zap.Error(err)) - } else { - sub = subs[0] - sub.Unsubscribe() - r.messagesMutex.Lock() - r.messages[topic] = []*pb.WakuMessage{} - r.messagesMutex.Unlock() + continue } + successCnt++ + } + + // on partial subscribe failure + if successCnt > 0 && err != nil { + r.log.Error("partial subscribe failed", zap.Error(err)) + // on partial failure + writeResponse(w, err, http.StatusOK) + return } writeErrOrResponse(w, err, true) @@ -170,20 +116,22 @@ func (r *RelayService) getV1Messages(w http.ResponseWriter, req *http.Request) { if topic == "" { return } - - r.messagesMutex.Lock() - defer r.messagesMutex.Unlock() - - if _, ok := r.messages[topic]; !ok { + //TODO: Update the API to also take a contentTopic since relay now supports filtering based on contentTopic as well. + sub, err := r.node.Relay().GetSubscriptionWithPubsubTopic(topic, "") + if err != nil { w.WriteHeader(http.StatusNotFound) - _, err := w.Write([]byte("not subscribed to topic")) + _, err = w.Write([]byte("not subscribed to topic")) r.log.Error("writing response", zap.Error(err)) return } + var response []*pb.WakuMessage + select { + case msg := <-sub.Ch: + response = append(response, msg.Message()) + default: + break + } - response := r.messages[topic] - - r.messages[topic] = []*pb.WakuMessage{} writeErrOrResponse(w, nil, response) } @@ -205,11 +153,6 @@ func (r *RelayService) postV1Message(w http.ResponseWriter, req *http.Request) { topic = relay.DefaultWakuTopic } - if !r.node.Relay().IsSubscribed(topic) { - writeErrOrResponse(w, errors.New("not subscribed to pubsubTopic"), nil) - return - } - if err := server.AppendRLNProof(r.node, message); err != nil { writeErrOrResponse(w, err, nil) return @@ -250,7 +193,7 @@ func (r *RelayService) postV1AutoSubscriptions(w http.ResponseWriter, req *http. defer req.Body.Close() var err error - _, err = r.node.Relay().Subscribe(r.node.Relay().Context(), protocol.NewContentFilter("", cTopics...)) + _, err = r.node.Relay().Subscribe(r.node.Relay().Context(), protocol.NewContentFilter("", cTopics...), relay.WithCacheSize(r.cacheCapacity)) if err != nil { r.log.Error("subscribing to topics", zap.Strings("contentTopics", cTopics), zap.Error(err)) } @@ -260,27 +203,14 @@ func (r *RelayService) postV1AutoSubscriptions(w http.ResponseWriter, req *http. _, err := w.Write([]byte(err.Error())) r.log.Error("writing response", zap.Error(err)) } else { - w.WriteHeader(http.StatusOK) + writeErrOrResponse(w, err, true) } } func (r *RelayService) getV1AutoMessages(w http.ResponseWriter, req *http.Request) { - cTopic := chi.URLParam(req, "contentTopic") - if cTopic == "" { - w.WriteHeader(http.StatusBadRequest) - _, err := w.Write([]byte("contentTopic is required")) - r.log.Error("writing response", zap.Error(err)) - return - } - cTopic, err := url.QueryUnescape(cTopic) - if err != nil { - w.WriteHeader(http.StatusBadRequest) - _, err = w.Write([]byte("invalid contentTopic format")) - r.log.Error("writing response", zap.Error(err)) - return - } + cTopic := topicFromPath(w, req, "contentTopic", r.log) sub, err := r.node.Relay().GetSubscription(cTopic) if err != nil { w.WriteHeader(http.StatusNotFound) diff --git a/cmd/waku/server/rest/relay_test.go b/cmd/waku/server/rest/relay_test.go index 3fe994b9..7f10c930 100644 --- a/cmd/waku/server/rest/relay_test.go +++ b/cmd/waku/server/rest/relay_test.go @@ -7,6 +7,7 @@ import ( "fmt" "net/http" "net/http/httptest" + "net/url" "testing" "time" @@ -15,8 +16,8 @@ import ( "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/tests" "github.com/waku-org/go-waku/waku/v2/node" - "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/utils" ) @@ -34,7 +35,6 @@ func TestPostV1Message(t *testing.T) { router := chi.NewRouter() _ = makeRelayService(t, router) - msg := &pb.WakuMessage{ Payload: []byte{1, 2, 3}, ContentTopic: "abc", @@ -54,10 +54,7 @@ func TestPostV1Message(t *testing.T) { func TestRelaySubscription(t *testing.T) { router := chi.NewRouter() - d := makeRelayService(t, router) - - go d.Start(context.Background()) - defer d.Stop() + r := makeRelayService(t, router) // Wait for node to start time.Sleep(500 * time.Millisecond) @@ -67,48 +64,42 @@ func TestRelaySubscription(t *testing.T) { require.NoError(t, err) rr := httptest.NewRecorder() - req, _ := http.NewRequest(http.MethodPost, "/relay/v1/subscriptions", bytes.NewReader(topicsJSONBytes)) + req, _ := http.NewRequest(http.MethodPost, routeRelayV1Subscriptions, bytes.NewReader(topicsJSONBytes)) router.ServeHTTP(rr, req) require.Equal(t, http.StatusOK, rr.Code) require.Equal(t, "true", rr.Body.String()) // Test max messages in subscription now := utils.GetUnixEpoch() - d.runner.broadcaster.Submit(protocol.NewEnvelope(tests.CreateWakuMessage("test", now+1), now, "test")) - d.runner.broadcaster.Submit(protocol.NewEnvelope(tests.CreateWakuMessage("test", now+2), now, "test")) - d.runner.broadcaster.Submit(protocol.NewEnvelope(tests.CreateWakuMessage("test", now+3), now, "test")) + _, err = r.node.Relay().Publish(context.Background(), + tests.CreateWakuMessage("test", now+1), relay.WithPubSubTopic("test")) + require.NoError(t, err) + _, err = r.node.Relay().Publish(context.Background(), + tests.CreateWakuMessage("test", now+2), relay.WithPubSubTopic("test")) + require.NoError(t, err) + + _, err = r.node.Relay().Publish(context.Background(), + tests.CreateWakuMessage("test", now+3), relay.WithPubSubTopic("test")) + require.NoError(t, err) // Wait for the messages to be processed - time.Sleep(500 * time.Millisecond) - - require.Len(t, d.messages["test"], 3) - - d.runner.broadcaster.Submit(protocol.NewEnvelope(tests.CreateWakuMessage("test", now+4), now+4, "test")) - - time.Sleep(500 * time.Millisecond) - - // Should only have 3 messages - require.Len(t, d.messages["test"], 3) + time.Sleep(5 * time.Millisecond) // Test deletion rr = httptest.NewRecorder() - req, _ = http.NewRequest(http.MethodDelete, "/relay/v1/subscriptions", bytes.NewReader(topicsJSONBytes)) + req, _ = http.NewRequest(http.MethodDelete, routeRelayV1Subscriptions, bytes.NewReader(topicsJSONBytes)) router.ServeHTTP(rr, req) require.Equal(t, http.StatusOK, rr.Code) require.Equal(t, "true", rr.Body.String()) - require.Len(t, d.messages["test"], 0) - } func TestRelayGetV1Messages(t *testing.T) { router := chi.NewRouter() + router1 := chi.NewRouter() serviceA := makeRelayService(t, router) - go serviceA.Start(context.Background()) - defer serviceA.Stop() - serviceB := makeRelayService(t, router) - go serviceB.Start(context.Background()) - defer serviceB.Stop() + + serviceB := makeRelayService(t, router1) hostInfo, err := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", serviceB.node.Host().ID().Pretty())) require.NoError(t, err) @@ -129,7 +120,7 @@ func TestRelayGetV1Messages(t *testing.T) { require.NoError(t, err) rr := httptest.NewRecorder() - req, _ := http.NewRequest(http.MethodPost, "/relay/v1/subscriptions", bytes.NewReader(topicsJSONBytes)) + req, _ := http.NewRequest(http.MethodPost, routeRelayV1Subscriptions, bytes.NewReader(topicsJSONBytes)) router.ServeHTTP(rr, req) require.Equal(t, http.StatusOK, rr.Code) @@ -165,9 +156,144 @@ func TestRelayGetV1Messages(t *testing.T) { rr = httptest.NewRecorder() req, _ = http.NewRequest(http.MethodGet, "/relay/v1/messages/test", bytes.NewReader([]byte{})) - router.ServeHTTP(rr, req) + router1.ServeHTTP(rr, req) + require.Equal(t, http.StatusNotFound, rr.Code) +} + +func TestPostAutoV1Message(t *testing.T) { + router := chi.NewRouter() + + _ = makeRelayService(t, router) + msg := &pb.WakuMessage{ + Payload: []byte{1, 2, 3}, + ContentTopic: "/toychat/1/huilong/proto", + Version: 0, + Timestamp: utils.GetUnixEpoch(), + } + msgJSONBytes, err := json.Marshal(msg) + require.NoError(t, err) + + rr := httptest.NewRecorder() + req, _ := http.NewRequest(http.MethodPost, routeRelayV1AutoMessages, bytes.NewReader(msgJSONBytes)) + router.ServeHTTP(rr, req) + require.Equal(t, http.StatusOK, rr.Code) +} + +func TestRelayAutoSubUnsub(t *testing.T) { + router := chi.NewRouter() + + r := makeRelayService(t, router) + + // Wait for node to start + time.Sleep(500 * time.Millisecond) + + cTopic1 := "/toychat/1/huilong/proto" + + cTopics := []string{cTopic1} + topicsJSONBytes, err := json.Marshal(cTopics) + require.NoError(t, err) + + rr := httptest.NewRecorder() + req, _ := http.NewRequest(http.MethodPost, routeRelayV1AutoSubscriptions, bytes.NewReader(topicsJSONBytes)) + router.ServeHTTP(rr, req) + require.Equal(t, http.StatusOK, rr.Code) + require.Equal(t, "true", rr.Body.String()) + + // Test publishing messages after subscription + now := utils.GetUnixEpoch() + _, err = r.node.Relay().Publish(context.Background(), + tests.CreateWakuMessage(cTopic1, now+1)) + require.NoError(t, err) + + // Wait for the messages to be processed + time.Sleep(5 * time.Millisecond) + + // Test deletion + rr = httptest.NewRecorder() + req, _ = http.NewRequest(http.MethodDelete, routeRelayV1AutoSubscriptions, bytes.NewReader(topicsJSONBytes)) + router.ServeHTTP(rr, req) + require.Equal(t, http.StatusOK, rr.Code) + require.Equal(t, "true", rr.Body.String()) + + cTopics = append(cTopics, "test") + topicsJSONBytes, err = json.Marshal(cTopics) + require.NoError(t, err) + + rr = httptest.NewRecorder() + req, _ = http.NewRequest(http.MethodPost, routeRelayV1AutoSubscriptions, bytes.NewReader(topicsJSONBytes)) + router.ServeHTTP(rr, req) + require.Equal(t, http.StatusBadRequest, rr.Code) + +} + +func TestRelayGetV1AutoMessages(t *testing.T) { + router := chi.NewRouter() + router1 := chi.NewRouter() + + serviceA := makeRelayService(t, router) + + serviceB := makeRelayService(t, router1) + + hostInfo, err := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", serviceB.node.Host().ID().Pretty())) + require.NoError(t, err) + + var addr multiaddr.Multiaddr + for _, a := range serviceB.node.Host().Addrs() { + addr = a.Encapsulate(hostInfo) + break + } + err = serviceA.node.DialPeerWithMultiAddress(context.Background(), addr) + require.NoError(t, err) + + // Wait for the dial to complete + time.Sleep(1 * time.Second) + + cTopic1 := "/toychat/1/huilong/proto" + + cTopics := []string{cTopic1} + topicsJSONBytes, err := json.Marshal(cTopics) + require.NoError(t, err) + + rr := httptest.NewRecorder() + req, _ := http.NewRequest(http.MethodPost, routeRelayV1AutoSubscriptions, bytes.NewReader(topicsJSONBytes)) + router.ServeHTTP(rr, req) + require.Equal(t, http.StatusOK, rr.Code) + require.Equal(t, "true", rr.Body.String()) + + // Wait for the subscription to be started + time.Sleep(1 * time.Second) + + msg := &pb.WakuMessage{ + Payload: []byte{1, 2, 3}, + ContentTopic: cTopic1, + Version: 0, + Timestamp: utils.GetUnixEpoch(), + } + msgJsonBytes, err := json.Marshal(msg) + require.NoError(t, err) + + rr = httptest.NewRecorder() + req, _ = http.NewRequest(http.MethodPost, routeRelayV1AutoMessages, bytes.NewReader(msgJsonBytes)) + router.ServeHTTP(rr, req) + require.Equal(t, http.StatusOK, rr.Code) + + // Wait for the message to be received + time.Sleep(1 * time.Second) + + rr = httptest.NewRecorder() + req, _ = http.NewRequest(http.MethodGet, fmt.Sprintf("%s/%s", routeRelayV1AutoMessages, url.QueryEscape(cTopic1)), bytes.NewReader([]byte{})) + router.ServeHTTP(rr, req) + require.Equal(t, http.StatusOK, rr.Code) + + var messages []*pb.WakuMessage err = json.Unmarshal(rr.Body.Bytes(), &messages) require.NoError(t, err) - require.Len(t, messages, 0) + require.Len(t, messages, 1) + + rr = httptest.NewRecorder() + req, _ = http.NewRequest(http.MethodGet, fmt.Sprintf("%s/%s", routeRelayV1AutoMessages, url.QueryEscape(cTopic1)), bytes.NewReader([]byte{})) + router1.ServeHTTP(rr, req) + require.Equal(t, http.StatusNotFound, rr.Code) + } diff --git a/cmd/waku/server/rest/waku_rest.go b/cmd/waku/server/rest/waku_rest.go index 98c5437c..3126f37e 100644 --- a/cmd/waku/server/rest/waku_rest.go +++ b/cmd/waku/server/rest/waku_rest.go @@ -22,7 +22,16 @@ type WakuRest struct { filterService *FilterService } -func NewWakuRest(node *node.WakuNode, address string, port int, enablePProf bool, enableAdmin bool, relayCacheCapacity, filterCacheCapacity int, log *zap.Logger) *WakuRest { +type RestConfig struct { + Address string + Port uint + EnablePProf bool + EnableAdmin bool + RelayCacheCapacity uint + FilterCacheCapacity uint +} + +func NewWakuRest(node *node.WakuNode, config RestConfig, log *zap.Logger) *WakuRest { wrpc := new(WakuRest) wrpc.log = log.Named("rest") @@ -30,7 +39,7 @@ func NewWakuRest(node *node.WakuNode, address string, port int, enablePProf bool mux.Use(middleware.Logger) mux.Use(middleware.NoCache) - if enablePProf { + if config.EnablePProf { mux.Mount("/debug", middleware.Profiler()) } @@ -39,7 +48,7 @@ func NewWakuRest(node *node.WakuNode, address string, port int, enablePProf bool _ = NewStoreService(node, mux) _ = NewLightpushService(node, mux, log) - listenAddr := fmt.Sprintf("%s:%d", address, port) + listenAddr := fmt.Sprintf("%s:%d", config.Address, config.Port) server := &http.Server{ Addr: listenAddr, @@ -50,19 +59,16 @@ func NewWakuRest(node *node.WakuNode, address string, port int, enablePProf bool wrpc.server = server if node.Relay() != nil { - relayService := NewRelayService(node, mux, relayCacheCapacity, log) - server.RegisterOnShutdown(func() { - relayService.Stop() - }) + relayService := NewRelayService(node, mux, config.RelayCacheCapacity, log) wrpc.relayService = relayService } - if enableAdmin { + if config.EnableAdmin { _ = NewAdminService(node, mux, wrpc.log) } if node.FilterLightnode() != nil { - filterService := NewFilterService(node, mux, filterCacheCapacity, log) + filterService := NewFilterService(node, mux, int(config.FilterCacheCapacity), log) server.RegisterOnShutdown(func() { filterService.Stop() }) @@ -75,9 +81,6 @@ func NewWakuRest(node *node.WakuNode, address string, port int, enablePProf bool func (r *WakuRest) Start(ctx context.Context, wg *sync.WaitGroup) { defer wg.Done() - if r.node.Relay() != nil { - go r.relayService.Start(ctx) - } if r.node.FilterLightnode() != nil { go r.filterService.Start(ctx) } diff --git a/cmd/waku/server/rest/waku_rest_test.go b/cmd/waku/server/rest/waku_rest_test.go index d27810b4..0d62c22e 100644 --- a/cmd/waku/server/rest/waku_rest_test.go +++ b/cmd/waku/server/rest/waku_rest_test.go @@ -13,7 +13,7 @@ func TestWakuRest(t *testing.T) { n, err := node.New(options) require.NoError(t, err) - rpc := NewWakuRest(n, "127.0.0.1", 8080, false, false, 10, 0, utils.Logger()) + rpc := NewWakuRest(n, RestConfig{Address: "127.0.0.1", Port: 8080, EnablePProf: false, EnableAdmin: false, RelayCacheCapacity: 10}, utils.Logger()) require.NotNil(t, rpc.server) require.Equal(t, rpc.server.Addr, "127.0.0.1:8080") } diff --git a/cmd/waku/server/rpc/admin_test.go b/cmd/waku/server/rpc/admin_test.go index 41966e6f..a17979e1 100644 --- a/cmd/waku/server/rpc/admin_test.go +++ b/cmd/waku/server/rpc/admin_test.go @@ -35,7 +35,8 @@ func TestV1Peers(t *testing.T) { host, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - relay := relay.NewWakuRelay(nil, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) + bcast := relay.NewBroadcaster(10) + relay := relay.NewWakuRelay(bcast, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) relay.SetHost(host) err = relay.Start(context.Background()) require.NoError(t, err) diff --git a/cmd/waku/server/rpc/filter_test.go b/cmd/waku/server/rpc/filter_test.go index e6c7d698..160afca7 100644 --- a/cmd/waku/server/rpc/filter_test.go +++ b/cmd/waku/server/rpc/filter_test.go @@ -50,6 +50,7 @@ func makeFilterService(t *testing.T, isFullNode bool) *FilterService { } func TestFilterSubscription(t *testing.T) { + t.Skip("skipping since it is legacy filter") port, err := tests.FindFreePort(t, "", 5) require.NoError(t, err) @@ -110,6 +111,8 @@ func TestFilterSubscription(t *testing.T) { } func TestFilterGetV1Messages(t *testing.T) { + t.Skip("skipping since it is legacy filter") + serviceA := makeFilterService(t, true) var reply SuccessReply diff --git a/cmd/waku/server/rpc/relay.go b/cmd/waku/server/rpc/relay.go index 5f431046..4994706b 100644 --- a/cmd/waku/server/rpc/relay.go +++ b/cmd/waku/server/rpc/relay.go @@ -1,15 +1,12 @@ package rpc import ( - "errors" "fmt" "net/http" - "sync" "github.com/waku-org/go-waku/cmd/waku/server" "github.com/waku-org/go-waku/waku/v2/node" "github.com/waku-org/go-waku/waku/v2/protocol" - "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "go.uber.org/zap" ) @@ -20,11 +17,7 @@ type RelayService struct { log *zap.Logger - messages map[string][]*pb.WakuMessage cacheCapacity int - messagesMutex sync.RWMutex - - runner *runnerService } // RelayMessageArgs represents the requests used for posting messages @@ -54,46 +47,17 @@ func NewRelayService(node *node.WakuNode, cacheCapacity int, log *zap.Logger) *R node: node, cacheCapacity: cacheCapacity, log: log.Named("relay"), - messages: make(map[string][]*pb.WakuMessage), } - s.runner = newRunnerService(node.Broadcaster(), s.addEnvelope) - return s } -func (r *RelayService) addEnvelope(envelope *protocol.Envelope) { - r.messagesMutex.Lock() - defer r.messagesMutex.Unlock() - - if _, ok := r.messages[envelope.PubsubTopic()]; !ok { - return - } - - // Keep a specific max number of messages per topic - if len(r.messages[envelope.PubsubTopic()]) >= r.cacheCapacity { - r.messages[envelope.PubsubTopic()] = r.messages[envelope.PubsubTopic()][1:] - } - - r.messages[envelope.PubsubTopic()] = append(r.messages[envelope.PubsubTopic()], envelope.Message()) -} - // Start starts the RelayService func (r *RelayService) Start() { - r.messagesMutex.Lock() - // Node may already be subscribed to some topics when Relay API handlers are installed. Let's add these - for _, topic := range r.node.Relay().Topics() { - r.log.Info("adding topic handler for existing subscription", zap.String("topic", topic)) - r.messages[topic] = make([]*pb.WakuMessage, 0) - } - r.messagesMutex.Unlock() - - r.runner.Start() } // Stop stops the RelayService func (r *RelayService) Stop() { - r.runner.Stop() } // PostV1Message is invoked when the json rpc request uses the post_waku_v2_relay_v1_message method @@ -105,10 +69,6 @@ func (r *RelayService) PostV1Message(req *http.Request, args *RelayMessageArgs, topic = args.Topic } - if !r.node.Relay().IsSubscribed(topic) { - return errors.New("not subscribed to pubsubTopic") - } - msg := args.Message.toProto() if err = server.AppendRLNProof(r.node, msg); err != nil { @@ -204,17 +164,12 @@ func (r *RelayService) PostV1Subscription(req *http.Request, args *TopicsArgs, r if topic == "" { topic = relay.DefaultWakuTopic } - var sub *relay.Subscription - subs, err := r.node.Relay().Subscribe(ctx, protocol.NewContentFilter(topic)) + _, err = r.node.Relay().Subscribe(ctx, protocol.NewContentFilter(topic)) if err != nil { r.log.Error("subscribing to topic", zap.String("topic", topic), zap.Error(err)) return err } - sub = subs[0] - sub.Unsubscribe() - r.messagesMutex.Lock() - r.messages[topic] = make([]*pb.WakuMessage, 0) - r.messagesMutex.Unlock() + } *reply = true @@ -230,8 +185,6 @@ func (r *RelayService) DeleteV1Subscription(req *http.Request, args *TopicsArgs, r.log.Error("unsubscribing from topic", zap.String("topic", topic), zap.Error(err)) return err } - - delete(r.messages, topic) } *reply = true @@ -240,18 +193,16 @@ func (r *RelayService) DeleteV1Subscription(req *http.Request, args *TopicsArgs, // GetV1Messages is invoked when the json rpc request uses the get_waku_v2_relay_v1_messages method func (r *RelayService) GetV1Messages(req *http.Request, args *TopicArgs, reply *MessagesReply) error { - r.messagesMutex.Lock() - defer r.messagesMutex.Unlock() - if _, ok := r.messages[args.Topic]; !ok { - return fmt.Errorf("topic %s not subscribed", args.Topic) + sub, err := r.node.Relay().GetSubscriptionWithPubsubTopic(args.Topic, "") + if err != nil { + return err } - - for i := range r.messages[args.Topic] { - *reply = append(*reply, ProtoToRPC(r.messages[args.Topic][i])) + select { + case msg := <-sub.Ch: + *reply = append(*reply, ProtoToRPC(msg.Message())) + default: + break } - - r.messages[args.Topic] = make([]*pb.WakuMessage, 0) - return nil } diff --git a/cmd/waku/server/rpc/relay_test.go b/cmd/waku/server/rpc/relay_test.go index 61427faf..cd9a408f 100644 --- a/cmd/waku/server/rpc/relay_test.go +++ b/cmd/waku/server/rpc/relay_test.go @@ -111,7 +111,8 @@ func TestRelayGetV1Messages(t *testing.T) { &RelayMessageArgs{ Topic: "test", Message: ProtoToRPC(&pb.WakuMessage{ - Payload: []byte("test"), + Payload: []byte("test"), + ContentTopic: "testContentTopic", }), }, &reply, diff --git a/waku/v2/protocol/relay/config.go b/waku/v2/protocol/relay/config.go index d343edd5..16799863 100644 --- a/waku/v2/protocol/relay/config.go +++ b/waku/v2/protocol/relay/config.go @@ -15,6 +15,7 @@ var DefaultRelaySubscriptionBufferSize int = 1024 type RelaySubscribeParameters struct { dontConsume bool + cacheSize uint } type RelaySubscribeOption func(*RelaySubscribeParameters) error @@ -28,6 +29,13 @@ func WithoutConsumer() RelaySubscribeOption { } } +func WithCacheSize(size uint) RelaySubscribeOption { + return func(params *RelaySubscribeParameters) error { + params.cacheSize = size + return nil + } +} + func msgIDFn(pmsg *pubsub_pb.Message) string { return string(hash.SHA256(pmsg.Data)) } diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index f55eba1e..c195ae07 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -279,13 +279,31 @@ func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, opts . return hash, nil } +// GetSubscriptionWithPubsubTopic fetches subscription matching pubsub and contentTopic +func (w *WakuRelay) GetSubscriptionWithPubsubTopic(pubsubTopic string, contentTopic string) (*Subscription, error) { + var contentFilter waku_proto.ContentFilter + if contentTopic != "" { + contentFilter = waku_proto.NewContentFilter(pubsubTopic, contentTopic) + } else { + contentFilter = waku_proto.NewContentFilter(pubsubTopic) + } + cSubs := w.contentSubs[pubsubTopic] + for _, sub := range cSubs { + if sub.contentFilter.Equals(contentFilter) { + return sub, nil + } + } + return nil, errors.New("no subscription found for content topic") +} + +// GetSubscription fetches subscription matching a contentTopic(via autosharding) func (w *WakuRelay) GetSubscription(contentTopic string) (*Subscription, error) { - pubSubTopic, err := waku_proto.GetPubSubTopicFromContentTopic(contentTopic) + pubsubTopic, err := waku_proto.GetPubSubTopicFromContentTopic(contentTopic) if err != nil { return nil, err } - contentFilter := waku_proto.NewContentFilter(pubSubTopic, contentTopic) - cSubs := w.contentSubs[pubSubTopic] + contentFilter := waku_proto.NewContentFilter(pubsubTopic, contentTopic) + cSubs := w.contentSubs[pubsubTopic] for _, sub := range cSubs { if sub.contentFilter.Equals(contentFilter) { return sub, nil @@ -331,6 +349,9 @@ func (w *WakuRelay) subscribe(ctx context.Context, contentFilter waku_proto.Cont return nil, err } } + if params.cacheSize <= 0 { + params.cacheSize = uint(DefaultRelaySubscriptionBufferSize) + } for pubSubTopic, cTopics := range pubSubTopicMap { w.log.Info("subscribing to", zap.String("pubsubTopic", pubSubTopic), zap.Strings("contenTopics", cTopics)) @@ -347,7 +368,7 @@ func (w *WakuRelay) subscribe(ctx context.Context, contentFilter waku_proto.Cont } } - subscription := w.bcaster.Register(cFilter, WithBufferSize(DefaultRelaySubscriptionBufferSize), + subscription := w.bcaster.Register(cFilter, WithBufferSize(int(params.cacheSize)), WithConsumerOption(params.dontConsume)) // Create Content subscription