mirror of
https://github.com/logos-messaging/logos-delivery.git
synced 2026-06-06 05:59:33 +00:00
nim and nimble script updates for windows
This commit is contained in:
commit
632f1fe066
4
.github/workflows/ci-daily.yml
vendored
4
.github/workflows/ci-daily.yml
vendored
@ -3,11 +3,13 @@ name: Daily logos-delivery CI
|
|||||||
on:
|
on:
|
||||||
schedule:
|
schedule:
|
||||||
- cron: '30 6 * * *'
|
- cron: '30 6 * * *'
|
||||||
|
workflow_dispatch:
|
||||||
|
|
||||||
env:
|
env:
|
||||||
NPROC: 2
|
NPROC: 2
|
||||||
MAKEFLAGS: "-j${NPROC}"
|
MAKEFLAGS: "-j${NPROC}"
|
||||||
NIMFLAGS: "--parallelBuild:${NPROC} --colors:off -d:chronicles_colors:none"
|
NIMFLAGS: "--parallelBuild:${NPROC} --colors:off -d:chronicles_colors:none -d:disableMarchNative"
|
||||||
|
NIM_PARAMS: "-d:disableMarchNative"
|
||||||
|
|
||||||
jobs:
|
jobs:
|
||||||
build:
|
build:
|
||||||
|
|||||||
23
.github/workflows/ci.yml
vendored
23
.github/workflows/ci.yml
vendored
@ -13,7 +13,8 @@ concurrency:
|
|||||||
env:
|
env:
|
||||||
NPROC: 2
|
NPROC: 2
|
||||||
MAKEFLAGS: "-j${NPROC}"
|
MAKEFLAGS: "-j${NPROC}"
|
||||||
NIMFLAGS: "--parallelBuild:${NPROC} --colors:off -d:chronicles_colors:none"
|
NIMFLAGS: "--parallelBuild:${NPROC} --colors:off -d:chronicles_colors:none -d:disableMarchNative"
|
||||||
|
NIM_PARAMS: "-d:disableMarchNative"
|
||||||
NIM_VERSION: '2.2.4'
|
NIM_VERSION: '2.2.4'
|
||||||
NIMBLE_VERSION: '0.22.3'
|
NIMBLE_VERSION: '0.22.3'
|
||||||
|
|
||||||
@ -35,6 +36,9 @@ jobs:
|
|||||||
- 'nimble.lock'
|
- 'nimble.lock'
|
||||||
- 'waku.nimble'
|
- 'waku.nimble'
|
||||||
- 'Makefile'
|
- 'Makefile'
|
||||||
|
- 'scripts/**'
|
||||||
|
- 'flake.nix'
|
||||||
|
- 'flake.lock'
|
||||||
- 'library/**'
|
- 'library/**'
|
||||||
- 'liblogosdelivery/**'
|
- 'liblogosdelivery/**'
|
||||||
v2:
|
v2:
|
||||||
@ -43,6 +47,7 @@ jobs:
|
|||||||
- 'tools/**'
|
- 'tools/**'
|
||||||
- 'tests/all_tests_v2.nim'
|
- 'tests/all_tests_v2.nim'
|
||||||
- 'tests/**'
|
- 'tests/**'
|
||||||
|
- 'channels/**'
|
||||||
docker:
|
docker:
|
||||||
- 'docker/**'
|
- 'docker/**'
|
||||||
|
|
||||||
@ -156,7 +161,7 @@ jobs:
|
|||||||
fi
|
fi
|
||||||
|
|
||||||
export MAKEFLAGS="-j1"
|
export MAKEFLAGS="-j1"
|
||||||
export NIMFLAGS="--colors:off -d:chronicles_colors:none"
|
export NIMFLAGS="--colors:off -d:chronicles_colors:none -d:disableMarchNative"
|
||||||
export USE_LIBBACKTRACE=0
|
export USE_LIBBACKTRACE=0
|
||||||
|
|
||||||
make V=1 POSTGRES=$postgres_enabled test
|
make V=1 POSTGRES=$postgres_enabled test
|
||||||
@ -176,20 +181,6 @@ jobs:
|
|||||||
|
|
||||||
secrets: inherit
|
secrets: inherit
|
||||||
|
|
||||||
js-waku-node:
|
|
||||||
needs: build-docker-image
|
|
||||||
uses: logos-messaging/logos-delivery-js/.github/workflows/test-node.yml@master
|
|
||||||
with:
|
|
||||||
nim_wakunode_image: ${{ needs.build-docker-image.outputs.image }}
|
|
||||||
test_type: node
|
|
||||||
|
|
||||||
js-waku-node-optional:
|
|
||||||
needs: build-docker-image
|
|
||||||
uses: logos-messaging/logos-delivery-js/.github/workflows/test-node.yml@master
|
|
||||||
with:
|
|
||||||
nim_wakunode_image: ${{ needs.build-docker-image.outputs.image }}
|
|
||||||
test_type: node-optional
|
|
||||||
|
|
||||||
lint:
|
lint:
|
||||||
name: "Lint"
|
name: "Lint"
|
||||||
runs-on: ubuntu-22.04
|
runs-on: ubuntu-22.04
|
||||||
|
|||||||
7
.github/workflows/version-check.yml
vendored
7
.github/workflows/version-check.yml
vendored
@ -28,8 +28,11 @@ jobs:
|
|||||||
set -euo pipefail
|
set -euo pipefail
|
||||||
NIMBLE_VERSION=$(grep -m1 '^version = ' waku.nimble | sed -E 's/version = "([^"]+)"/\1/')
|
NIMBLE_VERSION=$(grep -m1 '^version = ' waku.nimble | sed -E 's/version = "([^"]+)"/\1/')
|
||||||
# Nearest tag reachable from HEAD; --abbrev=0 drops the -<n>-g<sha>
|
# Nearest tag reachable from HEAD; --abbrev=0 drops the -<n>-g<sha>
|
||||||
# suffix so we get the bare tag (e.g. v0.38.0).
|
# suffix so we get the bare tag (e.g. v0.38.0). `--match 'v*'` skips
|
||||||
BASE_TAG=$(git describe --tags --abbrev=0 2>/dev/null || echo "")
|
# the moving `nightly` tag (auto-updated by the daily CI to point at
|
||||||
|
# master HEAD), which would otherwise be picked as the nearest tag
|
||||||
|
# and break the version-sort comparison below.
|
||||||
|
BASE_TAG=$(git describe --tags --abbrev=0 --match 'v*' 2>/dev/null || echo "")
|
||||||
BASE_TAG=${BASE_TAG#v}
|
BASE_TAG=${BASE_TAG#v}
|
||||||
# Compare on the base version, ignoring any -rc.N prerelease suffix.
|
# Compare on the base version, ignoring any -rc.N prerelease suffix.
|
||||||
BASE_TAG=${BASE_TAG%%-*}
|
BASE_TAG=${BASE_TAG%%-*}
|
||||||
|
|||||||
7
Makefile
7
Makefile
@ -24,6 +24,7 @@ export PATH := $(HOME)/.nimble/bin:$(PATH)
|
|||||||
# NIM binary location
|
# NIM binary location
|
||||||
NIM_BINARY := $(shell which nim 2>/dev/null)
|
NIM_BINARY := $(shell which nim 2>/dev/null)
|
||||||
NPH := $(HOME)/.nimble/bin/nph
|
NPH := $(HOME)/.nimble/bin/nph
|
||||||
|
NIMBLE := $(HOME)/.nimble/bin/nimble
|
||||||
NIMBLEDEPS_STAMP := nimbledeps/.nimble-setup
|
NIMBLEDEPS_STAMP := nimbledeps/.nimble-setup
|
||||||
|
|
||||||
# Compilation parameters
|
# Compilation parameters
|
||||||
@ -71,7 +72,7 @@ waku.nims:
|
|||||||
ln -s waku.nimble $@
|
ln -s waku.nimble $@
|
||||||
|
|
||||||
$(NIMBLEDEPS_STAMP): nimble.lock | install-nimble build-nph waku.nims
|
$(NIMBLEDEPS_STAMP): nimble.lock | install-nimble build-nph waku.nims
|
||||||
nimble setup --localdeps
|
$(NIMBLE) setup --localdeps
|
||||||
touch $@
|
touch $@
|
||||||
|
|
||||||
# Must be phony so the recipe always runs and the sub-make re-evaluates
|
# Must be phony so the recipe always runs and the sub-make re-evaluates
|
||||||
@ -92,10 +93,14 @@ REQUIRED_NIM_VERSION := $(shell grep -E '^const RequiredNimVersion\s*=' waku.
|
|||||||
REQUIRED_NIMBLE_VERSION := $(shell grep -E '^const RequiredNimbleVersion\s*=' waku.nimble | grep -oE '"[0-9]+\.[0-9]+\.[0-9]+"' | tr -d '"')
|
REQUIRED_NIMBLE_VERSION := $(shell grep -E '^const RequiredNimbleVersion\s*=' waku.nimble | grep -oE '"[0-9]+\.[0-9]+\.[0-9]+"' | tr -d '"')
|
||||||
|
|
||||||
install-nim:
|
install-nim:
|
||||||
|
ifneq ($(detected_OS),Windows)
|
||||||
scripts/install_nim.sh $(REQUIRED_NIM_VERSION)
|
scripts/install_nim.sh $(REQUIRED_NIM_VERSION)
|
||||||
|
endif
|
||||||
|
|
||||||
install-nimble: install-nim
|
install-nimble: install-nim
|
||||||
|
ifneq ($(detected_OS),Windows)
|
||||||
scripts/install_nimble.sh $(REQUIRED_NIMBLE_VERSION)
|
scripts/install_nimble.sh $(REQUIRED_NIMBLE_VERSION)
|
||||||
|
endif
|
||||||
|
|
||||||
build:
|
build:
|
||||||
mkdir -p build
|
mkdir -p build
|
||||||
|
|||||||
@ -1,7 +1,7 @@
|
|||||||
{.push raises: [].}
|
{.push raises: [].}
|
||||||
|
|
||||||
import
|
import
|
||||||
std/[tables, times, strutils, hashes, sequtils, json],
|
std/[tables, times, strutils, hashes, sequtils, json, options],
|
||||||
chronos,
|
chronos,
|
||||||
confutils,
|
confutils,
|
||||||
chronicles,
|
chronicles,
|
||||||
@ -267,10 +267,16 @@ when isMainModule:
|
|||||||
else:
|
else:
|
||||||
nodev2ExtPort
|
nodev2ExtPort
|
||||||
|
|
||||||
|
let nodev2Key =
|
||||||
|
if conf.nodekey.isSome():
|
||||||
|
conf.nodekey.get()
|
||||||
|
else:
|
||||||
|
crypto.PrivateKey.random(Secp256k1, rng[]).tryGet()
|
||||||
|
|
||||||
let bridge = Chat2Matterbridge.new(
|
let bridge = Chat2Matterbridge.new(
|
||||||
mbHostUri = "http://" & $initTAddress(conf.mbHostAddress, Port(conf.mbHostPort)),
|
mbHostUri = "http://" & $initTAddress(conf.mbHostAddress, Port(conf.mbHostPort)),
|
||||||
mbGateway = conf.mbGateway,
|
mbGateway = conf.mbGateway,
|
||||||
nodev2Key = conf.nodekey,
|
nodev2Key = nodev2Key,
|
||||||
nodev2BindIp = conf.listenAddress,
|
nodev2BindIp = conf.listenAddress,
|
||||||
nodev2BindPort = Port(uint16(conf.libp2pTcpPort) + conf.portsShift),
|
nodev2BindPort = Port(uint16(conf.libp2pTcpPort) + conf.portsShift),
|
||||||
nodev2ExtIp = nodev2ExtIp,
|
nodev2ExtIp = nodev2ExtIp,
|
||||||
|
|||||||
@ -1,4 +1,5 @@
|
|||||||
import
|
import
|
||||||
|
std/options,
|
||||||
confutils,
|
confutils,
|
||||||
confutils/defs,
|
confutils/defs,
|
||||||
confutils/std/net,
|
confutils/std/net,
|
||||||
@ -45,7 +46,7 @@ type Chat2MatterbridgeConf* = object
|
|||||||
|
|
||||||
metricsServerAddress* {.
|
metricsServerAddress* {.
|
||||||
desc: "Listening address of the metrics server",
|
desc: "Listening address of the metrics server",
|
||||||
defaultValue: parseIpAddress("127.0.0.1"),
|
defaultValue: IpAddress(family: IpAddressFamily.IPv4, address_v4: [127'u8, 0, 0, 1]),
|
||||||
name: "metrics-server-address"
|
name: "metrics-server-address"
|
||||||
.}: IpAddress
|
.}: IpAddress
|
||||||
|
|
||||||
@ -62,10 +63,8 @@ type Chat2MatterbridgeConf* = object
|
|||||||
.}: seq[string]
|
.}: seq[string]
|
||||||
|
|
||||||
nodekey* {.
|
nodekey* {.
|
||||||
desc: "P2P node private key as hex",
|
desc: "P2P node private key as hex", defaultValueDesc: "random", name: "nodekey"
|
||||||
defaultValue: crypto.PrivateKey.random(Secp256k1, newRng()[]).tryGet(),
|
.}: Option[crypto.PrivateKey]
|
||||||
name: "nodekey"
|
|
||||||
.}: crypto.PrivateKey
|
|
||||||
|
|
||||||
store* {.
|
store* {.
|
||||||
desc: "Flag whether to start store protocol", defaultValue: true, name: "store"
|
desc: "Flag whether to start store protocol", defaultValue: true, name: "store"
|
||||||
@ -94,7 +93,7 @@ type Chat2MatterbridgeConf* = object
|
|||||||
# Matterbridge options
|
# Matterbridge options
|
||||||
mbHostAddress* {.
|
mbHostAddress* {.
|
||||||
desc: "Listening address of the Matterbridge host",
|
desc: "Listening address of the Matterbridge host",
|
||||||
defaultValue: parseIpAddress("127.0.0.1"),
|
defaultValue: IpAddress(family: IpAddressFamily.IPv4, address_v4: [127'u8, 0, 0, 1]),
|
||||||
name: "mb-host-address"
|
name: "mb-host-address"
|
||||||
.}: IpAddress
|
.}: IpAddress
|
||||||
|
|
||||||
|
|||||||
@ -162,7 +162,8 @@ type
|
|||||||
|
|
||||||
metricsServerAddress* {.
|
metricsServerAddress* {.
|
||||||
desc: "Listening address of the metrics server.",
|
desc: "Listening address of the metrics server.",
|
||||||
defaultValue: parseIpAddress("127.0.0.1"),
|
defaultValue:
|
||||||
|
IpAddress(family: IpAddressFamily.IPv4, address_v4: [127'u8, 0, 0, 1]),
|
||||||
name: "metrics-server-address"
|
name: "metrics-server-address"
|
||||||
.}: IpAddress
|
.}: IpAddress
|
||||||
|
|
||||||
@ -194,7 +195,10 @@ type
|
|||||||
|
|
||||||
dnsDiscoveryNameServers* {.
|
dnsDiscoveryNameServers* {.
|
||||||
desc: "DNS name server IPs to query. Argument may be repeated.",
|
desc: "DNS name server IPs to query. Argument may be repeated.",
|
||||||
defaultValue: @[parseIpAddress("1.1.1.1"), parseIpAddress("1.0.0.1")],
|
defaultValue: @[
|
||||||
|
IpAddress(family: IpAddressFamily.IPv4, address_v4: [1'u8, 1, 1, 1]),
|
||||||
|
IpAddress(family: IpAddressFamily.IPv4, address_v4: [1'u8, 0, 0, 1]),
|
||||||
|
],
|
||||||
name: "dns-discovery-name-server"
|
name: "dns-discovery-name-server"
|
||||||
.}: seq[IpAddress]
|
.}: seq[IpAddress]
|
||||||
|
|
||||||
|
|||||||
@ -133,7 +133,7 @@ type LiteProtocolTesterConf* = object
|
|||||||
## Tester REST service configuration
|
## Tester REST service configuration
|
||||||
restAddress* {.
|
restAddress* {.
|
||||||
desc: "Listening address of the REST HTTP server.",
|
desc: "Listening address of the REST HTTP server.",
|
||||||
defaultValue: parseIpAddress("127.0.0.1"),
|
defaultValue: IpAddress(family: IpAddressFamily.IPv4, address_v4: [127'u8, 0, 0, 1]),
|
||||||
name: "rest-address"
|
name: "rest-address"
|
||||||
.}: IpAddress
|
.}: IpAddress
|
||||||
|
|
||||||
|
|||||||
@ -116,7 +116,7 @@ type NetworkMonitorConf* = object
|
|||||||
|
|
||||||
metricsServerAddress* {.
|
metricsServerAddress* {.
|
||||||
desc: "Listening address of the metrics server.",
|
desc: "Listening address of the metrics server.",
|
||||||
defaultValue: parseIpAddress("127.0.0.1"),
|
defaultValue: IpAddress(family: IpAddressFamily.IPv4, address_v4: [127'u8, 0, 0, 1]),
|
||||||
name: "metrics-server-address"
|
name: "metrics-server-address"
|
||||||
.}: IpAddress
|
.}: IpAddress
|
||||||
|
|
||||||
|
|||||||
25
channels/encryption/encryption.nim
Normal file
25
channels/encryption/encryption.nim
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
## Optional encryption hooks for the Reliable Channel API.
|
||||||
|
##
|
||||||
|
## Modelled as `RequestBroker`s: the broker pattern lets the channel
|
||||||
|
## delegate work to a provider that may live in any module without
|
||||||
|
## introducing a direct dependency. If no provider is registered the
|
||||||
|
## broker returns an error, so installing the noop providers from
|
||||||
|
## `noop_encryption` is required when the application does not want
|
||||||
|
## actual encryption.
|
||||||
|
##
|
||||||
|
## Applied per-segment after SDS processing on outgoing, and before
|
||||||
|
## SDS processing on incoming. No specific scheme is mandated.
|
||||||
|
##
|
||||||
|
## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html
|
||||||
|
|
||||||
|
import brokers/request_broker
|
||||||
|
|
||||||
|
export request_broker
|
||||||
|
|
||||||
|
RequestBroker:
|
||||||
|
type Encrypt* = seq[byte]
|
||||||
|
proc signature*(payload: seq[byte]): Future[Result[Encrypt, string]] {.async.}
|
||||||
|
|
||||||
|
RequestBroker:
|
||||||
|
type Decrypt* = seq[byte]
|
||||||
|
proc signature*(payload: seq[byte]): Future[Result[Decrypt, string]] {.async.}
|
||||||
18
channels/encryption/noop_encryption.nim
Normal file
18
channels/encryption/noop_encryption.nim
Normal file
@ -0,0 +1,18 @@
|
|||||||
|
## No-op encryption providers. Install these when the application does
|
||||||
|
## not want actual encryption so the `Encrypt` / `Decrypt` brokers have
|
||||||
|
## something to dispatch to.
|
||||||
|
|
||||||
|
import results
|
||||||
|
import chronos
|
||||||
|
import ./encryption
|
||||||
|
|
||||||
|
proc setNoopEncryption*() =
|
||||||
|
discard Encrypt.setProvider(
|
||||||
|
proc(payload: seq[byte]): Future[Result[Encrypt, string]] {.async.} =
|
||||||
|
return ok(Encrypt(payload))
|
||||||
|
)
|
||||||
|
|
||||||
|
discard Decrypt.setProvider(
|
||||||
|
proc(payload: seq[byte]): Future[Result[Decrypt, string]] {.async.} =
|
||||||
|
return ok(Decrypt(payload))
|
||||||
|
)
|
||||||
39
channels/events.nim
Normal file
39
channels/events.nim
Normal file
@ -0,0 +1,39 @@
|
|||||||
|
## Reliable Channel event types emitted to API consumers.
|
||||||
|
##
|
||||||
|
## Lifecycle events for individual segments (sent / propagated / errored)
|
||||||
|
## are the same as the network-level ones the DeliveryService already
|
||||||
|
## emits — `requestId` is shared across layers — so we just re-export
|
||||||
|
## `waku/events/message_events` and avoid declaring duplicates.
|
||||||
|
##
|
||||||
|
## Only the channel-level `MessageReceivedEvent` carries data that has
|
||||||
|
## no analogue in the lower layer (reassembled application payload,
|
||||||
|
## senderId, channelId), so it lives here.
|
||||||
|
|
||||||
|
import waku/events/message_events as waku_message_events
|
||||||
|
import brokers/event_broker
|
||||||
|
|
||||||
|
import ./types as channel_types
|
||||||
|
|
||||||
|
export waku_message_events, channel_types, event_broker
|
||||||
|
|
||||||
|
EventBroker:
|
||||||
|
type ChannelMessageReceivedEvent* = object
|
||||||
|
channelId*: ChannelId
|
||||||
|
senderId*: SdsParticipantID
|
||||||
|
payload*: seq[byte]
|
||||||
|
|
||||||
|
EventBroker:
|
||||||
|
## Emitted when every segment of a channel-level `send()` reached
|
||||||
|
## `Confirmed`. Channel-level analogue of `MessageSentEvent`; the
|
||||||
|
## `requestId` is the channel-layer parent returned by `send()`.
|
||||||
|
type ChannelMessageSentEvent* = object
|
||||||
|
channelId*: ChannelId
|
||||||
|
requestId*: RequestId
|
||||||
|
|
||||||
|
EventBroker:
|
||||||
|
## Emitted when a channel-level `send()` finalises with at least one
|
||||||
|
## segment in `Failed`. Channel-level analogue of `MessageErrorEvent`.
|
||||||
|
type ChannelMessageErrorEvent* = object
|
||||||
|
channelId*: ChannelId
|
||||||
|
requestId*: RequestId
|
||||||
|
error*: string
|
||||||
80
channels/rate_limit_manager/rate_limit_manager.nim
Normal file
80
channels/rate_limit_manager/rate_limit_manager.nim
Normal file
@ -0,0 +1,80 @@
|
|||||||
|
## Rate Limit Manager for the Reliable Channel API.
|
||||||
|
##
|
||||||
|
## Tracks messages sent per RLN epoch and delays dispatch when the
|
||||||
|
## limit is approached, ensuring RLN compliance on enforcing relays.
|
||||||
|
##
|
||||||
|
## For the skeleton this is a pass-through: messages are immediately
|
||||||
|
## released as ready-to-send. Real epoch budgeting will be added later.
|
||||||
|
##
|
||||||
|
## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html
|
||||||
|
|
||||||
|
import std/times
|
||||||
|
import message
|
||||||
|
import brokers/event_broker
|
||||||
|
import brokers/broker_context
|
||||||
|
|
||||||
|
export event_broker, broker_context
|
||||||
|
export message.SdsChannelID
|
||||||
|
|
||||||
|
const
|
||||||
|
DefaultEpochPeriodSec* = 600
|
||||||
|
DefaultMessagesPerEpoch* = 1
|
||||||
|
|
||||||
|
EventBroker:
|
||||||
|
## Emitted by `enqueueToSend` carrying the batch of opaque message
|
||||||
|
## blobs that may now leave the rate limiter and continue down the
|
||||||
|
## outgoing pipeline (encryption -> dispatch). Bytes only: the rate
|
||||||
|
## limiter is intentionally agnostic of SDS, so anything serialisable
|
||||||
|
## can flow through it.
|
||||||
|
##
|
||||||
|
## `channelId` lets listeners filter to their own channel, since all
|
||||||
|
## reliable channels share the underlying Waku node's broker context.
|
||||||
|
type ReadyToSendEvent* = ref object
|
||||||
|
channelId*: SdsChannelID
|
||||||
|
msgs*: seq[seq[byte]]
|
||||||
|
|
||||||
|
type
|
||||||
|
RateLimitConfig* = object
|
||||||
|
enabled*: bool ## spec: rate limiting opt-in; SHOULD be true when RLN active
|
||||||
|
epochPeriodSec*: int
|
||||||
|
messagesPerEpoch*: int
|
||||||
|
|
||||||
|
RateLimitManager* = ref object
|
||||||
|
config*: RateLimitConfig
|
||||||
|
queue*: seq[seq[byte]]
|
||||||
|
currentEpochStart*: Time
|
||||||
|
sentInCurrentEpoch*: int
|
||||||
|
channelId*: SdsChannelID ## tag for the emitted `ReadyToSendEvent`
|
||||||
|
brokerCtx: BrokerContext
|
||||||
|
|
||||||
|
proc new*(
|
||||||
|
T: type RateLimitManager,
|
||||||
|
config: RateLimitConfig,
|
||||||
|
channelId: SdsChannelID,
|
||||||
|
brokerCtx: BrokerContext = globalBrokerContext(),
|
||||||
|
): T =
|
||||||
|
return T(
|
||||||
|
config: config,
|
||||||
|
queue: @[],
|
||||||
|
currentEpochStart: getTime(),
|
||||||
|
sentInCurrentEpoch: 0,
|
||||||
|
channelId: channelId,
|
||||||
|
brokerCtx: brokerCtx,
|
||||||
|
)
|
||||||
|
|
||||||
|
proc enqueueToSend*(self: RateLimitManager, msg: seq[byte]) =
|
||||||
|
## Skeleton behaviour: enqueue and immediately release as a single
|
||||||
|
## ready batch. Real per-epoch budgeting will park messages on
|
||||||
|
## `self.queue` and emit only when the budget allows.
|
||||||
|
ReadyToSendEvent.emit(
|
||||||
|
self.brokerCtx, ReadyToSendEvent(channelId: self.channelId, msgs: @[msg])
|
||||||
|
)
|
||||||
|
|
||||||
|
proc dequeueReady*(self: RateLimitManager): seq[seq[byte]] =
|
||||||
|
## Returns the set of queued messages that may be dispatched now
|
||||||
|
## without exceeding the configured rate limit.
|
||||||
|
discard
|
||||||
|
|
||||||
|
proc resetEpoch*(self: RateLimitManager) =
|
||||||
|
self.currentEpochStart = getTime()
|
||||||
|
self.sentInCurrentEpoch = 0
|
||||||
453
channels/reliable_channel.nim
Normal file
453
channels/reliable_channel.nim
Normal file
@ -0,0 +1,453 @@
|
|||||||
|
## Reliable Channel type.
|
||||||
|
##
|
||||||
|
## A `ReliableChannel` orchestrates segmentation, SDS (end-to-end
|
||||||
|
## reliability), optional encryption, and rate-limited dispatch on top
|
||||||
|
## of the Messaging API for a single channel.
|
||||||
|
##
|
||||||
|
## Outgoing pipeline: Segment -> SDS -> Rate Limit -> Encrypt -> Dispatch
|
||||||
|
## Incoming pipeline: Decrypt -> SDS -> Reassemble -> Emit event
|
||||||
|
##
|
||||||
|
## Channels are owned by a `ReliableChannelManager`. Lifecycle and send
|
||||||
|
## operations are addressed by `ChannelId`, so callers only need to keep
|
||||||
|
## an opaque handle around.
|
||||||
|
##
|
||||||
|
## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html
|
||||||
|
|
||||||
|
import std/[options, sets, tables]
|
||||||
|
import results
|
||||||
|
import chronos
|
||||||
|
import bearssl/rand
|
||||||
|
import stew/byteutils
|
||||||
|
import libp2p/crypto/crypto as libp2p_crypto
|
||||||
|
|
||||||
|
import waku/api/api
|
||||||
|
import waku/factory/waku as waku_factory
|
||||||
|
import waku/node/delivery_service/send_service
|
||||||
|
import waku/waku_core/topics
|
||||||
|
|
||||||
|
import ./events
|
||||||
|
import ./segmentation/segmentation
|
||||||
|
import ./scalable_data_sync/scalable_data_sync
|
||||||
|
import ./rate_limit_manager/rate_limit_manager
|
||||||
|
import ./encryption/encryption
|
||||||
|
|
||||||
|
export
|
||||||
|
api, waku_factory, events, segmentation, scalable_data_sync, rate_limit_manager,
|
||||||
|
encryption
|
||||||
|
|
||||||
|
const LipWireReliableChannelVersion* = "RELIABLE-CHANNEL-API/1"
|
||||||
|
## Wire-format spec marker for the Reliable Channel layer, as defined
|
||||||
|
## in the reliable-channel-api LIP (`Wire Format / Spec Marker`).
|
||||||
|
## A `WakuMessage` whose `meta` field does not equal these bytes is
|
||||||
|
## not addressed to this layer and is silently dropped on ingress.
|
||||||
|
## The trailing `/N` is the wire-format version and is bumped only
|
||||||
|
## on breaking on-the-wire changes; implementations pin one version.
|
||||||
|
|
||||||
|
type
|
||||||
|
SendHandler* = proc(envelope: MessageEnvelope): Future[Result[RequestId, string]] {.
|
||||||
|
async: (raises: [CatchableError]), gcsafe
|
||||||
|
.}
|
||||||
|
## Egress dispatch boundary. Defaults to `waku.send`; tests inject a
|
||||||
|
## fake that records calls and returns canned `RequestId`s so the
|
||||||
|
## send state machine can be exercised end-to-end without a network.
|
||||||
|
|
||||||
|
MessagePersistence {.pure.} = enum
|
||||||
|
Persistent
|
||||||
|
Ephemeral
|
||||||
|
|
||||||
|
SegmentSendState {.pure.} = enum
|
||||||
|
## Lifecycle of a single segment as tracked by the channel. The
|
||||||
|
## messaging layer has its own richer `DeliveryState` (retries,
|
||||||
|
## propagated-vs-validated); here we only model what's needed to
|
||||||
|
## decide when a `channelReqId` is fully accounted for.
|
||||||
|
AwaitingRateLimit ## Pushed by `send`; not yet released by rate_limit_manager.
|
||||||
|
InFlight
|
||||||
|
## Released by rate_limit_manager and handed to delivery_service;
|
||||||
|
## `messagingReqId` is now set.
|
||||||
|
Confirmed ## `MessageSentEvent` arrived for `messagingReqId`.
|
||||||
|
Failed
|
||||||
|
## `MessageErrorEvent` arrived for `messagingReqId`, or the local
|
||||||
|
## delivery-task construction failed before any id was reachable.
|
||||||
|
|
||||||
|
PendingMessagingRequest = object
|
||||||
|
## One entry per segment (i.e. per messaging-layer request). The
|
||||||
|
## relative order of `AwaitingRateLimit` entries must match the
|
||||||
|
## order in which `rate_limit_manager` re-emits messages, which is
|
||||||
|
## FIFO with `send()`.
|
||||||
|
channelReqId*: RequestId
|
||||||
|
## The channel-layer parent id returned to the caller of `send()` in channel layer.
|
||||||
|
## One channel request maps to N pending messaging requests.
|
||||||
|
messagingReqId*: Option[RequestId]
|
||||||
|
## Per-segment messaging layer id. `none` until `onReadyToSend` assigns it.
|
||||||
|
persistenceReqType: MessagePersistence
|
||||||
|
segmentSendState*: SegmentSendState
|
||||||
|
|
||||||
|
ReliableChannel* = ref object
|
||||||
|
## Spec-defined public type. Fields are private so callers cannot
|
||||||
|
## mutate internals and break invariants. Getters are added below
|
||||||
|
## for the few values consumers may need.
|
||||||
|
sendHandler: SendHandler
|
||||||
|
channelId: ChannelId
|
||||||
|
contentTopic: ContentTopic
|
||||||
|
senderId: SdsParticipantID
|
||||||
|
rng: ref HmacDrbgContext
|
||||||
|
segmentation: SegmentationHandler
|
||||||
|
sdsHandler: SdsHandler
|
||||||
|
rateLimit: RateLimitManager
|
||||||
|
|
||||||
|
requestIds: Table[RequestId, seq[RequestId]]
|
||||||
|
pendingMessagingRequests: seq[PendingMessagingRequest]
|
||||||
|
## Entries are kept until the matching segment reaches a final
|
||||||
|
## state (`Confirmed` or `Failed`); a whole channel request is
|
||||||
|
## then pruned in one pass once all its segments are final.
|
||||||
|
brokerCtx: BrokerContext
|
||||||
|
|
||||||
|
func getChannelId*(self: ReliableChannel): ChannelId {.inline.} =
|
||||||
|
self.channelId
|
||||||
|
|
||||||
|
func getContentTopic*(self: ReliableChannel): ContentTopic {.inline.} =
|
||||||
|
self.contentTopic
|
||||||
|
|
||||||
|
func getSenderId*(self: ReliableChannel): SdsParticipantID {.inline.} =
|
||||||
|
self.senderId
|
||||||
|
|
||||||
|
func isFinal(state: SegmentSendState): bool {.inline.} =
|
||||||
|
return state in {SegmentSendState.Confirmed, SegmentSendState.Failed}
|
||||||
|
|
||||||
|
proc pruneCompletedChannelReqs(self: ReliableChannel) =
|
||||||
|
## Drop every `pendingMessagingRequests` entry whose `channelReqId`
|
||||||
|
## has all of its segments in a final state. A single failing
|
||||||
|
## segment doesn't trigger a drop on its own — we wait until siblings
|
||||||
|
## are also accounted for, so the channel-level outcome is decided
|
||||||
|
## from a complete picture. For each fully-final `channelReqId`, emit
|
||||||
|
## the channel-level final event before the entries are dropped:
|
||||||
|
## `ChannelMessageSentEvent` if every sibling Confirmed,
|
||||||
|
## `ChannelMessageErrorEvent` if any sibling Failed.
|
||||||
|
var hasPending = initHashSet[RequestId]()
|
||||||
|
var anyFailed = initHashSet[RequestId]()
|
||||||
|
for entry in self.pendingMessagingRequests:
|
||||||
|
if not entry.segmentSendState.isFinal():
|
||||||
|
hasPending.incl(entry.channelReqId)
|
||||||
|
elif entry.segmentSendState == SegmentSendState.Failed:
|
||||||
|
anyFailed.incl(entry.channelReqId)
|
||||||
|
|
||||||
|
var emitted = initHashSet[RequestId]()
|
||||||
|
for entry in self.pendingMessagingRequests:
|
||||||
|
if entry.channelReqId in hasPending or entry.channelReqId in emitted:
|
||||||
|
continue
|
||||||
|
emitted.incl(entry.channelReqId)
|
||||||
|
if entry.channelReqId in anyFailed:
|
||||||
|
ChannelMessageErrorEvent.emit(
|
||||||
|
self.brokerCtx,
|
||||||
|
ChannelMessageErrorEvent(
|
||||||
|
channelId: self.channelId,
|
||||||
|
requestId: entry.channelReqId,
|
||||||
|
error: "one or more segments failed",
|
||||||
|
),
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
ChannelMessageSentEvent.emit(
|
||||||
|
self.brokerCtx,
|
||||||
|
ChannelMessageSentEvent(
|
||||||
|
channelId: self.channelId, requestId: entry.channelReqId
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
self.pendingMessagingRequests.keepItIf(it.channelReqId in hasPending)
|
||||||
|
|
||||||
|
proc onMessageSent(self: ReliableChannel, messagingReqId: RequestId) =
|
||||||
|
## Invoked from this channel's `MessageSentEvent` listener. Flips
|
||||||
|
## the matching `InFlight` segment to `Confirmed` and prunes. The
|
||||||
|
## listener routes every event through here; entries that don't
|
||||||
|
## belong to this channel simply don't match and are no-ops.
|
||||||
|
self.pendingMessagingRequests.applyItIf(
|
||||||
|
it.segmentSendState == SegmentSendState.InFlight and
|
||||||
|
it.messagingReqId == some(messagingReqId)
|
||||||
|
):
|
||||||
|
it.segmentSendState = SegmentSendState.Confirmed
|
||||||
|
self.pruneCompletedChannelReqs()
|
||||||
|
|
||||||
|
proc onMessageError(self: ReliableChannel, messagingReqId: RequestId) =
|
||||||
|
## Symmetric to `onMessageSent` but for `MessageErrorEvent`.
|
||||||
|
self.pendingMessagingRequests.applyItIf(
|
||||||
|
it.segmentSendState == SegmentSendState.InFlight and
|
||||||
|
it.messagingReqId == some(messagingReqId)
|
||||||
|
):
|
||||||
|
it.segmentSendState = SegmentSendState.Failed
|
||||||
|
self.pruneCompletedChannelReqs()
|
||||||
|
|
||||||
|
proc onReadyToSend(
|
||||||
|
self: ReliableChannel, readyToSendEvent: ReadyToSendEvent
|
||||||
|
) {.async: (raises: []).} =
|
||||||
|
## Tail of the outgoing pipeline. Invoked from the `ReadyToSendEvent`
|
||||||
|
## listener once `rate_limit_manager` releases a batch of opaque
|
||||||
|
## blobs (already-encoded SDS messages):
|
||||||
|
##
|
||||||
|
## ... -> rate_limit_manager -> [encryption] -> dispatch
|
||||||
|
var idx = 0
|
||||||
|
for m in readyToSendEvent.msgs:
|
||||||
|
## The first `AwaitingRateLimit` entry in push order is the one
|
||||||
|
## this `m` belongs to: `send()` adds one entry per segment, and
|
||||||
|
## `rate_limit_manager` re-emits them in the same FIFO order, so
|
||||||
|
## the two sequences advance in lockstep. Earlier entries may
|
||||||
|
## already be `InFlight` / `Confirmed` / `Failed` because they
|
||||||
|
## live on until every sibling of their `channelReqId` is final,
|
||||||
|
## so we walk past those to find the next one that was awaiting for this batch.
|
||||||
|
while idx < self.pendingMessagingRequests.len and
|
||||||
|
self.pendingMessagingRequests[idx].segmentSendState !=
|
||||||
|
SegmentSendState.AwaitingRateLimit
|
||||||
|
:
|
||||||
|
idx.inc()
|
||||||
|
if idx >= self.pendingMessagingRequests.len:
|
||||||
|
## rate_limit_manager emitted more messages than we have pending —
|
||||||
|
## should not happen given `send` pushes one entry per enqueued
|
||||||
|
## SDS payload. Drop silently rather than corrupt state.
|
||||||
|
break
|
||||||
|
|
||||||
|
let channelReqId = self.pendingMessagingRequests[idx].channelReqId
|
||||||
|
let isEphemeral =
|
||||||
|
self.pendingMessagingRequests[idx].persistenceReqType ==
|
||||||
|
MessagePersistence.Ephemeral
|
||||||
|
|
||||||
|
## TODO: revisit which fields of the SDS message must be encrypted.
|
||||||
|
## Encrypting the whole encoded blob forces every receiver to attempt
|
||||||
|
## decryption before it can route, which breaks selective dispatch.
|
||||||
|
## Leave routing metadata (channelId, causal-history references) in
|
||||||
|
## clear and encrypt only the application payload.
|
||||||
|
let encRes = await Encrypt.request(m)
|
||||||
|
let encrypted = encRes.valueOr:
|
||||||
|
MessageErrorEvent.emit(
|
||||||
|
self.brokerCtx,
|
||||||
|
MessageErrorEvent(
|
||||||
|
requestId: channelReqId, messageHash: "", error: "encryption failed: " & error
|
||||||
|
),
|
||||||
|
)
|
||||||
|
## Encryption failed *before* we could hand the segment to the
|
||||||
|
## delivery layer — no `messagingReqId` was minted and no
|
||||||
|
## `DeliveryTask` was queued on `sendService`. The delivery
|
||||||
|
## layer will therefore never emit a `MessageSentEvent` /
|
||||||
|
## `MessageErrorEvent` for this segment, so `onMessageError`
|
||||||
|
## won't fire either. Advance the state machine inline so the
|
||||||
|
## parent `channelReqId` can still be pruned once its siblings
|
||||||
|
## are also final.
|
||||||
|
self.pendingMessagingRequests[idx].segmentSendState = SegmentSendState.Failed
|
||||||
|
idx.inc()
|
||||||
|
continue
|
||||||
|
let wireBytes = seq[byte](encrypted)
|
||||||
|
|
||||||
|
## The `meta` field carries the Reliable Channel wire-format spec
|
||||||
|
## marker so the ingress side of any peer can route this WakuMessage
|
||||||
|
## to its Reliable Channel layer.
|
||||||
|
let envelope = MessageEnvelope(
|
||||||
|
contentTopic: self.contentTopic,
|
||||||
|
payload: wireBytes,
|
||||||
|
ephemeral: isEphemeral,
|
||||||
|
meta: LipWireReliableChannelVersion.toBytes(),
|
||||||
|
)
|
||||||
|
|
||||||
|
## `waku.send` is not annotated `(raises: [])`, but this listener is.
|
||||||
|
## Convert any raise to a Result error so the state machine handles
|
||||||
|
## both failure modes (Result.err and exception) through one path.
|
||||||
|
let sendRes =
|
||||||
|
try:
|
||||||
|
await self.sendHandler(envelope)
|
||||||
|
except CatchableError as e:
|
||||||
|
Result[RequestId, string].err("waku send raised: " & e.msg)
|
||||||
|
|
||||||
|
let messagingReqId = sendRes.valueOr:
|
||||||
|
MessageErrorEvent.emit(
|
||||||
|
self.brokerCtx,
|
||||||
|
MessageErrorEvent(
|
||||||
|
requestId: channelReqId, messageHash: "", error: "waku send failed: " & error
|
||||||
|
),
|
||||||
|
)
|
||||||
|
self.pendingMessagingRequests[idx].segmentSendState = SegmentSendState.Failed
|
||||||
|
idx.inc()
|
||||||
|
continue
|
||||||
|
|
||||||
|
self.pendingMessagingRequests[idx].messagingReqId = some(messagingReqId)
|
||||||
|
self.pendingMessagingRequests[idx].segmentSendState = SegmentSendState.InFlight
|
||||||
|
self.requestIds.mgetOrPut(channelReqId, @[]).add(messagingReqId)
|
||||||
|
idx.inc()
|
||||||
|
|
||||||
|
self.pruneCompletedChannelReqs()
|
||||||
|
|
||||||
|
proc send*(
|
||||||
|
self: ReliableChannel, payload: seq[byte], ephemeral: bool = false
|
||||||
|
): Result[RequestId, string] =
|
||||||
|
## Single application-level send. The first three stages of the
|
||||||
|
## outgoing pipeline are chained explicitly so the flow is visible
|
||||||
|
## at a glance:
|
||||||
|
##
|
||||||
|
## segmentation -> sds -> rate_limit_manager
|
||||||
|
##
|
||||||
|
## `rate_limit_manager.enqueueToSend` emits a `ReadyToSendEvent` with
|
||||||
|
## the SDS messages cleared for transmission; the channel's listener
|
||||||
|
## then runs the final stage (encryption -> dispatch). The
|
||||||
|
## `persistenceReqType` is carried alongside each segment in
|
||||||
|
## `pendingMessagingRequests` and stamped onto the eventual
|
||||||
|
## `MessageEnvelope`.
|
||||||
|
##
|
||||||
|
## The returned `RequestId` is the channel-level parent of one-or-more
|
||||||
|
## messaging-layer `RequestId`s; the mapping is recorded in
|
||||||
|
## `self.requestIds`.
|
||||||
|
if payload.len == 0:
|
||||||
|
return err("empty payload")
|
||||||
|
|
||||||
|
let channelReqId = RequestId.new(self.rng)
|
||||||
|
self.requestIds[channelReqId] = @[]
|
||||||
|
|
||||||
|
let persistenceReqType =
|
||||||
|
if ephemeral: MessagePersistence.Ephemeral else: MessagePersistence.Persistent
|
||||||
|
|
||||||
|
for segmentBytes in self.segmentation.performSegmentation(payload):
|
||||||
|
## Segments arrive already encoded; the segmentation module owns
|
||||||
|
## the wire format so SDS only ever sees opaque bytes.
|
||||||
|
let sdsBytes = self.sdsHandler.wrapOutgoing(
|
||||||
|
self.channelId, self.senderId, segmentBytes
|
||||||
|
).valueOr:
|
||||||
|
return err("SDS wrap failed: " & error)
|
||||||
|
self.pendingMessagingRequests.add(
|
||||||
|
PendingMessagingRequest(
|
||||||
|
channelReqId: channelReqId,
|
||||||
|
messagingReqId: none(RequestId),
|
||||||
|
persistenceReqType: persistenceReqType,
|
||||||
|
segmentSendState: SegmentSendState.AwaitingRateLimit,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
self.rateLimit.enqueueToSend(sdsBytes)
|
||||||
|
|
||||||
|
return ok(channelReqId)
|
||||||
|
|
||||||
|
proc onMessageReceived(
|
||||||
|
self: ReliableChannel, messageHash: string, payload: seq[byte]
|
||||||
|
) {.async: (raises: []).} =
|
||||||
|
## Ingress pipeline made visible:
|
||||||
|
##
|
||||||
|
## payload -> decrypt -> sds -> reassemble -> emit
|
||||||
|
##
|
||||||
|
## Invoked from this channel's `MessageReceivedEvent` listener, which
|
||||||
|
## already filtered on the spec marker and on `contentTopic`. The
|
||||||
|
## channel only sees the raw payload bytes for itself.
|
||||||
|
|
||||||
|
## Notice that the following "request" is implemented implicitly as a broker call to
|
||||||
|
## the `Decrypt` request broker.
|
||||||
|
let decRes = await Decrypt.request(payload)
|
||||||
|
let plaintext = decRes.valueOr:
|
||||||
|
MessageErrorEvent.emit(
|
||||||
|
self.brokerCtx,
|
||||||
|
MessageErrorEvent(
|
||||||
|
requestId: RequestId(""),
|
||||||
|
messageHash: messageHash,
|
||||||
|
error: "decryption failed: " & error,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
return
|
||||||
|
let plaintextBytes = seq[byte](plaintext)
|
||||||
|
|
||||||
|
let unwrapped = self.sdsHandler.handleIncoming(plaintextBytes)
|
||||||
|
if unwrapped.isErr():
|
||||||
|
return
|
||||||
|
|
||||||
|
let reassembled = self.segmentation.handleIncomingSegment(unwrapped.get().content)
|
||||||
|
if reassembled.isSome():
|
||||||
|
## Emit on the captured `brokerCtx` (the manager's), so the
|
||||||
|
## application listener that the manager has set up on that same
|
||||||
|
## context picks the event up.
|
||||||
|
ChannelMessageReceivedEvent.emit(
|
||||||
|
self.brokerCtx,
|
||||||
|
ChannelMessageReceivedEvent(
|
||||||
|
channelId: self.channelId,
|
||||||
|
senderId: self.senderId,
|
||||||
|
payload: reassembled.get().payload,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
proc new*(
|
||||||
|
T: type ReliableChannel,
|
||||||
|
waku: Waku,
|
||||||
|
channelId: ChannelId,
|
||||||
|
contentTopic: ContentTopic,
|
||||||
|
senderId: SdsParticipantID,
|
||||||
|
segConfig: SegmentationConfig,
|
||||||
|
sdsConfig: SdsConfig,
|
||||||
|
rateConfig: RateLimitConfig,
|
||||||
|
brokerCtx: BrokerContext = globalBrokerContext(),
|
||||||
|
sendHandler: SendHandler = nil,
|
||||||
|
): T =
|
||||||
|
## Pipeline handlers (segmentation/SDS/rate-limit) are constructed
|
||||||
|
## inside the channel rather than handed in by the caller — they are
|
||||||
|
## implementation details of the channel, not knobs the API consumer
|
||||||
|
## should be wiring up. Encryption is delegated to the `Encrypt`/
|
||||||
|
## `Decrypt` request brokers, so the channel keeps no per-instance
|
||||||
|
## encryption state either.
|
||||||
|
##
|
||||||
|
## `sendHandler` defaults to `waku.send`; tests pass a fake to drive
|
||||||
|
## the send state machine without touching the network.
|
||||||
|
let resolvedSendHandler =
|
||||||
|
if sendHandler.isNil():
|
||||||
|
proc(
|
||||||
|
envelope: MessageEnvelope
|
||||||
|
): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} =
|
||||||
|
return await waku.send(envelope)
|
||||||
|
else:
|
||||||
|
sendHandler
|
||||||
|
|
||||||
|
let chn = T(
|
||||||
|
sendHandler: resolvedSendHandler,
|
||||||
|
channelId: channelId,
|
||||||
|
contentTopic: contentTopic,
|
||||||
|
senderId: senderId,
|
||||||
|
rng: libp2p_crypto.newRng(),
|
||||||
|
segmentation: SegmentationHandler.new(segConfig),
|
||||||
|
sdsHandler: SdsHandler.new(sdsConfig, senderId),
|
||||||
|
rateLimit: RateLimitManager.new(rateConfig, channelId, brokerCtx),
|
||||||
|
requestIds: initTable[RequestId, seq[RequestId]](),
|
||||||
|
pendingMessagingRequests: @[],
|
||||||
|
brokerCtx: brokerCtx,
|
||||||
|
)
|
||||||
|
|
||||||
|
## Each channel owns its own egress + ingress + send-completion
|
||||||
|
## listeners on `chn.brokerCtx`, filtered to traffic addressed to
|
||||||
|
## this channel. Keeping the listeners (and the handler procs they
|
||||||
|
## call) inside the channel lets `onReadyToSend` /
|
||||||
|
## `onMessageReceived` / `onMessageSent` / `onMessageError` stay
|
||||||
|
## private — the manager doesn't need to know about them.
|
||||||
|
discard ReadyToSendEvent.listen(
|
||||||
|
chn.brokerCtx,
|
||||||
|
proc(evt: ReadyToSendEvent): Future[void] {.async: (raises: []).} =
|
||||||
|
if evt.channelId == chn.channelId:
|
||||||
|
await chn.onReadyToSend(evt)
|
||||||
|
,
|
||||||
|
)
|
||||||
|
|
||||||
|
discard MessageReceivedEvent.listen(
|
||||||
|
chn.brokerCtx,
|
||||||
|
proc(evt: MessageReceivedEvent): Future[void] {.async: (raises: []).} =
|
||||||
|
## Drop foreign traffic (non-Reliable-Channel `meta`) and traffic
|
||||||
|
## for other channels before doing any decode work.
|
||||||
|
if string.fromBytes(evt.message.meta) != LipWireReliableChannelVersion:
|
||||||
|
return
|
||||||
|
if evt.message.contentTopic != chn.contentTopic:
|
||||||
|
return
|
||||||
|
await chn.onMessageReceived(evt.messageHash, evt.message.payload)
|
||||||
|
,
|
||||||
|
)
|
||||||
|
|
||||||
|
## Send-completion events are tagged with the per-segment messaging
|
||||||
|
## `requestId` — globally unique, so we don't need any channel filter
|
||||||
|
## up front. The handler scans this channel's pending entries for a
|
||||||
|
## match and is a no-op when the id belongs to a different channel.
|
||||||
|
discard MessageSentEvent.listen(
|
||||||
|
chn.brokerCtx,
|
||||||
|
proc(evt: MessageSentEvent): Future[void] {.async: (raises: []).} =
|
||||||
|
chn.onMessageSent(evt.requestId),
|
||||||
|
)
|
||||||
|
|
||||||
|
discard MessageErrorEvent.listen(
|
||||||
|
chn.brokerCtx,
|
||||||
|
proc(evt: MessageErrorEvent): Future[void] {.async: (raises: []).} =
|
||||||
|
chn.onMessageError(evt.requestId),
|
||||||
|
)
|
||||||
|
|
||||||
|
return chn
|
||||||
141
channels/reliable_channel_manager.nim
Normal file
141
channels/reliable_channel_manager.nim
Normal file
@ -0,0 +1,141 @@
|
|||||||
|
## Reliable Channel API entry point.
|
||||||
|
##
|
||||||
|
## Owns the set of `ReliableChannel` instances and exposes lifecycle and
|
||||||
|
## send/receive operations addressed by `ChannelId`.
|
||||||
|
##
|
||||||
|
## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html
|
||||||
|
|
||||||
|
import std/tables
|
||||||
|
import results
|
||||||
|
import chronos
|
||||||
|
import stew/byteutils
|
||||||
|
|
||||||
|
import waku/api/api
|
||||||
|
import waku/api/api_conf
|
||||||
|
import waku/events/message_events as waku_message_events
|
||||||
|
import waku/factory/waku as waku_factory
|
||||||
|
import waku/node/delivery_service/delivery_service
|
||||||
|
import waku/waku_core/topics
|
||||||
|
|
||||||
|
import ./reliable_channel
|
||||||
|
import ./encryption/noop_encryption
|
||||||
|
|
||||||
|
export reliable_channel
|
||||||
|
|
||||||
|
type ReliableChannelManager* = ref object
|
||||||
|
channels: Table[ChannelId, ReliableChannel]
|
||||||
|
waku: Waku
|
||||||
|
## Owned by the manager. The channel layer reaches the messaging
|
||||||
|
## API through `waku.send(envelope)`; constructing DeliveryTasks
|
||||||
|
## directly would breach the layer boundary.
|
||||||
|
brokerCtx: BrokerContext
|
||||||
|
|
||||||
|
proc new*(
|
||||||
|
T: type ReliableChannelManager,
|
||||||
|
conf: WakuNodeConf,
|
||||||
|
brokerCtx: BrokerContext = globalBrokerContext(),
|
||||||
|
): Future[Result[T, string]] {.async.} =
|
||||||
|
## TODO !! The proper ownership chain is:
|
||||||
|
## ReliableChannelManager -> DeliveryService (MessagingClient) -> Waku (Kernel/Protocols) -> WakuNode,
|
||||||
|
## and this will be implemented in the future. For now, `createNode`
|
||||||
|
## is called here to get a Waku instance, and the WakuNode is immediately discarded.
|
||||||
|
## This is a temporary workaround to get the API
|
||||||
|
|
||||||
|
let waku = ?(await createNode(conf))
|
||||||
|
|
||||||
|
let manager = T(
|
||||||
|
channels: initTable[ChannelId, ReliableChannel](), waku: waku, brokerCtx: brokerCtx
|
||||||
|
)
|
||||||
|
|
||||||
|
return ok(manager)
|
||||||
|
|
||||||
|
proc start*(self: ReliableChannelManager): Result[void, string] =
|
||||||
|
## Bring the owned DeliveryService up. Separated from `new` so callers
|
||||||
|
## can register encryption providers / create channels before traffic
|
||||||
|
## starts flowing.
|
||||||
|
self.waku.deliveryService.startDeliveryService()
|
||||||
|
|
||||||
|
proc stop*(self: ReliableChannelManager) {.async.} =
|
||||||
|
if not self.waku.isNil():
|
||||||
|
await self.waku.deliveryService.stopDeliveryService()
|
||||||
|
|
||||||
|
proc createReliableChannel*(
|
||||||
|
self: ReliableChannelManager,
|
||||||
|
channelId: ChannelId,
|
||||||
|
contentTopic: ContentTopic,
|
||||||
|
senderId: SdsParticipantID,
|
||||||
|
sendHandler: SendHandler = nil,
|
||||||
|
): Result[ChannelId, string] =
|
||||||
|
## Spec entry point. The `DeliveryService` and `rng` the channel needs
|
||||||
|
## are sourced from the owning `ReliableChannelManager` rather than
|
||||||
|
## passed per call. Encryption is wired up through the `Encrypt`/
|
||||||
|
## `Decrypt` request brokers — the application installs its own
|
||||||
|
## providers (or `setNoopEncryption()`) before traffic flows.
|
||||||
|
##
|
||||||
|
## Segmentation, SDS and rate-limit configs will eventually be read
|
||||||
|
## from the node's `NodeConfig`. Defaults for now.
|
||||||
|
##
|
||||||
|
## `sendHandler` is left `nil` in production so the channel uses the
|
||||||
|
## owned `waku.send`; tests pass a fake to bypass the network.
|
||||||
|
if self.channels.hasKey(channelId):
|
||||||
|
return err("channel already exists: " & channelId)
|
||||||
|
|
||||||
|
let segConfig = SegmentationConfig(
|
||||||
|
segmentSizeBytes: DefaultSegmentSizeBytes,
|
||||||
|
enableReedSolomon: false,
|
||||||
|
persistence: nil,
|
||||||
|
)
|
||||||
|
let sdsConfig = SdsConfig(
|
||||||
|
acknowledgementTimeoutMs: DefaultAcknowledgementTimeoutMs,
|
||||||
|
maxRetransmissions: DefaultMaxRetransmissions,
|
||||||
|
causalHistorySize: DefaultCausalHistorySize,
|
||||||
|
persistence: nil,
|
||||||
|
)
|
||||||
|
let rateConfig = RateLimitConfig(
|
||||||
|
epochPeriodSec: DefaultEpochPeriodSec, messagesPerEpoch: DefaultMessagesPerEpoch
|
||||||
|
)
|
||||||
|
|
||||||
|
let chn = ReliableChannel.new(
|
||||||
|
waku = self.waku,
|
||||||
|
channelId = channelId,
|
||||||
|
contentTopic = contentTopic,
|
||||||
|
senderId = senderId,
|
||||||
|
segConfig = segConfig,
|
||||||
|
sdsConfig = sdsConfig,
|
||||||
|
rateConfig = rateConfig,
|
||||||
|
brokerCtx = self.brokerCtx,
|
||||||
|
sendHandler = sendHandler,
|
||||||
|
)
|
||||||
|
|
||||||
|
self.channels[channelId] = chn
|
||||||
|
return ok(channelId)
|
||||||
|
|
||||||
|
proc closeChannel*(
|
||||||
|
self: ReliableChannelManager, channelId: ChannelId
|
||||||
|
): Result[void, string] =
|
||||||
|
## Flush state, persist outstanding SDS buffers, release resources.
|
||||||
|
if not self.channels.hasKey(channelId):
|
||||||
|
return err("unknown channel: " & channelId)
|
||||||
|
self.channels.del(channelId)
|
||||||
|
return ok()
|
||||||
|
|
||||||
|
proc send*(
|
||||||
|
self: ReliableChannelManager,
|
||||||
|
channelId: ChannelId,
|
||||||
|
appPayload: seq[byte],
|
||||||
|
ephemeral: bool = false,
|
||||||
|
): Result[RequestId, string] =
|
||||||
|
## Spec-level entry point. Looks the channel up by id and delegates
|
||||||
|
## to `ReliableChannel.send`, which exposes the visible pipeline
|
||||||
|
## segmentation -> sds -> rate_limit_manager -> encryption.
|
||||||
|
let chn = self.channels.getOrDefault(channelId)
|
||||||
|
if chn.isNil():
|
||||||
|
return err("unknown channel: " & channelId)
|
||||||
|
return chn.send(appPayload, ephemeral)
|
||||||
|
|
||||||
|
## Inbound messages are not handed to the manager by direct call. Each
|
||||||
|
## `ReliableChannel` installs its own `MessageReceivedEvent` listener
|
||||||
|
## in `ReliableChannel.new`, filters by spec marker and `contentTopic`,
|
||||||
|
## and routes to its private `onMessageReceived`. This keeps the lower
|
||||||
|
## layer (MessagingAPI/Waku) unaware of the existence of ReliableChannel
|
||||||
|
## and keeps the manager out of per-channel event dispatch.
|
||||||
62
channels/scalable_data_sync/scalable_data_sync.nim
Normal file
62
channels/scalable_data_sync/scalable_data_sync.nim
Normal file
@ -0,0 +1,62 @@
|
|||||||
|
## Scalable Data Sync (SDS) component for the Reliable Channel API.
|
||||||
|
##
|
||||||
|
## Provides end-to-end delivery guarantees via causal history tracking,
|
||||||
|
## acknowledgements, and retransmission of unacknowledged segments.
|
||||||
|
##
|
||||||
|
## Skeleton: `wrapOutgoing` and `handleIncoming` are pass-throughs so
|
||||||
|
## the send/receive circuit can exercise the surrounding pipeline.
|
||||||
|
## Real SDS wrapping will plug in via `nim-sds` later.
|
||||||
|
##
|
||||||
|
## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html
|
||||||
|
|
||||||
|
import results
|
||||||
|
import message as sds_message
|
||||||
|
|
||||||
|
import ./sds_persistence
|
||||||
|
|
||||||
|
export sds_message, sds_persistence
|
||||||
|
|
||||||
|
const
|
||||||
|
DefaultAcknowledgementTimeoutMs* = 5_000
|
||||||
|
DefaultMaxRetransmissions* = 5
|
||||||
|
DefaultCausalHistorySize* = 2
|
||||||
|
|
||||||
|
type
|
||||||
|
SdsConfig* = object
|
||||||
|
acknowledgementTimeoutMs*: int
|
||||||
|
maxRetransmissions*: int
|
||||||
|
causalHistorySize*: int
|
||||||
|
persistence*: SdsPersistence
|
||||||
|
|
||||||
|
SdsHandler* = ref object
|
||||||
|
config*: SdsConfig
|
||||||
|
participantId*: SdsParticipantID
|
||||||
|
|
||||||
|
proc new*(
|
||||||
|
T: type SdsHandler,
|
||||||
|
config: SdsConfig,
|
||||||
|
participantId: SdsParticipantID = SdsParticipantID(""),
|
||||||
|
): T =
|
||||||
|
return T(config: config, participantId: participantId)
|
||||||
|
|
||||||
|
proc wrapOutgoing*(
|
||||||
|
self: SdsHandler,
|
||||||
|
channelId: SdsChannelID,
|
||||||
|
senderId: SdsParticipantID,
|
||||||
|
payload: seq[byte],
|
||||||
|
): Result[seq[byte], string] =
|
||||||
|
## Stage 2 of the outgoing pipeline (segmentation -> sds -> rate_limit_manager -> encryption).
|
||||||
|
## Skeleton: pass the encoded segment through unchanged. Real causal
|
||||||
|
## history / lamport / bloom-filter population will replace this.
|
||||||
|
return ok(payload)
|
||||||
|
|
||||||
|
proc handleIncoming*(
|
||||||
|
self: SdsHandler, msg: seq[byte]
|
||||||
|
): Result[tuple[content: seq[byte], channelId: SdsChannelID], string] =
|
||||||
|
## Skeleton: pass the bytes through; channel id is left empty until
|
||||||
|
## the real wire format provides it.
|
||||||
|
return ok((content: msg, channelId: SdsChannelID("")))
|
||||||
|
|
||||||
|
proc tickRetransmissions*(self: SdsHandler) =
|
||||||
|
## Drives retransmissions of unacknowledged messages.
|
||||||
|
discard
|
||||||
25
channels/scalable_data_sync/sds_persistence.nim
Normal file
25
channels/scalable_data_sync/sds_persistence.nim
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
## Persistence backend for SDS outgoing buffer and causal history.
|
||||||
|
##
|
||||||
|
## TODO (raised in PR review): this surface is duplicating concerns that
|
||||||
|
## should come from the SDS module itself. Once the SDS module exposes a
|
||||||
|
## complete persistence contract, drop this file and import that surface
|
||||||
|
## instead of re-declaring it here.
|
||||||
|
|
||||||
|
import message
|
||||||
|
|
||||||
|
type
|
||||||
|
SdsPersistenceKind* {.pure.} = enum
|
||||||
|
InMemory
|
||||||
|
Sqlite
|
||||||
|
|
||||||
|
SdsPersistence* = ref object of RootObj
|
||||||
|
kind*: SdsPersistenceKind
|
||||||
|
|
||||||
|
method storeOutgoing*(self: SdsPersistence, msg: SdsMessage) {.base.} =
|
||||||
|
discard
|
||||||
|
|
||||||
|
method markAcknowledged*(self: SdsPersistence, messageId: SdsMessageID) {.base.} =
|
||||||
|
discard
|
||||||
|
|
||||||
|
method unackedOlderThan*(self: SdsPersistence, ageMs: int): seq[SdsMessage] {.base.} =
|
||||||
|
discard
|
||||||
34
channels/segmentation/segment_message_proto.nim
Normal file
34
channels/segmentation/segment_message_proto.nim
Normal file
@ -0,0 +1,34 @@
|
|||||||
|
## Wire format for a single segment, per the Reliable Channel API spec.
|
||||||
|
##
|
||||||
|
## Skeleton: encode/decode treat the segment as just its payload bytes,
|
||||||
|
## since for now we only ever produce a single segment per send.
|
||||||
|
|
||||||
|
type SegmentMessageProto* = object
|
||||||
|
entireMessageHash*: seq[byte] ## Keccak256(original payload), 32 bytes
|
||||||
|
dataSegmentIndex*: uint32 ## zero-indexed sequence number for data segments
|
||||||
|
dataSegmentCount*: uint32 ## number of data segments (>= 1)
|
||||||
|
payload*: seq[byte] ## segment payload (data or parity shard)
|
||||||
|
paritySegmentIndex*: uint32 ## zero-based sequence number for parity segments
|
||||||
|
paritySegmentCount*: uint32 ## number of parity segments
|
||||||
|
isParity*: bool ## true for parity segments, false (default) for data segments
|
||||||
|
|
||||||
|
proc isParityMessage*(self: SegmentMessageProto): bool =
|
||||||
|
self.isParity
|
||||||
|
|
||||||
|
proc isValid*(self: SegmentMessageProto): bool =
|
||||||
|
## Validates hash length (32 bytes), segment indices and counts.
|
||||||
|
discard
|
||||||
|
|
||||||
|
proc encode*(self: SegmentMessageProto): seq[byte] =
|
||||||
|
self.payload
|
||||||
|
|
||||||
|
proc decode*(T: type SegmentMessageProto, buf: seq[byte]): T =
|
||||||
|
T(
|
||||||
|
entireMessageHash: @[],
|
||||||
|
dataSegmentIndex: 0,
|
||||||
|
dataSegmentCount: 1,
|
||||||
|
payload: buf,
|
||||||
|
paritySegmentIndex: 0,
|
||||||
|
paritySegmentCount: 0,
|
||||||
|
isParity: false,
|
||||||
|
)
|
||||||
70
channels/segmentation/segmentation.nim
Normal file
70
channels/segmentation/segmentation.nim
Normal file
@ -0,0 +1,70 @@
|
|||||||
|
## Segmentation component for the Reliable Channel API.
|
||||||
|
##
|
||||||
|
## Splits large application payloads into transmittable segments and
|
||||||
|
## reassembles them on reception. Supports optional Reed-Solomon parity
|
||||||
|
## segments for loss recovery, as per the Reliable Channel API spec.
|
||||||
|
##
|
||||||
|
## For the skeleton everything fits in a single segment: real chunking
|
||||||
|
## and Reed-Solomon parity will be plugged in later.
|
||||||
|
##
|
||||||
|
## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html
|
||||||
|
|
||||||
|
import std/options
|
||||||
|
import ./segment_message_proto
|
||||||
|
import ./segmentation_persistence
|
||||||
|
|
||||||
|
export segment_message_proto, segmentation_persistence
|
||||||
|
|
||||||
|
const
|
||||||
|
DefaultSegmentSizeBytes* = 102_400
|
||||||
|
SegmentsParityRate* = 0.125
|
||||||
|
SegmentsReedSolomonMaxCount* = 256
|
||||||
|
|
||||||
|
type
|
||||||
|
SegmentationConfig* = object
|
||||||
|
segmentSizeBytes*: int
|
||||||
|
enableReedSolomon*: bool
|
||||||
|
persistence*: SegmentationPersistence
|
||||||
|
|
||||||
|
SegmentationHandler* = ref object
|
||||||
|
config*: SegmentationConfig
|
||||||
|
|
||||||
|
ReassemblyResult* = object
|
||||||
|
payload*: seq[byte]
|
||||||
|
entireMessageHash*: seq[byte]
|
||||||
|
|
||||||
|
proc new*(T: type SegmentationHandler, config: SegmentationConfig): T =
|
||||||
|
return T(config: config)
|
||||||
|
|
||||||
|
proc performSegmentation*(
|
||||||
|
self: SegmentationHandler, payload: seq[byte]
|
||||||
|
): seq[seq[byte]] =
|
||||||
|
## Skeleton behaviour: emit exactly one segment carrying the whole
|
||||||
|
## payload. Real chunking and Reed-Solomon parity will replace this.
|
||||||
|
let segment = SegmentMessageProto(
|
||||||
|
entireMessageHash: @[],
|
||||||
|
dataSegmentIndex: 0,
|
||||||
|
dataSegmentCount: 1,
|
||||||
|
payload: payload,
|
||||||
|
paritySegmentIndex: 0,
|
||||||
|
paritySegmentCount: 0,
|
||||||
|
isParity: false,
|
||||||
|
)
|
||||||
|
return @[segment.encode()]
|
||||||
|
|
||||||
|
proc handleIncomingSegment*(
|
||||||
|
self: SegmentationHandler, segmentBytes: seq[byte]
|
||||||
|
): Option[ReassemblyResult] =
|
||||||
|
## Skeleton behaviour: every segment is already a complete message
|
||||||
|
## (since `performSegmentation` always emits one), so just hand the
|
||||||
|
## payload straight back.
|
||||||
|
let segment = SegmentMessageProto.decode(segmentBytes)
|
||||||
|
return some(
|
||||||
|
ReassemblyResult(
|
||||||
|
payload: segment.payload, entireMessageHash: segment.entireMessageHash
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
proc cleanupSegments*(self: SegmentationHandler) =
|
||||||
|
## Drop expired partial-reassembly state.
|
||||||
|
discard
|
||||||
20
channels/segmentation/segmentation_persistence.nim
Normal file
20
channels/segmentation/segmentation_persistence.nim
Normal file
@ -0,0 +1,20 @@
|
|||||||
|
## Persistence backend interface for segmentation reassembly state.
|
||||||
|
##
|
||||||
|
## Allows partial reassembly state to survive process restarts.
|
||||||
|
|
||||||
|
type
|
||||||
|
SegmentationPersistenceKind* {.pure.} = enum
|
||||||
|
InMemory
|
||||||
|
Sqlite
|
||||||
|
|
||||||
|
SegmentationPersistence* = ref object of RootObj
|
||||||
|
kind*: SegmentationPersistenceKind
|
||||||
|
|
||||||
|
method put*(self: SegmentationPersistence, key: seq[byte], value: seq[byte]) {.base.} =
|
||||||
|
discard
|
||||||
|
|
||||||
|
method get*(self: SegmentationPersistence, key: seq[byte]): seq[byte] {.base.} =
|
||||||
|
discard
|
||||||
|
|
||||||
|
method delete*(self: SegmentationPersistence, key: seq[byte]) {.base.} =
|
||||||
|
discard
|
||||||
15
channels/types.nim
Normal file
15
channels/types.nim
Normal file
@ -0,0 +1,15 @@
|
|||||||
|
## Core identifier types for the Reliable Channel API.
|
||||||
|
|
||||||
|
import std/hashes
|
||||||
|
import waku/api/types as api_types
|
||||||
|
|
||||||
|
import ./scalable_data_sync/scalable_data_sync
|
||||||
|
|
||||||
|
export scalable_data_sync
|
||||||
|
export api_types
|
||||||
|
|
||||||
|
type ChannelId* = SdsChannelID
|
||||||
|
|
||||||
|
proc hash*(r: RequestId): Hash =
|
||||||
|
## Allows `RequestId` to be used as a `Table` key.
|
||||||
|
hash(string(r))
|
||||||
44
flake.lock
generated
44
flake.lock
generated
@ -19,8 +19,7 @@
|
|||||||
"root": {
|
"root": {
|
||||||
"inputs": {
|
"inputs": {
|
||||||
"nixpkgs": "nixpkgs",
|
"nixpkgs": "nixpkgs",
|
||||||
"rust-overlay": "rust-overlay",
|
"rust-overlay": "rust-overlay"
|
||||||
"zerokit": "zerokit"
|
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"rust-overlay": {
|
"rust-overlay": {
|
||||||
@ -42,47 +41,6 @@
|
|||||||
"repo": "rust-overlay",
|
"repo": "rust-overlay",
|
||||||
"type": "github"
|
"type": "github"
|
||||||
}
|
}
|
||||||
},
|
|
||||||
"rust-overlay_2": {
|
|
||||||
"inputs": {
|
|
||||||
"nixpkgs": [
|
|
||||||
"zerokit",
|
|
||||||
"nixpkgs"
|
|
||||||
]
|
|
||||||
},
|
|
||||||
"locked": {
|
|
||||||
"lastModified": 1771211437,
|
|
||||||
"narHash": "sha256-lcNK438i4DGtyA+bPXXyVLHVmJjYpVKmpux9WASa3ro=",
|
|
||||||
"owner": "oxalica",
|
|
||||||
"repo": "rust-overlay",
|
|
||||||
"rev": "c62195b3d6e1bb11e0c2fb2a494117d3b55d410f",
|
|
||||||
"type": "github"
|
|
||||||
},
|
|
||||||
"original": {
|
|
||||||
"owner": "oxalica",
|
|
||||||
"repo": "rust-overlay",
|
|
||||||
"type": "github"
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"zerokit": {
|
|
||||||
"inputs": {
|
|
||||||
"nixpkgs": [
|
|
||||||
"nixpkgs"
|
|
||||||
],
|
|
||||||
"rust-overlay": "rust-overlay_2"
|
|
||||||
},
|
|
||||||
"locked": {
|
|
||||||
"owner": "vacp2p",
|
|
||||||
"repo": "zerokit",
|
|
||||||
"rev": "5e64cb8822bee65eed6cf459f95ae72b80c6ba63",
|
|
||||||
"type": "github"
|
|
||||||
},
|
|
||||||
"original": {
|
|
||||||
"owner": "vacp2p",
|
|
||||||
"repo": "zerokit",
|
|
||||||
"rev": "5e64cb8822bee65eed6cf459f95ae72b80c6ba63",
|
|
||||||
"type": "github"
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"root": "root",
|
"root": "root",
|
||||||
|
|||||||
101
flake.nix
101
flake.nix
@ -17,19 +17,9 @@
|
|||||||
url = "github:oxalica/rust-overlay";
|
url = "github:oxalica/rust-overlay";
|
||||||
inputs.nixpkgs.follows = "nixpkgs";
|
inputs.nixpkgs.follows = "nixpkgs";
|
||||||
};
|
};
|
||||||
|
|
||||||
# External flake input: Zerokit pinned to a specific commit.
|
|
||||||
# Update the rev here when a new zerokit version is needed.
|
|
||||||
zerokit = {
|
|
||||||
# Pinned to v2.0.2 (5e64cb8822bee65eed6cf459f95ae72b80c6ba63) to match
|
|
||||||
# the vendor/zerokit submodule. Keep these two in sync: the nix build
|
|
||||||
# links librln from this input, the Makefile build from the submodule.
|
|
||||||
url = "github:vacp2p/zerokit/5e64cb8822bee65eed6cf459f95ae72b80c6ba63";
|
|
||||||
inputs.nixpkgs.follows = "nixpkgs";
|
|
||||||
};
|
|
||||||
};
|
};
|
||||||
|
|
||||||
outputs = { self, nixpkgs, rust-overlay, zerokit }:
|
outputs = { self, nixpkgs, rust-overlay }:
|
||||||
let
|
let
|
||||||
systems = [
|
systems = [
|
||||||
"x86_64-linux" "aarch64-linux"
|
"x86_64-linux" "aarch64-linux"
|
||||||
@ -69,19 +59,78 @@
|
|||||||
inherit system;
|
inherit system;
|
||||||
overlays = [ (import rust-overlay) nimbleOverlay ];
|
overlays = [ (import rust-overlay) nimbleOverlay ];
|
||||||
};
|
};
|
||||||
|
|
||||||
|
# Prebuilt zerokit librln, fetched from the upstream GitHub release
|
||||||
|
# rather than compiled from source. Compiling zerokit makes Nix download
|
||||||
|
# its many crate dependencies from crates.io in one parallel burst, which
|
||||||
|
# crates.io intermittently rejects with HTTP 403 (rate limiting from the
|
||||||
|
# self-hosted runners' shared IP), breaking the nix build. The release
|
||||||
|
# ships the exact `stateless` library this project links (see
|
||||||
|
# scripts/build_rln.sh), so we use it directly — no Rust toolchain and
|
||||||
|
# no crates.io access needed.
|
||||||
|
#
|
||||||
|
# Keep `rlnVersion` aligned with `LIBRLN_VERSION` in the Makefile and the
|
||||||
|
# vendor/zerokit submodule. Each hash is the sha256 of the release tarball
|
||||||
|
# for that platform; refresh all four when bumping the version.
|
||||||
|
rlnVersion = "v2.0.2";
|
||||||
|
rlnAssets = {
|
||||||
|
"x86_64-linux" = { triple = "x86_64-unknown-linux-gnu"; hash = "sha256-qbrUdaetYKFhjzxUP/QcwD3JHWJ8qk/tCMK3yXceIAk="; };
|
||||||
|
"aarch64-linux" = { triple = "aarch64-unknown-linux-gnu"; hash = "sha256-s4bWrmCcNTWHNyJwV73ilWNp58ZdAVG+TAgtWN1cTQs="; };
|
||||||
|
"x86_64-darwin" = { triple = "x86_64-apple-darwin"; hash = "sha256-ZaHP5CApN66FYY7jxwOmGcF9kJR78Fng3k1qE2W08Mk="; };
|
||||||
|
"aarch64-darwin" = { triple = "aarch64-apple-darwin"; hash = "sha256-f2YppkPsKFdN00j+IY8fpvsebWTIb9lW/V1/vOTiVKU="; };
|
||||||
|
};
|
||||||
|
|
||||||
|
mkZerokitRln = system: pkgs:
|
||||||
|
let
|
||||||
|
asset = rlnAssets.${system} or
|
||||||
|
(throw "zerokit ${rlnVersion} has no prebuilt rln asset for system '${system}'");
|
||||||
|
in pkgs.stdenv.mkDerivation {
|
||||||
|
pname = "librln";
|
||||||
|
version = lib.removePrefix "v" rlnVersion;
|
||||||
|
|
||||||
|
src = pkgs.fetchurl {
|
||||||
|
url = "https://github.com/vacp2p/zerokit/releases/download/"
|
||||||
|
+ "${rlnVersion}/${asset.triple}-stateless-rln.tar.gz";
|
||||||
|
hash = asset.hash;
|
||||||
|
};
|
||||||
|
|
||||||
|
# The tarball lays its files out under release/.
|
||||||
|
sourceRoot = "release";
|
||||||
|
dontConfigure = true;
|
||||||
|
dontBuild = true;
|
||||||
|
|
||||||
|
# The release .so was linked outside Nix, so it references system
|
||||||
|
# libraries (libgcc_s, libstdc++, glibc) by bare name. autoPatchelfHook
|
||||||
|
# points those at the Nix versions so the library loads correctly when
|
||||||
|
# used by the Nix build. It does nothing for the static .a, and the
|
||||||
|
# step is skipped on macOS (dylib paths are fixed in nix/default.nix).
|
||||||
|
nativeBuildInputs =
|
||||||
|
pkgs.lib.optionals pkgs.stdenv.isLinux [ pkgs.autoPatchelfHook ];
|
||||||
|
buildInputs =
|
||||||
|
pkgs.lib.optionals pkgs.stdenv.isLinux [ pkgs.stdenv.cc.cc.lib ];
|
||||||
|
|
||||||
|
installPhase = ''
|
||||||
|
runHook preInstall
|
||||||
|
mkdir -p $out/lib
|
||||||
|
cp librln.a $out/lib/ 2>/dev/null || true
|
||||||
|
cp librln.so $out/lib/ 2>/dev/null || true
|
||||||
|
cp librln.dylib $out/lib/ 2>/dev/null || true
|
||||||
|
runHook postInstall
|
||||||
|
'';
|
||||||
|
|
||||||
|
meta = with pkgs.lib; {
|
||||||
|
description = "Prebuilt zerokit RLN library (stateless flavor)";
|
||||||
|
homepage = "https://github.com/vacp2p/zerokit";
|
||||||
|
license = with licenses; [ mit asl20 ];
|
||||||
|
platforms = builtins.attrNames rlnAssets;
|
||||||
|
};
|
||||||
|
};
|
||||||
in {
|
in {
|
||||||
packages = forAllSystems (system:
|
packages = forAllSystems (system:
|
||||||
let
|
let
|
||||||
pkgs = pkgsFor system;
|
pkgs = pkgsFor system;
|
||||||
|
|
||||||
# HACK: Fix for stale cargoHash in 2.0.2 release.
|
zerokitRln = mkZerokitRln system pkgs;
|
||||||
zerokitRln = zerokit.packages.${system}.rln.overrideAttrs (old: {
|
|
||||||
cargoDeps = old.cargoDeps.overrideAttrs (oldCargoDeps: {
|
|
||||||
vendorStaging = oldCargoDeps.vendorStaging.overrideAttrs (_: {
|
|
||||||
outputHash = "sha256-PNwEdZLgGQPqQDrEK2hsQtSybVfBbD6xn4K47fPFJUU=";
|
|
||||||
});
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
liblogosdelivery = pkgs.callPackage ./nix/default.nix {
|
liblogosdelivery = pkgs.callPackage ./nix/default.nix {
|
||||||
inherit pkgs;
|
inherit pkgs;
|
||||||
@ -89,12 +138,18 @@
|
|||||||
inherit zerokitRln;
|
inherit zerokitRln;
|
||||||
gitVersion = "v${nimbleVersion}-g${builtins.substring 0 6 shortRev}";
|
gitVersion = "v${nimbleVersion}-g${builtins.substring 0 6 shortRev}";
|
||||||
};
|
};
|
||||||
|
|
||||||
|
wakucanary = pkgs.callPackage ./nix/default.nix {
|
||||||
|
inherit pkgs;
|
||||||
|
src = ./.;
|
||||||
|
targets = ["wakucanary"];
|
||||||
|
inherit zerokitRln;
|
||||||
|
};
|
||||||
in {
|
in {
|
||||||
inherit liblogosdelivery;
|
inherit liblogosdelivery wakucanary;
|
||||||
# Expose the cargoHash-corrected librln so downstream consumers
|
# Expose the prebuilt librln so downstream consumers
|
||||||
# (e.g. logos-delivery-module) bundle the exact same librln this
|
# (e.g. logos-delivery-module) bundle the exact same librln this
|
||||||
# build links, instead of pulling zerokit's rln directly — whose
|
# build links against.
|
||||||
# committed cargoHash is stale for v2.0.2 (see zerokitRln above).
|
|
||||||
rln = zerokitRln;
|
rln = zerokitRln;
|
||||||
default = liblogosdelivery;
|
default = liblogosdelivery;
|
||||||
}
|
}
|
||||||
|
|||||||
149
nix/default.nix
149
nix/default.nix
@ -1,6 +1,7 @@
|
|||||||
{ pkgs
|
{ pkgs
|
||||||
, src
|
, src
|
||||||
, zerokitRln
|
, zerokitRln
|
||||||
|
, targets ? []
|
||||||
, gitVersion ? "n/a"
|
, gitVersion ? "n/a"
|
||||||
, enablePostgres ? true
|
, enablePostgres ? true
|
||||||
, enableNimDebugDlOpen ? true
|
, enableNimDebugDlOpen ? true
|
||||||
@ -10,6 +11,8 @@
|
|||||||
let
|
let
|
||||||
deps = import ./deps.nix { inherit pkgs; };
|
deps = import ./deps.nix { inherit pkgs; };
|
||||||
|
|
||||||
|
buildWakucanary = builtins.elem "wakucanary" targets;
|
||||||
|
|
||||||
nimDefineArgs = pkgs.lib.concatStringsSep " \\\n " (
|
nimDefineArgs = pkgs.lib.concatStringsSep " \\\n " (
|
||||||
[ "--define:disable_libbacktrace"
|
[ "--define:disable_libbacktrace"
|
||||||
"--define:git_version=${gitVersion}" ]
|
"--define:git_version=${gitVersion}" ]
|
||||||
@ -34,9 +37,29 @@ let
|
|||||||
if pkgs.stdenv.hostPlatform.isWindows then "dll"
|
if pkgs.stdenv.hostPlatform.isWindows then "dll"
|
||||||
else if pkgs.stdenv.hostPlatform.isDarwin then "dylib"
|
else if pkgs.stdenv.hostPlatform.isDarwin then "dylib"
|
||||||
else "so";
|
else "so";
|
||||||
|
|
||||||
|
# Shared `nim c` invocation. Callers vary the output, the source file and a
|
||||||
|
# few mode-specific flags (e.g. --app:lib, --noMain, --header); everything
|
||||||
|
# else (paths, defines, threading, gc, nimcache, rln linkage) is constant.
|
||||||
|
# $NAT_TRAV and $NIMCACHE are shell variables defined in buildPhase.
|
||||||
|
nimCompile = { outFile, sourceFile, extraArgs ? [] }: ''
|
||||||
|
nim c \
|
||||||
|
--noNimblePath \
|
||||||
|
${pathArgs} \
|
||||||
|
--path:$NAT_TRAV \
|
||||||
|
--path:$NAT_TRAV/src \
|
||||||
|
--passL:"-L${zerokitRln}/lib -lrln${pkgs.lib.optionalString pkgs.stdenv.isLinux " -lstdc++"}" \
|
||||||
|
${nimDefineArgs} \
|
||||||
|
--threads:on \
|
||||||
|
--mm:refc \
|
||||||
|
--nimcache:$NIMCACHE \
|
||||||
|
--out:${outFile} \
|
||||||
|
${pkgs.lib.concatStringsSep " \\\n " extraArgs} \
|
||||||
|
${sourceFile}
|
||||||
|
'';
|
||||||
in
|
in
|
||||||
pkgs.stdenv.mkDerivation {
|
pkgs.stdenv.mkDerivation {
|
||||||
pname = "liblogosdelivery";
|
pname = if buildWakucanary then "wakucanary" else "liblogosdelivery";
|
||||||
version = "dev";
|
version = "dev";
|
||||||
|
|
||||||
inherit src;
|
inherit src;
|
||||||
@ -71,45 +94,47 @@ pkgs.stdenv.mkDerivation {
|
|||||||
make -C $NAT_TRAV/vendor/libnatpmp-upstream \
|
make -C $NAT_TRAV/vendor/libnatpmp-upstream \
|
||||||
CFLAGS="-Wall -Os -fPIC -DENABLE_STRNATPMPERR -DNATPMP_MAX_RETRIES=4" libnatpmp.a
|
CFLAGS="-Wall -Os -fPIC -DENABLE_STRNATPMPERR -DNATPMP_MAX_RETRIES=4" libnatpmp.a
|
||||||
|
|
||||||
|
${if buildWakucanary then ''
|
||||||
|
echo "== Building wakucanary =="
|
||||||
|
${nimCompile {
|
||||||
|
outFile = "build/wakucanary";
|
||||||
|
sourceFile = "apps/wakucanary/wakucanary.nim";
|
||||||
|
extraArgs = [ "--path:." ];
|
||||||
|
}}
|
||||||
|
'' else ''
|
||||||
echo "== Building liblogosdelivery (dynamic) =="
|
echo "== Building liblogosdelivery (dynamic) =="
|
||||||
nim c \
|
${nimCompile {
|
||||||
--noNimblePath \
|
outFile = "build/liblogosdelivery.${libExt}";
|
||||||
${pathArgs} \
|
sourceFile = "liblogosdelivery/liblogosdelivery.nim";
|
||||||
--path:$NAT_TRAV \
|
extraArgs = [
|
||||||
--path:$NAT_TRAV/src \
|
"--app:lib"
|
||||||
--passL:"-L${zerokitRln}/lib -lrln${pkgs.lib.optionalString pkgs.stdenv.isLinux " -lstdc++"}" \
|
"--opt:size"
|
||||||
${nimDefineArgs} \
|
"--noMain"
|
||||||
--out:build/liblogosdelivery.${libExt} \
|
"--header"
|
||||||
--app:lib \
|
"--nimMainPrefix:liblogosdelivery"
|
||||||
--threads:on \
|
];
|
||||||
--opt:size \
|
}}
|
||||||
--noMain \
|
|
||||||
--mm:refc \
|
|
||||||
--header \
|
|
||||||
--nimMainPrefix:liblogosdelivery \
|
|
||||||
--nimcache:$NIMCACHE \
|
|
||||||
liblogosdelivery/liblogosdelivery.nim
|
|
||||||
|
|
||||||
echo "== Building liblogosdelivery (static) =="
|
echo "== Building liblogosdelivery (static) =="
|
||||||
nim c \
|
${nimCompile {
|
||||||
--noNimblePath \
|
outFile = "build/liblogosdelivery.a";
|
||||||
${pathArgs} \
|
sourceFile = "liblogosdelivery/liblogosdelivery.nim";
|
||||||
--path:$NAT_TRAV \
|
extraArgs = [
|
||||||
--path:$NAT_TRAV/src \
|
"--app:staticlib"
|
||||||
--passL:"-L${zerokitRln}/lib -lrln${pkgs.lib.optionalString pkgs.stdenv.isLinux " -lstdc++"}" \
|
"--opt:size"
|
||||||
${nimDefineArgs} \
|
"--noMain"
|
||||||
--out:build/liblogosdelivery.a \
|
"--nimMainPrefix:liblogosdelivery"
|
||||||
--app:staticlib \
|
];
|
||||||
--threads:on \
|
}}
|
||||||
--opt:size \
|
''}
|
||||||
--noMain \
|
|
||||||
--mm:refc \
|
|
||||||
--nimMainPrefix:liblogosdelivery \
|
|
||||||
--nimcache:$NIMCACHE \
|
|
||||||
liblogosdelivery/liblogosdelivery.nim
|
|
||||||
'';
|
'';
|
||||||
|
|
||||||
installPhase = ''
|
installPhase = if buildWakucanary then ''
|
||||||
|
runHook preInstall
|
||||||
|
mkdir -p $out/bin $out/lib
|
||||||
|
cp build/wakucanary $out/bin/
|
||||||
|
runHook postInstall
|
||||||
|
'' else ''
|
||||||
runHook preInstall
|
runHook preInstall
|
||||||
mkdir -p $out/lib $out/include
|
mkdir -p $out/lib $out/include
|
||||||
cp build/liblogosdelivery.${libExt} $out/lib/ 2>/dev/null || true
|
cp build/liblogosdelivery.${libExt} $out/lib/ 2>/dev/null || true
|
||||||
@ -118,21 +143,47 @@ pkgs.stdenv.mkDerivation {
|
|||||||
runHook postInstall
|
runHook postInstall
|
||||||
'';
|
'';
|
||||||
|
|
||||||
# Bundle librln alongside liblogosdelivery so the output is self-contained.
|
# Bundle librln alongside the produced artifact so the output is self-contained.
|
||||||
# Use --add-rpath (not --set-rpath) so fixupPhase's stdenv RUNPATH injection
|
# Use --add-rpath (not --set-rpath) so fixupPhase's stdenv RUNPATH injection
|
||||||
# for libstdc++ is preserved.
|
# for libstdc++ is preserved.
|
||||||
postInstall =
|
postInstall =
|
||||||
pkgs.lib.optionalString pkgs.stdenv.isDarwin ''
|
if buildWakucanary then
|
||||||
cp ${zerokitRln}/lib/librln.dylib $out/lib/
|
pkgs.lib.optionalString pkgs.stdenv.isDarwin ''
|
||||||
chmod +w $out/lib/librln.dylib $out/lib/liblogosdelivery.dylib
|
cp ${zerokitRln}/lib/librln.dylib $out/lib/
|
||||||
install_name_tool -id @rpath/liblogosdelivery.dylib $out/lib/liblogosdelivery.dylib
|
chmod +w $out/lib/librln.dylib $out/bin/wakucanary
|
||||||
install_name_tool -id @rpath/librln.dylib $out/lib/librln.dylib
|
install_name_tool -id @rpath/librln.dylib $out/lib/librln.dylib
|
||||||
old=$(otool -L $out/lib/liblogosdelivery.dylib | awk 'NR>1{print $1}' | grep librln)
|
old=$(otool -L $out/bin/wakucanary | awk 'NR>1{print $1}' | grep librln || true)
|
||||||
install_name_tool -change "$old" @rpath/librln.dylib $out/lib/liblogosdelivery.dylib
|
if [ -n "$old" ]; then
|
||||||
install_name_tool -add_rpath @loader_path $out/lib/liblogosdelivery.dylib
|
install_name_tool -change "$old" @rpath/librln.dylib $out/bin/wakucanary
|
||||||
''
|
fi
|
||||||
+ pkgs.lib.optionalString pkgs.stdenv.isLinux ''
|
install_name_tool -add_rpath @loader_path/../lib $out/bin/wakucanary
|
||||||
cp ${zerokitRln}/lib/librln.so $out/lib/
|
''
|
||||||
patchelf --add-rpath '$ORIGIN' $out/lib/liblogosdelivery.so
|
+ pkgs.lib.optionalString pkgs.stdenv.isLinux ''
|
||||||
'';
|
cp ${zerokitRln}/lib/librln.so $out/lib/
|
||||||
|
patchelf --add-rpath '$ORIGIN/../lib' $out/bin/wakucanary
|
||||||
|
''
|
||||||
|
else
|
||||||
|
pkgs.lib.optionalString pkgs.stdenv.isDarwin ''
|
||||||
|
cp ${zerokitRln}/lib/librln.dylib $out/lib/
|
||||||
|
chmod +w $out/lib/librln.dylib $out/lib/liblogosdelivery.dylib
|
||||||
|
install_name_tool -id @rpath/liblogosdelivery.dylib $out/lib/liblogosdelivery.dylib
|
||||||
|
install_name_tool -id @rpath/librln.dylib $out/lib/librln.dylib
|
||||||
|
old=$(otool -L $out/lib/liblogosdelivery.dylib | awk 'NR>1{print $1}' | grep librln)
|
||||||
|
install_name_tool -change "$old" @rpath/librln.dylib $out/lib/liblogosdelivery.dylib
|
||||||
|
install_name_tool -add_rpath @loader_path $out/lib/liblogosdelivery.dylib
|
||||||
|
''
|
||||||
|
+ pkgs.lib.optionalString pkgs.stdenv.isLinux ''
|
||||||
|
cp ${zerokitRln}/lib/librln.so $out/lib/
|
||||||
|
patchelf --add-rpath '$ORIGIN' $out/lib/liblogosdelivery.so
|
||||||
|
'';
|
||||||
|
|
||||||
|
meta = with pkgs.lib; {
|
||||||
|
description =
|
||||||
|
if buildWakucanary
|
||||||
|
then "Waku network canary tool"
|
||||||
|
else "logos-delivery shared/static library";
|
||||||
|
homepage = "https://github.com/logos-messaging/logos-delivery";
|
||||||
|
license = licenses.mit;
|
||||||
|
platforms = platforms.unix;
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,8 +1,15 @@
|
|||||||
#!/usr/bin/env bash
|
#!/usr/bin/env bash
|
||||||
|
|
||||||
# This script is used to build the rln library for the current platform.
|
# Provides the rln static library for the current platform.
|
||||||
# Previously downloaded prebuilt binaries, but due to compatibility issues
|
#
|
||||||
# we now always build from source.
|
# If zerokit publishes a prebuilt `stateless` release asset for this platform,
|
||||||
|
# download and use it: that is faster than compiling and avoids fetching
|
||||||
|
# zerokit's many crate dependencies from crates.io. The asset is selected by
|
||||||
|
# the Rust host target triple (the platform identifier reported by rustc,
|
||||||
|
# e.g. x86_64-unknown-linux-gnu or aarch64-apple-darwin).
|
||||||
|
#
|
||||||
|
# When no matching asset exists (e.g. Windows), build from the vendored
|
||||||
|
# zerokit submodule instead.
|
||||||
|
|
||||||
set -e
|
set -e
|
||||||
|
|
||||||
@ -15,8 +22,26 @@ output_filename=$3
|
|||||||
[[ -z "${rln_version}" ]] && { echo "No rln version specified"; exit 1; }
|
[[ -z "${rln_version}" ]] && { echo "No rln version specified"; exit 1; }
|
||||||
[[ -z "${output_filename}" ]] && { echo "No output filename specified"; exit 1; }
|
[[ -z "${output_filename}" ]] && { echo "No output filename specified"; exit 1; }
|
||||||
|
|
||||||
echo "Building RLN library from source (version ${rln_version})..."
|
# --- Prefer the prebuilt release asset --------------------------------------
|
||||||
|
# Host target triple, e.g. x86_64-unknown-linux-gnu / aarch64-apple-darwin.
|
||||||
|
host_triplet=$(rustc --version --verbose | awk '/host:/{print $2}')
|
||||||
|
tarball="${host_triplet}-stateless-rln.tar.gz"
|
||||||
|
url="https://github.com/vacp2p/zerokit/releases/download/${rln_version}/${tarball}"
|
||||||
|
|
||||||
|
echo "Looking for prebuilt RLN: ${url}"
|
||||||
|
if curl --silent --fail-with-body -L "${url}" -o "${tarball}"; then
|
||||||
|
echo "Downloaded prebuilt ${tarball}"
|
||||||
|
tar -xzf "${tarball}"
|
||||||
|
mv "release/librln.a" "${output_filename}"
|
||||||
|
rm -rf "${tarball}" release
|
||||||
|
echo "Using prebuilt ${output_filename}"
|
||||||
|
exit 0
|
||||||
|
fi
|
||||||
|
# curl --fail-with-body writes the error body to the file on HTTP failure.
|
||||||
|
rm -f "${tarball}"
|
||||||
|
echo "No prebuilt asset for ${host_triplet} at ${rln_version}; building from source."
|
||||||
|
|
||||||
|
# --- Fall back to building from the vendored submodule ----------------------
|
||||||
# Check if submodule version = version in Makefile
|
# Check if submodule version = version in Makefile
|
||||||
cargo metadata --format-version=1 --no-deps --manifest-path "${build_dir}/rln/Cargo.toml"
|
cargo metadata --format-version=1 --no-deps --manifest-path "${build_dir}/rln/Cargo.toml"
|
||||||
|
|
||||||
@ -33,7 +58,6 @@ if [[ "v${submodule_version}" != "${rln_version}" ]]; then
|
|||||||
exit 1
|
exit 1
|
||||||
fi
|
fi
|
||||||
|
|
||||||
# Build rln from source.
|
|
||||||
# `stateless` feature: logos-delivery does not maintain a local Merkle tree
|
# `stateless` feature: logos-delivery does not maintain a local Merkle tree
|
||||||
# (post-PR #3312); the contract is the source of truth and the path is fetched
|
# (post-PR #3312); the contract is the source of truth and the path is fetched
|
||||||
# via getMerkleProof(index). The stateless build compiles out tree code.
|
# via getMerkleProof(index). The stateless build compiles out tree code.
|
||||||
|
|||||||
@ -45,6 +45,13 @@ if [ -n "${nim_ver}" ]; then
|
|||||||
echo "INFO: Nim ${nim_ver} found in PATH; installing Nim ${NIM_VERSION} to ${NIM_DEST}." >&2
|
echo "INFO: Nim ${nim_ver} found in PATH; installing Nim ${NIM_VERSION} to ${NIM_DEST}." >&2
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
<<<<<<< HEAD
|
||||||
|
=======
|
||||||
|
OS=$(uname -s | tr 'A-Z' 'a-z' | sed 's/darwin/macosx/')
|
||||||
|
ARCH=$(uname -m | sed 's/x86_64/x64/;s/aarch64/arm64/')
|
||||||
|
|
||||||
|
BINARY_URL="https://nim-lang.org/download/nim-${NIM_VERSION}-${OS}_${ARCH}.tar.xz"
|
||||||
|
>>>>>>> master
|
||||||
WORK_DIR=$(mktemp -d)
|
WORK_DIR=$(mktemp -d)
|
||||||
trap 'rm -rf "${WORK_DIR}"' EXIT
|
trap 'rm -rf "${WORK_DIR}"' EXIT
|
||||||
|
|
||||||
@ -58,6 +65,7 @@ MINGW* | MSYS* | CYGWIN*)
|
|||||||
esac
|
esac
|
||||||
BINARY_URL="https://nim-lang.org/download/nim-${NIM_VERSION}_${WIN_ARCH}.zip"
|
BINARY_URL="https://nim-lang.org/download/nim-${NIM_VERSION}_${WIN_ARCH}.zip"
|
||||||
|
|
||||||
|
<<<<<<< HEAD
|
||||||
echo "Downloading pre-built Nim ${NIM_VERSION} (windows_${WIN_ARCH}) from ${BINARY_URL}..."
|
echo "Downloading pre-built Nim ${NIM_VERSION} (windows_${WIN_ARCH}) from ${BINARY_URL}..."
|
||||||
curl -fL "${BINARY_URL}" -o "${WORK_DIR}/nim.zip"
|
curl -fL "${BINARY_URL}" -o "${WORK_DIR}/nim.zip"
|
||||||
unzip -q "${WORK_DIR}/nim.zip" -d "${WORK_DIR}"
|
unzip -q "${WORK_DIR}/nim.zip" -d "${WORK_DIR}"
|
||||||
@ -89,6 +97,28 @@ MINGW* | MSYS* | CYGWIN*)
|
|||||||
;;
|
;;
|
||||||
esac
|
esac
|
||||||
|
|
||||||
|
# rm -rf can fail with "Directory not empty" on overlay filesystems (e.g. Docker).
|
||||||
|
# Using cp -r src/. dst/ handles both cases: dst absent (clean) or partially present.
|
||||||
|
rm -rf "${NIM_DEST}" 2>/dev/null || true
|
||||||
|
mkdir -p "${NIM_DEST}"
|
||||||
|
cp -r "${SRC_DIR}/." "${NIM_DEST}/"
|
||||||
|
=======
|
||||||
|
if [ "${HTTP_STATUS}" = "200" ]; then
|
||||||
|
echo "Downloading pre-built binary from ${BINARY_URL}..."
|
||||||
|
curl -fL "${BINARY_URL}" -o "${WORK_DIR}/nim.tar.xz"
|
||||||
|
tar -xJf "${WORK_DIR}/nim.tar.xz" -C "${WORK_DIR}"
|
||||||
|
SRC_DIR="${WORK_DIR}/nim-${NIM_VERSION}"
|
||||||
|
else
|
||||||
|
echo "No pre-built binary found for ${OS}_${ARCH}. Building from source..."
|
||||||
|
SRC_URL="https://github.com/nim-lang/Nim/archive/refs/tags/v${NIM_VERSION}.tar.gz"
|
||||||
|
curl -fL "${SRC_URL}" -o "${WORK_DIR}/nim-src.tar.gz"
|
||||||
|
tar -xzf "${WORK_DIR}/nim-src.tar.gz" -C "${WORK_DIR}"
|
||||||
|
cd "${WORK_DIR}/Nim-${NIM_VERSION}"
|
||||||
|
sh build_all.sh
|
||||||
|
SRC_DIR="${WORK_DIR}/Nim-${NIM_VERSION}"
|
||||||
|
fi
|
||||||
|
>>>>>>> master
|
||||||
|
|
||||||
# rm -rf can fail with "Directory not empty" on overlay filesystems (e.g. Docker).
|
# rm -rf can fail with "Directory not empty" on overlay filesystems (e.g. Docker).
|
||||||
# Using cp -r src/. dst/ handles both cases: dst absent (clean) or partially present.
|
# Using cp -r src/. dst/ handles both cases: dst absent (clean) or partially present.
|
||||||
rm -rf "${NIM_DEST}" 2>/dev/null || true
|
rm -rf "${NIM_DEST}" 2>/dev/null || true
|
||||||
|
|||||||
@ -19,6 +19,7 @@ if [ -z "${NIMBLE_VERSION}" ]; then
|
|||||||
exit 1
|
exit 1
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
<<<<<<< HEAD
|
||||||
# On Windows (MSYS2) the binaries carry a .exe extension.
|
# On Windows (MSYS2) the binaries carry a .exe extension.
|
||||||
EXE=""
|
EXE=""
|
||||||
case "$(uname -s)" in
|
case "$(uname -s)" in
|
||||||
@ -26,6 +27,9 @@ MINGW* | MSYS* | CYGWIN*) EXE=".exe" ;;
|
|||||||
esac
|
esac
|
||||||
|
|
||||||
NIMBLE_BIN="${HOME}/.nimble/bin/nimble${EXE}"
|
NIMBLE_BIN="${HOME}/.nimble/bin/nimble${EXE}"
|
||||||
|
=======
|
||||||
|
NIMBLE_BIN="${HOME}/.nimble/bin/nimble"
|
||||||
|
>>>>>>> master
|
||||||
|
|
||||||
# 1. Already installed at the right version?
|
# 1. Already installed at the right version?
|
||||||
if [ -x "${NIMBLE_BIN}" ]; then
|
if [ -x "${NIMBLE_BIN}" ]; then
|
||||||
@ -38,7 +42,11 @@ if [ -x "${NIMBLE_BIN}" ]; then
|
|||||||
fi
|
fi
|
||||||
|
|
||||||
# 2. Already compiled into pkgs2/ from a previous (possibly partial) run?
|
# 2. Already compiled into pkgs2/ from a previous (possibly partial) run?
|
||||||
|
<<<<<<< HEAD
|
||||||
PKGS2_NIMBLE=$(ls -dt "${HOME}/.nimble/pkgs2/nimble-${NIMBLE_VERSION}-"*/nimble${EXE} \
|
PKGS2_NIMBLE=$(ls -dt "${HOME}/.nimble/pkgs2/nimble-${NIMBLE_VERSION}-"*/nimble${EXE} \
|
||||||
|
=======
|
||||||
|
PKGS2_NIMBLE=$(ls -dt "${HOME}/.nimble/pkgs2/nimble-${NIMBLE_VERSION}-"*/nimble \
|
||||||
|
>>>>>>> master
|
||||||
2>/dev/null | head -1 || true)
|
2>/dev/null | head -1 || true)
|
||||||
if [ -n "${PKGS2_NIMBLE}" ] && [ -x "${PKGS2_NIMBLE}" ]; then
|
if [ -n "${PKGS2_NIMBLE}" ] && [ -x "${PKGS2_NIMBLE}" ]; then
|
||||||
echo "Nimble ${NIMBLE_VERSION} found in pkgs2, re-linking to ${NIMBLE_BIN}."
|
echo "Nimble ${NIMBLE_VERSION} found in pkgs2, re-linking to ${NIMBLE_BIN}."
|
||||||
@ -48,7 +56,11 @@ if [ -n "${PKGS2_NIMBLE}" ] && [ -x "${PKGS2_NIMBLE}" ]; then
|
|||||||
fi
|
fi
|
||||||
|
|
||||||
# 3. Build from source.
|
# 3. Build from source.
|
||||||
|
<<<<<<< HEAD
|
||||||
NIM_BIN="${HOME}/.nimble/bin/nim${EXE}"
|
NIM_BIN="${HOME}/.nimble/bin/nim${EXE}"
|
||||||
|
=======
|
||||||
|
NIM_BIN="${HOME}/.nimble/bin/nim"
|
||||||
|
>>>>>>> master
|
||||||
if [ ! -x "${NIM_BIN}" ]; then
|
if [ ! -x "${NIM_BIN}" ]; then
|
||||||
NIM_BIN="$(command -v nim)"
|
NIM_BIN="$(command -v nim)"
|
||||||
fi
|
fi
|
||||||
@ -66,11 +78,19 @@ echo "Building nimble ${NIMBLE_VERSION} with $("${NIM_BIN}" --version | head -1)
|
|||||||
cd "${WORK_DIR}/nimble"
|
cd "${WORK_DIR}/nimble"
|
||||||
# nim reads nim.cfg / config.nims in the current dir, which sets vendor paths.
|
# nim reads nim.cfg / config.nims in the current dir, which sets vendor paths.
|
||||||
"${NIM_BIN}" c -d:release --path:src \
|
"${NIM_BIN}" c -d:release --path:src \
|
||||||
|
<<<<<<< HEAD
|
||||||
-o:"${WORK_DIR}/nimble_new${EXE}" src/nimble.nim
|
-o:"${WORK_DIR}/nimble_new${EXE}" src/nimble.nim
|
||||||
|
|
||||||
mkdir -p "${HOME}/.nimble/bin"
|
mkdir -p "${HOME}/.nimble/bin"
|
||||||
# Atomic rename: avoids ETXTBSY when the old binary at NIMBLE_BIN is still running.
|
# Atomic rename: avoids ETXTBSY when the old binary at NIMBLE_BIN is still running.
|
||||||
cp "${WORK_DIR}/nimble_new${EXE}" "${NIMBLE_BIN}.new.$$"
|
cp "${WORK_DIR}/nimble_new${EXE}" "${NIMBLE_BIN}.new.$$"
|
||||||
|
=======
|
||||||
|
-o:"${WORK_DIR}/nimble_new" src/nimble.nim
|
||||||
|
|
||||||
|
mkdir -p "${HOME}/.nimble/bin"
|
||||||
|
# Atomic rename: avoids ETXTBSY when the old binary at NIMBLE_BIN is still running.
|
||||||
|
cp "${WORK_DIR}/nimble_new" "${NIMBLE_BIN}.new.$$"
|
||||||
|
>>>>>>> master
|
||||||
mv -f "${NIMBLE_BIN}.new.$$" "${NIMBLE_BIN}"
|
mv -f "${NIMBLE_BIN}.new.$$" "${NIMBLE_BIN}"
|
||||||
|
|
||||||
echo "Nimble ${NIMBLE_VERSION} installed to ${NIMBLE_BIN}"
|
echo "Nimble ${NIMBLE_VERSION} installed to ${NIMBLE_BIN}"
|
||||||
@ -88,3 +88,6 @@ import ./tools/test_all
|
|||||||
|
|
||||||
# Persistency library tests
|
# Persistency library tests
|
||||||
import ./persistency/test_all
|
import ./persistency/test_all
|
||||||
|
|
||||||
|
# Reliable Channel API tests
|
||||||
|
import ./channels/test_all
|
||||||
|
|||||||
3
tests/channels/test_all.nim
Normal file
3
tests/channels/test_all.nim
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
{.used.}
|
||||||
|
|
||||||
|
import ./test_reliable_channel_send_receive
|
||||||
317
tests/channels/test_reliable_channel_send_receive.nim
Normal file
317
tests/channels/test_reliable_channel_send_receive.nim
Normal file
@ -0,0 +1,317 @@
|
|||||||
|
{.used.}
|
||||||
|
|
||||||
|
import std/[net]
|
||||||
|
import chronos, testutils/unittests, stew/byteutils
|
||||||
|
import brokers/broker_context
|
||||||
|
|
||||||
|
import ../testlib/[common, wakucore, wakunode, testasync]
|
||||||
|
|
||||||
|
import waku
|
||||||
|
import waku/[waku_node, waku_core]
|
||||||
|
import waku/factory/waku_conf
|
||||||
|
import waku/events/message_events as waku_message_events
|
||||||
|
import tools/confutils/cli_args
|
||||||
|
|
||||||
|
import channels/reliable_channel_manager
|
||||||
|
import channels/encryption/noop_encryption
|
||||||
|
|
||||||
|
const TestTimeout = chronos.seconds(15)
|
||||||
|
|
||||||
|
proc createApiNodeConf(): WakuNodeConf =
|
||||||
|
var conf = defaultWakuNodeConf().valueOr:
|
||||||
|
raiseAssert error
|
||||||
|
conf.mode = cli_args.WakuMode.Core
|
||||||
|
conf.listenAddress = parseIpAddress("0.0.0.0")
|
||||||
|
conf.tcpPort = Port(0)
|
||||||
|
conf.discv5UdpPort = Port(0)
|
||||||
|
conf.clusterId = 3'u16
|
||||||
|
conf.numShardsInNetwork = 1
|
||||||
|
conf.reliabilityEnabled = true
|
||||||
|
conf.rest = false
|
||||||
|
return conf
|
||||||
|
|
||||||
|
suite "Reliable Channel - ingress":
|
||||||
|
asyncTest "manager dispatches marked WakuMessage to the right channel":
|
||||||
|
## Unit test for the receive side of the API: instead of standing
|
||||||
|
## up two libp2p nodes and a relay mesh, we drive the manager
|
||||||
|
## directly by emitting a `MessageReceivedEvent` (the exact event
|
||||||
|
## the DeliveryService emits when a `WakuMessage` arrives off the
|
||||||
|
## wire). The manager must:
|
||||||
|
## - drop traffic missing the Reliable Channel spec marker
|
||||||
|
## - dispatch the matching channel's `onMessageReceived`
|
||||||
|
## - emit `ChannelMessageReceivedEvent` with the payload
|
||||||
|
const
|
||||||
|
channelId = ChannelId("test-channel")
|
||||||
|
contentTopic = ContentTopic("/reliable-channel/test/proto")
|
||||||
|
let appPayload = "hello reliable channel".toBytes()
|
||||||
|
|
||||||
|
var manager: ReliableChannelManager
|
||||||
|
var brokerCtx: BrokerContext
|
||||||
|
lockNewGlobalBrokerContext:
|
||||||
|
brokerCtx = globalBrokerContext()
|
||||||
|
manager = (await ReliableChannelManager.new(createApiNodeConf())).expect(
|
||||||
|
"Failed to create manager"
|
||||||
|
)
|
||||||
|
|
||||||
|
## Noop encryption providers so the Encrypt/Decrypt brokers have
|
||||||
|
## something to dispatch to; without this the channel falls back to
|
||||||
|
## plaintext anyway, but installing them is the documented setup.
|
||||||
|
setNoopEncryption()
|
||||||
|
|
||||||
|
discard manager
|
||||||
|
.createReliableChannel(channelId, contentTopic, SdsParticipantID("local"))
|
||||||
|
.expect("createReliableChannel")
|
||||||
|
|
||||||
|
let received = newFuture[seq[byte]]("channel-message-received")
|
||||||
|
discard ChannelMessageReceivedEvent
|
||||||
|
.listen(
|
||||||
|
brokerCtx,
|
||||||
|
proc(evt: ChannelMessageReceivedEvent) {.async: (raises: []).} =
|
||||||
|
if not received.finished() and evt.channelId == channelId:
|
||||||
|
received.complete(evt.payload)
|
||||||
|
,
|
||||||
|
)
|
||||||
|
.expect("listen ChannelMessageReceivedEvent")
|
||||||
|
|
||||||
|
## Build a `WakuMessage` that looks like one that came in off the
|
||||||
|
## wire from a peer: the spec marker on `meta` plus the right content
|
||||||
|
## topic. The manager's ingress listener should pick it up,
|
||||||
|
## decrypt (noop), unwrap SDS (pass-through), reassemble (one
|
||||||
|
## segment), and finally emit `ChannelMessageReceivedEvent`.
|
||||||
|
let inboundMsg = WakuMessage(
|
||||||
|
payload: appPayload,
|
||||||
|
contentTopic: contentTopic,
|
||||||
|
version: 0,
|
||||||
|
meta: LipWireReliableChannelVersion.toBytes(),
|
||||||
|
)
|
||||||
|
|
||||||
|
waku_message_events.MessageReceivedEvent.emit(
|
||||||
|
brokerCtx,
|
||||||
|
waku_message_events.MessageReceivedEvent(messageHash: "", message: inboundMsg),
|
||||||
|
)
|
||||||
|
|
||||||
|
let arrived = await received.withTimeout(TestTimeout)
|
||||||
|
check arrived
|
||||||
|
if arrived:
|
||||||
|
check received.read() == appPayload
|
||||||
|
|
||||||
|
await manager.stop()
|
||||||
|
|
||||||
|
asyncTest "manager drops unmarked WakuMessage":
|
||||||
|
## Mirror of the above: same content topic, but `meta` is empty
|
||||||
|
## (i.e. foreign traffic). The channel-level event must NOT fire.
|
||||||
|
const
|
||||||
|
channelId = ChannelId("test-channel-2")
|
||||||
|
contentTopic = ContentTopic("/reliable-channel/test/proto")
|
||||||
|
let appPayload = "foreign payload".toBytes()
|
||||||
|
|
||||||
|
var manager: ReliableChannelManager
|
||||||
|
var brokerCtx: BrokerContext
|
||||||
|
lockNewGlobalBrokerContext:
|
||||||
|
brokerCtx = globalBrokerContext()
|
||||||
|
manager = (await ReliableChannelManager.new(createApiNodeConf())).expect(
|
||||||
|
"Failed to create manager"
|
||||||
|
)
|
||||||
|
|
||||||
|
setNoopEncryption()
|
||||||
|
|
||||||
|
discard manager
|
||||||
|
.createReliableChannel(channelId, contentTopic, SdsParticipantID("local"))
|
||||||
|
.expect("createReliableChannel")
|
||||||
|
|
||||||
|
var fired = false
|
||||||
|
discard ChannelMessageReceivedEvent
|
||||||
|
.listen(
|
||||||
|
brokerCtx,
|
||||||
|
proc(evt: ChannelMessageReceivedEvent) {.async: (raises: []).} =
|
||||||
|
if evt.channelId == channelId:
|
||||||
|
fired = true
|
||||||
|
,
|
||||||
|
)
|
||||||
|
.expect("listen ChannelMessageReceivedEvent")
|
||||||
|
|
||||||
|
let inboundMsg = WakuMessage(
|
||||||
|
payload: appPayload,
|
||||||
|
contentTopic: contentTopic,
|
||||||
|
version: 0,
|
||||||
|
meta: @[], ## no Reliable Channel spec marker
|
||||||
|
)
|
||||||
|
|
||||||
|
waku_message_events.MessageReceivedEvent.emit(
|
||||||
|
brokerCtx,
|
||||||
|
waku_message_events.MessageReceivedEvent(messageHash: "", message: inboundMsg),
|
||||||
|
)
|
||||||
|
|
||||||
|
## Give the event broker a chance to fan out.
|
||||||
|
await sleepAsync(100.milliseconds)
|
||||||
|
check not fired
|
||||||
|
|
||||||
|
await manager.stop()
|
||||||
|
|
||||||
|
suite "Reliable Channel - send state machine":
|
||||||
|
asyncTest "MessageSentEvent finalises the channelReqId as Sent":
|
||||||
|
## Drives the real send pipeline (`send` -> segmentation -> SDS ->
|
||||||
|
## rate_limit -> encrypt -> dispatch) via a fake `SendHandler` that
|
||||||
|
## returns a canned `RequestId` instead of hitting the network.
|
||||||
|
## Emitting the delivery-layer `MessageSentEvent` must drive the
|
||||||
|
## channel-level state machine through `Confirmed` and produce a
|
||||||
|
## `ChannelMessageSentEvent` (channel-level terminal event) for the
|
||||||
|
## `channelReqId` returned by `send()`.
|
||||||
|
const
|
||||||
|
channelId = ChannelId("sm-success-channel")
|
||||||
|
contentTopic = ContentTopic("/reliable-channel/test/sm-success")
|
||||||
|
fakeMsgReqId = RequestId("fake-msg-req-1")
|
||||||
|
|
||||||
|
var manager: ReliableChannelManager
|
||||||
|
var brokerCtx: BrokerContext
|
||||||
|
lockNewGlobalBrokerContext:
|
||||||
|
brokerCtx = globalBrokerContext()
|
||||||
|
manager = (await ReliableChannelManager.new(createApiNodeConf())).expect(
|
||||||
|
"Failed to create manager"
|
||||||
|
)
|
||||||
|
|
||||||
|
setNoopEncryption()
|
||||||
|
|
||||||
|
var sendCalls = 0
|
||||||
|
let fakeSend: SendHandler = proc(
|
||||||
|
env: MessageEnvelope
|
||||||
|
): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} =
|
||||||
|
sendCalls.inc
|
||||||
|
return ok(fakeMsgReqId)
|
||||||
|
|
||||||
|
discard manager
|
||||||
|
.createReliableChannel(
|
||||||
|
channelId, contentTopic, SdsParticipantID("local"), sendHandler = fakeSend
|
||||||
|
)
|
||||||
|
.expect("createReliableChannel")
|
||||||
|
|
||||||
|
let sentFut = newFuture[RequestId]("channel-sent")
|
||||||
|
discard ChannelMessageSentEvent
|
||||||
|
.listen(
|
||||||
|
brokerCtx,
|
||||||
|
proc(evt: ChannelMessageSentEvent) {.async: (raises: []).} =
|
||||||
|
if not sentFut.finished() and evt.channelId == channelId:
|
||||||
|
sentFut.complete(evt.requestId)
|
||||||
|
,
|
||||||
|
)
|
||||||
|
.expect("listen ChannelMessageSentEvent")
|
||||||
|
|
||||||
|
let channelReqId = manager.send(channelId, "hello".toBytes()).expect("send")
|
||||||
|
|
||||||
|
let dispatchDeadline = Moment.now() + 1.seconds
|
||||||
|
while Moment.now() < dispatchDeadline and sendCalls == 0:
|
||||||
|
await sleepAsync(5.milliseconds)
|
||||||
|
check sendCalls == 1
|
||||||
|
|
||||||
|
waku_message_events.MessageSentEvent.emit(
|
||||||
|
brokerCtx,
|
||||||
|
waku_message_events.MessageSentEvent(requestId: fakeMsgReqId, messageHash: ""),
|
||||||
|
)
|
||||||
|
|
||||||
|
let finalised = await sentFut.withTimeout(1.seconds)
|
||||||
|
check finalised
|
||||||
|
if finalised:
|
||||||
|
check sentFut.read() == channelReqId
|
||||||
|
|
||||||
|
await manager.stop()
|
||||||
|
|
||||||
|
asyncTest "two independent channelReqIds are finalised independently":
|
||||||
|
## Two `send()` calls -> two independent `channelReqId`s, each with
|
||||||
|
## one segment under the current segmentation skeleton
|
||||||
|
## (`performSegmentation` always emits exactly one segment). The
|
||||||
|
## fake `SendHandler` returns distinct `messagingReqId`s; finalising
|
||||||
|
## the first emits `ChannelMessageSentEvent` for its `channelReqId`,
|
||||||
|
## finalising the second as a failure emits `ChannelMessageErrorEvent`
|
||||||
|
## for the other.
|
||||||
|
const
|
||||||
|
channelId = ChannelId("sm-multi-channel")
|
||||||
|
contentTopic = ContentTopic("/reliable-channel/test/sm-multi")
|
||||||
|
|
||||||
|
var manager: ReliableChannelManager
|
||||||
|
var brokerCtx: BrokerContext
|
||||||
|
lockNewGlobalBrokerContext:
|
||||||
|
brokerCtx = globalBrokerContext()
|
||||||
|
manager = (await ReliableChannelManager.new(createApiNodeConf())).expect(
|
||||||
|
"Failed to create manager"
|
||||||
|
)
|
||||||
|
|
||||||
|
setNoopEncryption()
|
||||||
|
|
||||||
|
var msgReqIds: seq[RequestId]
|
||||||
|
let fakeSend: SendHandler = proc(
|
||||||
|
env: MessageEnvelope
|
||||||
|
): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} =
|
||||||
|
let id = RequestId("fake-msg-req-" & $(msgReqIds.len + 1))
|
||||||
|
msgReqIds.add(id)
|
||||||
|
return ok(id)
|
||||||
|
|
||||||
|
discard manager
|
||||||
|
.createReliableChannel(
|
||||||
|
channelId, contentTopic, SdsParticipantID("local"), sendHandler = fakeSend
|
||||||
|
)
|
||||||
|
.expect("createReliableChannel")
|
||||||
|
|
||||||
|
let sentFut = newFuture[RequestId]("channel-sent")
|
||||||
|
let erroredFut = newFuture[RequestId]("channel-errored")
|
||||||
|
discard ChannelMessageSentEvent
|
||||||
|
.listen(
|
||||||
|
brokerCtx,
|
||||||
|
proc(evt: ChannelMessageSentEvent) {.async: (raises: []).} =
|
||||||
|
if not sentFut.finished() and evt.channelId == channelId:
|
||||||
|
sentFut.complete(evt.requestId)
|
||||||
|
,
|
||||||
|
)
|
||||||
|
.expect("listen ChannelMessageSentEvent")
|
||||||
|
discard ChannelMessageErrorEvent
|
||||||
|
.listen(
|
||||||
|
brokerCtx,
|
||||||
|
proc(evt: ChannelMessageErrorEvent) {.async: (raises: []).} =
|
||||||
|
if not erroredFut.finished() and evt.channelId == channelId:
|
||||||
|
erroredFut.complete(evt.requestId)
|
||||||
|
,
|
||||||
|
)
|
||||||
|
.expect("listen ChannelMessageErrorEvent")
|
||||||
|
|
||||||
|
let channelReqId1 = manager.send(channelId, "first".toBytes()).expect("send 1")
|
||||||
|
let channelReqId2 = manager.send(channelId, "second".toBytes()).expect("send 2")
|
||||||
|
|
||||||
|
let dispatchDeadline = Moment.now() + 1.seconds
|
||||||
|
while Moment.now() < dispatchDeadline and msgReqIds.len < 2:
|
||||||
|
await sleepAsync(5.milliseconds)
|
||||||
|
check msgReqIds.len == 2
|
||||||
|
|
||||||
|
waku_message_events.MessageSentEvent.emit(
|
||||||
|
brokerCtx,
|
||||||
|
waku_message_events.MessageSentEvent(requestId: msgReqIds[0], messageHash: ""),
|
||||||
|
)
|
||||||
|
let sentArrived = await sentFut.withTimeout(1.seconds)
|
||||||
|
check sentArrived
|
||||||
|
if sentArrived:
|
||||||
|
check sentFut.read() == channelReqId1
|
||||||
|
## The second `channelReqId` must NOT have finalised yet — its
|
||||||
|
## segment is still `InFlight`.
|
||||||
|
check not erroredFut.finished()
|
||||||
|
|
||||||
|
waku_message_events.MessageErrorEvent.emit(
|
||||||
|
brokerCtx,
|
||||||
|
waku_message_events.MessageErrorEvent(
|
||||||
|
requestId: msgReqIds[1], messageHash: "", error: "synthetic"
|
||||||
|
),
|
||||||
|
)
|
||||||
|
let erroredArrived = await erroredFut.withTimeout(1.seconds)
|
||||||
|
check erroredArrived
|
||||||
|
if erroredArrived:
|
||||||
|
check erroredFut.read() == channelReqId2
|
||||||
|
|
||||||
|
await manager.stop()
|
||||||
|
|
||||||
|
asyncTest "TODO: channelReqId not pruned until ALL its segments are final":
|
||||||
|
## Placeholder for the multi-sibling prune rule. Today's
|
||||||
|
## `performSegmentation` (segmentation skeleton) always emits
|
||||||
|
## exactly one segment per `send()`, so multiple siblings under one
|
||||||
|
## `channelReqId` cannot be produced through the real pipeline.
|
||||||
|
## Implement once segmentation does real chunking: send a payload
|
||||||
|
## larger than `DefaultSegmentSizeBytes`, capture the N
|
||||||
|
## `messagingReqId`s from a fake `SendHandler`, finalise some, and
|
||||||
|
## assert prune only fires once every sibling is final.
|
||||||
|
skip()
|
||||||
@ -1,11 +1,11 @@
|
|||||||
import std/[options], stew/results, testutils/unittests
|
import std/[options], results, testutils/unittests
|
||||||
|
|
||||||
import
|
import
|
||||||
waku/node/peer_manager/peer_store/migrations,
|
waku/node/peer_manager/peer_store/migrations,
|
||||||
../../waku_archive/archive_utils,
|
../../waku_archive/archive_utils,
|
||||||
../../testlib/[simple_mock]
|
../../testlib/[simple_mock]
|
||||||
|
|
||||||
import std/[tables, strutils, os], stew/results, chronicles
|
import std/[tables, strutils, os], results, chronicles
|
||||||
|
|
||||||
import waku/common/databases/db_sqlite, waku/common/databases/common
|
import waku/common/databases/db_sqlite, waku/common/databases/common
|
||||||
|
|
||||||
|
|||||||
@ -1,4 +1,4 @@
|
|||||||
import stew/results, testutils/unittests
|
import results, testutils/unittests
|
||||||
|
|
||||||
import waku/node/peer_manager/peer_store/peer_storage, waku/waku_core/peers
|
import waku/node/peer_manager/peer_store/peer_storage, waku/waku_core/peers
|
||||||
|
|
||||||
|
|||||||
@ -2,7 +2,7 @@
|
|||||||
|
|
||||||
import
|
import
|
||||||
std/[tempfiles, strutils, options],
|
std/[tempfiles, strutils, options],
|
||||||
stew/results,
|
results,
|
||||||
testutils/unittests,
|
testutils/unittests,
|
||||||
chronos,
|
chronos,
|
||||||
libp2p/switch,
|
libp2p/switch,
|
||||||
|
|||||||
@ -1,7 +1,7 @@
|
|||||||
{.used.}
|
{.used.}
|
||||||
|
|
||||||
import testutils/unittests
|
import testutils/unittests
|
||||||
import stew/results, waku/waku_core/message, waku/waku_core/time, ./testlib/common
|
import results, waku/waku_core/message, waku/waku_core/time, ./testlib/common
|
||||||
|
|
||||||
suite "Waku Payload":
|
suite "Waku Payload":
|
||||||
test "Encode/Decode waku message with timestamp":
|
test "Encode/Decode waku message with timestamp":
|
||||||
|
|||||||
@ -1,7 +1,7 @@
|
|||||||
{.used.}
|
{.used.}
|
||||||
|
|
||||||
import
|
import
|
||||||
stew/results,
|
results,
|
||||||
chronos,
|
chronos,
|
||||||
testutils/unittests,
|
testutils/unittests,
|
||||||
libp2p/crypto/crypto as libp2p_keys,
|
libp2p/crypto/crypto as libp2p_keys,
|
||||||
|
|||||||
@ -11,6 +11,10 @@ type
|
|||||||
contentTopic*: ContentTopic
|
contentTopic*: ContentTopic
|
||||||
payload*: seq[byte]
|
payload*: seq[byte]
|
||||||
ephemeral*: bool
|
ephemeral*: bool
|
||||||
|
meta*: seq[byte]
|
||||||
|
## Opaque wire-format marker carried on the underlying WakuMessage.
|
||||||
|
## Higher layers (e.g. Reliable Channel) stamp this so peers can route
|
||||||
|
## ingress traffic to their corresponding layer. Empty by default.
|
||||||
|
|
||||||
RequestId* = distinct string
|
RequestId* = distinct string
|
||||||
|
|
||||||
@ -34,12 +38,18 @@ proc init*(
|
|||||||
contentTopic: ContentTopic,
|
contentTopic: ContentTopic,
|
||||||
payload: seq[byte] | string,
|
payload: seq[byte] | string,
|
||||||
ephemeral: bool = false,
|
ephemeral: bool = false,
|
||||||
|
meta: seq[byte] = @[],
|
||||||
): MessageEnvelope =
|
): MessageEnvelope =
|
||||||
when payload is seq[byte]:
|
when payload is seq[byte]:
|
||||||
MessageEnvelope(contentTopic: contentTopic, payload: payload, ephemeral: ephemeral)
|
MessageEnvelope(
|
||||||
|
contentTopic: contentTopic, payload: payload, ephemeral: ephemeral, meta: meta
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
MessageEnvelope(
|
MessageEnvelope(
|
||||||
contentTopic: contentTopic, payload: payload.toBytes(), ephemeral: ephemeral
|
contentTopic: contentTopic,
|
||||||
|
payload: payload.toBytes(),
|
||||||
|
ephemeral: ephemeral,
|
||||||
|
meta: meta,
|
||||||
)
|
)
|
||||||
|
|
||||||
proc toWakuMessage*(envelope: MessageEnvelope): WakuMessage =
|
proc toWakuMessage*(envelope: MessageEnvelope): WakuMessage =
|
||||||
@ -48,6 +58,7 @@ proc toWakuMessage*(envelope: MessageEnvelope): WakuMessage =
|
|||||||
contentTopic: envelope.contentTopic,
|
contentTopic: envelope.contentTopic,
|
||||||
payload: envelope.payload,
|
payload: envelope.payload,
|
||||||
ephemeral: envelope.ephemeral,
|
ephemeral: envelope.ephemeral,
|
||||||
|
meta: envelope.meta,
|
||||||
timestamp: getNowInNanosecondTime(),
|
timestamp: getNowInNanosecondTime(),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@ -26,7 +26,7 @@ logScope:
|
|||||||
# This useful util is missing from sequtils, this extends applyIt with predicate...
|
# This useful util is missing from sequtils, this extends applyIt with predicate...
|
||||||
template applyItIf*(varSeq, pred, op: untyped) =
|
template applyItIf*(varSeq, pred, op: untyped) =
|
||||||
for i in low(varSeq) .. high(varSeq):
|
for i in low(varSeq) .. high(varSeq):
|
||||||
let it {.inject.} = varSeq[i]
|
var it {.inject.} = varSeq[i]
|
||||||
if pred:
|
if pred:
|
||||||
op
|
op
|
||||||
varSeq[i] = it
|
varSeq[i] = it
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user