Don't leak goroutines on decorator

When an error occours (or the peer disconnect), we return from decorator
as an error is published to the chan.
There are though still 5 or 6 goroutines that want to write on that
channel and at least 3/4 of them will be hanging, leaving them stuck
publishing on the chan.
This though is probably not the cause of
https://github.com/status-im/infra-eth-cluster/issues/39
fills up the stack trace with hung go routines.
This commit is contained in:
Andrea Maria Piana 2021-01-26 16:15:26 +01:00
parent f73fb9c3f6
commit cec7c52505
2 changed files with 105 additions and 27 deletions

View File

@ -161,7 +161,11 @@ func (r *PeerRateLimiter) Decorate(p RateLimiterPeer, rw p2p.MsgReadWriter, runL
in, out := p2p.MsgPipe()
defer func() {
if err := in.Close(); err != nil {
errC <- err
// Don't block as otherwise we might leak go routines
select {
case errC <- err:
default:
}
}
}()
defer func() {
@ -175,8 +179,13 @@ func (r *PeerRateLimiter) Decorate(p RateLimiterPeer, rw p2p.MsgReadWriter, runL
for {
packet, err := rw.ReadMsg()
if err != nil {
errC <- fmt.Errorf("failed to read packet: %v", err)
// Don't block as otherwise we might leak go routines
select {
case errC <- fmt.Errorf("failed to read packet: %v", err):
return
default:
return
}
}
RateLimitsProcessed.Inc()
@ -190,8 +199,14 @@ func (r *PeerRateLimiter) Decorate(p RateLimiterPeer, rw p2p.MsgReadWriter, runL
if halted := r.throttleIP(ip, packet.Size); halted {
for _, h := range r.handlers {
if err := h.ExceedIPLimit(); err != nil {
errC <- fmt.Errorf("exceed rate limit by IP: %v", err)
// Don't block as otherwise we might leak go routines
select {
case errC <- fmt.Errorf("exceed rate limit by IP: %v", err):
return
default:
return
}
}
}
}
@ -203,15 +218,25 @@ func (r *PeerRateLimiter) Decorate(p RateLimiterPeer, rw p2p.MsgReadWriter, runL
if halted := r.throttlePeer(peerID, packet.Size); halted {
for _, h := range r.handlers {
if err := h.ExceedPeerLimit(); err != nil {
errC <- fmt.Errorf("exceeded rate limit by peer: %v", err)
// Don't block as otherwise we might leak go routines
select {
case errC <- fmt.Errorf("exceeded rate limit by peer: %v", err):
return
default:
return
}
}
}
}
if err := in.WriteMsg(packet); err != nil {
errC <- fmt.Errorf("failed to write packet to pipe: %v", err)
// Don't block as otherwise we might leak go routines
select {
case errC <- fmt.Errorf("failed to write packet to pipe: %v", err):
return
default:
return
}
}
}
}()
@ -221,18 +246,34 @@ func (r *PeerRateLimiter) Decorate(p RateLimiterPeer, rw p2p.MsgReadWriter, runL
for {
packet, err := in.ReadMsg()
if err != nil {
errC <- fmt.Errorf("failed to read packet from pipe: %v", err)
// Don't block as otherwise we might leak go routines
select {
case errC <- fmt.Errorf("failed to read packet from pipe: %v", err):
return
default:
return
}
}
if err := rw.WriteMsg(packet); err != nil {
errC <- fmt.Errorf("failed to write packet: %v", err)
// Don't block as otherwise we might leak go routines
select {
case errC <- fmt.Errorf("failed to write packet: %v", err):
return
default:
return
}
}
}
}()
go func() {
errC <- runLoop(out)
// Don't block as otherwise we might leak go routines
select {
case errC <- runLoop(out):
return
default:
return
}
}()
return <-errC

View File

@ -124,8 +124,13 @@ func (r *PeerRateLimiter) decorate(p *Peer, rw p2p.MsgReadWriter, runLoop runLoo
for {
packet, err := rw.ReadMsg()
if err != nil {
errC <- fmt.Errorf("failed to read packet: %v", err)
// We don't block as that might leak goroutines
select {
case errC <- fmt.Errorf("failed to read packet: %v", err):
return
default:
return
}
}
rateLimitsProcessed.Inc()
@ -137,8 +142,13 @@ func (r *PeerRateLimiter) decorate(p *Peer, rw p2p.MsgReadWriter, runLoop runLoo
if halted := r.throttleIP(ip); halted {
for _, h := range r.handlers {
if err := h.ExceedIPLimit(); err != nil {
errC <- fmt.Errorf("exceed rate limit by IP: %v", err)
// We don't block as that might leak goroutines
select {
case errC <- fmt.Errorf("exceed rate limit by IP: %v", err):
return
default:
return
}
}
}
}
@ -150,15 +160,25 @@ func (r *PeerRateLimiter) decorate(p *Peer, rw p2p.MsgReadWriter, runLoop runLoo
if halted := r.throttlePeer(peerID); halted {
for _, h := range r.handlers {
if err := h.ExceedPeerLimit(); err != nil {
errC <- fmt.Errorf("exceeded rate limit by peer: %v", err)
// We don't block as that might leak goroutines
select {
case errC <- fmt.Errorf("exceeded rate limit by peer: %v", err):
return
default:
return
}
}
}
}
if err := in.WriteMsg(packet); err != nil {
errC <- fmt.Errorf("failed to write packet to pipe: %v", err)
// We don't block as that might leak goroutines
select {
case errC <- fmt.Errorf("failed to write packet to pipe: %v", err):
return
default:
return
}
}
}
}()
@ -168,18 +188,35 @@ func (r *PeerRateLimiter) decorate(p *Peer, rw p2p.MsgReadWriter, runLoop runLoo
for {
packet, err := in.ReadMsg()
if err != nil {
errC <- fmt.Errorf("failed to read packet from pipe: %v", err)
// We don't block as that might leak goroutines
select {
case errC <- fmt.Errorf("failed to read packet from pipe: %v", err):
return
default:
return
}
}
if err := rw.WriteMsg(packet); err != nil {
errC <- fmt.Errorf("failed to write packet: %v", err)
// We don't block as that might leak goroutines
select {
case errC <- fmt.Errorf("failed to write packet: %v", err):
return
default:
return
}
}
}
}()
go func() {
errC <- runLoop(p, out)
// We don't block as that might leak goroutines
select {
case errC <- runLoop(p, out):
return
default:
return
}
}()
return <-errC