mirror of
https://github.com/logos-messaging/logos-delivery.git
synced 2026-05-29 18:19:25 +00:00
start basic reliable channel folder (#3886)
This commit is contained in:
parent
5e262badf7
commit
74057c6622
1
.github/workflows/ci.yml
vendored
1
.github/workflows/ci.yml
vendored
@ -43,6 +43,7 @@ jobs:
|
||||
- 'tools/**'
|
||||
- 'tests/all_tests_v2.nim'
|
||||
- 'tests/**'
|
||||
- 'channels/**'
|
||||
docker:
|
||||
- 'docker/**'
|
||||
|
||||
|
||||
25
channels/encryption/encryption.nim
Normal file
25
channels/encryption/encryption.nim
Normal file
@ -0,0 +1,25 @@
|
||||
## Optional encryption hooks for the Reliable Channel API.
|
||||
##
|
||||
## Modelled as `RequestBroker`s: the broker pattern lets the channel
|
||||
## delegate work to a provider that may live in any module without
|
||||
## introducing a direct dependency. If no provider is registered the
|
||||
## broker returns an error, so installing the noop providers from
|
||||
## `noop_encryption` is required when the application does not want
|
||||
## actual encryption.
|
||||
##
|
||||
## Applied per-segment after SDS processing on outgoing, and before
|
||||
## SDS processing on incoming. No specific scheme is mandated.
|
||||
##
|
||||
## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html
|
||||
|
||||
import brokers/request_broker
|
||||
|
||||
export request_broker
|
||||
|
||||
RequestBroker:
|
||||
type Encrypt* = seq[byte]
|
||||
proc signature*(payload: seq[byte]): Future[Result[Encrypt, string]] {.async.}
|
||||
|
||||
RequestBroker:
|
||||
type Decrypt* = seq[byte]
|
||||
proc signature*(payload: seq[byte]): Future[Result[Decrypt, string]] {.async.}
|
||||
18
channels/encryption/noop_encryption.nim
Normal file
18
channels/encryption/noop_encryption.nim
Normal file
@ -0,0 +1,18 @@
|
||||
## No-op encryption providers. Install these when the application does
|
||||
## not want actual encryption so the `Encrypt` / `Decrypt` brokers have
|
||||
## something to dispatch to.
|
||||
|
||||
import results
|
||||
import chronos
|
||||
import ./encryption
|
||||
|
||||
proc setNoopEncryption*() =
|
||||
discard Encrypt.setProvider(
|
||||
proc(payload: seq[byte]): Future[Result[Encrypt, string]] {.async.} =
|
||||
return ok(Encrypt(payload))
|
||||
)
|
||||
|
||||
discard Decrypt.setProvider(
|
||||
proc(payload: seq[byte]): Future[Result[Decrypt, string]] {.async.} =
|
||||
return ok(Decrypt(payload))
|
||||
)
|
||||
23
channels/events.nim
Normal file
23
channels/events.nim
Normal file
@ -0,0 +1,23 @@
|
||||
## Reliable Channel event types emitted to API consumers.
|
||||
##
|
||||
## Lifecycle events for individual segments (sent / propagated / errored)
|
||||
## are the same as the network-level ones the DeliveryService already
|
||||
## emits — `requestId` is shared across layers — so we just re-export
|
||||
## `waku/events/message_events` and avoid declaring duplicates.
|
||||
##
|
||||
## Only the channel-level `MessageReceivedEvent` carries data that has
|
||||
## no analogue in the lower layer (reassembled application payload,
|
||||
## senderId, channelId), so it lives here.
|
||||
|
||||
import waku/events/message_events as waku_message_events
|
||||
import brokers/event_broker
|
||||
|
||||
import ./types as channel_types
|
||||
|
||||
export waku_message_events, channel_types, event_broker
|
||||
|
||||
EventBroker:
|
||||
type ChannelMessageReceivedEvent* = object
|
||||
channelId*: ChannelId
|
||||
senderId*: SdsParticipantID
|
||||
payload*: seq[byte]
|
||||
80
channels/rate_limit_manager/rate_limit_manager.nim
Normal file
80
channels/rate_limit_manager/rate_limit_manager.nim
Normal file
@ -0,0 +1,80 @@
|
||||
## Rate Limit Manager for the Reliable Channel API.
|
||||
##
|
||||
## Tracks messages sent per RLN epoch and delays dispatch when the
|
||||
## limit is approached, ensuring RLN compliance on enforcing relays.
|
||||
##
|
||||
## For the skeleton this is a pass-through: messages are immediately
|
||||
## released as ready-to-send. Real epoch budgeting will be added later.
|
||||
##
|
||||
## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html
|
||||
|
||||
import std/times
|
||||
import message
|
||||
import brokers/event_broker
|
||||
import brokers/broker_context
|
||||
|
||||
export event_broker, broker_context
|
||||
export message.SdsChannelID
|
||||
|
||||
const
|
||||
DefaultEpochPeriodSec* = 600
|
||||
DefaultMessagesPerEpoch* = 1
|
||||
|
||||
EventBroker:
|
||||
## Emitted by `enqueueToSend` carrying the batch of opaque message
|
||||
## blobs that may now leave the rate limiter and continue down the
|
||||
## outgoing pipeline (encryption -> dispatch). Bytes only: the rate
|
||||
## limiter is intentionally agnostic of SDS, so anything serialisable
|
||||
## can flow through it.
|
||||
##
|
||||
## `channelId` lets listeners filter to their own channel, since all
|
||||
## reliable channels share the underlying Waku node's broker context.
|
||||
type ReadyToSendEvent* = object
|
||||
channelId*: SdsChannelID
|
||||
msgs*: seq[seq[byte]]
|
||||
|
||||
type
|
||||
RateLimitConfig* = object
|
||||
enabled*: bool ## spec: rate limiting opt-in; SHOULD be true when RLN active
|
||||
epochPeriodSec*: int
|
||||
messagesPerEpoch*: int
|
||||
|
||||
RateLimitManager* = ref object
|
||||
config*: RateLimitConfig
|
||||
queue*: seq[seq[byte]]
|
||||
currentEpochStart*: Time
|
||||
sentInCurrentEpoch*: int
|
||||
channelId*: SdsChannelID ## tag for the emitted `ReadyToSendEvent`
|
||||
brokerCtx: BrokerContext
|
||||
|
||||
proc new*(
|
||||
T: type RateLimitManager,
|
||||
config: RateLimitConfig,
|
||||
channelId: SdsChannelID,
|
||||
brokerCtx: BrokerContext = globalBrokerContext(),
|
||||
): T =
|
||||
return T(
|
||||
config: config,
|
||||
queue: @[],
|
||||
currentEpochStart: getTime(),
|
||||
sentInCurrentEpoch: 0,
|
||||
channelId: channelId,
|
||||
brokerCtx: brokerCtx,
|
||||
)
|
||||
|
||||
proc enqueueToSend*(self: RateLimitManager, msg: seq[byte]) =
|
||||
## Skeleton behaviour: enqueue and immediately release as a single
|
||||
## ready batch. Real per-epoch budgeting will park messages on
|
||||
## `self.queue` and emit only when the budget allows.
|
||||
ReadyToSendEvent.emit(
|
||||
self.brokerCtx, ReadyToSendEvent(channelId: self.channelId, msgs: @[msg])
|
||||
)
|
||||
|
||||
proc dequeueReady*(self: RateLimitManager): seq[seq[byte]] =
|
||||
## Returns the set of queued messages that may be dispatched now
|
||||
## without exceeding the configured rate limit.
|
||||
discard
|
||||
|
||||
proc resetEpoch*(self: RateLimitManager) =
|
||||
self.currentEpochStart = getTime()
|
||||
self.sentInCurrentEpoch = 0
|
||||
264
channels/reliable_channel.nim
Normal file
264
channels/reliable_channel.nim
Normal file
@ -0,0 +1,264 @@
|
||||
## Reliable Channel type.
|
||||
##
|
||||
## A `ReliableChannel` orchestrates segmentation, SDS (end-to-end
|
||||
## reliability), optional encryption, and rate-limited dispatch on top
|
||||
## of the Messaging API for a single channel.
|
||||
##
|
||||
## Outgoing pipeline: Segment -> SDS -> Rate Limit -> Encrypt -> Dispatch
|
||||
## Incoming pipeline: Decrypt -> SDS -> Reassemble -> Emit event
|
||||
##
|
||||
## Channels are owned by a `ReliableChannelManager`. Lifecycle and send
|
||||
## operations are addressed by `ChannelId`, so callers only need to keep
|
||||
## an opaque handle around.
|
||||
##
|
||||
## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html
|
||||
|
||||
import std/[options, tables]
|
||||
import results
|
||||
import chronos
|
||||
import bearssl/rand
|
||||
import stew/byteutils
|
||||
import libp2p/crypto/crypto as libp2p_crypto
|
||||
|
||||
import waku/node/delivery_service/delivery_service
|
||||
import waku/node/delivery_service/send_service
|
||||
import waku/waku_core/topics
|
||||
|
||||
import ./events
|
||||
import ./segmentation/segmentation
|
||||
import ./scalable_data_sync/scalable_data_sync
|
||||
import ./rate_limit_manager/rate_limit_manager
|
||||
import ./encryption/encryption
|
||||
|
||||
export
|
||||
delivery_service, send_service, events, segmentation, scalable_data_sync,
|
||||
rate_limit_manager, encryption
|
||||
|
||||
const LipWireReliableChannelVersion* = "RELIABLE-CHANNEL-API/1"
|
||||
## Wire-format spec marker for the Reliable Channel layer, as defined
|
||||
## in the reliable-channel-api LIP (`Wire Format / Spec Marker`).
|
||||
## A `WakuMessage` whose `meta` field does not equal these bytes is
|
||||
## not addressed to this layer and is silently dropped on ingress.
|
||||
## The trailing `/N` is the wire-format version and is bumped only
|
||||
## on breaking on-the-wire changes; implementations pin one version.
|
||||
|
||||
type ReliableChannel* = ref object
|
||||
## Spec-defined public type. Fields are private so callers cannot
|
||||
## mutate internals and break invariants. Getters are added below
|
||||
## for the few values consumers may need.
|
||||
deliveryService: DeliveryService
|
||||
channelId: ChannelId
|
||||
contentTopic: ContentTopic
|
||||
senderId: SdsParticipantID
|
||||
rng: ref HmacDrbgContext
|
||||
segmentation: SegmentationHandler
|
||||
sdsHandler: SdsHandler
|
||||
rateLimit: RateLimitManager
|
||||
|
||||
requestIds: Table[RequestId, seq[RequestId]]
|
||||
pendingRequests: seq[tuple[parent: RequestId, ephemeral: bool]]
|
||||
brokerCtx: BrokerContext
|
||||
## Captured here so the channel emits `ChannelMessageReceivedEvent`
|
||||
## on the same broker context the owning manager registered its
|
||||
## listeners on. Without this, an emit via `globalBrokerContext()`
|
||||
## would land on whatever context happens to be thread-local at
|
||||
## emit time, which is not necessarily the manager's.
|
||||
|
||||
func getChannelId*(self: ReliableChannel): ChannelId {.inline.} =
|
||||
self.channelId
|
||||
|
||||
func getContentTopic*(self: ReliableChannel): ContentTopic {.inline.} =
|
||||
self.contentTopic
|
||||
|
||||
func getSenderId*(self: ReliableChannel): SdsParticipantID {.inline.} =
|
||||
self.senderId
|
||||
|
||||
proc onReadyToSend(
|
||||
self: ReliableChannel, msgs: seq[seq[byte]]
|
||||
) {.async: (raises: []).} =
|
||||
## Tail of the outgoing pipeline. Invoked from the `ReadyToSendEvent`
|
||||
## listener once `rate_limit_manager` releases a batch of opaque
|
||||
## blobs (already-encoded SDS messages):
|
||||
##
|
||||
## ... -> rate_limit_manager -> [encryption] -> dispatch
|
||||
for m in msgs:
|
||||
## Each `m` was preceded by exactly one push onto `pendingRequests`
|
||||
## in `send`, so this pop is always safe in the current skeleton.
|
||||
let pending = self.pendingRequests[0]
|
||||
self.pendingRequests.delete(0)
|
||||
|
||||
## TODO: revisit which fields of the SDS message must be encrypted.
|
||||
## Encrypting the whole encoded blob forces every receiver to attempt
|
||||
## decryption before it can route, which breaks selective dispatch.
|
||||
## Leave routing metadata (channelId, causal-history references) in
|
||||
## clear and encrypt only the application payload.
|
||||
let encRes = await Encrypt.request(m)
|
||||
let encrypted = encRes.valueOr:
|
||||
MessageErrorEvent.emit(
|
||||
self.brokerCtx,
|
||||
MessageErrorEvent(
|
||||
requestId: pending.parent,
|
||||
messageHash: "",
|
||||
error: "encryption failed: " & error,
|
||||
),
|
||||
)
|
||||
continue
|
||||
let wireBytes = seq[byte](encrypted)
|
||||
|
||||
let envelope = MessageEnvelope(
|
||||
contentTopic: self.contentTopic, payload: wireBytes, ephemeral: pending.ephemeral
|
||||
)
|
||||
|
||||
let deliveryReqId = RequestId.new(self.rng)
|
||||
let deliveryTask = DeliveryTask.new(deliveryReqId, envelope, globalBrokerContext()).valueOr:
|
||||
## TODO: emit waku `MessageErrorEvent` for the parent request id.
|
||||
continue
|
||||
|
||||
## Stamp the Reliable Channel wire-format spec marker so the ingress
|
||||
## side of any peer can route this WakuMessage to its Reliable
|
||||
## Channel layer. Done on the constructed WakuMessage rather than
|
||||
## via the envelope because `MessageEnvelope` does not expose a
|
||||
## `meta` field.
|
||||
deliveryTask.msg.meta = LipWireReliableChannelVersion.toBytes()
|
||||
|
||||
asyncSpawn self.deliveryService.sendService.send(deliveryTask)
|
||||
self.requestIds.mgetOrPut(pending.parent, @[]).add(deliveryReqId)
|
||||
|
||||
proc send*(
|
||||
self: ReliableChannel, payload: seq[byte], ephemeral: bool = false
|
||||
): Result[RequestId, string] =
|
||||
## Single application-level send. The first three stages of the
|
||||
## outgoing pipeline are chained explicitly so the flow is visible
|
||||
## at a glance:
|
||||
##
|
||||
## segmentation -> sds -> rate_limit_manager
|
||||
##
|
||||
## `rate_limit_manager.enqueueToSend` emits a `ReadyToSendEvent` with
|
||||
## the SDS messages cleared for transmission; the channel's listener
|
||||
## then runs the final stage (encryption -> dispatch). The `ephemeral`
|
||||
## flag is carried alongside each segment in `pendingRequests` and
|
||||
## stamped onto the eventual `MessageEnvelope`.
|
||||
##
|
||||
## The returned `RequestId` is the parent of one-or-more
|
||||
## delivery-service `RequestId`s; the mapping is recorded in
|
||||
## `self.requestIds`.
|
||||
if payload.len == 0:
|
||||
return err("empty payload")
|
||||
|
||||
let parentReqId = RequestId.new(self.rng)
|
||||
self.requestIds[parentReqId] = @[]
|
||||
|
||||
for segmentBytes in self.segmentation.performSegmentation(payload):
|
||||
## Segments arrive already encoded; the segmentation module owns
|
||||
## the wire format so SDS only ever sees opaque bytes.
|
||||
let sdsBytes = self.sdsHandler.wrapOutgoing(
|
||||
self.channelId, self.senderId, segmentBytes
|
||||
).valueOr:
|
||||
return err("SDS wrap failed: " & error)
|
||||
self.pendingRequests.add((parent: parentReqId, ephemeral: ephemeral))
|
||||
self.rateLimit.enqueueToSend(sdsBytes)
|
||||
|
||||
return ok(parentReqId)
|
||||
|
||||
proc onMessageReceived(
|
||||
self: ReliableChannel, messageHash: string, payload: seq[byte]
|
||||
) {.async: (raises: []).} =
|
||||
## Ingress pipeline made visible:
|
||||
##
|
||||
## payload -> decrypt -> sds -> reassemble -> emit
|
||||
##
|
||||
## Invoked from this channel's `MessageReceivedEvent` listener, which
|
||||
## already filtered on the spec marker and on `contentTopic`. The
|
||||
## channel only sees the raw payload bytes for itself.
|
||||
|
||||
## Notice that the following "request" is implemented implicitly as a broker call to
|
||||
## the `Decrypt` request broker.
|
||||
let decRes = await Decrypt.request(payload)
|
||||
let plaintext = decRes.valueOr:
|
||||
MessageErrorEvent.emit(
|
||||
self.brokerCtx,
|
||||
MessageErrorEvent(
|
||||
requestId: RequestId(""),
|
||||
messageHash: messageHash,
|
||||
error: "decryption failed: " & error,
|
||||
),
|
||||
)
|
||||
return
|
||||
let plaintextBytes = seq[byte](plaintext)
|
||||
|
||||
let unwrapped = self.sdsHandler.handleIncoming(plaintextBytes)
|
||||
if unwrapped.isErr():
|
||||
return
|
||||
|
||||
let reassembled = self.segmentation.handleIncomingSegment(unwrapped.get().content)
|
||||
if reassembled.isSome():
|
||||
## Emit on the captured `brokerCtx` (the manager's), so the
|
||||
## application listener that the manager has set up on that same
|
||||
## context picks the event up.
|
||||
ChannelMessageReceivedEvent.emit(
|
||||
self.brokerCtx,
|
||||
ChannelMessageReceivedEvent(
|
||||
channelId: self.channelId,
|
||||
senderId: self.senderId,
|
||||
payload: reassembled.get().payload,
|
||||
),
|
||||
)
|
||||
|
||||
proc new*(
|
||||
T: type ReliableChannel,
|
||||
deliveryService: DeliveryService,
|
||||
channelId: ChannelId,
|
||||
contentTopic: ContentTopic,
|
||||
senderId: SdsParticipantID,
|
||||
segConfig: SegmentationConfig,
|
||||
sdsConfig: SdsConfig,
|
||||
rateConfig: RateLimitConfig,
|
||||
brokerCtx: BrokerContext = globalBrokerContext(),
|
||||
): T =
|
||||
## Pipeline handlers (segmentation/SDS/rate-limit) are constructed
|
||||
## inside the channel rather than handed in by the caller — they are
|
||||
## implementation details of the channel, not knobs the API consumer
|
||||
## should be wiring up. Encryption is delegated to the `Encrypt`/
|
||||
## `Decrypt` request brokers, so the channel keeps no per-instance
|
||||
## encryption state either.
|
||||
let chn = T(
|
||||
deliveryService: deliveryService,
|
||||
channelId: channelId,
|
||||
contentTopic: contentTopic,
|
||||
senderId: senderId,
|
||||
rng: libp2p_crypto.newRng(),
|
||||
segmentation: SegmentationHandler.new(segConfig),
|
||||
sdsHandler: SdsHandler.new(sdsConfig, senderId),
|
||||
rateLimit: RateLimitManager.new(rateConfig, channelId, brokerCtx),
|
||||
requestIds: initTable[RequestId, seq[RequestId]](),
|
||||
pendingRequests: @[],
|
||||
brokerCtx: brokerCtx,
|
||||
)
|
||||
|
||||
## Each channel owns its own egress + ingress listeners on
|
||||
## `chn.brokerCtx`, filtered to traffic addressed to this channel.
|
||||
## Keeping the listeners (and the procs they call) inside the
|
||||
## channel lets `onReadyToSend` and `onMessageReceived` stay private
|
||||
## — the manager doesn't need to know about them.
|
||||
discard ReadyToSendEvent.listen(
|
||||
chn.brokerCtx,
|
||||
proc(evt: ReadyToSendEvent): Future[void] {.async: (raises: []).} =
|
||||
if evt.channelId == chn.channelId:
|
||||
await chn.onReadyToSend(evt.msgs)
|
||||
,
|
||||
)
|
||||
|
||||
discard MessageReceivedEvent.listen(
|
||||
chn.brokerCtx,
|
||||
proc(evt: MessageReceivedEvent): Future[void] {.async: (raises: []).} =
|
||||
## Drop foreign traffic (non-Reliable-Channel `meta`) and traffic
|
||||
## for other channels before doing any decode work.
|
||||
if string.fromBytes(evt.message.meta) != LipWireReliableChannelVersion:
|
||||
return
|
||||
if evt.message.contentTopic != chn.contentTopic:
|
||||
return
|
||||
await chn.onMessageReceived(evt.messageHash, evt.message.payload)
|
||||
,
|
||||
)
|
||||
|
||||
return chn
|
||||
138
channels/reliable_channel_manager.nim
Normal file
138
channels/reliable_channel_manager.nim
Normal file
@ -0,0 +1,138 @@
|
||||
## Reliable Channel API entry point.
|
||||
##
|
||||
## Owns the set of `ReliableChannel` instances and exposes lifecycle and
|
||||
## send/receive operations addressed by `ChannelId`.
|
||||
##
|
||||
## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html
|
||||
|
||||
import std/tables
|
||||
import results
|
||||
import chronos
|
||||
import stew/byteutils
|
||||
|
||||
import waku/api/api
|
||||
import waku/api/api_conf
|
||||
import waku/events/message_events as waku_message_events
|
||||
import waku/factory/waku as waku_factory
|
||||
import waku/waku_core/topics
|
||||
|
||||
import ./reliable_channel
|
||||
import ./encryption/noop_encryption
|
||||
|
||||
export reliable_channel
|
||||
|
||||
type ReliableChannelManager* = ref object
|
||||
channels: Table[ChannelId, ReliableChannel]
|
||||
deliveryService: DeliveryService
|
||||
## Owned by the manager. The ownership chain is
|
||||
## ReliableChannelManager -> DeliveryService -> Waku -> WakuNode.
|
||||
## Hidden so callers can't substitute their own and bypass the
|
||||
## manager's pipeline.
|
||||
brokerCtx: BrokerContext
|
||||
|
||||
proc new*(
|
||||
T: type ReliableChannelManager,
|
||||
conf: WakuNodeConf,
|
||||
brokerCtx: BrokerContext = globalBrokerContext(),
|
||||
): Future[Result[T, string]] {.async.} =
|
||||
## TODO !! The proper ownership chain is:
|
||||
## ReliableChannelManager -> DeliveryService (MessagingClient) -> Waku (Kernel/Protocols) -> WakuNode,
|
||||
## and this will be implemented in the future. For now, `createNode`
|
||||
## is called here to get a DeliveryService instance, and the WakuNode is immediately discarded.
|
||||
## This is a temporary workaround to get the API
|
||||
|
||||
let waku = ?(await createNode(conf))
|
||||
|
||||
let manager = T(
|
||||
channels: initTable[ChannelId, ReliableChannel](),
|
||||
deliveryService: waku.deliveryService,
|
||||
brokerCtx: brokerCtx,
|
||||
)
|
||||
|
||||
return ok(manager)
|
||||
|
||||
proc start*(self: ReliableChannelManager): Result[void, string] =
|
||||
## Bring the owned DeliveryService up. Separated from `new` so callers
|
||||
## can register encryption providers / create channels before traffic
|
||||
## starts flowing.
|
||||
self.deliveryService.startDeliveryService()
|
||||
|
||||
proc stop*(self: ReliableChannelManager) {.async.} =
|
||||
if not self.deliveryService.isNil():
|
||||
await self.deliveryService.stopDeliveryService()
|
||||
|
||||
proc createReliableChannel*(
|
||||
self: ReliableChannelManager,
|
||||
channelId: ChannelId,
|
||||
contentTopic: ContentTopic,
|
||||
senderId: SdsParticipantID,
|
||||
): Result[ChannelId, string] =
|
||||
## Spec entry point. The `DeliveryService` and `rng` the channel needs
|
||||
## are sourced from the owning `ReliableChannelManager` rather than
|
||||
## passed per call. Encryption is wired up through the `Encrypt`/
|
||||
## `Decrypt` request brokers — the application installs its own
|
||||
## providers (or `setNoopEncryption()`) before traffic flows.
|
||||
##
|
||||
## Segmentation, SDS and rate-limit configs will eventually be read
|
||||
## from the node's `NodeConfig`. Defaults for now.
|
||||
if self.channels.hasKey(channelId):
|
||||
return err("channel already exists: " & channelId)
|
||||
|
||||
let segConfig = SegmentationConfig(
|
||||
segmentSizeBytes: DefaultSegmentSizeBytes,
|
||||
enableReedSolomon: false,
|
||||
persistence: nil,
|
||||
)
|
||||
let sdsConfig = SdsConfig(
|
||||
acknowledgementTimeoutMs: DefaultAcknowledgementTimeoutMs,
|
||||
maxRetransmissions: DefaultMaxRetransmissions,
|
||||
causalHistorySize: DefaultCausalHistorySize,
|
||||
persistence: nil,
|
||||
)
|
||||
let rateConfig = RateLimitConfig(
|
||||
epochPeriodSec: DefaultEpochPeriodSec, messagesPerEpoch: DefaultMessagesPerEpoch
|
||||
)
|
||||
|
||||
let chn = ReliableChannel.new(
|
||||
deliveryService = self.deliveryService,
|
||||
channelId = channelId,
|
||||
contentTopic = contentTopic,
|
||||
senderId = senderId,
|
||||
segConfig = segConfig,
|
||||
sdsConfig = sdsConfig,
|
||||
rateConfig = rateConfig,
|
||||
brokerCtx = self.brokerCtx,
|
||||
)
|
||||
|
||||
self.channels[channelId] = chn
|
||||
return ok(channelId)
|
||||
|
||||
proc closeChannel*(
|
||||
self: ReliableChannelManager, channelId: ChannelId
|
||||
): Result[void, string] =
|
||||
## Flush state, persist outstanding SDS buffers, release resources.
|
||||
if not self.channels.hasKey(channelId):
|
||||
return err("unknown channel: " & channelId)
|
||||
self.channels.del(channelId)
|
||||
return ok()
|
||||
|
||||
proc send*(
|
||||
self: ReliableChannelManager,
|
||||
channelId: ChannelId,
|
||||
appPayload: seq[byte],
|
||||
ephemeral: bool = false,
|
||||
): Result[RequestId, string] =
|
||||
## Spec-level entry point. Looks the channel up by id and delegates
|
||||
## to `ReliableChannel.send`, which exposes the visible pipeline
|
||||
## segmentation -> sds -> rate_limit_manager -> encryption.
|
||||
let chn = self.channels.getOrDefault(channelId)
|
||||
if chn.isNil():
|
||||
return err("unknown channel: " & channelId)
|
||||
return chn.send(appPayload, ephemeral)
|
||||
|
||||
## Inbound messages are not handed to the manager by direct call. Each
|
||||
## `ReliableChannel` installs its own `MessageReceivedEvent` listener
|
||||
## in `ReliableChannel.new`, filters by spec marker and `contentTopic`,
|
||||
## and routes to its private `onMessageReceived`. This keeps the lower
|
||||
## layer (MessagingAPI/Waku) unaware of the existence of ReliableChannel
|
||||
## and keeps the manager out of per-channel event dispatch.
|
||||
62
channels/scalable_data_sync/scalable_data_sync.nim
Normal file
62
channels/scalable_data_sync/scalable_data_sync.nim
Normal file
@ -0,0 +1,62 @@
|
||||
## Scalable Data Sync (SDS) component for the Reliable Channel API.
|
||||
##
|
||||
## Provides end-to-end delivery guarantees via causal history tracking,
|
||||
## acknowledgements, and retransmission of unacknowledged segments.
|
||||
##
|
||||
## Skeleton: `wrapOutgoing` and `handleIncoming` are pass-throughs so
|
||||
## the send/receive circuit can exercise the surrounding pipeline.
|
||||
## Real SDS wrapping will plug in via `nim-sds` later.
|
||||
##
|
||||
## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html
|
||||
|
||||
import results
|
||||
import message as sds_message
|
||||
|
||||
import ./sds_persistence
|
||||
|
||||
export sds_message, sds_persistence
|
||||
|
||||
const
|
||||
DefaultAcknowledgementTimeoutMs* = 5_000
|
||||
DefaultMaxRetransmissions* = 5
|
||||
DefaultCausalHistorySize* = 2
|
||||
|
||||
type
|
||||
SdsConfig* = object
|
||||
acknowledgementTimeoutMs*: int
|
||||
maxRetransmissions*: int
|
||||
causalHistorySize*: int
|
||||
persistence*: SdsPersistence
|
||||
|
||||
SdsHandler* = ref object
|
||||
config*: SdsConfig
|
||||
participantId*: SdsParticipantID
|
||||
|
||||
proc new*(
|
||||
T: type SdsHandler,
|
||||
config: SdsConfig,
|
||||
participantId: SdsParticipantID = SdsParticipantID(""),
|
||||
): T =
|
||||
return T(config: config, participantId: participantId)
|
||||
|
||||
proc wrapOutgoing*(
|
||||
self: SdsHandler,
|
||||
channelId: SdsChannelID,
|
||||
senderId: SdsParticipantID,
|
||||
payload: seq[byte],
|
||||
): Result[seq[byte], string] =
|
||||
## Stage 2 of the outgoing pipeline (segmentation -> sds -> rate_limit_manager -> encryption).
|
||||
## Skeleton: pass the encoded segment through unchanged. Real causal
|
||||
## history / lamport / bloom-filter population will replace this.
|
||||
return ok(payload)
|
||||
|
||||
proc handleIncoming*(
|
||||
self: SdsHandler, msg: seq[byte]
|
||||
): Result[tuple[content: seq[byte], channelId: SdsChannelID], string] =
|
||||
## Skeleton: pass the bytes through; channel id is left empty until
|
||||
## the real wire format provides it.
|
||||
return ok((content: msg, channelId: SdsChannelID("")))
|
||||
|
||||
proc tickRetransmissions*(self: SdsHandler) =
|
||||
## Drives retransmissions of unacknowledged messages.
|
||||
discard
|
||||
25
channels/scalable_data_sync/sds_persistence.nim
Normal file
25
channels/scalable_data_sync/sds_persistence.nim
Normal file
@ -0,0 +1,25 @@
|
||||
## Persistence backend for SDS outgoing buffer and causal history.
|
||||
##
|
||||
## TODO (raised in PR review): this surface is duplicating concerns that
|
||||
## should come from the SDS module itself. Once the SDS module exposes a
|
||||
## complete persistence contract, drop this file and import that surface
|
||||
## instead of re-declaring it here.
|
||||
|
||||
import message
|
||||
|
||||
type
|
||||
SdsPersistenceKind* {.pure.} = enum
|
||||
InMemory
|
||||
Sqlite
|
||||
|
||||
SdsPersistence* = ref object of RootObj
|
||||
kind*: SdsPersistenceKind
|
||||
|
||||
method storeOutgoing*(self: SdsPersistence, msg: SdsMessage) {.base.} =
|
||||
discard
|
||||
|
||||
method markAcknowledged*(self: SdsPersistence, messageId: SdsMessageID) {.base.} =
|
||||
discard
|
||||
|
||||
method unackedOlderThan*(self: SdsPersistence, ageMs: int): seq[SdsMessage] {.base.} =
|
||||
discard
|
||||
34
channels/segmentation/segment_message_proto.nim
Normal file
34
channels/segmentation/segment_message_proto.nim
Normal file
@ -0,0 +1,34 @@
|
||||
## Wire format for a single segment, per the Reliable Channel API spec.
|
||||
##
|
||||
## Skeleton: encode/decode treat the segment as just its payload bytes,
|
||||
## since for now we only ever produce a single segment per send.
|
||||
|
||||
type SegmentMessageProto* = object
|
||||
entireMessageHash*: seq[byte] ## Keccak256(original payload), 32 bytes
|
||||
dataSegmentIndex*: uint32 ## zero-indexed sequence number for data segments
|
||||
dataSegmentCount*: uint32 ## number of data segments (>= 1)
|
||||
payload*: seq[byte] ## segment payload (data or parity shard)
|
||||
paritySegmentIndex*: uint32 ## zero-based sequence number for parity segments
|
||||
paritySegmentCount*: uint32 ## number of parity segments
|
||||
isParity*: bool ## true for parity segments, false (default) for data segments
|
||||
|
||||
proc isParityMessage*(self: SegmentMessageProto): bool =
|
||||
self.isParity
|
||||
|
||||
proc isValid*(self: SegmentMessageProto): bool =
|
||||
## Validates hash length (32 bytes), segment indices and counts.
|
||||
discard
|
||||
|
||||
proc encode*(self: SegmentMessageProto): seq[byte] =
|
||||
self.payload
|
||||
|
||||
proc decode*(T: type SegmentMessageProto, buf: seq[byte]): T =
|
||||
T(
|
||||
entireMessageHash: @[],
|
||||
dataSegmentIndex: 0,
|
||||
dataSegmentCount: 1,
|
||||
payload: buf,
|
||||
paritySegmentIndex: 0,
|
||||
paritySegmentCount: 0,
|
||||
isParity: false,
|
||||
)
|
||||
70
channels/segmentation/segmentation.nim
Normal file
70
channels/segmentation/segmentation.nim
Normal file
@ -0,0 +1,70 @@
|
||||
## Segmentation component for the Reliable Channel API.
|
||||
##
|
||||
## Splits large application payloads into transmittable segments and
|
||||
## reassembles them on reception. Supports optional Reed-Solomon parity
|
||||
## segments for loss recovery, as per the Reliable Channel API spec.
|
||||
##
|
||||
## For the skeleton everything fits in a single segment: real chunking
|
||||
## and Reed-Solomon parity will be plugged in later.
|
||||
##
|
||||
## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html
|
||||
|
||||
import std/options
|
||||
import ./segment_message_proto
|
||||
import ./segmentation_persistence
|
||||
|
||||
export segment_message_proto, segmentation_persistence
|
||||
|
||||
const
|
||||
DefaultSegmentSizeBytes* = 102_400
|
||||
SegmentsParityRate* = 0.125
|
||||
SegmentsReedSolomonMaxCount* = 256
|
||||
|
||||
type
|
||||
SegmentationConfig* = object
|
||||
segmentSizeBytes*: int
|
||||
enableReedSolomon*: bool
|
||||
persistence*: SegmentationPersistence
|
||||
|
||||
SegmentationHandler* = ref object
|
||||
config*: SegmentationConfig
|
||||
|
||||
ReassemblyResult* = object
|
||||
payload*: seq[byte]
|
||||
entireMessageHash*: seq[byte]
|
||||
|
||||
proc new*(T: type SegmentationHandler, config: SegmentationConfig): T =
|
||||
return T(config: config)
|
||||
|
||||
proc performSegmentation*(
|
||||
self: SegmentationHandler, payload: seq[byte]
|
||||
): seq[seq[byte]] =
|
||||
## Skeleton behaviour: emit exactly one segment carrying the whole
|
||||
## payload. Real chunking and Reed-Solomon parity will replace this.
|
||||
let segment = SegmentMessageProto(
|
||||
entireMessageHash: @[],
|
||||
dataSegmentIndex: 0,
|
||||
dataSegmentCount: 1,
|
||||
payload: payload,
|
||||
paritySegmentIndex: 0,
|
||||
paritySegmentCount: 0,
|
||||
isParity: false,
|
||||
)
|
||||
return @[segment.encode()]
|
||||
|
||||
proc handleIncomingSegment*(
|
||||
self: SegmentationHandler, segmentBytes: seq[byte]
|
||||
): Option[ReassemblyResult] =
|
||||
## Skeleton behaviour: every segment is already a complete message
|
||||
## (since `performSegmentation` always emits one), so just hand the
|
||||
## payload straight back.
|
||||
let segment = SegmentMessageProto.decode(segmentBytes)
|
||||
return some(
|
||||
ReassemblyResult(
|
||||
payload: segment.payload, entireMessageHash: segment.entireMessageHash
|
||||
)
|
||||
)
|
||||
|
||||
proc cleanupSegments*(self: SegmentationHandler) =
|
||||
## Drop expired partial-reassembly state.
|
||||
discard
|
||||
20
channels/segmentation/segmentation_persistence.nim
Normal file
20
channels/segmentation/segmentation_persistence.nim
Normal file
@ -0,0 +1,20 @@
|
||||
## Persistence backend interface for segmentation reassembly state.
|
||||
##
|
||||
## Allows partial reassembly state to survive process restarts.
|
||||
|
||||
type
|
||||
SegmentationPersistenceKind* {.pure.} = enum
|
||||
InMemory
|
||||
Sqlite
|
||||
|
||||
SegmentationPersistence* = ref object of RootObj
|
||||
kind*: SegmentationPersistenceKind
|
||||
|
||||
method put*(self: SegmentationPersistence, key: seq[byte], value: seq[byte]) {.base.} =
|
||||
discard
|
||||
|
||||
method get*(self: SegmentationPersistence, key: seq[byte]): seq[byte] {.base.} =
|
||||
discard
|
||||
|
||||
method delete*(self: SegmentationPersistence, key: seq[byte]) {.base.} =
|
||||
discard
|
||||
15
channels/types.nim
Normal file
15
channels/types.nim
Normal file
@ -0,0 +1,15 @@
|
||||
## Core identifier types for the Reliable Channel API.
|
||||
|
||||
import std/hashes
|
||||
import waku/api/types as api_types
|
||||
|
||||
import ./scalable_data_sync/scalable_data_sync
|
||||
|
||||
export scalable_data_sync
|
||||
export api_types
|
||||
|
||||
type ChannelId* = SdsChannelID
|
||||
|
||||
proc hash*(r: RequestId): Hash =
|
||||
## Allows `RequestId` to be used as a `Table` key.
|
||||
hash(string(r))
|
||||
@ -88,3 +88,6 @@ import ./tools/test_all
|
||||
|
||||
# Persistency library tests
|
||||
import ./persistency/test_all
|
||||
|
||||
# Reliable Channel API tests
|
||||
import ./channels/test_all
|
||||
|
||||
3
tests/channels/test_all.nim
Normal file
3
tests/channels/test_all.nim
Normal file
@ -0,0 +1,3 @@
|
||||
{.used.}
|
||||
|
||||
import ./test_reliable_channel_send_receive
|
||||
149
tests/channels/test_reliable_channel_send_receive.nim
Normal file
149
tests/channels/test_reliable_channel_send_receive.nim
Normal file
@ -0,0 +1,149 @@
|
||||
{.used.}
|
||||
|
||||
import std/[net]
|
||||
import chronos, testutils/unittests, stew/byteutils
|
||||
import brokers/broker_context
|
||||
|
||||
import ../testlib/[common, wakucore, wakunode, testasync]
|
||||
|
||||
import waku
|
||||
import waku/[waku_node, waku_core]
|
||||
import waku/factory/waku_conf
|
||||
import waku/events/message_events as waku_message_events
|
||||
import tools/confutils/cli_args
|
||||
|
||||
import channels/reliable_channel_manager
|
||||
import channels/encryption/noop_encryption
|
||||
|
||||
const TestTimeout = chronos.seconds(15)
|
||||
|
||||
proc createApiNodeConf(): WakuNodeConf =
|
||||
var conf = defaultWakuNodeConf().valueOr:
|
||||
raiseAssert error
|
||||
conf.mode = cli_args.WakuMode.Core
|
||||
conf.listenAddress = parseIpAddress("0.0.0.0")
|
||||
conf.tcpPort = Port(0)
|
||||
conf.discv5UdpPort = Port(0)
|
||||
conf.clusterId = 3'u16
|
||||
conf.numShardsInNetwork = 1
|
||||
conf.reliabilityEnabled = true
|
||||
conf.rest = false
|
||||
return conf
|
||||
|
||||
suite "Reliable Channel - ingress":
|
||||
asyncTest "manager dispatches marked WakuMessage to the right channel":
|
||||
## Unit test for the receive side of the API: instead of standing
|
||||
## up two libp2p nodes and a relay mesh, we drive the manager
|
||||
## directly by emitting a `MessageReceivedEvent` (the exact event
|
||||
## the DeliveryService emits when a `WakuMessage` arrives off the
|
||||
## wire). The manager must:
|
||||
## - drop traffic missing the Reliable Channel spec marker
|
||||
## - dispatch the matching channel's `onMessageReceived`
|
||||
## - emit `ChannelMessageReceivedEvent` with the payload
|
||||
const
|
||||
channelId = ChannelId("test-channel")
|
||||
contentTopic = ContentTopic("/reliable-channel/test/proto")
|
||||
let appPayload = "hello reliable channel".toBytes()
|
||||
|
||||
var manager: ReliableChannelManager
|
||||
var brokerCtx: BrokerContext
|
||||
lockNewGlobalBrokerContext:
|
||||
brokerCtx = globalBrokerContext()
|
||||
manager = (await ReliableChannelManager.new(createApiNodeConf())).expect(
|
||||
"Failed to create manager"
|
||||
)
|
||||
|
||||
## Noop encryption providers so the Encrypt/Decrypt brokers have
|
||||
## something to dispatch to; without this the channel falls back to
|
||||
## plaintext anyway, but installing them is the documented setup.
|
||||
setNoopEncryption()
|
||||
|
||||
discard manager
|
||||
.createReliableChannel(channelId, contentTopic, SdsParticipantID("local"))
|
||||
.expect("createReliableChannel")
|
||||
|
||||
let received = newFuture[seq[byte]]("channel-message-received")
|
||||
discard ChannelMessageReceivedEvent
|
||||
.listen(
|
||||
brokerCtx,
|
||||
proc(evt: ChannelMessageReceivedEvent) {.async: (raises: []).} =
|
||||
if not received.finished() and evt.channelId == channelId:
|
||||
received.complete(evt.payload)
|
||||
,
|
||||
)
|
||||
.expect("listen ChannelMessageReceivedEvent")
|
||||
|
||||
## Build a `WakuMessage` that looks like one that came in off the
|
||||
## wire from a peer: the spec marker on `meta` plus the right content
|
||||
## topic. The manager's ingress listener should pick it up,
|
||||
## decrypt (noop), unwrap SDS (pass-through), reassemble (one
|
||||
## segment), and finally emit `ChannelMessageReceivedEvent`.
|
||||
let inboundMsg = WakuMessage(
|
||||
payload: appPayload,
|
||||
contentTopic: contentTopic,
|
||||
version: 0,
|
||||
meta: LipWireReliableChannelVersion.toBytes(),
|
||||
)
|
||||
|
||||
waku_message_events.MessageReceivedEvent.emit(
|
||||
brokerCtx,
|
||||
waku_message_events.MessageReceivedEvent(messageHash: "", message: inboundMsg),
|
||||
)
|
||||
|
||||
let arrived = await received.withTimeout(TestTimeout)
|
||||
check arrived
|
||||
if arrived:
|
||||
check received.read() == appPayload
|
||||
|
||||
await manager.stop()
|
||||
|
||||
asyncTest "manager drops unmarked WakuMessage":
|
||||
## Mirror of the above: same content topic, but `meta` is empty
|
||||
## (i.e. foreign traffic). The channel-level event must NOT fire.
|
||||
const
|
||||
channelId = ChannelId("test-channel-2")
|
||||
contentTopic = ContentTopic("/reliable-channel/test/proto")
|
||||
let appPayload = "foreign payload".toBytes()
|
||||
|
||||
var manager: ReliableChannelManager
|
||||
var brokerCtx: BrokerContext
|
||||
lockNewGlobalBrokerContext:
|
||||
brokerCtx = globalBrokerContext()
|
||||
manager = (await ReliableChannelManager.new(createApiNodeConf())).expect(
|
||||
"Failed to create manager"
|
||||
)
|
||||
|
||||
setNoopEncryption()
|
||||
|
||||
discard manager
|
||||
.createReliableChannel(channelId, contentTopic, SdsParticipantID("local"))
|
||||
.expect("createReliableChannel")
|
||||
|
||||
var fired = false
|
||||
discard ChannelMessageReceivedEvent
|
||||
.listen(
|
||||
brokerCtx,
|
||||
proc(evt: ChannelMessageReceivedEvent) {.async: (raises: []).} =
|
||||
if evt.channelId == channelId:
|
||||
fired = true
|
||||
,
|
||||
)
|
||||
.expect("listen ChannelMessageReceivedEvent")
|
||||
|
||||
let inboundMsg = WakuMessage(
|
||||
payload: appPayload,
|
||||
contentTopic: contentTopic,
|
||||
version: 0,
|
||||
meta: @[], ## no Reliable Channel spec marker
|
||||
)
|
||||
|
||||
waku_message_events.MessageReceivedEvent.emit(
|
||||
brokerCtx,
|
||||
waku_message_events.MessageReceivedEvent(messageHash: "", message: inboundMsg),
|
||||
)
|
||||
|
||||
## Give the event broker a chance to fan out.
|
||||
await sleepAsync(100.milliseconds)
|
||||
check not fired
|
||||
|
||||
await manager.stop()
|
||||
Loading…
x
Reference in New Issue
Block a user