chore: update relay REST and RPC API's and fix unit tests (#866)

* update relay REST API's to remove duplicate message cache, fix relay tests and admin test

* chore: enable REST and RPC unit tests

* update lightpush rest api to match yaml

* fix: filter rest unit test failures

* skipping legacy filter tests

* chore: add unit tests for autosharding relay REST API, fix success response (#868)
This commit is contained in:
Prem Chaitanya Prathi 2023-11-07 20:26:48 +05:30 committed by GitHub
parent 9315de8d8a
commit 2616d43c9d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 271 additions and 218 deletions

View File

@ -77,10 +77,10 @@ lint-full:
@golangci-lint run ./... --config=./.golangci.full.yaml --deadline=5m
test-with-race:
${GOBIN} test -race -timeout 300s ./waku/...
${GOBIN} test -race -timeout 300s ./waku/... ./cmd/waku/server/...
test:
${GOBIN} test -timeout 300s ./waku/... -coverprofile=${GO_TEST_OUTFILE}.tmp -coverpkg ./...
${GOBIN} test -timeout 300s ./waku/... ./cmd/waku/server/... -coverprofile=${GO_TEST_OUTFILE}.tmp -coverpkg ./...
cat ${GO_TEST_OUTFILE}.tmp | grep -v ".pb.go" > ${GO_TEST_OUTFILE}
${GOBIN} tool cover -html=${GO_TEST_OUTFILE} -o ${GO_HTML_COV}

View File

@ -444,7 +444,14 @@ func Execute(options NodeOptions) error {
var restServer *rest.WakuRest
if options.RESTServer.Enable {
wg.Add(1)
restServer = rest.NewWakuRest(wakuNode, options.RESTServer.Address, options.RESTServer.Port, options.PProf, options.RESTServer.Admin, options.RESTServer.RelayCacheCapacity, options.RESTServer.FilterCacheCapacity, logger)
restConfig := rest.RestConfig{Address: options.RESTServer.Address,
Port: uint(options.RESTServer.Port),
EnablePProf: options.PProf,
EnableAdmin: options.RESTServer.Admin,
RelayCacheCapacity: uint(options.RESTServer.RelayCacheCapacity),
FilterCacheCapacity: uint(options.RESTServer.FilterCacheCapacity)}
restServer = rest.NewWakuRest(wakuNode, restConfig, logger)
restServer.Start(ctx, &wg)
}

View File

@ -240,11 +240,13 @@ func (s *FilterService) unsubscribeGetMessage(result *filter.WakuFilterPushResul
var peerIds string
ind := 0
for _, entry := range result.Errors() {
if entry.Err != nil {
s.log.Error("can't unsubscribe for ", zap.String("peer", entry.PeerID.String()), zap.Error(entry.Err))
if ind != 0 {
peerIds += ", "
}
peerIds += entry.PeerID.String()
}
ind++
}
if peerIds != "" {

View File

@ -365,7 +365,7 @@ func TestFilterGetMessages(t *testing.T) {
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusNotFound, rr.Code)
require.Equal(t,
fmt.Sprintf("Not subscribed to pubsubTopic:%s contentTopic: %s", notSubscibredPubsubTopic, contentTopic),
fmt.Sprintf("not subscribed to pubsubTopic:%s contentTopic: %s", notSubscibredPubsubTopic, contentTopic),
rr.Body.String(),
)
}

View File

@ -71,6 +71,6 @@ func (serv *LightpushService) postMessagev1(w http.ResponseWriter, req *http.Req
_, err = w.Write([]byte(err.Error()))
serv.log.Error("writing response", zap.Error(err))
} else {
w.WriteHeader(http.StatusOK)
writeErrOrResponse(w, err, true)
}
}

View File

@ -1,13 +1,9 @@
package rest
import (
"context"
"encoding/json"
"errors"
"net/http"
"net/url"
"strings"
"sync"
"github.com/go-chi/chi/v5"
"github.com/waku-org/go-waku/cmd/waku/server"
@ -27,28 +23,20 @@ const routeRelayV1AutoMessages = "/relay/v1/auto/messages"
// RelayService represents the REST service for WakuRelay
type RelayService struct {
node *node.WakuNode
cancel context.CancelFunc
log *zap.Logger
messages map[string][]*pb.WakuMessage
cacheCapacity int
messagesMutex sync.RWMutex
runner *runnerService
cacheCapacity uint
}
// NewRelayService returns an instance of RelayService
func NewRelayService(node *node.WakuNode, m *chi.Mux, cacheCapacity int, log *zap.Logger) *RelayService {
func NewRelayService(node *node.WakuNode, m *chi.Mux, cacheCapacity uint, log *zap.Logger) *RelayService {
s := &RelayService{
node: node,
log: log.Named("relay"),
cacheCapacity: cacheCapacity,
messages: make(map[string][]*pb.WakuMessage),
}
s.runner = newRunnerService(node.Broadcaster(), s.addEnvelope)
m.Post(routeRelayV1Subscriptions, s.postV1Subscriptions)
m.Delete(routeRelayV1Subscriptions, s.deleteV1Subscriptions)
m.Get(routeRelayV1Messages, s.getV1Messages)
@ -65,46 +53,6 @@ func NewRelayService(node *node.WakuNode, m *chi.Mux, cacheCapacity int, log *za
return s
}
func (r *RelayService) addEnvelope(envelope *protocol.Envelope) {
r.messagesMutex.Lock()
defer r.messagesMutex.Unlock()
if _, ok := r.messages[envelope.PubsubTopic()]; !ok {
return
}
// Keep a specific max number of messages per topic
if len(r.messages[envelope.PubsubTopic()]) >= r.cacheCapacity {
r.messages[envelope.PubsubTopic()] = r.messages[envelope.PubsubTopic()][1:]
}
r.messages[envelope.PubsubTopic()] = append(r.messages[envelope.PubsubTopic()], envelope.Message())
}
// Start starts the RelayService
func (r *RelayService) Start(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
r.cancel = cancel
r.messagesMutex.Lock()
// Node may already be subscribed to some topics when Relay API handlers are installed. Let's add these
for _, topic := range r.node.Relay().Topics() {
r.log.Info("adding topic handler for existing subscription", zap.String("topic", topic))
r.messages[topic] = []*pb.WakuMessage{}
}
r.messagesMutex.Unlock()
r.runner.Start(ctx)
}
// Stop stops the RelayService
func (r *RelayService) Stop() {
if r.cancel == nil {
return
}
r.cancel()
}
func (r *RelayService) deleteV1Subscriptions(w http.ResponseWriter, req *http.Request) {
var topics []string
decoder := json.NewDecoder(req.Body)
@ -114,16 +62,11 @@ func (r *RelayService) deleteV1Subscriptions(w http.ResponseWriter, req *http.Re
}
defer req.Body.Close()
r.messagesMutex.Lock()
defer r.messagesMutex.Unlock()
var err error
for _, topic := range topics {
err = r.node.Relay().Unsubscribe(req.Context(), protocol.NewContentFilter(topic))
if err != nil {
r.log.Error("unsubscribing from topic", zap.String("topic", strings.Replace(strings.Replace(topic, "\n", "", -1), "\r", "", -1)), zap.Error(err))
} else {
delete(r.messages, topic)
}
}
@ -140,26 +83,29 @@ func (r *RelayService) postV1Subscriptions(w http.ResponseWriter, req *http.Requ
defer req.Body.Close()
var err error
var sub *relay.Subscription
var subs []*relay.Subscription
var successCnt int
var topicToSubscribe string
for _, topic := range topics {
if topic == "" {
subs, err = r.node.Relay().Subscribe(req.Context(), protocol.NewContentFilter(relay.DefaultWakuTopic))
topicToSubscribe = relay.DefaultWakuTopic
} else {
subs, err = r.node.Relay().Subscribe(req.Context(), protocol.NewContentFilter(topic))
topicToSubscribe = topic
}
_, err = r.node.Relay().Subscribe(req.Context(), protocol.NewContentFilter(topicToSubscribe), relay.WithCacheSize(r.cacheCapacity))
if err != nil {
r.log.Error("subscribing to topic", zap.String("topic", strings.Replace(topicToSubscribe, "\n", "", -1)), zap.Error(err))
} else {
sub = subs[0]
sub.Unsubscribe()
r.messagesMutex.Lock()
r.messages[topic] = []*pb.WakuMessage{}
r.messagesMutex.Unlock()
continue
}
successCnt++
}
// on partial subscribe failure
if successCnt > 0 && err != nil {
r.log.Error("partial subscribe failed", zap.Error(err))
// on partial failure
writeResponse(w, err, http.StatusOK)
return
}
writeErrOrResponse(w, err, true)
@ -170,20 +116,22 @@ func (r *RelayService) getV1Messages(w http.ResponseWriter, req *http.Request) {
if topic == "" {
return
}
r.messagesMutex.Lock()
defer r.messagesMutex.Unlock()
if _, ok := r.messages[topic]; !ok {
//TODO: Update the API to also take a contentTopic since relay now supports filtering based on contentTopic as well.
sub, err := r.node.Relay().GetSubscriptionWithPubsubTopic(topic, "")
if err != nil {
w.WriteHeader(http.StatusNotFound)
_, err := w.Write([]byte("not subscribed to topic"))
_, err = w.Write([]byte("not subscribed to topic"))
r.log.Error("writing response", zap.Error(err))
return
}
var response []*pb.WakuMessage
select {
case msg := <-sub.Ch:
response = append(response, msg.Message())
default:
break
}
response := r.messages[topic]
r.messages[topic] = []*pb.WakuMessage{}
writeErrOrResponse(w, nil, response)
}
@ -205,11 +153,6 @@ func (r *RelayService) postV1Message(w http.ResponseWriter, req *http.Request) {
topic = relay.DefaultWakuTopic
}
if !r.node.Relay().IsSubscribed(topic) {
writeErrOrResponse(w, errors.New("not subscribed to pubsubTopic"), nil)
return
}
if err := server.AppendRLNProof(r.node, message); err != nil {
writeErrOrResponse(w, err, nil)
return
@ -250,7 +193,7 @@ func (r *RelayService) postV1AutoSubscriptions(w http.ResponseWriter, req *http.
defer req.Body.Close()
var err error
_, err = r.node.Relay().Subscribe(r.node.Relay().Context(), protocol.NewContentFilter("", cTopics...))
_, err = r.node.Relay().Subscribe(r.node.Relay().Context(), protocol.NewContentFilter("", cTopics...), relay.WithCacheSize(r.cacheCapacity))
if err != nil {
r.log.Error("subscribing to topics", zap.Strings("contentTopics", cTopics), zap.Error(err))
}
@ -260,27 +203,14 @@ func (r *RelayService) postV1AutoSubscriptions(w http.ResponseWriter, req *http.
_, err := w.Write([]byte(err.Error()))
r.log.Error("writing response", zap.Error(err))
} else {
w.WriteHeader(http.StatusOK)
writeErrOrResponse(w, err, true)
}
}
func (r *RelayService) getV1AutoMessages(w http.ResponseWriter, req *http.Request) {
cTopic := chi.URLParam(req, "contentTopic")
if cTopic == "" {
w.WriteHeader(http.StatusBadRequest)
_, err := w.Write([]byte("contentTopic is required"))
r.log.Error("writing response", zap.Error(err))
return
}
cTopic, err := url.QueryUnescape(cTopic)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
_, err = w.Write([]byte("invalid contentTopic format"))
r.log.Error("writing response", zap.Error(err))
return
}
cTopic := topicFromPath(w, req, "contentTopic", r.log)
sub, err := r.node.Relay().GetSubscription(cTopic)
if err != nil {
w.WriteHeader(http.StatusNotFound)

View File

@ -7,6 +7,7 @@ import (
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"time"
@ -15,8 +16,8 @@ import (
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/utils"
)
@ -34,7 +35,6 @@ func TestPostV1Message(t *testing.T) {
router := chi.NewRouter()
_ = makeRelayService(t, router)
msg := &pb.WakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: "abc",
@ -54,10 +54,7 @@ func TestPostV1Message(t *testing.T) {
func TestRelaySubscription(t *testing.T) {
router := chi.NewRouter()
d := makeRelayService(t, router)
go d.Start(context.Background())
defer d.Stop()
r := makeRelayService(t, router)
// Wait for node to start
time.Sleep(500 * time.Millisecond)
@ -67,48 +64,42 @@ func TestRelaySubscription(t *testing.T) {
require.NoError(t, err)
rr := httptest.NewRecorder()
req, _ := http.NewRequest(http.MethodPost, "/relay/v1/subscriptions", bytes.NewReader(topicsJSONBytes))
req, _ := http.NewRequest(http.MethodPost, routeRelayV1Subscriptions, bytes.NewReader(topicsJSONBytes))
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusOK, rr.Code)
require.Equal(t, "true", rr.Body.String())
// Test max messages in subscription
now := utils.GetUnixEpoch()
d.runner.broadcaster.Submit(protocol.NewEnvelope(tests.CreateWakuMessage("test", now+1), now, "test"))
d.runner.broadcaster.Submit(protocol.NewEnvelope(tests.CreateWakuMessage("test", now+2), now, "test"))
d.runner.broadcaster.Submit(protocol.NewEnvelope(tests.CreateWakuMessage("test", now+3), now, "test"))
_, err = r.node.Relay().Publish(context.Background(),
tests.CreateWakuMessage("test", now+1), relay.WithPubSubTopic("test"))
require.NoError(t, err)
_, err = r.node.Relay().Publish(context.Background(),
tests.CreateWakuMessage("test", now+2), relay.WithPubSubTopic("test"))
require.NoError(t, err)
_, err = r.node.Relay().Publish(context.Background(),
tests.CreateWakuMessage("test", now+3), relay.WithPubSubTopic("test"))
require.NoError(t, err)
// Wait for the messages to be processed
time.Sleep(500 * time.Millisecond)
require.Len(t, d.messages["test"], 3)
d.runner.broadcaster.Submit(protocol.NewEnvelope(tests.CreateWakuMessage("test", now+4), now+4, "test"))
time.Sleep(500 * time.Millisecond)
// Should only have 3 messages
require.Len(t, d.messages["test"], 3)
time.Sleep(5 * time.Millisecond)
// Test deletion
rr = httptest.NewRecorder()
req, _ = http.NewRequest(http.MethodDelete, "/relay/v1/subscriptions", bytes.NewReader(topicsJSONBytes))
req, _ = http.NewRequest(http.MethodDelete, routeRelayV1Subscriptions, bytes.NewReader(topicsJSONBytes))
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusOK, rr.Code)
require.Equal(t, "true", rr.Body.String())
require.Len(t, d.messages["test"], 0)
}
func TestRelayGetV1Messages(t *testing.T) {
router := chi.NewRouter()
router1 := chi.NewRouter()
serviceA := makeRelayService(t, router)
go serviceA.Start(context.Background())
defer serviceA.Stop()
serviceB := makeRelayService(t, router)
go serviceB.Start(context.Background())
defer serviceB.Stop()
serviceB := makeRelayService(t, router1)
hostInfo, err := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", serviceB.node.Host().ID().Pretty()))
require.NoError(t, err)
@ -129,7 +120,7 @@ func TestRelayGetV1Messages(t *testing.T) {
require.NoError(t, err)
rr := httptest.NewRecorder()
req, _ := http.NewRequest(http.MethodPost, "/relay/v1/subscriptions", bytes.NewReader(topicsJSONBytes))
req, _ := http.NewRequest(http.MethodPost, routeRelayV1Subscriptions, bytes.NewReader(topicsJSONBytes))
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusOK, rr.Code)
@ -165,9 +156,144 @@ func TestRelayGetV1Messages(t *testing.T) {
rr = httptest.NewRecorder()
req, _ = http.NewRequest(http.MethodGet, "/relay/v1/messages/test", bytes.NewReader([]byte{}))
router.ServeHTTP(rr, req)
router1.ServeHTTP(rr, req)
require.Equal(t, http.StatusNotFound, rr.Code)
}
func TestPostAutoV1Message(t *testing.T) {
router := chi.NewRouter()
_ = makeRelayService(t, router)
msg := &pb.WakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: "/toychat/1/huilong/proto",
Version: 0,
Timestamp: utils.GetUnixEpoch(),
}
msgJSONBytes, err := json.Marshal(msg)
require.NoError(t, err)
rr := httptest.NewRecorder()
req, _ := http.NewRequest(http.MethodPost, routeRelayV1AutoMessages, bytes.NewReader(msgJSONBytes))
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusOK, rr.Code)
}
func TestRelayAutoSubUnsub(t *testing.T) {
router := chi.NewRouter()
r := makeRelayService(t, router)
// Wait for node to start
time.Sleep(500 * time.Millisecond)
cTopic1 := "/toychat/1/huilong/proto"
cTopics := []string{cTopic1}
topicsJSONBytes, err := json.Marshal(cTopics)
require.NoError(t, err)
rr := httptest.NewRecorder()
req, _ := http.NewRequest(http.MethodPost, routeRelayV1AutoSubscriptions, bytes.NewReader(topicsJSONBytes))
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusOK, rr.Code)
require.Equal(t, "true", rr.Body.String())
// Test publishing messages after subscription
now := utils.GetUnixEpoch()
_, err = r.node.Relay().Publish(context.Background(),
tests.CreateWakuMessage(cTopic1, now+1))
require.NoError(t, err)
// Wait for the messages to be processed
time.Sleep(5 * time.Millisecond)
// Test deletion
rr = httptest.NewRecorder()
req, _ = http.NewRequest(http.MethodDelete, routeRelayV1AutoSubscriptions, bytes.NewReader(topicsJSONBytes))
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusOK, rr.Code)
require.Equal(t, "true", rr.Body.String())
cTopics = append(cTopics, "test")
topicsJSONBytes, err = json.Marshal(cTopics)
require.NoError(t, err)
rr = httptest.NewRecorder()
req, _ = http.NewRequest(http.MethodPost, routeRelayV1AutoSubscriptions, bytes.NewReader(topicsJSONBytes))
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusBadRequest, rr.Code)
}
func TestRelayGetV1AutoMessages(t *testing.T) {
router := chi.NewRouter()
router1 := chi.NewRouter()
serviceA := makeRelayService(t, router)
serviceB := makeRelayService(t, router1)
hostInfo, err := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", serviceB.node.Host().ID().Pretty()))
require.NoError(t, err)
var addr multiaddr.Multiaddr
for _, a := range serviceB.node.Host().Addrs() {
addr = a.Encapsulate(hostInfo)
break
}
err = serviceA.node.DialPeerWithMultiAddress(context.Background(), addr)
require.NoError(t, err)
// Wait for the dial to complete
time.Sleep(1 * time.Second)
cTopic1 := "/toychat/1/huilong/proto"
cTopics := []string{cTopic1}
topicsJSONBytes, err := json.Marshal(cTopics)
require.NoError(t, err)
rr := httptest.NewRecorder()
req, _ := http.NewRequest(http.MethodPost, routeRelayV1AutoSubscriptions, bytes.NewReader(topicsJSONBytes))
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusOK, rr.Code)
require.Equal(t, "true", rr.Body.String())
// Wait for the subscription to be started
time.Sleep(1 * time.Second)
msg := &pb.WakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: cTopic1,
Version: 0,
Timestamp: utils.GetUnixEpoch(),
}
msgJsonBytes, err := json.Marshal(msg)
require.NoError(t, err)
rr = httptest.NewRecorder()
req, _ = http.NewRequest(http.MethodPost, routeRelayV1AutoMessages, bytes.NewReader(msgJsonBytes))
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusOK, rr.Code)
// Wait for the message to be received
time.Sleep(1 * time.Second)
rr = httptest.NewRecorder()
req, _ = http.NewRequest(http.MethodGet, fmt.Sprintf("%s/%s", routeRelayV1AutoMessages, url.QueryEscape(cTopic1)), bytes.NewReader([]byte{}))
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusOK, rr.Code)
var messages []*pb.WakuMessage
err = json.Unmarshal(rr.Body.Bytes(), &messages)
require.NoError(t, err)
require.Len(t, messages, 0)
require.Len(t, messages, 1)
rr = httptest.NewRecorder()
req, _ = http.NewRequest(http.MethodGet, fmt.Sprintf("%s/%s", routeRelayV1AutoMessages, url.QueryEscape(cTopic1)), bytes.NewReader([]byte{}))
router1.ServeHTTP(rr, req)
require.Equal(t, http.StatusNotFound, rr.Code)
}

View File

@ -22,7 +22,16 @@ type WakuRest struct {
filterService *FilterService
}
func NewWakuRest(node *node.WakuNode, address string, port int, enablePProf bool, enableAdmin bool, relayCacheCapacity, filterCacheCapacity int, log *zap.Logger) *WakuRest {
type RestConfig struct {
Address string
Port uint
EnablePProf bool
EnableAdmin bool
RelayCacheCapacity uint
FilterCacheCapacity uint
}
func NewWakuRest(node *node.WakuNode, config RestConfig, log *zap.Logger) *WakuRest {
wrpc := new(WakuRest)
wrpc.log = log.Named("rest")
@ -30,7 +39,7 @@ func NewWakuRest(node *node.WakuNode, address string, port int, enablePProf bool
mux.Use(middleware.Logger)
mux.Use(middleware.NoCache)
if enablePProf {
if config.EnablePProf {
mux.Mount("/debug", middleware.Profiler())
}
@ -39,7 +48,7 @@ func NewWakuRest(node *node.WakuNode, address string, port int, enablePProf bool
_ = NewStoreService(node, mux)
_ = NewLightpushService(node, mux, log)
listenAddr := fmt.Sprintf("%s:%d", address, port)
listenAddr := fmt.Sprintf("%s:%d", config.Address, config.Port)
server := &http.Server{
Addr: listenAddr,
@ -50,19 +59,16 @@ func NewWakuRest(node *node.WakuNode, address string, port int, enablePProf bool
wrpc.server = server
if node.Relay() != nil {
relayService := NewRelayService(node, mux, relayCacheCapacity, log)
server.RegisterOnShutdown(func() {
relayService.Stop()
})
relayService := NewRelayService(node, mux, config.RelayCacheCapacity, log)
wrpc.relayService = relayService
}
if enableAdmin {
if config.EnableAdmin {
_ = NewAdminService(node, mux, wrpc.log)
}
if node.FilterLightnode() != nil {
filterService := NewFilterService(node, mux, filterCacheCapacity, log)
filterService := NewFilterService(node, mux, int(config.FilterCacheCapacity), log)
server.RegisterOnShutdown(func() {
filterService.Stop()
})
@ -75,9 +81,6 @@ func NewWakuRest(node *node.WakuNode, address string, port int, enablePProf bool
func (r *WakuRest) Start(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
if r.node.Relay() != nil {
go r.relayService.Start(ctx)
}
if r.node.FilterLightnode() != nil {
go r.filterService.Start(ctx)
}

View File

@ -13,7 +13,7 @@ func TestWakuRest(t *testing.T) {
n, err := node.New(options)
require.NoError(t, err)
rpc := NewWakuRest(n, "127.0.0.1", 8080, false, false, 10, 0, utils.Logger())
rpc := NewWakuRest(n, RestConfig{Address: "127.0.0.1", Port: 8080, EnablePProf: false, EnableAdmin: false, RelayCacheCapacity: 10}, utils.Logger())
require.NotNil(t, rpc.server)
require.Equal(t, rpc.server.Addr, "127.0.0.1:8080")
}

View File

@ -35,7 +35,8 @@ func TestV1Peers(t *testing.T) {
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
relay := relay.NewWakuRelay(nil, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
bcast := relay.NewBroadcaster(10)
relay := relay.NewWakuRelay(bcast, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
relay.SetHost(host)
err = relay.Start(context.Background())
require.NoError(t, err)

View File

@ -50,6 +50,7 @@ func makeFilterService(t *testing.T, isFullNode bool) *FilterService {
}
func TestFilterSubscription(t *testing.T) {
t.Skip("skipping since it is legacy filter")
port, err := tests.FindFreePort(t, "", 5)
require.NoError(t, err)
@ -110,6 +111,8 @@ func TestFilterSubscription(t *testing.T) {
}
func TestFilterGetV1Messages(t *testing.T) {
t.Skip("skipping since it is legacy filter")
serviceA := makeFilterService(t, true)
var reply SuccessReply

View File

@ -1,15 +1,12 @@
package rpc
import (
"errors"
"fmt"
"net/http"
"sync"
"github.com/waku-org/go-waku/cmd/waku/server"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"go.uber.org/zap"
)
@ -20,11 +17,7 @@ type RelayService struct {
log *zap.Logger
messages map[string][]*pb.WakuMessage
cacheCapacity int
messagesMutex sync.RWMutex
runner *runnerService
}
// RelayMessageArgs represents the requests used for posting messages
@ -54,46 +47,17 @@ func NewRelayService(node *node.WakuNode, cacheCapacity int, log *zap.Logger) *R
node: node,
cacheCapacity: cacheCapacity,
log: log.Named("relay"),
messages: make(map[string][]*pb.WakuMessage),
}
s.runner = newRunnerService(node.Broadcaster(), s.addEnvelope)
return s
}
func (r *RelayService) addEnvelope(envelope *protocol.Envelope) {
r.messagesMutex.Lock()
defer r.messagesMutex.Unlock()
if _, ok := r.messages[envelope.PubsubTopic()]; !ok {
return
}
// Keep a specific max number of messages per topic
if len(r.messages[envelope.PubsubTopic()]) >= r.cacheCapacity {
r.messages[envelope.PubsubTopic()] = r.messages[envelope.PubsubTopic()][1:]
}
r.messages[envelope.PubsubTopic()] = append(r.messages[envelope.PubsubTopic()], envelope.Message())
}
// Start starts the RelayService
func (r *RelayService) Start() {
r.messagesMutex.Lock()
// Node may already be subscribed to some topics when Relay API handlers are installed. Let's add these
for _, topic := range r.node.Relay().Topics() {
r.log.Info("adding topic handler for existing subscription", zap.String("topic", topic))
r.messages[topic] = make([]*pb.WakuMessage, 0)
}
r.messagesMutex.Unlock()
r.runner.Start()
}
// Stop stops the RelayService
func (r *RelayService) Stop() {
r.runner.Stop()
}
// PostV1Message is invoked when the json rpc request uses the post_waku_v2_relay_v1_message method
@ -105,10 +69,6 @@ func (r *RelayService) PostV1Message(req *http.Request, args *RelayMessageArgs,
topic = args.Topic
}
if !r.node.Relay().IsSubscribed(topic) {
return errors.New("not subscribed to pubsubTopic")
}
msg := args.Message.toProto()
if err = server.AppendRLNProof(r.node, msg); err != nil {
@ -204,17 +164,12 @@ func (r *RelayService) PostV1Subscription(req *http.Request, args *TopicsArgs, r
if topic == "" {
topic = relay.DefaultWakuTopic
}
var sub *relay.Subscription
subs, err := r.node.Relay().Subscribe(ctx, protocol.NewContentFilter(topic))
_, err = r.node.Relay().Subscribe(ctx, protocol.NewContentFilter(topic))
if err != nil {
r.log.Error("subscribing to topic", zap.String("topic", topic), zap.Error(err))
return err
}
sub = subs[0]
sub.Unsubscribe()
r.messagesMutex.Lock()
r.messages[topic] = make([]*pb.WakuMessage, 0)
r.messagesMutex.Unlock()
}
*reply = true
@ -230,8 +185,6 @@ func (r *RelayService) DeleteV1Subscription(req *http.Request, args *TopicsArgs,
r.log.Error("unsubscribing from topic", zap.String("topic", topic), zap.Error(err))
return err
}
delete(r.messages, topic)
}
*reply = true
@ -240,18 +193,16 @@ func (r *RelayService) DeleteV1Subscription(req *http.Request, args *TopicsArgs,
// GetV1Messages is invoked when the json rpc request uses the get_waku_v2_relay_v1_messages method
func (r *RelayService) GetV1Messages(req *http.Request, args *TopicArgs, reply *MessagesReply) error {
r.messagesMutex.Lock()
defer r.messagesMutex.Unlock()
if _, ok := r.messages[args.Topic]; !ok {
return fmt.Errorf("topic %s not subscribed", args.Topic)
sub, err := r.node.Relay().GetSubscriptionWithPubsubTopic(args.Topic, "")
if err != nil {
return err
}
for i := range r.messages[args.Topic] {
*reply = append(*reply, ProtoToRPC(r.messages[args.Topic][i]))
select {
case msg := <-sub.Ch:
*reply = append(*reply, ProtoToRPC(msg.Message()))
default:
break
}
r.messages[args.Topic] = make([]*pb.WakuMessage, 0)
return nil
}

View File

@ -112,6 +112,7 @@ func TestRelayGetV1Messages(t *testing.T) {
Topic: "test",
Message: ProtoToRPC(&pb.WakuMessage{
Payload: []byte("test"),
ContentTopic: "testContentTopic",
}),
},
&reply,

View File

@ -15,6 +15,7 @@ var DefaultRelaySubscriptionBufferSize int = 1024
type RelaySubscribeParameters struct {
dontConsume bool
cacheSize uint
}
type RelaySubscribeOption func(*RelaySubscribeParameters) error
@ -28,6 +29,13 @@ func WithoutConsumer() RelaySubscribeOption {
}
}
func WithCacheSize(size uint) RelaySubscribeOption {
return func(params *RelaySubscribeParameters) error {
params.cacheSize = size
return nil
}
}
func msgIDFn(pmsg *pubsub_pb.Message) string {
return string(hash.SHA256(pmsg.Data))
}

View File

@ -279,13 +279,31 @@ func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, opts .
return hash, nil
}
// GetSubscriptionWithPubsubTopic fetches subscription matching pubsub and contentTopic
func (w *WakuRelay) GetSubscriptionWithPubsubTopic(pubsubTopic string, contentTopic string) (*Subscription, error) {
var contentFilter waku_proto.ContentFilter
if contentTopic != "" {
contentFilter = waku_proto.NewContentFilter(pubsubTopic, contentTopic)
} else {
contentFilter = waku_proto.NewContentFilter(pubsubTopic)
}
cSubs := w.contentSubs[pubsubTopic]
for _, sub := range cSubs {
if sub.contentFilter.Equals(contentFilter) {
return sub, nil
}
}
return nil, errors.New("no subscription found for content topic")
}
// GetSubscription fetches subscription matching a contentTopic(via autosharding)
func (w *WakuRelay) GetSubscription(contentTopic string) (*Subscription, error) {
pubSubTopic, err := waku_proto.GetPubSubTopicFromContentTopic(contentTopic)
pubsubTopic, err := waku_proto.GetPubSubTopicFromContentTopic(contentTopic)
if err != nil {
return nil, err
}
contentFilter := waku_proto.NewContentFilter(pubSubTopic, contentTopic)
cSubs := w.contentSubs[pubSubTopic]
contentFilter := waku_proto.NewContentFilter(pubsubTopic, contentTopic)
cSubs := w.contentSubs[pubsubTopic]
for _, sub := range cSubs {
if sub.contentFilter.Equals(contentFilter) {
return sub, nil
@ -331,6 +349,9 @@ func (w *WakuRelay) subscribe(ctx context.Context, contentFilter waku_proto.Cont
return nil, err
}
}
if params.cacheSize <= 0 {
params.cacheSize = uint(DefaultRelaySubscriptionBufferSize)
}
for pubSubTopic, cTopics := range pubSubTopicMap {
w.log.Info("subscribing to", zap.String("pubsubTopic", pubSubTopic), zap.Strings("contenTopics", cTopics))
@ -347,7 +368,7 @@ func (w *WakuRelay) subscribe(ctx context.Context, contentFilter waku_proto.Cont
}
}
subscription := w.bcaster.Register(cFilter, WithBufferSize(DefaultRelaySubscriptionBufferSize),
subscription := w.bcaster.Register(cFilter, WithBufferSize(int(params.cacheSize)),
WithConsumerOption(params.dontConsume))
// Create Content subscription