From 97973e3226f70456d788fa0372bb3de9ba48fb0b Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Thu, 20 Oct 2022 09:18:23 -0400 Subject: [PATCH] fix: keep only last 30 messages in memory in rpc relay and filter --- waku.go | 6 ++++++ waku/node.go | 2 +- waku/options.go | 11 ++++++----- waku/v2/rpc/filter.go | 15 +++++++++++---- waku/v2/rpc/filter_test.go | 2 +- waku/v2/rpc/relay.go | 15 +++++++++++---- waku/v2/rpc/relay_test.go | 2 +- waku/v2/rpc/waku_rpc.go | 6 +++--- waku/v2/rpc/waku_rpc_test.go | 2 +- 9 files changed, 41 insertions(+), 20 deletions(-) diff --git a/waku.go b/waku.go index befd847d..d0b99e1d 100644 --- a/waku.go +++ b/waku.go @@ -396,6 +396,12 @@ func main() { Usage: "Listening address of the rpc server", Destination: &options.RPCServer.Address, }, + &cli.IntFlag{ + Name: "rpc-relay-cache-capacity", + Value: 30, + Usage: "Capacity of the Relay REST API message cache", + Destination: &options.RPCServer.RelayCacheCapacity, + }, &cli.BoolFlag{ Name: "rpc-admin", Value: false, diff --git a/waku/node.go b/waku/node.go index 996dd38b..81a10bd0 100644 --- a/waku/node.go +++ b/waku/node.go @@ -326,7 +326,7 @@ func Execute(options Options) { var rpcServer *rpc.WakuRpc if options.RPCServer.Enable { - rpcServer = rpc.NewWakuRpc(wakuNode, options.RPCServer.Address, options.RPCServer.Port, options.RPCServer.Admin, options.RPCServer.Private, logger) + rpcServer = rpc.NewWakuRpc(wakuNode, options.RPCServer.Address, options.RPCServer.Port, options.RPCServer.Admin, options.RPCServer.Private, options.RPCServer.RelayCacheCapacity, logger) rpcServer.Start() } diff --git a/waku/options.go b/waku/options.go index d8c475ff..cab6e3dd 100644 --- a/waku/options.go +++ b/waku/options.go @@ -116,11 +116,12 @@ type MetricsOptions struct { // RPCServerOptions are settings used to start a json rpc server type RPCServerOptions struct { - Enable bool - Port int - Address string - Admin bool - Private bool + Enable bool + Port int + Address string + Admin bool + Private bool + RelayCacheCapacity int } // RESTServerOptions are settings used to start a rest http server diff --git a/waku/v2/rpc/filter.go b/waku/v2/rpc/filter.go index 1a290b12..d1caa241 100644 --- a/waku/v2/rpc/filter.go +++ b/waku/v2/rpc/filter.go @@ -17,6 +17,7 @@ type FilterService struct { log *zap.Logger messages map[string][]*pb.WakuMessage + cacheCapacity int messagesMutex sync.RWMutex runner *runnerService @@ -31,11 +32,12 @@ type ContentTopicArgs struct { ContentTopic string `json:"contentTopic,omitempty"` } -func NewFilterService(node *node.WakuNode, log *zap.Logger) *FilterService { +func NewFilterService(node *node.WakuNode, cacheCapacity int, log *zap.Logger) *FilterService { s := &FilterService{ - node: node, - log: log.Named("filter"), - messages: make(map[string][]*pb.WakuMessage), + node: node, + log: log.Named("filter"), + cacheCapacity: cacheCapacity, + messages: make(map[string][]*pb.WakuMessage), } s.runner = newRunnerService(node.Broadcaster(), s.addEnvelope) return s @@ -62,6 +64,11 @@ func (f *FilterService) addEnvelope(envelope *protocol.Envelope) { return } + // Keep a specific max number of messages per topic + if len(f.messages[envelope.PubsubTopic()]) >= f.cacheCapacity { + f.messages[envelope.PubsubTopic()] = f.messages[envelope.PubsubTopic()][1:] + } + f.messages[contentTopic] = append(f.messages[contentTopic], envelope.Message()) } diff --git a/waku/v2/rpc/filter_test.go b/waku/v2/rpc/filter_test.go index bab17486..6c19fcd6 100644 --- a/waku/v2/rpc/filter_test.go +++ b/waku/v2/rpc/filter_test.go @@ -29,7 +29,7 @@ func makeFilterService(t *testing.T) *FilterService { _, err = n.Relay().SubscribeToTopic(context.Background(), testTopic) require.NoError(t, err) - return NewFilterService(n, utils.Logger()) + return NewFilterService(n, 30, utils.Logger()) } func TestFilterSubscription(t *testing.T) { diff --git a/waku/v2/rpc/relay.go b/waku/v2/rpc/relay.go index 775937f7..6de142e3 100644 --- a/waku/v2/rpc/relay.go +++ b/waku/v2/rpc/relay.go @@ -18,6 +18,7 @@ type RelayService struct { log *zap.Logger messages map[string][]*pb.WakuMessage + cacheCapacity int messagesMutex sync.RWMutex runner *runnerService @@ -36,11 +37,12 @@ type TopicArgs struct { Topic string `json:"topic,omitempty"` } -func NewRelayService(node *node.WakuNode, log *zap.Logger) *RelayService { +func NewRelayService(node *node.WakuNode, cacheCapacity int, log *zap.Logger) *RelayService { s := &RelayService{ - node: node, - log: log.Named("relay"), - messages: make(map[string][]*pb.WakuMessage), + node: node, + cacheCapacity: cacheCapacity, + log: log.Named("relay"), + messages: make(map[string][]*pb.WakuMessage), } s.runner = newRunnerService(node.Broadcaster(), s.addEnvelope) @@ -56,6 +58,11 @@ func (r *RelayService) addEnvelope(envelope *protocol.Envelope) { return } + // Keep a specific max number of messages per topic + if len(r.messages[envelope.PubsubTopic()]) >= r.cacheCapacity { + r.messages[envelope.PubsubTopic()] = r.messages[envelope.PubsubTopic()][1:] + } + r.messages[envelope.PubsubTopic()] = append(r.messages[envelope.PubsubTopic()], envelope.Message()) } diff --git a/waku/v2/rpc/relay_test.go b/waku/v2/rpc/relay_test.go index 744140ef..5ad3b365 100644 --- a/waku/v2/rpc/relay_test.go +++ b/waku/v2/rpc/relay_test.go @@ -19,7 +19,7 @@ func makeRelayService(t *testing.T) *RelayService { err = n.Start() require.NoError(t, err) - return NewRelayService(n, utils.Logger()) + return NewRelayService(n, 30, utils.Logger()) } func TestPostV1Message(t *testing.T) { diff --git a/waku/v2/rpc/waku_rpc.go b/waku/v2/rpc/waku_rpc.go index f256768c..4c432f8e 100644 --- a/waku/v2/rpc/waku_rpc.go +++ b/waku/v2/rpc/waku_rpc.go @@ -24,7 +24,7 @@ type WakuRpc struct { adminService *AdminService } -func NewWakuRpc(node *node.WakuNode, address string, port int, enableAdmin bool, enablePrivate bool, log *zap.Logger) *WakuRpc { +func NewWakuRpc(node *node.WakuNode, address string, port int, enableAdmin bool, enablePrivate bool, cacheCapacity int, log *zap.Logger) *WakuRpc { wrpc := new(WakuRpc) wrpc.log = log.Named("rpc") @@ -45,7 +45,7 @@ func NewWakuRpc(node *node.WakuNode, address string, port int, enableAdmin bool, wrpc.log.Error("registering debug service", zap.Error(err)) } - relayService := NewRelayService(node, log) + relayService := NewRelayService(node, cacheCapacity, log) err = s.RegisterService(relayService, "Relay") if err != nil { wrpc.log.Error("registering relay service", zap.Error(err)) @@ -65,7 +65,7 @@ func NewWakuRpc(node *node.WakuNode, address string, port int, enableAdmin bool, wrpc.adminService = adminService } - filterService := NewFilterService(node, log) + filterService := NewFilterService(node, cacheCapacity, log) err = s.RegisterService(filterService, "Filter") if err != nil { wrpc.log.Error("registering filter service", zap.Error(err)) diff --git a/waku/v2/rpc/waku_rpc_test.go b/waku/v2/rpc/waku_rpc_test.go index 1ccee7b5..03e1b7c8 100644 --- a/waku/v2/rpc/waku_rpc_test.go +++ b/waku/v2/rpc/waku_rpc_test.go @@ -14,7 +14,7 @@ func TestWakuRpc(t *testing.T) { n, err := node.New(context.Background(), options) require.NoError(t, err) - rpc := NewWakuRpc(n, "127.0.0.1", 8080, true, true, utils.Logger()) + rpc := NewWakuRpc(n, "127.0.0.1", 8080, true, true, 30, utils.Logger()) require.NotNil(t, rpc.server) require.Equal(t, rpc.server.Addr, "127.0.0.1:8080") }