mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-05-12 05:19:33 +00:00
Merge remote-tracking branch 'origin/master' into feat/zero-default-ports
This commit is contained in:
commit
a4d85b98dd
1
Makefile
1
Makefile
@ -74,6 +74,7 @@ $(NIMBLEDEPS_STAMP): nimble.lock | waku.nims
|
||||
nimble setup --localdeps
|
||||
$(MAKE) build-nph
|
||||
$(MAKE) rebuild-bearssl-nimbledeps
|
||||
$(MAKE) rebuild-nat-libs-nimbledeps
|
||||
touch $@
|
||||
|
||||
update:
|
||||
|
||||
@ -45,7 +45,7 @@ proc newTestWakuRecon*(
|
||||
|
||||
let proto = res.get()
|
||||
|
||||
proto.start()
|
||||
await proto.start()
|
||||
switch.mount(proto)
|
||||
|
||||
return proto
|
||||
@ -55,7 +55,7 @@ proc newTestWakuTransfer*(
|
||||
idsTx: AsyncQueue[(SyncID, PubsubTopic, ContentTopic)],
|
||||
wantsRx: AsyncQueue[PeerId],
|
||||
needsRx: AsyncQueue[(PeerId, WakuMessageHash)],
|
||||
): SyncTransfer =
|
||||
): Future[SyncTransfer] {.async.} =
|
||||
let peerManager = PeerManager.new(switch)
|
||||
|
||||
let proto = SyncTransfer.new(
|
||||
@ -66,7 +66,7 @@ proc newTestWakuTransfer*(
|
||||
remoteNeedsRx = needsRx,
|
||||
)
|
||||
|
||||
proto.start()
|
||||
await proto.start()
|
||||
switch.mount(proto)
|
||||
|
||||
return proto
|
||||
|
||||
@ -63,8 +63,8 @@ suite "Waku Sync: reconciliation":
|
||||
clientPeerInfo = clientSwitch.peerInfo.toRemotePeerInfo()
|
||||
|
||||
asyncTeardown:
|
||||
server.stop()
|
||||
client.stop()
|
||||
await server.stop()
|
||||
await client.stop()
|
||||
|
||||
await allFutures(serverSwitch.stop(), clientSwitch.stop())
|
||||
|
||||
@ -561,8 +561,8 @@ suite "Waku Sync: reconciliation":
|
||||
)
|
||||
|
||||
defer:
|
||||
server.stop()
|
||||
client.stop()
|
||||
await server.stop()
|
||||
await client.stop()
|
||||
|
||||
let res = await client.storeSynchronization(some(serverPeerInfo))
|
||||
assert res.isOk(), $res.error
|
||||
@ -610,8 +610,8 @@ suite "Waku Sync: reconciliation":
|
||||
)
|
||||
|
||||
defer:
|
||||
server.stop()
|
||||
client.stop()
|
||||
await server.stop()
|
||||
await client.stop()
|
||||
|
||||
let res = await client.storeSynchronization(some(serverPeerInfo))
|
||||
assert res.isOk(), $res.error
|
||||
@ -657,8 +657,8 @@ suite "Waku Sync: reconciliation":
|
||||
)
|
||||
|
||||
defer:
|
||||
server.stop()
|
||||
client.stop()
|
||||
await server.stop()
|
||||
await client.stop()
|
||||
|
||||
let res = await client.storeSynchronization(some(serverPeerInfo))
|
||||
assert res.isOk(), $res.error
|
||||
@ -701,8 +701,8 @@ suite "Waku Sync: reconciliation":
|
||||
)
|
||||
|
||||
defer:
|
||||
server.stop()
|
||||
client.stop()
|
||||
await server.stop()
|
||||
await client.stop()
|
||||
|
||||
let res = await client.storeSynchronization(some(serverPeerInfo))
|
||||
assert res.isOk(), $res.error
|
||||
@ -736,8 +736,8 @@ suite "Waku Sync: reconciliation":
|
||||
)
|
||||
|
||||
defer:
|
||||
server.stop()
|
||||
client.stop()
|
||||
await server.stop()
|
||||
await client.stop()
|
||||
|
||||
let res = await client.storeSynchronization(some(serverPeerInfo))
|
||||
assert res.isOk(), $res.error
|
||||
@ -773,8 +773,8 @@ suite "Waku Sync: reconciliation":
|
||||
)
|
||||
|
||||
defer:
|
||||
server.stop()
|
||||
client.stop()
|
||||
await server.stop()
|
||||
await client.stop()
|
||||
|
||||
let res = await client.storeSynchronization(some(serverPeerInfo))
|
||||
assert res.isOk(), $res.error
|
||||
@ -848,8 +848,8 @@ suite "Waku Sync: transfer":
|
||||
remoteNeedsRx = clientRemoteNeeds,
|
||||
)
|
||||
|
||||
server.start()
|
||||
client.start()
|
||||
await server.start()
|
||||
await client.start()
|
||||
|
||||
serverSwitch.mount(server)
|
||||
clientSwitch.mount(client)
|
||||
@ -861,8 +861,8 @@ suite "Waku Sync: transfer":
|
||||
clientPeermanager.addPeer(serverPeerInfo)
|
||||
|
||||
asyncTeardown:
|
||||
server.stop()
|
||||
client.stop()
|
||||
await server.stop()
|
||||
await client.stop()
|
||||
|
||||
await allFutures(serverSwitch.stop(), clientSwitch.stop())
|
||||
|
||||
|
||||
@ -263,7 +263,8 @@ proc mountRelay*(
|
||||
node.wakuRelay.routingRecordsHandler.add(peerExchangeHandler.get())
|
||||
|
||||
if node.started:
|
||||
await node.startRelay()
|
||||
await node.wakuRelay.start()
|
||||
await node.reconnectRelayPeers()
|
||||
|
||||
node.switch.mount(node.wakuRelay, protocolMatcher(WakuRelayCodec))
|
||||
|
||||
|
||||
@ -380,30 +380,16 @@ proc mountStoreSync*(
|
||||
|
||||
return ok()
|
||||
|
||||
proc startRelay*(node: WakuNode) {.async.} =
|
||||
## Setup and start relay protocol
|
||||
info "starting relay protocol"
|
||||
|
||||
proc reconnectRelayPeers*(node: WakuNode) {.async.} =
|
||||
## Reconnect to previously-seen WakuRelay peers.
|
||||
if node.wakuRelay.isNil():
|
||||
error "Failed to start relay. Not mounted."
|
||||
return
|
||||
|
||||
## Setup relay protocol
|
||||
|
||||
# Resume previous relay connections
|
||||
if node.peerManager.switch.peerStore.hasPeers(protocolMatcher(WakuRelayCodec)):
|
||||
info "Found previous WakuRelay peers. Reconnecting."
|
||||
|
||||
# Reconnect to previous relay peers. This will respect a backoff period, if necessary
|
||||
let backoffPeriod =
|
||||
node.wakuRelay.parameters.pruneBackoff + chronos.seconds(BackoffSlackTime)
|
||||
|
||||
await node.peerManager.reconnectPeers(WakuRelayCodec, backoffPeriod)
|
||||
|
||||
# Start the WakuRelay protocol
|
||||
await node.wakuRelay.start()
|
||||
|
||||
info "relay started successfully"
|
||||
if not node.peerManager.switch.peerStore.hasPeers(protocolMatcher(WakuRelayCodec)):
|
||||
return
|
||||
info "Found previous WakuRelay peers. Reconnecting."
|
||||
let backoffPeriod =
|
||||
node.wakuRelay.parameters.pruneBackoff + chronos.seconds(BackoffSlackTime)
|
||||
await node.peerManager.reconnectPeers(WakuRelayCodec, backoffPeriod)
|
||||
|
||||
proc selectRandomPeers*(peers: seq[PeerId], numRandomPeers: int): seq[PeerId] =
|
||||
var randomPeers = peers
|
||||
@ -441,7 +427,10 @@ proc mountRendezvous*(
|
||||
return
|
||||
|
||||
if node.started:
|
||||
await node.wakuRendezvous.start()
|
||||
try:
|
||||
await node.wakuRendezvous.start()
|
||||
except CancelledError as exc:
|
||||
error "failed to start wakuRendezvous", error = exc.msg
|
||||
|
||||
try:
|
||||
node.switch.mount(node.wakuRendezvous, protocolMatcher(WakuRendezVousCodec))
|
||||
@ -589,31 +578,12 @@ proc start*(node: WakuNode) {.async.} =
|
||||
if isBindIpWithZeroPort(address):
|
||||
zeroPortPresent = true
|
||||
|
||||
# Perform relay-specific startup tasks TODO: this should be rethought
|
||||
if not node.wakuRelay.isNil():
|
||||
await node.startRelay()
|
||||
|
||||
if not node.wakuMix.isNil():
|
||||
node.wakuMix.start()
|
||||
|
||||
if not node.wakuMetadata.isNil():
|
||||
node.wakuMetadata.start()
|
||||
|
||||
if not node.wakuStoreResume.isNil():
|
||||
await node.wakuStoreResume.start()
|
||||
|
||||
if not node.wakuRendezvous.isNil():
|
||||
await node.wakuRendezvous.start()
|
||||
|
||||
if not node.wakuRendezvousClient.isNil():
|
||||
await node.wakuRendezvousClient.start()
|
||||
|
||||
if not node.wakuStoreReconciliation.isNil():
|
||||
node.wakuStoreReconciliation.start()
|
||||
|
||||
if not node.wakuStoreTransfer.isNil():
|
||||
node.wakuStoreTransfer.start()
|
||||
|
||||
## The switch uses this mapper to update peer info addrs
|
||||
## with announced addrs after start
|
||||
let addressMapper = proc(
|
||||
@ -623,8 +593,12 @@ proc start*(node: WakuNode) {.async.} =
|
||||
node.switch.peerInfo.addressMappers.add(addressMapper)
|
||||
|
||||
## The switch will update addresses after start using the addressMapper
|
||||
## NOTE: This will dispatch gossipsub start to the WakuRelay.start method override
|
||||
await node.switch.start()
|
||||
|
||||
# After switch.start, run custom Logos Delivery relay start logic
|
||||
await node.reconnectRelayPeers()
|
||||
|
||||
node.started = true
|
||||
|
||||
if not node.wakuFilterClient.isNil():
|
||||
@ -648,6 +622,7 @@ proc stop*(node: WakuNode) {.async.} =
|
||||
|
||||
node.stopProvidersAndListeners()
|
||||
|
||||
## NOTE: This will dispatch gossipsub stop to the WakuRelay.stop method override
|
||||
await node.switch.stop()
|
||||
|
||||
node.peerManager.stop()
|
||||
@ -664,12 +639,6 @@ proc stop*(node: WakuNode) {.async.} =
|
||||
if not node.wakuStoreResume.isNil():
|
||||
await node.wakuStoreResume.stopWait()
|
||||
|
||||
if not node.wakuStoreReconciliation.isNil():
|
||||
node.wakuStoreReconciliation.stop()
|
||||
|
||||
if not node.wakuStoreTransfer.isNil():
|
||||
node.wakuStoreTransfer.stop()
|
||||
|
||||
if not node.wakuPeerExchangeClient.isNil() and
|
||||
not node.wakuPeerExchangeClient.pxLoopHandle.isNil():
|
||||
await node.wakuPeerExchangeClient.pxLoopHandle.cancelAndWait()
|
||||
@ -677,9 +646,6 @@ proc stop*(node: WakuNode) {.async.} =
|
||||
if not node.wakuKademlia.isNil():
|
||||
await node.wakuKademlia.stop()
|
||||
|
||||
if not node.wakuRendezvous.isNil():
|
||||
await node.wakuRendezvous.stopWait()
|
||||
|
||||
if not node.wakuRendezvousClient.isNil():
|
||||
await node.wakuRendezvousClient.stopWait()
|
||||
|
||||
|
||||
@ -108,9 +108,3 @@ proc new*(T: type WakuMetadata, clusterId: uint32, getShards: GetShards): T =
|
||||
clusterId = wm.clusterId, shards = wm.getShards()
|
||||
|
||||
return wm
|
||||
|
||||
proc start*(wm: WakuMetadata) =
|
||||
wm.started = true
|
||||
|
||||
proc stop*(wm: WakuMetadata) =
|
||||
wm.started = false
|
||||
|
||||
@ -104,10 +104,4 @@ proc new*(
|
||||
proc poolSize*(mix: WakuMix): int =
|
||||
mix.nodePool.len
|
||||
|
||||
method start*(mix: WakuMix) =
|
||||
info "starting waku mix protocol"
|
||||
|
||||
method stop*(mix: WakuMix) {.async.} =
|
||||
discard
|
||||
|
||||
# Mix Protocol
|
||||
|
||||
@ -517,12 +517,12 @@ proc topicsHealthLoop(w: WakuRelay) {.async.} =
|
||||
# safety cooldown to protect from edge cases
|
||||
await sleepAsync(100.milliseconds)
|
||||
|
||||
method start*(w: WakuRelay) {.async, base.} =
|
||||
method start*(w: WakuRelay) {.async: (raises: [CancelledError]).} =
|
||||
info "start"
|
||||
await procCall GossipSub(w).start()
|
||||
w.topicHealthLoopHandle = w.topicsHealthLoop()
|
||||
|
||||
method stop*(w: WakuRelay) {.async, base.} =
|
||||
method stop*(w: WakuRelay) {.async: (raises: []).} =
|
||||
info "stop"
|
||||
await procCall GossipSub(w).stop()
|
||||
|
||||
|
||||
@ -211,29 +211,22 @@ proc new*(
|
||||
|
||||
return ok(wrv)
|
||||
|
||||
proc start*(self: WakuRendezVous) {.async: (raises: []).} =
|
||||
method start*(self: WakuRendezVous) {.async: (raises: [CancelledError]).} =
|
||||
# Start the parent GenericRendezVous (starts the register deletion loop)
|
||||
if self.started:
|
||||
warn "waku rendezvous already started"
|
||||
return
|
||||
try:
|
||||
await procCall GenericRendezVous[WakuPeerRecord](self).start()
|
||||
except CancelledError as exc:
|
||||
error "failed to start GenericRendezVous", cause = exc.msg
|
||||
return
|
||||
await procCall GenericRendezVous[WakuPeerRecord](self).start()
|
||||
# start registering forever
|
||||
self.periodicRegistrationFut = self.periodicRegistration()
|
||||
|
||||
info "waku rendezvous discovery started"
|
||||
|
||||
proc stopWait*(self: WakuRendezVous) {.async: (raises: []).} =
|
||||
method stop*(self: WakuRendezVous) {.async: (raises: []).} =
|
||||
if not self.periodicRegistrationFut.isNil():
|
||||
await self.periodicRegistrationFut.cancelAndWait()
|
||||
|
||||
# Stop the parent GenericRendezVous (stops the register deletion loop)
|
||||
await GenericRendezVous[WakuPeerRecord](self).stop()
|
||||
|
||||
# Stop the parent GenericRendezVous (stops the register deletion loop)
|
||||
await GenericRendezVous[WakuPeerRecord](self).stop()
|
||||
await procCall GenericRendezVous[WakuPeerRecord](self).stop()
|
||||
|
||||
info "waku rendezvous discovery stopped"
|
||||
|
||||
@ -468,7 +468,7 @@ proc idsReceiverLoop(self: SyncReconciliation) {.async.} =
|
||||
|
||||
self.messageIngress(id, pubsub, content)
|
||||
|
||||
proc start*(self: SyncReconciliation) =
|
||||
method start*(self: SyncReconciliation) {.async: (raises: [CancelledError]).} =
|
||||
if self.started:
|
||||
return
|
||||
|
||||
@ -484,13 +484,16 @@ proc start*(self: SyncReconciliation) =
|
||||
|
||||
info "Store Sync Reconciliation protocol started"
|
||||
|
||||
proc stop*(self: SyncReconciliation) =
|
||||
if self.syncInterval > ZeroDuration:
|
||||
self.periodicSyncFut.cancelSoon()
|
||||
method stop*(self: SyncReconciliation) {.async: (raises: []).} =
|
||||
defer:
|
||||
self.started = false
|
||||
|
||||
if self.syncInterval > ZeroDuration:
|
||||
self.periodicPruneFut.cancelSoon()
|
||||
await self.periodicSyncFut.cancelAndWait()
|
||||
|
||||
self.idsReceiverFut.cancelSoon()
|
||||
if self.syncInterval > ZeroDuration:
|
||||
await self.periodicPruneFut.cancelAndWait()
|
||||
|
||||
await self.idsReceiverFut.cancelAndWait()
|
||||
|
||||
info "Store Sync Reconciliation protocol stopped"
|
||||
|
||||
@ -217,7 +217,7 @@ proc new*(
|
||||
|
||||
return transfer
|
||||
|
||||
proc start*(self: SyncTransfer) =
|
||||
method start*(self: SyncTransfer) {.async: (raises: [CancelledError]).} =
|
||||
if self.started:
|
||||
return
|
||||
|
||||
@ -228,10 +228,11 @@ proc start*(self: SyncTransfer) =
|
||||
|
||||
info "Store Sync Transfer protocol started"
|
||||
|
||||
proc stop*(self: SyncTransfer) =
|
||||
self.started = false
|
||||
method stop*(self: SyncTransfer) {.async: (raises: []).} =
|
||||
defer:
|
||||
self.started = false
|
||||
|
||||
self.localWantsRxFut.cancelSoon()
|
||||
self.remoteNeedsRxFut.cancelSoon()
|
||||
await self.localWantsRxFut.cancelAndWait()
|
||||
await self.remoteNeedsRxFut.cancelAndWait()
|
||||
|
||||
info "Store Sync Transfer protocol stopped"
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user