Merge 2636551a11547a2714067f6ab6d1226bf3e04f80 into e2448115b3e6644b3140a65057fa46975907fa4c

This commit is contained in:
Ivan FB 2026-06-26 15:07:44 +02:00 committed by GitHub
commit 8680a125aa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 95 additions and 279 deletions

View File

@ -1,203 +1,76 @@
import std/options
import chronos, results
import brokers/event_broker
import logos_delivery/api/types as api_types
import logos_delivery/waku/waku_core/topics/pubsub_topic
import logos_delivery/waku/waku_core/message
import logos_delivery/waku/waku_core/subscription/push_handler
import logos_delivery/waku/waku_store/common as store_types
export event_broker
export api_types, pubsub_topic, store_types
type IKernel* = ref object of RootObj
EventBroker:
# Internal event emitted when a message arrives from the network via any protocol
type MessageSeenEvent* = object
topic*: PubsubTopic
message*: WakuMessage
# --- topic construction ---
method buildContentTopic*(
self: IKernel, appName: string, appVersion: uint32, name: string, encoding: string
): Future[Result[ContentTopic, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.buildContentTopic not implemented")
# Structural API contract for the Kernel surface, implemented by `Waku`
# (ops in `waku/api/*`).
type KernelApi* = concept w
# --- topic construction ---
buildContentTopic(w, string, uint32, string, string) is
Future[Result[ContentTopic, string]]
buildPubsubTopic(w, string) is Future[Result[PubsubTopic, string]]
defaultPubsubTopic(w) is Future[Result[PubsubTopic, string]]
method buildPubsubTopic*(
self: IKernel, topicName: string
): Future[Result[PubsubTopic, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.buildPubsubTopic not implemented")
# --- relay ---
relayPublish(w, PubsubTopic, WakuMessage, uint32) is Future[Result[string, string]]
relaySubscribe(w, PubsubTopic) is Future[Result[bool, string]]
relayUnsubscribe(w, PubsubTopic) is Future[Result[bool, string]]
relayAddProtectedShard(w, uint16, uint16, string) is Future[Result[bool, string]]
relayConnectedPeers(w, PubsubTopic) is Future[Result[seq[string], string]]
relayPeersInMesh(w, PubsubTopic) is Future[Result[seq[string], string]]
relayNumPeersInMesh(w, PubsubTopic) is Future[Result[int, string]]
relayNumConnectedPeers(w, PubsubTopic) is Future[Result[int, string]]
method defaultPubsubTopic*(
self: IKernel
): Future[Result[PubsubTopic, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.defaultPubsubTopic not implemented")
# --- filter ---
filterSubscribe(w, PubsubTopic, seq[ContentTopic], FilterPushHandler) is
Future[Result[bool, string]]
filterUnsubscribe(w, PubsubTopic, seq[ContentTopic]) is Future[Result[bool, string]]
filterUnsubscribeAll(w) is Future[Result[bool, string]]
# --- relay ---
method relayPublish*(
self: IKernel, pubsubTopic: PubsubTopic, message: WakuMessage, timeoutMs: uint32
): Future[Result[int, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.relayPublish not implemented")
# --- lightpush ---
lightpushPublish(w, PubsubTopic, WakuMessage) is Future[Result[string, string]]
method relaySubscribe*(
self: IKernel, pubsubTopic: PubsubTopic
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.relaySubscribe not implemented")
# --- store ---
storeQuery(w, StoreQueryRequest, string, int) is
Future[Result[StoreQueryResponse, string]]
method relayUnsubscribe*(
self: IKernel, pubsubTopic: PubsubTopic
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.relayUnsubscribe not implemented")
# --- peer management ---
connect(w, seq[string], uint32) is Future[Result[bool, string]]
disconnectPeerById(w, string) is Future[Result[bool, string]]
disconnectAllPeers(w) is Future[Result[bool, string]]
dialPeer(w, string, string, int) is Future[Result[bool, string]]
dialPeerById(w, string, string, int) is Future[Result[bool, string]]
peerIdsFromPeerstore(w) is Future[Result[seq[string], string]]
connectedPeersInfo(w) is Future[Result[seq[PeerConnInfo], string]]
connectedPeers(w) is Future[Result[seq[string], string]]
peerIdsByProtocol(w, string) is Future[Result[seq[string], string]]
method relayAddProtectedShard*(
self: IKernel, clusterId: uint16, shardId: uint16, publicKey: string
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.relayAddProtectedShard not implemented")
# --- discovery ---
dnsDiscovery(w, string, string, int) is Future[Result[seq[string], string]]
discv5UpdateBootnodes(w, string) is Future[Result[bool, string]]
startDiscv5(w) is Future[Result[bool, string]]
stopDiscv5(w) is Future[Result[bool, string]]
peerExchangeRequest(w, uint64) is Future[Result[int, string]]
method relayConnectedPeers*(
self: IKernel, pubsubTopic: PubsubTopic
): Future[Result[seq[string], string]] {.async: (raises: []), base.} =
return err("Interface IKernel.relayConnectedPeers not implemented")
method relayPeersInMesh*(
self: IKernel, pubsubTopic: PubsubTopic
): Future[Result[seq[string], string]] {.async: (raises: []), base.} =
return err("Interface IKernel.relayPeersInMesh not implemented")
# --- filter ---
method filterSubscribe*(
self: IKernel,
pubsubTopic: Option[PubsubTopic],
contentTopics: seq[ContentTopic],
peer: string,
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.filterSubscribe not implemented")
method filterUnsubscribe*(
self: IKernel,
pubsubTopic: Option[PubsubTopic],
contentTopics: seq[ContentTopic],
peer: string,
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.filterUnsubscribe not implemented")
method filterUnsubscribeAll*(
self: IKernel, peer: string
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.filterUnsubscribeAll not implemented")
# --- lightpush ---
method lightpushPublish*(
self: IKernel, pubsubTopic: PubsubTopic, message: WakuMessage, peer: string
): Future[Result[string, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.lightpushPublish not implemented")
# --- store ---
method storeQuery*(
self: IKernel, request: StoreQueryRequest, peer: string, timeoutMs: int
): Future[Result[StoreQueryResponse, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.storeQuery not implemented")
# --- peer management ---
method connect*(
self: IKernel, peers: seq[string], timeoutMs: uint32
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.connect not implemented")
method disconnectPeerById*(
self: IKernel, peerId: string
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.disconnectPeerById not implemented")
method disconnectAllPeers*(
self: IKernel
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.disconnectAllPeers not implemented")
method dialPeer*(
self: IKernel, peerAddr: string, protocol: string, timeoutMs: int
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.dialPeer not implemented")
method dialPeerById*(
self: IKernel, peerId: string, protocol: string, timeoutMs: int
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.dialPeerById not implemented")
method peerIdsFromPeerstore*(
self: IKernel
): Future[Result[seq[string], string]] {.async: (raises: []), base.} =
return err("Interface IKernel.peerIdsFromPeerstore not implemented")
method connectedPeersInfo*(
self: IKernel
): Future[Result[seq[string], string]] {.async: (raises: []), base.} =
return err("Interface IKernel.connectedPeersInfo not implemented")
method connectedPeers*(
self: IKernel
): Future[Result[seq[string], string]] {.async: (raises: []), base.} =
return err("Interface IKernel.connectedPeers not implemented")
method peerIdsByProtocol*(
self: IKernel, protocol: string
): Future[Result[seq[string], string]] {.async: (raises: []), base.} =
return err("Interface IKernel.peerIdsByProtocol not implemented")
# --- discovery ---
method dnsDiscovery*(
self: IKernel, enrTreeUrl: string, nameServer: string, timeoutMs: int
): Future[Result[seq[string], string]] {.async: (raises: []), base.} =
return err("Interface IKernel.dnsDiscovery not implemented")
method discv5UpdateBootnodes*(
self: IKernel, bootnodes: seq[string]
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.discv5UpdateBootnodes not implemented")
method startDiscv5*(
self: IKernel
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.startDiscv5 not implemented")
method stopDiscv5*(
self: IKernel
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.stopDiscv5 not implemented")
method peerExchangeRequest*(
self: IKernel, numPeers: uint64
): Future[Result[int, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.peerExchangeRequest not implemented")
# --- debug / info ---
method version*(
self: IKernel
): Future[Result[string, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.version not implemented")
method listenAddresses*(
self: IKernel
): Future[Result[seq[string], string]] {.async: (raises: []), base.} =
return err("Interface IKernel.listenAddresses not implemented")
method myEnr*(
self: IKernel
): Future[Result[string, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.myEnr not implemented")
method myPeerId*(
self: IKernel
): Future[Result[string, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.myPeerId not implemented")
method metrics*(
self: IKernel
): Future[Result[string, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.metrics not implemented")
method pingPeer*(
self: IKernel, peerAddr: string, timeoutMs: int
): Future[Result[int64, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.pingPeer not implemented")
# --- debug / info ---
version(w) is Future[Result[string, string]]
listenAddresses(w) is Future[Result[seq[string], string]]
myEnr(w) is Future[Result[string, string]]
myPeerId(w) is Future[Result[string, string]]
metrics(w) is Future[Result[string, string]]
isOnline(w) is Future[Result[bool, string]]
pingPeer(w, string, int) is Future[Result[int64, string]]

View File

@ -1,33 +0,0 @@
## `LogosDelivery` is the project entry point. It is a pure concentrator: it
## owns exactly one instance of each API layer
##
## Waku <- MessagingClient <- ReliableChannelManager
##
## and chains them together (each layer drives the one below it). Every layer
## keeps its own, separate public API — `LogosDelivery` only wires them up and
## drives the shared `new` / `start` / `stop` lifecycle.
{.push raises: [].}
import results, chronos
import brokers/event_broker
import types as api_types
export api_types, event_broker
type
## Entry point. Holds one instance of each API layer.
ILogosDelivery* = ref object of RootObj
EventBroker:
type EventConnectionStatusChange* = object
connectionStatus*: ConnectionStatus
method start*(self: ILogosDelivery): Future[Result[void, string]] {.async, base.} =
return err("ILogosDelivery.start not implemented")
method stop*(self: ILogosDelivery): Future[Result[void, string]] {.async, base.} =
return err("ILogosDelivery.stop not implemented")
method isOnline*(self: ILogosDelivery): Future[Result[bool, string]] {.async, base.} =
return err("ILogosDelivery.isOnline not implemented")

View File

@ -6,8 +6,6 @@ import logos_delivery/waku/waku_core/message
export event_broker, api_types
type IMessagingClient* = ref object of RootObj
EventBroker:
# Event emitted when a message is sent to the network
type MessageSentEvent* = object
@ -33,17 +31,8 @@ EventBroker:
messageHash*: string
message*: WakuMessage
method subscribe*(
self: IMessagingClient, contentTopic: ContentTopic
): Future[Result[void, string]] {.async: (raises: []), base.} =
return err("Interface IMessagingClient.subscribe not implemented")
method unsubscribe*(
self: IMessagingClient, contentTopic: ContentTopic
): Result[void, string] {.base, raises: [].} =
return err("Interface IMessagingClient.unsubscribe not implemented")
method send*(
self: IMessagingClient, envelope: MessageEnvelope
): Future[Result[RequestId, string]] {.async: (raises: []), base.} =
return err("Interface IMessagingClient.send not implemented")
# Structural API contract for a messaging client (ops in `messaging/api/*`).
type MessagingApi* = concept c
subscribe(c, ContentTopic) is Future[Result[void, string]]
unsubscribe(c, ContentTopic) is Result[void, string]
send(c, MessageEnvelope) is Future[Result[RequestId, string]]

View File

@ -14,16 +14,13 @@ import logos_delivery/api/messaging_client_api
export event_broker, api_types
export channel_types, messaging_client_api
type
IReliableChannelManager* = ref object of RootObj
SendHandler* = proc(envelope: MessageEnvelope): Future[Result[RequestId, string]] {.
async: (raises: [CatchableError]), gcsafe
.}
## Egress dispatch boundary. Typically wraps `MessagingClient.send`;
## tests inject a fake that records calls and returns canned
## `RequestId`s so the send state machine can be exercised end-to-end
## without a network.
type SendHandler* = proc(envelope: MessageEnvelope): Future[Result[RequestId, string]] {.
async: (raises: [CatchableError]), gcsafe
.}
## Egress dispatch boundary. Typically wraps `MessagingClient.send`;
## tests inject a fake that records calls and returns canned
## `RequestId`s so the send state machine can be exercised end-to-end
## without a network.
EventBroker:
type ChannelMessageReceivedEvent* = object
@ -47,24 +44,9 @@ EventBroker:
requestId*: RequestId
error*: string
method createReliableChannel*(
self: IReliableChannelManager,
channelId: ChannelId,
contentTopic: ContentTopic,
senderId: SdsParticipantID,
sendHandler: SendHandler = nil,
): Result[ChannelId, string] {.base.} =
return err("Interface IReliableChannelManager.createReliableChannel not implemented")
method closeChannel*(
self: IReliableChannelManager, channelId: ChannelId
): Future[Result[void, string]] {.async: (raises: []), base.} =
return err("Interface IReliableChannelManager.closeChannel not implemented")
method send*(
self: IReliableChannelManager,
channelId: ChannelId,
appPayload: seq[byte],
ephemeral: bool = false,
): Future[Result[RequestId, string]] {.async: (raises: []), base.} =
return err("Interface IReliableChannelManager.send not implemented")
# Structural API contract for the reliable-channel surface (ops in `channels/api/*`).
type ReliableChannelApi* = concept c
createReliableChannel(c, ChannelId, ContentTopic, SdsParticipantID) is
Result[ChannelId, string]
closeChannel(c, ChannelId) is Future[Result[void, string]]
send(c, ChannelId, seq[byte]) is Future[Result[RequestId, string]]

View File

@ -26,6 +26,11 @@ type
PartiallyConnected
Connected
PeerConnInfo* = object ## structured connected-peer info for the api boundary
peerId*: string
protocols*: seq[string]
addresses*: seq[string]
proc new*(T: typedesc[RequestId], rng: crypto.Rng): T =
## Generate a new RequestId using the provided RNG.
RequestId(request_utils.generateRequestId(rng))

View File

@ -28,7 +28,7 @@ type
## channel API. Placeholder for now (segmentation / SDS / rate-limit defaults
## will move here in a follow-up PR); kept so each layer owns its own config.
ReliableChannelManager* = ref object of IReliableChannelManager
ReliableChannelManager* = ref object ## Implements `ReliableChannelApi`.
channels*: Table[ChannelId, ReliableChannel] ## read by `channels/api.nim`
messagingClient: MessagingClient ## The channel layer chains onto messaging.
sendHandler*: SendHandler

View File

@ -11,9 +11,6 @@
import results, chronos, chronicles
import logos_delivery/api/logos_delivery_api
export logos_delivery_api
# Each layer has a core module (type + new/start/stop) and an api/ folder whose
# modules each implement a differentiated set of operations, plus an events
# surface. The concentrator re-exports them so library consumers get the full
@ -52,6 +49,12 @@ import logos_delivery/channels/api/send as channel_send
export channel_send
# ChannelMessage* events are surfaced via `export reliable_channel_manager`.
# Compile-time check that each layer's concrete type satisfies its API concept.
static:
doAssert Waku is KernelApi
doAssert MessagingClient is MessagingApi
doAssert ReliableChannelManager is ReliableChannelApi
import logos_delivery/waku/factory/waku_conf
import logos_delivery/waku/factory/app_callbacks
import tools/confutils/cli_args
@ -69,8 +72,7 @@ type
messaging*: MessagingClientConf
reliableChannel*: ReliableChannelManagerConf
LogosDelivery* = ref object of ILogosDelivery
## Entry point. Holds one instance of each API layer.
LogosDelivery* = ref object ## Entry point. Holds one instance of each API layer.
waku*: Waku
messagingClient*: MessagingClient
reliableChannelManager*: ReliableChannelManager
@ -114,7 +116,7 @@ proc new*(
)
)
method start*(self: LogosDelivery): Future[Result[void, string]] {.async.} =
proc start*(self: LogosDelivery): Future[Result[void, string]] {.async.} =
## Starts each layer bottom-up: transport first, then messaging, then channels.
if self.waku.isNil():
return err("Waku node is not initialized")
@ -134,7 +136,7 @@ method start*(self: LogosDelivery): Future[Result[void, string]] {.async.} =
return ok()
method stop*(self: LogosDelivery): Future[Result[void, string]] {.async.} =
proc stop*(self: LogosDelivery): Future[Result[void, string]] {.async.} =
## Stops in reverse order so higher layers drain before their dependencies.
await self.reliableChannelManager.stop()
await self.messagingClient.stop()
@ -144,7 +146,7 @@ method stop*(self: LogosDelivery): Future[Result[void, string]] {.async.} =
return ok()
method isOnline*(self: LogosDelivery): Future[Result[bool, string]] {.async.} =
proc isOnline*(self: LogosDelivery): Future[Result[bool, string]] {.async.} =
if self.waku.isNil():
return err("Waku node is not initialized")
return ok(self.waku.healthMonitor.onlineMonitor.amIOnline())

View File

@ -13,7 +13,7 @@ import
import
logos_delivery/api/kernel_api, # MessageSeenEvent
logos_delivery/api/messaging_client_api, # MessageReceivedEvent
logos_delivery/api/logos_delivery_api # EventConnectionStatusChange
logos_delivery/waku/api/events/health_events # EventConnectionStatusChange
const MaxMessageLife = chronos.minutes(7) ## Max time we will keep track of rx messages

View File

@ -17,7 +17,7 @@ type
## follow-up PR. Today it only carries the p2p reliability toggle.
useP2PReliability*: bool
MessagingClient* = ref object of IMessagingClient
MessagingClient* = ref object ## Implements `MessagingApi`.
waku*: Waku ## The Waku kernel this layer drives; read by `messaging/api/*`.
sendService*: SendService
recvService*: RecvService

View File

@ -4,10 +4,12 @@ import logos_delivery/api/types
import logos_delivery/waku/node/health_monitor/[protocol_health, topic_health]
import logos_delivery/waku/waku_core/topics
export protocol_health, topic_health
export event_broker, protocol_health, topic_health
# Note: `EventConnectionStatusChange` lives in `logos_delivery/api/logos_delivery_api`
# (the top-level orchestrator interface owns the node-connectivity event).
# Emitted by the health monitor when overall node connectivity changes.
EventBroker:
type EventConnectionStatusChange* = object
connectionStatus*: ConnectionStatus
# Notify health changes to a subscribed topic
# TODO: emit content topic health change events when subscribe/unsubscribe

View File

@ -8,10 +8,7 @@ import libp2p/[peerid, peerstore]
import logos_delivery/waku/waku
import logos_delivery/waku/[waku_core, node/waku_node, node/peer_manager]
type PeerConnInfo* = object ## structured connected-peer info for the api boundary
peerId*: string
protocols*: seq[string]
addresses*: seq[string]
# `PeerConnInfo` is defined in `api/types` (surfaced here via `import waku`).
proc connect*(
self: Waku, peers: seq[string], timeoutMs: uint32

View File

@ -9,7 +9,6 @@ import
libp2p/protocols/pubsub,
libp2p/protocols/pubsub/rpc/messages,
logos_delivery/api/types,
logos_delivery/api/logos_delivery_api, # EventConnectionStatusChange
logos_delivery/waku/[
waku_relay,
waku_rln_relay,

View File

@ -68,7 +68,7 @@ logScope:
# Git version in git describe format (defined at compile time)
const git_version* {.strdefine.} = "n/a"
type Waku* = ref object of IKernel
type Waku* = ref object ## Implements `KernelApi` (ops in `waku/api/*`).
stateInfo*: WakuStateInfo
conf*: WakuConf
rng*: crypto.Rng