From 6c4a74fb9cbf78af6a4cb1dc9edd493382977598 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rich=CE=9Brd?= Date: Mon, 27 Sep 2021 08:47:18 -0400 Subject: [PATCH] fix: check for messages first on filter in case a non null request is being sent (#50) * fix: check for messages first on filter in case a non null request is being sent * fix: clean up logs * fix: peer lock --- waku/v2/node/wakunode2.go | 19 +++++++----- waku/v2/node/wakuoptions.go | 6 ++-- waku/v2/protocol/filter/waku_filter.go | 42 ++++++++++++++------------ 3 files changed, 37 insertions(+), 30 deletions(-) diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 9618c481..72dd7c90 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -307,11 +307,10 @@ func (w *WakuNode) ID() string { return w.host.ID().Pretty() } -func (w *WakuNode) GetPeerStats() PeerStats { - return w.peers -} - func (w *WakuNode) IsOnline() bool { + w.peersMutex.Lock() + defer w.peersMutex.Unlock() + hasRelay := false hasLightPush := false hasStore := false @@ -340,6 +339,9 @@ func (w *WakuNode) IsOnline() bool { } func (w *WakuNode) HasHistory() bool { + w.peersMutex.Lock() + defer w.peersMutex.Unlock() + for _, v := range w.peers { for _, protocol := range v { if protocol == string(store.WakuStoreProtocolId) { @@ -751,6 +753,8 @@ func (w *WakuNode) ClosePeerById(id peer.ID) error { } func (w *WakuNode) PeerCount() int { + w.peersMutex.Lock() + defer w.peersMutex.Unlock() return len(w.peers) } @@ -800,15 +804,16 @@ func (w *WakuNode) startKeepAlive(t time.Duration) { go func(peer peer.ID) { peerFound := false + w.peersMutex.Lock() for p := range w.peers { if p == peer { peerFound = true break } } - - //log.Info("###PING " + s + " before fetching result") - //logwriter.Write([]byte("###PING " + s + " before fetching result")) + defer w.peersMutex.Unlock() + log.Debug("###PING before fetching result") + pingTicker := time.NewTicker(time.Duration(1) * time.Second) isError := false select { diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index 3dbd9c89..13c5eecc 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -96,12 +96,10 @@ func WithWakuRelay(opts ...wakurelay.Option) WakuNodeOption { } } -// WithWakuFilter enables the Waku V2 Filter protocol. This WakuNodeOption -// accepts a list of WakuFilter gossipsub options to setup the protocol -func WithWakuFilter(opts ...wakurelay.Option) WakuNodeOption { +// WithWakuFilter enables the Waku V2 Filter protocol. +func WithWakuFilter() WakuNodeOption { return func(params *WakuNodeParameters) error { params.enableFilter = true - params.wOpts = opts return nil } } diff --git a/waku/v2/protocol/filter/waku_filter.go b/waku/v2/protocol/filter/waku_filter.go index c9be11b6..ec8c63a6 100644 --- a/waku/v2/protocol/filter/waku_filter.go +++ b/waku/v2/protocol/filter/waku_filter.go @@ -95,22 +95,28 @@ func (wf *WakuFilter) onRequest(s network.Stream) { return } - log.Info(fmt.Sprintf("%s: Received query from %s", s.Conn().LocalPeer(), s.Conn().RemotePeer())) + log.Info(fmt.Sprintf("%s: received request from %s", s.Conn().LocalPeer(), s.Conn().RemotePeer())) stats.Record(wf.ctx, metrics.Messages.M(1)) - if filterRPCRequest.Request != nil { + if filterRPCRequest.Push != nil && len(filterRPCRequest.Push.Messages) > 0 { + // We're on a light node. + // This is a message push coming from a full node. + + log.Info("filter light node, received a message push. ", len(filterRPCRequest.Push.Messages), " messages") + wf.pushHandler(filterRPCRequest.RequestId, *filterRPCRequest.Push) + } else if filterRPCRequest.Request != nil { // We're on a full node. // This is a filter request coming from a light node. if filterRPCRequest.Request.Subscribe { subscriber := Subscriber{peer: string(s.Conn().RemotePeer()), requestId: filterRPCRequest.RequestId, filter: *filterRPCRequest.Request} wf.subscribers = append(wf.subscribers, subscriber) - log.Info("Full node, add a filter subscriber ", subscriber) + log.Info("filter full node, add a filter subscriber: ", subscriber.peer) stats.Record(wf.ctx, metrics.FilterSubscriptions.M(int64(len(wf.subscribers)))) } else { peerId := string(s.Conn().RemotePeer()) - log.Info("Full node, remove a filter subscriber ", peerId) + log.Info("filter full node, remove a filter subscriber: ", peerId) contentFilters := filterRPCRequest.Request.ContentFilters var peerIdsToRemove []string for _, subscriber := range wf.subscribers { @@ -151,20 +157,13 @@ func (wf *WakuFilter) onRequest(s network.Stream) { stats.Record(wf.ctx, metrics.FilterSubscriptions.M(int64(len(wf.subscribers)))) } - } else if filterRPCRequest.Push != nil { - // We're on a light node. - // This is a message push coming from a full node. - - log.Info("Light node, received a message push ", *filterRPCRequest.Push) - wf.pushHandler(filterRPCRequest.RequestId, *filterRPCRequest.Push) } - } func (wf *WakuFilter) peerListener() { for e := range wf.peerChan { if e.Connectedness == network.NotConnected { - log.Info("Filter Notification received ", e.Peer) + log.Info("filter Notification received ", e.Peer) i := 0 // Delete subscribers matching deleted peer for _, s := range wf.subscribers { @@ -174,7 +173,7 @@ func (wf *WakuFilter) peerListener() { } } - log.Info("Filter, deleted subscribers: ", len(wf.subscribers)-i) + log.Info("filter, deleted subscribers: ", len(wf.subscribers)-i) wf.subscribers = wf.subscribers[:i] } } @@ -217,20 +216,22 @@ func (wf *WakuFilter) FilterListener() { for _, filter := range subscriber.filter.ContentFilters { if msg.ContentTopic == filter.ContentTopic { - log.Info("Found matching contentTopic ", filter, msg) + log.Info("found matching contentTopic ", filter, msg) msgArr := []*pb.WakuMessage{msg} // Do a message push to light node pushRPC := &pb.FilterRPC{RequestId: subscriber.requestId, Push: &pb.MessagePush{Messages: msgArr}} - log.Info("Pushing a message to light node: ", pushRPC) + log.Info("pushing a message to light node: ", pushRPC) conn, err := wf.h.NewStream(wf.ctx, peer.ID(subscriber.peer), WakuFilterProtocolId) if err != nil { // @TODO more sophisticated error handling here - log.Error("Failed to open peer stream") + log.Error("failed to open peer stream") //waku_filter_errors.inc(labelValues = [dialFailure]) return err } + + defer conn.Close() writer := protoio.NewDelimitedWriter(conn) err = writer.WriteMsg(pushRPC) if err != nil { @@ -262,12 +263,14 @@ func (wf *WakuFilter) Subscribe(ctx context.Context, request pb.FilterRequest) ( conn, err := wf.h.NewStream(ctx, *peer, WakuFilterProtocolId) if conn != nil { + defer conn.Close() + // This is the only successful path to subscription id := protocol.GenerateRequestId() writer := protoio.NewDelimitedWriter(conn) filterRPC := &pb.FilterRPC{RequestId: hex.EncodeToString(id), Request: &request} - log.Info("Sending filterRPC: ", filterRPC) + log.Info("sending filterRPC: ", filterRPC) err = writer.WriteMsg(filterRPC) if err != nil { log.Error("failed to write message", err) @@ -281,7 +284,7 @@ func (wf *WakuFilter) Subscribe(ctx context.Context, request pb.FilterRequest) ( return "", err } } else { - log.Info("Error selecting peer: ", err) + log.Info("error selecting peer: ", err) } return "", nil @@ -294,6 +297,8 @@ func (wf *WakuFilter) Unsubscribe(ctx context.Context, request pb.FilterRequest) conn, err := wf.h.NewStream(ctx, *peer, WakuFilterProtocolId) if conn != nil { + defer conn.Close() + // This is the only successful path to subscription id := protocol.GenerateRequestId() @@ -303,7 +308,6 @@ func (wf *WakuFilter) Unsubscribe(ctx context.Context, request pb.FilterRequest) if err != nil { log.Error("failed to write message", err) } - //return some(id) } else { // @TODO more sophisticated error handling here log.Error("failed to connect to remote peer", err)