From 914beb6531a84f8c11ca951721225d47f9e6c285 Mon Sep 17 00:00:00 2001 From: fryorcraken <110212804+fryorcraken@users.noreply.github.com> Date: Thu, 28 Aug 2025 15:57:23 +1000 Subject: [PATCH] fix: (sds) ensure incoming messages have their retrieval hint stored (#2604) * SDS: export `MessageId` * SDS: attach retrieval hints to incoming messages * sds: ensure items are ordered by timestamp * test: sds: avoid using "as any" as it bypasses type checks * test: filter: avoid using "as any" as it bypasses type checks * test: fix tests without introducing proxy --- packages/sdk/src/filter/filter.spec.ts | 2 +- packages/sds/src/index.ts | 3 +- .../src/message_channel/mem_local_history.ts | 21 +- .../sds/src/message_channel/message.spec.ts | 22 + .../message_channel/message_channel.spec.ts | 544 +++++++++++++++--- .../src/message_channel/message_channel.ts | 44 +- 6 files changed, 539 insertions(+), 97 deletions(-) diff --git a/packages/sdk/src/filter/filter.spec.ts b/packages/sdk/src/filter/filter.spec.ts index a0819b36a5..4eabec6969 100644 --- a/packages/sdk/src/filter/filter.spec.ts +++ b/packages/sdk/src/filter/filter.spec.ts @@ -91,7 +91,7 @@ describe("Filter SDK", () => { const message = createMockMessage(testContentTopic); const peerId = "peer1"; - await (filter as any).onIncomingMessage(testPubsubTopic, message, peerId); + await filter["onIncomingMessage"](testPubsubTopic, message, peerId); expect(subscriptionInvokeStub.calledOnce).to.be.true; expect(subscriptionInvokeStub.firstCall.args[0]).to.equal(message); diff --git a/packages/sds/src/index.ts b/packages/sds/src/index.ts index f5e4586fe0..3c1fb30cb1 100644 --- a/packages/sds/src/index.ts +++ b/packages/sds/src/index.ts @@ -14,7 +14,8 @@ export { type HistoryEntry, type ChannelId, type MessageChannelEvents, - type SenderId + type SenderId, + type MessageId } from "./message_channel/index.js"; export { BloomFilter }; diff --git a/packages/sds/src/message_channel/mem_local_history.ts b/packages/sds/src/message_channel/mem_local_history.ts index 2d2f22abae..8218bdf2f9 100644 --- a/packages/sds/src/message_channel/mem_local_history.ts +++ b/packages/sds/src/message_channel/mem_local_history.ts @@ -23,9 +23,15 @@ export class MemLocalHistory { this.validateMessage(item); } - // Add new items and ensure uniqueness by messageId using sortedUniqBy + // Add new items and sort by timestamp, ensuring uniqueness by messageId // The valueOf() method on ContentMessage enables native < operator sorting - this.items = _.sortedUniqBy([...this.items, ...items], "messageId"); + const combinedItems = [...this.items, ...items]; + + // Sort by timestamp (using valueOf() which creates timestamp_messageId string) + combinedItems.sort((a, b) => a.valueOf().localeCompare(b.valueOf())); + + // Remove duplicates by messageId while maintaining order + this.items = _.uniqBy(combinedItems, "messageId"); return this.items.length; } @@ -56,6 +62,17 @@ export class MemLocalHistory { return this.items.find(predicate, thisArg); } + public findIndex( + predicate: ( + value: ContentMessage, + index: number, + obj: ContentMessage[] + ) => unknown, + thisArg?: any + ): number { + return this.items.findIndex(predicate, thisArg); + } + private validateMessage(message: ContentMessage): void { if (!isContentMessage(message)) { throw new Error( diff --git a/packages/sds/src/message_channel/message.spec.ts b/packages/sds/src/message_channel/message.spec.ts index 9b4a2e0841..37dfd5db28 100644 --- a/packages/sds/src/message_channel/message.spec.ts +++ b/packages/sds/src/message_channel/message.spec.ts @@ -1,3 +1,4 @@ +import { utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; import { DefaultBloomFilter } from "../bloom_filter/bloom.js"; @@ -32,6 +33,27 @@ describe("Message serialization", () => { expect(decBloomFilter.lookup(messageId)).to.be.true; }); + + it("Retrieval Hint", () => { + const depMessageId = "dependency"; + const depRetrievalHint = utf8ToBytes("dependency"); + const message = new Message( + "123", + "my-channel", + "me", + [{ messageId: depMessageId, retrievalHint: depRetrievalHint }], + 0, + undefined, + undefined + ); + + const bytes = message.encode(); + const decMessage = Message.decode(bytes); + + expect(decMessage.causalHistory).to.deep.equal([ + { messageId: depMessageId, retrievalHint: depRetrievalHint } + ]); + }); }); describe("ContentMessage comparison with < operator", () => { diff --git a/packages/sds/src/message_channel/message_channel.spec.ts b/packages/sds/src/message_channel/message_channel.spec.ts index 624e9a97e8..6132f469d9 100644 --- a/packages/sds/src/message_channel/message_channel.spec.ts +++ b/packages/sds/src/message_channel/message_channel.spec.ts @@ -23,7 +23,7 @@ const callback = (_message: Message): Promise<{ success: boolean }> => { }; const getBloomFilter = (channel: MessageChannel): DefaultBloomFilter => { - return (channel as any).filter as DefaultBloomFilter; + return channel["filter"] as DefaultBloomFilter; }; const messagesA = ["message-1", "message-2"]; @@ -54,9 +54,10 @@ const sendSyncMessage = async ( const receiveMessage = async ( channel: MessageChannel, - message: Message + message: Message, + retrievalHint?: Uint8Array ): Promise => { - channel.pushIncomingMessage(message); + channel.pushIncomingMessage(message, retrievalHint); await channel.processTasks(); }; @@ -71,16 +72,16 @@ describe("MessageChannel", function () { }); it("should increase lamport timestamp", async () => { - const timestampBefore = (channelA as any).lamportTimestamp; + const timestampBefore = channelA["lamportTimestamp"]; await sendMessage(channelA, utf8ToBytes("message"), callback); - const timestampAfter = (channelA as any).lamportTimestamp; + const timestampAfter = channelA["lamportTimestamp"]; expect(timestampAfter).to.equal(timestampBefore + 1); }); it("should push the message to the outgoing buffer", async () => { - const bufferLengthBefore = (channelA as any).outgoingBuffer.length; + const bufferLengthBefore = channelA["outgoingBuffer"].length; await sendMessage(channelA, utf8ToBytes("message"), callback); - const bufferLengthAfter = (channelA as any).outgoingBuffer.length; + const bufferLengthAfter = channelA["outgoingBuffer"].length; expect(bufferLengthAfter).to.equal(bufferLengthBefore + 1); }); @@ -94,10 +95,10 @@ describe("MessageChannel", function () { it("should insert message id into causal history", async () => { const payload = utf8ToBytes("message"); - const expectedTimestamp = (channelA as any).lamportTimestamp + 1; + const expectedTimestamp = channelA["lamportTimestamp"] + 1; const messageId = MessageChannel.getMessageId(payload); await sendMessage(channelA, payload, callback); - const messageIdLog = (channelA as any).localHistory as ILocalHistory; + const messageIdLog = channelA["localHistory"] as ILocalHistory; expect(messageIdLog.length).to.equal(1); expect( messageIdLog.some( @@ -108,9 +109,30 @@ describe("MessageChannel", function () { ).to.equal(true); }); + it("should add sent message to localHistory with retrievalHint", async () => { + const payload = utf8ToBytes("message with retrieval hint"); + const messageId = MessageChannel.getMessageId(payload); + const testRetrievalHint = utf8ToBytes("test-retrieval-hint-data"); + + await sendMessage(channelA, payload, async (_message) => { + // Simulate successful sending with retrievalHint + return { success: true, retrievalHint: testRetrievalHint }; + }); + + const localHistory = channelA["localHistory"] as ILocalHistory; + expect(localHistory.length).to.equal(1); + + // Find the message in local history + const historyEntry = localHistory.find( + (entry) => entry.messageId === messageId + ); + expect(historyEntry).to.exist; + expect(historyEntry!.retrievalHint).to.deep.equal(testRetrievalHint); + }); + it("should attach causal history and bloom filter to each message", async () => { const bloomFilter = new DefaultBloomFilter(DEFAULT_BLOOM_FILTER_OPTIONS); - const causalHistorySize = (channelA as any).causalHistorySize; + const causalHistorySize = channelA["causalHistorySize"]; const filterBytes = new Array(); const messages = new Array(causalHistorySize + 5) .fill("message") @@ -122,7 +144,7 @@ describe("MessageChannel", function () { bloomFilter.insert(MessageChannel.getMessageId(utf8ToBytes(message))); } - const outgoingBuffer = (channelA as any).outgoingBuffer as Message[]; + const outgoingBuffer = channelA["outgoingBuffer"] as Message[]; expect(outgoingBuffer.length).to.equal(messages.length); outgoingBuffer.forEach((message, index) => { @@ -153,12 +175,12 @@ describe("MessageChannel", function () { }); it("should increase lamport timestamp", async () => { - const timestampBefore = (channelA as any).lamportTimestamp; + const timestampBefore = channelA["lamportTimestamp"]; await sendMessage(channelB, utf8ToBytes("message"), async (message) => { await receiveMessage(channelA, message); return { success: true }; }); - const timestampAfter = (channelA as any).lamportTimestamp; + const timestampAfter = channelA["lamportTimestamp"]; expect(timestampAfter).to.equal(timestampBefore + 1); }); @@ -172,7 +194,7 @@ describe("MessageChannel", function () { return { success: true }; }); } - const timestampAfter = (channelA as any).lamportTimestamp; + const timestampAfter = channelA["lamportTimestamp"]; expect(timestampAfter).to.equal(messagesB.length); }); @@ -182,7 +204,7 @@ describe("MessageChannel", function () { await sendMessage(channelA, utf8ToBytes(m), async (message) => { timestamp++; await receiveMessage(channelB, message); - expect((channelB as any).lamportTimestamp).to.equal(timestamp); + expect(channelB["lamportTimestamp"]).to.equal(timestamp); return { success: true }; }); } @@ -191,15 +213,15 @@ describe("MessageChannel", function () { await sendMessage(channelB, utf8ToBytes(m), async (message) => { timestamp++; await receiveMessage(channelA, message); - expect((channelA as any).lamportTimestamp).to.equal(timestamp); + expect(channelA["lamportTimestamp"]).to.equal(timestamp); return { success: true }; }); } const expectedLength = messagesA.length + messagesB.length; - expect((channelA as any).lamportTimestamp).to.equal(expectedLength); - expect((channelA as any).lamportTimestamp).to.equal( - (channelB as any).lamportTimestamp + expect(channelA["lamportTimestamp"]).to.equal(expectedLength); + expect(channelA["lamportTimestamp"]).to.equal( + channelB["lamportTimestamp"] ); }); @@ -220,7 +242,7 @@ describe("MessageChannel", function () { } let receivedMessage: Message | null = null; - const timestampBefore = (channelB as any).lamportTimestamp; + const timestampBefore = channelB["lamportTimestamp"]; await sendMessage( channelA, @@ -232,26 +254,180 @@ describe("MessageChannel", function () { } ); - const incomingBuffer = (channelB as any).incomingBuffer as Message[]; + const incomingBuffer = channelB["incomingBuffer"]; expect(incomingBuffer.length).to.equal(1); expect(incomingBuffer[0].messageId).to.equal(receivedMessage!.messageId); // Since the dependency is not met, the lamport timestamp should not increase - const timestampAfter = (channelB as any).lamportTimestamp; + const timestampAfter = channelB["lamportTimestamp"]; expect(timestampAfter).to.equal(timestampBefore); // Message should not be in local history - const localHistory = (channelB as any).localHistory as { - timestamp: number; - historyEntry: HistoryEntry; - }[]; + const localHistory = channelB["localHistory"]; expect( localHistory.some( - ({ historyEntry: { messageId } }) => - messageId === receivedMessage!.messageId + ({ messageId }) => messageId === receivedMessage!.messageId ) ).to.equal(false); }); + + it("should add received message to localHistory with retrievalHint", async () => { + const payload = utf8ToBytes("message with retrieval hint"); + const messageId = MessageChannel.getMessageId(payload); + const testRetrievalHint = utf8ToBytes("test-retrieval-hint-data"); + + await receiveMessage( + channelA, + new Message( + messageId, + channelA.channelId, + "not-alice", + [], + 1, + undefined, + payload, + testRetrievalHint + ), + testRetrievalHint + ); + + const localHistory = channelA["localHistory"] as ILocalHistory; + console.log("localHistory", localHistory); + expect(localHistory.length).to.equal(1); + + // Find the message in local history + const historyEntry = localHistory.find( + (entry) => entry.messageId === messageId + ); + console.log("history entry", historyEntry); + expect(historyEntry).to.exist; + expect(historyEntry!.retrievalHint).to.deep.equal(testRetrievalHint); + }); + + it("should maintain chronological order of messages in localHistory", async () => { + // Send messages with different timestamps (including own messages) + const message1Payload = utf8ToBytes("message 1"); + const message2Payload = utf8ToBytes("message 2"); + const message3Payload = utf8ToBytes("message 3"); + + const message1Id = MessageChannel.getMessageId(message1Payload); + const message2Id = MessageChannel.getMessageId(message2Payload); + const message3Id = MessageChannel.getMessageId(message3Payload); + + // Send own message first (timestamp will be 1) + await sendMessage(channelA, message1Payload, callback); + + // Receive a message from another sender with higher timestamp (3) + await receiveMessage( + channelA, + new ContentMessage( + message3Id, + channelA.channelId, + "bob", + [], + 3, // Higher timestamp + undefined, + message3Payload + ) + ); + + // Receive a message from another sender with middle timestamp (2) + await receiveMessage( + channelA, + new ContentMessage( + message2Id, + channelA.channelId, + "carol", + [], + 2, // Middle timestamp + undefined, + message2Payload + ) + ); + + const localHistory = channelA["localHistory"]; + expect(localHistory.length).to.equal(3); + + // Verify chronological order: message1 (ts=1), message2 (ts=2), message3 (ts=3) + + const first = localHistory.findIndex( + ({ messageId, lamportTimestamp }) => { + return messageId === message1Id && lamportTimestamp === 1; + } + ); + expect(first).to.eq(0); + + const second = localHistory.findIndex( + ({ messageId, lamportTimestamp }) => { + return messageId === message2Id && lamportTimestamp === 2; + } + ); + expect(second).to.eq(1); + + const third = localHistory.findIndex( + ({ messageId, lamportTimestamp }) => { + return messageId === message3Id && lamportTimestamp === 3; + } + ); + expect(third).to.eq(2); + }); + + it("should handle messages with same timestamp ordered by messageId", async () => { + const message1Payload = utf8ToBytes("message a"); + const message2Payload = utf8ToBytes("message b"); + + const message1Id = MessageChannel.getMessageId(message1Payload); + const message2Id = MessageChannel.getMessageId(message2Payload); + + // Receive messages with same timestamp but different message IDs + // The valueOf() method ensures ordering by messageId when timestamps are equal + await receiveMessage( + channelA, + new ContentMessage( + message2Id, // This will come second alphabetically by messageId + channelA.channelId, + "bob", + [], + 5, // Same timestamp + undefined, + message2Payload + ) + ); + + await receiveMessage( + channelA, + new ContentMessage( + message1Id, // This will come first alphabetically by messageId + channelA.channelId, + "carol", + [], + 5, // Same timestamp + undefined, + message1Payload + ) + ); + + const localHistory = channelA["localHistory"] as ILocalHistory; + expect(localHistory.length).to.equal(2); + + // When timestamps are equal, should be ordered by messageId lexicographically + // The valueOf() method creates "000000000000005_messageId" for comparison + const expectedOrder = [message1Id, message2Id].sort(); + + const first = localHistory.findIndex( + ({ messageId, lamportTimestamp }) => { + return messageId === expectedOrder[0] && lamportTimestamp == 5; + } + ); + expect(first).to.eq(0); + + const second = localHistory.findIndex( + ({ messageId, lamportTimestamp }) => { + return messageId === expectedOrder[1] && lamportTimestamp == 5; + } + ); + expect(second).to.eq(1); + }); }); describe("reviewing ack status", () => { @@ -283,9 +459,7 @@ describe("MessageChannel", function () { await channelA.processTasks(); await channelB.processTasks(); - expect((channelA as any).outgoingBuffer.length).to.equal( - messagesA.length + 1 - ); + expect(channelA["outgoingBuffer"].length).to.equal(messagesA.length + 1); await sendMessage( channelB, @@ -302,7 +476,7 @@ describe("MessageChannel", function () { // Since B received message-1, message-2, and not-in-history (3 messages), // and causalHistorySize is 3, it will only include the last 2 in its causal history // So message-1 won't be acknowledged, only message-2 and not-in-history - const outgoingBuffer = (channelA as any).outgoingBuffer as Message[]; + const outgoingBuffer = channelA["outgoingBuffer"] as Message[]; expect(outgoingBuffer.length).to.equal(1); // The remaining message should be message-1 (not acknowledged) expect(outgoingBuffer[0].messageId).to.equal( @@ -320,15 +494,12 @@ describe("MessageChannel", function () { await channelA.processTasks(); // All messages remain in the buffer - expect((channelA as any).outgoingBuffer.length).to.equal( - messagesA.length - ); + expect(channelA["outgoingBuffer"].length).to.equal(messagesA.length); }); it("should track probabilistic acknowledgements of messages received in bloom filter", async () => { - const possibleAcksThreshold = (channelA as any).possibleAcksThreshold; - - const causalHistorySize = (channelA as any).causalHistorySize; + const possibleAcksThreshold = channelA["possibleAcksThreshold"]; + const causalHistorySize = channelA["causalHistorySize"]; const unacknowledgedMessages = [ "unacknowledged-message-1", @@ -358,8 +529,8 @@ describe("MessageChannel", function () { } ); - const possibleAcks: ReadonlyMap = (channelA as any) - .possibleAcks; + const possibleAcks: ReadonlyMap = + channelA["possibleAcks"]; // Other than the message IDs which were included in causal history, // the remaining messages sent by channel A should be considered possibly acknowledged // for having been included in the bloom filter sent from channel B @@ -396,12 +567,12 @@ describe("MessageChannel", function () { expect(possibleAcks.size).to.equal(0); // Messages that were not acknowledged should still be in the outgoing buffer - expect((channelA as any).outgoingBuffer.length).to.equal( + expect(channelA["outgoingBuffer"].length).to.equal( unacknowledgedMessages.length ); unacknowledgedMessages.forEach((m) => { expect( - ((channelA as any).outgoingBuffer as Message[]).some( + (channelA["outgoingBuffer"] as Message[]).some( (message) => message.messageId === MessageChannel.getMessageId(utf8ToBytes(m)) ) @@ -417,9 +588,8 @@ describe("MessageChannel", function () { }); } - const possibleAcks: ReadonlyMap = (channelA as any) - .possibleAcks; - + const possibleAcks: ReadonlyMap = + channelA["possibleAcks"]; expect(possibleAcks.size).to.equal(0); }); @@ -478,7 +648,7 @@ describe("MessageChannel", function () { }); it("should detect messages with missing dependencies", async () => { - const causalHistorySize = (channelA as any).causalHistorySize; + const causalHistorySize = channelA["causalHistorySize"]; for (const m of messagesA) { await sendMessage(channelA, utf8ToBytes(m), callback); } @@ -492,7 +662,7 @@ describe("MessageChannel", function () { } ); - const incomingBuffer = (channelB as any).incomingBuffer as Message[]; + const incomingBuffer = channelB["incomingBuffer"]; expect(incomingBuffer.length).to.equal(1); expect(incomingBuffer[0].messageId).to.equal( MessageChannel.getMessageId(utf8ToBytes(messagesB[0])) @@ -506,7 +676,7 @@ describe("MessageChannel", function () { }); it("should deliver messages after dependencies are met", async () => { - const causalHistorySize = (channelA as any).causalHistorySize; + const causalHistorySize = channelA["causalHistorySize"]; const sentMessages = new Array(); // First, send messages from A but DON'T deliver them to B yet for (const m of messagesA) { @@ -537,7 +707,7 @@ describe("MessageChannel", function () { MessageChannel.getMessageId(utf8ToBytes(messagesA[0])) ); - let incomingBuffer = (channelB as any).incomingBuffer as Message[]; + let incomingBuffer = channelB["incomingBuffer"]; expect(incomingBuffer.length).to.equal(1); // Now deliver the missing dependencies @@ -550,7 +720,7 @@ describe("MessageChannel", function () { const missingMessages2 = channelB.sweepIncomingBuffer(); expect(missingMessages2.length).to.equal(0); - incomingBuffer = (channelB as any).incomingBuffer as Message[]; + incomingBuffer = channelB["incomingBuffer"]; expect(incomingBuffer.length).to.equal(0); }); @@ -594,8 +764,74 @@ describe("MessageChannel", function () { expect(irretrievablyLost).to.be.true; }); + it("should emit InMessageLost event with retrievalHint when timeout is exceeded", async () => { + const testRetrievalHint = utf8ToBytes("lost-message-hint"); + let lostMessages: HistoryEntry[] = []; + + // Create a channel with very short timeout + const channelC: MessageChannel = new MessageChannel(channelId, "carol", { + timeoutForLostMessagesMs: 10 + }); + + channelC.addEventListener(MessageChannelEvent.InMessageLost, (event) => { + lostMessages = event.detail; + }); + + // Send message from A with retrievalHint + await sendMessage( + channelA, + utf8ToBytes(messagesA[0]), + async (message) => { + message.retrievalHint = testRetrievalHint; + return { success: true, retrievalHint: testRetrievalHint }; + } + ); + + // Send another message from A + await sendMessage(channelA, utf8ToBytes(messagesA[1]), callback); + + // Send a message to C that depends on the previous messages + await sendMessage( + channelA, + utf8ToBytes(messagesB[0]), + async (message) => { + await receiveMessage(channelC, message); + return { success: true }; + } + ); + + // First sweep - should detect missing messages + channelC.sweepIncomingBuffer(); + + // Wait for timeout + await new Promise((resolve) => setTimeout(resolve, 20)); + + // Second sweep - should mark messages as lost + channelC.sweepIncomingBuffer(); + + expect(lostMessages.length).to.equal(2); + + // Verify retrievalHint is included in the lost message + const lostMessageWithHint = lostMessages.find( + (m) => + m.messageId === MessageChannel.getMessageId(utf8ToBytes(messagesA[0])) + ); + expect(lostMessageWithHint).to.exist; + expect(lostMessageWithHint!.retrievalHint).to.deep.equal( + testRetrievalHint + ); + + // Verify message without retrievalHint has undefined + const lostMessageWithoutHint = lostMessages.find( + (m) => + m.messageId === MessageChannel.getMessageId(utf8ToBytes(messagesA[1])) + ); + expect(lostMessageWithoutHint).to.exist; + expect(lostMessageWithoutHint!.retrievalHint).to.be.undefined; + }); + it("should remove messages without delivering if timeout is exceeded", async () => { - const causalHistorySize = (channelA as any).causalHistorySize; + const causalHistorySize = channelA["causalHistorySize"]; // Create a channel with very very short timeout const channelC: MessageChannel = new MessageChannel(channelId, "carol", { timeoutForLostMessagesMs: 10 @@ -616,15 +852,173 @@ describe("MessageChannel", function () { const missingMessages = channelC.sweepIncomingBuffer(); expect(missingMessages.length).to.equal(causalHistorySize); - let incomingBuffer = (channelC as any).incomingBuffer as Message[]; + let incomingBuffer = channelC["incomingBuffer"]; expect(incomingBuffer.length).to.equal(1); await new Promise((resolve) => setTimeout(resolve, 20)); channelC.sweepIncomingBuffer(); - incomingBuffer = (channelC as any).incomingBuffer as Message[]; + incomingBuffer = channelC["incomingBuffer"]; expect(incomingBuffer.length).to.equal(0); }); + + it("should return HistoryEntry with retrievalHint from sweepIncomingBuffer", async () => { + const testRetrievalHint = utf8ToBytes("test-retrieval-hint"); + + // Send message from A with a retrievalHint + await sendMessage( + channelA, + utf8ToBytes(messagesA[0]), + async (message) => { + message.retrievalHint = testRetrievalHint; + return { success: true, retrievalHint: testRetrievalHint }; + } + ); + + // Send another message from A that depends on the first one + await sendMessage( + channelA, + utf8ToBytes(messagesA[1]), + async (_message) => { + // Don't send to B yet - we want B to have missing dependencies + return { success: true }; + } + ); + + // Send a message from A to B that depends on previous messages + await sendMessage( + channelA, + utf8ToBytes(messagesB[0]), + async (message) => { + await receiveMessage(channelB, message); + return { success: true }; + } + ); + + // Sweep should detect missing dependencies and return them with retrievalHint + const missingMessages = channelB.sweepIncomingBuffer(); + expect(missingMessages.length).to.equal(2); + + // Find the first message in missing dependencies + const firstMissingMessage = missingMessages.find( + (m) => + m.messageId === MessageChannel.getMessageId(utf8ToBytes(messagesA[0])) + ); + expect(firstMissingMessage).to.exist; + expect(firstMissingMessage!.retrievalHint).to.deep.equal( + testRetrievalHint + ); + }); + + it("should emit InMessageMissing event with retrievalHint", async () => { + const testRetrievalHint1 = utf8ToBytes("hint-for-message-1"); + const testRetrievalHint2 = utf8ToBytes("hint-for-message-2"); + let eventReceived = false; + let emittedMissingMessages: HistoryEntry[] = []; + + // Listen for InMessageMissing event + channelB.addEventListener( + MessageChannelEvent.InMessageMissing, + (event) => { + eventReceived = true; + emittedMissingMessages = event.detail; + } + ); + + // Send messages from A with retrievalHints + await sendMessage( + channelA, + utf8ToBytes(messagesA[0]), + async (message) => { + message.retrievalHint = testRetrievalHint1; + return { success: true, retrievalHint: testRetrievalHint1 }; + } + ); + + await sendMessage( + channelA, + utf8ToBytes(messagesA[1]), + async (message) => { + message.retrievalHint = testRetrievalHint2; + return { success: true, retrievalHint: testRetrievalHint2 }; + } + ); + + // Send a message to B that depends on the previous messages + await sendMessage( + channelA, + utf8ToBytes(messagesB[0]), + async (message) => { + await receiveMessage(channelB, message); + return { success: true }; + } + ); + + // Sweep should trigger InMessageMissing event + channelB.sweepIncomingBuffer(); + + expect(eventReceived).to.be.true; + expect(emittedMissingMessages.length).to.equal(2); + + // Verify retrievalHints are included in the event + const firstMissing = emittedMissingMessages.find( + (m) => + m.messageId === MessageChannel.getMessageId(utf8ToBytes(messagesA[0])) + ); + const secondMissing = emittedMissingMessages.find( + (m) => + m.messageId === MessageChannel.getMessageId(utf8ToBytes(messagesA[1])) + ); + + expect(firstMissing).to.exist; + expect(firstMissing!.retrievalHint).to.deep.equal(testRetrievalHint1); + expect(secondMissing).to.exist; + expect(secondMissing!.retrievalHint).to.deep.equal(testRetrievalHint2); + }); + + it("should handle missing messages with undefined retrievalHint", async () => { + let emittedMissingMessages: HistoryEntry[] = []; + + channelB.addEventListener( + MessageChannelEvent.InMessageMissing, + (event) => { + emittedMissingMessages = event.detail; + } + ); + + // Send message from A without retrievalHint + await sendMessage( + channelA, + utf8ToBytes(messagesA[0]), + async (_message) => { + // Don't set retrievalHint + return { success: true }; + } + ); + + // Send a message to B that depends on the previous message + await sendMessage( + channelA, + utf8ToBytes(messagesB[0]), + async (message) => { + await receiveMessage(channelB, message); + return { success: true }; + } + ); + + // Sweep should handle missing message with undefined retrievalHint + const missingMessages = channelB.sweepIncomingBuffer(); + + expect(missingMessages.length).to.equal(1); + expect(missingMessages[0].messageId).to.equal( + MessageChannel.getMessageId(utf8ToBytes(messagesA[0])) + ); + expect(missingMessages[0].retrievalHint).to.be.undefined; + + // Event should also reflect undefined retrievalHint + expect(emittedMissingMessages.length).to.equal(1); + expect(emittedMissingMessages[0].retrievalHint).to.be.undefined; + }); }); describe("Sweeping outgoing buffer", () => { @@ -649,7 +1043,7 @@ describe("MessageChannel", function () { expect(possiblyAcknowledged.length).to.equal(0); // Make sure messages sent by channel A are not in causal history - const causalHistorySize = (channelA as any).causalHistorySize; + const causalHistorySize = channelA["causalHistorySize"]; for (const m of messagesB.slice(0, causalHistorySize)) { await sendMessage(channelB, utf8ToBytes(m), callback); } @@ -690,7 +1084,7 @@ describe("MessageChannel", function () { it("should not be added to outgoing buffer, bloom filter, or local log", async () => { await channelA.pushOutgoingSyncMessage(); - const outgoingBuffer = (channelA as any).outgoingBuffer as Message[]; + const outgoingBuffer = channelA["outgoingBuffer"] as Message[]; expect(outgoingBuffer.length).to.equal(0); const bloomFilter = getBloomFilter(channelA); @@ -698,26 +1092,20 @@ describe("MessageChannel", function () { bloomFilter.lookup(MessageChannel.getMessageId(new Uint8Array())) ).to.equal(false); - const localLog = (channelA as any).localHistory as { - timestamp: number; - messageId: MessageId; - }[]; + const localLog = channelA["localHistory"]; expect(localLog.length).to.equal(0); }); it("should not be delivered", async () => { - const timestampBefore = (channelB as any).lamportTimestamp; + const timestampBefore = channelB["lamportTimestamp"]; await channelA.pushOutgoingSyncMessage(async (message) => { await receiveMessage(channelB, message); return true; }); - const timestampAfter = (channelB as any).lamportTimestamp; + const timestampAfter = channelB["lamportTimestamp"]; expect(timestampAfter).to.equal(timestampBefore); - const localLog = (channelB as any).localHistory as { - timestamp: number; - messageId: MessageId; - }[]; + const localLog = channelB["localHistory"]; expect(localLog.length).to.equal(0); const bloomFilter = getBloomFilter(channelB); @@ -739,8 +1127,8 @@ describe("MessageChannel", function () { return true; }); - const causalHistorySize = (channelA as any).causalHistorySize; - const outgoingBuffer = (channelA as any).outgoingBuffer as Message[]; + const causalHistorySize = channelA["causalHistorySize"]; + const outgoingBuffer = channelA["outgoingBuffer"] as Message[]; expect(outgoingBuffer.length).to.equal( messagesA.length - causalHistorySize ); @@ -753,7 +1141,7 @@ describe("MessageChannel", function () { }); it("should be sent without a timestamp, causal history, or bloom filter", async () => { - const timestampBefore = (channelA as any).lamportTimestamp; + const timestampBefore = channelA["lamportTimestamp"]; await channelA.pushOutgoingEphemeralMessage( new Uint8Array(), async (message) => { @@ -764,10 +1152,10 @@ describe("MessageChannel", function () { } ); - const outgoingBuffer = (channelA as any).outgoingBuffer as Message[]; + const outgoingBuffer = channelA["outgoingBuffer"] as Message[]; expect(outgoingBuffer.length).to.equal(0); - const timestampAfter = (channelA as any).lamportTimestamp; + const timestampAfter = channelA["lamportTimestamp"]; expect(timestampAfter).to.equal(timestampBefore); }); @@ -775,9 +1163,9 @@ describe("MessageChannel", function () { const channelB = new MessageChannel(channelId, "bob"); // Track initial state - const localHistoryBefore = (channelB as any).localHistory.length; - const incomingBufferBefore = (channelB as any).incomingBuffer.length; - const timestampBefore = (channelB as any).lamportTimestamp; + const localHistoryBefore = channelB["localHistory"].length; + const incomingBufferBefore = channelB["incomingBuffer"].length; + const timestampBefore = channelB["lamportTimestamp"]; await channelA.pushOutgoingEphemeralMessage( utf8ToBytes(messagesA[0]), @@ -793,15 +1181,11 @@ describe("MessageChannel", function () { // Verify ephemeral message behavior: // 1. Not added to local history - expect((channelB as any).localHistory.length).to.equal( - localHistoryBefore - ); + expect(channelB["localHistory"].length).to.equal(localHistoryBefore); // 2. Not added to incoming buffer - expect((channelB as any).incomingBuffer.length).to.equal( - incomingBufferBefore - ); + expect(channelB["incomingBuffer"].length).to.equal(incomingBufferBefore); // 3. Doesn't update lamport timestamp - expect((channelB as any).lamportTimestamp).to.equal(timestampBefore); + expect(channelB["lamportTimestamp"]).to.equal(timestampBefore); }); }); }); diff --git a/packages/sds/src/message_channel/message_channel.ts b/packages/sds/src/message_channel/message_channel.ts index 332dfbed4d..a9cd980a71 100644 --- a/packages/sds/src/message_channel/message_channel.ts +++ b/packages/sds/src/message_channel/message_channel.ts @@ -50,7 +50,7 @@ export interface MessageChannelOptions { export type ILocalHistory = Pick< Array, - "some" | "push" | "slice" | "find" | "length" + "some" | "push" | "slice" | "find" | "length" | "findIndex" >; export class MessageChannel extends TypedEventEmitter { @@ -61,7 +61,7 @@ export class MessageChannel extends TypedEventEmitter { private outgoingBuffer: ContentMessage[]; private possibleAcks: Map; private incomingBuffer: Array; - private localHistory: ILocalHistory; + private readonly localHistory: ILocalHistory; private timeReceived: Map; private readonly causalHistorySize: number; private readonly possibleAcksThreshold: number; @@ -226,7 +226,7 @@ export class MessageChannel extends TypedEventEmitter { * proper dependency resolution and causal ordering. * * @param message - The message to receive and process - * + * @param retrievalHint - The retrieval hint for the message, provided by the transport layer * @example * ```typescript * const channel = new MessageChannel("chat-room"); @@ -238,7 +238,12 @@ export class MessageChannel extends TypedEventEmitter { * await channel.processTasks(); * ``` */ - public pushIncomingMessage(message: Message): void { + public pushIncomingMessage( + message: Message, + retrievalHint: Uint8Array | undefined + ): void { + message.retrievalHint = retrievalHint; + this.tasks.push({ command: Command.Receive, params: { @@ -282,7 +287,9 @@ export class MessageChannel extends TypedEventEmitter { this.senderId, message.messageId, "is missing dependencies", - missingDependencies.map((ch) => ch.messageId) + missingDependencies.map(({ messageId, retrievalHint }) => { + return { messageId, retrievalHint }; + }) ); // Optionally, if a message has not been received after a predetermined amount of time, @@ -395,7 +402,15 @@ export class MessageChannel extends TypedEventEmitter { } private _pushIncomingMessage(message: Message): void { - log.info(this.senderId, "incoming message", message.messageId); + if (message.channelId !== this.channelId) { + log.warn("dropping message on different channel", message.channelId); + return; + } + + log.info( + `${this.senderId} incoming message ${message.messageId}`, + `retrieval hint: ${bytesToHex(message.retrievalHint ?? new Uint8Array())}` + ); const isDuplicate = message.content && message.content.length > 0 && @@ -589,14 +604,10 @@ export class MessageChannel extends TypedEventEmitter { * Return true if the message was "delivered" * * @param message - * @param retrievalHint * @private */ // See https://rfc.vac.dev/vac/raw/sds/#deliver-message - private deliverMessage( - message: ContentMessage, - retrievalHint?: Uint8Array - ): boolean { + private deliverMessage(message: ContentMessage): boolean { if (!isContentMessage(message)) { // Messages with empty content are sync messages. // Messages with no timestamp are ephemeral messages. @@ -605,7 +616,12 @@ export class MessageChannel extends TypedEventEmitter { return false; } - log.info(this.senderId, "delivering message", message.messageId); + log.info( + this.senderId, + "delivering message", + message.messageId, + message.retrievalHint + ); if (message.lamportTimestamp > this.lamportTimestamp) { this.lamportTimestamp = message.lamportTimestamp; } @@ -620,7 +636,9 @@ export class MessageChannel extends TypedEventEmitter { return true; } - message.retrievalHint = retrievalHint; + if (!message.retrievalHint) { + log.warn("message delivered without a retrieval hint", message.messageId); + } this.localHistory.push(message); return true;