Compare commits

..

No commits in common. "master" and "discovery-v0.0.13" have entirely different histories.

62 changed files with 2552 additions and 3814 deletions

View File

@ -104,7 +104,6 @@
"reactjs",
"recid",
"rlnrelay",
"rlnv",
"roadmap",
"sandboxed",
"scanf",
@ -133,9 +132,7 @@
"upgrader",
"vacp",
"varint",
"viem",
"vkey",
"wagmi",
"waku",
"wakuconnect",
"wakunode",

View File

@ -113,44 +113,12 @@ jobs:
node-version: ${{ env.NODE_JS }}
registry-url: "https://registry.npmjs.org"
- uses: pnpm/action-setup@v4
if: ${{ steps.release.outputs.releases_created }}
with:
version: 9
- run: npm install
if: ${{ steps.release.outputs.releases_created }}
- run: npm run build
if: ${{ steps.release.outputs.releases_created }}
- name: Setup Foundry
if: ${{ steps.release.outputs.releases_created }}
uses: foundry-rs/foundry-toolchain@v1
with:
version: nightly
- name: Generate RLN contract ABIs
id: rln-abi
if: ${{ steps.release.outputs.releases_created }}
run: |
npm run setup:contract-abi -w @waku/rln || {
echo "::warning::Failed to generate contract ABIs, marking @waku/rln as private to skip publishing"
cd packages/rln
node -e "const fs = require('fs'); const pkg = JSON.parse(fs.readFileSync('package.json', 'utf8')); pkg.private = true; fs.writeFileSync('package.json', JSON.stringify(pkg, null, 2));"
echo "failed=true" >> $GITHUB_OUTPUT
}
- name: Rebuild with new ABIs
if: ${{ steps.release.outputs.releases_created && steps.rln-abi.outputs.failed != 'true' }}
run: |
npm install -w packages/rln
npm run build -w @waku/rln || {
echo "::warning::Failed to build @waku/rln, marking as private to skip publishing"
cd packages/rln
node -e "const fs = require('fs'); const pkg = JSON.parse(fs.readFileSync('package.json', 'utf8')); pkg.private = true; fs.writeFileSync('package.json', JSON.stringify(pkg, null, 2));"
}
- run: npm run publish
if: ${{ steps.release.outputs.releases_created }}
env:

View File

@ -17,46 +17,16 @@ jobs:
- uses: actions/checkout@v4
with:
repository: waku-org/js-waku
ref: ${{ github.ref }}
- uses: actions/setup-node@v4
with:
node-version: ${{ env.NODE_JS }}
registry-url: "https://registry.npmjs.org"
- uses: pnpm/action-setup@v4
with:
version: 9
- run: npm install
- run: npm run build
- name: Setup Foundry
uses: foundry-rs/foundry-toolchain@v1
with:
version: nightly
- name: Generate RLN contract ABIs
id: rln-abi
run: |
npm run setup:contract-abi -w @waku/rln || {
echo "::warning::Failed to generate contract ABIs, marking @waku/rln as private to skip publishing"
cd packages/rln
node -e "const fs = require('fs'); const pkg = JSON.parse(fs.readFileSync('package.json', 'utf8')); pkg.private = true; fs.writeFileSync('package.json', JSON.stringify(pkg, null, 2));"
echo "failed=true" >> $GITHUB_OUTPUT
}
- name: Rebuild with new ABIs
if: steps.rln-abi.outputs.failed != 'true'
run: |
npm install -w packages/rln
npm run build -w @waku/rln || {
echo "::warning::Failed to build @waku/rln, marking as private to skip publishing"
cd packages/rln
node -e "const fs = require('fs'); const pkg = JSON.parse(fs.readFileSync('package.json', 'utf8')); pkg.private = true; fs.writeFileSync('package.json', JSON.stringify(pkg, null, 2));"
}
- run: npm run publish -- --tag next
env:
NODE_AUTH_TOKEN: ${{ secrets.NPM_JS_WAKU_PUBLISH }}

1
.gitignore vendored
View File

@ -20,4 +20,3 @@ packages/discovery/mock_local_storage
CLAUDE.md
.env
postgres-data/
packages/rln/waku-rlnv2-contract/

5
ci/Jenkinsfile vendored
View File

