mirror of
https://github.com/logos-messaging/logos-delivery.git
synced 2026-06-04 05:00:02 +00:00
Clean separation between MessagingClient and kernel/core
* Convert DeliveryService into optionally mountable MessagingClient
* Move SubscriptionManager to core layer (WakuNode)
* Ensure libwaku kernel_api/ still works (deprecated; removal pending)
* Create node_types.nim to allow WakuNode to compose subsystems cleanly
* Create node_telemetry.nim to centralize Prometheus types
* Remove unnecessary "ptr Waku" / "addr waku" indirection
* Rename Waku.startWaku -> Waku.start for upcoming Waku rename
* Write complete proc surface for SubscriptionManager (all intents expressible)
* Rename edgeFilterHealthLoop -> edgeFilterConnectionLoop ("Health" means monitoring)
* logosdelivery_start_node calls mountMessagingClient then starts
* libwaku and wakunode2 do not mount messagingClient
* misc refactors/moves, improvements, fixes
This commit is contained in:
parent
c738c7b65e
commit
b451b94085
@ -123,7 +123,7 @@ when isMainModule:
|
||||
error "Waku initialization failed", error = error
|
||||
quit(QuitFailure)
|
||||
|
||||
(waitFor startWaku(addr waku)).isOkOr:
|
||||
(waitFor waku.start()).isOkOr:
|
||||
error "Starting waku failed", error = error
|
||||
quit(QuitFailure)
|
||||
|
||||
|
||||
@ -55,7 +55,7 @@ when isMainModule:
|
||||
error "Waku initialization failed", error = error
|
||||
quit(QuitFailure)
|
||||
|
||||
(waitFor startWaku(addr waku)).isOkOr:
|
||||
(waitFor waku.start()).isOkOr:
|
||||
error "Starting waku failed", error = error
|
||||
quit(QuitFailure)
|
||||
|
||||
|
||||
@ -82,8 +82,12 @@ when isMainModule:
|
||||
|
||||
echo("Waku node created successfully!")
|
||||
|
||||
node.mountMessagingClient().isOkOr:
|
||||
echo "Failed to mount messaging: ", error
|
||||
quit(QuitFailure)
|
||||
|
||||
# Start the node
|
||||
(waitFor startWaku(addr node)).isOkOr:
|
||||
(waitFor node.start()).isOkOr:
|
||||
echo "Failed to start node: ", error
|
||||
quit(QuitFailure)
|
||||
|
||||
|
||||
@ -48,7 +48,7 @@ proc setup*(): Waku =
|
||||
error "Waku initialization failed", error = error
|
||||
quit(QuitFailure)
|
||||
|
||||
(waitFor startWaku(addr waku)).isOkOr:
|
||||
(waitFor waku.start()).isOkOr:
|
||||
error "Starting waku failed", error = error
|
||||
quit(QuitFailure)
|
||||
|
||||
|
||||
@ -172,7 +172,12 @@ proc logosdelivery_start_node(
|
||||
chronicles.error "ConnectionStatusChange.listen failed", err = $error
|
||||
return err("ConnectionStatusChange.listen failed: " & $error)
|
||||
|
||||
(await startWaku(addr ctx.myLib[])).isOkOr:
|
||||
ctx.myLib[].mountMessagingClient().isOkOr:
|
||||
let errMsg = $error
|
||||
chronicles.error "mountMessagingClient failed", err = errMsg
|
||||
return err("failed to mount messaging: " & errMsg)
|
||||
|
||||
(await ctx.myLib[].start()).isOkOr:
|
||||
let errMsg = $error
|
||||
chronicles.error "START_NODE failed", err = errMsg
|
||||
return err("failed to start: " & errMsg)
|
||||
|
||||
@ -71,7 +71,7 @@ registerReqFFI(CreateNodeRequest, ctx: ptr FFIContext[Waku]):
|
||||
proc waku_start(
|
||||
ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer
|
||||
) {.ffi.} =
|
||||
(await startWaku(ctx[].myLib)).isOkOr:
|
||||
(await ctx.myLib[].start()).isOkOr:
|
||||
error "START_NODE failed", error = error
|
||||
return err("failed to start: " & $error)
|
||||
return ok("")
|
||||
|
||||
@ -103,7 +103,9 @@ suite "LM API health checking":
|
||||
|
||||
client = (await createNode(conf)).valueOr:
|
||||
raiseAssert error
|
||||
(await startWaku(addr client)).isOkOr:
|
||||
client.mountMessagingClient().isOkOr:
|
||||
raiseAssert error
|
||||
(await client.start()).isOkOr:
|
||||
raiseAssert error
|
||||
|
||||
asyncTeardown:
|
||||
@ -281,7 +283,9 @@ suite "LM API health checking":
|
||||
edgeWaku = (await createNode(edgeConf)).valueOr:
|
||||
raiseAssert "Failed to create edge node: " & error
|
||||
|
||||
(await startWaku(addr edgeWaku)).isOkOr:
|
||||
edgeWaku.mountMessagingClient().isOkOr:
|
||||
raiseAssert "Failed to mount edge messaging: " & error
|
||||
(await edgeWaku.start()).isOkOr:
|
||||
raiseAssert "Failed to start edge waku: " & error
|
||||
|
||||
let relayReq = await RequestProtocolHealth.request(
|
||||
|
||||
@ -6,6 +6,7 @@ import libp2p/[peerid, peerinfo, crypto/crypto]
|
||||
import brokers/broker_context
|
||||
import ../testlib/[common, wakucore, wakunode, testasync]
|
||||
import ../waku_archive/archive_utils
|
||||
import waku/messaging_client
|
||||
|
||||
import
|
||||
waku,
|
||||
@ -16,7 +17,6 @@ import
|
||||
waku_relay/protocol,
|
||||
waku_archive,
|
||||
waku_archive/common as archive_common,
|
||||
node/delivery_service/delivery_service,
|
||||
node/delivery_service/recv_service,
|
||||
]
|
||||
import waku/factory/waku_conf
|
||||
@ -147,7 +147,8 @@ suite "Messaging API, Receive Service (store recovery)":
|
||||
subscriber = (await createNode(createApiNodeConf(numShards))).expect(
|
||||
"Failed to create subscriber"
|
||||
)
|
||||
(await startWaku(addr subscriber)).expect("Failed to start subscriber")
|
||||
subscriber.mountMessagingClient().expect("Failed to mount messaging")
|
||||
(await subscriber.start()).expect("Failed to start subscriber")
|
||||
|
||||
# publish after the subscriber exists but before it connects to the
|
||||
# store; the message reaches the archive but the subscriber doesn't
|
||||
@ -185,7 +186,7 @@ suite "Messaging API, Receive Service (store recovery)":
|
||||
await eventManager.teardown()
|
||||
|
||||
# trigger store check, should recover and deliver via MessageReceivedEvent
|
||||
await subscriber.deliveryService.recvService.checkStore()
|
||||
await subscriber.messagingClient.recvService.checkStore()
|
||||
|
||||
let received = await eventManager.waitForEvents(TestTimeout)
|
||||
check received
|
||||
|
||||
@ -241,7 +241,9 @@ suite "Waku API - Send":
|
||||
lockNewGlobalBrokerContext:
|
||||
node = (await createNode(createApiNodeConf())).valueOr:
|
||||
raiseAssert error
|
||||
(await startWaku(addr node)).isOkOr:
|
||||
node.mountMessagingClient().isOkOr:
|
||||
raiseAssert "Failed to mount messaging: " & error
|
||||
(await node.start()).isOkOr:
|
||||
raiseAssert "Failed to start Waku node: " & error
|
||||
# node is not connected !
|
||||
|
||||
@ -263,7 +265,9 @@ suite "Waku API - Send":
|
||||
lockNewGlobalBrokerContext:
|
||||
node = (await createNode(createApiNodeConf())).valueOr:
|
||||
raiseAssert error
|
||||
(await startWaku(addr node)).isOkOr:
|
||||
node.mountMessagingClient().isOkOr:
|
||||
raiseAssert "Failed to mount messaging: " & error
|
||||
(await node.start()).isOkOr:
|
||||
raiseAssert "Failed to start Waku node: " & error
|
||||
|
||||
await node.node.connectToNodes(
|
||||
@ -297,7 +301,9 @@ suite "Waku API - Send":
|
||||
lockNewGlobalBrokerContext:
|
||||
node = (await createNode(createApiNodeConf())).valueOr:
|
||||
raiseAssert error
|
||||
(await startWaku(addr node)).isOkOr:
|
||||
node.mountMessagingClient().isOkOr:
|
||||
raiseAssert "Failed to mount messaging: " & error
|
||||
(await node.start()).isOkOr:
|
||||
raiseAssert "Failed to start Waku node: " & error
|
||||
|
||||
await node.node.connectToNodes(@[relayNode1PeerInfo])
|
||||
@ -327,7 +333,9 @@ suite "Waku API - Send":
|
||||
lockNewGlobalBrokerContext:
|
||||
node = (await createNode(createApiNodeConf())).valueOr:
|
||||
raiseAssert error
|
||||
(await startWaku(addr node)).isOkOr:
|
||||
node.mountMessagingClient().isOkOr:
|
||||
raiseAssert "Failed to mount messaging: " & error
|
||||
(await node.start()).isOkOr:
|
||||
raiseAssert "Failed to start Waku node: " & error
|
||||
|
||||
await node.node.connectToNodes(@[lightpushNodePeerInfo])
|
||||
@ -357,7 +365,9 @@ suite "Waku API - Send":
|
||||
lockNewGlobalBrokerContext:
|
||||
node = (await createNode(createApiNodeConf())).valueOr:
|
||||
raiseAssert error
|
||||
(await startWaku(addr node)).isOkOr:
|
||||
node.mountMessagingClient().isOkOr:
|
||||
raiseAssert "Failed to mount messaging: " & error
|
||||
(await node.start()).isOkOr:
|
||||
raiseAssert "Failed to start Waku node: " & error
|
||||
|
||||
await node.node.connectToNodes(@[lightpushNodePeerInfo, storeNodePeerInfo])
|
||||
@ -411,7 +421,9 @@ suite "Waku API - Send":
|
||||
lockNewGlobalBrokerContext:
|
||||
node = (await createNode(createApiNodeConf(cli_args.WakuMode.Edge))).valueOr:
|
||||
raiseAssert error
|
||||
(await startWaku(addr node)).isOkOr:
|
||||
node.mountMessagingClient().isOkOr:
|
||||
raiseAssert "Failed to mount messaging: " & error
|
||||
(await node.start()).isOkOr:
|
||||
raiseAssert "Failed to start Waku node: " & error
|
||||
|
||||
await node.node.connectToNodes(@[fakeLightpushNodePeerInfo])
|
||||
|
||||
@ -5,6 +5,7 @@ import chronos, testutils/unittests, stew/byteutils
|
||||
import libp2p/[peerid, peerinfo, multiaddress, crypto/crypto]
|
||||
import brokers/broker_context
|
||||
import ../testlib/[common, wakucore, wakunode, testasync]
|
||||
import waku/messaging_client
|
||||
|
||||
import
|
||||
waku,
|
||||
@ -14,13 +15,14 @@ import
|
||||
events/message_events,
|
||||
waku_relay/protocol,
|
||||
node/kernel_api/filter,
|
||||
node/delivery_service/subscription_manager,
|
||||
node/subscription_manager,
|
||||
]
|
||||
import waku/factory/waku_conf
|
||||
import tools/confutils/cli_args
|
||||
|
||||
const TestTimeout = chronos.seconds(10)
|
||||
const NegativeTestTimeout = chronos.seconds(2)
|
||||
const EdgeWaitTimeout = chronos.seconds(60)
|
||||
|
||||
type ReceiveEventListenerManager = ref object
|
||||
brokerCtx: BrokerContext
|
||||
@ -85,7 +87,8 @@ proc setupSubscriberNode(conf: WakuNodeConf): Future[Waku] {.async.} =
|
||||
var node: Waku
|
||||
lockNewGlobalBrokerContext:
|
||||
node = (await createNode(conf)).expect("Failed to create subscriber node")
|
||||
(await startWaku(addr node)).expect("Failed to start subscriber node")
|
||||
node.mountMessagingClient().expect("Failed to mount messaging")
|
||||
(await node.start()).expect("Failed to start subscriber node")
|
||||
return node
|
||||
|
||||
proc setupNetwork(
|
||||
@ -161,20 +164,37 @@ proc getRelayShard(node: WakuNode, contentTopic: ContentTopic): PubsubTopic =
|
||||
return PubsubTopic($shardObj)
|
||||
|
||||
proc waitForMesh(node: WakuNode, shard: PubsubTopic) {.async.} =
|
||||
for _ in 0 ..< 50:
|
||||
let deadline = Moment.now() + EdgeWaitTimeout
|
||||
while Moment.now() < deadline:
|
||||
if node.wakuRelay.getNumPeersInMesh(shard).valueOr(0) > 0:
|
||||
return
|
||||
await sleepAsync(100.milliseconds)
|
||||
raise newException(ValueError, "GossipSub Mesh failed to stabilize on " & shard)
|
||||
|
||||
proc waitForEdgeSubs(w: Waku, shard: PubsubTopic) {.async.} =
|
||||
let sm = w.deliveryService.subscriptionManager
|
||||
for _ in 0 ..< 50:
|
||||
if sm.edgeFilterPeerCount(shard) > 0:
|
||||
let deadline = Moment.now() + EdgeWaitTimeout
|
||||
while Moment.now() < deadline:
|
||||
if w.node.subscriptionManager.edgeFilterPeerCount(shard) > 0:
|
||||
return
|
||||
await sleepAsync(100.milliseconds)
|
||||
raise newException(ValueError, "Edge filter subscription failed on " & shard)
|
||||
|
||||
proc edgePeersReached(w: Waku, shard: PubsubTopic, n: int): Future[bool] {.async.} =
|
||||
let deadline = Moment.now() + EdgeWaitTimeout
|
||||
while Moment.now() < deadline:
|
||||
if w.node.subscriptionManager.edgeFilterPeerCount(shard) >= n:
|
||||
return true
|
||||
await sleepAsync(100.milliseconds)
|
||||
return false
|
||||
|
||||
proc edgePeersDroppedBelow(w: Waku, shard: PubsubTopic, n: int): Future[bool] {.async.} =
|
||||
let deadline = Moment.now() + EdgeWaitTimeout
|
||||
while Moment.now() < deadline:
|
||||
if w.node.subscriptionManager.edgeFilterPeerCount(shard) < n:
|
||||
return true
|
||||
await sleepAsync(100.milliseconds)
|
||||
return false
|
||||
|
||||
proc publishToMesh(
|
||||
net: TestNetwork, contentTopic: ContentTopic, payload: seq[byte]
|
||||
): Future[Result[int, string]] {.async.} =
|
||||
@ -621,7 +641,8 @@ suite "Messaging API, SubscriptionManager":
|
||||
var subscriber: Waku
|
||||
lockNewGlobalBrokerContext:
|
||||
subscriber = (await createNode(conf)).expect("Failed to create edge subscriber")
|
||||
(await startWaku(addr subscriber)).expect("Failed to start edge subscriber")
|
||||
subscriber.mountMessagingClient().expect("Failed to mount messaging")
|
||||
(await subscriber.start()).expect("Failed to start edge subscriber")
|
||||
|
||||
# Connect edge subscriber to both filter servers so selectPeers finds both
|
||||
await subscriber.node.connectToNodes(@[publisherPeerInfo, meshBuddyPeerInfo])
|
||||
@ -632,12 +653,7 @@ suite "Messaging API, SubscriptionManager":
|
||||
(await subscriber.subscribe(testTopic)).expect("Failed to subscribe")
|
||||
|
||||
# Wait for dialing both filter servers (HealthyThreshold = 2)
|
||||
for _ in 0 ..< 100:
|
||||
if subscriber.deliveryService.subscriptionManager.edgeFilterPeerCount(shard) >= 2:
|
||||
break
|
||||
await sleepAsync(100.milliseconds)
|
||||
|
||||
check subscriber.deliveryService.subscriptionManager.edgeFilterPeerCount(shard) >= 2
|
||||
check await edgePeersReached(subscriber, shard, 2)
|
||||
|
||||
# Verify message delivery with both servers alive
|
||||
await waitForMesh(publisher, shard)
|
||||
@ -659,12 +675,8 @@ suite "Messaging API, SubscriptionManager":
|
||||
await subscriber.node.disconnectNode(meshBuddyPeerInfo)
|
||||
|
||||
# Wait for the dead peer to be pruned
|
||||
for _ in 0 ..< 50:
|
||||
if subscriber.deliveryService.subscriptionManager.edgeFilterPeerCount(shard) < 2:
|
||||
break
|
||||
await sleepAsync(100.milliseconds)
|
||||
|
||||
check subscriber.deliveryService.subscriptionManager.edgeFilterPeerCount(shard) >= 1
|
||||
check await edgePeersDroppedBelow(subscriber, shard, 2)
|
||||
check subscriber.node.subscriptionManager.edgeFilterPeerCount(shard) >= 1
|
||||
|
||||
# Verify messages still arrive through the surviving filter server (publisher)
|
||||
eventManager = newReceiveEventListenerManager(subscriber.brokerCtx, 1)
|
||||
@ -758,7 +770,8 @@ suite "Messaging API, SubscriptionManager":
|
||||
var subscriber: Waku
|
||||
lockNewGlobalBrokerContext:
|
||||
subscriber = (await createNode(conf)).expect("Failed to create edge subscriber")
|
||||
(await startWaku(addr subscriber)).expect("Failed to start edge subscriber")
|
||||
subscriber.mountMessagingClient().expect("Failed to mount messaging")
|
||||
(await subscriber.start()).expect("Failed to start edge subscriber")
|
||||
|
||||
await subscriber.node.connectToNodes(
|
||||
@[publisherPeerInfo, meshBuddyPeerInfo, sparePeerInfo]
|
||||
@ -770,23 +783,13 @@ suite "Messaging API, SubscriptionManager":
|
||||
(await subscriber.subscribe(testTopic)).expect("Failed to subscribe")
|
||||
|
||||
# Wait for 2 confirmed peers (HealthyThreshold). The 3rd is available but not dialed.
|
||||
for _ in 0 ..< 100:
|
||||
if subscriber.deliveryService.subscriptionManager.edgeFilterPeerCount(shard) >= 2:
|
||||
break
|
||||
await sleepAsync(100.milliseconds)
|
||||
|
||||
require subscriber.deliveryService.subscriptionManager.edgeFilterPeerCount(shard) ==
|
||||
2
|
||||
check await edgePeersReached(subscriber, shard, 2)
|
||||
require subscriber.node.subscriptionManager.edgeFilterPeerCount(shard) == 2
|
||||
|
||||
await subscriber.node.disconnectNode(meshBuddyPeerInfo)
|
||||
|
||||
# Wait for the sub loop to detect the loss and dial a replacement
|
||||
for _ in 0 ..< 100:
|
||||
if subscriber.deliveryService.subscriptionManager.edgeFilterPeerCount(shard) >= 2:
|
||||
break
|
||||
await sleepAsync(100.milliseconds)
|
||||
|
||||
check subscriber.deliveryService.subscriptionManager.edgeFilterPeerCount(shard) >= 2
|
||||
check await edgePeersReached(subscriber, shard, 2)
|
||||
|
||||
await waitForMesh(publisher, shard)
|
||||
|
||||
|
||||
@ -15,8 +15,7 @@ import
|
||||
node/health_monitor/protocol_health,
|
||||
node/health_monitor/topic_health,
|
||||
node/health_monitor/node_health_monitor,
|
||||
node/delivery_service/delivery_service,
|
||||
node/delivery_service/subscription_manager,
|
||||
messaging_client,
|
||||
node/kernel_api/relay,
|
||||
node/kernel_api/store,
|
||||
node/kernel_api/lightpush,
|
||||
@ -27,6 +26,7 @@ import
|
||||
]
|
||||
|
||||
import ../testlib/[wakunode, wakucore], ../waku_archive/archive_utils
|
||||
import waku/node/subscription_manager
|
||||
|
||||
const MockDLow = 4 # Mocked GossipSub DLow value
|
||||
|
||||
@ -229,8 +229,8 @@ suite "Health Monitor - events":
|
||||
await nodeA.start()
|
||||
|
||||
let ds =
|
||||
DeliveryService.new(false, nodeA).expect("Failed to create DeliveryService")
|
||||
ds.startDeliveryService().expect("Failed to start DeliveryService")
|
||||
MessagingClient.new(false, nodeA).expect("Failed to create MessagingClient")
|
||||
ds.start().expect("Failed to start MessagingClient")
|
||||
|
||||
let monitorA = NodeHealthMonitor.new(nodeA)
|
||||
|
||||
@ -317,7 +317,7 @@ suite "Health Monitor - events":
|
||||
lastStatus == ConnectionStatus.Disconnected
|
||||
|
||||
await monitorA.stopHealthMonitor()
|
||||
await ds.stopDeliveryService()
|
||||
await ds.stop()
|
||||
await nodeA.stop()
|
||||
|
||||
asyncTest "Edge health driven by confirmed filter subscriptions":
|
||||
@ -333,9 +333,9 @@ suite "Health Monitor - events":
|
||||
await nodeA.start()
|
||||
|
||||
let ds =
|
||||
DeliveryService.new(false, nodeA).expect("Failed to create DeliveryService")
|
||||
ds.startDeliveryService().expect("Failed to start DeliveryService")
|
||||
let subMgr = ds.subscriptionManager
|
||||
MessagingClient.new(false, nodeA).expect("Failed to create MessagingClient")
|
||||
ds.start().expect("Failed to start MessagingClient")
|
||||
let subMgr = nodeA.subscriptionManager
|
||||
|
||||
var nodeB: WakuNode
|
||||
lockNewGlobalBrokerContext:
|
||||
@ -416,7 +416,7 @@ suite "Health Monitor - events":
|
||||
await EventShardTopicHealthChange.dropListener(nodeA.brokerCtx, shardHealthLis)
|
||||
|
||||
check shardHealthOk == true
|
||||
check subMgr.edgeFilterSubStates.len > 0
|
||||
check nodeA.subscriptionManager.edgeFilterSubStates.len > 0
|
||||
|
||||
healthSignal.clear()
|
||||
deadline = Moment.now() + TestConnectivityTimeLimit
|
||||
@ -428,7 +428,7 @@ suite "Health Monitor - events":
|
||||
|
||||
check lastStatus == ConnectionStatus.PartiallyConnected
|
||||
|
||||
await ds.stopDeliveryService()
|
||||
await ds.stop()
|
||||
await monitorA.stopHealthMonitor()
|
||||
await nodeB.stop()
|
||||
await nodeA.stop()
|
||||
|
||||
@ -431,7 +431,7 @@ suite "Waku Discovery v5":
|
||||
|
||||
let waku0 = (await Waku.new(conf)).valueOr:
|
||||
raiseAssert error
|
||||
(waitFor startWaku(addr waku0)).isOkOr:
|
||||
(waitFor waku0.start()).isOkOr:
|
||||
raiseAssert error
|
||||
|
||||
confBuilder.withNodeKey(crypto.PrivateKey.random(Secp256k1, myRng[])[])
|
||||
@ -445,7 +445,7 @@ suite "Waku Discovery v5":
|
||||
|
||||
let waku1 = (await Waku.new(conf1)).valueOr:
|
||||
raiseAssert error
|
||||
(waitFor startWaku(addr waku1)).isOkOr:
|
||||
(waitFor waku1.start()).isOkOr:
|
||||
raiseAssert error
|
||||
|
||||
await waku1.node.mountPeerExchange()
|
||||
@ -461,7 +461,7 @@ suite "Waku Discovery v5":
|
||||
|
||||
let waku2 = (await Waku.new(conf2)).valueOr:
|
||||
raiseAssert error
|
||||
(waitFor startWaku(addr waku2)).isOkOr:
|
||||
(waitFor waku2.start()).isOkOr:
|
||||
raiseAssert error
|
||||
|
||||
# leave some time for discv5 to act
|
||||
|
||||
@ -698,6 +698,13 @@ suite "WakuNode - Relay":
|
||||
node.unsubscribe((kind: ContentUnsub, topic: contentTopicB)).isOkOr:
|
||||
assert false, "Failed to unsubscribe to topic: " & $error
|
||||
|
||||
check node.wakuRelay.isSubscribed(shard)
|
||||
|
||||
node.unsubscribe((kind: ContentUnsub, topic: contentTopicA)).isOkOr:
|
||||
assert false, "Failed to unsubscribe to topic: " & $error
|
||||
node.unsubscribe((kind: ContentUnsub, topic: contentTopicC)).isOkOr:
|
||||
assert false, "Failed to unsubscribe to topic: " & $error
|
||||
|
||||
## After unsubcription, the node should not be subscribed to the shard anymore
|
||||
check not node.wakuRelay.isSubscribed(shard)
|
||||
|
||||
|
||||
@ -46,7 +46,7 @@ suite "Wakunode2 - Waku initialization":
|
||||
var waku = (waitFor Waku.new(conf)).valueOr:
|
||||
raiseAssert error
|
||||
|
||||
(waitFor startWaku(addr waku)).isOkOr:
|
||||
(waitFor waku.start()).isOkOr:
|
||||
raiseAssert error
|
||||
|
||||
## Then
|
||||
@ -71,7 +71,7 @@ suite "Wakunode2 - Waku initialization":
|
||||
var waku = (waitFor Waku.new(conf)).valueOr:
|
||||
raiseAssert error
|
||||
|
||||
(waitFor startWaku(addr waku)).isOkOr:
|
||||
(waitFor waku.start()).isOkOr:
|
||||
raiseAssert error
|
||||
|
||||
## Then
|
||||
@ -128,7 +128,7 @@ suite "Wakunode2 - Waku initialization":
|
||||
(waitFor waku.stop()).isOkOr:
|
||||
raiseAssert error
|
||||
|
||||
(waitFor startWaku(addr waku)).isOkOr:
|
||||
(waitFor waku.start()).isOkOr:
|
||||
raiseAssert error
|
||||
|
||||
let portsJson = waku.stateInfo.getNodeInfoItem(NodeInfoId.MyBoundPorts)
|
||||
|
||||
@ -1,9 +1,10 @@
|
||||
import chronicles, chronos, results
|
||||
|
||||
import waku/factory/waku
|
||||
import waku/messaging_client
|
||||
import waku/[requests/health_requests, waku_core, waku_node]
|
||||
import waku/node/delivery_service/send_service
|
||||
import waku/node/delivery_service/subscription_manager
|
||||
import waku/node/subscription_manager
|
||||
import libp2p/peerid
|
||||
import ../../tools/confutils/cli_args
|
||||
import ./[api_conf, types]
|
||||
@ -38,24 +39,24 @@ proc subscribe*(
|
||||
): Future[Result[void, string]] {.async.} =
|
||||
?checkApiAvailability(w)
|
||||
|
||||
return w.deliveryService.subscriptionManager.subscribe(contentTopic)
|
||||
return w.node.subscriptionManager.subscribe(contentTopic)
|
||||
|
||||
proc unsubscribe*(w: Waku, contentTopic: ContentTopic): Result[void, string] =
|
||||
?checkApiAvailability(w)
|
||||
|
||||
return w.deliveryService.subscriptionManager.unsubscribe(contentTopic)
|
||||
return w.node.subscriptionManager.unsubscribe(contentTopic)
|
||||
|
||||
proc send*(
|
||||
w: Waku, envelope: MessageEnvelope
|
||||
): Future[Result[RequestId, string]] {.async.} =
|
||||
?checkApiAvailability(w)
|
||||
|
||||
let isSubbed = w.deliveryService.subscriptionManager
|
||||
let isSubbed = w.node.subscriptionManager
|
||||
.isSubscribed(envelope.contentTopic)
|
||||
.valueOr(false)
|
||||
if not isSubbed:
|
||||
info "Auto-subscribing to topic on send", contentTopic = envelope.contentTopic
|
||||
w.deliveryService.subscriptionManager.subscribe(envelope.contentTopic).isOkOr:
|
||||
w.node.subscriptionManager.subscribe(envelope.contentTopic).isOkOr:
|
||||
warn "Failed to auto-subscribe", error = error
|
||||
return err("Failed to auto-subscribe before sending: " & error)
|
||||
|
||||
@ -71,6 +72,6 @@ proc send*(
|
||||
msgHash = deliveryTask.msgHash.to0xHex(),
|
||||
myPeerId = w.node.peerId()
|
||||
|
||||
asyncSpawn w.deliveryService.sendService.send(deliveryTask)
|
||||
asyncSpawn w.messagingClient.sendService.send(deliveryTask)
|
||||
|
||||
return ok(requestId)
|
||||
|
||||
@ -30,12 +30,12 @@ import
|
||||
waku_enr/sharding,
|
||||
waku_enr/multiaddr,
|
||||
api/types,
|
||||
messaging_client,
|
||||
common/logging,
|
||||
node/peer_manager,
|
||||
node/health_monitor,
|
||||
node/waku_metrics,
|
||||
node/delivery_service/delivery_service,
|
||||
node/delivery_service/subscription_manager,
|
||||
node/subscription_manager,
|
||||
rest_api/message_cache,
|
||||
rest_api/endpoint/server,
|
||||
rest_api/endpoint/builder as rest_server_builder,
|
||||
@ -73,7 +73,7 @@ type Waku* = ref object
|
||||
|
||||
healthMonitor*: NodeHealthMonitor
|
||||
|
||||
deliveryService*: DeliveryService
|
||||
messagingClient*: MessagingClient
|
||||
|
||||
restServer*: WakuRestServerRef
|
||||
metricsServer*: MetricsHttpServerRef
|
||||
@ -215,10 +215,6 @@ proc new*(
|
||||
error "Failed setting up app callbacks", error = error
|
||||
return err("Failed setting up app callbacks: " & $error)
|
||||
|
||||
## Delivery Monitor
|
||||
let deliveryService = DeliveryService.new(wakuConf.p2pReliability, node).valueOr:
|
||||
return err("could not create delivery service: " & $error)
|
||||
|
||||
var waku = Waku(
|
||||
stateInfo: WakuStateInfo.init(node),
|
||||
conf: wakuConf,
|
||||
@ -226,7 +222,6 @@ proc new*(
|
||||
key: wakuConf.nodeKey,
|
||||
node: node,
|
||||
healthMonitor: healthMonitor,
|
||||
deliveryService: deliveryService,
|
||||
appCallbacks: appCallbacks,
|
||||
restServer: restServer,
|
||||
brokerCtx: brokerCtx,
|
||||
@ -254,9 +249,9 @@ proc getPorts(
|
||||
|
||||
return ok((tcpPort: tcpPort, websocketPort: websocketPort))
|
||||
|
||||
proc getRunningNetConfig(waku: ptr Waku): Future[Result[NetConfig, string]] {.async.} =
|
||||
let conf = waku[].conf
|
||||
let (tcpPort, websocketPort) = getPorts(waku[].node.switch.peerInfo.listenAddrs).valueOr:
|
||||
proc getRunningNetConfig(waku: Waku): Future[Result[NetConfig, string]] {.async.} =
|
||||
let conf = waku.conf
|
||||
let (tcpPort, websocketPort) = getPorts(waku.node.switch.peerInfo.listenAddrs).valueOr:
|
||||
return err("Could not retrieve ports: " & error)
|
||||
|
||||
if tcpPort.isSome():
|
||||
@ -276,67 +271,67 @@ proc getRunningNetConfig(waku: ptr Waku): Future[Result[NetConfig, string]] {.as
|
||||
|
||||
return ok(netConf)
|
||||
|
||||
proc updateEnr(waku: ptr Waku): Future[Result[void, string]] {.async.} =
|
||||
proc updateEnr(waku: Waku): Future[Result[void, string]] {.async.} =
|
||||
let netConf: NetConfig = (await getRunningNetConfig(waku)).valueOr:
|
||||
return err("error calling updateNetConfig: " & $error)
|
||||
let record = enrConfiguration(waku[].conf, netConf).valueOr:
|
||||
let record = enrConfiguration(waku.conf, netConf).valueOr:
|
||||
return err("ENR setup failed: " & error)
|
||||
|
||||
if isClusterMismatched(record, waku[].conf.clusterId):
|
||||
if isClusterMismatched(record, waku.conf.clusterId):
|
||||
return err("cluster-id mismatch configured shards")
|
||||
|
||||
waku[].node.enr = record
|
||||
waku.node.enr = record
|
||||
|
||||
# If TCP/WS was configured with port 0, node.announcedAddresses was built
|
||||
# pre-bind with a port value of 0. In any case, the resync is harmless.
|
||||
waku[].node.announcedAddresses = netConf.announcedAddresses
|
||||
waku.node.announcedAddresses = netConf.announcedAddresses
|
||||
|
||||
return ok()
|
||||
|
||||
proc updateAddressInENR(waku: ptr Waku): Result[void, string] =
|
||||
let addresses: seq[MultiAddress] = waku[].node.announcedAddresses
|
||||
proc updateAddressInENR(waku: Waku): Result[void, string] =
|
||||
let addresses: seq[MultiAddress] = waku.node.announcedAddresses
|
||||
let encodedAddrs = multiaddr.encodeMultiaddrs(addresses)
|
||||
|
||||
## First update the enr info contained in WakuNode
|
||||
let keyBytes = waku[].key.getRawBytes().valueOr:
|
||||
let keyBytes = waku.key.getRawBytes().valueOr:
|
||||
return err("failed to retrieve raw bytes from waku key: " & $error)
|
||||
|
||||
let parsedPk = keys.PrivateKey.fromHex(keyBytes.toHex()).valueOr:
|
||||
return err("failed to parse the private key: " & $error)
|
||||
|
||||
let enrFields = @[toFieldPair(MultiaddrEnrField, encodedAddrs)]
|
||||
waku[].node.enr.update(parsedPk, extraFields = enrFields).isOkOr:
|
||||
waku.node.enr.update(parsedPk, extraFields = enrFields).isOkOr:
|
||||
return err("failed to update multiaddress in ENR updateAddressInENR: " & $error)
|
||||
|
||||
info "Waku node ENR updated successfully with new multiaddress",
|
||||
enr = waku[].node.enr.toUri(), record = $(waku[].node.enr)
|
||||
enr = waku.node.enr.toUri(), record = $(waku.node.enr)
|
||||
|
||||
## Now update the ENR infor in discv5
|
||||
if not waku[].wakuDiscv5.isNil():
|
||||
waku[].wakuDiscv5.protocol.localNode.record = waku[].node.enr
|
||||
let enr = waku[].wakuDiscv5.protocol.localNode.record
|
||||
if not waku.wakuDiscv5.isNil():
|
||||
waku.wakuDiscv5.protocol.localNode.record = waku.node.enr
|
||||
let enr = waku.wakuDiscv5.protocol.localNode.record
|
||||
|
||||
info "Waku discv5 ENR updated successfully with new multiaddress",
|
||||
enr = enr.toUri(), record = $(enr)
|
||||
|
||||
return ok()
|
||||
|
||||
proc updateWaku(waku: ptr Waku): Future[Result[void, string]] {.async.} =
|
||||
proc updateWaku(waku: Waku): Future[Result[void, string]] {.async.} =
|
||||
(await updateEnr(waku)).isOkOr:
|
||||
return err("error calling updateEnr: " & $error)
|
||||
|
||||
?updateAnnouncedAddrWithPrimaryIpAddr(waku[].node)
|
||||
?updateAnnouncedAddrWithPrimaryIpAddr(waku.node)
|
||||
|
||||
?updateAddressInENR(waku)
|
||||
|
||||
return ok()
|
||||
|
||||
proc startDnsDiscoveryRetryLoop(waku: ptr Waku): Future[void] {.async.} =
|
||||
proc startDnsDiscoveryRetryLoop(waku: Waku): Future[void] {.async.} =
|
||||
while true:
|
||||
await sleepAsync(30.seconds)
|
||||
if waku.conf.dnsDiscoveryConf.isSome():
|
||||
let dnsDiscoveryConf = waku.conf.dnsDiscoveryConf.get()
|
||||
waku[].dynamicBootstrapNodes = (
|
||||
waku.dynamicBootstrapNodes = (
|
||||
await waku_dnsdisc.retrieveDynamicBootstrapNodes(
|
||||
dnsDiscoveryConf.enrTreeUrl, dnsDiscoveryConf.nameServers
|
||||
)
|
||||
@ -344,8 +339,8 @@ proc startDnsDiscoveryRetryLoop(waku: ptr Waku): Future[void] {.async.} =
|
||||
error "Retrieving dynamic bootstrap nodes failed", error = error
|
||||
continue
|
||||
|
||||
if not waku[].wakuDiscv5.isNil():
|
||||
let dynamicBootstrapEnrs = waku[].dynamicBootstrapNodes
|
||||
if not waku.wakuDiscv5.isNil():
|
||||
let dynamicBootstrapEnrs = waku.dynamicBootstrapNodes
|
||||
.filterIt(it.hasUdpPort())
|
||||
.mapIt(it.enr.get().toUri())
|
||||
var discv5BootstrapEnrs: seq[enr.Record]
|
||||
@ -353,26 +348,35 @@ proc startDnsDiscoveryRetryLoop(waku: ptr Waku): Future[void] {.async.} =
|
||||
for enrUri in dynamicBootstrapEnrs:
|
||||
addBootstrapNode(enrUri, discv5BootstrapEnrs)
|
||||
|
||||
waku[].wakuDiscv5.updateBootstrapRecords(
|
||||
waku[].wakuDiscv5.protocol.bootstrapRecords & discv5BootstrapEnrs
|
||||
waku.wakuDiscv5.updateBootstrapRecords(
|
||||
waku.wakuDiscv5.protocol.bootstrapRecords & discv5BootstrapEnrs
|
||||
)
|
||||
|
||||
info "Connecting to dynamic bootstrap peers"
|
||||
try:
|
||||
await connectToNodes(
|
||||
waku[].node, waku[].dynamicBootstrapNodes, "dynamic bootstrap"
|
||||
waku.node, waku.dynamicBootstrapNodes, "dynamic bootstrap"
|
||||
)
|
||||
except CatchableError:
|
||||
error "failed to connect to dynamic bootstrap nodes: " & getCurrentExceptionMsg()
|
||||
return
|
||||
|
||||
proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises: []).} =
|
||||
if waku[].node.started:
|
||||
warn "startWaku: waku node already started"
|
||||
proc mountMessagingClient*(waku: Waku): Result[void, string] =
|
||||
if not waku.messagingClient.isNil():
|
||||
return err("messaging client already mounted")
|
||||
if waku.node.started:
|
||||
return err("cannot mount messaging client on a started node")
|
||||
waku.messagingClient = MessagingClient.new(waku.conf.p2pReliability, waku.node).valueOr:
|
||||
return err("could not create messaging client: " & $error)
|
||||
return ok()
|
||||
|
||||
proc start*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} =
|
||||
if waku.node.started:
|
||||
warn "start: waku node already started"
|
||||
return ok()
|
||||
|
||||
info "Retrieve dynamic bootstrap nodes"
|
||||
let conf = waku[].conf
|
||||
let conf = waku.conf
|
||||
|
||||
if conf.dnsDiscoveryConf.isSome():
|
||||
let dnsDiscoveryConf = waku.conf.dnsDiscoveryConf.get()
|
||||
@ -390,9 +394,9 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises:
|
||||
error "Retrieving dynamic bootstrap nodes failed",
|
||||
error = dynamicBootstrapNodesRes.error
|
||||
# Start Dns Discovery retry loop
|
||||
waku[].dnsRetryLoopHandle = waku.startDnsDiscoveryRetryLoop()
|
||||
waku.dnsRetryLoopHandle = waku.startDnsDiscoveryRetryLoop()
|
||||
else:
|
||||
waku[].dynamicBootstrapNodes = dynamicBootstrapNodesRes.get()
|
||||
waku.dynamicBootstrapNodes = dynamicBootstrapNodesRes.get()
|
||||
|
||||
## Initialize persistency singleton instance - we don't need the instance itself here,
|
||||
## but this ensures it's initialized before any store job starts.
|
||||
@ -405,12 +409,12 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises:
|
||||
|
||||
let bound = getPorts(waku.node.switch.peerInfo.listenAddrs).valueOr:
|
||||
return err("failed to read bound ports from switch: " & $error)
|
||||
waku[].node.ports.tcp = bound.tcpPort.get(Port(0)).uint16
|
||||
waku[].node.ports.webSocket = bound.websocketPort.get(Port(0)).uint16
|
||||
waku.node.ports.tcp = bound.tcpPort.get(Port(0)).uint16
|
||||
waku.node.ports.webSocket = bound.websocketPort.get(Port(0)).uint16
|
||||
|
||||
## Discv5
|
||||
if conf.discv5Conf.isSome():
|
||||
waku[].wakuDiscV5 = (
|
||||
waku.wakuDiscV5 = (
|
||||
await waku_discv5.setupAndStartDiscv5(
|
||||
waku.node.enr,
|
||||
waku.node.peerManager,
|
||||
@ -425,23 +429,21 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises:
|
||||
).valueOr:
|
||||
return err("failed to start waku discovery v5: " & error)
|
||||
|
||||
waku[].node.ports.discv5Udp = waku[].wakuDiscV5.udpPort.uint16
|
||||
waku[].conf.discv5Conf.get().udpPort = waku[].wakuDiscV5.udpPort
|
||||
waku.node.ports.discv5Udp = waku.wakuDiscV5.udpPort.uint16
|
||||
waku.conf.discv5Conf.get().udpPort = waku.wakuDiscV5.udpPort
|
||||
|
||||
## Update waku data that is set dynamically on node start
|
||||
try:
|
||||
(await updateWaku(waku)).isOkOr:
|
||||
return err("Error in startWaku: " & $error)
|
||||
return err("Error in start: " & $error)
|
||||
except CatchableError:
|
||||
return err("Caught exception in startWaku: " & getCurrentExceptionMsg())
|
||||
return err("Caught exception in start: " & getCurrentExceptionMsg())
|
||||
|
||||
## Reliability
|
||||
if not waku[].deliveryService.isNil():
|
||||
waku[].deliveryService.startDeliveryService().isOkOr:
|
||||
return err("failed to start delivery service: " & $error)
|
||||
waku.node.subscriptionManager.subscribeAllAutoshards().isOkOr:
|
||||
return err("failed to auto-subscribe autosharding shards: " & $error)
|
||||
|
||||
## Health Monitor
|
||||
waku[].healthMonitor.startHealthMonitor().isOkOr:
|
||||
waku.healthMonitor.startHealthMonitor().isOkOr:
|
||||
return err("failed to start health monitor: " & $error)
|
||||
|
||||
## Setup RequestConnectionStatus provider
|
||||
@ -450,7 +452,7 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises:
|
||||
globalBrokerContext(),
|
||||
proc(): Result[RequestConnectionStatus, string] =
|
||||
try:
|
||||
let healthReport = waku[].healthMonitor.getSyncNodeHealthReport()
|
||||
let healthReport = waku.healthMonitor.getSyncNodeHealthReport()
|
||||
return
|
||||
ok(RequestConnectionStatus(connectionStatus: healthReport.connectionStatus))
|
||||
except CatchableError:
|
||||
@ -467,7 +469,7 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises:
|
||||
): Future[Result[RequestProtocolHealth, string]] {.async.} =
|
||||
try:
|
||||
let protocolHealthStatus =
|
||||
await waku[].healthMonitor.getProtocolHealthInfo(protocol)
|
||||
await waku.healthMonitor.getProtocolHealthInfo(protocol)
|
||||
return ok(RequestProtocolHealth(healthStatus: protocolHealthStatus))
|
||||
except CatchableError:
|
||||
return err("Failed to get protocol health: " & getCurrentExceptionMsg()),
|
||||
@ -480,7 +482,7 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises:
|
||||
globalBrokerContext(),
|
||||
proc(): Future[Result[RequestHealthReport, string]] {.async.} =
|
||||
try:
|
||||
let report = await waku[].healthMonitor.getNodeHealthReport()
|
||||
let report = await waku.healthMonitor.getNodeHealthReport()
|
||||
return ok(RequestHealthReport(healthReport: report))
|
||||
except CatchableError:
|
||||
return err("Failed to get health report: " & getCurrentExceptionMsg()),
|
||||
@ -489,9 +491,9 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises:
|
||||
|
||||
if conf.restServerConf.isSome():
|
||||
rest_server_builder.startRestServerProtocolSupport(
|
||||
waku[].restServer,
|
||||
waku[].node,
|
||||
waku[].wakuDiscv5,
|
||||
waku.restServer,
|
||||
waku.node,
|
||||
waku.wakuDiscv5,
|
||||
conf.restServerConf.get(),
|
||||
conf.relay,
|
||||
conf.lightPush,
|
||||
@ -509,21 +511,23 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises:
|
||||
)
|
||||
).valueOr:
|
||||
return err("Starting monitoring and external interfaces failed: " & error)
|
||||
waku[].metricsServer = server
|
||||
waku[].node.ports.metrics = port.uint16
|
||||
waku[].conf.metricsServerConf.get().httpPort = port
|
||||
waku.metricsServer = server
|
||||
waku.node.ports.metrics = port.uint16
|
||||
waku.conf.metricsServerConf.get().httpPort = port
|
||||
except CatchableError:
|
||||
return err(
|
||||
"Caught exception starting monitoring and external interfaces failed: " &
|
||||
getCurrentExceptionMsg()
|
||||
)
|
||||
waku[].healthMonitor.setOverallHealth(HealthStatus.READY)
|
||||
waku.healthMonitor.setOverallHealth(HealthStatus.READY)
|
||||
|
||||
if not waku.messagingClient.isNil():
|
||||
waku.messagingClient.start().isOkOr:
|
||||
return err("failed to start messaging client: " & $error)
|
||||
|
||||
return ok()
|
||||
|
||||
proc stop*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} =
|
||||
## Waku shutdown
|
||||
|
||||
if not waku.node.started:
|
||||
warn "stop: attempting to stop node that isn't running"
|
||||
|
||||
@ -538,9 +542,8 @@ proc stop*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} =
|
||||
if not waku.wakuDiscv5.isNil():
|
||||
await waku.wakuDiscv5.stop()
|
||||
|
||||
if not waku.deliveryService.isNil():
|
||||
await waku.deliveryService.stopDeliveryService()
|
||||
waku.deliveryService = nil
|
||||
if not waku.messagingClient.isNil():
|
||||
await waku.messagingClient.stop()
|
||||
|
||||
if not waku.node.isNil():
|
||||
await waku.node.stop()
|
||||
|
||||
31
waku/messaging_client.nim
Normal file
31
waku/messaging_client.nim
Normal file
@ -0,0 +1,31 @@
|
||||
import results, chronos
|
||||
import
|
||||
./node/waku_node,
|
||||
./node/delivery_service/[recv_service, send_service]
|
||||
|
||||
type MessagingClient* = ref object
|
||||
sendService*: SendService
|
||||
recvService*: RecvService
|
||||
started: bool
|
||||
|
||||
proc new*(
|
||||
T: type MessagingClient, useP2PReliability: bool, node: WakuNode
|
||||
): Result[T, string] =
|
||||
let sendService = ?SendService.new(useP2PReliability, node)
|
||||
let recvService = RecvService.new(node)
|
||||
ok(T(sendService: sendService, recvService: recvService))
|
||||
|
||||
proc start*(self: MessagingClient): Result[void, string] =
|
||||
if self.started:
|
||||
return ok()
|
||||
self.recvService.startRecvService()
|
||||
self.sendService.startSendService()
|
||||
self.started = true
|
||||
ok()
|
||||
|
||||
proc stop*(self: MessagingClient) {.async.} =
|
||||
if not self.started:
|
||||
return
|
||||
await self.sendService.stopSendService()
|
||||
await self.recvService.stopRecvService()
|
||||
self.started = false
|
||||
@ -1,44 +0,0 @@
|
||||
## This module helps to ensure the correct transmission and reception of messages
|
||||
|
||||
import results
|
||||
import chronos, chronicles
|
||||
import
|
||||
./recv_service,
|
||||
./send_service,
|
||||
./subscription_manager,
|
||||
waku/[
|
||||
waku_core, waku_node, waku_store/client, waku_relay/protocol, waku_lightpush/client
|
||||
]
|
||||
|
||||
type DeliveryService* = ref object
|
||||
sendService*: SendService
|
||||
recvService*: RecvService
|
||||
subscriptionManager*: SubscriptionManager
|
||||
|
||||
proc new*(
|
||||
T: type DeliveryService, useP2PReliability: bool, w: WakuNode
|
||||
): Result[T, string] =
|
||||
## storeClient is needed to give store visitility to DeliveryService
|
||||
## wakuRelay and wakuLightpushClient are needed to give a mechanism to SendService to re-publish
|
||||
let subscriptionManager = SubscriptionManager.new(w)
|
||||
let sendService = ?SendService.new(useP2PReliability, w, subscriptionManager)
|
||||
let recvService = RecvService.new(w, subscriptionManager)
|
||||
|
||||
return ok(
|
||||
DeliveryService(
|
||||
sendService: sendService,
|
||||
recvService: recvService,
|
||||
subscriptionManager: subscriptionManager,
|
||||
)
|
||||
)
|
||||
|
||||
proc startDeliveryService*(self: DeliveryService): Result[void, string] =
|
||||
?self.subscriptionManager.startSubscriptionManager()
|
||||
self.recvService.startRecvService()
|
||||
self.sendService.startSendService()
|
||||
return ok()
|
||||
|
||||
proc stopDeliveryService*(self: DeliveryService) {.async.} =
|
||||
await self.sendService.stopSendService()
|
||||
await self.recvService.stopRecvService()
|
||||
await self.subscriptionManager.stopSubscriptionManager()
|
||||
@ -4,17 +4,17 @@
|
||||
|
||||
import std/[tables, sequtils, options, sets]
|
||||
import chronos, chronicles, libp2p/utility
|
||||
import ../[subscription_manager]
|
||||
import brokers/broker_context
|
||||
import
|
||||
waku/[
|
||||
waku_core,
|
||||
waku_core/topics,
|
||||
waku_store/client,
|
||||
waku_store/common,
|
||||
waku_filter_v2/client,
|
||||
waku_core/topics,
|
||||
events/message_events,
|
||||
waku_node,
|
||||
node/subscription_manager,
|
||||
]
|
||||
|
||||
const StoreCheckPeriod = chronos.minutes(5) ## How often to perform store queries
|
||||
@ -38,7 +38,6 @@ type RecvService* = ref object of RootObj
|
||||
brokerCtx: BrokerContext
|
||||
node: WakuNode
|
||||
seenMsgListener: MessageSeenEventListener
|
||||
subscriptionManager: SubscriptionManager
|
||||
|
||||
recentReceivedMsgs: seq[RecvMessage]
|
||||
|
||||
@ -77,7 +76,7 @@ proc processIncomingMessage(
|
||||
## or if the message is a duplicate (recently-seen). Otherwise, save it as
|
||||
## recently-seen, emit a MessageReceivedEvent, and return true.
|
||||
|
||||
if not self.subscriptionManager.isSubscribed(pubsubTopic, message.contentTopic):
|
||||
if not self.node.subscriptionManager.isContentSubscribed(pubsubTopic, message.contentTopic):
|
||||
trace "skipping message as I am not subscribed",
|
||||
shard = pubsubTopic, contentTopic = message.contentTopic
|
||||
return false
|
||||
@ -101,7 +100,7 @@ proc checkStore*(self: RecvService) {.async.} =
|
||||
self.endTimeToCheck = getNowInNanosecondTime()
|
||||
|
||||
## query store and deliver new recovered messages per subscribed topic
|
||||
for pubsubTopic, contentTopics in self.subscriptionManager.subscribedTopics:
|
||||
for pubsubTopic, contentTopics in self.node.subscriptionManager.subscribedContentTopics:
|
||||
let storeResp: StoreQueryResponse = (
|
||||
await self.node.wakuStoreClient.queryToAny(
|
||||
StoreQueryRequest(
|
||||
@ -146,7 +145,7 @@ proc msgChecker(self: RecvService) {.async.} =
|
||||
await sleepAsync(StoreCheckPeriod)
|
||||
await self.checkStore()
|
||||
|
||||
proc new*(T: typedesc[RecvService], node: WakuNode, s: SubscriptionManager): T =
|
||||
proc new*(T: typedesc[RecvService], node: WakuNode): T =
|
||||
## The storeClient will help to acquire any possible missed messages
|
||||
|
||||
let now = getNowInNanosecondTime()
|
||||
@ -154,7 +153,6 @@ proc new*(T: typedesc[RecvService], node: WakuNode, s: SubscriptionManager): T =
|
||||
node: node,
|
||||
startTimeToCheck: now,
|
||||
brokerCtx: node.brokerCtx,
|
||||
subscriptionManager: s,
|
||||
recentReceivedMsgs: @[],
|
||||
)
|
||||
|
||||
|
||||
@ -6,10 +6,10 @@ import chronos, chronicles, libp2p/utility
|
||||
import brokers/broker_context
|
||||
import
|
||||
./[send_processor, relay_processor, lightpush_processor, delivery_task],
|
||||
../[subscription_manager],
|
||||
waku/[
|
||||
waku_core,
|
||||
node/waku_node,
|
||||
node/subscription_manager,
|
||||
node/peer_manager,
|
||||
waku_store/client,
|
||||
waku_store/common,
|
||||
@ -58,7 +58,6 @@ type SendService* = ref object of RootObj
|
||||
|
||||
node: WakuNode
|
||||
checkStoreForMessages: bool
|
||||
subscriptionManager: SubscriptionManager
|
||||
|
||||
proc setupSendProcessorChain(
|
||||
peerManager: PeerManager,
|
||||
@ -99,7 +98,6 @@ proc new*(
|
||||
T: typedesc[SendService],
|
||||
preferP2PReliability: bool,
|
||||
w: WakuNode,
|
||||
s: SubscriptionManager,
|
||||
): Result[T, string] =
|
||||
if w.wakuRelay.isNil() and w.wakuLightpushClient.isNil():
|
||||
return err(
|
||||
@ -120,7 +118,6 @@ proc new*(
|
||||
sendProcessor: sendProcessorChain,
|
||||
node: w,
|
||||
checkStoreForMessages: checkStoreForMessages,
|
||||
subscriptionManager: s,
|
||||
)
|
||||
|
||||
return ok(sendService)
|
||||
@ -263,7 +260,7 @@ proc send*(self: SendService, task: DeliveryTask) {.async.} =
|
||||
info "SendService.send: processing delivery task",
|
||||
requestId = task.requestId, msgHash = task.msgHash.to0xHex()
|
||||
|
||||
self.subscriptionManager.subscribe(task.msg.contentTopic).isOkOr:
|
||||
self.node.subscriptionManager.subscribe(task.msg.contentTopic).isOkOr:
|
||||
error "SendService.send: failed to subscribe to content topic",
|
||||
contentTopic = task.msg.contentTopic, error = error
|
||||
|
||||
|
||||
@ -14,6 +14,7 @@ import
|
||||
events/health_events,
|
||||
events/peer_events,
|
||||
node/waku_node,
|
||||
node/node_telemetry,
|
||||
node/peer_manager,
|
||||
node/kernel_api,
|
||||
node/health_monitor/online_monitor,
|
||||
|
||||
@ -21,6 +21,7 @@ import
|
||||
|
||||
import
|
||||
../waku_node,
|
||||
../node_telemetry,
|
||||
../../waku_core,
|
||||
../../waku_core/topics/sharding,
|
||||
../../waku_filter_v2,
|
||||
|
||||
@ -19,6 +19,7 @@ import
|
||||
|
||||
import
|
||||
../waku_node,
|
||||
../node_telemetry,
|
||||
../../waku_peer_exchange,
|
||||
../../waku_core,
|
||||
../peer_manager,
|
||||
|
||||
@ -29,90 +29,18 @@ import
|
||||
waku_store_sync,
|
||||
waku_rln_relay,
|
||||
node/waku_node,
|
||||
node/subscription_manager,
|
||||
node/peer_manager,
|
||||
events/message_events,
|
||||
]
|
||||
|
||||
export waku_relay.WakuRelayHandler
|
||||
|
||||
declarePublicHistogram waku_histogram_message_size,
|
||||
"message size histogram in kB",
|
||||
buckets = [
|
||||
0.0, 1.0, 3.0, 5.0, 15.0, 50.0, 75.0, 100.0, 125.0, 150.0, 500.0, 700.0, 1000.0, Inf
|
||||
]
|
||||
|
||||
logScope:
|
||||
topics = "waku node relay api"
|
||||
|
||||
## Waku relay
|
||||
|
||||
proc registerRelayHandler(
|
||||
node: WakuNode, topic: PubsubTopic, appHandler: WakuRelayHandler = nil
|
||||
): bool =
|
||||
## Registers the only handler for the given topic.
|
||||
## Notice that this handler internally calls other handlers, such as filter,
|
||||
## archive, etc, plus the handler provided by the application.
|
||||
## Returns `true` if a mesh subscription was created or `false` if the relay
|
||||
## was already subscribed to the topic.
|
||||
|
||||
let alreadySubscribed = node.wakuRelay.isSubscribed(topic)
|
||||
|
||||
if not appHandler.isNil():
|
||||
if not alreadySubscribed or not node.legacyAppHandlers.hasKey(topic):
|
||||
node.legacyAppHandlers[topic] = appHandler
|
||||
else:
|
||||
debug "Legacy appHandler already exists for active PubsubTopic, ignoring new handler",
|
||||
topic = topic
|
||||
|
||||
if alreadySubscribed:
|
||||
return false
|
||||
|
||||
proc traceHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
|
||||
let msgSizeKB = msg.payload.len / 1000
|
||||
|
||||
waku_node_messages.inc(labelValues = ["relay"])
|
||||
waku_histogram_message_size.observe(msgSizeKB)
|
||||
|
||||
proc filterHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
|
||||
if node.wakuFilter.isNil():
|
||||
return
|
||||
|
||||
await node.wakuFilter.handleMessage(topic, msg)
|
||||
|
||||
proc archiveHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
|
||||
if node.wakuArchive.isNil():
|
||||
return
|
||||
|
||||
await node.wakuArchive.handleMessage(topic, msg)
|
||||
|
||||
proc syncHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
|
||||
if node.wakuStoreReconciliation.isNil():
|
||||
return
|
||||
|
||||
node.wakuStoreReconciliation.messageIngress(topic, msg)
|
||||
|
||||
proc internalHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
|
||||
MessageSeenEvent.emit(node.brokerCtx, topic, msg)
|
||||
|
||||
let uniqueTopicHandler = proc(
|
||||
topic: PubsubTopic, msg: WakuMessage
|
||||
): Future[void] {.async, gcsafe.} =
|
||||
await traceHandler(topic, msg)
|
||||
await filterHandler(topic, msg)
|
||||
await archiveHandler(topic, msg)
|
||||
await syncHandler(topic, msg)
|
||||
await internalHandler(topic, msg)
|
||||
|
||||
# Call the legacy (kernel API) app handler if it exists.
|
||||
# Normally, hasKey is false and the MessageSeenEvent bus (new API) is used instead.
|
||||
# But we need to support legacy behavior (kernel API use), hence this.
|
||||
# NOTE: We can delete `legacyAppHandlers` if instead we refactor WakuRelay to support multiple
|
||||
# PubsubTopic handlers, since that's actually supported by libp2p PubSub (bigger refactor...)
|
||||
if node.legacyAppHandlers.hasKey(topic) and not node.legacyAppHandlers[topic].isNil():
|
||||
await node.legacyAppHandlers[topic](topic, msg)
|
||||
|
||||
node.wakuRelay.subscribe(topic, uniqueTopicHandler)
|
||||
|
||||
proc getTopicOfSubscriptionEvent(
|
||||
node: WakuNode, subscription: SubscriptionEvent
|
||||
): Result[(PubsubTopic, Option[ContentTopic]), string] =
|
||||
@ -147,17 +75,9 @@ proc subscribe*(
|
||||
error "Failed to decode subscription event", error = error
|
||||
return err("Failed to decode subscription event: " & error)
|
||||
|
||||
if node.registerRelayHandler(pubsubTopic, handler):
|
||||
info "subscribe", pubsubTopic, contentTopicOp
|
||||
node.topicSubscriptionQueue.emit((kind: PubsubSub, topic: pubsubTopic))
|
||||
else:
|
||||
if isNil(handler):
|
||||
warn "No-effect API call to subscribe. Already subscribed to topic", pubsubTopic
|
||||
else:
|
||||
info "subscribe (was already subscribed in the mesh; appHandler set)",
|
||||
pubsubTopic = pubsubTopic
|
||||
|
||||
return ok()
|
||||
if contentTopicOp.isSome():
|
||||
return node.subscriptionManager.subscribe(pubsubTopic, contentTopicOp.get(), handler)
|
||||
return node.subscriptionManager.subscribeShard(pubsubTopic, handler)
|
||||
|
||||
proc unsubscribe*(
|
||||
node: WakuNode, subscription: SubscriptionEvent
|
||||
@ -174,22 +94,9 @@ proc unsubscribe*(
|
||||
error "Failed to decode unsubscribe event", error = error
|
||||
return err("Failed to decode unsubscribe event: " & error)
|
||||
|
||||
let hadHandler = node.legacyAppHandlers.hasKey(pubsubTopic)
|
||||
if hadHandler:
|
||||
node.legacyAppHandlers.del(pubsubTopic)
|
||||
|
||||
if node.wakuRelay.isSubscribed(pubsubTopic):
|
||||
info "unsubscribe", pubsubTopic, contentTopicOp
|
||||
node.wakuRelay.unsubscribe(pubsubTopic)
|
||||
node.topicSubscriptionQueue.emit((kind: PubsubUnsub, topic: pubsubTopic))
|
||||
else:
|
||||
if not hadHandler:
|
||||
warn "No-effect API call to `unsubscribe`. Was not subscribed", pubsubTopic
|
||||
else:
|
||||
info "unsubscribe (was not subscribed in the mesh; appHandler removed)",
|
||||
pubsubTopic = pubsubTopic
|
||||
|
||||
return ok()
|
||||
if contentTopicOp.isSome():
|
||||
return node.subscriptionManager.unsubscribe(pubsubTopic, contentTopicOp.get())
|
||||
return node.subscriptionManager.unsubscribeAll(pubsubTopic)
|
||||
|
||||
proc isSubscribed*(
|
||||
node: WakuNode, subscription: SubscriptionEvent
|
||||
|
||||
27
waku/node/node_telemetry.nim
Normal file
27
waku/node/node_telemetry.nim
Normal file
@ -0,0 +1,27 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import metrics
|
||||
|
||||
declarePublicGauge waku_version,
|
||||
"Waku version info (in git describe format)", ["version"]
|
||||
|
||||
declarePublicCounter waku_node_errors, "number of wakunode errors", ["type"]
|
||||
|
||||
declarePublicGauge waku_lightpush_peers, "number of lightpush peers"
|
||||
|
||||
declarePublicGauge waku_filter_peers, "number of filter peers"
|
||||
|
||||
declarePublicGauge waku_store_peers, "number of store peers"
|
||||
|
||||
declarePublicGauge waku_px_peers,
|
||||
"number of peers (in the node's peerManager) supporting the peer exchange protocol"
|
||||
|
||||
declarePublicCounter waku_node_messages, "number of messages received", ["type"]
|
||||
|
||||
declarePublicHistogram waku_histogram_message_size,
|
||||
"message size histogram in kB",
|
||||
buckets = [
|
||||
0.0, 1.0, 3.0, 5.0, 15.0, 50.0, 75.0, 100.0, 125.0, 150.0, 500.0, 700.0, 1000.0, Inf
|
||||
]
|
||||
|
||||
{.pop.}
|
||||
113
waku/node/node_types.nim
Normal file
113
waku/node/node_types.nim
Normal file
@ -0,0 +1,113 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import
|
||||
std/[options, tables, sets],
|
||||
chronos,
|
||||
results,
|
||||
eth/keys,
|
||||
bearssl/rand,
|
||||
eth/p2p/discoveryv5/enr,
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/[multiaddress, multicodec],
|
||||
libp2p/protocols/ping,
|
||||
libp2p/protocols/mix/mix_protocol,
|
||||
brokers/broker_context
|
||||
|
||||
import
|
||||
waku/[
|
||||
waku_core,
|
||||
waku_relay,
|
||||
waku_archive,
|
||||
waku_store/protocol as store,
|
||||
waku_store/client as store_client,
|
||||
waku_store/resume,
|
||||
waku_store_sync,
|
||||
waku_filter_v2,
|
||||
waku_filter_v2/client as filter_client,
|
||||
waku_metadata,
|
||||
waku_rendezvous/protocol,
|
||||
waku_rendezvous/client as rendezvous_client,
|
||||
waku_lightpush_legacy/client as legacy_lightpush_client,
|
||||
waku_lightpush_legacy as legacy_lightpush_protocol,
|
||||
waku_lightpush/client as lightpush_client,
|
||||
waku_lightpush as lightpush_protocol,
|
||||
waku_peer_exchange,
|
||||
waku_rln_relay,
|
||||
waku_mix,
|
||||
common/rate_limit/setting,
|
||||
discovery/waku_kademlia,
|
||||
net/bound_ports,
|
||||
events/peer_events,
|
||||
],
|
||||
./peer_manager,
|
||||
./health_monitor/topic_health
|
||||
|
||||
# key and crypto modules different
|
||||
type
|
||||
# TODO: Move to application instance (e.g., `WakuNode2`)
|
||||
WakuInfo* = object # NOTE One for simplicity, can extend later as needed
|
||||
listenAddresses*: seq[string]
|
||||
enrUri*: string #multiaddrStrings*: seq[string]
|
||||
mixPubKey*: Option[string]
|
||||
|
||||
# NOTE based on Eth2Node in NBC eth2_network.nim
|
||||
WakuNode* = ref object
|
||||
peerManager*: PeerManager
|
||||
switch*: Switch
|
||||
wakuRelay*: WakuRelay
|
||||
wakuArchive*: waku_archive.WakuArchive
|
||||
wakuStore*: store.WakuStore
|
||||
wakuStoreClient*: store_client.WakuStoreClient
|
||||
wakuStoreResume*: StoreResume
|
||||
wakuStoreReconciliation*: SyncReconciliation
|
||||
wakuStoreTransfer*: SyncTransfer
|
||||
wakuFilter*: waku_filter_v2.WakuFilter
|
||||
wakuFilterClient*: filter_client.WakuFilterClient
|
||||
wakuRlnRelay*: WakuRLNRelay
|
||||
wakuLegacyLightPush*: WakuLegacyLightPush
|
||||
wakuLegacyLightpushClient*: WakuLegacyLightPushClient
|
||||
wakuLightPush*: WakuLightPush
|
||||
wakuLightpushClient*: WakuLightPushClient
|
||||
wakuPeerExchange*: WakuPeerExchange
|
||||
wakuPeerExchangeClient*: WakuPeerExchangeClient
|
||||
wakuMetadata*: WakuMetadata
|
||||
wakuAutoSharding*: Option[Sharding]
|
||||
enr*: enr.Record
|
||||
libp2pPing*: Ping
|
||||
rng*: ref rand.HmacDrbgContext
|
||||
brokerCtx*: BrokerContext
|
||||
wakuRendezvous*: WakuRendezVous
|
||||
wakuRendezvousClient*: rendezvous_client.WakuRendezVousClient
|
||||
announcedAddresses*: seq[MultiAddress]
|
||||
extMultiAddrsOnly*: bool # When true, skip automatic IP address replacement
|
||||
started*: bool # Indicates that node has started listening
|
||||
topicSubscriptionQueue*: AsyncEventQueue[SubscriptionEvent]
|
||||
rateLimitSettings*: ProtocolRateLimitSettings
|
||||
legacyAppHandlers*: Table[PubsubTopic, WakuRelayHandler]
|
||||
## Kernel API Relay appHandlers (if any)
|
||||
subscriptionManager*: SubscriptionManager
|
||||
wakuMix*: WakuMix
|
||||
kademliaDiscoveryLoop*: Future[void]
|
||||
wakuKademlia*: WakuKademlia
|
||||
ports*: BoundPorts
|
||||
|
||||
ShardSubscription* = object
|
||||
contentTopics*: HashSet[ContentTopic]
|
||||
directShardSub*: bool ## shard subscribed directly (PubsubSub), independent of content-topic interest
|
||||
|
||||
EdgeFilterSubState* = object
|
||||
peers*: seq[RemotePeerInfo]
|
||||
pending*: seq[Future[void]]
|
||||
pendingPeers*: HashSet[PeerId]
|
||||
currentHealth*: TopicHealth
|
||||
|
||||
SubscriptionManager* = ref object of RootObj
|
||||
node*: WakuNode
|
||||
shards*: Table[PubsubTopic, ShardSubscription]
|
||||
edgeFilterSubStates*: Table[PubsubTopic, EdgeFilterSubState]
|
||||
edgeFilterWakeup*: AsyncEvent
|
||||
edgeFilterSubLoopFut*: Future[void]
|
||||
edgeFilterConnectionLoopFut*: Future[void]
|
||||
peerEventListener*: WakuPeerEventListener
|
||||
|
||||
{.pop.}
|
||||
@ -1,18 +1,21 @@
|
||||
import std/[sequtils, sets, tables, options, strutils], chronos, chronicles, results
|
||||
import std/[sequtils, sets, tables, options], chronos, chronicles, metrics, results
|
||||
import libp2p/[peerid, peerinfo]
|
||||
import brokers/broker_context
|
||||
|
||||
import
|
||||
waku/[
|
||||
waku_core,
|
||||
waku_core/topics,
|
||||
waku_core/topics/sharding,
|
||||
waku_node,
|
||||
node/node_types,
|
||||
node/node_telemetry,
|
||||
waku_relay,
|
||||
waku_archive,
|
||||
waku_store_sync,
|
||||
waku_filter_v2/common as filter_common,
|
||||
waku_filter_v2/client as filter_client,
|
||||
waku_filter_v2/protocol as filter_protocol,
|
||||
events/health_events,
|
||||
events/message_events,
|
||||
events/peer_events,
|
||||
requests/health_requests,
|
||||
node/peer_manager,
|
||||
@ -20,126 +23,114 @@ import
|
||||
node/health_monitor/connection_status,
|
||||
]
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Logos Messaging API SubscriptionManager
|
||||
#
|
||||
# Maps all topic subscription intent and centralizes all consistency
|
||||
# maintenance of the pubsub and content topic subscription model across
|
||||
# the various network drivers that handle topics (Edge/Filter and Core/Relay).
|
||||
# ---------------------------------------------------------------------------
|
||||
{.push raises: [].}
|
||||
|
||||
type EdgeFilterSubState* = object
|
||||
peers: seq[RemotePeerInfo]
|
||||
## Filter service peers with confirmed subscriptions on this shard.
|
||||
pending: seq[Future[void]] ## In-flight dial futures for peers not yet confirmed.
|
||||
pendingPeers: HashSet[PeerId] ## PeerIds of peers currently being dialed.
|
||||
currentHealth: TopicHealth
|
||||
## Cached health derived from peers.len; updated on every peer set change.
|
||||
proc doRelaySubscribe(
|
||||
node: WakuNode, shard: PubsubTopic, appHandler: WakuRelayHandler = nil
|
||||
): bool =
|
||||
let alreadySubscribed = node.wakuRelay.isSubscribed(shard)
|
||||
|
||||
if not appHandler.isNil():
|
||||
if not alreadySubscribed or not node.legacyAppHandlers.hasKey(shard):
|
||||
node.legacyAppHandlers[shard] = appHandler
|
||||
else:
|
||||
debug "Legacy appHandler already exists for active PubsubTopic, ignoring new handler",
|
||||
topic = shard
|
||||
|
||||
if alreadySubscribed:
|
||||
return false
|
||||
|
||||
proc traceHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
|
||||
let msgSizeKB = msg.payload.len / 1000
|
||||
|
||||
waku_node_messages.inc(labelValues = ["relay"])
|
||||
waku_histogram_message_size.observe(msgSizeKB)
|
||||
|
||||
proc filterHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
|
||||
if node.wakuFilter.isNil():
|
||||
return
|
||||
|
||||
await node.wakuFilter.handleMessage(topic, msg)
|
||||
|
||||
proc archiveHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
|
||||
if node.wakuArchive.isNil():
|
||||
return
|
||||
|
||||
await node.wakuArchive.handleMessage(topic, msg)
|
||||
|
||||
proc syncHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
|
||||
if node.wakuStoreReconciliation.isNil():
|
||||
return
|
||||
|
||||
node.wakuStoreReconciliation.messageIngress(topic, msg)
|
||||
|
||||
proc internalHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
|
||||
MessageSeenEvent.emit(node.brokerCtx, topic, msg)
|
||||
|
||||
let uniqueTopicHandler = proc(
|
||||
topic: PubsubTopic, msg: WakuMessage
|
||||
): Future[void] {.async, gcsafe.} =
|
||||
await traceHandler(topic, msg)
|
||||
await filterHandler(topic, msg)
|
||||
await archiveHandler(topic, msg)
|
||||
await syncHandler(topic, msg)
|
||||
await internalHandler(topic, msg)
|
||||
|
||||
if node.legacyAppHandlers.hasKey(topic) and not node.legacyAppHandlers[topic].isNil():
|
||||
await node.legacyAppHandlers[topic](topic, msg)
|
||||
|
||||
node.wakuRelay.subscribe(shard, uniqueTopicHandler)
|
||||
node.topicSubscriptionQueue.emit((kind: PubsubSub, topic: shard))
|
||||
return true
|
||||
|
||||
proc doRelayUnsubscribe(node: WakuNode, shard: PubsubTopic) =
|
||||
if node.legacyAppHandlers.hasKey(shard):
|
||||
node.legacyAppHandlers.del(shard)
|
||||
|
||||
if node.wakuRelay.isSubscribed(shard):
|
||||
node.wakuRelay.unsubscribe(shard)
|
||||
node.topicSubscriptionQueue.emit((kind: PubsubUnsub, topic: shard))
|
||||
|
||||
proc new*(T: type SubscriptionManager, node: WakuNode): T =
|
||||
T(
|
||||
node: node,
|
||||
shards: initTable[PubsubTopic, ShardSubscription](),
|
||||
edgeFilterSubStates: initTable[PubsubTopic, EdgeFilterSubState](),
|
||||
edgeFilterWakeup: newAsyncEvent(),
|
||||
)
|
||||
|
||||
func wanted(entry: ShardSubscription): bool =
|
||||
## True if the shard has content-topic interest or a direct subscription.
|
||||
return entry.contentTopics.len > 0 or entry.directShardSub
|
||||
|
||||
proc isContentSubscribed*(
|
||||
self: SubscriptionManager, shard: PubsubTopic, contentTopic: ContentTopic
|
||||
): bool =
|
||||
self.shards.withValue(shard, sub):
|
||||
return contentTopic in sub.contentTopics
|
||||
return false
|
||||
|
||||
iterator subscribedContentTopics*(
|
||||
self: SubscriptionManager
|
||||
): (PubsubTopic, HashSet[ContentTopic]) =
|
||||
## Yields each shard with its non-empty content-topic set.
|
||||
for shard, sub in self.shards.pairs:
|
||||
if sub.contentTopics.len > 0:
|
||||
yield (shard, sub.contentTopics)
|
||||
|
||||
func toTopicHealth*(peersCount: int): TopicHealth =
|
||||
if peersCount >= HealthyThreshold:
|
||||
TopicHealth.SUFFICIENTLY_HEALTHY
|
||||
return TopicHealth.SUFFICIENTLY_HEALTHY
|
||||
elif peersCount > 0:
|
||||
TopicHealth.MINIMALLY_HEALTHY
|
||||
return TopicHealth.MINIMALLY_HEALTHY
|
||||
else:
|
||||
TopicHealth.UNHEALTHY
|
||||
return TopicHealth.UNHEALTHY
|
||||
|
||||
type SubscriptionManager* = ref object of RootObj
|
||||
node: WakuNode
|
||||
contentTopicSubs: Table[PubsubTopic, HashSet[ContentTopic]]
|
||||
## Map of Shard to ContentTopic needed because e.g. WakuRelay is PubsubTopic only.
|
||||
## A present key with an empty HashSet value means pubsubtopic already subscribed
|
||||
## (via subscribePubsubTopics()) but there's no specific content topic interest yet.
|
||||
edgeFilterSubStates*: Table[PubsubTopic, EdgeFilterSubState]
|
||||
## Per-shard filter subscription state for edge mode.
|
||||
edgeFilterWakeup: AsyncEvent
|
||||
## Signalled when the edge filter sub loop should re-reconcile.
|
||||
edgeFilterSubLoopFut: Future[void]
|
||||
edgeFilterHealthLoopFut: Future[void]
|
||||
peerEventListener: WakuPeerEventListener
|
||||
## Listener for peer connect/disconnect events (edge filter wakeup).
|
||||
|
||||
iterator subscribedTopics*(
|
||||
self: SubscriptionManager
|
||||
): (PubsubTopic, HashSet[ContentTopic]) =
|
||||
## Iterate over all subscribed content topics, batched per shard.
|
||||
## This is guaranteed to return a non-empty `topics` (content topics) list on iteration.
|
||||
|
||||
for pubsub, topics in self.contentTopicSubs.pairs:
|
||||
# We are iterating over subscribed content topics; if we are subscribed to
|
||||
# a shard but have no subscription (interest) for any content topic in that
|
||||
# shard, then avoid triggering an iteration that doesn't advance the intent
|
||||
# to iterate over content topic subscriptions.
|
||||
if topics.len == 0:
|
||||
continue
|
||||
yield (pubsub, topics)
|
||||
|
||||
proc edgeFilterPeerCount*(sm: SubscriptionManager, shard: PubsubTopic): int =
|
||||
sm.edgeFilterSubStates.withValue(shard, state):
|
||||
proc edgeFilterPeerCount*(self: SubscriptionManager, shard: PubsubTopic): int =
|
||||
self.edgeFilterSubStates.withValue(shard, state):
|
||||
return state.peers.len
|
||||
return 0
|
||||
|
||||
proc new*(T: typedesc[SubscriptionManager], node: WakuNode): T =
|
||||
SubscriptionManager(
|
||||
node: node, contentTopicSubs: initTable[PubsubTopic, HashSet[ContentTopic]]()
|
||||
)
|
||||
|
||||
proc addContentTopicInterest(
|
||||
self: SubscriptionManager, shard: PubsubTopic, topic: ContentTopic
|
||||
): Result[void, string] =
|
||||
var changed = false
|
||||
if not self.contentTopicSubs.hasKey(shard):
|
||||
self.contentTopicSubs[shard] = initHashSet[ContentTopic]()
|
||||
changed = true
|
||||
|
||||
self.contentTopicSubs.withValue(shard, cTopics):
|
||||
if not cTopics[].contains(topic):
|
||||
cTopics[].incl(topic)
|
||||
changed = true
|
||||
|
||||
if changed and not isNil(self.edgeFilterWakeup):
|
||||
self.edgeFilterWakeup.fire()
|
||||
|
||||
return ok()
|
||||
|
||||
proc removeContentTopicInterest(
|
||||
self: SubscriptionManager, shard: PubsubTopic, topic: ContentTopic
|
||||
): Result[void, string] =
|
||||
var changed = false
|
||||
self.contentTopicSubs.withValue(shard, cTopics):
|
||||
if cTopics[].contains(topic):
|
||||
cTopics[].excl(topic)
|
||||
changed = true
|
||||
|
||||
if cTopics[].len == 0 and isNil(self.node.wakuRelay):
|
||||
self.contentTopicSubs.del(shard) # We're done with cTopics here
|
||||
|
||||
if changed and not isNil(self.edgeFilterWakeup):
|
||||
self.edgeFilterWakeup.fire()
|
||||
|
||||
return ok()
|
||||
|
||||
proc subscribePubsubTopics(
|
||||
self: SubscriptionManager, shards: seq[PubsubTopic]
|
||||
): Result[void, string] =
|
||||
if isNil(self.node.wakuRelay):
|
||||
return err("subscribePubsubTopics requires a Relay")
|
||||
|
||||
var errors: seq[string]
|
||||
|
||||
for shard in shards:
|
||||
if not self.contentTopicSubs.hasKey(shard):
|
||||
self.node.subscribe((kind: PubsubSub, topic: shard), nil).isOkOr:
|
||||
errors.add("shard " & shard & ": " & error)
|
||||
continue
|
||||
|
||||
self.contentTopicSubs[shard] = initHashSet[ContentTopic]()
|
||||
|
||||
if errors.len > 0:
|
||||
return err("subscribeShard errors: " & errors.join("; "))
|
||||
|
||||
return ok()
|
||||
|
||||
proc getShardForContentTopic(
|
||||
self: SubscriptionManager, topic: ContentTopic
|
||||
): Result[PubsubTopic, string] =
|
||||
@ -147,55 +138,138 @@ proc getShardForContentTopic(
|
||||
let shardObj = ?self.node.wakuAutoSharding.get().getShard(topic)
|
||||
return ok($shardObj)
|
||||
|
||||
return err("SubscriptionManager requires AutoSharding")
|
||||
return err("autosharding is not configured; pass an explicit shard")
|
||||
|
||||
proc subscribeShard*(
|
||||
self: SubscriptionManager, shard: PubsubTopic, handler: WakuRelayHandler = nil
|
||||
): Result[void, string] =
|
||||
## Subscribes to the shard directly and joins the relay mesh.
|
||||
var added = false
|
||||
self.shards.withValue(shard, entry):
|
||||
if not entry.directShardSub:
|
||||
entry.directShardSub = true
|
||||
added = true
|
||||
do:
|
||||
self.shards[shard] =
|
||||
ShardSubscription(contentTopics: initHashSet[ContentTopic](), directShardSub: true)
|
||||
added = true
|
||||
if added:
|
||||
self.edgeFilterWakeup.fire()
|
||||
if not isNil(self.node.wakuRelay):
|
||||
discard self.node.doRelaySubscribe(shard, handler)
|
||||
return ok()
|
||||
|
||||
proc unsubscribeShard*(
|
||||
self: SubscriptionManager, shard: PubsubTopic
|
||||
): Result[void, string] =
|
||||
## Drops the direct shard subscription; unsubscribes the mesh if no content topic wants it.
|
||||
var removed = false
|
||||
var shardEmpty = false
|
||||
self.shards.withValue(shard, entry):
|
||||
if entry.directShardSub:
|
||||
entry.directShardSub = false
|
||||
removed = true
|
||||
shardEmpty = not entry[].wanted()
|
||||
if removed:
|
||||
self.edgeFilterWakeup.fire()
|
||||
if shardEmpty:
|
||||
self.shards.del(shard)
|
||||
if not isNil(self.node.wakuRelay):
|
||||
self.node.doRelayUnsubscribe(shard)
|
||||
return ok()
|
||||
|
||||
proc subscribe*(
|
||||
self: SubscriptionManager,
|
||||
shard: PubsubTopic,
|
||||
contentTopic: ContentTopic,
|
||||
handler: WakuRelayHandler = nil,
|
||||
): Result[void, string] =
|
||||
## Adds content-topic interest on the shard and joins the relay mesh.
|
||||
var added = false
|
||||
self.shards.withValue(shard, entry):
|
||||
if contentTopic notin entry.contentTopics:
|
||||
entry.contentTopics.incl(contentTopic)
|
||||
added = true
|
||||
do:
|
||||
var entry = ShardSubscription(contentTopics: initHashSet[ContentTopic]())
|
||||
entry.contentTopics.incl(contentTopic)
|
||||
self.shards[shard] = entry
|
||||
added = true
|
||||
if added:
|
||||
self.edgeFilterWakeup.fire()
|
||||
if not isNil(self.node.wakuRelay):
|
||||
discard self.node.doRelaySubscribe(shard, handler)
|
||||
return ok()
|
||||
|
||||
proc unsubscribe*(
|
||||
self: SubscriptionManager, shard: PubsubTopic, contentTopic: ContentTopic
|
||||
): Result[void, string] =
|
||||
## Drops content-topic interest on the shard; unsubscribes the mesh if nothing else wants it.
|
||||
var removed = false
|
||||
var shardEmpty = false
|
||||
self.shards.withValue(shard, entry):
|
||||
if contentTopic in entry.contentTopics:
|
||||
entry.contentTopics.excl(contentTopic)
|
||||
removed = true
|
||||
shardEmpty = not entry[].wanted()
|
||||
if removed:
|
||||
self.edgeFilterWakeup.fire()
|
||||
if shardEmpty:
|
||||
self.shards.del(shard)
|
||||
if not isNil(self.node.wakuRelay):
|
||||
self.node.doRelayUnsubscribe(shard)
|
||||
return ok()
|
||||
|
||||
proc subscribe*(
|
||||
self: SubscriptionManager, topic: ContentTopic
|
||||
): Result[void, string] =
|
||||
## Subscribes to a content topic, resolving its shard via autosharding.
|
||||
let shard = ?self.getShardForContentTopic(topic)
|
||||
return self.subscribe(shard, topic)
|
||||
|
||||
proc unsubscribe*(
|
||||
self: SubscriptionManager, topic: ContentTopic
|
||||
): Result[void, string] =
|
||||
## Unsubscribes from a content topic, resolving its shard via autosharding.
|
||||
let shard = ?self.getShardForContentTopic(topic)
|
||||
return self.unsubscribe(shard, topic)
|
||||
|
||||
proc unsubscribeAll*(
|
||||
self: SubscriptionManager, shard: PubsubTopic
|
||||
): Result[void, string] =
|
||||
## Drops every content topic on the shard, then the direct subscription.
|
||||
var snapshot: seq[ContentTopic]
|
||||
self.shards.withValue(shard, sub):
|
||||
snapshot = toSeq(sub.contentTopics)
|
||||
for contentTopic in snapshot:
|
||||
?self.unsubscribe(shard, contentTopic)
|
||||
return self.unsubscribeShard(shard)
|
||||
|
||||
proc isSubscribed*(
|
||||
self: SubscriptionManager, topic: ContentTopic
|
||||
): Result[bool, string] =
|
||||
let shard = ?self.getShardForContentTopic(topic)
|
||||
return ok(
|
||||
self.contentTopicSubs.hasKey(shard) and self.contentTopicSubs[shard].contains(topic)
|
||||
)
|
||||
return ok(self.isContentSubscribed(shard, topic))
|
||||
|
||||
proc isSubscribed*(
|
||||
self: SubscriptionManager, shard: PubsubTopic, contentTopic: ContentTopic
|
||||
): bool {.raises: [].} =
|
||||
self.contentTopicSubs.withValue(shard, cTopics):
|
||||
return cTopics[].contains(contentTopic)
|
||||
return false
|
||||
proc subscribeAllAutoshards*(self: SubscriptionManager): Result[void, string] =
|
||||
## Subscribes the relay to every shard in the configured autosharding cluster.
|
||||
if self.node.wakuRelay.isNil() or self.node.wakuAutoSharding.isNone():
|
||||
return ok()
|
||||
|
||||
proc subscribe*(self: SubscriptionManager, topic: ContentTopic): Result[void, string] =
|
||||
if isNil(self.node.wakuRelay) and isNil(self.node.wakuFilterClient):
|
||||
return err("SubscriptionManager requires either Relay or Filter Client.")
|
||||
let autoSharding = self.node.wakuAutoSharding.get()
|
||||
let numShards = autoSharding.shardCountGenZero
|
||||
if numShards == 0:
|
||||
return ok()
|
||||
|
||||
let shard = ?self.getShardForContentTopic(topic)
|
||||
for i in 0'u32 ..< numShards:
|
||||
let shardObj = RelayShard(clusterId: autoSharding.clusterId, shardId: uint16(i))
|
||||
self.subscribeShard(PubsubTopic($shardObj)).isOkOr:
|
||||
error "failed to auto-subscribe relay to cluster shard",
|
||||
shard = $shardObj, error = error
|
||||
|
||||
if not isNil(self.node.wakuRelay) and not self.contentTopicSubs.hasKey(shard):
|
||||
?self.subscribePubsubTopics(@[shard])
|
||||
ok()
|
||||
|
||||
?self.addContentTopicInterest(shard, topic)
|
||||
|
||||
return ok()
|
||||
|
||||
proc unsubscribe*(
|
||||
self: SubscriptionManager, topic: ContentTopic
|
||||
): Result[void, string] =
|
||||
if isNil(self.node.wakuRelay) and isNil(self.node.wakuFilterClient):
|
||||
return err("SubscriptionManager requires either Relay or Filter Client.")
|
||||
|
||||
let shard = ?self.getShardForContentTopic(topic)
|
||||
|
||||
if self.isSubscribed(shard, topic):
|
||||
?self.removeContentTopicInterest(shard, topic)
|
||||
|
||||
return ok()
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Edge Filter driver for the Logos Messaging API
|
||||
#
|
||||
# The SubscriptionManager absorbs natively the responsibility of using the
|
||||
# Edge Filter protocol to effect subscriptions and message receipt for edge.
|
||||
# ---------------------------------------------------------------------------
|
||||
{.pop.}
|
||||
|
||||
const EdgeFilterSubscribeTimeout = chronos.seconds(15)
|
||||
## Timeout for a single filter subscribe/unsubscribe RPC to a service peer.
|
||||
@ -225,23 +299,22 @@ proc removePeer(self: SubscriptionManager, shard: PubsubTopic, peerId: PeerId) =
|
||||
## update health, and wake the sub loop to dial a replacement.
|
||||
## Best-effort unsubscribe so the service peer stops pushing to us.
|
||||
self.edgeFilterSubStates.withValue(shard, state):
|
||||
var peer: RemotePeerInfo
|
||||
var found = false
|
||||
for p in state.peers:
|
||||
var idx = -1
|
||||
for i, p in state.peers:
|
||||
if p.peerId == peerId:
|
||||
peer = p
|
||||
found = true
|
||||
idx = i
|
||||
break
|
||||
if not found:
|
||||
if idx < 0:
|
||||
return
|
||||
|
||||
state.peers.keepItIf(it.peerId != peerId)
|
||||
let peer = state.peers[idx]
|
||||
state.peers.del(idx)
|
||||
self.updateShardHealth(shard, state[])
|
||||
self.edgeFilterWakeup.fire()
|
||||
|
||||
if not self.node.wakuFilterClient.isNil():
|
||||
self.contentTopicSubs.withValue(shard, topics):
|
||||
let ct = toSeq(topics[])
|
||||
self.shards.withValue(shard, sub):
|
||||
let ct = toSeq(sub.contentTopics)
|
||||
if ct.len > 0:
|
||||
proc doUnsubscribe() {.async.} =
|
||||
discard await self.node.wakuFilterClient.unsubscribe(peer, shard, ct)
|
||||
@ -331,14 +404,14 @@ proc dialFilterPeer(
|
||||
self.edgeFilterSubStates.withValue(shard, state):
|
||||
state.pendingPeers.excl(peer.peerId)
|
||||
|
||||
proc edgeFilterHealthLoop*(self: SubscriptionManager) {.async.} =
|
||||
## Periodically pings all connected filter service peers to verify they are
|
||||
proc edgeFilterConnectionLoop(self: SubscriptionManager) {.async.} =
|
||||
## Periodically pings all tracked filter service peers to verify they are
|
||||
## still alive at the application layer. Peers that fail the ping are removed.
|
||||
while true:
|
||||
await sleepAsync(EdgeFilterLoopInterval)
|
||||
|
||||
if self.node.wakuFilterClient.isNil():
|
||||
warn "filter client is nil within edge filter health loop"
|
||||
warn "filter client is nil within edge filter connection loop"
|
||||
continue
|
||||
|
||||
var connected = initTable[PeerId, RemotePeerInfo]()
|
||||
@ -356,7 +429,6 @@ proc edgeFilterHealthLoop*(self: SubscriptionManager) {.async.} =
|
||||
(peer.peerId, self.node.wakuFilterClient.ping(peer, EdgeFilterPingTimeout))
|
||||
)
|
||||
|
||||
# extract future tasks from (PeerId, Future) tuples and await them
|
||||
await allFutures(pingTasks.mapIt(it[1]))
|
||||
|
||||
for (peerId, task) in pingTasks:
|
||||
@ -407,7 +479,7 @@ proc selectFilterCandidates(
|
||||
candidates.setLen(needed)
|
||||
return candidates
|
||||
|
||||
proc edgeFilterSubLoop*(self: SubscriptionManager) {.async.} =
|
||||
proc edgeFilterSubLoop(self: SubscriptionManager) {.async.} =
|
||||
## Reconciles filter subscriptions with the desired state from SubscriptionManager.
|
||||
var lastSynced = initTable[PubsubTopic, HashSet[ContentTopic]]()
|
||||
|
||||
@ -421,11 +493,16 @@ proc edgeFilterSubLoop*(self: SubscriptionManager) {.async.} =
|
||||
trace "edgeFilterSubLoop: wakuFilterClient is nil, skipping"
|
||||
continue
|
||||
|
||||
let desired = self.contentTopicSubs
|
||||
var newSynced = initTable[PubsubTopic, HashSet[ContentTopic]]()
|
||||
var allShards: HashSet[PubsubTopic]
|
||||
for shard, sub in self.shards.pairs:
|
||||
if sub.contentTopics.len > 0:
|
||||
newSynced[shard] = sub.contentTopics
|
||||
allShards.incl(shard)
|
||||
for shard in lastSynced.keys:
|
||||
allShards.incl(shard)
|
||||
|
||||
trace "edgeFilterSubLoop: desired state", numShards = desired.len
|
||||
|
||||
let allShards = toHashSet(toSeq(desired.keys)) + toHashSet(toSeq(lastSynced.keys))
|
||||
trace "edgeFilterSubLoop: desired state", numShards = newSynced.len
|
||||
|
||||
# Step 1: read state across all shards at once and
|
||||
# create a list of peer dial tasks and shard tracking to delete.
|
||||
@ -434,15 +511,28 @@ proc edgeFilterSubLoop*(self: SubscriptionManager) {.async.} =
|
||||
var shardsToDelete: seq[PubsubTopic]
|
||||
|
||||
for shard in allShards:
|
||||
let currTopics = desired.getOrDefault(shard)
|
||||
let prevTopics = lastSynced.getOrDefault(shard)
|
||||
# Compute added/removed deltas via direct iteration; no HashSet copies.
|
||||
var addedTopics: seq[ContentTopic]
|
||||
var removedTopics: seq[ContentTopic]
|
||||
newSynced.withValue(shard, curr):
|
||||
lastSynced.withValue(shard, prev):
|
||||
for t in curr[]:
|
||||
if t notin prev[]:
|
||||
addedTopics.add(t)
|
||||
for t in prev[]:
|
||||
if t notin curr[]:
|
||||
removedTopics.add(t)
|
||||
do:
|
||||
for t in curr[]:
|
||||
addedTopics.add(t)
|
||||
do:
|
||||
lastSynced.withValue(shard, prev):
|
||||
for t in prev[]:
|
||||
removedTopics.add(t)
|
||||
|
||||
if shard notin self.edgeFilterSubStates:
|
||||
self.edgeFilterSubStates[shard] =
|
||||
EdgeFilterSubState(currentHealth: TopicHealth.UNHEALTHY)
|
||||
|
||||
let addedTopics = toSeq(currTopics - prevTopics)
|
||||
let removedTopics = toSeq(prevTopics - currTopics)
|
||||
discard self.edgeFilterSubStates.mgetOrPut(
|
||||
shard, EdgeFilterSubState(currentHealth: TopicHealth.UNHEALTHY)
|
||||
)
|
||||
|
||||
self.edgeFilterSubStates.withValue(shard, state):
|
||||
state.peers.keepItIf(
|
||||
@ -454,7 +544,7 @@ proc edgeFilterSubLoop*(self: SubscriptionManager) {.async.} =
|
||||
for peer in state.peers:
|
||||
asyncSpawn self.syncFilterDeltas(peer, shard, addedTopics, removedTopics)
|
||||
|
||||
if currTopics.len == 0:
|
||||
if shard notin newSynced:
|
||||
shardsToDelete.add(shard)
|
||||
else:
|
||||
self.updateShardHealth(shard, state[])
|
||||
@ -462,7 +552,11 @@ proc edgeFilterSubLoop*(self: SubscriptionManager) {.async.} =
|
||||
let needed = max(0, HealthyThreshold - state.peers.len - state.pending.len)
|
||||
|
||||
if needed > 0:
|
||||
let tracked = state.peers.mapIt(it.peerId).toHashSet() + state.pendingPeers
|
||||
var tracked: HashSet[PeerId]
|
||||
for p in state.peers:
|
||||
tracked.incl(p.peerId)
|
||||
for p in state.pendingPeers:
|
||||
tracked.incl(p)
|
||||
let candidates = self.selectFilterCandidates(shard, tracked, needed)
|
||||
let toDial = min(needed, candidates.len)
|
||||
|
||||
@ -474,11 +568,13 @@ proc edgeFilterSubLoop*(self: SubscriptionManager) {.async.} =
|
||||
num_available = candidates.len,
|
||||
toDial = toDial
|
||||
|
||||
var dialTopics: seq[ContentTopic]
|
||||
newSynced.withValue(shard, curr):
|
||||
dialTopics = toSeq(curr[])
|
||||
|
||||
for i in 0 ..< toDial:
|
||||
dialTasks.add(
|
||||
EdgeDialTask(
|
||||
peer: candidates[i], shard: shard, topics: toSeq(currTopics)
|
||||
)
|
||||
EdgeDialTask(peer: candidates[i], shard: shard, topics: dialTopics)
|
||||
)
|
||||
|
||||
# Step 2: execute deferred shard tracking deletion and dial tasks.
|
||||
@ -495,13 +591,11 @@ proc edgeFilterSubLoop*(self: SubscriptionManager) {.async.} =
|
||||
self.edgeFilterSubStates.withValue(task.shard, state):
|
||||
state.pending.add(fut)
|
||||
|
||||
lastSynced = desired
|
||||
lastSynced = newSynced
|
||||
|
||||
proc startEdgeFilterLoops(self: SubscriptionManager): Result[void, string] =
|
||||
## Start the edge filter orchestration loops.
|
||||
## Caller must ensure this is only called in edge mode (relay nil, filter client present).
|
||||
self.edgeFilterWakeup = newAsyncEvent()
|
||||
|
||||
self.peerEventListener = WakuPeerEvent.listen(
|
||||
self.node.brokerCtx,
|
||||
proc(evt: WakuPeerEvent) {.async: (raises: []), gcsafe.} =
|
||||
@ -513,7 +607,7 @@ proc startEdgeFilterLoops(self: SubscriptionManager): Result[void, string] =
|
||||
return err("Failed to listen to peer events for edge filter: " & error)
|
||||
|
||||
self.edgeFilterSubLoopFut = self.edgeFilterSubLoop()
|
||||
self.edgeFilterHealthLoopFut = self.edgeFilterHealthLoop()
|
||||
self.edgeFilterConnectionLoopFut = self.edgeFilterConnectionLoop()
|
||||
return ok()
|
||||
|
||||
proc stopEdgeFilterLoops(self: SubscriptionManager) {.async: (raises: []).} =
|
||||
@ -522,9 +616,9 @@ proc stopEdgeFilterLoops(self: SubscriptionManager) {.async: (raises: []).} =
|
||||
await self.edgeFilterSubLoopFut.cancelAndWait()
|
||||
self.edgeFilterSubLoopFut = nil
|
||||
|
||||
if not isNil(self.edgeFilterHealthLoopFut):
|
||||
await self.edgeFilterHealthLoopFut.cancelAndWait()
|
||||
self.edgeFilterHealthLoopFut = nil
|
||||
if not isNil(self.edgeFilterConnectionLoopFut):
|
||||
await self.edgeFilterConnectionLoopFut.cancelAndWait()
|
||||
self.edgeFilterConnectionLoopFut = nil
|
||||
|
||||
for shard, state in self.edgeFilterSubStates:
|
||||
for fut in state.pending:
|
||||
@ -533,18 +627,7 @@ proc stopEdgeFilterLoops(self: SubscriptionManager) {.async: (raises: []).} =
|
||||
|
||||
await WakuPeerEvent.dropListener(self.node.brokerCtx, self.peerEventListener)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# SubscriptionManager Lifecycle (calls Edge behavior above)
|
||||
#
|
||||
# startSubscriptionManager and stopSubscriptionManager orchestrate both the
|
||||
# core (relay) and edge (filter) paths, and register/clear broker providers.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
proc startSubscriptionManager*(self: SubscriptionManager): Result[void, string] =
|
||||
# Register edge filter broker providers. The shard/content health providers
|
||||
# in WakuNode query these via the broker as a fallback when relay health is
|
||||
# not available. If edge mode is not active, these providers simply return
|
||||
# NOT_SUBSCRIBED / strength 0, which is harmless.
|
||||
proc start*(self: SubscriptionManager): Result[void, string] =
|
||||
RequestEdgeShardHealth.setProvider(
|
||||
self.node.brokerCtx,
|
||||
proc(shard: PubsubTopic): Result[RequestEdgeShardHealth, string] =
|
||||
@ -566,31 +649,18 @@ proc startSubscriptionManager*(self: SubscriptionManager): Result[void, string]
|
||||
).isOkOr:
|
||||
error "Can't set provider for RequestEdgeFilterPeerCount", error = error
|
||||
|
||||
# Start Edge workers if node is in Edge mode (which is
|
||||
# currently mutually-exclusive with relay being mounted).
|
||||
if self.node.wakuRelay.isNil():
|
||||
return self.startEdgeFilterLoops()
|
||||
|
||||
# Core mode: auto-subscribe relay to all shards in autosharding.
|
||||
if self.node.wakuAutoSharding.isSome():
|
||||
let autoSharding = self.node.wakuAutoSharding.get()
|
||||
let clusterId = autoSharding.clusterId
|
||||
let numShards = autoSharding.shardCountGenZero
|
||||
|
||||
if numShards > 0:
|
||||
var clusterPubsubTopics = newSeqOfCap[PubsubTopic](numShards)
|
||||
|
||||
for i in 0 ..< numShards:
|
||||
let shardObj = RelayShard(clusterId: clusterId, shardId: uint16(i))
|
||||
clusterPubsubTopics.add(PubsubTopic($shardObj))
|
||||
|
||||
self.subscribePubsubTopics(clusterPubsubTopics).isOkOr:
|
||||
error "Failed to auto-subscribe Relay to cluster shards: ", error = error
|
||||
else:
|
||||
info "SubscriptionManager has no AutoSharding configured; skipping auto-subscribe."
|
||||
|
||||
return ok()
|
||||
|
||||
proc stopSubscriptionManager*(self: SubscriptionManager) {.async: (raises: []).} =
|
||||
proc stop*(self: SubscriptionManager) {.async: (raises: []).} =
|
||||
# Stop Edge workers if node is in Edge mode (which is
|
||||
# currently mutually-exclusive with relay being mounted).
|
||||
if self.node.wakuRelay.isNil():
|
||||
await self.stopEdgeFilterLoops()
|
||||
|
||||
RequestEdgeShardHealth.clearProvider(self.node.brokerCtx)
|
||||
RequestEdgeFilterPeerCount.clearProvider(self.node.brokerCtx)
|
||||
@ -4,6 +4,7 @@ import chronicles, chronos, metrics, metrics/chronos_httpserver
|
||||
import
|
||||
waku/[net/auto_port, waku_rln_relay/protocol_metrics as rln_metrics, utils/collector],
|
||||
./peer_manager,
|
||||
./node_telemetry,
|
||||
./waku_node
|
||||
|
||||
const LogInterval = 10.minutes
|
||||
|
||||
@ -60,23 +60,14 @@ import
|
||||
requests/health_requests,
|
||||
events/health_events,
|
||||
events/message_events,
|
||||
events/peer_events,
|
||||
],
|
||||
waku/discovery/waku_kademlia,
|
||||
waku/net/[bound_ports, net_config],
|
||||
./peer_manager,
|
||||
./health_monitor/health_status,
|
||||
./health_monitor/topic_health
|
||||
|
||||
declarePublicCounter waku_node_messages, "number of messages received", ["type"]
|
||||
|
||||
declarePublicGauge waku_version,
|
||||
"Waku version info (in git describe format)", ["version"]
|
||||
declarePublicCounter waku_node_errors, "number of wakunode errors", ["type"]
|
||||
declarePublicGauge waku_lightpush_peers, "number of lightpush peers"
|
||||
declarePublicGauge waku_filter_peers, "number of filter peers"
|
||||
declarePublicGauge waku_store_peers, "number of store peers"
|
||||
declarePublicGauge waku_px_peers,
|
||||
"number of peers (in the node's peerManager) supporting the peer exchange protocol"
|
||||
./health_monitor/topic_health,
|
||||
./node_telemetry
|
||||
|
||||
logScope:
|
||||
topics = "waku node"
|
||||
@ -94,53 +85,10 @@ const clientId* = "Nimbus Waku v2 node"
|
||||
|
||||
const WakuNodeVersionString* = "version / git commit hash: " & git_version
|
||||
|
||||
# key and crypto modules different
|
||||
type
|
||||
# TODO: Move to application instance (e.g., `WakuNode2`)
|
||||
WakuInfo* = object # NOTE One for simplicity, can extend later as needed
|
||||
listenAddresses*: seq[string]
|
||||
enrUri*: string #multiaddrStrings*: seq[string]
|
||||
mixPubKey*: Option[string]
|
||||
import ./node_types
|
||||
export node_types
|
||||
|
||||
# NOTE based on Eth2Node in NBC eth2_network.nim
|
||||
WakuNode* = ref object
|
||||
peerManager*: PeerManager
|
||||
switch*: Switch
|
||||
wakuRelay*: WakuRelay
|
||||
wakuArchive*: waku_archive.WakuArchive
|
||||
wakuStore*: store.WakuStore
|
||||
wakuStoreClient*: store_client.WakuStoreClient
|
||||
wakuStoreResume*: StoreResume
|
||||
wakuStoreReconciliation*: SyncReconciliation
|
||||
wakuStoreTransfer*: SyncTransfer
|
||||
wakuFilter*: waku_filter_v2.WakuFilter
|
||||
wakuFilterClient*: filter_client.WakuFilterClient
|
||||
wakuRlnRelay*: WakuRLNRelay
|
||||
wakuLegacyLightPush*: WakuLegacyLightPush
|
||||
wakuLegacyLightpushClient*: WakuLegacyLightPushClient
|
||||
wakuLightPush*: WakuLightPush
|
||||
wakuLightpushClient*: WakuLightPushClient
|
||||
wakuPeerExchange*: WakuPeerExchange
|
||||
wakuPeerExchangeClient*: WakuPeerExchangeClient
|
||||
wakuMetadata*: WakuMetadata
|
||||
wakuAutoSharding*: Option[Sharding]
|
||||
enr*: enr.Record
|
||||
libp2pPing*: Ping
|
||||
rng*: ref rand.HmacDrbgContext
|
||||
brokerCtx*: BrokerContext
|
||||
wakuRendezvous*: WakuRendezVous
|
||||
wakuRendezvousClient*: rendezvous_client.WakuRendezVousClient
|
||||
announcedAddresses*: seq[MultiAddress]
|
||||
extMultiAddrsOnly*: bool # When true, skip automatic IP address replacement
|
||||
started*: bool # Indicates that node has started listening
|
||||
topicSubscriptionQueue*: AsyncEventQueue[SubscriptionEvent]
|
||||
rateLimitSettings*: ProtocolRateLimitSettings
|
||||
legacyAppHandlers*: Table[PubsubTopic, WakuRelayHandler]
|
||||
## Kernel API Relay appHandlers (if any)
|
||||
wakuMix*: WakuMix
|
||||
kademliaDiscoveryLoop*: Future[void]
|
||||
wakuKademlia*: WakuKademlia
|
||||
ports*: BoundPorts
|
||||
import ./subscription_manager
|
||||
|
||||
proc deduceRelayShard(
|
||||
node: WakuNode,
|
||||
@ -230,6 +178,8 @@ proc new*(
|
||||
|
||||
peerManager.setShardGetter(node.getShardsGetter(@[]))
|
||||
|
||||
node.subscriptionManager = SubscriptionManager.new(node)
|
||||
|
||||
return node
|
||||
|
||||
proc peerInfo*(node: WakuNode): PeerInfo =
|
||||
@ -600,6 +550,9 @@ proc start*(node: WakuNode) {.async.} =
|
||||
|
||||
node.startProvidersAndListeners()
|
||||
|
||||
node.subscriptionManager.start().isOkOr:
|
||||
error "failed to start subscription manager", error = error
|
||||
|
||||
if not zeroPortPresent:
|
||||
updateAnnouncedAddrWithPrimaryIpAddr(node).isOkOr:
|
||||
error "failed update announced addr", error = $error
|
||||
@ -611,6 +564,8 @@ proc start*(node: WakuNode) {.async.} =
|
||||
proc stop*(node: WakuNode) {.async.} =
|
||||
## By stopping the switch we are stopping all the underlying mounted protocols
|
||||
|
||||
await node.subscriptionManager.stop()
|
||||
|
||||
node.stopProvidersAndListeners()
|
||||
|
||||
## NOTE: This will dispatch gossipsub stop to the WakuRelay.stop method override
|
||||
|
||||
@ -38,14 +38,14 @@ RequestBroker:
|
||||
|
||||
proc signature(protocol: WakuProtocol): Future[Result[RequestProtocolHealth, string]]
|
||||
|
||||
# Get edge filter health for a single shard (set by DeliveryService when edge mode is active)
|
||||
# Get edge filter health for a single shard (set when edge mode is active)
|
||||
RequestBroker(sync):
|
||||
type RequestEdgeShardHealth* = object
|
||||
health*: TopicHealth
|
||||
|
||||
proc signature(shard: PubsubTopic): Result[RequestEdgeShardHealth, string]
|
||||
|
||||
# Get edge filter confirmed peer count (set by DeliveryService when edge mode is active)
|
||||
# Get edge filter confirmed peer count (set when edge mode is active)
|
||||
RequestBroker(sync):
|
||||
type RequestEdgeFilterPeerCount* = object
|
||||
peerCount*: int
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user