Merge remote-tracking branch 'origin/feat/simple-lift-messaging' into feat/layered-config

This commit is contained in:
Fabiana Cecin 2026-06-01 20:35:33 -03:00
commit eb350b5c3b
No known key found for this signature in database
GPG Key ID: BCAB8A55CB51B6C7
37 changed files with 1448 additions and 1255 deletions

View File

@ -123,7 +123,7 @@ when isMainModule:
error "Waku initialization failed", error = error error "Waku initialization failed", error = error
quit(QuitFailure) quit(QuitFailure)
(waitFor startWaku(addr waku)).isOkOr: (waitFor waku.start()).isOkOr:
error "Starting waku failed", error = error error "Starting waku failed", error = error
quit(QuitFailure) quit(QuitFailure)

View File

@ -55,7 +55,7 @@ when isMainModule:
error "Waku initialization failed", error = error error "Waku initialization failed", error = error
quit(QuitFailure) quit(QuitFailure)
(waitFor startWaku(addr waku)).isOkOr: (waitFor waku.start()).isOkOr:
error "Starting waku failed", error = error error "Starting waku failed", error = error
quit(QuitFailure) quit(QuitFailure)

View File

@ -1,7 +1,7 @@
## Reliable Channel event types emitted to API consumers. ## Reliable Channel event types emitted to API consumers.
## ##
## Lifecycle events for individual segments (sent / propagated / errored) ## Lifecycle events for individual segments (sent / propagated / errored)
## are the same as the network-level ones the DeliveryService already ## are the same as the network-level ones the MessagingClient already
## emits — `requestId` is shared across layers — so we just re-export ## emits — `requestId` is shared across layers — so we just re-export
## `waku/events/message_events` and avoid declaring duplicates. ## `waku/events/message_events` and avoid declaring duplicates.
## ##

View File

