Merge pull request #2299 from waku-org/feat/sds-message-history

feat(sds): add retrieval hint to causal history
This commit is contained in:
Arseniy Klempner 2025-04-22 08:05:41 -07:00 committed by GitHub
commit 4da382d594
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 165 additions and 64 deletions

1
package-lock.json generated
View File

@ -42806,6 +42806,7 @@
"version": "0.0.2",
"license": "MIT OR Apache-2.0",
"dependencies": {
"@libp2p/interface": "2.7.0",
"@noble/hashes": "^1.7.1",
"@waku/message-hash": "^0.1.18",
"@waku/proto": "^0.0.9",

View File

@ -7,11 +7,81 @@
import { type Codec, decodeMessage, type DecodeOptions, encodeMessage, MaxLengthError, message } from 'protons-runtime'
import type { Uint8ArrayList } from 'uint8arraylist'
export interface HistoryEntry {
messageId: string
retrievalHint?: Uint8Array
}
export namespace HistoryEntry {
let _codec: Codec<HistoryEntry>
export const codec = (): Codec<HistoryEntry> => {
if (_codec == null) {
_codec = message<HistoryEntry>((obj, w, opts = {}) => {
if (opts.lengthDelimited !== false) {
w.fork()
}
if ((obj.messageId != null && obj.messageId !== '')) {
w.uint32(10)
w.string(obj.messageId)
}
if (obj.retrievalHint != null) {
w.uint32(18)
w.bytes(obj.retrievalHint)
}
if (opts.lengthDelimited !== false) {
w.ldelim()
}
}, (reader, length, opts = {}) => {
const obj: any = {
messageId: ''
}
const end = length == null ? reader.len : reader.pos + length
while (reader.pos < end) {
const tag = reader.uint32()
switch (tag >>> 3) {
case 1: {
obj.messageId = reader.string()
break
}
case 2: {
obj.retrievalHint = reader.bytes()
break
}
default: {
reader.skipType(tag & 7)
break
}
}
}
return obj
})
}
return _codec
}
export const encode = (obj: Partial<HistoryEntry>): Uint8Array => {
return encodeMessage(obj, HistoryEntry.codec())
}
export const decode = (buf: Uint8Array | Uint8ArrayList, opts?: DecodeOptions<HistoryEntry>): HistoryEntry => {
return decodeMessage(buf, HistoryEntry.codec(), opts)
}
}
export interface SdsMessage {
messageId: string
channelId: string
lamportTimestamp?: number
causalHistory: string[]
causalHistory: HistoryEntry[]
bloomFilter?: Uint8Array
content?: Uint8Array
}
@ -44,7 +114,7 @@ export namespace SdsMessage {
if (obj.causalHistory != null) {
for (const value of obj.causalHistory) {
w.uint32(90)
w.string(value)
HistoryEntry.codec().encode(value, w)
}
}
@ -91,7 +161,9 @@ export namespace SdsMessage {
throw new MaxLengthError('Decode error - map field "causalHistory" had too many elements')
}
obj.causalHistory.push(reader.string())
obj.causalHistory.push(HistoryEntry.codec().decode(reader, reader.uint32(), {
limits: opts.limits?.causalHistory$
}))
break
}
case 12: {

View File

@ -1,11 +1,16 @@
syntax = "proto3";
message HistoryEntry {
string message_id = 1; // Unique identifier of the SDS message, as defined in `Message`
optional bytes retrieval_hint = 2; // Optional information to help remote parties retrieve this SDS message; For example, A Waku deterministic message hash or routing payload hash
}
message SdsMessage {
// 1 Reserved for sender/participant id
string message_id = 2; // Unique identifier of the message
string channel_id = 3; // Identifier of the channel to which the message belongs
optional int32 lamport_timestamp = 10; // Logical timestamp for causal ordering in channel
repeated string causal_history = 11; // List of preceding message IDs that this message causally depends on. Generally 2 or 3 message IDs are included.
repeated HistoryEntry causal_history = 11; // List of preceding message IDs that this message causally depends on. Generally 2 or 3 message IDs are included.
optional bytes bloom_filter = 12; // Bloom filter representing received message IDs in channel
optional bytes content = 20; // Actual content of the message
}

View File

@ -59,7 +59,7 @@
"node": ">=20"
},
"dependencies": {
"@libp2p/interface": "^2.7.0",
"@libp2p/interface": "2.7.0",
"@noble/hashes": "^1.7.1",
"@waku/message-hash": "^0.1.18",
"@waku/proto": "^0.0.9",

View File

@ -4,14 +4,15 @@ import { expect } from "chai";
import { DefaultBloomFilter } from "./bloom.js";
import {
DEFAULT_BLOOM_FILTER_OPTIONS,
HistoryEntry,
Message,
MessageChannel,
MessageChannelEvent
} from "./sds.js";
const channelId = "test-channel";
const callback = (_message: Message): Promise<boolean> => {
return Promise.resolve(true);
const callback = (_message: Message): Promise<{ success: boolean }> => {
return Promise.resolve({ success: true });
};
const getBloomFilter = (channel: MessageChannel): DefaultBloomFilter => {
@ -62,15 +63,16 @@ describe("MessageChannel", function () {
const expectedTimestamp = (channelA as any).lamportTimestamp + 1;
const messageId = MessageChannel.getMessageId(new Uint8Array());
await channelA.sendMessage(new Uint8Array(), callback);
const messageIdLog = (channelA as any).messageIdLog as {
const messageIdLog = (channelA as any).localHistory as {
timestamp: number;
messageId: string;
historyEntry: HistoryEntry;
}[];
expect(messageIdLog.length).to.equal(1);
expect(
messageIdLog.some(
(log) =>
log.timestamp === expectedTimestamp && log.messageId === messageId
log.timestamp === expectedTimestamp &&
log.historyEntry.messageId === messageId
)
).to.equal(true);
});
@ -100,12 +102,15 @@ describe("MessageChannel", function () {
// Causal history should only contain the last N messages as defined by causalHistorySize
const causalHistory = outgoingBuffer[outgoingBuffer.length - 1]
.causalHistory as string[];
.causalHistory as HistoryEntry[];
expect(causalHistory.length).to.equal(causalHistorySize);
const expectedCausalHistory = messages
.slice(-causalHistorySize - 1, -1)
.map((message) => MessageChannel.getMessageId(utf8ToBytes(message)));
.map((message) => ({
messageId: MessageChannel.getMessageId(utf8ToBytes(message)),
retrievalHint: undefined
}));
expect(causalHistory).to.deep.equal(expectedCausalHistory);
});
});
@ -120,7 +125,7 @@ describe("MessageChannel", function () {
const timestampBefore = (channelA as any).lamportTimestamp;
await channelB.sendMessage(new Uint8Array(), (message) => {
channelA.receiveMessage(message);
return Promise.resolve(true);
return Promise.resolve({ success: true });
});
const timestampAfter = (channelA as any).lamportTimestamp;
expect(timestampAfter).to.equal(timestampBefore + 1);
@ -133,7 +138,7 @@ describe("MessageChannel", function () {
for (const m of messagesB) {
await channelB.sendMessage(utf8ToBytes(m), (message) => {
channelA.receiveMessage(message);
return Promise.resolve(true);
return Promise.resolve({ success: true });
});
}
const timestampAfter = (channelA as any).lamportTimestamp;
@ -147,7 +152,7 @@ describe("MessageChannel", function () {
timestamp++;
channelB.receiveMessage(message);
expect((channelB as any).lamportTimestamp).to.equal(timestamp);
return Promise.resolve(true);
return Promise.resolve({ success: true });
});
}
@ -156,7 +161,7 @@ describe("MessageChannel", function () {
timestamp++;
channelA.receiveMessage(message);
expect((channelA as any).lamportTimestamp).to.equal(timestamp);
return Promise.resolve(true);
return Promise.resolve({ success: true });
});
}
@ -173,7 +178,7 @@ describe("MessageChannel", function () {
channelB.receiveMessage(message);
const bloomFilter = getBloomFilter(channelB);
expect(bloomFilter.lookup(message.messageId)).to.equal(true);
return Promise.resolve(true);
return Promise.resolve({ success: true });
});
}
});
@ -189,7 +194,7 @@ describe("MessageChannel", function () {
await channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => {
receivedMessage = message;
channelB.receiveMessage(message);
return Promise.resolve(true);
return Promise.resolve({ success: true });
});
const incomingBuffer = (channelB as any).incomingBuffer as Message[];
@ -201,12 +206,15 @@ describe("MessageChannel", function () {
expect(timestampAfter).to.equal(timestampBefore);
// Message should not be in local history
const localHistory = (channelB as any).messageIdLog as {
const localHistory = (channelB as any).localHistory as {
timestamp: number;
messageId: string;
historyEntry: HistoryEntry;
}[];
expect(
localHistory.some((m) => m.messageId === receivedMessage!.messageId)
localHistory.some(
({ historyEntry: { messageId } }) =>
messageId === receivedMessage!.messageId
)
).to.equal(false);
});
});
@ -221,14 +229,14 @@ describe("MessageChannel", function () {
for (const m of messagesA) {
await channelA.sendMessage(utf8ToBytes(m), (message) => {
channelB.receiveMessage(message);
return Promise.resolve(true);
return Promise.resolve({ success: true });
});
}
let notInHistory: Message | null = null;
await channelA.sendMessage(utf8ToBytes("not-in-history"), (message) => {
notInHistory = message;
return Promise.resolve(true);
return Promise.resolve({ success: true });
});
expect((channelA as any).outgoingBuffer.length).to.equal(
@ -237,7 +245,7 @@ describe("MessageChannel", function () {
await channelB.sendMessage(utf8ToBytes(messagesB[0]), (message) => {
channelA.receiveMessage(message);
return Promise.resolve(true);
return Promise.resolve({ success: true });
});
// Since messagesA are in causal history of channel B's message
@ -262,7 +270,7 @@ describe("MessageChannel", function () {
for (const m of messages) {
await channelA.sendMessage(utf8ToBytes(m), (message) => {
channelB.receiveMessage(message);
return Promise.resolve(true);
return Promise.resolve({ success: true });
});
}
@ -276,7 +284,7 @@ describe("MessageChannel", function () {
utf8ToBytes(messagesB[messagesB.length - 1]),
(message) => {
channelA.receiveMessage(message);
return Promise.resolve(true);
return Promise.resolve({ success: true });
}
);
@ -310,7 +318,7 @@ describe("MessageChannel", function () {
// Send messages until acknowledgement count is reached
await channelB.sendMessage(utf8ToBytes(`x-${i}`), (message) => {
channelA.receiveMessage(message);
return Promise.resolve(true);
return Promise.resolve({ success: true });
});
}
@ -344,9 +352,9 @@ describe("MessageChannel", function () {
await channelA.sendMessage(utf8ToBytes(m), callback);
}
await channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => {
await channelA.sendMessage(utf8ToBytes(messagesB[0]), async (message) => {
channelB.receiveMessage(message);
return Promise.resolve(true);
return Promise.resolve({ success: true });
});
const incomingBuffer = (channelB as any).incomingBuffer as Message[];
@ -357,7 +365,7 @@ describe("MessageChannel", function () {
const missingMessages = channelB.sweepIncomingBuffer();
expect(missingMessages.length).to.equal(causalHistorySize);
expect(missingMessages[0]).to.equal(
expect(missingMessages[0].messageId).to.equal(
MessageChannel.getMessageId(utf8ToBytes(messagesA[0]))
);
});
@ -368,18 +376,18 @@ describe("MessageChannel", function () {
for (const m of messagesA) {
await channelA.sendMessage(utf8ToBytes(m), (message) => {
sentMessages.push(message);
return Promise.resolve(true);
return Promise.resolve({ success: true });
});
}
await channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => {
channelB.receiveMessage(message);
return Promise.resolve(true);
return Promise.resolve({ success: true });
});
const missingMessages = channelB.sweepIncomingBuffer();
expect(missingMessages.length).to.equal(causalHistorySize);
expect(missingMessages[0]).to.equal(
expect(missingMessages[0].messageId).to.equal(
MessageChannel.getMessageId(utf8ToBytes(messagesA[0]))
);
@ -411,7 +419,7 @@ describe("MessageChannel", function () {
await channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => {
channelC.receiveMessage(message);
return Promise.resolve(true);
return Promise.resolve({ success: true });
});
const missingMessages = channelC.sweepIncomingBuffer();
@ -439,7 +447,7 @@ describe("MessageChannel", function () {
await channelA.sendMessage(utf8ToBytes(m), (message) => {
unacknowledgedMessages.push(message);
channelB.receiveMessage(message);
return Promise.resolve(true);
return Promise.resolve({ success: true });
});
}
@ -458,7 +466,7 @@ describe("MessageChannel", function () {
utf8ToBytes(messagesB[causalHistorySize]),
(message) => {
channelA.receiveMessage(message);
return Promise.resolve(true);
return Promise.resolve({ success: true });
}
);
@ -495,7 +503,7 @@ describe("MessageChannel", function () {
bloomFilter.lookup(MessageChannel.getMessageId(new Uint8Array()))
).to.equal(false);
const localLog = (channelA as any).messageIdLog as {
const localLog = (channelA as any).localHistory as {
timestamp: number;
messageId: string;
}[];
@ -514,7 +522,7 @@ describe("MessageChannel", function () {
expect(timestampAfter).to.equal(expectedTimestamp);
expect(timestampAfter).to.be.greaterThan(timestampBefore);
const localLog = (channelB as any).messageIdLog as {
const localLog = (channelB as any).localHistory as {
timestamp: number;
messageId: string;
}[];
@ -530,7 +538,7 @@ describe("MessageChannel", function () {
for (const m of messagesA) {
await channelA.sendMessage(utf8ToBytes(m), (message) => {
channelB.receiveMessage(message);
return Promise.resolve(true);
return Promise.resolve({ success: true });
});
}

View File

@ -13,6 +13,7 @@ type MessageChannelEvents = {
};
export type Message = proto_sds_message.SdsMessage;
export type HistoryEntry = proto_sds_message.HistoryEntry;
export type ChannelId = string;
export const DEFAULT_BLOOM_FILTER_OPTIONS = {
@ -36,7 +37,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
private outgoingBuffer: Message[];
private acknowledgements: Map<string, number>;
private incomingBuffer: Message[];
private messageIdLog: { timestamp: number; messageId: string }[];
private localHistory: { timestamp: number; historyEntry: HistoryEntry }[];
private channelId: ChannelId;
private causalHistorySize: number;
private acknowledgementCount: number;
@ -56,7 +57,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
this.outgoingBuffer = [];
this.acknowledgements = new Map();
this.incomingBuffer = [];
this.messageIdLog = [];
this.localHistory = [];
this.causalHistorySize =
options.causalHistorySize ?? DEFAULT_CAUSAL_HISTORY_SIZE;
this.acknowledgementCount = this.getAcknowledgementCount();
@ -90,7 +91,10 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
*/
public async sendMessage(
payload: Uint8Array,
callback?: (message: Message) => Promise<boolean>
callback?: (message: Message) => Promise<{
success: boolean;
retrievalHint?: Uint8Array;
}>
): Promise<void> {
this.lamportTimestamp++;
@ -100,9 +104,9 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
messageId,
channelId: this.channelId,
lamportTimestamp: this.lamportTimestamp,
causalHistory: this.messageIdLog
causalHistory: this.localHistory
.slice(-this.causalHistorySize)
.map(({ messageId }) => messageId),
.map(({ historyEntry }) => historyEntry),
bloomFilter: this.filter.toBytes(),
content: payload
};
@ -110,10 +114,16 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
this.outgoingBuffer.push(message);
if (callback) {
const success = await callback(message);
const { success, retrievalHint } = await callback(message);
if (success) {
this.filter.insert(messageId);
this.messageIdLog.push({ timestamp: this.lamportTimestamp, messageId });
this.localHistory.push({
timestamp: this.lamportTimestamp,
historyEntry: {
messageId,
retrievalHint
}
});
}
}
}
@ -175,9 +185,10 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
this.filter.insert(message.messageId);
}
// verify causal history
const dependenciesMet = message.causalHistory.every((messageId) =>
this.messageIdLog.some(
({ messageId: logMessageId }) => logMessageId === messageId
const dependenciesMet = message.causalHistory.every((historyEntry) =>
this.localHistory.some(
({ historyEntry: { messageId } }) =>
messageId === historyEntry.messageId
)
);
if (!dependenciesMet) {
@ -189,17 +200,18 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
}
// https://rfc.vac.dev/vac/raw/sds/#periodic-incoming-buffer-sweep
public sweepIncomingBuffer(): string[] {
public sweepIncomingBuffer(): HistoryEntry[] {
const { buffer, missing } = this.incomingBuffer.reduce<{
buffer: Message[];
missing: string[];
missing: HistoryEntry[];
}>(
({ buffer, missing }, message) => {
// Check each message for missing dependencies
const missingDependencies = message.causalHistory.filter(
(messageId) =>
!this.messageIdLog.some(
({ messageId: logMessageId }) => logMessageId === messageId
(messageHistoryEntry) =>
!this.localHistory.some(
({ historyEntry: { messageId } }) =>
messageId === messageHistoryEntry.messageId
)
);
if (missingDependencies.length === 0) {
@ -227,7 +239,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
missing: missing.concat(missingDependencies)
};
},
{ buffer: new Array<Message>(), missing: new Array<string>() }
{ buffer: new Array<Message>(), missing: new Array<HistoryEntry>() }
);
// Update the incoming buffer to only include messages with no missing dependencies
this.incomingBuffer = buffer;
@ -284,9 +296,9 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
messageId: MessageChannel.getMessageId(emptyMessage),
channelId: this.channelId,
lamportTimestamp: this.lamportTimestamp,
causalHistory: this.messageIdLog
causalHistory: this.localHistory
.slice(-this.causalHistorySize)
.map(({ messageId }) => messageId),
.map(({ historyEntry }) => historyEntry),
bloomFilter: this.filter.toBytes(),
content: emptyMessage
};
@ -298,7 +310,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
}
// See https://rfc.vac.dev/vac/raw/sds/#deliver-message
private deliverMessage(message: Message): void {
private deliverMessage(message: Message, retrievalHint?: Uint8Array): void {
this.notifyDeliveredMessage(message.messageId);
const messageLamportTimestamp = message.lamportTimestamp ?? 0;
@ -321,15 +333,18 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
// If one or more message IDs with the same Lamport timestamp already exists,
// the participant MUST follow the Resolve Conflicts procedure.
// https://rfc.vac.dev/vac/raw/sds/#resolve-conflicts
this.messageIdLog.push({
this.localHistory.push({
timestamp: messageLamportTimestamp,
messageId: message.messageId
historyEntry: {
messageId: message.messageId,
retrievalHint
}
});
this.messageIdLog.sort((a, b) => {
this.localHistory.sort((a, b) => {
if (a.timestamp !== b.timestamp) {
return a.timestamp - b.timestamp;
}
return a.messageId.localeCompare(b.messageId);
return a.historyEntry.messageId.localeCompare(b.historyEntry.messageId);
});
}
@ -338,9 +353,9 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
// See https://rfc.vac.dev/vac/raw/sds/#review-ack-status
private reviewAckStatus(receivedMessage: Message): void {
// the participant MUST mark all messages in the received causal_history as acknowledged.
receivedMessage.causalHistory.forEach((messageId) => {
receivedMessage.causalHistory.forEach(({ messageId }) => {
this.outgoingBuffer = this.outgoingBuffer.filter(
(msg) => msg.messageId !== messageId
({ messageId: outgoingMessageId }) => outgoingMessageId !== messageId
);
this.acknowledgements.delete(messageId);
if (!this.filter.lookup(messageId)) {