This commit is contained in:
Fabiana Cecin 2026-05-29 13:13:14 -03:00
parent b451b94085
commit 5538df0ff9
No known key found for this signature in database
GPG Key ID: BCAB8A55CB51B6C7
9 changed files with 21 additions and 24 deletions

View File

@ -187,7 +187,9 @@ proc edgePeersReached(w: Waku, shard: PubsubTopic, n: int): Future[bool] {.async
await sleepAsync(100.milliseconds)
return false
proc edgePeersDroppedBelow(w: Waku, shard: PubsubTopic, n: int): Future[bool] {.async.} =
proc edgePeersDroppedBelow(
w: Waku, shard: PubsubTopic, n: int
): Future[bool] {.async.} =
let deadline = Moment.now() + EdgeWaitTimeout
while Moment.now() < deadline:
if w.node.subscriptionManager.edgeFilterPeerCount(shard) < n:

View File

@ -51,9 +51,8 @@ proc send*(
): Future[Result[RequestId, string]] {.async.} =
?checkApiAvailability(w)
let isSubbed = w.node.subscriptionManager
.isSubscribed(envelope.contentTopic)
.valueOr(false)
let isSubbed =
w.node.subscriptionManager.isSubscribed(envelope.contentTopic).valueOr(false)
if not isSubbed:
info "Auto-subscribing to topic on send", contentTopic = envelope.contentTopic
w.node.subscriptionManager.subscribe(envelope.contentTopic).isOkOr:

View File

@ -340,9 +340,8 @@ proc startDnsDiscoveryRetryLoop(waku: Waku): Future[void] {.async.} =
continue
if not waku.wakuDiscv5.isNil():
let dynamicBootstrapEnrs = waku.dynamicBootstrapNodes
.filterIt(it.hasUdpPort())
.mapIt(it.enr.get().toUri())
let dynamicBootstrapEnrs =
waku.dynamicBootstrapNodes.filterIt(it.hasUdpPort()).mapIt(it.enr.get().toUri())
var discv5BootstrapEnrs: seq[enr.Record]
# parse enrURIs from the configuration and add the resulting ENRs to the discv5BootstrapEnrs seq
for enrUri in dynamicBootstrapEnrs:
@ -354,9 +353,7 @@ proc startDnsDiscoveryRetryLoop(waku: Waku): Future[void] {.async.} =
info "Connecting to dynamic bootstrap peers"
try:
await connectToNodes(
waku.node, waku.dynamicBootstrapNodes, "dynamic bootstrap"
)
await connectToNodes(waku.node, waku.dynamicBootstrapNodes, "dynamic bootstrap")
except CatchableError:
error "failed to connect to dynamic bootstrap nodes: " & getCurrentExceptionMsg()
return

View File

@ -1,7 +1,5 @@
import results, chronos
import
./node/waku_node,
./node/delivery_service/[recv_service, send_service]
import ./node/waku_node, ./node/delivery_service/[recv_service, send_service]
type MessagingClient* = ref object
sendService*: SendService

View File

@ -76,7 +76,9 @@ proc processIncomingMessage(
## or if the message is a duplicate (recently-seen). Otherwise, save it as
## recently-seen, emit a MessageReceivedEvent, and return true.
if not self.node.subscriptionManager.isContentSubscribed(pubsubTopic, message.contentTopic):
if not self.node.subscriptionManager.isContentSubscribed(
pubsubTopic, message.contentTopic
):
trace "skipping message as I am not subscribed",
shard = pubsubTopic, contentTopic = message.contentTopic
return false

View File

@ -95,9 +95,7 @@ proc setupSendProcessorChain(
return ok(processors[0])
proc new*(
T: typedesc[SendService],
preferP2PReliability: bool,
w: WakuNode,
T: typedesc[SendService], preferP2PReliability: bool, w: WakuNode
): Result[T, string] =
if w.wakuRelay.isNil() and w.wakuLightpushClient.isNil():
return err(

View File

@ -76,7 +76,8 @@ proc subscribe*(
return err("Failed to decode subscription event: " & error)
if contentTopicOp.isSome():
return node.subscriptionManager.subscribe(pubsubTopic, contentTopicOp.get(), handler)
return
node.subscriptionManager.subscribe(pubsubTopic, contentTopicOp.get(), handler)
return node.subscriptionManager.subscribeShard(pubsubTopic, handler)
proc unsubscribe*(

View File

@ -93,7 +93,8 @@ type
ShardSubscription* = object
contentTopics*: HashSet[ContentTopic]
directShardSub*: bool ## shard subscribed directly (PubsubSub), independent of content-topic interest
directShardSub*: bool
## shard subscribed directly (PubsubSub), independent of content-topic interest
EdgeFilterSubState* = object
peers*: seq[RemotePeerInfo]

View File

@ -150,8 +150,9 @@ proc subscribeShard*(
entry.directShardSub = true
added = true
do:
self.shards[shard] =
ShardSubscription(contentTopics: initHashSet[ContentTopic](), directShardSub: true)
self.shards[shard] = ShardSubscription(
contentTopics: initHashSet[ContentTopic](), directShardSub: true
)
added = true
if added:
self.edgeFilterWakeup.fire()
@ -220,9 +221,7 @@ proc unsubscribe*(
self.node.doRelayUnsubscribe(shard)
return ok()
proc subscribe*(
self: SubscriptionManager, topic: ContentTopic
): Result[void, string] =
proc subscribe*(self: SubscriptionManager, topic: ContentTopic): Result[void, string] =
## Subscribes to a content topic, resolving its shard via autosharding.
let shard = ?self.getShardForContentTopic(topic)
return self.subscribe(shard, topic)