@ -20,8 +20,7 @@ import bearssl/rand
import stew/byteutils import stew/byteutils
import libp2p/crypto/crypto as libp2p_crypto import libp2p/crypto/crypto as libp2p_crypto
import waku/api/api import waku/api/types
import waku/factory/waku as waku_factory
import waku/node/delivery_service/send_service import waku/node/delivery_service/send_service
import waku/waku_core/topics import waku/waku_core/topics
@ -32,7 +31,7 @@ import ./rate_limit_manager/rate_limit_manager
import ./encryption/encryption import ./encryption/encryption
export export
api, waku_factory, events, segmentation, scalable_data_sync, rate_limit_manager, types, send_service, events, segmentation, scalable_data_sync, rate_limit_manager,
encryption encryption
const LipWireReliableChannelVersion* = "RELIABLE-CHANNEL-API/1" const LipWireReliableChannelVersion* = "RELIABLE-CHANNEL-API/1"
@ -47,9 +46,10 @@ type
SendHandler* = proc(envelope: MessageEnvelope): Future[Result[RequestId, string]] {. SendHandler* = proc(envelope: MessageEnvelope): Future[Result[RequestId, string]] {.
async: (raises: [CatchableError]), gcsafe async: (raises: [CatchableError]), gcsafe
.} .}
## Egress dispatch boundary. Defaults to `waku.send`; tests inject a ## Egress dispatch boundary. Typically wraps `MessagingClient.send`;
## fake that records calls and returns canned `RequestId`s so the ## tests inject a fake that records calls and returns canned
## send state machine can be exercised end-to-end without a network. ## `RequestId`s so the send state machine can be exercised end-to-end
## without a network.
MessagePersistence {.pure.} = enum MessagePersistence {.pure.} = enum
Persistent Persistent
@ -245,20 +245,20 @@ proc onReadyToSend(
meta: LipWireReliableChannelVersion.toBytes(), meta: LipWireReliableChannelVersion.toBytes(),
) )
## `waku.send` is not annotated `(raises: [])`, but this listener is. ## `sendHandler` is not annotated `(raises: [])`, but this listener is.
## Convert any raise to a Result error so the state machine handles ## Convert any raise to a Result error so the state machine handles
## both failure modes (Result.err and exception) through one path. ## both failure modes (Result.err and exception) through one path.
let sendRes = let sendRes =
try: try:
await self.sendHandler(envelope) await self.sendHandler(envelope)
except CatchableError as e: except CatchableError as e:
Result[RequestId, string].err("waku send raised: " & e.msg) Result[RequestId, string].err("messaging send raised: " & e.msg)
let messagingReqId = sendRes.valueOr: let messagingReqId = sendRes.valueOr:
MessageErrorEvent.emit( MessageErrorEvent.emit(
self.brokerCtx, self.brokerCtx,
MessageErrorEvent( MessageErrorEvent(
requestId: channelReqId, messageHash: "", error: "waku send failed: " & error requestId: channelReqId, messageHash: "", error: "messaging send failed: " & error
), ),
) )
self.pendingMessagingRequests[idx].segmentSendState = SegmentSendState.Failed self.pendingMessagingRequests[idx].segmentSendState = SegmentSendState.Failed
@ -365,7 +365,7 @@ proc onMessageReceived(
proc new*( proc new*(
T: type ReliableChannel, T: type ReliableChannel,
waku: Waku, sendHandler: SendHandler,
channelId: ChannelId, channelId: ChannelId,
contentTopic: ContentTopic, contentTopic: ContentTopic,
senderId: SdsParticipantID, senderId: SdsParticipantID,
@ -373,7 +373,6 @@ proc new*(
sdsConfig: SdsConfig, sdsConfig: SdsConfig,
rateConfig: RateLimitConfig, rateConfig: RateLimitConfig,
brokerCtx: BrokerContext = globalBrokerContext(), brokerCtx: BrokerContext = globalBrokerContext(),
sendHandler: SendHandler = nil,
): T = ): T =
## Pipeline handlers (segmentation/SDS/rate-limit) are constructed ## Pipeline handlers (segmentation/SDS/rate-limit) are constructed
## inside the channel rather than handed in by the caller — they are ## inside the channel rather than handed in by the caller — they are
@ -382,19 +381,11 @@ proc new*(
## `Decrypt` request brokers, so the channel keeps no per-instance ## `Decrypt` request brokers, so the channel keeps no per-instance
## encryption state either. ## encryption state either.
## ##
## `sendHandler` defaults to `waku.send`; tests pass a fake to drive ## `sendHandler` is the egress dispatch. The owning `ReliableChannelManager`
## the send state machine without touching the network. ## typically constructs it as a closure over `MessagingClient.send`. Tests
let resolvedSendHandler = ## pass a fake to drive the send state machine without touching the network.
if sendHandler.isNil():
proc(
envelope: MessageEnvelope
): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} =
return await waku.send(envelope)
else:
sendHandler
let chn = T( let chn = T(
sendHandler: resolvedSendHandler, sendHandler: sendHandler,
channelId: channelId, channelId: channelId,
contentTopic: contentTopic, contentTopic: contentTopic,
senderId: senderId, senderId: senderId,

View File

@ -10,11 +10,10 @@ import results
import chronos import chronos
import stew/byteutils import stew/byteutils
import waku/api/api import brokers/broker_context
import waku/api/api_conf
import waku/events/message_events as waku_message_events import waku/events/message_events as waku_message_events
import waku/factory/waku as waku_factory import waku/messaging_client
import waku/node/delivery_service/delivery_service
import waku/waku_core/topics import waku/waku_core/topics
import ./reliable_channel import ./reliable_channel
@ -24,40 +23,43 @@ export reliable_channel
type ReliableChannelManager* = ref object type ReliableChannelManager* = ref object
channels: Table[ChannelId, ReliableChannel] channels: Table[ChannelId, ReliableChannel]
waku: Waku messagingClient: MessagingClient
## Owned by the manager. The channel layer reaches the messaging ## Borrowed from the owning `Waku`.
## API through `waku.send(envelope)`; constructing DeliveryTasks sendHandler: SendHandler
## directly would breach the layer boundary. ## Default egress dispatch for channels created through this manager.
## Constructed at mount time as a closure over `MessagingClient.send`
## so the channel layer itself stays callable-only.
brokerCtx: BrokerContext brokerCtx: BrokerContext
proc new*( proc new*(
T: type ReliableChannelManager, T: type ReliableChannelManager,
conf: WakuNodeConf, messagingClient: MessagingClient,
sendHandler: SendHandler,
brokerCtx: BrokerContext = globalBrokerContext(), brokerCtx: BrokerContext = globalBrokerContext(),
): Future[Result[T, string]] {.async.} = ): Result[T, string] =
## TODO !! The proper ownership chain is: if messagingClient.isNil():
## ReliableChannelManager -> DeliveryService (MessagingClient) -> Waku (Kernel/Protocols) -> WakuNode, return err("messaging client is required")
## and this will be implemented in the future. For now, `createNode` if sendHandler.isNil():
## is called here to get a Waku instance, and the WakuNode is immediately discarded. return err("sendHandler is required")
## This is a temporary workaround to get the API return ok(
T(
let waku = ?(await createNode(conf)) channels: initTable[ChannelId, ReliableChannel](),
messagingClient: messagingClient,
let manager = T( sendHandler: sendHandler,
channels: initTable[ChannelId, ReliableChannel](), waku: waku, brokerCtx: brokerCtx brokerCtx: brokerCtx,
)
) )
return ok(manager)
proc start*(self: ReliableChannelManager): Result[void, string] = proc start*(self: ReliableChannelManager): Result[void, string] =
## Bring the owned DeliveryService up. Separated from `new` so callers ## Placeholder: per-channel listeners are installed in `ReliableChannel.new`,
## can register encryption providers / create channels before traffic ## so the manager has nothing to start at this layer. Kept for symmetry
## starts flowing. ## with the `Waku` mount/start lifecycle and as a hook for future state.
self.waku.deliveryService.startDeliveryService() discard
ok()
proc stop*(self: ReliableChannelManager) {.async.} = proc stop*(self: ReliableChannelManager) {.async.} =
if not self.waku.isNil(): ## Placeholder mirror of `start`.
await self.waku.deliveryService.stopDeliveryService() discard
proc createReliableChannel*( proc createReliableChannel*(
self: ReliableChannelManager, self: ReliableChannelManager,
@ -66,17 +68,17 @@ proc createReliableChannel*(
senderId: SdsParticipantID, senderId: SdsParticipantID,
sendHandler: SendHandler = nil, sendHandler: SendHandler = nil,
): Result[ChannelId, string] = ): Result[ChannelId, string] =
## Spec entry point. The `DeliveryService` and `rng` the channel needs ## Spec entry point. The `sendHandler` and `rng` the channel needs are
## are sourced from the owning `ReliableChannelManager` rather than ## sourced from the owning `ReliableChannelManager` rather than passed
## passed per call. Encryption is wired up through the `Encrypt`/ ## per call. Encryption is wired up through the `Encrypt`/`Decrypt`
## `Decrypt` request brokers — the application installs its own ## request brokers — the application installs its own providers
## providers (or `setNoopEncryption()`) before traffic flows. ## (or `setNoopEncryption()`) before traffic flows.
## ##
## Segmentation, SDS and rate-limit configs will eventually be read ## Segmentation, SDS and rate-limit configs will eventually be read
## from the node's `NodeConfig`. Defaults for now. ## from the node's `NodeConfig`. Defaults for now.
## ##
## `sendHandler` is left `nil` in production so the channel uses the ## `sendHandler` defaults to the manager's default (constructed at mount
## owned `waku.send`; tests pass a fake to bypass the network. ## from `MessagingClient.send`); tests pass a fake to bypass the network.
if self.channels.hasKey(channelId): if self.channels.hasKey(channelId):
return err("channel already exists: " & channelId) return err("channel already exists: " & channelId)
@ -95,8 +97,14 @@ proc createReliableChannel*(
epochPeriodSec: DefaultEpochPeriodSec, messagesPerEpoch: DefaultMessagesPerEpoch epochPeriodSec: DefaultEpochPeriodSec, messagesPerEpoch: DefaultMessagesPerEpoch
) )
let effectiveSendHandler =
if sendHandler.isNil():
self.sendHandler
else:
sendHandler
let chn = ReliableChannel.new( let chn = ReliableChannel.new(
waku = self.waku, sendHandler = effectiveSendHandler,
channelId = channelId, channelId = channelId,
contentTopic = contentTopic, contentTopic = contentTopic,
senderId = senderId, senderId = senderId,
@ -104,7 +112,6 @@ proc createReliableChannel*(
sdsConfig = sdsConfig, sdsConfig = sdsConfig,
rateConfig = rateConfig, rateConfig = rateConfig,
brokerCtx = self.brokerCtx, brokerCtx = self.brokerCtx,
sendHandler = sendHandler,
) )
self.channels[channelId] = chn self.channels[channelId] = chn
@ -137,5 +144,5 @@ proc send*(
## `ReliableChannel` installs its own `MessageReceivedEvent` listener ## `ReliableChannel` installs its own `MessageReceivedEvent` listener
## in `ReliableChannel.new`, filters by spec marker and `contentTopic`, ## in `ReliableChannel.new`, filters by spec marker and `contentTopic`,
## and routes to its private `onMessageReceived`. This keeps the lower ## and routes to its private `onMessageReceived`. This keeps the lower
## layer (MessagingAPI/Waku) unaware of the existence of ReliableChannel ## layer (MessagingClient/Waku) unaware of the existence of ReliableChannel
## and keeps the manager out of per-channel event dispatch. ## and keeps the manager out of per-channel event dispatch.

View File

@ -82,8 +82,12 @@ when isMainModule:
echo("Waku node created successfully!") echo("Waku node created successfully!")
node.mountMessagingClient().isOkOr:
echo "Failed to mount messaging: ", error
quit(QuitFailure)
# Start the node # Start the node
(waitFor startWaku(addr node)).isOkOr: (waitFor node.start()).isOkOr:
echo "Failed to start node: ", error echo "Failed to start node: ", error
quit(QuitFailure) quit(QuitFailure)

View File

@ -48,7 +48,7 @@ proc setup*(): Waku =
error "Waku initialization failed", error = error error "Waku initialization failed", error = error
quit(QuitFailure) quit(QuitFailure)
(waitFor startWaku(addr waku)).isOkOr: (waitFor waku.start()).isOkOr:
error "Starting waku failed", error = error error "Starting waku failed", error = error
quit(QuitFailure) quit(QuitFailure)

View File

@ -124,7 +124,17 @@ proc logosdelivery_start_node(
chronicles.error "ConnectionStatusChange.listen failed", err = $error chronicles.error "ConnectionStatusChange.listen failed", err = $error
return err("ConnectionStatusChange.listen failed: " & $error) return err("ConnectionStatusChange.listen failed: " & $error)
(await startWaku(addr ctx.myLib[])).isOkOr: ctx.myLib[].mountMessagingClient().isOkOr:
let errMsg = $error
chronicles.error "mountMessagingClient failed", error = errMsg
return err("failed to mount messaging: " & errMsg)
ctx.myLib[].mountReliableChannelManager().isOkOr:
let errMsg = $error
chronicles.error "mountReliableChannelManager failed", err = errMsg
return err("failed to mount reliable channel manager: " & errMsg)
(await ctx.myLib[].start()).isOkOr:
let errMsg = $error let errMsg = $error
chronicles.error "START_NODE failed", err = errMsg chronicles.error "START_NODE failed", err = errMsg
return err("failed to start: " & errMsg) return err("failed to start: " & errMsg)

View File

@ -71,7 +71,7 @@ registerReqFFI(CreateNodeRequest, ctx: ptr FFIContext[Waku]):
proc waku_start( proc waku_start(
ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer
) {.ffi.} = ) {.ffi.} =
(await startWaku(ctx[].myLib)).isOkOr: (await ctx.myLib[].start()).isOkOr:
error "START_NODE failed", error = error error "START_NODE failed", error = error
return err("failed to start: " & $error) return err("failed to start: " & $error)
return ok("") return ok("")

View File

@ -30,7 +30,7 @@ let
# while others use the repo root. Pass both so the compiler finds either layout. # while others use the repo root. Pass both so the compiler finds either layout.
pathArgs = pathArgs =
builtins.concatStringsSep " " builtins.concatStringsSep " "
(builtins.concatMap (p: [ "--path:${p}" "--path:${p}/src" ]) (builtins.concatMap (p: [ "--path:${p}" "--path:${p}/src" "--path:${p}/sds" ])
(builtins.attrValues otherDeps)); (builtins.attrValues otherDeps));
libExt = libExt =

View File

@ -103,7 +103,9 @@ suite "LM API health checking":
client = (await createNode(conf)).valueOr: client = (await createNode(conf)).valueOr:
raiseAssert error raiseAssert error
(await startWaku(addr client)).isOkOr: client.mountMessagingClient().isOkOr:
raiseAssert error
(await client.start()).isOkOr:
raiseAssert error raiseAssert error
asyncTeardown: asyncTeardown:
@ -281,7 +283,9 @@ suite "LM API health checking":
edgeWaku = (await createNode(edgeConf)).valueOr: edgeWaku = (await createNode(edgeConf)).valueOr:
raiseAssert "Failed to create edge node: " & error raiseAssert "Failed to create edge node: " & error
(await startWaku(addr edgeWaku)).isOkOr: edgeWaku.mountMessagingClient().isOkOr:
raiseAssert "Failed to mount edge messaging: " & error
(await edgeWaku.start()).isOkOr:
raiseAssert "Failed to start edge waku: " & error raiseAssert "Failed to start edge waku: " & error
let relayReq = await RequestProtocolHealth.request( let relayReq = await RequestProtocolHealth.request(

View File

@ -6,6 +6,7 @@ import libp2p/[peerid, peerinfo, crypto/crypto]
import brokers/broker_context import brokers/broker_context
import ../testlib/[common, wakucore, wakunode, testasync] import ../testlib/[common, wakucore, wakunode, testasync]
import ../waku_archive/archive_utils import ../waku_archive/archive_utils
import waku/messaging_client
import import
waku, waku,
@ -16,7 +17,6 @@ import
waku_relay/protocol, waku_relay/protocol,
waku_archive, waku_archive,
waku_archive/common as archive_common, waku_archive/common as archive_common,
node/delivery_service/delivery_service,
node/delivery_service/recv_service, node/delivery_service/recv_service,
] ]
import waku/factory/waku_conf import waku/factory/waku_conf
@ -147,7 +147,8 @@ suite "Messaging API, Receive Service (store recovery)":
subscriber = (await createNode(createApiNodeConf(numShards))).expect( subscriber = (await createNode(createApiNodeConf(numShards))).expect(
"Failed to create subscriber" "Failed to create subscriber"
) )
(await startWaku(addr subscriber)).expect("Failed to start subscriber") subscriber.mountMessagingClient().expect("Failed to mount messaging")
(await subscriber.start()).expect("Failed to start subscriber")
# publish after the subscriber exists but before it connects to the # publish after the subscriber exists but before it connects to the
# store; the message reaches the archive but the subscriber doesn't # store; the message reaches the archive but the subscriber doesn't
@ -185,7 +186,7 @@ suite "Messaging API, Receive Service (store recovery)":
await eventManager.teardown() await eventManager.teardown()
# trigger store check, should recover and deliver via MessageReceivedEvent # trigger store check, should recover and deliver via MessageReceivedEvent
await subscriber.deliveryService.recvService.checkStore() await subscriber.messagingClient.recvService.checkStore()
let received = await eventManager.waitForEvents(TestTimeout) let received = await eventManager.waitForEvents(TestTimeout)
check received check received

View File

@ -241,7 +241,9 @@ suite "Waku API - Send":
lockNewGlobalBrokerContext: lockNewGlobalBrokerContext:
node = (await createNode(createApiNodeConf())).valueOr: node = (await createNode(createApiNodeConf())).valueOr:
raiseAssert error raiseAssert error
(await startWaku(addr node)).isOkOr: node.mountMessagingClient().isOkOr:
raiseAssert "Failed to mount messaging: " & error
(await node.start()).isOkOr:
raiseAssert "Failed to start Waku node: " & error raiseAssert "Failed to start Waku node: " & error
# node is not connected ! # node is not connected !
@ -263,7 +265,9 @@ suite "Waku API - Send":
lockNewGlobalBrokerContext: lockNewGlobalBrokerContext:
node = (await createNode(createApiNodeConf())).valueOr: node = (await createNode(createApiNodeConf())).valueOr:
raiseAssert error raiseAssert error
(await startWaku(addr node)).isOkOr: node.mountMessagingClient().isOkOr:
raiseAssert "Failed to mount messaging: " & error
(await node.start()).isOkOr:
raiseAssert "Failed to start Waku node: " & error raiseAssert "Failed to start Waku node: " & error
await node.node.connectToNodes( await node.node.connectToNodes(
@ -297,7 +301,9 @@ suite "Waku API - Send":
lockNewGlobalBrokerContext: lockNewGlobalBrokerContext:
node = (await createNode(createApiNodeConf())).valueOr: node = (await createNode(createApiNodeConf())).valueOr:
raiseAssert error raiseAssert error
(await startWaku(addr node)).isOkOr: node.mountMessagingClient().isOkOr:
raiseAssert "Failed to mount messaging: " & error
(await node.start()).isOkOr:
raiseAssert "Failed to start Waku node: " & error raiseAssert "Failed to start Waku node: " & error
await node.node.connectToNodes(@[relayNode1PeerInfo]) await node.node.connectToNodes(@[relayNode1PeerInfo])
@ -327,7 +333,9 @@ suite "Waku API - Send":
lockNewGlobalBrokerContext: lockNewGlobalBrokerContext:
node = (await createNode(createApiNodeConf())).valueOr: node = (await createNode(createApiNodeConf())).valueOr:
raiseAssert error raiseAssert error
(await startWaku(addr node)).isOkOr: node.mountMessagingClient().isOkOr:
raiseAssert "Failed to mount messaging: " & error
(await node.start()).isOkOr:
raiseAssert "Failed to start Waku node: " & error raiseAssert "Failed to start Waku node: " & error
await node.node.connectToNodes(@[lightpushNodePeerInfo]) await node.node.connectToNodes(@[lightpushNodePeerInfo])
@ -357,7 +365,9 @@ suite "Waku API - Send":
lockNewGlobalBrokerContext: lockNewGlobalBrokerContext:
node = (await createNode(createApiNodeConf())).valueOr: node = (await createNode(createApiNodeConf())).valueOr:
raiseAssert error raiseAssert error
(await startWaku(addr node)).isOkOr: node.mountMessagingClient().isOkOr:
raiseAssert "Failed to mount messaging: " & error
(await node.start()).isOkOr:
raiseAssert "Failed to start Waku node: " & error raiseAssert "Failed to start Waku node: " & error
await node.node.connectToNodes(@[lightpushNodePeerInfo, storeNodePeerInfo]) await node.node.connectToNodes(@[lightpushNodePeerInfo, storeNodePeerInfo])
@ -411,7 +421,9 @@ suite "Waku API - Send":
lockNewGlobalBrokerContext: lockNewGlobalBrokerContext:
node = (await createNode(createApiNodeConf(cli_args.WakuMode.Edge))).valueOr: node = (await createNode(createApiNodeConf(cli_args.WakuMode.Edge))).valueOr:
raiseAssert error raiseAssert error
(await startWaku(addr node)).isOkOr: node.mountMessagingClient().isOkOr:
raiseAssert "Failed to mount messaging: " & error
(await node.start()).isOkOr:
raiseAssert "Failed to start Waku node: " & error raiseAssert "Failed to start Waku node: " & error
await node.node.connectToNodes(@[fakeLightpushNodePeerInfo]) await node.node.connectToNodes(@[fakeLightpushNodePeerInfo])

View File

@ -5,6 +5,7 @@ import chronos, testutils/unittests, stew/byteutils
import libp2p/[peerid, peerinfo, multiaddress, crypto/crypto] import libp2p/[peerid, peerinfo, multiaddress, crypto/crypto]
import brokers/broker_context import brokers/broker_context
import ../testlib/[common, wakucore, wakunode, testasync] import ../testlib/[common, wakucore, wakunode, testasync]
import waku/messaging_client
import import
waku, waku,
@ -14,13 +15,14 @@ import
events/message_events, events/message_events,
waku_relay/protocol, waku_relay/protocol,
node/kernel_api/filter, node/kernel_api/filter,
node/delivery_service/subscription_manager, node/subscription_manager,
] ]
import waku/factory/waku_conf import waku/factory/waku_conf
import tools/confutils/cli_args import tools/confutils/cli_args
const TestTimeout = chronos.seconds(10) const TestTimeout = chronos.seconds(10)
const NegativeTestTimeout = chronos.seconds(2) const NegativeTestTimeout = chronos.seconds(2)
const EdgeWaitTimeout = chronos.seconds(60)
type ReceiveEventListenerManager = ref object type ReceiveEventListenerManager = ref object
brokerCtx: BrokerContext brokerCtx: BrokerContext
@ -85,7 +87,8 @@ proc setupSubscriberNode(conf: WakuNodeConf): Future[Waku] {.async.} =
var node: Waku var node: Waku
lockNewGlobalBrokerContext: lockNewGlobalBrokerContext:
node = (await createNode(conf)).expect("Failed to create subscriber node") node = (await createNode(conf)).expect("Failed to create subscriber node")
(await startWaku(addr node)).expect("Failed to start subscriber node") node.mountMessagingClient().expect("Failed to mount messaging")
(await node.start()).expect("Failed to start subscriber node")
return node return node
proc setupNetwork( proc setupNetwork(
@ -161,20 +164,39 @@ proc getRelayShard(node: WakuNode, contentTopic: ContentTopic): PubsubTopic =
return PubsubTopic($shardObj) return PubsubTopic($shardObj)
proc waitForMesh(node: WakuNode, shard: PubsubTopic) {.async.} = proc waitForMesh(node: WakuNode, shard: PubsubTopic) {.async.} =
for _ in 0 ..< 50: let deadline = Moment.now() + EdgeWaitTimeout
while Moment.now() < deadline:
if node.wakuRelay.getNumPeersInMesh(shard).valueOr(0) > 0: if node.wakuRelay.getNumPeersInMesh(shard).valueOr(0) > 0:
return return
await sleepAsync(100.milliseconds) await sleepAsync(100.milliseconds)
raise newException(ValueError, "GossipSub Mesh failed to stabilize on " & shard) raise newException(ValueError, "GossipSub Mesh failed to stabilize on " & shard)
proc waitForEdgeSubs(w: Waku, shard: PubsubTopic) {.async.} = proc waitForEdgeSubs(w: Waku, shard: PubsubTopic) {.async.} =
let sm = w.deliveryService.subscriptionManager let deadline = Moment.now() + EdgeWaitTimeout
for _ in 0 ..< 50: while Moment.now() < deadline:
if sm.edgeFilterPeerCount(shard) > 0: if w.node.subscriptionManager.edgeFilterPeerCount(shard) > 0:
return return
await sleepAsync(100.milliseconds) await sleepAsync(100.milliseconds)
raise newException(ValueError, "Edge filter subscription failed on " & shard) raise newException(ValueError, "Edge filter subscription failed on " & shard)
proc edgePeersReached(w: Waku, shard: PubsubTopic, n: int): Future[bool] {.async.} =
let deadline = Moment.now() + EdgeWaitTimeout
while Moment.now() < deadline:
if w.node.subscriptionManager.edgeFilterPeerCount(shard) >= n:
return true
await sleepAsync(100.milliseconds)
return false
proc edgePeersDroppedBelow(
w: Waku, shard: PubsubTopic, n: int
): Future[bool] {.async.} =
let deadline = Moment.now() + EdgeWaitTimeout
while Moment.now() < deadline:
if w.node.subscriptionManager.edgeFilterPeerCount(shard) < n:
return true
await sleepAsync(100.milliseconds)
return false
proc publishToMesh( proc publishToMesh(
net: TestNetwork, contentTopic: ContentTopic, payload: seq[byte] net: TestNetwork, contentTopic: ContentTopic, payload: seq[byte]
): Future[Result[int, string]] {.async.} = ): Future[Result[int, string]] {.async.} =
@ -621,7 +643,8 @@ suite "Messaging API, SubscriptionManager":
var subscriber: Waku var subscriber: Waku
lockNewGlobalBrokerContext: lockNewGlobalBrokerContext:
subscriber = (await createNode(conf)).expect("Failed to create edge subscriber") subscriber = (await createNode(conf)).expect("Failed to create edge subscriber")
(await startWaku(addr subscriber)).expect("Failed to start edge subscriber") subscriber.mountMessagingClient().expect("Failed to mount messaging")
(await subscriber.start()).expect("Failed to start edge subscriber")
# Connect edge subscriber to both filter servers so selectPeers finds both # Connect edge subscriber to both filter servers so selectPeers finds both
await subscriber.node.connectToNodes(@[publisherPeerInfo, meshBuddyPeerInfo]) await subscriber.node.connectToNodes(@[publisherPeerInfo, meshBuddyPeerInfo])
@ -632,12 +655,7 @@ suite "Messaging API, SubscriptionManager":
(await subscriber.subscribe(testTopic)).expect("Failed to subscribe") (await subscriber.subscribe(testTopic)).expect("Failed to subscribe")
# Wait for dialing both filter servers (HealthyThreshold = 2) # Wait for dialing both filter servers (HealthyThreshold = 2)
for _ in 0 ..< 100: check await edgePeersReached(subscriber, shard, 2)
if subscriber.deliveryService.subscriptionManager.edgeFilterPeerCount(shard) >= 2:
break
await sleepAsync(100.milliseconds)
check subscriber.deliveryService.subscriptionManager.edgeFilterPeerCount(shard) >= 2
# Verify message delivery with both servers alive # Verify message delivery with both servers alive
await waitForMesh(publisher, shard) await waitForMesh(publisher, shard)
@ -659,12 +677,8 @@ suite "Messaging API, SubscriptionManager":
await subscriber.node.disconnectNode(meshBuddyPeerInfo) await subscriber.node.disconnectNode(meshBuddyPeerInfo)
# Wait for the dead peer to be pruned # Wait for the dead peer to be pruned
for _ in 0 ..< 50: check await edgePeersDroppedBelow(subscriber, shard, 2)
if subscriber.deliveryService.subscriptionManager.edgeFilterPeerCount(shard) < 2: check subscriber.node.subscriptionManager.edgeFilterPeerCount(shard) >= 1
break
await sleepAsync(100.milliseconds)
check subscriber.deliveryService.subscriptionManager.edgeFilterPeerCount(shard) >= 1
# Verify messages still arrive through the surviving filter server (publisher) # Verify messages still arrive through the surviving filter server (publisher)
eventManager = newReceiveEventListenerManager(subscriber.brokerCtx, 1) eventManager = newReceiveEventListenerManager(subscriber.brokerCtx, 1)
@ -758,7 +772,8 @@ suite "Messaging API, SubscriptionManager":
var subscriber: Waku var subscriber: Waku
lockNewGlobalBrokerContext: lockNewGlobalBrokerContext:
subscriber = (await createNode(conf)).expect("Failed to create edge subscriber") subscriber = (await createNode(conf)).expect("Failed to create edge subscriber")
(await startWaku(addr subscriber)).expect("Failed to start edge subscriber") subscriber.mountMessagingClient().expect("Failed to mount messaging")
(await subscriber.start()).expect("Failed to start edge subscriber")
await subscriber.node.connectToNodes( await subscriber.node.connectToNodes(
@[publisherPeerInfo, meshBuddyPeerInfo, sparePeerInfo] @[publisherPeerInfo, meshBuddyPeerInfo, sparePeerInfo]
@ -770,23 +785,13 @@ suite "Messaging API, SubscriptionManager":
(await subscriber.subscribe(testTopic)).expect("Failed to subscribe") (await subscriber.subscribe(testTopic)).expect("Failed to subscribe")
# Wait for 2 confirmed peers (HealthyThreshold). The 3rd is available but not dialed. # Wait for 2 confirmed peers (HealthyThreshold). The 3rd is available but not dialed.
for _ in 0 ..< 100: check await edgePeersReached(subscriber, shard, 2)
if subscriber.deliveryService.subscriptionManager.edgeFilterPeerCount(shard) >= 2: require subscriber.node.subscriptionManager.edgeFilterPeerCount(shard) == 2
break
await sleepAsync(100.milliseconds)
require subscriber.deliveryService.subscriptionManager.edgeFilterPeerCount(shard) ==
2
await subscriber.node.disconnectNode(meshBuddyPeerInfo) await subscriber.node.disconnectNode(meshBuddyPeerInfo)
# Wait for the sub loop to detect the loss and dial a replacement # Wait for the sub loop to detect the loss and dial a replacement
for _ in 0 ..< 100: check await edgePeersReached(subscriber, shard, 2)
if subscriber.deliveryService.subscriptionManager.edgeFilterPeerCount(shard) >= 2:
break
await sleepAsync(100.milliseconds)
check subscriber.deliveryService.subscriptionManager.edgeFilterPeerCount(shard) >= 2
await waitForMesh(publisher, shard) await waitForMesh(publisher, shard)

View File

@ -1,6 +1,6 @@
{.used.} {.used.}
import std/[net] import std/[net, options]
import chronos, testutils/unittests, stew/byteutils import chronos, testutils/unittests, stew/byteutils
import brokers/broker_context import brokers/broker_context
@ -24,9 +24,9 @@ proc createApiNodeConf(): WakuNodeConf =
conf.listenAddress = parseIpAddress("0.0.0.0") conf.listenAddress = parseIpAddress("0.0.0.0")
conf.tcpPort = Port(0) conf.tcpPort = Port(0)
conf.discv5UdpPort = Port(0) conf.discv5UdpPort = Port(0)
conf.clusterId = 3'u16 conf.clusterId = some(3'u16)
conf.numShardsInNetwork = 1 conf.numShardsInNetwork = 1
conf.reliabilityEnabled = true conf.reliabilityEnabled = some(true)
conf.rest = false conf.rest = false
return conf return conf
@ -35,7 +35,7 @@ suite "Reliable Channel - ingress":
## Unit test for the receive side of the API: instead of standing ## Unit test for the receive side of the API: instead of standing
## up two libp2p nodes and a relay mesh, we drive the manager ## up two libp2p nodes and a relay mesh, we drive the manager
## directly by emitting a `MessageReceivedEvent` (the exact event ## directly by emitting a `MessageReceivedEvent` (the exact event
## the DeliveryService emits when a `WakuMessage` arrives off the ## the MessagingClient emits when a `WakuMessage` arrives off the
## wire). The manager must: ## wire). The manager must:
## - drop traffic missing the Reliable Channel spec marker ## - drop traffic missing the Reliable Channel spec marker
## - dispatch the matching channel's `onMessageReceived` ## - dispatch the matching channel's `onMessageReceived`
@ -45,13 +45,15 @@ suite "Reliable Channel - ingress":
contentTopic = ContentTopic("/reliable-channel/test/proto") contentTopic = ContentTopic("/reliable-channel/test/proto")
let appPayload = "hello reliable channel".toBytes() let appPayload = "hello reliable channel".toBytes()
var waku: Waku
var manager: ReliableChannelManager var manager: ReliableChannelManager
var brokerCtx: BrokerContext var brokerCtx: BrokerContext
lockNewGlobalBrokerContext: lockNewGlobalBrokerContext:
brokerCtx = globalBrokerContext() brokerCtx = globalBrokerContext()
manager = (await ReliableChannelManager.new(createApiNodeConf())).expect( waku = (await createNode(createApiNodeConf())).expect("createNode")
"Failed to create manager" waku.mountMessagingClient().expect("mountMessagingClient")
) waku.mountReliableChannelManager().expect("mountReliableChannelManager")
manager = waku.reliableChannelManager
## Noop encryption providers so the Encrypt/Decrypt brokers have ## Noop encryption providers so the Encrypt/Decrypt brokers have
## something to dispatch to; without this the channel falls back to ## something to dispatch to; without this the channel falls back to
@ -95,7 +97,7 @@ suite "Reliable Channel - ingress":
if arrived: if arrived:
check received.read() == appPayload check received.read() == appPayload
await manager.stop() (await waku.stop()).expect("stop")
asyncTest "manager drops unmarked WakuMessage": asyncTest "manager drops unmarked WakuMessage":
## Mirror of the above: same content topic, but `meta` is empty ## Mirror of the above: same content topic, but `meta` is empty
@ -105,13 +107,15 @@ suite "Reliable Channel - ingress":
contentTopic = ContentTopic("/reliable-channel/test/proto") contentTopic = ContentTopic("/reliable-channel/test/proto")
let appPayload = "foreign payload".toBytes() let appPayload = "foreign payload".toBytes()
var waku: Waku
var manager: ReliableChannelManager var manager: ReliableChannelManager
var brokerCtx: BrokerContext var brokerCtx: BrokerContext
lockNewGlobalBrokerContext: lockNewGlobalBrokerContext:
brokerCtx = globalBrokerContext() brokerCtx = globalBrokerContext()
manager = (await ReliableChannelManager.new(createApiNodeConf())).expect( waku = (await createNode(createApiNodeConf())).expect("createNode")
"Failed to create manager" waku.mountMessagingClient().expect("mountMessagingClient")
) waku.mountReliableChannelManager().expect("mountReliableChannelManager")
manager = waku.reliableChannelManager
setNoopEncryption() setNoopEncryption()
@ -146,7 +150,7 @@ suite "Reliable Channel - ingress":
await sleepAsync(100.milliseconds) await sleepAsync(100.milliseconds)
check not fired check not fired
await manager.stop() (await waku.stop()).expect("stop")
suite "Reliable Channel - send state machine": suite "Reliable Channel - send state machine":
asyncTest "MessageSentEvent finalises the channelReqId as Sent": asyncTest "MessageSentEvent finalises the channelReqId as Sent":
@ -162,13 +166,15 @@ suite "Reliable Channel - send state machine":
contentTopic = ContentTopic("/reliable-channel/test/sm-success") contentTopic = ContentTopic("/reliable-channel/test/sm-success")
fakeMsgReqId = RequestId("fake-msg-req-1") fakeMsgReqId = RequestId("fake-msg-req-1")
var waku: Waku
var manager: ReliableChannelManager var manager: ReliableChannelManager
var brokerCtx: BrokerContext var brokerCtx: BrokerContext
lockNewGlobalBrokerContext: lockNewGlobalBrokerContext:
brokerCtx = globalBrokerContext() brokerCtx = globalBrokerContext()
manager = (await ReliableChannelManager.new(createApiNodeConf())).expect( waku = (await createNode(createApiNodeConf())).expect("createNode")
"Failed to create manager" waku.mountMessagingClient().expect("mountMessagingClient")
) waku.mountReliableChannelManager().expect("mountReliableChannelManager")
manager = waku.reliableChannelManager
setNoopEncryption() setNoopEncryption()
@ -213,7 +219,7 @@ suite "Reliable Channel - send state machine":
if finalised: if finalised:
check sentFut.read() == channelReqId check sentFut.read() == channelReqId
await manager.stop() (await waku.stop()).expect("stop")
asyncTest "two independent channelReqIds are finalised independently": asyncTest "two independent channelReqIds are finalised independently":
## Two `send()` calls -> two independent `channelReqId`s, each with ## Two `send()` calls -> two independent `channelReqId`s, each with
@ -227,13 +233,15 @@ suite "Reliable Channel - send state machine":
channelId = ChannelId("sm-multi-channel") channelId = ChannelId("sm-multi-channel")
contentTopic = ContentTopic("/reliable-channel/test/sm-multi") contentTopic = ContentTopic("/reliable-channel/test/sm-multi")
var waku: Waku
var manager: ReliableChannelManager var manager: ReliableChannelManager
var brokerCtx: BrokerContext var brokerCtx: BrokerContext
lockNewGlobalBrokerContext: lockNewGlobalBrokerContext:
brokerCtx = globalBrokerContext() brokerCtx = globalBrokerContext()
manager = (await ReliableChannelManager.new(createApiNodeConf())).expect( waku = (await createNode(createApiNodeConf())).expect("createNode")
"Failed to create manager" waku.mountMessagingClient().expect("mountMessagingClient")
) waku.mountReliableChannelManager().expect("mountReliableChannelManager")
manager = waku.reliableChannelManager
setNoopEncryption() setNoopEncryption()
@ -303,7 +311,7 @@ suite "Reliable Channel - send state machine":
if erroredArrived: if erroredArrived:
check erroredFut.read() == channelReqId2 check erroredFut.read() == channelReqId2
await manager.stop() (await waku.stop()).expect("stop")
asyncTest "TODO: channelReqId not pruned until ALL its segments are final": asyncTest "TODO: channelReqId not pruned until ALL its segments are final":
## Placeholder for the multi-sibling prune rule. Today's ## Placeholder for the multi-sibling prune rule. Today's

View File

@ -15,8 +15,7 @@ import
node/health_monitor/protocol_health, node/health_monitor/protocol_health,
node/health_monitor/topic_health, node/health_monitor/topic_health,
node/health_monitor/node_health_monitor, node/health_monitor/node_health_monitor,
node/delivery_service/delivery_service, messaging_client,
node/delivery_service/subscription_manager,
node/kernel_api/relay, node/kernel_api/relay,
node/kernel_api/store, node/kernel_api/store,
node/kernel_api/lightpush, node/kernel_api/lightpush,
@ -27,6 +26,7 @@ import
] ]
import ../testlib/[wakunode, wakucore], ../waku_archive/archive_utils import ../testlib/[wakunode, wakucore], ../waku_archive/archive_utils
import waku/node/subscription_manager
const MockDLow = 4 # Mocked GossipSub DLow value const MockDLow = 4 # Mocked GossipSub DLow value
@ -229,8 +229,8 @@ suite "Health Monitor - events":
await nodeA.start() await nodeA.start()
let ds = let ds =
DeliveryService.new(false, nodeA).expect("Failed to create DeliveryService") MessagingClient.new(false, nodeA).expect("Failed to create MessagingClient")
ds.startDeliveryService().expect("Failed to start DeliveryService") ds.start().expect("Failed to start MessagingClient")
let monitorA = NodeHealthMonitor.new(nodeA) let monitorA = NodeHealthMonitor.new(nodeA)
@ -317,7 +317,7 @@ suite "Health Monitor - events":
lastStatus == ConnectionStatus.Disconnected lastStatus == ConnectionStatus.Disconnected
await monitorA.stopHealthMonitor() await monitorA.stopHealthMonitor()
await ds.stopDeliveryService() await ds.stop()
await nodeA.stop() await nodeA.stop()
asyncTest "Edge health driven by confirmed filter subscriptions": asyncTest "Edge health driven by confirmed filter subscriptions":
@ -333,9 +333,9 @@ suite "Health Monitor - events":
await nodeA.start() await nodeA.start()
let ds = let ds =
DeliveryService.new(false, nodeA).expect("Failed to create DeliveryService") MessagingClient.new(false, nodeA).expect("Failed to create MessagingClient")
ds.startDeliveryService().expect("Failed to start DeliveryService") ds.start().expect("Failed to start MessagingClient")
let subMgr = ds.subscriptionManager let subMgr = nodeA.subscriptionManager
var nodeB: WakuNode var nodeB: WakuNode
lockNewGlobalBrokerContext: lockNewGlobalBrokerContext:
@ -416,7 +416,7 @@ suite "Health Monitor - events":
await EventShardTopicHealthChange.dropListener(nodeA.brokerCtx, shardHealthLis) await EventShardTopicHealthChange.dropListener(nodeA.brokerCtx, shardHealthLis)
check shardHealthOk == true check shardHealthOk == true
check subMgr.edgeFilterSubStates.len > 0 check nodeA.subscriptionManager.edgeFilterSubStates.len > 0
healthSignal.clear() healthSignal.clear()
deadline = Moment.now() + TestConnectivityTimeLimit deadline = Moment.now() + TestConnectivityTimeLimit
@ -428,7 +428,7 @@ suite "Health Monitor - events":
check lastStatus == ConnectionStatus.PartiallyConnected check lastStatus == ConnectionStatus.PartiallyConnected
await ds.stopDeliveryService() await ds.stop()
await monitorA.stopHealthMonitor() await monitorA.stopHealthMonitor()
await nodeB.stop() await nodeB.stop()
await nodeA.stop() await nodeA.stop()

View File

@ -9,7 +9,8 @@ import
libp2p/peerId, libp2p/peerId,
libp2p/crypto/crypto, libp2p/crypto/crypto,
eth/keys, eth/keys,
eth/p2p/discoveryv5/enr eth/p2p/discoveryv5/enr,
brokers/broker_context
import import
waku/[ waku/[
@ -184,114 +185,115 @@ suite "Waku Peer Exchange":
suite "Waku Peer Exchange with discv5": suite "Waku Peer Exchange with discv5":
asyncTest "Node successfully exchanges px peers with real discv5": asyncTest "Node successfully exchanges px peers with real discv5":
## Given (copied from test_waku_discv5.nim) lockNewGlobalBrokerContext:
let ## Given (copied from test_waku_discv5.nim)
# todo: px flag let
flags = CapabilitiesBitfield.init( # todo: px flag
lightpush = false, filter = false, store = false, relay = true flags = CapabilitiesBitfield.init(
) lightpush = false, filter = false, store = false, relay = true
bindIp = parseIpAddress("0.0.0.0") )
extIp = parseIpAddress("127.0.0.1") bindIp = parseIpAddress("0.0.0.0")
extIp = parseIpAddress("127.0.0.1")
nodeKey1 = generateSecp256k1Key() nodeKey1 = generateSecp256k1Key()
nodeTcpPort1 = Port(64010) nodeTcpPort1 = Port(64010)
nodeUdpPort1 = Port(9000) nodeUdpPort1 = Port(9000)
node1 = newTestWakuNode( node1 = newTestWakuNode(
nodeKey1, nodeKey1,
bindIp, bindIp,
nodeTcpPort1, nodeTcpPort1,
some(extIp), some(extIp),
wakuFlags = some(flags), wakuFlags = some(flags),
discv5UdpPort = some(nodeUdpPort1), discv5UdpPort = some(nodeUdpPort1),
)
nodeKey2 = generateSecp256k1Key()
nodeTcpPort2 = Port(64012)
nodeUdpPort2 = Port(9002)
node2 = newTestWakuNode(
nodeKey2,
bindIp,
nodeTcpPort2,
some(extIp),
wakuFlags = some(flags),
discv5UdpPort = some(nodeUdpPort2),
)
nodeKey3 = generateSecp256k1Key()
nodeTcpPort3 = Port(64014)
nodeUdpPort3 = Port(9004)
node3 = newTestWakuNode(
nodeKey3,
bindIp,
nodeTcpPort3,
some(extIp),
wakuFlags = some(flags),
discv5UdpPort = some(nodeUdpPort3),
)
# discv5
let conf1 = WakuDiscoveryV5Config(
discv5Config: none(DiscoveryConfig),
address: bindIp,
port: nodeUdpPort1,
privateKey: keys.PrivateKey(nodeKey1.skkey),
bootstrapRecords: @[],
autoupdateRecord: true,
) )
nodeKey2 = generateSecp256k1Key() let disc1 =
nodeTcpPort2 = Port(64012) WakuDiscoveryV5.new(node1.rng, conf1, some(node1.enr), some(node1.peerManager))
nodeUdpPort2 = Port(9002)
node2 = newTestWakuNode( let conf2 = WakuDiscoveryV5Config(
nodeKey2, discv5Config: none(DiscoveryConfig),
bindIp, address: bindIp,
nodeTcpPort2, port: nodeUdpPort2,
some(extIp), privateKey: keys.PrivateKey(nodeKey2.skkey),
wakuFlags = some(flags), bootstrapRecords: @[disc1.protocol.getRecord()],
discv5UdpPort = some(nodeUdpPort2), autoupdateRecord: true,
) )
nodeKey3 = generateSecp256k1Key() let disc2 =
nodeTcpPort3 = Port(64014) WakuDiscoveryV5.new(node2.rng, conf2, some(node2.enr), some(node2.peerManager))
nodeUdpPort3 = Port(9004)
node3 = newTestWakuNode( await allFutures(node1.start(), node2.start(), node3.start())
nodeKey3, let resultDisc1StartRes = await disc1.start()
bindIp, assert resultDisc1StartRes.isOk(), resultDisc1StartRes.error
nodeTcpPort3, let resultDisc2StartRes = await disc2.start()
some(extIp), assert resultDisc2StartRes.isOk(), resultDisc2StartRes.error
wakuFlags = some(flags),
discv5UdpPort = some(nodeUdpPort3), ## When
var attempts = 10
while (disc1.protocol.nodesDiscovered < 1 or disc2.protocol.nodesDiscovered < 1) and
attempts > 0:
await sleepAsync(1.seconds)
attempts -= 1
# node2 can be connected, so will be returned by peer exchange
require (
await node1.peerManager.connectPeer(node2.switch.peerInfo.toRemotePeerInfo())
) )
# discv5 # Mount peer exchange
let conf1 = WakuDiscoveryV5Config( await node1.mountPeerExchange()
discv5Config: none(DiscoveryConfig), await node3.mountPeerExchange()
address: bindIp, await node3.mountPeerExchangeClient()
port: nodeUdpPort1,
privateKey: keys.PrivateKey(nodeKey1.skkey),
bootstrapRecords: @[],
autoupdateRecord: true,
)
let disc1 = let dialResponse =
WakuDiscoveryV5.new(node1.rng, conf1, some(node1.enr), some(node1.peerManager)) await node3.dialForPeerExchange(node1.switch.peerInfo.toRemotePeerInfo())
let conf2 = WakuDiscoveryV5Config( check dialResponse.isOk
discv5Config: none(DiscoveryConfig),
address: bindIp,
port: nodeUdpPort2,
privateKey: keys.PrivateKey(nodeKey2.skkey),
bootstrapRecords: @[disc1.protocol.getRecord()],
autoupdateRecord: true,
)
let disc2 = let
WakuDiscoveryV5.new(node2.rng, conf2, some(node2.enr), some(node2.peerManager)) requestPeers = 1
currentPeers = node3.peerManager.switch.peerStore.peers.len
let res = await node3.fetchPeerExchangePeers(1)
check res.tryGet() == 1
await allFutures(node1.start(), node2.start(), node3.start()) # Then node3 has received 1 peer from node1
let resultDisc1StartRes = await disc1.start() check:
assert resultDisc1StartRes.isOk(), resultDisc1StartRes.error node3.peerManager.switch.peerStore.peers.len == currentPeers + requestPeers
let resultDisc2StartRes = await disc2.start()
assert resultDisc2StartRes.isOk(), resultDisc2StartRes.error
## When await allFutures(
var attempts = 10 [node1.stop(), node2.stop(), node3.stop(), disc1.stop(), disc2.stop()]
while (disc1.protocol.nodesDiscovered < 1 or disc2.protocol.nodesDiscovered < 1) and )
attempts > 0:
await sleepAsync(1.seconds)
attempts -= 1
# node2 can be connected, so will be returned by peer exchange
require (
await node1.peerManager.connectPeer(node2.switch.peerInfo.toRemotePeerInfo())
)
# Mount peer exchange
await node1.mountPeerExchange()
await node3.mountPeerExchange()
await node3.mountPeerExchangeClient()
let dialResponse =
await node3.dialForPeerExchange(node1.switch.peerInfo.toRemotePeerInfo())
check dialResponse.isOk
let
requestPeers = 1
currentPeers = node3.peerManager.switch.peerStore.peers.len
let res = await node3.fetchPeerExchangePeers(1)
check res.tryGet() == 1
# Then node3 has received 1 peer from node1
check:
node3.peerManager.switch.peerStore.peers.len == currentPeers + requestPeers
await allFutures(
[node1.stop(), node2.stop(), node3.stop(), disc1.stop(), disc2.stop()]
)

View File

@ -431,7 +431,7 @@ suite "Waku Discovery v5":
let waku0 = (await Waku.new(conf)).valueOr: let waku0 = (await Waku.new(conf)).valueOr:
raiseAssert error raiseAssert error
(waitFor startWaku(addr waku0)).isOkOr: (waitFor waku0.start()).isOkOr:
raiseAssert error raiseAssert error
confBuilder.withNodeKey(crypto.PrivateKey.random(Secp256k1, myRng[])[]) confBuilder.withNodeKey(crypto.PrivateKey.random(Secp256k1, myRng[])[])
@ -445,7 +445,7 @@ suite "Waku Discovery v5":
let waku1 = (await Waku.new(conf1)).valueOr: let waku1 = (await Waku.new(conf1)).valueOr:
raiseAssert error raiseAssert error
(waitFor startWaku(addr waku1)).isOkOr: (waitFor waku1.start()).isOkOr:
raiseAssert error raiseAssert error
await waku1.node.mountPeerExchange() await waku1.node.mountPeerExchange()
@ -461,7 +461,7 @@ suite "Waku Discovery v5":
let waku2 = (await Waku.new(conf2)).valueOr: let waku2 = (await Waku.new(conf2)).valueOr:
raiseAssert error raiseAssert error
(waitFor startWaku(addr waku2)).isOkOr: (waitFor waku2.start()).isOkOr:
raiseAssert error raiseAssert error
# leave some time for discv5 to act # leave some time for discv5 to act

View File

@ -5,7 +5,8 @@ import
testutils/unittests, testutils/unittests,
chronos, chronos,
libp2p/[switch, peerId, crypto/crypto], libp2p/[switch, peerId, crypto/crypto],
eth/[keys, p2p/discoveryv5/enr] eth/[keys, p2p/discoveryv5/enr],
brokers/broker_context
import import
waku/[ waku/[
@ -31,110 +32,113 @@ suite "Waku Peer Exchange":
suite "request": suite "request":
asyncTest "Retrieve and provide peer exchange peers from discv5": asyncTest "Retrieve and provide peer exchange peers from discv5":
## Given (copied from test_waku_discv5.nim) lockNewGlobalBrokerContext:
let ## Given (copied from test_waku_discv5.nim)
# todo: px flag let
flags = CapabilitiesBitfield.init( # todo: px flag
lightpush = false, filter = false, store = false, relay = true flags = CapabilitiesBitfield.init(
) lightpush = false, filter = false, store = false, relay = true
bindIp = parseIpAddress("0.0.0.0") )
extIp = parseIpAddress("127.0.0.1") bindIp = parseIpAddress("0.0.0.0")
extIp = parseIpAddress("127.0.0.1")
nodeKey1 = generateSecp256k1Key() nodeKey1 = generateSecp256k1Key()
nodeTcpPort1 = Port(64010) nodeTcpPort1 = Port(64010)
nodeUdpPort1 = Port(9000) nodeUdpPort1 = Port(9000)
node1 = newTestWakuNode( node1 = newTestWakuNode(
nodeKey1, nodeKey1,
bindIp, bindIp,
nodeTcpPort1, nodeTcpPort1,
some(extIp), some(extIp),
wakuFlags = some(flags), wakuFlags = some(flags),
discv5UdpPort = some(nodeUdpPort1), discv5UdpPort = some(nodeUdpPort1),
)
nodeKey2 = generateSecp256k1Key()
nodeTcpPort2 = Port(64012)
nodeUdpPort2 = Port(9002)
node2 = newTestWakuNode(
nodeKey2,
bindIp,
nodeTcpPort2,
some(extIp),
wakuFlags = some(flags),
discv5UdpPort = some(nodeUdpPort2),
)
nodeKey3 = generateSecp256k1Key()
nodeTcpPort3 = Port(64014)
nodeUdpPort3 = Port(9004)
node3 = newTestWakuNode(
nodeKey3,
bindIp,
nodeTcpPort3,
some(extIp),
wakuFlags = some(flags),
discv5UdpPort = some(nodeUdpPort3),
)
# discv5
let conf1 = WakuDiscoveryV5Config(
discv5Config: none(DiscoveryConfig),
address: bindIp,
port: nodeUdpPort1,
privateKey: keys.PrivateKey(nodeKey1.skkey),
bootstrapRecords: @[],
autoupdateRecord: true,
) )
nodeKey2 = generateSecp256k1Key() let disc1 = WakuDiscoveryV5.new(
nodeTcpPort2 = Port(64012) node1.rng, conf1, some(node1.enr), some(node1.peerManager)
nodeUdpPort2 = Port(9002)
node2 = newTestWakuNode(
nodeKey2,
bindIp,
nodeTcpPort2,
some(extIp),
wakuFlags = some(flags),
discv5UdpPort = some(nodeUdpPort2),
) )
nodeKey3 = generateSecp256k1Key() let conf2 = WakuDiscoveryV5Config(
nodeTcpPort3 = Port(64014) discv5Config: none(DiscoveryConfig),
nodeUdpPort3 = Port(9004) address: bindIp,
node3 = newTestWakuNode( port: nodeUdpPort2,
nodeKey3, privateKey: keys.PrivateKey(nodeKey2.skkey),
bindIp, bootstrapRecords: @[disc1.protocol.getRecord()],
nodeTcpPort3, autoupdateRecord: true,
some(extIp),
wakuFlags = some(flags),
discv5UdpPort = some(nodeUdpPort3),
) )
# discv5 let disc2 = WakuDiscoveryV5.new(
let conf1 = WakuDiscoveryV5Config( node2.rng, conf2, some(node2.enr), some(node2.peerManager)
discv5Config: none(DiscoveryConfig), )
address: bindIp,
port: nodeUdpPort1,
privateKey: keys.PrivateKey(nodeKey1.skkey),
bootstrapRecords: @[],
autoupdateRecord: true,
)
let disc1 = await allFutures(node1.start(), node2.start(), node3.start())
WakuDiscoveryV5.new(node1.rng, conf1, some(node1.enr), some(node1.peerManager)) let resultDisc1StartRes = await disc1.start()
assert resultDisc1StartRes.isOk(), resultDisc1StartRes.error
let resultDisc2StartRes = await disc2.start()
assert resultDisc2StartRes.isOk(), resultDisc2StartRes.error
let conf2 = WakuDiscoveryV5Config( ## When
discv5Config: none(DiscoveryConfig), var attempts = 10
address: bindIp, while (disc1.protocol.nodesDiscovered < 1 or disc2.protocol.nodesDiscovered < 1) and
port: nodeUdpPort2, attempts > 0:
privateKey: keys.PrivateKey(nodeKey2.skkey), await sleepAsync(1.seconds)
bootstrapRecords: @[disc1.protocol.getRecord()], attempts -= 1
autoupdateRecord: true,
)
let disc2 = # node2 can be connected, so will be returned by peer exchange
WakuDiscoveryV5.new(node2.rng, conf2, some(node2.enr), some(node2.peerManager)) require (
await node1.peerManager.connectPeer(node2.switch.peerInfo.toRemotePeerInfo())
)
await allFutures(node1.start(), node2.start(), node3.start()) # Mount peer exchange
let resultDisc1StartRes = await disc1.start() await node1.mountPeerExchange()
assert resultDisc1StartRes.isOk(), resultDisc1StartRes.error await node3.mountPeerExchange()
let resultDisc2StartRes = await disc2.start()
assert resultDisc2StartRes.isOk(), resultDisc2StartRes.error
## When let dialResponse =
var attempts = 10 await node3.dialForPeerExchange(node1.switch.peerInfo.toRemotePeerInfo())
while (disc1.protocol.nodesDiscovered < 1 or disc2.protocol.nodesDiscovered < 1) and let response = dialResponse.get()
attempts > 0:
await sleepAsync(1.seconds)
attempts -= 1
# node2 can be connected, so will be returned by peer exchange ## Then
require ( check:
await node1.peerManager.connectPeer(node2.switch.peerInfo.toRemotePeerInfo()) response.get().peerInfos.len == 1
) response.get().peerInfos[0].enr == disc2.protocol.localNode.record.raw
# Mount peer exchange await allFutures(
await node1.mountPeerExchange() [node1.stop(), node2.stop(), node3.stop(), disc1.stop(), disc2.stop()]
await node3.mountPeerExchange() )
let dialResponse =
await node3.dialForPeerExchange(node1.switch.peerInfo.toRemotePeerInfo())
let response = dialResponse.get()
## Then
check:
response.get().peerInfos.len == 1
response.get().peerInfos[0].enr == disc2.protocol.localNode.record.raw
await allFutures(
[node1.stop(), node2.stop(), node3.stop(), disc1.stop(), disc2.stop()]
)
asyncTest "Request returns some discovered peers": asyncTest "Request returns some discovered peers":
let let

View File

@ -46,7 +46,7 @@ suite "Wakunode2 - Waku initialization":
var waku = (waitFor Waku.new(conf)).valueOr: var waku = (waitFor Waku.new(conf)).valueOr:
raiseAssert error raiseAssert error
(waitFor startWaku(addr waku)).isOkOr: (waitFor waku.start()).isOkOr:
raiseAssert error raiseAssert error
## Then ## Then
@ -71,7 +71,7 @@ suite "Wakunode2 - Waku initialization":
var waku = (waitFor Waku.new(conf)).valueOr: var waku = (waitFor Waku.new(conf)).valueOr:
raiseAssert error raiseAssert error
(waitFor startWaku(addr waku)).isOkOr: (waitFor waku.start()).isOkOr:
raiseAssert error raiseAssert error
## Then ## Then
@ -128,7 +128,7 @@ suite "Wakunode2 - Waku initialization":
(waitFor waku.stop()).isOkOr: (waitFor waku.stop()).isOkOr:
raiseAssert error raiseAssert error
(waitFor startWaku(addr waku)).isOkOr: (waitFor waku.start()).isOkOr:
raiseAssert error raiseAssert error
let portsJson = waku.stateInfo.getNodeInfoItem(NodeInfoId.MyBoundPorts) let portsJson = waku.stateInfo.getNodeInfoItem(NodeInfoId.MyBoundPorts)

View File

@ -3,9 +3,10 @@ import std/[net, options]
import chronicles, chronos, libp2p/peerid, results import chronicles, chronos, libp2p/peerid, results
import waku/factory/waku import waku/factory/waku
import waku/messaging_client
import waku/[requests/health_requests, waku_core, waku_node] import waku/[requests/health_requests, waku_core, waku_node]
import waku/node/delivery_service/send_service import waku/node/delivery_service/send_service
import waku/node/delivery_service/subscription_manager import waku/node/subscription_manager
import ../../tools/confutils/cli_args import ../../tools/confutils/cli_args
import ../../tools/confutils/messaging_conf import ../../tools/confutils/messaging_conf
import ./[api_conf, types] import ./[api_conf, types]
@ -64,39 +65,15 @@ proc subscribe*(
): Future[Result[void, string]] {.async.} = ): Future[Result[void, string]] {.async.} =
?checkApiAvailability(w) ?checkApiAvailability(w)
return w.deliveryService.subscriptionManager.subscribe(contentTopic) return w.node.subscriptionManager.subscribe(contentTopic)
proc unsubscribe*(w: Waku, contentTopic: ContentTopic): Result[void, string] = proc unsubscribe*(w: Waku, contentTopic: ContentTopic): Result[void, string] =
?checkApiAvailability(w) ?checkApiAvailability(w)
return w.deliveryService.subscriptionManager.unsubscribe(contentTopic) return w.node.subscriptionManager.unsubscribe(contentTopic)
proc send*( proc send*(
w: Waku, envelope: MessageEnvelope w: Waku, envelope: MessageEnvelope
): Future[Result[RequestId, string]] {.async.} = ): Future[Result[RequestId, string]] {.async.} =
?checkApiAvailability(w) ?checkApiAvailability(w)
return await w.messagingClient.send(envelope)
let isSubbed = w.deliveryService.subscriptionManager
.isSubscribed(envelope.contentTopic)
.valueOr(false)
if not isSubbed:
info "Auto-subscribing to topic on send", contentTopic = envelope.contentTopic
w.deliveryService.subscriptionManager.subscribe(envelope.contentTopic).isOkOr:
warn "Failed to auto-subscribe", error = error
return err("Failed to auto-subscribe before sending: " & error)
let requestId = RequestId.new(w.rng)
let deliveryTask = DeliveryTask.new(requestId, envelope, w.brokerCtx).valueOr:
return err("API send: Failed to create delivery task: " & error)
info "API send: scheduling delivery task",
requestId = $requestId,
pubsubTopic = deliveryTask.pubsubTopic,
contentTopic = deliveryTask.msg.contentTopic,
msgHash = deliveryTask.msgHash.to0xHex(),
myPeerId = w.node.peerId()
asyncSpawn w.deliveryService.sendService.send(deliveryTask)
return ok(requestId)

View File

@ -30,12 +30,12 @@ import
waku_enr/sharding, waku_enr/sharding,
waku_enr/multiaddr, waku_enr/multiaddr,
api/types, api/types,
messaging_client,
common/logging, common/logging,
node/peer_manager, node/peer_manager,
node/health_monitor, node/health_monitor,
node/waku_metrics, node/waku_metrics,
node/delivery_service/delivery_service, node/subscription_manager,
node/delivery_service/subscription_manager,
rest_api/message_cache, rest_api/message_cache,
rest_api/endpoint/server, rest_api/endpoint/server,
rest_api/endpoint/builder as rest_server_builder, rest_api/endpoint/builder as rest_server_builder,
@ -48,6 +48,7 @@ import
factory/app_callbacks, factory/app_callbacks,
persistency/persistency, persistency/persistency,
], ],
channels/reliable_channel_manager,
./waku_conf, ./waku_conf,
./waku_state_info ./waku_state_info
@ -73,7 +74,9 @@ type Waku* = ref object
healthMonitor*: NodeHealthMonitor healthMonitor*: NodeHealthMonitor
deliveryService*: DeliveryService messagingClient*: MessagingClient
reliableChannelManager*: ReliableChannelManager
restServer*: WakuRestServerRef restServer*: WakuRestServerRef
metricsServer*: MetricsHttpServerRef metricsServer*: MetricsHttpServerRef
@ -215,10 +218,6 @@ proc new*(
error "Failed setting up app callbacks", error = error error "Failed setting up app callbacks", error = error
return err("Failed setting up app callbacks: " & $error) return err("Failed setting up app callbacks: " & $error)
## Delivery Monitor
let deliveryService = DeliveryService.new(wakuConf.p2pReliability, node).valueOr:
return err("could not create delivery service: " & $error)
var waku = Waku( var waku = Waku(
stateInfo: WakuStateInfo.init(node), stateInfo: WakuStateInfo.init(node),
conf: wakuConf, conf: wakuConf,
@ -226,7 +225,6 @@ proc new*(
key: wakuConf.nodeKey, key: wakuConf.nodeKey,
node: node, node: node,
healthMonitor: healthMonitor, healthMonitor: healthMonitor,
deliveryService: deliveryService,
appCallbacks: appCallbacks, appCallbacks: appCallbacks,
restServer: restServer, restServer: restServer,
brokerCtx: brokerCtx, brokerCtx: brokerCtx,
@ -254,9 +252,9 @@ proc getPorts(
return ok((tcpPort: tcpPort, websocketPort: websocketPort)) return ok((tcpPort: tcpPort, websocketPort: websocketPort))
proc getRunningNetConfig(waku: ptr Waku): Future[Result[NetConfig, string]] {.async.} = proc getRunningNetConfig(waku: Waku): Future[Result[NetConfig, string]] {.async.} =
let conf = waku[].conf let conf = waku.conf
let (tcpPort, websocketPort) = getPorts(waku[].node.switch.peerInfo.listenAddrs).valueOr: let (tcpPort, websocketPort) = getPorts(waku.node.switch.peerInfo.listenAddrs).valueOr:
return err("Could not retrieve ports: " & error) return err("Could not retrieve ports: " & error)
if tcpPort.isSome(): if tcpPort.isSome():
@ -276,67 +274,67 @@ proc getRunningNetConfig(waku: ptr Waku): Future[Result[NetConfig, string]] {.as
return ok(netConf) return ok(netConf)
proc updateEnr(waku: ptr Waku): Future[Result[void, string]] {.async.} = proc updateEnr(waku: Waku): Future[Result[void, string]] {.async.} =
let netConf: NetConfig = (await getRunningNetConfig(waku)).valueOr: let netConf: NetConfig = (await getRunningNetConfig(waku)).valueOr:
return err("error calling updateNetConfig: " & $error) return err("error calling updateNetConfig: " & $error)
let record = enrConfiguration(waku[].conf, netConf).valueOr: let record = enrConfiguration(waku.conf, netConf).valueOr:
return err("ENR setup failed: " & error) return err("ENR setup failed: " & error)
if isClusterMismatched(record, waku[].conf.clusterId): if isClusterMismatched(record, waku.conf.clusterId):
return err("cluster-id mismatch configured shards") return err("cluster-id mismatch configured shards")
waku[].node.enr = record waku.node.enr = record
# If TCP/WS was configured with port 0, node.announcedAddresses was built # If TCP/WS was configured with port 0, node.announcedAddresses was built
# pre-bind with a port value of 0. In any case, the resync is harmless. # pre-bind with a port value of 0. In any case, the resync is harmless.
waku[].node.announcedAddresses = netConf.announcedAddresses waku.node.announcedAddresses = netConf.announcedAddresses
return ok() return ok()
proc updateAddressInENR(waku: ptr Waku): Result[void, string] = proc updateAddressInENR(waku: Waku): Result[void, string] =
let addresses: seq[MultiAddress] = waku[].node.announcedAddresses let addresses: seq[MultiAddress] = waku.node.announcedAddresses
let encodedAddrs = multiaddr.encodeMultiaddrs(addresses) let encodedAddrs = multiaddr.encodeMultiaddrs(addresses)
## First update the enr info contained in WakuNode ## First update the enr info contained in WakuNode
let keyBytes = waku[].key.getRawBytes().valueOr: let keyBytes = waku.key.getRawBytes().valueOr:
return err("failed to retrieve raw bytes from waku key: " & $error) return err("failed to retrieve raw bytes from waku key: " & $error)
let parsedPk = keys.PrivateKey.fromHex(keyBytes.toHex()).valueOr: let parsedPk = keys.PrivateKey.fromHex(keyBytes.toHex()).valueOr:
return err("failed to parse the private key: " & $error) return err("failed to parse the private key: " & $error)
let enrFields = @[toFieldPair(MultiaddrEnrField, encodedAddrs)] let enrFields = @[toFieldPair(MultiaddrEnrField, encodedAddrs)]
waku[].node.enr.update(parsedPk, extraFields = enrFields).isOkOr: waku.node.enr.update(parsedPk, extraFields = enrFields).isOkOr:
return err("failed to update multiaddress in ENR updateAddressInENR: " & $error) return err("failed to update multiaddress in ENR updateAddressInENR: " & $error)
info "Waku node ENR updated successfully with new multiaddress", info "Waku node ENR updated successfully with new multiaddress",
enr = waku[].node.enr.toUri(), record = $(waku[].node.enr) enr = waku.node.enr.toUri(), record = $(waku.node.enr)
## Now update the ENR infor in discv5 ## Now update the ENR infor in discv5
if not waku[].wakuDiscv5.isNil(): if not waku.wakuDiscv5.isNil():
waku[].wakuDiscv5.protocol.localNode.record = waku[].node.enr waku.wakuDiscv5.protocol.localNode.record = waku.node.enr
let enr = waku[].wakuDiscv5.protocol.localNode.record let enr = waku.wakuDiscv5.protocol.localNode.record
info "Waku discv5 ENR updated successfully with new multiaddress", info "Waku discv5 ENR updated successfully with new multiaddress",
enr = enr.toUri(), record = $(enr) enr = enr.toUri(), record = $(enr)
return ok() return ok()
proc updateWaku(waku: ptr Waku): Future[Result[void, string]] {.async.} = proc updateWaku(waku: Waku): Future[Result[void, string]] {.async.} =
(await updateEnr(waku)).isOkOr: (await updateEnr(waku)).isOkOr:
return err("error calling updateEnr: " & $error) return err("error calling updateEnr: " & $error)
?updateAnnouncedAddrWithPrimaryIpAddr(waku[].node) ?updateAnnouncedAddrWithPrimaryIpAddr(waku.node)
?updateAddressInENR(waku) ?updateAddressInENR(waku)
return ok() return ok()
proc startDnsDiscoveryRetryLoop(waku: ptr Waku): Future[void] {.async.} = proc startDnsDiscoveryRetryLoop(waku: Waku): Future[void] {.async.} =
while true: while true:
await sleepAsync(30.seconds) await sleepAsync(30.seconds)
if waku.conf.dnsDiscoveryConf.isSome(): if waku.conf.dnsDiscoveryConf.isSome():
let dnsDiscoveryConf = waku.conf.dnsDiscoveryConf.get() let dnsDiscoveryConf = waku.conf.dnsDiscoveryConf.get()
waku[].dynamicBootstrapNodes = ( waku.dynamicBootstrapNodes = (
await waku_dnsdisc.retrieveDynamicBootstrapNodes( await waku_dnsdisc.retrieveDynamicBootstrapNodes(
dnsDiscoveryConf.enrTreeUrl, dnsDiscoveryConf.nameServers dnsDiscoveryConf.enrTreeUrl, dnsDiscoveryConf.nameServers
) )
@ -344,35 +342,61 @@ proc startDnsDiscoveryRetryLoop(waku: ptr Waku): Future[void] {.async.} =
error "Retrieving dynamic bootstrap nodes failed", error = error error "Retrieving dynamic bootstrap nodes failed", error = error
continue continue
if not waku[].wakuDiscv5.isNil(): if not waku.wakuDiscv5.isNil():
let dynamicBootstrapEnrs = waku[].dynamicBootstrapNodes let dynamicBootstrapEnrs =
.filterIt(it.hasUdpPort()) waku.dynamicBootstrapNodes.filterIt(it.hasUdpPort()).mapIt(it.enr.get().toUri())
.mapIt(it.enr.get().toUri())
var discv5BootstrapEnrs: seq[enr.Record] var discv5BootstrapEnrs: seq[enr.Record]
# parse enrURIs from the configuration and add the resulting ENRs to the discv5BootstrapEnrs seq # parse enrURIs from the configuration and add the resulting ENRs to the discv5BootstrapEnrs seq
for enrUri in dynamicBootstrapEnrs: for enrUri in dynamicBootstrapEnrs:
addBootstrapNode(enrUri, discv5BootstrapEnrs) addBootstrapNode(enrUri, discv5BootstrapEnrs)
waku[].wakuDiscv5.updateBootstrapRecords( waku.wakuDiscv5.updateBootstrapRecords(
waku[].wakuDiscv5.protocol.bootstrapRecords & discv5BootstrapEnrs waku.wakuDiscv5.protocol.bootstrapRecords & discv5BootstrapEnrs
) )
info "Connecting to dynamic bootstrap peers" info "Connecting to dynamic bootstrap peers"
try: try:
await connectToNodes( await connectToNodes(waku.node, waku.dynamicBootstrapNodes, "dynamic bootstrap")
waku[].node, waku[].dynamicBootstrapNodes, "dynamic bootstrap"
)
except CatchableError: except CatchableError:
error "failed to connect to dynamic bootstrap nodes: " & getCurrentExceptionMsg() error "failed to connect to dynamic bootstrap nodes: " & getCurrentExceptionMsg()
return return
proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises: []).} = proc mountMessagingClient*(waku: Waku): Result[void, string] =
if waku[].node.started: if not waku.messagingClient.isNil():
warn "startWaku: waku node already started" return err("messaging client already mounted")
if waku.node.started:
return err("cannot mount messaging client on a started node")
waku.messagingClient = MessagingClient.new(waku.conf.p2pReliability, waku.node).valueOr:
return err("could not create messaging client: " & $error)
return ok()
proc mountReliableChannelManager*(waku: Waku): Result[void, string] =
if not waku.reliableChannelManager.isNil():
return err("reliable channel manager already mounted")
if waku.messagingClient.isNil():
return err("reliable channel manager requires a mounted messaging client")
if waku.node.started:
return err("cannot mount reliable channel manager on a started node")
let messagingClient = waku.messagingClient
let defaultSendHandler: SendHandler = proc(
envelope: MessageEnvelope
): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} =
return await messagingClient.send(envelope)
waku.reliableChannelManager = ReliableChannelManager.new(
messagingClient, defaultSendHandler, waku.brokerCtx
).valueOr:
return err("could not create reliable channel manager: " & $error)
return ok()
proc start*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} =
if waku.node.started:
warn "start: waku node already started"
return ok() return ok()
info "Retrieve dynamic bootstrap nodes" info "Retrieve dynamic bootstrap nodes"
let conf = waku[].conf let conf = waku.conf
if conf.dnsDiscoveryConf.isSome(): if conf.dnsDiscoveryConf.isSome():
let dnsDiscoveryConf = waku.conf.dnsDiscoveryConf.get() let dnsDiscoveryConf = waku.conf.dnsDiscoveryConf.get()
@ -390,9 +414,9 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises:
error "Retrieving dynamic bootstrap nodes failed", error "Retrieving dynamic bootstrap nodes failed",
error = dynamicBootstrapNodesRes.error error = dynamicBootstrapNodesRes.error
# Start Dns Discovery retry loop # Start Dns Discovery retry loop
waku[].dnsRetryLoopHandle = waku.startDnsDiscoveryRetryLoop() waku.dnsRetryLoopHandle = waku.startDnsDiscoveryRetryLoop()
else: else:
waku[].dynamicBootstrapNodes = dynamicBootstrapNodesRes.get() waku.dynamicBootstrapNodes = dynamicBootstrapNodesRes.get()
## Initialize persistency singleton instance - we don't need the instance itself here, ## Initialize persistency singleton instance - we don't need the instance itself here,
## but this ensures it's initialized before any store job starts. ## but this ensures it's initialized before any store job starts.
@ -405,12 +429,12 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises:
let bound = getPorts(waku.node.switch.peerInfo.listenAddrs).valueOr: let bound = getPorts(waku.node.switch.peerInfo.listenAddrs).valueOr:
return err("failed to read bound ports from switch: " & $error) return err("failed to read bound ports from switch: " & $error)
waku[].node.ports.tcp = bound.tcpPort.get(Port(0)).uint16 waku.node.ports.tcp = bound.tcpPort.get(Port(0)).uint16
waku[].node.ports.webSocket = bound.websocketPort.get(Port(0)).uint16 waku.node.ports.webSocket = bound.websocketPort.get(Port(0)).uint16
## Discv5 ## Discv5
if conf.discv5Conf.isSome(): if conf.discv5Conf.isSome():
waku[].wakuDiscV5 = ( waku.wakuDiscV5 = (
await waku_discv5.setupAndStartDiscv5( await waku_discv5.setupAndStartDiscv5(
waku.node.enr, waku.node.enr,
waku.node.peerManager, waku.node.peerManager,
@ -425,23 +449,21 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises:
).valueOr: ).valueOr:
return err("failed to start waku discovery v5: " & error) return err("failed to start waku discovery v5: " & error)
waku[].node.ports.discv5Udp = waku[].wakuDiscV5.udpPort.uint16 waku.node.ports.discv5Udp = waku.wakuDiscV5.udpPort.uint16
waku[].conf.discv5Conf.get().udpPort = waku[].wakuDiscV5.udpPort waku.conf.discv5Conf.get().udpPort = waku.wakuDiscV5.udpPort
## Update waku data that is set dynamically on node start ## Update waku data that is set dynamically on node start
try: try:
(await updateWaku(waku)).isOkOr: (await updateWaku(waku)).isOkOr:
return err("Error in startWaku: " & $error) return err("Error in start: " & $error)
except CatchableError: except CatchableError:
return err("Caught exception in startWaku: " & getCurrentExceptionMsg()) return err("Caught exception in start: " & getCurrentExceptionMsg())
## Reliability waku.node.subscriptionManager.subscribeAllAutoshards().isOkOr:
if not waku[].deliveryService.isNil(): return err("failed to auto-subscribe autosharding shards: " & $error)
waku[].deliveryService.startDeliveryService().isOkOr:
return err("failed to start delivery service: " & $error)
## Health Monitor ## Health Monitor
waku[].healthMonitor.startHealthMonitor().isOkOr: waku.healthMonitor.startHealthMonitor().isOkOr:
return err("failed to start health monitor: " & $error) return err("failed to start health monitor: " & $error)
## Setup RequestConnectionStatus provider ## Setup RequestConnectionStatus provider
@ -450,7 +472,7 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises:
globalBrokerContext(), globalBrokerContext(),
proc(): Result[RequestConnectionStatus, string] = proc(): Result[RequestConnectionStatus, string] =
try: try:
let healthReport = waku[].healthMonitor.getSyncNodeHealthReport() let healthReport = waku.healthMonitor.getSyncNodeHealthReport()
return return
ok(RequestConnectionStatus(connectionStatus: healthReport.connectionStatus)) ok(RequestConnectionStatus(connectionStatus: healthReport.connectionStatus))
except CatchableError: except CatchableError:
@ -467,7 +489,7 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises:
): Future[Result[RequestProtocolHealth, string]] {.async.} = ): Future[Result[RequestProtocolHealth, string]] {.async.} =
try: try:
let protocolHealthStatus = let protocolHealthStatus =
await waku[].healthMonitor.getProtocolHealthInfo(protocol) await waku.healthMonitor.getProtocolHealthInfo(protocol)
return ok(RequestProtocolHealth(healthStatus: protocolHealthStatus)) return ok(RequestProtocolHealth(healthStatus: protocolHealthStatus))
except CatchableError: except CatchableError:
return err("Failed to get protocol health: " & getCurrentExceptionMsg()), return err("Failed to get protocol health: " & getCurrentExceptionMsg()),
@ -480,7 +502,7 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises:
globalBrokerContext(), globalBrokerContext(),
proc(): Future[Result[RequestHealthReport, string]] {.async.} = proc(): Future[Result[RequestHealthReport, string]] {.async.} =
try: try:
let report = await waku[].healthMonitor.getNodeHealthReport() let report = await waku.healthMonitor.getNodeHealthReport()
return ok(RequestHealthReport(healthReport: report)) return ok(RequestHealthReport(healthReport: report))
except CatchableError: except CatchableError:
return err("Failed to get health report: " & getCurrentExceptionMsg()), return err("Failed to get health report: " & getCurrentExceptionMsg()),
@ -489,9 +511,9 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises:
if conf.restServerConf.isSome(): if conf.restServerConf.isSome():
rest_server_builder.startRestServerProtocolSupport( rest_server_builder.startRestServerProtocolSupport(
waku[].restServer, waku.restServer,
waku[].node, waku.node,
waku[].wakuDiscv5, waku.wakuDiscv5,
conf.restServerConf.get(), conf.restServerConf.get(),
conf.relay, conf.relay,
conf.lightPush, conf.lightPush,
@ -509,21 +531,27 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises:
) )
).valueOr: ).valueOr:
return err("Starting monitoring and external interfaces failed: " & error) return err("Starting monitoring and external interfaces failed: " & error)
waku[].metricsServer = server waku.metricsServer = server
waku[].node.ports.metrics = port.uint16 waku.node.ports.metrics = port.uint16
waku[].conf.metricsServerConf.get().httpPort = port waku.conf.metricsServerConf.get().httpPort = port
except CatchableError: except CatchableError:
return err( return err(
"Caught exception starting monitoring and external interfaces failed: " & "Caught exception starting monitoring and external interfaces failed: " &
getCurrentExceptionMsg() getCurrentExceptionMsg()
) )
waku[].healthMonitor.setOverallHealth(HealthStatus.READY) waku.healthMonitor.setOverallHealth(HealthStatus.READY)
if not waku.messagingClient.isNil():
waku.messagingClient.start().isOkOr:
return err("failed to start messaging client: " & $error)
if not waku.reliableChannelManager.isNil():
waku.reliableChannelManager.start().isOkOr:
return err("failed to start reliable channel manager: " & $error)
return ok() return ok()
proc stop*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} = proc stop*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} =
## Waku shutdown
if not waku.node.started: if not waku.node.started:
warn "stop: attempting to stop node that isn't running" warn "stop: attempting to stop node that isn't running"
@ -538,9 +566,11 @@ proc stop*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} =
if not waku.wakuDiscv5.isNil(): if not waku.wakuDiscv5.isNil():
await waku.wakuDiscv5.stop() await waku.wakuDiscv5.stop()
if not waku.deliveryService.isNil(): if not waku.reliableChannelManager.isNil():
await waku.deliveryService.stopDeliveryService() await waku.reliableChannelManager.stop()
waku.deliveryService = nil
if not waku.messagingClient.isNil():
await waku.messagingClient.stop()
if not waku.node.isNil(): if not waku.node.isNil():
await waku.node.stop() await waku.node.stop()

63
waku/messaging_client.nim Normal file
View File

@ -0,0 +1,63 @@
import results, chronos
import chronicles
import
./api/types,
./node/[
waku_node,
subscription_manager,
delivery_service/recv_service,
delivery_service/send_service,
delivery_service/send_service/delivery_task,
]
type MessagingClient* = ref object
node: WakuNode
sendService*: SendService
recvService*: RecvService
started: bool
proc new*(
T: type MessagingClient, useP2PReliability: bool, node: WakuNode
): Result[T, string] =
let sendService = ?SendService.new(useP2PReliability, node)
let recvService = RecvService.new(node)
ok(T(node: node, sendService: sendService, recvService: recvService))
proc start*(self: MessagingClient): Result[void, string] =
if self.started:
return ok()
self.recvService.startRecvService()
self.sendService.startSendService()
self.started = true
ok()
proc stop*(self: MessagingClient) {.async.} =
if not self.started:
return
await self.sendService.stopSendService()
await self.recvService.stopRecvService()
self.started = false
proc send*(
self: MessagingClient, envelope: MessageEnvelope
): Future[Result[RequestId, string]] {.async.} =
## High-level messaging API send. Auto-subscribes to the content topic
## (so the local node sees its own gossipsub broadcast), builds a
## `DeliveryTask`, and hands it to the send service. Returns the request
## id the caller can correlate with `MessageSentEvent` / `MessageErrorEvent`.
let isSubbed =
self.node.subscriptionManager.isSubscribed(envelope.contentTopic).valueOr(false)
if not isSubbed:
info "Auto-subscribing to topic on send", contentTopic = envelope.contentTopic
self.node.subscriptionManager.subscribe(envelope.contentTopic).isOkOr:
warn "Failed to auto-subscribe", error = error
return err("Failed to auto-subscribe before sending: " & error)
let requestId = RequestId.new(self.node.rng)
let deliveryTask = DeliveryTask.new(requestId, envelope, self.node.brokerCtx).valueOr:
return err("MessagingClient.send: Failed to create delivery task: " & error)
asyncSpawn self.sendService.send(deliveryTask)
return ok(requestId)

View File

@ -1,44 +0,0 @@
## This module helps to ensure the correct transmission and reception of messages
import results
import chronos, chronicles
import
./recv_service,
./send_service,
./subscription_manager,
waku/[
waku_core, waku_node, waku_store/client, waku_relay/protocol, waku_lightpush/client
]
type DeliveryService* = ref object
sendService*: SendService
recvService*: RecvService
subscriptionManager*: SubscriptionManager
proc new*(
T: type DeliveryService, useP2PReliability: bool, w: WakuNode
): Result[T, string] =
## storeClient is needed to give store visitility to DeliveryService
## wakuRelay and wakuLightpushClient are needed to give a mechanism to SendService to re-publish
let subscriptionManager = SubscriptionManager.new(w)
let sendService = ?SendService.new(useP2PReliability, w, subscriptionManager)
let recvService = RecvService.new(w, subscriptionManager)
return ok(
DeliveryService(
sendService: sendService,
recvService: recvService,
subscriptionManager: subscriptionManager,
)
)
proc startDeliveryService*(self: DeliveryService): Result[void, string] =
?self.subscriptionManager.startSubscriptionManager()
self.recvService.startRecvService()
self.sendService.startSendService()
return ok()
proc stopDeliveryService*(self: DeliveryService) {.async.} =
await self.sendService.stopSendService()
await self.recvService.stopRecvService()
await self.subscriptionManager.stopSubscriptionManager()

View File

@ -4,17 +4,17 @@
import std/[tables, sequtils, options, sets] import std/[tables, sequtils, options, sets]
import chronos, chronicles, libp2p/utility import chronos, chronicles, libp2p/utility
import ../[subscription_manager]
import brokers/broker_context import brokers/broker_context
import import
waku/[ waku/[
waku_core, waku_core,
waku_core/topics,
waku_store/client, waku_store/client,
waku_store/common, waku_store/common,
waku_filter_v2/client, waku_filter_v2/client,
waku_core/topics,
events/message_events, events/message_events,
waku_node, waku_node,
node/subscription_manager,
] ]
const StoreCheckPeriod = chronos.minutes(5) ## How often to perform store queries const StoreCheckPeriod = chronos.minutes(5) ## How often to perform store queries
@ -38,7 +38,6 @@ type RecvService* = ref object of RootObj
brokerCtx: BrokerContext brokerCtx: BrokerContext
node: WakuNode node: WakuNode
seenMsgListener: MessageSeenEventListener seenMsgListener: MessageSeenEventListener
subscriptionManager: SubscriptionManager
recentReceivedMsgs: seq[RecvMessage] recentReceivedMsgs: seq[RecvMessage]
@ -77,7 +76,9 @@ proc processIncomingMessage(
## or if the message is a duplicate (recently-seen). Otherwise, save it as ## or if the message is a duplicate (recently-seen). Otherwise, save it as
## recently-seen, emit a MessageReceivedEvent, and return true. ## recently-seen, emit a MessageReceivedEvent, and return true.
if not self.subscriptionManager.isSubscribed(pubsubTopic, message.contentTopic): if not self.node.subscriptionManager.isContentSubscribed(
pubsubTopic, message.contentTopic
):
trace "skipping message as I am not subscribed", trace "skipping message as I am not subscribed",
shard = pubsubTopic, contentTopic = message.contentTopic shard = pubsubTopic, contentTopic = message.contentTopic
return false return false
@ -101,7 +102,7 @@ proc checkStore*(self: RecvService) {.async.} =
self.endTimeToCheck = getNowInNanosecondTime() self.endTimeToCheck = getNowInNanosecondTime()
## query store and deliver new recovered messages per subscribed topic ## query store and deliver new recovered messages per subscribed topic
for pubsubTopic, contentTopics in self.subscriptionManager.subscribedTopics: for pubsubTopic, contentTopics in self.node.subscriptionManager.subscribedContentTopics:
let storeResp: StoreQueryResponse = ( let storeResp: StoreQueryResponse = (
await self.node.wakuStoreClient.queryToAny( await self.node.wakuStoreClient.queryToAny(
StoreQueryRequest( StoreQueryRequest(
@ -146,7 +147,7 @@ proc msgChecker(self: RecvService) {.async.} =
await sleepAsync(StoreCheckPeriod) await sleepAsync(StoreCheckPeriod)
await self.checkStore() await self.checkStore()
proc new*(T: typedesc[RecvService], node: WakuNode, s: SubscriptionManager): T = proc new*(T: typedesc[RecvService], node: WakuNode): T =
## The storeClient will help to acquire any possible missed messages ## The storeClient will help to acquire any possible missed messages
let now = getNowInNanosecondTime() let now = getNowInNanosecondTime()
@ -154,7 +155,6 @@ proc new*(T: typedesc[RecvService], node: WakuNode, s: SubscriptionManager): T =
node: node, node: node,
startTimeToCheck: now, startTimeToCheck: now,
brokerCtx: node.brokerCtx, brokerCtx: node.brokerCtx,
subscriptionManager: s,
recentReceivedMsgs: @[], recentReceivedMsgs: @[],
) )

View File

@ -6,10 +6,10 @@ import chronos, chronicles, libp2p/utility
import brokers/broker_context import brokers/broker_context
import import
./[send_processor, relay_processor, lightpush_processor, delivery_task], ./[send_processor, relay_processor, lightpush_processor, delivery_task],
../[subscription_manager],
waku/[ waku/[
waku_core, waku_core,
node/waku_node, node/waku_node,
node/subscription_manager,
node/peer_manager, node/peer_manager,
waku_store/client, waku_store/client,
waku_store/common, waku_store/common,
@ -58,7 +58,6 @@ type SendService* = ref object of RootObj
node: WakuNode node: WakuNode
checkStoreForMessages: bool checkStoreForMessages: bool
subscriptionManager: SubscriptionManager
proc setupSendProcessorChain( proc setupSendProcessorChain(
peerManager: PeerManager, peerManager: PeerManager,
@ -96,10 +95,7 @@ proc setupSendProcessorChain(
return ok(processors[0]) return ok(processors[0])
proc new*( proc new*(
T: typedesc[SendService], T: typedesc[SendService], preferP2PReliability: bool, w: WakuNode
preferP2PReliability: bool,
w: WakuNode,
s: SubscriptionManager,
): Result[T, string] = ): Result[T, string] =
if w.wakuRelay.isNil() and w.wakuLightpushClient.isNil(): if w.wakuRelay.isNil() and w.wakuLightpushClient.isNil():
return err( return err(
@ -120,7 +116,6 @@ proc new*(
sendProcessor: sendProcessorChain, sendProcessor: sendProcessorChain,
node: w, node: w,
checkStoreForMessages: checkStoreForMessages, checkStoreForMessages: checkStoreForMessages,
subscriptionManager: s,
) )
return ok(sendService) return ok(sendService)
@ -263,7 +258,7 @@ proc send*(self: SendService, task: DeliveryTask) {.async.} =
info "SendService.send: processing delivery task", info "SendService.send: processing delivery task",
requestId = task.requestId, msgHash = task.msgHash.to0xHex() requestId = task.requestId, msgHash = task.msgHash.to0xHex()
self.subscriptionManager.subscribe(task.msg.contentTopic).isOkOr: self.node.subscriptionManager.subscribe(task.msg.contentTopic).isOkOr:
error "SendService.send: failed to subscribe to content topic", error "SendService.send: failed to subscribe to content topic",
contentTopic = task.msg.contentTopic, error = error contentTopic = task.msg.contentTopic, error = error

View File

@ -1,596 +0,0 @@
import std/[sequtils, sets, tables, options, strutils], chronos, chronicles, results
import libp2p/[peerid, peerinfo]
import brokers/broker_context
import
waku/[
waku_core,
waku_core/topics,
waku_core/topics/sharding,
waku_node,
waku_relay,
waku_filter_v2/common as filter_common,
waku_filter_v2/client as filter_client,
waku_filter_v2/protocol as filter_protocol,
events/health_events,
events/peer_events,
requests/health_requests,
node/peer_manager,
node/health_monitor/topic_health,
node/health_monitor/connection_status,
]
# ---------------------------------------------------------------------------
# Logos Messaging API SubscriptionManager
#
# Maps all topic subscription intent and centralizes all consistency
# maintenance of the pubsub and content topic subscription model across
# the various network drivers that handle topics (Edge/Filter and Core/Relay).
# ---------------------------------------------------------------------------
type EdgeFilterSubState* = object
peers: seq[RemotePeerInfo]
## Filter service peers with confirmed subscriptions on this shard.
pending: seq[Future[void]] ## In-flight dial futures for peers not yet confirmed.
pendingPeers: HashSet[PeerId] ## PeerIds of peers currently being dialed.
currentHealth: TopicHealth
## Cached health derived from peers.len; updated on every peer set change.
func toTopicHealth*(peersCount: int): TopicHealth =
if peersCount >= HealthyThreshold:
TopicHealth.SUFFICIENTLY_HEALTHY
elif peersCount > 0:
TopicHealth.MINIMALLY_HEALTHY
else:
TopicHealth.UNHEALTHY
type SubscriptionManager* = ref object of RootObj
node: WakuNode
contentTopicSubs: Table[PubsubTopic, HashSet[ContentTopic]]
## Map of Shard to ContentTopic needed because e.g. WakuRelay is PubsubTopic only.
## A present key with an empty HashSet value means pubsubtopic already subscribed
## (via subscribePubsubTopics()) but there's no specific content topic interest yet.
edgeFilterSubStates*: Table[PubsubTopic, EdgeFilterSubState]
## Per-shard filter subscription state for edge mode.
edgeFilterWakeup: AsyncEvent
## Signalled when the edge filter sub loop should re-reconcile.
edgeFilterSubLoopFut: Future[void]
edgeFilterHealthLoopFut: Future[void]
peerEventListener: WakuPeerEventListener
## Listener for peer connect/disconnect events (edge filter wakeup).
iterator subscribedTopics*(
self: SubscriptionManager
): (PubsubTopic, HashSet[ContentTopic]) =
## Iterate over all subscribed content topics, batched per shard.
## This is guaranteed to return a non-empty `topics` (content topics) list on iteration.
for pubsub, topics in self.contentTopicSubs.pairs:
# We are iterating over subscribed content topics; if we are subscribed to
# a shard but have no subscription (interest) for any content topic in that
# shard, then avoid triggering an iteration that doesn't advance the intent
# to iterate over content topic subscriptions.
if topics.len == 0:
continue
yield (pubsub, topics)
proc edgeFilterPeerCount*(sm: SubscriptionManager, shard: PubsubTopic): int =
sm.edgeFilterSubStates.withValue(shard, state):
return state.peers.len
return 0
proc new*(T: typedesc[SubscriptionManager], node: WakuNode): T =
SubscriptionManager(
node: node, contentTopicSubs: initTable[PubsubTopic, HashSet[ContentTopic]]()
)
proc addContentTopicInterest(
self: SubscriptionManager, shard: PubsubTopic, topic: ContentTopic
): Result[void, string] =
var changed = false
if not self.contentTopicSubs.hasKey(shard):
self.contentTopicSubs[shard] = initHashSet[ContentTopic]()
changed = true
self.contentTopicSubs.withValue(shard, cTopics):
if not cTopics[].contains(topic):
cTopics[].incl(topic)
changed = true
if changed and not isNil(self.edgeFilterWakeup):
self.edgeFilterWakeup.fire()
return ok()
proc removeContentTopicInterest(
self: SubscriptionManager, shard: PubsubTopic, topic: ContentTopic
): Result[void, string] =
var changed = false
self.contentTopicSubs.withValue(shard, cTopics):
if cTopics[].contains(topic):
cTopics[].excl(topic)
changed = true
if cTopics[].len == 0 and isNil(self.node.wakuRelay):
self.contentTopicSubs.del(shard) # We're done with cTopics here
if changed and not isNil(self.edgeFilterWakeup):
self.edgeFilterWakeup.fire()
return ok()
proc subscribePubsubTopics(
self: SubscriptionManager, shards: seq[PubsubTopic]
): Result[void, string] =
if isNil(self.node.wakuRelay):
return err("subscribePubsubTopics requires a Relay")
var errors: seq[string]
for shard in shards:
if not self.contentTopicSubs.hasKey(shard):
self.node.subscribe((kind: PubsubSub, topic: shard), nil).isOkOr:
errors.add("shard " & shard & ": " & error)
continue
self.contentTopicSubs[shard] = initHashSet[ContentTopic]()
if errors.len > 0:
return err("subscribeShard errors: " & errors.join("; "))
return ok()
proc getShardForContentTopic(
self: SubscriptionManager, topic: ContentTopic
): Result[PubsubTopic, string] =
if self.node.wakuAutoSharding.isSome():
let shardObj = ?self.node.wakuAutoSharding.get().getShard(topic)
return ok($shardObj)
return err("SubscriptionManager requires AutoSharding")
proc isSubscribed*(
self: SubscriptionManager, topic: ContentTopic
): Result[bool, string] =
let shard = ?self.getShardForContentTopic(topic)
return ok(
self.contentTopicSubs.hasKey(shard) and self.contentTopicSubs[shard].contains(topic)
)
proc isSubscribed*(
self: SubscriptionManager, shard: PubsubTopic, contentTopic: ContentTopic
): bool {.raises: [].} =
self.contentTopicSubs.withValue(shard, cTopics):
return cTopics[].contains(contentTopic)
return false
proc subscribe*(self: SubscriptionManager, topic: ContentTopic): Result[void, string] =
if isNil(self.node.wakuRelay) and isNil(self.node.wakuFilterClient):
return err("SubscriptionManager requires either Relay or Filter Client.")
let shard = ?self.getShardForContentTopic(topic)
if not isNil(self.node.wakuRelay) and not self.contentTopicSubs.hasKey(shard):
?self.subscribePubsubTopics(@[shard])
?self.addContentTopicInterest(shard, topic)
return ok()
proc unsubscribe*(
self: SubscriptionManager, topic: ContentTopic
): Result[void, string] =
if isNil(self.node.wakuRelay) and isNil(self.node.wakuFilterClient):
return err("SubscriptionManager requires either Relay or Filter Client.")
let shard = ?self.getShardForContentTopic(topic)
if self.isSubscribed(shard, topic):
?self.removeContentTopicInterest(shard, topic)
return ok()
# ---------------------------------------------------------------------------
# Edge Filter driver for the Logos Messaging API
#
# The SubscriptionManager absorbs natively the responsibility of using the
# Edge Filter protocol to effect subscriptions and message receipt for edge.
# ---------------------------------------------------------------------------
const EdgeFilterSubscribeTimeout = chronos.seconds(15)
## Timeout for a single filter subscribe/unsubscribe RPC to a service peer.
const EdgeFilterPingTimeout = chronos.seconds(5)
## Timeout for a filter ping health check.
const EdgeFilterLoopInterval = chronos.seconds(30)
## Interval for the edge filter health ping loop.
const EdgeFilterSubLoopDebounce = chronos.seconds(1)
## Debounce delay to coalesce rapid-fire wakeups into a single reconciliation pass.
type EdgeDialTask = object
peer: RemotePeerInfo
shard: PubsubTopic
topics: seq[ContentTopic]
proc updateShardHealth(
self: SubscriptionManager, shard: PubsubTopic, state: var EdgeFilterSubState
) =
## Recompute and emit health for a shard after its peer set changed.
let newHealth = toTopicHealth(state.peers.len)
if newHealth != state.currentHealth:
state.currentHealth = newHealth
EventShardTopicHealthChange.emit(self.node.brokerCtx, shard, newHealth)
proc removePeer(self: SubscriptionManager, shard: PubsubTopic, peerId: PeerId) =
## Remove a peer from edgeFilterSubStates for the given shard,
## update health, and wake the sub loop to dial a replacement.
## Best-effort unsubscribe so the service peer stops pushing to us.
self.edgeFilterSubStates.withValue(shard, state):
var peer: RemotePeerInfo
var found = false
for p in state.peers:
if p.peerId == peerId:
peer = p
found = true
break
if not found:
return
state.peers.keepItIf(it.peerId != peerId)
self.updateShardHealth(shard, state[])
self.edgeFilterWakeup.fire()
if not self.node.wakuFilterClient.isNil():
self.contentTopicSubs.withValue(shard, topics):
let ct = toSeq(topics[])
if ct.len > 0:
proc doUnsubscribe() {.async.} =
discard await self.node.wakuFilterClient.unsubscribe(peer, shard, ct)
asyncSpawn doUnsubscribe()
type SendChunkedFilterRpcKind = enum
FilterSubscribe
FilterUnsubscribe
proc sendChunkedFilterRpc(
self: SubscriptionManager,
peer: RemotePeerInfo,
shard: PubsubTopic,
topics: seq[ContentTopic],
kind: SendChunkedFilterRpcKind,
): Future[bool] {.async.} =
## Send a chunked filter subscribe or unsubscribe RPC. Returns true on
## success. On failure the peer is removed and false is returned.
try:
var i = 0
while i < topics.len:
let chunk =
topics[i ..< min(i + filter_protocol.MaxContentTopicsPerRequest, topics.len)]
let fut =
case kind
of FilterSubscribe:
self.node.wakuFilterClient.subscribe(peer, shard, chunk)
of FilterUnsubscribe:
self.node.wakuFilterClient.unsubscribe(peer, shard, chunk)
if not (await fut.withTimeout(EdgeFilterSubscribeTimeout)) or fut.read().isErr():
trace "sendChunkedFilterRpc: chunk failed",
op = kind, shard = shard, peer = peer.peerId
self.removePeer(shard, peer.peerId)
return false
i += filter_protocol.MaxContentTopicsPerRequest
except CatchableError as exc:
debug "sendChunkedFilterRpc: failed",
op = kind, shard = shard, peer = peer.peerId, err = exc.msg
self.removePeer(shard, peer.peerId)
return false
return true
proc syncFilterDeltas(
self: SubscriptionManager,
peer: RemotePeerInfo,
shard: PubsubTopic,
added: seq[ContentTopic],
removed: seq[ContentTopic],
) {.async.} =
## Push content topic changes (adds/removes) to an already-tracked peer.
if added.len > 0:
if not await self.sendChunkedFilterRpc(peer, shard, added, FilterSubscribe):
return
if removed.len > 0:
discard await self.sendChunkedFilterRpc(peer, shard, removed, FilterUnsubscribe)
proc dialFilterPeer(
self: SubscriptionManager,
peer: RemotePeerInfo,
shard: PubsubTopic,
contentTopics: seq[ContentTopic],
) {.async.} =
## Subscribe a new peer to all content topics on a shard and start tracking it.
self.edgeFilterSubStates.withValue(shard, state):
state.pendingPeers.incl(peer.peerId)
try:
if not await self.sendChunkedFilterRpc(peer, shard, contentTopics, FilterSubscribe):
return
self.edgeFilterSubStates.withValue(shard, state):
if state.peers.anyIt(it.peerId == peer.peerId):
trace "dialFilterPeer: peer already tracked, skipping duplicate",
shard = shard, peer = peer.peerId
return
state.peers.add(peer)
self.updateShardHealth(shard, state[])
trace "dialFilterPeer: successfully subscribed to all chunks",
shard = shard, peer = peer.peerId, totalPeers = state.peers.len
do:
trace "dialFilterPeer: shard removed while subscribing, discarding result",
shard = shard, peer = peer.peerId
finally:
self.edgeFilterSubStates.withValue(shard, state):
state.pendingPeers.excl(peer.peerId)
proc edgeFilterHealthLoop*(self: SubscriptionManager) {.async.} =
## Periodically pings all connected filter service peers to verify they are
## still alive at the application layer. Peers that fail the ping are removed.
while true:
await sleepAsync(EdgeFilterLoopInterval)
if self.node.wakuFilterClient.isNil():
warn "filter client is nil within edge filter health loop"
continue
var connected = initTable[PeerId, RemotePeerInfo]()
for state in self.edgeFilterSubStates.values:
for peer in state.peers:
if self.node.peerManager.switch.peerStore.isConnected(peer.peerId):
connected[peer.peerId] = peer
var alive = initHashSet[PeerId]()
if connected.len > 0:
var pingTasks: seq[(PeerId, Future[FilterSubscribeResult])]
for peer in connected.values:
pingTasks.add(
(peer.peerId, self.node.wakuFilterClient.ping(peer, EdgeFilterPingTimeout))
)
# extract future tasks from (PeerId, Future) tuples and await them
await allFutures(pingTasks.mapIt(it[1]))
for (peerId, task) in pingTasks:
if task.read().isOk():
alive.incl(peerId)
var changed = false
for shard, state in self.edgeFilterSubStates.mpairs:
let oldLen = state.peers.len
state.peers.keepItIf(it.peerId notin connected or alive.contains(it.peerId))
if state.peers.len < oldLen:
changed = true
self.updateShardHealth(shard, state)
trace "Edge Filter health degraded by Ping failure",
shard = shard, new = state.currentHealth
if changed:
self.edgeFilterWakeup.fire()
proc selectFilterCandidates(
self: SubscriptionManager, shard: PubsubTopic, exclude: HashSet[PeerId], needed: int
): seq[RemotePeerInfo] =
## Select filter service peer candidates for a shard.
# Start with every filter server peer that can serve the shard
var allCandidates = self.node.peerManager.selectPeers(
filter_common.WakuFilterSubscribeCodec, some(shard)
)
# Remove all already used in this shard or being dialed for it
allCandidates.keepItIf(it.peerId notin exclude)
# Collect peer IDs already tracked on other shards
var trackedOnOther = initHashSet[PeerId]()
for otherShard, otherState in self.edgeFilterSubStates.pairs:
if otherShard != shard:
for peer in otherState.peers:
trackedOnOther.incl(peer.peerId)
# Prefer peers we already have a connection to first, preserving shuffle
var candidates =
allCandidates.filterIt(it.peerId in trackedOnOther) &
allCandidates.filterIt(it.peerId notin trackedOnOther)
# We need to return 'needed' peers only
if candidates.len > needed:
candidates.setLen(needed)
return candidates
proc edgeFilterSubLoop*(self: SubscriptionManager) {.async.} =
## Reconciles filter subscriptions with the desired state from SubscriptionManager.
var lastSynced = initTable[PubsubTopic, HashSet[ContentTopic]]()
while true:
await self.edgeFilterWakeup.wait()
await sleepAsync(EdgeFilterSubLoopDebounce)
self.edgeFilterWakeup.clear()
trace "edgeFilterSubLoop: woke up"
if isNil(self.node.wakuFilterClient):
trace "edgeFilterSubLoop: wakuFilterClient is nil, skipping"
continue
let desired = self.contentTopicSubs
trace "edgeFilterSubLoop: desired state", numShards = desired.len
let allShards = toHashSet(toSeq(desired.keys)) + toHashSet(toSeq(lastSynced.keys))
# Step 1: read state across all shards at once and
# create a list of peer dial tasks and shard tracking to delete.
var dialTasks: seq[EdgeDialTask]
var shardsToDelete: seq[PubsubTopic]
for shard in allShards:
let currTopics = desired.getOrDefault(shard)
let prevTopics = lastSynced.getOrDefault(shard)
if shard notin self.edgeFilterSubStates:
self.edgeFilterSubStates[shard] =
EdgeFilterSubState(currentHealth: TopicHealth.UNHEALTHY)
let addedTopics = toSeq(currTopics - prevTopics)
let removedTopics = toSeq(prevTopics - currTopics)
self.edgeFilterSubStates.withValue(shard, state):
state.peers.keepItIf(
self.node.peerManager.switch.peerStore.isConnected(it.peerId)
)
state.pending.keepItIf(not it.finished)
if addedTopics.len > 0 or removedTopics.len > 0:
for peer in state.peers:
asyncSpawn self.syncFilterDeltas(peer, shard, addedTopics, removedTopics)
if currTopics.len == 0:
shardsToDelete.add(shard)
else:
self.updateShardHealth(shard, state[])
let needed = max(0, HealthyThreshold - state.peers.len - state.pending.len)
if needed > 0:
let tracked = state.peers.mapIt(it.peerId).toHashSet() + state.pendingPeers
let candidates = self.selectFilterCandidates(shard, tracked, needed)
let toDial = min(needed, candidates.len)
trace "edgeFilterSubLoop: shard reconciliation",
shard = shard,
num_peers = state.peers.len,
num_pending = state.pending.len,
num_needed = needed,
num_available = candidates.len,
toDial = toDial
for i in 0 ..< toDial:
dialTasks.add(
EdgeDialTask(
peer: candidates[i], shard: shard, topics: toSeq(currTopics)
)
)
# Step 2: execute deferred shard tracking deletion and dial tasks.
for shard in shardsToDelete:
self.edgeFilterSubStates.withValue(shard, state):
for fut in state.pending:
if not fut.finished:
await fut.cancelAndWait()
self.edgeFilterSubStates.del(shard)
for task in dialTasks:
let fut = self.dialFilterPeer(task.peer, task.shard, task.topics)
self.edgeFilterSubStates.withValue(task.shard, state):
state.pending.add(fut)
lastSynced = desired
proc startEdgeFilterLoops(self: SubscriptionManager): Result[void, string] =
## Start the edge filter orchestration loops.
## Caller must ensure this is only called in edge mode (relay nil, filter client present).
self.edgeFilterWakeup = newAsyncEvent()
self.peerEventListener = WakuPeerEvent.listen(
self.node.brokerCtx,
proc(evt: WakuPeerEvent) {.async: (raises: []), gcsafe.} =
if evt.kind == WakuPeerEventKind.EventDisconnected or
evt.kind == WakuPeerEventKind.EventMetadataUpdated:
self.edgeFilterWakeup.fire()
,
).valueOr:
return err("Failed to listen to peer events for edge filter: " & error)
self.edgeFilterSubLoopFut = self.edgeFilterSubLoop()
self.edgeFilterHealthLoopFut = self.edgeFilterHealthLoop()
return ok()
proc stopEdgeFilterLoops(self: SubscriptionManager) {.async: (raises: []).} =
## Stop the edge filter orchestration loops and clean up pending futures.
if not isNil(self.edgeFilterSubLoopFut):
await self.edgeFilterSubLoopFut.cancelAndWait()
self.edgeFilterSubLoopFut = nil
if not isNil(self.edgeFilterHealthLoopFut):
await self.edgeFilterHealthLoopFut.cancelAndWait()
self.edgeFilterHealthLoopFut = nil
for shard, state in self.edgeFilterSubStates:
for fut in state.pending:
if not fut.finished:
await fut.cancelAndWait()
await WakuPeerEvent.dropListener(self.node.brokerCtx, self.peerEventListener)
# ---------------------------------------------------------------------------
# SubscriptionManager Lifecycle (calls Edge behavior above)
#
# startSubscriptionManager and stopSubscriptionManager orchestrate both the
# core (relay) and edge (filter) paths, and register/clear broker providers.
# ---------------------------------------------------------------------------
proc startSubscriptionManager*(self: SubscriptionManager): Result[void, string] =
# Register edge filter broker providers. The shard/content health providers
# in WakuNode query these via the broker as a fallback when relay health is
# not available. If edge mode is not active, these providers simply return
# NOT_SUBSCRIBED / strength 0, which is harmless.
RequestEdgeShardHealth.setProvider(
self.node.brokerCtx,
proc(shard: PubsubTopic): Result[RequestEdgeShardHealth, string] =
self.edgeFilterSubStates.withValue(shard, state):
return ok(RequestEdgeShardHealth(health: state.currentHealth))
return ok(RequestEdgeShardHealth(health: TopicHealth.NOT_SUBSCRIBED)),
).isOkOr:
error "Can't set provider for RequestEdgeShardHealth", error = error
RequestEdgeFilterPeerCount.setProvider(
self.node.brokerCtx,
proc(): Result[RequestEdgeFilterPeerCount, string] =
var minPeers = high(int)
for state in self.edgeFilterSubStates.values:
minPeers = min(minPeers, state.peers.len)
if minPeers == high(int):
minPeers = 0
return ok(RequestEdgeFilterPeerCount(peerCount: minPeers)),
).isOkOr:
error "Can't set provider for RequestEdgeFilterPeerCount", error = error
if self.node.wakuRelay.isNil():
return self.startEdgeFilterLoops()
# Core mode: auto-subscribe relay to all shards in autosharding.
if self.node.wakuAutoSharding.isSome():
let autoSharding = self.node.wakuAutoSharding.get()
let clusterId = autoSharding.clusterId
let numShards = autoSharding.shardCountGenZero
if numShards > 0:
var clusterPubsubTopics = newSeqOfCap[PubsubTopic](numShards)
for i in 0 ..< numShards:
let shardObj = RelayShard(clusterId: clusterId, shardId: uint16(i))
clusterPubsubTopics.add(PubsubTopic($shardObj))
self.subscribePubsubTopics(clusterPubsubTopics).isOkOr:
error "Failed to auto-subscribe Relay to cluster shards: ", error = error
else:
info "SubscriptionManager has no AutoSharding configured; skipping auto-subscribe."
return ok()
proc stopSubscriptionManager*(self: SubscriptionManager) {.async: (raises: []).} =
if self.node.wakuRelay.isNil():
await self.stopEdgeFilterLoops()
RequestEdgeShardHealth.clearProvider(self.node.brokerCtx)
RequestEdgeFilterPeerCount.clearProvider(self.node.brokerCtx)

View File

@ -14,6 +14,7 @@ import
events/health_events, events/health_events,
events/peer_events, events/peer_events,
node/waku_node, node/waku_node,
node/node_telemetry,
node/peer_manager, node/peer_manager,
node/kernel_api, node/kernel_api,
node/health_monitor/online_monitor, node/health_monitor/online_monitor,

View File

@ -21,6 +21,7 @@ import
import import
../waku_node, ../waku_node,
../node_telemetry,
../../waku_core, ../../waku_core,
../../waku_core/topics/sharding, ../../waku_core/topics/sharding,
../../waku_filter_v2, ../../waku_filter_v2,

View File

@ -19,6 +19,7 @@ import
import import
../waku_node, ../waku_node,
../node_telemetry,
../../waku_peer_exchange, ../../waku_peer_exchange,
../../waku_core, ../../waku_core,
../peer_manager, ../peer_manager,

View File

@ -29,90 +29,18 @@ import
waku_store_sync, waku_store_sync,
waku_rln_relay, waku_rln_relay,
node/waku_node, node/waku_node,
node/subscription_manager,
node/peer_manager, node/peer_manager,
events/message_events, events/message_events,
] ]
export waku_relay.WakuRelayHandler export waku_relay.WakuRelayHandler
declarePublicHistogram waku_histogram_message_size,
"message size histogram in kB",
buckets = [
0.0, 1.0, 3.0, 5.0, 15.0, 50.0, 75.0, 100.0, 125.0, 150.0, 500.0, 700.0, 1000.0, Inf
]
logScope: logScope:
topics = "waku node relay api" topics = "waku node relay api"
## Waku relay ## Waku relay
proc registerRelayHandler(
node: WakuNode, topic: PubsubTopic, appHandler: WakuRelayHandler = nil
): bool =
## Registers the only handler for the given topic.
## Notice that this handler internally calls other handlers, such as filter,
## archive, etc, plus the handler provided by the application.
## Returns `true` if a mesh subscription was created or `false` if the relay
## was already subscribed to the topic.
let alreadySubscribed = node.wakuRelay.isSubscribed(topic)
if not appHandler.isNil():
if not alreadySubscribed or not node.legacyAppHandlers.hasKey(topic):
node.legacyAppHandlers[topic] = appHandler
else:
debug "Legacy appHandler already exists for active PubsubTopic, ignoring new handler",
topic = topic
if alreadySubscribed:
return false
proc traceHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
let msgSizeKB = msg.payload.len / 1000
waku_node_messages.inc(labelValues = ["relay"])
waku_histogram_message_size.observe(msgSizeKB)
proc filterHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
if node.wakuFilter.isNil():
return
await node.wakuFilter.handleMessage(topic, msg)
proc archiveHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
if node.wakuArchive.isNil():
return
await node.wakuArchive.handleMessage(topic, msg)
proc syncHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
if node.wakuStoreReconciliation.isNil():
return
node.wakuStoreReconciliation.messageIngress(topic, msg)
proc internalHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
MessageSeenEvent.emit(node.brokerCtx, topic, msg)
let uniqueTopicHandler = proc(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
await traceHandler(topic, msg)
await filterHandler(topic, msg)
await archiveHandler(topic, msg)
await syncHandler(topic, msg)
await internalHandler(topic, msg)
# Call the legacy (kernel API) app handler if it exists.
# Normally, hasKey is false and the MessageSeenEvent bus (new API) is used instead.
# But we need to support legacy behavior (kernel API use), hence this.
# NOTE: We can delete `legacyAppHandlers` if instead we refactor WakuRelay to support multiple
# PubsubTopic handlers, since that's actually supported by libp2p PubSub (bigger refactor...)
if node.legacyAppHandlers.hasKey(topic) and not node.legacyAppHandlers[topic].isNil():
await node.legacyAppHandlers[topic](topic, msg)
node.wakuRelay.subscribe(topic, uniqueTopicHandler)
proc getTopicOfSubscriptionEvent( proc getTopicOfSubscriptionEvent(
node: WakuNode, subscription: SubscriptionEvent node: WakuNode, subscription: SubscriptionEvent
): Result[(PubsubTopic, Option[ContentTopic]), string] = ): Result[(PubsubTopic, Option[ContentTopic]), string] =
@ -143,21 +71,15 @@ proc subscribe*(
error "Invalid API call to `subscribe`. WakuRelay not mounted." error "Invalid API call to `subscribe`. WakuRelay not mounted."
return err("Invalid API call to `subscribe`. WakuRelay not mounted.") return err("Invalid API call to `subscribe`. WakuRelay not mounted.")
let (pubsubTopic, contentTopicOp) = getTopicOfSubscriptionEvent(node, subscription).valueOr: let (pubsubTopic, _) = getTopicOfSubscriptionEvent(node, subscription).valueOr:
error "Failed to decode subscription event", error = error error "Failed to decode subscription event", error = error
return err("Failed to decode subscription event: " & error) return err("Failed to decode subscription event: " & error)
if node.registerRelayHandler(pubsubTopic, handler): # strict version
info "subscribe", pubsubTopic, contentTopicOp #if contentTopicOp.isSome():
node.topicSubscriptionQueue.emit((kind: PubsubSub, topic: pubsubTopic)) # return
else: # node.subscriptionManager.subscribe(pubsubTopic, contentTopicOp.get(), handler)
if isNil(handler): return node.subscriptionManager.subscribeShard(pubsubTopic, handler)
warn "No-effect API call to subscribe. Already subscribed to topic", pubsubTopic
else:
info "subscribe (was already subscribed in the mesh; appHandler set)",
pubsubTopic = pubsubTopic
return ok()
proc unsubscribe*( proc unsubscribe*(
node: WakuNode, subscription: SubscriptionEvent node: WakuNode, subscription: SubscriptionEvent
@ -170,26 +92,14 @@ proc unsubscribe*(
error "Invalid API call to `unsubscribe`. WakuRelay not mounted." error "Invalid API call to `unsubscribe`. WakuRelay not mounted."
return err("Invalid API call to `unsubscribe`. WakuRelay not mounted.") return err("Invalid API call to `unsubscribe`. WakuRelay not mounted.")
let (pubsubTopic, contentTopicOp) = getTopicOfSubscriptionEvent(node, subscription).valueOr: let (pubsubTopic, _) = getTopicOfSubscriptionEvent(node, subscription).valueOr:
error "Failed to decode unsubscribe event", error = error error "Failed to decode unsubscribe event", error = error
return err("Failed to decode unsubscribe event: " & error) return err("Failed to decode unsubscribe event: " & error)
let hadHandler = node.legacyAppHandlers.hasKey(pubsubTopic) # strict version
if hadHandler: #if contentTopicOp.isSome():
node.legacyAppHandlers.del(pubsubTopic) # return node.subscriptionManager.unsubscribe(pubsubTopic, contentTopicOp.get())
return node.subscriptionManager.unsubscribeAll(pubsubTopic)
if node.wakuRelay.isSubscribed(pubsubTopic):
info "unsubscribe", pubsubTopic, contentTopicOp
node.wakuRelay.unsubscribe(pubsubTopic)
node.topicSubscriptionQueue.emit((kind: PubsubUnsub, topic: pubsubTopic))
else:
if not hadHandler:
warn "No-effect API call to `unsubscribe`. Was not subscribed", pubsubTopic
else:
info "unsubscribe (was not subscribed in the mesh; appHandler removed)",
pubsubTopic = pubsubTopic
return ok()
proc isSubscribed*( proc isSubscribed*(
node: WakuNode, subscription: SubscriptionEvent node: WakuNode, subscription: SubscriptionEvent

View File

@ -0,0 +1,27 @@
{.push raises: [].}
import metrics
declarePublicGauge waku_version,
"Waku version info (in git describe format)", ["version"]
declarePublicCounter waku_node_errors, "number of wakunode errors", ["type"]
declarePublicGauge waku_lightpush_peers, "number of lightpush peers"
declarePublicGauge waku_filter_peers, "number of filter peers"
declarePublicGauge waku_store_peers, "number of store peers"
declarePublicGauge waku_px_peers,
"number of peers (in the node's peerManager) supporting the peer exchange protocol"
declarePublicCounter waku_node_messages, "number of messages received", ["type"]
declarePublicHistogram waku_histogram_message_size,
"message size histogram in kB",
buckets = [
0.0, 1.0, 3.0, 5.0, 15.0, 50.0, 75.0, 100.0, 125.0, 150.0, 500.0, 700.0, 1000.0, Inf
]
{.pop.}

116
waku/node/node_types.nim Normal file
View File

@ -0,0 +1,116 @@
{.push raises: [].}
import
std/[options, tables, sets],
chronos,
results,
eth/keys,
bearssl/rand,
eth/p2p/discoveryv5/enr,
libp2p/crypto/crypto,
libp2p/[multiaddress, multicodec],
libp2p/protocols/ping,
libp2p/protocols/mix/mix_protocol,
brokers/broker_context
import
waku/[
waku_core,
waku_relay,
waku_archive,
waku_store/protocol as store,
waku_store/client as store_client,
waku_store/resume,
waku_store_sync,
waku_filter_v2,
waku_filter_v2/client as filter_client,
waku_metadata,
waku_rendezvous/protocol,
waku_rendezvous/client as rendezvous_client,
waku_lightpush_legacy/client as legacy_lightpush_client,
waku_lightpush_legacy as legacy_lightpush_protocol,
waku_lightpush/client as lightpush_client,
waku_lightpush as lightpush_protocol,
waku_peer_exchange,
waku_rln_relay,
waku_mix,
common/rate_limit/setting,
discovery/waku_kademlia,
net/bound_ports,
events/peer_events,
],
./peer_manager,
./health_monitor/topic_health
# key and crypto modules different
type
# TODO: Move to application instance (e.g., `WakuNode2`)
WakuInfo* = object # NOTE One for simplicity, can extend later as needed
listenAddresses*: seq[string]
enrUri*: string #multiaddrStrings*: seq[string]
mixPubKey*: Option[string]
# NOTE based on Eth2Node in NBC eth2_network.nim
WakuNode* = ref object
peerManager*: PeerManager
switch*: Switch
wakuRelay*: WakuRelay
wakuArchive*: waku_archive.WakuArchive
wakuStore*: store.WakuStore
wakuStoreClient*: store_client.WakuStoreClient
wakuStoreResume*: StoreResume
wakuStoreReconciliation*: SyncReconciliation
wakuStoreTransfer*: SyncTransfer
wakuFilter*: waku_filter_v2.WakuFilter
wakuFilterClient*: filter_client.WakuFilterClient
wakuRlnRelay*: WakuRLNRelay
wakuLegacyLightPush*: WakuLegacyLightPush
wakuLegacyLightpushClient*: WakuLegacyLightPushClient
wakuLightPush*: WakuLightPush
wakuLightpushClient*: WakuLightPushClient
wakuPeerExchange*: WakuPeerExchange
wakuPeerExchangeClient*: WakuPeerExchangeClient
wakuMetadata*: WakuMetadata
wakuAutoSharding*: Option[Sharding]
enr*: enr.Record
libp2pPing*: Ping
rng*: ref rand.HmacDrbgContext
brokerCtx*: BrokerContext
wakuRendezvous*: WakuRendezVous
wakuRendezvousClient*: rendezvous_client.WakuRendezVousClient
announcedAddresses*: seq[MultiAddress]
extMultiAddrsOnly*: bool # When true, skip automatic IP address replacement
started*: bool # Indicates that node has started listening
topicSubscriptionQueue*: AsyncEventQueue[SubscriptionEvent]
rateLimitSettings*: ProtocolRateLimitSettings
legacyAppHandlers*: Table[PubsubTopic, WakuRelayHandler]
## Kernel API Relay appHandlers (if any)
subscriptionManager*: SubscriptionManager
wakuMix*: WakuMix
kademliaDiscoveryLoop*: Future[void]
wakuKademlia*: WakuKademlia
ports*: BoundPorts
ShardSubscription* = object
contentTopics*: HashSet[ContentTopic]
directShardSub*: bool
## shard subscribed directly (PubsubSub), independent of content-topic interest
EdgeFilterSubState* = object
peers*: seq[RemotePeerInfo]
pending*: seq[Future[void]]
pendingPeers*: HashSet[PeerId]
currentHealth*: TopicHealth
SubscriptionManager* = ref object of RootObj
node*: WakuNode
shards*: Table[PubsubTopic, ShardSubscription]
edgeFilterSubStates*: Table[PubsubTopic, EdgeFilterSubState]
edgeFilterWakeup*: AsyncEvent
edgeFilterSubLoopFut*: Future[void]
edgeFilterConnectionLoopFut*: Future[void]
peerEventListener*: WakuPeerEventListener
ownsEdgeShardHealthProvider*: bool
ownsEdgeFilterPeerCountProvider*: bool
{.pop.}

View File

@ -0,0 +1,708 @@
import std/[sequtils, sets, tables, options], chronos, chronicles, metrics, results
import libp2p/[peerid, peerinfo]
import brokers/broker_context
import
waku/[
waku_core,
waku_core/topics/sharding,
node/node_types,
node/node_telemetry,
waku_relay,
waku_archive,
waku_store_sync,
waku_filter_v2/common as filter_common,
waku_filter_v2/client as filter_client,
waku_filter_v2/protocol as filter_protocol,
events/health_events,
events/message_events,
events/peer_events,
requests/health_requests,
node/peer_manager,
node/health_monitor/topic_health,
node/health_monitor/connection_status,
]
{.push raises: [].}
proc registerRelayHandler(
node: WakuNode, shard: PubsubTopic, appHandler: WakuRelayHandler = nil
): bool =
## Returns true iff we did a new (and only) subscription for this shard in GossipSub.
let alreadySubscribed = node.wakuRelay.isSubscribed(shard)
if not appHandler.isNil():
if not alreadySubscribed or not node.legacyAppHandlers.hasKey(shard):
node.legacyAppHandlers[shard] = appHandler
else:
debug "Legacy appHandler already exists for active PubsubTopic, ignoring new handler",
topic = shard
if alreadySubscribed:
return false
proc traceHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
let msgSizeKB = msg.payload.len / 1000
waku_node_messages.inc(labelValues = ["relay"])
waku_histogram_message_size.observe(msgSizeKB)
proc filterHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
if node.wakuFilter.isNil():
return
await node.wakuFilter.handleMessage(topic, msg)
proc archiveHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
if node.wakuArchive.isNil():
return
await node.wakuArchive.handleMessage(topic, msg)
proc syncHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
if node.wakuStoreReconciliation.isNil():
return
node.wakuStoreReconciliation.messageIngress(topic, msg)
proc internalHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
MessageSeenEvent.emit(node.brokerCtx, topic, msg)
let uniqueTopicHandler = proc(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
await traceHandler(topic, msg)
await filterHandler(topic, msg)
await archiveHandler(topic, msg)
await syncHandler(topic, msg)
await internalHandler(topic, msg)
if node.legacyAppHandlers.hasKey(topic) and not node.legacyAppHandlers[topic].isNil():
await node.legacyAppHandlers[topic](topic, msg)
node.wakuRelay.subscribe(shard, uniqueTopicHandler)
return true
proc unregisterRelayHandler(node: WakuNode, shard: PubsubTopic): bool =
## Returns true iff we had a subscription for this shard in GossipSub and it was removed.
if node.legacyAppHandlers.hasKey(shard):
node.legacyAppHandlers.del(shard)
if node.wakuRelay.isSubscribed(shard):
node.wakuRelay.unsubscribe(shard)
return true
return false
proc doRelaySubscribe(
node: WakuNode, shard: PubsubTopic, appHandler: WakuRelayHandler = nil
): bool =
## Subscribes the node to a shard.
## Returns true if we actually subscribed (transitioned from unsubscribed to subscribed).
## Emit the shard subscription event if we actually subscribed.
let installed = node.registerRelayHandler(shard, appHandler)
if installed:
node.topicSubscriptionQueue.emit((kind: PubsubSub, topic: shard))
return installed
proc doRelayUnsubscribe(node: WakuNode, shard: PubsubTopic): bool =
## Unsubscribes the node from a shard.
## Returns true if we actually unsubscribed (transitioned from subscribed to unsubscribed).
## Emit the shard unsubscription event if we actually unsubscribed.
let unsubscribed = node.unregisterRelayHandler(shard)
if unsubscribed:
node.topicSubscriptionQueue.emit((kind: PubsubUnsub, topic: shard))
return unsubscribed
proc new*(T: type SubscriptionManager, node: WakuNode): T =
T(
node: node,
shards: initTable[PubsubTopic, ShardSubscription](),
edgeFilterSubStates: initTable[PubsubTopic, EdgeFilterSubState](),
edgeFilterWakeup: newAsyncEvent(),
)
func wanted(entry: ShardSubscription): bool =
## True if the shard has content-topic interest or a direct subscription.
return entry.contentTopics.len > 0 or entry.directShardSub
proc isContentSubscribed*(
self: SubscriptionManager, shard: PubsubTopic, contentTopic: ContentTopic
): bool =
self.shards.withValue(shard, sub):
return contentTopic in sub.contentTopics
return false
iterator subscribedContentTopics*(
self: SubscriptionManager
): (PubsubTopic, HashSet[ContentTopic]) =
## Yields each shard with its non-empty content-topic set.
for shard, sub in self.shards.pairs:
if sub.contentTopics.len > 0:
yield (shard, sub.contentTopics)
func toTopicHealth*(peersCount: int): TopicHealth =
if peersCount >= HealthyThreshold:
return TopicHealth.SUFFICIENTLY_HEALTHY
elif peersCount > 0:
return TopicHealth.MINIMALLY_HEALTHY
else:
return TopicHealth.UNHEALTHY
proc edgeFilterPeerCount*(self: SubscriptionManager, shard: PubsubTopic): int =
self.edgeFilterSubStates.withValue(shard, state):
return state.peers.len
return 0
proc getShardForContentTopic(
self: SubscriptionManager, topic: ContentTopic
): Result[PubsubTopic, string] =
if self.node.wakuAutoSharding.isSome():
let shardObj = ?self.node.wakuAutoSharding.get().getShard(topic)
return ok($shardObj)
return err("autosharding is not configured; pass an explicit shard")
proc subscribeShard*(
self: SubscriptionManager, shard: PubsubTopic, handler: WakuRelayHandler = nil
): Result[void, string] =
## Subscribes to the shard directly and joins the relay mesh.
var added = false
self.shards.withValue(shard, entry):
if not entry.directShardSub:
entry.directShardSub = true
added = true
do:
self.shards[shard] = ShardSubscription(
contentTopics: initHashSet[ContentTopic](), directShardSub: true
)
added = true
if added:
self.edgeFilterWakeup.fire()
if not isNil(self.node.wakuRelay):
discard self.node.doRelaySubscribe(shard, handler)
return ok()
proc unsubscribeShard*(
self: SubscriptionManager, shard: PubsubTopic
): Result[void, string] =
## Drops the direct shard subscription; unsubscribes the mesh if no content topic wants it.
var removed = false
var shardEmpty = false
self.shards.withValue(shard, entry):
if entry.directShardSub:
entry.directShardSub = false
removed = true
shardEmpty = not entry[].wanted()
if removed:
self.edgeFilterWakeup.fire()
if shardEmpty:
self.shards.del(shard)
if not isNil(self.node.wakuRelay):
discard self.node.doRelayUnsubscribe(shard)
return ok()
proc subscribe*(
self: SubscriptionManager,
shard: PubsubTopic,
contentTopic: ContentTopic,
handler: WakuRelayHandler = nil,
): Result[void, string] =
## Adds content-topic interest on the shard and joins the relay mesh.
var added = false
self.shards.withValue(shard, entry):
if contentTopic notin entry.contentTopics:
entry.contentTopics.incl(contentTopic)
added = true
do:
var entry = ShardSubscription(contentTopics: initHashSet[ContentTopic]())
entry.contentTopics.incl(contentTopic)
self.shards[shard] = entry
added = true
if added:
self.edgeFilterWakeup.fire()
if not isNil(self.node.wakuRelay):
discard self.node.doRelaySubscribe(shard, handler)
return ok()
proc unsubscribe*(
self: SubscriptionManager, shard: PubsubTopic, contentTopic: ContentTopic
): Result[void, string] =
## Drops content-topic interest on the shard; unsubscribes the mesh if nothing else wants it.
var removed = false
var shardEmpty = false
self.shards.withValue(shard, entry):
if contentTopic in entry.contentTopics:
entry.contentTopics.excl(contentTopic)
removed = true
shardEmpty = not entry[].wanted()
if removed:
self.edgeFilterWakeup.fire()
if shardEmpty:
self.shards.del(shard)
if not isNil(self.node.wakuRelay):
discard self.node.doRelayUnsubscribe(shard)
return ok()
proc subscribe*(self: SubscriptionManager, topic: ContentTopic): Result[void, string] =
## Subscribes to a content topic, resolving its shard via autosharding.
let shard = ?self.getShardForContentTopic(topic)
return self.subscribe(shard, topic)
proc unsubscribe*(
self: SubscriptionManager, topic: ContentTopic
): Result[void, string] =
## Unsubscribes from a content topic, resolving its shard via autosharding.
let shard = ?self.getShardForContentTopic(topic)
return self.unsubscribe(shard, topic)
proc unsubscribeAll*(
self: SubscriptionManager, shard: PubsubTopic
): Result[void, string] =
## Drops every content topic on the shard, then the direct subscription.
var snapshot: seq[ContentTopic]
self.shards.withValue(shard, sub):
snapshot = toSeq(sub.contentTopics)
for contentTopic in snapshot:
?self.unsubscribe(shard, contentTopic)
return self.unsubscribeShard(shard)
proc isSubscribed*(
self: SubscriptionManager, topic: ContentTopic
): Result[bool, string] =
let shard = ?self.getShardForContentTopic(topic)
return ok(self.isContentSubscribed(shard, topic))
proc subscribeAllAutoshards*(self: SubscriptionManager): Result[void, string] =
## Subscribes the relay to every shard in the configured autosharding cluster.
if self.node.wakuRelay.isNil() or self.node.wakuAutoSharding.isNone():
return ok()
let autoSharding = self.node.wakuAutoSharding.get()
let numShards = autoSharding.shardCountGenZero
if numShards == 0:
return ok()
for i in 0'u32 ..< numShards:
let shardObj = RelayShard(clusterId: autoSharding.clusterId, shardId: uint16(i))
self.subscribeShard(PubsubTopic($shardObj)).isOkOr:
error "failed to auto-subscribe relay to cluster shard",
shard = $shardObj, error = error
ok()
{.pop.}
const EdgeFilterSubscribeTimeout = chronos.seconds(15)
## Timeout for a single filter subscribe/unsubscribe RPC to a service peer.
const EdgeFilterPingTimeout = chronos.seconds(5)
## Timeout for a filter ping health check.
const EdgeFilterLoopInterval = chronos.seconds(30)
## Interval for the edge filter health ping loop.
const EdgeFilterSubLoopDebounce = chronos.seconds(1)
## Debounce delay to coalesce rapid-fire wakeups into a single reconciliation pass.
type EdgeDialTask = object
peer: RemotePeerInfo
shard: PubsubTopic
topics: seq[ContentTopic]
proc updateShardHealth(
self: SubscriptionManager, shard: PubsubTopic, state: var EdgeFilterSubState
) =
## Recompute and emit health for a shard after its peer set changed.
let newHealth = toTopicHealth(state.peers.len)
if newHealth != state.currentHealth:
state.currentHealth = newHealth
EventShardTopicHealthChange.emit(self.node.brokerCtx, shard, newHealth)
proc removePeer(self: SubscriptionManager, shard: PubsubTopic, peerId: PeerId) =
## Remove a peer from edgeFilterSubStates for the given shard,
## update health, and wake the sub loop to dial a replacement.
## Best-effort unsubscribe so the service peer stops pushing to us.
self.edgeFilterSubStates.withValue(shard, state):
var idx = -1
for i, p in state.peers:
if p.peerId == peerId:
idx = i
break
if idx < 0:
return
let peer = state.peers[idx]
state.peers.del(idx)
self.updateShardHealth(shard, state[])
self.edgeFilterWakeup.fire()
if not self.node.wakuFilterClient.isNil():
self.shards.withValue(shard, sub):
let ct = toSeq(sub.contentTopics)
if ct.len > 0:
proc doUnsubscribe() {.async.} =
discard await self.node.wakuFilterClient.unsubscribe(peer, shard, ct)
asyncSpawn doUnsubscribe()
type SendChunkedFilterRpcKind = enum
FilterSubscribe
FilterUnsubscribe
proc sendChunkedFilterRpc(
self: SubscriptionManager,
peer: RemotePeerInfo,
shard: PubsubTopic,
topics: seq[ContentTopic],
kind: SendChunkedFilterRpcKind,
): Future[bool] {.async.} =
## Send a chunked filter subscribe or unsubscribe RPC. Returns true on
## success. On failure the peer is removed and false is returned.
try:
var i = 0
while i < topics.len:
let chunk =
topics[i ..< min(i + filter_protocol.MaxContentTopicsPerRequest, topics.len)]
let fut =
case kind
of FilterSubscribe:
self.node.wakuFilterClient.subscribe(peer, shard, chunk)
of FilterUnsubscribe:
self.node.wakuFilterClient.unsubscribe(peer, shard, chunk)
if not (await fut.withTimeout(EdgeFilterSubscribeTimeout)) or fut.read().isErr():
trace "sendChunkedFilterRpc: chunk failed",
op = kind, shard = shard, peer = peer.peerId
self.removePeer(shard, peer.peerId)
return false
i += filter_protocol.MaxContentTopicsPerRequest
except CatchableError as exc:
debug "sendChunkedFilterRpc: failed",
op = kind, shard = shard, peer = peer.peerId, err = exc.msg
self.removePeer(shard, peer.peerId)
return false
return true
proc syncFilterDeltas(
self: SubscriptionManager,
peer: RemotePeerInfo,
shard: PubsubTopic,
added: seq[ContentTopic],
removed: seq[ContentTopic],
) {.async.} =
## Push content topic changes (adds/removes) to an already-tracked peer.
if added.len > 0:
if not await self.sendChunkedFilterRpc(peer, shard, added, FilterSubscribe):
return
if removed.len > 0:
discard await self.sendChunkedFilterRpc(peer, shard, removed, FilterUnsubscribe)
proc dialFilterPeer(
self: SubscriptionManager,
peer: RemotePeerInfo,
shard: PubsubTopic,
contentTopics: seq[ContentTopic],
) {.async.} =
## Subscribe a new peer to all content topics on a shard and start tracking it.
self.edgeFilterSubStates.withValue(shard, state):
state.pendingPeers.incl(peer.peerId)
try:
if not await self.sendChunkedFilterRpc(peer, shard, contentTopics, FilterSubscribe):
return
self.edgeFilterSubStates.withValue(shard, state):
if state.peers.anyIt(it.peerId == peer.peerId):
trace "dialFilterPeer: peer already tracked, skipping duplicate",
shard = shard, peer = peer.peerId
return
state.peers.add(peer)
self.updateShardHealth(shard, state[])
trace "dialFilterPeer: successfully subscribed to all chunks",
shard = shard, peer = peer.peerId, totalPeers = state.peers.len
do:
trace "dialFilterPeer: shard removed while subscribing, discarding result",
shard = shard, peer = peer.peerId
finally:
self.edgeFilterSubStates.withValue(shard, state):
state.pendingPeers.excl(peer.peerId)
proc edgeFilterConnectionLoop(self: SubscriptionManager) {.async.} =
## Periodically pings all tracked filter service peers to verify they are
## still alive at the application layer. Peers that fail the ping are removed.
while true:
await sleepAsync(EdgeFilterLoopInterval)
if self.node.wakuFilterClient.isNil():
warn "filter client is nil within edge filter connection loop"
continue
var connected = initTable[PeerId, RemotePeerInfo]()
for state in self.edgeFilterSubStates.values:
for peer in state.peers:
if self.node.peerManager.switch.peerStore.isConnected(peer.peerId):
connected[peer.peerId] = peer
var alive = initHashSet[PeerId]()
if connected.len > 0:
var pingTasks: seq[(PeerId, Future[FilterSubscribeResult])]
for peer in connected.values:
pingTasks.add(
(peer.peerId, self.node.wakuFilterClient.ping(peer, EdgeFilterPingTimeout))
)
await allFutures(pingTasks.mapIt(it[1]))
for (peerId, task) in pingTasks:
if task.read().isOk():
alive.incl(peerId)
var changed = false
for shard, state in self.edgeFilterSubStates.mpairs:
let oldLen = state.peers.len
state.peers.keepItIf(it.peerId notin connected or alive.contains(it.peerId))
if state.peers.len < oldLen:
changed = true
self.updateShardHealth(shard, state)
trace "Edge Filter health degraded by Ping failure",
shard = shard, new = state.currentHealth
if changed:
self.edgeFilterWakeup.fire()
proc selectFilterCandidates(
self: SubscriptionManager, shard: PubsubTopic, exclude: HashSet[PeerId], needed: int
): seq[RemotePeerInfo] =
## Select filter service peer candidates for a shard.
# Start with every filter server peer that can serve the shard
var allCandidates = self.node.peerManager.selectPeers(
filter_common.WakuFilterSubscribeCodec, some(shard)
)
# Remove all already used in this shard or being dialed for it
allCandidates.keepItIf(it.peerId notin exclude)
# Collect peer IDs already tracked on other shards
var trackedOnOther = initHashSet[PeerId]()
for otherShard, otherState in self.edgeFilterSubStates.pairs:
if otherShard != shard:
for peer in otherState.peers:
trackedOnOther.incl(peer.peerId)
# Prefer peers we already have a connection to first, preserving shuffle
var candidates =
allCandidates.filterIt(it.peerId in trackedOnOther) &
allCandidates.filterIt(it.peerId notin trackedOnOther)
# We need to return 'needed' peers only
if candidates.len > needed:
candidates.setLen(needed)
return candidates
proc edgeFilterSubLoop(self: SubscriptionManager) {.async.} =
## Reconciles filter subscriptions with the desired state from SubscriptionManager.
var lastSynced = initTable[PubsubTopic, HashSet[ContentTopic]]()
while true:
await self.edgeFilterWakeup.wait()
await sleepAsync(EdgeFilterSubLoopDebounce)
self.edgeFilterWakeup.clear()
trace "edgeFilterSubLoop: woke up"
if isNil(self.node.wakuFilterClient):
trace "edgeFilterSubLoop: wakuFilterClient is nil, skipping"
continue
var newSynced = initTable[PubsubTopic, HashSet[ContentTopic]]()
var allShards: HashSet[PubsubTopic]
for shard, sub in self.shards.pairs:
if sub.contentTopics.len > 0:
newSynced[shard] = sub.contentTopics
allShards.incl(shard)
for shard in lastSynced.keys:
allShards.incl(shard)
trace "edgeFilterSubLoop: desired state", numShards = newSynced.len
# Step 1: read state across all shards at once and
# create a list of peer dial tasks and shard tracking to delete.
var dialTasks: seq[EdgeDialTask]
var shardsToDelete: seq[PubsubTopic]
for shard in allShards:
# Compute added/removed deltas via direct iteration; no HashSet copies.
var addedTopics: seq[ContentTopic]
var removedTopics: seq[ContentTopic]
newSynced.withValue(shard, curr):
lastSynced.withValue(shard, prev):
for t in curr[]:
if t notin prev[]:
addedTopics.add(t)
for t in prev[]:
if t notin curr[]:
removedTopics.add(t)
do:
for t in curr[]:
addedTopics.add(t)
do:
lastSynced.withValue(shard, prev):
for t in prev[]:
removedTopics.add(t)
discard self.edgeFilterSubStates.mgetOrPut(
shard, EdgeFilterSubState(currentHealth: TopicHealth.UNHEALTHY)
)
self.edgeFilterSubStates.withValue(shard, state):
state.peers.keepItIf(
self.node.peerManager.switch.peerStore.isConnected(it.peerId)
)
state.pending.keepItIf(not it.finished)
if addedTopics.len > 0 or removedTopics.len > 0:
for peer in state.peers:
asyncSpawn self.syncFilterDeltas(peer, shard, addedTopics, removedTopics)
if shard notin newSynced:
shardsToDelete.add(shard)
else:
self.updateShardHealth(shard, state[])
let needed = max(0, HealthyThreshold - state.peers.len - state.pending.len)
if needed > 0:
var tracked: HashSet[PeerId]
for p in state.peers:
tracked.incl(p.peerId)
for p in state.pendingPeers:
tracked.incl(p)
let candidates = self.selectFilterCandidates(shard, tracked, needed)
let toDial = min(needed, candidates.len)
trace "edgeFilterSubLoop: shard reconciliation",
shard = shard,
num_peers = state.peers.len,
num_pending = state.pending.len,
num_needed = needed,
num_available = candidates.len,
toDial = toDial
var dialTopics: seq[ContentTopic]
newSynced.withValue(shard, curr):
dialTopics = toSeq(curr[])
for i in 0 ..< toDial:
dialTasks.add(
EdgeDialTask(peer: candidates[i], shard: shard, topics: dialTopics)
)
# Step 2: execute deferred shard tracking deletion and dial tasks.
for shard in shardsToDelete:
self.edgeFilterSubStates.withValue(shard, state):
for fut in state.pending:
if not fut.finished:
await fut.cancelAndWait()
self.edgeFilterSubStates.del(shard)
for task in dialTasks:
let fut = self.dialFilterPeer(task.peer, task.shard, task.topics)
self.edgeFilterSubStates.withValue(task.shard, state):
state.pending.add(fut)
lastSynced = newSynced
proc startEdgeFilterLoops(self: SubscriptionManager): Result[void, string] =
## Start the edge filter orchestration loops.
## Caller must ensure this is only called in edge mode (relay nil, filter client present).
self.peerEventListener = WakuPeerEvent.listen(
self.node.brokerCtx,
proc(evt: WakuPeerEvent) {.async: (raises: []), gcsafe.} =
if evt.kind == WakuPeerEventKind.EventDisconnected:
# We know a peer is gone, so if it was a service filter peer for this
# edge node, remove it from the list of service filter peers for each
# shard it served and re-evaluate shard health for the affected shards.
for shard, state in self.edgeFilterSubStates.mpairs:
let oldLen = state.peers.len
state.peers.keepItIf(it.peerId != evt.peerId)
if state.peers.len < oldLen:
self.updateShardHealth(shard, state)
self.edgeFilterWakeup.fire()
elif evt.kind == WakuPeerEventKind.EventMetadataUpdated:
self.edgeFilterWakeup.fire(),
).valueOr:
return err("Failed to listen to peer events for edge filter: " & error)
self.edgeFilterSubLoopFut = self.edgeFilterSubLoop()
self.edgeFilterConnectionLoopFut = self.edgeFilterConnectionLoop()
return ok()
proc stopEdgeFilterLoops(self: SubscriptionManager) {.async: (raises: []).} =
## Stop the edge filter orchestration loops and clean up pending futures.
if not isNil(self.edgeFilterSubLoopFut):
await self.edgeFilterSubLoopFut.cancelAndWait()
self.edgeFilterSubLoopFut = nil
if not isNil(self.edgeFilterConnectionLoopFut):
await self.edgeFilterConnectionLoopFut.cancelAndWait()
self.edgeFilterConnectionLoopFut = nil
for shard, state in self.edgeFilterSubStates:
for fut in state.pending:
if not fut.finished:
await fut.cancelAndWait()
await WakuPeerEvent.dropListener(self.node.brokerCtx, self.peerEventListener)
proc start*(self: SubscriptionManager): Result[void, string] =
let edgeShardHealthRes = RequestEdgeShardHealth.setProvider(
self.node.brokerCtx,
proc(shard: PubsubTopic): Result[RequestEdgeShardHealth, string] =
self.edgeFilterSubStates.withValue(shard, state):
return ok(RequestEdgeShardHealth(health: state.currentHealth))
return ok(RequestEdgeShardHealth(health: TopicHealth.NOT_SUBSCRIBED)),
)
self.ownsEdgeShardHealthProvider = edgeShardHealthRes.isOk()
if edgeShardHealthRes.isErr():
error "Can't set provider for RequestEdgeShardHealth",
error = edgeShardHealthRes.error
let edgeFilterPeerCountRes = RequestEdgeFilterPeerCount.setProvider(
self.node.brokerCtx,
proc(): Result[RequestEdgeFilterPeerCount, string] =
var minPeers = high(int)
for state in self.edgeFilterSubStates.values:
minPeers = min(minPeers, state.peers.len)
if minPeers == high(int):
minPeers = 0
return ok(RequestEdgeFilterPeerCount(peerCount: minPeers)),
)
self.ownsEdgeFilterPeerCountProvider = edgeFilterPeerCountRes.isOk()
if edgeFilterPeerCountRes.isErr():
error "Can't set provider for RequestEdgeFilterPeerCount",
error = edgeFilterPeerCountRes.error
# Start Edge workers only when we are in Edge mode (relay not mounted)
# AND the filter client is mounted (otherwise the loops have nothing
# to talk to and just spam "filter client is nil" warnings).
if self.node.wakuRelay.isNil() and not self.node.wakuFilterClient.isNil():
return self.startEdgeFilterLoops()
return ok()
proc stop*(self: SubscriptionManager) {.async: (raises: []).} =
# Stop Edge workers if we started them in `start` (Edge mode + filter client).
if self.node.wakuRelay.isNil() and not self.node.wakuFilterClient.isNil():
await self.stopEdgeFilterLoops()
# Only clear providers we actually registered: another SubscriptionManager
# sharing this brokerCtx may have won the race, and clearing its provider
# would leave the broker silently provider-less.
if self.ownsEdgeShardHealthProvider:
RequestEdgeShardHealth.clearProvider(self.node.brokerCtx)
self.ownsEdgeShardHealthProvider = false
if self.ownsEdgeFilterPeerCountProvider:
RequestEdgeFilterPeerCount.clearProvider(self.node.brokerCtx)
self.ownsEdgeFilterPeerCountProvider = false

View File

@ -4,6 +4,7 @@ import chronicles, chronos, metrics, metrics/chronos_httpserver
import import
waku/[net/auto_port, waku_rln_relay/protocol_metrics as rln_metrics, utils/collector], waku/[net/auto_port, waku_rln_relay/protocol_metrics as rln_metrics, utils/collector],
./peer_manager, ./peer_manager,
./node_telemetry,
./waku_node ./waku_node
const LogInterval = 10.minutes const LogInterval = 10.minutes

View File

@ -60,23 +60,14 @@ import
requests/health_requests, requests/health_requests,
events/health_events, events/health_events,
events/message_events, events/message_events,
events/peer_events,
], ],
waku/discovery/waku_kademlia, waku/discovery/waku_kademlia,
waku/net/[bound_ports, net_config], waku/net/[bound_ports, net_config],
./peer_manager, ./peer_manager,
./health_monitor/health_status, ./health_monitor/health_status,
./health_monitor/topic_health ./health_monitor/topic_health,
./node_telemetry
declarePublicCounter waku_node_messages, "number of messages received", ["type"]
declarePublicGauge waku_version,
"Waku version info (in git describe format)", ["version"]
declarePublicCounter waku_node_errors, "number of wakunode errors", ["type"]
declarePublicGauge waku_lightpush_peers, "number of lightpush peers"
declarePublicGauge waku_filter_peers, "number of filter peers"
declarePublicGauge waku_store_peers, "number of store peers"
declarePublicGauge waku_px_peers,
"number of peers (in the node's peerManager) supporting the peer exchange protocol"
logScope: logScope:
topics = "waku node" topics = "waku node"
@ -94,53 +85,10 @@ const clientId* = "Nimbus Waku v2 node"
const WakuNodeVersionString* = "version / git commit hash: " & git_version const WakuNodeVersionString* = "version / git commit hash: " & git_version
# key and crypto modules different import ./node_types
type export node_types
# TODO: Move to application instance (e.g., `WakuNode2`)
WakuInfo* = object # NOTE One for simplicity, can extend later as needed
listenAddresses*: seq[string]
enrUri*: string #multiaddrStrings*: seq[string]
mixPubKey*: Option[string]
# NOTE based on Eth2Node in NBC eth2_network.nim import ./subscription_manager
WakuNode* = ref object
peerManager*: PeerManager
switch*: Switch
wakuRelay*: WakuRelay
wakuArchive*: waku_archive.WakuArchive
wakuStore*: store.WakuStore
wakuStoreClient*: store_client.WakuStoreClient
wakuStoreResume*: StoreResume
wakuStoreReconciliation*: SyncReconciliation
wakuStoreTransfer*: SyncTransfer
wakuFilter*: waku_filter_v2.WakuFilter
wakuFilterClient*: filter_client.WakuFilterClient
wakuRlnRelay*: WakuRLNRelay
wakuLegacyLightPush*: WakuLegacyLightPush
wakuLegacyLightpushClient*: WakuLegacyLightPushClient
wakuLightPush*: WakuLightPush
wakuLightpushClient*: WakuLightPushClient
wakuPeerExchange*: WakuPeerExchange
wakuPeerExchangeClient*: WakuPeerExchangeClient
wakuMetadata*: WakuMetadata
wakuAutoSharding*: Option[Sharding]
enr*: enr.Record
libp2pPing*: Ping
rng*: ref rand.HmacDrbgContext
brokerCtx*: BrokerContext
wakuRendezvous*: WakuRendezVous
wakuRendezvousClient*: rendezvous_client.WakuRendezVousClient
announcedAddresses*: seq[MultiAddress]
extMultiAddrsOnly*: bool # When true, skip automatic IP address replacement
started*: bool # Indicates that node has started listening
topicSubscriptionQueue*: AsyncEventQueue[SubscriptionEvent]
rateLimitSettings*: ProtocolRateLimitSettings
legacyAppHandlers*: Table[PubsubTopic, WakuRelayHandler]
## Kernel API Relay appHandlers (if any)
wakuMix*: WakuMix
kademliaDiscoveryLoop*: Future[void]
wakuKademlia*: WakuKademlia
ports*: BoundPorts
proc deduceRelayShard( proc deduceRelayShard(
node: WakuNode, node: WakuNode,
@ -230,6 +178,8 @@ proc new*(
peerManager.setShardGetter(node.getShardsGetter(@[])) peerManager.setShardGetter(node.getShardsGetter(@[]))
node.subscriptionManager = SubscriptionManager.new(node)
return node return node
proc peerInfo*(node: WakuNode): PeerInfo = proc peerInfo*(node: WakuNode): PeerInfo =
@ -600,6 +550,9 @@ proc start*(node: WakuNode) {.async.} =
node.startProvidersAndListeners() node.startProvidersAndListeners()
node.subscriptionManager.start().isOkOr:
error "failed to start subscription manager", error = error
if not zeroPortPresent: if not zeroPortPresent:
updateAnnouncedAddrWithPrimaryIpAddr(node).isOkOr: updateAnnouncedAddrWithPrimaryIpAddr(node).isOkOr:
error "failed update announced addr", error = $error error "failed update announced addr", error = $error
@ -611,6 +564,8 @@ proc start*(node: WakuNode) {.async.} =
proc stop*(node: WakuNode) {.async.} = proc stop*(node: WakuNode) {.async.} =
## By stopping the switch we are stopping all the underlying mounted protocols ## By stopping the switch we are stopping all the underlying mounted protocols
await node.subscriptionManager.stop()
node.stopProvidersAndListeners() node.stopProvidersAndListeners()
## NOTE: This will dispatch gossipsub stop to the WakuRelay.stop method override ## NOTE: This will dispatch gossipsub stop to the WakuRelay.stop method override

View File

@ -38,14 +38,14 @@ RequestBroker:
proc signature(protocol: WakuProtocol): Future[Result[RequestProtocolHealth, string]] proc signature(protocol: WakuProtocol): Future[Result[RequestProtocolHealth, string]]
# Get edge filter health for a single shard (set by DeliveryService when edge mode is active) # Get edge filter health for a single shard (set when edge mode is active)
RequestBroker(sync): RequestBroker(sync):
type RequestEdgeShardHealth* = object type RequestEdgeShardHealth* = object
health*: TopicHealth health*: TopicHealth
proc signature(shard: PubsubTopic): Result[RequestEdgeShardHealth, string] proc signature(shard: PubsubTopic): Result[RequestEdgeShardHealth, string]
# Get edge filter confirmed peer count (set by DeliveryService when edge mode is active) # Get edge filter confirmed peer count (set when edge mode is active)
RequestBroker(sync): RequestBroker(sync):
type RequestEdgeFilterPeerCount* = object type RequestEdgeFilterPeerCount* = object
peerCount*: int peerCount*: int