From cec7c5250514c3bbb33f6c2d1514d6041c9b9f17 Mon Sep 17 00:00:00 2001 From: Andrea Maria Piana Date: Tue, 26 Jan 2021 16:15:26 +0100 Subject: [PATCH] Don't leak goroutines on decorator When an error occours (or the peer disconnect), we return from decorator as an error is published to the chan. There are though still 5 or 6 goroutines that want to write on that channel and at least 3/4 of them will be hanging, leaving them stuck publishing on the chan. This though is probably not the cause of https://github.com/status-im/infra-eth-cluster/issues/39 fills up the stack trace with hung go routines. --- waku/common/rate_limiter.go | 69 +++++++++++++++++++++++++++++-------- whisper/rate_limiter.go | 63 ++++++++++++++++++++++++++------- 2 files changed, 105 insertions(+), 27 deletions(-) diff --git a/waku/common/rate_limiter.go b/waku/common/rate_limiter.go index a293690dc..021509363 100644 --- a/waku/common/rate_limiter.go +++ b/waku/common/rate_limiter.go @@ -161,7 +161,11 @@ func (r *PeerRateLimiter) Decorate(p RateLimiterPeer, rw p2p.MsgReadWriter, runL in, out := p2p.MsgPipe() defer func() { if err := in.Close(); err != nil { - errC <- err + // Don't block as otherwise we might leak go routines + select { + case errC <- err: + default: + } } }() defer func() { @@ -175,8 +179,13 @@ func (r *PeerRateLimiter) Decorate(p RateLimiterPeer, rw p2p.MsgReadWriter, runL for { packet, err := rw.ReadMsg() if err != nil { - errC <- fmt.Errorf("failed to read packet: %v", err) - return + // Don't block as otherwise we might leak go routines + select { + case errC <- fmt.Errorf("failed to read packet: %v", err): + return + default: + return + } } RateLimitsProcessed.Inc() @@ -190,8 +199,14 @@ func (r *PeerRateLimiter) Decorate(p RateLimiterPeer, rw p2p.MsgReadWriter, runL if halted := r.throttleIP(ip, packet.Size); halted { for _, h := range r.handlers { if err := h.ExceedIPLimit(); err != nil { - errC <- fmt.Errorf("exceed rate limit by IP: %v", err) - return + // Don't block as otherwise we might leak go routines + select { + + case errC <- fmt.Errorf("exceed rate limit by IP: %v", err): + return + default: + return + } } } } @@ -203,15 +218,25 @@ func (r *PeerRateLimiter) Decorate(p RateLimiterPeer, rw p2p.MsgReadWriter, runL if halted := r.throttlePeer(peerID, packet.Size); halted { for _, h := range r.handlers { if err := h.ExceedPeerLimit(); err != nil { - errC <- fmt.Errorf("exceeded rate limit by peer: %v", err) - return + // Don't block as otherwise we might leak go routines + select { + case errC <- fmt.Errorf("exceeded rate limit by peer: %v", err): + return + default: + return + } } } } if err := in.WriteMsg(packet); err != nil { - errC <- fmt.Errorf("failed to write packet to pipe: %v", err) - return + // Don't block as otherwise we might leak go routines + select { + case errC <- fmt.Errorf("failed to write packet to pipe: %v", err): + return + default: + return + } } } }() @@ -221,18 +246,34 @@ func (r *PeerRateLimiter) Decorate(p RateLimiterPeer, rw p2p.MsgReadWriter, runL for { packet, err := in.ReadMsg() if err != nil { - errC <- fmt.Errorf("failed to read packet from pipe: %v", err) - return + // Don't block as otherwise we might leak go routines + select { + case errC <- fmt.Errorf("failed to read packet from pipe: %v", err): + return + default: + return + } } if err := rw.WriteMsg(packet); err != nil { - errC <- fmt.Errorf("failed to write packet: %v", err) - return + // Don't block as otherwise we might leak go routines + select { + case errC <- fmt.Errorf("failed to write packet: %v", err): + return + default: + return + } } } }() go func() { - errC <- runLoop(out) + // Don't block as otherwise we might leak go routines + select { + case errC <- runLoop(out): + return + default: + return + } }() return <-errC diff --git a/whisper/rate_limiter.go b/whisper/rate_limiter.go index c0fce02c1..fc2502649 100644 --- a/whisper/rate_limiter.go +++ b/whisper/rate_limiter.go @@ -124,8 +124,13 @@ func (r *PeerRateLimiter) decorate(p *Peer, rw p2p.MsgReadWriter, runLoop runLoo for { packet, err := rw.ReadMsg() if err != nil { - errC <- fmt.Errorf("failed to read packet: %v", err) - return + // We don't block as that might leak goroutines + select { + case errC <- fmt.Errorf("failed to read packet: %v", err): + return + default: + return + } } rateLimitsProcessed.Inc() @@ -137,8 +142,13 @@ func (r *PeerRateLimiter) decorate(p *Peer, rw p2p.MsgReadWriter, runLoop runLoo if halted := r.throttleIP(ip); halted { for _, h := range r.handlers { if err := h.ExceedIPLimit(); err != nil { - errC <- fmt.Errorf("exceed rate limit by IP: %v", err) - return + // We don't block as that might leak goroutines + select { + case errC <- fmt.Errorf("exceed rate limit by IP: %v", err): + return + default: + return + } } } } @@ -150,15 +160,25 @@ func (r *PeerRateLimiter) decorate(p *Peer, rw p2p.MsgReadWriter, runLoop runLoo if halted := r.throttlePeer(peerID); halted { for _, h := range r.handlers { if err := h.ExceedPeerLimit(); err != nil { - errC <- fmt.Errorf("exceeded rate limit by peer: %v", err) - return + // We don't block as that might leak goroutines + select { + case errC <- fmt.Errorf("exceeded rate limit by peer: %v", err): + return + default: + return + } } } } if err := in.WriteMsg(packet); err != nil { - errC <- fmt.Errorf("failed to write packet to pipe: %v", err) - return + // We don't block as that might leak goroutines + select { + case errC <- fmt.Errorf("failed to write packet to pipe: %v", err): + return + default: + return + } } } }() @@ -168,18 +188,35 @@ func (r *PeerRateLimiter) decorate(p *Peer, rw p2p.MsgReadWriter, runLoop runLoo for { packet, err := in.ReadMsg() if err != nil { - errC <- fmt.Errorf("failed to read packet from pipe: %v", err) - return + // We don't block as that might leak goroutines + select { + case errC <- fmt.Errorf("failed to read packet from pipe: %v", err): + return + default: + return + } } if err := rw.WriteMsg(packet); err != nil { - errC <- fmt.Errorf("failed to write packet: %v", err) - return + // We don't block as that might leak goroutines + select { + case errC <- fmt.Errorf("failed to write packet: %v", err): + return + default: + return + } } } }() go func() { - errC <- runLoop(p, out) + // We don't block as that might leak goroutines + select { + case errC <- runLoop(p, out): + return + default: + return + } + }() return <-errC