fix: keep only last 30 messages in memory in rpc relay and filter

This commit is contained in:
Richard Ramos 2022-10-20 09:18:23 -04:00 committed by RichΛrd
parent df38038bc0
commit 97973e3226
9 changed files with 41 additions and 20 deletions

View File

@ -396,6 +396,12 @@ func main() {
Usage: "Listening address of the rpc server",
Destination: &options.RPCServer.Address,
},
&cli.IntFlag{
Name: "rpc-relay-cache-capacity",
Value: 30,
Usage: "Capacity of the Relay REST API message cache",
Destination: &options.RPCServer.RelayCacheCapacity,
},
&cli.BoolFlag{
Name: "rpc-admin",
Value: false,

View File

@ -326,7 +326,7 @@ func Execute(options Options) {
var rpcServer *rpc.WakuRpc
if options.RPCServer.Enable {
rpcServer = rpc.NewWakuRpc(wakuNode, options.RPCServer.Address, options.RPCServer.Port, options.RPCServer.Admin, options.RPCServer.Private, logger)
rpcServer = rpc.NewWakuRpc(wakuNode, options.RPCServer.Address, options.RPCServer.Port, options.RPCServer.Admin, options.RPCServer.Private, options.RPCServer.RelayCacheCapacity, logger)
rpcServer.Start()
}

View File

@ -121,6 +121,7 @@ type RPCServerOptions struct {
Address string
Admin bool
Private bool
RelayCacheCapacity int
}
// RESTServerOptions are settings used to start a rest http server

View File

@ -17,6 +17,7 @@ type FilterService struct {
log *zap.Logger
messages map[string][]*pb.WakuMessage
cacheCapacity int
messagesMutex sync.RWMutex
runner *runnerService
@ -31,10 +32,11 @@ type ContentTopicArgs struct {
ContentTopic string `json:"contentTopic,omitempty"`
}
func NewFilterService(node *node.WakuNode, log *zap.Logger) *FilterService {
func NewFilterService(node *node.WakuNode, cacheCapacity int, log *zap.Logger) *FilterService {
s := &FilterService{
node: node,
log: log.Named("filter"),
cacheCapacity: cacheCapacity,
messages: make(map[string][]*pb.WakuMessage),
}
s.runner = newRunnerService(node.Broadcaster(), s.addEnvelope)
@ -62,6 +64,11 @@ func (f *FilterService) addEnvelope(envelope *protocol.Envelope) {
return
}
// Keep a specific max number of messages per topic
if len(f.messages[envelope.PubsubTopic()]) >= f.cacheCapacity {
f.messages[envelope.PubsubTopic()] = f.messages[envelope.PubsubTopic()][1:]
}
f.messages[contentTopic] = append(f.messages[contentTopic], envelope.Message())
}

View File

@ -29,7 +29,7 @@ func makeFilterService(t *testing.T) *FilterService {
_, err = n.Relay().SubscribeToTopic(context.Background(), testTopic)
require.NoError(t, err)
return NewFilterService(n, utils.Logger())
return NewFilterService(n, 30, utils.Logger())
}
func TestFilterSubscription(t *testing.T) {

View File

@ -18,6 +18,7 @@ type RelayService struct {
log *zap.Logger
messages map[string][]*pb.WakuMessage
cacheCapacity int
messagesMutex sync.RWMutex
runner *runnerService
@ -36,9 +37,10 @@ type TopicArgs struct {
Topic string `json:"topic,omitempty"`
}
func NewRelayService(node *node.WakuNode, log *zap.Logger) *RelayService {
func NewRelayService(node *node.WakuNode, cacheCapacity int, log *zap.Logger) *RelayService {
s := &RelayService{
node: node,
cacheCapacity: cacheCapacity,
log: log.Named("relay"),
messages: make(map[string][]*pb.WakuMessage),
}
@ -56,6 +58,11 @@ func (r *RelayService) addEnvelope(envelope *protocol.Envelope) {
return
}
// Keep a specific max number of messages per topic
if len(r.messages[envelope.PubsubTopic()]) >= r.cacheCapacity {
r.messages[envelope.PubsubTopic()] = r.messages[envelope.PubsubTopic()][1:]
}
r.messages[envelope.PubsubTopic()] = append(r.messages[envelope.PubsubTopic()], envelope.Message())
}

View File

@ -19,7 +19,7 @@ func makeRelayService(t *testing.T) *RelayService {
err = n.Start()
require.NoError(t, err)
return NewRelayService(n, utils.Logger())
return NewRelayService(n, 30, utils.Logger())
}
func TestPostV1Message(t *testing.T) {

View File

@ -24,7 +24,7 @@ type WakuRpc struct {
adminService *AdminService
}
func NewWakuRpc(node *node.WakuNode, address string, port int, enableAdmin bool, enablePrivate bool, log *zap.Logger) *WakuRpc {
func NewWakuRpc(node *node.WakuNode, address string, port int, enableAdmin bool, enablePrivate bool, cacheCapacity int, log *zap.Logger) *WakuRpc {
wrpc := new(WakuRpc)
wrpc.log = log.Named("rpc")
@ -45,7 +45,7 @@ func NewWakuRpc(node *node.WakuNode, address string, port int, enableAdmin bool,
wrpc.log.Error("registering debug service", zap.Error(err))
}
relayService := NewRelayService(node, log)
relayService := NewRelayService(node, cacheCapacity, log)
err = s.RegisterService(relayService, "Relay")
if err != nil {
wrpc.log.Error("registering relay service", zap.Error(err))
@ -65,7 +65,7 @@ func NewWakuRpc(node *node.WakuNode, address string, port int, enableAdmin bool,
wrpc.adminService = adminService
}
filterService := NewFilterService(node, log)
filterService := NewFilterService(node, cacheCapacity, log)
err = s.RegisterService(filterService, "Filter")
if err != nil {
wrpc.log.Error("registering filter service", zap.Error(err))

View File

@ -14,7 +14,7 @@ func TestWakuRpc(t *testing.T) {
n, err := node.New(context.Background(), options)
require.NoError(t, err)
rpc := NewWakuRpc(n, "127.0.0.1", 8080, true, true, utils.Logger())
rpc := NewWakuRpc(n, "127.0.0.1", 8080, true, true, 30, utils.Logger())
require.NotNil(t, rpc.server)
require.Equal(t, rpc.server.Addr, "127.0.0.1:8080")
}