better worker cleanup
1. Don't hang marking a peer as dead if we're shutting down. 2. No need to "drain" the outgoing channel anymore. This may have been necessary to prevent a deadlock where the main loop blocked on sending on sending a message while we waited to tell the main loop that the peer was dead. However, this is no longer an issue (we never block on sending).
This commit is contained in:
parent
a01778ac78
commit
56ea7752e8
22
comm.go
22
comm.go
|
@ -26,10 +26,6 @@ func (p *PubSub) getHelloPacket() *RPC {
|
|||
}
|
||||
|
||||
func (p *PubSub) handleNewStream(s inet.Stream) {
|
||||
defer func() {
|
||||
p.peerDead <- s.Conn().RemotePeer()
|
||||
}()
|
||||
|
||||
r := ggio.NewDelimitedReader(s, 1<<20)
|
||||
for {
|
||||
rpc := new(RPC)
|
||||
|
@ -43,6 +39,10 @@ func (p *PubSub) handleNewStream(s inet.Stream) {
|
|||
// but it doesn't hurt to send it.
|
||||
s.Close()
|
||||
}
|
||||
select {
|
||||
case p.peerDead <- s.Conn().RemotePeer():
|
||||
case <-p.ctx.Done():
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -58,7 +58,6 @@ func (p *PubSub) handleNewStream(s inet.Stream) {
|
|||
}
|
||||
|
||||
func (p *PubSub) handleSendingMessages(ctx context.Context, s inet.Stream, outgoing <-chan *RPC) {
|
||||
var dead bool
|
||||
bufw := bufio.NewWriter(s)
|
||||
wc := ggio.NewDelimitedWriter(bufw)
|
||||
|
||||
|
@ -78,21 +77,16 @@ func (p *PubSub) handleSendingMessages(ctx context.Context, s inet.Stream, outgo
|
|||
if !ok {
|
||||
return
|
||||
}
|
||||
if dead {
|
||||
// continue in order to drain messages
|
||||
continue
|
||||
}
|
||||
|
||||
err := writeMsg(&rpc.RPC)
|
||||
if err != nil {
|
||||
s.Reset()
|
||||
log.Warningf("writing message to %s: %s", s.Conn().RemotePeer(), err)
|
||||
dead = true
|
||||
go func() {
|
||||
p.peerDead <- s.Conn().RemotePeer()
|
||||
}()
|
||||
select {
|
||||
case p.peerDead <- s.Conn().RemotePeer():
|
||||
case <-ctx.Done():
|
||||
}
|
||||
}
|
||||
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
|
|
|
@ -103,6 +103,14 @@ func NewFloodSub(ctx context.Context, h host.Host) *PubSub {
|
|||
|
||||
// processLoop handles all inputs arriving on the channels
|
||||
func (p *PubSub) processLoop(ctx context.Context) {
|
||||
defer func() {
|
||||
// Clean up go routines.
|
||||
for _, ch := range p.peers {
|
||||
close(ch)
|
||||
}
|
||||
p.peers = nil
|
||||
p.topics = nil
|
||||
}()
|
||||
for {
|
||||
select {
|
||||
case s := <-p.newPeers:
|
||||
|
|
Loading…
Reference in New Issue