mirror of
https://github.com/logos-messaging/logos-delivery.git
synced 2026-06-05 13:39:59 +00:00
Merge remote-tracking branch 'origin/master' into feat/layered-config
# Conflicts: # tests/api/test_api_subscription.nim # tests/node/test_wakunode_health_monitor.nim # waku/api/api.nim # waku/node/subscription_manager.nim # waku/node/waku_node.nim
This commit is contained in:
commit
f16317b7cd
21
.github/workflows/ci.yml
vendored
21
.github/workflows/ci.yml
vendored
@ -89,12 +89,15 @@ jobs:
|
||||
path: |
|
||||
nimbledeps/
|
||||
nimble.paths
|
||||
key: ${{ runner.os }}-nimbledeps-nimble${{ env.NIMBLE_VERSION }}-${{ hashFiles('nimble.lock', 'BearSSL.mk', 'Nat.mk') }}
|
||||
key: ${{ runner.os }}-nimbledeps-v2-nimble${{ env.NIMBLE_VERSION }}-${{ hashFiles('nimble.lock', 'BearSSL.mk', 'Nat.mk') }}
|
||||
|
||||
- name: Install nimble deps
|
||||
if: steps.cache-nimbledeps.outputs.cache-hit != 'true'
|
||||
run: |
|
||||
nimble setup --localdeps -y
|
||||
# nim's source tree checksums differently across platforms, so its
|
||||
# locked checksum is unreliable. --useSystemNim uses the CI nim and
|
||||
# skips that check, while still verifying every other locked dep.
|
||||
nimble setup --localdeps -y --useSystemNim
|
||||
make rebuild-nat-libs-nimbledeps
|
||||
make rebuild-bearssl-nimbledeps
|
||||
touch nimbledeps/.nimble-setup
|
||||
@ -142,12 +145,15 @@ jobs:
|
||||
path: |
|
||||
nimbledeps/
|
||||
nimble.paths
|
||||
key: ${{ runner.os }}-nimbledeps-nimble${{ env.NIMBLE_VERSION }}-${{ hashFiles('nimble.lock', 'BearSSL.mk', 'Nat.mk') }}
|
||||
key: ${{ runner.os }}-nimbledeps-v2-nimble${{ env.NIMBLE_VERSION }}-${{ hashFiles('nimble.lock', 'BearSSL.mk', 'Nat.mk') }}
|
||||
|
||||
- name: Install nimble deps
|
||||
if: steps.cache-nimbledeps.outputs.cache-hit != 'true'
|
||||
run: |
|
||||
nimble setup --localdeps -y
|
||||
# nim's source tree checksums differently across platforms, so its
|
||||
# locked checksum is unreliable. --useSystemNim uses the CI nim and
|
||||
# skips that check, while still verifying every other locked dep.
|
||||
nimble setup --localdeps -y --useSystemNim
|
||||
make rebuild-nat-libs-nimbledeps
|
||||
make rebuild-bearssl-nimbledeps
|
||||
touch nimbledeps/.nimble-setup
|
||||
@ -207,12 +213,15 @@ jobs:
|
||||
path: |
|
||||
nimbledeps/
|
||||
nimble.paths
|
||||
key: ${{ runner.os }}-nimbledeps-nimble${{ env.NIMBLE_VERSION }}-${{ hashFiles('nimble.lock', 'BearSSL.mk', 'Nat.mk') }}
|
||||
key: ${{ runner.os }}-nimbledeps-v2-nimble${{ env.NIMBLE_VERSION }}-${{ hashFiles('nimble.lock', 'BearSSL.mk', 'Nat.mk') }}
|
||||
|
||||
- name: Install nimble deps
|
||||
if: steps.cache-nimbledeps.outputs.cache-hit != 'true'
|
||||
run: |
|
||||
nimble setup --localdeps -y
|
||||
# nim's source tree checksums differently across platforms, so its
|
||||
# locked checksum is unreliable. --useSystemNim uses the CI nim and
|
||||
# skips that check, while still verifying every other locked dep.
|
||||
nimble setup --localdeps -y --useSystemNim
|
||||
make rebuild-nat-libs-nimbledeps
|
||||
make rebuild-bearssl-nimbledeps
|
||||
touch nimbledeps/.nimble-setup
|
||||
|
||||
4
.github/workflows/container-image.yml
vendored
4
.github/workflows/container-image.yml
vendored
@ -15,6 +15,10 @@ env:
|
||||
NPROC: 2
|
||||
MAKEFLAGS: "-j${NPROC}"
|
||||
NIMFLAGS: "--parallelBuild:${NPROC}"
|
||||
# waku.nimble reads compile flags from NIM_PARAMS, not NIMFLAGS. Without
|
||||
# -d:disableMarchNative here, config.nims applies -march=native and
|
||||
# secp256k1 fails to compile.
|
||||
NIM_PARAMS: "-d:disableMarchNative"
|
||||
NIM_VERSION: '2.2.4'
|
||||
NIMBLE_VERSION: '0.22.3'
|
||||
|
||||
|
||||
33
.github/workflows/windows-build.yml
vendored
33
.github/workflows/windows-build.yml
vendored
@ -24,6 +24,16 @@ jobs:
|
||||
MSYSTEM: MINGW64
|
||||
|
||||
steps:
|
||||
- name: Configure Git to keep LF line endings
|
||||
# Windows Git defaults to core.autocrlf=true. The LF→CRLF conversion
|
||||
# changes the SHA1 of nimble's cloned deps, so they no longer match
|
||||
# nimble.lock and nimble re-downloads them on every run (hanging the
|
||||
# job). Disable autocrlf so clones match the Linux-computed checksums.
|
||||
shell: pwsh
|
||||
run: |
|
||||
git config --global core.autocrlf false
|
||||
git config --global core.eol lf
|
||||
|
||||
- name: Checkout code
|
||||
uses: actions/checkout@v4
|
||||
|
||||
@ -52,6 +62,14 @@ jobs:
|
||||
mingw-w64-x86_64-clang
|
||||
mingw-w64-x86_64-nasm
|
||||
|
||||
- name: Configure Git in MSYS2 to keep LF line endings
|
||||
# The step above only configures Git for Windows. nimble clones its deps
|
||||
# from the MSYS2 shell, whose git reads a separate global config, so the
|
||||
# same setting must be repeated here.
|
||||
run: |
|
||||
git config --global core.autocrlf false
|
||||
git config --global core.eol lf
|
||||
|
||||
- name: Manually install nasm
|
||||
run: |
|
||||
bash scripts/install_nasm_in_windows.sh
|
||||
@ -80,19 +98,16 @@ jobs:
|
||||
cd /tmp && nimble install "nimble@${{ env.NIMBLE_VERSION }}" -y
|
||||
echo "$HOME/.nimble/bin" >> $GITHUB_PATH
|
||||
|
||||
- name: Patch nimble.lock for Windows nim checksum
|
||||
# nimble.exe uses Windows Git (core.autocrlf=true by default), which converts LF→CRLF
|
||||
# on checkout. This changes the SHA1 of the nim package source tree relative to the
|
||||
# Linux-computed checksum stored in nimble.lock. Patch the lock file with the
|
||||
# Windows-computed checksum before nimble reads it.
|
||||
run: |
|
||||
sed -i 's/68bb85cbfb1832ce4db43943911b046c3af3caab/a092a045d3a427d127a5334a6e59c76faff54686/g' nimble.lock
|
||||
|
||||
- name: Install nimble deps
|
||||
if: steps.cache-nimbledeps.outputs.cache-hit != 'true'
|
||||
run: |
|
||||
export PATH="$GITHUB_WORKSPACE/.nim_runtime/bin:$HOME/.nimble/bin:$PATH"
|
||||
nimble setup --localdeps -y
|
||||
# nim's source tree checks out differently per platform (its own
|
||||
# .gitattributes forces line endings), so its locked checksum never
|
||||
# matches on Windows — even with autocrlf disabled. --useSystemNim
|
||||
# uses the CI-installed nim and skips that check, while still
|
||||
# verifying every other locked dependency.
|
||||
nimble setup --localdeps -y --useSystemNim
|
||||
make rebuild-nat-libs-nimbledeps CC=gcc
|
||||
make rebuild-bearssl-nimbledeps CC=gcc
|
||||
touch nimbledeps/.nimble-setup
|
||||
|
||||
60
Makefile
60
Makefile
@ -24,7 +24,9 @@ export PATH := $(HOME)/.nimble/bin:$(PATH)
|
||||
# NIM binary location
|
||||
NIM_BINARY := $(shell which nim 2>/dev/null)
|
||||
NPH := $(HOME)/.nimble/bin/nph
|
||||
NIMBLE := $(HOME)/.nimble/bin/nimble
|
||||
# Resolve nimble via PATH (Windows has no $(HOME)/.nimble/bin); --useSystemNim
|
||||
# reuses the nim on PATH so nimble never re-clones the locked nim.
|
||||
NIMBLE := nimble --useSystemNim
|
||||
NIMBLEDEPS_STAMP := nimbledeps/.nimble-setup
|
||||
|
||||
# Compilation parameters
|
||||
@ -204,7 +206,7 @@ clean: | clean-librln
|
||||
|
||||
testcommon: | build-deps build
|
||||
echo -e $(BUILD_MSG) "build/$@" && \
|
||||
nimble testcommon
|
||||
$(NIMBLE) testcommon
|
||||
|
||||
##########
|
||||
## Waku ##
|
||||
@ -213,47 +215,54 @@ testcommon: | build-deps build
|
||||
|
||||
testwaku: | build-deps build rln-deps librln
|
||||
echo -e $(BUILD_MSG) "build/$@" && \
|
||||
nimble test
|
||||
$(NIMBLE) test
|
||||
|
||||
# Windows: build with nim directly — `nimble <task>` re-clones git deps every
|
||||
# build and they intermittently hang on the MSYS2 runner. Flags mirror waku.nimble.
|
||||
wakunode2: | build-deps build deps librln
|
||||
ifeq ($(detected_OS),Windows)
|
||||
echo -e $(BUILD_MSG) "build/$@" && \
|
||||
nimble wakunode2
|
||||
nim c --out:build/wakunode2 --mm:refc --cpu:amd64 $(NIM_PARAMS) -d:chronicles_log_level=TRACE apps/wakunode2/wakunode2.nim
|
||||
else
|
||||
echo -e $(BUILD_MSG) "build/$@" && \
|
||||
$(NIMBLE) wakunode2
|
||||
endif
|
||||
|
||||
benchmarks: | build-deps build deps librln
|
||||
echo -e $(BUILD_MSG) "build/$@" && \
|
||||
nimble benchmarks
|
||||
$(NIMBLE) benchmarks
|
||||
|
||||
testwakunode2: | build-deps build deps librln
|
||||
echo -e $(BUILD_MSG) "build/$@" && \
|
||||
nimble testwakunode2
|
||||
$(NIMBLE) testwakunode2
|
||||
|
||||
example2: | build-deps build deps librln
|
||||
echo -e $(BUILD_MSG) "build/$@" && \
|
||||
nimble example2
|
||||
$(NIMBLE) example2
|
||||
|
||||
chat2: | build-deps build deps librln
|
||||
echo -e $(BUILD_MSG) "build/$@" && \
|
||||
nimble chat2
|
||||
$(NIMBLE) chat2
|
||||
|
||||
chat2mix: | build-deps build deps librln
|
||||
echo -e $(BUILD_MSG) "build/$@" && \
|
||||
nimble chat2mix
|
||||
$(NIMBLE) chat2mix
|
||||
|
||||
rln-db-inspector: | build-deps build deps librln
|
||||
echo -e $(BUILD_MSG) "build/$@" && \
|
||||
nimble rln_db_inspector
|
||||
$(NIMBLE) rln_db_inspector
|
||||
|
||||
chat2bridge: | build-deps build deps librln
|
||||
echo -e $(BUILD_MSG) "build/$@" && \
|
||||
nimble chat2bridge
|
||||
$(NIMBLE) chat2bridge
|
||||
|
||||
liteprotocoltester: | build-deps build deps librln
|
||||
echo -e $(BUILD_MSG) "build/$@" && \
|
||||
nimble liteprotocoltester
|
||||
$(NIMBLE) liteprotocoltester
|
||||
|
||||
lightpushwithmix: | build-deps build deps librln
|
||||
echo -e $(BUILD_MSG) "build/$@" && \
|
||||
nimble lightpushwithmix
|
||||
$(NIMBLE) lightpushwithmix
|
||||
|
||||
api_example: | build-deps build deps librln
|
||||
echo -e $(BUILD_MSG) "build/$@" && \
|
||||
@ -261,12 +270,12 @@ api_example: | build-deps build deps librln
|
||||
|
||||
build/%: | build-deps build deps librln
|
||||
echo -e $(BUILD_MSG) "build/$*" && \
|
||||
nimble buildone $*
|
||||
$(NIMBLE) buildone $*
|
||||
|
||||
compile-test: | build-deps build deps librln
|
||||
echo -e $(BUILD_MSG) "$(TEST_FILE)" "\"$(TEST_NAME)\"" && \
|
||||
nimble buildTest $(TEST_FILE) && \
|
||||
nimble execTest $(TEST_FILE) "\"$(TEST_NAME)\""
|
||||
$(NIMBLE) buildTest $(TEST_FILE) && \
|
||||
$(NIMBLE) execTest $(TEST_FILE) "\"$(TEST_NAME)\""
|
||||
|
||||
################
|
||||
## Waku tools ##
|
||||
@ -277,11 +286,11 @@ tools: networkmonitor wakucanary
|
||||
|
||||
wakucanary: | build-deps build deps librln
|
||||
echo -e $(BUILD_MSG) "build/$@" && \
|
||||
nimble wakucanary
|
||||
$(NIMBLE) wakucanary
|
||||
|
||||
networkmonitor: | build-deps build deps librln
|
||||
echo -e $(BUILD_MSG) "build/$@" && \
|
||||
nimble networkmonitor
|
||||
$(NIMBLE) networkmonitor
|
||||
|
||||
############
|
||||
## Format ##
|
||||
@ -327,7 +336,7 @@ clean:
|
||||
|
||||
docs: | build deps
|
||||
echo -e $(BUILD_MSG) "build/$@" && \
|
||||
nimble doc --run --index:on --project --out:.gh-pages waku/waku.nim waku.nims
|
||||
$(NIMBLE) doc --run --index:on --project --out:.gh-pages waku/waku.nim waku.nims
|
||||
|
||||
coverage:
|
||||
echo -e $(BUILD_MSG) "build/$@" && \
|
||||
@ -423,11 +432,16 @@ else ifeq ($(detected_OS),Linux)
|
||||
BUILD_COMMAND := $(BUILD_COMMAND)Linux
|
||||
endif
|
||||
|
||||
# Windows: build with nim directly (see wakunode2). Flags mirror waku.nimble.
|
||||
libwaku: | build-deps librln
|
||||
nimble --verbose libwaku$(BUILD_COMMAND) waku.nimble
|
||||
ifeq ($(detected_OS),Windows)
|
||||
nim c --out:build/libwaku.dll --threads:on --app:lib --opt:speed --noMain --mm:refc --header -d:metrics --nimMainPrefix:libwaku --skipParentCfg:off -d:discv5_protocol_id=d5waku --cpu:amd64 $(NIM_PARAMS) library/libwaku.nim
|
||||
else
|
||||
$(NIMBLE) --verbose libwaku$(BUILD_COMMAND) waku.nimble
|
||||
endif
|
||||
|
||||
liblogosdelivery: | build-deps librln
|
||||
nimble --verbose liblogosdelivery$(BUILD_COMMAND) waku.nimble
|
||||
$(NIMBLE) --verbose liblogosdelivery$(BUILD_COMMAND) waku.nimble
|
||||
|
||||
logosdelivery_example: | build liblogosdelivery
|
||||
@echo -e $(BUILD_MSG) "build/$@"
|
||||
@ -502,7 +516,7 @@ endif
|
||||
build-libwaku-for-android-arch:
|
||||
ifneq ($(findstring /nix/store,$(LIBRLN_FILE)),)
|
||||
mkdir -p $(CURDIR)/build/android/$(ABIDIR)/
|
||||
CPU=$(CPU) ABIDIR=$(ABIDIR) ANDROID_ARCH=$(ANDROID_ARCH) ANDROID_COMPILER=$(ANDROID_COMPILER) ANDROID_TOOLCHAIN_DIR=$(ANDROID_TOOLCHAIN_DIR) nimble libWakuAndroid
|
||||
CPU=$(CPU) ABIDIR=$(ABIDIR) ANDROID_ARCH=$(ANDROID_ARCH) ANDROID_COMPILER=$(ANDROID_COMPILER) ANDROID_TOOLCHAIN_DIR=$(ANDROID_TOOLCHAIN_DIR) $(NIMBLE) libWakuAndroid
|
||||
else
|
||||
./scripts/build_rln_android.sh $(CURDIR)/build $(LIBRLN_BUILDDIR) $(LIBRLN_VERSION) $(CROSS_TARGET) $(ABIDIR)
|
||||
endif
|
||||
@ -559,7 +573,7 @@ else
|
||||
endif
|
||||
|
||||
build-libwaku-for-ios-arch:
|
||||
IOS_SDK=$(IOS_SDK) IOS_ARCH=$(IOS_ARCH) IOS_SDK_PATH=$(IOS_SDK_PATH) nimble libWakuIOS
|
||||
IOS_SDK=$(IOS_SDK) IOS_ARCH=$(IOS_ARCH) IOS_SDK_PATH=$(IOS_SDK_PATH) $(NIMBLE) libWakuIOS
|
||||
|
||||
libwaku-ios-device: IOS_ARCH=arm64
|
||||
libwaku-ios-device: IOS_SDK=iphoneos
|
||||
|
||||
@ -13,6 +13,7 @@ import
|
||||
import
|
||||
./certsgenerator,
|
||||
waku/[waku_enr, node/peer_manager, waku_core, waku_node, factory/builder],
|
||||
waku/net/net_config,
|
||||
waku/waku_metadata/protocol,
|
||||
waku/common/callbacks
|
||||
|
||||
|
||||
@ -13,7 +13,7 @@
|
||||
##
|
||||
## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html
|
||||
|
||||
import std/[options, sets, tables]
|
||||
import std/[options, tables]
|
||||
import results
|
||||
import chronos
|
||||
import bearssl/rand
|
||||
@ -55,32 +55,35 @@ type
|
||||
Persistent
|
||||
Ephemeral
|
||||
|
||||
SegmentSendState {.pure.} = enum
|
||||
## Lifecycle of a single segment as tracked by the channel. The
|
||||
## messaging layer has its own richer `DeliveryState` (retries,
|
||||
## propagated-vs-validated); here we only model what's needed to
|
||||
## decide when a `channelReqId` is fully accounted for.
|
||||
AwaitingRateLimit ## Pushed by `send`; not yet released by rate_limit_manager.
|
||||
InFlight
|
||||
## Released by rate_limit_manager and handed to delivery_service;
|
||||
## `messagingReqId` is now set.
|
||||
Confirmed ## `MessageSentEvent` arrived for `messagingReqId`.
|
||||
Failed
|
||||
## `MessageErrorEvent` arrived for `messagingReqId`, or the local
|
||||
## delivery-task construction failed before any id was reachable.
|
||||
|
||||
PendingMessagingRequest = object
|
||||
## One entry per segment (i.e. per messaging-layer request). The
|
||||
## relative order of `AwaitingRateLimit` entries must match the
|
||||
## order in which `rate_limit_manager` re-emits messages, which is
|
||||
## FIFO with `send()`.
|
||||
channelReqId*: RequestId
|
||||
## The channel-layer parent id returned to the caller of `send()` in channel layer.
|
||||
## One channel request maps to N pending messaging requests.
|
||||
messagingReqId*: Option[RequestId]
|
||||
## Per-segment messaging layer id. `none` until `onReadyToSend` assigns it.
|
||||
ChannelReqState = object
|
||||
## Per channel-level request, tracks how many of its segments are
|
||||
## still queued, in flight, or have terminated. The channel-level
|
||||
## final event fires when `confirmedCount + failedCount` reaches
|
||||
## `totalExpectedSegments` AND no segments are still awaiting dispatch
|
||||
## or in flight.
|
||||
persistenceReqType: MessagePersistence
|
||||
segmentSendState*: SegmentSendState
|
||||
totalExpectedSegments: int
|
||||
## Total segments produced by `segmentation.performSegmentation`
|
||||
## for this `channelReqId`. Set once in `send`, never mutated.
|
||||
awaitingDispatch: int
|
||||
## Segments enqueued in `rate_limit_manager` but not yet claimed
|
||||
## by `onReadyToSend`. Decremented when `onReadyToSend` picks a
|
||||
## message and assigns it to this `channelReqId`.
|
||||
inflightMessagingIds: seq[RequestId]
|
||||
## Messaging-layer ids minted by the send handler that have not
|
||||
## yet produced a final event. Removed on `MessageSentEvent` / `MessageErrorEvent`.
|
||||
confirmedCount: int
|
||||
failedCount: int
|
||||
|
||||
ChannelReqs = OrderedTable[RequestId, ChannelReqState]
|
||||
## Key: channelReqId (the parent id returned by channel `send`). Value:
|
||||
## per-request state, see `ChannelReqState`.
|
||||
##
|
||||
## `OrderedTable` preserves insertion order, which matches the FIFO
|
||||
## order `rate_limit_manager` re-emits messages in: `onReadyToSend`
|
||||
## routes each segment to the first entry with `awaitingDispatch > 0`,
|
||||
## and that scan is correct precisely because the outer iteration
|
||||
## order matches the order `send` pushed entries.
|
||||
|
||||
ReliableChannel* = ref object
|
||||
## Spec-defined public type. Fields are private so callers cannot
|
||||
@ -95,13 +98,23 @@ type
|
||||
sdsHandler: SdsHandler
|
||||
rateLimit: RateLimitManager
|
||||
|
||||
requestIds: Table[RequestId, seq[RequestId]]
|
||||
pendingMessagingRequests: seq[PendingMessagingRequest]
|
||||
## Entries are kept until the matching segment reaches a final
|
||||
## state (`Confirmed` or `Failed`); a whole channel request is
|
||||
## then pruned in one pass once all its segments are final.
|
||||
channelReqs: ChannelReqs
|
||||
brokerCtx: BrokerContext
|
||||
|
||||
func init(
|
||||
T: type ChannelReqState,
|
||||
persistenceReqType: MessagePersistence,
|
||||
totalExpectedSegments: int,
|
||||
): T =
|
||||
return ChannelReqState(
|
||||
persistenceReqType: persistenceReqType,
|
||||
totalExpectedSegments: totalExpectedSegments,
|
||||
awaitingDispatch: totalExpectedSegments,
|
||||
inflightMessagingIds: @[],
|
||||
confirmedCount: 0,
|
||||
failedCount: 0,
|
||||
)
|
||||
|
||||
func getChannelId*(self: ReliableChannel): ChannelId {.inline.} =
|
||||
self.channelId
|
||||
|
||||
@ -111,70 +124,92 @@ func getContentTopic*(self: ReliableChannel): ContentTopic {.inline.} =
|
||||
func getSenderId*(self: ReliableChannel): SdsParticipantID {.inline.} =
|
||||
self.senderId
|
||||
|
||||
func isFinal(state: SegmentSendState): bool {.inline.} =
|
||||
return state in {SegmentSendState.Confirmed, SegmentSendState.Failed}
|
||||
proc tryFinalizeChannelReq(self: ReliableChannel, channelReqId: RequestId) =
|
||||
## Tries to finalize the channel-level request identified by `channelReqId` if
|
||||
## certain conditions are met, i.e., no segments are still awaiting dispatch or in flight,
|
||||
## and the total number of confirmed + failed segments equals the total expected segments.
|
||||
## Therefore, the channel-level request is removed from `self.channelReqs`
|
||||
## and the appropriate final event is emitted.
|
||||
##
|
||||
let state = self.channelReqs.getOrDefault(channelReqId)
|
||||
if state.totalExpectedSegments == 0:
|
||||
## Either already finalized (and removed) or never inserted.
|
||||
return
|
||||
if state.awaitingDispatch != 0 or state.inflightMessagingIds.len != 0:
|
||||
return
|
||||
if state.confirmedCount + state.failedCount < state.totalExpectedSegments:
|
||||
return
|
||||
|
||||
proc pruneCompletedChannelReqs(self: ReliableChannel) =
|
||||
## Drop every `pendingMessagingRequests` entry whose `channelReqId`
|
||||
## has all of its segments in a final state. A single failing
|
||||
## segment doesn't trigger a drop on its own — we wait until siblings
|
||||
## are also accounted for, so the channel-level outcome is decided
|
||||
## from a complete picture. For each fully-final `channelReqId`, emit
|
||||
## the channel-level final event before the entries are dropped:
|
||||
## `ChannelMessageSentEvent` if every sibling Confirmed,
|
||||
## `ChannelMessageErrorEvent` if any sibling Failed.
|
||||
var hasPending = initHashSet[RequestId]()
|
||||
var anyFailed = initHashSet[RequestId]()
|
||||
for entry in self.pendingMessagingRequests:
|
||||
if not entry.segmentSendState.isFinal():
|
||||
hasPending.incl(entry.channelReqId)
|
||||
elif entry.segmentSendState == SegmentSendState.Failed:
|
||||
anyFailed.incl(entry.channelReqId)
|
||||
self.channelReqs.del(channelReqId)
|
||||
|
||||
var emitted = initHashSet[RequestId]()
|
||||
for entry in self.pendingMessagingRequests:
|
||||
if entry.channelReqId in hasPending or entry.channelReqId in emitted:
|
||||
if state.failedCount > 0:
|
||||
ChannelMessageErrorEvent.emit(
|
||||
self.brokerCtx,
|
||||
ChannelMessageErrorEvent(
|
||||
channelId: self.channelId,
|
||||
requestId: channelReqId,
|
||||
error: "one or more segments failed",
|
||||
),
|
||||
)
|
||||
else:
|
||||
ChannelMessageSentEvent.emit(
|
||||
self.brokerCtx,
|
||||
ChannelMessageSentEvent(channelId: self.channelId, requestId: channelReqId),
|
||||
)
|
||||
|
||||
type ClaimedSegment = object
|
||||
channelReqId: RequestId
|
||||
isEphemeral: bool
|
||||
|
||||
proc claimAwaitingChannelReq(self: ReliableChannel): Option[ClaimedSegment] =
|
||||
for channelReqId, state in self.channelReqs.mpairs:
|
||||
if state.awaitingDispatch > 0:
|
||||
state.awaitingDispatch.dec()
|
||||
return some(
|
||||
ClaimedSegment(
|
||||
channelReqId: channelReqId,
|
||||
isEphemeral: state.persistenceReqType == MessagePersistence.Ephemeral,
|
||||
)
|
||||
)
|
||||
return none(ClaimedSegment)
|
||||
|
||||
type MessagingOutcome {.pure.} = enum
|
||||
Sent
|
||||
Failed
|
||||
|
||||
proc onMessageFinal(
|
||||
self: ReliableChannel, messagingReqId: RequestId, outcome: MessagingOutcome
|
||||
) =
|
||||
for channelReqId, state in self.channelReqs.mpairs:
|
||||
let idx = state.inflightMessagingIds.find(messagingReqId)
|
||||
if idx < 0:
|
||||
continue
|
||||
emitted.incl(entry.channelReqId)
|
||||
if entry.channelReqId in anyFailed:
|
||||
ChannelMessageErrorEvent.emit(
|
||||
self.brokerCtx,
|
||||
ChannelMessageErrorEvent(
|
||||
channelId: self.channelId,
|
||||
requestId: entry.channelReqId,
|
||||
error: "one or more segments failed",
|
||||
),
|
||||
)
|
||||
else:
|
||||
ChannelMessageSentEvent.emit(
|
||||
self.brokerCtx,
|
||||
ChannelMessageSentEvent(
|
||||
channelId: self.channelId, requestId: entry.channelReqId
|
||||
),
|
||||
)
|
||||
state.inflightMessagingIds.del(idx)
|
||||
case outcome
|
||||
of MessagingOutcome.Sent:
|
||||
state.confirmedCount.inc()
|
||||
of MessagingOutcome.Failed:
|
||||
state.failedCount.inc()
|
||||
self.tryFinalizeChannelReq(channelReqId)
|
||||
return
|
||||
|
||||
self.pendingMessagingRequests.keepItIf(it.channelReqId in hasPending)
|
||||
proc markSegmentFailed(self: ReliableChannel, channelReqId: RequestId) =
|
||||
try:
|
||||
self.channelReqs[channelReqId].failedCount.inc()
|
||||
except KeyError as e:
|
||||
error "unreachable: channelReqId not found in markSegmentFailed",
|
||||
channelReqId = $channelReqId, error = e.msg
|
||||
return
|
||||
self.tryFinalizeChannelReq(channelReqId)
|
||||
|
||||
proc onMessageSent(self: ReliableChannel, messagingReqId: RequestId) =
|
||||
## Invoked from this channel's `MessageSentEvent` listener. Flips
|
||||
## the matching `InFlight` segment to `Confirmed` and prunes. The
|
||||
## listener routes every event through here; entries that don't
|
||||
## belong to this channel simply don't match and are no-ops.
|
||||
self.pendingMessagingRequests.applyItIf(
|
||||
it.segmentSendState == SegmentSendState.InFlight and
|
||||
it.messagingReqId == some(messagingReqId)
|
||||
):
|
||||
it.segmentSendState = SegmentSendState.Confirmed
|
||||
self.pruneCompletedChannelReqs()
|
||||
|
||||
proc onMessageError(self: ReliableChannel, messagingReqId: RequestId) =
|
||||
## Symmetric to `onMessageSent` but for `MessageErrorEvent`.
|
||||
self.pendingMessagingRequests.applyItIf(
|
||||
it.segmentSendState == SegmentSendState.InFlight and
|
||||
it.messagingReqId == some(messagingReqId)
|
||||
):
|
||||
it.segmentSendState = SegmentSendState.Failed
|
||||
self.pruneCompletedChannelReqs()
|
||||
proc markSegmentInflight(
|
||||
self: ReliableChannel, channelReqId: RequestId, messagingReqId: RequestId
|
||||
) =
|
||||
try:
|
||||
self.channelReqs[channelReqId].inflightMessagingIds.add(messagingReqId)
|
||||
except KeyError as e:
|
||||
error "unreachable: channelReqId not found in markSegmentInflight",
|
||||
channelReqId = $channelReqId, error = e.msg
|
||||
|
||||
proc onReadyToSend(
|
||||
self: ReliableChannel, readyToSendEvent: ReadyToSendEvent
|
||||
@ -184,30 +219,22 @@ proc onReadyToSend(
|
||||
## blobs (already-encoded SDS messages):
|
||||
##
|
||||
## ... -> rate_limit_manager -> [encryption] -> dispatch
|
||||
var idx = 0
|
||||
##
|
||||
## For each `m`, the next channelReqId still queued in rate-limit
|
||||
## claims the slot (FIFO across sibling sends). The channelReqId is
|
||||
## captured up front and used as a stable key for every later state
|
||||
## update — no positional index is ever held across an `await`, so
|
||||
## sibling events mutating other entries (or even this one's
|
||||
## `inflightMessagingIds`) cannot corrupt this fiber's view.
|
||||
for m in readyToSendEvent.msgs:
|
||||
## The first `AwaitingRateLimit` entry in push order is the one
|
||||
## this `m` belongs to: `send()` adds one entry per segment, and
|
||||
## `rate_limit_manager` re-emits them in the same FIFO order, so
|
||||
## the two sequences advance in lockstep. Earlier entries may
|
||||
## already be `InFlight` / `Confirmed` / `Failed` because they
|
||||
## live on until every sibling of their `channelReqId` is final,
|
||||
## so we walk past those to find the next one that was awaiting for this batch.
|
||||
while idx < self.pendingMessagingRequests.len and
|
||||
self.pendingMessagingRequests[idx].segmentSendState !=
|
||||
SegmentSendState.AwaitingRateLimit
|
||||
:
|
||||
idx.inc()
|
||||
if idx >= self.pendingMessagingRequests.len:
|
||||
let claimed = self.claimAwaitingChannelReq().valueOr:
|
||||
## rate_limit_manager emitted more messages than we have pending —
|
||||
## should not happen given `send` pushes one entry per enqueued
|
||||
## SDS payload. Drop silently rather than corrupt state.
|
||||
## should not happen given `send` increments `awaitingDispatch`
|
||||
## once per enqueued SDS payload. Drop silently rather than
|
||||
## corrupt state.
|
||||
break
|
||||
|
||||
let channelReqId = self.pendingMessagingRequests[idx].channelReqId
|
||||
let isEphemeral =
|
||||
self.pendingMessagingRequests[idx].persistenceReqType ==
|
||||
MessagePersistence.Ephemeral
|
||||
let channelReqId = claimed.channelReqId
|
||||
let isEphemeral = claimed.isEphemeral
|
||||
|
||||
## TODO: revisit which fields of the SDS message must be encrypted.
|
||||
## Encrypting the whole encoded blob forces every receiver to attempt
|
||||
@ -223,15 +250,7 @@ proc onReadyToSend(
|
||||
),
|
||||
)
|
||||
## Encryption failed *before* we could hand the segment to the
|
||||
## delivery layer — no `messagingReqId` was minted and no
|
||||
## `DeliveryTask` was queued on `sendService`. The delivery
|
||||
## layer will therefore never emit a `MessageSentEvent` /
|
||||
## `MessageErrorEvent` for this segment, so `onMessageError`
|
||||
## won't fire either. Advance the state machine inline so the
|
||||
## parent `channelReqId` can still be pruned once its siblings
|
||||
## are also final.
|
||||
self.pendingMessagingRequests[idx].segmentSendState = SegmentSendState.Failed
|
||||
idx.inc()
|
||||
self.markSegmentFailed(channelReqId)
|
||||
continue
|
||||
let wireBytes = seq[byte](encrypted)
|
||||
|
||||
@ -261,16 +280,10 @@ proc onReadyToSend(
|
||||
requestId: channelReqId, messageHash: "", error: "messaging send failed: " & error
|
||||
),
|
||||
)
|
||||
self.pendingMessagingRequests[idx].segmentSendState = SegmentSendState.Failed
|
||||
idx.inc()
|
||||
self.markSegmentFailed(channelReqId)
|
||||
continue
|
||||
|
||||
self.pendingMessagingRequests[idx].messagingReqId = some(messagingReqId)
|
||||
self.pendingMessagingRequests[idx].segmentSendState = SegmentSendState.InFlight
|
||||
self.requestIds.mgetOrPut(channelReqId, @[]).add(messagingReqId)
|
||||
idx.inc()
|
||||
|
||||
self.pruneCompletedChannelReqs()
|
||||
self.markSegmentInflight(channelReqId, messagingReqId)
|
||||
|
||||
proc send*(
|
||||
self: ReliableChannel, payload: seq[byte], ephemeral: bool = false
|
||||
@ -283,23 +296,20 @@ proc send*(
|
||||
##
|
||||
## `rate_limit_manager.enqueueToSend` emits a `ReadyToSendEvent` with
|
||||
## the SDS messages cleared for transmission; the channel's listener
|
||||
## then runs the final stage (encryption -> dispatch). The
|
||||
## `persistenceReqType` is carried alongside each segment in
|
||||
## `pendingMessagingRequests` and stamped onto the eventual
|
||||
## `MessageEnvelope`.
|
||||
## then runs the final stage (encryption -> dispatch).
|
||||
##
|
||||
## The returned `RequestId` is the channel-level parent of one-or-more
|
||||
## messaging-layer `RequestId`s; the mapping is recorded in
|
||||
## `self.requestIds`.
|
||||
## messaging-layer `RequestId`s; the mapping is held in
|
||||
## `self.channelReqs` until every segment is final.
|
||||
if payload.len == 0:
|
||||
return err("empty payload")
|
||||
|
||||
let channelReqId = RequestId.new(self.rng)
|
||||
self.requestIds[channelReqId] = @[]
|
||||
|
||||
let persistenceReqType =
|
||||
if ephemeral: MessagePersistence.Ephemeral else: MessagePersistence.Persistent
|
||||
|
||||
var segmentCount = 0
|
||||
var enqueued: seq[seq[byte]]
|
||||
for segmentBytes in self.segmentation.performSegmentation(payload):
|
||||
## Segments arrive already encoded; the segmentation module owns
|
||||
## the wire format so SDS only ever sees opaque bytes.
|
||||
@ -307,14 +317,13 @@ proc send*(
|
||||
self.channelId, self.senderId, segmentBytes
|
||||
).valueOr:
|
||||
return err("SDS wrap failed: " & error)
|
||||
self.pendingMessagingRequests.add(
|
||||
PendingMessagingRequest(
|
||||
channelReqId: channelReqId,
|
||||
messagingReqId: none(RequestId),
|
||||
persistenceReqType: persistenceReqType,
|
||||
segmentSendState: SegmentSendState.AwaitingRateLimit,
|
||||
)
|
||||
)
|
||||
enqueued.add(sdsBytes)
|
||||
segmentCount.inc()
|
||||
|
||||
self.channelReqs[channelReqId] =
|
||||
ChannelReqState.init(persistenceReqType, segmentCount)
|
||||
|
||||
for sdsBytes in enqueued:
|
||||
self.rateLimit.enqueueToSend(sdsBytes)
|
||||
|
||||
return ok(channelReqId)
|
||||
@ -393,8 +402,7 @@ proc new*(
|
||||
segmentation: SegmentationHandler.new(segConfig),
|
||||
sdsHandler: SdsHandler.new(sdsConfig, senderId),
|
||||
rateLimit: RateLimitManager.new(rateConfig, channelId, brokerCtx),
|
||||
requestIds: initTable[RequestId, seq[RequestId]](),
|
||||
pendingMessagingRequests: @[],
|
||||
channelReqs: initOrderedTable[RequestId, ChannelReqState](),
|
||||
brokerCtx: brokerCtx,
|
||||
)
|
||||
|
||||
@ -402,8 +410,8 @@ proc new*(
|
||||
## listeners on `chn.brokerCtx`, filtered to traffic addressed to
|
||||
## this channel. Keeping the listeners (and the handler procs they
|
||||
## call) inside the channel lets `onReadyToSend` /
|
||||
## `onMessageReceived` / `onMessageSent` / `onMessageError` stay
|
||||
## private — the manager doesn't need to know about them.
|
||||
## `onMessageReceived` / `onMessageFinal` stay private — the
|
||||
## manager doesn't need to know about them.
|
||||
discard ReadyToSendEvent.listen(
|
||||
chn.brokerCtx,
|
||||
proc(evt: ReadyToSendEvent): Future[void] {.async: (raises: []).} =
|
||||
@ -432,13 +440,13 @@ proc new*(
|
||||
discard MessageSentEvent.listen(
|
||||
chn.brokerCtx,
|
||||
proc(evt: MessageSentEvent): Future[void] {.async: (raises: []).} =
|
||||
chn.onMessageSent(evt.requestId),
|
||||
chn.onMessageFinal(evt.requestId, MessagingOutcome.Sent),
|
||||
)
|
||||
|
||||
discard MessageErrorEvent.listen(
|
||||
chn.brokerCtx,
|
||||
proc(evt: MessageErrorEvent): Future[void] {.async: (raises: []).} =
|
||||
chn.onMessageError(evt.requestId),
|
||||
chn.onMessageFinal(evt.requestId, MessagingOutcome.Failed),
|
||||
)
|
||||
|
||||
return chn
|
||||
|
||||
@ -5,8 +5,7 @@ import
|
||||
waku/discovery/waku_dnsdisc,
|
||||
waku/discovery/waku_discv5,
|
||||
waku/waku_core/peers,
|
||||
waku/node/waku_node,
|
||||
waku/node/kernel_api,
|
||||
waku/waku_node,
|
||||
library/declare_lib
|
||||
|
||||
proc retrieveBootstrapNodes(
|
||||
|
||||
@ -8,8 +8,7 @@ import
|
||||
waku/waku_filter_v2/common,
|
||||
waku/waku_core/subscription/push_handler,
|
||||
waku/node/peer_manager/peer_manager,
|
||||
waku/node/waku_node,
|
||||
waku/node/kernel_api,
|
||||
waku/waku_node,
|
||||
waku/waku_core/topics/pubsub_topic,
|
||||
waku/waku_core/topics/content_topic,
|
||||
library/events/json_message_event,
|
||||
|
||||
@ -7,7 +7,7 @@ import
|
||||
waku/waku_core/message,
|
||||
waku/waku_core/topics/pubsub_topic,
|
||||
waku/waku_core/topics,
|
||||
waku/node/kernel_api/relay,
|
||||
waku/node/waku_node/relay,
|
||||
waku/waku_relay/protocol,
|
||||
waku/node/peer_manager,
|
||||
library/events/json_message_event,
|
||||
|
||||
12
nimble.lock
12
nimble.lock
@ -328,8 +328,8 @@
|
||||
}
|
||||
},
|
||||
"brokers": {
|
||||
"version": "#v2.0.1",
|
||||
"vcsRevision": "2093ca4d50e581adda73fee7fd16231f990f4cbe",
|
||||
"version": "#v3.1.1",
|
||||
"vcsRevision": "a7316a35f1b62e3497ae8ee0fc1aace74df0beb2",
|
||||
"url": "https://github.com/NagyZoltanPeter/nim-brokers.git",
|
||||
"downloadMethod": "git",
|
||||
"dependencies": [
|
||||
@ -341,7 +341,7 @@
|
||||
"cbor_serialization"
|
||||
],
|
||||
"checksums": {
|
||||
"sha1": "cc74c987af94537e9d44d1b0143aa417299040c5"
|
||||
"sha1": "4447d7c1f9da14ae439afb23aee45116ce2ecb40"
|
||||
}
|
||||
},
|
||||
"stint": {
|
||||
@ -620,8 +620,8 @@
|
||||
}
|
||||
},
|
||||
"sds": {
|
||||
"version": "#2e9a7683f0e180bf112135fae3a3803eed8490d4",
|
||||
"vcsRevision": "2e9a7683f0e180bf112135fae3a3803eed8490d4",
|
||||
"version": "#abdd40cc645f1b024c3ee99cced7e287c4e4c441",
|
||||
"vcsRevision": "abdd40cc645f1b024c3ee99cced7e287c4e4c441",
|
||||
"url": "https://github.com/logos-messaging/nim-sds.git",
|
||||
"downloadMethod": "git",
|
||||
"dependencies": [
|
||||
@ -636,7 +636,7 @@
|
||||
"taskpools"
|
||||
],
|
||||
"checksums": {
|
||||
"sha1": "d13f1bf8d1b90b27e9edfc063b043831242cda19"
|
||||
"sha1": "61c4ae13c6896bfa70e662520e8660a78c7f438c"
|
||||
}
|
||||
},
|
||||
"ffi": {
|
||||
|
||||
@ -14,7 +14,7 @@ import
|
||||
waku_core,
|
||||
events/message_events,
|
||||
waku_relay/protocol,
|
||||
node/kernel_api/filter,
|
||||
node/waku_node/filter,
|
||||
node/subscription_manager,
|
||||
]
|
||||
import waku/factory/waku_conf
|
||||
|
||||
@ -323,3 +323,104 @@ suite "Reliable Channel - send state machine":
|
||||
## `messagingReqId`s from a fake `SendHandler`, finalise some, and
|
||||
## assert prune only fires once every sibling is final.
|
||||
skip()
|
||||
|
||||
asyncTest "sibling MessageSentEvent during sendHandler await does not corrupt state":
|
||||
## Regression test for the prune-during-await race
|
||||
## (PR #3914 review comment r3324891059). Locks in that a sibling
|
||||
## `MessageSentEvent` firing while `onReadyToSend` is paused at an
|
||||
## `await` does not lose the second `channelReqId`'s terminal
|
||||
## event.
|
||||
const
|
||||
channelId = ChannelId("sm-race-channel")
|
||||
contentTopic = ContentTopic("/reliable-channel/test/sm-race")
|
||||
|
||||
var waku: Waku
|
||||
var manager: ReliableChannelManager
|
||||
var brokerCtx: BrokerContext
|
||||
lockNewGlobalBrokerContext:
|
||||
brokerCtx = globalBrokerContext()
|
||||
waku = (await createNode(createApiNodeConf())).expect("createNode")
|
||||
waku.mountMessagingClient().expect("mountMessagingClient")
|
||||
waku.mountReliableChannelManager().expect("mountReliableChannelManager")
|
||||
manager = waku.reliableChannelManager
|
||||
|
||||
setNoopEncryption()
|
||||
|
||||
var msgReqIds: seq[RequestId]
|
||||
var sendsReturned = 0
|
||||
let fakeSend: SendHandler = proc(
|
||||
env: MessageEnvelope
|
||||
): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} =
|
||||
## Call 2 fires the first segment's terminal event and then
|
||||
## yields, so the listener task runs while the second segment
|
||||
## is still mid-`await` in `onReadyToSend` — the exact race
|
||||
## window the regression test targets.
|
||||
let id = RequestId("race-msg-req-" & $(msgReqIds.len + 1))
|
||||
msgReqIds.add(id)
|
||||
if msgReqIds.len == 2:
|
||||
waku_message_events.MessageSentEvent.emit(
|
||||
brokerCtx,
|
||||
waku_message_events.MessageSentEvent(requestId: msgReqIds[0], messageHash: ""),
|
||||
)
|
||||
await sleepAsync(50.milliseconds)
|
||||
sendsReturned.inc()
|
||||
return ok(id)
|
||||
|
||||
discard manager
|
||||
.createReliableChannel(
|
||||
channelId, contentTopic, SdsParticipantID("local"), sendHandler = fakeSend
|
||||
)
|
||||
.expect("createReliableChannel")
|
||||
|
||||
var finalisedReqIds: seq[RequestId]
|
||||
let bothFinalised = newFuture[void]("both-finalised")
|
||||
discard ChannelMessageSentEvent
|
||||
.listen(
|
||||
brokerCtx,
|
||||
proc(evt: ChannelMessageSentEvent) {.async: (raises: []).} =
|
||||
if evt.channelId == channelId:
|
||||
finalisedReqIds.add(evt.requestId)
|
||||
if finalisedReqIds.len == 2 and not bothFinalised.finished():
|
||||
bothFinalised.complete()
|
||||
,
|
||||
)
|
||||
.expect("listen ChannelMessageSentEvent")
|
||||
|
||||
let channelReqId1 = manager.send(channelId, "first".toBytes()).expect("send 1")
|
||||
|
||||
## Drain the first segment fully before queueing the second, so
|
||||
## the rate-limit FIFO between sibling sends isn't itself under
|
||||
## test here.
|
||||
let firstDispatched = Moment.now() + 1.seconds
|
||||
while Moment.now() < firstDispatched and msgReqIds.len < 1:
|
||||
await sleepAsync(5.milliseconds)
|
||||
check msgReqIds.len == 1
|
||||
|
||||
let channelReqId2 = manager.send(channelId, "second".toBytes()).expect("send 2")
|
||||
|
||||
## Wait until `fakeSend(m2)` has fully returned and yield once
|
||||
## more so `onReadyToSend`'s post-await continuation gets a chance
|
||||
## to register `id2` in `inflightMessagingIds` before we emit its
|
||||
## terminal event.
|
||||
let dispatchDeadline = Moment.now() + 1.seconds
|
||||
while Moment.now() < dispatchDeadline and sendsReturned < 2:
|
||||
await sleepAsync(5.milliseconds)
|
||||
check sendsReturned == 2
|
||||
await sleepAsync(50.milliseconds)
|
||||
|
||||
## Finalise the second segment from the outside. If the race
|
||||
## corrupted state, `channelReqId2`'s entry would never reach
|
||||
## `inflightMessagingIds` and this event would silently miss.
|
||||
waku_message_events.MessageSentEvent.emit(
|
||||
brokerCtx,
|
||||
waku_message_events.MessageSentEvent(requestId: msgReqIds[1], messageHash: ""),
|
||||
)
|
||||
|
||||
let arrived = await bothFinalised.withTimeout(2.seconds)
|
||||
check arrived
|
||||
if arrived:
|
||||
check finalisedReqIds.len == 2
|
||||
check channelReqId1 in finalisedReqIds
|
||||
check channelReqId2 in finalisedReqIds
|
||||
|
||||
(await waku.stop()).expect("stop")
|
||||
|
||||
@ -11,7 +11,14 @@ import
|
||||
|
||||
import
|
||||
tests/testlib/[wakunode, wakucore],
|
||||
waku/[waku_node, waku_enr, net/auto_port, discovery/waku_discv5, node/waku_metrics],
|
||||
waku/[
|
||||
waku_node,
|
||||
net/net_config,
|
||||
waku_enr,
|
||||
net/auto_port,
|
||||
discovery/waku_discv5,
|
||||
node/waku_metrics,
|
||||
],
|
||||
waku/factory/[
|
||||
node_factory,
|
||||
internal_config,
|
||||
|
||||
@ -11,8 +11,7 @@ import
|
||||
waku/[
|
||||
waku_core,
|
||||
node/peer_manager,
|
||||
node/waku_node,
|
||||
node/kernel_api,
|
||||
waku_node,
|
||||
waku_filter_v2,
|
||||
waku_filter_v2/client,
|
||||
waku_filter_v2/subscriptions,
|
||||
|
||||
@ -16,10 +16,10 @@ import
|
||||
node/health_monitor/topic_health,
|
||||
node/health_monitor/node_health_monitor,
|
||||
messaging_client,
|
||||
node/kernel_api/relay,
|
||||
node/kernel_api/store,
|
||||
node/kernel_api/lightpush,
|
||||
node/kernel_api/filter,
|
||||
node/waku_node/relay,
|
||||
node/waku_node/store,
|
||||
node/waku_node/lightpush,
|
||||
node/waku_node/filter,
|
||||
events/health_events,
|
||||
events/peer_events,
|
||||
waku_archive,
|
||||
|
||||
@ -11,9 +11,7 @@ import
|
||||
waku/[
|
||||
waku_core,
|
||||
node/peer_manager,
|
||||
node/waku_node,
|
||||
node/kernel_api,
|
||||
node/kernel_api/lightpush,
|
||||
waku_node,
|
||||
waku_lightpush_legacy,
|
||||
waku_lightpush_legacy/common,
|
||||
waku_lightpush_legacy/protocol_metrics,
|
||||
|
||||
@ -8,15 +8,7 @@ import
|
||||
libp2p/crypto/crypto
|
||||
|
||||
import
|
||||
waku/[
|
||||
waku_core,
|
||||
node/peer_manager,
|
||||
node/waku_node,
|
||||
node/kernel_api,
|
||||
node/kernel_api/lightpush,
|
||||
waku_lightpush,
|
||||
waku_rln_relay,
|
||||
],
|
||||
waku/[waku_core, node/peer_manager, waku_node, waku_lightpush, waku_rln_relay],
|
||||
../testlib/[wakucore, wakunode, testasync, futures],
|
||||
../resources/payloads,
|
||||
../waku_rln_relay/[rln/waku_rln_relay_utils, utils_onchain]
|
||||
|
||||
@ -13,14 +13,8 @@ import
|
||||
brokers/broker_context
|
||||
|
||||
import
|
||||
waku/[
|
||||
waku_node,
|
||||
node/kernel_api,
|
||||
discovery/waku_discv5,
|
||||
waku_peer_exchange,
|
||||
node/peer_manager,
|
||||
waku_core,
|
||||
],
|
||||
waku/
|
||||
[waku_node, discovery/waku_discv5, waku_peer_exchange, node/peer_manager, waku_core],
|
||||
../waku_peer_exchange/utils,
|
||||
../testlib/[wakucore, wakunode, testasync]
|
||||
|
||||
|
||||
@ -16,8 +16,7 @@ import
|
||||
waku/[
|
||||
waku_core,
|
||||
node/peer_manager,
|
||||
node/waku_node,
|
||||
node/kernel_api,
|
||||
waku_node,
|
||||
discovery/waku_discv5,
|
||||
waku_filter_v2/common,
|
||||
waku_relay/protocol,
|
||||
|
||||
@ -13,11 +13,9 @@ from std/times import epochTime
|
||||
|
||||
import
|
||||
../../../waku/[
|
||||
node/waku_node,
|
||||
node/peer_manager,
|
||||
waku_core,
|
||||
waku_node,
|
||||
node/kernel_api,
|
||||
common/error_handling,
|
||||
waku_rln_relay,
|
||||
waku_rln_relay/rln,
|
||||
|
||||
@ -14,8 +14,7 @@ import
|
||||
waku/[
|
||||
waku_core/topics/pubsub_topic,
|
||||
waku_core/topics/sharding,
|
||||
node/waku_node,
|
||||
node/kernel_api,
|
||||
waku_node,
|
||||
common/paging,
|
||||
waku_core,
|
||||
waku_store/common,
|
||||
|
||||
@ -5,8 +5,7 @@ import std/[options, sequtils, sets], testutils/unittests, chronos, libp2p/crypt
|
||||
import
|
||||
waku/[
|
||||
common/paging,
|
||||
node/waku_node,
|
||||
node/kernel_api,
|
||||
waku_node,
|
||||
node/peer_manager,
|
||||
waku_core,
|
||||
waku_core/message/digest,
|
||||
|
||||
@ -5,5 +5,6 @@ import ./test_backend
|
||||
import ./test_lifecycle
|
||||
import ./test_facade
|
||||
import ./test_encoding
|
||||
import ./test_sds_persistency
|
||||
import ./test_string_lookup
|
||||
import ./test_singleton
|
||||
|
||||
155
tests/persistency/test_sds_persistency.nim
Normal file
155
tests/persistency/test_sds_persistency.nim
Normal file
@ -0,0 +1,155 @@
|
||||
{.used.}
|
||||
|
||||
## Behavioural tests for the SDS Persistence adapter (nim-sds 0.3.0 snapshot
|
||||
## model). Importing `sds_persistency` also compile-checks the real adapter.
|
||||
##
|
||||
## Writes go through the fire-and-forget Job path (the Future resolves when
|
||||
## the op is queued, not applied — Persistency v1), so every read-back polls
|
||||
## until the row appears/disappears.
|
||||
|
||||
import std/[options, os, times]
|
||||
import chronos, results
|
||||
import testutils/unittests
|
||||
import waku/persistency/persistency
|
||||
import waku/persistency/keys
|
||||
import waku/persistency/sds_persistency
|
||||
|
||||
proc tmpRoot(label: string): string =
|
||||
let p = getTempDir() / ("sds_persistency_test_" & label & "_" & $epochTime().int)
|
||||
removeDir(p)
|
||||
p
|
||||
|
||||
proc pollExists(
|
||||
t: Job, category: string, k: Key, timeoutMs = 1000
|
||||
): Future[bool] {.async.} =
|
||||
let deadline = epochTime() + (timeoutMs.float / 1000.0)
|
||||
while epochTime() < deadline:
|
||||
let r = await t.exists(category, k)
|
||||
if r.isOk and r.get():
|
||||
return true
|
||||
await sleepAsync(chronos.milliseconds(2))
|
||||
return false
|
||||
|
||||
proc pollGone(
|
||||
t: Job, category: string, k: Key, timeoutMs = 1000
|
||||
): Future[bool] {.async.} =
|
||||
let deadline = epochTime() + (timeoutMs.float / 1000.0)
|
||||
while epochTime() < deadline:
|
||||
let r = await t.exists(category, k)
|
||||
if r.isOk and not r.get():
|
||||
return true
|
||||
await sleepAsync(chronos.milliseconds(2))
|
||||
return false
|
||||
|
||||
proc mkMsg(channelId: SdsChannelID, msgId: SdsMessageID, lamport: int64): SdsMessage =
|
||||
SdsMessage.init(
|
||||
messageId = msgId,
|
||||
lamportTimestamp = lamport,
|
||||
causalHistory = @[],
|
||||
channelId = channelId,
|
||||
content = @[byte(1), byte(2)],
|
||||
bloomFilter = @[],
|
||||
)
|
||||
|
||||
suite "SDS persistency adapter (0.3.0 snapshot model)":
|
||||
asyncTest "saveChannelMeta + updateHistory round-trip via loadChannel":
|
||||
let root = tmpRoot("roundtrip")
|
||||
defer:
|
||||
removeDir(root)
|
||||
let p = Persistency.instance(root).get()
|
||||
defer:
|
||||
Persistency.reset()
|
||||
let job = p.openJob("sds").get()
|
||||
let persistence = newSdsPersistence(job)
|
||||
let channelId = "chan-1".SdsChannelID
|
||||
|
||||
var meta = ChannelMeta.init()
|
||||
meta.lamportTimestamp = 42
|
||||
check (await persistence.saveChannelMeta(channelId, meta)).isOk
|
||||
check (await job.pollExists(CatMeta, toKey(channelId)))
|
||||
|
||||
# append out of (lamport) order on purpose; loadChannel must sort.
|
||||
var upd = HistoryUpdate.init()
|
||||
upd.append = @[mkMsg(channelId, "m2", 2), mkMsg(channelId, "m1", 1)]
|
||||
check (await persistence.updateHistory(channelId, upd)).isOk
|
||||
check (await job.pollExists(CatLog, key(channelId, "m2")))
|
||||
|
||||
let data = (await persistence.loadChannel(channelId)).valueOr:
|
||||
check false
|
||||
return
|
||||
check data.meta.lamportTimestamp == 42
|
||||
check data.messageHistory.len == 2
|
||||
check data.messageHistory[0].messageId == "m1"
|
||||
check data.messageHistory[1].messageId == "m2"
|
||||
|
||||
asyncTest "loadChannel on a fresh channel returns empty ChannelData":
|
||||
let root = tmpRoot("empty")
|
||||
defer:
|
||||
removeDir(root)
|
||||
let p = Persistency.instance(root).get()
|
||||
defer:
|
||||
Persistency.reset()
|
||||
let job = p.openJob("sds").get()
|
||||
let persistence = newSdsPersistence(job)
|
||||
|
||||
let data = (await persistence.loadChannel("nope".SdsChannelID)).valueOr:
|
||||
check false
|
||||
return
|
||||
check data.meta.lamportTimestamp == 0
|
||||
check data.messageHistory.len == 0
|
||||
|
||||
asyncTest "updateHistory evict removes a log row":
|
||||
let root = tmpRoot("evict")
|
||||
defer:
|
||||
removeDir(root)
|
||||
let p = Persistency.instance(root).get()
|
||||
defer:
|
||||
Persistency.reset()
|
||||
let job = p.openJob("sds").get()
|
||||
let persistence = newSdsPersistence(job)
|
||||
let channelId = "c".SdsChannelID
|
||||
|
||||
var upd = HistoryUpdate.init()
|
||||
upd.append = @[mkMsg(channelId, "a", 1), mkMsg(channelId, "b", 2)]
|
||||
check (await persistence.updateHistory(channelId, upd)).isOk
|
||||
check (await job.pollExists(CatLog, key(channelId, "b")))
|
||||
|
||||
var ev = HistoryUpdate.init()
|
||||
ev.evict = @["a".SdsMessageID]
|
||||
check (await persistence.updateHistory(channelId, ev)).isOk
|
||||
check (await job.pollGone(CatLog, key(channelId, "a")))
|
||||
|
||||
let data = (await persistence.loadChannel(channelId)).valueOr:
|
||||
check false
|
||||
return
|
||||
check data.messageHistory.len == 1
|
||||
check data.messageHistory[0].messageId == "b"
|
||||
|
||||
asyncTest "dropChannel wipes meta and log":
|
||||
let root = tmpRoot("drop")
|
||||
defer:
|
||||
removeDir(root)
|
||||
let p = Persistency.instance(root).get()
|
||||
defer:
|
||||
Persistency.reset()
|
||||
let job = p.openJob("sds").get()
|
||||
let persistence = newSdsPersistence(job)
|
||||
let channelId = "d".SdsChannelID
|
||||
|
||||
var meta = ChannelMeta.init()
|
||||
meta.lamportTimestamp = 7
|
||||
check (await persistence.saveChannelMeta(channelId, meta)).isOk
|
||||
var upd = HistoryUpdate.init()
|
||||
upd.append = @[mkMsg(channelId, "x", 1)]
|
||||
check (await persistence.updateHistory(channelId, upd)).isOk
|
||||
check (await job.pollExists(CatMeta, toKey(channelId)))
|
||||
check (await job.pollExists(CatLog, key(channelId, "x")))
|
||||
|
||||
check (await persistence.dropChannel(channelId)).isOk
|
||||
check (await job.pollGone(CatMeta, toKey(channelId)))
|
||||
|
||||
let data = (await persistence.loadChannel(channelId)).valueOr:
|
||||
check false
|
||||
return
|
||||
check data.meta.lamportTimestamp == 0
|
||||
check data.messageHistory.len == 0
|
||||
@ -9,7 +9,12 @@ import
|
||||
libp2p/stream/bufferstream,
|
||||
libp2p/stream/connection,
|
||||
libp2p/crypto/crypto
|
||||
import waku/waku_core, waku/waku_node, ./testlib/wakucore, ./testlib/wakunode
|
||||
import
|
||||
waku/waku_core,
|
||||
waku/waku_node,
|
||||
waku/node/health_monitor,
|
||||
./testlib/wakucore,
|
||||
./testlib/wakunode
|
||||
|
||||
suite "Waku Keepalive":
|
||||
asyncTest "handle ping keepalives":
|
||||
|
||||
@ -10,6 +10,7 @@ import
|
||||
import
|
||||
waku/[
|
||||
waku_node,
|
||||
net/net_config,
|
||||
waku_core/topics,
|
||||
node/peer_manager,
|
||||
waku_enr,
|
||||
|
||||
@ -21,8 +21,7 @@ import
|
||||
waku_enr/capabilities,
|
||||
factory/conf_builder/conf_builder,
|
||||
factory/waku,
|
||||
node/waku_node,
|
||||
node/kernel_api,
|
||||
waku_node,
|
||||
node/peer_manager,
|
||||
],
|
||||
../testlib/[wakucore, testasync, assertions, futures, wakunode, testutils],
|
||||
|
||||
@ -3,7 +3,8 @@
|
||||
import std/[options, sequtils, json], testutils/unittests, results, chronos
|
||||
|
||||
import
|
||||
waku/node/[peer_manager, waku_node, kernel_api],
|
||||
waku/node/peer_manager,
|
||||
waku/waku_node,
|
||||
waku/waku_core,
|
||||
waku/waku_filter_v2/[common, client, subscriptions, protocol, rpc_codec],
|
||||
../testlib/[wakucore, testasync, testutils, futures, sequtils, wakunode],
|
||||
|
||||
@ -97,7 +97,8 @@ suite "Onchain group manager":
|
||||
check:
|
||||
merkleRootBefore != merkleRootAfter
|
||||
|
||||
test "trackRootChanges: should fetch history correctly":
|
||||
test "trackRootChanges: should fetch history correctly: fetch single root()":
|
||||
# basic check for the soon to be deprecated root contract function, is replaced by getRecentRoots()
|
||||
# TODO: We can't use `trackRootChanges()` directly in this test because its current implementation
|
||||
# relies on a busy loop rather than event-based monitoring. but that busy loop fetch root every 5 seconds
|
||||
# so we can't use it in this test.
|
||||
@ -107,7 +108,8 @@ suite "Onchain group manager":
|
||||
(waitFor manager.init()).isOkOr:
|
||||
raiseAssert $error
|
||||
|
||||
let merkleRootBefore = waitFor manager.fetchMerkleRoot()
|
||||
let merkleRootBefore = (waitFor manager.fetchMerkleRoot()).valueOr:
|
||||
raiseAssert "Failed to fetch merkle root before: " & error
|
||||
|
||||
for i in 0 ..< credentials.len():
|
||||
info "Registering credential", index = i, credential = credentials[i]
|
||||
@ -115,12 +117,83 @@ suite "Onchain group manager":
|
||||
assert false, "Failed to register credential " & $i & ": " & error
|
||||
discard waitFor manager.updateRoots()
|
||||
|
||||
let merkleRootAfter = waitFor manager.fetchMerkleRoot()
|
||||
let merkleRootAfter = (waitFor manager.fetchMerkleRoot()).valueOr:
|
||||
raiseAssert "Failed to fetch merkle root after: " & error
|
||||
|
||||
check:
|
||||
merkleRootBefore != merkleRootAfter
|
||||
manager.validRoots.len() == credentialCount
|
||||
|
||||
test "trackRootChanges: should fetch history correctly: fetch root cache":
|
||||
# Verify that the group_manager list of valid roots is updated correctly from the recent roots
|
||||
# cache as new credentials are registered.
|
||||
# TODO: We can't use `trackRootChanges()` directly in this test because its current implementation
|
||||
# relies on a busy loop rather than event-based monitoring. but that busy loop fetch root every 5 seconds
|
||||
# so we can't use it in this test.
|
||||
|
||||
const credentialCount = RlnContractRootCacheSize
|
||||
let credentials = generateCredentials(credentialCount)
|
||||
(waitFor manager.init()).isOkOr:
|
||||
raiseAssert $error
|
||||
|
||||
let merkleRootCacheBefore = (waitFor manager.fetchMerkleRootsCache()).valueOr:
|
||||
raiseAssert "Failed to fetch merkle root cache before: " & error
|
||||
|
||||
check:
|
||||
merkleRootCacheBefore.len == RlnContractRootCacheSize * 32
|
||||
merkleRootCacheBefore.allIt(it == 0'u8)
|
||||
manager.validRoots.len() == 0
|
||||
|
||||
for i in 0 ..< credentials.len():
|
||||
info "Registering credential", index = i, credential = credentials[i]
|
||||
(waitFor manager.register(credentials[i], UserMessageLimit(20))).isOkOr:
|
||||
assert false, "Failed to register credential " & $i & ": " & error
|
||||
discard waitFor manager.updateRecentRoots()
|
||||
|
||||
let merkleRootCacheAfter = (waitFor manager.fetchMerkleRootsCache()).valueOr:
|
||||
raiseAssert "Failed to fetch merkle root cache after: " & error
|
||||
|
||||
check:
|
||||
merkleRootCacheAfter.len == RlnContractRootCacheSize * 32
|
||||
not merkleRootCacheAfter.allIt(it == 0'u8)
|
||||
manager.validRoots.len() == credentialCount
|
||||
manager.validRoots.items().toSeq().allIt(it != default(MerkleNode))
|
||||
|
||||
test "trackRootChanges: oldest roots are evicted once the window is exceeded":
|
||||
const
|
||||
initialCount = AcceptableRootWindowSize - RlnContractRootCacheSize
|
||||
additionalCount = RlnContractRootCacheSize + 1
|
||||
# one more than the cache size to ensure eviction occurs
|
||||
let credentials = generateCredentials(initialCount + additionalCount)
|
||||
(waitFor manager.init()).isOkOr:
|
||||
raiseAssert $error
|
||||
|
||||
# Register the first credentials and snapshot the 3 oldest roots.
|
||||
for i in 0 ..< initialCount:
|
||||
(waitFor manager.register(credentials[i], UserMessageLimit(20))).isOkOr:
|
||||
assert false, "Failed to register credential " & $i & ": " & error
|
||||
discard waitFor manager.updateRecentRoots()
|
||||
|
||||
check manager.validRoots.len() >= 3
|
||||
let firstThreeBefore =
|
||||
@[manager.validRoots[0], manager.validRoots[1], manager.validRoots[2]]
|
||||
|
||||
# Register the remaining credentials, pushing the deque past AcceptableRootWindowSize.
|
||||
for i in initialCount ..< credentials.len():
|
||||
(waitFor manager.register(credentials[i], UserMessageLimit(20))).isOkOr:
|
||||
assert false, "Failed to register credential " & $i & ": " & error
|
||||
discard waitFor manager.updateRecentRoots()
|
||||
|
||||
let rootsAfter = manager.validRoots.items().toSeq()
|
||||
|
||||
# AcceptableRootWindowSize + 1 registrations evicts exactly the single oldest root,
|
||||
# so only the first of the original three is gone; the other two remain.
|
||||
check:
|
||||
manager.validRoots.len() == AcceptableRootWindowSize
|
||||
firstThreeBefore[0] notin rootsAfter
|
||||
firstThreeBefore[1] in rootsAfter
|
||||
firstThreeBefore[2] in rootsAfter
|
||||
|
||||
test "register: should guard against uninitialized state":
|
||||
let dummyCommitment = default(IDCommitment)
|
||||
|
||||
@ -214,7 +287,7 @@ suite "Onchain group manager":
|
||||
|
||||
waitFor fut
|
||||
|
||||
let rootUpdated = waitFor manager.updateRoots()
|
||||
let rootUpdated = waitFor manager.updateRecentRoots()
|
||||
|
||||
if rootUpdated:
|
||||
let proofResult = waitFor manager.fetchMerkleProofElements()
|
||||
@ -296,7 +369,7 @@ suite "Onchain group manager":
|
||||
assert false, "error returned when calling register: " & error
|
||||
waitFor fut
|
||||
|
||||
let rootUpdated = waitFor manager.updateRoots()
|
||||
let rootUpdated = waitFor manager.updateRecentRoots()
|
||||
|
||||
if rootUpdated:
|
||||
let proofResult = waitFor manager.fetchMerkleProofElements()
|
||||
@ -333,7 +406,7 @@ suite "Onchain group manager":
|
||||
|
||||
let messageBytes = "Hello".toBytes()
|
||||
|
||||
let rootUpdated = waitFor manager.updateRoots()
|
||||
let rootUpdated = waitFor manager.updateRecentRoots()
|
||||
|
||||
manager.merkleProofCache = newSeq[byte](640)
|
||||
for i in 0 ..< 640:
|
||||
@ -362,7 +435,7 @@ suite "Onchain group manager":
|
||||
verified == false
|
||||
|
||||
test "root queue should be updated correctly":
|
||||
const credentialCount = 12
|
||||
const credentialCount = 9
|
||||
let credentials = generateCredentials(credentialCount)
|
||||
(waitFor manager.init()).isOkOr:
|
||||
raiseAssert $error
|
||||
@ -391,7 +464,7 @@ suite "Onchain group manager":
|
||||
for i in 0 ..< credentials.len():
|
||||
(waitFor manager.register(credentials[i], UserMessageLimit(20))).isOkOr:
|
||||
assert false, "Failed to register credential " & $i & ": " & error
|
||||
discard waitFor manager.updateRoots()
|
||||
discard waitFor manager.updateRecentRoots()
|
||||
|
||||
waitFor allFutures(futures)
|
||||
|
||||
@ -436,7 +509,7 @@ suite "Onchain group manager":
|
||||
(waitFor manager.register(credentials, UserMessageLimit(20))).isOkOr:
|
||||
assert false, "register failed: " & error
|
||||
|
||||
discard waitFor manager.updateRoots()
|
||||
discard waitFor manager.updateRecentRoots()
|
||||
let roots = manager.validRoots.items().toSeq()
|
||||
require:
|
||||
roots.len > 0
|
||||
|
||||
322
tools/sync-nimble-lock.sh
Executable file
322
tools/sync-nimble-lock.sh
Executable file
@ -0,0 +1,322 @@
|
||||
#!/usr/bin/env bash
|
||||
#
|
||||
# sync-nimble-lock.sh
|
||||
#
|
||||
# Cross-check git-URL pinned `requires` in waku.nimble against nimble.lock and
|
||||
# sync the lock entry for any pin that CHANGED relative to a git base ref
|
||||
# (default: HEAD) -- and ONLY those entries. No other package is touched.
|
||||
#
|
||||
# It does NOT run `nimble lock` (which rewrites the whole file and churns
|
||||
# unrelated packages). Instead it computes the package sha1 checksum itself,
|
||||
# reproducing nimble's algorithm exactly (src/nimblepkg/checksums.nim):
|
||||
#
|
||||
# files = `git ls-files` in the package's git checkout at the pinned rev
|
||||
# files.sort() # lexicographic
|
||||
# sha1 = SHA1 over, for each existing regular file (in sorted order):
|
||||
# update(relative_path_string)
|
||||
# if symlink: update(symlink_target_string)
|
||||
# else: update(file_bytes) # 8192-byte chunks
|
||||
#
|
||||
# For each changed pin it updates exactly three fields of the matching lock
|
||||
# entry, preserving all formatting and every other entry byte-for-byte:
|
||||
# version = "#" + <rev-as-written-in-waku.nimble> (commit or tag)
|
||||
# vcsRevision = git rev-parse of the ref (resolves tags)
|
||||
# checksums.sha1 = the self-computed checksum
|
||||
#
|
||||
# The `dependencies` array is intentionally left untouched (see NOTE below).
|
||||
#
|
||||
# Usage:
|
||||
# tools/sync-nimble-lock.sh # dry-run; exit 1 if drift
|
||||
# tools/sync-nimble-lock.sh --apply # update nimble.lock
|
||||
# tools/sync-nimble-lock.sh --base origin/master # compare against a ref
|
||||
#
|
||||
# Exit codes: 0 = in sync / applied, 1 = drift (dry-run), 2 = usage/tooling error
|
||||
#
|
||||
# Portable across macOS (bash 3.2, BSD tools) and Linux: all logic is in
|
||||
# python3; bash only parses args and checks tools. Requires: git, python3.
|
||||
#
|
||||
# NOTE on `dependencies`: a version bump can in principle change a package's
|
||||
# direct dependency set. Reproducing nimble's dependency-name normalization
|
||||
# without running nimble is fragile, and the user-requested scope is
|
||||
# version/vcsRevision/sha1. If a bumped dependency added/removed a `requires`,
|
||||
# update its lock `dependencies` array by hand. The script warns when the
|
||||
# bumped package's own .nimble `requires` count differs from the lock entry.
|
||||
|
||||
set -euo pipefail
|
||||
|
||||
APPLY=0
|
||||
BASE="HEAD"
|
||||
|
||||
usage() { sed -n '2,55p' "$0" | sed 's/^#\{0,1\} \{0,1\}//'; }
|
||||
|
||||
while [ $# -gt 0 ]; do
|
||||
case "$1" in
|
||||
--apply) APPLY=1 ;;
|
||||
--base) shift; [ $# -gt 0 ] || { echo "error: --base needs a ref" >&2; exit 2; }; BASE="$1" ;;
|
||||
--base=*) BASE="${1#*=}" ;;
|
||||
-h|--help) usage; exit 0 ;;
|
||||
*) echo "error: unknown argument: $1" >&2; exit 2 ;;
|
||||
esac
|
||||
shift
|
||||
done
|
||||
|
||||
command -v python3 >/dev/null 2>&1 || { echo "error: python3 is required" >&2; exit 2; }
|
||||
command -v git >/dev/null 2>&1 || { echo "error: git is required" >&2; exit 2; }
|
||||
|
||||
ROOT="$(git rev-parse --show-toplevel 2>/dev/null)" || { echo "error: not in a git repo" >&2; exit 2; }
|
||||
|
||||
export SYNC_ROOT="$ROOT" SYNC_APPLY="$APPLY" SYNC_BASE="$BASE" SYNC_PKGCACHE="${HOME}/.nimble/pkgcache"
|
||||
|
||||
exec python3 - <<'PYEOF'
|
||||
import hashlib
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
|
||||
ROOT = os.environ["SYNC_ROOT"]
|
||||
APPLY = os.environ["SYNC_APPLY"] == "1"
|
||||
BASE = os.environ["SYNC_BASE"]
|
||||
PKGCACHE = os.environ["SYNC_PKGCACHE"]
|
||||
|
||||
NIMBLE_FILE = os.path.join(ROOT, "waku.nimble")
|
||||
LOCK_FILE = os.path.join(ROOT, "nimble.lock")
|
||||
|
||||
REQ_RE = re.compile(r'requires\s+"(https?://[^"#]+)#([^"]+)"')
|
||||
COMMIT_RE = re.compile(r"^[0-9a-f]{40}$")
|
||||
NEAR_HASH_RE = re.compile(r"^[0-9a-fx]{38,42}$") # catches the leading-`x` typo
|
||||
|
||||
|
||||
def fail(msg):
|
||||
sys.stderr.write("error: %s\n" % msg)
|
||||
sys.exit(2)
|
||||
|
||||
|
||||
def warn(msg):
|
||||
sys.stderr.write("warning: %s\n" % msg)
|
||||
|
||||
|
||||
def norm_url(url):
|
||||
u = url.rstrip("/")
|
||||
return u[:-4] if u.endswith(".git") else u
|
||||
|
||||
|
||||
def git(args, cwd=None, check=True):
|
||||
r = subprocess.run(["git"] + args, cwd=cwd, capture_output=True, text=True)
|
||||
if check and r.returncode != 0:
|
||||
fail("git %s failed: %s" % (" ".join(args), (r.stderr or r.stdout).strip()))
|
||||
return r
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# nimble checksum reproduction (verified byte-for-byte against nimble v0.22.3)
|
||||
# ---------------------------------------------------------------------------
|
||||
def compute_checksum(checkout_dir):
|
||||
out = git(["-C", checkout_dir, "ls-files"]).stdout
|
||||
files = out.strip().splitlines()
|
||||
files.sort()
|
||||
h = hashlib.sha1()
|
||||
for rel in files:
|
||||
path = os.path.join(checkout_dir, rel)
|
||||
if not os.path.isfile(path):
|
||||
# Skips directories / gitlinks / broken symlinks, matching nimble's
|
||||
# `fileExists` guard (regular file or symlink-to-file only).
|
||||
continue
|
||||
h.update(rel.encode("utf-8"))
|
||||
if os.path.islink(path):
|
||||
h.update(os.readlink(path).encode("utf-8"))
|
||||
else:
|
||||
with open(path, "rb") as fh:
|
||||
while True:
|
||||
chunk = fh.read(8192)
|
||||
if not chunk:
|
||||
break
|
||||
h.update(chunk)
|
||||
return h.hexdigest()
|
||||
|
||||
|
||||
def get_checkout(url, rev, tmpdir):
|
||||
"""Return (checkout_dir, cleanup_fn). Reuses ~/.nimble/pkgcache when the
|
||||
exact commit is already cloned; otherwise clones from the URL."""
|
||||
# pkgcache dirs are suffixed with the commit sha (commit pins only).
|
||||
if os.path.isdir(PKGCACHE):
|
||||
for name in os.listdir(PKGCACHE):
|
||||
if name.endswith("_" + rev) and os.path.isdir(os.path.join(PKGCACHE, name, ".git")):
|
||||
cache = os.path.join(PKGCACHE, name)
|
||||
git(["-C", cache, "checkout", "-q", rev])
|
||||
return cache, (lambda: None)
|
||||
# Fall back to a fresh clone (network). Full clone, then checkout the ref.
|
||||
dest = os.path.join(tmpdir, "clone")
|
||||
print(" cloning %s ..." % url)
|
||||
git(["clone", "--quiet", url, dest])
|
||||
r = git(["-C", dest, "checkout", "-q", rev], check=False)
|
||||
if r.returncode != 0:
|
||||
# commit may live on a ref not fetched by default; try fetching it
|
||||
git(["-C", dest, "fetch", "--quiet", "origin", rev], check=False)
|
||||
git(["-C", dest, "checkout", "-q", rev])
|
||||
return dest, (lambda: shutil.rmtree(dest, ignore_errors=True))
|
||||
|
||||
|
||||
def dep_requires_count(checkout_dir):
|
||||
"""Best-effort count of git/registry `requires` in the dep's .nimble file,
|
||||
for a heads-up if the lock `dependencies` array may be stale."""
|
||||
nimbles = [f for f in os.listdir(checkout_dir) if f.endswith(".nimble")]
|
||||
if not nimbles:
|
||||
return None
|
||||
try:
|
||||
txt = open(os.path.join(checkout_dir, nimbles[0])).read()
|
||||
except OSError:
|
||||
return None
|
||||
n = 0
|
||||
for m in re.finditer(r'requires\s+"([^"]+)"', txt):
|
||||
n += len([p for p in m.group(1).split(",") if p.strip()])
|
||||
return n or None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# detect changes
|
||||
# ---------------------------------------------------------------------------
|
||||
def parse_changed(base):
|
||||
r = git(["-C", ROOT, "diff", base, "--", "waku.nimble"], check=False)
|
||||
if r.returncode != 0:
|
||||
fail("git diff against %r failed: %s" % (base, r.stderr.strip()))
|
||||
changed, seen = [], set()
|
||||
for line in r.stdout.splitlines():
|
||||
if not line.startswith("+") or line.startswith("+++"):
|
||||
continue
|
||||
m = REQ_RE.search(line[1:])
|
||||
if not m:
|
||||
continue
|
||||
url, rev = m.group(1), m.group(2)
|
||||
key = norm_url(url)
|
||||
if key in seen:
|
||||
continue
|
||||
seen.add(key)
|
||||
if not COMMIT_RE.match(rev) and NEAR_HASH_RE.match(rev):
|
||||
fail("invalid commit hash for %s: %r is not a valid 40-char hex SHA "
|
||||
"(stray character / typo?)" % (url, rev))
|
||||
changed.append((url, rev))
|
||||
return changed
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# surgical lock patch (text-level: preserves formatting & all other entries)
|
||||
# ---------------------------------------------------------------------------
|
||||
PKG_OPEN_RE = re.compile(r'^\s{4}"[^"]+":\s*\{\s*$')
|
||||
PKG_CLOSE_RE = re.compile(r'^\s{4}\},?\s*$')
|
||||
|
||||
|
||||
def set_value(line, key, val):
|
||||
return re.sub(r'(^\s*"' + re.escape(key) + r'":\s*")[^"]*(")',
|
||||
lambda m: m.group(1) + val + m.group(2), line, count=1)
|
||||
|
||||
|
||||
def patch_lock_text(text, url, version, vcs_rev, sha1):
|
||||
lines = text.splitlines(keepends=True)
|
||||
url_re = re.compile(r'^\s*"url":\s*"' + re.escape(url) + r'"\s*,?\s*$')
|
||||
ui = next((i for i, l in enumerate(lines) if url_re.match(l)), None)
|
||||
if ui is None:
|
||||
return None
|
||||
# block bounds
|
||||
start = next(i for i in range(ui, -1, -1) if PKG_OPEN_RE.match(lines[i]))
|
||||
end = next(i for i in range(ui, len(lines)) if PKG_CLOSE_RE.match(lines[i]))
|
||||
done = set()
|
||||
for i in range(start, end + 1):
|
||||
if "version" not in done and re.match(r'^\s*"version":', lines[i]):
|
||||
lines[i] = set_value(lines[i], "version", version); done.add("version")
|
||||
elif "vcsRevision" not in done and re.match(r'^\s*"vcsRevision":', lines[i]):
|
||||
lines[i] = set_value(lines[i], "vcsRevision", vcs_rev); done.add("vcsRevision")
|
||||
elif "sha1" not in done and re.match(r'^\s*"sha1":', lines[i]):
|
||||
lines[i] = set_value(lines[i], "sha1", sha1); done.add("sha1")
|
||||
missing = {"version", "vcsRevision", "sha1"} - done
|
||||
if missing:
|
||||
fail("could not locate field(s) %s in lock block for %s" % (sorted(missing), url))
|
||||
return "".join(lines)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
def main():
|
||||
for p in (NIMBLE_FILE, LOCK_FILE):
|
||||
if not os.path.isfile(p):
|
||||
fail("%s not found" % p)
|
||||
|
||||
changed = parse_changed(BASE)
|
||||
if not changed:
|
||||
print("No changed git-URL `requires` in waku.nimble vs %s — nothing to sync." % BASE)
|
||||
return 0
|
||||
|
||||
lock = json.load(open(LOCK_FILE))
|
||||
by_url = {}
|
||||
for name, e in lock.get("packages", {}).items():
|
||||
if e.get("url"):
|
||||
by_url[norm_url(e["url"])] = (name, e)
|
||||
|
||||
drift = [] # (url, rev, name_or_None, cur_version_or_None)
|
||||
for url, rev in changed:
|
||||
hit = by_url.get(norm_url(url))
|
||||
want = "#" + rev
|
||||
if hit is None:
|
||||
drift.append((url, rev, None, None))
|
||||
elif hit[1].get("version") != want:
|
||||
drift.append((url, rev, hit[0], hit[1].get("version")))
|
||||
|
||||
if not drift:
|
||||
print("nimble.lock already in sync with waku.nimble (%d changed pin(s) checked)." % len(changed))
|
||||
return 0
|
||||
|
||||
print("Dependency drift (waku.nimble vs nimble.lock):")
|
||||
for url, rev, name, cur in drift:
|
||||
tag = name or "(missing)"
|
||||
print(" ~ %s [%s]\n waku.nimble: #%s\n nimble.lock: %s" % (url, tag, rev, cur))
|
||||
|
||||
if not APPLY:
|
||||
print("\nRun with --apply to update nimble.lock (computes checksum itself; no `nimble lock`).")
|
||||
return 1
|
||||
|
||||
print("\nApplying (computing checksums; not running `nimble lock`)...")
|
||||
text = open(LOCK_FILE).read()
|
||||
updated = []
|
||||
tmproot = tempfile.mkdtemp(prefix="sync-nimble-lock.")
|
||||
try:
|
||||
for url, rev, name, _cur in drift:
|
||||
if name is None:
|
||||
fail("%s has no entry in nimble.lock; this script updates existing "
|
||||
"entries only (add new deps with a normal nimble install first)." % url)
|
||||
sub = os.path.join(tmproot, re.sub(r"\W+", "_", norm_url(url)))
|
||||
os.makedirs(sub, exist_ok=True)
|
||||
checkout, cleanup = get_checkout(url, rev, sub)
|
||||
try:
|
||||
vcs_rev = git(["-C", checkout, "rev-parse", "HEAD"]).stdout.strip()
|
||||
sha1 = compute_checksum(checkout)
|
||||
# dependency-drift heads-up
|
||||
cnt = dep_requires_count(checkout)
|
||||
lock_deps = len(by_url[norm_url(url)][1].get("dependencies", []))
|
||||
if cnt is not None and lock_deps and cnt != lock_deps:
|
||||
warn("%s: .nimble has %d `requires` but lock lists %d dependencies; "
|
||||
"review the `dependencies` array manually." % (name, cnt, lock_deps))
|
||||
finally:
|
||||
cleanup()
|
||||
new_text = patch_lock_text(text, url, "#" + rev, vcs_rev, sha1)
|
||||
if new_text is None:
|
||||
fail("could not find lock block for url %s" % url)
|
||||
text = new_text
|
||||
updated.append((name, "#" + rev, vcs_rev, sha1))
|
||||
finally:
|
||||
shutil.rmtree(tmproot, ignore_errors=True)
|
||||
|
||||
with open(LOCK_FILE, "w") as f:
|
||||
f.write(text)
|
||||
|
||||
print("\nUpdated nimble.lock (only these entries; all others untouched):")
|
||||
for name, ver, vcs, sha1 in updated:
|
||||
print(" %-16s version=%s" % (name, ver))
|
||||
print(" %-16s vcsRevision=%s" % ("", vcs))
|
||||
print(" %-16s sha1=%s" % ("", sha1))
|
||||
return 0
|
||||
|
||||
|
||||
sys.exit(main())
|
||||
PYEOF
|
||||
79
waku.nimble
79
waku.nimble
@ -59,19 +59,12 @@ requires "nim >= 2.2.4",
|
||||
"unittest2"
|
||||
|
||||
# Packages not on nimble (use git URLs)
|
||||
requires "https://github.com/logos-messaging/nim-ffi"
|
||||
|
||||
requires "https://github.com/logos-messaging/nim-sds.git#2e9a7683f0e180bf112135fae3a3803eed8490d4"
|
||||
requires "https://github.com/logos-messaging/nim-ffi#v0.1.3"
|
||||
|
||||
# brokers: pinned by URL+commit rather than the bare `brokers >= 2.0.1`
|
||||
# form because the nim-lang/packages registry entry for `brokers` only
|
||||
# carries metadata for the original v0.1.0 publication. Until that
|
||||
# registry entry is refreshed, the local SAT solver enumerates "0.1.0"
|
||||
# as the only available version and cannot satisfy `>= 2.0.1`. The URL
|
||||
# pin below bypasses the registry and locks the exact commit of the
|
||||
# v2.0.1 tag. Revert to the bare form once nim-lang/packages is
|
||||
# updated.
|
||||
requires "https://github.com/NagyZoltanPeter/nim-brokers.git#v2.0.1"
|
||||
requires "https://github.com/logos-messaging/nim-sds.git#abdd40cc645f1b024c3ee99cced7e287c4e4c441"
|
||||
|
||||
requires "https://github.com/NagyZoltanPeter/nim-brokers.git#v3.1.1"
|
||||
|
||||
requires "https://github.com/vacp2p/nim-lsquic"
|
||||
requires "https://github.com/vacp2p/nim-jwt.git#057ec95eb5af0eea9c49bfe9025b3312c95dc5f2"
|
||||
@ -528,3 +521,67 @@ task liblogosdeliveryStaticLinux, "Generate bindings":
|
||||
|
||||
task liblogosdeliveryStaticMac, "Generate bindings":
|
||||
buildLibStaticMac("liblogosdelivery", "liblogosdelivery")
|
||||
|
||||
### Formatting tasks
|
||||
|
||||
task nphchanges, "Run nph on .nim/.nims/.nimble files changed on this branch/PR":
|
||||
## Formats every Nim source file that differs from the base branch.
|
||||
## The set covers committed changes on the branch, working-tree edits
|
||||
## (staged or not) and untracked files. The base branch is auto-detected
|
||||
## (origin's default branch, else local main/master); override it with
|
||||
## the NPH_BASE_BRANCH env var.
|
||||
let nph =
|
||||
if findExe("nph").len > 0: findExe("nph")
|
||||
else: getHomeDir() / ".nimble" / "bin" / "nph"
|
||||
if not fileExists(nph):
|
||||
quit "nph not found. Run `make build-nph` first.", 1
|
||||
|
||||
proc detectBaseBranch(): string =
|
||||
# Explicit override wins.
|
||||
if existsEnv("NPH_BASE_BRANCH"):
|
||||
return getEnv("NPH_BASE_BRANCH")
|
||||
# origin's default branch, e.g. "origin/main" -> "main".
|
||||
let (head, hCode) =
|
||||
gorgeEx("git symbolic-ref --short refs/remotes/origin/HEAD")
|
||||
if hCode == 0 and head.strip().len > 0:
|
||||
let parts = head.strip().split('/')
|
||||
return parts[^1]
|
||||
# Fall back to whichever local branch exists.
|
||||
for candidate in ["main", "master"]:
|
||||
let (_, vCode) =
|
||||
gorgeEx("git rev-parse --verify --quiet " & candidate)
|
||||
if vCode == 0:
|
||||
return candidate
|
||||
return "master"
|
||||
|
||||
let baseBranch = detectBaseBranch()
|
||||
|
||||
# Diff against the merge-base so we only touch what this branch introduced.
|
||||
var diffRef = baseBranch
|
||||
let (mergeBase, mbCode) = gorgeEx("git merge-base HEAD " & baseBranch)
|
||||
if mbCode == 0 and mergeBase.strip().len > 0:
|
||||
diffRef = mergeBase.strip()
|
||||
|
||||
let (changed, dCode) = gorgeEx("git diff --name-only --diff-filter=ACMR " & diffRef)
|
||||
if dCode != 0:
|
||||
quit "git diff failed: " & changed, 1
|
||||
let (untracked, _) = gorgeEx("git ls-files --others --exclude-standard")
|
||||
|
||||
var files: seq[string]
|
||||
for line in (changed & "\n" & untracked).splitLines():
|
||||
let f = line.strip()
|
||||
if f.len == 0:
|
||||
continue
|
||||
if not (f.endsWith(".nim") or f.endsWith(".nims") or f.endsWith(".nimble")):
|
||||
continue
|
||||
if fileExists(f) and f notin files:
|
||||
files.add(f)
|
||||
|
||||
if files.len == 0:
|
||||
echo "nphchanges: no changed .nim/.nims/.nimble files to format"
|
||||
return
|
||||
|
||||
echo "nphchanges: formatting " & $files.len & " file(s) (base: " & baseBranch & ")"
|
||||
for f in files:
|
||||
echo "Formatting " & f
|
||||
exec nph & " \"" & f & "\""
|
||||
|
||||
@ -15,6 +15,7 @@ import
|
||||
../waku_enr,
|
||||
../discovery/waku_discv5,
|
||||
../waku_node,
|
||||
../net/net_config,
|
||||
../node/peer_manager,
|
||||
../common/rate_limit/setting,
|
||||
../common/utils/parse_size_units
|
||||
|
||||
@ -17,6 +17,7 @@ import
|
||||
./validator_signed,
|
||||
../waku_enr/sharding,
|
||||
../waku_node,
|
||||
../net/net_config,
|
||||
../waku_core,
|
||||
../waku_core/codecs,
|
||||
../waku_rln_relay,
|
||||
|
||||
@ -34,6 +34,7 @@ import
|
||||
common/logging,
|
||||
node/peer_manager,
|
||||
node/health_monitor,
|
||||
net/net_config,
|
||||
node/waku_metrics,
|
||||
node/subscription_manager,
|
||||
rest_api/message_cache,
|
||||
|
||||
13
waku/node/edge_filter_sub_state.nim
Normal file
13
waku/node/edge_filter_sub_state.nim
Normal file
@ -0,0 +1,13 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import std/sets
|
||||
import chronos, libp2p/peerid
|
||||
import ../waku_core, ./health_monitor/topic_health
|
||||
|
||||
type EdgeFilterSubState* = object
|
||||
peers*: seq[RemotePeerInfo]
|
||||
pending*: seq[Future[void]]
|
||||
pendingPeers*: HashSet[PeerId]
|
||||
currentHealth*: TopicHealth
|
||||
|
||||
{.pop.}
|
||||
@ -16,7 +16,7 @@ import
|
||||
node/waku_node,
|
||||
node/node_telemetry,
|
||||
node/peer_manager,
|
||||
node/kernel_api,
|
||||
node/waku_node/ping,
|
||||
node/health_monitor/online_monitor,
|
||||
node/health_monitor/health_status,
|
||||
node/health_monitor/health_report,
|
||||
|
||||
@ -1,9 +0,0 @@
|
||||
import
|
||||
./kernel_api/filter as filter_api,
|
||||
./kernel_api/lightpush as lightpush_api,
|
||||
./kernel_api/store as store_api,
|
||||
./kernel_api/relay as relay_api,
|
||||
./kernel_api/peer_exchange as peer_exchange_api,
|
||||
./kernel_api/ping as ping_api
|
||||
|
||||
export filter_api, lightpush_api, store_api, relay_api, peer_exchange_api, ping_api
|
||||
@ -1,116 +0,0 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import
|
||||
std/[options, tables, sets],
|
||||
chronos,
|
||||
results,
|
||||
eth/keys,
|
||||
bearssl/rand,
|
||||
eth/p2p/discoveryv5/enr,
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/[multiaddress, multicodec],
|
||||
libp2p/protocols/ping,
|
||||
libp2p/protocols/mix/mix_protocol,
|
||||
brokers/broker_context
|
||||
|
||||
import
|
||||
waku/[
|
||||
waku_core,
|
||||
waku_relay,
|
||||
waku_archive,
|
||||
waku_store/protocol as store,
|
||||
waku_store/client as store_client,
|
||||
waku_store/resume,
|
||||
waku_store_sync,
|
||||
waku_filter_v2,
|
||||
waku_filter_v2/client as filter_client,
|
||||
waku_metadata,
|
||||
waku_rendezvous/protocol,
|
||||
waku_rendezvous/client as rendezvous_client,
|
||||
waku_lightpush_legacy/client as legacy_lightpush_client,
|
||||
waku_lightpush_legacy as legacy_lightpush_protocol,
|
||||
waku_lightpush/client as lightpush_client,
|
||||
waku_lightpush as lightpush_protocol,
|
||||
waku_peer_exchange,
|
||||
waku_rln_relay,
|
||||
waku_mix,
|
||||
common/rate_limit/setting,
|
||||
discovery/waku_kademlia,
|
||||
net/bound_ports,
|
||||
events/peer_events,
|
||||
],
|
||||
./peer_manager,
|
||||
./health_monitor/topic_health
|
||||
|
||||
# key and crypto modules different
|
||||
type
|
||||
# TODO: Move to application instance (e.g., `WakuNode2`)
|
||||
WakuInfo* = object # NOTE One for simplicity, can extend later as needed
|
||||
listenAddresses*: seq[string]
|
||||
enrUri*: string #multiaddrStrings*: seq[string]
|
||||
mixPubKey*: Option[string]
|
||||
|
||||
# NOTE based on Eth2Node in NBC eth2_network.nim
|
||||
WakuNode* = ref object
|
||||
peerManager*: PeerManager
|
||||
switch*: Switch
|
||||
wakuRelay*: WakuRelay
|
||||
wakuArchive*: waku_archive.WakuArchive
|
||||
wakuStore*: store.WakuStore
|
||||
wakuStoreClient*: store_client.WakuStoreClient
|
||||
wakuStoreResume*: StoreResume
|
||||
wakuStoreReconciliation*: SyncReconciliation
|
||||
wakuStoreTransfer*: SyncTransfer
|
||||
wakuFilter*: waku_filter_v2.WakuFilter
|
||||
wakuFilterClient*: filter_client.WakuFilterClient
|
||||
wakuRlnRelay*: WakuRLNRelay
|
||||
wakuLegacyLightPush*: WakuLegacyLightPush
|
||||
wakuLegacyLightpushClient*: WakuLegacyLightPushClient
|
||||
wakuLightPush*: WakuLightPush
|
||||
wakuLightpushClient*: WakuLightPushClient
|
||||
wakuPeerExchange*: WakuPeerExchange
|
||||
wakuPeerExchangeClient*: WakuPeerExchangeClient
|
||||
wakuMetadata*: WakuMetadata
|
||||
wakuAutoSharding*: Option[Sharding]
|
||||
enr*: enr.Record
|
||||
libp2pPing*: Ping
|
||||
rng*: ref rand.HmacDrbgContext
|
||||
brokerCtx*: BrokerContext
|
||||
wakuRendezvous*: WakuRendezVous
|
||||
wakuRendezvousClient*: rendezvous_client.WakuRendezVousClient
|
||||
announcedAddresses*: seq[MultiAddress]
|
||||
extMultiAddrsOnly*: bool # When true, skip automatic IP address replacement
|
||||
started*: bool # Indicates that node has started listening
|
||||
topicSubscriptionQueue*: AsyncEventQueue[SubscriptionEvent]
|
||||
rateLimitSettings*: ProtocolRateLimitSettings
|
||||
legacyAppHandlers*: Table[PubsubTopic, WakuRelayHandler]
|
||||
## Kernel API Relay appHandlers (if any)
|
||||
subscriptionManager*: SubscriptionManager
|
||||
wakuMix*: WakuMix
|
||||
kademliaDiscoveryLoop*: Future[void]
|
||||
wakuKademlia*: WakuKademlia
|
||||
ports*: BoundPorts
|
||||
|
||||
ShardSubscription* = object
|
||||
contentTopics*: HashSet[ContentTopic]
|
||||
directShardSub*: bool
|
||||
## shard subscribed directly (PubsubSub), independent of content-topic interest
|
||||
|
||||
EdgeFilterSubState* = object
|
||||
peers*: seq[RemotePeerInfo]
|
||||
pending*: seq[Future[void]]
|
||||
pendingPeers*: HashSet[PeerId]
|
||||
currentHealth*: TopicHealth
|
||||
|
||||
SubscriptionManager* = ref object of RootObj
|
||||
node*: WakuNode
|
||||
shards*: Table[PubsubTopic, ShardSubscription]
|
||||
edgeFilterSubStates*: Table[PubsubTopic, EdgeFilterSubState]
|
||||
edgeFilterWakeup*: AsyncEvent
|
||||
edgeFilterSubLoopFut*: Future[void]
|
||||
edgeFilterConnectionLoopFut*: Future[void]
|
||||
peerEventListener*: WakuPeerEventListener
|
||||
ownsEdgeShardHealthProvider*: bool
|
||||
ownsEdgeFilterPeerCountProvider*: bool
|
||||
|
||||
{.pop.}
|
||||
11
waku/node/shard_subscription.nim
Normal file
11
waku/node/shard_subscription.nim
Normal file
@ -0,0 +1,11 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import std/sets
|
||||
import ../waku_core
|
||||
|
||||
type ShardSubscription* = object
|
||||
contentTopics*: HashSet[ContentTopic]
|
||||
directShardSub*: bool
|
||||
## shard subscribed directly (PubsubSub), independent of content-topic interest
|
||||
|
||||
{.pop.}
|
||||
@ -6,7 +6,7 @@ import
|
||||
waku/[
|
||||
waku_core,
|
||||
waku_core/topics/sharding,
|
||||
node/node_types,
|
||||
node/waku_node,
|
||||
node/node_telemetry,
|
||||
waku_relay,
|
||||
waku_archive,
|
||||
@ -35,8 +35,8 @@ proc registerRelayHandler(
|
||||
if not alreadySubscribed or not node.legacyAppHandlers.hasKey(shard):
|
||||
node.legacyAppHandlers[shard] = appHandler
|
||||
else:
|
||||
debug "Legacy appHandler already exists for active PubsubTopic, ignoring new handler",
|
||||
topic = shard
|
||||
debug "Legacy appHandler already exists for active shard, ignoring new handler",
|
||||
shard
|
||||
|
||||
if alreadySubscribed:
|
||||
return false
|
||||
@ -301,7 +301,7 @@ const EdgeFilterLoopInterval = chronos.seconds(30)
|
||||
const EdgeFilterSubLoopDebounce = chronos.seconds(1)
|
||||
## Debounce delay to coalesce rapid-fire wakeups into a single reconciliation pass.
|
||||
|
||||
type EdgeDialTask = object
|
||||
type EdgeFilterSubscribeTask = object
|
||||
peer: RemotePeerInfo
|
||||
shard: PubsubTopic
|
||||
topics: seq[ContentTopic]
|
||||
@ -317,7 +317,7 @@ proc updateShardHealth(
|
||||
|
||||
proc removePeer(self: SubscriptionManager, shard: PubsubTopic, peerId: PeerId) =
|
||||
## Remove a peer from edgeFilterSubStates for the given shard,
|
||||
## update health, and wake the sub loop to dial a replacement.
|
||||
## update health, and wake the sub loop to filter-subscribe a replacement.
|
||||
## Best-effort unsubscribe so the service peer stops pushing to us.
|
||||
self.edgeFilterSubStates.withValue(shard, state):
|
||||
var idx = -1
|
||||
@ -394,13 +394,14 @@ proc syncFilterDeltas(
|
||||
if removed.len > 0:
|
||||
discard await self.sendChunkedFilterRpc(peer, shard, removed, FilterUnsubscribe)
|
||||
|
||||
proc dialFilterPeer(
|
||||
proc subscribeFilterPeer(
|
||||
self: SubscriptionManager,
|
||||
peer: RemotePeerInfo,
|
||||
shard: PubsubTopic,
|
||||
contentTopics: seq[ContentTopic],
|
||||
) {.async.} =
|
||||
## Subscribe a new peer to all content topics on a shard and start tracking it.
|
||||
## Filter-subscribe to a service peer for all content topics on a shard and
|
||||
## start tracking it (note that the filter client dials the peer if not connected).
|
||||
self.edgeFilterSubStates.withValue(shard, state):
|
||||
state.pendingPeers.incl(peer.peerId)
|
||||
|
||||
@ -410,16 +411,16 @@ proc dialFilterPeer(
|
||||
|
||||
self.edgeFilterSubStates.withValue(shard, state):
|
||||
if state.peers.anyIt(it.peerId == peer.peerId):
|
||||
trace "dialFilterPeer: peer already tracked, skipping duplicate",
|
||||
trace "subscribeFilterPeer: peer already tracked, skipping duplicate",
|
||||
shard = shard, peer = peer.peerId
|
||||
return
|
||||
|
||||
state.peers.add(peer)
|
||||
self.updateShardHealth(shard, state[])
|
||||
trace "dialFilterPeer: successfully subscribed to all chunks",
|
||||
trace "subscribeFilterPeer: successfully subscribed to all chunks",
|
||||
shard = shard, peer = peer.peerId, totalPeers = state.peers.len
|
||||
do:
|
||||
trace "dialFilterPeer: shard removed while subscribing, discarding result",
|
||||
trace "subscribeFilterPeer: shard removed while subscribing, discarding result",
|
||||
shard = shard, peer = peer.peerId
|
||||
finally:
|
||||
self.edgeFilterSubStates.withValue(shard, state):
|
||||
@ -480,7 +481,7 @@ proc selectFilterCandidates(
|
||||
filter_common.WakuFilterSubscribeCodec, some(shard)
|
||||
)
|
||||
|
||||
# Remove all already used in this shard or being dialed for it
|
||||
# Remove all already used in this shard or being filter-subscribed for it
|
||||
allCandidates.keepItIf(it.peerId notin exclude)
|
||||
|
||||
# Collect peer IDs already tracked on other shards
|
||||
@ -526,9 +527,9 @@ proc edgeFilterSubLoop(self: SubscriptionManager) {.async.} =
|
||||
trace "edgeFilterSubLoop: desired state", numShards = newSynced.len
|
||||
|
||||
# Step 1: read state across all shards at once and
|
||||
# create a list of peer dial tasks and shard tracking to delete.
|
||||
# create a list of peer filter-subscribe tasks and shard tracking to delete.
|
||||
|
||||
var dialTasks: seq[EdgeDialTask]
|
||||
var subscribeTasks: seq[EdgeFilterSubscribeTask]
|
||||
var shardsToDelete: seq[PubsubTopic]
|
||||
|
||||
for shard in allShards:
|
||||
@ -579,7 +580,7 @@ proc edgeFilterSubLoop(self: SubscriptionManager) {.async.} =
|
||||
for p in state.pendingPeers:
|
||||
tracked.incl(p)
|
||||
let candidates = self.selectFilterCandidates(shard, tracked, needed)
|
||||
let toDial = min(needed, candidates.len)
|
||||
let toSubscribe = min(needed, candidates.len)
|
||||
|
||||
trace "edgeFilterSubLoop: shard reconciliation",
|
||||
shard = shard,
|
||||
@ -587,18 +588,20 @@ proc edgeFilterSubLoop(self: SubscriptionManager) {.async.} =
|
||||
num_pending = state.pending.len,
|
||||
num_needed = needed,
|
||||
num_available = candidates.len,
|
||||
toDial = toDial
|
||||
toSubscribe = toSubscribe
|
||||
|
||||
var dialTopics: seq[ContentTopic]
|
||||
var subscribeTopics: seq[ContentTopic]
|
||||
newSynced.withValue(shard, curr):
|
||||
dialTopics = toSeq(curr[])
|
||||
subscribeTopics = toSeq(curr[])
|
||||
|
||||
for i in 0 ..< toDial:
|
||||
dialTasks.add(
|
||||
EdgeDialTask(peer: candidates[i], shard: shard, topics: dialTopics)
|
||||
for i in 0 ..< toSubscribe:
|
||||
subscribeTasks.add(
|
||||
EdgeFilterSubscribeTask(
|
||||
peer: candidates[i], shard: shard, topics: subscribeTopics
|
||||
)
|
||||
)
|
||||
|
||||
# Step 2: execute deferred shard tracking deletion and dial tasks.
|
||||
# Step 2: execute deferred shard tracking deletion and filter-subscribe tasks.
|
||||
|
||||
for shard in shardsToDelete:
|
||||
self.edgeFilterSubStates.withValue(shard, state):
|
||||
@ -607,8 +610,8 @@ proc edgeFilterSubLoop(self: SubscriptionManager) {.async.} =
|
||||
await fut.cancelAndWait()
|
||||
self.edgeFilterSubStates.del(shard)
|
||||
|
||||
for task in dialTasks:
|
||||
let fut = self.dialFilterPeer(task.peer, task.shard, task.topics)
|
||||
for task in subscribeTasks:
|
||||
let fut = self.subscribeFilterPeer(task.peer, task.shard, task.topics)
|
||||
self.edgeFilterSubStates.withValue(task.shard, state):
|
||||
state.pending.add(fut)
|
||||
|
||||
|
||||
@ -67,7 +67,11 @@ import
|
||||
./peer_manager,
|
||||
./health_monitor/health_status,
|
||||
./health_monitor/topic_health,
|
||||
./node_telemetry
|
||||
./node_telemetry,
|
||||
./shard_subscription,
|
||||
./edge_filter_sub_state
|
||||
|
||||
export shard_subscription, edge_filter_sub_state
|
||||
|
||||
logScope:
|
||||
topics = "waku node"
|
||||
@ -85,8 +89,64 @@ const clientId* = "Nimbus Waku v2 node"
|
||||
|
||||
const WakuNodeVersionString* = "version / git commit hash: " & git_version
|
||||
|
||||
import ./node_types
|
||||
export node_types
|
||||
type
|
||||
# TODO: Move to application instance (e.g., `WakuNode2`)
|
||||
WakuInfo* = object # NOTE One for simplicity, can extend later as needed
|
||||
listenAddresses*: seq[string]
|
||||
enrUri*: string #multiaddrStrings*: seq[string]
|
||||
mixPubKey*: Option[string]
|
||||
|
||||
# NOTE based on Eth2Node in NBC eth2_network.nim
|
||||
WakuNode* = ref object
|
||||
peerManager*: PeerManager
|
||||
switch*: Switch
|
||||
wakuRelay*: WakuRelay
|
||||
wakuArchive*: waku_archive.WakuArchive
|
||||
wakuStore*: store.WakuStore
|
||||
wakuStoreClient*: store_client.WakuStoreClient
|
||||
wakuStoreResume*: StoreResume
|
||||
wakuStoreReconciliation*: SyncReconciliation
|
||||
wakuStoreTransfer*: SyncTransfer
|
||||
wakuFilter*: waku_filter_v2.WakuFilter
|
||||
wakuFilterClient*: filter_client.WakuFilterClient
|
||||
wakuRlnRelay*: WakuRLNRelay
|
||||
wakuLegacyLightPush*: WakuLegacyLightPush
|
||||
wakuLegacyLightpushClient*: WakuLegacyLightPushClient
|
||||
wakuLightPush*: WakuLightPush
|
||||
wakuLightpushClient*: WakuLightPushClient
|
||||
wakuPeerExchange*: WakuPeerExchange
|
||||
wakuPeerExchangeClient*: WakuPeerExchangeClient
|
||||
wakuMetadata*: WakuMetadata
|
||||
wakuAutoSharding*: Option[Sharding]
|
||||
enr*: enr.Record
|
||||
libp2pPing*: Ping
|
||||
rng*: ref rand.HmacDrbgContext
|
||||
brokerCtx*: BrokerContext
|
||||
wakuRendezvous*: WakuRendezVous
|
||||
wakuRendezvousClient*: rendezvous_client.WakuRendezVousClient
|
||||
announcedAddresses*: seq[MultiAddress]
|
||||
extMultiAddrsOnly*: bool # When true, skip automatic IP address replacement
|
||||
started*: bool # Indicates that node has started listening
|
||||
topicSubscriptionQueue*: AsyncEventQueue[SubscriptionEvent]
|
||||
rateLimitSettings*: ProtocolRateLimitSettings
|
||||
legacyAppHandlers*: Table[PubsubTopic, WakuRelayHandler]
|
||||
## Kernel API Relay appHandlers (if any)
|
||||
subscriptionManager*: SubscriptionManager
|
||||
wakuMix*: WakuMix
|
||||
kademliaDiscoveryLoop*: Future[void]
|
||||
wakuKademlia*: WakuKademlia
|
||||
ports*: BoundPorts
|
||||
|
||||
SubscriptionManager* = ref object of RootObj
|
||||
node*: WakuNode
|
||||
shards*: Table[PubsubTopic, ShardSubscription]
|
||||
edgeFilterSubStates*: Table[PubsubTopic, EdgeFilterSubState]
|
||||
edgeFilterWakeup*: AsyncEvent
|
||||
edgeFilterSubLoopFut*: Future[void]
|
||||
edgeFilterConnectionLoopFut*: Future[void]
|
||||
peerEventListener*: WakuPeerEventListener
|
||||
ownsEdgeShardHealthProvider*: bool
|
||||
ownsEdgeFilterPeerCountProvider*: bool
|
||||
|
||||
import ./subscription_manager
|
||||
|
||||
|
||||
@ -68,7 +68,7 @@ proc mtMarshalValue*(
|
||||
of txPut:
|
||||
if not mtMarshalValue(buf, cap, value.payload, pos):
|
||||
return false
|
||||
of txDelete:
|
||||
of txDelete, txDeletePrefix:
|
||||
discard
|
||||
return true
|
||||
|
||||
@ -93,6 +93,8 @@ proc mtUnmarshalValue*(
|
||||
value = TxOp(category: category, key: key, kind: txPut, payload: payload)
|
||||
of txDelete:
|
||||
value = TxOp(category: category, key: key, kind: txDelete)
|
||||
of txDeletePrefix:
|
||||
value = TxOp(category: category, key: key, kind: txDeletePrefix)
|
||||
return true
|
||||
|
||||
EventBroker(mt):
|
||||
|
||||
@ -7,7 +7,7 @@
|
||||
import std/options
|
||||
import results, sqlite3_abi
|
||||
import ../common/databases/[common, db_sqlite]
|
||||
import ./[types, schema]
|
||||
import ./[types, keys, schema]
|
||||
|
||||
type
|
||||
KvBackend* = ref object
|
||||
@ -121,6 +121,37 @@ proc close*(b: KvBackend) =
|
||||
b.db.close()
|
||||
b.db = nil
|
||||
|
||||
proc deletePrefix(
|
||||
b: KvBackend, category: string, prefix: Key
|
||||
): Result[void, PersistencyError] =
|
||||
let rng = prefixRange(prefix)
|
||||
let openEnded = bytes(rng.stop).len == 0
|
||||
let sql =
|
||||
if openEnded:
|
||||
"DELETE FROM kv WHERE category = ? AND key >= ?;"
|
||||
else:
|
||||
"DELETE FROM kv WHERE category = ? AND key >= ? AND key < ?;"
|
||||
var s: ptr sqlite3_stmt
|
||||
let rc = sqlite3_prepare_v2(b.db.env, sql.cstring, sql.len.cint, addr s, nil)
|
||||
if rc != SQLITE_OK:
|
||||
return err(toErr("deletePrefix prepare: " & $sqlite3_errstr(rc)))
|
||||
defer:
|
||||
discard sqlite3_finalize(s)
|
||||
var bc = bindBlob(s, 1.cint, catBytes(category))
|
||||
if bc != SQLITE_OK:
|
||||
return err(toErr("deletePrefix bind cat: " & $sqlite3_errstr(bc)))
|
||||
bc = bindBlob(s, 2.cint, keyBytes(rng.start))
|
||||
if bc != SQLITE_OK:
|
||||
return err(toErr("deletePrefix bind start: " & $sqlite3_errstr(bc)))
|
||||
if not openEnded:
|
||||
bc = bindBlob(s, 3.cint, keyBytes(rng.stop))
|
||||
if bc != SQLITE_OK:
|
||||
return err(toErr("deletePrefix bind stop: " & $sqlite3_errstr(bc)))
|
||||
let v = sqlite3_step(s)
|
||||
if v != SQLITE_DONE:
|
||||
return err(toErr("deletePrefix step: " & $sqlite3_errstr(v)))
|
||||
return ok()
|
||||
|
||||
proc applyOne(b: KvBackend, op: TxOp): Result[void, PersistencyError] =
|
||||
case op.kind
|
||||
of txPut:
|
||||
@ -131,6 +162,8 @@ proc applyOne(b: KvBackend, op: TxOp): Result[void, PersistencyError] =
|
||||
let r = b.deleteStmt.exec((catBytes(op.category), keyBytes(op.key)))
|
||||
if r.isErr:
|
||||
return err(toErr("delete failed: " & r.error))
|
||||
of txDeletePrefix:
|
||||
?b.deletePrefix(op.category, op.key)
|
||||
return ok()
|
||||
|
||||
proc execSql(b: KvBackend, sql: string): Result[void, PersistencyError] =
|
||||
|
||||
@ -284,6 +284,11 @@ proc persistPut*(
|
||||
proc persistDelete*(t: Job, category: string, key: Key): Future[void] {.async.} =
|
||||
await persist(t, TxOp(category: category, key: key, kind: txDelete))
|
||||
|
||||
proc persistDeletePrefix*(
|
||||
t: Job, category: string, prefix: Key
|
||||
): Future[void] {.async.} =
|
||||
await persist(t, TxOp(category: category, key: prefix, kind: txDeletePrefix))
|
||||
|
||||
proc persistEncoded*[T](
|
||||
t: Job, category: string, key: Key, value: T
|
||||
): Future[void] {.async.} =
|
||||
@ -335,6 +340,13 @@ proc persistDelete*(
|
||||
if not j.isNil():
|
||||
await j.persistDelete(category, key)
|
||||
|
||||
proc persistDeletePrefix*(
|
||||
p: Persistency, jobId: string, category: string, prefix: Key
|
||||
): Future[void] {.async.} =
|
||||
let j = p.jobOrWarn(jobId)
|
||||
if not j.isNil():
|
||||
await j.persistDeletePrefix(category, prefix)
|
||||
|
||||
proc persistEncoded*[T](
|
||||
p: Persistency, jobId: string, category: string, key: Key, value: T
|
||||
): Future[void] {.async.} =
|
||||
|
||||
176
waku/persistency/sds_persistency.nim
Normal file
176
waku/persistency/sds_persistency.nim
Normal file
@ -0,0 +1,176 @@
|
||||
## Adapter that materialises the SDS `Persistence` contract (nim-sds 0.3.0,
|
||||
## snapshot model) on top of a waku-persistency `Job`. One `Job` (== one
|
||||
## SQLite file, one worker thread) services all channels for a given SDS
|
||||
## context; rows are namespaced by category and the channelId is the first
|
||||
## key component so per-channel prefix scans stay cheap.
|
||||
##
|
||||
## ## Snapshot contract (nim-sds 0.3.0)
|
||||
##
|
||||
## The fine-grained per-row callbacks of 0.2.4 are gone. SDS now persists via
|
||||
## five procs, all `Future[Result[void, string]]` (load returns
|
||||
## `Result[ChannelData, string]`), `{.async: (raises: []), gcsafe.}`:
|
||||
##
|
||||
## * **`saveChannelMeta`** — the complete fast-changing per-channel state
|
||||
## (lamport clock, outgoing/incoming buffers, both SDS-R repair buffers)
|
||||
## as ONE blob. Idempotent; a missed write self-heals on the next save.
|
||||
## * **`updateHistory`** — append newly-delivered messages / evict the
|
||||
## oldest past the cap, applied as one transactional batch.
|
||||
## * **`loadChannel`** — bootstrap: returns the prior `ChannelData`
|
||||
## (meta + ordered message history) or an empty one. Surfaces errors.
|
||||
## * **`dropChannel`** — wipe all state for a channel. Surfaces errors.
|
||||
##
|
||||
## Failure policy mirrors the interface docs: save/update/hint are non-fatal
|
||||
## (we log and still return the error string); load/drop are durability-intent
|
||||
## and propagate their error to the caller.
|
||||
##
|
||||
## ## Codec
|
||||
##
|
||||
## The blob transform is owned by nim-sds: `ChannelMeta` round-trips through
|
||||
## `sds/snapshot_codec` (protobuf, schema-versioned — refuses unknown
|
||||
## versions), and each persisted `SdsMessage` log row through the SDS wire
|
||||
## codec in `sds/protobuf`. We do not maintain a second codec for these
|
||||
## shapes (the previous `payload_codec`/`BlobCodec` path is retired).
|
||||
##
|
||||
## ## Retrieval hints
|
||||
##
|
||||
## `setRetrievalHint` is intentionally a no-op: persisted hints are never read
|
||||
## back — `loadChannel` returns `ChannelData` (meta + messageHistory) with no
|
||||
## hint field, and `ChannelMeta` carries none. Hints are supplied live via the
|
||||
## `onRetrievalHint` provider, so persisting them would be write-only dead
|
||||
## data. The closure still exists because the field is required by the
|
||||
## `Persistence` object (SDS calls it from `getRecentHistoryEntries`).
|
||||
##
|
||||
## ## Storage layout
|
||||
##
|
||||
## | Category | Key | Value |
|
||||
## |---------------|--------------------------|----------------------------------------|
|
||||
## | `sds.meta` | `key(channelId)` | `ChannelMeta` (snapshot_codec protobuf)|
|
||||
## | `sds.log` | `key(channelId, msgId)` | `SdsMessage` (sds wire protobuf) |
|
||||
##
|
||||
## `messageHistory` is reconstructed in memory by sorting on
|
||||
## `(lamportTimestamp, messageId)` — the same total order SDS uses for
|
||||
## delivery (see sds/sds_utils.nim).
|
||||
|
||||
{.push raises: [].}
|
||||
|
||||
import std/[algorithm, options]
|
||||
import chronos, chronicles, results
|
||||
import libp2p/protobuf/minprotobuf
|
||||
import ./persistency
|
||||
import ./keys
|
||||
import types/persistence
|
||||
import snapshot_codec
|
||||
import protobuf
|
||||
|
||||
export persistence, persistency
|
||||
|
||||
logScope:
|
||||
topics = "sds-persistency"
|
||||
|
||||
const
|
||||
CatMeta* = "sds.meta"
|
||||
CatLog* = "sds.log"
|
||||
|
||||
# ── Public factory ──────────────────────────────────────────────────────
|
||||
|
||||
proc newSdsPersistence*(job: Job): Persistence {.gcsafe, raises: [].} =
|
||||
## Build an SDS `Persistence` value backed by ``job``. One Job services
|
||||
## all channels — channelId is part of every key.
|
||||
##
|
||||
## The closures capture ``job`` by ref. They must be invoked from a thread
|
||||
## that owns a running chronos loop (the SDS context's worker thread
|
||||
## satisfies this).
|
||||
doAssert not job.isNil, "newSdsPersistence: job is nil"
|
||||
|
||||
# Built field-by-field via assignment rather than an object literal: every
|
||||
# field is an async closure whose body uses `await`/`return` statements,
|
||||
# which cannot be followed by the `,` field separator a `Persistence(..)`
|
||||
# literal would require. Assignments have no separator, so bodies stay plain.
|
||||
var persistence = Persistence()
|
||||
|
||||
persistence.saveChannelMeta = proc(
|
||||
channelId: SdsChannelID, meta: ChannelMeta
|
||||
): Future[Result[void, string]] {.async: (raises: []), gcsafe.} =
|
||||
try:
|
||||
await job.persistPut(CatMeta, toKey(channelId), encode(meta).buffer)
|
||||
return ok()
|
||||
except CatchableError as e:
|
||||
warn "sds-persistency: saveChannelMeta failed", channelId, err = e.msg
|
||||
return err(e.msg)
|
||||
|
||||
persistence.updateHistory = proc(
|
||||
channelId: SdsChannelID, update: HistoryUpdate
|
||||
): Future[Result[void, string]] {.async: (raises: []), gcsafe.} =
|
||||
if update.isEmpty:
|
||||
return ok()
|
||||
# One transactional batch: append rows (txPut) and evictions (txDelete).
|
||||
var ops = newSeq[TxOp]()
|
||||
for m in update.append:
|
||||
ops.add TxOp(
|
||||
category: CatLog,
|
||||
key: key(channelId, m.messageId),
|
||||
kind: txPut,
|
||||
payload: encode(m).buffer,
|
||||
)
|
||||
for id in update.evict:
|
||||
ops.add TxOp(category: CatLog, key: key(channelId, id), kind: txDelete)
|
||||
try:
|
||||
await job.persist(ops)
|
||||
return ok()
|
||||
except CatchableError as e:
|
||||
warn "sds-persistency: updateHistory failed",
|
||||
channelId, appended = update.append.len, evicted = update.evict.len, err = e.msg
|
||||
return err(e.msg)
|
||||
|
||||
persistence.loadChannel = proc(
|
||||
channelId: SdsChannelID
|
||||
): Future[Result[ChannelData, string]] {.async: (raises: []), gcsafe.} =
|
||||
let chanKey = toKey(channelId)
|
||||
var data = ChannelData.init()
|
||||
try:
|
||||
block meta:
|
||||
let opt = (await job.get(CatMeta, chanKey)).valueOr:
|
||||
return err("loadChannel: get meta: " & $error)
|
||||
if opt.isSome:
|
||||
# schema-versioned decode; refuses unknown versions loudly.
|
||||
data.meta = ChannelMeta.decode(opt.get).valueOr:
|
||||
return err("loadChannel: corrupt or unsupported ChannelMeta blob")
|
||||
|
||||
block history:
|
||||
let rows = (await job.scanPrefix(CatLog, chanKey)).valueOr:
|
||||
return err("loadChannel: scan log: " & $error)
|
||||
var msgs = newSeq[SdsMessage]()
|
||||
for row in rows:
|
||||
let m = SdsMessage.decode(row.payload).valueOr:
|
||||
warn "sds-persistency: skipping undecodable log row", channelId
|
||||
continue
|
||||
msgs.add(m)
|
||||
msgs.sort do(a, b: SdsMessage) -> int:
|
||||
result = cmp(a.lamportTimestamp, b.lamportTimestamp)
|
||||
if result == 0:
|
||||
result = cmp(a.messageId, b.messageId)
|
||||
data.messageHistory = msgs
|
||||
|
||||
return ok(data)
|
||||
except CatchableError as e:
|
||||
return err("loadChannel: " & e.msg)
|
||||
|
||||
persistence.dropChannel = proc(
|
||||
channelId: SdsChannelID
|
||||
): Future[Result[void, string]] {.async: (raises: []), gcsafe.} =
|
||||
let chanKey = toKey(channelId)
|
||||
try:
|
||||
await job.persist(
|
||||
@[
|
||||
TxOp(category: CatLog, key: chanKey, kind: txDeletePrefix),
|
||||
TxOp(category: CatMeta, key: chanKey, kind: txDelete),
|
||||
]
|
||||
)
|
||||
return ok()
|
||||
except CatchableError as e:
|
||||
error "sds-persistency: dropChannel failed", channelId, err = e.msg
|
||||
return err(e.msg)
|
||||
|
||||
return persistence
|
||||
|
||||
{.pop.}
|
||||
@ -19,6 +19,7 @@ type
|
||||
TxOpKind* = enum
|
||||
txPut
|
||||
txDelete
|
||||
txDeletePrefix
|
||||
|
||||
TxOp* = object
|
||||
category*: string
|
||||
@ -28,6 +29,8 @@ type
|
||||
payload*: seq[byte]
|
||||
of txDelete:
|
||||
discard
|
||||
of txDeletePrefix:
|
||||
discard
|
||||
|
||||
PersistencyErrorKind* = enum
|
||||
peBackend
|
||||
|
||||
@ -4,6 +4,7 @@ import net, tables
|
||||
import presto
|
||||
import
|
||||
waku/waku_node,
|
||||
waku/node/health_monitor,
|
||||
waku/discovery/waku_discv5,
|
||||
waku/rest_api/message_cache,
|
||||
waku/rest_api/handlers,
|
||||
|
||||
@ -1,7 +1,8 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import chronicles, json_serialization, presto/route
|
||||
import ../../../waku_node, ../responses, ../serdes, ./types
|
||||
import
|
||||
../../../waku_node, ../../../node/health_monitor, ../responses, ../serdes, ./types
|
||||
|
||||
logScope:
|
||||
topics = "waku node rest health_api"
|
||||
|
||||
@ -3,7 +3,7 @@
|
||||
import results
|
||||
import chronicles, json_serialization, json_serialization/std/options
|
||||
import ../serdes
|
||||
import waku/[waku_node, api/types]
|
||||
import waku/[waku_node, api/types, node/health_monitor]
|
||||
|
||||
#### Serialization and deserialization
|
||||
|
||||
|
||||
@ -1,8 +1,13 @@
|
||||
import
|
||||
./net/net_config,
|
||||
./node/waku_switch as switch,
|
||||
./node/waku_node as node,
|
||||
./node/health_monitor as health_monitor,
|
||||
./node/kernel_api as kernel_api
|
||||
./node/waku_node/filter as filter_api,
|
||||
./node/waku_node/lightpush as lightpush_api,
|
||||
./node/waku_node/store as store_api,
|
||||
./node/waku_node/relay as relay_api,
|
||||
./node/waku_node/peer_exchange as peer_exchange_api,
|
||||
./node/waku_node/ping as ping_api
|
||||
|
||||
export net_config, switch, node, health_monitor, kernel_api
|
||||
export
|
||||
switch, node, filter_api, lightpush_api, store_api, relay_api, peer_exchange_api,
|
||||
ping_api
|
||||
|
||||
3
waku/waku_persistency.nim
Normal file
3
waku/waku_persistency.nim
Normal file
@ -0,0 +1,3 @@
|
||||
import waku/persistency/persistency
|
||||
|
||||
export persistency
|
||||
@ -7,6 +7,9 @@ import ../waku_keystore
|
||||
# Acceptable roots for merkle root validation of incoming messages
|
||||
const AcceptableRootWindowSize* = 50
|
||||
|
||||
#Size if RLN contract root cache
|
||||
const RlnContractRootCacheSize* = 5
|
||||
|
||||
# RLN membership key and index files path
|
||||
const RlnCredentialsFilename* = "rlnCredentials.txt"
|
||||
|
||||
|
||||
@ -62,10 +62,10 @@ proc fetchMerkleProofElements*(
|
||||
let response = await sendEthCallWithParams(
|
||||
ethRpc = g.ethRpc.get(),
|
||||
functionSignature = methodSig,
|
||||
params = paddedParam,
|
||||
fromAddress = g.ethRpc.get().defaultAccount,
|
||||
toAddress = fromHex(Address, g.ethContractAddress),
|
||||
chainId = g.chainId,
|
||||
params = paddedParam,
|
||||
)
|
||||
|
||||
return response
|
||||
@ -73,14 +73,32 @@ proc fetchMerkleProofElements*(
|
||||
proc fetchMerkleRoot*(
|
||||
g: OnchainGroupManager
|
||||
): Future[Result[UInt256, string]] {.async.} =
|
||||
let merkleRoot = await sendEthCallWithoutParams(
|
||||
ethRpc = g.ethRpc.get(),
|
||||
functionSignature = "root()",
|
||||
fromAddress = g.ethRpc.get().defaultAccount,
|
||||
toAddress = fromHex(Address, g.ethContractAddress),
|
||||
chainId = g.chainId,
|
||||
)
|
||||
return merkleRoot
|
||||
try:
|
||||
let merkleRoot = await sendEthCallWithoutParams(
|
||||
ethRpc = g.ethRpc.get(),
|
||||
functionSignature = "root()",
|
||||
fromAddress = g.ethRpc.get().defaultAccount,
|
||||
toAddress = fromHex(Address, g.ethContractAddress),
|
||||
chainId = g.chainId,
|
||||
)
|
||||
return merkleRoot
|
||||
except CatchableError:
|
||||
error "Failed to fetch Merkle root", error = getCurrentExceptionMsg()
|
||||
return err("Failed to fetch merkle root: " & getCurrentExceptionMsg())
|
||||
|
||||
proc fetchMerkleRootsCache*(
|
||||
g: OnchainGroupManager
|
||||
): Future[Result[seq[byte], string]] {.async.} =
|
||||
let
|
||||
# using sendEthCallWithParams to get return type of seq[bytes] for getRecentRoots() function which returns an array of bytes32
|
||||
merkleRoots = await sendEthCallWithParams(
|
||||
ethRpc = g.ethRpc.get(),
|
||||
functionSignature = "getRecentRoots()",
|
||||
fromAddress = g.ethRpc.get().defaultAccount,
|
||||
toAddress = fromHex(Address, g.ethContractAddress),
|
||||
chainId = g.chainId,
|
||||
)
|
||||
return merkleRoots
|
||||
|
||||
proc fetchNextFreeIndex*(
|
||||
g: OnchainGroupManager
|
||||
@ -102,10 +120,10 @@ proc fetchMembershipStatus*(
|
||||
await sendEthCallWithParams(
|
||||
ethRpc = g.ethRpc.get(),
|
||||
functionSignature = "isInMembershipSet(uint256)",
|
||||
params = params,
|
||||
fromAddress = g.ethRpc.get().defaultAccount,
|
||||
toAddress = fromHex(Address, g.ethContractAddress),
|
||||
chainId = g.chainId,
|
||||
params = params,
|
||||
)
|
||||
).valueOr:
|
||||
return err("Failed to check membership: " & error)
|
||||
@ -148,14 +166,64 @@ proc updateRoots*(g: OnchainGroupManager): Future[bool] {.async.} =
|
||||
|
||||
return false
|
||||
|
||||
proc updateRecentRoots*(g: OnchainGroupManager): Future[bool] {.async.} =
|
||||
## Fetch recent roots from the contract roots cache and update the validRoots deque, ensuring we maintain a window of unique acceptable roots.
|
||||
## Contract returns array of uint256 roots, newest first, zero-padded to the cache size (e.g. 5).
|
||||
let bytes = (await g.fetchMerkleRootsCache()).valueOr:
|
||||
error "Failed to fetch current Merkle root", error = error
|
||||
return false
|
||||
|
||||
if (bytes.len mod 32) != 0:
|
||||
error "Invalid recent roots payload length", length = bytes.len
|
||||
return false
|
||||
|
||||
let chunkCount = bytes.len div 32
|
||||
if chunkCount != RlnContractRootCacheSize:
|
||||
warn "Unexpected number of recent roots returned; proceeding anyway",
|
||||
count = chunkCount
|
||||
|
||||
# Parse 32-byte chunks (contract returns newest-first) into MerkleNode values,
|
||||
# reversing to oldest-first and skipping zero roots.
|
||||
var newRootsDequeOrder: seq[MerkleNode] = @[]
|
||||
for startIdx in countdown(bytes.len - 32, 0, 32):
|
||||
let u = UInt256.fromBytesBE(bytes.toOpenArray(startIdx, startIdx + 31))
|
||||
if u.isZero:
|
||||
continue
|
||||
newRootsDequeOrder.add(UInt256ToField(u))
|
||||
|
||||
if newRootsDequeOrder.len == 0:
|
||||
debug "no non-zero recent roots to add; skipping update"
|
||||
return false
|
||||
|
||||
# Determine overlap with existing tail so we only append truly new roots
|
||||
let overlap = min(g.validRoots.len, newRootsDequeOrder.len)
|
||||
var matchLen = 0
|
||||
for startIdx in (g.validRoots.len - overlap) ..< g.validRoots.len:
|
||||
if g.validRoots[startIdx] == newRootsDequeOrder[0]:
|
||||
matchLen = g.validRoots.len - startIdx
|
||||
break
|
||||
|
||||
let toAdd = newRootsDequeOrder[matchLen ..< newRootsDequeOrder.len]
|
||||
if toAdd.len == 0:
|
||||
return false
|
||||
|
||||
# Append new roots to the tail; trim happens below if we exceed the window.
|
||||
for root in toAdd:
|
||||
g.validRoots.addLast(root)
|
||||
debug "appended recent roots to list of valid roots", count = toAdd.len, roots = toAdd
|
||||
|
||||
while g.validRoots.len > AcceptableRootWindowSize:
|
||||
discard g.validRoots.popFirst()
|
||||
|
||||
return true
|
||||
|
||||
proc trackRootChanges*(g: OnchainGroupManager): Future[Result[void, string]] {.async.} =
|
||||
?checkInitialized(g)
|
||||
|
||||
const rpcDelay = 5.seconds
|
||||
const rpcDelay = 10.seconds
|
||||
|
||||
while true:
|
||||
await sleepAsync(rpcDelay)
|
||||
let rootUpdated = await g.updateRoots()
|
||||
let rootUpdated = await g.updateRecentRoots()
|
||||
|
||||
if rootUpdated:
|
||||
## The membership set on-chain has changed (some new members have joined or some members have left)
|
||||
@ -174,6 +242,7 @@ proc trackRootChanges*(g: OnchainGroupManager): Future[Result[void, string]] {.a
|
||||
|
||||
let memberCount = cast[int64](nextFreeIndex)
|
||||
waku_rln_number_registered_memberships.set(float64(memberCount))
|
||||
await sleepAsync(rpcDelay)
|
||||
|
||||
method register*(
|
||||
g: OnchainGroupManager, rateCommitment: RateCommitment
|
||||
@ -393,8 +462,11 @@ method generateProof*(
|
||||
external_nullifier: extNullifier,
|
||||
)
|
||||
|
||||
let output = generateRlnProofWithWitness(g.rlnInstance, witness, epoch, rlnIdentifier).valueOr:
|
||||
return err("Failed to generate proof: " & error)
|
||||
waku_rln_proof_generation_duration_seconds.nanosecondTime:
|
||||
let output = generateRlnProofWithWitness(
|
||||
g.rlnInstance, witness, epoch, rlnIdentifier
|
||||
).valueOr:
|
||||
return err("Failed to generate proof: " & error)
|
||||
|
||||
info "Proof generated successfully", proof = output
|
||||
|
||||
|
||||
@ -76,10 +76,10 @@ proc sendEthCallWithoutParams*(
|
||||
proc sendEthCallWithParams*(
|
||||
ethRpc: Web3,
|
||||
functionSignature: string,
|
||||
params: seq[byte],
|
||||
fromAddress: Address,
|
||||
toAddress: Address,
|
||||
chainId: UInt256,
|
||||
params: seq[byte] = @[],
|
||||
): Future[Result[seq[byte], string]] {.async.} =
|
||||
## Workaround for web3 chainId=null issue with parameterized contract calls
|
||||
let functionHash =
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user