feat: minimal SDS integration for vote consistency

- Added @waku/sds dependency
- Created MinimalSDSWrapper for basic causal ordering
- Enhanced vote messages with SDS metadata (Lamport timestamps, causal history)
- Updated MessageManager to use SDS for vote conflict resolution
- Added SDS status indicator in header
- Added debug logging for SDS operations

This is a minimal proof-of-concept that:
- Uses a single SDS channel for all votes
- Only applies to vote messages (other message types unchanged)
- Maintains backward compatibility
- Can be disabled by removing the SDS enhancement
This commit is contained in:
Arseniy Klempner 2025-06-11 14:07:47 -07:00
parent 63bbdde5e2
commit ac6e6a7d21
No known key found for this signature in database
GPG Key ID: 51653F18863BD24B
9 changed files with 343 additions and 6 deletions

148
package-lock.json generated
View File

@ -40,6 +40,7 @@
"@radix-ui/react-tooltip": "^1.1.4",
"@tanstack/react-query": "^5.56.2",
"@waku/sdk": "^0.0.30",
"@waku/sds": "^0.0.3",
"class-variance-authority": "^0.7.1",
"clsx": "^2.1.1",
"cmdk": "^1.0.0",
@ -3924,6 +3925,153 @@
"node": ">=20"
}
},
"node_modules/@waku/sds": {
"version": "0.0.3",
"resolved": "https://registry.npmjs.org/@waku/sds/-/sds-0.0.3.tgz",
"integrity": "sha512-tKqa8S4GW1x9Xfmhm+kV7IlylYCXw2sE/bDS9ZM58VAUww98Tli8218ezqiu8uE+MPLmC/0SIPlRbrpveJ2qig==",
"dependencies": {
"@libp2p/interface": "2.7.0",
"@noble/hashes": "^1.7.1",
"@waku/message-hash": "^0.1.19",
"@waku/proto": "^0.0.10",
"@waku/utils": "^0.0.23",
"chai": "^5.1.2"
},
"engines": {
"node": ">=20"
}
},
"node_modules/@waku/sds/node_modules/@libp2p/interface": {
"version": "2.7.0",
"resolved": "https://registry.npmjs.org/@libp2p/interface/-/interface-2.7.0.tgz",
"integrity": "sha512-lWmfIGzbSaw//yoEWWJh8dXNDGSCwUyXwC7P1Q6jCFWNoEtCaB1pvwOGBtri7Db/aNFZryMzN5covoq5ulldnA==",
"dependencies": {
"@multiformats/multiaddr": "^12.3.3",
"it-pushable": "^3.2.3",
"it-stream-types": "^2.0.2",
"multiformats": "^13.3.1",
"progress-events": "^1.0.1",
"uint8arraylist": "^2.4.8"
}
},
"node_modules/@waku/sds/node_modules/@waku/interfaces": {
"version": "0.0.30",
"resolved": "https://registry.npmjs.org/@waku/interfaces/-/interfaces-0.0.30.tgz",
"integrity": "sha512-2cR8+u0CePmUFBB4vVL1zw403Rki5hK+7rKQH0WikDT4SD4lJTdMV4j3q3+YBfPTsMJrFCVFhLcqpeBADgavAw==",
"dependencies": {
"@waku/proto": "^0.0.10"
},
"engines": {
"node": ">=20"
}
},
"node_modules/@waku/sds/node_modules/@waku/message-hash": {
"version": "0.1.19",
"resolved": "https://registry.npmjs.org/@waku/message-hash/-/message-hash-0.1.19.tgz",
"integrity": "sha512-fl+qky3MQK8l3HTT5wq23NcdYFYNqVcUVwBblX9/IArcDlDNjEEdK68K3n8rFWxBBd2JAK0RxU7MMkLiK3vWUA==",
"dependencies": {
"@noble/hashes": "^1.3.2",
"@waku/utils": "0.0.23"
},
"engines": {
"node": ">=20"
}
},
"node_modules/@waku/sds/node_modules/@waku/proto": {
"version": "0.0.10",
"resolved": "https://registry.npmjs.org/@waku/proto/-/proto-0.0.10.tgz",
"integrity": "sha512-dgBOjwRtduZSHxmr2IqDfrzgDnog8f/qiseLV39W1WNDkVLqpNT7K2bPDPz5/e2e7EtVtTAzbGPZPakOswn5FQ==",
"dependencies": {
"protons-runtime": "^5.4.0"
},
"engines": {
"node": ">=20"
}
},
"node_modules/@waku/sds/node_modules/@waku/utils": {
"version": "0.0.23",
"resolved": "https://registry.npmjs.org/@waku/utils/-/utils-0.0.23.tgz",
"integrity": "sha512-8abBIAI7hq1kb5WVpv0o6CCW5Go3bwxo1xovKXfTZfdERwgV7/R6VcijKaUWOHF9SYIskyJuC98TFx/1HgrUBw==",
"dependencies": {
"@noble/hashes": "^1.3.2",
"@waku/interfaces": "0.0.30",
"chai": "^4.3.10",
"debug": "^4.3.4",
"uint8arrays": "^5.0.1"
},
"engines": {
"node": ">=20"
}
},
"node_modules/@waku/sds/node_modules/@waku/utils/node_modules/chai": {
"version": "4.5.0",
"resolved": "https://registry.npmjs.org/chai/-/chai-4.5.0.tgz",
"integrity": "sha512-RITGBfijLkBddZvnn8jdqoTypxvqbOLYQkGGxXzeFjVHvudaPw0HNFD9x928/eUwYWd2dPCugVqspGALTZZQKw==",
"dependencies": {
"assertion-error": "^1.1.0",
"check-error": "^1.0.3",
"deep-eql": "^4.1.3",
"get-func-name": "^2.0.2",
"loupe": "^2.3.6",
"pathval": "^1.1.1",
"type-detect": "^4.1.0"
},
"engines": {
"node": ">=4"
}
},
"node_modules/@waku/sds/node_modules/chai": {
"version": "5.2.0",
"resolved": "https://registry.npmjs.org/chai/-/chai-5.2.0.tgz",
"integrity": "sha512-mCuXncKXk5iCLhfhwTc0izo0gtEmpz5CtG2y8GiOINBlMVS6v8TMRc5TaLWKS6692m9+dVVfzgeVxR5UxWHTYw==",
"dependencies": {
"assertion-error": "^2.0.1",
"check-error": "^2.1.1",
"deep-eql": "^5.0.1",
"loupe": "^3.1.0",
"pathval": "^2.0.0"
},
"engines": {
"node": ">=12"
}
},
"node_modules/@waku/sds/node_modules/chai/node_modules/assertion-error": {
"version": "2.0.1",
"resolved": "https://registry.npmjs.org/assertion-error/-/assertion-error-2.0.1.tgz",
"integrity": "sha512-Izi8RQcffqCeNVgFigKli1ssklIbpHnCYc6AknXGYoB6grJqyeby7jv12JUQgmTAnIDnbck1uxksT4dzN3PWBA==",
"engines": {
"node": ">=12"
}
},
"node_modules/@waku/sds/node_modules/chai/node_modules/check-error": {
"version": "2.1.1",
"resolved": "https://registry.npmjs.org/check-error/-/check-error-2.1.1.tgz",
"integrity": "sha512-OAlb+T7V4Op9OwdkjmguYRqncdlx5JiofwOAUkmTF+jNdHwzTaTs4sRAGpzLF3oOz5xAyDGrPgeIDFQmDOTiJw==",
"engines": {
"node": ">= 16"
}
},
"node_modules/@waku/sds/node_modules/chai/node_modules/deep-eql": {
"version": "5.0.2",
"resolved": "https://registry.npmjs.org/deep-eql/-/deep-eql-5.0.2.tgz",
"integrity": "sha512-h5k/5U50IJJFpzfL6nO9jaaumfjO/f2NjK/oYB2Djzm4p9L+3T9qWpZqZ2hAbLPuuYq9wrU08WQyBTL5GbPk5Q==",
"engines": {
"node": ">=6"
}
},
"node_modules/@waku/sds/node_modules/chai/node_modules/loupe": {
"version": "3.1.3",
"resolved": "https://registry.npmjs.org/loupe/-/loupe-3.1.3.tgz",
"integrity": "sha512-kkIp7XSkP78ZxJEsSxW3712C6teJVoeHHwgo9zJ380de7IYyJ2ISlxojcH2pC5OFLewESmnRi/+XCDIEEVyoug=="
},
"node_modules/@waku/sds/node_modules/chai/node_modules/pathval": {
"version": "2.0.0",
"resolved": "https://registry.npmjs.org/pathval/-/pathval-2.0.0.tgz",
"integrity": "sha512-vE7JKRyES09KiunauX7nd2Q9/L7lhok4smP9RZTDeD4MVs72Dp2qNFVz39Nz5a0FVEW0BJR6C0DYrq6unoziZA==",
"engines": {
"node": ">= 14.16"
}
},
"node_modules/@waku/utils": {
"version": "0.0.22",
"resolved": "https://registry.npmjs.org/@waku/utils/-/utils-0.0.22.tgz",

View File

@ -43,6 +43,7 @@
"@radix-ui/react-tooltip": "^1.1.4",
"@tanstack/react-query": "^5.56.2",
"@waku/sdk": "^0.0.30",
"@waku/sds": "^0.0.3",
"class-variance-authority": "^0.7.1",
"clsx": "^2.1.1",
"cmdk": "^1.0.0",

View File

@ -6,6 +6,7 @@ import { Button } from '@/components/ui/button';
import { Badge } from '@/components/ui/badge';
import { ShieldCheck, LogOut, Terminal, Wifi, WifiOff, AlertTriangle, CheckCircle, Key, RefreshCw, CircleSlash } from 'lucide-react';
import { Tooltip, TooltipContent, TooltipTrigger } from '@/components/ui/tooltip';
import { SDSStatus } from './SDSStatus';
const Header = () => {
const {
@ -197,6 +198,8 @@ const Header = () => {
</TooltipContent>
</Tooltip>
<SDSStatus />
{!currentUser ? (
<Button
variant="outline"

View File

@ -0,0 +1,10 @@
import React from 'react';
import { Badge } from "@/components/ui/badge";
export const SDSStatus: React.FC = () => {
return (
<Badge variant="outline" className="text-xs">
SDS: Active (Votes)
</Badge>
);
};

View File

@ -10,6 +10,9 @@ import { CellCache } from "./types";
import { OpchanMessage } from "@/types";
import { EphemeralProtocolsManager } from "./lightpush_filter";
import { NETWORK_CONFIG } from "./constants";
import { MinimalSDSWrapper } from "./sds/minimal-sds";
import { SDSEnhancedMessage } from "./sds/types";
import { SDSDebug } from "./sds/debug";
export type HealthChangeCallback = (isReady: boolean) => void;
@ -59,6 +62,7 @@ class MessageManager {
private constructor(node: LightNode) {
this.node = node;
this.sds = new MinimalSDSWrapper();
this.ephemeralProtocolsManager = new EphemeralProtocolsManager(node);
this.storeManager = new StoreManager(node);
@ -166,13 +170,22 @@ class MessageManager {
}
public async sendMessage(message: OpchanMessage) {
await this.ephemeralProtocolsManager.sendMessage(message);
// Enhance vote messages with SDS metadata
const enhancedMessage = this.sds.enhanceMessage(message);
await this.ephemeralProtocolsManager.sendMessage(enhancedMessage);
//TODO: should we update the cache here? or just from store/filter?
this.updateCache(message);
this.updateCache(enhancedMessage);
}
public async subscribeToMessages(types: MessageType[] = [MessageType.CELL, MessageType.POST, MessageType.COMMENT, MessageType.VOTE, MessageType.MODERATE]) {
const { result, subscription } = await this.ephemeralProtocolsManager.subscribeToMessages(types);
const { result, subscription } = await this.ephemeralProtocolsManager.subscribeToMessages(types, (message) => {
// Process incoming messages with SDS
if ('sds' in message) {
this.sds.processIncomingMessage(message as SDSEnhancedMessage);
}
this.updateCache(message);
});
for (const message of result) {
this.updateCache(message);
@ -181,7 +194,7 @@ class MessageManager {
return { messages: result, subscription };
}
private updateCache(message: OpchanMessage) {
private updateCache(message: OpchanMessage | SDSEnhancedMessage) {
switch (message.type) {
case MessageType.CELL:
this.messageCache.cells[message.id] = message;
@ -195,7 +208,24 @@ class MessageManager {
case MessageType.VOTE: {
// For votes, we use a composite key of targetId + author to handle multiple votes from same user
const voteKey = `${message.targetId}:${message.author}`;
this.messageCache.votes[voteKey] = message;
const existingVote = this.messageCache.votes[voteKey];
// Use SDS causal ordering if available
if (existingVote && 'sds' in message) {
const enhancedMessage = message as SDSEnhancedMessage;
const existingEnhanced = existingVote as SDSEnhancedMessage;
// Only update if causally newer
if (this.sds.isCausallyNewer(enhancedMessage, existingEnhanced)) {
this.messageCache.votes[voteKey] = message;
console.log(`[SDS] Updated vote for ${voteKey} with causally newer message`);
} else {
console.log(`[SDS] Ignored older vote for ${voteKey}`);
}
} else {
// No existing vote or no SDS metadata, just update
this.messageCache.votes[voteKey] = message;
}
break;
}
case MessageType.MODERATE: {

View File

@ -18,7 +18,7 @@ export class EphemeralProtocolsManager {
return result;
}
public async subscribeToMessages(types: MessageType[]) {
public async subscribeToMessages(types: MessageType[], onMessage?: (message: OpchanMessage) => void) {
const result: (CellMessage | PostMessage | CommentMessage | VoteMessage | ModerateMessage)[] = [];
const subscription = await this.node.filter.subscribe(Object.values(decoders), async (message) => {
@ -27,6 +27,10 @@ export class EphemeralProtocolsManager {
const decodedMessage = decodeMessage(payload);
if (types.includes(decodedMessage.type)) {
result.push(decodedMessage);
// Call the callback if provided
if (onMessage) {
onMessage(decodedMessage);
}
}
});

21
src/lib/waku/sds/debug.ts Normal file
View File

@ -0,0 +1,21 @@
// Simple debug logger for SDS operations
export const SDSDebug = {
enabled: true,
log(message: string, data?: any) {
if (!this.enabled) return;
const timestamp = new Date().toISOString().split('T')[1].split('.')[0];
const prefix = `[SDS ${timestamp}]`;
if (data) {
console.log(`${prefix} ${message}`, data);
} else {
console.log(`${prefix} ${message}`);
}
},
logVote(action: string, voteKey: string, data?: any) {
this.log(`VOTE ${action} - ${voteKey}`, data);
}
};

View File

@ -0,0 +1,101 @@
import { createEncoder, createDecoder } from "@waku/sdk";
import { SDSEnhancedMessage, SDSChannelState } from "./types";
import { OpchanMessage } from "../../types";
// For now, use a single channel for all votes to test SDS
const VOTE_CHANNEL_ID = "opchan:votes:all";
const SDS_CONTENT_TOPIC = "/opchan/1/sds-votes/proto";
export class MinimalSDSWrapper {
private channelStates: Map<string, SDSChannelState> = new Map();
private encoder = createEncoder({ contentTopic: SDS_CONTENT_TOPIC });
private decoder = createDecoder(SDS_CONTENT_TOPIC);
constructor() {
// Initialize vote channel
this.channelStates.set(VOTE_CHANNEL_ID, {
channelId: VOTE_CHANNEL_ID,
lamportTimestamp: 0,
messageHistory: [],
lastSync: Date.now()
});
}
// Enhance a message with SDS metadata
enhanceMessage(message: OpchanMessage): SDSEnhancedMessage {
// Only enhance vote messages for minimal implementation
if (message.type !== 'vote') {
return message;
}
const state = this.channelStates.get(VOTE_CHANNEL_ID)!;
// Increment Lamport timestamp
state.lamportTimestamp++;
// Get last 3 message IDs for causal history
const causalHistory = state.messageHistory.slice(-3);
// Add current message to history
state.messageHistory.push(message.id);
if (state.messageHistory.length > 100) {
state.messageHistory = state.messageHistory.slice(-100);
}
return {
...message,
sds: {
channelId: VOTE_CHANNEL_ID,
lamportTimestamp: state.lamportTimestamp,
causalHistory
}
};
}
// Process incoming message with SDS metadata
processIncomingMessage(message: SDSEnhancedMessage): void {
if (!message.sds || message.type !== 'vote') {
return;
}
const state = this.channelStates.get(VOTE_CHANNEL_ID)!;
// Update Lamport timestamp (max of local and received + 1)
state.lamportTimestamp = Math.max(
state.lamportTimestamp,
message.sds.lamportTimestamp
) + 1;
// Add to message history if not already present
if (!state.messageHistory.includes(message.id)) {
state.messageHistory.push(message.id);
if (state.messageHistory.length > 100) {
state.messageHistory = state.messageHistory.slice(-100);
}
}
}
// Check if message A is causally newer than B
isCausallyNewer(a: SDSEnhancedMessage, b: SDSEnhancedMessage): boolean {
if (!a.sds || !b.sds) {
return a.timestamp > b.timestamp;
}
// First check Lamport timestamps
if (a.sds.lamportTimestamp !== b.sds.lamportTimestamp) {
return a.sds.lamportTimestamp > b.sds.lamportTimestamp;
}
// If equal, use message ID as tiebreaker
return a.id > b.id;
}
// Get encoder/decoder for SDS messages
getEncoder() {
return this.encoder;
}
getDecoder() {
return this.decoder;
}
}

19
src/lib/waku/sds/types.ts Normal file
View File

@ -0,0 +1,19 @@
import { OpchanMessage } from "../../types";
export interface SDSMetadata {
channelId: string;
lamportTimestamp: number;
causalHistory: string[];
bloomFilter?: Uint8Array;
}
export interface SDSEnhancedMessage extends OpchanMessage {
sds?: SDSMetadata;
}
export interface SDSChannelState {
channelId: string;
lamportTimestamp: number;
messageHistory: string[];
lastSync: number;
}