From b289ec5017973e72716ea44920fdb14f2e718632 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 13 Jul 2021 20:24:29 +0300 Subject: [PATCH] batch process dead peer notifications --- comm.go | 35 +++++++++++++++--------- pubsub.go | 80 ++++++++++++++++++++++++++++++++++--------------------- 2 files changed, 72 insertions(+), 43 deletions(-) diff --git a/comm.go b/comm.go index 29f3cea..d45ce80 100644 --- a/comm.go +++ b/comm.go @@ -88,21 +88,31 @@ func (p *PubSub) handleNewStream(s network.Stream) { } } +func (p *PubSub) notifyPeerDead(pid peer.ID) { + p.peerDeadPrioLk.RLock() + p.peerDeadMx.Lock() + p.peerDeadPend[pid] = struct{}{} + p.peerDeadMx.Unlock() + p.peerDeadPrioLk.RUnlock() + + select { + case p.peerDead <- struct{}{}: + default: + } +} + func (p *PubSub) handleNewPeer(ctx context.Context, pid peer.ID, outgoing <-chan *RPC) { s, err := p.host.NewStream(p.ctx, pid, p.rt.Protocols()...) if err != nil { log.Debug("opening new stream to peer: ", err, pid) - var ch chan peer.ID if err == ms.ErrNotSupported { - ch = p.newPeerError + select { + case p.newPeerError <- pid: + case <-ctx.Done(): + } } else { - ch = p.peerDead - } - - select { - case ch <- pid: - case <-ctx.Done(): + p.notifyPeerDead(pid) } return } @@ -116,18 +126,17 @@ func (p *PubSub) handleNewPeer(ctx context.Context, pid peer.ID, outgoing <-chan } func (p *PubSub) handlePeerEOF(ctx context.Context, s network.Stream) { + pid := s.Conn().RemotePeer() r := protoio.NewDelimitedReader(s, p.maxMessageSize) rpc := new(RPC) for { err := r.ReadMsg(&rpc.RPC) if err != nil { - select { - case p.peerDead <- s.Conn().RemotePeer(): - case <-ctx.Done(): - } + p.notifyPeerDead(pid) return } - log.Debugf("unexpected message from %s", s.Conn().RemotePeer()) + + log.Debugf("unexpected message from %s", pid) } } diff --git a/pubsub.go b/pubsub.go index bd28084..45a397f 100644 --- a/pubsub.go +++ b/pubsub.go @@ -102,7 +102,10 @@ type PubSub struct { newPeerError chan peer.ID // a notification channel for when our peers die - peerDead chan peer.ID + peerDead chan struct{} + peerDeadPrioLk sync.RWMutex + peerDeadMx sync.Mutex + peerDeadPend map[peer.ID]struct{} // The set of topics we are subscribed to mySubs map[string]map[*Subscription]struct{} @@ -238,7 +241,8 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option newPeersPend: make(map[peer.ID]struct{}), newPeerStream: make(chan network.Stream), newPeerError: make(chan peer.ID), - peerDead: make(chan peer.ID), + peerDead: make(chan struct{}, 1), + peerDeadPend: make(map[peer.ID]struct{}), cancelCh: make(chan *Subscription), getPeers: make(chan *listPeerReq), addSub: make(chan *addSubReq), @@ -509,34 +513,8 @@ func (p *PubSub) processLoop(ctx context.Context) { case pid := <-p.newPeerError: delete(p.peers, pid) - case pid := <-p.peerDead: - ch, ok := p.peers[pid] - if !ok { - continue - } - - close(ch) - - if p.host.Network().Connectedness(pid) == network.Connected { - // still connected, must be a duplicate connection being closed. - // we respawn the writer as we need to ensure there is a stream active - log.Debugf("peer declared dead but still connected; respawning writer: %s", pid) - messages := make(chan *RPC, p.peerOutboundQueueSize) - messages <- p.getHelloPacket() - go p.handleNewPeer(ctx, pid, messages) - p.peers[pid] = messages - continue - } - - delete(p.peers, pid) - for t, tmap := range p.topics { - if _, ok := tmap[pid]; ok { - delete(tmap, pid) - p.notifyLeave(t, pid) - } - } - - p.rt.RemovePeer(pid) + case <-p.peerDead: + p.handleDeadPeers() case treq := <-p.getTopics: var out []string @@ -645,6 +623,48 @@ func (p *PubSub) handlePendingPeers() { } } +func (p *PubSub) handleDeadPeers() { + p.peerDeadPrioLk.Lock() + defer p.peerDeadPrioLk.Unlock() + + if len(p.peerDeadPend) == 0 { + return + } + + deadPeers := p.peerDeadPend + p.peerDeadPend = make(map[peer.ID]struct{}) + + for pid := range deadPeers { + ch, ok := p.peers[pid] + if !ok { + continue + } + + close(ch) + + if p.host.Network().Connectedness(pid) == network.Connected { + // still connected, must be a duplicate connection being closed. + // we respawn the writer as we need to ensure there is a stream active + log.Debugf("peer declared dead but still connected; respawning writer: %s", pid) + messages := make(chan *RPC, p.peerOutboundQueueSize) + messages <- p.getHelloPacket() + go p.handleNewPeer(p.ctx, pid, messages) + p.peers[pid] = messages + continue + } + + delete(p.peers, pid) + for t, tmap := range p.topics { + if _, ok := tmap[pid]; ok { + delete(tmap, pid) + p.notifyLeave(t, pid) + } + } + + p.rt.RemovePeer(pid) + } +} + // handleAddTopic adds a tracker for a particular topic. // Only called from processLoop. func (p *PubSub) handleAddTopic(req *addTopicReq) {