mirror of
https://github.com/logos-messaging/logos-delivery.git
synced 2026-04-14 04:03:20 +00:00
Merge branch 'master' into deprec-vendor-2
This commit is contained in:
commit
e88e1c0445
@ -6,12 +6,15 @@ import
|
||||
os
|
||||
import
|
||||
libp2p/protocols/ping,
|
||||
libp2p/protocols/protocol,
|
||||
libp2p/crypto/[crypto, secp],
|
||||
libp2p/nameresolving/dnsresolver,
|
||||
libp2p/multicodec
|
||||
import
|
||||
./certsgenerator,
|
||||
waku/[waku_enr, node/peer_manager, waku_core, waku_node, factory/builder]
|
||||
waku/[waku_enr, node/peer_manager, waku_core, waku_node, factory/builder],
|
||||
waku/waku_metadata/protocol,
|
||||
waku/common/callbacks
|
||||
|
||||
# protocols and their tag
|
||||
const ProtocolsTable = {
|
||||
@ -45,7 +48,7 @@ type WakuCanaryConf* = object
|
||||
|
||||
timeout* {.
|
||||
desc: "Timeout to consider that the connection failed",
|
||||
defaultValue: chronos.seconds(10),
|
||||
defaultValue: chronos.seconds(20),
|
||||
name: "timeout",
|
||||
abbr: "t"
|
||||
.}: chronos.Duration
|
||||
@ -251,12 +254,26 @@ proc main(rng: ref HmacDrbgContext): Future[int] {.async.} =
|
||||
error "failed to mount libp2p ping protocol: " & getCurrentExceptionMsg()
|
||||
quit(QuitFailure)
|
||||
|
||||
node.mountMetadata(conf.clusterId, conf.shards).isOkOr:
|
||||
error "failed to mount metadata protocol", error
|
||||
# Mount metadata with a custom getter that returns CLI shards directly,
|
||||
# since the canary doesn't mount relay (which is what the default getter reads from).
|
||||
# Without this fix, the canary always sends remoteShards=[] in metadata requests.
|
||||
let cliShards = conf.shards
|
||||
let shardsGetter: GetShards = proc(): seq[uint16] {.closure, gcsafe, raises: [].} =
|
||||
return cliShards
|
||||
|
||||
let metadata = WakuMetadata.new(conf.clusterId, shardsGetter)
|
||||
node.wakuMetadata = metadata
|
||||
node.peerManager.wakuMetadata = metadata
|
||||
let mountRes = catch:
|
||||
node.switch.mount(metadata, protocolMatcher(WakuMetadataCodec))
|
||||
mountRes.isOkOr:
|
||||
error "failed to mount metadata protocol", error = error.msg
|
||||
quit(QuitFailure)
|
||||
|
||||
await node.start()
|
||||
|
||||
debug "Connecting to peer", peer = peer, timeout = conf.timeout
|
||||
|
||||
var pingFut: Future[bool]
|
||||
if conf.ping:
|
||||
pingFut = pingNode(node, peer).withTimeout(conf.timeout)
|
||||
@ -266,8 +283,18 @@ proc main(rng: ref HmacDrbgContext): Future[int] {.async.} =
|
||||
error "Timedout after", timeout = conf.timeout
|
||||
quit(QuitFailure)
|
||||
|
||||
# Clean disconnect with defer so the remote node doesn't see
|
||||
# "Stream Underlying Connection Closed!" when we exit
|
||||
defer:
|
||||
debug "Cleanly disconnecting from peer", peerId = peer.peerId
|
||||
await node.peerManager.disconnectNode(peer.peerId)
|
||||
await node.stop()
|
||||
|
||||
debug "Connected, checking connection status", peerId = peer.peerId
|
||||
|
||||
let lp2pPeerStore = node.switch.peerStore
|
||||
let conStatus = node.peerManager.switch.peerStore[ConnectionBook][peer.peerId]
|
||||
debug "Connection status", peerId = peer.peerId, conStatus = conStatus
|
||||
|
||||
var pingSuccess = true
|
||||
if conf.ping:
|
||||
@ -283,14 +310,15 @@ proc main(rng: ref HmacDrbgContext): Future[int] {.async.} =
|
||||
|
||||
if conStatus in [Connected, CanConnect]:
|
||||
let nodeProtocols = lp2pPeerStore[ProtoBook][peer.peerId]
|
||||
debug "Peer protocols", peerId = peer.peerId, protocols = nodeProtocols
|
||||
|
||||
if not areProtocolsSupported(conf.protocols, nodeProtocols):
|
||||
error "Not all protocols are supported",
|
||||
expected = conf.protocols, supported = nodeProtocols
|
||||
quit(QuitFailure)
|
||||
return 1
|
||||
elif conStatus == CannotConnect:
|
||||
error "Could not connect", peerId = peer.peerId
|
||||
quit(QuitFailure)
|
||||
return 1
|
||||
return 0
|
||||
|
||||
when isMainModule:
|
||||
|
||||
@ -115,7 +115,7 @@ proc subscribePubsubTopics(
|
||||
if isNil(self.node.wakuRelay):
|
||||
return err("subscribePubsubTopics requires a Relay")
|
||||
|
||||
var errors: seq[string] = @[]
|
||||
var errors: seq[string]
|
||||
|
||||
for shard in shards:
|
||||
if not self.contentTopicSubs.hasKey(shard):
|
||||
@ -196,6 +196,11 @@ const EdgeFilterLoopInterval = chronos.seconds(30)
|
||||
const EdgeFilterSubLoopDebounce = chronos.seconds(1)
|
||||
## Debounce delay to coalesce rapid-fire wakeups into a single reconciliation pass.
|
||||
|
||||
type EdgeDialTask = object
|
||||
peer: RemotePeerInfo
|
||||
shard: PubsubTopic
|
||||
topics: seq[ContentTopic]
|
||||
|
||||
proc updateShardHealth(
|
||||
self: SubscriptionManager, shard: PubsubTopic, state: var EdgeFilterSubState
|
||||
) =
|
||||
@ -335,7 +340,7 @@ proc edgeFilterHealthLoop*(self: SubscriptionManager) {.async.} =
|
||||
var alive = initHashSet[PeerId]()
|
||||
|
||||
if connected.len > 0:
|
||||
var pingTasks: seq[(PeerId, Future[FilterSubscribeResult])] = @[]
|
||||
var pingTasks: seq[(PeerId, Future[FilterSubscribeResult])]
|
||||
for peer in connected.values:
|
||||
pingTasks.add(
|
||||
(peer.peerId, self.node.wakuFilterClient.ping(peer, EdgeFilterPingTimeout))
|
||||
@ -362,6 +367,36 @@ proc edgeFilterHealthLoop*(self: SubscriptionManager) {.async.} =
|
||||
if changed:
|
||||
self.edgeFilterWakeup.fire()
|
||||
|
||||
proc selectFilterCandidates(
|
||||
self: SubscriptionManager, shard: PubsubTopic, exclude: HashSet[PeerId], needed: int
|
||||
): seq[RemotePeerInfo] =
|
||||
## Select filter service peer candidates for a shard.
|
||||
|
||||
# Start with every filter server peer that can serve the shard
|
||||
var allCandidates = self.node.peerManager.selectPeers(
|
||||
filter_common.WakuFilterSubscribeCodec, some(shard)
|
||||
)
|
||||
|
||||
# Remove all already used in this shard or being dialed for it
|
||||
allCandidates.keepItIf(it.peerId notin exclude)
|
||||
|
||||
# Collect peer IDs already tracked on other shards
|
||||
var trackedOnOther = initHashSet[PeerId]()
|
||||
for otherShard, otherState in self.edgeFilterSubStates.pairs:
|
||||
if otherShard != shard:
|
||||
for peer in otherState.peers:
|
||||
trackedOnOther.incl(peer.peerId)
|
||||
|
||||
# Prefer peers we already have a connection to first, preserving shuffle
|
||||
var candidates =
|
||||
allCandidates.filterIt(it.peerId in trackedOnOther) &
|
||||
allCandidates.filterIt(it.peerId notin trackedOnOther)
|
||||
|
||||
# We need to return 'needed' peers only
|
||||
if candidates.len > needed:
|
||||
candidates.setLen(needed)
|
||||
return candidates
|
||||
|
||||
proc edgeFilterSubLoop*(self: SubscriptionManager) {.async.} =
|
||||
## Reconciles filter subscriptions with the desired state from SubscriptionManager.
|
||||
var lastSynced = initTable[PubsubTopic, HashSet[ContentTopic]]()
|
||||
@ -382,6 +417,12 @@ proc edgeFilterSubLoop*(self: SubscriptionManager) {.async.} =
|
||||
|
||||
let allShards = toHashSet(toSeq(desired.keys)) + toHashSet(toSeq(lastSynced.keys))
|
||||
|
||||
# Step 1: read state across all shards at once and
|
||||
# create a list of peer dial tasks and shard tracking to delete.
|
||||
|
||||
var dialTasks: seq[EdgeDialTask]
|
||||
var shardsToDelete: seq[PubsubTopic]
|
||||
|
||||
for shard in allShards:
|
||||
let currTopics = desired.getOrDefault(shard)
|
||||
let prevTopics = lastSynced.getOrDefault(shard)
|
||||
@ -404,11 +445,7 @@ proc edgeFilterSubLoop*(self: SubscriptionManager) {.async.} =
|
||||
asyncSpawn self.syncFilterDeltas(peer, shard, addedTopics, removedTopics)
|
||||
|
||||
if currTopics.len == 0:
|
||||
for fut in state.pending:
|
||||
if not fut.finished:
|
||||
await fut.cancelAndWait()
|
||||
self.edgeFilterSubStates.del(shard)
|
||||
# invalidates `state` — do not use after this
|
||||
shardsToDelete.add(shard)
|
||||
else:
|
||||
self.updateShardHealth(shard, state[])
|
||||
|
||||
@ -416,11 +453,7 @@ proc edgeFilterSubLoop*(self: SubscriptionManager) {.async.} =
|
||||
|
||||
if needed > 0:
|
||||
let tracked = state.peers.mapIt(it.peerId).toHashSet() + state.pendingPeers
|
||||
var candidates = self.node.peerManager.selectPeers(
|
||||
filter_common.WakuFilterSubscribeCodec, some(shard)
|
||||
)
|
||||
candidates.keepItIf(it.peerId notin tracked)
|
||||
|
||||
let candidates = self.selectFilterCandidates(shard, tracked, needed)
|
||||
let toDial = min(needed, candidates.len)
|
||||
|
||||
trace "edgeFilterSubLoop: shard reconciliation",
|
||||
@ -432,8 +465,25 @@ proc edgeFilterSubLoop*(self: SubscriptionManager) {.async.} =
|
||||
toDial = toDial
|
||||
|
||||
for i in 0 ..< toDial:
|
||||
let fut = self.dialFilterPeer(candidates[i], shard, toSeq(currTopics))
|
||||
state.pending.add(fut)
|
||||
dialTasks.add(
|
||||
EdgeDialTask(
|
||||
peer: candidates[i], shard: shard, topics: toSeq(currTopics)
|
||||
)
|
||||
)
|
||||
|
||||
# Step 2: execute deferred shard tracking deletion and dial tasks.
|
||||
|
||||
for shard in shardsToDelete:
|
||||
self.edgeFilterSubStates.withValue(shard, state):
|
||||
for fut in state.pending:
|
||||
if not fut.finished:
|
||||
await fut.cancelAndWait()
|
||||
self.edgeFilterSubStates.del(shard)
|
||||
|
||||
for task in dialTasks:
|
||||
let fut = self.dialFilterPeer(task.peer, task.shard, task.topics)
|
||||
self.edgeFilterSubStates.withValue(task.shard, state):
|
||||
state.pending.add(fut)
|
||||
|
||||
lastSynced = desired
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user