Merge pull request #2280 from waku-org/feat/sds-buffer-sweep

feat(sds): adds logic to sweep incoming and outgoing buffers
This commit is contained in:
Arseniy Klempner 2025-03-04 07:01:23 -08:00 committed by GitHub
commit 5b3a256b4c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 223 additions and 1 deletions

View File

@ -330,4 +330,145 @@ describe("MessageChannel", function () {
});
});
});
describe("Sweeping incoming buffer", () => {
beforeEach(() => {
channelA = new MessageChannel(channelId);
channelB = new MessageChannel(channelId);
});
it("should detect messages with missing dependencies", () => {
const causalHistorySize = (channelA as any).causalHistorySize;
messagesA.forEach((m) => {
channelA.sendMessage(utf8ToBytes(m), callback);
});
channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => {
channelB.receiveMessage(message);
return true;
});
const incomingBuffer = (channelB as any).incomingBuffer as Message[];
expect(incomingBuffer.length).to.equal(1);
expect(incomingBuffer[0].messageId).to.equal(
MessageChannel.getMessageId(utf8ToBytes(messagesB[0]))
);
const missingMessages = channelB.sweepIncomingBuffer();
expect(missingMessages.length).to.equal(causalHistorySize);
expect(missingMessages[0]).to.equal(
MessageChannel.getMessageId(utf8ToBytes(messagesA[0]))
);
});
it("should deliver messages after dependencies are met", () => {
const causalHistorySize = (channelA as any).causalHistorySize;
const sentMessages = new Array<Message>();
messagesA.forEach((m) => {
channelA.sendMessage(utf8ToBytes(m), (message) => {
sentMessages.push(message);
return true;
});
});
channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => {
channelB.receiveMessage(message);
return true;
});
const missingMessages = channelB.sweepIncomingBuffer();
expect(missingMessages.length).to.equal(causalHistorySize);
expect(missingMessages[0]).to.equal(
MessageChannel.getMessageId(utf8ToBytes(messagesA[0]))
);
let incomingBuffer = (channelB as any).incomingBuffer as Message[];
expect(incomingBuffer.length).to.equal(1);
sentMessages.forEach((m) => {
channelB.receiveMessage(m);
});
const missingMessages2 = channelB.sweepIncomingBuffer();
expect(missingMessages2.length).to.equal(0);
incomingBuffer = (channelB as any).incomingBuffer as Message[];
expect(incomingBuffer.length).to.equal(0);
});
it("should remove messages without delivering if timeout is exceeded", async () => {
const causalHistorySize = (channelA as any).causalHistorySize;
// Create a channel with very very short timeout
const channelC: MessageChannel = new MessageChannel(
channelId,
causalHistorySize,
true,
10
);
messagesA.forEach((m) => {
channelA.sendMessage(utf8ToBytes(m), callback);
});
channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => {
channelC.receiveMessage(message);
return true;
});
const missingMessages = channelC.sweepIncomingBuffer();
expect(missingMessages.length).to.equal(causalHistorySize);
let incomingBuffer = (channelC as any).incomingBuffer as Message[];
expect(incomingBuffer.length).to.equal(1);
await new Promise((resolve) => setTimeout(resolve, 20));
channelC.sweepIncomingBuffer();
incomingBuffer = (channelC as any).incomingBuffer as Message[];
expect(incomingBuffer.length).to.equal(0);
});
});
describe("Sweeping outgoing buffer", () => {
beforeEach(() => {
channelA = new MessageChannel(channelId);
channelB = new MessageChannel(channelId);
});
it("should partition messages based on acknowledgement status", () => {
const unacknowledgedMessages: Message[] = [];
messagesA.forEach((m) => {
channelA.sendMessage(utf8ToBytes(m), (message) => {
unacknowledgedMessages.push(message);
channelB.receiveMessage(message);
return true;
});
});
let { unacknowledged, possiblyAcknowledged } =
channelA.sweepOutgoingBuffer();
expect(unacknowledged.length).to.equal(messagesA.length);
expect(possiblyAcknowledged.length).to.equal(0);
// Make sure messages sent by channel A are not in causal history
const causalHistorySize = (channelA as any).causalHistorySize;
messagesB.slice(0, causalHistorySize).forEach((m) => {
channelB.sendMessage(utf8ToBytes(m), callback);
});
channelB.sendMessage(
utf8ToBytes(messagesB[causalHistorySize]),
(message) => {
channelA.receiveMessage(message);
return true;
}
);
// All messages that were previously unacknowledged should now be possibly acknowledged
// since they were included in one of the bloom filters sent from channel B
({ unacknowledged, possiblyAcknowledged } =
channelA.sweepOutgoingBuffer());
expect(unacknowledged.length).to.equal(0);
expect(possiblyAcknowledged.length).to.equal(messagesA.length);
});
});
});

