diff --git a/waku/v2/protocol/subscription/subscription_details.go b/waku/v2/protocol/subscription/subscription_details.go index b94f38ef..5131cc11 100644 --- a/waku/v2/protocol/subscription/subscription_details.go +++ b/waku/v2/protocol/subscription/subscription_details.go @@ -111,11 +111,18 @@ func (s *SubscriptionDetails) Close() error { } func (s *SubscriptionDetails) SetClosing() { + // Flip the Closed flag under s.Lock(), then release the lock before the + // non-blocking channel send (default drops the signal if no one can take it). s.Lock() - defer s.Unlock() - if !s.Closed { - s.Closed = true - s.Closing <- true + shouldSignal := !s.Closed + s.Closed = true + s.Unlock() + + if shouldSignal { + select { + case s.Closing <- true: + default: + } } } diff --git a/waku/v2/protocol/subscription/subscriptions_map.go b/waku/v2/protocol/subscription/subscriptions_map.go index a6621f13..f040bf5c 100644 --- a/waku/v2/protocol/subscription/subscriptions_map.go +++ b/waku/v2/protocol/subscription/subscriptions_map.go @@ -76,7 +76,7 @@ func (sub *SubscriptionsMap) NewSubscription(peerID peer.ID, cf protocol.Content PeerID: peerID, C: make(chan *protocol.Envelope, 1024), ContentFilter: protocol.ContentFilter{PubsubTopic: cf.PubsubTopic, ContentTopics: maps.Clone(cf.ContentTopics)}, - Closing: make(chan bool), + Closing: make(chan bool, 1), } // Increase the number of subscriptions for this (pubsubTopic, contentTopic) pair diff --git a/waku/v2/protocol/subscription/subscriptions_map_test.go b/waku/v2/protocol/subscription/subscriptions_map_test.go index 01a3b788..ab031b17 100644 --- a/waku/v2/protocol/subscription/subscriptions_map_test.go +++ b/waku/v2/protocol/subscription/subscriptions_map_test.go @@ -217,3 +217,51 @@ func TestSubscriptionsNotify(t *testing.T) { wg.Wait() <-successChan } + +// TestSetClosingDoesNotHoldInnerLock verifies that SetClosing does not leave +// the SubscriptionDetails RWMutex held when the Closing channel has no ready +// receiver. +func TestSetClosingDoesNotHoldInnerLock(t *testing.T) { + fmap := 
NewSubscriptionMap(utils.Logger()) + peerID := createPeerID(t) + sub := fmap.NewSubscription(peerID, protocol.ContentFilter{ + PubsubTopic: PUBSUB_TOPIC, + ContentTopics: protocol.NewContentTopicSet("ct1"), + }) + + // Intentionally do NOT spawn a receiver on sub.Closing — reproduces the + // scenario where the api/filter multiplex goroutine or its downstream + // apiSub.closing consumer is stalled (needing outer mapRef.Lock that + // another goroutine holds as outer RLock). + setClosingDone := make(chan struct{}) + go func() { + sub.SetClosing() + close(setClosingDone) + }() + + // Give SetClosing time to reach the blocking send (if unpatched). + time.Sleep(50 * time.Millisecond) + + // A parallel reader that exercises the real GetSubscriptionsForPeer -> + // isPartOf path. isPartOf takes s.RLock() on the SubscriptionDetails. + readerDone := make(chan []*SubscriptionDetails, 1) + go func() { + readerDone <- fmap.GetSubscriptionsForPeer(peerID, protocol.ContentFilter{}) + }() + + select { + case subs := <-readerDone: + require.Len(t, subs, 1) + case <-time.After(time.Second): + t.Fatal("reader blocked: SetClosing is holding SubscriptionDetails lock while sending on unbuffered Closing channel (deadlock)") + } + + // After the fix, SetClosing itself should also complete — either because + // the channel is buffered(1) and the send is instant, or because a + // select/default drops the send when no one is reading. Either is acceptable. + select { + case <-setClosingDone: + case <-time.After(time.Second): + t.Fatal("SetClosing never returned without a receiver — inner lock or channel send is still blocking") + } +}