add agent rules

This commit is contained in:
Arseniy Klempner 2025-07-15 20:20:39 -07:00
parent 27292edabc
commit 9f3c89e458
No known key found for this signature in database
GPG Key ID: 51653F18863BD24B
3 changed files with 1072 additions and 0 deletions

JS_WAKU_AGENT_RULES.md

# Comprehensive Rules for Warp Agents in js-waku
These rules govern Warp agents working in the js-waku repository, based on its project structure and development workflow requirements.
## 1. Package Structure and Naming Conventions
### Rule 1.1: Standard Package Layout
Each package within the `packages/` directory must adhere to the following structure:
```bash
packages/{package-name}/
├── src/ # Source TypeScript files
│ ├── interfaces/ # Type definitions and interfaces
│ ├── utils/ # Utility functions (camelCase naming)
│ ├── components/ # Main components (PascalCase naming)
│ └── index.ts # Main entry point
├── tests/ # Test files only
│ ├── unit/ # Unit tests
│ └── integration/ # Integration tests
├── dist/ # Build outputs (generated, never commit)
├── package.json # Package configuration
└── tsconfig.json # TypeScript configuration
```
## 2. Version Control and Scripting
### Rule 2.1: Conventional Commit Messages
Commit messages MUST follow the Conventional Commits specification.
**Examples:**
```
feat: add new message filtering capability
fix: resolve connection timeout issue
chore: update dependencies to latest versions
docs: improve API documentation
test: add unit tests for message validation
refactor: restructure connection management
```
### Rule 2.2: Git Hooks Requirements
The following Git hooks MUST be configured using `husky`:
- **Pre-commit:** Run `lint-staged` for code quality checks on staged files.
- **Pre-push:** Run the full test suite to ensure repository health.
### Rule 2.3: npm Scripts Standards
Every package MUST include these standard npm scripts in its `package.json`:
```json
"scripts": {
"build": "yarn clean && tsc && rollup -c",
"clean": "rm -rf dist",
"test": "mocha",
"lint": "eslint . --ext .ts",
"watch": "tsc --watch"
}
```
## 3. Testing Pattern Rules
### Rule 3.1: Test Framework
Use `mocha` and `chai` for all unit and integration tests.
```typescript
import { expect } from 'chai';
describe('ComponentName', () => {
it('should perform its function correctly', () => {
// Test logic here
expect(true).to.be.true;
});
});
```
### Rule 3.2: Test File Naming and Location
- Test files MUST be located in the `tests/` directory.
- Test files MUST mirror the source file structure.
- **Unit tests:** `tests/unit/ComponentName.spec.ts`
- **Integration tests:** `tests/integration/FeatureName.spec.ts`
### Rule 3.3: Integration Tests Requiring Waku Node
- Store tests requiring a Waku node instance in `packages/tests/integration/`.
- Each test file MUST include setup and teardown logic for the Waku node.
- Use a consistent and isolated test environment configuration.
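The setup/teardown requirement can be sketched as a wrapper that always stops the node, mirroring mocha's `before`/`after` hooks. `FakeWakuNode` and `createNode` are illustrative stand-ins, not js-waku APIs:

```typescript
// Sketch of a setup/teardown wrapper for integration tests.
// `FakeWakuNode` and `createNode` are stand-ins for the real Waku node API.
interface FakeWakuNode {
  isStarted: boolean;
  stop(): Promise<void>;
}

async function createNode(): Promise<FakeWakuNode> {
  const node: FakeWakuNode = {
    isStarted: true,
    async stop() {
      node.isStarted = false;
    }
  };
  return node;
}

// Runs a test body against a fresh, isolated node and guarantees teardown,
// mirroring mocha's before/after hooks.
export async function withWakuNode<T>(
  testBody: (node: FakeWakuNode) => Promise<T>
): Promise<T> {
  const node = await createNode();
  try {
    return await testBody(node);
  } finally {
    await node.stop(); // teardown runs even if the test body throws
  }
}
```

Using a wrapper like this keeps each test file's node lifecycle consistent and prevents leaked nodes between runs.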
### Rule 3.4: CI Pipeline Integration
- All tests MUST pass before a pull request can be merged.
- The CI pipeline MUST run tests automatically on every push event to any branch.
- Test coverage reporting MUST be included in the CI pipeline.
## 4. Build Process Rules
### Rule 4.1: TypeScript Configuration
Use a strict `tsconfig.json` to ensure type safety.
```json
{
"compilerOptions": {
"target": "es2020",
"module": "commonjs",
"strict": true,
"esModuleInterop": true,
"skipLibCheck": true,
"forceConsistentCasingInFileNames": true,
"outDir": "./dist"
},
"include": ["src/**/*"]
}
```
### Rule 4.2: Rollup Configuration Requirements
- The build process MUST generate both ESM and CommonJS outputs.
- Configure module resolution to support both Node.js and browser environments.
- Tree-shaking MUST be enabled to optimize the final bundle size.
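A minimal `rollup.config.js` satisfying these requirements might look like the following sketch; the exact plugin set is an assumption and may differ per package:

```typescript
// rollup.config.js — illustrative sketch only; actual plugins may differ.
import typescript from "@rollup/plugin-typescript";

export default {
  input: "src/index.ts",
  output: [
    { file: "dist/index.cjs", format: "cjs", sourcemap: true }, // CommonJS for Node.js
    { file: "dist/index.mjs", format: "es", sourcemap: true } // ESM for bundlers/browsers
  ],
  treeshake: true, // dead-code elimination (on by default, made explicit here)
  plugins: [typescript()]
};
```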
### Rule 4.3: Build Script Requirements
- `build`: Cleans the `dist/` directory, compiles TypeScript, and bundles with Rollup.
- `clean`: Removes all generated files in the `dist/` directory.
- `watch`: Enables development mode with automatic recompilation on file changes.
### Rule 4.4: Build Outputs
- The `dist/` directory MUST never be committed to version control.
- Ensure `dist/` is included in the root `.gitignore` file.
- Builds MUST be reproducible across all development and CI environments.
## 5. Dependency Management Rules
### Rule 5.1: Use Precise Versioning
Use tilde (`~`) to allow only patch-level updates and caret (`^`) to allow minor-version updates, ensuring consistency while still receiving non-breaking fixes.
```json
"dependencies": {
"some-package": "^1.2.3",
"another-package": "~2.4.1"
}
```
### Rule 5.2: Peer Dependencies Usage
- Use `peerDependencies` for libraries that are expected to be provided by the consuming application (e.g., `react`).
- Always specify a valid version range for peer dependencies.
### Rule 5.3: Dependency Audit and Updates
- Run `npm audit` regularly to identify and fix vulnerabilities.
- Update dependencies at least monthly.
- All changes from dependency updates MUST be thoroughly tested.
## 6. Code Quality and Style Rules
### Rule 6.1: TypeScript Best Practices
- `strict: true` MUST be enabled in `tsconfig.json`.
- The `any` type should be avoided. Use `unknown` with type guards instead.
- Use type guards for runtime validation of external data.
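A sketch of the `unknown`-plus-type-guard pattern; `IncomingPayload` and `isIncomingPayload` are illustrative names, not js-waku types:

```typescript
// Sketch: narrowing external data with a type guard instead of `any`.
interface IncomingPayload {
  contentTopic: string;
  payload: Uint8Array;
}

function isIncomingPayload(value: unknown): value is IncomingPayload {
  if (typeof value !== "object" || value === null) return false;
  const candidate = value as Record<string, unknown>;
  return (
    typeof candidate.contentTopic === "string" &&
    candidate.payload instanceof Uint8Array
  );
}

function handle(raw: unknown): string {
  if (!isIncomingPayload(raw)) {
    throw new Error("Invalid payload shape");
  }
  return raw.contentTopic; // `raw` is narrowed to IncomingPayload here
}
```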
### Rule 6.2: Export Patterns
- Prefer named exports for utilities, types, and components.
- Use a single `index.ts` file in each major directory (`src/`, `src/components`, etc.) to expose the public API of that module.
### Rule 6.3: Implement Proper Error Handling
- Create and use typed error classes for different failure modes.
- Provide meaningful and descriptive error messages.
- Handle asynchronous operations correctly using `try-catch` blocks with `async/await`.
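A minimal sketch of typed error classes with `async`/`await` and `try-catch`; the class and function names are illustrative, not js-waku APIs:

```typescript
// Sketch of a typed error hierarchy for distinct failure modes.
class WakuError extends Error {
  public constructor(message: string) {
    super(message);
    this.name = new.target.name;
  }
}

class ConnectionTimeoutError extends WakuError {
  public constructor(public readonly peerId: string) {
    super(`Connection to peer ${peerId} timed out`);
  }
}

async function dialPeer(peerId: string): Promise<void> {
  throw new ConnectionTimeoutError(peerId); // simulate a known failure mode
}

export async function connectWithHandling(peerId: string): Promise<string> {
  try {
    await dialPeer(peerId);
    return "connected";
  } catch (err) {
    if (err instanceof ConnectionTimeoutError) {
      return `retrying ${err.peerId}`; // recoverable, classified failure
    }
    throw err; // unknown errors propagate with context intact
  }
}
```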
## 7. Documentation Rules
### Rule 7.1: Code Documentation Requirements
- All public APIs, functions, and classes MUST have JSDoc comments.
- Each package MUST contain a `README.md` with setup instructions and usage examples.
- Complex architectural components should be documented in a central location.
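A hedged example of the expected JSDoc shape for a public function (`measurePayload` is hypothetical):

```typescript
/**
 * Returns the size of a message payload in bytes.
 * (Illustrative JSDoc shape; the function itself is hypothetical.)
 *
 * @param contentTopic - Content topic the message is published on.
 * @param payload - Raw message bytes.
 * @returns The payload size in bytes.
 * @throws {Error} If the payload is empty.
 *
 * @example
 * measurePayload("/app/1/chat/proto", new Uint8Array([1, 2, 3])); // 3
 */
export function measurePayload(contentTopic: string, payload: Uint8Array): number {
  if (payload.length === 0) {
    throw new Error(`Empty payload for ${contentTopic}`);
  }
  return payload.length;
}
```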
### Rule 7.2: API Documentation
- All public interfaces and types MUST be documented.
- Documentation MUST be kept up-to-date with any code changes.
## 8. Performance and Optimization Rules
### Rule 8.1: Bundle Optimization
- Use tree-shaking in the Rollup configuration.
- Regularly analyze the bundle size to identify and remove unnecessary dependencies.
### Rule 8.2: Memory Management
- Ensure resources (e.g., event listeners, subscriptions) are properly disposed of to prevent memory leaks.
- Use `WeakMap` or `WeakSet` for caching objects where appropriate.
- Monitor memory usage in long-running processes.
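A small sketch of `WeakMap`-based caching, assuming a hypothetical `PeerRecord` object as the cache key:

```typescript
// Sketch: WeakMap-keyed cache so cached data does not keep peer objects alive.
interface PeerRecord {
  id: string;
}

const latencyCache = new WeakMap<PeerRecord, number>();

function measureLatency(peer: PeerRecord): number {
  return peer.id.length * 10; // stand-in for an expensive measurement
}

export function getLatency(peer: PeerRecord): number {
  let cached = latencyCache.get(peer);
  if (cached === undefined) {
    cached = measureLatency(peer);
    latencyCache.set(peer, cached);
  }
  // When `peer` becomes unreachable, its cache entry is GC-eligible too.
  return cached;
}
```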
## 9. Security Rules
### Rule 9.1: Security Practices
- All external inputs MUST be validated and sanitized.
- Use cryptographically secure random number generation where needed.
- Regularly update dependencies to patch security vulnerabilities.
### Rule 9.2: Sensitive Data Handling
- Never commit secrets, private keys, or other sensitive data to the repository.
- Use environment variables for configuration and secrets management.
## 10. Debugging and Monitoring Rules
### Rule 10.1: Logging Standards
- Use a structured logging format (e.g., JSON).
- Include appropriate log levels (debug, info, warn, error).
- Never log sensitive information in plain text.
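These points can be illustrated with a minimal structured-logger sketch; the field names and redaction list are assumptions, not a js-waku API:

```typescript
// Sketch of a structured JSON log formatter with redaction of sensitive keys.
type LogLevel = "debug" | "info" | "warn" | "error";

const REDACTED_KEYS = new Set(["privateKey", "seed", "password"]);

export function formatLog(
  level: LogLevel,
  message: string,
  context: Record<string, unknown> = {}
): string {
  const safeContext: Record<string, unknown> = {};
  for (const [key, value] of Object.entries(context)) {
    // Sensitive values never reach the log output in plain text.
    safeContext[key] = REDACTED_KEYS.has(key) ? "[REDACTED]" : value;
  }
  return JSON.stringify({
    timestamp: new Date().toISOString(),
    level,
    message,
    ...safeContext
  });
}
```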
### Rule 10.2: Debugging Support
- Generate source maps in development builds to facilitate easier debugging.
- Provide clear error messages that include context about the failure.
## 11. Enforcement Rules
### Rule 11.1: Before any code changes:
1. Check that the current project structure matches these rules.
2. Verify that all naming conventions are being followed.
3. Run linting and all tests to ensure the current state is clean.
4. Review proposed changes against these established standards.
### Rule 11.2: When creating new files or packages:
1. Follow the established directory and file structure.
2. Use the appropriate naming conventions for files, components, and variables.
3. Include necessary configuration files (`package.json`, `tsconfig.json`).
4. Update relevant documentation.
### Rule 11.3: For dependency changes:
1. Check for compatibility with the existing codebase.
2. Update lock files (`package-lock.json` or `yarn.lock`) consistently.
3. Test thoroughly across different environments.
4. Document any breaking changes in the pull request and relevant READMEs.

# Waku Protocol-Specific libp2p Integration Patterns
This document analyzes how each Waku protocol integrates with libp2p, focusing on stream management, error handling, and protocol buffering patterns.
## Overview
All Waku protocols use a common `StreamManager` class for managing libp2p streams, but each protocol has different patterns for using these streams based on their communication model.
## Core Stream Management Pattern
### StreamManager Architecture
The `StreamManager` class provides a unified interface for managing libp2p streams across all protocols:
```typescript
export class StreamManager {
  private readonly streamPool: Map<string, Promise<void>> = new Map();
  private readonly ongoingCreation: Set<string> = new Set();

  public async getStream(peerId: PeerId): Promise<Stream> {
    const peerIdStr = peerId.toString();

    // 1. Check for existing stream
    let stream = this.getOpenStreamForCodec(peerId);
    if (stream) {
      this.lockStream(peerIdStr, stream);
      return stream;
    }

    // 2. Create new stream if needed
    stream = await this.createStream(peerId);
    this.lockStream(peerIdStr, stream);
    return stream;
  }
}
```
**Key Features:**
- **Stream Pooling**: Reuses existing streams when possible
- **Stream Locking**: Prevents concurrent access to the same stream
- **Connection Selection**: Uses `selectOpenConnection()` to pick the best connection
- **Automatic Cleanup**: Handles peer disconnection events
## Protocol-Specific Integration Patterns
### 1. Store Protocol: Request/Response Pattern
**Multicodec**: `/vac/waku/store-query/3.0.0`
**Integration Pattern:**
```typescript
export class StoreCore {
private readonly streamManager: StreamManager;
public async *queryPerPage<T>(
queryOpts: QueryRequestParams,
decoders: Map<string, IDecoder<T>>,
peerId: PeerId
): AsyncGenerator<Promise<T | undefined>[]> {
// 1. Get managed stream
const stream = await this.streamManager.getStream(peerId);
// 2. Send request with length-prefixed encoding
const res = await pipe(
[storeQueryRequest.encode()],
lp.encode, // Length-prefixed encoding
stream, // libp2p stream
lp.decode, // Length-prefixed decoding
async (source) => await all(source)
);
// 3. Process response
const storeQueryResponse = StoreQueryResponse.decode(bytes);
yield decodedMessages;
}
}
```
**Stream Lifecycle:**
- **Creation**: On-demand when query is initiated
- **Duration**: Single request/response cycle
- **Cleanup**: Automatic via StreamManager locking mechanism
- **Error Handling**: Stream failures break pagination loop
**Protocol Buffer Integration:**
```typescript
// Request encoding
const request = StoreQueryRequest.create({
pubsubTopic,
contentTopics,
timeStart: BigInt(params.timeStart.getTime() * 1_000_000),
paginationLimit: BigInt(params.paginationLimit)
});
// Response decoding
const response = StoreQueryResponse.decode(bytes);
```
### 2. Filter Protocol: Push-Based Subscription Pattern
**Multicodecs**:
- Subscribe: `/vac/waku/filter-subscribe/2.0.0-beta1`
- Push: `/vac/waku/filter-push/2.0.0-beta1`
**Integration Pattern:**
```typescript
export class FilterCore {
private streamManager: StreamManager;
constructor(handleIncomingMessage: IncomingMessageHandler, libp2p: Libp2p) {
this.streamManager = new StreamManager(FilterCodecs.SUBSCRIBE, libp2p.components);
// Register push handler for incoming messages
libp2p.handle(FilterCodecs.PUSH, this.onRequest.bind(this), {
maxInboundStreams: 100
});
}
// Outbound: Subscribe to filter
public async subscribe(
pubsubTopic: PubsubTopic,
peerId: PeerId,
contentTopics: ContentTopic[]
): Promise<CoreProtocolResult> {
const stream = await this.streamManager.getStream(peerId);
const request = FilterSubscribeRpc.createSubscribeRequest(pubsubTopic, contentTopics);
const res = await pipe(
[request.encode()],
lp.encode,
stream,
lp.decode,
async (source) => await all(source)
);
}
// Inbound: Handle pushed messages
private onRequest(streamData: IncomingStreamData): void {
const { connection, stream } = streamData;
pipe(stream, lp.decode, async (source) => {
for await (const bytes of source) {
const response = FilterPushRpc.decode(bytes.slice());
const { pubsubTopic, wakuMessage } = response;
await this.handleIncomingMessage(pubsubTopic, wakuMessage, connection.remotePeer.toString());
}
});
}
}
```
**Stream Lifecycle:**
- **Subscribe Stream**: Short-lived for subscription requests
- **Push Stream**: Long-lived for receiving messages
- **Duration**: Push streams remain open for continuous message delivery
- **Cleanup**: Automatic cleanup on peer disconnection
**Bidirectional Communication:**
- **Outbound**: Uses StreamManager for subscription/unsubscription requests
- **Inbound**: Handles incoming push streams via libp2p stream handler
### 3. LightPush Protocol: Request/Response with RPC Encoding
**Multicodec**: `/vac/waku/lightpush/2.0.0-beta1`
**Integration Pattern:**
```typescript
export class LightPushCore {
private readonly streamManager: StreamManager;
public async send(
encoder: IEncoder,
message: IMessage,
peerId: PeerId
): Promise<CoreProtocolResult> {
// 1. Prepare message
const query = PushRpc.createRequest(protoMessage, encoder.pubsubTopic);
// 2. Get stream
const stream = await this.streamManager.getStream(peerId);
// 3. Send with length-prefixed encoding
const res = await pipe(
[query.encode()],
lp.encode,
stream,
lp.decode,
async (source) => await all(source)
);
// 4. Process response
const response = PushRpc.decode(bytes).response;
return response.isSuccess ? { success: peerId } : { failure: { error, peerId } };
}
}
```
**Stream Lifecycle:**
- **Creation**: On-demand for each send operation
- **Duration**: Single request/response cycle
- **Cleanup**: Automatic via StreamManager
- **Error Handling**: Comprehensive error classification
**Message Validation:**
```typescript
// Size validation
if (!(await isMessageSizeUnderCap(encoder, message))) {
return { error: ProtocolError.SIZE_TOO_BIG };
}
// Payload validation
if (!message.payload || message.payload.length === 0) {
return { error: ProtocolError.EMPTY_PAYLOAD };
}
```
## Common Stream Management Patterns
### 1. Length-Prefixed Encoding
All protocols use `it-length-prefixed` for message framing:
```typescript
// Send pattern
await pipe(
[message.encode()],
lp.encode, // Add length prefix
stream,
lp.decode, // Remove length prefix
async (source) => await all(source)
);
```
**Benefits:**
- **Message Boundaries**: Clear message delimitation
- **Streaming Support**: Works with async iterators
- **Error Resilience**: Prevents message fragmentation issues
### 2. Connection Selection Strategy
```typescript
export function selectOpenConnection(connections: Connection[]): Connection | undefined {
return connections
.filter((c) => c.status === "open")
.sort((left, right) => right.timeline.open - left.timeline.open)
.at(0);
}
```
**Strategy:**
- **Status Filter**: Only use open connections
- **Recency Preference**: Prefer recently opened connections
- **Fallback**: Return undefined if no suitable connection
### 3. Stream Locking Mechanism
```typescript
private lockStream(peerId: string, stream: Stream): void {
stream.metadata[STREAM_LOCK_KEY] = true;
}
private isStreamLocked(stream: Stream): boolean {
return !!stream.metadata[STREAM_LOCK_KEY];
}
```
**Purpose:**
- **Concurrent Access Prevention**: Avoid stream conflicts
- **Resource Management**: Ensure proper stream lifecycle
- **Debugging**: Track stream usage patterns
## Error Handling Patterns
### 1. Stream-Level Error Handling
```typescript
// Store protocol error handling
try {
stream = await this.streamManager.getStream(peerId);
} catch (e) {
log.error("Failed to get stream", e);
break; // Exit pagination loop
}
```
### 2. Protocol-Level Error Classification
```typescript
// LightPush comprehensive error handling
export enum ProtocolError {
GENERIC_FAIL = "GENERIC_FAIL",
NO_STREAM_AVAILABLE = "NO_STREAM_AVAILABLE",
STREAM_ABORTED = "STREAM_ABORTED",
DECODE_FAILED = "DECODE_FAILED",
NO_RESPONSE = "NO_RESPONSE",
REMOTE_PEER_REJECTED = "REMOTE_PEER_REJECTED",
SIZE_TOO_BIG = "SIZE_TOO_BIG",
EMPTY_PAYLOAD = "EMPTY_PAYLOAD",
ENCODE_FAILED = "ENCODE_FAILED"
}
```
### 3. Graceful Degradation
```typescript
// Filter protocol graceful error handling
pipe(stream, lp.decode, async (source) => {
for await (const bytes of source) {
try {
      const response = FilterPushRpc.decode(bytes.slice());
      const { pubsubTopic, wakuMessage } = response;
      await this.handleIncomingMessage(pubsubTopic, wakuMessage, peerId);
} catch (e) {
log.error("Error decoding message", e);
// Continue processing other messages
}
}
}).catch((e) => {
log.error("Error with receiving pipe", e);
// Handle stream-level errors
});
```
## Protocol Buffer Integration
### 1. Message Encoding/Decoding Pattern
```typescript
// Consistent encoding pattern across protocols
export class MessageRpc {
public static decode(bytes: Uint8ArrayList): MessageRpc {
const res = proto.MessageRpc.decode(bytes);
return new MessageRpc(res);
}
public encode(): Uint8Array {
return proto.MessageRpc.encode(this.proto);
}
}
```
### 2. Type-Safe Protocol Buffers
```typescript
// Store protocol with proper typing
export class StoreQueryRequest {
public constructor(public proto: proto.StoreQueryRequest) {}
public static create(params: QueryRequestParams): StoreQueryRequest {
return new StoreQueryRequest({
...params,
timeStart: params.timeStart ? BigInt(params.timeStart.getTime() * 1_000_000) : undefined,
paginationLimit: params.paginationLimit ? BigInt(params.paginationLimit) : undefined
});
}
}
```
### 3. Validation and Error Handling
```typescript
// Comprehensive request validation
public static create(params: QueryRequestParams): StoreQueryRequest {
const isHashQuery = params.messageHashes && params.messageHashes.length > 0;
const hasContentTopics = params.contentTopics && params.contentTopics.length > 0;
if (isHashQuery && hasContentTopics) {
throw new Error("Message hash queries cannot include content filters");
}
if (!isHashQuery && !params.pubsubTopic && hasContentTopics) {
throw new Error("Both pubsubTopic and contentTopics required for content-filtered queries");
}
}
```
## Performance Optimizations
### 1. Stream Reuse Strategy
```typescript
// StreamManager optimization
public async getStream(peerId: PeerId): Promise<Stream> {
// Check for existing usable stream
let stream = this.getOpenStreamForCodec(peerId);
if (stream && !this.isStreamLocked(stream)) {
return stream; // Reuse existing stream
}
// Create new stream only if necessary
return await this.createStream(peerId);
}
```
### 2. Connection Pooling
```typescript
// Efficient connection selection
private getOpenStreamForCodec(peerId: PeerId): Stream | undefined {
const connections = this.libp2p.connectionManager.getConnections(peerId);
const connection = selectOpenConnection(connections);
if (!connection) return;
// Find existing stream with correct protocol
const stream = connection.streams.find(s => s.protocol === this.multicodec);
return stream && !this.isStreamUnusable(stream) ? stream : undefined;
}
```
### 3. Async Iterator Optimization
```typescript
// Efficient message streaming in Store protocol
public async *queryPerPage<T>(): AsyncGenerator<Promise<T | undefined>[]> {
while (true) {
const res = await pipe(
[request.encode()],
lp.encode,
stream,
lp.decode,
async (source) => await all(source) // Collect all chunks efficiently
);
yield decodedMessages; // Yield batch of messages
if (shouldStopPagination) break;
}
}
```
## Best Practices
### 1. Stream Lifecycle Management
- **Acquire**: Use StreamManager for consistent stream acquisition
- **Lock**: Lock streams during usage to prevent conflicts
- **Release**: Automatic cleanup via metadata and event handlers
- **Monitor**: Track stream health and connection status
### 2. Error Handling
- **Classify**: Use typed error enums for better error handling
- **Propagate**: Bubble up errors with context information
- **Recover**: Implement graceful degradation where possible
- **Log**: Comprehensive logging for debugging
### 3. Protocol Buffer Usage
- **Validate**: Validate inputs before encoding
- **Type Safety**: Use TypeScript wrappers for protocol buffers
- **Efficiency**: Reuse encoder/decoder instances
- **Version**: Handle protocol version compatibility
### 4. Performance Considerations
- **Stream Reuse**: Maximize stream reuse for better performance
- **Connection Selection**: Use efficient connection selection strategies
- **Batch Processing**: Process messages in batches where possible
- **Memory Management**: Clean up resources properly
## Conclusion
The Waku protocol implementations demonstrate sophisticated libp2p integration patterns that balance performance, reliability, and maintainability. The common StreamManager provides a solid foundation while allowing each protocol to implement its specific communication patterns. The consistent use of length-prefixed encoding, comprehensive error handling, and efficient stream management makes the codebase robust and scalable.

# Waku Protocol Stack Architecture Documentation
## Overview
The Waku protocol stack is built on top of libp2p's networking foundation, providing a layered architecture where application-layer Waku protocols utilize libp2p's transport and connection management capabilities. This document provides an in-depth analysis of how these protocols interact, negotiate, and register with the underlying libp2p infrastructure.
## Protocol Stack Layers
```
┌─────────────────────────────────────────────────────────────┐
│ Application Layer │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────┐ │
│ │ Relay │ │ Store │ │ Filter │ │LightPush│ │
│ │ (Gossip) │ │ (History) │ │(Bandwidth) │ │ (Relay) │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────┘ │
├─────────────────────────────────────────────────────────────┤
│ Protocol Layer │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────┐ │
│ │ Metadata │ │ Peer Exchange│ │ Identity │ │ Ping │ │
│ │ (Handshake) │ │ (Discovery) │ │ (Protocol) │ │ (Health)│ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────┘ │
├─────────────────────────────────────────────────────────────┤
│ Libp2p Layer │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────────┐ │
│ │ Multistream │ │ Connection │ │ Stream Management │ │
│ │ Select │ │ Management │ │ (Multiplexing) │ │
│ └─────────────┘ └─────────────┘ └─────────────────────────┘ │
├─────────────────────────────────────────────────────────────┤
│ Transport Layer │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────────┐ │
│ │ WebSockets │ │ Noise │ │ Mplex │ │
│ │(Transport) │ │(Encryption) │ │ (Multiplexing) │ │
│ └─────────────┘ └─────────────┘ └─────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
```
## Multicodec Identifiers
Waku protocols use specific multicodec identifiers to enable protocol negotiation through libp2p's multistream-select mechanism:
### Core Waku Protocols
| Protocol | Multicodec | Purpose | Implementation |
|----------|------------|---------|---------------|
| **Relay** | `/vac/waku/relay/2.0.0` | Gossip-based message routing | GossipSub protocol |
| **Store** | `/vac/waku/store-query/3.0.0` | Historical message retrieval | Request-response pattern |
| **Filter** | `/vac/waku/filter-subscribe/2.0.0-beta1` | Bandwidth-efficient message filtering | Subscription-based |
| **Filter Push** | `/vac/waku/filter-push/2.0.0-beta1` | Filter message delivery | Push-based delivery |
| **LightPush** | `/vac/waku/lightpush/2.0.0-beta1` | Lightweight message publishing | Request-response pattern |
| **Metadata** | `/vac/waku/metadata/1.0.0` | Shard information exchange | Handshake protocol |
### Legacy Protocol Identifiers
Note: older documentation refers to Store as `/vac/waku/store/2.0.0-beta4`, but the current implementation uses `/vac/waku/store-query/3.0.0` for Store v3, reflecting the evolution of the protocol specifications.
## Protocol Negotiation Through Multistream-Select
### Protocol Registration Process
1. **Service Registration**: Each protocol registers its multicodec with libp2p during initialization
2. **Stream Handler Setup**: Protocols define handlers for incoming streams
3. **Protocol Advertising**: Libp2p advertises supported protocols to peers
### Example: Filter Protocol Registration
```typescript
// packages/core/src/lib/filter/filter.ts
export const FilterCodecs = {
SUBSCRIBE: "/vac/waku/filter-subscribe/2.0.0-beta1",
PUSH: "/vac/waku/filter-push/2.0.0-beta1"
};
export class FilterCore {
public constructor(
private handleIncomingMessage: IncomingMessageHandler,
libp2p: Libp2p
) {
// Register protocol handler with libp2p
libp2p
.handle(FilterCodecs.PUSH, this.onRequest.bind(this), {
maxInboundStreams: 100
})
.catch((e) => {
log.error("Failed to register ", FilterCodecs.PUSH, e);
});
}
}
```
### Multistream-Select Negotiation Flow
```
Initiator Responder
│ │
│ ── /multistream/1.0.0 ──────────→ │
│ ←─────── /multistream/1.0.0 ───── │
│ │
│ ── /vac/waku/store-query/3.0.0 ─→ │
│ ←─ /vac/waku/store-query/3.0.0 ── │
│ │
│ ── <protocol-specific data> ───→ │
│ ←─ <protocol-specific data> ──── │
```
## Abstraction Layers
### Transport/Connection Layer (libp2p)
**Responsibilities:**
- Connection establishment and management
- Stream multiplexing (Mplex)
- Encryption (Noise protocol)
- Transport protocols (WebSockets)
- Peer discovery and routing
**Key Components:**
```typescript
// Connection establishment
const connection = await libp2p.dial(peerId);
const stream = await connection.newStream(multicodec);
// Stream management
const streamManager = new StreamManager(multicodec, libp2p.components);
```
### Stream Management Layer
**Purpose:** Abstracts stream lifecycle management for Waku protocols
**Key Features:**
- Stream pooling and reuse
- Connection state management
- Automatic stream creation
- Stream locking mechanism
```typescript
// packages/core/src/lib/stream_manager/stream_manager.ts
export class StreamManager {
  public async getStream(peerId: PeerId): Promise<Stream> {
    const peerIdStr = peerId.toString();

    // 1. Check for existing stream
    let stream = this.getOpenStreamForCodec(peerId);
    if (stream) {
      this.lockStream(peerIdStr, stream);
      return stream;
    }

    // 2. Create new stream if needed
    stream = await this.createStream(peerId);
    this.lockStream(peerIdStr, stream);
    return stream;
  }
}
```
### Protocol Layer (Waku Applications)
**Responsibilities:**
- Protocol-specific message handling
- Business logic implementation
- Message encoding/decoding
- Protocol state management
## Protocol Registration and Stream Handling
### Registration Patterns
#### 1. Outbound-Only Protocols (Store, LightPush)
```typescript
// Store protocol - client-side only
export class StoreCore {
public constructor(libp2p: Libp2p) {
this.streamManager = new StreamManager(StoreCodec, libp2p.components);
// No incoming stream handler needed
}
}
```
#### 2. Bidirectional Protocols (Filter, Metadata)
```typescript
// Filter protocol - handles both outbound and inbound streams
export class FilterCore {
public constructor(libp2p: Libp2p) {
// Setup outbound stream management
this.streamManager = new StreamManager(FilterCodecs.SUBSCRIBE, libp2p.components);
// Register inbound stream handler
libp2p.handle(FilterCodecs.PUSH, this.onRequest.bind(this));
}
}
```
#### 3. Gossip-Based Protocols (Relay)
```typescript
// Relay protocol - integrates with libp2p's GossipSub
export class Relay implements IRelay {
public constructor(params: RelayConstructorParams) {
this.gossipSub = params.libp2p.services.pubsub as GossipSub;
this.gossipSub.multicodecs = RelayCodecs; // ["/vac/waku/relay/2.0.0"]
}
}
```
### Stream Handler Implementation
#### Request-Response Pattern
```typescript
// Store protocol query implementation
public async *queryPerPage<T extends IDecodedMessage>(
queryOpts: QueryRequestParams,
decoders: Map<string, IDecoder<T>>,
peerId: PeerId
): AsyncGenerator<Promise<T | undefined>[]> {
const stream = await this.streamManager.getStream(peerId);
const res = await pipe(
[storeQueryRequest.encode()],
lp.encode, // Length-prefixed encoding
stream, // libp2p stream
lp.decode, // Length-prefixed decoding
async (source) => await all(source)
);
const storeQueryResponse = StoreQueryResponse.decode(bytes);
// Process response...
}
```
#### Push-Based Pattern
```typescript
// Filter protocol push handler
private onRequest(streamData: IncomingStreamData): void {
const { connection, stream } = streamData;
pipe(stream, lp.decode, async (source) => {
for await (const bytes of source) {
const response = FilterPushRpc.decode(bytes.slice());
await this.handleIncomingMessage(
response.pubsubTopic,
response.wakuMessage,
connection.remotePeer.toString()
);
}
});
}
```
### Protocol Lifecycle Management
#### Node Initialization
```typescript
// Waku node creation with protocol configuration
export class WakuNode implements IWaku {
public constructor(
pubsubTopics: PubsubTopic[],
options: CreateNodeOptions,
libp2p: Libp2p,
protocolsEnabled: ProtocolsEnabled
) {
// Initialize protocols based on configuration
if (protocolsEnabled.store) {
this.store = new Store({ libp2p, ... });
}
if (protocolsEnabled.lightpush) {
this.lightPush = new LightPush({ libp2p, ... });
}
if (protocolsEnabled.filter) {
this.filter = new Filter({ libp2p, ... });
}
}
}
```
#### Service Registration with Libp2p
```typescript
// libp2p service configuration
const libp2p = await createLibp2p({
services: {
identify: identify({ agentVersion: userAgent }),
ping: ping({ maxInboundStreams: 10 }),
metadata: wakuMetadata(pubsubTopics), // Waku metadata service
...options?.services
}
});
```
## Protocol-Specific Stream Management
### Store Protocol
- **Pattern:** Client-server request-response
- **Stream Usage:** One-time use per query
- **Multiplexing:** Single stream per peer, length-prefixed messages
### Filter Protocol
- **Pattern:** Subscription-based with bidirectional communication
- **Stream Usage:** Long-lived subscription streams
- **Multiplexing:** Separate streams for subscribe/unsubscribe and push
### LightPush Protocol
- **Pattern:** Client-server request-response
- **Stream Usage:** One-time use per message publish
- **Multiplexing:** Single stream per peer
### Relay Protocol
- **Pattern:** Gossip-based peer-to-peer
- **Stream Usage:** Managed by GossipSub
- **Multiplexing:** libp2p's GossipSub handles stream management
## Error Handling and Recovery
### Protocol Error Types
```typescript
export enum ProtocolError {
NO_PEER_AVAILABLE = "No peer available",
NO_STREAM_AVAILABLE = "No stream available",
DECODE_FAILED = "Failed to decode",
REMOTE_PEER_REJECTED = "Remote peer rejected",
STREAM_ABORTED = "Stream aborted",
// ... other errors
}
```
### Stream Recovery Mechanisms
- **Automatic retry:** Stream creation retries on failure
- **Peer rotation:** Fall back to alternative peers
- **Connection pooling:** Reuse existing connections
- **Stream locking:** Prevent concurrent stream usage
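The retry and peer-rotation mechanisms above can be sketched as follows; `openStream` is an injected stand-in for `StreamManager.getStream`, not the real signature:

```typescript
// Sketch: try each peer up to `retriesPerPeer` times before rotating to the next.
export async function getStreamWithRotation<S>(
  peers: string[],
  openStream: (peerId: string) => Promise<S>,
  retriesPerPeer = 2
): Promise<S> {
  let lastError: unknown;
  for (const peerId of peers) { // peer rotation
    for (let attempt = 0; attempt < retriesPerPeer; attempt++) { // automatic retry
      try {
        return await openStream(peerId);
      } catch (err) {
        lastError = err; // remember the failure and keep trying
      }
    }
  }
  throw lastError ?? new Error("No peers available");
}
```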
## Performance Optimizations
### Stream Pooling
```typescript
// Stream reuse mechanism
private getOpenStreamForCodec(peerId: PeerId): Stream | undefined {
  const connections = this.libp2p.connectionManager.getConnections(peerId);
  const connection = selectOpenConnection(connections);
  if (!connection) return undefined; // no usable connection to this peer

  const stream = connection.streams.find(
    (s) => s.protocol === this.multicodec
  );
  if (stream && !this.isStreamLocked(stream)) {
    return stream;
  }
  return undefined;
}
```
### Connection Management
- **Keep-alive mechanisms:** Ping-based connection health monitoring
- **Connection limits:** Configurable connection pool sizes
- **Priority-based peer selection:** Preferential treatment for bootstrap peers
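A minimal keep-alive loop sketch; the injected `ping` callback stands in for libp2p's ping service and is an assumption, not the real API:

```typescript
// Sketch: periodically ping known peers and report unresponsive ones.
export function startKeepAlive(
  peers: () => string[],
  ping: (peerId: string) => Promise<number>,
  onDead: (peerId: string) => void,
  intervalMs = 30_000
): () => void {
  const timer = setInterval(() => {
    for (const peerId of peers()) {
      // A failed ping marks the peer as unhealthy; success is a latency sample.
      ping(peerId).catch(() => onDead(peerId));
    }
  }, intervalMs);
  return () => clearInterval(timer); // stop function for clean shutdown
}
```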
## Security Considerations
### Protocol Authentication
- **Noise encryption:** All connections encrypted by default
- **Peer identity verification:** Public key-based peer identification
- **Message validation:** Protocol-specific message validation
### Stream Security
- **Stream isolation:** Each protocol uses isolated streams
- **Message size limits:** Prevent DoS attacks via large messages
- **Rate limiting:** Configurable limits on stream creation
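Rate limiting on stream creation can be sketched as a token bucket; the parameters are illustrative, not js-waku defaults:

```typescript
// Sketch: token-bucket limiter for inbound stream creation.
export class StreamRateLimiter {
  private tokens: number;
  private lastRefill: number;

  public constructor(
    private readonly maxTokens: number,
    private readonly refillPerSecond: number,
    now: number = Date.now()
  ) {
    this.tokens = maxTokens;
    this.lastRefill = now;
  }

  public tryAcquire(now: number = Date.now()): boolean {
    // Refill proportionally to elapsed time, capped at the bucket size.
    const elapsedSeconds = (now - this.lastRefill) / 1000;
    this.tokens = Math.min(this.maxTokens, this.tokens + elapsedSeconds * this.refillPerSecond);
    this.lastRefill = now;
    if (this.tokens >= 1) {
      this.tokens -= 1;
      return true; // allow the incoming stream
    }
    return false; // reject: bucket exhausted
  }
}
```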
## Conclusion
The Waku protocol stack demonstrates a well-architected layered approach where:
1. **Libp2p provides the foundation** with transport, connection management, and protocol negotiation
2. **Multistream-select enables protocol negotiation** through well-defined multicodec identifiers
3. **Stream management abstracts complexity** of connection lifecycle and reuse
4. **Protocol implementations focus on business logic** while leveraging libp2p's infrastructure
This architecture enables scalable, efficient, and secure communication while maintaining clear separation of concerns between transport, networking, and application layers.
## References
- [Waku Relay Protocol Specification](https://rfc.vac.dev/spec/11/)
- [Waku Store Protocol Specification](https://rfc.vac.dev/spec/13/)
- [Waku Filter Protocol Specification](https://rfc.vac.dev/spec/12/)
- [Waku LightPush Protocol Specification](https://rfc.vac.dev/spec/19/)
- [Libp2p Protocol Documentation](https://docs.libp2p.io/concepts/protocols/)
- [Multistream-Select Specification](https://github.com/multiformats/multistream-select)