View File

@ -13,6 +13,7 @@ export const DEFAULT_BLOOM_FILTER_OPTIONS = {
};
const DEFAULT_CAUSAL_HISTORY_SIZE = 2;
const DEFAULT_RECEIVED_MESSAGE_TIMEOUT = 1000 * 60 * 5; // 5 minutes
export class MessageChannel {
private lamportTimestamp: number;
@ -24,10 +25,13 @@ export class MessageChannel {
private channelId: ChannelId;
private causalHistorySize: number;
private acknowledgementCount: number;
private timeReceived: Map<string, number>;
public constructor(
channelId: ChannelId,
causalHistorySize: number = DEFAULT_CAUSAL_HISTORY_SIZE
causalHistorySize: number = DEFAULT_CAUSAL_HISTORY_SIZE,
private receivedMessageTimeoutEnabled: boolean = false,
private receivedMessageTimeout: number = DEFAULT_RECEIVED_MESSAGE_TIMEOUT
) {
this.channelId = channelId;
this.lamportTimestamp = 0;
@ -38,6 +42,7 @@ export class MessageChannel {
this.messageIdLog = [];
this.causalHistorySize = causalHistorySize;
this.acknowledgementCount = this.getAcknowledgementCount();
this.timeReceived = new Map();
}
public static getMessageId(payload: Uint8Array): string {
@ -117,11 +122,87 @@ export class MessageChannel {
);
if (!dependenciesMet) {
this.incomingBuffer.push(message);
this.timeReceived.set(message.messageId, Date.now());
} else {
this.deliverMessage(message);
}
}
// https://rfc.vac.dev/vac/raw/sds/#periodic-incoming-buffer-sweep
public sweepIncomingBuffer(): string[] {
const { buffer, missing } = this.incomingBuffer.reduce<{
buffer: Message[];
missing: string[];
}>(
({ buffer, missing }, message) => {
// Check each message for missing dependencies
const missingDependencies = message.causalHistory.filter(
(messageId) =>
!this.messageIdLog.some(
({ messageId: logMessageId }) => logMessageId === messageId
)
);
if (missingDependencies.length === 0) {
// Any message with no missing dependencies is delivered
// and removed from the buffer (implicitly by not adding it to the new incoming buffer)
this.deliverMessage(message);
return { buffer, missing };
}
// Optionally, if a message has not been received after a predetermined amount of time,
// it is marked as irretrievably lost (implicitly by removing it from the buffer without delivery)
if (this.receivedMessageTimeoutEnabled) {
const timeReceived = this.timeReceived.get(message.messageId);
if (
timeReceived &&
Date.now() - timeReceived > this.receivedMessageTimeout
) {
return { buffer, missing };
}
}
// Any message with missing dependencies stays in the buffer
// and the missing message IDs are returned for processing.
return {
buffer: buffer.concat(message),
missing: missing.concat(missingDependencies)
};
},
{ buffer: new Array<Message>(), missing: new Array<string>() }
);
// Update the incoming buffer to only include messages with no missing dependencies
this.incomingBuffer = buffer;
return missing;
}
// https://rfc.vac.dev/vac/raw/sds/#periodic-outgoing-buffer-sweep
public sweepOutgoingBuffer(): {
unacknowledged: Message[];
possiblyAcknowledged: Message[];
} {
// Partition all messages in the outgoing buffer into unacknowledged and possibly acknowledged messages
return this.outgoingBuffer.reduce<{
unacknowledged: Message[];
possiblyAcknowledged: Message[];
}>(
({ unacknowledged, possiblyAcknowledged }, message) => {
if (this.acknowledgements.has(message.messageId)) {
return {
unacknowledged,
possiblyAcknowledged: possiblyAcknowledged.concat(message)
};
}
return {
unacknowledged: unacknowledged.concat(message),
possiblyAcknowledged
};
},
{
unacknowledged: new Array<Message>(),
possiblyAcknowledged: new Array<Message>()
}
);
}
// See https://rfc.vac.dev/vac/raw/sds/#deliver-message
private deliverMessage(message: Message): void {
const messageLamportTimestamp = message.lamportTimestamp ?? 0;