Fix redundant start/stop calls (#3817)

* remove redundant proto start/stop calls from node start/stop
* fix WakuRelay start/stop not overriding GossipSub start/stop
* replace startRelay with reconnectRelayPeers
This commit is contained in:
Fabiana Cecin 2026-04-22 09:52:57 -03:00 committed by GitHub
parent 4394843299
commit bb8a7e8782
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 61 additions and 109 deletions

View File

@ -45,7 +45,7 @@ proc newTestWakuRecon*(
let proto = res.get() let proto = res.get()
proto.start() await proto.start()
switch.mount(proto) switch.mount(proto)
return proto return proto
@ -55,7 +55,7 @@ proc newTestWakuTransfer*(
idsTx: AsyncQueue[(SyncID, PubsubTopic, ContentTopic)], idsTx: AsyncQueue[(SyncID, PubsubTopic, ContentTopic)],
wantsRx: AsyncQueue[PeerId], wantsRx: AsyncQueue[PeerId],
needsRx: AsyncQueue[(PeerId, WakuMessageHash)], needsRx: AsyncQueue[(PeerId, WakuMessageHash)],
): SyncTransfer = ): Future[SyncTransfer] {.async.} =
let peerManager = PeerManager.new(switch) let peerManager = PeerManager.new(switch)
let proto = SyncTransfer.new( let proto = SyncTransfer.new(
@ -66,7 +66,7 @@ proc newTestWakuTransfer*(
remoteNeedsRx = needsRx, remoteNeedsRx = needsRx,
) )
proto.start() await proto.start()
switch.mount(proto) switch.mount(proto)
return proto return proto

View File

@ -63,8 +63,8 @@ suite "Waku Sync: reconciliation":
clientPeerInfo = clientSwitch.peerInfo.toRemotePeerInfo() clientPeerInfo = clientSwitch.peerInfo.toRemotePeerInfo()
asyncTeardown: asyncTeardown:
server.stop() await server.stop()
client.stop() await client.stop()
await allFutures(serverSwitch.stop(), clientSwitch.stop()) await allFutures(serverSwitch.stop(), clientSwitch.stop())
@ -561,8 +561,8 @@ suite "Waku Sync: reconciliation":
) )
defer: defer:
server.stop() await server.stop()
client.stop() await client.stop()
let res = await client.storeSynchronization(some(serverPeerInfo)) let res = await client.storeSynchronization(some(serverPeerInfo))
assert res.isOk(), $res.error assert res.isOk(), $res.error
@ -610,8 +610,8 @@ suite "Waku Sync: reconciliation":
) )
defer: defer:
server.stop() await server.stop()
client.stop() await client.stop()
let res = await client.storeSynchronization(some(serverPeerInfo)) let res = await client.storeSynchronization(some(serverPeerInfo))
assert res.isOk(), $res.error assert res.isOk(), $res.error
@ -657,8 +657,8 @@ suite "Waku Sync: reconciliation":
) )
defer: defer:
server.stop() await server.stop()
client.stop() await client.stop()
let res = await client.storeSynchronization(some(serverPeerInfo)) let res = await client.storeSynchronization(some(serverPeerInfo))
assert res.isOk(), $res.error assert res.isOk(), $res.error
@ -701,8 +701,8 @@ suite "Waku Sync: reconciliation":
) )
defer: defer:
server.stop() await server.stop()
client.stop() await client.stop()
let res = await client.storeSynchronization(some(serverPeerInfo)) let res = await client.storeSynchronization(some(serverPeerInfo))
assert res.isOk(), $res.error assert res.isOk(), $res.error
@ -736,8 +736,8 @@ suite "Waku Sync: reconciliation":
) )
defer: defer:
server.stop() await server.stop()
client.stop() await client.stop()
let res = await client.storeSynchronization(some(serverPeerInfo)) let res = await client.storeSynchronization(some(serverPeerInfo))
assert res.isOk(), $res.error assert res.isOk(), $res.error
@ -773,8 +773,8 @@ suite "Waku Sync: reconciliation":
) )
defer: defer:
server.stop() await server.stop()
client.stop() await client.stop()
let res = await client.storeSynchronization(some(serverPeerInfo)) let res = await client.storeSynchronization(some(serverPeerInfo))
assert res.isOk(), $res.error assert res.isOk(), $res.error
@ -848,8 +848,8 @@ suite "Waku Sync: transfer":
remoteNeedsRx = clientRemoteNeeds, remoteNeedsRx = clientRemoteNeeds,
) )
server.start() await server.start()
client.start() await client.start()
serverSwitch.mount(server) serverSwitch.mount(server)
clientSwitch.mount(client) clientSwitch.mount(client)
@ -861,8 +861,8 @@ suite "Waku Sync: transfer":
clientPeermanager.addPeer(serverPeerInfo) clientPeermanager.addPeer(serverPeerInfo)
asyncTeardown: asyncTeardown:
server.stop() await server.stop()
client.stop() await client.stop()
await allFutures(serverSwitch.stop(), clientSwitch.stop()) await allFutures(serverSwitch.stop(), clientSwitch.stop())

