api: define layer contracts with concept, not RootObj inheritance

The IMessagingClient / IKernel / IReliableChannelManager base types were
inheritance-as-documentation: nothing dispatched over them and no impl ever
overrode their base `method`s (the ops live in `*/api/*` as plain procs). The
`method`s only returned "not implemented" at runtime and, crucially, were never
checked against the real surface -- so IKernel had silently drifted from `Waku`
(relayPublish returned int not string; filter/lightpush still carried a `peer`
param; connectedPeersInfo returned seq[string]).

Replace each base type with a structural Nim `concept` matched against the real
implementation, and assert conformance once in the concentrator
(`doAssert Waku is KernelApi`, etc.) where every impl and its op modules are in
scope. This is zero-cost, drops the dead vtables, and makes each layer's true
surface a single compiler-checked source of truth.

Move `PeerConnInfo` from `waku/api/peer_manager` into `api/types` (alongside the
other api-boundary data types) so `KernelApi` can name it without an import
cycle -- otherwise `connectedPeersInfo` could not be part of the contract.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Ivan FB 2026-06-26 12:30:07 +02:00
parent e2448115b3
commit b7d1cb8872
No known key found for this signature in database
GPG Key ID: DF0C67A04C543270
9 changed files with 78 additions and 225 deletions

View File

