fryorcraken 2025-08-30 22:03:37 +10:00
parent 914beb6531
commit b15cb51a5c
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4
7 changed files with 1459 additions and 0 deletions


@@ -0,0 +1,12 @@
{
"permissions": {
"allow": [
"Bash(npm run build:*)",
"Bash(npm run check:*)",
"Bash(npm test)",
"Bash(npm run test:node:*)"
],
"deny": [],
"ask": []
}
}


@@ -0,0 +1,243 @@
# SDS-Repair (SDS-R) Implementation Guide
## Overview
SDS-R is an optional extension to the Scalable Data Sync (SDS) protocol that enables collaborative repair of missing messages within a limited time window. It's designed to work over Waku and assumes participants are already in a secure channel.
## Core Concept
When a participant detects missing messages (via causal dependencies), it waits a random backoff period before requesting repairs. Other participants who have the missing message wait their own random backoff before responding. The protocol uses timing and response grouping so that, typically, only one repair request and one response are sent per missing message.
---
## Data Structures
### Protobuf Schema Modifications
```protobuf
message HistoryEntry {
string message_id = 1;
optional bytes retrieval_hint = 2;
optional string sender_id = 3; // NEW: Original sender's ID (only for SDS-R)
}
message Message {
string sender_id = 1;
string message_id = 2;
string channel_id = 3;
optional int32 lamport_timestamp = 10;
repeated HistoryEntry causal_history = 11;
optional bytes bloom_filter = 12;
repeated HistoryEntry repair_request = 13; // NEW: List of missing messages
optional bytes content = 20;
}
```
### Additional Participant State
Each participant must maintain:
1. **Outgoing Repair Request Buffer**
- Map: `HistoryEntry -> T_req (timestamp)`
- Sorted by ascending T_req
- Contains missing messages waiting to be requested
2. **Incoming Repair Request Buffer**
- Map: `HistoryEntry -> T_resp (timestamp)`
- Contains repair requests from others that this participant can fulfill
- Only includes requests where participant is in the response group
3. **Augmented Local History**
- Change from base SDS: Store full `Message` objects, not just message IDs
- Only for messages where participant could be a responder
- Needed to rebroadcast messages when responding to repairs
### Global Configuration (per channel)
```
T_min = 30 seconds // Minimum wait before requesting repair
T_max = 120 seconds // Maximum wait for repair window (recommend 120-600)
num_response_groups = max(1, num_participants / 128) // Response group count (integer division)
```
---
## Critical Algorithms
### 1. Calculate T_req (When to Request Repair)
**IMPORTANT BUG FIX**: The spec has an off-by-one error. Use this corrected formula:
```
T_req = current_time + (hash(participant_id, message_id) % (T_max - T_min)) + T_min
```
- `participant_id`: Your OWN participant ID (not the sender's)
- `message_id`: The missing message's ID
- Result: Timestamp between `current_time + T_min` and `current_time + T_max`
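For concreteness, a minimal TypeScript sketch of this calculation. The `hashToBigInt` helper is an assumption here; one concrete choice is sketched under Hash Function below.
```typescript
// Assumed helper: maps a string to a 64-bit bigint (see Hash Function below).
declare function hashToBigInt(input: string): bigint;

function calculateTReq(
  participantId: string,
  messageId: string,
  tMinMs: number,
  tMaxMs: number,
  now: number = Date.now()
): number {
  const hash = hashToBigInt(`${participantId}${messageId}`);
  const range = BigInt(tMaxMs - tMinMs);
  // The offset lands in [tMinMs, tMaxMs): a deterministic per-participant backoff.
  return now + Number(hash % range) + tMinMs;
}
```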
### 2. Calculate T_resp (When to Respond to Repair)
```
distance = participant_id XOR sender_id
T_resp = current_time + (distance * hash(message_id)) % T_max
```
- `participant_id`: Your OWN participant ID
- `sender_id`: Original sender's ID from the HistoryEntry
- `message_id`: The requested message's ID
- Note: Original sender has distance=0, responds immediately
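A matching sketch for T_resp under the same assumptions. String IDs are hashed to bigints so the XOR distance is well defined (the spec leaves the ID-to-integer mapping open):
```typescript
// Assumed helper as above (see Hash Function below).
declare function hashToBigInt(input: string): bigint;

function calculateTResp(
  participantId: string,
  senderId: string,
  messageId: string,
  tMaxMs: number,
  now: number = Date.now()
): number {
  // XOR distance over hashed IDs; the original sender has distance 0,
  // so its offset is 0 and it responds first.
  const distance = hashToBigInt(participantId) ^ hashToBigInt(senderId);
  const offset = (distance * hashToBigInt(messageId)) % BigInt(tMaxMs);
  return now + Number(offset);
}
```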
### 3. Determine Response Group Membership
```
is_in_group = (hash(participant_id, message_id) % num_response_groups) ==
(hash(sender_id, message_id) % num_response_groups)
```
- Only respond to repairs if `is_in_group` is true
- Original sender is always in their own response group
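A sketch of the membership check under the same hashing assumption:
```typescript
declare function hashToBigInt(input: string): bigint;

function isInResponseGroup(
  participantId: string,
  senderId: string,
  messageId: string,
  numResponseGroups: number
): boolean {
  const groups = BigInt(numResponseGroups);
  if (groups <= 1n) return true; // a single group contains everyone
  const ownGroup = hashToBigInt(`${participantId}${messageId}`) % groups;
  const senderGroup = hashToBigInt(`${senderId}${messageId}`) % groups;
  // The sender's own hash trivially matches, so it is always in its group.
  return ownGroup === senderGroup;
}
```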
---
## Protocol Implementation Steps
### When SENDING a Message
1. Check outgoing repair request buffer for eligible entries (where `T_req <= current_time`)
2. Take up to 3 eligible entries with lowest T_req values
3. Populate `repair_request` field with these HistoryEntries:
- Include `message_id`
- Include `retrieval_hint` if available
- Include `sender_id` (original sender's ID)
4. If no eligible entries, leave `repair_request` field unset
5. Continue with the normal SDS send procedure (see the sketch below)
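A sketch of steps 1-4, assuming a buffer shaped like the `OutgoingRepairBuffer` in this commit (the message field name is illustrative):
```typescript
interface HistoryEntry {
  messageId: string;
  retrievalHint?: Uint8Array;
  senderId?: string; // original sender's ID
}

interface OutgoingBufferLike {
  // Returns up to `max` entries with T_req <= now, lowest T_req first.
  getEligible(now: number, max: number): HistoryEntry[];
}

function attachRepairRequests(
  message: { repairRequest?: HistoryEntry[] },
  buffer: OutgoingBufferLike,
  now: number = Date.now()
): void {
  const eligible = buffer.getEligible(now, 3); // at most 3 per message
  if (eligible.length > 0) {
    message.repairRequest = eligible;
  }
  // Otherwise repair_request stays unset; proceed with the normal SDS send.
}
```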
### When RECEIVING a Message
1. **Clean up buffers:**
- Remove received message_id from outgoing repair request buffer
- Remove received message_id from incoming repair request buffer
2. **Process causal dependencies:**
- For each missing dependency in causal_history:
- Add to outgoing repair request buffer
- Calculate T_req using formula above
- Include sender_id from the causal history entry
3. **Process repair_request field:**
- For each repair request entry:
a. Remove from your own outgoing buffer (someone else is requesting it)
b. Check if you have this message in local history
c. Check if you're in the response group (use formula above)
d. If both b and c are true:
- Add to incoming repair request buffer
- Calculate T_resp using formula above
4. Continue with the normal SDS receive procedure (see the sketch below)
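A sketch of step 3, reusing the `HistoryEntry` shape and the helpers sketched above:
```typescript
declare function isInResponseGroup(
  participantId: string,
  senderId: string,
  messageId: string,
  numResponseGroups: number
): boolean;
declare function calculateTResp(
  participantId: string,
  senderId: string,
  messageId: string,
  tMaxMs: number
): number;

function processRepairRequests(
  requests: HistoryEntry[],
  outgoing: { remove(messageId: string): void },
  incoming: { add(entry: HistoryEntry, tResp: number): void },
  localHistory: Map<string, unknown>,
  selfId: string,
  numResponseGroups: number,
  tMaxMs: number
): void {
  for (const req of requests) {
    outgoing.remove(req.messageId); // (a) someone else is requesting it
    if (!localHistory.has(req.messageId)) continue; // (b) cannot fulfil
    if (!req.senderId) continue; // no sender_id: cannot place in a group
    if (!isInResponseGroup(selfId, req.senderId, req.messageId, numResponseGroups)) {
      continue; // (c) not in the response group
    }
    // (d) schedule a response
    incoming.add(req, calculateTResp(selfId, req.senderId, req.messageId, tMaxMs));
  }
}
```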
### Periodic Sweeps
#### Outgoing Repair Request Buffer Sweep (every ~5 seconds)
```python
for entry, t_req in outgoing_buffer.items():
if current_time >= t_req:
# This entry will be included in next message's repair_request
# No action needed here, just wait for next send
pass
```
#### Incoming Repair Request Buffer Sweep (every ~5 seconds)
```python
for entry, t_resp in list(incoming_buffer.items()):  # snapshot: we remove during iteration
if current_time >= t_resp:
message = get_from_local_history(entry.message_id)
if message:
broadcast(message) # Rebroadcast the full original message
remove_from_incoming_buffer(entry)
```
### Periodic Sync Messages with SDS-R
When sending periodic sync messages:
1. Check if there are eligible entries in outgoing repair request buffer
2. If yes, send the sync message WITH repair_request field populated
3. Unlike base SDS, do not suppress the sync message even if others recently sent one
---
## Implementation Notes and Edge Cases
### Hash Function
**CRITICAL**: The spec doesn't specify which hash function to use. Recommend:
- Use SHA256 for cryptographic properties
- Convert to integer for modulo operations: `int.from_bytes(hash_bytes[:8], byteorder='big')`
- Must be consistent across all participants
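In TypeScript, one concrete choice (this mirrors the `utils.ts` added in this commit):
```typescript
import { sha256 } from "@noble/hashes/sha2";

// SHA-256, then the first 8 bytes interpreted as a big-endian 64-bit integer.
export function hashToBigInt(input: string): bigint {
  const digest = sha256(new TextEncoder().encode(input));
  const view = new DataView(digest.buffer, digest.byteOffset, 8);
  return view.getBigUint64(0, false); // big-endian
}
```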
### Participant ID Format
- Must support XOR operation for distance calculation
- Recommend using numeric IDs, or converting string IDs to integers (e.g., by hashing)
- Must be globally unique within the channel
### Memory Management
1. **Buffer limits**: Implement max size for repair buffers (suggest 1000 entries)
2. **Eviction policy**: Remove oldest T_req/T_resp when at capacity
3. **History retention**: Only keep messages for T_max duration
4. **Response group optimization**: Only cache full messages if you're likely to be in response group
### Edge Cases to Handle
1. **Duplicate repair requests**: Use Set semantics, only track once
2. **Expired repairs**: If T_req > current_time + T_max, remove from buffer
3. **Non-numeric participant IDs**: Hash to integer for XOR operations
4. **Missing sender_id**: Cannot participate in repair for that message
5. **Circular dependencies**: Set maximum recursion depth for dependency resolution
### Typo to Fix
The spec has "Perdiodic" on line 461 - should be "Periodic"
---
## Testing Scenarios
1. **Single missing message**: Verify only one repair request and response
2. **Cascade recovery**: Missing message A depends on missing message B
3. **Original sender offline**: Verify next closest participant responds
4. **Response group isolation**: Verify only in-group participants respond
5. **Buffer overflow**: Test eviction policies
6. **Network partition**: Test behavior when repair window expires
---
## Integration with Base SDS
### Modified State from Base SDS
- Local history stores full Messages, not just IDs
- Additional buffers for repair tracking
- `sender_id` must be preserved in HistoryEntry
### Unchanged from Base SDS
- Lamport timestamp management
- Bloom filter operations
- Causal dependency checking
- Message delivery and conflict resolution
---
## Performance Recommendations
1. Use priority queues for T_req/T_resp ordered buffers
2. Index local history by message_id for O(1) lookup
3. Batch repair requests in single message (up to 3)
4. Cache response group calculation results
5. Implement exponential backoff in future version (noted as TODO in spec)
---
## Security Assumptions
- Operating within secure channel (via Waku)
- All participants are authenticated
- Rate limiting via Waku RLN-RELAY
- No additional authentication needed for repairs
- Trust all repair requests from channel members
This implementation guide should be sufficient to implement SDS-R without access to the original specification. The key insight is that SDS-R elegantly uses timing and randomization to coordinate distributed repair without central coordination or excessive network traffic.


@@ -0,0 +1,229 @@
import { expect } from "chai";
import {
IncomingRepairBuffer,
OutgoingRepairBuffer,
RepairHistoryEntry
} from "./buffers.js";
describe("OutgoingRepairBuffer", () => {
let buffer: OutgoingRepairBuffer;
beforeEach(() => {
buffer = new OutgoingRepairBuffer(3); // Small buffer for testing
});
it("should add entries and maintain sorted order", () => {
const entry1: RepairHistoryEntry = { messageId: "msg1" };
const entry2: RepairHistoryEntry = { messageId: "msg2" };
const entry3: RepairHistoryEntry = { messageId: "msg3" };
buffer.add(entry2, 2000);
buffer.add(entry1, 1000);
buffer.add(entry3, 3000);
const items = buffer.getItems();
expect(items).to.have.lengthOf(3);
expect(items[0].tReq).to.equal(1000);
expect(items[1].tReq).to.equal(2000);
expect(items[2].tReq).to.equal(3000);
});
it("should not update T_req if message already exists", () => {
const entry: RepairHistoryEntry = { messageId: "msg1" };
buffer.add(entry, 1000);
buffer.add(entry, 2000); // Try to add again with different T_req
const items = buffer.getItems();
expect(items).to.have.lengthOf(1);
expect(items[0].tReq).to.equal(1000); // Should keep original
});
it("should evict oldest entry when buffer is full", () => {
const entry1: RepairHistoryEntry = { messageId: "msg1" };
const entry2: RepairHistoryEntry = { messageId: "msg2" };
const entry3: RepairHistoryEntry = { messageId: "msg3" };
const entry4: RepairHistoryEntry = { messageId: "msg4" };
buffer.add(entry2, 2000);
buffer.add(entry1, 1000);
buffer.add(entry3, 3000);
buffer.add(entry4, 1500); // Should evict msg1 (oldest T_req)
const items = buffer.getItems();
expect(items).to.have.lengthOf(3);
expect(buffer.has("msg1")).to.be.false; // msg1 should be evicted
expect(buffer.has("msg2")).to.be.true;
expect(buffer.has("msg3")).to.be.true;
expect(buffer.has("msg4")).to.be.true;
});
it("should get eligible entries based on current time", () => {
const entry1: RepairHistoryEntry = { messageId: "msg1" };
const entry2: RepairHistoryEntry = { messageId: "msg2" };
const entry3: RepairHistoryEntry = { messageId: "msg3" };
buffer.add(entry1, 1000);
buffer.add(entry2, 2000);
buffer.add(entry3, 3000);
const eligible = buffer.getEligible(1500, 3);
expect(eligible).to.have.lengthOf(1);
expect(eligible[0].messageId).to.equal("msg1");
const eligible2 = buffer.getEligible(2500, 3);
expect(eligible2).to.have.lengthOf(2);
expect(eligible2[0].messageId).to.equal("msg1");
expect(eligible2[1].messageId).to.equal("msg2");
});
it("should respect maxRequests limit", () => {
const entry1: RepairHistoryEntry = { messageId: "msg1" };
const entry2: RepairHistoryEntry = { messageId: "msg2" };
const entry3: RepairHistoryEntry = { messageId: "msg3" };
buffer.add(entry1, 1000);
buffer.add(entry2, 2000);
buffer.add(entry3, 3000);
const eligible = buffer.getEligible(5000, 2); // All are eligible but limit to 2
expect(eligible).to.have.lengthOf(2);
expect(eligible[0].messageId).to.equal("msg1");
expect(eligible[1].messageId).to.equal("msg2");
});
it("should remove entries", () => {
const entry1: RepairHistoryEntry = { messageId: "msg1" };
const entry2: RepairHistoryEntry = { messageId: "msg2" };
buffer.add(entry1, 1000);
buffer.add(entry2, 2000);
expect(buffer.size).to.equal(2);
buffer.remove("msg1");
expect(buffer.size).to.equal(1);
expect(buffer.has("msg1")).to.be.false;
expect(buffer.has("msg2")).to.be.true;
});
it("should handle retrieval hint and sender_id", () => {
const hint = new Uint8Array([1, 2, 3]);
const entry: RepairHistoryEntry = {
messageId: "msg1",
retrievalHint: hint,
senderId: "sender1"
};
buffer.add(entry, 1000);
const all = buffer.getAll();
expect(all[0].retrievalHint).to.deep.equal(hint);
expect(all[0].senderId).to.equal("sender1");
});
});
describe("IncomingRepairBuffer", () => {
let buffer: IncomingRepairBuffer;
beforeEach(() => {
buffer = new IncomingRepairBuffer(3); // Small buffer for testing
});
it("should add entries and maintain sorted order", () => {
const entry1: RepairHistoryEntry = { messageId: "msg1" };
const entry2: RepairHistoryEntry = { messageId: "msg2" };
const entry3: RepairHistoryEntry = { messageId: "msg3" };
buffer.add(entry2, 2000);
buffer.add(entry1, 1000);
buffer.add(entry3, 3000);
const items = buffer.getItems();
expect(items).to.have.lengthOf(3);
expect(items[0].tResp).to.equal(1000);
expect(items[1].tResp).to.equal(2000);
expect(items[2].tResp).to.equal(3000);
});
it("should ignore duplicate entries", () => {
const entry: RepairHistoryEntry = { messageId: "msg1" };
buffer.add(entry, 1000);
buffer.add(entry, 500); // Try to add again with earlier T_resp
const items = buffer.getItems();
expect(items).to.have.lengthOf(1);
expect(items[0].tResp).to.equal(1000); // Should keep original
});
it("should evict furthest entry when buffer is full", () => {
const entry1: RepairHistoryEntry = { messageId: "msg1" };
const entry2: RepairHistoryEntry = { messageId: "msg2" };
const entry3: RepairHistoryEntry = { messageId: "msg3" };
const entry4: RepairHistoryEntry = { messageId: "msg4" };
buffer.add(entry1, 1000);
buffer.add(entry2, 2000);
buffer.add(entry3, 3000);
buffer.add(entry4, 1500); // Should evict msg3 (furthest T_resp)
const items = buffer.getItems();
expect(items).to.have.lengthOf(3);
expect(buffer.has("msg3")).to.be.false; // msg3 should be evicted
expect(buffer.has("msg1")).to.be.true;
expect(buffer.has("msg2")).to.be.true;
expect(buffer.has("msg4")).to.be.true;
});
it("should get and remove ready entries", () => {
const entry1: RepairHistoryEntry = { messageId: "msg1" };
const entry2: RepairHistoryEntry = { messageId: "msg2" };
const entry3: RepairHistoryEntry = { messageId: "msg3" };
buffer.add(entry1, 1000);
buffer.add(entry2, 2000);
buffer.add(entry3, 3000);
const ready = buffer.getReady(1500);
expect(ready).to.have.lengthOf(1);
expect(ready[0].messageId).to.equal("msg1");
// Entry should be removed from buffer
expect(buffer.size).to.equal(2);
expect(buffer.has("msg1")).to.be.false;
const ready2 = buffer.getReady(2500);
expect(ready2).to.have.lengthOf(1);
expect(ready2[0].messageId).to.equal("msg2");
expect(buffer.size).to.equal(1);
expect(buffer.has("msg2")).to.be.false;
expect(buffer.has("msg3")).to.be.true;
});
it("should remove entries", () => {
const entry1: RepairHistoryEntry = { messageId: "msg1" };
const entry2: RepairHistoryEntry = { messageId: "msg2" };
buffer.add(entry1, 1000);
buffer.add(entry2, 2000);
expect(buffer.size).to.equal(2);
buffer.remove("msg1");
expect(buffer.size).to.equal(1);
expect(buffer.has("msg1")).to.be.false;
expect(buffer.has("msg2")).to.be.true;
});
it("should clear all entries", () => {
const entry1: RepairHistoryEntry = { messageId: "msg1" };
const entry2: RepairHistoryEntry = { messageId: "msg2" };
buffer.add(entry1, 1000);
buffer.add(entry2, 2000);
expect(buffer.size).to.equal(2);
buffer.clear();
expect(buffer.size).to.equal(0);
});
});


@@ -0,0 +1,249 @@
import { Logger } from "@waku/utils";
const log = new Logger("sds:repair:buffers");
/**
* Extended HistoryEntry that includes sender_id for SDS-R
*/
export interface RepairHistoryEntry {
messageId: string;
retrievalHint?: Uint8Array;
senderId?: string; // Original sender's ID for repair calculations
}
/**
* Entry in the outgoing repair buffer with request timing
*/
interface OutgoingBufferEntry {
entry: RepairHistoryEntry;
tReq: number; // Timestamp when this repair request should be sent
}
/**
* Entry in the incoming repair buffer with response timing
*/
interface IncomingBufferEntry {
entry: RepairHistoryEntry;
tResp: number; // Timestamp when we should respond with this repair
}
/**
* Buffer for outgoing repair requests (messages we need)
* Maintains a sorted array by T_req for efficient retrieval of eligible entries
*/
export class OutgoingRepairBuffer {
// Sorted array by T_req (ascending - earliest first)
private items: OutgoingBufferEntry[] = [];
private readonly maxSize: number;
constructor(maxSize = 1000) {
this.maxSize = maxSize;
}
/**
* Add a missing message to the outgoing repair request buffer
* If message already exists, it is not updated (keeps original T_req)
*/
public add(entry: RepairHistoryEntry, tReq: number): void {
const messageId = entry.messageId;
// Check if already exists - do NOT update T_req per spec
const existingIndex = this.items.findIndex(item => item.entry.messageId === messageId);
if (existingIndex !== -1) {
log.info(`Message ${messageId} already in outgoing buffer, keeping original T_req`);
return;
}
// Check buffer size limit
if (this.items.length >= this.maxSize) {
// Evict oldest T_req entry (first in sorted array since we want to evict oldest)
const evicted = this.items.shift()!;
log.warn(`Buffer full, evicted oldest entry ${evicted.entry.messageId} with T_req ${evicted.tReq}`);
}
// Add new entry and re-sort
const newEntry: OutgoingBufferEntry = { entry, tReq };
const combined = [...this.items, newEntry];
// Sort by T_req (ascending)
combined.sort((a, b) => a.tReq - b.tReq);
this.items = combined;
log.info(`Added ${messageId} to outgoing buffer with T_req: ${tReq}`);
}
/**
* Remove a message from the buffer (e.g., when received)
*/
public remove(messageId: string): void {
this.items = this.items.filter(item => item.entry.messageId !== messageId);
}
/**
* Get eligible repair requests (where T_req <= currentTime)
* Returns up to maxRequests entries from the front of the sorted array
*/
public getEligible(currentTime: number, maxRequests = 3): RepairHistoryEntry[] {
const eligible: RepairHistoryEntry[] = [];
// Iterate from front of sorted array (earliest T_req first)
for (const item of this.items) {
if (item.tReq <= currentTime && eligible.length < maxRequests) {
eligible.push(item.entry);
} else if (item.tReq > currentTime) {
// Since array is sorted, no more eligible entries
break;
}
}
return eligible;
}
/**
* Check if a message is in the buffer
*/
public has(messageId: string): boolean {
return this.items.some(item => item.entry.messageId === messageId);
}
/**
* Get the current buffer size
*/
public get size(): number {
return this.items.length;
}
/**
* Clear all entries
*/
public clear(): void {
this.items = [];
}
/**
* Get all entries (for testing/debugging)
*/
public getAll(): RepairHistoryEntry[] {
return this.items.map(item => item.entry);
}
/**
* Get items array directly (for testing)
*/
public getItems(): OutgoingBufferEntry[] {
return [...this.items];
}
}
/**
* Buffer for incoming repair requests (repairs we need to send)
* Maintains a sorted array by T_resp for efficient retrieval of ready entries
*/
export class IncomingRepairBuffer {
// Sorted array by T_resp (ascending - earliest first)
private items: IncomingBufferEntry[] = [];
private readonly maxSize: number;
constructor(maxSize = 1000) {
this.maxSize = maxSize;
}
/**
* Add a repair request that we can fulfill
* If message already exists, it is ignored (not updated)
*/
public add(entry: RepairHistoryEntry, tResp: number): void {
const messageId = entry.messageId;
// Check if already exists - ignore per spec
const existingIndex = this.items.findIndex(item => item.entry.messageId === messageId);
if (existingIndex !== -1) {
log.info(`Message ${messageId} already in incoming buffer, ignoring`);
return;
}
// Check buffer size limit
if (this.items.length >= this.maxSize) {
// Evict furthest T_resp entry (last in sorted array)
const evicted = this.items.pop()!;
log.warn(`Buffer full, evicted furthest entry ${evicted.entry.messageId} with T_resp ${evicted.tResp}`);
}
// Add new entry and re-sort
const newEntry: IncomingBufferEntry = { entry, tResp };
const combined = [...this.items, newEntry];
// Sort by T_resp (ascending)
combined.sort((a, b) => a.tResp - b.tResp);
this.items = combined;
log.info(`Added ${messageId} to incoming buffer with T_resp: ${tResp}`);
}
/**
* Remove a message from the buffer
*/
public remove(messageId: string): void {
this.items = this.items.filter(item => item.entry.messageId !== messageId);
}
/**
* Get repairs ready to be sent (where T_resp <= currentTime)
* Removes and returns ready entries
*/
public getReady(currentTime: number): RepairHistoryEntry[] {
const ready: RepairHistoryEntry[] = [];
const remaining: IncomingBufferEntry[] = [];
for (const item of this.items) {
if (item.tResp <= currentTime) {
ready.push(item.entry);
log.info(`Repair for ${item.entry.messageId} is ready to be sent`);
} else {
// Since array is sorted, all remaining entries are not ready
remaining.push(item);
}
}
// Keep only non-ready entries
this.items = remaining;
return ready;
}
/**
* Check if a message is in the buffer
*/
public has(messageId: string): boolean {
return this.items.some(item => item.entry.messageId === messageId);
}
/**
* Get the current buffer size
*/
public get size(): number {
return this.items.length;
}
/**
* Clear all entries
*/
public clear(): void {
this.items = [];
}
/**
* Get all entries (for testing/debugging)
*/
public getAll(): RepairHistoryEntry[] {
return this.items.map(item => item.entry);
}
/**
* Get items array directly (for testing)
*/
public getItems(): IncomingBufferEntry[] {
return [...this.items];
}
}


@@ -0,0 +1,285 @@
import { Logger } from "@waku/utils";
import { Message } from "../message.js";
import {
IncomingRepairBuffer,
OutgoingRepairBuffer,
RepairHistoryEntry
} from "./buffers.js";
import {
bigintToNumber,
calculateXorDistance,
combinedHash,
hashString,
ParticipantId
} from "./utils.js";
const log = new Logger("sds:repair:manager");
/**
* Configuration for SDS-R repair protocol
*/
export interface RepairConfig {
/** Minimum wait time before requesting repair (milliseconds) */
tMin?: number;
/** Maximum wait time for repair window (milliseconds) */
tMax?: number;
/** Number of response groups for load distribution */
numResponseGroups?: number;
/** Maximum buffer size for repair requests */
bufferSize?: number;
/** Whether repair is enabled */
enabled?: boolean;
}
/**
* Default configuration values based on spec recommendations
*/
export const DEFAULT_REPAIR_CONFIG: Required<RepairConfig> = {
tMin: 30000, // 30 seconds
tMax: 120000, // 120 seconds
numResponseGroups: 1, // Recommendation is 1 group per 128 participants
bufferSize: 1000,
enabled: true
};
/**
* Manager for SDS-R repair protocol
* Handles repair request/response timing and coordination
*/
export class RepairManager {
private readonly participantId: ParticipantId;
private readonly config: Required<RepairConfig>;
private readonly outgoingBuffer: OutgoingRepairBuffer;
private readonly incomingBuffer: IncomingRepairBuffer;
constructor(participantId: ParticipantId, config: RepairConfig = {}) {
this.participantId = participantId;
this.config = { ...DEFAULT_REPAIR_CONFIG, ...config };
this.outgoingBuffer = new OutgoingRepairBuffer(this.config.bufferSize);
this.incomingBuffer = new IncomingRepairBuffer(this.config.bufferSize);
log.info(`RepairManager initialized for participant ${participantId}`);
}
/**
* Calculate T_req - when to request repair for a missing message
* Per spec (with bug fix): T_req = current_time + hash(participant_id, message_id) % (T_max - T_min) + T_min
*/
public calculateTReq(messageId: string, currentTime = Date.now()): number {
const hash = combinedHash(this.participantId, messageId);
const range = BigInt(this.config.tMax - this.config.tMin);
const offset = bigintToNumber(hash % range) + this.config.tMin;
return currentTime + offset;
}
/**
* Calculate T_resp - when to respond with a repair
* Per spec: T_resp = current_time + (distance * hash(message_id)) % T_max
* where distance = participant_id XOR sender_id
*/
public calculateTResp(
senderId: ParticipantId,
messageId: string,
currentTime = Date.now()
): number {
const distance = calculateXorDistance(this.participantId, senderId);
const messageHash = hashString(messageId);
const product = distance * messageHash;
const offset = bigintToNumber(product % BigInt(this.config.tMax));
return currentTime + offset;
}
/**
* Determine if this participant is in the response group for a message
* Per spec: (hash(participant_id, message_id) % num_response_groups) ==
* (hash(sender_id, message_id) % num_response_groups)
*/
public isInResponseGroup(
senderId: ParticipantId,
messageId: string
): boolean {
if (!senderId) {
// Cannot determine response group without sender_id
return false;
}
const numGroups = BigInt(this.config.numResponseGroups);
if (numGroups <= 1n) {
// Single group, everyone is in it
return true;
}
const participantGroup = combinedHash(this.participantId, messageId) % numGroups;
const senderGroup = combinedHash(senderId, messageId) % numGroups;
return participantGroup === senderGroup;
}
/**
* Handle missing dependencies by adding them to outgoing repair buffer
* Called when causal dependencies are detected as missing
*/
public onMissingDependencies(
missingEntries: RepairHistoryEntry[],
currentTime = Date.now()
): void {
if (!this.config.enabled) {
return;
}
for (const entry of missingEntries) {
// Calculate when to request this repair
const tReq = this.calculateTReq(entry.messageId, currentTime);
// Add to outgoing buffer
this.outgoingBuffer.add(entry, tReq);
log.info(
`Added missing dependency ${entry.messageId} to repair buffer with T_req=${tReq}`
);
}
}
/**
* Handle receipt of a message - remove from repair buffers
* Called when a message is successfully received
*/
public onMessageReceived(messageId: string): void {
// Remove from both buffers as we no longer need to request or respond
this.outgoingBuffer.remove(messageId);
this.incomingBuffer.remove(messageId);
log.info(`Removed ${messageId} from repair buffers after receipt`);
}
/**
* Get repair requests that are eligible to be sent
* Returns up to maxRequests entries where T_req <= currentTime
*/
public getRepairRequests(
maxRequests = 3,
currentTime = Date.now()
): RepairHistoryEntry[] {
if (!this.config.enabled) {
return [];
}
return this.outgoingBuffer.getEligible(currentTime, maxRequests);
}
/**
* Process incoming repair requests from other participants
* Adds to incoming buffer if we can fulfill and are in response group
*/
public processIncomingRepairRequests(
requests: RepairHistoryEntry[],
localHistory: Map<string, Message>,
currentTime = Date.now()
): void {
if (!this.config.enabled) {
return;
}
for (const request of requests) {
// Remove from our own outgoing buffer (someone else is requesting it)
this.outgoingBuffer.remove(request.messageId);
// Check if we have this message
if (!localHistory.has(request.messageId)) {
log.info(`Cannot fulfill repair for ${request.messageId} - not in local history`);
continue;
}
// Check if we're in the response group
if (!request.senderId) {
log.warn(`Cannot determine response group for ${request.messageId} - missing sender_id`);
continue;
}
if (!this.isInResponseGroup(request.senderId, request.messageId)) {
log.info(`Not in response group for ${request.messageId}`);
continue;
}
// Calculate when to respond
const tResp = this.calculateTResp(request.senderId, request.messageId, currentTime);
// Add to incoming buffer
this.incomingBuffer.add(request, tResp);
log.info(
`Will respond to repair request for ${request.messageId} at T_resp=${tResp}`
);
}
}
/**
* Sweep outgoing buffer for repairs that should be requested
* Returns entries where T_req <= currentTime
*/
public sweepOutgoingBuffer(
maxRequests = 3,
currentTime = Date.now()
): RepairHistoryEntry[] {
if (!this.config.enabled) {
return [];
}
return this.getRepairRequests(maxRequests, currentTime);
}
/**
* Sweep incoming buffer for repairs ready to be sent
* Returns messages that should be rebroadcast
*/
public sweepIncomingBuffer(
localHistory: Map<string, Message>,
currentTime = Date.now()
): Message[] {
if (!this.config.enabled) {
return [];
}
const ready = this.incomingBuffer.getReady(currentTime);
const messages: Message[] = [];
for (const entry of ready) {
const message = localHistory.get(entry.messageId);
if (message) {
messages.push(message);
log.info(`Sending repair for ${entry.messageId}`);
} else {
log.warn(`Message ${entry.messageId} no longer in local history`);
}
}
return messages;
}
/**
* Clear all buffers
*/
public clear(): void {
this.outgoingBuffer.clear();
this.incomingBuffer.clear();
}
/**
* Check if repair is enabled
*/
public get isEnabled(): boolean {
return this.config.enabled;
}
/**
* Update number of response groups (e.g., when participants change)
*/
public updateResponseGroups(numParticipants: number): void {
// Per spec: num_response_groups = max(1, num_participants / 128)
this.config.numResponseGroups = Math.max(1, Math.floor(numParticipants / 128));
log.info(`Updated response groups to ${this.config.numResponseGroups} for ${numParticipants} participants`);
}
}


@@ -0,0 +1,78 @@
import { sha256 } from "@noble/hashes/sha2";
import { bytesToHex } from "@noble/hashes/utils";
/**
* ParticipantId can be a string or converted to a numeric representation for XOR operations
*/
export type ParticipantId = string;
/**
* Compute SHA256 hash and convert to integer for modulo operations
* Uses first 8 bytes of hash for the integer conversion
*/
export function hashToInteger(input: string): bigint {
const hashBytes = sha256(new TextEncoder().encode(input));
// Use first 8 bytes for a 64-bit integer
const view = new DataView(hashBytes.buffer, hashBytes.byteOffset, 8);
return view.getBigUint64(0, false); // big-endian
}
/**
* Compute combined hash for (participantId, messageId) and convert to integer
* This is used for T_req calculations and response group membership
*/
export function combinedHash(
participantId: ParticipantId,
messageId: string
): bigint {
const combined = `${participantId}${messageId}`;
return hashToInteger(combined);
}
/**
* Convert ParticipantId to numeric representation for XOR operations
* TODO: Not per spec, further review needed
* The spec assumes participant IDs support XOR natively, but we're using
* SHA256 hash to ensure consistent numeric representation for string IDs
*/
export function participantIdToNumeric(participantId: ParticipantId): bigint {
return hashToInteger(participantId);
}
/**
* Calculate XOR distance between two participant IDs
* Used for T_resp calculations where distance affects response timing
*/
export function calculateXorDistance(
participantId1: ParticipantId,
participantId2: ParticipantId
): bigint {
const numeric1 = participantIdToNumeric(participantId1);
const numeric2 = participantIdToNumeric(participantId2);
return numeric1 ^ numeric2;
}
/**
* Helper to convert bigint to number for timing calculations
* Ensures the result fits in JavaScript's number range
*/
export function bigintToNumber(value: bigint): number {
// For timing calculations, we modulo by MAX_SAFE_INTEGER to ensure it fits
const maxSafe = BigInt(Number.MAX_SAFE_INTEGER);
return Number(value % maxSafe);
}
/**
* Calculate hash for a single string (used for message_id in T_resp)
*/
export function hashString(input: string): bigint {
return hashToInteger(input);
}
/**
* Convert a hash result to hex string for debugging/logging
*/
export function hashToHex(input: string): string {
const hashBytes = sha256(new TextEncoder().encode(input));
return bytesToHex(hashBytes);
}

sds.md (new file, 363 lines)

@@ -0,0 +1,363 @@
---
title: SDS
name: Scalable Data Sync protocol for distributed logs
status: raw
editor: Hanno Cornelius <hanno@status.im>
contributors:
- Akhil Peddireddy <akhil@status.im>
---
## Abstract
This specification introduces the Scalable Data Sync (SDS) protocol
to achieve end-to-end reliability
when consolidating distributed logs in a decentralized manner.
The protocol is designed for a peer-to-peer (p2p) topology
where each member of a group of nodes maintains an append-only log,
may individually append new entries to its local log at any time, and
is interested in merging new entries from other nodes in real time or close to real time
while maintaining a consistent order.
The outcome of the log consolidation procedure is
that all nodes in the group eventually reflect in their own logs
the same entries in the same order.
The protocol aims to scale to very large groups.
## Motivation
A common application that fits this model is a p2p group chat (or group communication),
where the participants act as log nodes
and the group conversation is modelled as the consolidated logs
maintained on each node.
The problem of end-to-end reliability can then be stated as
ensuring that all participants eventually see the same sequence of messages
in the same causal order,
despite the challenges of network latency, message loss,
and scalability present in any communications transport layer.
The rest of this document will assume the terminology of a group communication:
log nodes being the _participants_ in the group chat
and the logged entries being the _messages_ exchanged between participants.
## Design Assumptions
We make the following simplifying assumptions for a proposed reliability protocol:
* **Broadcast routing:**
Messages are disseminated by broadcast over the underlying transport.
The selected transport takes care of routing messages
to all participants of the communication.
* **Store nodes:**
There are high-availability caches (a.k.a. Store nodes)
from which missed messages can be retrieved.
These caches maintain the full history of all messages that have been broadcast.
This is an optional element in the protocol design,
but improves scalability by reducing direct interactions between participants.
* **Message ID:**
Each message has a globally unique, immutable ID (or hash).
Messages can be requested from the high-availability caches or
other participants using the corresponding message ID.
* **Participant ID:**
Each participant has a globally unique, immutable ID
visible to other participants in the communication.
## Wire protocol
The keywords “MUST”, “MUST NOT”, “REQUIRED”, “SHALL”, “SHALL NOT”, “SHOULD”,
“SHOULD NOT”, “RECOMMENDED”, “MAY”, and
“OPTIONAL” in this document are to be interpreted as described in [RFC 2119](https://www.ietf.org/rfc/rfc2119.txt).
### Message
Messages MUST adhere to the following meta structure:
```protobuf
syntax = "proto3";
message HistoryEntry {
string message_id = 1; // Unique identifier of the SDS message, as defined in `Message`
optional bytes retrieval_hint = 2; // Optional information to help remote parties retrieve this SDS message; For example, A Waku deterministic message hash or routing payload hash
}
message Message {
string sender_id = 1; // Participant ID of the message sender
string message_id = 2; // Unique identifier of the message
string channel_id = 3; // Identifier of the channel to which the message belongs
optional int32 lamport_timestamp = 10; // Logical timestamp for causal ordering in channel
repeated HistoryEntry causal_history = 11; // List of preceding message IDs that this message causally depends on. Generally 2 or 3 message IDs are included.
optional bytes bloom_filter = 12; // Bloom filter representing received message IDs in channel
optional bytes content = 20; // Actual content of the message
}
```
The sending participant MUST include its own globally unique identifier in the `sender_id` field.
In addition, it MUST include a globally unique identifier for the message in the `message_id` field,
likely based on a message hash.
The `channel_id` field MUST be set to the identifier of the channel of group communication
that is being synchronized.
For simple group communications without individual channels,
the `channel_id` SHOULD be set to `0`.
The `lamport_timestamp`, `causal_history` and
`bloom_filter` fields MUST be set according to the [protocol steps](#protocol-steps)
set out below.
These fields MAY be left unset in the case of [ephemeral messages](#ephemeral-messages).
The message `content` MAY be left empty for [periodic sync messages](#periodic-sync-message),
otherwise it MUST contain the application-level content.
> **_Note:_** Close readers may notice that, outside of filtering messages originating from the sender itself,
the `sender_id` field is not used for much.
Its importance is expected to increase once a p2p retrieval mechanism is added to SDS, as is planned for the protocol.
### Participant state
Each participant MUST maintain:
* A Lamport timestamp for each channel of communication,
initialized to current epoch time in nanosecond resolution.
* A bloom filter for received message IDs per channel.
The bloom filter SHOULD be rolled over and
recomputed once it reaches a predefined capacity of message IDs.
Furthermore,
it SHOULD be designed to minimize false positives through an optimal selection of
size and hash functions.
* A buffer for unacknowledged outgoing messages
* A buffer for incoming messages with unmet causal dependencies
* A local log (or history) for each channel,
containing all message IDs in the communication channel,
ordered by Lamport timestamp.
Messages in the unacknowledged outgoing buffer can be in one of three states:
1. **Unacknowledged** - there has been no acknowledgement of message receipt
by any participant in the channel
2. **Possibly acknowledged** - there has been ambiguous indication that the message
has been _possibly_ received by at least one participant in the channel
3. **Acknowledged** - there has been sufficient indication that the message
has been received by at least some of the participants in the channel.
This state will also remove the message from the outgoing buffer.
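As a non-normative illustration, the three states map onto a small enum (TypeScript, matching the language of the js-waku implementation; names are illustrative):
```typescript
// Illustrative only; the spec does not mandate a representation.
enum AckStatus {
  Unacknowledged = "unacknowledged",
  PossiblyAcknowledged = "possibly_acknowledged",
  Acknowledged = "acknowledged" // also removes the message from the buffer
}

interface OutgoingBufferEntry {
  messageId: string;
  status: AckStatus;
  // Bloom-filter hit count, used to upgrade "possibly acknowledged"
  // to "acknowledged" on probabilistic grounds.
  possibleAckCount: number;
}
```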
### Protocol Steps
For each channel of communication,
participants MUST follow these protocol steps to populate and interpret
the `lamport_timestamp`, `causal_history` and `bloom_filter` fields.
#### Send Message
Before broadcasting a message:
* the participant MUST increase its local Lamport timestamp by `1` and
include this in the `lamport_timestamp` field.
* the participant MUST determine the preceding few message IDs in the local history
and include these in an ordered list in the `causal_history` field.
The number of message IDs to include in the `causal_history` depends on the application.
We recommend a causal history of two message IDs.
* the participant MAY include a `retrieval_hint` in the `HistoryEntry`
for each message ID in the `causal_history` field.
This is an application-specific field to facilitate retrieval of messages,
e.g. from high-availability caches.
* the participant MUST include the current `bloom_filter`
state in the broadcast message.
After broadcasting a message,
the message MUST be added to the participant's buffer
of unacknowledged outgoing messages.
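A non-normative TypeScript sketch of the send-side bookkeeping, under assumed types:
```typescript
interface HistoryEntry {
  messageId: string;
  retrievalHint?: Uint8Array;
}

interface ChannelState {
  lamportTimestamp: number;
  localHistory: HistoryEntry[]; // ordered by Lamport timestamp
  bloomFilter: Uint8Array; // serialized current filter state
}

function prepareSend(
  state: ChannelState,
  causalHistorySize = 2 // the recommended two preceding message IDs
): { lamportTimestamp: number; causalHistory: HistoryEntry[]; bloomFilter: Uint8Array } {
  state.lamportTimestamp += 1; // increment before broadcast
  return {
    lamportTimestamp: state.lamportTimestamp,
    causalHistory: state.localHistory.slice(-causalHistorySize),
    bloomFilter: state.bloomFilter
  };
}
// After broadcasting, the full message is added to the
// unacknowledged outgoing buffer (not shown here).
```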
#### Receive Message
Upon receiving a message,
* the participant SHOULD ignore the message if it has a `sender_id` matching its own.
* the participant MAY deduplicate the message by comparing its `message_id` to previously received message IDs.
* the participant MUST [review the ACK status](#review-ack-status) of messages
in its unacknowledged outgoing buffer
using the received message's causal history and bloom filter.
* if the message has a populated `content` field,
the participant MUST include the received message ID in its local bloom filter.
* the participant MUST verify that all causal dependencies are met
for the received message.
Dependencies are met if the message IDs in the `causal_history` of the received message
appear in the local history of the receiving participant.
If all dependencies are met and the message has a populated `content` field,
the participant MUST [deliver the message](#deliver-message).
If dependencies are unmet,
the participant MUST add the message to the incoming buffer of messages
with unmet causal dependencies.
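A non-normative sketch of the receive-side checks (ACK review and bloom-filter insertion are elided):
```typescript
interface ReceivedMessage {
  senderId: string;
  messageId: string;
  causalHistory: { messageId: string }[];
  content?: Uint8Array;
}

function onReceive(
  state: { localHistory: Set<string>; incomingBuffer: ReceivedMessage[] },
  message: ReceivedMessage,
  selfId: string,
  deliver: (m: ReceivedMessage) => void
): void {
  if (message.senderId === selfId) return; // ignore own messages
  // ... review ACK status and update the local bloom filter here ...
  const dependenciesMet = message.causalHistory.every((e) =>
    state.localHistory.has(e.messageId)
  );
  if (!dependenciesMet) {
    state.incomingBuffer.push(message); // buffer until dependencies are met
  } else if (message.content && message.content.length > 0) {
    deliver(message); // sync messages (empty content) are processed, not delivered
  }
}
```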
#### Deliver Message
Triggered by the [Receive Message](#receive-message) procedure.
If the received message's Lamport timestamp is greater than the participant's
local Lamport timestamp,
the participant MUST update its local Lamport timestamp to match the received message.
The participant MUST insert the message ID into its local log,
based on Lamport timestamp.
If one or more message IDs with the same Lamport timestamp already exist,
the participant MUST follow the [Resolve Conflicts](#resolve-conflicts) procedure.
#### Resolve Conflicts
Triggered by the [Deliver Message](#deliver-message) procedure.
The participant MUST order messages with the same Lamport timestamp
in ascending order of message ID.
If the message ID is implemented as a hash of the message,
this means the message with the lowest hash would precede
other messages with the same Lamport timestamp in the local log.
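A non-normative sketch of the resulting total order over log entries:
```typescript
// Lamport timestamp first; ascending message ID breaks ties.
function compareLogEntries(
  a: { lamportTimestamp: number; messageId: string },
  b: { lamportTimestamp: number; messageId: string }
): number {
  if (a.lamportTimestamp !== b.lamportTimestamp) {
    return a.lamportTimestamp - b.lamportTimestamp;
  }
  return a.messageId < b.messageId ? -1 : a.messageId > b.messageId ? 1 : 0;
}
```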
#### Review ACK Status
Triggered by the [Receive Message](#receive-message) procedure.
For each message in the unacknowledged outgoing buffer,
based on the received `bloom_filter` and `causal_history`:
* the participant MUST mark all messages in the received `causal_history` as **acknowledged**.
* the participant MUST mark all messages included in the `bloom_filter`
as **possibly acknowledged**.
If a message appears as **possibly acknowledged** in multiple received bloom filters,
the participant MAY mark it as acknowledged based on probabilistic grounds,
taking into account the bloom filter size and hash number.
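A non-normative sketch of the ACK review, assuming a bloom filter with a membership lookup; the threshold of two hits follows the js-waku default mentioned under Implementation Suggestions:
```typescript
function reviewAckStatus(
  outgoing: Map<string, { possibleAckCount: number }>,
  causalHistory: { messageId: string }[],
  bloomFilter: { has(messageId: string): boolean },
  possibleAckThreshold = 2
): void {
  for (const entry of causalHistory) {
    outgoing.delete(entry.messageId); // explicit ACK: fully acknowledged
  }
  for (const [messageId, state] of outgoing) {
    if (bloomFilter.has(messageId)) {
      state.possibleAckCount += 1; // possibly acknowledged
      if (state.possibleAckCount >= possibleAckThreshold) {
        outgoing.delete(messageId); // acknowledged on probabilistic grounds
      }
    }
  }
}
```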
#### Periodic Incoming Buffer Sweep
The participant MUST periodically check causal dependencies for each message
in the incoming buffer.
For each message in the incoming buffer:
* the participant MAY attempt to retrieve missing dependencies from the Store node
(high-availability cache) or other peers.
It MAY use the application-specific `retrieval_hint` in the `HistoryEntry` to facilitate retrieval.
* if all dependencies of a message are met,
the participant MUST proceed to [deliver the message](#deliver-message).
If a message's causal dependencies have failed to be met
after a predetermined amount of time,
the participant MAY mark them as **irretrievably lost**.
#### Periodic Outgoing Buffer Sweep
The participant MUST rebroadcast **unacknowledged** outgoing messages
after a set period.
The participant SHOULD use distinct resend periods for **unacknowledged** and
**possibly acknowledged** messages,
prioritizing **unacknowledged** messages.
#### Periodic Sync Message
For each channel of communication,
participants SHOULD periodically send sync messages to maintain state.
These sync messages:
* MUST be sent with empty content
* MUST include an incremented Lamport timestamp
* MUST include causal history and bloom filter according to regular message rules
* MUST NOT be added to the unacknowledged outgoing buffer
* MUST NOT be included in causal histories of subsequent messages
* MUST NOT be included in bloom filters
* MUST NOT be added to the local log
Since sync messages are not persisted,
they MAY have non-unique message IDs without impacting the protocol.
To avoid network activity bursts in large groups,
a participant MAY choose to only send periodic sync messages
if no other messages have been broadcast in the channel after a random backoff period.
Participants MUST process the causal history and bloom filter of these sync messages
following the same steps as regular messages,
but MUST NOT persist the sync messages themselves.
#### Ephemeral Messages
Participants MAY choose to send short-lived messages for which no synchronization
or reliability is required.
These messages are termed _ephemeral_.
Ephemeral messages SHOULD be sent with `lamport_timestamp`, `causal_history`, and
`bloom_filter` unset.
Ephemeral messages SHOULD NOT be added to the unacknowledged outgoing buffer
after broadcast.
Upon reception,
ephemeral messages SHOULD be delivered immediately without buffering for causal dependencies
or including in the local log.
## Implementation Suggestions
This section provides practical guidance based on the js-waku implementation of SDS.
### Default Configuration Values
The js-waku implementation uses the following defaults:
- **Bloom filter capacity**: 10,000 messages
- **Bloom filter error rate**: 0.001 (0.1% false positive rate)
- **Causal history size**: 200 message IDs
- **Possible ACKs threshold**: 2 bloom filter hits before considering a message acknowledged
With 200 messages in causal history, assuming 32-byte message IDs and 32-byte retrieval hints (e.g., Waku message hashes),
each message carries 200 × 64 bytes = 12.8 KB of causal history overhead.
### External Task Scheduling
The js-waku implementation delegates periodic task scheduling to the library consumer by providing methods:
- `processTasks()`: Process queued send/receive operations
- `sweepIncomingBuffer()`: Check and deliver messages with met dependencies, returns missing dependencies
- `sweepOutgoingBuffer()`: Return unacknowledged and possibly acknowledged messages for retry
- `pushOutgoingSyncMessage(callback)`: Send a sync message
The implementation does not include internal timers,
allowing applications to integrate SDS with their existing scheduling infrastructure.
### Message Processing
#### Handling Missing Dependencies
When `sweepIncomingBuffer()` returns missing dependencies,
the implementation emits an `InMessageMissing` event with `HistoryEntry[]` containing:
- `messageId`: The missing message identifier
- `retrievalHint`: Optional bytes to help retrieve the message (e.g., transport-specific hash)
#### Timeout for Lost Messages
The `timeoutForLostMessagesMs` option allows marking messages as irretrievably lost after a timeout.
When configured, the implementation emits an `InMessageLost` event after the timeout expires.
### Events Emitted
The js-waku implementation uses a `TypedEventEmitter` pattern to emit events for:
- **Incoming messages**: received, delivered, missing dependencies, lost (after timeout)
- **Outgoing messages**: sent, acknowledged, possibly acknowledged
- **Sync messages**: sent, received
- **Errors**: task execution failures
### SDK Usage: ReliableChannel
The SDK provides a high-level `ReliableChannel` abstraction that wraps the core SDS `MessageChannel` with automatic task scheduling and Waku protocol integration:
#### Configuration
The ReliableChannel uses these default intervals:
- **Sync message interval**: 30 seconds minimum between sync messages (randomized backoff)
- **Retry interval**: 30 seconds for unacknowledged messages
- **Max retry attempts**: 10 attempts before giving up
- **Store query interval**: 10 seconds for missing message retrieval
#### Task Scheduling Implementation
The SDK automatically schedules SDS periodic tasks:
- **Sync messages**: Uses exponential backoff with randomization; sent faster (0.5x multiplier) after receiving content to acknowledge others
- **Outgoing buffer sweeps**: Triggered after each retry interval for unacknowledged messages
- **Incoming buffer sweeps**: Performed after each incoming message and during missing message retrieval
- **Process tasks**: Called immediately after sending/receiving messages and during sync
#### Integration with Waku Protocols
ReliableChannel integrates SDS with Waku:
- **Sending**: Uses LightPush or Relay protocols; includes Waku message hash as retrieval hint (32 bytes)
- **Receiving**: Subscribes via Filter protocol; unwraps SDS messages before passing to application
- **Missing message retrieval**: Queries Store nodes using retrieval hints from causal history
- **Query on connect**: Automatically queries Store when connecting to new peers (enabled by default)
## Copyright
Copyright and related rights waived via [CC0](https://creativecommons.org/publicdomain/zero/1.0/).