mirror of
https://github.com/logos-messaging/go-libp2p-pubsub.git
synced 2026-01-09 08:13:10 +00:00
use a priority lock instead of a semaphore
so that there is no case of infinite accumulation of pending peers in the queue. also adds a connectedness check before adding the peer.
This commit is contained in:
parent
0732576319
commit
9d86090f42
24
notify.go
24
notify.go
@ -23,14 +23,11 @@ func (p *PubSubNotif) Connected(n network.Network, c network.Conn) {
|
||||
}
|
||||
|
||||
go func() {
|
||||
select {
|
||||
case <-p.newPeersSema:
|
||||
case <-p.ctx.Done():
|
||||
return
|
||||
}
|
||||
|
||||
p.newPeersPrioLk.RLock()
|
||||
p.newPeersMx.Lock()
|
||||
p.newPeersPend[c.RemotePeer()] = struct{}{}
|
||||
p.newPeersSema <- struct{}{}
|
||||
p.newPeersMx.Unlock()
|
||||
p.newPeersPrioLk.RUnlock()
|
||||
|
||||
select {
|
||||
case p.newPeers <- struct{}{}:
|
||||
@ -59,20 +56,17 @@ func (p *PubSubNotif) Initialize() {
|
||||
return true
|
||||
}
|
||||
|
||||
select {
|
||||
case <-p.newPeersSema:
|
||||
case <-p.ctx.Done():
|
||||
return
|
||||
}
|
||||
|
||||
p.newPeersPrioLk.RLock()
|
||||
p.newPeersMx.Lock()
|
||||
for _, pid := range p.host.Network().Peers() {
|
||||
if isTransient(pid) {
|
||||
continue
|
||||
}
|
||||
|
||||
p.newPeersPend[pid] = struct{}{}
|
||||
}
|
||||
|
||||
p.newPeersSema <- struct{}{}
|
||||
p.newPeersMx.Unlock()
|
||||
p.newPeersPrioLk.RUnlock()
|
||||
|
||||
select {
|
||||
case p.newPeers <- struct{}{}:
|
||||
|
||||
26
pubsub.go
26
pubsub.go
@ -90,9 +90,10 @@ type PubSub struct {
|
||||
rmTopic chan *rmTopicReq
|
||||
|
||||
// a notification channel for new peer connections accumulated
|
||||
newPeers chan struct{}
|
||||
newPeersSema chan struct{}
|
||||
newPeersPend map[peer.ID]struct{}
|
||||
newPeers chan struct{}
|
||||
newPeersPrioLk sync.RWMutex
|
||||
newPeersMx sync.Mutex
|
||||
newPeersPend map[peer.ID]struct{}
|
||||
|
||||
// a notification channel for new outoging peer streams
|
||||
newPeerStream chan network.Stream
|
||||
@ -234,7 +235,6 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
|
||||
signPolicy: StrictSign,
|
||||
incoming: make(chan *RPC, 32),
|
||||
newPeers: make(chan struct{}, 1),
|
||||
newPeersSema: make(chan struct{}, 1),
|
||||
newPeersPend: make(map[peer.ID]struct{}),
|
||||
newPeerStream: make(chan network.Stream),
|
||||
newPeerError: make(chan peer.ID),
|
||||
@ -264,8 +264,6 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
|
||||
counter: uint64(time.Now().UnixNano()),
|
||||
}
|
||||
|
||||
ps.newPeersSema <- struct{}{}
|
||||
|
||||
for _, opt := range opts {
|
||||
err := opt(ps)
|
||||
if err != nil {
|
||||
@ -615,16 +613,8 @@ func (p *PubSub) processLoop(ctx context.Context) {
|
||||
}
|
||||
|
||||
func (p *PubSub) handlePendingPeers() {
|
||||
select {
|
||||
case <-p.newPeersSema:
|
||||
defer func() {
|
||||
p.newPeersSema <- struct{}{}
|
||||
}()
|
||||
|
||||
default:
|
||||
// contention, return and wait for the next notification without blocking the event loop
|
||||
return
|
||||
}
|
||||
p.newPeersPrioLk.Lock()
|
||||
defer p.newPeersPrioLk.Unlock()
|
||||
|
||||
if len(p.newPeersPend) == 0 {
|
||||
return
|
||||
@ -634,6 +624,10 @@ func (p *PubSub) handlePendingPeers() {
|
||||
p.newPeersPend = make(map[peer.ID]struct{})
|
||||
|
||||
for pid := range newPeers {
|
||||
if p.host.Network().Connectedness(pid) != network.Connected {
|
||||
continue
|
||||
}
|
||||
|
||||
if _, ok := p.peers[pid]; ok {
|
||||
log.Debug("already have connection to peer: ", pid)
|
||||
continue
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user