fix: check for messages first on filter in case a non null request is being sent

This commit is contained in:
Richard Ramos 2021-09-06 14:40:43 -04:00
parent 67ac969a65
commit f6b5f1eb6f
No known key found for this signature in database
GPG Key ID: 80D4B01265FDFE8F
2 changed files with 16 additions and 13 deletions

View File

@ -96,12 +96,10 @@ func WithWakuRelay(opts ...wakurelay.Option) WakuNodeOption {
}
}
// WithWakuFilter enables the Waku V2 Filter protocol. This WakuNodeOption
// accepts a list of WakuFilter gossipsub options to setup the protocol
func WithWakuFilter(opts ...wakurelay.Option) WakuNodeOption {
// WithWakuFilter enables the Waku V2 Filter protocol.
func WithWakuFilter() WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.enableFilter = true
params.wOpts = opts
return nil
}
}

View File

@ -95,11 +95,17 @@ func (wf *WakuFilter) onRequest(s network.Stream) {
return
}
log.Info(fmt.Sprintf("%s: Received query from %s", s.Conn().LocalPeer(), s.Conn().RemotePeer()))
log.Info(fmt.Sprintf("%s: Received request from %s", s.Conn().LocalPeer(), s.Conn().RemotePeer()))
stats.Record(wf.ctx, metrics.Messages.M(1))
if filterRPCRequest.Request != nil {
if filterRPCRequest.Push != nil && len(filterRPCRequest.Push.Messages) > 0 {
// We're on a light node.
// This is a message push coming from a full node.
log.Info("Light node, received a message push ", *filterRPCRequest.Push)
wf.pushHandler(filterRPCRequest.RequestId, *filterRPCRequest.Push)
} else if filterRPCRequest.Request != nil {
// We're on a full node.
// This is a filter request coming from a light node.
if filterRPCRequest.Request.Subscribe {
@ -151,14 +157,7 @@ func (wf *WakuFilter) onRequest(s network.Stream) {
stats.Record(wf.ctx, metrics.FilterSubscriptions.M(int64(len(wf.subscribers))))
}
} else if filterRPCRequest.Push != nil {
// We're on a light node.
// This is a message push coming from a full node.
log.Info("Light node, received a message push ", *filterRPCRequest.Push)
wf.pushHandler(filterRPCRequest.RequestId, *filterRPCRequest.Push)
}
}
func (wf *WakuFilter) peerListener() {
@ -231,6 +230,8 @@ func (wf *WakuFilter) FilterListener() {
//waku_filter_errors.inc(labelValues = [dialFailure])
return err
}
defer conn.Close()
writer := protoio.NewDelimitedWriter(conn)
err = writer.WriteMsg(pushRPC)
if err != nil {
@ -262,6 +263,8 @@ func (wf *WakuFilter) Subscribe(ctx context.Context, request pb.FilterRequest) (
conn, err := wf.h.NewStream(ctx, *peer, WakuFilterProtocolId)
if conn != nil {
defer conn.Close()
// This is the only successful path to subscription
id := protocol.GenerateRequestId()
@ -294,6 +297,8 @@ func (wf *WakuFilter) Unsubscribe(ctx context.Context, request pb.FilterRequest)
conn, err := wf.h.NewStream(ctx, *peer, WakuFilterProtocolId)
if conn != nil {
defer conn.Close()
// This is the only successful path to subscription
id := protocol.GenerateRequestId()