From f6b5f1eb6f089d1ec0af086594a38740ecb99201 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Mon, 6 Sep 2021 14:40:43 -0400 Subject: [PATCH] fix: check for messages first on filter in case a non null request is being sent --- waku/v2/node/wakuoptions.go | 6 ++---- waku/v2/protocol/filter/waku_filter.go | 23 ++++++++++++++--------- 2 files changed, 16 insertions(+), 13 deletions(-) diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index 3dbd9c89..13c5eecc 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -96,12 +96,10 @@ func WithWakuRelay(opts ...wakurelay.Option) WakuNodeOption { } } -// WithWakuFilter enables the Waku V2 Filter protocol. This WakuNodeOption -// accepts a list of WakuFilter gossipsub options to setup the protocol -func WithWakuFilter(opts ...wakurelay.Option) WakuNodeOption { +// WithWakuFilter enables the Waku V2 Filter protocol. +func WithWakuFilter() WakuNodeOption { return func(params *WakuNodeParameters) error { params.enableFilter = true - params.wOpts = opts return nil } } diff --git a/waku/v2/protocol/filter/waku_filter.go b/waku/v2/protocol/filter/waku_filter.go index c9be11b6..62df6328 100644 --- a/waku/v2/protocol/filter/waku_filter.go +++ b/waku/v2/protocol/filter/waku_filter.go @@ -95,11 +95,17 @@ func (wf *WakuFilter) onRequest(s network.Stream) { return } - log.Info(fmt.Sprintf("%s: Received query from %s", s.Conn().LocalPeer(), s.Conn().RemotePeer())) + log.Info(fmt.Sprintf("%s: Received request from %s", s.Conn().LocalPeer(), s.Conn().RemotePeer())) stats.Record(wf.ctx, metrics.Messages.M(1)) - if filterRPCRequest.Request != nil { + if filterRPCRequest.Push != nil && len(filterRPCRequest.Push.Messages) > 0 { + // We're on a light node. + // This is a message push coming from a full node. + + log.Info("Light node, received a message push ", *filterRPCRequest.Push) + wf.pushHandler(filterRPCRequest.RequestId, *filterRPCRequest.Push) + } else if filterRPCRequest.Request != nil { // We're on a full node. // This is a filter request coming from a light node. if filterRPCRequest.Request.Subscribe { @@ -151,14 +157,7 @@ func (wf *WakuFilter) onRequest(s network.Stream) { stats.Record(wf.ctx, metrics.FilterSubscriptions.M(int64(len(wf.subscribers)))) } - } else if filterRPCRequest.Push != nil { - // We're on a light node. - // This is a message push coming from a full node. - - log.Info("Light node, received a message push ", *filterRPCRequest.Push) - wf.pushHandler(filterRPCRequest.RequestId, *filterRPCRequest.Push) } - } func (wf *WakuFilter) peerListener() { @@ -231,6 +230,8 @@ func (wf *WakuFilter) FilterListener() { //waku_filter_errors.inc(labelValues = [dialFailure]) return err } + + defer conn.Close() writer := protoio.NewDelimitedWriter(conn) err = writer.WriteMsg(pushRPC) if err != nil { @@ -262,6 +263,8 @@ func (wf *WakuFilter) Subscribe(ctx context.Context, request pb.FilterRequest) ( conn, err := wf.h.NewStream(ctx, *peer, WakuFilterProtocolId) if conn != nil { + defer conn.Close() + // This is the only successful path to subscription id := protocol.GenerateRequestId() @@ -294,6 +297,8 @@ func (wf *WakuFilter) Unsubscribe(ctx context.Context, request pb.FilterRequest) conn, err := wf.h.NewStream(ctx, *peer, WakuFilterProtocolId) if conn != nil { + defer conn.Close() + // This is the only successful path to subscription id := protocol.GenerateRequestId()