fix: return appropriate errors in filter unsubscribe (#941)

This commit is contained in:
Prem Chaitanya Prathi 2023-12-01 06:27:13 +05:30 committed by GitHub
parent b7105f9b9f
commit ac1a699171
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 36 additions and 13 deletions

View File

@ -36,7 +36,8 @@ import (
const FilterPushID_v20beta1 = libp2pProtocol.ID("/vac/waku/filter-push/2.0.0-beta1")
var (
ErrNoPeersAvailable = errors.New("no suitable remote peers")
ErrNoPeersAvailable = errors.New("no suitable remote peers")
ErrSubscriptionNotFound = errors.New("subscription not found")
)
type WakuFilterLightNode struct {
@ -110,19 +111,21 @@ func (wf *WakuFilterLightNode) start() error {
func (wf *WakuFilterLightNode) Stop() {
wf.CommonService.Stop(func() {
wf.h.RemoveStreamHandler(FilterPushID_v20beta1)
res, err := wf.unsubscribeAll(wf.Context())
if err != nil {
wf.log.Warn("unsubscribing from full nodes", zap.Error(err))
}
for _, r := range res.Errors() {
if r.Err != nil {
wf.log.Warn("unsubscribing from full nodes", zap.Error(r.Err), logging.HostID("peerID", r.PeerID))
if wf.subscriptions.Count() > 0 {
res, err := wf.unsubscribeAll(wf.Context())
if err != nil {
wf.log.Warn("unsubscribing from full nodes", zap.Error(err))
}
for _, r := range res.Errors() {
if r.Err != nil {
wf.log.Warn("unsubscribing from full nodes", zap.Error(r.Err), logging.HostID("peerID", r.PeerID))
}
}
//
wf.subscriptions.Clear()
}
//
wf.subscriptions.Clear()
})
}
@ -485,6 +488,13 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter pr
peers := make(map[peer.ID]struct{})
subs := wf.subscriptions.GetSubscription(params.selectedPeer, cFilter)
if len(subs) == 0 {
result.Add(WakuFilterPushError{
Err: ErrSubscriptionNotFound,
PeerID: params.selectedPeer,
})
continue
}
for _, sub := range subs {
sub.Remove(cTopics...)
peers[sub.PeerID] = struct{}{}
@ -583,14 +593,21 @@ func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...Filte
if err != nil {
return nil, err
}
result := &WakuFilterPushResult{}
peers := make(map[peer.ID]struct{})
subs := wf.subscriptions.GetSubscription(params.selectedPeer, protocol.ContentFilter{})
if len(subs) == 0 && params.selectedPeer != "" {
result.Add(WakuFilterPushError{
Err: err,
PeerID: params.selectedPeer,
})
return result, ErrSubscriptionNotFound
}
for _, sub := range subs {
sub.Close()
peers[sub.PeerID] = struct{}{}
}
result := &WakuFilterPushResult{}
if params.wg != nil {
params.wg.Add(len(peers))
}

View File

@ -190,7 +190,7 @@ func (s *FilterTestSuite) waitForMessages(fn func(), subs []*subscription.Subscr
contentTopic: env.Message().GetContentTopic(),
payload: string(env.Message().GetPayload()),
}
s.log.Info("received message ", zap.String("pubSubTopic", received.pubSubTopic), zap.String("contentTopic", received.contentTopic), zap.String("payload", received.payload))
s.log.Debug("received message ", zap.String("pubSubTopic", received.pubSubTopic), zap.String("contentTopic", received.contentTopic), zap.String("payload", received.payload))
if matchOneOfManyMsg(received, expected) {
found++
}

View File

@ -28,6 +28,12 @@ func NewSubscriptionMap(logger *zap.Logger) *SubscriptionsMap {
}
}
func (m *SubscriptionsMap) Count() int {
m.RLock()
defer m.RUnlock()
return len(m.items)
}
func (m *SubscriptionsMap) IsListening(pubsubTopic, contentTopic string) bool {
m.RLock()
defer m.RUnlock()