diff --git a/packages/sds/src/sds.spec.ts b/packages/sds/src/sds.spec.ts index 0a22563cd7..db69b14a04 100644 --- a/packages/sds/src/sds.spec.ts +++ b/packages/sds/src/sds.spec.ts @@ -330,4 +330,145 @@ describe("MessageChannel", function () { }); }); }); + + describe("Sweeping incoming buffer", () => { + beforeEach(() => { + channelA = new MessageChannel(channelId); + channelB = new MessageChannel(channelId); + }); + + it("should detect messages with missing dependencies", () => { + const causalHistorySize = (channelA as any).causalHistorySize; + messagesA.forEach((m) => { + channelA.sendMessage(utf8ToBytes(m), callback); + }); + + channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => { + channelB.receiveMessage(message); + return true; + }); + + const incomingBuffer = (channelB as any).incomingBuffer as Message[]; + expect(incomingBuffer.length).to.equal(1); + expect(incomingBuffer[0].messageId).to.equal( + MessageChannel.getMessageId(utf8ToBytes(messagesB[0])) + ); + + const missingMessages = channelB.sweepIncomingBuffer(); + expect(missingMessages.length).to.equal(causalHistorySize); + expect(missingMessages[0]).to.equal( + MessageChannel.getMessageId(utf8ToBytes(messagesA[0])) + ); + }); + + it("should deliver messages after dependencies are met", () => { + const causalHistorySize = (channelA as any).causalHistorySize; + const sentMessages = new Array(); + messagesA.forEach((m) => { + channelA.sendMessage(utf8ToBytes(m), (message) => { + sentMessages.push(message); + return true; + }); + }); + + channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => { + channelB.receiveMessage(message); + return true; + }); + + const missingMessages = channelB.sweepIncomingBuffer(); + expect(missingMessages.length).to.equal(causalHistorySize); + expect(missingMessages[0]).to.equal( + MessageChannel.getMessageId(utf8ToBytes(messagesA[0])) + ); + + let incomingBuffer = (channelB as any).incomingBuffer as Message[]; + expect(incomingBuffer.length).to.equal(1); + + sentMessages.forEach((m) => { + channelB.receiveMessage(m); + }); + + const missingMessages2 = channelB.sweepIncomingBuffer(); + expect(missingMessages2.length).to.equal(0); + + incomingBuffer = (channelB as any).incomingBuffer as Message[]; + expect(incomingBuffer.length).to.equal(0); + }); + + it("should remove messages without delivering if timeout is exceeded", async () => { + const causalHistorySize = (channelA as any).causalHistorySize; + // Create a channel with very very short timeout + const channelC: MessageChannel = new MessageChannel( + channelId, + causalHistorySize, + true, + 10 + ); + + messagesA.forEach((m) => { + channelA.sendMessage(utf8ToBytes(m), callback); + }); + + channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => { + channelC.receiveMessage(message); + return true; + }); + + const missingMessages = channelC.sweepIncomingBuffer(); + expect(missingMessages.length).to.equal(causalHistorySize); + let incomingBuffer = (channelC as any).incomingBuffer as Message[]; + expect(incomingBuffer.length).to.equal(1); + + await new Promise((resolve) => setTimeout(resolve, 20)); + + channelC.sweepIncomingBuffer(); + incomingBuffer = (channelC as any).incomingBuffer as Message[]; + expect(incomingBuffer.length).to.equal(0); + }); + }); + + describe("Sweeping outgoing buffer", () => { + beforeEach(() => { + channelA = new MessageChannel(channelId); + channelB = new MessageChannel(channelId); + }); + + it("should partition messages based on acknowledgement status", () => { + const unacknowledgedMessages: Message[] = []; + messagesA.forEach((m) => { + channelA.sendMessage(utf8ToBytes(m), (message) => { + unacknowledgedMessages.push(message); + channelB.receiveMessage(message); + return true; + }); + }); + + let { unacknowledged, possiblyAcknowledged } = + channelA.sweepOutgoingBuffer(); + expect(unacknowledged.length).to.equal(messagesA.length); + expect(possiblyAcknowledged.length).to.equal(0); + + // Make sure messages sent by channel A are not in causal history + const causalHistorySize = (channelA as any).causalHistorySize; + messagesB.slice(0, causalHistorySize).forEach((m) => { + channelB.sendMessage(utf8ToBytes(m), callback); + }); + + channelB.sendMessage( + utf8ToBytes(messagesB[causalHistorySize]), + (message) => { + channelA.receiveMessage(message); + return true; + } + ); + + // All messages that were previously unacknowledged should now be possibly acknowledged + // since they were included in one of the bloom filters sent from channel B + ({ unacknowledged, possiblyAcknowledged } = + channelA.sweepOutgoingBuffer()); + expect(unacknowledged.length).to.equal(0); + expect(possiblyAcknowledged.length).to.equal(messagesA.length); + }); + }); }); diff --git a/packages/sds/src/sds.ts b/packages/sds/src/sds.ts index 1643c272b5..1e2f73662c 100644 --- a/packages/sds/src/sds.ts +++ b/packages/sds/src/sds.ts @@ -13,6 +13,7 @@ export const DEFAULT_BLOOM_FILTER_OPTIONS = { }; const DEFAULT_CAUSAL_HISTORY_SIZE = 2; +const DEFAULT_RECEIVED_MESSAGE_TIMEOUT = 1000 * 60 * 5; // 5 minutes export class MessageChannel { private lamportTimestamp: number; @@ -24,10 +25,13 @@ export class MessageChannel { private channelId: ChannelId; private causalHistorySize: number; private acknowledgementCount: number; + private timeReceived: Map; public constructor( channelId: ChannelId, - causalHistorySize: number = DEFAULT_CAUSAL_HISTORY_SIZE + causalHistorySize: number = DEFAULT_CAUSAL_HISTORY_SIZE, + private receivedMessageTimeoutEnabled: boolean = false, + private receivedMessageTimeout: number = DEFAULT_RECEIVED_MESSAGE_TIMEOUT ) { this.channelId = channelId; this.lamportTimestamp = 0; @@ -38,6 +42,7 @@ export class MessageChannel { this.messageIdLog = []; this.causalHistorySize = causalHistorySize; this.acknowledgementCount = this.getAcknowledgementCount(); + this.timeReceived = new Map(); } public static getMessageId(payload: Uint8Array): string { @@ -117,11 +122,87 @@ export class MessageChannel { ); if (!dependenciesMet) { this.incomingBuffer.push(message); + this.timeReceived.set(message.messageId, Date.now()); } else { this.deliverMessage(message); } } + // https://rfc.vac.dev/vac/raw/sds/#periodic-incoming-buffer-sweep + public sweepIncomingBuffer(): string[] { + const { buffer, missing } = this.incomingBuffer.reduce<{ + buffer: Message[]; + missing: string[]; + }>( + ({ buffer, missing }, message) => { + // Check each message for missing dependencies + const missingDependencies = message.causalHistory.filter( + (messageId) => + !this.messageIdLog.some( + ({ messageId: logMessageId }) => logMessageId === messageId + ) + ); + if (missingDependencies.length === 0) { + // Any message with no missing dependencies is delivered + // and removed from the buffer (implicitly by not adding it to the new incoming buffer) + this.deliverMessage(message); + return { buffer, missing }; + } + + // Optionally, if a message has not been received after a predetermined amount of time, + // it is marked as irretrievably lost (implicitly by removing it from the buffer without delivery) + if (this.receivedMessageTimeoutEnabled) { + const timeReceived = this.timeReceived.get(message.messageId); + if ( + timeReceived && + Date.now() - timeReceived > this.receivedMessageTimeout + ) { + return { buffer, missing }; + } + } + // Any message with missing dependencies stays in the buffer + // and the missing message IDs are returned for processing. + return { + buffer: buffer.concat(message), + missing: missing.concat(missingDependencies) + }; + }, + { buffer: new Array(), missing: new Array() } + ); + // Update the incoming buffer to only include messages with no missing dependencies + this.incomingBuffer = buffer; + return missing; + } + + // https://rfc.vac.dev/vac/raw/sds/#periodic-outgoing-buffer-sweep + public sweepOutgoingBuffer(): { + unacknowledged: Message[]; + possiblyAcknowledged: Message[]; + } { + // Partition all messages in the outgoing buffer into unacknowledged and possibly acknowledged messages + return this.outgoingBuffer.reduce<{ + unacknowledged: Message[]; + possiblyAcknowledged: Message[]; + }>( + ({ unacknowledged, possiblyAcknowledged }, message) => { + if (this.acknowledgements.has(message.messageId)) { + return { + unacknowledged, + possiblyAcknowledged: possiblyAcknowledged.concat(message) + }; + } + return { + unacknowledged: unacknowledged.concat(message), + possiblyAcknowledged + }; + }, + { + unacknowledged: new Array(), + possiblyAcknowledged: new Array() + } + ); + } + // See https://rfc.vac.dev/vac/raw/sds/#deliver-message private deliverMessage(message: Message): void { const messageLamportTimestamp = message.lamportTimestamp ?? 0;