Merge branch 'master' into chore/retrieve-roots-current

This commit is contained in:
stubbsta 2026-06-01 09:11:45 +02:00
commit 3649e859f5
No known key found for this signature in database
13 changed files with 581 additions and 156 deletions

View File

@ -8,7 +8,8 @@ on:
env:
NPROC: 2
MAKEFLAGS: "-j${NPROC}"
NIMFLAGS: "--parallelBuild:${NPROC} --colors:off -d:chronicles_colors:none"
NIMFLAGS: "--parallelBuild:${NPROC} --colors:off -d:chronicles_colors:none -d:disableMarchNative"
NIM_PARAMS: "-d:disableMarchNative"
jobs:
build:

View File

@ -13,7 +13,8 @@ concurrency:
env:
NPROC: 2
MAKEFLAGS: "-j${NPROC}"
NIMFLAGS: "--parallelBuild:${NPROC} --colors:off -d:chronicles_colors:none"
NIMFLAGS: "--parallelBuild:${NPROC} --colors:off -d:chronicles_colors:none -d:disableMarchNative"
NIM_PARAMS: "-d:disableMarchNative"
NIM_VERSION: '2.2.4'
NIMBLE_VERSION: '0.22.3'
@ -35,6 +36,9 @@ jobs:
- 'nimble.lock'
- 'waku.nimble'
- 'Makefile'
- 'scripts/**'
- 'flake.nix'
- 'flake.lock'
- 'library/**'
- 'liblogosdelivery/**'
v2:
@ -157,7 +161,7 @@ jobs:
fi
export MAKEFLAGS="-j1"
export NIMFLAGS="--colors:off -d:chronicles_colors:none"
export NIMFLAGS="--colors:off -d:chronicles_colors:none -d:disableMarchNative"
export USE_LIBBACKTRACE=0
make V=1 POSTGRES=$postgres_enabled test

View File