View File

@ -263,7 +263,8 @@ proc mountRelay*(
node.wakuRelay.routingRecordsHandler.add(peerExchangeHandler.get()) node.wakuRelay.routingRecordsHandler.add(peerExchangeHandler.get())
if node.started: if node.started:
await node.startRelay() await node.wakuRelay.start()
await node.reconnectRelayPeers()
node.switch.mount(node.wakuRelay, protocolMatcher(WakuRelayCodec)) node.switch.mount(node.wakuRelay, protocolMatcher(WakuRelayCodec))

View File

@ -369,30 +369,16 @@ proc mountStoreSync*(
return ok() return ok()
proc startRelay*(node: WakuNode) {.async.} = proc reconnectRelayPeers*(node: WakuNode) {.async.} =
## Setup and start relay protocol ## Reconnect to previously-seen WakuRelay peers.
info "starting relay protocol"
if node.wakuRelay.isNil(): if node.wakuRelay.isNil():
error "Failed to start relay. Not mounted."
return return
if not node.peerManager.switch.peerStore.hasPeers(protocolMatcher(WakuRelayCodec)):
## Setup relay protocol return
info "Found previous WakuRelay peers. Reconnecting."
# Resume previous relay connections let backoffPeriod =
if node.peerManager.switch.peerStore.hasPeers(protocolMatcher(WakuRelayCodec)): node.wakuRelay.parameters.pruneBackoff + chronos.seconds(BackoffSlackTime)
info "Found previous WakuRelay peers. Reconnecting." await node.peerManager.reconnectPeers(WakuRelayCodec, backoffPeriod)
# Reconnect to previous relay peers. This will respect a backoff period, if necessary
let backoffPeriod =
node.wakuRelay.parameters.pruneBackoff + chronos.seconds(BackoffSlackTime)
await node.peerManager.reconnectPeers(WakuRelayCodec, backoffPeriod)
# Start the WakuRelay protocol
await node.wakuRelay.start()
info "relay started successfully"
proc selectRandomPeers*(peers: seq[PeerId], numRandomPeers: int): seq[PeerId] = proc selectRandomPeers*(peers: seq[PeerId], numRandomPeers: int): seq[PeerId] =
var randomPeers = peers var randomPeers = peers
@ -430,7 +416,10 @@ proc mountRendezvous*(
return return
if node.started: if node.started:
await node.wakuRendezvous.start() try:
await node.wakuRendezvous.start()
except CancelledError as exc:
error "failed to start wakuRendezvous", error = exc.msg
try: try:
node.switch.mount(node.wakuRendezvous, protocolMatcher(WakuRendezVousCodec)) node.switch.mount(node.wakuRendezvous, protocolMatcher(WakuRendezVousCodec))
@ -578,31 +567,12 @@ proc start*(node: WakuNode) {.async.} =
if isBindIpWithZeroPort(address): if isBindIpWithZeroPort(address):
zeroPortPresent = true zeroPortPresent = true
# Perform relay-specific startup tasks TODO: this should be rethought
if not node.wakuRelay.isNil():
await node.startRelay()
if not node.wakuMix.isNil():
node.wakuMix.start()
if not node.wakuMetadata.isNil():
node.wakuMetadata.start()
if not node.wakuStoreResume.isNil(): if not node.wakuStoreResume.isNil():
await node.wakuStoreResume.start() await node.wakuStoreResume.start()
if not node.wakuRendezvous.isNil():
await node.wakuRendezvous.start()
if not node.wakuRendezvousClient.isNil(): if not node.wakuRendezvousClient.isNil():
await node.wakuRendezvousClient.start() await node.wakuRendezvousClient.start()
if not node.wakuStoreReconciliation.isNil():
node.wakuStoreReconciliation.start()
if not node.wakuStoreTransfer.isNil():
node.wakuStoreTransfer.start()
## The switch uses this mapper to update peer info addrs ## The switch uses this mapper to update peer info addrs
## with announced addrs after start ## with announced addrs after start
let addressMapper = proc( let addressMapper = proc(
@ -612,8 +582,12 @@ proc start*(node: WakuNode) {.async.} =
node.switch.peerInfo.addressMappers.add(addressMapper) node.switch.peerInfo.addressMappers.add(addressMapper)
## The switch will update addresses after start using the addressMapper ## The switch will update addresses after start using the addressMapper
## NOTE: This will dispatch gossipsub start to the WakuRelay.start method override
await node.switch.start() await node.switch.start()
# After switch.start, run custom Logos Delivery relay start logic
await node.reconnectRelayPeers()
node.started = true node.started = true
if not node.wakuFilterClient.isNil(): if not node.wakuFilterClient.isNil():
@ -637,6 +611,7 @@ proc stop*(node: WakuNode) {.async.} =
node.stopProvidersAndListeners() node.stopProvidersAndListeners()
## NOTE: This will dispatch gossipsub stop to the WakuRelay.stop method override
await node.switch.stop() await node.switch.stop()
node.peerManager.stop() node.peerManager.stop()
@ -653,12 +628,6 @@ proc stop*(node: WakuNode) {.async.} =
if not node.wakuStoreResume.isNil(): if not node.wakuStoreResume.isNil():
await node.wakuStoreResume.stopWait() await node.wakuStoreResume.stopWait()
if not node.wakuStoreReconciliation.isNil():
node.wakuStoreReconciliation.stop()
if not node.wakuStoreTransfer.isNil():
node.wakuStoreTransfer.stop()
if not node.wakuPeerExchangeClient.isNil() and if not node.wakuPeerExchangeClient.isNil() and
not node.wakuPeerExchangeClient.pxLoopHandle.isNil(): not node.wakuPeerExchangeClient.pxLoopHandle.isNil():
await node.wakuPeerExchangeClient.pxLoopHandle.cancelAndWait() await node.wakuPeerExchangeClient.pxLoopHandle.cancelAndWait()
@ -666,9 +635,6 @@ proc stop*(node: WakuNode) {.async.} =
if not node.wakuKademlia.isNil(): if not node.wakuKademlia.isNil():
await node.wakuKademlia.stop() await node.wakuKademlia.stop()
if not node.wakuRendezvous.isNil():
await node.wakuRendezvous.stopWait()
if not node.wakuRendezvousClient.isNil(): if not node.wakuRendezvousClient.isNil():
await node.wakuRendezvousClient.stopWait() await node.wakuRendezvousClient.stopWait()

View File

@ -108,9 +108,3 @@ proc new*(T: type WakuMetadata, clusterId: uint32, getShards: GetShards): T =
clusterId = wm.clusterId, shards = wm.getShards() clusterId = wm.clusterId, shards = wm.getShards()
return wm return wm
proc start*(wm: WakuMetadata) =
wm.started = true
proc stop*(wm: WakuMetadata) =
wm.started = false

View File

@ -104,10 +104,4 @@ proc new*(
proc poolSize*(mix: WakuMix): int = proc poolSize*(mix: WakuMix): int =
mix.nodePool.len mix.nodePool.len
method start*(mix: WakuMix) =
info "starting waku mix protocol"
method stop*(mix: WakuMix) {.async.} =
discard
# Mix Protocol # Mix Protocol

View File

@ -517,12 +517,12 @@ proc topicsHealthLoop(w: WakuRelay) {.async.} =
# safety cooldown to protect from edge cases # safety cooldown to protect from edge cases
await sleepAsync(100.milliseconds) await sleepAsync(100.milliseconds)
method start*(w: WakuRelay) {.async, base.} = method start*(w: WakuRelay) {.async: (raises: [CancelledError]).} =
info "start" info "start"
await procCall GossipSub(w).start() await procCall GossipSub(w).start()
w.topicHealthLoopHandle = w.topicsHealthLoop() w.topicHealthLoopHandle = w.topicsHealthLoop()
method stop*(w: WakuRelay) {.async, base.} = method stop*(w: WakuRelay) {.async: (raises: []).} =
info "stop" info "stop"
await procCall GossipSub(w).stop() await procCall GossipSub(w).stop()

View File

@ -211,29 +211,22 @@ proc new*(
return ok(wrv) return ok(wrv)
proc start*(self: WakuRendezVous) {.async: (raises: []).} = method start*(self: WakuRendezVous) {.async: (raises: [CancelledError]).} =
# Start the parent GenericRendezVous (starts the register deletion loop) # Start the parent GenericRendezVous (starts the register deletion loop)
if self.started: if self.started:
warn "waku rendezvous already started" warn "waku rendezvous already started"
return return
try: await procCall GenericRendezVous[WakuPeerRecord](self).start()
await procCall GenericRendezVous[WakuPeerRecord](self).start()
except CancelledError as exc:
error "failed to start GenericRendezVous", cause = exc.msg
return
# start registering forever # start registering forever
self.periodicRegistrationFut = self.periodicRegistration() self.periodicRegistrationFut = self.periodicRegistration()
info "waku rendezvous discovery started" info "waku rendezvous discovery started"
proc stopWait*(self: WakuRendezVous) {.async: (raises: []).} = method stop*(self: WakuRendezVous) {.async: (raises: []).} =
if not self.periodicRegistrationFut.isNil(): if not self.periodicRegistrationFut.isNil():
await self.periodicRegistrationFut.cancelAndWait() await self.periodicRegistrationFut.cancelAndWait()
# Stop the parent GenericRendezVous (stops the register deletion loop) # Stop the parent GenericRendezVous (stops the register deletion loop)
await GenericRendezVous[WakuPeerRecord](self).stop() await procCall GenericRendezVous[WakuPeerRecord](self).stop()
# Stop the parent GenericRendezVous (stops the register deletion loop)
await GenericRendezVous[WakuPeerRecord](self).stop()
info "waku rendezvous discovery stopped" info "waku rendezvous discovery stopped"

View File

@ -468,7 +468,7 @@ proc idsReceiverLoop(self: SyncReconciliation) {.async.} =
self.messageIngress(id, pubsub, content) self.messageIngress(id, pubsub, content)
proc start*(self: SyncReconciliation) = method start*(self: SyncReconciliation) {.async: (raises: [CancelledError]).} =
if self.started: if self.started:
return return
@ -484,13 +484,16 @@ proc start*(self: SyncReconciliation) =
info "Store Sync Reconciliation protocol started" info "Store Sync Reconciliation protocol started"
proc stop*(self: SyncReconciliation) = method stop*(self: SyncReconciliation) {.async: (raises: []).} =
if self.syncInterval > ZeroDuration: defer:
self.periodicSyncFut.cancelSoon() self.started = false
if self.syncInterval > ZeroDuration: if self.syncInterval > ZeroDuration:
self.periodicPruneFut.cancelSoon() await self.periodicSyncFut.cancelAndWait()
self.idsReceiverFut.cancelSoon() if self.syncInterval > ZeroDuration:
await self.periodicPruneFut.cancelAndWait()
await self.idsReceiverFut.cancelAndWait()
info "Store Sync Reconciliation protocol stopped" info "Store Sync Reconciliation protocol stopped"

View File

@ -217,7 +217,7 @@ proc new*(
return transfer return transfer
proc start*(self: SyncTransfer) = method start*(self: SyncTransfer) {.async: (raises: [CancelledError]).} =
if self.started: if self.started:
return return
@ -228,10 +228,11 @@ proc start*(self: SyncTransfer) =
info "Store Sync Transfer protocol started" info "Store Sync Transfer protocol started"
proc stop*(self: SyncTransfer) = method stop*(self: SyncTransfer) {.async: (raises: []).} =
self.started = false defer:
self.started = false
self.localWantsRxFut.cancelSoon() await self.localWantsRxFut.cancelAndWait()
self.remoteNeedsRxFut.cancelSoon() await self.remoteNeedsRxFut.cancelAndWait()
info "Store Sync Transfer protocol stopped" info "Store Sync Transfer protocol stopped"