diff --git a/tests/api/test_api_subscription.nim b/tests/api/test_api_subscription.nim index 23936776d..8f587b535 100644 --- a/tests/api/test_api_subscription.nim +++ b/tests/api/test_api_subscription.nim @@ -187,7 +187,9 @@ proc edgePeersReached(w: Waku, shard: PubsubTopic, n: int): Future[bool] {.async await sleepAsync(100.milliseconds) return false -proc edgePeersDroppedBelow(w: Waku, shard: PubsubTopic, n: int): Future[bool] {.async.} = +proc edgePeersDroppedBelow( + w: Waku, shard: PubsubTopic, n: int +): Future[bool] {.async.} = let deadline = Moment.now() + EdgeWaitTimeout while Moment.now() < deadline: if w.node.subscriptionManager.edgeFilterPeerCount(shard) < n: diff --git a/waku/api/api.nim b/waku/api/api.nim index 56ab198e7..e0e7d4d7b 100644 --- a/waku/api/api.nim +++ b/waku/api/api.nim @@ -51,9 +51,8 @@ proc send*( ): Future[Result[RequestId, string]] {.async.} = ?checkApiAvailability(w) - let isSubbed = w.node.subscriptionManager - .isSubscribed(envelope.contentTopic) - .valueOr(false) + let isSubbed = + w.node.subscriptionManager.isSubscribed(envelope.contentTopic).valueOr(false) if not isSubbed: info "Auto-subscribing to topic on send", contentTopic = envelope.contentTopic w.node.subscriptionManager.subscribe(envelope.contentTopic).isOkOr: diff --git a/waku/factory/waku.nim b/waku/factory/waku.nim index 49225996c..9f1493fe5 100644 --- a/waku/factory/waku.nim +++ b/waku/factory/waku.nim @@ -340,9 +340,8 @@ proc startDnsDiscoveryRetryLoop(waku: Waku): Future[void] {.async.} = continue if not waku.wakuDiscv5.isNil(): - let dynamicBootstrapEnrs = waku.dynamicBootstrapNodes - .filterIt(it.hasUdpPort()) - .mapIt(it.enr.get().toUri()) + let dynamicBootstrapEnrs = + waku.dynamicBootstrapNodes.filterIt(it.hasUdpPort()).mapIt(it.enr.get().toUri()) var discv5BootstrapEnrs: seq[enr.Record] # parse enrURIs from the configuration and add the resulting ENRs to the discv5BootstrapEnrs seq for enrUri in dynamicBootstrapEnrs: @@ -354,9 +353,7 @@ proc startDnsDiscoveryRetryLoop(waku: Waku): Future[void] {.async.} = info "Connecting to dynamic bootstrap peers" try: - await connectToNodes( - waku.node, waku.dynamicBootstrapNodes, "dynamic bootstrap" - ) + await connectToNodes(waku.node, waku.dynamicBootstrapNodes, "dynamic bootstrap") except CatchableError: error "failed to connect to dynamic bootstrap nodes: " & getCurrentExceptionMsg() return diff --git a/waku/messaging_client.nim b/waku/messaging_client.nim index c5e2f307f..32cb64a9e 100644 --- a/waku/messaging_client.nim +++ b/waku/messaging_client.nim @@ -1,7 +1,5 @@ import results, chronos -import - ./node/waku_node, - ./node/delivery_service/[recv_service, send_service] +import ./node/waku_node, ./node/delivery_service/[recv_service, send_service] type MessagingClient* = ref object sendService*: SendService diff --git a/waku/node/delivery_service/recv_service/recv_service.nim b/waku/node/delivery_service/recv_service/recv_service.nim index 6cb448114..500926cc7 100644 --- a/waku/node/delivery_service/recv_service/recv_service.nim +++ b/waku/node/delivery_service/recv_service/recv_service.nim @@ -76,7 +76,9 @@ proc processIncomingMessage( ## or if the message is a duplicate (recently-seen). Otherwise, save it as ## recently-seen, emit a MessageReceivedEvent, and return true. - if not self.node.subscriptionManager.isContentSubscribed(pubsubTopic, message.contentTopic): + if not self.node.subscriptionManager.isContentSubscribed( + pubsubTopic, message.contentTopic + ): trace "skipping message as I am not subscribed", shard = pubsubTopic, contentTopic = message.contentTopic return false diff --git a/waku/node/delivery_service/send_service/send_service.nim b/waku/node/delivery_service/send_service/send_service.nim index 23e7d6397..242dfc111 100644 --- a/waku/node/delivery_service/send_service/send_service.nim +++ b/waku/node/delivery_service/send_service/send_service.nim @@ -95,9 +95,7 @@ proc setupSendProcessorChain( return ok(processors[0]) proc new*( - T: typedesc[SendService], - preferP2PReliability: bool, - w: WakuNode, + T: typedesc[SendService], preferP2PReliability: bool, w: WakuNode ): Result[T, string] = if w.wakuRelay.isNil() and w.wakuLightpushClient.isNil(): return err( diff --git a/waku/node/kernel_api/relay.nim b/waku/node/kernel_api/relay.nim index 6fcced9e5..ef2c999ce 100644 --- a/waku/node/kernel_api/relay.nim +++ b/waku/node/kernel_api/relay.nim @@ -76,7 +76,8 @@ proc subscribe*( return err("Failed to decode subscription event: " & error) if contentTopicOp.isSome(): - return node.subscriptionManager.subscribe(pubsubTopic, contentTopicOp.get(), handler) + return + node.subscriptionManager.subscribe(pubsubTopic, contentTopicOp.get(), handler) return node.subscriptionManager.subscribeShard(pubsubTopic, handler) proc unsubscribe*( diff --git a/waku/node/node_types.nim b/waku/node/node_types.nim index 3a23993c7..1d41a1c97 100644 --- a/waku/node/node_types.nim +++ b/waku/node/node_types.nim @@ -93,7 +93,8 @@ type ShardSubscription* = object contentTopics*: HashSet[ContentTopic] - directShardSub*: bool ## shard subscribed directly (PubsubSub), independent of content-topic interest + directShardSub*: bool + ## shard subscribed directly (PubsubSub), independent of content-topic interest EdgeFilterSubState* = object peers*: seq[RemotePeerInfo] diff --git a/waku/node/subscription_manager.nim b/waku/node/subscription_manager.nim index 597c0f48e..4ac215acf 100644 --- a/waku/node/subscription_manager.nim +++ b/waku/node/subscription_manager.nim @@ -150,8 +150,9 @@ proc subscribeShard*( entry.directShardSub = true added = true do: - self.shards[shard] = - ShardSubscription(contentTopics: initHashSet[ContentTopic](), directShardSub: true) + self.shards[shard] = ShardSubscription( + contentTopics: initHashSet[ContentTopic](), directShardSub: true + ) added = true if added: self.edgeFilterWakeup.fire() @@ -220,9 +221,7 @@ proc unsubscribe*( self.node.doRelayUnsubscribe(shard) return ok() -proc subscribe*( - self: SubscriptionManager, topic: ContentTopic -): Result[void, string] = +proc subscribe*(self: SubscriptionManager, topic: ContentTopic): Result[void, string] = ## Subscribes to a content topic, resolving its shard via autosharding. let shard = ?self.getShardForContentTopic(topic) return self.subscribe(shard, topic)