From 761ae88bbdeabcdffcb8d8f53642b611bba32255 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Wed, 6 Oct 2021 15:25:41 -0400 Subject: [PATCH] fix: remove channel for disconnections Since a subscriber might not be connected always it makes no sense to automatically unsubscribe on disconnet --- waku/v2/node/connectedness.go | 4 +--- waku/v2/protocol/filter/waku_filter.go | 24 +----------------------- 2 files changed, 2 insertions(+), 26 deletions(-) diff --git a/waku/v2/node/connectedness.go b/waku/v2/node/connectedness.go index 39eb14cd..570b4006 100644 --- a/waku/v2/node/connectedness.go +++ b/waku/v2/node/connectedness.go @@ -83,9 +83,7 @@ func (w *WakuNode) connectednessListener() { return case <-w.protocolEventSub.Out(): case <-w.identificationEventSub.Out(): - case p := <-w.connectionNotif.DisconnectChan: - // Notify filter of disconnection - w.filter.DisconnectChan <- p + case <-w.connectionNotif.DisconnectChan: } w.sendConnStatus() } diff --git a/waku/v2/protocol/filter/waku_filter.go b/waku/v2/protocol/filter/waku_filter.go index cb57346f..6e43bdb1 100644 --- a/waku/v2/protocol/filter/waku_filter.go +++ b/waku/v2/protocol/filter/waku_filter.go @@ -46,8 +46,6 @@ type ( subscribers []Subscriber pushHandler MessagePushHandler MsgC chan *protocol.Envelope - - DisconnectChan chan peer.ID } ) @@ -158,24 +156,6 @@ func (wf *WakuFilter) onRequest(s network.Stream) { } } -func (wf *WakuFilter) peerListener() { - for peerID := range wf.DisconnectChan { - log.Info("filter Notification received ", peerID) - i := 0 - // Delete subscribers matching deleted peer - for _, s := range wf.subscribers { - if s.peer != peerID { - wf.subscribers[i] = s - i++ - } - } - - log.Info("filter, deleted subscribers: ", len(wf.subscribers)-i) - wf.subscribers = wf.subscribers[:i] - } - -} - func NewWakuFilter(ctx context.Context, host host.Host, handler MessagePushHandler) *WakuFilter { ctx, err := tag.New(ctx, tag.Insert(metrics.KeyType, "filter")) if err != nil { @@ -187,11 +167,9 @@ func NewWakuFilter(ctx context.Context, host host.Host, handler MessagePushHandl wf.MsgC = make(chan *protocol.Envelope) wf.h = host wf.pushHandler = handler - wf.DisconnectChan = make(chan peer.ID) wf.h.SetStreamHandlerMatch(FilterID_v20beta1, protocol.PrefixTextMatch(string(FilterID_v20beta1)), wf.onRequest) go wf.FilterListener() - go wf.peerListener() return wf } @@ -220,7 +198,7 @@ func (wf *WakuFilter) FilterListener() { log.Info("pushing a message to light node: ", pushRPC) conn, err := wf.h.NewStream(wf.ctx, peer.ID(subscriber.peer), FilterID_v20beta1) - + // TODO: keep track of errors to automatically unsubscribe a peer? if err != nil { // @TODO more sophisticated error handling here log.Error("failed to open peer stream")