mirror of
https://github.com/logos-messaging/logos-delivery.git
synced 2026-06-05 13:39:59 +00:00
* Convert DeliveryService into optionally mountable MessagingClient
* Move SubscriptionManager to core layer (WakuNode)
* Ensure libwaku kernel_api/ still works (deprecated; removal pending)
* Create node_types.nim to allow WakuNode to compose subsystems cleanly
* Create node_telemetry.nim to centralize Prometheus types
* Remove unnecessary "ptr Waku" / "addr waku" indirection
* Rename Waku.startWaku -> Waku.start for upcoming Waku rename
* Write complete proc surface for SubscriptionManager (all intents expressible)
* Rename edgeFilterHealthLoop -> edgeFilterConnectionLoop ("Health" means monitoring)
* logosdelivery_start_node calls mountMessagingClient then starts
* libwaku and wakunode2 do not mount messagingClient
* Improve edge filter peer cleanup on disconnect
* misc refactors/moves, improvements, fixes
Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com>
64 lines
2.1 KiB
Nim
64 lines
2.1 KiB
Nim
import results, chronos
|
|
import chronicles
|
|
import
|
|
./api/types,
|
|
./node/[
|
|
waku_node,
|
|
subscription_manager,
|
|
delivery_service/recv_service,
|
|
delivery_service/send_service,
|
|
delivery_service/send_service/delivery_task,
|
|
]
|
|
|
|
type MessagingClient* = ref object
|
|
node: WakuNode
|
|
sendService*: SendService
|
|
recvService*: RecvService
|
|
started: bool
|
|
|
|
proc new*(
|
|
T: type MessagingClient, useP2PReliability: bool, node: WakuNode
|
|
): Result[T, string] =
|
|
let sendService = ?SendService.new(useP2PReliability, node)
|
|
let recvService = RecvService.new(node)
|
|
ok(T(node: node, sendService: sendService, recvService: recvService))
|
|
|
|
proc start*(self: MessagingClient): Result[void, string] =
|
|
if self.started:
|
|
return ok()
|
|
self.recvService.startRecvService()
|
|
self.sendService.startSendService()
|
|
self.started = true
|
|
ok()
|
|
|
|
proc stop*(self: MessagingClient) {.async.} =
|
|
if not self.started:
|
|
return
|
|
await self.sendService.stopSendService()
|
|
await self.recvService.stopRecvService()
|
|
self.started = false
|
|
|
|
proc send*(
|
|
self: MessagingClient, envelope: MessageEnvelope
|
|
): Future[Result[RequestId, string]] {.async.} =
|
|
## High-level messaging API send. Auto-subscribes to the content topic
|
|
## (so the local node sees its own gossipsub broadcast), builds a
|
|
## `DeliveryTask`, and hands it to the send service. Returns the request
|
|
## id the caller can correlate with `MessageSentEvent` / `MessageErrorEvent`.
|
|
let isSubbed =
|
|
self.node.subscriptionManager.isSubscribed(envelope.contentTopic).valueOr(false)
|
|
if not isSubbed:
|
|
info "Auto-subscribing to topic on send", contentTopic = envelope.contentTopic
|
|
self.node.subscriptionManager.subscribe(envelope.contentTopic).isOkOr:
|
|
warn "Failed to auto-subscribe", error = error
|
|
return err("Failed to auto-subscribe before sending: " & error)
|
|
|
|
let requestId = RequestId.new(self.node.rng)
|
|
|
|
let deliveryTask = DeliveryTask.new(requestId, envelope, self.node.brokerCtx).valueOr:
|
|
return err("MessagingClient.send: Failed to create delivery task: " & error)
|
|
|
|
asyncSpawn self.sendService.send(deliveryTask)
|
|
|
|
return ok(requestId)
|