2026-04-09 14:29:17 -03:00
|
|
|
{.used.}
|
|
|
|
|
|
|
|
|
|
import std/[options, sequtils, net, sets]
|
|
|
|
|
import chronos, testutils/unittests, stew/byteutils
|
|
|
|
|
import libp2p/[peerid, peerinfo, crypto/crypto]
|
feat: persistency (#3880)
* persistency: per-job SQLite-backed storage layer (singleton, brokered)
Adds a backend-neutral CRUD library at waku/persistency/, plus the
nim-brokers dependency swap that enables it.
Architecture (ports-and-adapters):
* Persistency: process-wide singleton, one root directory.
* Job: one tenant, one DB file, one worker thread, one BrokerContext.
* Backend: SQLite via waku/common/databases/db_sqlite. Uniform schema
kv(category BLOB, key BLOB, payload BLOB) PRIMARY KEY (category, key)
WITHOUT ROWID, WAL mode.
* Writes are fire-and-forget via EventBroker(mt) PersistEvent.
* Reads are async via five RequestBroker(mt) shapes (KvGet, KvExists,
KvScan, KvCount, KvDelete). Reads return Result[T, PersistencyError].
* One storage thread per job; tenants isolated by BrokerContext.
Public surface (waku/persistency/persistency.nim):
Persistency.instance(rootDir) / Persistency.instance() / Persistency.reset()
p.openJob(id) / p.closeJob(id) / p.dropJob(id) / p.close()
p.job(id) / p[id] / p.hasJob(id)
Writes (Job form & string-id form, fire-and-forget):
persist / persistPut / persistDelete / persistEncoded
Reads (Job form & string-id form, async Result):
get / exists / scan / scanPrefix / count / deleteAcked
Key & payload encoding (keys.nim, payload.nim):
* encodePart family + variadic key(...) / payload(...) macros +
single-value toKey / toPayload.
* Primitives: string and openArray[byte] are 2-byte BE length + bytes;
int{8..64} are sign-flipped 8-byte BE; uint{16..64} are 8-byte BE;
bool/byte/char are 1 byte; enums are int64(ord(v)).
* Generic encodePart[T: tuple | object] recurses through fields() so
any composite Nim type is encodable without ceremony.
* Stable across Nim/C compiler upgrades: no sizeof, no memcpy, no
cast on pointers, no host-endianness dependency.
* `rawKey(bytes)` + `persistPut(..., openArray[byte])` let callers
bypass the built-in encoder with their own format (CBOR, protobuf...).
Lifecycle:
* Persistency.new is private; Persistency.instance is the only public
constructor. Same rootDir is idempotent; conflicting rootDir is
peInvalidArgument. Persistency.reset for test/restart paths.
* openJob opens-or-creates the per-job SQLite file; an existing file
is reused with its data preserved.
* Teardown integration: Persistency.instance registers a Teardown
MultiRequestBroker provider that closes all jobs and clears the
singleton slot when Waku.stop() issues Teardown.request.
Internal layering:
types.nim pure value types (Key, KeyRange, KvRow, TxOp,
PersistencyError)
keys.nim encodePart primitives + key(...) macro
payload.nim toPayload + payload(...) macro
schema.nim CREATE TABLE + connection pragmas + user_version
backend_sqlite.nim KvBackend, applyOps (single source of write SQL),
getOne/existsOne/deleteOne, scanRange (asc/desc,
half-open ranges, open-ended stop), countRange
backend_comm.nim EventBroker(mt) PersistEvent + 5 RequestBroker(mt)
declarations; encodeErr/decodeErr boundary helpers
backend_thread.nim startStorageThread / stopStorageThread (shared
allocShared0 arg, cstring dbPath, atomic
ready/shutdown flags); per-thread provider
registration
persistency.nim Persistency + Job types, singleton state, public
facade
../requests/lifecycle_requests.nim
Teardown MultiRequestBroker
Tests (69 cases, all passing):
test_keys.nim sort-order invariants (length-prefix strings,
sign-flipped ints, composite tuples, prefix
range)
test_backend.nim round-trip / replace / delete-return-value /
batched atomicity / asc-desc-half-open-open-
ended scans / category isolation / batch
txDelete
test_lifecycle.nim open-or-create rootDir / non-dir collision /
reopen across sessions / idempotent openJob /
two-tenant parallel isolation / closeJob joins
worker / dropJob removes file / acked delete
test_facade.nim put-then-get / atomic batch / scanPrefix
asc/desc / deleteAcked hit-miss /
fire-and-forget delete / two-tenant facade
isolation
test_encoding.nim tuple/named-tuple/object keys, embedded Key,
enum encoding, field-major composite sort,
payload struct encoding, end-to-end struct
round-trip through SQLite
test_string_lookup.nim peJobNotFound semantics / hasJob / subscript /
persistPut+get via id / reads short-circuit /
writes drop+warn / persistEncoded via id /
scan parity Job-ref vs id
test_singleton.nim idempotent same-rootDir / different-rootDir
rejection / no-arg instance lifecycle / reset
retargets / reset idempotence / Teardown.request
end-to-end
Prerequisite delivered in the same series: replace the in-tree broker
implementation with the external nim-brokers package; update all
broker call-sites (waku_filter_v2, waku_relay, waku_rln_relay,
delivery_service, peer_manager, requests/*, factory/*, api tests, etc.)
to the new package API; chat2 made to compile again.
Note: SDS adapter (Phase 5 of the design) is deferred -- nim-sds is
still developed side-by-side and the persistency layer is intentionally
SDS-agnostic.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
* persistency: pin nim-brokers by URL+commit (workaround for stale registry)
The bare `brokers >= 2.0.1` form cannot resolve on machines where the
local nimble SAT solver enumerates only the registry-recorded 0.1.0 for
brokers. The nim-lang/packages entry for `brokers` carries no per-tag
metadata (only the URL), so until that registry entry is refreshed the
SAT solver clamps the available-versions list to 0.1.0 and rejects the
>= 2.0.1 constraint -- even though pkgs2 and pkgcache both have v2.0.1
cloned locally.
Pinning by URL+commit bypasses the registry path entirely. Inline
comment in waku.nimble documents the situation and the path back to
the bare form once nim-lang/packages is updated.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
* persistency: nph format pass
Run `nph` on all 57 Nim files touched by this PR. Pure formatting:
17 files re-styled, no semantic change. Suite still 69/69.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
* Fix build, add local-storage-path config, lazy init of Persistency from Waku start
* fix: fix nix deps
* fixes for nix build, regenerate deps
* reverting accidental dependency changes
* Fixing deps
* Apply suggestions from code review
Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com>
* persistency tests: migrate to suite / asyncTest / await
Match the in-tree test convention (procSuite -> suite, sync test +
waitFor -> asyncTest + await):
- procSuite "X": -> suite "X":
- For tests doing async work: test -> asyncTest, waitFor -> await.
- Poll helpers (proc waitFor(t: Job, ...) in test_lifecycle.nim,
proc waitUntilExists(...) in test_facade.nim and
test_string_lookup.nim) -> Future[bool] {.async.}, internal
`waitFor X` -> `await X`, internal `sleep(N)` ->
`await sleepAsync(chronos.milliseconds(N))`.
- Renamed test_lifecycle.nim's helper proc from `waitFor(t: Job, ...)`
-> `pollExists(t: Job, ...)`; the previous name shadowed
chronos.waitFor in the chronos macro expansion.
- `chronos.milliseconds(N)` explicitly qualified because `std/times`
also exports `milliseconds` (returning TimeInterval, not Duration).
- `check await x` -> `let okN = await x; check okN` to dodge chronos's
"yield in expr not lowered" with await-as-macro-argument.
- `(await x).foo()` -> `let awN = await x; ... awN.foo() ...` for the
same reason.
waku/persistency/persistency.nim: nph also pulled the proc signatures
across multiple lines; restored explicit `Future[void] {.async.}`
return types after the colon (an intermediate nph pass had elided them).
Suite: 71 / 71 OK against the new async write surface.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
* use idiomatic valueOr instead of ifs
* Reworked persistency shutdown, remove not necessary teardown mechanism
* Use const for DefaultStoragePath
* format to follow coding guidelines - no use of result and explicit returns - no functional change
---------
Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com>
2026-05-16 00:09:07 +02:00
|
|
|
import brokers/broker_context
|
2026-04-09 14:29:17 -03:00
|
|
|
import ../testlib/[common, wakucore, wakunode, testasync]
|
|
|
|
|
import ../waku_archive/archive_utils
|
2026-06-08 13:37:53 +02:00
|
|
|
import logos_delivery/messaging/messaging_client
|
|
|
|
|
import logos_delivery/messaging/delivery_service/recv_service
|
2026-04-09 14:29:17 -03:00
|
|
|
|
|
|
|
|
import
|
2026-06-08 13:37:53 +02:00
|
|
|
logos_delivery,
|
|
|
|
|
logos_delivery/waku/[
|
2026-04-09 14:29:17 -03:00
|
|
|
waku_node,
|
|
|
|
|
waku_core,
|
Integrate api-shape phase2 (#3989) + api interfaces (#3975) (#3999)
* Reshape per-layer API into api/ folders and thin the FFI over them
Each layer now separates its constructible core from its public surface:
- core module (waku.nim / messaging_client.nim /
reliable_channel_manager.nim): the type plus new/start/stop and the
private construction helpers.
- api/ folder: one module per differentiated set of operations
(waku: topics/relay/filter/lightpush/store/peer_manager/discovery/
debug/health) plus an events surface.
The waku api is reshaped to be the complete operation surface the C
bindings need, so the library no longer reaches into node internals:
relayPublish returns the message hash, relaySubscribe takes an optional
handler, filter/lightpush auto-select the service peer, connectedPeersInfo
returns structured data, pingPeer honours the timeout, plus
relayNumPeersInMesh / relayNumConnectedPeers / isOnline. library/ is now a
thin C-ABI shim: each {.ffi.} proc only marshals cstring/JSON/callbacks and
delegates to ctx.myLib[].waku.<op> (or messagingClient.<op>).
app_callbacks re-exports the modules defining its handler types, which the
included FFI files previously relied on by leakage.
Events move next to the surface that owns them, with each dependency kept
pointing the right way:
- waku/events/ relocated under waku/api/events/.
- channel events live in channels/api/events.nim.
- the four messaging-level message events move to messaging/api/events;
MessageSeenEvent stays in waku because it is emitted by waku core, so
moving it would make waku depend on the messaging layer.
- delivery_events renamed to filter_subscribe_events to match the
OnFilterSubscribe/Unsubscribe events it actually declares.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
* Add reliable-channel FFI ops + events (nim-ffi v0.1.3)
Expose the reliable-channel layer through the v0.1.3 FFI:
- channel_create / channel_send / channel_close call the
ReliableChannelManager api (createReliableChannel / send / closeChannel),
marshalling channel id + base64 payload + ephemeral by hand
- channel message received / sent / errored are surfaced by listening to the
channel-layer broker events in start_node and forwarding them through
callEventCallback (received payload base64-encoded), dropped in stop_node
Stays on nim-ffi v0.1.3 (no typed/CBOR rewrite).
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
* Expose reliable-channel ops in the stable C header (#3851)
The library already ships as a single .so with a tiered header surface
(liblogosdelivery.h = stable Messaging/Reliable-Channels, liblogosdelivery_kernel.h
= advanced Kernel). Per that tiering, the reliable-channel ops belong on the
stable surface, so declare channel_create / channel_send / channel_close in
liblogosdelivery.h and document the channel lifecycle events delivered through
the event callback.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
* Graft PR#3975 interface layer onto decomposed foundation (events deduped)
Add IKernel/IMessagingClient/IReliableChannelManager/ILogosDelivery interface
classes under logos_delivery/api/. The EventBroker types PR#3975 hoisted into
these files already exist in PR#3989's decomposed */api/events/ modules, so the
interface files re-export those modules instead of redefining the types
(avoids 8 duplicate EventBroker definitions). api/types.nim kept at the
foundation version (ChannelId stays in channels/types.nim, which the decomposed
modules import).
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
* Wire impl classes to interfaces (inherit; relocate SendHandler)
- Waku : IKernel, MessagingClient : IMessagingClient,
ReliableChannelManager : IReliableChannelManager.
- The operation procs already live in PR#3989's decomposed */api/ modules and
stay as plain procs (nothing dispatches through the interface types, so no
method-ization is needed).
- SendHandler now lives in reliable_channel_manager_api.nim (its PR#3975 home);
removed the duplicate from reliable_channel.nim, which re-exports the
interface module so channels/api/{channel_lifecycle,send} still see it.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
* Wire LogosDelivery to ILogosDelivery orchestrator interface
LogosDelivery : ILogosDelivery; start/stop/isOnline become method overrides.
Peripheral PR#3975 edits (lightpush/store clients, self_req_handlers,
statistics) are import-reorg artifacts of deleting waku/utils/requests.nim,
which the decomposed structure keeps -- so they are intentionally not ported.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
* Dedup EventConnectionStatusChange (re-export from health_events)
9th duplicate EventBroker type: defined in both logos_delivery_api.nim and the
decomposed waku/api/events/health_events.nim. The interface file now re-exports
it. liblogosdelivery builds clean.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
* Move events back into interface-class source files (restore #3975 placement)
Reverses the earlier dedup-by-re-export: event TYPE definitions now live in the
interface classes, and the emptied decomposed event files are removed.
- MessageSeenEvent -> logos_delivery/api/kernel_api.nim
- Message{Sent,Error,Propagated,Received}Event -> api/messaging_client_api.nim
- ChannelMessage{Received,Sent,Error}Event -> api/reliable_channel_manager_api.nim
- EventConnectionStatusChange -> api/logos_delivery_api.nim
Deleted (became empty after the move):
- logos_delivery/waku/api/events/message_events.nim
- logos_delivery/messaging/api/events.nim
- logos_delivery/channels/api/events.nim
health_events.nim keeps its two remaining events (content/shard topic health).
Rewiring: each layer re-exports its interface module (waku->kernel_api,
messaging_client->messaging_client_api, reliable_channel->reliable_channel_manager_api,
which also re-exports messaging_client_api). Deep emitters/listeners
(subscription_manager, waku_node, waku_node/relay, node_health_monitor,
recv_service, send_service) import the owning interface module directly.
kernel_api stays below node level (types/topics/message/store-common) so the
node->kernel_api imports are acyclic. liblogosdelivery builds.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
* nph formatting
---------
Co-authored-by: Ivan FB <ivansete@status.im>
Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-30 11:51:22 +02:00
|
|
|
api/events/health_events,
|
2026-04-09 14:29:17 -03:00
|
|
|
waku_relay/protocol,
|
|
|
|
|
waku_archive,
|
|
|
|
|
waku_archive/common as archive_common,
|
|
|
|
|
]
|
2026-06-08 13:37:53 +02:00
|
|
|
import logos_delivery/waku/factory/waku_conf
|
2026-04-09 14:29:17 -03:00
|
|
|
import tools/confutils/cli_args
|
|
|
|
|
|
|
|
|
|
const TestTimeout = chronos.seconds(60)
|
|
|
|
|
|
|
|
|
|
type ReceiveEventListenerManager = ref object
|
|
|
|
|
brokerCtx: BrokerContext
|
|
|
|
|
receivedListener: MessageReceivedEventListener
|
|
|
|
|
receivedEvent: AsyncEvent
|
|
|
|
|
receivedMessages: seq[WakuMessage]
|
|
|
|
|
targetCount: int
|
|
|
|
|
|
|
|
|
|
proc newReceiveEventListenerManager(
|
|
|
|
|
brokerCtx: BrokerContext, expectedCount: int = 1
|
|
|
|
|
): ReceiveEventListenerManager =
|
|
|
|
|
let manager = ReceiveEventListenerManager(
|
|
|
|
|
brokerCtx: brokerCtx, receivedMessages: @[], targetCount: expectedCount
|
|
|
|
|
)
|
|
|
|
|
manager.receivedEvent = newAsyncEvent()
|
|
|
|
|
|
|
|
|
|
manager.receivedListener = MessageReceivedEvent
|
|
|
|
|
.listen(
|
|
|
|
|
brokerCtx,
|
|
|
|
|
proc(event: MessageReceivedEvent) {.async: (raises: []).} =
|
|
|
|
|
manager.receivedMessages.add(event.message)
|
|
|
|
|
if manager.receivedMessages.len >= manager.targetCount:
|
|
|
|
|
manager.receivedEvent.fire()
|
|
|
|
|
,
|
|
|
|
|
)
|
|
|
|
|
.expect("Failed to listen to MessageReceivedEvent")
|
|
|
|
|
|
|
|
|
|
return manager
|
|
|
|
|
|
feat: persistency (#3880)
* persistency: per-job SQLite-backed storage layer (singleton, brokered)
Adds a backend-neutral CRUD library at waku/persistency/, plus the
nim-brokers dependency swap that enables it.
Architecture (ports-and-adapters):
* Persistency: process-wide singleton, one root directory.
* Job: one tenant, one DB file, one worker thread, one BrokerContext.
* Backend: SQLite via waku/common/databases/db_sqlite. Uniform schema
kv(category BLOB, key BLOB, payload BLOB) PRIMARY KEY (category, key)
WITHOUT ROWID, WAL mode.
* Writes are fire-and-forget via EventBroker(mt) PersistEvent.
* Reads are async via five RequestBroker(mt) shapes (KvGet, KvExists,
KvScan, KvCount, KvDelete). Reads return Result[T, PersistencyError].
* One storage thread per job; tenants isolated by BrokerContext.
Public surface (waku/persistency/persistency.nim):
Persistency.instance(rootDir) / Persistency.instance() / Persistency.reset()
p.openJob(id) / p.closeJob(id) / p.dropJob(id) / p.close()
p.job(id) / p[id] / p.hasJob(id)
Writes (Job form & string-id form, fire-and-forget):
persist / persistPut / persistDelete / persistEncoded
Reads (Job form & string-id form, async Result):
get / exists / scan / scanPrefix / count / deleteAcked
Key & payload encoding (keys.nim, payload.nim):
* encodePart family + variadic key(...) / payload(...) macros +
single-value toKey / toPayload.
* Primitives: string and openArray[byte] are 2-byte BE length + bytes;
int{8..64} are sign-flipped 8-byte BE; uint{16..64} are 8-byte BE;
bool/byte/char are 1 byte; enums are int64(ord(v)).
* Generic encodePart[T: tuple | object] recurses through fields() so
any composite Nim type is encodable without ceremony.
* Stable across Nim/C compiler upgrades: no sizeof, no memcpy, no
cast on pointers, no host-endianness dependency.
* `rawKey(bytes)` + `persistPut(..., openArray[byte])` let callers
bypass the built-in encoder with their own format (CBOR, protobuf...).
Lifecycle:
* Persistency.new is private; Persistency.instance is the only public
constructor. Same rootDir is idempotent; conflicting rootDir is
peInvalidArgument. Persistency.reset for test/restart paths.
* openJob opens-or-creates the per-job SQLite file; an existing file
is reused with its data preserved.
* Teardown integration: Persistency.instance registers a Teardown
MultiRequestBroker provider that closes all jobs and clears the
singleton slot when Waku.stop() issues Teardown.request.
Internal layering:
types.nim pure value types (Key, KeyRange, KvRow, TxOp,
PersistencyError)
keys.nim encodePart primitives + key(...) macro
payload.nim toPayload + payload(...) macro
schema.nim CREATE TABLE + connection pragmas + user_version
backend_sqlite.nim KvBackend, applyOps (single source of write SQL),
getOne/existsOne/deleteOne, scanRange (asc/desc,
half-open ranges, open-ended stop), countRange
backend_comm.nim EventBroker(mt) PersistEvent + 5 RequestBroker(mt)
declarations; encodeErr/decodeErr boundary helpers
backend_thread.nim startStorageThread / stopStorageThread (shared
allocShared0 arg, cstring dbPath, atomic
ready/shutdown flags); per-thread provider
registration
persistency.nim Persistency + Job types, singleton state, public
facade
../requests/lifecycle_requests.nim
Teardown MultiRequestBroker
Tests (69 cases, all passing):
test_keys.nim sort-order invariants (length-prefix strings,
sign-flipped ints, composite tuples, prefix
range)
test_backend.nim round-trip / replace / delete-return-value /
batched atomicity / asc-desc-half-open-open-
ended scans / category isolation / batch
txDelete
test_lifecycle.nim open-or-create rootDir / non-dir collision /
reopen across sessions / idempotent openJob /
two-tenant parallel isolation / closeJob joins
worker / dropJob removes file / acked delete
test_facade.nim put-then-get / atomic batch / scanPrefix
asc/desc / deleteAcked hit-miss /
fire-and-forget delete / two-tenant facade
isolation
test_encoding.nim tuple/named-tuple/object keys, embedded Key,
enum encoding, field-major composite sort,
payload struct encoding, end-to-end struct
round-trip through SQLite
test_string_lookup.nim peJobNotFound semantics / hasJob / subscript /
persistPut+get via id / reads short-circuit /
writes drop+warn / persistEncoded via id /
scan parity Job-ref vs id
test_singleton.nim idempotent same-rootDir / different-rootDir
rejection / no-arg instance lifecycle / reset
retargets / reset idempotence / Teardown.request
end-to-end
Prerequisite delivered in the same series: replace the in-tree broker
implementation with the external nim-brokers package; update all
broker call-sites (waku_filter_v2, waku_relay, waku_rln_relay,
delivery_service, peer_manager, requests/*, factory/*, api tests, etc.)
to the new package API; chat2 made to compile again.
Note: SDS adapter (Phase 5 of the design) is deferred -- nim-sds is
still developed side-by-side and the persistency layer is intentionally
SDS-agnostic.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
* persistency: pin nim-brokers by URL+commit (workaround for stale registry)
The bare `brokers >= 2.0.1` form cannot resolve on machines where the
local nimble SAT solver enumerates only the registry-recorded 0.1.0 for
brokers. The nim-lang/packages entry for `brokers` carries no per-tag
metadata (only the URL), so until that registry entry is refreshed the
SAT solver clamps the available-versions list to 0.1.0 and rejects the
>= 2.0.1 constraint -- even though pkgs2 and pkgcache both have v2.0.1
cloned locally.
Pinning by URL+commit bypasses the registry path entirely. Inline
comment in waku.nimble documents the situation and the path back to
the bare form once nim-lang/packages is updated.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
* persistency: nph format pass
Run `nph` on all 57 Nim files touched by this PR. Pure formatting:
17 files re-styled, no semantic change. Suite still 69/69.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
* Fix build, add local-storage-path config, lazy init of Persistency from Waku start
* fix: fix nix deps
* fixes for nix build, regenerate deps
* reverting accidental dependency changes
* Fixing deps
* Apply suggestions from code review
Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com>
* persistency tests: migrate to suite / asyncTest / await
Match the in-tree test convention (procSuite -> suite, sync test +
waitFor -> asyncTest + await):
- procSuite "X": -> suite "X":
- For tests doing async work: test -> asyncTest, waitFor -> await.
- Poll helpers (proc waitFor(t: Job, ...) in test_lifecycle.nim,
proc waitUntilExists(...) in test_facade.nim and
test_string_lookup.nim) -> Future[bool] {.async.}, internal
`waitFor X` -> `await X`, internal `sleep(N)` ->
`await sleepAsync(chronos.milliseconds(N))`.
- Renamed test_lifecycle.nim's helper proc from `waitFor(t: Job, ...)`
-> `pollExists(t: Job, ...)`; the previous name shadowed
chronos.waitFor in the chronos macro expansion.
- `chronos.milliseconds(N)` explicitly qualified because `std/times`
also exports `milliseconds` (returning TimeInterval, not Duration).
- `check await x` -> `let okN = await x; check okN` to dodge chronos's
"yield in expr not lowered" with await-as-macro-argument.
- `(await x).foo()` -> `let awN = await x; ... awN.foo() ...` for the
same reason.
waku/persistency/persistency.nim: nph also pulled the proc signatures
across multiple lines; restored explicit `Future[void] {.async.}`
return types after the colon (an intermediate nph pass had elided them).
Suite: 71 / 71 OK against the new async write surface.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
* use idiomatic valueOr instead of ifs
* Reworked persistency shutdown, remove not necessary teardown mechanism
* Use const for DefaultStoragePath
* format to follow coding guidelines - no use of result and explicit returns - no functional change
---------
Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com>
2026-05-16 00:09:07 +02:00
|
|
|
proc teardown(manager: ReceiveEventListenerManager) {.async.} =
|
|
|
|
|
await MessageReceivedEvent.dropListener(manager.brokerCtx, manager.receivedListener)
|
2026-04-09 14:29:17 -03:00
|
|
|
|
|
|
|
|
proc waitForEvents(
|
|
|
|
|
manager: ReceiveEventListenerManager, timeout: Duration
|
|
|
|
|
): Future[bool] {.async.} =
|
|
|
|
|
return await manager.receivedEvent.wait().withTimeout(timeout)
|
|
|
|
|
|
2026-06-18 13:23:49 -03:00
|
|
|
proc waitForConnectionStatus(
|
|
|
|
|
brokerCtx: BrokerContext, expected: ConnectionStatus
|
|
|
|
|
) {.async.} =
|
|
|
|
|
## Completes when the node reports `expected`.
|
|
|
|
|
var future = newFuture[void]("waitForConnectionStatus")
|
|
|
|
|
|
|
|
|
|
let handler: EventConnectionStatusChangeListenerProc = proc(
|
|
|
|
|
e: EventConnectionStatusChange
|
|
|
|
|
) {.async: (raises: []), gcsafe.} =
|
|
|
|
|
if not future.finished and e.connectionStatus == expected:
|
|
|
|
|
future.complete()
|
|
|
|
|
|
|
|
|
|
let handle = EventConnectionStatusChange.listen(brokerCtx, handler).valueOr:
|
|
|
|
|
raiseAssert error
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
if not await future.withTimeout(TestTimeout):
|
|
|
|
|
raiseAssert "Timeout waiting for status: " & $expected
|
|
|
|
|
finally:
|
|
|
|
|
await EventConnectionStatusChange.dropListener(brokerCtx, handle)
|
|
|
|
|
|
2026-04-09 14:29:17 -03:00
|
|
|
proc createApiNodeConf(numShards: uint16 = 1): WakuNodeConf =
|
|
|
|
|
var conf = defaultWakuNodeConf().valueOr:
|
|
|
|
|
raiseAssert error
|
|
|
|
|
conf.mode = cli_args.WakuMode.Core
|
|
|
|
|
conf.listenAddress = parseIpAddress("0.0.0.0")
|
|
|
|
|
conf.tcpPort = Port(0)
|
|
|
|
|
conf.discv5UdpPort = Port(0)
|
2026-06-10 09:09:22 -03:00
|
|
|
conf.clusterId = some(3'u16)
|
2026-04-09 14:29:17 -03:00
|
|
|
conf.numShardsInNetwork = numShards
|
2026-06-10 09:09:22 -03:00
|
|
|
conf.reliabilityEnabled = some(true)
|
2026-04-09 14:29:17 -03:00
|
|
|
conf.rest = false
|
|
|
|
|
result = conf
|
|
|
|
|
|
2026-06-18 13:23:49 -03:00
|
|
|
type TestNetwork = ref object
|
|
|
|
|
storeNode: WakuNode
|
|
|
|
|
publisher: WakuNode
|
2026-06-23 01:20:09 +02:00
|
|
|
subscriber: LogosDelivery
|
2026-06-18 13:23:49 -03:00
|
|
|
storeNodePeerInfo: RemotePeerInfo
|
|
|
|
|
missedPayload: seq[byte]
|
|
|
|
|
|
|
|
|
|
proc setupNetwork(testTopic: ContentTopic): Future[TestNetwork] {.async.} =
|
|
|
|
|
## Returns a started subscriber subscribed to `testTopic` but not yet connected
|
|
|
|
|
## to the store, with a message sitting in the store it never saw live.
|
|
|
|
|
const numShards: uint16 = 1
|
|
|
|
|
let shard = PubsubTopic("/waku/2/rs/3/0")
|
|
|
|
|
|
|
|
|
|
proc dummyHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
|
|
|
|
|
discard
|
|
|
|
|
|
|
|
|
|
# store node: archive + store + relay, subscribed to the shard
|
|
|
|
|
var storeNode: WakuNode
|
|
|
|
|
lockNewGlobalBrokerContext:
|
|
|
|
|
storeNode =
|
|
|
|
|
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
|
|
|
|
|
storeNode.mountMetadata(3, toSeq(0'u16 ..< numShards)).expect(
|
|
|
|
|
"Failed to mount metadata on storeNode"
|
|
|
|
|
)
|
|
|
|
|
(await storeNode.mountRelay()).expect("Failed to mount relay on storeNode")
|
|
|
|
|
storeNode.mountArchive(newSqliteArchiveDriver()).expect("Failed to mount archive")
|
|
|
|
|
await storeNode.mountStore()
|
|
|
|
|
await storeNode.mountLibp2pPing()
|
|
|
|
|
await storeNode.start()
|
|
|
|
|
storeNode.subscribe((kind: PubsubSub, topic: shard), dummyHandler).expect(
|
|
|
|
|
"Failed to sub storeNode"
|
|
|
|
|
)
|
2026-04-09 14:29:17 -03:00
|
|
|
|
2026-06-18 13:23:49 -03:00
|
|
|
let storeNodePeerInfo = storeNode.peerInfo.toRemotePeerInfo()
|
2026-04-09 14:29:17 -03:00
|
|
|
|
2026-06-18 13:23:49 -03:00
|
|
|
# publisher: relay, connected to the store so its messages get archived
|
|
|
|
|
var publisher: WakuNode
|
|
|
|
|
lockNewGlobalBrokerContext:
|
|
|
|
|
publisher =
|
|
|
|
|
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
|
|
|
|
|
publisher.mountMetadata(3, toSeq(0'u16 ..< numShards)).expect(
|
|
|
|
|
"Failed to mount metadata on publisher"
|
|
|
|
|
)
|
|
|
|
|
(await publisher.mountRelay()).expect("Failed to mount relay on publisher")
|
|
|
|
|
await publisher.mountLibp2pPing()
|
|
|
|
|
await publisher.start()
|
|
|
|
|
publisher.subscribe((kind: PubsubSub, topic: shard), dummyHandler).expect(
|
|
|
|
|
"Failed to sub publisher"
|
|
|
|
|
)
|
2026-04-09 14:29:17 -03:00
|
|
|
|
2026-06-18 13:23:49 -03:00
|
|
|
await publisher.connectToNodes(@[storeNodePeerInfo])
|
|
|
|
|
|
|
|
|
|
var meshFormed = false
|
|
|
|
|
for _ in 0 ..< 50:
|
|
|
|
|
if publisher.wakuRelay.getNumPeersInMesh(shard).valueOr(0) > 0:
|
|
|
|
|
meshFormed = true
|
|
|
|
|
break
|
|
|
|
|
await sleepAsync(100.milliseconds)
|
|
|
|
|
if not meshFormed:
|
|
|
|
|
raiseAssert "publisher<->store relay mesh did not form in time"
|
|
|
|
|
|
|
|
|
|
# subscriber: created before the publish so the message timestamp lands after
|
|
|
|
|
# its RecvService startTimeToCheck watermark
|
2026-06-23 01:20:09 +02:00
|
|
|
var subscriber: LogosDelivery
|
2026-06-18 13:23:49 -03:00
|
|
|
lockNewGlobalBrokerContext:
|
2026-06-23 01:20:09 +02:00
|
|
|
subscriber = (await LogosDelivery.new(createApiNodeConf(numShards))).expect(
|
2026-06-18 13:23:49 -03:00
|
|
|
"Failed to create subscriber"
|
|
|
|
|
)
|
|
|
|
|
(await subscriber.start()).expect("Failed to start subscriber")
|
|
|
|
|
|
|
|
|
|
# publish while the subscriber is offline: the message reaches the archive but
|
|
|
|
|
# the subscriber never sees it via live relay
|
|
|
|
|
let missedPayload = "This message was missed".toBytes()
|
|
|
|
|
let missedMsg = WakuMessage(
|
|
|
|
|
payload: missedPayload, contentTopic: testTopic, version: 0, timestamp: now()
|
|
|
|
|
)
|
|
|
|
|
discard (await publisher.publish(some(shard), missedMsg)).expect(
|
|
|
|
|
"Publish missed msg failed"
|
|
|
|
|
)
|
2026-04-09 14:29:17 -03:00
|
|
|
|
2026-06-18 13:23:49 -03:00
|
|
|
block waitArchive:
|
2026-04-09 14:29:17 -03:00
|
|
|
for _ in 0 ..< 50:
|
2026-06-18 13:23:49 -03:00
|
|
|
let query = archive_common.ArchiveQuery(
|
|
|
|
|
includeData: false, contentTopics: @[testTopic], pubsubTopic: some(shard)
|
|
|
|
|
)
|
|
|
|
|
let res = await storeNode.wakuArchive.findMessages(query)
|
|
|
|
|
if res.isOk() and res.get().hashes.len > 0:
|
|
|
|
|
break waitArchive
|
2026-04-09 14:29:17 -03:00
|
|
|
await sleepAsync(100.milliseconds)
|
2026-06-18 13:23:49 -03:00
|
|
|
raiseAssert "Message was not archived in time"
|
2026-04-09 14:29:17 -03:00
|
|
|
|
2026-06-18 13:23:49 -03:00
|
|
|
# subscribe to the content topic; with no peers yet the subscriber stays offline
|
2026-06-25 09:27:01 +02:00
|
|
|
(await subscriber.messagingClient.subscribe(testTopic)).expect("Failed to subscribe")
|
2026-06-18 13:23:49 -03:00
|
|
|
|
|
|
|
|
return TestNetwork(
|
|
|
|
|
storeNode: storeNode,
|
|
|
|
|
publisher: publisher,
|
|
|
|
|
subscriber: subscriber,
|
|
|
|
|
storeNodePeerInfo: storeNodePeerInfo,
|
|
|
|
|
missedPayload: missedPayload,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
proc teardown(net: TestNetwork) {.async.} =
|
|
|
|
|
if not isNil(net.subscriber):
|
|
|
|
|
(await net.subscriber.stop()).expect("Failed to stop subscriber")
|
|
|
|
|
net.subscriber = nil
|
|
|
|
|
if not isNil(net.publisher):
|
|
|
|
|
await net.publisher.stop()
|
|
|
|
|
net.publisher = nil
|
|
|
|
|
if not isNil(net.storeNode):
|
|
|
|
|
await net.storeNode.stop()
|
|
|
|
|
net.storeNode = nil
|
|
|
|
|
|
|
|
|
|
suite "Messaging API, Receive Service (store recovery)":
|
|
|
|
|
asyncTest "recv_service delivers store-recovered messages via MessageReceivedEvent":
|
|
|
|
|
## Regression: a message archived before the subscriber connects is recovered
|
|
|
|
|
## by an explicit checkStore() and delivered via MessageReceivedEvent.
|
|
|
|
|
let net = await setupNetwork(ContentTopic("/waku/2/recv-test/proto"))
|
|
|
|
|
defer:
|
|
|
|
|
await net.teardown()
|
2026-04-09 14:29:17 -03:00
|
|
|
|
2026-06-23 01:20:09 +02:00
|
|
|
let eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1)
|
2026-04-09 14:29:17 -03:00
|
|
|
defer:
|
feat: persistency (#3880)
* persistency: per-job SQLite-backed storage layer (singleton, brokered)
Adds a backend-neutral CRUD library at waku/persistency/, plus the
nim-brokers dependency swap that enables it.
Architecture (ports-and-adapters):
* Persistency: process-wide singleton, one root directory.
* Job: one tenant, one DB file, one worker thread, one BrokerContext.
* Backend: SQLite via waku/common/databases/db_sqlite. Uniform schema
kv(category BLOB, key BLOB, payload BLOB) PRIMARY KEY (category, key)
WITHOUT ROWID, WAL mode.
* Writes are fire-and-forget via EventBroker(mt) PersistEvent.
* Reads are async via five RequestBroker(mt) shapes (KvGet, KvExists,
KvScan, KvCount, KvDelete). Reads return Result[T, PersistencyError].
* One storage thread per job; tenants isolated by BrokerContext.
Public surface (waku/persistency/persistency.nim):
Persistency.instance(rootDir) / Persistency.instance() / Persistency.reset()
p.openJob(id) / p.closeJob(id) / p.dropJob(id) / p.close()
p.job(id) / p[id] / p.hasJob(id)
Writes (Job form & string-id form, fire-and-forget):
persist / persistPut / persistDelete / persistEncoded
Reads (Job form & string-id form, async Result):
get / exists / scan / scanPrefix / count / deleteAcked
Key & payload encoding (keys.nim, payload.nim):
* encodePart family + variadic key(...) / payload(...) macros +
single-value toKey / toPayload.
* Primitives: string and openArray[byte] are 2-byte BE length + bytes;
int{8..64} are sign-flipped 8-byte BE; uint{16..64} are 8-byte BE;
bool/byte/char are 1 byte; enums are int64(ord(v)).
* Generic encodePart[T: tuple | object] recurses through fields() so
any composite Nim type is encodable without ceremony.
* Stable across Nim/C compiler upgrades: no sizeof, no memcpy, no
cast on pointers, no host-endianness dependency.
* `rawKey(bytes)` + `persistPut(..., openArray[byte])` let callers
bypass the built-in encoder with their own format (CBOR, protobuf...).
Lifecycle:
* Persistency.new is private; Persistency.instance is the only public
constructor. Same rootDir is idempotent; conflicting rootDir is
peInvalidArgument. Persistency.reset for test/restart paths.
* openJob opens-or-creates the per-job SQLite file; an existing file
is reused with its data preserved.
* Teardown integration: Persistency.instance registers a Teardown
MultiRequestBroker provider that closes all jobs and clears the
singleton slot when Waku.stop() issues Teardown.request.
Internal layering:
types.nim pure value types (Key, KeyRange, KvRow, TxOp,
PersistencyError)
keys.nim encodePart primitives + key(...) macro
payload.nim toPayload + payload(...) macro
schema.nim CREATE TABLE + connection pragmas + user_version
backend_sqlite.nim KvBackend, applyOps (single source of write SQL),
getOne/existsOne/deleteOne, scanRange (asc/desc,
half-open ranges, open-ended stop), countRange
backend_comm.nim EventBroker(mt) PersistEvent + 5 RequestBroker(mt)
declarations; encodeErr/decodeErr boundary helpers
backend_thread.nim startStorageThread / stopStorageThread (shared
allocShared0 arg, cstring dbPath, atomic
ready/shutdown flags); per-thread provider
registration
persistency.nim Persistency + Job types, singleton state, public
facade
../requests/lifecycle_requests.nim
Teardown MultiRequestBroker
Tests (69 cases, all passing):
test_keys.nim sort-order invariants (length-prefix strings,
sign-flipped ints, composite tuples, prefix
range)
test_backend.nim round-trip / replace / delete-return-value /
batched atomicity / asc-desc-half-open-open-
ended scans / category isolation / batch
txDelete
test_lifecycle.nim open-or-create rootDir / non-dir collision /
reopen across sessions / idempotent openJob /
two-tenant parallel isolation / closeJob joins
worker / dropJob removes file / acked delete
test_facade.nim put-then-get / atomic batch / scanPrefix
asc/desc / deleteAcked hit-miss /
fire-and-forget delete / two-tenant facade
isolation
test_encoding.nim tuple/named-tuple/object keys, embedded Key,
enum encoding, field-major composite sort,
payload struct encoding, end-to-end struct
round-trip through SQLite
test_string_lookup.nim peJobNotFound semantics / hasJob / subscript /
persistPut+get via id / reads short-circuit /
writes drop+warn / persistEncoded via id /
scan parity Job-ref vs id
test_singleton.nim idempotent same-rootDir / different-rootDir
rejection / no-arg instance lifecycle / reset
retargets / reset idempotence / Teardown.request
end-to-end
Prerequisite delivered in the same series: replace the in-tree broker
implementation with the external nim-brokers package; update all
broker call-sites (waku_filter_v2, waku_relay, waku_rln_relay,
delivery_service, peer_manager, requests/*, factory/*, api tests, etc.)
to the new package API; chat2 made to compile again.
Note: SDS adapter (Phase 5 of the design) is deferred -- nim-sds is
still developed side-by-side and the persistency layer is intentionally
SDS-agnostic.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
* persistency: pin nim-brokers by URL+commit (workaround for stale registry)
The bare `brokers >= 2.0.1` form cannot resolve on machines where the
local nimble SAT solver enumerates only the registry-recorded 0.1.0 for
brokers. The nim-lang/packages entry for `brokers` carries no per-tag
metadata (only the URL), so until that registry entry is refreshed the
SAT solver clamps the available-versions list to 0.1.0 and rejects the
>= 2.0.1 constraint -- even though pkgs2 and pkgcache both have v2.0.1
cloned locally.
Pinning by URL+commit bypasses the registry path entirely. Inline
comment in waku.nimble documents the situation and the path back to
the bare form once nim-lang/packages is updated.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
* persistency: nph format pass
Run `nph` on all 57 Nim files touched by this PR. Pure formatting:
17 files re-styled, no semantic change. Suite still 69/69.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
* Fix build, add local-storage-path config, lazy init of Persistency from Waku start
* fix: fix nix deps
* fixes for nix build, regenerate deps
* reverting accidental dependency changes
* Fixing deps
* Apply suggestions from code review
Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com>
* persistency tests: migrate to suite / asyncTest / await
Match the in-tree test convention (procSuite -> suite, sync test +
waitFor -> asyncTest + await):
- procSuite "X": -> suite "X":
- For tests doing async work: test -> asyncTest, waitFor -> await.
- Poll helpers (proc waitFor(t: Job, ...) in test_lifecycle.nim,
proc waitUntilExists(...) in test_facade.nim and
test_string_lookup.nim) -> Future[bool] {.async.}, internal
`waitFor X` -> `await X`, internal `sleep(N)` ->
`await sleepAsync(chronos.milliseconds(N))`.
- Renamed test_lifecycle.nim's helper proc from `waitFor(t: Job, ...)`
-> `pollExists(t: Job, ...)`; the previous name shadowed
chronos.waitFor in the chronos macro expansion.
- `chronos.milliseconds(N)` explicitly qualified because `std/times`
also exports `milliseconds` (returning TimeInterval, not Duration).
- `check await x` -> `let okN = await x; check okN` to dodge chronos's
"yield in expr not lowered" with await-as-macro-argument.
- `(await x).foo()` -> `let awN = await x; ... awN.foo() ...` for the
same reason.
waku/persistency/persistency.nim: nph also pulled the proc signatures
across multiple lines; restored explicit `Future[void] {.async.}`
return types after the colon (an intermediate nph pass had elided them).
Suite: 71 / 71 OK against the new async write surface.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
* use idiomatic valueOr instead of ifs
* Reworked persistency shutdown, remove not necessary teardown mechanism
* Use const for DefaultStoragePath
* format to follow coding guidelines - no use of result and explicit returns - no functional change
---------
Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com>
2026-05-16 00:09:07 +02:00
|
|
|
await eventManager.teardown()
|
2026-04-09 14:29:17 -03:00
|
|
|
|
2026-06-23 01:20:09 +02:00
|
|
|
await net.subscriber.waku.node.connectToNodes(@[net.storeNodePeerInfo])
|
2026-06-18 13:23:49 -03:00
|
|
|
await net.subscriber.messagingClient.recvService.checkStore()
|
2026-04-09 14:29:17 -03:00
|
|
|
|
2026-06-18 13:23:49 -03:00
|
|
|
check await eventManager.waitForEvents(TestTimeout)
|
2026-04-09 14:29:17 -03:00
|
|
|
check eventManager.receivedMessages.len == 1
|
|
|
|
|
if eventManager.receivedMessages.len > 0:
|
2026-06-18 13:23:49 -03:00
|
|
|
check eventManager.receivedMessages[0].payload == net.missedPayload
|
2026-04-09 14:29:17 -03:00
|
|
|
|
2026-06-18 13:23:49 -03:00
|
|
|
asyncTest "recv_service backfills missed messages when it comes back online":
|
|
|
|
|
## Connecting a peer brings the subscriber online, firing the backfill that
|
|
|
|
|
## recovers a message archived while it was offline.
|
|
|
|
|
let net = await setupNetwork(ContentTopic("/waku/2/recv-reconnect-test/proto"))
|
|
|
|
|
defer:
|
|
|
|
|
await net.teardown()
|
|
|
|
|
|
2026-06-23 01:20:09 +02:00
|
|
|
let eventManager = newReceiveEventListenerManager(net.subscriber.waku.brokerCtx, 1)
|
2026-06-18 13:23:49 -03:00
|
|
|
defer:
|
|
|
|
|
await eventManager.teardown()
|
|
|
|
|
|
|
|
|
|
# sync on coming online (the transition that fires the backfill) before asserting
|
|
|
|
|
let onlineFut = waitForConnectionStatus(
|
2026-06-23 01:20:09 +02:00
|
|
|
net.subscriber.waku.brokerCtx, ConnectionStatus.PartiallyConnected
|
2026-06-18 13:23:49 -03:00
|
|
|
)
|
2026-06-23 01:20:09 +02:00
|
|
|
await net.subscriber.waku.node.connectToNodes(@[net.storeNodePeerInfo])
|
2026-06-18 13:23:49 -03:00
|
|
|
await onlineFut
|
|
|
|
|
|
|
|
|
|
check await eventManager.waitForEvents(TestTimeout)
|
|
|
|
|
check eventManager.receivedMessages.len == 1
|
|
|
|
|
if eventManager.receivedMessages.len > 0:
|
|
|
|
|
check eventManager.receivedMessages[0].payload == net.missedPayload
|