@ -28,8 +28,11 @@ jobs:
set -euo pipefail
NIMBLE_VERSION=$(grep -m1 '^version = ' waku.nimble | sed -E 's/version = "([^"]+)"/\1/')
# Nearest tag reachable from HEAD; --abbrev=0 drops the -<n>-g<sha>
# suffix so we get the bare tag (e.g. v0.38.0).
BASE_TAG=$(git describe --tags --abbrev=0 2>/dev/null || echo "")
# suffix so we get the bare tag (e.g. v0.38.0). `--match 'v*'` skips
# the moving `nightly` tag (auto-updated by the daily CI to point at
# master HEAD), which would otherwise be picked as the nearest tag
# and break the version-sort comparison below.
BASE_TAG=$(git describe --tags --abbrev=0 --match 'v*' 2>/dev/null || echo "")
BASE_TAG=${BASE_TAG#v}
# Compare on the base version, ignoring any -rc.N prerelease suffix.
BASE_TAG=${BASE_TAG%%-*}

View File

@ -21,3 +21,19 @@ EventBroker:
channelId*: ChannelId
senderId*: SdsParticipantID
payload*: seq[byte]
EventBroker:
## Emitted when every segment of a channel-level `send()` reached
## `Confirmed`. Channel-level analogue of `MessageSentEvent`; the
## `requestId` is the channel-layer parent returned by `send()`.
type ChannelMessageSentEvent* = object
channelId*: ChannelId
requestId*: RequestId
EventBroker:
## Emitted when a channel-level `send()` finalises with at least one
## segment in `Failed`. Channel-level analogue of `MessageErrorEvent`.
type ChannelMessageErrorEvent* = object
channelId*: ChannelId
requestId*: RequestId
error*: string

View File

@ -29,7 +29,7 @@ EventBroker:
##
## `channelId` lets listeners filter to their own channel, since all
## reliable channels share the underlying Waku node's broker context.
type ReadyToSendEvent* = object
type ReadyToSendEvent* = ref object
channelId*: SdsChannelID
msgs*: seq[seq[byte]]

View File

@ -13,14 +13,15 @@
##
## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html
import std/[options, tables]
import std/[options, sets, tables]
import results
import chronos
import bearssl/rand
import stew/byteutils
import libp2p/crypto/crypto as libp2p_crypto
import waku/node/delivery_service/delivery_service
import waku/api/api
import waku/factory/waku as waku_factory
import waku/node/delivery_service/send_service
import waku/waku_core/topics
@ -31,8 +32,8 @@ import ./rate_limit_manager/rate_limit_manager
import ./encryption/encryption
export
delivery_service, send_service, events, segmentation, scalable_data_sync,
rate_limit_manager, encryption
api, waku_factory, events, segmentation, scalable_data_sync, rate_limit_manager,
encryption
const LipWireReliableChannelVersion* = "RELIABLE-CHANNEL-API/1"
## Wire-format spec marker for the Reliable Channel layer, as defined
@ -42,27 +43,64 @@ const LipWireReliableChannelVersion* = "RELIABLE-CHANNEL-API/1"
## The trailing `/N` is the wire-format version and is bumped only
## on breaking on-the-wire changes; implementations pin one version.
type ReliableChannel* = ref object
## Spec-defined public type. Fields are private so callers cannot
## mutate internals and break invariants. Getters are added below
## for the few values consumers may need.
deliveryService: DeliveryService
channelId: ChannelId
contentTopic: ContentTopic
senderId: SdsParticipantID
rng: ref HmacDrbgContext
segmentation: SegmentationHandler
sdsHandler: SdsHandler
rateLimit: RateLimitManager
type
SendHandler* = proc(envelope: MessageEnvelope): Future[Result[RequestId, string]] {.
async: (raises: [CatchableError]), gcsafe
.}
## Egress dispatch boundary. Defaults to `waku.send`; tests inject a
## fake that records calls and returns canned `RequestId`s so the
## send state machine can be exercised end-to-end without a network.
requestIds: Table[RequestId, seq[RequestId]]
pendingRequests: seq[tuple[parent: RequestId, ephemeral: bool]]
brokerCtx: BrokerContext
## Captured here so the channel emits `ChannelMessageReceivedEvent`
## on the same broker context the owning manager registered its
## listeners on. Without this, an emit via `globalBrokerContext()`
## would land on whatever context happens to be thread-local at
## emit time, which is not necessarily the manager's.
MessagePersistence {.pure.} = enum
Persistent
Ephemeral
SegmentSendState {.pure.} = enum
## Lifecycle of a single segment as tracked by the channel. The
## messaging layer has its own richer `DeliveryState` (retries,
## propagated-vs-validated); here we only model what's needed to
## decide when a `channelReqId` is fully accounted for.
AwaitingRateLimit ## Pushed by `send`; not yet released by rate_limit_manager.
InFlight
## Released by rate_limit_manager and handed to delivery_service;
## `messagingReqId` is now set.
Confirmed ## `MessageSentEvent` arrived for `messagingReqId`.
Failed
## `MessageErrorEvent` arrived for `messagingReqId`, or the local
## delivery-task construction failed before any id was reachable.
PendingMessagingRequest = object
## One entry per segment (i.e. per messaging-layer request). The
## relative order of `AwaitingRateLimit` entries must match the
## order in which `rate_limit_manager` re-emits messages, which is
## FIFO with `send()`.
channelReqId*: RequestId
## The channel-layer parent id returned to the caller of `send()` in channel layer.
## One channel request maps to N pending messaging requests.
messagingReqId*: Option[RequestId]
## Per-segment messaging layer id. `none` until `onReadyToSend` assigns it.
persistenceReqType: MessagePersistence
segmentSendState*: SegmentSendState
ReliableChannel* = ref object
## Spec-defined public type. Fields are private so callers cannot
## mutate internals and break invariants. Getters are added below
## for the few values consumers may need.
sendHandler: SendHandler
channelId: ChannelId
contentTopic: ContentTopic
senderId: SdsParticipantID
rng: ref HmacDrbgContext
segmentation: SegmentationHandler
sdsHandler: SdsHandler
rateLimit: RateLimitManager
requestIds: Table[RequestId, seq[RequestId]]
pendingMessagingRequests: seq[PendingMessagingRequest]
## Entries are kept until the matching segment reaches a final
## state (`Confirmed` or `Failed`); a whole channel request is
## then pruned in one pass once all its segments are final.
brokerCtx: BrokerContext
func getChannelId*(self: ReliableChannel): ChannelId {.inline.} =
self.channelId
@ -73,19 +111,103 @@ func getContentTopic*(self: ReliableChannel): ContentTopic {.inline.} =
func getSenderId*(self: ReliableChannel): SdsParticipantID {.inline.} =
self.senderId
func isFinal(state: SegmentSendState): bool {.inline.} =
return state in {SegmentSendState.Confirmed, SegmentSendState.Failed}
proc pruneCompletedChannelReqs(self: ReliableChannel) =
## Drop every `pendingMessagingRequests` entry whose `channelReqId`
## has all of its segments in a final state. A single failing
## segment doesn't trigger a drop on its own — we wait until siblings
## are also accounted for, so the channel-level outcome is decided
## from a complete picture. For each fully-final `channelReqId`, emit
## the channel-level final event before the entries are dropped:
## `ChannelMessageSentEvent` if every sibling Confirmed,
## `ChannelMessageErrorEvent` if any sibling Failed.
var hasPending = initHashSet[RequestId]()
var anyFailed = initHashSet[RequestId]()
for entry in self.pendingMessagingRequests:
if not entry.segmentSendState.isFinal():
hasPending.incl(entry.channelReqId)
elif entry.segmentSendState == SegmentSendState.Failed:
anyFailed.incl(entry.channelReqId)
var emitted = initHashSet[RequestId]()
for entry in self.pendingMessagingRequests:
if entry.channelReqId in hasPending or entry.channelReqId in emitted:
continue
emitted.incl(entry.channelReqId)
if entry.channelReqId in anyFailed:
ChannelMessageErrorEvent.emit(
self.brokerCtx,
ChannelMessageErrorEvent(
channelId: self.channelId,
requestId: entry.channelReqId,
error: "one or more segments failed",
),
)
else:
ChannelMessageSentEvent.emit(
self.brokerCtx,
ChannelMessageSentEvent(
channelId: self.channelId, requestId: entry.channelReqId
),
)
self.pendingMessagingRequests.keepItIf(it.channelReqId in hasPending)
proc onMessageSent(self: ReliableChannel, messagingReqId: RequestId) =
## Invoked from this channel's `MessageSentEvent` listener. Flips
## the matching `InFlight` segment to `Confirmed` and prunes. The
## listener routes every event through here; entries that don't
## belong to this channel simply don't match and are no-ops.
self.pendingMessagingRequests.applyItIf(
it.segmentSendState == SegmentSendState.InFlight and
it.messagingReqId == some(messagingReqId)
):
it.segmentSendState = SegmentSendState.Confirmed
self.pruneCompletedChannelReqs()
proc onMessageError(self: ReliableChannel, messagingReqId: RequestId) =
## Symmetric to `onMessageSent` but for `MessageErrorEvent`.
self.pendingMessagingRequests.applyItIf(
it.segmentSendState == SegmentSendState.InFlight and
it.messagingReqId == some(messagingReqId)
):
it.segmentSendState = SegmentSendState.Failed
self.pruneCompletedChannelReqs()
proc onReadyToSend(
self: ReliableChannel, msgs: seq[seq[byte]]
self: ReliableChannel, readyToSendEvent: ReadyToSendEvent
) {.async: (raises: []).} =
## Tail of the outgoing pipeline. Invoked from the `ReadyToSendEvent`
## listener once `rate_limit_manager` releases a batch of opaque
## blobs (already-encoded SDS messages):
##
## ... -> rate_limit_manager -> [encryption] -> dispatch
for m in msgs:
## Each `m` was preceded by exactly one push onto `pendingRequests`
## in `send`, so this pop is always safe in the current skeleton.
let pending = self.pendingRequests[0]
self.pendingRequests.delete(0)
var idx = 0
for m in readyToSendEvent.msgs:
## The first `AwaitingRateLimit` entry in push order is the one
## this `m` belongs to: `send()` adds one entry per segment, and
## `rate_limit_manager` re-emits them in the same FIFO order, so
## the two sequences advance in lockstep. Earlier entries may
## already be `InFlight` / `Confirmed` / `Failed` because they
## live on until every sibling of their `channelReqId` is final,
## so we walk past those to find the next one that was awaiting for this batch.
while idx < self.pendingMessagingRequests.len and
self.pendingMessagingRequests[idx].segmentSendState !=
SegmentSendState.AwaitingRateLimit
:
idx.inc()
if idx >= self.pendingMessagingRequests.len:
## rate_limit_manager emitted more messages than we have pending —
## should not happen given `send` pushes one entry per enqueued
## SDS payload. Drop silently rather than corrupt state.
break
let channelReqId = self.pendingMessagingRequests[idx].channelReqId
let isEphemeral =
self.pendingMessagingRequests[idx].persistenceReqType ==
MessagePersistence.Ephemeral
## TODO: revisit which fields of the SDS message must be encrypted.
## Encrypting the whole encoded blob forces every receiver to attempt
@ -97,32 +219,58 @@ proc onReadyToSend(
MessageErrorEvent.emit(
self.brokerCtx,
MessageErrorEvent(
requestId: pending.parent,
messageHash: "",
error: "encryption failed: " & error,
requestId: channelReqId, messageHash: "", error: "encryption failed: " & error
),
)
## Encryption failed *before* we could hand the segment to the
## delivery layer — no `messagingReqId` was minted and no
## `DeliveryTask` was queued on `sendService`. The delivery
## layer will therefore never emit a `MessageSentEvent` /
## `MessageErrorEvent` for this segment, so `onMessageError`
## won't fire either. Advance the state machine inline so the
## parent `channelReqId` can still be pruned once its siblings
## are also final.
self.pendingMessagingRequests[idx].segmentSendState = SegmentSendState.Failed
idx.inc()
continue
let wireBytes = seq[byte](encrypted)
## The `meta` field carries the Reliable Channel wire-format spec
## marker so the ingress side of any peer can route this WakuMessage
## to its Reliable Channel layer.
let envelope = MessageEnvelope(
contentTopic: self.contentTopic, payload: wireBytes, ephemeral: pending.ephemeral
contentTopic: self.contentTopic,
payload: wireBytes,
ephemeral: isEphemeral,
meta: LipWireReliableChannelVersion.toBytes(),
)
let deliveryReqId = RequestId.new(self.rng)
let deliveryTask = DeliveryTask.new(deliveryReqId, envelope, globalBrokerContext()).valueOr:
## TODO: emit waku `MessageErrorEvent` for the parent request id.
## `waku.send` is not annotated `(raises: [])`, but this listener is.
## Convert any raise to a Result error so the state machine handles
## both failure modes (Result.err and exception) through one path.
let sendRes =
try:
await self.sendHandler(envelope)
except CatchableError as e:
Result[RequestId, string].err("waku send raised: " & e.msg)
let messagingReqId = sendRes.valueOr:
MessageErrorEvent.emit(
self.brokerCtx,
MessageErrorEvent(
requestId: channelReqId, messageHash: "", error: "waku send failed: " & error
),
)
self.pendingMessagingRequests[idx].segmentSendState = SegmentSendState.Failed
idx.inc()
continue
## Stamp the Reliable Channel wire-format spec marker so the ingress
## side of any peer can route this WakuMessage to its Reliable
## Channel layer. Done on the constructed WakuMessage rather than
## via the envelope because `MessageEnvelope` does not expose a
## `meta` field.
deliveryTask.msg.meta = LipWireReliableChannelVersion.toBytes()
self.pendingMessagingRequests[idx].messagingReqId = some(messagingReqId)
self.pendingMessagingRequests[idx].segmentSendState = SegmentSendState.InFlight
self.requestIds.mgetOrPut(channelReqId, @[]).add(messagingReqId)
idx.inc()
asyncSpawn self.deliveryService.sendService.send(deliveryTask)
self.requestIds.mgetOrPut(pending.parent, @[]).add(deliveryReqId)
self.pruneCompletedChannelReqs()
proc send*(
self: ReliableChannel, payload: seq[byte], ephemeral: bool = false
@ -135,18 +283,22 @@ proc send*(
##
## `rate_limit_manager.enqueueToSend` emits a `ReadyToSendEvent` with
## the SDS messages cleared for transmission; the channel's listener
## then runs the final stage (encryption -> dispatch). The `ephemeral`
## flag is carried alongside each segment in `pendingRequests` and
## stamped onto the eventual `MessageEnvelope`.
## then runs the final stage (encryption -> dispatch). The
## `persistenceReqType` is carried alongside each segment in
## `pendingMessagingRequests` and stamped onto the eventual
## `MessageEnvelope`.
##
## The returned `RequestId` is the parent of one-or-more
## delivery-service `RequestId`s; the mapping is recorded in
## The returned `RequestId` is the channel-level parent of one-or-more
## messaging-layer `RequestId`s; the mapping is recorded in
## `self.requestIds`.
if payload.len == 0:
return err("empty payload")
let parentReqId = RequestId.new(self.rng)
self.requestIds[parentReqId] = @[]
let channelReqId = RequestId.new(self.rng)
self.requestIds[channelReqId] = @[]
let persistenceReqType =
if ephemeral: MessagePersistence.Ephemeral else: MessagePersistence.Persistent
for segmentBytes in self.segmentation.performSegmentation(payload):
## Segments arrive already encoded; the segmentation module owns
@ -155,10 +307,17 @@ proc send*(
self.channelId, self.senderId, segmentBytes
).valueOr:
return err("SDS wrap failed: " & error)
self.pendingRequests.add((parent: parentReqId, ephemeral: ephemeral))
self.pendingMessagingRequests.add(
PendingMessagingRequest(
channelReqId: channelReqId,
messagingReqId: none(RequestId),
persistenceReqType: persistenceReqType,
segmentSendState: SegmentSendState.AwaitingRateLimit,
)
)
self.rateLimit.enqueueToSend(sdsBytes)
return ok(parentReqId)
return ok(channelReqId)
proc onMessageReceived(
self: ReliableChannel, messageHash: string, payload: seq[byte]
@ -206,7 +365,7 @@ proc onMessageReceived(
proc new*(
T: type ReliableChannel,
deliveryService: DeliveryService,
waku: Waku,
channelId: ChannelId,
contentTopic: ContentTopic,
senderId: SdsParticipantID,
@ -214,6 +373,7 @@ proc new*(
sdsConfig: SdsConfig,
rateConfig: RateLimitConfig,
brokerCtx: BrokerContext = globalBrokerContext(),
sendHandler: SendHandler = nil,
): T =
## Pipeline handlers (segmentation/SDS/rate-limit) are constructed
## inside the channel rather than handed in by the caller — they are
@ -221,8 +381,20 @@ proc new*(
## should be wiring up. Encryption is delegated to the `Encrypt`/
## `Decrypt` request brokers, so the channel keeps no per-instance
## encryption state either.
##
## `sendHandler` defaults to `waku.send`; tests pass a fake to drive
## the send state machine without touching the network.
let resolvedSendHandler =
if sendHandler.isNil():
proc(
envelope: MessageEnvelope
): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} =
return await waku.send(envelope)
else:
sendHandler
let chn = T(
deliveryService: deliveryService,
sendHandler: resolvedSendHandler,
channelId: channelId,
contentTopic: contentTopic,
senderId: senderId,
@ -231,20 +403,21 @@ proc new*(
sdsHandler: SdsHandler.new(sdsConfig, senderId),
rateLimit: RateLimitManager.new(rateConfig, channelId, brokerCtx),
requestIds: initTable[RequestId, seq[RequestId]](),
pendingRequests: @[],
pendingMessagingRequests: @[],
brokerCtx: brokerCtx,
)
## Each channel owns its own egress + ingress listeners on
## `chn.brokerCtx`, filtered to traffic addressed to this channel.
## Keeping the listeners (and the procs they call) inside the
## channel lets `onReadyToSend` and `onMessageReceived` stay private
## — the manager doesn't need to know about them.
## Each channel owns its own egress + ingress + send-completion
## listeners on `chn.brokerCtx`, filtered to traffic addressed to
## this channel. Keeping the listeners (and the handler procs they
## call) inside the channel lets `onReadyToSend` /
## `onMessageReceived` / `onMessageSent` / `onMessageError` stay
## private — the manager doesn't need to know about them.
discard ReadyToSendEvent.listen(
chn.brokerCtx,
proc(evt: ReadyToSendEvent): Future[void] {.async: (raises: []).} =
if evt.channelId == chn.channelId:
await chn.onReadyToSend(evt.msgs)
await chn.onReadyToSend(evt)
,
)
@ -261,4 +434,20 @@ proc new*(
,
)
## Send-completion events are tagged with the per-segment messaging
## `requestId` — globally unique, so we don't need any channel filter
## up front. The handler scans this channel's pending entries for a
## match and is a no-op when the id belongs to a different channel.
discard MessageSentEvent.listen(
chn.brokerCtx,
proc(evt: MessageSentEvent): Future[void] {.async: (raises: []).} =
chn.onMessageSent(evt.requestId),
)
discard MessageErrorEvent.listen(
chn.brokerCtx,
proc(evt: MessageErrorEvent): Future[void] {.async: (raises: []).} =
chn.onMessageError(evt.requestId),
)
return chn

View File

@ -14,6 +14,7 @@ import waku/api/api
import waku/api/api_conf
import waku/events/message_events as waku_message_events
import waku/factory/waku as waku_factory
import waku/node/delivery_service/delivery_service
import waku/waku_core/topics
import ./reliable_channel
@ -23,11 +24,10 @@ export reliable_channel
type ReliableChannelManager* = ref object
channels: Table[ChannelId, ReliableChannel]
deliveryService: DeliveryService
## Owned by the manager. The ownership chain is
## ReliableChannelManager -> DeliveryService -> Waku -> WakuNode.
## Hidden so callers can't substitute their own and bypass the
## manager's pipeline.
waku: Waku
## Owned by the manager. The channel layer reaches the messaging
## API through `waku.send(envelope)`; constructing DeliveryTasks
## directly would breach the layer boundary.
brokerCtx: BrokerContext
proc new*(
@ -38,15 +38,13 @@ proc new*(
## TODO !! The proper ownership chain is:
## ReliableChannelManager -> DeliveryService (MessagingClient) -> Waku (Kernel/Protocols) -> WakuNode,
## and this will be implemented in the future. For now, `createNode`
## is called here to get a DeliveryService instance, and the WakuNode is immediately discarded.
## is called here to get a Waku instance, and the WakuNode is immediately discarded.
## This is a temporary workaround to get the API
let waku = ?(await createNode(conf))
let manager = T(
channels: initTable[ChannelId, ReliableChannel](),
deliveryService: waku.deliveryService,
brokerCtx: brokerCtx,
channels: initTable[ChannelId, ReliableChannel](), waku: waku, brokerCtx: brokerCtx
)
return ok(manager)
@ -55,17 +53,18 @@ proc start*(self: ReliableChannelManager): Result[void, string] =
## Bring the owned DeliveryService up. Separated from `new` so callers
## can register encryption providers / create channels before traffic
## starts flowing.
self.deliveryService.startDeliveryService()
self.waku.deliveryService.startDeliveryService()
proc stop*(self: ReliableChannelManager) {.async.} =
if not self.deliveryService.isNil():
await self.deliveryService.stopDeliveryService()
if not self.waku.isNil():
await self.waku.deliveryService.stopDeliveryService()
proc createReliableChannel*(
self: ReliableChannelManager,
channelId: ChannelId,
contentTopic: ContentTopic,
senderId: SdsParticipantID,
sendHandler: SendHandler = nil,
): Result[ChannelId, string] =
## Spec entry point. The `DeliveryService` and `rng` the channel needs
## are sourced from the owning `ReliableChannelManager` rather than
@ -75,6 +74,9 @@ proc createReliableChannel*(
##
## Segmentation, SDS and rate-limit configs will eventually be read
## from the node's `NodeConfig`. Defaults for now.
##
## `sendHandler` is left `nil` in production so the channel uses the
## owned `waku.send`; tests pass a fake to bypass the network.
if self.channels.hasKey(channelId):
return err("channel already exists: " & channelId)
@ -94,7 +96,7 @@ proc createReliableChannel*(
)
let chn = ReliableChannel.new(
deliveryService = self.deliveryService,
waku = self.waku,
channelId = channelId,
contentTopic = contentTopic,
senderId = senderId,
@ -102,6 +104,7 @@ proc createReliableChannel*(
sdsConfig = sdsConfig,
rateConfig = rateConfig,
brokerCtx = self.brokerCtx,
sendHandler = sendHandler,
)
self.channels[channelId] = chn

44
flake.lock generated
View File

@ -19,8 +19,7 @@
"root": {
"inputs": {
"nixpkgs": "nixpkgs",
"rust-overlay": "rust-overlay",
"zerokit": "zerokit"
"rust-overlay": "rust-overlay"
}
},
"rust-overlay": {
@ -42,47 +41,6 @@
"repo": "rust-overlay",
"type": "github"
}
},
"rust-overlay_2": {
"inputs": {
"nixpkgs": [
"zerokit",
"nixpkgs"
]
},
"locked": {
"lastModified": 1771211437,
"narHash": "sha256-lcNK438i4DGtyA+bPXXyVLHVmJjYpVKmpux9WASa3ro=",
"owner": "oxalica",
"repo": "rust-overlay",
"rev": "c62195b3d6e1bb11e0c2fb2a494117d3b55d410f",
"type": "github"
},
"original": {
"owner": "oxalica",
"repo": "rust-overlay",
"type": "github"
}
},
"zerokit": {
"inputs": {
"nixpkgs": [
"nixpkgs"
],
"rust-overlay": "rust-overlay_2"
},
"locked": {
"owner": "vacp2p",
"repo": "zerokit",
"rev": "5e64cb8822bee65eed6cf459f95ae72b80c6ba63",
"type": "github"
},
"original": {
"owner": "vacp2p",
"repo": "zerokit",
"rev": "5e64cb8822bee65eed6cf459f95ae72b80c6ba63",
"type": "github"
}
}
},
"root": "root",

View File

@ -17,19 +17,9 @@
url = "github:oxalica/rust-overlay";
inputs.nixpkgs.follows = "nixpkgs";
};
# External flake input: Zerokit pinned to a specific commit.
# Update the rev here when a new zerokit version is needed.
zerokit = {
# Pinned to v2.0.2 (5e64cb8822bee65eed6cf459f95ae72b80c6ba63) to match
# the vendor/zerokit submodule. Keep these two in sync: the nix build
# links librln from this input, the Makefile build from the submodule.
url = "github:vacp2p/zerokit/5e64cb8822bee65eed6cf459f95ae72b80c6ba63";
inputs.nixpkgs.follows = "nixpkgs";
};
};
outputs = { self, nixpkgs, rust-overlay, zerokit }:
outputs = { self, nixpkgs, rust-overlay }:
let
systems = [
"x86_64-linux" "aarch64-linux"
@ -69,19 +59,78 @@
inherit system;
overlays = [ (import rust-overlay) nimbleOverlay ];
};
# Prebuilt zerokit librln, fetched from the upstream GitHub release
# rather than compiled from source. Compiling zerokit makes Nix download
# its many crate dependencies from crates.io in one parallel burst, which
# crates.io intermittently rejects with HTTP 403 (rate limiting from the
# self-hosted runners' shared IP), breaking the nix build. The release
# ships the exact `stateless` library this project links (see
# scripts/build_rln.sh), so we use it directly — no Rust toolchain and
# no crates.io access needed.
#
# Keep `rlnVersion` aligned with `LIBRLN_VERSION` in the Makefile and the
# vendor/zerokit submodule. Each hash is the sha256 of the release tarball
# for that platform; refresh all four when bumping the version.
rlnVersion = "v2.0.2";
rlnAssets = {
"x86_64-linux" = { triple = "x86_64-unknown-linux-gnu"; hash = "sha256-qbrUdaetYKFhjzxUP/QcwD3JHWJ8qk/tCMK3yXceIAk="; };
"aarch64-linux" = { triple = "aarch64-unknown-linux-gnu"; hash = "sha256-s4bWrmCcNTWHNyJwV73ilWNp58ZdAVG+TAgtWN1cTQs="; };
"x86_64-darwin" = { triple = "x86_64-apple-darwin"; hash = "sha256-ZaHP5CApN66FYY7jxwOmGcF9kJR78Fng3k1qE2W08Mk="; };
"aarch64-darwin" = { triple = "aarch64-apple-darwin"; hash = "sha256-f2YppkPsKFdN00j+IY8fpvsebWTIb9lW/V1/vOTiVKU="; };
};
mkZerokitRln = system: pkgs:
let
asset = rlnAssets.${system} or
(throw "zerokit ${rlnVersion} has no prebuilt rln asset for system '${system}'");
in pkgs.stdenv.mkDerivation {
pname = "librln";
version = lib.removePrefix "v" rlnVersion;
src = pkgs.fetchurl {
url = "https://github.com/vacp2p/zerokit/releases/download/"
+ "${rlnVersion}/${asset.triple}-stateless-rln.tar.gz";
hash = asset.hash;
};
# The tarball lays its files out under release/.
sourceRoot = "release";
dontConfigure = true;
dontBuild = true;
# The release .so was linked outside Nix, so it references system
# libraries (libgcc_s, libstdc++, glibc) by bare name. autoPatchelfHook
# points those at the Nix versions so the library loads correctly when
# used by the Nix build. It does nothing for the static .a, and the
# step is skipped on macOS (dylib paths are fixed in nix/default.nix).
nativeBuildInputs =
pkgs.lib.optionals pkgs.stdenv.isLinux [ pkgs.autoPatchelfHook ];
buildInputs =
pkgs.lib.optionals pkgs.stdenv.isLinux [ pkgs.stdenv.cc.cc.lib ];
installPhase = ''
runHook preInstall
mkdir -p $out/lib
cp librln.a $out/lib/ 2>/dev/null || true
cp librln.so $out/lib/ 2>/dev/null || true
cp librln.dylib $out/lib/ 2>/dev/null || true
runHook postInstall
'';
meta = with pkgs.lib; {
description = "Prebuilt zerokit RLN library (stateless flavor)";
homepage = "https://github.com/vacp2p/zerokit";
license = with licenses; [ mit asl20 ];
platforms = builtins.attrNames rlnAssets;
};
};
in {
packages = forAllSystems (system:
let
pkgs = pkgsFor system;
# HACK: Fix for stale cargoHash in 2.0.2 release.
zerokitRln = zerokit.packages.${system}.rln.overrideAttrs (old: {
cargoDeps = old.cargoDeps.overrideAttrs (oldCargoDeps: {
vendorStaging = oldCargoDeps.vendorStaging.overrideAttrs (_: {
outputHash = "sha256-PNwEdZLgGQPqQDrEK2hsQtSybVfBbD6xn4K47fPFJUU=";
});
});
});
zerokitRln = mkZerokitRln system pkgs;
liblogosdelivery = pkgs.callPackage ./nix/default.nix {
inherit pkgs;
@ -94,14 +143,13 @@
inherit pkgs;
src = ./.;
targets = ["wakucanary"];
zerokitRln = zerokit.packages.${system}.rln;
inherit zerokitRln;
};
in {
inherit liblogosdelivery wakucanary;
# Expose the cargoHash-corrected librln so downstream consumers
# Expose the prebuilt librln so downstream consumers
# (e.g. logos-delivery-module) bundle the exact same librln this
# build links, instead of pulling zerokit's rln directly — whose
# committed cargoHash is stale for v2.0.2 (see zerokitRln above).
# build links against.
rln = zerokitRln;
default = liblogosdelivery;
}

View File

@ -1,8 +1,15 @@
#!/usr/bin/env bash
# This script is used to build the rln library for the current platform.
# Previously downloaded prebuilt binaries, but due to compatibility issues
# we now always build from source.
# Provides the rln static library for the current platform.
#
# If zerokit publishes a prebuilt `stateless` release asset for this platform,
# download and use it: that is faster than compiling and avoids fetching
# zerokit's many crate dependencies from crates.io. The asset is selected by
# the Rust host target triple (the platform identifier reported by rustc,
# e.g. x86_64-unknown-linux-gnu or aarch64-apple-darwin).
#
# When no matching asset exists (e.g. Windows), build from the vendored
# zerokit submodule instead.
set -e
@ -15,8 +22,26 @@ output_filename=$3
[[ -z "${rln_version}" ]] && { echo "No rln version specified"; exit 1; }
[[ -z "${output_filename}" ]] && { echo "No output filename specified"; exit 1; }
echo "Building RLN library from source (version ${rln_version})..."
# --- Prefer the prebuilt release asset --------------------------------------
# Host target triple, e.g. x86_64-unknown-linux-gnu / aarch64-apple-darwin.
host_triplet=$(rustc --version --verbose | awk '/host:/{print $2}')
tarball="${host_triplet}-stateless-rln.tar.gz"
url="https://github.com/vacp2p/zerokit/releases/download/${rln_version}/${tarball}"
echo "Looking for prebuilt RLN: ${url}"
if curl --silent --fail-with-body -L "${url}" -o "${tarball}"; then
echo "Downloaded prebuilt ${tarball}"
tar -xzf "${tarball}"
mv "release/librln.a" "${output_filename}"
rm -rf "${tarball}" release
echo "Using prebuilt ${output_filename}"
exit 0
fi
# curl --fail-with-body writes the error body to the file on HTTP failure.
rm -f "${tarball}"
echo "No prebuilt asset for ${host_triplet} at ${rln_version}; building from source."
# --- Fall back to building from the vendored submodule ----------------------
# Check if submodule version = version in Makefile
cargo metadata --format-version=1 --no-deps --manifest-path "${build_dir}/rln/Cargo.toml"
@ -33,7 +58,6 @@ if [[ "v${submodule_version}" != "${rln_version}" ]]; then
exit 1
fi
# Build rln from source.
# `stateless` feature: logos-delivery does not maintain a local Merkle tree
# (post-PR #3312); the contract is the source of truth and the path is fetched
# via getMerkleProof(index). The stateless build compiles out tree code.

View File

@ -147,3 +147,171 @@ suite "Reliable Channel - ingress":
check not fired
await manager.stop()
suite "Reliable Channel - send state machine":
asyncTest "MessageSentEvent finalises the channelReqId as Sent":
## Drives the real send pipeline (`send` -> segmentation -> SDS ->
## rate_limit -> encrypt -> dispatch) via a fake `SendHandler` that
## returns a canned `RequestId` instead of hitting the network.
## Emitting the delivery-layer `MessageSentEvent` must drive the
## channel-level state machine through `Confirmed` and produce a
## `ChannelMessageSentEvent` (channel-level terminal event) for the
## `channelReqId` returned by `send()`.
const
channelId = ChannelId("sm-success-channel")
contentTopic = ContentTopic("/reliable-channel/test/sm-success")
fakeMsgReqId = RequestId("fake-msg-req-1")
var manager: ReliableChannelManager
var brokerCtx: BrokerContext
lockNewGlobalBrokerContext:
brokerCtx = globalBrokerContext()
manager = (await ReliableChannelManager.new(createApiNodeConf())).expect(
"Failed to create manager"
)
setNoopEncryption()
var sendCalls = 0
let fakeSend: SendHandler = proc(
env: MessageEnvelope
): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} =
sendCalls.inc
return ok(fakeMsgReqId)
discard manager
.createReliableChannel(
channelId, contentTopic, SdsParticipantID("local"), sendHandler = fakeSend
)
.expect("createReliableChannel")
let sentFut = newFuture[RequestId]("channel-sent")
discard ChannelMessageSentEvent
.listen(
brokerCtx,
proc(evt: ChannelMessageSentEvent) {.async: (raises: []).} =
if not sentFut.finished() and evt.channelId == channelId:
sentFut.complete(evt.requestId)
,
)
.expect("listen ChannelMessageSentEvent")
let channelReqId = manager.send(channelId, "hello".toBytes()).expect("send")
let dispatchDeadline = Moment.now() + 1.seconds
while Moment.now() < dispatchDeadline and sendCalls == 0:
await sleepAsync(5.milliseconds)
check sendCalls == 1
waku_message_events.MessageSentEvent.emit(
brokerCtx,
waku_message_events.MessageSentEvent(requestId: fakeMsgReqId, messageHash: ""),
)
let finalised = await sentFut.withTimeout(1.seconds)
check finalised
if finalised:
check sentFut.read() == channelReqId
await manager.stop()
asyncTest "two independent channelReqIds are finalised independently":
## Two `send()` calls -> two independent `channelReqId`s, each with
## one segment under the current segmentation skeleton
## (`performSegmentation` always emits exactly one segment). The
## fake `SendHandler` returns distinct `messagingReqId`s; finalising
## the first emits `ChannelMessageSentEvent` for its `channelReqId`,
## finalising the second as a failure emits `ChannelMessageErrorEvent`
## for the other.
const
channelId = ChannelId("sm-multi-channel")
contentTopic = ContentTopic("/reliable-channel/test/sm-multi")
var manager: ReliableChannelManager
var brokerCtx: BrokerContext
lockNewGlobalBrokerContext:
brokerCtx = globalBrokerContext()
manager = (await ReliableChannelManager.new(createApiNodeConf())).expect(
"Failed to create manager"
)
setNoopEncryption()
var msgReqIds: seq[RequestId]
let fakeSend: SendHandler = proc(
env: MessageEnvelope
): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} =
let id = RequestId("fake-msg-req-" & $(msgReqIds.len + 1))
msgReqIds.add(id)
return ok(id)
discard manager
.createReliableChannel(
channelId, contentTopic, SdsParticipantID("local"), sendHandler = fakeSend
)
.expect("createReliableChannel")
let sentFut = newFuture[RequestId]("channel-sent")
let erroredFut = newFuture[RequestId]("channel-errored")
discard ChannelMessageSentEvent
.listen(
brokerCtx,
proc(evt: ChannelMessageSentEvent) {.async: (raises: []).} =
if not sentFut.finished() and evt.channelId == channelId:
sentFut.complete(evt.requestId)
,
)
.expect("listen ChannelMessageSentEvent")
discard ChannelMessageErrorEvent
.listen(
brokerCtx,
proc(evt: ChannelMessageErrorEvent) {.async: (raises: []).} =
if not erroredFut.finished() and evt.channelId == channelId:
erroredFut.complete(evt.requestId)
,
)
.expect("listen ChannelMessageErrorEvent")
let channelReqId1 = manager.send(channelId, "first".toBytes()).expect("send 1")
let channelReqId2 = manager.send(channelId, "second".toBytes()).expect("send 2")
let dispatchDeadline = Moment.now() + 1.seconds
while Moment.now() < dispatchDeadline and msgReqIds.len < 2:
await sleepAsync(5.milliseconds)
check msgReqIds.len == 2
waku_message_events.MessageSentEvent.emit(
brokerCtx,
waku_message_events.MessageSentEvent(requestId: msgReqIds[0], messageHash: ""),
)
let sentArrived = await sentFut.withTimeout(1.seconds)
check sentArrived
if sentArrived:
check sentFut.read() == channelReqId1
## The second `channelReqId` must NOT have finalised yet — its
## segment is still `InFlight`.
check not erroredFut.finished()
waku_message_events.MessageErrorEvent.emit(
brokerCtx,
waku_message_events.MessageErrorEvent(
requestId: msgReqIds[1], messageHash: "", error: "synthetic"
),
)
let erroredArrived = await erroredFut.withTimeout(1.seconds)
check erroredArrived
if erroredArrived:
check erroredFut.read() == channelReqId2
await manager.stop()
asyncTest "TODO: channelReqId not pruned until ALL its segments are final":
## Placeholder for the multi-sibling prune rule. Today's
## `performSegmentation` (segmentation skeleton) always emits
## exactly one segment per `send()`, so multiple siblings under one
## `channelReqId` cannot be produced through the real pipeline.
## Implement once segmentation does real chunking: send a payload
## larger than `DefaultSegmentSizeBytes`, capture the N
## `messagingReqId`s from a fake `SendHandler`, finalise some, and
## assert prune only fires once every sibling is final.
skip()

View File

@ -11,6 +11,10 @@ type
contentTopic*: ContentTopic
payload*: seq[byte]
ephemeral*: bool
meta*: seq[byte]
## Opaque wire-format marker carried on the underlying WakuMessage.
## Higher layers (e.g. Reliable Channel) stamp this so peers can route
## ingress traffic to their corresponding layer. Empty by default.
RequestId* = distinct string
@ -34,12 +38,18 @@ proc init*(
contentTopic: ContentTopic,
payload: seq[byte] | string,
ephemeral: bool = false,
meta: seq[byte] = @[],
): MessageEnvelope =
when payload is seq[byte]:
MessageEnvelope(contentTopic: contentTopic, payload: payload, ephemeral: ephemeral)
MessageEnvelope(
contentTopic: contentTopic, payload: payload, ephemeral: ephemeral, meta: meta
)
else:
MessageEnvelope(
contentTopic: contentTopic, payload: payload.toBytes(), ephemeral: ephemeral
contentTopic: contentTopic,
payload: payload.toBytes(),
ephemeral: ephemeral,
meta: meta,
)
proc toWakuMessage*(envelope: MessageEnvelope): WakuMessage =
@ -48,6 +58,7 @@ proc toWakuMessage*(envelope: MessageEnvelope): WakuMessage =
contentTopic: envelope.contentTopic,
payload: envelope.payload,
ephemeral: envelope.ephemeral,
meta: envelope.meta,
timestamp: getNowInNanosecondTime(),
)

View File

@ -26,7 +26,7 @@ logScope:
# This useful util is missing from sequtils, this extends applyIt with predicate...
template applyItIf*(varSeq, pred, op: untyped) =
for i in low(varSeq) .. high(varSeq):
let it {.inject.} = varSeq[i]
var it {.inject.} = varSeq[i]
if pred:
op
varSeq[i] = it