feat: optimize filter subs (#1144)

Co-authored-by: richΛrd <info@richardramos.me>
This commit is contained in:
Prem Chaitanya Prathi 2024-07-01 19:48:00 +05:30 committed by GitHub
parent e3d7ab1d58
commit 5b5ea977af
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 21 additions and 14 deletions

View File

@ -71,8 +71,12 @@ func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilte
return sub, nil return sub, nil
} }
func (apiSub *Sub) Unsubscribe() { func (apiSub *Sub) Unsubscribe(contentFilter protocol.ContentFilter) {
apiSub.cancel() _, err := apiSub.wf.Unsubscribe(apiSub.ctx, contentFilter)
//Not reading result unless we want to do specific error handling?
if err != nil {
apiSub.log.Debug("failed to unsubscribe", zap.Error(err), zap.Stringer("content-filter", contentFilter))
}
} }
func (apiSub *Sub) subscriptionLoop() { func (apiSub *Sub) subscriptionLoop() {

View File

@ -46,9 +46,9 @@ func (s *FilterApiTestSuite) TestSubscribe() {
s.Require().Equal(apiConfig.MaxPeers, 2) s.Require().Equal(apiConfig.MaxPeers, 2)
s.Require().Equal(contentFilter.PubsubTopic, s.TestTopic) s.Require().Equal(contentFilter.PubsubTopic, s.TestTopic)
ctx, cancel := context.WithCancel(context.Background())
s.Log.Info("About to perform API Subscribe()") s.Log.Info("About to perform API Subscribe()")
apiSub, err := Subscribe(context.Background(), s.LightNode, contentFilter, apiConfig, s.Log) apiSub, err := Subscribe(ctx, s.LightNode, contentFilter, apiConfig, s.Log)
s.Require().NoError(err) s.Require().NoError(err)
s.Require().Equal(apiSub.ContentFilter, contentFilter) s.Require().Equal(apiSub.ContentFilter, contentFilter)
s.Log.Info("Subscribed") s.Log.Info("Subscribed")
@ -89,7 +89,8 @@ func (s *FilterApiTestSuite) TestSubscribe() {
s.Require().NotEqual(fullNodeData2.FullNodeHost.ID(), sub.PeerID) s.Require().NotEqual(fullNodeData2.FullNodeHost.ID(), sub.PeerID)
} }
apiSub.Unsubscribe() apiSub.Unsubscribe(contentFilter)
cancel()
for range apiSub.DataCh { for range apiSub.DataCh {
} }
s.Log.Info("DataCh is closed") s.Log.Info("DataCh is closed")

View File

@ -37,6 +37,8 @@ type SubscriptionDetails struct {
} }
func (s *SubscriptionDetails) Add(contentTopics ...string) { func (s *SubscriptionDetails) Add(contentTopics ...string) {
s.mapRef.Lock()
defer s.mapRef.Unlock()
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
@ -44,14 +46,14 @@ func (s *SubscriptionDetails) Add(contentTopics ...string) {
if _, ok := s.ContentFilter.ContentTopics[ct]; !ok { if _, ok := s.ContentFilter.ContentTopics[ct]; !ok {
s.ContentFilter.ContentTopics[ct] = struct{}{} s.ContentFilter.ContentTopics[ct] = struct{}{}
// Increase the number of subscriptions for this (pubsubTopic, contentTopic) pair // Increase the number of subscriptions for this (pubsubTopic, contentTopic) pair
s.mapRef.Lock()
s.mapRef.increaseSubFor(s.ContentFilter.PubsubTopic, ct) s.mapRef.increaseSubFor(s.ContentFilter.PubsubTopic, ct)
s.mapRef.Unlock()
} }
} }
} }
func (s *SubscriptionDetails) Remove(contentTopics ...string) { func (s *SubscriptionDetails) Remove(contentTopics ...string) {
s.mapRef.Lock()
defer s.mapRef.Unlock()
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
@ -59,15 +61,13 @@ func (s *SubscriptionDetails) Remove(contentTopics ...string) {
if _, ok := s.ContentFilter.ContentTopics[ct]; ok { if _, ok := s.ContentFilter.ContentTopics[ct]; ok {
delete(s.ContentFilter.ContentTopics, ct) delete(s.ContentFilter.ContentTopics, ct)
// Decrease the number of subscriptions for this (pubsubTopic, contentTopic) pair // Decrease the number of subscriptions for this (pubsubTopic, contentTopic) pair
s.mapRef.Lock()
s.mapRef.decreaseSubFor(s.ContentFilter.PubsubTopic, ct) s.mapRef.decreaseSubFor(s.ContentFilter.PubsubTopic, ct)
s.mapRef.Unlock()
} }
} }
if len(s.ContentFilter.ContentTopics) == 0 { if len(s.ContentFilter.ContentTopics) == 0 {
// err doesn't matter // err doesn't matter
_ = s.mapRef.Delete(s) _ = s.mapRef.DeleteNoLock(s)
} }
} }
@ -105,7 +105,9 @@ func (s *SubscriptionDetails) CloseC() {
func (s *SubscriptionDetails) Close() error { func (s *SubscriptionDetails) Close() error {
s.CloseC() s.CloseC()
return s.mapRef.Delete(s) s.mapRef.Lock()
defer s.mapRef.Unlock()
return s.mapRef.DeleteNoLock(s)
} }
func (s *SubscriptionDetails) SetClosing() { func (s *SubscriptionDetails) SetClosing() {

View File

@ -130,9 +130,9 @@ func (sub *SubscriptionsMap) Has(peerID peer.ID, cf protocol.ContentFilter) bool
return true return true
} }
func (sub *SubscriptionsMap) Delete(subscription *SubscriptionDetails) error {
sub.Lock() // Caller has to acquire lock before invoking this method.This is done to avoid possible deadlock
defer sub.Unlock() func (sub *SubscriptionsMap) DeleteNoLock(subscription *SubscriptionDetails) error {
peerSubscription, ok := sub.items[subscription.PeerID] peerSubscription, ok := sub.items[subscription.PeerID]
if !ok { if !ok {