fix: bound WakuNode.Stop() with timeouts to avoid hang on dead connections (#1306)

Signed-off-by: kblinichkin <kirill.blinichkin@gmail.com>
This commit is contained in:
Kirill Blinichkin 2026-06-18 08:55:21 -05:00 committed by GitHub
parent 81291ef862
commit f3fc70002f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@@ -527,42 +527,72 @@ func (w *WakuNode) Stop() {
return
}
w.bcaster.Stop()
// Cancel the node context before stopping protocols: node goroutines that
// observe ctx.Done() stop draining their channels, so a later send would block.
w.cancel()
w.stopWithTimeout(w.bcaster.Stop, "bcaster", 5*time.Second)
defer w.connectionNotif.Close()
defer w.addressChangesSub.Close()
w.host.Network().StopNotify(w.connectionNotif)
w.relay.Stop()
w.lightPush.Stop()
w.legacyStore.Stop()
w.filterFullNode.Stop()
w.filterLightNode.Stop()
// Each Stop() calls wg.Wait() internally and can block on a goroutine stuck
// writing to a dead connection; the timeout keeps Stop() bounded.
w.stopWithTimeout(w.relay.Stop, "relay", 10*time.Second)
w.stopWithTimeout(w.lightPush.Stop, "lightPush", 5*time.Second)
w.stopWithTimeout(w.legacyStore.Stop, "legacyStore", 5*time.Second)
w.stopWithTimeout(w.filterFullNode.Stop, "filterFullNode", 5*time.Second)
w.stopWithTimeout(w.filterLightNode.Stop, "filterLightNode", 5*time.Second)
if w.opts.enableDiscV5 {
w.discoveryV5.Stop()
w.stopWithTimeout(w.discoveryV5.Stop, "discoveryV5", 5*time.Second)
}
w.peerExchange.Stop()
w.rendezvous.Stop()
w.stopWithTimeout(w.peerExchange.Stop, "peerExchange", 5*time.Second)
w.stopWithTimeout(w.rendezvous.Stop, "rendezvous", 5*time.Second)
w.stopWithTimeout(w.peerConnector.Stop, "peerConnector", 5*time.Second)
w.stopWithTimeout(func() { _ = w.stopRlnRelay() }, "rlnRelay", 5*time.Second)
w.stopWithTimeout(w.timesource.Stop, "timesource", 5*time.Second)
w.peerConnector.Stop()
// Bound the wait: goroutines select on ctx.Done() and should exit promptly.
waitDone := make(chan struct{})
go func() { w.wg.Wait(); close(waitDone) }()
select {
case <-waitDone:
// All node goroutines have exited, so it is safe to close the channels
// they send on.
close(w.enrChangeCh)
case <-time.After(5 * time.Second):
// Some goroutines may still be running; leave enrChangeCh open, otherwise
// a pending send would panic with "send on closed channel".
w.log.Error("timed out waiting for node goroutines to stop; leaving enrChangeCh open")
}
_ = w.stopRlnRelay()
w.timesource.Stop()
w.host.Close()
w.cancel()
w.wg.Wait()
close(w.enrChangeCh)
// host.Close() can block indefinitely draining dead sockets; bound it too.
hostClosed := make(chan struct{})
go func() { w.host.Close(); close(hostClosed) }()
select {
case <-hostClosed:
case <-time.After(5 * time.Second):
w.log.Warn("timed out waiting for host.Close(); proceeding")
}
w.cancel = nil
}
// stopWithTimeout runs fn in its own goroutine and returns as soon as fn
// completes or timeout elapses, whichever happens first, so one stuck
// component cannot hang Stop().
//
// fn is the stop routine to run, name identifies the component in the warning
// that is logged on timeout, and timeout is the longest this call waits.
//
// On timeout fn is not interrupted: the goroutine running it stays alive until
// fn returns on its own.
func (w *WakuNode) stopWithTimeout(fn func(), name string, timeout time.Duration) {
	done := make(chan struct{})
	go func() { fn(); close(done) }()

	// Use an explicit timer instead of time.After so it can be released as
	// soon as fn finishes, rather than lingering until timeout expires.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-done:
	case <-timer.C:
		w.log.Warn("timed out stopping component", zap.String("component", name))
	}
}
// Host returns the libp2p Host used by the WakuNode
func (w *WakuNode) Host() host.Host {
return w.host