fix: race condition in peer connector / manager interaction (#837)

This commit is contained in:
richΛrd 2023-10-25 21:25:56 -04:00 committed by GitHub
parent 0ba8b2caeb
commit c58d0f51e4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 16 additions and 7 deletions

View File

@ -35,12 +35,17 @@ type PeerConnectionStrategy struct {
paused atomic.Bool paused atomic.Bool
dialTimeout time.Duration dialTimeout time.Duration
*CommonDiscoveryService *CommonDiscoveryService
subscriptions []<-chan PeerData subscriptions []subscription
backoff backoff.BackoffFactory backoff backoff.BackoffFactory
logger *zap.Logger logger *zap.Logger
} }
type subscription struct {
ctx context.Context
ch <-chan PeerData
}
// backoff describes the strategy used to decide how long to backoff after previously attempting to connect to a peer // backoff describes the strategy used to decide how long to backoff after previously attempting to connect to a peer
func getBackOff() backoff.BackoffFactory { func getBackOff() backoff.BackoffFactory {
rngSrc := rand.NewSource(rand.Int63()) rngSrc := rand.NewSource(rand.Int63())
@ -85,7 +90,7 @@ func (c *PeerConnectionStrategy) Subscribe(ctx context.Context, ch <-chan PeerDa
// if not running yet, store the subscription and return // if not running yet, store the subscription and return
if err := c.ErrOnNotRunning(); err != nil { if err := c.ErrOnNotRunning(); err != nil {
c.mux.Lock() c.mux.Lock()
c.subscriptions = append(c.subscriptions, ch) c.subscriptions = append(c.subscriptions, subscription{ctx, ch})
c.mux.Unlock() c.mux.Unlock()
return return
} }
@ -93,16 +98,18 @@ func (c *PeerConnectionStrategy) Subscribe(ctx context.Context, ch <-chan PeerDa
c.WaitGroup().Add(1) c.WaitGroup().Add(1)
go func() { go func() {
defer c.WaitGroup().Done() defer c.WaitGroup().Done()
c.consumeSubscription(ch) c.consumeSubscription(subscription{ctx, ch})
}() }()
} }
func (c *PeerConnectionStrategy) consumeSubscription(ch <-chan PeerData) { func (c *PeerConnectionStrategy) consumeSubscription(s subscription) {
for { for {
// for returning from the loop when peerConnector is paused. // for returning from the loop when peerConnector is paused.
select { select {
case <-c.Context().Done(): case <-c.Context().Done():
return return
case <-s.ctx.Done():
return
default: default:
} }
// //
@ -110,7 +117,9 @@ func (c *PeerConnectionStrategy) consumeSubscription(ch <-chan PeerData) {
select { select {
case <-c.Context().Done(): case <-c.Context().Done():
return return
case p, ok := <-ch: case <-s.ctx.Done():
return
case p, ok := <-s.ch:
if !ok { if !ok {
return return
} }
@ -166,7 +175,7 @@ func (c *PeerConnectionStrategy) isPaused() bool {
func (c *PeerConnectionStrategy) consumeSubscriptions() { func (c *PeerConnectionStrategy) consumeSubscriptions() {
for _, subs := range c.subscriptions { for _, subs := range c.subscriptions {
c.WaitGroup().Add(1) c.WaitGroup().Add(1)
go func(s <-chan PeerData) { go func(s subscription) {
defer c.WaitGroup().Done() defer c.WaitGroup().Done()
c.consumeSubscription(s) c.consumeSubscription(s)
}(subs) }(subs)

View File

@ -339,7 +339,7 @@ func (pm *PeerManager) AddDiscoveredPeer(p PeerData, connectNow bool) {
} }
} }
if connectNow { if connectNow {
pm.peerConnector.PushToChan(p) go pm.peerConnector.PushToChan(p)
} }
} }