mirror of
https://github.com/logos-messaging/go-libp2p-pubsub.git
synced 2026-01-02 12:53:09 +00:00
don't accumulate pending goroutines from new connections
This commit is contained in:
parent
65f48fb2c8
commit
1f8e231861
28
notify.go
28
notify.go
@ -21,12 +21,15 @@ func (p *PubSubNotif) Connected(n network.Network, c network.Conn) {
|
||||
if c.Stat().Transient {
|
||||
return
|
||||
}
|
||||
go func() {
|
||||
select {
|
||||
case p.newPeers <- c.RemotePeer():
|
||||
case <-p.ctx.Done():
|
||||
}
|
||||
}()
|
||||
|
||||
p.newPeersMx.Lock()
|
||||
defer p.newPeersMx.Unlock()
|
||||
|
||||
p.newPeersPend[c.RemotePeer()] = struct{}{}
|
||||
select {
|
||||
case p.newPeers <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func (p *PubSubNotif) Disconnected(n network.Network, c network.Conn) {
|
||||
@ -49,13 +52,18 @@ func (p *PubSubNotif) Initialize() {
|
||||
return true
|
||||
}
|
||||
|
||||
p.newPeersMx.Lock()
|
||||
defer p.newPeersMx.Unlock()
|
||||
|
||||
for _, pid := range p.host.Network().Peers() {
|
||||
if isTransient(pid) {
|
||||
continue
|
||||
}
|
||||
select {
|
||||
case p.newPeers <- pid:
|
||||
case <-p.ctx.Done():
|
||||
}
|
||||
p.newPeersPend[pid] = struct{}{}
|
||||
}
|
||||
|
||||
select {
|
||||
case p.newPeers <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
55
pubsub.go
55
pubsub.go
@ -89,8 +89,10 @@ type PubSub struct {
|
||||
// removeTopic is a topic cancellation channel
|
||||
rmTopic chan *rmTopicReq
|
||||
|
||||
// a notification channel for new peer connections
|
||||
newPeers chan peer.ID
|
||||
// a notification channel for new peer connections accumulated
|
||||
newPeers chan struct{}
|
||||
newPeersMx sync.Mutex
|
||||
newPeersPend map[peer.ID]struct{}
|
||||
|
||||
// a notification channel for new outoging peer streams
|
||||
newPeerStream chan network.Stream
|
||||
@ -231,7 +233,8 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
|
||||
signKey: nil,
|
||||
signPolicy: StrictSign,
|
||||
incoming: make(chan *RPC, 32),
|
||||
newPeers: make(chan peer.ID),
|
||||
newPeers: make(chan struct{}, 1),
|
||||
newPeersPend: make(map[peer.ID]struct{}),
|
||||
newPeerStream: make(chan network.Stream),
|
||||
newPeerError: make(chan peer.ID),
|
||||
peerDead: make(chan peer.ID),
|
||||
@ -480,21 +483,8 @@ func (p *PubSub) processLoop(ctx context.Context) {
|
||||
|
||||
for {
|
||||
select {
|
||||
case pid := <-p.newPeers:
|
||||
if _, ok := p.peers[pid]; ok {
|
||||
log.Debug("already have connection to peer: ", pid)
|
||||
continue
|
||||
}
|
||||
|
||||
if p.blacklist.Contains(pid) {
|
||||
log.Warn("ignoring connection from blacklisted peer: ", pid)
|
||||
continue
|
||||
}
|
||||
|
||||
messages := make(chan *RPC, p.peerOutboundQueueSize)
|
||||
messages <- p.getHelloPacket()
|
||||
go p.handleNewPeer(ctx, pid, messages)
|
||||
p.peers[pid] = messages
|
||||
case <-p.newPeers:
|
||||
p.handlePendingPeers()
|
||||
|
||||
case s := <-p.newPeerStream:
|
||||
pid := s.Conn().RemotePeer()
|
||||
@ -621,6 +611,35 @@ func (p *PubSub) processLoop(ctx context.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
func (p *PubSub) handlePendingPeers() {
|
||||
p.newPeersMx.Lock()
|
||||
defer p.newPeersMx.Unlock()
|
||||
|
||||
if len(p.newPeersPend) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
newPeers := p.newPeersPend
|
||||
p.newPeersPend = make(map[peer.ID]struct{})
|
||||
|
||||
for pid := range newPeers {
|
||||
if _, ok := p.peers[pid]; ok {
|
||||
log.Debug("already have connection to peer: ", pid)
|
||||
continue
|
||||
}
|
||||
|
||||
if p.blacklist.Contains(pid) {
|
||||
log.Warn("ignoring connection from blacklisted peer: ", pid)
|
||||
continue
|
||||
}
|
||||
|
||||
messages := make(chan *RPC, p.peerOutboundQueueSize)
|
||||
messages <- p.getHelloPacket()
|
||||
go p.handleNewPeer(p.ctx, pid, messages)
|
||||
p.peers[pid] = messages
|
||||
}
|
||||
}
|
||||
|
||||
// handleAddTopic adds a tracker for a particular topic.
|
||||
// Only called from processLoop.
|
||||
func (p *PubSub) handleAddTopic(req *addTopicReq) {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user