mirror of
https://github.com/logos-messaging/logos-delivery-go.git
synced 2026-06-28 09:09:54 +00:00
fix: avoid enrChangeCh panic on Stop() timeout and make stopWithTimeout a method
Address review on #1306: - Only close enrChangeCh once wg.Wait() completes; on timeout leave it open to avoid a send-on-closed-channel panic from still-running node goroutines. - Convert stopWithTimeout to a *WakuNode method using w.log (per review). - Reword the cancel comment to not reference a non-existent processLoop symbol. Signed-off-by: kblinichkin <kirill.blinichkin@gmail.com>
This commit is contained in:
parent
6821697534
commit
7053340b9f
@ -527,11 +527,11 @@ func (w *WakuNode) Stop() {
|
||||
return
|
||||
}
|
||||
|
||||
// Cancel the node context before stopping protocols: once processLoop sees
|
||||
// ctx.Done() it stops draining its channels, so a later send would block.
|
||||
// Cancel the node context before stopping protocols: node goroutines that
|
||||
// observe ctx.Done() stop draining their channels, so a later send would block.
|
||||
w.cancel()
|
||||
|
||||
stopWithTimeout(w.bcaster.Stop, "bcaster", 5*time.Second, w.log)
|
||||
w.stopWithTimeout(w.bcaster.Stop, "bcaster", 5*time.Second)
|
||||
|
||||
defer w.connectionNotif.Close()
|
||||
defer w.addressChangesSub.Close()
|
||||
@ -540,32 +540,35 @@ func (w *WakuNode) Stop() {
|
||||
|
||||
// Each Stop() calls wg.Wait() internally and can block on a goroutine stuck
|
||||
// writing to a dead connection; the timeout keeps Stop() bounded.
|
||||
stopWithTimeout(w.relay.Stop, "relay", 10*time.Second, w.log)
|
||||
stopWithTimeout(w.lightPush.Stop, "lightPush", 5*time.Second, w.log)
|
||||
stopWithTimeout(w.legacyStore.Stop, "legacyStore", 5*time.Second, w.log)
|
||||
stopWithTimeout(w.filterFullNode.Stop, "filterFullNode", 5*time.Second, w.log)
|
||||
stopWithTimeout(w.filterLightNode.Stop, "filterLightNode", 5*time.Second, w.log)
|
||||
w.stopWithTimeout(w.relay.Stop, "relay", 10*time.Second)
|
||||
w.stopWithTimeout(w.lightPush.Stop, "lightPush", 5*time.Second)
|
||||
w.stopWithTimeout(w.legacyStore.Stop, "legacyStore", 5*time.Second)
|
||||
w.stopWithTimeout(w.filterFullNode.Stop, "filterFullNode", 5*time.Second)
|
||||
w.stopWithTimeout(w.filterLightNode.Stop, "filterLightNode", 5*time.Second)
|
||||
|
||||
if w.opts.enableDiscV5 {
|
||||
stopWithTimeout(w.discoveryV5.Stop, "discoveryV5", 5*time.Second, w.log)
|
||||
w.stopWithTimeout(w.discoveryV5.Stop, "discoveryV5", 5*time.Second)
|
||||
}
|
||||
stopWithTimeout(w.peerExchange.Stop, "peerExchange", 5*time.Second, w.log)
|
||||
stopWithTimeout(w.rendezvous.Stop, "rendezvous", 5*time.Second, w.log)
|
||||
stopWithTimeout(w.peerConnector.Stop, "peerConnector", 5*time.Second, w.log)
|
||||
stopWithTimeout(func() { _ = w.stopRlnRelay() }, "rlnRelay", 5*time.Second, w.log)
|
||||
stopWithTimeout(w.timesource.Stop, "timesource", 5*time.Second, w.log)
|
||||
w.stopWithTimeout(w.peerExchange.Stop, "peerExchange", 5*time.Second)
|
||||
w.stopWithTimeout(w.rendezvous.Stop, "rendezvous", 5*time.Second)
|
||||
w.stopWithTimeout(w.peerConnector.Stop, "peerConnector", 5*time.Second)
|
||||
w.stopWithTimeout(func() { _ = w.stopRlnRelay() }, "rlnRelay", 5*time.Second)
|
||||
w.stopWithTimeout(w.timesource.Stop, "timesource", 5*time.Second)
|
||||
|
||||
// Bound the wait: goroutines select on ctx.Done() and should exit promptly.
|
||||
waitDone := make(chan struct{})
|
||||
go func() { w.wg.Wait(); close(waitDone) }()
|
||||
select {
|
||||
case <-waitDone:
|
||||
// All node goroutines have exited, so it is safe to close the channels
|
||||
// they send on.
|
||||
close(w.enrChangeCh)
|
||||
case <-time.After(5 * time.Second):
|
||||
w.log.Error("timed out waiting for node goroutines to stop")
|
||||
// Some goroutines may still be running; leave enrChangeCh open, otherwise
|
||||
// a pending send would panic with "send on closed channel".
|
||||
w.log.Error("timed out waiting for node goroutines to stop; leaving enrChangeCh open")
|
||||
}
|
||||
|
||||
close(w.enrChangeCh)
|
||||
|
||||
// host.Close() can block indefinitely draining dead sockets; bound it too.
|
||||
hostClosed := make(chan struct{})
|
||||
go func() { w.host.Close(); close(hostClosed) }()
|
||||
@ -580,13 +583,13 @@ func (w *WakuNode) Stop() {
|
||||
|
||||
// stopWithTimeout runs fn and returns once it completes or timeout elapses,
|
||||
// logging a warning on timeout so one stuck component cannot hang Stop().
|
||||
func stopWithTimeout(fn func(), name string, timeout time.Duration, log *zap.Logger) {
|
||||
func (w *WakuNode) stopWithTimeout(fn func(), name string, timeout time.Duration) {
|
||||
done := make(chan struct{})
|
||||
go func() { fn(); close(done) }()
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(timeout):
|
||||
log.Warn("timed out stopping component", zap.String("component", name))
|
||||
w.log.Warn("timed out stopping component", zap.String("component", name))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user