mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-06-29 13:00:06 +00:00
messaging: depend on the Waku kernel, not the raw WakuNode
MessagingClient sat one layer below where the documented layering
(Waku <- MessagingClient <- ReliableChannelManager) places it: it took a
raw WakuNode and reached around the Waku kernel to its internals. Make the
messaging layer hold the Waku kernel and read `waku.node` from there, so the
declared dependency matches the layering and the layer holds the kernel handle
it will later route through.
The health-monitor test hand-builds a WakuNode, so it now wraps it in a
minimal Waku (conf/stateInfo only satisfy {.requiresInit.}; the messaging
path reads neither).
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
parent
68ef4c70d0
commit
b95a74c600
@ -97,7 +97,7 @@ proc new*(
|
||||
let waku = (await Waku.new(layerConf.waku, appCallbacks)).valueOr:
|
||||
return err("failed to create Waku: " & error)
|
||||
|
||||
let messagingClient = MessagingClient.new(layerConf.messaging, waku.node).valueOr:
|
||||
let messagingClient = MessagingClient.new(layerConf.messaging, waku).valueOr:
|
||||
return err("failed to create MessagingClient: " & error)
|
||||
|
||||
let reliableChannelManager = ReliableChannelManager.new(
|
||||
|
||||
@ -3,6 +3,7 @@ import results, chronos, chronicles
|
||||
|
||||
import logos_delivery/api/types
|
||||
import logos_delivery/messaging/messaging_client
|
||||
import logos_delivery/waku/waku
|
||||
import logos_delivery/waku/node/[waku_node, subscription_manager]
|
||||
import logos_delivery/messaging/delivery_service/send_service
|
||||
import logos_delivery/messaging/delivery_service/send_service/delivery_task
|
||||
@ -16,17 +17,20 @@ proc send*(
|
||||
## id the caller can correlate with `MessageSentEvent` / `MessageErrorEvent`.
|
||||
?self.checkApiAvailability()
|
||||
|
||||
let isSubbed =
|
||||
self.node.subscriptionManager.isSubscribed(envelope.contentTopic).valueOr(false)
|
||||
let isSubbed = self.waku.node.subscriptionManager.isSubscribed(
|
||||
envelope.contentTopic
|
||||
).valueOr(false)
|
||||
if not isSubbed:
|
||||
info "Auto-subscribing to topic on send", contentTopic = envelope.contentTopic
|
||||
self.node.subscriptionManager.subscribe(envelope.contentTopic).isOkOr:
|
||||
self.waku.node.subscriptionManager.subscribe(envelope.contentTopic).isOkOr:
|
||||
warn "Failed to auto-subscribe", error = error
|
||||
return err("Failed to auto-subscribe before sending: " & error)
|
||||
|
||||
let requestId = RequestId.new(self.node.rng)
|
||||
let requestId = RequestId.new(self.waku.node.rng)
|
||||
|
||||
let deliveryTask = DeliveryTask.new(requestId, envelope, self.node.brokerCtx).valueOr:
|
||||
let deliveryTask = DeliveryTask.new(
|
||||
requestId, envelope, self.waku.node.brokerCtx
|
||||
).valueOr:
|
||||
return err("MessagingClient.send: Failed to create delivery task: " & error)
|
||||
|
||||
asyncSpawn self.sendService.send(deliveryTask)
|
||||
|
||||
@ -3,16 +3,17 @@ import results, chronos
|
||||
|
||||
import logos_delivery/api/types
|
||||
import logos_delivery/messaging/messaging_client
|
||||
import logos_delivery/waku/waku
|
||||
import logos_delivery/waku/node/[waku_node, subscription_manager]
|
||||
|
||||
proc subscribe*(
|
||||
self: MessagingClient, contentTopic: ContentTopic
|
||||
): Future[Result[void, string]] {.async.} =
|
||||
?self.checkApiAvailability()
|
||||
return self.node.subscriptionManager.subscribe(contentTopic)
|
||||
return self.waku.node.subscriptionManager.subscribe(contentTopic)
|
||||
|
||||
proc unsubscribe*(
|
||||
self: MessagingClient, contentTopic: ContentTopic
|
||||
): Result[void, string] =
|
||||
?self.checkApiAvailability()
|
||||
return self.node.subscriptionManager.unsubscribe(contentTopic)
|
||||
return self.waku.node.subscriptionManager.unsubscribe(contentTopic)
|
||||
|
||||
@ -4,7 +4,7 @@
|
||||
import results, chronos
|
||||
import
|
||||
logos_delivery/api/messaging_client_api,
|
||||
logos_delivery/waku/node/waku_node,
|
||||
logos_delivery/waku/waku,
|
||||
logos_delivery/messaging/delivery_service/[recv_service, send_service]
|
||||
|
||||
# Surfaces the messaging API interface (and its Message* events) to consumers.
|
||||
@ -18,19 +18,19 @@ type
|
||||
useP2PReliability*: bool
|
||||
|
||||
MessagingClient* = ref object of IMessagingClient
|
||||
node*: WakuNode ## Waku core driven by this layer; read by `messaging/api.nim`.
|
||||
waku*: Waku ## The Waku kernel this layer drives; read by `messaging/api/*`.
|
||||
sendService*: SendService
|
||||
recvService*: RecvService
|
||||
started: bool
|
||||
|
||||
proc new*(
|
||||
T: type MessagingClient, conf: MessagingClientConf, node: WakuNode
|
||||
T: type MessagingClient, conf: MessagingClientConf, waku: Waku
|
||||
): Result[T, string] =
|
||||
## The messaging layer chains onto Waku: it drives the underlying
|
||||
## `WakuNode` (Waku's core) for transport while exposing its own send/recv API.
|
||||
let sendService = ?SendService.new(conf.useP2PReliability, node)
|
||||
let recvService = RecvService.new(node)
|
||||
ok(T(node: node, sendService: sendService, recvService: recvService))
|
||||
## The messaging layer chains onto Waku: it drives the underlying Waku kernel
|
||||
## for transport while exposing its own send/recv API.
|
||||
let sendService = ?SendService.new(conf.useP2PReliability, waku.node)
|
||||
let recvService = RecvService.new(waku.node)
|
||||
ok(T(waku: waku, sendService: sendService, recvService: recvService))
|
||||
|
||||
proc start*(self: MessagingClient): Result[void, string] =
|
||||
if self.started:
|
||||
|
||||
@ -26,6 +26,8 @@ import
|
||||
|
||||
import ../testlib/[wakunode, wakucore], ../waku_archive/archive_utils
|
||||
import logos_delivery/waku/node/subscription_manager
|
||||
import logos_delivery/waku/waku
|
||||
import logos_delivery/waku/factory/waku_state_info
|
||||
import logos_delivery/messaging/messaging_client
|
||||
|
||||
const MockDLow = 4 # Mocked GossipSub DLow value
|
||||
@ -228,8 +230,14 @@ suite "Health Monitor - events":
|
||||
nodeA.mountMetadata(1, @[0'u16]).expect("Node A failed to mount metadata")
|
||||
await nodeA.start()
|
||||
|
||||
# MessagingClient now depends on the Waku kernel, not the raw node. Only
|
||||
# `waku.node` is read on the messaging path; `conf`/`stateInfo` are supplied
|
||||
# solely to satisfy Waku's {.requiresInit.} fields.
|
||||
let waku = Waku(
|
||||
node: nodeA, conf: defaultTestWakuConf(), stateInfo: WakuStateInfo.init(nodeA)
|
||||
)
|
||||
let ds = MessagingClient
|
||||
.new(MessagingClientConf(useP2PReliability: false), nodeA)
|
||||
.new(MessagingClientConf(useP2PReliability: false), waku)
|
||||
.expect("Failed to create MessagingClient")
|
||||
ds.start().expect("Failed to start MessagingClient")
|
||||
|
||||
@ -333,8 +341,14 @@ suite "Health Monitor - events":
|
||||
nodeA.mountMetadata(1, @[0'u16]).expect("Node A failed to mount metadata")
|
||||
await nodeA.start()
|
||||
|
||||
# MessagingClient now depends on the Waku kernel, not the raw node. Only
|
||||
# `waku.node` is read on the messaging path; `conf`/`stateInfo` are supplied
|
||||
# solely to satisfy Waku's {.requiresInit.} fields.
|
||||
let waku = Waku(
|
||||
node: nodeA, conf: defaultTestWakuConf(), stateInfo: WakuStateInfo.init(nodeA)
|
||||
)
|
||||
let ds = MessagingClient
|
||||
.new(MessagingClientConf(useP2PReliability: false), nodeA)
|
||||
.new(MessagingClientConf(useP2PReliability: false), waku)
|
||||
.expect("Failed to create MessagingClient")
|
||||
ds.start().expect("Failed to start MessagingClient")
|
||||
let subMgr = nodeA.subscriptionManager
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user