diff --git a/notify.go b/notify.go index c11f621..7965263 100644 --- a/notify.go +++ b/notify.go @@ -21,12 +21,15 @@ func (p *PubSubNotif) Connected(n network.Network, c network.Conn) { if c.Stat().Transient { return } - go func() { - select { - case p.newPeers <- c.RemotePeer(): - case <-p.ctx.Done(): - } - }() + + p.newPeersMx.Lock() + defer p.newPeersMx.Unlock() + + p.newPeersPend[c.RemotePeer()] = struct{}{} + select { + case p.newPeers <- struct{}{}: + default: + } } func (p *PubSubNotif) Disconnected(n network.Network, c network.Conn) { @@ -49,13 +52,18 @@ func (p *PubSubNotif) Initialize() { return true } + p.newPeersMx.Lock() + defer p.newPeersMx.Unlock() + for _, pid := range p.host.Network().Peers() { if isTransient(pid) { continue } - select { - case p.newPeers <- pid: - case <-p.ctx.Done(): - } + p.newPeersPend[pid] = struct{}{} + } + + select { + case p.newPeers <- struct{}{}: + default: } } diff --git a/pubsub.go b/pubsub.go index 3c6c75b..cf0fb3b 100644 --- a/pubsub.go +++ b/pubsub.go @@ -89,8 +89,10 @@ type PubSub struct { // removeTopic is a topic cancellation channel rmTopic chan *rmTopicReq - // a notification channel for new peer connections - newPeers chan peer.ID + // a notification channel for new peer connections accumulated + newPeers chan struct{} + newPeersMx sync.Mutex + newPeersPend map[peer.ID]struct{} // a notification channel for new outoging peer streams newPeerStream chan network.Stream @@ -231,7 +233,8 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option signKey: nil, signPolicy: StrictSign, incoming: make(chan *RPC, 32), - newPeers: make(chan peer.ID), + newPeers: make(chan struct{}, 1), + newPeersPend: make(map[peer.ID]struct{}), newPeerStream: make(chan network.Stream), newPeerError: make(chan peer.ID), peerDead: make(chan peer.ID), @@ -480,21 +483,8 @@ func (p *PubSub) processLoop(ctx context.Context) { for { select { - case pid := <-p.newPeers: - if _, ok := p.peers[pid]; ok { - log.Debug("already have connection to peer: ", pid) - continue - } - - if p.blacklist.Contains(pid) { - log.Warn("ignoring connection from blacklisted peer: ", pid) - continue - } - - messages := make(chan *RPC, p.peerOutboundQueueSize) - messages <- p.getHelloPacket() - go p.handleNewPeer(ctx, pid, messages) - p.peers[pid] = messages + case <-p.newPeers: + p.handlePendingPeers() case s := <-p.newPeerStream: pid := s.Conn().RemotePeer() @@ -621,6 +611,35 @@ func (p *PubSub) processLoop(ctx context.Context) { } } +func (p *PubSub) handlePendingPeers() { + p.newPeersMx.Lock() + defer p.newPeersMx.Unlock() + + if len(p.newPeersPend) == 0 { + return + } + + newPeers := p.newPeersPend + p.newPeersPend = make(map[peer.ID]struct{}) + + for pid := range newPeers { + if _, ok := p.peers[pid]; ok { + log.Debug("already have connection to peer: ", pid) + continue + } + + if p.blacklist.Contains(pid) { + log.Warn("ignoring connection from blacklisted peer: ", pid) + continue + } + + messages := make(chan *RPC, p.peerOutboundQueueSize) + messages <- p.getHelloPacket() + go p.handleNewPeer(p.ctx, pid, messages) + p.peers[pid] = messages + } +} + // handleAddTopic adds a tracker for a particular topic. // Only called from processLoop. func (p *PubSub) handleAddTopic(req *addTopicReq) {