@ -1,203 +1,76 @@
import std/options
import chronos, results
import brokers/event_broker
import logos_delivery/api/types as api_types
import logos_delivery/waku/waku_core/topics/pubsub_topic
import logos_delivery/waku/waku_core/message
import logos_delivery/waku/waku_core/subscription/push_handler
import logos_delivery/waku/waku_store/common as store_types
export event_broker
export api_types, pubsub_topic, store_types
type IKernel* = ref object of RootObj
EventBroker:
# Internal event emitted when a message arrives from the network via any protocol
type MessageSeenEvent* = object
topic*: PubsubTopic
message*: WakuMessage
# --- topic construction ---
method buildContentTopic*(
self: IKernel, appName: string, appVersion: uint32, name: string, encoding: string
): Future[Result[ContentTopic, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.buildContentTopic not implemented")
# Structural API contract for the Kernel surface, implemented by `Waku`
# (ops in `waku/api/*`).
type KernelApi* = concept w
# --- topic construction ---
buildContentTopic(w, string, uint32, string, string) is
Future[Result[ContentTopic, string]]
buildPubsubTopic(w, string) is Future[Result[PubsubTopic, string]]
defaultPubsubTopic(w) is Future[Result[PubsubTopic, string]]
method buildPubsubTopic*(
self: IKernel, topicName: string
): Future[Result[PubsubTopic, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.buildPubsubTopic not implemented")
# --- relay ---
relayPublish(w, PubsubTopic, WakuMessage, uint32) is Future[Result[string, string]]
relaySubscribe(w, PubsubTopic) is Future[Result[bool, string]]
relayUnsubscribe(w, PubsubTopic) is Future[Result[bool, string]]
relayAddProtectedShard(w, uint16, uint16, string) is Future[Result[bool, string]]
relayConnectedPeers(w, PubsubTopic) is Future[Result[seq[string], string]]
relayPeersInMesh(w, PubsubTopic) is Future[Result[seq[string], string]]
relayNumPeersInMesh(w, PubsubTopic) is Future[Result[int, string]]
relayNumConnectedPeers(w, PubsubTopic) is Future[Result[int, string]]
method defaultPubsubTopic*(
self: IKernel
): Future[Result[PubsubTopic, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.defaultPubsubTopic not implemented")
# --- filter ---
filterSubscribe(w, PubsubTopic, seq[ContentTopic], FilterPushHandler) is
Future[Result[bool, string]]
filterUnsubscribe(w, PubsubTopic, seq[ContentTopic]) is Future[Result[bool, string]]
filterUnsubscribeAll(w) is Future[Result[bool, string]]
# --- relay ---
method relayPublish*(
self: IKernel, pubsubTopic: PubsubTopic, message: WakuMessage, timeoutMs: uint32
): Future[Result[int, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.relayPublish not implemented")
# --- lightpush ---
lightpushPublish(w, PubsubTopic, WakuMessage) is Future[Result[string, string]]
method relaySubscribe*(
self: IKernel, pubsubTopic: PubsubTopic
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.relaySubscribe not implemented")
# --- store ---
storeQuery(w, StoreQueryRequest, string, int) is
Future[Result[StoreQueryResponse, string]]
method relayUnsubscribe*(
self: IKernel, pubsubTopic: PubsubTopic
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.relayUnsubscribe not implemented")
# --- peer management ---
connect(w, seq[string], uint32) is Future[Result[bool, string]]
disconnectPeerById(w, string) is Future[Result[bool, string]]
disconnectAllPeers(w) is Future[Result[bool, string]]
dialPeer(w, string, string, int) is Future[Result[bool, string]]
dialPeerById(w, string, string, int) is Future[Result[bool, string]]
peerIdsFromPeerstore(w) is Future[Result[seq[string], string]]
connectedPeersInfo(w) is Future[Result[seq[PeerConnInfo], string]]
connectedPeers(w) is Future[Result[seq[string], string]]
peerIdsByProtocol(w, string) is Future[Result[seq[string], string]]
method relayAddProtectedShard*(
self: IKernel, clusterId: uint16, shardId: uint16, publicKey: string
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.relayAddProtectedShard not implemented")
# --- discovery ---
dnsDiscovery(w, string, string, int) is Future[Result[seq[string], string]]
discv5UpdateBootnodes(w, string) is Future[Result[bool, string]]
startDiscv5(w) is Future[Result[bool, string]]
stopDiscv5(w) is Future[Result[bool, string]]
peerExchangeRequest(w, uint64) is Future[Result[int, string]]
method relayConnectedPeers*(
self: IKernel, pubsubTopic: PubsubTopic
): Future[Result[seq[string], string]] {.async: (raises: []), base.} =
return err("Interface IKernel.relayConnectedPeers not implemented")
method relayPeersInMesh*(
self: IKernel, pubsubTopic: PubsubTopic
): Future[Result[seq[string], string]] {.async: (raises: []), base.} =
return err("Interface IKernel.relayPeersInMesh not implemented")
# --- filter ---
method filterSubscribe*(
self: IKernel,
pubsubTopic: Option[PubsubTopic],
contentTopics: seq[ContentTopic],
peer: string,
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.filterSubscribe not implemented")
method filterUnsubscribe*(
self: IKernel,
pubsubTopic: Option[PubsubTopic],
contentTopics: seq[ContentTopic],
peer: string,
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.filterUnsubscribe not implemented")
method filterUnsubscribeAll*(
self: IKernel, peer: string
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.filterUnsubscribeAll not implemented")
# --- lightpush ---
method lightpushPublish*(
self: IKernel, pubsubTopic: PubsubTopic, message: WakuMessage, peer: string
): Future[Result[string, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.lightpushPublish not implemented")
# --- store ---
method storeQuery*(
self: IKernel, request: StoreQueryRequest, peer: string, timeoutMs: int
): Future[Result[StoreQueryResponse, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.storeQuery not implemented")
# --- peer management ---
method connect*(
self: IKernel, peers: seq[string], timeoutMs: uint32
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.connect not implemented")
method disconnectPeerById*(
self: IKernel, peerId: string
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.disconnectPeerById not implemented")
method disconnectAllPeers*(
self: IKernel
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.disconnectAllPeers not implemented")
method dialPeer*(
self: IKernel, peerAddr: string, protocol: string, timeoutMs: int
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.dialPeer not implemented")
method dialPeerById*(
self: IKernel, peerId: string, protocol: string, timeoutMs: int
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.dialPeerById not implemented")
method peerIdsFromPeerstore*(
self: IKernel
): Future[Result[seq[string], string]] {.async: (raises: []), base.} =
return err("Interface IKernel.peerIdsFromPeerstore not implemented")
method connectedPeersInfo*(
self: IKernel
): Future[Result[seq[string], string]] {.async: (raises: []), base.} =
return err("Interface IKernel.connectedPeersInfo not implemented")
method connectedPeers*(
self: IKernel
): Future[Result[seq[string], string]] {.async: (raises: []), base.} =
return err("Interface IKernel.connectedPeers not implemented")
method peerIdsByProtocol*(
self: IKernel, protocol: string
): Future[Result[seq[string], string]] {.async: (raises: []), base.} =
return err("Interface IKernel.peerIdsByProtocol not implemented")
# --- discovery ---
method dnsDiscovery*(
self: IKernel, enrTreeUrl: string, nameServer: string, timeoutMs: int
): Future[Result[seq[string], string]] {.async: (raises: []), base.} =
return err("Interface IKernel.dnsDiscovery not implemented")
method discv5UpdateBootnodes*(
self: IKernel, bootnodes: seq[string]
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.discv5UpdateBootnodes not implemented")
method startDiscv5*(
self: IKernel
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.startDiscv5 not implemented")
method stopDiscv5*(
self: IKernel
): Future[Result[bool, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.stopDiscv5 not implemented")
method peerExchangeRequest*(
self: IKernel, numPeers: uint64
): Future[Result[int, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.peerExchangeRequest not implemented")
# --- debug / info ---
method version*(
self: IKernel
): Future[Result[string, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.version not implemented")
method listenAddresses*(
self: IKernel
): Future[Result[seq[string], string]] {.async: (raises: []), base.} =
return err("Interface IKernel.listenAddresses not implemented")
method myEnr*(
self: IKernel
): Future[Result[string, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.myEnr not implemented")
method myPeerId*(
self: IKernel
): Future[Result[string, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.myPeerId not implemented")
method metrics*(
self: IKernel
): Future[Result[string, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.metrics not implemented")
method pingPeer*(
self: IKernel, peerAddr: string, timeoutMs: int
): Future[Result[int64, string]] {.async: (raises: []), base.} =
return err("Interface IKernel.pingPeer not implemented")
# --- debug / info ---
version(w) is Future[Result[string, string]]
listenAddresses(w) is Future[Result[seq[string], string]]
myEnr(w) is Future[Result[string, string]]
myPeerId(w) is Future[Result[string, string]]
metrics(w) is Future[Result[string, string]]
isOnline(w) is Future[Result[bool, string]]
pingPeer(w, string, int) is Future[Result[int64, string]]

View File

@ -6,8 +6,6 @@ import logos_delivery/waku/waku_core/message
export event_broker, api_types
type IMessagingClient* = ref object of RootObj
EventBroker:
# Event emitted when a message is sent to the network
type MessageSentEvent* = object
@ -33,17 +31,8 @@ EventBroker:
messageHash*: string
message*: WakuMessage
method subscribe*(
self: IMessagingClient, contentTopic: ContentTopic
): Future[Result[void, string]] {.async: (raises: []), base.} =
return err("Interface IMessagingClient.subscribe not implemented")
method unsubscribe*(
self: IMessagingClient, contentTopic: ContentTopic
): Result[void, string] {.base, raises: [].} =
return err("Interface IMessagingClient.unsubscribe not implemented")
method send*(
self: IMessagingClient, envelope: MessageEnvelope
): Future[Result[RequestId, string]] {.async: (raises: []), base.} =
return err("Interface IMessagingClient.send not implemented")
# Structural API contract for a messaging client (ops in `messaging/api/*`).
type MessagingApi* = concept c
subscribe(c, ContentTopic) is Future[Result[void, string]]
unsubscribe(c, ContentTopic) is Result[void, string]
send(c, MessageEnvelope) is Future[Result[RequestId, string]]

View File

@ -15,8 +15,6 @@ export event_broker, api_types
export channel_types, messaging_client_api
type
IReliableChannelManager* = ref object of RootObj
SendHandler* = proc(envelope: MessageEnvelope): Future[Result[RequestId, string]] {.
async: (raises: [CatchableError]), gcsafe
.}
@ -47,24 +45,9 @@ EventBroker:
requestId*: RequestId
error*: string
method createReliableChannel*(
self: IReliableChannelManager,
channelId: ChannelId,
contentTopic: ContentTopic,
senderId: SdsParticipantID,
sendHandler: SendHandler = nil,
): Result[ChannelId, string] {.base.} =
return err("Interface IReliableChannelManager.createReliableChannel not implemented")
method closeChannel*(
self: IReliableChannelManager, channelId: ChannelId
): Future[Result[void, string]] {.async: (raises: []), base.} =
return err("Interface IReliableChannelManager.closeChannel not implemented")
method send*(
self: IReliableChannelManager,
channelId: ChannelId,
appPayload: seq[byte],
ephemeral: bool = false,
): Future[Result[RequestId, string]] {.async: (raises: []), base.} =
return err("Interface IReliableChannelManager.send not implemented")
# Structural API contract for the reliable-channel surface (ops in `channels/api/*`).
type ReliableChannelApi* = concept c
createReliableChannel(c, ChannelId, ContentTopic, SdsParticipantID) is
Result[ChannelId, string]
closeChannel(c, ChannelId) is Future[Result[void, string]]
send(c, ChannelId, seq[byte]) is Future[Result[RequestId, string]]

View File

@ -26,6 +26,11 @@ type
PartiallyConnected
Connected
PeerConnInfo* = object ## structured connected-peer info for the api boundary
peerId*: string
protocols*: seq[string]
addresses*: seq[string]
proc new*(T: typedesc[RequestId], rng: crypto.Rng): T =
## Generate a new RequestId using the provided RNG.
RequestId(request_utils.generateRequestId(rng))

View File

@ -28,7 +28,7 @@ type
## channel API. Placeholder for now (segmentation / SDS / rate-limit defaults
## will move here in a follow-up PR); kept so each layer owns its own config.
ReliableChannelManager* = ref object of IReliableChannelManager
ReliableChannelManager* = ref object ## Implements `ReliableChannelApi`.
channels*: Table[ChannelId, ReliableChannel] ## read by `channels/api.nim`
messagingClient: MessagingClient ## The channel layer chains onto messaging.
sendHandler*: SendHandler

View File

@ -52,6 +52,12 @@ import logos_delivery/channels/api/send as channel_send
export channel_send
# ChannelMessage* events are surfaced via `export reliable_channel_manager`.
# Compile-time check that each layer's concrete type satisfies its API concept.
static:
doAssert Waku is KernelApi
doAssert MessagingClient is MessagingApi
doAssert ReliableChannelManager is ReliableChannelApi
import logos_delivery/waku/factory/waku_conf
import logos_delivery/waku/factory/app_callbacks
import tools/confutils/cli_args

View File

@ -17,7 +17,7 @@ type
## follow-up PR. Today it only carries the p2p reliability toggle.
useP2PReliability*: bool
MessagingClient* = ref object of IMessagingClient
MessagingClient* = ref object ## Implements `MessagingApi`.
waku*: Waku ## The Waku kernel this layer drives; read by `messaging/api/*`.
sendService*: SendService
recvService*: RecvService

View File

@ -8,10 +8,7 @@ import libp2p/[peerid, peerstore]
import logos_delivery/waku/waku
import logos_delivery/waku/[waku_core, node/waku_node, node/peer_manager]
type PeerConnInfo* = object ## structured connected-peer info for the api boundary
peerId*: string
protocols*: seq[string]
addresses*: seq[string]
# `PeerConnInfo` is defined in `api/types` (surfaced here via `import waku`).
proc connect*(
self: Waku, peers: seq[string], timeoutMs: uint32

View File

@ -68,7 +68,7 @@ logScope:
# Git version in git describe format (defined at compile time)
const git_version* {.strdefine.} = "n/a"
type Waku* = ref object of IKernel
type Waku* = ref object ## Implements `KernelApi` (ops in `waku/api/*`).
stateInfo*: WakuStateInfo
conf*: WakuConf
rng*: crypto.Rng