@ -1,6 +1,3 @@
#!/usr/bin/env groovy
library 'status-jenkins-lib@v1.9.27'
pipeline {
agent {
docker {
@ -59,7 +56,7 @@ pipeline {
steps {
sshagent(credentials: ['status-im-auto-ssh']) {
script {
nix.develop('npm run deploy', pure: false)
nix.develop('npm run deploy', pure: true)
}
}
}

1373
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@ -61,7 +61,6 @@ export class FilterCore {
}
public async stop(): Promise<void> {
this.streamManager.stop();
try {
await this.libp2p.unhandle(FilterCodecs.PUSH);
} catch (e) {

View File

@ -33,11 +33,6 @@ export class LightPushCore {
this.streamManager = new StreamManager(CODECS.v3, libp2p.components);
}
public stop(): void {
this.streamManager.stop();
this.streamManagerV2.stop();
}
public async send(
encoder: IEncoder,
message: IMessage,

View File

@ -35,10 +35,6 @@ export class StoreCore {
this.streamManager = new StreamManager(StoreCodec, libp2p.components);
}
public stop(): void {
this.streamManager.stop();
}
public get maxTimeLimit(): number {
return MAX_TIME_RANGE;
}
@ -72,11 +68,6 @@ export class StoreCore {
let currentCursor = queryOpts.paginationCursor;
while (true) {
if (queryOpts.abortSignal?.aborted) {
log.info("Store query aborted by signal");
break;
}
const storeQueryRequest = StoreQueryRequest.create({
...queryOpts,
paginationCursor: currentCursor
@ -98,22 +89,13 @@ export class StoreCore {
break;
}
let res;
try {
res = await pipe(
[storeQueryRequest.encode()],
lp.encode,
stream,
lp.decode,
async (source) => await all(source)
);
} catch (error) {
if (error instanceof Error && error.name === "AbortError") {
log.info(`Store query aborted for peer ${peerId.toString()}`);
break;
}
throw error;
}
const res = await pipe(
[storeQueryRequest.encode()],
lp.encode,
stream,
lp.decode,
async (source) => await all(source)
);
const bytes = new Uint8ArrayList();
res.forEach((chunk) => {
@ -140,11 +122,6 @@ export class StoreCore {
`${storeQueryResponse.messages.length} messages retrieved from store`
);
if (queryOpts.abortSignal?.aborted) {
log.info("Store query aborted by signal before processing messages");
break;
}
const decodedMessages = storeQueryResponse.messages.map((protoMsg) => {
if (!protoMsg.message) {
return Promise.resolve(undefined);

View File

@ -27,10 +27,6 @@ describe("StreamManager", () => {
} as any as Libp2pComponents);
});
afterEach(() => {
sinon.restore();
});
it("should return usable stream attached to connection", async () => {
for (const writeStatus of ["ready", "writing"]) {
const con1 = createMockConnection();

View File

@ -23,15 +23,6 @@ export class StreamManager {
);
}
public stop(): void {
this.libp2p.events.removeEventListener(
"peer:update",
this.handlePeerUpdateStreamPool
);
this.streamPool.clear();
this.ongoingCreation.clear();
}
public async getStream(peerId: PeerId): Promise<Stream | undefined> {
try {
const peerIdStr = peerId.toString();

View File

@ -16,7 +16,6 @@ export interface IRelayAPI {
readonly pubsubTopics: Set<PubsubTopic>;
readonly gossipSub: GossipSub;
start: () => Promise<void>;
stop: () => Promise<void>;
waitForPeers: () => Promise<void>;
getMeshPeers: (topic?: TopicStr) => PeerIdStr[];
}

View File

@ -88,18 +88,11 @@ export type QueryRequestParams = {
* Only use if you know what you are doing.
*/
peerId?: PeerId;
/**
* An optional AbortSignal to cancel the query.
* When the signal is aborted, the query will stop processing and return early.
*/
abortSignal?: AbortSignal;
};
export type IStore = {
readonly multicodec: string;
stop(): void;
createCursor(message: IDecodedMessage): StoreCursor;
queryGenerator: <T extends IDecodedMessage>(
decoders: IDecoder<T>[],

View File

@ -67,10 +67,6 @@ export class Relay implements IRelay {
* Observers under key `""` are always called.
*/
private observers: Map<PubsubTopic, Map<ContentTopic, Set<unknown>>>;
private messageEventHandlers: Map<
PubsubTopic,
(event: CustomEvent<GossipsubMessage>) => void
> = new Map();
public constructor(params: RelayConstructorParams) {
if (!this.isRelayPubsub(params.libp2p.services.pubsub)) {
@ -109,19 +105,6 @@ export class Relay implements IRelay {
this.subscribeToAllTopics();
}
public async stop(): Promise<void> {
for (const pubsubTopic of this.pubsubTopics) {
const handler = this.messageEventHandlers.get(pubsubTopic);
if (handler) {
this.gossipSub.removeEventListener("gossipsub:message", handler);
}
this.gossipSub.topicValidators.delete(pubsubTopic);
this.gossipSub.unsubscribe(pubsubTopic);
}
this.messageEventHandlers.clear();
this.observers.clear();
}
/**
* Wait for at least one peer with the given protocol to be connected and in the gossipsub
* mesh for all pubsubTopics.
@ -316,17 +299,17 @@ export class Relay implements IRelay {
* @override
*/
private gossipSubSubscribe(pubsubTopic: string): void {
const handler = (event: CustomEvent<GossipsubMessage>): void => {
if (event.detail.msg.topic !== pubsubTopic) return;
this.gossipSub.addEventListener(
"gossipsub:message",
(event: CustomEvent<GossipsubMessage>) => {
if (event.detail.msg.topic !== pubsubTopic) return;
this.processIncomingMessage(
event.detail.msg.topic,
event.detail.msg.data
).catch((e) => log.error("Failed to process incoming message", e));
};
this.messageEventHandlers.set(pubsubTopic, handler);
this.gossipSub.addEventListener("gossipsub:message", handler);
this.processIncomingMessage(
event.detail.msg.topic,
event.detail.msg.data
).catch((e) => log.error("Failed to process incoming message", e));
}
);
this.gossipSub.topicValidators.set(pubsubTopic, messageValidator);
this.gossipSub.subscribe(pubsubTopic);

View File

@ -3,10 +3,5 @@ module.exports = {
tsconfigRootDir: __dirname,
project: "./tsconfig.dev.json"
},
ignorePatterns: ["src/resources/**/*"],
overrides: [
{
files: ["*.config.ts", "*.config.js"]
}
]
ignorePatterns: ["src/resources/**/*"]
};

View File

@ -12,18 +12,6 @@ This package provides RLN functionality for the Waku protocol, enabling rate-lim
npm install @waku/rln
```
## Smart Contract Type Generation
We use `wagmi` to generate TypeScript bindings for interacting with the RLN smart contracts.
When changes are pushed to the `waku-rlnv2-contract` repository, run the following script to fetch and build the latest contracts and generate the TypeScript bindings:
```
npm run setup:contract-abi
```
Note that we commit/bundle the generated typings, so it's not necessary to run this script unless the contracts are updated.
## Usage
```typescript
@ -32,6 +20,11 @@ import { RLN } from '@waku/rln';
// Usage examples coming soon
```
## Constants
- Implementation contract: 0xde2260ca49300357d5af4153cda0d18f7b3ea9b3
- Proxy contract: 0xb9cd878c90e49f797b4431fbf4fb333108cb90e6
## License
MIT OR Apache-2.0
MIT OR Apache-2.0

View File

@ -1,66 +0,0 @@
import { execSync } from "child_process";
import { existsSync, rmSync } from "fs";
import { dirname, join } from "path";
import process from "process";
import { fileURLToPath } from "url";
// Get script directory (equivalent to BASH_SOURCE in bash)
const __filename = fileURLToPath(import.meta.url);
const __dirname = dirname(__filename);
const CONTRACT_DIR = join(__dirname, "waku-rlnv2-contract");
const REPO_URL = "https://github.com/waku-org/waku-rlnv2-contract.git";
/**
* Execute a shell command and print output in real-time
* @param {string} command - The command to execute
* @param {object} options - Options for execSync
*/
function exec(command, options = {}) {
execSync(command, {
stdio: "inherit",
cwd: options.cwd || __dirname,
...options
});
}
async function main() {
try {
console.log("📦 Setting up waku-rlnv2-contract...");
// Remove existing directory if it exists
if (existsSync(CONTRACT_DIR)) {
console.log("🗑️ Removing existing waku-rlnv2-contract directory...");
rmSync(CONTRACT_DIR, { recursive: true, force: true });
}
// Clone the repository
console.log("📥 Cloning waku-rlnv2-contract...");
exec(`git clone ${REPO_URL} ${CONTRACT_DIR}`);
// Install dependencies
console.log("📦 Installing dependencies...");
exec("pnpm i", { cwd: CONTRACT_DIR });
// Build contracts with Foundry
console.log("🔨 Building contracts with Foundry...");
exec("forge build", { cwd: CONTRACT_DIR });
// Generate ABIs with wagmi
console.log("⚙️ Generating ABIs with wagmi...");
exec("npx wagmi generate");
console.log("✅ Contract ABIs generated successfully!");
} catch (error) {
console.log(
"❌ Error generating contract ABIs:",
error instanceof Error ? error.message : error
);
process.exit(1);
}
}
main().catch((error) => {
console.log(error);
process.exit(1);
});

View File

@ -43,8 +43,7 @@
"watch:build": "tsc -p tsconfig.json -w",
"watch:test": "mocha --watch",
"prepublish": "npm run build",
"reset-hard": "git clean -dfx -e .idea && git reset --hard && npm i && npm run build",
"setup:contract-abi": "node generate_contract_abi.js"
"reset-hard": "git clean -dfx -e .idea && git reset --hard && npm i && npm run build"
},
"engines": {
"node": ">=22"
@ -55,13 +54,12 @@
"@rollup/plugin-node-resolve": "^15.2.3",
"@types/chai": "^5.0.1",
"@types/chai-spies": "^1.0.6",
"@waku/interfaces": "0.0.34",
"@types/deep-equal-in-any-order": "^1.0.4",
"@types/lodash": "^4.17.15",
"@types/sinon": "^17.0.3",
"@wagmi/cli": "^2.7.0",
"@waku/build-utils": "^1.0.0",
"@waku/interfaces": "0.0.34",
"@waku/message-encryption": "^0.0.37",
"@waku/message-encryption": "^0.0.38",
"deep-equal-in-any-order": "^2.0.6",
"fast-check": "^3.23.2",
"rollup-plugin-copy": "^3.5.0"
@ -78,19 +76,18 @@
],
"dependencies": {
"@chainsafe/bls-keystore": "3.0.0",
"@noble/hashes": "^1.2.0",
"@wagmi/core": "^2.22.1",
"@waku/core": "^0.0.40",
"@waku/utils": "^0.0.27",
"@noble/hashes": "^1.2.0",
"@waku/zerokit-rln-wasm": "^0.2.1",
"ethereum-cryptography": "^3.1.0",
"ethers": "^5.7.2",
"lodash": "^4.17.21",
"uuid": "^11.0.5",
"chai": "^5.1.2",
"chai-as-promised": "^8.0.1",
"chai-spies": "^1.1.0",
"chai-subset": "^1.6.0",
"ethereum-cryptography": "^3.1.0",
"lodash": "^4.17.21",
"sinon": "^19.0.2",
"uuid": "^11.0.5",
"viem": "^2.38.4"
"sinon": "^19.0.2"
}
}

View File

@ -0,0 +1,93 @@
export const PRICE_CALCULATOR_ABI = [
{
inputs: [
{ internalType: "address", name: "_token", type: "address" },
{
internalType: "uint256",
name: "_pricePerMessagePerEpoch",
type: "uint256"
}
],
stateMutability: "nonpayable",
type: "constructor"
},
{ inputs: [], name: "OnlyTokensAllowed", type: "error" },
{
anonymous: false,
inputs: [
{
indexed: true,
internalType: "address",
name: "previousOwner",
type: "address"
},
{
indexed: true,
internalType: "address",
name: "newOwner",
type: "address"
}
],
name: "OwnershipTransferred",
type: "event"
},
{
inputs: [{ internalType: "uint32", name: "_rateLimit", type: "uint32" }],
name: "calculate",
outputs: [
{ internalType: "address", name: "", type: "address" },
{ internalType: "uint256", name: "", type: "uint256" }
],
stateMutability: "view",
type: "function"
},
{
inputs: [],
name: "owner",
outputs: [{ internalType: "address", name: "", type: "address" }],
stateMutability: "view",
type: "function"
},
{
inputs: [],
name: "pricePerMessagePerEpoch",
outputs: [{ internalType: "uint256", name: "", type: "uint256" }],
stateMutability: "view",
type: "function"
},
{
inputs: [],
name: "renounceOwnership",
outputs: [],
stateMutability: "nonpayable",
type: "function"
},
{
inputs: [
{ internalType: "address", name: "_token", type: "address" },
{
internalType: "uint256",
name: "_pricePerMessagePerEpoch",
type: "uint256"
}
],
name: "setTokenAndPrice",
outputs: [],
stateMutability: "nonpayable",
type: "function"
},
{
inputs: [],
name: "token",
outputs: [{ internalType: "address", name: "", type: "address" }],
stateMutability: "view",
type: "function"
},
{
inputs: [{ internalType: "address", name: "newOwner", type: "address" }],
name: "transferOwnership",
outputs: [],
stateMutability: "nonpayable",
type: "function"
}
];

View File

@ -0,0 +1,646 @@
export const RLN_ABI = [
{ inputs: [], stateMutability: "nonpayable", type: "constructor" },
{
inputs: [
{ internalType: "uint256", name: "idCommitment", type: "uint256" }
],
name: "CannotEraseActiveMembership",
type: "error"
},
{ inputs: [], name: "CannotExceedMaxTotalRateLimit", type: "error" },
{
inputs: [
{ internalType: "uint256", name: "idCommitment", type: "uint256" }
],
name: "CannotExtendNonGracePeriodMembership",
type: "error"
},
{
inputs: [
{ internalType: "uint256", name: "idCommitment", type: "uint256" }
],
name: "InvalidIdCommitment",
type: "error"
},
{ inputs: [], name: "InvalidMembershipRateLimit", type: "error" },
{
inputs: [
{ internalType: "uint256", name: "startIndex", type: "uint256" },
{ internalType: "uint256", name: "endIndex", type: "uint256" }
],
name: "InvalidPaginationQuery",
type: "error"
},
{
inputs: [
{ internalType: "uint256", name: "idCommitment", type: "uint256" }
],
name: "MembershipDoesNotExist",
type: "error"
},
{
inputs: [
{ internalType: "uint256", name: "idCommitment", type: "uint256" }
],
name: "NonHolderCannotEraseGracePeriodMembership",
type: "error"
},
{
inputs: [
{ internalType: "uint256", name: "idCommitment", type: "uint256" }
],
name: "NonHolderCannotExtend",
type: "error"
},
{
anonymous: false,
inputs: [
{
indexed: false,
internalType: "address",
name: "previousAdmin",
type: "address"
},
{
indexed: false,
internalType: "address",
name: "newAdmin",
type: "address"
}
],
name: "AdminChanged",
type: "event"
},
{
anonymous: false,
inputs: [
{
indexed: true,
internalType: "address",
name: "beacon",
type: "address"
}
],
name: "BeaconUpgraded",
type: "event"
},
{
anonymous: false,
inputs: [
{ indexed: false, internalType: "uint8", name: "version", type: "uint8" }
],
name: "Initialized",
type: "event"
},
{
anonymous: false,
inputs: [
{
indexed: false,
internalType: "uint256",
name: "idCommitment",
type: "uint256"
},
{
indexed: false,
internalType: "uint32",
name: "membershipRateLimit",
type: "uint32"
},
{ indexed: false, internalType: "uint32", name: "index", type: "uint32" }
],
name: "MembershipErased",
type: "event"
},
{
anonymous: false,
inputs: [
{
indexed: false,
internalType: "uint256",
name: "idCommitment",
type: "uint256"
},
{
indexed: false,
internalType: "uint32",
name: "membershipRateLimit",
type: "uint32"
},
{ indexed: false, internalType: "uint32", name: "index", type: "uint32" }
],
name: "MembershipExpired",
type: "event"
},
{
anonymous: false,
inputs: [
{
indexed: false,
internalType: "uint256",
name: "idCommitment",
type: "uint256"
},
{
indexed: false,
internalType: "uint32",
name: "membershipRateLimit",
type: "uint32"
},
{ indexed: false, internalType: "uint32", name: "index", type: "uint32" },
{
indexed: false,
internalType: "uint256",
name: "newGracePeriodStartTimestamp",
type: "uint256"
}
],
name: "MembershipExtended",
type: "event"
},
{
anonymous: false,
inputs: [
{
indexed: false,
internalType: "uint256",
name: "idCommitment",
type: "uint256"
},
{
indexed: false,
internalType: "uint256",
name: "membershipRateLimit",
type: "uint256"
},
{ indexed: false, internalType: "uint32", name: "index", type: "uint32" }
],
name: "MembershipRegistered",
type: "event"
},
{
anonymous: false,
inputs: [
{
indexed: true,
internalType: "address",
name: "previousOwner",
type: "address"
},
{
indexed: true,
internalType: "address",
name: "newOwner",
type: "address"
}
],
name: "OwnershipTransferred",
type: "event"
},
{
anonymous: false,
inputs: [
{
indexed: true,
internalType: "address",
name: "implementation",
type: "address"
}
],
name: "Upgraded",
type: "event"
},
{
inputs: [],
name: "MAX_MEMBERSHIP_SET_SIZE",
outputs: [{ internalType: "uint32", name: "", type: "uint32" }],
stateMutability: "view",
type: "function"
},
{
inputs: [],
name: "MERKLE_TREE_DEPTH",
outputs: [{ internalType: "uint8", name: "", type: "uint8" }],
stateMutability: "view",
type: "function"
},
{
inputs: [],
name: "Q",
outputs: [{ internalType: "uint256", name: "", type: "uint256" }],
stateMutability: "view",
type: "function"
},
{
inputs: [],
name: "activeDurationForNewMemberships",
outputs: [{ internalType: "uint32", name: "", type: "uint32" }],
stateMutability: "view",
type: "function"
},
{
inputs: [],
name: "currentTotalRateLimit",
outputs: [{ internalType: "uint256", name: "", type: "uint256" }],
stateMutability: "view",
type: "function"
},
{
inputs: [],
name: "deployedBlockNumber",
outputs: [{ internalType: "uint32", name: "", type: "uint32" }],
stateMutability: "view",
type: "function"
},
{
inputs: [
{ internalType: "address", name: "holder", type: "address" },
{ internalType: "address", name: "token", type: "address" }
],
name: "depositsToWithdraw",
outputs: [{ internalType: "uint256", name: "balance", type: "uint256" }],
stateMutability: "view",
type: "function"
},
{
inputs: [
{ internalType: "uint256[]", name: "idCommitments", type: "uint256[]" }
],
name: "eraseMemberships",
outputs: [],
stateMutability: "nonpayable",
type: "function"
},
{
inputs: [
{ internalType: "uint256[]", name: "idCommitments", type: "uint256[]" },
{ internalType: "bool", name: "eraseFromMembershipSet", type: "bool" }
],
name: "eraseMemberships",
outputs: [],
stateMutability: "nonpayable",
type: "function"
},
{
inputs: [
{ internalType: "uint256[]", name: "idCommitments", type: "uint256[]" }
],
name: "extendMemberships",
outputs: [],
stateMutability: "nonpayable",
type: "function"
},
{
inputs: [
{ internalType: "uint256", name: "idCommitment", type: "uint256" }
],
name: "getMembershipInfo",
outputs: [
{ internalType: "uint32", name: "", type: "uint32" },
{ internalType: "uint32", name: "", type: "uint32" },
{ internalType: "uint256", name: "", type: "uint256" }
],
stateMutability: "view",
type: "function"
},
{
inputs: [{ internalType: "uint40", name: "index", type: "uint40" }],
name: "getMerkleProof",
outputs: [{ internalType: "uint256[20]", name: "", type: "uint256[20]" }],
stateMutability: "view",
type: "function"
},
{
inputs: [
{ internalType: "uint32", name: "startIndex", type: "uint32" },
{ internalType: "uint32", name: "endIndex", type: "uint32" }
],
name: "getRateCommitmentsInRangeBoundsInclusive",
outputs: [{ internalType: "uint256[]", name: "", type: "uint256[]" }],
stateMutability: "view",
type: "function"
},
{
inputs: [],
name: "gracePeriodDurationForNewMemberships",
outputs: [{ internalType: "uint32", name: "", type: "uint32" }],
stateMutability: "view",
type: "function"
},
{
inputs: [{ internalType: "uint256", name: "", type: "uint256" }],
name: "indicesOfLazilyErasedMemberships",
outputs: [{ internalType: "uint32", name: "", type: "uint32" }],
stateMutability: "view",
type: "function"
},
{
inputs: [
{ internalType: "address", name: "_priceCalculator", type: "address" },
{ internalType: "uint32", name: "_maxTotalRateLimit", type: "uint32" },
{
internalType: "uint32",
name: "_minMembershipRateLimit",
type: "uint32"
},
{
internalType: "uint32",
name: "_maxMembershipRateLimit",
type: "uint32"
},
{ internalType: "uint32", name: "_activeDuration", type: "uint32" },
{ internalType: "uint32", name: "_gracePeriod", type: "uint32" }
],
name: "initialize",
outputs: [],
stateMutability: "nonpayable",
type: "function"
},
{
inputs: [
{ internalType: "uint256", name: "_idCommitment", type: "uint256" }
],
name: "isExpired",
outputs: [{ internalType: "bool", name: "", type: "bool" }],
stateMutability: "view",
type: "function"
},
{
inputs: [
{ internalType: "uint256", name: "_idCommitment", type: "uint256" }
],
name: "isInGracePeriod",
outputs: [{ internalType: "bool", name: "", type: "bool" }],
stateMutability: "view",
type: "function"
},
{
inputs: [
{ internalType: "uint256", name: "idCommitment", type: "uint256" }
],
name: "isInMembershipSet",
outputs: [{ internalType: "bool", name: "", type: "bool" }],
stateMutability: "view",
type: "function"
},
{
inputs: [
{ internalType: "uint256", name: "idCommitment", type: "uint256" }
],
name: "isValidIdCommitment",
outputs: [{ internalType: "bool", name: "", type: "bool" }],
stateMutability: "pure",
type: "function"
},
{
inputs: [{ internalType: "uint32", name: "rateLimit", type: "uint32" }],
name: "isValidMembershipRateLimit",
outputs: [{ internalType: "bool", name: "", type: "bool" }],
stateMutability: "view",
type: "function"
},
{
inputs: [],
name: "maxMembershipRateLimit",
outputs: [{ internalType: "uint32", name: "", type: "uint32" }],
stateMutability: "view",
type: "function"
},
{
inputs: [],
name: "maxTotalRateLimit",
outputs: [{ internalType: "uint32", name: "", type: "uint32" }],
stateMutability: "view",
type: "function"
},
{
inputs: [
{ internalType: "uint256", name: "_idCommitment", type: "uint256" }
],
name: "membershipExpirationTimestamp",
outputs: [{ internalType: "uint256", name: "", type: "uint256" }],
stateMutability: "view",
type: "function"
},
{
inputs: [
{ internalType: "uint256", name: "idCommitment", type: "uint256" }
],
name: "memberships",
outputs: [
{ internalType: "uint256", name: "depositAmount", type: "uint256" },
{ internalType: "uint32", name: "activeDuration", type: "uint32" },
{
internalType: "uint256",
name: "gracePeriodStartTimestamp",
type: "uint256"
},
{ internalType: "uint32", name: "gracePeriodDuration", type: "uint32" },
{ internalType: "uint32", name: "rateLimit", type: "uint32" },
{ internalType: "uint32", name: "index", type: "uint32" },
{ internalType: "address", name: "holder", type: "address" },
{ internalType: "address", name: "token", type: "address" }
],
stateMutability: "view",
type: "function"
},
{
inputs: [],
name: "merkleTree",
outputs: [
{ internalType: "uint40", name: "maxIndex", type: "uint40" },
{ internalType: "uint40", name: "numberOfLeaves", type: "uint40" }
],
stateMutability: "view",
type: "function"
},
{
inputs: [],
name: "minMembershipRateLimit",
outputs: [{ internalType: "uint32", name: "", type: "uint32" }],
stateMutability: "view",
type: "function"
},
{
inputs: [],
name: "nextFreeIndex",
outputs: [{ internalType: "uint32", name: "", type: "uint32" }],
stateMutability: "view",
type: "function"
},
{
inputs: [],
name: "owner",
outputs: [{ internalType: "address", name: "", type: "address" }],
stateMutability: "view",
type: "function"
},
{
inputs: [],
name: "priceCalculator",
outputs: [
{ internalType: "contract IPriceCalculator", name: "", type: "address" }
],
stateMutability: "view",
type: "function"
},
{
inputs: [],
name: "proxiableUUID",
outputs: [{ internalType: "bytes32", name: "", type: "bytes32" }],
stateMutability: "view",
type: "function"
},
{
inputs: [
{ internalType: "uint256", name: "idCommitment", type: "uint256" },
{ internalType: "uint32", name: "rateLimit", type: "uint32" },
{
internalType: "uint256[]",
name: "idCommitmentsToErase",
type: "uint256[]"
}
],
name: "register",
outputs: [],
stateMutability: "nonpayable",
type: "function"
},
{
inputs: [
{ internalType: "address", name: "owner", type: "address" },
{ internalType: "uint256", name: "deadline", type: "uint256" },
{ internalType: "uint8", name: "v", type: "uint8" },
{ internalType: "bytes32", name: "r", type: "bytes32" },
{ internalType: "bytes32", name: "s", type: "bytes32" },
{ internalType: "uint256", name: "idCommitment", type: "uint256" },
{ internalType: "uint32", name: "rateLimit", type: "uint32" },
{
internalType: "uint256[]",
name: "idCommitmentsToErase",
type: "uint256[]"
}
],
name: "registerWithPermit",
outputs: [],
stateMutability: "nonpayable",
type: "function"
},
{
inputs: [],
name: "renounceOwnership",
outputs: [],
stateMutability: "nonpayable",
type: "function"
},
{
inputs: [],
name: "root",
outputs: [{ internalType: "uint256", name: "", type: "uint256" }],
stateMutability: "view",
type: "function"
},
{
inputs: [
{
internalType: "uint32",
name: "_activeDurationForNewMembership",
type: "uint32"
}
],
name: "setActiveDuration",
outputs: [],
stateMutability: "nonpayable",
type: "function"
},
{
inputs: [
{
internalType: "uint32",
name: "_gracePeriodDurationForNewMembership",
type: "uint32"
}
],
name: "setGracePeriodDuration",
outputs: [],
stateMutability: "nonpayable",
type: "function"
},
{
inputs: [
{
internalType: "uint32",
name: "_maxMembershipRateLimit",
type: "uint32"
}
],
name: "setMaxMembershipRateLimit",
outputs: [],
stateMutability: "nonpayable",
type: "function"
},
{
inputs: [
{ internalType: "uint32", name: "_maxTotalRateLimit", type: "uint32" }
],
name: "setMaxTotalRateLimit",
outputs: [],
stateMutability: "nonpayable",
type: "function"
},
{
inputs: [
{
internalType: "uint32",
name: "_minMembershipRateLimit",
type: "uint32"
}
],
name: "setMinMembershipRateLimit",
outputs: [],
stateMutability: "nonpayable",
type: "function"
},
{
inputs: [
{ internalType: "address", name: "_priceCalculator", type: "address" }
],
name: "setPriceCalculator",
outputs: [],
stateMutability: "nonpayable",
type: "function"
},
{
inputs: [{ internalType: "address", name: "newOwner", type: "address" }],
name: "transferOwnership",
outputs: [],
stateMutability: "nonpayable",
type: "function"
},
{
inputs: [
{ internalType: "address", name: "newImplementation", type: "address" }
],
name: "upgradeTo",
outputs: [],
stateMutability: "nonpayable",
type: "function"
},
{
inputs: [
{ internalType: "address", name: "newImplementation", type: "address" },
{ internalType: "bytes", name: "data", type: "bytes" }
],
name: "upgradeToAndCall",
outputs: [],
stateMutability: "payable",
type: "function"
},
{
inputs: [{ internalType: "address", name: "token", type: "address" }],
name: "withdraw",
outputs: [],
stateMutability: "nonpayable",
type: "function"
}
];

View File

@ -1,15 +1,16 @@
import { linearPriceCalculatorAbi, wakuRlnV2Abi } from "./wagmi/generated.js";
import { PRICE_CALCULATOR_ABI } from "./abi/price_calculator.js";
import { RLN_ABI } from "./abi/rln.js";
export const RLN_CONTRACT = {
chainId: 59141,
address: "0xb9cd878c90e49f797b4431fbf4fb333108cb90e6",
abi: wakuRlnV2Abi
abi: RLN_ABI
};
export const PRICE_CALCULATOR_CONTRACT = {
chainId: 59141,
address: "0xBcfC0660Df69f53ab409F32bb18A3fb625fcE644",
abi: linearPriceCalculatorAbi
abi: PRICE_CALCULATOR_ABI
};
/**

View File

@ -1,39 +1,28 @@
import { expect, use } from "chai";
import chaiAsPromised from "chai-as-promised";
import { ethers } from "ethers";
import sinon from "sinon";
import { RLNBaseContract } from "./rln_base_contract.js";
use(chaiAsPromised);
function createMockRLNBaseContract(
mockContract: any,
mockRpcClient: any
): RLNBaseContract {
function createMockRLNBaseContract(provider: any): RLNBaseContract {
const dummy = Object.create(RLNBaseContract.prototype);
dummy.contract = mockContract;
dummy.rpcClient = mockRpcClient;
dummy.contract = { provider };
return dummy as RLNBaseContract;
}
describe("RLNBaseContract.getPriceForRateLimit (unit)", function () {
let mockContract: any;
let mockRpcClient: any;
let priceCalculatorReadStub: sinon.SinonStub;
let readContractStub: sinon.SinonStub;
let provider: any;
let calculateStub: sinon.SinonStub;
let mockContractFactory: any;
beforeEach(() => {
priceCalculatorReadStub = sinon.stub();
readContractStub = sinon.stub();
mockContract = {
read: {
priceCalculator: priceCalculatorReadStub
}
};
mockRpcClient = {
readContract: readContractStub
provider = {};
calculateStub = sinon.stub();
mockContractFactory = function () {
return { calculate: calculateStub };
};
});
@ -43,53 +32,35 @@ describe("RLNBaseContract.getPriceForRateLimit (unit)", function () {
it("returns token and price for valid calculate", async () => {
const fakeToken = "0x1234567890abcdef1234567890abcdef12345678";
const fakePrice = 42n;
const priceCalculatorAddress = "0xabcdef1234567890abcdef1234567890abcdef12";
priceCalculatorReadStub.resolves(priceCalculatorAddress);
readContractStub.resolves([fakeToken, fakePrice]);
const rlnBase = createMockRLNBaseContract(mockContract, mockRpcClient);
const result = await rlnBase.getPriceForRateLimit(20);
const fakePrice = ethers.BigNumber.from(42);
calculateStub.resolves([fakeToken, fakePrice]);
const rlnBase = createMockRLNBaseContract(provider);
const result = await rlnBase.getPriceForRateLimit(20, mockContractFactory);
expect(result.token).to.equal(fakeToken);
expect(result.price).to.equal(fakePrice);
expect(priceCalculatorReadStub.calledOnce).to.be.true;
expect(readContractStub.calledOnce).to.be.true;
const readContractCall = readContractStub.getCall(0);
expect(readContractCall.args[0]).to.deep.include({
address: priceCalculatorAddress,
functionName: "calculate",
args: [20]
});
expect(result.price).to.not.be.null;
if (result.price) {
expect(result.price.eq(fakePrice)).to.be.true;
}
expect(calculateStub.calledOnceWith(20)).to.be.true;
});
it("throws if calculate throws", async () => {
const priceCalculatorAddress = "0xabcdef1234567890abcdef1234567890abcdef12";
calculateStub.rejects(new Error("fail"));
priceCalculatorReadStub.resolves(priceCalculatorAddress);
readContractStub.rejects(new Error("fail"));
const rlnBase = createMockRLNBaseContract(mockContract, mockRpcClient);
await expect(rlnBase.getPriceForRateLimit(20)).to.be.rejectedWith("fail");
expect(priceCalculatorReadStub.calledOnce).to.be.true;
expect(readContractStub.calledOnce).to.be.true;
const rlnBase = createMockRLNBaseContract(provider);
await expect(
rlnBase.getPriceForRateLimit(20, mockContractFactory)
).to.be.rejectedWith("fail");
expect(calculateStub.calledOnceWith(20)).to.be.true;
});
it("returns null values if calculate returns malformed data", async () => {
const priceCalculatorAddress = "0xabcdef1234567890abcdef1234567890abcdef12";
priceCalculatorReadStub.resolves(priceCalculatorAddress);
readContractStub.resolves([null, null]);
const rlnBase = createMockRLNBaseContract(mockContract, mockRpcClient);
const result = await rlnBase.getPriceForRateLimit(20);
it("throws if calculate returns malformed data", async () => {
calculateStub.resolves([null, null]);
const rlnBase = createMockRLNBaseContract(provider);
const result = await rlnBase.getPriceForRateLimit(20, mockContractFactory);
expect(result.token).to.be.null;
expect(result.price).to.be.null;
expect(priceCalculatorReadStub.calledOnce).to.be.true;
expect(readContractStub.calledOnce).to.be.true;
});
});

View File

@ -1,74 +1,92 @@
import { Logger } from "@waku/utils";
import {
type Address,
decodeEventLog,
getContract,
type GetContractReturnType,
type Hash,
type PublicClient,
type WalletClient
} from "viem";
import { ethers } from "ethers";
import { IdentityCredential } from "../identity.js";
import type { DecryptedCredentials } from "../keystore/types.js";
import type { RpcClient } from "../utils/index.js";
import { DecryptedCredentials } from "../keystore/types.js";
import { RLN_ABI } from "./abi/rln.js";
import {
DEFAULT_RATE_LIMIT,
RATE_LIMIT_PARAMS,
RLN_CONTRACT
PRICE_CALCULATOR_CONTRACT,
RATE_LIMIT_PARAMS
} from "./constants.js";
import {
CustomQueryOptions,
FetchMembersOptions,
Member,
MembershipInfo,
MembershipRegisteredEvent,
MembershipState,
RLNContractOptions
RLNContractInitOptions
} from "./types.js";
import { iPriceCalculatorAbi, wakuRlnV2Abi } from "./wagmi/generated.js";
const log = new Logger("rln:contract:base");
export class RLNBaseContract {
public contract: GetContractReturnType<
typeof wakuRlnV2Abi,
PublicClient | WalletClient
>;
public rpcClient: RpcClient;
public contract: ethers.Contract;
private deployBlock: undefined | number;
private rateLimit: number;
private minRateLimit?: number;
private maxRateLimit?: number;
protected _members: Map<number, Member> = new Map();
private _membersFilter: ethers.EventFilter;
private _membershipErasedFilter: ethers.EventFilter;
private _membersExpiredFilter: ethers.EventFilter;
/**
* Private constructor for RLNBaseContract. Use static create() instead.
*/
protected constructor(options: RLNContractOptions) {
const { address, rpcClient, rateLimit = DEFAULT_RATE_LIMIT } = options;
protected constructor(options: RLNContractInitOptions) {
const {
address,
signer,
rateLimit = DEFAULT_RATE_LIMIT,
contract
} = options;
log.info("Initializing RLNBaseContract", { address, rateLimit });
this.rpcClient = rpcClient;
this.contract = getContract({
address,
abi: wakuRlnV2Abi,
client: this.rpcClient
});
this.contract = contract || new ethers.Contract(address, RLN_ABI, signer);
this.rateLimit = rateLimit;
try {
log.info("Setting up event filters");
// Initialize event filters
this._membersFilter = this.contract.filters.MembershipRegistered();
this._membershipErasedFilter = this.contract.filters.MembershipErased();
this._membersExpiredFilter = this.contract.filters.MembershipExpired();
log.info("Event filters initialized successfully");
} catch (error) {
log.error("Failed to initialize event filters", { error });
throw new Error(
"Failed to initialize event filters: " + (error as Error).message
);
}
// Initialize members and subscriptions
this.fetchMembers()
.then(() => {
this.subscribeToMembers();
})
.catch((error) => {
log.error("Failed to initialize members", { error });
});
}
/**
* Static async factory to create and initialize RLNBaseContract
*/
public static async create(
options: RLNContractOptions
options: RLNContractInitOptions
): Promise<RLNBaseContract> {
const instance = new RLNBaseContract(options);
const [min, max] = await Promise.all([
instance.contract.read.minMembershipRateLimit(),
instance.contract.read.maxMembershipRateLimit()
instance.contract.minMembershipRateLimit(),
instance.contract.maxMembershipRateLimit()
]);
instance.minRateLimit = min;
instance.maxRateLimit = max;
instance.minRateLimit = ethers.BigNumber.from(min).toNumber();
instance.maxRateLimit = ethers.BigNumber.from(max).toNumber();
instance.validateRateLimit(instance.rateLimit);
return instance;
@ -88,6 +106,13 @@ export class RLNBaseContract {
return this.contract.address;
}
/**
* Gets the contract provider
*/
public get provider(): ethers.providers.Provider {
return this.contract.provider;
}
/**
* Gets the minimum allowed rate limit (cached)
*/
@ -111,7 +136,8 @@ export class RLNBaseContract {
* @returns Promise<number> The maximum total rate limit in messages per epoch
*/
public async getMaxTotalRateLimit(): Promise<number> {
return await this.contract.read.maxTotalRateLimit();
const maxTotalRate = await this.contract.maxTotalRateLimit();
return maxTotalRate.toNumber();
}
/**
@ -119,7 +145,8 @@ export class RLNBaseContract {
* @returns Promise<number> The current total rate limit usage in messages per epoch
*/
public async getCurrentTotalRateLimit(): Promise<number> {
return Number(await this.contract.read.currentTotalRateLimit());
const currentTotal = await this.contract.currentTotalRateLimit();
return currentTotal.toNumber();
}
/**
@ -127,10 +154,11 @@ export class RLNBaseContract {
* @returns Promise<number> The remaining rate limit that can be allocated
*/
public async getRemainingTotalRateLimit(): Promise<number> {
return (
(await this.contract.read.maxTotalRateLimit()) -
Number(await this.contract.read.currentTotalRateLimit())
);
const [maxTotal, currentTotal] = await Promise.all([
this.contract.maxTotalRateLimit(),
this.contract.currentTotalRateLimit()
]);
return Number(maxTotal) - Number(currentTotal);
}
/**
@ -142,35 +170,233 @@ export class RLNBaseContract {
this.rateLimit = newRateLimit;
}
/**
* Gets the Merkle tree root for RLN proof verification
* @returns Promise<bigint> The Merkle tree root
*
*/
public async getMerkleRoot(): Promise<bigint> {
return this.contract.read.root();
/**
 * Members known locally, sorted ascending by their on-chain index.
 */
public get members(): Member[] {
  return [...this._members.values()].sort(
    (a, b) => a.index.toNumber() - b.index.toNumber()
  );
}
/**
* Gets the Merkle proof for a member at a given index
* @param index The index of the member in the membership set
* @returns Promise<bigint[]> Array of 20 Merkle proof elements
*
*/
public async getMerkleProof(index: number): Promise<readonly bigint[]> {
return await this.contract.read.getMerkleProof([index]);
/**
 * Loads membership events (registered, erased, expired) from the chain —
 * starting at the contract deploy block when known — and feeds them all to
 * `processEvents`. Caller options can narrow the block range or tune the
 * chunked fetching (see `queryFilter`).
 *
 * @param options Optional range/chunking overrides; spread after
 *   `fromBlock` so callers may override it.
 */
public async fetchMembers(options: FetchMembersOptions = {}): Promise<void> {
  const registeredMemberEvents = await RLNBaseContract.queryFilter(
    this.contract,
    {
      fromBlock: this.deployBlock,
      ...options,
      membersFilter: this.membersFilter
    }
  );
  const removedMemberEvents = await RLNBaseContract.queryFilter(
    this.contract,
    {
      fromBlock: this.deployBlock,
      ...options,
      membersFilter: this.membershipErasedFilter
    }
  );
  const expiredMemberEvents = await RLNBaseContract.queryFilter(
    this.contract,
    {
      fromBlock: this.deployBlock,
      ...options,
      membersFilter: this.membersExpiredFilter
    }
  );
  // Concatenation order across the three kinds is arbitrary;
  // processEvents groups by block number.
  const events = [
    ...registeredMemberEvents,
    ...removedMemberEvents,
    ...expiredMemberEvents
  ];
  this.processEvents(events);
}
/**
 * Queries contract events matching `membersFilter`, splitting large block
 * ranges into chunks to stay within typical RPC provider limits.
 *
 * - If `fromBlock` is undefined, issues a single unbounded query.
 * - If the total range is smaller than `fetchRange`, issues one bounded query.
 * - Otherwise splits into `fetchRange`-sized windows fetched `fetchChunks`
 *   at a time; failed windows are skipped via `ignoreErrors`, so results
 *   may be incomplete on flaky RPC endpoints.
 *
 * @param contract Contract to query; must have a provider when `fromBlock` is set.
 * @param options Event filter plus optional `fromBlock`, `fetchRange`, `fetchChunks`.
 * @returns Flat array of matching events.
 * @throws If `fromBlock` is set but the contract has no provider attached.
 */
public static async queryFilter(
  contract: ethers.Contract,
  options: CustomQueryOptions
): Promise<ethers.Event[]> {
  // Defaults: 5 concurrent window requests over 3000-block windows.
  const FETCH_CHUNK = 5;
  const BLOCK_RANGE = 3000;
  const {
    fromBlock,
    membersFilter,
    fetchRange = BLOCK_RANGE,
    fetchChunks = FETCH_CHUNK
  } = options;
  if (fromBlock === undefined) {
    return contract.queryFilter(membersFilter);
  }
  if (!contract.provider) {
    throw Error("No provider found on the contract.");
  }
  const toBlock = await contract.provider.getBlockNumber();
  // Small range: one bounded query is cheaper than chunking.
  if (toBlock - fromBlock < fetchRange) {
    return contract.queryFilter(membersFilter, fromBlock, toBlock);
  }
  const events: ethers.Event[][] = [];
  const chunks = RLNBaseContract.splitToChunks(
    fromBlock,
    toBlock,
    fetchRange
  );
  // Fetch at most `fetchChunks` windows in parallel per iteration.
  for (const portion of RLNBaseContract.takeN<[number, number]>(
    chunks,
    fetchChunks
  )) {
    const promises = portion.map(([left, right]) =>
      RLNBaseContract.ignoreErrors(
        contract.queryFilter(membersFilter, left, right),
        []
      )
    );
    const fetchedEvents = await Promise.all(promises);
    events.push(fetchedEvents.flatMap((v) => v));
  }
  return events.flatMap((v) => v);
}
/**
 * Groups membership events by block number: indices from
 * MembershipErased/MembershipExpired go into a removal table, and
 * MembershipRegistered events into an insertion table.
 *
 * NOTE(review): `toRemoveTable` and `toInsertTable` are built but never
 * applied to `this._members` in this method — confirm whether the
 * member-map update step was dropped.
 */
public processEvents(events: ethers.Event[]): void {
  const toRemoveTable = new Map<number, number[]>();
  const toInsertTable = new Map<number, ethers.Event[]>();
  events.forEach((evt) => {
    // Events without decoded args cannot be processed.
    if (!evt.args) {
      return;
    }
    if (
      evt.event === "MembershipErased" ||
      evt.event === "MembershipExpired"
    ) {
      let index = evt.args.index;
      if (!index) {
        return;
      }
      // Normalize primitive indices to BigNumber before .toNumber().
      if (typeof index === "number" || typeof index === "string") {
        index = ethers.BigNumber.from(index);
      }
      const toRemoveVal = toRemoveTable.get(evt.blockNumber);
      if (toRemoveVal != undefined) {
        toRemoveVal.push(index.toNumber());
        toRemoveTable.set(evt.blockNumber, toRemoveVal);
      } else {
        toRemoveTable.set(evt.blockNumber, [index.toNumber()]);
      }
    } else if (evt.event === "MembershipRegistered") {
      let eventsPerBlock = toInsertTable.get(evt.blockNumber);
      if (eventsPerBlock == undefined) {
        eventsPerBlock = [];
      }
      eventsPerBlock.push(evt);
      toInsertTable.set(evt.blockNumber, eventsPerBlock);
    }
  });
}
/**
 * Splits the range [from, to) into consecutive [start, end] pairs of at
 * most `step` width; the final pair is clamped to `to`.
 */
public static splitToChunks(
  from: number,
  to: number,
  step: number
): Array<[number, number]> {
  const result: Array<[number, number]> = [];
  for (let start = from; start < to; ) {
    const end = start + step < to ? start + step : to;
    result.push([start, end]);
    start = end;
  }
  return result;
}
/**
 * Yields successive slices of `array`, each containing at most `size`
 * items, in order.
 */
public static *takeN<T>(array: T[], size: number): Iterable<T[]> {
  for (let offset = 0; offset < array.length; offset += size) {
    yield array.slice(offset, offset + size);
  }
}
/**
 * Awaits `promise`, returning `defaultValue` instead of throwing when it
 * rejects. Rejections are logged at info level and otherwise discarded.
 */
public static async ignoreErrors<T>(
  promise: Promise<T>,
  defaultValue: T
): Promise<T> {
  try {
    return await promise;
  } catch (err: unknown) {
    const message =
      err instanceof Error
        ? `Ignoring an error during query: ${err.message}`
        : `Ignoring an unknown error during query`;
    log.info(message);
    return defaultValue;
  }
}
/**
 * Subscribes to MembershipRegistered, MembershipErased and
 * MembershipExpired contract events, routing each incoming event through
 * `processEvents`. Only the trailing Event object is used; the decoded
 * listener parameters are ignored.
 */
public subscribeToMembers(): void {
  this.contract.on(
    this.membersFilter,
    (
      _idCommitment: bigint,
      _membershipRateLimit: ethers.BigNumber,
      _index: ethers.BigNumber,
      event: ethers.Event
    ) => {
      this.processEvents([event]);
    }
  );
  this.contract.on(
    this.membershipErasedFilter,
    (
      _idCommitment: bigint,
      _membershipRateLimit: ethers.BigNumber,
      _index: ethers.BigNumber,
      event: ethers.Event
    ) => {
      this.processEvents([event]);
    }
  );
  this.contract.on(
    this.membersExpiredFilter,
    (
      _idCommitment: bigint,
      _membershipRateLimit: ethers.BigNumber,
      _index: ethers.BigNumber,
      event: ethers.Event
    ) => {
      this.processEvents([event]);
    }
  );
}
public async getMembershipInfo(
idCommitmentBigInt: bigint
): Promise<MembershipInfo | undefined> {
try {
const membershipData = await this.contract.read.memberships([
idCommitmentBigInt
]);
const currentBlock = await this.rpcClient.getBlockNumber();
const membershipData =
await this.contract.memberships(idCommitmentBigInt);
const currentBlock = await this.contract.provider.getBlockNumber();
const [
depositAmount,
activeDuration,
@ -182,13 +408,12 @@ export class RLNBaseContract {
token
] = membershipData;
const gracePeriodEnd =
Number(gracePeriodStartTimestamp) + Number(gracePeriodDuration);
const gracePeriodEnd = gracePeriodStartTimestamp.add(gracePeriodDuration);
let state: MembershipState;
if (currentBlock < Number(gracePeriodStartTimestamp)) {
if (currentBlock < gracePeriodStartTimestamp.toNumber()) {
state = MembershipState.Active;
} else if (currentBlock < gracePeriodEnd) {
} else if (currentBlock < gracePeriodEnd.toNumber()) {
state = MembershipState.GracePeriod;
} else {
state = MembershipState.Expired;
@ -197,9 +422,9 @@ export class RLNBaseContract {
return {
index,
idCommitment: idCommitmentBigInt.toString(),
rateLimit: rateLimit,
startBlock: Number(gracePeriodStartTimestamp),
endBlock: gracePeriodEnd,
rateLimit: Number(rateLimit),
startBlock: gracePeriodStartTimestamp.toNumber(),
endBlock: gracePeriodEnd.toNumber(),
state,
depositAmount,
activeDuration,
@ -213,87 +438,43 @@ export class RLNBaseContract {
}
}
public async extendMembership(idCommitmentBigInt: bigint): Promise<Hash> {
if (!this.rpcClient.account) {
throw new Error(
"Failed to extendMembership: no account set in wallet client"
);
}
try {
await this.contract.simulate.extendMemberships([[idCommitmentBigInt]], {
chain: this.rpcClient.chain,
account: this.rpcClient.account.address
});
} catch (err) {
if (err instanceof Error) {
throw new Error(
"Error simulating extending membership: " + err.message
);
} else {
throw new Error("Error simulating extending membership", {
cause: err
});
}
}
const hash = await this.contract.write.extendMemberships(
[[idCommitmentBigInt]],
{
account: this.rpcClient.account,
chain: this.rpcClient.chain
}
);
await this.rpcClient.waitForTransactionReceipt({ hash });
return hash;
/**
 * Extends the membership for the given identity commitment and waits for
 * the transaction to be mined.
 *
 * @param idCommitmentBigInt Identity commitment of the membership to extend.
 * @returns The mined transaction.
 */
public async extendMembership(
  idCommitmentBigInt: bigint
): Promise<ethers.ContractTransaction> {
  const tx = await this.contract.extendMemberships([idCommitmentBigInt]);
  await tx.wait();
  return tx;
}
public async eraseMembership(
idCommitmentBigInt: bigint,
eraseFromMembershipSet: boolean = true
): Promise<Hash> {
): Promise<ethers.ContractTransaction> {
if (
!(await this.isExpired(idCommitmentBigInt)) ||
!(await this.isInGracePeriod(idCommitmentBigInt))
) {
throw new Error("Membership is not expired or in grace period");
}
if (!this.rpcClient.account) {
throw new Error(
"Failed to eraseMembership: no account set in wallet client"
);
}
try {
await this.contract.simulate.eraseMemberships(
[[idCommitmentBigInt], eraseFromMembershipSet],
{
chain: this.rpcClient.chain,
account: this.rpcClient.account.address
}
);
} catch (err) {
if (err instanceof Error) {
throw new Error("Error simulating eraseMemberships: " + err.message);
} else {
throw new Error("Error simulating eraseMemberships", { cause: err });
}
}
const estimatedGas = await this.contract.estimateGas[
"eraseMemberships(uint256[],bool)"
]([idCommitmentBigInt], eraseFromMembershipSet);
const gasLimit = estimatedGas.add(10000);
const hash = await this.contract.write.eraseMemberships(
[[idCommitmentBigInt], eraseFromMembershipSet],
{
chain: this.rpcClient.chain,
account: this.rpcClient.account
}
const tx = await this.contract["eraseMemberships(uint256[],bool)"](
[idCommitmentBigInt],
eraseFromMembershipSet,
{ gasLimit }
);
await this.rpcClient.waitForTransactionReceipt({ hash });
return hash;
await tx.wait();
return tx;
}
public async registerMembership(
idCommitmentBigInt: bigint,
rateLimit: number = DEFAULT_RATE_LIMIT
): Promise<Hash> {
): Promise<ethers.ContractTransaction> {
if (
rateLimit < RATE_LIMIT_PARAMS.MIN_RATE ||
rateLimit > RATE_LIMIT_PARAMS.MAX_RATE
@ -302,80 +483,21 @@ export class RLNBaseContract {
`Rate limit must be between ${RATE_LIMIT_PARAMS.MIN_RATE} and ${RATE_LIMIT_PARAMS.MAX_RATE}`
);
}
if (!this.rpcClient.account) {
throw new Error(
"Failed to registerMembership: no account set in wallet client"
);
}
try {
await this.contract.simulate.register(
[idCommitmentBigInt, rateLimit, []],
{
chain: this.rpcClient.chain,
account: this.rpcClient.account.address
}
);
} catch (err) {
if (err instanceof Error) {
throw new Error("Error simulating register membership: " + err.message);
} else {
throw new Error("Error simulating register membership", { cause: err });
}
}
const hash = await this.contract.write.register(
[idCommitmentBigInt, rateLimit, []],
{
chain: this.rpcClient.chain,
account: this.rpcClient.account
}
);
await this.rpcClient.waitForTransactionReceipt({ hash });
return hash;
return this.contract.register(idCommitmentBigInt, rateLimit, []);
}
/**
* Withdraw deposited tokens after membership is erased.
* The smart contract validates that the sender is the holder of the membership,
* and will only send tokens to that address.
* @param token - Token address to withdraw
*/
public async withdraw(token: string): Promise<Hash> {
if (!this.rpcClient.account) {
throw new Error("Failed to withdraw: no account set in wallet client");
}
public async withdraw(token: string, walletAddress: string): Promise<void> {
try {
await this.contract.simulate.withdraw([token as Address], {
chain: this.rpcClient.chain,
account: this.rpcClient.account.address
});
} catch (err) {
if (err instanceof Error) {
throw new Error("Error simulating withdraw: " + err.message);
} else {
throw new Error("Error simulating withdraw", { cause: err });
}
const tx = await this.contract.withdraw(token, walletAddress);
await tx.wait();
} catch (error) {
log.error(`Error in withdraw: ${(error as Error).message}`);
}
const hash = await this.contract.write.withdraw([token as Address], {
chain: this.rpcClient.chain,
account: this.rpcClient.account
});
await this.rpcClient.waitForTransactionReceipt({ hash });
return hash;
}
public async registerWithIdentity(
identity: IdentityCredential
): Promise<DecryptedCredentials | undefined> {
try {
if (!this.rpcClient.account) {
throw new Error(
"Failed to registerWithIdentity: no account set in wallet client"
);
}
log.info(
`Registering identity with rate limit: ${this.rateLimit} messages/epoch`
);
@ -398,71 +520,62 @@ export class RLNBaseContract {
);
}
await this.contract.simulate.register(
[identity.IDCommitmentBigInt, this.rateLimit, []],
{
chain: this.rpcClient.chain,
account: this.rpcClient.account.address
}
const estimatedGas = await this.contract.estimateGas.register(
identity.IDCommitmentBigInt,
this.rateLimit,
[]
);
const gasLimit = estimatedGas.add(10000);
const hash: Hash = await this.contract.write.register(
[identity.IDCommitmentBigInt, this.rateLimit, []],
{
chain: this.rpcClient.chain,
account: this.rpcClient.account
}
);
const txRegisterResponse: ethers.ContractTransaction =
await this.contract.register(
identity.IDCommitmentBigInt,
this.rateLimit,
[],
{
gasLimit
}
);
const txRegisterReceipt = await this.rpcClient.waitForTransactionReceipt({
hash
});
const txRegisterReceipt = await txRegisterResponse.wait();
if (txRegisterReceipt.status === "reverted") {
if (txRegisterReceipt.status === 0) {
throw new Error("Transaction failed on-chain");
}
// Parse MembershipRegistered event from logs
const memberRegisteredLog = txRegisterReceipt.logs.find((log) => {
try {
const decoded = decodeEventLog({
abi: wakuRlnV2Abi,
data: log.data,
topics: log.topics
});
return decoded.eventName === "MembershipRegistered";
} catch {
return false;
}
});
const memberRegistered = txRegisterReceipt.events?.find(
(event: ethers.Event) => event.event === "MembershipRegistered"
);
if (!memberRegisteredLog) {
if (!memberRegistered || !memberRegistered.args) {
log.error(
"Failed to register membership: No MembershipRegistered event found"
);
return undefined;
}
// Decode the event
const decoded = decodeEventLog({
abi: wakuRlnV2Abi,
data: memberRegisteredLog.data,
topics: memberRegisteredLog.topics,
eventName: "MembershipRegistered"
});
const decodedData: MembershipRegisteredEvent = {
idCommitment: memberRegistered.args.idCommitment,
membershipRateLimit: memberRegistered.args.membershipRateLimit,
index: memberRegistered.args.index
};
log.info(
`Successfully registered membership with index ${decoded.args.index} ` +
`and rate limit ${decoded.args.membershipRateLimit}`
`Successfully registered membership with index ${decodedData.index} ` +
`and rate limit ${decodedData.membershipRateLimit}`
);
const network = await this.contract.provider.getNetwork();
const address = this.contract.address;
const membershipId = Number(decodedData.index);
return {
identity,
membership: {
address: this.contract.address,
treeIndex: decoded.args.index,
chainId: String(RLN_CONTRACT.chainId),
rateLimit: Number(decoded.args.membershipRateLimit)
address,
treeIndex: membershipId,
chainId: network.chainId.toString(),
rateLimit: decodedData.membershipRateLimit.toNumber()
}
};
} catch (error) {
@ -495,6 +608,78 @@ export class RLNBaseContract {
}
}
/**
 * Registers a membership using a signed token permit (owner, deadline,
 * v/r/s signature) — avoiding a separate approval transaction — and
 * optionally erases existing commitments in the same call.
 *
 * @param identity Identity credential whose commitment is registered.
 * @param permit Signed permit components authorizing the token spend.
 * @param idCommitmentsToErase Commitments to erase alongside registration.
 * @returns Credentials describing the new membership, or undefined when
 *   the transaction fails or no MembershipRegistered event is found.
 */
public async registerWithPermitAndErase(
  identity: IdentityCredential,
  permit: {
    owner: string;
    deadline: number;
    v: number;
    r: string;
    s: string;
  },
  idCommitmentsToErase: string[]
): Promise<DecryptedCredentials | undefined> {
  try {
    log.info(
      `Registering identity with permit and rate limit: ${this.rateLimit} messages/epoch`
    );
    const txRegisterResponse: ethers.ContractTransaction =
      await this.contract.registerWithPermit(
        permit.owner,
        permit.deadline,
        permit.v,
        permit.r,
        permit.s,
        identity.IDCommitmentBigInt,
        this.rateLimit,
        idCommitmentsToErase.map((id) => ethers.BigNumber.from(id))
      );
    const txRegisterReceipt = await txRegisterResponse.wait();
    // The new membership's index is carried by the MembershipRegistered event.
    const memberRegistered = txRegisterReceipt.events?.find(
      (event: ethers.Event) => event.event === "MembershipRegistered"
    );
    if (!memberRegistered || !memberRegistered.args) {
      log.error(
        "Failed to register membership with permit: No MembershipRegistered event found"
      );
      return undefined;
    }
    const decodedData: MembershipRegisteredEvent = {
      idCommitment: memberRegistered.args.idCommitment,
      membershipRateLimit: memberRegistered.args.membershipRateLimit,
      index: memberRegistered.args.index
    };
    log.info(
      `Successfully registered membership with permit. Index: ${decodedData.index}, ` +
        `Rate limit: ${decodedData.membershipRateLimit}, Erased ${idCommitmentsToErase.length} commitments`
    );
    const network = await this.contract.provider.getNetwork();
    const address = this.contract.address;
    const membershipId = Number(decodedData.index);
    return {
      identity,
      membership: {
        address,
        treeIndex: membershipId,
        chainId: network.chainId.toString(),
        rateLimit: decodedData.membershipRateLimit.toNumber()
      }
    };
  } catch (error) {
    log.error(
      `Error in registerWithPermitAndErase: ${(error as Error).message}`
    );
    return undefined;
  }
}
/**
* Validates that the rate limit is within the allowed range (sync)
* @throws Error if the rate limit is outside the allowed range
@ -510,17 +695,50 @@ export class RLNBaseContract {
}
}
private async getMemberIndex(idCommitmentBigInt: bigint): Promise<number> {
// Current version of the contract has the index at position 5 in the membership struct
return (await this.contract.read.memberships([idCommitmentBigInt]))[5];
/** MembershipRegistered event filter; throws if not set by the constructor. */
private get membersFilter(): ethers.EventFilter {
  if (!this._membersFilter) {
    throw Error("Members filter was not initialized.");
  }
  return this._membersFilter;
}
/** MembershipErased event filter; throws if not set by the constructor. */
private get membershipErasedFilter(): ethers.EventFilter {
  if (!this._membershipErasedFilter) {
    throw Error("MembershipErased filter was not initialized.");
  }
  return this._membershipErasedFilter;
}
/** MembershipExpired event filter; throws if not set by the constructor. */
private get membersExpiredFilter(): ethers.EventFilter {
  if (!this._membersExpiredFilter) {
    throw Error("MembersExpired filter was not initialized.");
  }
  return this._membersExpiredFilter;
}
/**
 * Looks up the on-chain index for an identity commitment by scanning
 * MembershipRegistered events. Best-effort: returns undefined when no
 * registration exists or the query fails.
 *
 * @param idCommitmentBigInt Identity commitment to look up.
 * @returns The index from the most recent registration, or undefined.
 */
private async getMemberIndex(
  idCommitmentBigInt: bigint
): Promise<ethers.BigNumber | undefined> {
  try {
    const events = await this.contract.queryFilter(
      this.contract.filters.MembershipRegistered(idCommitmentBigInt)
    );
    if (events.length === 0) return undefined;
    // Use the most recent registration in case the commitment was
    // registered more than once.
    const event = events[events.length - 1];
    return event.args?.index;
  } catch (error) {
    // Best-effort lookup: surface the failure in logs instead of
    // swallowing it silently (previously the error was dropped unused).
    log.info(`Failed to query member index: ${(error as Error).message}`);
    return undefined;
  }
}
public async getMembershipStatus(
idCommitment: bigint
): Promise<"expired" | "grace" | "active"> {
const [isExpired, isInGrace] = await Promise.all([
this.contract.read.isExpired([idCommitment]),
this.contract.read.isInGracePeriod([idCommitment])
this.contract.isExpired(idCommitment),
this.contract.isInGracePeriod(idCommitment)
]);
if (isExpired) return "expired";
@ -535,7 +753,7 @@ export class RLNBaseContract {
*/
public async isExpired(idCommitmentBigInt: bigint): Promise<boolean> {
try {
return await this.contract.read.isExpired([idCommitmentBigInt]);
return await this.contract.isExpired(idCommitmentBigInt);
} catch (error) {
log.error("Error in isExpired:", error);
return false;
@ -549,7 +767,7 @@ export class RLNBaseContract {
*/
public async isInGracePeriod(idCommitmentBigInt: bigint): Promise<boolean> {
try {
return await this.contract.read.isInGracePeriod([idCommitmentBigInt]);
return await this.contract.isInGracePeriod(idCommitmentBigInt);
} catch (error) {
log.error("Error in isInGracePeriod:", error);
return false;
@ -561,18 +779,21 @@ export class RLNBaseContract {
* @param rateLimit The rate limit to calculate the price for
* @param contractFactory Optional factory for creating the contract (for testing)
*/
public async getPriceForRateLimit(rateLimit: number): Promise<{
public async getPriceForRateLimit(
rateLimit: number,
contractFactory?: typeof import("ethers").Contract
): Promise<{
token: string | null;
price: bigint | null;
price: import("ethers").BigNumber | null;
}> {
const address = await this.contract.read.priceCalculator();
const [token, price] = await this.rpcClient.readContract({
address,
abi: iPriceCalculatorAbi,
functionName: "calculate",
args: [rateLimit]
});
const provider = this.contract.provider;
const ContractCtor = contractFactory || ethers.Contract;
const priceCalculator = new ContractCtor(
PRICE_CALCULATOR_CONTRACT.address,
PRICE_CALCULATOR_CONTRACT.abi,
provider
);
const [token, price] = await priceCalculator.calculate(rateLimit);
// Defensive: if token or price is null/undefined, return nulls
if (!token || !price) {
return { token: null, price: null };

View File

@ -1,22 +1,28 @@
import { Address } from "viem";
import { ethers } from "ethers";
import { RpcClient } from "../utils/index.js";
export interface CustomQueryOptions extends FetchMembersOptions {
membersFilter: ethers.EventFilter;
}
export type Member = {
idCommitment: string;
index: bigint;
index: ethers.BigNumber;
};
export interface RLNContractOptions {
rpcClient: RpcClient;
address: Address;
signer: ethers.Signer;
address: string;
rateLimit?: number;
}
export interface RLNContractInitOptions extends RLNContractOptions {
contract?: ethers.Contract;
}
export interface MembershipRegisteredEvent {
idCommitment: string;
membershipRateLimit: bigint;
index: bigint;
membershipRateLimit: ethers.BigNumber;
index: ethers.BigNumber;
}
export type FetchMembersOptions = {
@ -26,13 +32,13 @@ export type FetchMembersOptions = {
};
export interface MembershipInfo {
index: number;
index: ethers.BigNumber;
idCommitment: string;
rateLimit: number;
startBlock: number;
endBlock: number;
state: MembershipState;
depositAmount: bigint;
depositAmount: ethers.BigNumber;
activeDuration: number;
gracePeriodDuration: number;
holder: string;

File diff suppressed because it is too large Load Diff

View File

@ -1,5 +1,5 @@
import { Logger } from "@waku/utils";
import { publicActions } from "viem";
import { ethers } from "ethers";
import { RLN_CONTRACT } from "./contract/constants.js";
import { RLNBaseContract } from "./contract/rln_base_contract.js";
@ -10,7 +10,7 @@ import type {
} from "./keystore/index.js";
import { KeystoreEntity, Password } from "./keystore/types.js";
import { RegisterMembershipOptions, StartRLNOptions } from "./types.js";
import { createViemClientFromWindow, RpcClient } from "./utils/index.js";
import { extractMetaMaskSigner } from "./utils/index.js";
import { Zerokit } from "./zerokit.js";
const log = new Logger("rln:credentials");
@ -24,7 +24,7 @@ export class RLNCredentialsManager {
protected starting = false;
public contract: undefined | RLNBaseContract;
public rpcClient: undefined | RpcClient;
public signer: undefined | ethers.Signer;
protected keystore = Keystore.create();
public credentials: undefined | DecryptedCredentials;
@ -36,6 +36,10 @@ export class RLNCredentialsManager {
this.zerokit = zerokit;
}
public get provider(): undefined | ethers.providers.Provider {
return this.contract?.provider;
}
public async start(options: StartRLNOptions = {}): Promise<void> {
if (this.started || this.starting) {
log.info("RLNCredentialsManager already started or starting");
@ -55,8 +59,10 @@ export class RLNCredentialsManager {
log.info("Credentials successfully decrypted");
}
const { rpcClient, address, rateLimit } =
await this.determineStartOptions(options, credentials);
const { signer, address, rateLimit } = await this.determineStartOptions(
options,
credentials
);
log.info(`Using contract address: ${address}`);
@ -66,10 +72,10 @@ export class RLNCredentialsManager {
}
this.credentials = credentials;
this.rpcClient = rpcClient!;
this.signer = signer!;
this.contract = await RLNBaseContract.create({
address: address! as `0x${string}`,
rpcClient: this.rpcClient,
address: address!,
signer: signer!,
rateLimit: rateLimit ?? this.zerokit.rateLimit
});
@ -128,7 +134,7 @@ export class RLNCredentialsManager {
protected async determineStartOptions(
options: StartRLNOptions,
credentials: KeystoreEntity | undefined
): Promise<StartRLNOptions & { rpcClient: RpcClient }> {
): Promise<StartRLNOptions> {
let chainId = credentials?.membership.chainId;
const address =
credentials?.membership.address ||
@ -140,14 +146,11 @@ export class RLNCredentialsManager {
log.info(`Using Linea contract with chainId: ${chainId}`);
}
const rpcClient: RpcClient = options.walletClient
? options.walletClient.extend(publicActions)
: await createViemClientFromWindow();
const currentChainId = rpcClient.chain?.id;
const signer = options.signer || (await extractMetaMaskSigner());
const currentChainId = await signer.getChainId();
log.info(`Current chain ID: ${currentChainId}`);
if (chainId && chainId !== currentChainId?.toString()) {
if (chainId && chainId !== currentChainId.toString()) {
log.error(
`Chain ID mismatch: contract=${chainId}, current=${currentChainId}`
);
@ -157,7 +160,7 @@ export class RLNCredentialsManager {
}
return {
rpcClient,
signer,
address
};
}
@ -203,9 +206,9 @@ export class RLNCredentialsManager {
protected async verifyCredentialsAgainstContract(
credentials: KeystoreEntity
): Promise<void> {
if (!this.contract || !this.rpcClient) {
if (!this.contract) {
throw Error(
"Failed to verify chain coordinates: no contract or viem client initialized."
"Failed to verify chain coordinates: no contract initialized."
);
}
@ -218,7 +221,8 @@ export class RLNCredentialsManager {
}
const chainId = credentials.membership.chainId;
const currentChainId = await this.rpcClient.getChainId();
const network = await this.contract.provider.getNetwork();
const currentChainId = network.chainId;
if (chainId !== currentChainId.toString()) {
throw Error(
`Failed to verify chain coordinates: credentials chainID=${chainId} is not equal to registryContract chainID=${currentChainId}`

View File

@ -1,10 +1,11 @@
import { RLN_ABI } from "./contract/abi/rln.js";
import { RLN_CONTRACT } from "./contract/index.js";
import { RLNBaseContract } from "./contract/rln_base_contract.js";
import { createRLN } from "./create.js";
import { IdentityCredential } from "./identity.js";
import { Keystore } from "./keystore/index.js";
import { RLNInstance } from "./rln.js";
import { createViemClientFromWindow } from "./utils/index.js";
import { extractMetaMaskSigner } from "./utils/index.js";
export {
RLNBaseContract,
@ -13,16 +14,10 @@ export {
RLNInstance,
IdentityCredential,
RLN_CONTRACT,
createViemClientFromWindow
extractMetaMaskSigner,
RLN_ABI
};
export {
wakuRlnV2Abi,
linearPriceCalculatorAbi,
iPriceCalculatorAbi,
membershipUpgradeableAbi
} from "./contract/wagmi/generated.js";
export type {
DecryptedCredentials,
EncryptedCredentials,

View File

@ -1,4 +1,4 @@
import { WalletClient } from "viem";
import { ethers } from "ethers";
import { IdentityCredential } from "./identity.js";
import {
@ -8,9 +8,9 @@ import {
export type StartRLNOptions = {
/**
* If not set - will attempt to create from provider injected in window.
* If not set - will extract MetaMask account and get signer from it.
*/
walletClient?: WalletClient;
signer?: ethers.Signer;
/**
* If not set - will use default SEPOLIA_CONTRACT address.
*/

View File

@ -1,4 +1,4 @@
export { createViemClientFromWindow, RpcClient } from "./rpcClient.js";
export { extractMetaMaskSigner } from "./metamask.js";
export { BytesUtils } from "./bytes.js";
export { sha256, poseidonHash } from "./hash.js";
export { dateToEpoch, epochIntToBytes, epochBytesToInt } from "./epoch.js";

View File

@ -0,0 +1,17 @@
import { ethers } from "ethers";
export const extractMetaMaskSigner = async (): Promise<ethers.Signer> => {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const ethereum = (window as any).ethereum;
if (!ethereum) {
throw Error(
"Missing or invalid Ethereum provider. Please install a Web3 wallet such as MetaMask."
);
}
await ethereum.request({ method: "eth_requestAccounts" });
const provider = new ethers.providers.Web3Provider(ethereum, "any");
return provider.getSigner();
};

View File

@ -1,61 +0,0 @@
import "viem/window";
import {
type Address,
createWalletClient,
custom,
PublicActions,
publicActions,
WalletClient
} from "viem";
import { lineaSepolia } from "viem/chains";
export type RpcClient = WalletClient & PublicActions;
/**
* Checks window for injected Ethereum provider, requests user to connect, and creates an RPC client object
* capable of performing both read and write operations on the blockchain.
*
* If the wallet is not connected to the Linea Sepolia network, it will attempt to switch to it.
* If the wallet does not have the Linea Sepolia network added, it will attempt to add it.
*/
export const createViemClientFromWindow = async (): Promise<RpcClient> => {
const ethereum = window.ethereum;
if (!ethereum) {
throw Error(
"Missing or invalid Ethereum provider. Please install a Web3 wallet such as MetaMask."
);
}
const accounts = await ethereum.request({ method: "eth_requestAccounts" });
if (!Array.isArray(accounts)) {
throw Error("Failed to get accounts");
}
const account = accounts[0] as Address;
const rpcClient: RpcClient = createWalletClient({
account: account as Address,
chain: lineaSepolia,
transport: custom(window.ethereum!)
}).extend(publicActions);
// Ensure wallet is connected to Linea Sepolia
try {
await rpcClient.switchChain({ id: lineaSepolia.id });
} catch (error: unknown) {
// This error code indicates that the chain has not been added to the wallet
if (
typeof error === "object" &&
error !== null &&
"code" in error &&
error.code === 4902
) {
await rpcClient.addChain({ chain: lineaSepolia });
await rpcClient.switchChain({ id: lineaSepolia.id });
} else {
throw error;
}
}
return rpcClient;
};

View File

@ -1,4 +1,3 @@
{
"extends": "../../tsconfig.dev",
"exclude": ["wagmi.config.ts"]
}
"extends": "../../tsconfig.dev"
}

View File

@ -6,5 +6,5 @@
"tsBuildInfoFile": "dist/.tsbuildinfo"
},
"include": ["src"],
"exclude": ["wagmi.config.ts", "src/**/*.spec.ts", "src/test_utils"]
}
"exclude": ["src/**/*.spec.ts", "src/test_utils"]
}

View File

@ -1,18 +0,0 @@
import { defineConfig } from "@wagmi/cli";
import { foundry } from "@wagmi/cli/plugins";
export default defineConfig({
out: "src/contract/wagmi/generated.ts",
plugins: [
foundry({
project: "./waku-rlnv2-contract",
artifacts: "out",
include: [
"WakuRlnV2.sol/**",
"Membership.sol/**",
"LinearPriceCalculator.sol/**",
"IPriceCalculator.sol/**"
]
})
]
});

View File

@ -9,7 +9,6 @@ import { Libp2p, LightPushError, LightPushStatusCode } from "@waku/interfaces";
import { createRoutingInfo } from "@waku/utils";
import { utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai";
import { afterEach } from "mocha";
import sinon, { SinonSpy } from "sinon";
import { PeerManager } from "../peer_manager/index.js";
@ -39,10 +38,6 @@ describe("LightPush SDK", () => {
lightPush = mockLightPush({ libp2p });
});
afterEach(() => {
sinon.restore();
});
it("should fail to send if no connected peers found", async () => {
const result = await lightPush.send(encoder, {
payload: utf8ToBytes("test")

View File

@ -65,7 +65,6 @@ export class LightPush implements ILightPush {
public stop(): void {
this.retryManager.stop();
this.protocol.stop();
}
public async send(

View File

@ -47,9 +47,7 @@ describe("RetryManager", () => {
sinon.restore();
});
// TODO: Skipped because the global state is not being restored and it breaks
// tests of functionalities that rely on intervals
it.skip("should start and stop interval correctly", () => {
it("should start and stop interval correctly", () => {
const setIntervalSpy = sinon.spy(global, "setInterval");
const clearIntervalSpy = sinon.spy(global, "clearInterval");

View File

@ -10,7 +10,6 @@ import {
import { delay } from "@waku/utils";
import { utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai";
import { afterEach } from "mocha";
import sinon from "sinon";
import {
@ -92,10 +91,6 @@ describe("QueryOnConnect", () => {
};
});
afterEach(() => {
sinon.restore();
});
describe("constructor", () => {
it("should create QueryOnConnect instance with all required parameters", () => {
queryOnConnect = new QueryOnConnect(
@ -163,14 +158,14 @@ describe("QueryOnConnect", () => {
expect(wakuEventSpy.calledWith(WakuEvent.Health)).to.be.true;
});
it("should remove event listeners when stopped", async () => {
it("should remove event listeners when stopped", () => {
const peerRemoveSpy =
mockPeerManagerEventEmitter.removeEventListener as sinon.SinonSpy;
const wakuRemoveSpy =
mockWakuEventEmitter.removeEventListener as sinon.SinonSpy;
queryOnConnect.start();
await queryOnConnect.stop();
queryOnConnect.stop();
expect(peerRemoveSpy.calledWith(PeerManagerEventNames.StoreConnect)).to.be
.true;
@ -342,7 +337,6 @@ describe("QueryOnConnect", () => {
});
afterEach(() => {
sinon.restore();
mockClock.restore();
});

View File

@ -52,13 +52,6 @@ export class QueryOnConnect<
private lastTimeOffline: number;
private readonly forceQueryThresholdMs: number;
private isStarted: boolean = false;
private abortController?: AbortController;
private activeQueryPromise?: Promise<void>;
private boundStoreConnectHandler?: (event: CustomEvent<PeerId>) => void;
private boundHealthHandler?: (event: CustomEvent<HealthStatus>) => void;
public constructor(
public decoders: IDecoder<T>[],
public stopIfTrue: (msg: T) => boolean,
@ -78,37 +71,11 @@ export class QueryOnConnect<
}
public start(): void {
if (this.isStarted) {
log.warn("QueryOnConnect already running");
return;
}
log.info("starting query-on-connect service");
this.isStarted = true;
this.abortController = new AbortController();
this.setupEventListeners();
}
public async stop(): Promise<void> {
if (!this.isStarted) {
return;
}
log.info("stopping query-on-connect service");
this.isStarted = false;
if (this.abortController) {
this.abortController.abort();
this.abortController = undefined;
}
if (this.activeQueryPromise) {
log.info("Waiting for active query to complete...");
try {
await this.activeQueryPromise;
} catch (error) {
log.warn("Active query failed during stop:", error);
}
}
public stop(): void {
this.unsetEventListeners();
}
@ -140,10 +107,7 @@ export class QueryOnConnect<
this.lastTimeOffline > this.lastSuccessfulQuery ||
timeSinceLastQuery > this.forceQueryThresholdMs
) {
this.activeQueryPromise = this.query(peerId).finally(() => {
this.activeQueryPromise = undefined;
});
await this.activeQueryPromise;
await this.query(peerId);
} else {
log.info(`no querying`);
}
@ -156,8 +120,7 @@ export class QueryOnConnect<
for await (const page of this._queryGenerator(this.decoders, {
timeStart,
timeEnd,
peerId,
abortSignal: this.abortController?.signal
peerId
})) {
// Await for decoding
const messages = (await Promise.all(page)).filter(
@ -203,41 +166,33 @@ export class QueryOnConnect<
}
private setupEventListeners(): void {
this.boundStoreConnectHandler = (event: CustomEvent<PeerId>) => {
void this.maybeQuery(event.detail).catch((err) =>
log.error("query-on-connect error", err)
);
};
this.boundHealthHandler = this.updateLastOfflineDate.bind(this);
this.peerManagerEventEmitter.addEventListener(
PeerManagerEventNames.StoreConnect,
this.boundStoreConnectHandler
(event) =>
void this.maybeQuery(event.detail).catch((err) =>
log.error("query-on-connect error", err)
)
);
this.wakuEventEmitter.addEventListener(
WakuEvent.Health,
this.boundHealthHandler
this.updateLastOfflineDate.bind(this)
);
}
private unsetEventListeners(): void {
if (this.boundStoreConnectHandler) {
this.peerManagerEventEmitter.removeEventListener(
PeerManagerEventNames.StoreConnect,
this.boundStoreConnectHandler
);
this.boundStoreConnectHandler = undefined;
}
this.peerManagerEventEmitter.removeEventListener(
PeerManagerEventNames.StoreConnect,
(event) =>
void this.maybeQuery(event.detail).catch((err) =>
log.error("query-on-connect error", err)
)
);
if (this.boundHealthHandler) {
this.wakuEventEmitter.removeEventListener(
WakuEvent.Health,
this.boundHealthHandler
);
this.boundHealthHandler = undefined;
}
this.wakuEventEmitter.removeEventListener(
WakuEvent.Health,
this.updateLastOfflineDate.bind(this)
);
}
private updateLastOfflineDate(event: CustomEvent<HealthStatus>): void {

View File

@ -1,8 +1,2 @@
export { ReliableChannel, ReliableChannelOptions } from "./reliable_channel.js";
export { ReliableChannelEvents, ReliableChannelEvent } from "./events.js";
export {
StatusEvent,
StatusEvents,
StatusDetail,
ISyncStatusEvents
} from "./sync_status.js";

View File

@ -13,8 +13,6 @@ const DEFAULT_RETRIEVE_FREQUENCY_MS = 10 * 1000; // 10 seconds
export class MissingMessageRetriever<T extends IDecodedMessage> {
private retrieveInterval: ReturnType<typeof setInterval> | undefined;
private missingMessages: Map<MessageId, Uint8Array<ArrayBufferLike>>; // Waku Message Ids
private activeQueryPromise: Promise<void> | undefined;
private abortController?: AbortController;
public constructor(
private readonly decoder: IDecoder<T>,
@ -31,11 +29,7 @@ export class MissingMessageRetriever<T extends IDecodedMessage> {
public start(): void {
if (this.retrieveInterval) {
clearInterval(this.retrieveInterval);
this.retrieveInterval = undefined;
}
this.abortController = new AbortController();
if (this.retrieveFrequencyMs !== 0) {
log.info(`start retrieve loop every ${this.retrieveFrequencyMs}ms`);
this.retrieveInterval = setInterval(() => {
@ -44,30 +38,10 @@ export class MissingMessageRetriever<T extends IDecodedMessage> {
}
}
public async stop(): Promise<void> {
log.info("Stopping MissingMessageRetriever...");
public stop(): void {
if (this.retrieveInterval) {
clearInterval(this.retrieveInterval);
this.retrieveInterval = undefined;
}
if (this.abortController) {
this.abortController.abort();
this.abortController = undefined;
}
if (this.activeQueryPromise) {
log.info("Waiting for active query to complete...");
try {
await this.activeQueryPromise;
} catch (error) {
log.warn("Active query failed during stop:", error);
}
}
this.missingMessages.clear();
log.info("MissingMessageRetriever stopped");
}
public addMissingMessage(
@ -90,30 +64,15 @@ export class MissingMessageRetriever<T extends IDecodedMessage> {
if (this.missingMessages.size) {
const messageHashes = Array.from(this.missingMessages.values());
log.info("attempting to retrieve missing message", messageHashes.length);
this.activeQueryPromise = (async () => {
try {
for await (const page of this._retrieve([this.decoder], {
messageHashes,
abortSignal: this.abortController?.signal
})) {
for await (const msg of page) {
if (msg && this.onMessageRetrieved) {
await this.onMessageRetrieved(msg);
}
}
for await (const page of this._retrieve([this.decoder], {
messageHashes
})) {
for await (const msg of page) {
if (msg && this.onMessageRetrieved) {
await this.onMessageRetrieved(msg);
}
} catch (error) {
if (error instanceof Error && error.name === "AbortError") {
log.info("Store query aborted");
return;
}
log.error("Store query failed:", error);
}
})();
await this.activeQueryPromise;
this.activeQueryPromise = undefined;
}
}
}
}

View File

@ -1,67 +0,0 @@
import { Logger } from "@waku/utils";
const log = new Logger("sdk:random-timeout");
/**
* Enables waiting a random time before doing an action (using `setTimeout`),
* with possibility to apply a multiplier to manipulate said time.
*/
export class RandomTimeout {
private timeout: ReturnType<typeof setTimeout> | undefined;
public constructor(
/**
* The maximum interval one would wait before the call is made, in milliseconds.
*/
private maxIntervalMs: number,
/**
* When not zero: Anytime a call is made, then a new call will be rescheduled
* using this multiplier
*/
private multiplierOnCall: number,
/**
* The function to call when the timer is reached
*/
private callback: () => void | Promise<void>
) {
if (!Number.isFinite(maxIntervalMs) || maxIntervalMs < 0) {
throw new Error(
`maxIntervalMs must be a non-negative finite number, got: ${maxIntervalMs}`
);
}
if (!Number.isFinite(multiplierOnCall)) {
throw new Error(
`multiplierOnCall must be a finite number, got: ${multiplierOnCall}`
);
}
}
/**
* Use to start the timer. If a timer was already set, it deletes it and
* schedule a new one.
* @param multiplier applied to [[maxIntervalMs]]
*/
public restart(multiplier: number = 1): void {
this.stop();
if (this.maxIntervalMs) {
const timeoutMs = Math.random() * this.maxIntervalMs * multiplier;
this.timeout = setTimeout(() => {
try {
void this.callback();
} catch (error) {
log.error("Error in RandomTimeout callback:", error);
}
void this.restart(this.multiplierOnCall);
}, timeoutMs);
}
}
public stop(): void {
if (this.timeout) {
clearTimeout(this.timeout);
this.timeout = undefined;
}
}
}

View File

@ -13,7 +13,7 @@ import {
LightPushSDKResult,
QueryRequestParams
} from "@waku/interfaces";
import { ContentMessage, MessageChannelEvent, SyncMessage } from "@waku/sds";
import { ContentMessage, SyncMessage } from "@waku/sds";
import {
createRoutingInfo,
delay,
@ -22,7 +22,7 @@ import {
} from "@waku/utils";
import { bytesToUtf8, hexToBytes, utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai";
import { afterEach, beforeEach, describe } from "mocha";
import { beforeEach, describe } from "mocha";
import sinon from "sinon";
import { ReliableChannel } from "./index.js";
@ -40,9 +40,6 @@ describe("Reliable Channel", () => {
let mockWakuNode: IWaku;
let encoder: IEncoder;
let decoder: IDecoder<IDecodedMessage>;
let reliableChannel: ReliableChannel<IDecodedMessage>;
let reliableChannelAlice: ReliableChannel<IDecodedMessage>;
let reliableChannelBob: ReliableChannel<IDecodedMessage>;
beforeEach(async () => {
mockWakuNode = new MockWakuNode();
@ -53,14 +50,8 @@ describe("Reliable Channel", () => {
decoder = createDecoder(TEST_CONTENT_TOPIC, TEST_ROUTING_INFO);
});
afterEach(async () => {
await reliableChannel?.stop();
await reliableChannelAlice?.stop();
await reliableChannelBob?.stop();
});
it("Outgoing message is emitted as sending", async () => {
reliableChannel = await ReliableChannel.create(
const reliableChannel = await ReliableChannel.create(
mockWakuNode,
"MyChannel",
"alice",
@ -87,7 +78,7 @@ describe("Reliable Channel", () => {
});
it("Outgoing message is emitted as sent", async () => {
reliableChannel = await ReliableChannel.create(
const reliableChannel = await ReliableChannel.create(
mockWakuNode,
"MyChannel",
"alice",
@ -126,7 +117,7 @@ describe("Reliable Channel", () => {
});
};
reliableChannel = await ReliableChannel.create(
const reliableChannel = await ReliableChannel.create(
mockWakuNode,
"MyChannel",
"alice",
@ -158,7 +149,7 @@ describe("Reliable Channel", () => {
});
it("Outgoing message is not emitted as acknowledged from own outgoing messages", async () => {
reliableChannel = await ReliableChannel.create(
const reliableChannel = await ReliableChannel.create(
mockWakuNode,
"MyChannel",
"alice",
@ -191,14 +182,14 @@ describe("Reliable Channel", () => {
const mockWakuNodeAlice = new MockWakuNode(commonEventEmitter);
const mockWakuNodeBob = new MockWakuNode(commonEventEmitter);
reliableChannelAlice = await ReliableChannel.create(
const reliableChannelAlice = await ReliableChannel.create(
mockWakuNodeAlice,
"MyChannel",
"alice",
encoder,
decoder
);
reliableChannelBob = await ReliableChannel.create(
const reliableChannelBob = await ReliableChannel.create(
mockWakuNodeBob,
"MyChannel",
"bob",
@ -254,14 +245,14 @@ describe("Reliable Channel", () => {
const mockWakuNodeAlice = new MockWakuNode(commonEventEmitter);
const mockWakuNodeBob = new MockWakuNode(commonEventEmitter);
reliableChannelAlice = await ReliableChannel.create(
const reliableChannelAlice = await ReliableChannel.create(
mockWakuNodeAlice,
"MyChannel",
"alice",
encoder,
decoder
);
reliableChannelBob = await ReliableChannel.create(
const reliableChannelBob = await ReliableChannel.create(
mockWakuNodeBob,
"MyChannel",
"bob",
@ -301,7 +292,7 @@ describe("Reliable Channel", () => {
});
it("Incoming message is emitted as received", async () => {
reliableChannel = await ReliableChannel.create(
const reliableChannel = await ReliableChannel.create(
mockWakuNode,
"MyChannel",
"alice",
@ -330,7 +321,7 @@ describe("Reliable Channel", () => {
const mockWakuNodeAlice = new MockWakuNode(commonEventEmitter);
const mockWakuNodeBob = new MockWakuNode(commonEventEmitter);
reliableChannelAlice = await ReliableChannel.create(
const reliableChannelAlice = await ReliableChannel.create(
mockWakuNodeAlice,
"MyChannel",
"alice",
@ -341,7 +332,7 @@ describe("Reliable Channel", () => {
processTaskMinElapseMs: 10 // faster so it process message as soon as they arrive
}
);
reliableChannelBob = await ReliableChannel.create(
const reliableChannelBob = await ReliableChannel.create(
mockWakuNodeBob,
"MyChannel",
"bob",
@ -388,13 +379,16 @@ describe("Reliable Channel", () => {
});
});
describe("Missing Message Retrieval", () => {
// the test is failing when run with all tests in sdk package
// no clear reason why, skipping for now
// TODO: fix this test https://github.com/waku-org/js-waku/issues/2648
describe.skip("Missing Message Retrieval", () => {
it("Automatically retrieves missing message", async () => {
const commonEventEmitter = new TypedEventEmitter<MockWakuEvents>();
const mockWakuNodeAlice = new MockWakuNode(commonEventEmitter);
// Setup, Alice first
reliableChannelAlice = await ReliableChannel.create(
const reliableChannelAlice = await ReliableChannel.create(
mockWakuNodeAlice,
"MyChannel",
"alice",
@ -448,7 +442,7 @@ describe("Reliable Channel", () => {
queryGenerator: queryGeneratorStub
};
reliableChannelBob = await ReliableChannel.create(
const reliableChannelBob = await ReliableChannel.create(
mockWakuNodeBob,
"MyChannel",
"bob",
@ -490,6 +484,201 @@ describe("Reliable Channel", () => {
});
});
describe("Query On Connect Integration E2E Tests", () => {
let mockWakuNode: MockWakuNode;
let reliableChannel: ReliableChannel<IDecodedMessage>;
let encoder: IEncoder;
let decoder: IDecoder<IDecodedMessage>;
let mockPeerManagerEvents: TypedEventEmitter<any>;
let queryGeneratorStub: sinon.SinonStub;
let mockPeerId: PeerId;
beforeEach(async () => {
// Setup mock waku node with store capability
mockWakuNode = new MockWakuNode();
// Setup mock peer manager events for QueryOnConnect
mockPeerManagerEvents = new TypedEventEmitter();
(mockWakuNode as any).peerManager = {
events: mockPeerManagerEvents
};
// Setup encoder and decoder
encoder = createEncoder({
contentTopic: TEST_CONTENT_TOPIC,
routingInfo: TEST_ROUTING_INFO
});
decoder = createDecoder(TEST_CONTENT_TOPIC, TEST_ROUTING_INFO);
// Setup store with queryGenerator for QueryOnConnect
queryGeneratorStub = sinon.stub();
mockWakuNode.store = {
queryGenerator: queryGeneratorStub
} as any;
mockPeerId = {
toString: () => "QmTestPeerId"
} as unknown as PeerId;
});
it("should trigger QueryOnConnect when going offline and store peer reconnects", async () => {
// Create a message that will be auto-retrieved
const messageText = "Auto-retrieved message";
const messagePayload = utf8ToBytes(messageText);
const sdsMessage = new ContentMessage(
ReliableChannel.getMessageId(messagePayload),
"testChannel",
"testSender",
[],
1n,
undefined,
messagePayload
);
const autoRetrievedMessage: IDecodedMessage = {
hash: hexToBytes("1234"),
hashStr: "1234",
version: 1,
timestamp: new Date(),
contentTopic: TEST_CONTENT_TOPIC,
pubsubTopic: decoder.pubsubTopic,
payload: sdsMessage.encode(),
rateLimitProof: undefined,
ephemeral: false,
meta: undefined
};
// Setup queryGenerator to return the auto-retrieved message
queryGeneratorStub.callsFake(async function* () {
yield [Promise.resolve(autoRetrievedMessage)];
});
// Create ReliableChannel with queryOnConnect enabled
reliableChannel = await ReliableChannel.create(
mockWakuNode,
"testChannel",
"testSender",
encoder,
decoder
);
// Wait for initial setup
await delay(50);
// Setup complete - focus on testing QueryOnConnect trigger
// Simulate going offline (change health status)
mockWakuNode.events.dispatchEvent(
new CustomEvent("health", { detail: HealthStatus.Unhealthy })
);
await delay(10);
// Simulate store peer reconnection which should trigger QueryOnConnect
mockPeerManagerEvents.dispatchEvent(
new CustomEvent("store:connect", { detail: mockPeerId })
);
// Wait for store query to be triggered
await delay(200);
// Verify that QueryOnConnect was triggered by the conditions
expect(queryGeneratorStub.called).to.be.true;
});
it("should trigger QueryOnConnect when time threshold is exceeded", async () => {
// Create multiple messages that will be auto-retrieved
const message1Text = "First auto-retrieved message";
const message2Text = "Second auto-retrieved message";
const message1Payload = utf8ToBytes(message1Text);
const message2Payload = utf8ToBytes(message2Text);
const sdsMessage1 = new ContentMessage(
ReliableChannel.getMessageId(message1Payload),
"testChannel",
"testSender",
[],
1n,
undefined,
message1Payload
);
const sdsMessage2 = new ContentMessage(
ReliableChannel.getMessageId(message2Payload),
"testChannel",
"testSender",
[],
2n,
undefined,
message2Payload
);
const autoRetrievedMessage1: IDecodedMessage = {
hash: hexToBytes("5678"),
hashStr: "5678",
version: 1,
timestamp: new Date(Date.now() - 1000),
contentTopic: TEST_CONTENT_TOPIC,
pubsubTopic: decoder.pubsubTopic,
payload: sdsMessage1.encode(),
rateLimitProof: undefined,
ephemeral: false,
meta: undefined
};
const autoRetrievedMessage2: IDecodedMessage = {
hash: hexToBytes("9abc"),
hashStr: "9abc",
version: 1,
timestamp: new Date(),
contentTopic: TEST_CONTENT_TOPIC,
pubsubTopic: decoder.pubsubTopic,
payload: sdsMessage2.encode(),
rateLimitProof: undefined,
ephemeral: false,
meta: undefined
};
// Setup queryGenerator to return multiple messages
queryGeneratorStub.callsFake(async function* () {
yield [Promise.resolve(autoRetrievedMessage1)];
yield [Promise.resolve(autoRetrievedMessage2)];
});
// Create ReliableChannel with queryOnConnect enabled
reliableChannel = await ReliableChannel.create(
mockWakuNode,
"testChannel",
"testSender",
encoder,
decoder,
{ queryOnConnect: true }
);
await delay(50);
// Simulate old last successful query by accessing QueryOnConnect internals
// The default threshold is 5 minutes, so we'll set it to an old time
if ((reliableChannel as any).queryOnConnect) {
((reliableChannel as any).queryOnConnect as any).lastSuccessfulQuery =
Date.now() - 6 * 60 * 1000; // 6 minutes ago
}
// Simulate store peer connection which should trigger retrieval due to time threshold
mockPeerManagerEvents.dispatchEvent(
new CustomEvent("store:connect", { detail: mockPeerId })
);
// Wait for store query to be triggered
await delay(200);
// Verify that QueryOnConnect was triggered due to time threshold
expect(queryGeneratorStub.called).to.be.true;
});
});
describe("stopIfTrue Integration with QueryOnConnect", () => {
let mockWakuNode: MockWakuNode;
let encoder: IEncoder;
@ -603,7 +792,7 @@ describe("Reliable Channel", () => {
yield [Promise.resolve(messages[2])];
});
reliableChannel = await ReliableChannel.create(
const reliableChannel = await ReliableChannel.create(
mockWakuNode,
channelId,
senderId,
@ -685,7 +874,7 @@ describe("Reliable Channel", () => {
yield [Promise.resolve(messages[1])];
});
reliableChannel = await ReliableChannel.create(
const reliableChannel = await ReliableChannel.create(
mockWakuNode,
channelId,
senderId,
@ -790,7 +979,7 @@ describe("Reliable Channel", () => {
yield [Promise.resolve(messages[2])];
});
reliableChannel = await ReliableChannel.create(
const reliableChannel = await ReliableChannel.create(
mockWakuNode,
channelId,
senderId,
@ -815,6 +1004,7 @@ describe("Reliable Channel", () => {
describe("isChannelMessageWithCausalHistory predicate", () => {
let mockWakuNode: MockWakuNode;
let reliableChannel: ReliableChannel<IDecodedMessage>;
let encoder: IEncoder;
let decoder: IDecoder<IDecodedMessage>;
@ -940,317 +1130,4 @@ describe("Reliable Channel", () => {
expect(result).to.be.true;
});
});
describe("Irretrievably lost messages", () => {
it("Sends ack once message is marked as irretrievably lost", async function (): Promise<void> {
this.timeout(5000);
sinon.restore();
const commonEventEmitter = new TypedEventEmitter<MockWakuEvents>();
const mockWakuNodeAlice = new MockWakuNode(commonEventEmitter);
// Setup, Alice first
reliableChannelAlice = await ReliableChannel.create(
mockWakuNodeAlice,
"MyChannel",
"alice",
encoder,
decoder,
{
// disable any automation to better control the test
retryIntervalMs: 0,
syncMinIntervalMs: 0,
retrieveFrequencyMs: 0,
processTaskMinElapseMs: 10
}
);
// Bob is offline, Alice sends a message, this is the message we want
// Bob to consider irretrievable in this test.
const message = utf8ToBytes("missing message");
reliableChannelAlice.send(message);
// Wait to be sent
await new Promise((resolve) => {
reliableChannelAlice.addEventListener("message-sent", resolve, {
once: true
});
});
// Now Bob goes online
const mockWakuNodeBob = new MockWakuNode(commonEventEmitter);
reliableChannelBob = await ReliableChannel.create(
mockWakuNodeBob,
"MyChannel",
"bob",
encoder,
decoder,
{
retryIntervalMs: 0, // disable any automation to better control the test
syncMinIntervalMs: 0,
sweepInBufIntervalMs: 20,
processTaskMinElapseMs: 10,
retrieveFrequencyMs: 0,
timeoutForLostMessagesMs: 30
}
);
let messageWithDepRcvd = false;
reliableChannelBob.addEventListener("message-received", (event) => {
if (bytesToUtf8(event.detail.payload) === "message with dep") {
messageWithDepRcvd = true;
}
});
// Alice sends a second message that refers to the first message.
// Bob should emit it, and learn about missing messages, and then finally
// mark it lost
const messageWithDep = utf8ToBytes("message with dep");
const messageWithDepId = reliableChannelAlice.send(messageWithDep);
let messageIsAcknowledged = false;
reliableChannelAlice.messageChannel.addEventListener(
MessageChannelEvent.OutMessageAcknowledged,
(event) => {
if (event.detail == messageWithDepId) {
messageIsAcknowledged = true;
}
}
);
// Wait to be sent
await new Promise((resolve) => {
reliableChannelAlice.addEventListener("message-sent", resolve, {
once: true
});
});
let messageMarkedLost = false;
reliableChannelBob.messageChannel.addEventListener(
MessageChannelEvent.InMessageLost,
(_event) => {
// TODO: check message matches
messageMarkedLost = true;
}
);
while (!messageWithDepRcvd) {
await delay(50);
}
expect(messageWithDepRcvd, "message with dep received and emitted").to.be
.true;
while (!messageMarkedLost) {
await delay(50);
}
expect(messageMarkedLost, "message marked as lost").to.be.true;
// Bob should now include Alice's message in a sync message and ack it
await reliableChannelBob["sendSyncMessage"]();
while (!messageIsAcknowledged) {
await delay(50);
}
expect(messageIsAcknowledged, "message has been acknowledged").to.be.true;
});
});
});
describe("Query On Connect Integration E2E Tests", () => {
let mockWakuNode: MockWakuNode;
let reliableChannel: ReliableChannel<IDecodedMessage>;
let encoder: IEncoder;
let decoder: IDecoder<IDecodedMessage>;
let mockPeerManagerEvents: TypedEventEmitter<any>;
let queryGeneratorStub: sinon.SinonStub;
let mockPeerId: PeerId;
beforeEach(async () => {
// Setup mock waku node with store capability
mockWakuNode = new MockWakuNode();
// Setup mock peer manager events for QueryOnConnect
mockPeerManagerEvents = new TypedEventEmitter();
(mockWakuNode as any).peerManager = {
events: mockPeerManagerEvents
};
// Setup encoder and decoder
encoder = createEncoder({
contentTopic: TEST_CONTENT_TOPIC,
routingInfo: TEST_ROUTING_INFO
});
decoder = createDecoder(TEST_CONTENT_TOPIC, TEST_ROUTING_INFO);
// Setup store with queryGenerator for QueryOnConnect
queryGeneratorStub = sinon.stub();
mockWakuNode.store = {
queryGenerator: queryGeneratorStub
} as any;
mockPeerId = {
toString: () => "QmTestPeerId"
} as unknown as PeerId;
});
afterEach(async () => {
await reliableChannel?.stop();
});
it("should trigger QueryOnConnect when going offline and store peer reconnects", async () => {
// Create a message that will be auto-retrieved
const messageText = "Auto-retrieved message";
const messagePayload = utf8ToBytes(messageText);
const sdsMessage = new ContentMessage(
ReliableChannel.getMessageId(messagePayload),
"testChannel",
"testSender",
[],
1n,
undefined,
messagePayload
);
const autoRetrievedMessage: IDecodedMessage = {
hash: hexToBytes("1234"),
hashStr: "1234",
version: 1,
timestamp: new Date(),
contentTopic: TEST_CONTENT_TOPIC,
pubsubTopic: decoder.pubsubTopic,
payload: sdsMessage.encode(),
rateLimitProof: undefined,
ephemeral: false,
meta: undefined
};
// Setup queryGenerator to return the auto-retrieved message
queryGeneratorStub.callsFake(async function* () {
yield [Promise.resolve(autoRetrievedMessage)];
});
// Create ReliableChannel with queryOnConnect enabled
reliableChannel = await ReliableChannel.create(
mockWakuNode,
"testChannel",
"testSender",
encoder,
decoder
);
// Wait for initial setup
await delay(50);
// Setup complete - focus on testing QueryOnConnect trigger
// Simulate going offline (change health status)
mockWakuNode.events.dispatchEvent(
new CustomEvent("health", { detail: HealthStatus.Unhealthy })
);
await delay(10);
// Simulate store peer reconnection which should trigger QueryOnConnect
mockPeerManagerEvents.dispatchEvent(
new CustomEvent("store:connect", { detail: mockPeerId })
);
// Wait for store query to be triggered
await delay(200);
// Verify that QueryOnConnect was triggered by the conditions
expect(queryGeneratorStub.called).to.be.true;
});
it("should trigger QueryOnConnect when time threshold is exceeded", async () => {
// Create multiple messages that will be auto-retrieved
const message1Text = "First auto-retrieved message";
const message2Text = "Second auto-retrieved message";
const message1Payload = utf8ToBytes(message1Text);
const message2Payload = utf8ToBytes(message2Text);
const sdsMessage1 = new ContentMessage(
ReliableChannel.getMessageId(message1Payload),
"testChannel",
"testSender",
[],
1n,
undefined,
message1Payload
);
const sdsMessage2 = new ContentMessage(
ReliableChannel.getMessageId(message2Payload),
"testChannel",
"testSender",
[],
2n,
undefined,
message2Payload
);
const autoRetrievedMessage1: IDecodedMessage = {
hash: hexToBytes("5678"),
hashStr: "5678",
version: 1,
timestamp: new Date(Date.now() - 1000),
contentTopic: TEST_CONTENT_TOPIC,
pubsubTopic: decoder.pubsubTopic,
payload: sdsMessage1.encode(),
rateLimitProof: undefined,
ephemeral: false,
meta: undefined
};
const autoRetrievedMessage2: IDecodedMessage = {
hash: hexToBytes("9abc"),
hashStr: "9abc",
version: 1,
timestamp: new Date(),
contentTopic: TEST_CONTENT_TOPIC,
pubsubTopic: decoder.pubsubTopic,
payload: sdsMessage2.encode(),
rateLimitProof: undefined,
ephemeral: false,
meta: undefined
};
// Setup queryGenerator to return multiple messages
queryGeneratorStub.callsFake(async function* () {
yield [Promise.resolve(autoRetrievedMessage1)];
yield [Promise.resolve(autoRetrievedMessage2)];
});
// Create ReliableChannel with queryOnConnect enabled
reliableChannel = await ReliableChannel.create(
mockWakuNode,
"testChannel",
"testSender",
encoder,
decoder,
{ queryOnConnect: true }
);
await delay(50);
// Simulate old last successful query by accessing QueryOnConnect internals
// The default threshold is 5 minutes, so we'll set it to an old time
if ((reliableChannel as any).queryOnConnect) {
((reliableChannel as any).queryOnConnect as any).lastSuccessfulQuery =
Date.now() - 6 * 60 * 1000; // 6 minutes ago
}
// Simulate store peer connection which should trigger retrieval due to time threshold
mockPeerManagerEvents.dispatchEvent(
new CustomEvent("store:connect", { detail: mockPeerId })
);
// Wait for store query to be triggered
await delay(200);
// Verify that QueryOnConnect was triggered due to time threshold
expect(queryGeneratorStub.called).to.be.true;
});
});

View File

@ -17,10 +17,9 @@ import {
isContentMessage,
MessageChannel,
MessageChannelEvent,
MessageChannelEvents,
type MessageChannelOptions,
type ParticipantId,
Message as SdsMessage,
type SenderId,
SyncMessage
} from "@waku/sds";
import { Logger } from "@waku/utils";
@ -32,18 +31,14 @@ import {
import { ReliableChannelEvent, ReliableChannelEvents } from "./events.js";
import { MissingMessageRetriever } from "./missing_message_retriever.js";
import { RandomTimeout } from "./random_timeout.js";
import { RetryManager } from "./retry_manager.js";
import { ISyncStatusEvents, SyncStatus } from "./sync_status.js";
const log = new Logger("sdk:reliable-channel");
const DEFAULT_SYNC_MIN_INTERVAL_MS = 30 * 1000; // 30 seconds
const SYNC_INTERVAL_REPAIR_MULTIPLIER = 0.3; // Reduce sync interval when repairs pending
const DEFAULT_RETRY_INTERVAL_MS = 30 * 1000; // 30 seconds
const DEFAULT_MAX_RETRY_ATTEMPTS = 10;
const DEFAULT_SWEEP_IN_BUF_INTERVAL_MS = 5 * 1000;
const DEFAULT_SWEEP_REPAIR_INTERVAL_MS = 10 * 1000; // 10 seconds
const DEFAULT_PROCESS_TASK_MIN_ELAPSE_MS = 1000;
const IRRECOVERABLE_SENDING_ERRORS: LightPushError[] = [
@ -53,15 +48,6 @@ const IRRECOVERABLE_SENDING_ERRORS: LightPushError[] = [
LightPushError.RLN_PROOF_GENERATION
];
/**
* Strategy for retrieving missing messages.
* - 'both': Use SDS-R peer repair and Store queries in parallel (default)
* - 'sds-r-only': Only use SDS-R peer repair
* - 'store-only': Only use Store queries (legacy behavior)
* - 'none': No automatic retrieval
*/
export type RetrievalStrategy = "both" | "sds-r-only" | "store-only" | "none";
export type ReliableChannelOptions = MessageChannelOptions & {
/**
* The minimum interval between 2 sync messages in the channel.
@ -92,7 +78,6 @@ export type ReliableChannelOptions = MessageChannelOptions & {
/**
* How often store queries are done to retrieve missing messages.
* Only applies when retrievalStrategy includes Store ('both' or 'store-only').
*
* @default 10,000 (10 seconds)
*/
@ -126,13 +111,6 @@ export type ReliableChannelOptions = MessageChannelOptions & {
* @default 1000 (1 second)
*/
processTaskMinElapseMs?: number;
/**
* Strategy for retrieving missing messages.
*
* @default 'both'
*/
retrievalStrategy?: RetrievalStrategy;
};
/**
@ -158,34 +136,27 @@ export class ReliableChannel<
callback: Callback<T>
) => Promise<boolean>;
private readonly _unsubscribe?: (
decoders: IDecoder<T> | IDecoder<T>[]
) => Promise<boolean>;
private readonly _retrieve?: <T extends IDecodedMessage>(
decoders: IDecoder<T>[],
options?: Partial<QueryRequestParams>
) => AsyncGenerator<Promise<T | undefined>[]>;
private eventListenerCleanups: Array<() => void> = [];
private syncRandomTimeout: RandomTimeout;
private readonly syncMinIntervalMs: number;
private syncTimeout: ReturnType<typeof setTimeout> | undefined;
private sweepInBufInterval: ReturnType<typeof setInterval> | undefined;
private readonly sweepInBufIntervalMs: number;
private sweepRepairInterval: ReturnType<typeof setInterval> | undefined;
private processTaskTimeout: ReturnType<typeof setTimeout> | undefined;
private readonly retryManager: RetryManager | undefined;
private readonly missingMessageRetriever?: MissingMessageRetriever<T>;
private readonly queryOnConnect?: QueryOnConnect<T>;
private readonly processTaskMinElapseMs: number;
private _started: boolean;
private activePendingProcessTask?: Promise<void>;
private constructor(
public node: IWaku,
public messageChannel: MessageChannel,
private encoder: IEncoder,
private decoder: IDecoder<T>,
private retrievalStrategy: RetrievalStrategy,
options?: ReliableChannelOptions
) {
super();
@ -199,7 +170,6 @@ export class ReliableChannel<
if (node.filter) {
this._subscribe = node.filter.subscribe.bind(node.filter);
this._unsubscribe = node.filter.unsubscribe.bind(node.filter);
} else if (node.relay) {
// TODO: Why do relay and filter have different interfaces?
// this._subscribe = node.relay.subscribeWithUnsubscribe;
@ -225,11 +195,8 @@ export class ReliableChannel<
}
}
this.syncRandomTimeout = new RandomTimeout(
options?.syncMinIntervalMs ?? DEFAULT_SYNC_MIN_INTERVAL_MS,
2,
this.sendSyncMessage.bind(this)
);
this.syncMinIntervalMs =
options?.syncMinIntervalMs ?? DEFAULT_SYNC_MIN_INTERVAL_MS;
this.sweepInBufIntervalMs =
options?.sweepInBufIntervalMs ?? DEFAULT_SWEEP_IN_BUF_INTERVAL_MS;
@ -247,8 +214,7 @@ export class ReliableChannel<
this.processTaskMinElapseMs =
options?.processTaskMinElapseMs ?? DEFAULT_PROCESS_TASK_MIN_ELAPSE_MS;
// Only enable Store retrieval based on strategy
if (this._retrieve && this.shouldUseStore()) {
if (this._retrieve) {
this.missingMessageRetriever = new MissingMessageRetriever(
this.decoder,
options?.retrieveFrequencyMs,
@ -260,22 +226,8 @@ export class ReliableChannel<
}
this._started = false;
this._internalSyncStatus = new SyncStatus();
this.syncStatus = this._internalSyncStatus;
}
/**
* Emit events when the channel is aware of missing message.
* Note that "synced" may mean some messages are irretrievably lost.
* Check the emitted data for details.
*
* @emits [[StatusEvents]]
*
*/
public readonly syncStatus: ISyncStatusEvents;
private readonly _internalSyncStatus: SyncStatus;
public get isStarted(): boolean {
return this._started;
}
@ -312,26 +264,17 @@ export class ReliableChannel<
public static async create<T extends IDecodedMessage>(
node: IWaku,
channelId: ChannelId,
senderId: ParticipantId,
senderId: SenderId,
encoder: IEncoder,
decoder: IDecoder<T>,
options?: ReliableChannelOptions
): Promise<ReliableChannel<T>> {
// Enable SDS-R repair only if retrieval strategy uses it
const retrievalStrategy = options?.retrievalStrategy ?? "both";
const enableRepair =
retrievalStrategy === "both" || retrievalStrategy === "sds-r-only";
const sdsMessageChannel = new MessageChannel(channelId, senderId, {
...options,
enableRepair
});
const sdsMessageChannel = new MessageChannel(channelId, senderId, options);
const messageChannel = new ReliableChannel(
node,
sdsMessageChannel,
encoder,
decoder,
retrievalStrategy,
options
);
@ -441,21 +384,10 @@ export class ReliableChannel<
private async subscribe(): Promise<boolean> {
this.assertStarted();
return this._subscribe(this.decoder, async (message: T) => {
if (!this._started) {
log.info("ReliableChannel stopped, ignoring incoming message");
return;
}
await this.processIncomingMessage(message);
});
}
private async unsubscribe(): Promise<boolean> {
if (!this._unsubscribe) {
throw Error("No unsubscribe method available");
}
return this._unsubscribe(this.decoder);
}
/**
* Don't forget to call `this.messageChannel.sweepIncomingBuffer();` once done.
* @param msg
@ -486,7 +418,6 @@ export class ReliableChannel<
// missing messages or the status of previous outgoing messages
this.messageChannel.pushIncomingMessage(sdsMessage, retrievalHint);
// Remove from Store retriever if message was retrieved
this.missingMessageRetriever?.removeMissingMessage(sdsMessage.messageId);
if (sdsMessage.content && sdsMessage.content.length > 0) {
@ -527,42 +458,26 @@ export class ReliableChannel<
// TODO: For now we only queue process tasks for incoming messages
// As this is where there is most volume
private queueProcessTasks(): void {
if (!this._started) return;
// If one is already queued, then we can ignore it
if (this.processTaskTimeout === undefined) {
this.processTaskTimeout = setTimeout(() => {
this.activePendingProcessTask = this.messageChannel
.processTasks()
.catch((err) => {
log.error("error encountered when processing sds tasks", err);
})
.finally(() => {
this.activePendingProcessTask = undefined;
});
void this.messageChannel.processTasks().catch((err) => {
log.error("error encountered when processing sds tasks", err);
});
// Clear timeout once triggered
this.clearProcessTasks();
clearTimeout(this.processTaskTimeout);
this.processTaskTimeout = undefined;
}, this.processTaskMinElapseMs); // we ensure that we don't call process tasks more than once per second
}
}
private clearProcessTasks(): void {
if (this.processTaskTimeout) {
clearTimeout(this.processTaskTimeout);
this.processTaskTimeout = undefined;
}
}
public async start(): Promise<boolean> {
if (this._started) return true;
this._started = true;
this.setupEventListeners();
this.restartSync();
this.startSweepIncomingBufferLoop();
this.startRepairSweepLoop();
if (this._retrieve) {
this.missingMessageRetriever?.start();
this.queryOnConnect?.start();
@ -570,33 +485,15 @@ export class ReliableChannel<
return this.subscribe();
}
public async stop(): Promise<void> {
public stop(): void {
if (!this._started) return;
log.info("Stopping ReliableChannel...");
this._started = false;
this.removeAllEventListeners();
this.stopSync();
this.stopSweepIncomingBufferLoop();
this.stopRepairSweepLoop();
this.clearProcessTasks();
if (this.activePendingProcessTask) {
await this.activePendingProcessTask;
}
await this.missingMessageRetriever?.stop();
await this.queryOnConnect?.stop();
this.retryManager?.stopAllRetries();
await this.unsubscribe();
this._internalSyncStatus.cleanUp();
log.info("ReliableChannel stopped successfully");
this.missingMessageRetriever?.stop();
this.queryOnConnect?.stop();
// TODO unsubscribe
// TODO unsetMessageListeners
}
private assertStarted(): void {
@ -612,65 +509,34 @@ export class ReliableChannel<
}
private stopSweepIncomingBufferLoop(): void {
if (this.sweepInBufInterval) {
clearInterval(this.sweepInBufInterval);
this.sweepInBufInterval = undefined;
}
}
/**
 * Starts the periodic SDS-R repair sweep (every DEFAULT_SWEEP_REPAIR_INTERVAL_MS).
 * Each tick sweeps the repair incoming buffer and rebroadcasts each repair
 * message via the channel's send path; a rebroadcast is reported successful
 * only when there are no send failures.
 * No-op unless the retrieval strategy includes SDS-R.
 */
private startRepairSweepLoop(): void {
  if (!this.shouldUseSdsR()) {
    return;
  }
  // Clear any previous interval so repeated calls don't stack timers.
  this.stopRepairSweepLoop();
  this.sweepRepairInterval = setInterval(() => {
    void this.messageChannel
      .sweepRepairIncomingBuffer(async (message) => {
        // Rebroadcast the repair message
        const wakuMessage = { payload: message.encode() };
        const result = await this._send(this.encoder, wakuMessage);
        return result.failures.length === 0;
      })
      .catch((err) => {
        log.error("error encountered when sweeping repair buffer", err);
      });
  }, DEFAULT_SWEEP_REPAIR_INTERVAL_MS);
}
/**
 * Stops the periodic SDS-R repair sweep, if one is running, and clears the
 * stored interval handle so a later start can schedule a fresh one.
 */
private stopRepairSweepLoop(): void {
  if (this.sweepRepairInterval) {
    clearInterval(this.sweepRepairInterval);
    // Bug fix: previously reset `this.sweepInBufInterval` here (copy-paste
    // error), which clobbered the incoming-buffer sweep handle and left
    // `sweepRepairInterval` pointing at a cancelled timer.
    this.sweepRepairInterval = undefined;
  }
}
/** Whether the configured retrieval strategy includes Store queries. */
private shouldUseStore(): boolean {
  const strategy = this.retrievalStrategy;
  return strategy === "both" || strategy === "store-only";
}
private shouldUseSdsR(): boolean {
return (
this.retrievalStrategy === "both" ||
this.retrievalStrategy === "sds-r-only"
);
if (this.sweepInBufInterval) clearInterval(this.sweepInBufInterval);
}
private restartSync(multiplier: number = 1): void {
// Adaptive sync: use shorter interval when repairs are pending
const hasPendingRepairs =
this.shouldUseSdsR() && this.messageChannel.hasPendingRepairRequests();
const effectiveMultiplier = hasPendingRepairs
? multiplier * SYNC_INTERVAL_REPAIR_MULTIPLIER
: multiplier;
if (this.syncTimeout) {
clearTimeout(this.syncTimeout);
}
if (this.syncMinIntervalMs) {
const timeoutMs = this.random() * this.syncMinIntervalMs * multiplier;
this.syncRandomTimeout.restart(effectiveMultiplier);
this.syncTimeout = setTimeout(() => {
void this.sendSyncMessage();
// Always restart a sync, no matter whether the message was sent.
// Set a multiplier so we wait a bit longer to not hog the conversation
void this.restartSync(2);
}, timeoutMs);
}
}
private stopSync(): void {
this.syncRandomTimeout.stop();
if (this.syncTimeout) {
clearTimeout(this.syncTimeout);
}
}
// Used to enable overriding when testing
private random(): number {
return Math.random();
}
private safeSendEvent<T extends ReliableChannelEvent>(
@ -729,36 +595,20 @@ export class ReliableChannel<
return sdsMessage.causalHistory && sdsMessage.causalHistory.length > 0;
}
/**
 * Subscribes `listener` on the message channel and records a matching
 * cleanup callback so removeAllEventListeners() can detach it on stop.
 * The `as any` casts bridge the typed MessageChannelEvents map to the
 * add/removeEventListener signatures of the underlying emitter.
 */
private addTrackedEventListener<K extends keyof MessageChannelEvents>(
  eventName: K,
  listener: (event: MessageChannelEvents[K]) => void
): void {
  this.messageChannel.addEventListener(eventName, listener as any);
  this.eventListenerCleanups.push(() => {
    this.messageChannel.removeEventListener(eventName, listener as any);
  });
}
private setupEventListeners(): void {
this.addTrackedEventListener(
this.messageChannel.addEventListener(
MessageChannelEvent.OutMessageSent,
(event) => {
if (isContentMessage(event.detail)) {
if (event.detail.content) {
const messageId = ReliableChannel.getMessageId(event.detail.content);
this.safeSendEvent("message-sent", {
detail: messageId
});
// restart the timeout when a content message has been sent
// because the functionality is fulfilled (content message contains
// causal history)
this.restartSync();
}
}
);
this.addTrackedEventListener(
this.messageChannel.addEventListener(
MessageChannelEvent.OutMessageAcknowledged,
(event) => {
if (event.detail) {
@ -766,13 +616,13 @@ export class ReliableChannel<
detail: event.detail
});
// Stopping retries as the message was acknowledged
// Stopping retries
this.retryManager?.stopRetries(event.detail);
}
}
);
this.addTrackedEventListener(
this.messageChannel.addEventListener(
MessageChannelEvent.OutMessagePossiblyAcknowledged,
(event) => {
if (event.detail) {
@ -786,7 +636,7 @@ export class ReliableChannel<
}
);
this.addTrackedEventListener(
this.messageChannel.addEventListener(
MessageChannelEvent.InSyncReceived,
(_event) => {
// restart the timeout when a sync message has been received
@ -794,10 +644,9 @@ export class ReliableChannel<
}
);
this.addTrackedEventListener(
this.messageChannel.addEventListener(
MessageChannelEvent.InMessageReceived,
(event) => {
this._internalSyncStatus.onMessagesReceived(event.detail.messageId);
// restart the timeout when a content message has been received
if (isContentMessage(event.detail)) {
// send a sync message faster to ack someone's else
@ -806,16 +655,20 @@ export class ReliableChannel<
}
);
this.addTrackedEventListener(
this.messageChannel.addEventListener(
MessageChannelEvent.OutMessageSent,
(event) => {
// restart the timeout when a content message has been sent
if (isContentMessage(event.detail)) {
this.restartSync();
}
}
);
this.messageChannel.addEventListener(
MessageChannelEvent.InMessageMissing,
(event) => {
this._internalSyncStatus.onMessagesMissing(
...event.detail.map((m) => m.messageId)
);
for (const { messageId, retrievalHint } of event.detail) {
// Store retrieval (for 'both' and 'store-only' strategies)
// SDS-R repair happens automatically via RepairManager for 'both' and 'sds-r-only'
if (retrievalHint && this.missingMessageRetriever) {
this.missingMessageRetriever.addMissingMessage(
messageId,
@ -826,39 +679,13 @@ export class ReliableChannel<
}
);
this.addTrackedEventListener(MessageChannelEvent.InMessageLost, (event) => {
this._internalSyncStatus.onMessagesLost(
...event.detail.map((m) => m.messageId)
);
});
if (this.queryOnConnect) {
const queryListener = (event: any): void => {
void this.processIncomingMessages(event.detail);
};
this.queryOnConnect.addEventListener(
QueryOnConnectEvent.MessagesRetrieved,
queryListener
(event) => {
void this.processIncomingMessages(event.detail);
}
);
this.eventListenerCleanups.push(() => {
this.queryOnConnect?.removeEventListener(
QueryOnConnectEvent.MessagesRetrieved,
queryListener
);
});
}
}
private removeAllEventListeners(): void {
for (const cleanup of this.eventListenerCleanups) {
try {
cleanup();
} catch (error) {
log.error("error removing event listener:", error);
}
}
this.eventListenerCleanups = [];
}
}

View File

@ -66,7 +66,7 @@ describe("Reliable Channel: Sync", () => {
});
while (!messageSent) {
await delay(10);
await delay(50);
}
let syncMessageSent = false;

View File

@ -1,207 +0,0 @@
import { createDecoder, createEncoder } from "@waku/core";
import {
AutoSharding,
IDecodedMessage,
IDecoder,
IEncoder
} from "@waku/interfaces";
import { createRoutingInfo, delay, MockWakuNode } from "@waku/utils";
import { utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai";
import { beforeEach, describe } from "mocha";
import {
createMockNodes,
sendAndWaitForEvent,
TEST_CONSTANTS,
waitFor
} from "./test_utils.js";
import { ReliableChannel, StatusDetail } from "./index.js";
const TEST_CONTENT_TOPIC = "/my-tests/0/topic-name/proto";
const TEST_NETWORK_CONFIG: AutoSharding = {
clusterId: 0,
numShardsInCluster: 1
};
const TEST_ROUTING_INFO = createRoutingInfo(TEST_NETWORK_CONFIG, {
contentTopic: TEST_CONTENT_TOPIC
});
describe("Sync Status", () => {
let encoder: IEncoder;
let decoder: IDecoder<IDecodedMessage>;
let mockWakuNodeAlice: MockWakuNode;
let mockWakuNodeBob: MockWakuNode;
let reliableChannelAlice: ReliableChannel<any> | undefined;
let reliableChannelBob: ReliableChannel<any> | undefined;
beforeEach(async () => {
encoder = createEncoder({
contentTopic: TEST_CONTENT_TOPIC,
routingInfo: TEST_ROUTING_INFO
});
decoder = createDecoder(TEST_CONTENT_TOPIC, TEST_ROUTING_INFO);
const mockNodes = createMockNodes();
mockWakuNodeAlice = mockNodes.alice;
mockWakuNodeBob = mockNodes.bob;
});
afterEach(async () => {
if (reliableChannelAlice) {
await reliableChannelAlice.stop();
reliableChannelAlice = undefined;
}
if (reliableChannelBob) {
await reliableChannelBob.stop();
reliableChannelBob = undefined;
}
});
it("Synced status is emitted when a message is received", async () => {
reliableChannelAlice = await ReliableChannel.create(
mockWakuNodeAlice,
"MyChannel",
"alice",
encoder,
decoder
);
reliableChannelBob = await ReliableChannel.create(
mockWakuNodeBob,
"MyChannel",
"bob",
encoder,
decoder
);
let statusDetail: StatusDetail | undefined;
reliableChannelBob.syncStatus.addEventListener("synced", (event) => {
statusDetail = event.detail;
});
const message = utf8ToBytes("message in channel");
reliableChannelAlice.send(message);
await waitFor(() => statusDetail);
expect(statusDetail!.received).to.eq(1);
});
it("Synced status is emitted when a missing message is received", async () => {
reliableChannelAlice = await ReliableChannel.create(
mockWakuNodeAlice,
"MyChannel",
"alice",
encoder,
decoder,
{
retryIntervalMs: TEST_CONSTANTS.RETRY_INTERVAL_MS
}
);
// Send a message before Bob goes online so it's marked as missing
await sendAndWaitForEvent(
reliableChannelAlice,
utf8ToBytes("missing message")
);
reliableChannelBob = await ReliableChannel.create(
mockWakuNodeBob,
"MyChannel",
"bob",
encoder,
decoder
);
let syncingStatusDetail: StatusDetail | undefined;
reliableChannelBob.syncStatus.addEventListener("syncing", (event) => {
syncingStatusDetail = event.detail;
});
let syncedStatusDetail: StatusDetail | undefined;
reliableChannelBob.syncStatus.addEventListener("synced", (event) => {
syncedStatusDetail = event.detail;
});
await sendAndWaitForEvent(
reliableChannelAlice,
utf8ToBytes("second message with missing message as dep")
);
await waitFor(() => syncingStatusDetail);
expect(syncingStatusDetail!.missing).to.eq(1);
expect(syncingStatusDetail!.received).to.eq(1);
await waitFor(() => syncedStatusDetail);
expect(syncedStatusDetail!.missing).to.eq(0);
expect(syncedStatusDetail!.received).to.eq(2);
});
it("Synced status is emitted when a missing message is marked as lost", async () => {
reliableChannelAlice = await ReliableChannel.create(
mockWakuNodeAlice,
"MyChannel",
"alice",
encoder,
decoder,
{
syncMinIntervalMs: 0,
retryIntervalMs: 0 // Do not retry so we can lose the message
}
);
// Send a message before Bob goes online so it's marked as missing
await sendAndWaitForEvent(
reliableChannelAlice,
utf8ToBytes("missing message")
);
reliableChannelBob = await ReliableChannel.create(
mockWakuNodeBob,
"MyChannel",
"bob",
encoder,
decoder,
{
retrieveFrequencyMs: 0,
syncMinIntervalMs: 0,
sweepInBufIntervalMs: 0, // we want to control this
timeoutForLostMessagesMs: 200 // timeout within the test
}
);
let syncingStatusDetail: StatusDetail | undefined;
reliableChannelBob.syncStatus.addEventListener("syncing", (event) => {
syncingStatusDetail = event.detail;
});
await sendAndWaitForEvent(
reliableChannelAlice,
utf8ToBytes("second message with missing message as dep")
);
await waitFor(() => syncingStatusDetail);
expect(syncingStatusDetail!.missing).to.eq(1, "at first, one missing");
expect(syncingStatusDetail!.received).to.eq(1, "at first, one received");
expect(syncingStatusDetail!.lost).to.eq(0, "at first, no loss");
let syncedStatusDetail: StatusDetail | undefined;
reliableChannelBob.syncStatus.addEventListener("synced", (event) => {
syncedStatusDetail = event.detail;
});
// await long enough so message will be marked as lost
await delay(200);
reliableChannelBob.messageChannel["sweepIncomingBuffer"]();
await waitFor(() => syncedStatusDetail);
expect(syncedStatusDetail!.missing).to.eq(0, "no more missing message");
expect(syncedStatusDetail!.received).to.eq(1, "still one received message");
expect(syncedStatusDetail!.lost).to.eq(1, "missing message is marked lost");
});
});

View File

@ -24,33 +24,19 @@ export class RetryManager {
const timeout = this.timeouts.get(id);
if (timeout) {
clearTimeout(timeout);
this.timeouts.delete(id);
}
}
/**
 * Cancels every pending retry timer and clears the tracking map, so no
 * retry callback fires after this returns.
 */
public stopAllRetries(): void {
  // Iterate values directly — the keys were unused in the previous
  // `.entries()` loop.
  for (const timeout of this.timeouts.values()) {
    clearTimeout(timeout);
  }
  this.timeouts.clear();
}
public startRetries(id: string, retry: () => void | Promise<void>): void {
this.retry(id, retry, 0);
}
public stop(): void {
for (const timeout of this.timeouts.values()) {
clearTimeout(timeout);
}
}
private retry(
id: string,
retry: () => void | Promise<void>,
attemptNumber: number
): void {
this.stopRetries(id);
clearTimeout(this.timeouts.get(id));
if (attemptNumber < this.maxRetryNumber) {
const interval = setTimeout(() => {
void retry();

View File

@ -1,189 +0,0 @@
import { MessageId } from "@waku/sds";
import { delay } from "@waku/utils";
import { expect } from "chai";
import { StatusDetail, StatusEvents, SyncStatus } from "./sync_status.js";
async function testSyncStatus(
syncStatus: SyncStatus,
statusEvent: keyof StatusEvents,
onMessageFn: (...msgIds: MessageId[]) => void,
expectedStatusDetail: Partial<StatusDetail>,
...messageIds: MessageId[]
): Promise<void> {
let statusDetail: StatusDetail;
syncStatus.addEventListener(statusEvent, (event) => {
statusDetail = event.detail;
});
onMessageFn.bind(syncStatus)(...messageIds);
while (!statusDetail!) {
await delay(10);
}
expect(statusDetail.received).to.eq(expectedStatusDetail.received ?? 0);
expect(statusDetail.missing).to.eq(expectedStatusDetail.missing ?? 0);
expect(statusDetail.lost).to.eq(expectedStatusDetail.lost ?? 0);
}
describe("Sync Status", () => {
let syncStatus: SyncStatus;
beforeEach(() => {
syncStatus = new SyncStatus();
});
afterEach(() => {
syncStatus.cleanUp();
});
it("Emits 'synced' when new message received", async () => {
await testSyncStatus(
syncStatus,
"synced",
syncStatus.onMessagesReceived,
{ received: 1 },
"123"
);
});
it("Emits 'syncing' when message flagged as missed", async () => {
await testSyncStatus(
syncStatus,
"syncing",
syncStatus.onMessagesMissing,
{ missing: 1 },
"123"
);
});
it("Emits 'synced' when message flagged as lost", async () => {
await testSyncStatus(
syncStatus,
"synced",
syncStatus.onMessagesLost,
{ lost: 1 },
"123"
);
});
it("Emits 'syncing' then 'synced' when message flagged as missing and then received", async () => {
await testSyncStatus(
syncStatus,
"syncing",
syncStatus.onMessagesMissing,
{ missing: 1 },
"123"
);
await testSyncStatus(
syncStatus,
"synced",
syncStatus.onMessagesReceived,
{ received: 1 },
"123"
);
});
it("Emits 'syncing' then 'synced' when message flagged as missing and then lost", async () => {
await testSyncStatus(
syncStatus,
"syncing",
syncStatus.onMessagesMissing,
{ missing: 1 },
"123"
);
await testSyncStatus(
syncStatus,
"synced",
syncStatus.onMessagesLost,
{ lost: 1 },
"123"
);
});
it("Emits 'synced' then 'synced' when message flagged as lost and then received", async () => {
await testSyncStatus(
syncStatus,
"synced",
syncStatus.onMessagesLost,
{ lost: 1 },
"123"
);
await testSyncStatus(
syncStatus,
"synced",
syncStatus.onMessagesReceived,
{ received: 1 },
"123"
);
});
it("Emits 'syncing' until all messages are received or lost", async () => {
await testSyncStatus(
syncStatus,
"synced",
syncStatus.onMessagesReceived,
{ received: 1 },
"1"
);
await testSyncStatus(
syncStatus,
"syncing",
syncStatus.onMessagesMissing,
{ received: 1, missing: 3 },
"2",
"3",
"4"
);
await testSyncStatus(
syncStatus,
"syncing",
syncStatus.onMessagesReceived,
{ received: 2, missing: 2 },
"2"
);
await testSyncStatus(
syncStatus,
"syncing",
syncStatus.onMessagesReceived,
{ received: 3, missing: 1 },
"3"
);
await testSyncStatus(
syncStatus,
"synced",
syncStatus.onMessagesLost,
{ received: 3, lost: 1 },
"4"
);
});
it("Debounces events when receiving batch of messages", async () => {
let eventCount = 0;
let statusDetail: StatusDetail | undefined;
syncStatus.addEventListener("synced", (event) => {
eventCount++;
statusDetail = event.detail;
});
// Process 100 messages in the same task
for (let i = 0; i < 100; i++) {
syncStatus.onMessagesReceived(`msg-${i}`);
}
// Wait for microtask to complete
await delay(10);
// Should only emit 1 event despite 100 calls
expect(eventCount).to.eq(1, "Should only emit one event for batch");
expect(statusDetail!.received).to.eq(100, "Should track all 100 messages");
});
});

View File

@ -1,163 +0,0 @@
import { TypedEventEmitter } from "@libp2p/interface";
import { MessageId } from "@waku/sds";
import { Logger } from "@waku/utils";
const log = new Logger("sds:sync-status");
export const StatusEvent = {
  /**
   * We are not aware of any missing messages that we may be able to get
   * We MAY have messages lost forever, see the `event.detail`
   */
  Synced: "synced", // TODO or synced or health or caught-up?
  /**
   * We are aware of missing messages that we may be able to get
   */
  Syncing: "syncing" // TODO: it assumes "syncing" is happening via SDS repair or store queries
} as const; // `as const` keeps the literal types; without it the derived
// StatusEvent type below widens to plain `string`.
export type StatusEvent = (typeof StatusEvent)[keyof typeof StatusEvent];
/** Snapshot of the counters carried by every sync-status event. */
export type StatusDetail = {
  /**
   * number of received messages
   */
  received: number;
  /**
   * number of missing messages that are not yet considered as irretrievably lost
   */
  missing: number;
  /**
   * number of messages considered as irretrievably lost
   */
  lost: number;
};
/** Event map: both events carry the full current counters as detail. */
export interface StatusEvents {
  synced: CustomEvent<StatusDetail>;
  syncing: CustomEvent<StatusDetail>;
}
/**
 * Read-only interface for sync status events.
 * Only exposes event listener methods, hiding internal state management.
 */
export interface ISyncStatusEvents {
  addEventListener(
    event: "synced",
    callback: (e: CustomEvent<StatusDetail>) => void
  ): void;
  addEventListener(
    event: "syncing",
    callback: (e: CustomEvent<StatusDetail>) => void
  ): void;
  removeEventListener(
    event: "synced",
    callback: (e: CustomEvent<StatusDetail>) => void
  ): void;
  removeEventListener(
    event: "syncing",
    callback: (e: CustomEvent<StatusDetail>) => void
  ): void;
}
export class SyncStatus extends TypedEventEmitter<StatusEvents> {
private readonly receivedMessages: Set<MessageId>;
private readonly missingMessages: Set<MessageId>;
private readonly lostMessages: Set<MessageId>;
private sendScheduled = false;
private cleaned = false;
public constructor() {
super();
this.receivedMessages = new Set();
this.missingMessages = new Set();
this.lostMessages = new Set();
}
/**
* Cleanup all tracked message IDs. Should be called when stopping the channel.
*/
public cleanUp(): void {
// Mark as cleaned to prevent any pending microtasks from firing
this.cleaned = true;
this.receivedMessages.clear();
this.missingMessages.clear();
this.lostMessages.clear();
}
public onMessagesReceived(...messageIds: MessageId[]): void {
for (const messageId of messageIds) {
this.missingMessages.delete(messageId);
this.lostMessages.delete(messageId);
this.receivedMessages.add(messageId);
}
this.scheduleSend();
}
public onMessagesMissing(...messageIds: MessageId[]): void {
for (const messageId of messageIds) {
if (
!this.receivedMessages.has(messageId) &&
!this.lostMessages.has(messageId)
) {
this.missingMessages.add(messageId);
} else {
log.error(
"A message previously received or lost has been marked as missing",
messageId
);
}
}
this.scheduleSend();
}
public onMessagesLost(...messageIds: MessageId[]): void {
for (const messageId of messageIds) {
this.missingMessages.delete(messageId);
this.lostMessages.add(messageId);
}
this.scheduleSend();
}
/**
* Schedule an event to be sent on the next microtask.
* Multiple calls within the same task will result in only one event being sent.
* This prevents event spam when processing batches of messages.
*/
private scheduleSend(): void {
if (!this.sendScheduled) {
this.sendScheduled = true;
queueMicrotask(() => {
this.sendScheduled = false;
this.safeSend();
});
}
}
private safeSend(): void {
// Don't send events if cleanup was already called
if (this.cleaned) {
return;
}
const statusEvent =
this.missingMessages.size === 0
? StatusEvent.Synced
: StatusEvent.Syncing;
try {
this.dispatchEvent(
new CustomEvent(statusEvent, {
detail: {
received: this.receivedMessages.size,
missing: this.missingMessages.size,
lost: this.lostMessages.size
}
})
);
} catch (error) {
log.error(`Failed to dispatch sync status:`, error);
}
}
}

View File

@ -1,68 +0,0 @@
import { TypedEventEmitter } from "@libp2p/interface";
import { delay, MockWakuEvents, MockWakuNode } from "@waku/utils";
import { ReliableChannel } from "./reliable_channel.js";
// Shared timing knobs for the reliable-channel test suites.
export const TEST_CONSTANTS = {
  // How often waitFor() re-evaluates its condition.
  POLL_INTERVAL_MS: 50,
  // Retry interval handed to channels in tests that exercise retries.
  RETRY_INTERVAL_MS: 300
} as const;
/**
 * Polls `condition` until it yields a truthy value, failing after `timeoutMs`.
 *
 * @param condition Evaluated repeatedly; return undefined while not ready.
 * @param timeoutMs Upper bound on total waiting time (default 5000ms).
 * @returns The truthy value produced by `condition`.
 * @throws Error when the timeout elapses before the condition is met.
 */
export async function waitFor<T>(
  condition: () => T | undefined,
  timeoutMs = 5000
): Promise<T> {
  const deadline = Date.now() + timeoutMs;
  while (!condition()) {
    if (Date.now() > deadline) {
      throw new Error(
        `Timeout after ${timeoutMs}ms waiting for condition to be met`
      );
    }
    await delay(TEST_CONSTANTS.POLL_INTERVAL_MS);
  }
  return condition()!;
}
/**
 * Sends `message` on `channel` and resolves once the channel reports
 * "message-sent" for it. The one-shot listener removes itself on fire.
 *
 * @param channel The ReliableChannel to send from.
 * @param message The message payload to send.
 */
export async function sendAndWaitForEvent(
  channel: ReliableChannel<any>,
  message: Uint8Array
): Promise<void> {
  const sent = new Promise<void>((resolve) => {
    const onSent = (): void => {
      channel.removeEventListener("message-sent", onSent);
      resolve();
    };
    channel.addEventListener("message-sent", onSent);
  });
  // Listener is registered synchronously above, so the event can't be missed.
  channel.send(message);
  return sent;
}
/**
 * Builds one shared event emitter plus two MockWakuNode instances ("alice"
 * and "bob") wired to it, so traffic from one node is visible to the other.
 *
 * @returns The emitter and the two mock nodes.
 */
export function createMockNodes(): {
  emitter: TypedEventEmitter<MockWakuEvents>;
  alice: MockWakuNode;
  bob: MockWakuNode;
} {
  const emitter = new TypedEventEmitter<MockWakuEvents>();
  const alice = new MockWakuNode(emitter);
  const bob = new MockWakuNode(emitter);
  return { emitter, alice, bob };
}

View File

@ -46,10 +46,6 @@ export class Store implements IStore {
return this.protocol.multicodec;
}
/** Stops the underlying store protocol implementation. */
public stop(): void {
  this.protocol.stop();
}
/**
* Queries the Waku Store for historical messages using the provided decoders and options.
* Returns an asynchronous generator that yields promises of decoded messages.

View File

@ -19,10 +19,6 @@ describe("waitForRemotePeer", () => {
eventTarget = new EventTarget();
});
afterEach(() => {
sinon.restore();
});
it("should reject if WakuNode is not started", async () => {
const wakuMock = mockWakuNode({
connections: [{}]

View File

@ -232,9 +232,7 @@ export class WakuNode implements IWaku {
this._nodeStateLock = true;
this.lightPush?.stop();
this.store?.stop();
await this.filter?.stop();
await this.relay?.stop();
this.healthIndicator.stop();
this.peerManager.stop();
this.connectionManager.stop();

View File

@ -12,8 +12,10 @@ export enum MessageChannelEvent {
InMessageLost = "sds:in:message-irretrievably-lost",
ErrorTask = "sds:error-task",
// SDS-R Repair Events
RepairRequestQueued = "sds:repair:request-queued",
RepairRequestSent = "sds:repair:request-sent",
RepairRequestReceived = "sds:repair:request-received",
RepairResponseQueued = "sds:repair:response-queued",
RepairResponseSent = "sds:repair:response-sent"
}
@ -31,6 +33,10 @@ export type MessageChannelEvents = {
[MessageChannelEvent.OutSyncSent]: CustomEvent<Message>;
[MessageChannelEvent.InSyncReceived]: CustomEvent<Message>;
[MessageChannelEvent.ErrorTask]: CustomEvent<unknown>;
[MessageChannelEvent.RepairRequestQueued]: CustomEvent<{
messageId: MessageId;
tReq: number;
}>;
[MessageChannelEvent.RepairRequestSent]: CustomEvent<{
messageIds: MessageId[];
carrierMessageId: MessageId;
@ -39,6 +45,10 @@ export type MessageChannelEvents = {
messageIds: MessageId[];
fromSenderId?: ParticipantId;
}>;
[MessageChannelEvent.RepairResponseQueued]: CustomEvent<{
messageId: MessageId;
tResp: number;
}>;
[MessageChannelEvent.RepairResponseSent]: CustomEvent<{
messageId: MessageId;
}>;

View File

@ -1,50 +0,0 @@
import { expect } from "chai";
import { MemLocalHistory } from "./mem_local_history.js";
import { ContentMessage } from "./message.js";
describe("MemLocalHistory", () => {
  // Build a minimal content message with the given id, timestamp and payload byte.
  const makeMessage = (id: string, ts: bigint, byte: number): ContentMessage =>
    new ContentMessage(id, "c", "a", [], ts, undefined, new Uint8Array([byte]));

  // True when a message with the given id is present in the history.
  const hasMessage = (hist: MemLocalHistory, id: string): boolean =>
    hist.findIndex((m) => m.messageId === id) !== -1;

  it("Cap max size when messages are pushed one at a time", () => {
    const history = new MemLocalHistory(2);

    history.push(makeMessage("1", 1n, 1));
    expect(history.length).to.eq(1);

    history.push(makeMessage("2", 2n, 2));
    expect(history.length).to.eq(2);

    // The third push exceeds the cap: the oldest entry ("1") is evicted.
    history.push(makeMessage("3", 3n, 3));
    expect(history.length).to.eq(2);
    expect(hasMessage(history, "1")).to.be.false;
    expect(hasMessage(history, "2")).to.be.true;
    expect(hasMessage(history, "3")).to.be.true;
  });

  it("Cap max size when a pushed array is exceeding the cap", () => {
    const history = new MemLocalHistory(2);

    history.push(makeMessage("1", 1n, 1));
    expect(history.length).to.eq(1);

    // Pushing two messages at once overflows the cap; the oldest is dropped.
    history.push(makeMessage("2", 2n, 2), makeMessage("3", 3n, 3));
    expect(history.length).to.eq(2);
    expect(hasMessage(history, "1")).to.be.false;
    expect(hasMessage(history, "2")).to.be.true;
    expect(hasMessage(history, "3")).to.be.true;
  });
});

View File

@ -2,31 +2,18 @@ import _ from "lodash";
import { ContentMessage, isContentMessage } from "./message.js";
export const DEFAULT_MAX_LENGTH = 10_000;
/**
* In-Memory implementation of a local history of messages.
* In-Memory implementation of a local store of messages.
*
* Messages are store in SDS chronological order:
* - messages[0] is the oldest message
* - messages[n] is the newest message
*
* Only stores content message: `message.lamportTimestamp` and `message.content` are present.
*
* Oldest messages are dropped when `maxLength` is reached.
* If an array of items longer than `maxLength` is pushed, dropping will happen
* at next push.
*/
export class MemLocalHistory {
private items: ContentMessage[] = [];
/**
* Construct a new in-memory local history
*
* @param maxLength The maximum number of message to store.
*/
public constructor(private maxLength: number = DEFAULT_MAX_LENGTH) {}
public get length(): number {
return this.items.length;
}
@ -46,12 +33,6 @@ export class MemLocalHistory {
// Remove duplicates by messageId while maintaining order
this.items = _.uniqBy(combinedItems, "messageId");
// Let's drop older messages if max length is reached
if (this.length > this.maxLength) {
const numItemsToRemove = this.length - this.maxLength;
this.items.splice(0, numItemsToRemove);
}
return this.items.length;
}

View File

@ -128,7 +128,13 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
// Only construct RepairManager if repair is enabled (default: true)
if (options.enableRepair ?? true) {
this.repairManager = new RepairManager(senderId, options.repairConfig);
this.repairManager = new RepairManager(
senderId,
options.repairConfig,
(event: string, detail: unknown) => {
this.safeSendEvent(event as MessageChannelEvent, { detail });
}
);
}
}
@ -136,14 +142,6 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
return bytesToHex(sha256(payload));
}
/**
* Check if there are pending repair requests that need to be sent.
* Useful for adaptive sync intervals - increase frequency when repairs pending.
*/
public hasPendingRepairRequests(currentTime = Date.now()): boolean {
return this.repairManager?.hasRequestsReady(currentTime) ?? false;
}
/**
* Processes all queued tasks sequentially to ensure proper message ordering.
*
@ -285,7 +283,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
/**
* Processes messages in the incoming buffer, delivering those with satisfied dependencies.
*
* @returns The missing dependencies
* @returns Array of history entries for messages still missing dependencies
*/
public sweepIncomingBuffer(): HistoryEntry[] {
const { buffer, missing } = this.incomingBuffer.reduce<{
@ -321,8 +319,8 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
})
);
// Optionally, if a message did not get its dependencies fulfilled after a predetermined amount of time,
// they are marked as irretrievably lost (implicitly by removing it from the buffer without delivery)
// Optionally, if a message has not been received after a predetermined amount of time,
// its dependencies are marked as irretrievably lost (implicitly by removing it from the buffer without delivery)
if (this.timeoutForLostMessagesMs) {
const timeReceived = this.timeReceived.get(message.messageId);
if (
@ -332,19 +330,9 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
this.safeSendEvent(MessageChannelEvent.InMessageLost, {
detail: Array.from(missingDependencies)
});
// We deliver the message to resume participation in the log
if (isContentMessage(message) && this.deliverMessage(message)) {
this.safeSendEvent(MessageChannelEvent.InMessageDelivered, {
detail: message.messageId
});
}
// The message and its missing dependencies are dropped
// from the incoming buffer
return { buffer, missing };
}
}
missingDependencies.forEach((dependency) => {
missing.add(dependency);
});

View File

@ -20,6 +20,11 @@ const log = new Logger("sds:repair:manager");
*/
const PARTICIPANTS_PER_RESPONSE_GROUP = 128;
/**
* Event emitter callback for repair events
*/
export type RepairEventEmitter = (event: string, detail: unknown) => void;
/**
* Configuration for SDS-R repair protocol
*/
@ -53,10 +58,16 @@ export class RepairManager {
private readonly config: Required<RepairConfig>;
private readonly outgoingBuffer: OutgoingRepairBuffer;
private readonly incomingBuffer: IncomingRepairBuffer;
private readonly eventEmitter?: RepairEventEmitter;
public constructor(participantId: ParticipantId, config: RepairConfig = {}) {
public constructor(
participantId: ParticipantId,
config: RepairConfig = {},
eventEmitter?: RepairEventEmitter
) {
this.participantId = participantId;
this.config = { ...DEFAULT_REPAIR_CONFIG, ...config };
this.eventEmitter = eventEmitter;
this.outgoingBuffer = new OutgoingRepairBuffer(this.config.bufferSize);
this.incomingBuffer = new IncomingRepairBuffer(this.config.bufferSize);
@ -131,13 +142,19 @@ export class RepairManager {
// Calculate when to request this repair
const tReq = this.calculateTReq(entry.messageId, currentTime);
// Add to outgoing buffer - only log if actually added
// Add to outgoing buffer - only log and emit event if actually added
const wasAdded = this.outgoingBuffer.add(entry, tReq);
if (wasAdded) {
log.info(
`Added missing dependency ${entry.messageId} to repair buffer with T_req=${tReq}`
);
// Emit event
this.eventEmitter?.("RepairRequestQueued", {
messageId: entry.messageId,
tReq
});
}
}
}
@ -221,13 +238,19 @@ export class RepairManager {
currentTime
);
// Add to incoming buffer - only log if actually added
// Add to incoming buffer - only log and emit event if actually added
const wasAdded = this.incomingBuffer.add(request, tResp);
if (wasAdded) {
log.info(
`Will respond to repair request for ${request.messageId} at T_resp=${tResp}`
);
// Emit event
this.eventEmitter?.("RepairResponseQueued", {
messageId: request.messageId,
tResp
});
}
}
}
@ -305,12 +328,4 @@ export class RepairManager {
`Updated response groups to ${this.config.numResponseGroups} for ${numParticipants} participants`
);
}
/**
* Check if there are repair requests ready to be sent
*/
public hasRequestsReady(currentTime = Date.now()): boolean {
const items = this.outgoingBuffer.getItems();
return items.length > 0 && items[0].tReq <= currentTime;
}
}

View File

@ -70,16 +70,14 @@ describe("Peer Cache Discovery", function () {
it("should discover peers from provided peer cache", async function () {
const mockCache = new MockPeerCache();
const peerId1 = (await nwaku1.getPeerId()).toString();
const peerId2 = (await nwaku2.getPeerId()).toString();
mockCache.set([
{
id: peerId1,
id: (await nwaku1.getPeerId()).toString(),
multiaddrs: [(await nwaku1.getMultiaddrWithId()).toString()]
},
{
id: peerId2,
id: (await nwaku2.getPeerId()).toString(),
multiaddrs: [(await nwaku2.getMultiaddrWithId()).toString()]
}
]);
@ -98,22 +96,21 @@ describe("Peer Cache Discovery", function () {
const discoveredPeers = new Set<string>();
await new Promise<void>((resolve) => {
waku.libp2p.addEventListener("peer:identify", (evt) => {
discoveredPeers.add(evt.detail.peerId.toString());
const peerId = evt.detail.peerId;
discoveredPeers.add(peerId.toString());
if (discoveredPeers.has(peerId1) && discoveredPeers.has(peerId2)) {
if (discoveredPeers.size === 2) {
resolve();
}
});
});
expect(dialPeerSpy.callCount).to.be.greaterThanOrEqual(2);
expect(discoveredPeers).to.include(peerId1);
expect(discoveredPeers).to.include(peerId2);
expect(dialPeerSpy.callCount).to.equal(2);
expect(discoveredPeers.size).to.equal(2);
});
it("should monitor connected peers and store them into cache", async function () {
const mockCache = new MockPeerCache();
const targetPeerId = (await nwaku2.getPeerId()).toString();
waku = await createLightNode({
networkConfig: DefaultTestNetworkConfig,
@ -129,18 +126,19 @@ describe("Peer Cache Discovery", function () {
await new Promise<void>((resolve) => {
waku.libp2p.addEventListener("peer:identify", (evt) => {
discoveredPeers.add(evt.detail.peerId.toString());
const peerId = evt.detail.peerId;
discoveredPeers.add(peerId.toString());
if (discoveredPeers.has(targetPeerId)) {
if (discoveredPeers.size === 1) {
resolve();
}
});
});
expect(discoveredPeers).to.include(targetPeerId);
expect(discoveredPeers.size).to.equal(1);
const cachedPeers = mockCache.get();
const isTargetCached = cachedPeers.some((p) => p.id === targetPeerId);
expect(isTargetCached).to.be.true;
expect(cachedPeers.length).to.equal(1);
expect(discoveredPeers.has(cachedPeers[0].id)).to.be.true;
});
});

View File

@ -170,7 +170,7 @@ describe("Wait for remote peer", function () {
await nwaku.start({
lightpush: true,
filter: false,
relay: true,
relay: false,
store: false
});
const multiAddrWithId = await nwaku.getMultiaddrWithId();
@ -229,7 +229,7 @@ describe("Wait for remote peer", function () {
await nwaku.start({
filter: true,
lightpush: true,
relay: true
relay: false
// store: true
});
const multiAddrWithId = await nwaku.getMultiaddrWithId();

View File

@ -57,8 +57,7 @@ describe("Waku Dial [node only]", function () {
await nwaku.start({
filter: true,
store: true,
lightpush: true,
relay: true
lightpush: true
});
const multiAddrWithId = await nwaku.getMultiaddrWithId();
@ -92,8 +91,7 @@ describe("Waku Dial [node only]", function () {
await nwaku.start({
filter: true,
store: true,
lightpush: true,
relay: true
lightpush: true
});
const multiAddrWithId = await nwaku.getMultiaddrWithId();

View File

@ -59,11 +59,10 @@ export class MockWakuNode implements IWaku {
unsubscribe<T extends IDecodedMessage>(
_decoders: IDecoder<T> | IDecoder<T>[]
): Promise<boolean> {
// The expectation is that it does not matter for tests
return Promise.resolve(true);
throw "Not implemented";
},
unsubscribeAll(): void {
throw "unsubscribeAll not implemented";
throw "Not implemented";
}
};
}
@ -139,7 +138,7 @@ export class MockWakuNode implements IWaku {
return Promise.resolve();
}
public stop(): Promise<void> {
return Promise.resolve();
throw new Error("Method not implemented.");
}
public waitForPeers(
_protocols?: Protocols[],