Merge pull request #2261 from waku-org/feat/sds-participant-state

feat(sds): add message channel with buffers and send/receive logic
This commit is contained in:
Arseniy Klempner 2025-02-21 15:23:32 -08:00 committed by GitHub
commit 4cd1eea05a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 604 additions and 28 deletions

View File

@ -64,6 +64,7 @@
"kdfparams",
"keccak",
"keypair",
"lamport",
"lastpub",
"libauth",
"libp",

5
package-lock.json generated
View File

@ -6937,7 +6937,6 @@
"version": "1.7.1",
"resolved": "https://registry.npmjs.org/@noble/hashes/-/hashes-1.7.1.tgz",
"integrity": "sha512-B8XBPsn4vT/KJAGqDzbwztd+6Yte3P4V7iafm24bxgDe/mlRuK6xmWPuCNrKt2vDafZ8MfJLlchDG/vYafQEjQ==",
"license": "MIT",
"engines": {
"node": "^14.21.3 || >=16"
},
@ -42781,6 +42780,10 @@
"version": "0.0.1",
"license": "MIT OR Apache-2.0",
"dependencies": {
"@noble/hashes": "^1.7.1",
"@waku/message-hash": "^0.1.17",
"@waku/proto": "^0.0.8",
"@waku/utils": "^0.0.21",
"chai": "^5.1.2"
},
"devDependencies": {

View File

@ -15,3 +15,5 @@ export * as proto_store from './generated/store_v3.js'
export * as proto_peer_exchange from "./generated/peer_exchange.js";
export * as proto_metadata from './generated/metadata.js'
export * as proto_sds_message from './generated/sds_message.js'

View File

@ -8,7 +8,7 @@ const config = {
'loader=ts-node/esm'
],
exit: true,
retries: 4
retries: 2
};
if (process.env.CI) {

View File

@ -59,6 +59,10 @@
"node": ">=20"
},
"dependencies": {
"@noble/hashes": "^1.7.1",
"@waku/message-hash": "^0.1.17",
"@waku/proto": "^0.0.8",
"@waku/utils": "^0.0.21",
"chai": "^5.1.2"
},
"devDependencies": {

View File

@ -1,7 +1,6 @@
import { expect } from "chai";
import { BloomFilter } from "./bloom.js";
import { hashN } from "./nim_hashn/nim_hashn.mjs";
import { BloomFilter, DefaultBloomFilter } from "./bloom.js";
const n = 10000;
const sampleChars =
@ -20,13 +19,10 @@ describe("BloomFilter", () => {
let testElements: string[];
beforeEach(() => {
bloomFilter = new BloomFilter(
{
capacity: n,
errorRate: 0.001
},
hashN
);
bloomFilter = new DefaultBloomFilter({
capacity: n,
errorRate: 0.001
});
testElements = new Array<string>(n);
@ -55,15 +51,12 @@ describe("BloomFilter", () => {
expect(bloomFilter.kHashes).to.equal(10);
expect(bloomFilter.totalBits / n).to.equal(15);
const bloomFilter2 = new BloomFilter(
{
capacity: 10000,
errorRate: 0.001,
kHashes: 4,
forceNBitsPerElem: 20
},
hashN
);
const bloomFilter2 = new DefaultBloomFilter({
capacity: 10000,
errorRate: 0.001,
kHashes: 4,
forceNBitsPerElem: 20
});
expect(bloomFilter2.kHashes).to.equal(4);
expect(bloomFilter2.totalBits).to.equal(200000);
});
@ -107,6 +100,17 @@ describe("BloomFilter", () => {
expect(bloomFilter.lookup(item)).to.equal(true);
}
});
it("should serialize and deserialize correctly", () => {
const serialized = bloomFilter.toBytes();
const deserialized = DefaultBloomFilter.fromBytes(
serialized,
bloomFilter.options
);
for (const item of testElements) {
expect(deserialized.lookup(item)).to.equal(true);
}
});
});
describe("BloomFilter with special patterns", () => {
@ -114,13 +118,10 @@ describe("BloomFilter with special patterns", () => {
const inserted: string[] = [];
beforeEach(() => {
bloomFilter = new BloomFilter(
{
capacity: n,
errorRate: 0.001
},
hashN
);
bloomFilter = new DefaultBloomFilter({
capacity: n,
errorRate: 0.001
});
});
it("should handle special patterns correctly", () => {

View File

@ -1,3 +1,4 @@
import { hashN } from "./nim_hashn/nim_hashn.mjs";
import { getMOverNBitsForK } from "./probabilities.js";
export interface BloomFilterOptions {
@ -30,11 +31,15 @@ export class BloomFilter {
public kHashes: number;
public errorRate: number;
public options: BloomFilterOptions;
private hashN: (item: string, n: number, maxValue: number) => number;
public constructor(
options: BloomFilterOptions,
hashN: (item: string, n: number, maxValue: number) => number
) {
this.options = options;
let nBitsPerElem: number;
let k = options.kHashes ?? 0;
const forceNBitsPerElem = options.forceNBitsPerElem ?? 0;
@ -103,4 +108,39 @@ export class BloomFilter {
}
return true;
}
/**
 * Serialize the filter's bit array to bytes (8 bytes per word, big-endian).
 *
 * Words are written with `setBigUint64` because the backing words are
 * unsigned 64-bit values: `setBigInt64` would throw a RangeError for any
 * word with the high bit set (value >= 2^63). Big-endian is made explicit
 * so it always matches the `getBigUint64(…, false)` read in `fromBytes`.
 *
 * @returns A Uint8Array of length `data.length * 8`.
 */
public toBytes(): Uint8Array {
  const buffer = new ArrayBuffer(this.data.length * 8);
  const view = new DataView(buffer);
  for (let i = 0; i < this.data.length; i++) {
    // Unsigned write, explicit big-endian (matches fromBytes).
    view.setBigUint64(i * 8, this.data[i], false);
  }
  return new Uint8Array(buffer);
}
/**
 * Deserialize a filter previously produced by `toBytes`.
 *
 * The DataView is constructed with the input's `byteOffset` and
 * `byteLength`: a Uint8Array is often a view into a larger ArrayBuffer
 * (e.g. a field sliced out of a decoded protobuf message), and reading
 * from `bytes.buffer` alone would start at offset 0 of the underlying
 * buffer and corrupt the filter.
 *
 * @param bytes - Serialized filter bytes (big-endian 64-bit words).
 * @param options - Options used to size the filter; must match the options
 *   the serialized filter was created with.
 * @param hashN - Hash function used for lookups/inserts.
 * @returns A BloomFilter whose bit array is restored from `bytes`.
 * @throws RangeError if `bytes` is shorter than the filter's bit array.
 */
public static fromBytes(
  bytes: Uint8Array,
  options: BloomFilterOptions,
  hashN: (item: string, n: number, maxValue: number) => number
): BloomFilter {
  const bloomFilter = new BloomFilter(options, hashN);
  const requiredBytes = bloomFilter.data.length * 8;
  if (bytes.byteLength < requiredBytes) {
    throw new RangeError(
      `Serialized bloom filter too short: got ${bytes.byteLength} bytes, need ${requiredBytes}`
    );
  }
  // Respect the view's offset/length; bytes may alias a larger buffer.
  const view = new DataView(bytes.buffer, bytes.byteOffset, bytes.byteLength);
  for (let i = 0; i < bloomFilter.data.length; i++) {
    bloomFilter.data[i] = view.getBigUint64(i * 8, false);
  }
  return bloomFilter;
}
}
/**
 * BloomFilter pre-bound to the default `hashN` hash implementation, so
 * callers do not need to supply a hash function.
 */
export class DefaultBloomFilter extends BloomFilter {
  public constructor(options: BloomFilterOptions) {
    super(options, hashN);
  }

  /**
   * Deserialize a filter using the default hash function.
   *
   * NOTE(review): this delegates to BloomFilter.fromBytes, so the returned
   * object is a plain BloomFilter instance (structurally compatible, but
   * `instanceof DefaultBloomFilter` is false) — confirm no caller relies
   * on instanceof.
   */
  public static fromBytes(
    bytes: Uint8Array,
    options: BloomFilterOptions
  ): DefaultBloomFilter {
    return BloomFilter.fromBytes(bytes, options, hashN);
  }
}

View File

@ -0,0 +1,333 @@
import { utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai";
import { DefaultBloomFilter } from "./bloom.js";
import {
DEFAULT_BLOOM_FILTER_OPTIONS,
Message,
MessageChannel
} from "./sds.js";
// Shared fixtures for the MessageChannel tests below.
const channelId = "test-channel";

// Publish callback that always reports a successful send.
const callback = (_message: Message): boolean => true;

// Reach into a channel's private bloom filter for assertions.
const getBloomFilter = (channel: MessageChannel): DefaultBloomFilter =>
  (channel as any).filter as DefaultBloomFilter;

// Two small payload sets used to drive each side of a channel pair.
const messagesA = ["message-1", "message-2"];
const messagesB = [
  "message-3",
  "message-4",
  "message-5",
  "message-6",
  "message-7"
];
// End-to-end unit tests for MessageChannel. The tests deliberately inspect
// private state (lamportTimestamp, outgoingBuffer, messageIdLog, …) via
// `as any` to pin the SDS protocol behavior described in
// https://rfc.vac.dev/vac/raw/sds/
describe("MessageChannel", function () {
  this.timeout(5000);
  let channelA: MessageChannel;
  let channelB: MessageChannel;

  describe("sending a message ", () => {
    beforeEach(() => {
      channelA = new MessageChannel(channelId);
    });

    it("should increase lamport timestamp", () => {
      const timestampBefore = (channelA as any).lamportTimestamp;
      channelA.sendMessage(new Uint8Array(), callback);
      const timestampAfter = (channelA as any).lamportTimestamp;
      expect(timestampAfter).to.equal(timestampBefore + 1);
    });

    it("should push the message to the outgoing buffer", () => {
      const bufferLengthBefore = (channelA as any).outgoingBuffer.length;
      channelA.sendMessage(new Uint8Array(), callback);
      const bufferLengthAfter = (channelA as any).outgoingBuffer.length;
      expect(bufferLengthAfter).to.equal(bufferLengthBefore + 1);
    });

    it("should insert message into bloom filter", () => {
      const messageId = MessageChannel.getMessageId(new Uint8Array());
      channelA.sendMessage(new Uint8Array(), callback);
      const bloomFilter = getBloomFilter(channelA);
      expect(bloomFilter.lookup(messageId)).to.equal(true);
    });

    it("should insert message id into causal history", () => {
      const expectedTimestamp = (channelA as any).lamportTimestamp + 1;
      const messageId = MessageChannel.getMessageId(new Uint8Array());
      channelA.sendMessage(new Uint8Array(), callback);
      const messageIdLog = (channelA as any).messageIdLog as {
        timestamp: number;
        messageId: string;
      }[];
      expect(messageIdLog.length).to.equal(1);
      expect(
        messageIdLog.some(
          (log) =>
            log.timestamp === expectedTimestamp && log.messageId === messageId
        )
      ).to.equal(true);
    });

    it("should attach causal history and bloom filter to each message", () => {
      // Mirror the channel's own filter updates locally so we can assert the
      // exact filter bytes attached to each outgoing message (the filter
      // snapshot is taken BEFORE the message's own id is inserted).
      const bloomFilter = new DefaultBloomFilter(DEFAULT_BLOOM_FILTER_OPTIONS);
      const causalHistorySize = (channelA as any).causalHistorySize;
      const filterBytes = new Array<Uint8Array>();
      const messages = new Array<string>(causalHistorySize + 5)
        .fill("message")
        .map((message, index) => `${message}-${index}`);
      messages.forEach((message) => {
        filterBytes.push(bloomFilter.toBytes());
        channelA.sendMessage(utf8ToBytes(message), callback);
        bloomFilter.insert(MessageChannel.getMessageId(utf8ToBytes(message)));
      });
      const outgoingBuffer = (channelA as any).outgoingBuffer as Message[];
      expect(outgoingBuffer.length).to.equal(messages.length);
      outgoingBuffer.forEach((message, index) => {
        expect(message.content).to.deep.equal(utf8ToBytes(messages[index]));
        // Correct bloom filter should be attached to each message
        expect(message.bloomFilter).to.deep.equal(filterBytes[index]);
      });
      // Causal history should only contain the last N messages as defined by causalHistorySize
      const causalHistory = outgoingBuffer[outgoingBuffer.length - 1]
        .causalHistory as string[];
      expect(causalHistory.length).to.equal(causalHistorySize);
      // The last message's causal history covers the N messages sent just
      // before it (hence the slice excluding the final element itself).
      const expectedCausalHistory = messages
        .slice(-causalHistorySize - 1, -1)
        .map((message) => MessageChannel.getMessageId(utf8ToBytes(message)));
      expect(causalHistory).to.deep.equal(expectedCausalHistory);
    });
  });

  describe("receiving a message", () => {
    beforeEach(() => {
      channelA = new MessageChannel(channelId);
      channelB = new MessageChannel(channelId);
    });

    it("should increase lamport timestamp", () => {
      const timestampBefore = (channelA as any).lamportTimestamp;
      channelB.sendMessage(new Uint8Array(), (message) => {
        channelA.receiveMessage(message);
        return true;
      });
      const timestampAfter = (channelA as any).lamportTimestamp;
      expect(timestampAfter).to.equal(timestampBefore + 1);
    });

    it("should update lamport timestamp if greater than current timestamp and dependencies are met", () => {
      // Channel A advances to 2 by sending, then receives B's messages whose
      // timestamps eventually exceed its own.
      messagesA.forEach((m) => {
        channelA.sendMessage(utf8ToBytes(m), callback);
      });
      messagesB.forEach((m) => {
        channelB.sendMessage(utf8ToBytes(m), (message) => {
          channelA.receiveMessage(message);
          return true;
        });
      });
      const timestampAfter = (channelA as any).lamportTimestamp;
      expect(timestampAfter).to.equal(messagesB.length);
    });

    it("should maintain proper timestamps if all messages received", () => {
      // With full delivery in both directions the clocks stay in lock-step.
      let timestamp = 0;
      messagesA.forEach((m) => {
        channelA.sendMessage(utf8ToBytes(m), (message) => {
          timestamp++;
          channelB.receiveMessage(message);
          expect((channelB as any).lamportTimestamp).to.equal(timestamp);
          return true;
        });
      });
      messagesB.forEach((m) => {
        channelB.sendMessage(utf8ToBytes(m), (message) => {
          timestamp++;
          channelA.receiveMessage(message);
          expect((channelA as any).lamportTimestamp).to.equal(timestamp);
          return true;
        });
      });
      const expectedLength = messagesA.length + messagesB.length;
      expect((channelA as any).lamportTimestamp).to.equal(expectedLength);
      expect((channelA as any).lamportTimestamp).to.equal(
        (channelB as any).lamportTimestamp
      );
    });

    it("should add received messages to bloom filter", () => {
      messagesA.forEach((m) => {
        channelA.sendMessage(utf8ToBytes(m), (message) => {
          channelB.receiveMessage(message);
          const bloomFilter = getBloomFilter(channelB);
          expect(bloomFilter.lookup(message.messageId)).to.equal(true);
          return true;
        });
      });
    });

    it("should add to incoming buffer if dependencies are not met", () => {
      // Channel A sends messagesA without delivering them to B, so the next
      // message's causal history references ids B has never seen.
      messagesA.forEach((m) => {
        channelA.sendMessage(utf8ToBytes(m), callback);
      });
      let receivedMessage: Message | null = null;
      const timestampBefore = (channelB as any).lamportTimestamp;
      channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => {
        receivedMessage = message;
        channelB.receiveMessage(message);
        return true;
      });
      const incomingBuffer = (channelB as any).incomingBuffer as Message[];
      expect(incomingBuffer.length).to.equal(1);
      expect(incomingBuffer[0].messageId).to.equal(receivedMessage!.messageId);
      // Since the dependency is not met, the lamport timestamp should not increase
      const timestampAfter = (channelB as any).lamportTimestamp;
      expect(timestampAfter).to.equal(timestampBefore);
      // Message should not be in local history
      const localHistory = (channelB as any).messageIdLog as {
        timestamp: number;
        messageId: string;
      }[];
      expect(
        localHistory.some((m) => m.messageId === receivedMessage!.messageId)
      ).to.equal(false);
    });
  });

  describe("reviewing ack status", () => {
    beforeEach(() => {
      channelA = new MessageChannel(channelId);
      channelB = new MessageChannel(channelId);
    });

    it("should mark all messages in causal history as acknowledged", () => {
      messagesA.forEach((m) => {
        channelA.sendMessage(utf8ToBytes(m), (message) => {
          channelB.receiveMessage(message);
          return true;
        });
      });
      // One extra message that B never receives, so it can never appear in
      // B's causal history.
      let notInHistory: Message | null = null;
      channelA.sendMessage(utf8ToBytes("not-in-history"), (message) => {
        notInHistory = message;
        return true;
      });
      expect((channelA as any).outgoingBuffer.length).to.equal(
        messagesA.length + 1
      );
      channelB.sendMessage(utf8ToBytes(messagesB[0]), (message) => {
        channelA.receiveMessage(message);
        return true;
      });
      // Since messagesA are in causal history of channel B's message
      // they should be gone from channel A's outgoing buffer
      // and notInHistory should still be in the outgoing buffer
      const outgoingBuffer = (channelA as any).outgoingBuffer as Message[];
      expect(outgoingBuffer.length).to.equal(1);
      expect(outgoingBuffer[0].messageId).to.equal(notInHistory!.messageId);
    });

    it("should track probabilistic acknowledgements of messages received in bloom filter", () => {
      const acknowledgementCount = (channelA as any).acknowledgementCount;
      const causalHistorySize = (channelA as any).causalHistorySize;
      const unacknowledgedMessages = [
        "unacknowledged-message-1",
        "unacknowledged-message-2"
      ];
      const messages = [...messagesA, ...messagesB.slice(0, -1)];
      // Send messages to be received by channel B
      messages.forEach((m) => {
        channelA.sendMessage(utf8ToBytes(m), (message) => {
          channelB.receiveMessage(message);
          return true;
        });
      });
      // Send messages not received by channel B
      unacknowledgedMessages.forEach((m) => {
        channelA.sendMessage(utf8ToBytes(m), callback);
      });
      // Channel B sends a message to channel A
      channelB.sendMessage(
        utf8ToBytes(messagesB[messagesB.length - 1]),
        (message) => {
          channelA.receiveMessage(message);
          return true;
        }
      );
      const acknowledgements: ReadonlyMap<string, number> = (channelA as any)
        .acknowledgements;
      // Other than the message IDs which were included in causal history,
      // the remaining messages sent by channel A should be considered possibly acknowledged
      // for having been included in the bloom filter sent from channel B
      const expectedAcknowledgementsSize = messages.length - causalHistorySize;
      if (expectedAcknowledgementsSize <= 0) {
        throw new Error("expectedAcknowledgementsSize must be greater than 0");
      }
      expect(acknowledgements.size).to.equal(expectedAcknowledgementsSize);
      // Channel B only included the last N messages in causal history
      messages.slice(0, -causalHistorySize).forEach((m) => {
        expect(
          acknowledgements.get(MessageChannel.getMessageId(utf8ToBytes(m)))
        ).to.equal(1);
      });
      // Messages that never reached channel B should not be acknowledged
      unacknowledgedMessages.forEach((m) => {
        expect(
          acknowledgements.has(MessageChannel.getMessageId(utf8ToBytes(m)))
        ).to.equal(false);
      });
      // When channel B sends more messages, it will include all the same messages
      // in the bloom filter as before, which should mark them as fully acknowledged in channel A
      for (let i = 1; i < acknowledgementCount; i++) {
        // Send messages until acknowledgement count is reached
        channelB.sendMessage(utf8ToBytes(`x-${i}`), (message) => {
          channelA.receiveMessage(message);
          return true;
        });
      }
      // No more partial acknowledgements should be in channel A
      expect(acknowledgements.size).to.equal(0);
      // Messages that were not acknowledged should still be in the outgoing buffer
      expect((channelA as any).outgoingBuffer.length).to.equal(
        unacknowledgedMessages.length
      );
      unacknowledgedMessages.forEach((m) => {
        expect(
          ((channelA as any).outgoingBuffer as Message[]).some(
            (message) =>
              message.messageId === MessageChannel.getMessageId(utf8ToBytes(m))
          )
        ).to.equal(true);
      });
    });
  });
});

190
packages/sds/src/sds.ts Normal file
View File

@ -0,0 +1,190 @@
import { sha256 } from "@noble/hashes/sha256";
import { bytesToHex } from "@noble/hashes/utils";
import { proto_sds_message } from "@waku/proto";
import { DefaultBloomFilter } from "./bloom.js";
// An SDS wire message as defined by the protobuf schema.
export type Message = proto_sds_message.SdsMessage;
// Opaque identifier of an SDS channel; all participants of a channel use the same id.
export type ChannelId = string;

// Default sizing for the per-channel bloom filter of seen message IDs.
export const DEFAULT_BLOOM_FILTER_OPTIONS = {
  capacity: 10000,
  errorRate: 0.001
};

// Number of recent message IDs attached as causal history to each outgoing message.
const DEFAULT_CAUSAL_HISTORY_SIZE = 2;
/**
 * A Scalable Data Sync (SDS) message channel.
 *
 * Maintains, for a single channel:
 *  - a Lamport timestamp (incremented on send, advanced on delivery),
 *  - a bloom filter over message IDs seen on the channel,
 *  - a log of delivered message IDs (used as causal history),
 *  - an outgoing buffer of sent-but-unacknowledged messages, and
 *  - an incoming buffer of received messages whose causal dependencies
 *    are not yet met.
 *
 * See https://rfc.vac.dev/vac/raw/sds/
 */
export class MessageChannel {
  // Logical clock for this participant on this channel.
  private lamportTimestamp: number;
  // Bloom filter of message IDs sent or received on this channel.
  private filter: DefaultBloomFilter;
  // Messages we sent that no other participant has acknowledged yet.
  private outgoingBuffer: Message[];
  // messageId -> count of "possible" (bloom-filter) acknowledgements seen so far.
  private acknowledgements: Map<string, number>;
  // Received messages waiting for their causal dependencies to be delivered.
  // NOTE(review): nothing currently re-processes this buffer once
  // dependencies arrive — presumably a sweep is planned; confirm.
  private incomingBuffer: Message[];
  // Delivered message IDs, kept sorted by (timestamp, messageId).
  private messageIdLog: { timestamp: number; messageId: string }[];
  private channelId: ChannelId;
  // How many recent log entries are attached as causal history when sending.
  private causalHistorySize: number;
  // How many bloom-filter sightings promote "possibly acknowledged" to acknowledged.
  private acknowledgementCount: number;

  public constructor(
    channelId: ChannelId,
    causalHistorySize: number = DEFAULT_CAUSAL_HISTORY_SIZE
  ) {
    this.channelId = channelId;
    this.lamportTimestamp = 0;
    this.filter = new DefaultBloomFilter(DEFAULT_BLOOM_FILTER_OPTIONS);
    this.outgoingBuffer = [];
    this.acknowledgements = new Map();
    this.incomingBuffer = [];
    this.messageIdLog = [];
    this.causalHistorySize = causalHistorySize;
    this.acknowledgementCount = this.getAcknowledgementCount();
  }

  // Deterministic message ID: hex-encoded SHA-256 of the payload.
  public static getMessageId(payload: Uint8Array): string {
    return bytesToHex(sha256(payload));
  }

  /**
   * Send a message to the SDS channel.
   *
   * Increments the lamport timestamp, constructs a `Message` object
   * with the given payload, and adds it to the outgoing buffer.
   *
   * If the callback is successful, the message is also added to
   * the bloom filter and message history. In the context of
   * Waku, this likely means the message was published via
   * light push or relay.
   *
   * See https://rfc.vac.dev/vac/raw/sds/#send-message
   *
   * @param payload - The payload to send.
   * @param callback - A callback function that returns a boolean indicating whether the message was sent successfully.
   */
  public sendMessage(
    payload: Uint8Array,
    callback?: (message: Message) => boolean
  ): void {
    this.lamportTimestamp++;
    const messageId = MessageChannel.getMessageId(payload);
    const message: Message = {
      messageId,
      channelId: this.channelId,
      lamportTimestamp: this.lamportTimestamp,
      causalHistory: this.messageIdLog
        .slice(-this.causalHistorySize)
        .map(({ messageId }) => messageId),
      // Snapshot of the filter BEFORE this message's own id is inserted.
      bloomFilter: this.filter.toBytes(),
      content: payload
    };
    this.outgoingBuffer.push(message);
    if (callback) {
      const success = callback(message);
      if (success) {
        this.filter.insert(messageId);
        this.messageIdLog.push({ timestamp: this.lamportTimestamp, messageId });
      }
    }
  }

  /**
   * Process a received SDS message for this channel.
   *
   * Review the acknowledgement status of messages in the outgoing buffer
   * by inspecting the received message's bloom filter and causal history.
   * Add the received message to the bloom filter.
   * If the local history contains every message in the received message's
   * causal history, deliver the message. Otherwise, add the message to the
   * incoming buffer.
   *
   * See https://rfc.vac.dev/vac/raw/sds/#receive-message
   *
   * @param message - The received SDS message.
   */
  public receiveMessage(message: Message): void {
    // review ack status
    this.reviewAckStatus(message);
    // add to bloom filter
    this.filter.insert(message.messageId);
    // verify causal history
    const dependenciesMet = message.causalHistory.every((messageId) =>
      this.messageIdLog.some(
        ({ messageId: logMessageId }) => logMessageId === messageId
      )
    );
    if (!dependenciesMet) {
      this.incomingBuffer.push(message);
    } else {
      this.deliverMessage(message);
    }
  }

  // Deliver a message whose dependencies are met: advance the Lamport clock
  // to the message's timestamp if it is ahead, and record the message ID in
  // the local log in (timestamp, messageId) order.
  // See https://rfc.vac.dev/vac/raw/sds/#deliver-message
  private deliverMessage(message: Message): void {
    const messageLamportTimestamp = message.lamportTimestamp ?? 0;
    if (messageLamportTimestamp > this.lamportTimestamp) {
      this.lamportTimestamp = messageLamportTimestamp;
    }
    // The participant MUST insert the message ID into its local log,
    // based on Lamport timestamp.
    // If one or more message IDs with the same Lamport timestamp already exists,
    // the participant MUST follow the Resolve Conflicts procedure.
    // https://rfc.vac.dev/vac/raw/sds/#resolve-conflicts
    this.messageIdLog.push({
      timestamp: messageLamportTimestamp,
      messageId: message.messageId
    });
    // Ties on timestamp are resolved by lexicographic message ID order.
    this.messageIdLog.sort((a, b) => {
      if (a.timestamp !== b.timestamp) {
        return a.timestamp - b.timestamp;
      }
      return a.messageId.localeCompare(b.messageId);
    });
  }

  // Update acknowledgement state of the outgoing buffer from a received
  // message's causal history (definite acks) and bloom filter (possible acks).
  // See https://rfc.vac.dev/vac/raw/sds/#review-ack-status
  private reviewAckStatus(receivedMessage: Message): void {
    // the participant MUST mark all messages in the received causal_history as acknowledged.
    receivedMessage.causalHistory.forEach((messageId) => {
      this.outgoingBuffer = this.outgoingBuffer.filter(
        (msg) => msg.messageId !== messageId
      );
      this.acknowledgements.delete(messageId);
      if (!this.filter.lookup(messageId)) {
        this.filter.insert(messageId);
      }
    });
    // the participant MUST mark all messages included in the bloom_filter as possibly acknowledged
    if (!receivedMessage.bloomFilter) {
      return;
    }
    const messageBloomFilter = DefaultBloomFilter.fromBytes(
      receivedMessage.bloomFilter,
      this.filter.options
    );
    this.outgoingBuffer = this.outgoingBuffer.filter((message) => {
      if (!messageBloomFilter.lookup(message.messageId)) {
        return true;
      }
      // If a message appears as possibly acknowledged in multiple received bloom filters,
      // the participant MAY mark it as acknowledged based on probabilistic grounds,
      // taking into account the bloom filter size and hash number.
      const count = (this.acknowledgements.get(message.messageId) ?? 0) + 1;
      if (count < this.acknowledgementCount) {
        this.acknowledgements.set(message.messageId, count);
        return true;
      }
      this.acknowledgements.delete(message.messageId);
      return false;
    });
  }

  // TODO: this should be determined based on the bloom filter parameters and number of hashes
  private getAcknowledgementCount(): number {
    return 2;
  }
}

View File

@ -3,7 +3,9 @@
"compilerOptions": {
"outDir": "dist/",
"rootDir": "src",
"tsBuildInfoFile": "dist/.tsbuildinfo"
"tsBuildInfoFile": "dist/.tsbuildinfo",
"allowJs": true,
"moduleResolution": "node"
},
"include": ["src"],
"exclude": ["src/**/*.spec.ts", "src/test_utils"]