From 860abca690f7da4ce2c3860eaffb473a72c5aaf4 Mon Sep 17 00:00:00 2001 From: Vitaliy Vlasov Date: Thu, 15 Jun 2023 17:42:54 +0300 Subject: [PATCH] Perform initial filter subscribe asynchronously (#3608) Also properly unsubscribe from filter --- wakuv2/waku.go | 28 +++++++++++++++++++++------- wakuv2/waku_test.go | 2 +- 2 files changed, 22 insertions(+), 8 deletions(-) diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 702960609..d265b9df9 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -671,22 +671,31 @@ func (w *Waku) runFilterMsgLoop() { if err != nil { w.logger.Error("Failed to subscribe to filter") } - continue + break } for id, sub := range subMap { err := w.isFilterSubAlive(sub) if err != nil { + // Unsubscribe on light node + contentFilter := w.buildContentFilter(f.Topics) + // TODO Better return value handling for WakuFilterPushResult + _, err := w.node.FilterLightnode().Unsubscribe(context.Background(), contentFilter, filter.Peer(sub.PeerID)) + if err != nil { + w.logger.Warn("could not unsubscribe wakuv2 filter for peer", zap.Any("peer", sub.PeerID)) + continue + } + + // Remove entry from maps w.filterPeerDisconnectMap[sub.PeerID] = time.Now().Unix() delete(subMap, id) - // TODO Is it necessary to unsubscribe? // Re-subscribe peers := w.findFilterPeers() if len(peers) > 0 && len(subMap) < w.settings.MinPeersForFilter { - contentFilter := w.buildContentFilter(f.Topics) subDetails, err := w.node.FilterLightnode().Subscribe(context.Background(), contentFilter, filter.WithPeer(peers[0])) if err != nil { w.logger.Warn("could not add wakuv2 filter for peer", zap.Any("peer", peers[0])) + break } subMap[subDetails.ID] = subDetails @@ -1003,10 +1012,15 @@ func (w *Waku) Subscribe(f *common.Filter) (string, error) { } if w.settings.LightClient { - err = w.subscribeToFilter(f) - if err != nil { - return "", err - } + go func() { + ticker := time.NewTicker(1 * time.Second) + for range ticker.C { + err := w.subscribeToFilter(f) + if err == nil { + break + } + } + }() } return s, nil diff --git a/wakuv2/waku_test.go b/wakuv2/waku_test.go index 3e51d53b7..0160f0a24 100644 --- a/wakuv2/waku_test.go +++ b/wakuv2/waku_test.go @@ -214,7 +214,7 @@ func TestWakuV2Filter(t *testing.T) { }) require.NoError(t, err) - time.Sleep(1 * time.Second) + time.Sleep(5 * time.Second) // Ensure there is 1 active filter subscription require.Len(t, w.filterSubscriptions, 1)