Perform initial filter subscribe asynchronously (#3608)
Also properly unsubscribe from filter
This commit is contained in:
parent
ccde92377d
commit
860abca690
|
@ -671,22 +671,31 @@ func (w *Waku) runFilterMsgLoop() {
|
|||
if err != nil {
|
||||
w.logger.Error("Failed to subscribe to filter")
|
||||
}
|
||||
continue
|
||||
break
|
||||
}
|
||||
for id, sub := range subMap {
|
||||
err := w.isFilterSubAlive(sub)
|
||||
if err != nil {
|
||||
// Unsubscribe on light node
|
||||
contentFilter := w.buildContentFilter(f.Topics)
|
||||
// TODO Better return value handling for WakuFilterPushResult
|
||||
_, err := w.node.FilterLightnode().Unsubscribe(context.Background(), contentFilter, filter.Peer(sub.PeerID))
|
||||
if err != nil {
|
||||
w.logger.Warn("could not unsubscribe wakuv2 filter for peer", zap.Any("peer", sub.PeerID))
|
||||
continue
|
||||
}
|
||||
|
||||
// Remove entry from maps
|
||||
w.filterPeerDisconnectMap[sub.PeerID] = time.Now().Unix()
|
||||
delete(subMap, id)
|
||||
// TODO Is it necessary to unsubscribe?
|
||||
|
||||
// Re-subscribe
|
||||
peers := w.findFilterPeers()
|
||||
if len(peers) > 0 && len(subMap) < w.settings.MinPeersForFilter {
|
||||
contentFilter := w.buildContentFilter(f.Topics)
|
||||
subDetails, err := w.node.FilterLightnode().Subscribe(context.Background(), contentFilter, filter.WithPeer(peers[0]))
|
||||
if err != nil {
|
||||
w.logger.Warn("could not add wakuv2 filter for peer", zap.Any("peer", peers[0]))
|
||||
break
|
||||
}
|
||||
|
||||
subMap[subDetails.ID] = subDetails
|
||||
|
@ -1003,10 +1012,15 @@ func (w *Waku) Subscribe(f *common.Filter) (string, error) {
|
|||
}
|
||||
|
||||
if w.settings.LightClient {
|
||||
err = w.subscribeToFilter(f)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
go func() {
|
||||
ticker := time.NewTicker(1 * time.Second)
|
||||
for range ticker.C {
|
||||
err := w.subscribeToFilter(f)
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
return s, nil
|
||||
|
|
|
@ -214,7 +214,7 @@ func TestWakuV2Filter(t *testing.T) {
|
|||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
time.Sleep(1 * time.Second)
|
||||
time.Sleep(5 * time.Second)
|
||||
|
||||
// Ensure there is 1 active filter subscription
|
||||
require.Len(t, w.filterSubscriptions, 1)
|
||||
|
|
Loading…
Reference in New Issue