From 56ea7752e815ec26f4fde72c1f371dbb6eaa4197 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Sat, 14 Oct 2017 11:13:23 -0700 Subject: [PATCH] better worker cleanup 1. Don't hang marking a peer as dead if we're shutting down. 2. No need to "drain" the outgoing channel anymore. This may have been necessary to prevent a deadlock where the main loop blocked on sending on sending a message while we waited to tell the main loop that the peer was dead. However, this is no longer an issue (we never block on sending). --- comm.go | 22 ++++++++-------------- floodsub.go | 8 ++++++++ 2 files changed, 16 insertions(+), 14 deletions(-) diff --git a/comm.go b/comm.go index 7b68fd5..adda4d4 100644 --- a/comm.go +++ b/comm.go @@ -26,10 +26,6 @@ func (p *PubSub) getHelloPacket() *RPC { } func (p *PubSub) handleNewStream(s inet.Stream) { - defer func() { - p.peerDead <- s.Conn().RemotePeer() - }() - r := ggio.NewDelimitedReader(s, 1<<20) for { rpc := new(RPC) @@ -43,6 +39,10 @@ func (p *PubSub) handleNewStream(s inet.Stream) { // but it doesn't hurt to send it. s.Close() } + select { + case p.peerDead <- s.Conn().RemotePeer(): + case <-p.ctx.Done(): + } return } @@ -58,7 +58,6 @@ func (p *PubSub) handleNewStream(s inet.Stream) { } func (p *PubSub) handleSendingMessages(ctx context.Context, s inet.Stream, outgoing <-chan *RPC) { - var dead bool bufw := bufio.NewWriter(s) wc := ggio.NewDelimitedWriter(bufw) @@ -78,21 +77,16 @@ func (p *PubSub) handleSendingMessages(ctx context.Context, s inet.Stream, outgo if !ok { return } - if dead { - // continue in order to drain messages - continue - } err := writeMsg(&rpc.RPC) if err != nil { s.Reset() log.Warningf("writing message to %s: %s", s.Conn().RemotePeer(), err) - dead = true - go func() { - p.peerDead <- s.Conn().RemotePeer() - }() + select { + case p.peerDead <- s.Conn().RemotePeer(): + case <-ctx.Done(): + } } - case <-ctx.Done(): return } diff --git a/floodsub.go b/floodsub.go index 466e73b..5c56d1a 100644 --- a/floodsub.go +++ b/floodsub.go @@ -103,6 +103,14 @@ func NewFloodSub(ctx context.Context, h host.Host) *PubSub { // processLoop handles all inputs arriving on the channels func (p *PubSub) processLoop(ctx context.Context) { + defer func() { + // Clean up go routines. + for _, ch := range p.peers { + close(ch) + } + p.peers = nil + p.topics = nil + }() for { select { case s := <-p.newPeers: