From 9f3c89e458693ccb92a3b508275a10549daaa49f Mon Sep 17 00:00:00 2001 From: Arseniy Klempner Date: Tue, 15 Jul 2025 20:20:39 -0700 Subject: [PATCH] add agent rules --- JS_WAKU_AGENT_RULES.md | 222 +++++++++++++ WAKU_LIBP2P_INTEGRATION_PATTERNS.md | 463 ++++++++++++++++++++++++++++ WAKU_PROTOCOL_STACK_ARCHITECTURE.md | 387 +++++++++++++++++++++++ 3 files changed, 1072 insertions(+) create mode 100644 JS_WAKU_AGENT_RULES.md create mode 100644 WAKU_LIBP2P_INTEGRATION_PATTERNS.md create mode 100644 WAKU_PROTOCOL_STACK_ARCHITECTURE.md diff --git a/JS_WAKU_AGENT_RULES.md b/JS_WAKU_AGENT_RULES.md new file mode 100644 index 0000000000..2c8e932bb2 --- /dev/null +++ b/JS_WAKU_AGENT_RULES.md @@ -0,0 +1,222 @@ +# Comprehensive Rules for Warp Agents in js-waku + +Here are the comprehensive rules for Warp agents working in the js-waku repository, based on the provided project structure and development workflow requirements. + +## 1. Package Structure and Naming Conventions + +### Rule 1.1: Standard Package Layout +Each package within the `packages/` directory must adhere to the following structure: + +```bash +packages/{package-name}/ +├── src/ # Source TypeScript files +│ ├── interfaces/ # Type definitions and interfaces +│ ├── utils/ # Utility functions (camelCase naming) +│ ├── components/ # Main components (PascalCase naming) +│ └── index.ts # Main entry point +├── tests/ # Test files only +│ ├── unit/ # Unit tests +│ └── integration/ # Integration tests +├── dist/ # Build outputs (generated, never commit) +├── package.json # Package configuration +└── tsconfig.json # TypeScript configuration +``` + +## 2. Version Control and Scripting + +### Rule 2.1: Conventional Commit Messages +Commit messages MUST follow the Conventional Commits specification. + +**Examples:** +``` +feat: add new message filtering capability +fix: resolve connection timeout issue +chore: update dependencies to latest versions +docs: improve API documentation +test: add unit tests for message validation +refactor: restructure connection management +``` + +### Rule 2.2: Git Hooks Requirements +The following Git hooks MUST be configured using `husky`: +- **Pre-commit:** Run `lint-staged` for code quality checks on staged files. +- **Pre-push:** Run the full test suite to ensure repository health. + +### Rule 2.3: npm Scripts Standards +Every package MUST include these standard npm scripts in its `package.json`: +```json +"scripts": { + "build": "yarn clean && tsc && rollup -c", + "clean": "rm -rf dist", + "test": "mocha", + "lint": "eslint . --ext .ts", + "watch": "tsc --watch" +} +``` + +## 3. Testing Pattern Rules + +### Rule 3.1: Test Framework +Use `mocha` and `chai` for all unit and integration tests. + +```typescript +import { expect } from 'chai'; + +describe('ComponentName', () => { + it('should perform its function correctly', () => { + // Test logic here + expect(true).to.be.true; + }); +}); +``` + +### Rule 3.2: Test File Naming and Location +- Test files MUST be located in the `tests/` directory. +- Test files MUST mirror the source file structure. +- **Unit tests:** `tests/unit/ComponentName.spec.ts` +- **Integration tests:** `tests/integration/FeatureName.spec.ts` + +### Rule 3.3: Integration Tests Requiring Waku Node +- Store tests requiring a Waku node instance in `packages/tests/integration/`. +- Each test file MUST include setup and teardown logic for the Waku node. +- Use a consistent and isolated test environment configuration. + +### Rule 3.4: CI Pipeline Integration +- All tests MUST pass before a pull request can be merged. +- The CI pipeline MUST run tests automatically on every push event to any branch. +- Test coverage reporting MUST be included in the CI pipeline. + +## 4. Build Process Rules + +### Rule 4.1: TypeScript Configuration +Use a strict `tsconfig.json` to ensure type safety. +```json +{ + "compilerOptions": { + "target": "es2020", + "module": "commonjs", + "strict": true, + "esModuleInterop": true, + "skipLibCheck": true, + "forceConsistentCasingInFileNames": true, + "outDir": "./dist" + }, + "include": ["src/**/*"] +} +``` + +### Rule 4.2: Rollup Configuration Requirements +- The build process MUST generate both ESM and CommonJS outputs. +- Configure module resolution to support both Node.js and browser environments. +- Tree-shaking MUST be enabled to optimize the final bundle size. + +### Rule 4.3: Build Script Requirements +- `build`: Cleans the `dist/` directory, compiles TypeScript, and bundles with Rollup. +- `clean`: Removes all generated files in the `dist/` directory. +- `watch`: Enables development mode with automatic recompilation on file changes. + +### Rule 4.4: Build Outputs +- The `dist/` directory MUST never be committed to version control. +- Ensure `dist/` is included in the root `.gitignore` file. +- Builds MUST be reproducible across all development and CI environments. + +## 5. Dependency Management Rules + +### Rule 5.1: Use Precise Versioning +Use tilde (`~`) for patch versions and caret (`^`) for minor versions to ensure consistency while allowing non-breaking updates. +```json +"dependencies": { + "some-package": "^1.2.3", + "another-package": "~2.4.1" +} +``` + +### Rule 5.2: Peer Dependencies Usage +- Use `peerDependencies` for libraries that are expected to be provided by the consuming application (e.g., `react`). +- Always specify a valid version range for peer dependencies. + +### Rule 5.3: Dependency Audit and Updates +- Run `npm audit` regularly to identify and fix vulnerabilities. +- Update dependencies at least monthly. +- All changes from dependency updates MUST be thoroughly tested. + +## 6. Code Quality and Style Rules + +### Rule 6.1: TypeScript Best Practices +- `strict: true` MUST be enabled in `tsconfig.json`. +- The `any` type should be avoided. Use `unknown` with type guards instead. +- Use type guards for runtime validation of external data. + +### Rule 6.2: Export Patterns +- Prefer named exports for utilities, types, and components. +- Use a single `index.ts` file in each major directory (`src/`, `src/components`, etc.) to expose the public API of that module. + +### Rule 6.3: Implement Proper Error Handling +- Create and use typed error classes for different failure modes. +- Provide meaningful and descriptive error messages. +- Handle asynchronous operations correctly using `try-catch` blocks with `async/await`. + +## 7. Documentation Rules + +### Rule 7.1: Code Documentation Requirements +- All public APIs, functions, and classes MUST have JSDoc comments. +- Each package MUST contain a `README.md` with setup instructions and usage examples. +- Complex architectural components should be documented in a central location. + +### Rule 7.2: API Documentation +- All public interfaces and types MUST be documented. +- Documentation MUST be kept up-to-date with any code changes. + +## 8. Performance and Optimization Rules + +### Rule 8.1: Bundle Optimization +- Use tree-shaking in the Rollup configuration. +- Regularly analyze the bundle size to identify and remove unnecessary dependencies. + +### Rule 8.2: Memory Management +- Ensure resources (e.g., event listeners, subscriptions) are properly disposed of to prevent memory leaks. +- Use `WeakMap` or `WeakSet` for caching objects where appropriate. +- Monitor memory usage in long-running processes. + +## 9. Security Rules + +### Rule 9.1: Security Practices +- All external inputs MUST be validated and sanitized. +- Use cryptographically secure random number generation where needed. +- Regularly update dependencies to patch security vulnerabilities. + +### Rule 9.2: Sensitive Data Handling +- Never commit secrets, private keys, or other sensitive data to the repository. +- Use environment variables for configuration and secrets management. + +## 10. Debugging and Monitoring Rules + +### Rule 10.1: Logging Standards +- Use a structured logging format (e.g., JSON). +- Include appropriate log levels (debug, info, warn, error). +- Never log sensitive information in plain text. + +### Rule 10.2: Debugging Support +- Generate source maps in development builds to facilitate easier debugging. +- Provide clear error messages that include context about the failure. + +## 11. Enforcement Rules + +### Rule 11.1: Before any code changes: +1. Check that the current project structure matches these rules. +2. Verify that all naming conventions are being followed. +3. Run linting and all tests to ensure the current state is clean. +4. Review proposed changes against these established standards. + +### Rule 11.2: When creating new files or packages: +1. Follow the established directory and file structure. +2. Use the appropriate naming conventions for files, components, and variables. +3. Include necessary configuration files (`package.json`, `tsconfig.json`). +4. Update relevant documentation. + +### Rule 11.3: For dependency changes: +1. Check for compatibility with the existing codebase. +2. Update lock files (`package-lock.json` or `yarn.lock`) consistently. +3. Test thoroughly across different environments. +4. Document any breaking changes in the pull request and relevant READMEs. + diff --git a/WAKU_LIBP2P_INTEGRATION_PATTERNS.md b/WAKU_LIBP2P_INTEGRATION_PATTERNS.md new file mode 100644 index 0000000000..7c092d988a --- /dev/null +++ b/WAKU_LIBP2P_INTEGRATION_PATTERNS.md @@ -0,0 +1,463 @@ +# Waku Protocol-Specific libp2p Integration Patterns + +This document analyzes how each Waku protocol integrates with libp2p, focusing on stream management, error handling, and protocol buffering patterns. + +## Overview + +All Waku protocols use a common `StreamManager` class for managing libp2p streams, but each protocol has different patterns for using these streams based on their communication model. + +## Core Stream Management Pattern + +### StreamManager Architecture + +The `StreamManager` class provides a unified interface for managing libp2p streams across all protocols: + +```typescript +export class StreamManager { + private readonly streamPool: Map> = new Map(); + private readonly ongoingCreation: Set = new Set(); + + public async getStream(peerId: PeerId): Promise { + // 1. Check for existing stream + let stream = this.getOpenStreamForCodec(peerId); + if (stream) { + this.lockStream(peerIdStr, stream); + return stream; + } + + // 2. Create new stream if needed + stream = await this.createStream(peerId); + this.lockStream(peerIdStr, stream); + return stream; + } +} +``` + +**Key Features:** +- **Stream Pooling**: Reuses existing streams when possible +- **Stream Locking**: Prevents concurrent access to the same stream +- **Connection Selection**: Uses `selectOpenConnection()` to pick the best connection +- **Automatic Cleanup**: Handles peer disconnection events + +## Protocol-Specific Integration Patterns + +### 1. Store Protocol: Request/Response Pattern + +**Multicodec**: `/vac/waku/store-query/3.0.0` + +**Integration Pattern:** +```typescript +export class StoreCore { + private readonly streamManager: StreamManager; + + public async *queryPerPage( + queryOpts: QueryRequestParams, + decoders: Map>, + peerId: PeerId + ): AsyncGenerator[]> { + // 1. Get managed stream + const stream = await this.streamManager.getStream(peerId); + + // 2. Send request with length-prefixed encoding + const res = await pipe( + [storeQueryRequest.encode()], + lp.encode, // Length-prefixed encoding + stream, // libp2p stream + lp.decode, // Length-prefixed decoding + async (source) => await all(source) + ); + + // 3. Process response + const storeQueryResponse = StoreQueryResponse.decode(bytes); + yield decodedMessages; + } +} +``` + +**Stream Lifecycle:** +- **Creation**: On-demand when query is initiated +- **Duration**: Single request/response cycle +- **Cleanup**: Automatic via StreamManager locking mechanism +- **Error Handling**: Stream failures break pagination loop + +**Protocol Buffer Integration:** +```typescript +// Request encoding +const request = StoreQueryRequest.create({ + pubsubTopic, + contentTopics, + timeStart: BigInt(params.timeStart.getTime() * 1_000_000), + paginationLimit: BigInt(params.paginationLimit) +}); + +// Response decoding +const response = StoreQueryResponse.decode(bytes); +``` + +### 2. Filter Protocol: Push-Based Subscription Pattern + +**Multicodecs**: +- Subscribe: `/vac/waku/filter-subscribe/2.0.0-beta1` +- Push: `/vac/waku/filter-push/2.0.0-beta1` + +**Integration Pattern:** +```typescript +export class FilterCore { + private streamManager: StreamManager; + + constructor(handleIncomingMessage: IncomingMessageHandler, libp2p: Libp2p) { + this.streamManager = new StreamManager(FilterCodecs.SUBSCRIBE, libp2p.components); + + // Register push handler for incoming messages + libp2p.handle(FilterCodecs.PUSH, this.onRequest.bind(this), { + maxInboundStreams: 100 + }); + } + + // Outbound: Subscribe to filter + public async subscribe( + pubsubTopic: PubsubTopic, + peerId: PeerId, + contentTopics: ContentTopic[] + ): Promise { + const stream = await this.streamManager.getStream(peerId); + const request = FilterSubscribeRpc.createSubscribeRequest(pubsubTopic, contentTopics); + + const res = await pipe( + [request.encode()], + lp.encode, + stream, + lp.decode, + async (source) => await all(source) + ); + } + + // Inbound: Handle pushed messages + private onRequest(streamData: IncomingStreamData): void { + const { connection, stream } = streamData; + + pipe(stream, lp.decode, async (source) => { + for await (const bytes of source) { + const response = FilterPushRpc.decode(bytes.slice()); + const { pubsubTopic, wakuMessage } = response; + + await this.handleIncomingMessage(pubsubTopic, wakuMessage, connection.remotePeer.toString()); + } + }); + } +} +``` + +**Stream Lifecycle:** +- **Subscribe Stream**: Short-lived for subscription requests +- **Push Stream**: Long-lived for receiving messages +- **Duration**: Push streams remain open for continuous message delivery +- **Cleanup**: Automatic cleanup on peer disconnection + +**Bidirectional Communication:** +- **Outbound**: Uses StreamManager for subscription/unsubscription requests +- **Inbound**: Handles incoming push streams via libp2p stream handler + +### 3. LightPush Protocol: Request/Response with RPC Encoding + +**Multicodec**: `/vac/waku/lightpush/2.0.0-beta1` + +**Integration Pattern:** +```typescript +export class LightPushCore { + private readonly streamManager: StreamManager; + + public async send( + encoder: IEncoder, + message: IMessage, + peerId: PeerId + ): Promise { + // 1. Prepare message + const query = PushRpc.createRequest(protoMessage, encoder.pubsubTopic); + + // 2. Get stream + const stream = await this.streamManager.getStream(peerId); + + // 3. Send with length-prefixed encoding + const res = await pipe( + [query.encode()], + lp.encode, + stream, + lp.decode, + async (source) => await all(source) + ); + + // 4. Process response + const response = PushRpc.decode(bytes).response; + return response.isSuccess ? { success: peerId } : { failure: { error, peerId } }; + } +} +``` + +**Stream Lifecycle:** +- **Creation**: On-demand for each send operation +- **Duration**: Single request/response cycle +- **Cleanup**: Automatic via StreamManager +- **Error Handling**: Comprehensive error classification + +**Message Validation:** +```typescript +// Size validation +if (!(await isMessageSizeUnderCap(encoder, message))) { + return { error: ProtocolError.SIZE_TOO_BIG }; +} + +// Payload validation +if (!message.payload || message.payload.length === 0) { + return { error: ProtocolError.EMPTY_PAYLOAD }; +} +``` + +## Common Stream Management Patterns + +### 1. Length-Prefixed Encoding + +All protocols use `it-length-prefixed` for message framing: + +```typescript +// Send pattern +await pipe( + [message.encode()], + lp.encode, // Add length prefix + stream, + lp.decode, // Remove length prefix + async (source) => await all(source) +); +``` + +**Benefits:** +- **Message Boundaries**: Clear message delimitation +- **Streaming Support**: Works with async iterators +- **Error Resilience**: Prevents message fragmentation issues + +### 2. Connection Selection Strategy + +```typescript +export function selectOpenConnection(connections: Connection[]): Connection | undefined { + return connections + .filter((c) => c.status === "open") + .sort((left, right) => right.timeline.open - left.timeline.open) + .at(0); +} +``` + +**Strategy:** +- **Status Filter**: Only use open connections +- **Recency Preference**: Prefer recently opened connections +- **Fallback**: Return undefined if no suitable connection + +### 3. Stream Locking Mechanism + +```typescript +private lockStream(peerId: string, stream: Stream): void { + stream.metadata[STREAM_LOCK_KEY] = true; +} + +private isStreamLocked(stream: Stream): boolean { + return !!stream.metadata[STREAM_LOCK_KEY]; +} +``` + +**Purpose:** +- **Concurrent Access Prevention**: Avoid stream conflicts +- **Resource Management**: Ensure proper stream lifecycle +- **Debugging**: Track stream usage patterns + +## Error Handling Patterns + +### 1. Stream-Level Error Handling + +```typescript +// Store protocol error handling +try { + stream = await this.streamManager.getStream(peerId); +} catch (e) { + log.error("Failed to get stream", e); + break; // Exit pagination loop +} +``` + +### 2. Protocol-Level Error Classification + +```typescript +// LightPush comprehensive error handling +export enum ProtocolError { + GENERIC_FAIL = "GENERIC_FAIL", + NO_STREAM_AVAILABLE = "NO_STREAM_AVAILABLE", + STREAM_ABORTED = "STREAM_ABORTED", + DECODE_FAILED = "DECODE_FAILED", + NO_RESPONSE = "NO_RESPONSE", + REMOTE_PEER_REJECTED = "REMOTE_PEER_REJECTED", + SIZE_TOO_BIG = "SIZE_TOO_BIG", + EMPTY_PAYLOAD = "EMPTY_PAYLOAD", + ENCODE_FAILED = "ENCODE_FAILED" +} +``` + +### 3. Graceful Degradation + +```typescript +// Filter protocol graceful error handling +pipe(stream, lp.decode, async (source) => { + for await (const bytes of source) { + try { + const response = FilterPushRpc.decode(bytes.slice()); + await this.handleIncomingMessage(pubsubTopic, wakuMessage, peerId); + } catch (e) { + log.error("Error decoding message", e); + // Continue processing other messages + } + } +}).catch((e) => { + log.error("Error with receiving pipe", e); + // Handle stream-level errors +}); +``` + +## Protocol Buffer Integration + +### 1. Message Encoding/Decoding Pattern + +```typescript +// Consistent encoding pattern across protocols +export class MessageRpc { + public static decode(bytes: Uint8ArrayList): MessageRpc { + const res = proto.MessageRpc.decode(bytes); + return new MessageRpc(res); + } + + public encode(): Uint8Array { + return proto.MessageRpc.encode(this.proto); + } +} +``` + +### 2. Type-Safe Protocol Buffers + +```typescript +// Store protocol with proper typing +export class StoreQueryRequest { + public constructor(public proto: proto.StoreQueryRequest) {} + + public static create(params: QueryRequestParams): StoreQueryRequest { + return new StoreQueryRequest({ + ...params, + timeStart: params.timeStart ? BigInt(params.timeStart.getTime() * 1_000_000) : undefined, + paginationLimit: params.paginationLimit ? BigInt(params.paginationLimit) : undefined + }); + } +} +``` + +### 3. Validation and Error Handling + +```typescript +// Comprehensive request validation +public static create(params: QueryRequestParams): StoreQueryRequest { + const isHashQuery = params.messageHashes && params.messageHashes.length > 0; + const hasContentTopics = params.contentTopics && params.contentTopics.length > 0; + + if (isHashQuery && hasContentTopics) { + throw new Error("Message hash queries cannot include content filters"); + } + + if (!isHashQuery && !params.pubsubTopic && hasContentTopics) { + throw new Error("Both pubsubTopic and contentTopics required for content-filtered queries"); + } +} +``` + +## Performance Optimizations + +### 1. Stream Reuse Strategy + +```typescript +// StreamManager optimization +public async getStream(peerId: PeerId): Promise { + // Check for existing usable stream + let stream = this.getOpenStreamForCodec(peerId); + if (stream && !this.isStreamLocked(stream)) { + return stream; // Reuse existing stream + } + + // Create new stream only if necessary + return await this.createStream(peerId); +} +``` + +### 2. Connection Pooling + +```typescript +// Efficient connection selection +private getOpenStreamForCodec(peerId: PeerId): Stream | undefined { + const connections = this.libp2p.connectionManager.getConnections(peerId); + const connection = selectOpenConnection(connections); + + if (!connection) return; + + // Find existing stream with correct protocol + const stream = connection.streams.find(s => s.protocol === this.multicodec); + + return stream && !this.isStreamUnusable(stream) ? stream : undefined; +} +``` + +### 3. Async Iterator Optimization + +```typescript +// Efficient message streaming in Store protocol +public async *queryPerPage(): AsyncGenerator[]> { + while (true) { + const res = await pipe( + [request.encode()], + lp.encode, + stream, + lp.decode, + async (source) => await all(source) // Collect all chunks efficiently + ); + + yield decodedMessages; // Yield batch of messages + + if (shouldStopPagination) break; + } +} +``` + +## Best Practices + +### 1. Stream Lifecycle Management + +- **Acquire**: Use StreamManager for consistent stream acquisition +- **Lock**: Lock streams during usage to prevent conflicts +- **Release**: Automatic cleanup via metadata and event handlers +- **Monitor**: Track stream health and connection status + +### 2. Error Handling + +- **Classify**: Use typed error enums for better error handling +- **Propagate**: Bubble up errors with context information +- **Recover**: Implement graceful degradation where possible +- **Log**: Comprehensive logging for debugging + +### 3. Protocol Buffer Usage + +- **Validate**: Validate inputs before encoding +- **Type Safety**: Use TypeScript wrappers for protocol buffers +- **Efficiency**: Reuse encoder/decoder instances +- **Version**: Handle protocol version compatibility + +### 4. Performance Considerations + +- **Stream Reuse**: Maximize stream reuse for better performance +- **Connection Selection**: Use efficient connection selection strategies +- **Batch Processing**: Process messages in batches where possible +- **Memory Management**: Clean up resources properly + +## Conclusion + +The Waku protocol implementations demonstrate sophisticated libp2p integration patterns that balance performance, reliability, and maintainability. The common StreamManager provides a solid foundation while allowing each protocol to implement its specific communication patterns. The consistent use of length-prefixed encoding, comprehensive error handling, and efficient stream management makes the codebase robust and scalable. diff --git a/WAKU_PROTOCOL_STACK_ARCHITECTURE.md b/WAKU_PROTOCOL_STACK_ARCHITECTURE.md new file mode 100644 index 0000000000..702d05dfc8 --- /dev/null +++ b/WAKU_PROTOCOL_STACK_ARCHITECTURE.md @@ -0,0 +1,387 @@ +# Waku Protocol Stack Architecture Documentation + +## Overview + +The Waku protocol stack is built on top of libp2p's networking foundation, providing a layered architecture where application-layer Waku protocols utilize libp2p's transport and connection management capabilities. This document provides an in-depth analysis of how these protocols interact, negotiate, and register with the underlying libp2p infrastructure. + +## Protocol Stack Layers + +``` +┌─────────────────────────────────────────────────────────────┐ +│ Application Layer │ +│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────┐ │ +│ │ Relay │ │ Store │ │ Filter │ │LightPush│ │ +│ │ (Gossip) │ │ (History) │ │(Bandwidth) │ │ (Relay) │ │ +│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────┘ │ +├─────────────────────────────────────────────────────────────┤ +│ Protocol Layer │ +│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────┐ │ +│ │ Metadata │ │ Peer Exchange│ │ Identity │ │ Ping │ │ +│ │ (Handshake) │ │ (Discovery) │ │ (Protocol) │ │ (Health)│ │ +│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────┘ │ +├─────────────────────────────────────────────────────────────┤ +│ Libp2p Layer │ +│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────────┐ │ +│ │ Multistream │ │ Connection │ │ Stream Management │ │ +│ │ Select │ │ Management │ │ (Multiplexing) │ │ +│ └─────────────┘ └─────────────┘ └─────────────────────────┘ │ +├─────────────────────────────────────────────────────────────┤ +│ Transport Layer │ +│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────────┐ │ +│ │ WebSockets │ │ Noise │ │ Mplex │ │ +│ │(Transport) │ │(Encryption) │ │ (Multiplexing) │ │ +│ └─────────────┘ └─────────────┘ └─────────────────────────┘ │ +└─────────────────────────────────────────────────────────────┘ +``` + +## Multicodec Identifiers + +Waku protocols use specific multicodec identifiers to enable protocol negotiation through libp2p's multistream-select mechanism: + +### Core Waku Protocols + +| Protocol | Multicodec | Purpose | Implementation | +|----------|------------|---------|---------------| +| **Relay** | `/vac/waku/relay/2.0.0` | Gossip-based message routing | GossipSub protocol | +| **Store** | `/vac/waku/store-query/3.0.0` | Historical message retrieval | Request-response pattern | +| **Filter** | `/vac/waku/filter-subscribe/2.0.0-beta1` | Bandwidth-efficient message filtering | Subscription-based | +| **Filter Push** | `/vac/waku/filter-push/2.0.0-beta1` | Filter message delivery | Push-based delivery | +| **LightPush** | `/vac/waku/lightpush/2.0.0-beta1` | Lightweight message publishing | Request-response pattern | +| **Metadata** | `/vac/waku/metadata/1.0.0` | Shard information exchange | Handshake protocol | + +### Legacy Protocol Identifiers + +Note: The task mentions Store as `/vac/waku/store/2.0.0-beta4`, but the current implementation uses `/vac/waku/store-query/3.0.0` for Store v3. This represents the evolution of the protocol specifications. + +## Protocol Negotiation Through Multistream-Select + +### Protocol Registration Process + +1. **Service Registration**: Each protocol registers its multicodec with libp2p during initialization +2. **Stream Handler Setup**: Protocols define handlers for incoming streams +3. **Protocol Advertising**: Libp2p advertises supported protocols to peers + +### Example: Filter Protocol Registration + +```typescript +// packages/core/src/lib/filter/filter.ts +export const FilterCodecs = { + SUBSCRIBE: "/vac/waku/filter-subscribe/2.0.0-beta1", + PUSH: "/vac/waku/filter-push/2.0.0-beta1" +}; + +export class FilterCore { + public constructor( + private handleIncomingMessage: IncomingMessageHandler, + libp2p: Libp2p + ) { + // Register protocol handler with libp2p + libp2p + .handle(FilterCodecs.PUSH, this.onRequest.bind(this), { + maxInboundStreams: 100 + }) + .catch((e) => { + log.error("Failed to register ", FilterCodecs.PUSH, e); + }); + } +} +``` + +### Multistream-Select Negotiation Flow + +``` +Initiator Responder + │ │ + │ ── /multistream/1.0.0 ──────────→ │ + │ ←─────── /multistream/1.0.0 ───── │ + │ │ + │ ── /vac/waku/store-query/3.0.0 ─→ │ + │ ←─ /vac/waku/store-query/3.0.0 ── │ + │ │ + │ ── ───→ │ + │ ←─ ──── │ +``` + +## Abstraction Layers + +### Transport/Connection Layer (libp2p) + +**Responsibilities:** +- Connection establishment and management +- Stream multiplexing (Mplex) +- Encryption (Noise protocol) +- Transport protocols (WebSockets) +- Peer discovery and routing + +**Key Components:** +```typescript +// Connection establishment +const connection = await libp2p.dial(peerId); +const stream = await connection.newStream(multicodec); + +// Stream management +const streamManager = new StreamManager(multicodec, libp2p.components); +``` + +### Stream Management Layer + +**Purpose:** Abstracts stream lifecycle management for Waku protocols + +**Key Features:** +- Stream pooling and reuse +- Connection state management +- Automatic stream creation +- Stream locking mechanism + +```typescript +// packages/core/src/lib/stream_manager/stream_manager.ts +export class StreamManager { + public async getStream(peerId: PeerId): Promise { + // 1. Check for existing stream + let stream = this.getOpenStreamForCodec(peerId); + + if (stream) { + this.lockStream(peerIdStr, stream); + return stream; + } + + // 2. Create new stream if needed + stream = await this.createStream(peerId); + this.lockStream(peerIdStr, stream); + + return stream; + } +} +``` + +### Protocol Layer (Waku Applications) + +**Responsibilities:** +- Protocol-specific message handling +- Business logic implementation +- Message encoding/decoding +- Protocol state management + +## Protocol Registration and Stream Handling + +### Registration Patterns + +#### 1. Outbound-Only Protocols (Store, LightPush) +```typescript +// Store protocol - client-side only +export class StoreCore { + public constructor(libp2p: Libp2p) { + this.streamManager = new StreamManager(StoreCodec, libp2p.components); + // No incoming stream handler needed + } +} +``` + +#### 2. Bidirectional Protocols (Filter, Metadata) +```typescript +// Filter protocol - handles both outbound and inbound streams +export class FilterCore { + public constructor(libp2p: Libp2p) { + // Setup outbound stream management + this.streamManager = new StreamManager(FilterCodecs.SUBSCRIBE, libp2p.components); + + // Register inbound stream handler + libp2p.handle(FilterCodecs.PUSH, this.onRequest.bind(this)); + } +} +``` + +#### 3. Gossip-Based Protocols (Relay) +```typescript +// Relay protocol - integrates with libp2p's GossipSub +export class Relay implements IRelay { + public constructor(params: RelayConstructorParams) { + this.gossipSub = params.libp2p.services.pubsub as GossipSub; + this.gossipSub.multicodecs = RelayCodecs; // ["/vac/waku/relay/2.0.0"] + } +} +``` + +### Stream Handler Implementation + +#### Request-Response Pattern +```typescript +// Store protocol query implementation +public async *queryPerPage( + queryOpts: QueryRequestParams, + decoders: Map>, + peerId: PeerId +): AsyncGenerator[]> { + const stream = await this.streamManager.getStream(peerId); + + const res = await pipe( + [storeQueryRequest.encode()], + lp.encode, // Length-prefixed encoding + stream, // libp2p stream + lp.decode, // Length-prefixed decoding + async (source) => await all(source) + ); + + const storeQueryResponse = StoreQueryResponse.decode(bytes); + // Process response... +} +``` + +#### Push-Based Pattern +```typescript +// Filter protocol push handler +private onRequest(streamData: IncomingStreamData): void { + const { connection, stream } = streamData; + + pipe(stream, lp.decode, async (source) => { + for await (const bytes of source) { + const response = FilterPushRpc.decode(bytes.slice()); + + await this.handleIncomingMessage( + response.pubsubTopic, + response.wakuMessage, + connection.remotePeer.toString() + ); + } + }); +} +``` + +### Protocol Lifecycle Management + +#### Node Initialization +```typescript +// Waku node creation with protocol configuration +export class WakuNode implements IWaku { + public constructor( + pubsubTopics: PubsubTopic[], + options: CreateNodeOptions, + libp2p: Libp2p, + protocolsEnabled: ProtocolsEnabled + ) { + // Initialize protocols based on configuration + if (protocolsEnabled.store) { + this.store = new Store({ libp2p, ... }); + } + + if (protocolsEnabled.lightpush) { + this.lightPush = new LightPush({ libp2p, ... }); + } + + if (protocolsEnabled.filter) { + this.filter = new Filter({ libp2p, ... }); + } + } +} +``` + +#### Service Registration with Libp2p +```typescript +// libp2p service configuration +const libp2p = await createLibp2p({ + services: { + identify: identify({ agentVersion: userAgent }), + ping: ping({ maxInboundStreams: 10 }), + metadata: wakuMetadata(pubsubTopics), // Waku metadata service + ...options?.services + } +}); +``` + +## Protocol-Specific Stream Management + +### Store Protocol +- **Pattern:** Client-server request-response +- **Stream Usage:** One-time use per query +- **Multiplexing:** Single stream per peer, length-prefixed messages + +### Filter Protocol +- **Pattern:** Subscription-based with bidirectional communication +- **Stream Usage:** Long-lived subscription streams +- **Multiplexing:** Separate streams for subscribe/unsubscribe and push + +### LightPush Protocol +- **Pattern:** Client-server request-response +- **Stream Usage:** One-time use per message publish +- **Multiplexing:** Single stream per peer + +### Relay Protocol +- **Pattern:** Gossip-based peer-to-peer +- **Stream Usage:** Managed by GossipSub +- **Multiplexing:** libp2p's GossipSub handles stream management + +## Error Handling and Recovery + +### Protocol Error Types +```typescript +export enum ProtocolError { + NO_PEER_AVAILABLE = "No peer available", + NO_STREAM_AVAILABLE = "No stream available", + DECODE_FAILED = "Failed to decode", + REMOTE_PEER_REJECTED = "Remote peer rejected", + STREAM_ABORTED = "Stream aborted", + // ... other errors +} +``` + +### Stream Recovery Mechanisms +- **Automatic retry:** Stream creation retries on failure +- **Peer rotation:** Fall back to alternative peers +- **Connection pooling:** Reuse existing connections +- **Stream locking:** Prevent concurrent stream usage + +## Performance Optimizations + +### Stream Pooling +```typescript +// Stream reuse mechanism +private getOpenStreamForCodec(peerId: PeerId): Stream | undefined { + const connections = this.libp2p.connectionManager.getConnections(peerId); + const connection = selectOpenConnection(connections); + + const stream = connection.streams.find( + (s) => s.protocol === this.multicodec + ); + + if (stream && !this.isStreamLocked(stream)) { + return stream; + } +} +``` + +### Connection Management +- **Keep-alive mechanisms:** Ping-based connection health monitoring +- **Connection limits:** Configurable connection pool sizes +- **Priority-based peer selection:** Preferential treatment for bootstrap peers + +## Security Considerations + +### Protocol Authentication +- **Noise encryption:** All connections encrypted by default +- **Peer identity verification:** Public key-based peer identification +- **Message validation:** Protocol-specific message validation + +### Stream Security +- **Stream isolation:** Each protocol uses isolated streams +- **Message size limits:** Prevent DoS attacks via large messages +- **Rate limiting:** Configurable limits on stream creation + +## Conclusion + +The Waku protocol stack demonstrates a well-architected layered approach where: + +1. **Libp2p provides the foundation** with transport, connection management, and protocol negotiation +2. **Multistream-select enables protocol negotiation** through well-defined multicodec identifiers +3. **Stream management abstracts complexity** of connection lifecycle and reuse +4. **Protocol implementations focus on business logic** while leveraging libp2p's infrastructure + +This architecture enables scalable, efficient, and secure communication while maintaining clear separation of concerns between transport, networking, and application layers. + +## References + +- [Waku Relay Protocol Specification](https://rfc.vac.dev/spec/11/) +- [Waku Store Protocol Specification](https://rfc.vac.dev/spec/13/) +- [Waku Filter Protocol Specification](https://rfc.vac.dev/spec/12/) +- [Waku LightPush Protocol Specification](https://rfc.vac.dev/spec/19/) +- [Libp2p Protocol Documentation](https://docs.libp2p.io/concepts/protocols/) +- [Multistream-Select Specification](https://github.com/multiformats/multistream-select)