fix: (sds) ensure incoming messages have their retrieval hint stored (#2604)

* SDS: export `MessageId`

* SDS: attach retrieval hints to incoming messages

* sds: ensure items are ordered by timestamp

* test: sds: avoid using "as any" as it bypasses type checks

* test: filter: avoid using "as any" as it bypasses type checks

* test: fix tests without introducing proxy
This commit is contained in:
fryorcraken 2025-08-28 15:57:23 +10:00 committed by GitHub
parent 8542d04bf5
commit 914beb6531
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 539 additions and 97 deletions

View File

@ -91,7 +91,7 @@ describe("Filter SDK", () => {
const message = createMockMessage(testContentTopic); const message = createMockMessage(testContentTopic);
const peerId = "peer1"; const peerId = "peer1";
await (filter as any).onIncomingMessage(testPubsubTopic, message, peerId); await filter["onIncomingMessage"](testPubsubTopic, message, peerId);
expect(subscriptionInvokeStub.calledOnce).to.be.true; expect(subscriptionInvokeStub.calledOnce).to.be.true;
expect(subscriptionInvokeStub.firstCall.args[0]).to.equal(message); expect(subscriptionInvokeStub.firstCall.args[0]).to.equal(message);

View File

@ -14,7 +14,8 @@ export {
type HistoryEntry, type HistoryEntry,
type ChannelId, type ChannelId,
type MessageChannelEvents, type MessageChannelEvents,
type SenderId type SenderId,
type MessageId
} from "./message_channel/index.js"; } from "./message_channel/index.js";
export { BloomFilter }; export { BloomFilter };

View File

@ -23,9 +23,15 @@ export class MemLocalHistory {
this.validateMessage(item); this.validateMessage(item);
} }
// Add new items and ensure uniqueness by messageId using sortedUniqBy // Add new items and sort by timestamp, ensuring uniqueness by messageId
// The valueOf() method on ContentMessage enables native < operator sorting // The valueOf() method on ContentMessage enables native < operator sorting
this.items = _.sortedUniqBy([...this.items, ...items], "messageId"); const combinedItems = [...this.items, ...items];
// Sort by timestamp (using valueOf() which creates timestamp_messageId string)
combinedItems.sort((a, b) => a.valueOf().localeCompare(b.valueOf()));
// Remove duplicates by messageId while maintaining order
this.items = _.uniqBy(combinedItems, "messageId");
return this.items.length; return this.items.length;
} }
@ -56,6 +62,17 @@ export class MemLocalHistory {
return this.items.find(predicate, thisArg); return this.items.find(predicate, thisArg);
} }
public findIndex(
predicate: (
value: ContentMessage,
index: number,
obj: ContentMessage[]
) => unknown,
thisArg?: any
): number {
return this.items.findIndex(predicate, thisArg);
}
private validateMessage(message: ContentMessage): void { private validateMessage(message: ContentMessage): void {
if (!isContentMessage(message)) { if (!isContentMessage(message)) {
throw new Error( throw new Error(

View File

@ -1,3 +1,4 @@
import { utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai"; import { expect } from "chai";
import { DefaultBloomFilter } from "../bloom_filter/bloom.js"; import { DefaultBloomFilter } from "../bloom_filter/bloom.js";
@ -32,6 +33,27 @@ describe("Message serialization", () => {
expect(decBloomFilter.lookup(messageId)).to.be.true; expect(decBloomFilter.lookup(messageId)).to.be.true;
}); });
it("Retrieval Hint", () => {
const depMessageId = "dependency";
const depRetrievalHint = utf8ToBytes("dependency");
const message = new Message(
"123",
"my-channel",
"me",
[{ messageId: depMessageId, retrievalHint: depRetrievalHint }],
0,
undefined,
undefined
);
const bytes = message.encode();
const decMessage = Message.decode(bytes);
expect(decMessage.causalHistory).to.deep.equal([
{ messageId: depMessageId, retrievalHint: depRetrievalHint }
]);
});
}); });
describe("ContentMessage comparison with < operator", () => { describe("ContentMessage comparison with < operator", () => {

View File

@ -23,7 +23,7 @@ const callback = (_message: Message): Promise<{ success: boolean }> => {
}; };
const getBloomFilter = (channel: MessageChannel): DefaultBloomFilter => { const getBloomFilter = (channel: MessageChannel): DefaultBloomFilter => {
return (channel as any).filter as DefaultBloomFilter; return channel["filter"] as DefaultBloomFilter;
}; };
const messagesA = ["message-1", "message-2"]; const messagesA = ["message-1", "message-2"];
@ -54,9 +54,10 @@ const sendSyncMessage = async (
const receiveMessage = async ( const receiveMessage = async (
channel: MessageChannel, channel: MessageChannel,
message: Message message: Message,
retrievalHint?: Uint8Array
): Promise<void> => { ): Promise<void> => {
channel.pushIncomingMessage(message); channel.pushIncomingMessage(message, retrievalHint);
await channel.processTasks(); await channel.processTasks();
}; };
@ -71,16 +72,16 @@ describe("MessageChannel", function () {
}); });
it("should increase lamport timestamp", async () => { it("should increase lamport timestamp", async () => {
const timestampBefore = (channelA as any).lamportTimestamp; const timestampBefore = channelA["lamportTimestamp"];
await sendMessage(channelA, utf8ToBytes("message"), callback); await sendMessage(channelA, utf8ToBytes("message"), callback);
const timestampAfter = (channelA as any).lamportTimestamp; const timestampAfter = channelA["lamportTimestamp"];
expect(timestampAfter).to.equal(timestampBefore + 1); expect(timestampAfter).to.equal(timestampBefore + 1);
}); });
it("should push the message to the outgoing buffer", async () => { it("should push the message to the outgoing buffer", async () => {
const bufferLengthBefore = (channelA as any).outgoingBuffer.length; const bufferLengthBefore = channelA["outgoingBuffer"].length;
await sendMessage(channelA, utf8ToBytes("message"), callback); await sendMessage(channelA, utf8ToBytes("message"), callback);
const bufferLengthAfter = (channelA as any).outgoingBuffer.length; const bufferLengthAfter = channelA["outgoingBuffer"].length;
expect(bufferLengthAfter).to.equal(bufferLengthBefore + 1); expect(bufferLengthAfter).to.equal(bufferLengthBefore + 1);
}); });
@ -94,10 +95,10 @@ describe("MessageChannel", function () {
it("should insert message id into causal history", async () => { it("should insert message id into causal history", async () => {
const payload = utf8ToBytes("message"); const payload = utf8ToBytes("message");
const expectedTimestamp = (channelA as any).lamportTimestamp + 1; const expectedTimestamp = channelA["lamportTimestamp"] + 1;
const messageId = MessageChannel.getMessageId(payload); const messageId = MessageChannel.getMessageId(payload);
await sendMessage(channelA, payload, callback); await sendMessage(channelA, payload, callback);
const messageIdLog = (channelA as any).localHistory as ILocalHistory; const messageIdLog = channelA["localHistory"] as ILocalHistory;
expect(messageIdLog.length).to.equal(1); expect(messageIdLog.length).to.equal(1);
expect( expect(
messageIdLog.some( messageIdLog.some(
@ -108,9 +109,30 @@ describe("MessageChannel", function () {
).to.equal(true); ).to.equal(true);
}); });
it("should add sent message to localHistory with retrievalHint", async () => {
const payload = utf8ToBytes("message with retrieval hint");
const messageId = MessageChannel.getMessageId(payload);
const testRetrievalHint = utf8ToBytes("test-retrieval-hint-data");
await sendMessage(channelA, payload, async (_message) => {
// Simulate successful sending with retrievalHint
return { success: true, retrievalHint: testRetrievalHint };
});
const localHistory = channelA["localHistory"] as ILocalHistory;
expect(localHistory.length).to.equal(1);
// Find the message in local history
const historyEntry = localHistory.find(
(entry) => entry.messageId === messageId
);
expect(historyEntry).to.exist;
expect(historyEntry!.retrievalHint).to.deep.equal(testRetrievalHint);
});
it("should attach causal history and bloom filter to each message", async () => { it("should attach causal history and bloom filter to each message", async () => {
const bloomFilter = new DefaultBloomFilter(DEFAULT_BLOOM_FILTER_OPTIONS); const bloomFilter = new DefaultBloomFilter(DEFAULT_BLOOM_FILTER_OPTIONS);
const causalHistorySize = (channelA as any).causalHistorySize; const causalHistorySize = channelA["causalHistorySize"];
const filterBytes = new Array<Uint8Array>(); const filterBytes = new Array<Uint8Array>();
const messages = new Array<string>(causalHistorySize + 5) const messages = new Array<string>(causalHistorySize + 5)
.fill("message") .fill("message")
@ -122,7 +144,7 @@ describe("MessageChannel", function () {
bloomFilter.insert(MessageChannel.getMessageId(utf8ToBytes(message))); bloomFilter.insert(MessageChannel.getMessageId(utf8ToBytes(message)));
} }
const outgoingBuffer = (channelA as any).outgoingBuffer as Message[]; const outgoingBuffer = channelA["outgoingBuffer"] as Message[];
expect(outgoingBuffer.length).to.equal(messages.length); expect(outgoingBuffer.length).to.equal(messages.length);
outgoingBuffer.forEach((message, index) => { outgoingBuffer.forEach((message, index) => {
@ -153,12 +175,12 @@ describe("MessageChannel", function () {
}); });
it("should increase lamport timestamp", async () => { it("should increase lamport timestamp", async () => {
const timestampBefore = (channelA as any).lamportTimestamp; const timestampBefore = channelA["lamportTimestamp"];
await sendMessage(channelB, utf8ToBytes("message"), async (message) => { await sendMessage(channelB, utf8ToBytes("message"), async (message) => {
await receiveMessage(channelA, message); await receiveMessage(channelA, message);
return { success: true }; return { success: true };
}); });
const timestampAfter = (channelA as any).lamportTimestamp; const timestampAfter = channelA["lamportTimestamp"];
expect(timestampAfter).to.equal(timestampBefore + 1); expect(timestampAfter).to.equal(timestampBefore + 1);
}); });
@ -172,7 +194,7 @@ describe("MessageChannel", function () {
return { success: true }; return { success: true };
}); });
} }
const timestampAfter = (channelA as any).lamportTimestamp; const timestampAfter = channelA["lamportTimestamp"];
expect(timestampAfter).to.equal(messagesB.length); expect(timestampAfter).to.equal(messagesB.length);
}); });
@ -182,7 +204,7 @@ describe("MessageChannel", function () {
await sendMessage(channelA, utf8ToBytes(m), async (message) => { await sendMessage(channelA, utf8ToBytes(m), async (message) => {
timestamp++; timestamp++;
await receiveMessage(channelB, message); await receiveMessage(channelB, message);
expect((channelB as any).lamportTimestamp).to.equal(timestamp); expect(channelB["lamportTimestamp"]).to.equal(timestamp);
return { success: true }; return { success: true };
}); });
} }
@ -191,15 +213,15 @@ describe("MessageChannel", function () {
await sendMessage(channelB, utf8ToBytes(m), async (message) => { await sendMessage(channelB, utf8ToBytes(m), async (message) => {
timestamp++; timestamp++;
await receiveMessage(channelA, message); await receiveMessage(channelA, message);
expect((channelA as any).lamportTimestamp).to.equal(timestamp); expect(channelA["lamportTimestamp"]).to.equal(timestamp);
return { success: true }; return { success: true };
}); });
} }
const expectedLength = messagesA.length + messagesB.length; const expectedLength = messagesA.length + messagesB.length;
expect((channelA as any).lamportTimestamp).to.equal(expectedLength); expect(channelA["lamportTimestamp"]).to.equal(expectedLength);
expect((channelA as any).lamportTimestamp).to.equal( expect(channelA["lamportTimestamp"]).to.equal(
(channelB as any).lamportTimestamp channelB["lamportTimestamp"]
); );
}); });
@ -220,7 +242,7 @@ describe("MessageChannel", function () {
} }
let receivedMessage: Message | null = null; let receivedMessage: Message | null = null;
const timestampBefore = (channelB as any).lamportTimestamp; const timestampBefore = channelB["lamportTimestamp"];
await sendMessage( await sendMessage(
channelA, channelA,
@ -232,26 +254,180 @@ describe("MessageChannel", function () {
} }
); );
const incomingBuffer = (channelB as any).incomingBuffer as Message[]; const incomingBuffer = channelB["incomingBuffer"];
expect(incomingBuffer.length).to.equal(1); expect(incomingBuffer.length).to.equal(1);
expect(incomingBuffer[0].messageId).to.equal(receivedMessage!.messageId); expect(incomingBuffer[0].messageId).to.equal(receivedMessage!.messageId);
// Since the dependency is not met, the lamport timestamp should not increase // Since the dependency is not met, the lamport timestamp should not increase
const timestampAfter = (channelB as any).lamportTimestamp; const timestampAfter = channelB["lamportTimestamp"];
expect(timestampAfter).to.equal(timestampBefore); expect(timestampAfter).to.equal(timestampBefore);
// Message should not be in local history // Message should not be in local history
const localHistory = (channelB as any).localHistory as { const localHistory = channelB["localHistory"];
timestamp: number;
historyEntry: HistoryEntry;
}[];
expect( expect(
localHistory.some( localHistory.some(
({ historyEntry: { messageId } }) => ({ messageId }) => messageId === receivedMessage!.messageId
messageId === receivedMessage!.messageId
) )
).to.equal(false); ).to.equal(false);
}); });
it("should add received message to localHistory with retrievalHint", async () => {
const payload = utf8ToBytes("message with retrieval hint");
const messageId = MessageChannel.getMessageId(payload);
const testRetrievalHint = utf8ToBytes("test-retrieval-hint-data");
await receiveMessage(
channelA,
new Message(
messageId,
channelA.channelId,
"not-alice",
[],
1,
undefined,
payload,
testRetrievalHint
),
testRetrievalHint
);
const localHistory = channelA["localHistory"] as ILocalHistory;
console.log("localHistory", localHistory);
expect(localHistory.length).to.equal(1);
// Find the message in local history
const historyEntry = localHistory.find(
(entry) => entry.messageId === messageId
);
console.log("history entry", historyEntry);
expect(historyEntry).to.exist;
expect(historyEntry!.retrievalHint).to.deep.equal(testRetrievalHint);
});
it("should maintain chronological order of messages in localHistory", async () => {
// Send messages with different timestamps (including own messages)
const message1Payload = utf8ToBytes("message 1");
const message2Payload = utf8ToBytes("message 2");
const message3Payload = utf8ToBytes("message 3");
const message1Id = MessageChannel.getMessageId(message1Payload);
const message2Id = MessageChannel.getMessageId(message2Payload);
const message3Id = MessageChannel.getMessageId(message3Payload);
// Send own message first (timestamp will be 1)
await sendMessage(channelA, message1Payload, callback);
// Receive a message from another sender with higher timestamp (3)
await receiveMessage(
channelA,
new ContentMessage(
message3Id,
channelA.channelId,
"bob",
[],
3, // Higher timestamp
undefined,
message3Payload
)
);
// Receive a message from another sender with middle timestamp (2)
await receiveMessage(
channelA,
new ContentMessage(
message2Id,
channelA.channelId,
"carol",
[],
2, // Middle timestamp
undefined,
message2Payload
)
);
const localHistory = channelA["localHistory"];
expect(localHistory.length).to.equal(3);
// Verify chronological order: message1 (ts=1), message2 (ts=2), message3 (ts=3)
const first = localHistory.findIndex(
({ messageId, lamportTimestamp }) => {
return messageId === message1Id && lamportTimestamp === 1;
}
);
expect(first).to.eq(0);
const second = localHistory.findIndex(
({ messageId, lamportTimestamp }) => {
return messageId === message2Id && lamportTimestamp === 2;
}
);
expect(second).to.eq(1);
const third = localHistory.findIndex(
({ messageId, lamportTimestamp }) => {
return messageId === message3Id && lamportTimestamp === 3;
}
);
expect(third).to.eq(2);
});
it("should handle messages with same timestamp ordered by messageId", async () => {
const message1Payload = utf8ToBytes("message a");
const message2Payload = utf8ToBytes("message b");
const message1Id = MessageChannel.getMessageId(message1Payload);
const message2Id = MessageChannel.getMessageId(message2Payload);
// Receive messages with same timestamp but different message IDs
// The valueOf() method ensures ordering by messageId when timestamps are equal
await receiveMessage(
channelA,
new ContentMessage(
message2Id, // This will come second alphabetically by messageId
channelA.channelId,
"bob",
[],
5, // Same timestamp
undefined,
message2Payload
)
);
await receiveMessage(
channelA,
new ContentMessage(
message1Id, // This will come first alphabetically by messageId
channelA.channelId,
"carol",
[],
5, // Same timestamp
undefined,
message1Payload
)
);
const localHistory = channelA["localHistory"] as ILocalHistory;
expect(localHistory.length).to.equal(2);
// When timestamps are equal, should be ordered by messageId lexicographically
// The valueOf() method creates "000000000000005_messageId" for comparison
const expectedOrder = [message1Id, message2Id].sort();
const first = localHistory.findIndex(
({ messageId, lamportTimestamp }) => {
return messageId === expectedOrder[0] && lamportTimestamp == 5;
}
);
expect(first).to.eq(0);
const second = localHistory.findIndex(
({ messageId, lamportTimestamp }) => {
return messageId === expectedOrder[1] && lamportTimestamp == 5;
}
);
expect(second).to.eq(1);
});
}); });
describe("reviewing ack status", () => { describe("reviewing ack status", () => {
@ -283,9 +459,7 @@ describe("MessageChannel", function () {
await channelA.processTasks(); await channelA.processTasks();
await channelB.processTasks(); await channelB.processTasks();
expect((channelA as any).outgoingBuffer.length).to.equal( expect(channelA["outgoingBuffer"].length).to.equal(messagesA.length + 1);
messagesA.length + 1
);
await sendMessage( await sendMessage(
channelB, channelB,
@ -302,7 +476,7 @@ describe("MessageChannel", function () {
// Since B received message-1, message-2, and not-in-history (3 messages), // Since B received message-1, message-2, and not-in-history (3 messages),
// and causalHistorySize is 3, it will only include the last 2 in its causal history // and causalHistorySize is 3, it will only include the last 2 in its causal history
// So message-1 won't be acknowledged, only message-2 and not-in-history // So message-1 won't be acknowledged, only message-2 and not-in-history
const outgoingBuffer = (channelA as any).outgoingBuffer as Message[]; const outgoingBuffer = channelA["outgoingBuffer"] as Message[];
expect(outgoingBuffer.length).to.equal(1); expect(outgoingBuffer.length).to.equal(1);
// The remaining message should be message-1 (not acknowledged) // The remaining message should be message-1 (not acknowledged)
expect(outgoingBuffer[0].messageId).to.equal( expect(outgoingBuffer[0].messageId).to.equal(
@ -320,15 +494,12 @@ describe("MessageChannel", function () {
await channelA.processTasks(); await channelA.processTasks();
// All messages remain in the buffer // All messages remain in the buffer
expect((channelA as any).outgoingBuffer.length).to.equal( expect(channelA["outgoingBuffer"].length).to.equal(messagesA.length);
messagesA.length
);
}); });
it("should track probabilistic acknowledgements of messages received in bloom filter", async () => { it("should track probabilistic acknowledgements of messages received in bloom filter", async () => {
const possibleAcksThreshold = (channelA as any).possibleAcksThreshold; const possibleAcksThreshold = channelA["possibleAcksThreshold"];
const causalHistorySize = channelA["causalHistorySize"];
const causalHistorySize = (channelA as any).causalHistorySize;
const unacknowledgedMessages = [ const unacknowledgedMessages = [
"unacknowledged-message-1", "unacknowledged-message-1",
@ -358,8 +529,8 @@ describe("MessageChannel", function () {
} }
); );
const possibleAcks: ReadonlyMap<MessageId, number> = (channelA as any) const possibleAcks: ReadonlyMap<MessageId, number> =
.possibleAcks; channelA["possibleAcks"];
// Other than the message IDs which were included in causal history, // Other than the message IDs which were included in causal history,
// the remaining messages sent by channel A should be considered possibly acknowledged // the remaining messages sent by channel A should be considered possibly acknowledged
// for having been included in the bloom filter sent from channel B // for having been included in the bloom filter sent from channel B
@ -396,12 +567,12 @@ describe("MessageChannel", function () {
expect(possibleAcks.size).to.equal(0); expect(possibleAcks.size).to.equal(0);
// Messages that were not acknowledged should still be in the outgoing buffer // Messages that were not acknowledged should still be in the outgoing buffer
expect((channelA as any).outgoingBuffer.length).to.equal( expect(channelA["outgoingBuffer"].length).to.equal(
unacknowledgedMessages.length unacknowledgedMessages.length
); );
unacknowledgedMessages.forEach((m) => { unacknowledgedMessages.forEach((m) => {
expect( expect(
((channelA as any).outgoingBuffer as Message[]).some( (channelA["outgoingBuffer"] as Message[]).some(
(message) => (message) =>
message.messageId === MessageChannel.getMessageId(utf8ToBytes(m)) message.messageId === MessageChannel.getMessageId(utf8ToBytes(m))
) )
@ -417,9 +588,8 @@ describe("MessageChannel", function () {
}); });
} }
const possibleAcks: ReadonlyMap<MessageId, number> = (channelA as any) const possibleAcks: ReadonlyMap<MessageId, number> =
.possibleAcks; channelA["possibleAcks"];
expect(possibleAcks.size).to.equal(0); expect(possibleAcks.size).to.equal(0);
}); });
@ -478,7 +648,7 @@ describe("MessageChannel", function () {
}); });
it("should detect messages with missing dependencies", async () => { it("should detect messages with missing dependencies", async () => {
const causalHistorySize = (channelA as any).causalHistorySize; const causalHistorySize = channelA["causalHistorySize"];
for (const m of messagesA) { for (const m of messagesA) {
await sendMessage(channelA, utf8ToBytes(m), callback); await sendMessage(channelA, utf8ToBytes(m), callback);
} }
@ -492,7 +662,7 @@ describe("MessageChannel", function () {
} }
); );
const incomingBuffer = (channelB as any).incomingBuffer as Message[]; const incomingBuffer = channelB["incomingBuffer"];
expect(incomingBuffer.length).to.equal(1); expect(incomingBuffer.length).to.equal(1);
expect(incomingBuffer[0].messageId).to.equal( expect(incomingBuffer[0].messageId).to.equal(
MessageChannel.getMessageId(utf8ToBytes(messagesB[0])) MessageChannel.getMessageId(utf8ToBytes(messagesB[0]))
@ -506,7 +676,7 @@ describe("MessageChannel", function () {
}); });
it("should deliver messages after dependencies are met", async () => { it("should deliver messages after dependencies are met", async () => {
const causalHistorySize = (channelA as any).causalHistorySize; const causalHistorySize = channelA["causalHistorySize"];
const sentMessages = new Array<Message>(); const sentMessages = new Array<Message>();
// First, send messages from A but DON'T deliver them to B yet // First, send messages from A but DON'T deliver them to B yet
for (const m of messagesA) { for (const m of messagesA) {
@ -537,7 +707,7 @@ describe("MessageChannel", function () {
MessageChannel.getMessageId(utf8ToBytes(messagesA[0])) MessageChannel.getMessageId(utf8ToBytes(messagesA[0]))
); );
let incomingBuffer = (channelB as any).incomingBuffer as Message[]; let incomingBuffer = channelB["incomingBuffer"];
expect(incomingBuffer.length).to.equal(1); expect(incomingBuffer.length).to.equal(1);
// Now deliver the missing dependencies // Now deliver the missing dependencies
@ -550,7 +720,7 @@ describe("MessageChannel", function () {
const missingMessages2 = channelB.sweepIncomingBuffer(); const missingMessages2 = channelB.sweepIncomingBuffer();
expect(missingMessages2.length).to.equal(0); expect(missingMessages2.length).to.equal(0);
incomingBuffer = (channelB as any).incomingBuffer as Message[]; incomingBuffer = channelB["incomingBuffer"];
expect(incomingBuffer.length).to.equal(0); expect(incomingBuffer.length).to.equal(0);
}); });
@ -594,8 +764,74 @@ describe("MessageChannel", function () {
expect(irretrievablyLost).to.be.true; expect(irretrievablyLost).to.be.true;
}); });
it("should emit InMessageLost event with retrievalHint when timeout is exceeded", async () => {
const testRetrievalHint = utf8ToBytes("lost-message-hint");
let lostMessages: HistoryEntry[] = [];
// Create a channel with very short timeout
const channelC: MessageChannel = new MessageChannel(channelId, "carol", {
timeoutForLostMessagesMs: 10
});
channelC.addEventListener(MessageChannelEvent.InMessageLost, (event) => {
lostMessages = event.detail;
});
// Send message from A with retrievalHint
await sendMessage(
channelA,
utf8ToBytes(messagesA[0]),
async (message) => {
message.retrievalHint = testRetrievalHint;
return { success: true, retrievalHint: testRetrievalHint };
}
);
// Send another message from A
await sendMessage(channelA, utf8ToBytes(messagesA[1]), callback);
// Send a message to C that depends on the previous messages
await sendMessage(
channelA,
utf8ToBytes(messagesB[0]),
async (message) => {
await receiveMessage(channelC, message);
return { success: true };
}
);
// First sweep - should detect missing messages
channelC.sweepIncomingBuffer();
// Wait for timeout
await new Promise((resolve) => setTimeout(resolve, 20));
// Second sweep - should mark messages as lost
channelC.sweepIncomingBuffer();
expect(lostMessages.length).to.equal(2);
// Verify retrievalHint is included in the lost message
const lostMessageWithHint = lostMessages.find(
(m) =>
m.messageId === MessageChannel.getMessageId(utf8ToBytes(messagesA[0]))
);
expect(lostMessageWithHint).to.exist;
expect(lostMessageWithHint!.retrievalHint).to.deep.equal(
testRetrievalHint
);
// Verify message without retrievalHint has undefined
const lostMessageWithoutHint = lostMessages.find(
(m) =>
m.messageId === MessageChannel.getMessageId(utf8ToBytes(messagesA[1]))
);
expect(lostMessageWithoutHint).to.exist;
expect(lostMessageWithoutHint!.retrievalHint).to.be.undefined;
});
it("should remove messages without delivering if timeout is exceeded", async () => { it("should remove messages without delivering if timeout is exceeded", async () => {
const causalHistorySize = (channelA as any).causalHistorySize; const causalHistorySize = channelA["causalHistorySize"];
// Create a channel with very very short timeout // Create a channel with very very short timeout
const channelC: MessageChannel = new MessageChannel(channelId, "carol", { const channelC: MessageChannel = new MessageChannel(channelId, "carol", {
timeoutForLostMessagesMs: 10 timeoutForLostMessagesMs: 10
@ -616,15 +852,173 @@ describe("MessageChannel", function () {
const missingMessages = channelC.sweepIncomingBuffer(); const missingMessages = channelC.sweepIncomingBuffer();
expect(missingMessages.length).to.equal(causalHistorySize); expect(missingMessages.length).to.equal(causalHistorySize);
let incomingBuffer = (channelC as any).incomingBuffer as Message[]; let incomingBuffer = channelC["incomingBuffer"];
expect(incomingBuffer.length).to.equal(1); expect(incomingBuffer.length).to.equal(1);
await new Promise((resolve) => setTimeout(resolve, 20)); await new Promise((resolve) => setTimeout(resolve, 20));
channelC.sweepIncomingBuffer(); channelC.sweepIncomingBuffer();
incomingBuffer = (channelC as any).incomingBuffer as Message[]; incomingBuffer = channelC["incomingBuffer"];
expect(incomingBuffer.length).to.equal(0); expect(incomingBuffer.length).to.equal(0);
}); });
it("should return HistoryEntry with retrievalHint from sweepIncomingBuffer", async () => {
const testRetrievalHint = utf8ToBytes("test-retrieval-hint");
// Send message from A with a retrievalHint
await sendMessage(
channelA,
utf8ToBytes(messagesA[0]),
async (message) => {
message.retrievalHint = testRetrievalHint;
return { success: true, retrievalHint: testRetrievalHint };
}
);
// Send another message from A that depends on the first one
await sendMessage(
channelA,
utf8ToBytes(messagesA[1]),
async (_message) => {
// Don't send to B yet - we want B to have missing dependencies
return { success: true };
}
);
// Send a message from A to B that depends on previous messages
await sendMessage(
channelA,
utf8ToBytes(messagesB[0]),
async (message) => {
await receiveMessage(channelB, message);
return { success: true };
}
);
// Sweep should detect missing dependencies and return them with retrievalHint
const missingMessages = channelB.sweepIncomingBuffer();
expect(missingMessages.length).to.equal(2);
// Find the first message in missing dependencies
const firstMissingMessage = missingMessages.find(
(m) =>
m.messageId === MessageChannel.getMessageId(utf8ToBytes(messagesA[0]))
);
expect(firstMissingMessage).to.exist;
expect(firstMissingMessage!.retrievalHint).to.deep.equal(
testRetrievalHint
);
});
it("should emit InMessageMissing event with retrievalHint", async () => {
const testRetrievalHint1 = utf8ToBytes("hint-for-message-1");
const testRetrievalHint2 = utf8ToBytes("hint-for-message-2");
let eventReceived = false;
let emittedMissingMessages: HistoryEntry[] = [];
// Listen for InMessageMissing event
channelB.addEventListener(
MessageChannelEvent.InMessageMissing,
(event) => {
eventReceived = true;
emittedMissingMessages = event.detail;
}
);
// Send messages from A with retrievalHints
await sendMessage(
channelA,
utf8ToBytes(messagesA[0]),
async (message) => {
message.retrievalHint = testRetrievalHint1;
return { success: true, retrievalHint: testRetrievalHint1 };
}
);
await sendMessage(
channelA,
utf8ToBytes(messagesA[1]),
async (message) => {
message.retrievalHint = testRetrievalHint2;
return { success: true, retrievalHint: testRetrievalHint2 };
}
);
// Send a message to B that depends on the previous messages
await sendMessage(
channelA,
utf8ToBytes(messagesB[0]),
async (message) => {
await receiveMessage(channelB, message);
return { success: true };
}
);
// Sweep should trigger InMessageMissing event
channelB.sweepIncomingBuffer();
expect(eventReceived).to.be.true;
expect(emittedMissingMessages.length).to.equal(2);
// Verify retrievalHints are included in the event
const firstMissing = emittedMissingMessages.find(
(m) =>
m.messageId === MessageChannel.getMessageId(utf8ToBytes(messagesA[0]))
);
const secondMissing = emittedMissingMessages.find(
(m) =>
m.messageId === MessageChannel.getMessageId(utf8ToBytes(messagesA[1]))
);
expect(firstMissing).to.exist;
expect(firstMissing!.retrievalHint).to.deep.equal(testRetrievalHint1);
expect(secondMissing).to.exist;
expect(secondMissing!.retrievalHint).to.deep.equal(testRetrievalHint2);
});
it("should handle missing messages with undefined retrievalHint", async () => {
let emittedMissingMessages: HistoryEntry[] = [];
channelB.addEventListener(
MessageChannelEvent.InMessageMissing,
(event) => {
emittedMissingMessages = event.detail;
}
);
// Send message from A without retrievalHint
await sendMessage(
channelA,
utf8ToBytes(messagesA[0]),
async (_message) => {
// Don't set retrievalHint
return { success: true };
}
);
// Send a message to B that depends on the previous message
await sendMessage(
channelA,
utf8ToBytes(messagesB[0]),
async (message) => {
await receiveMessage(channelB, message);
return { success: true };
}
);
// Sweep should handle missing message with undefined retrievalHint
const missingMessages = channelB.sweepIncomingBuffer();
expect(missingMessages.length).to.equal(1);
expect(missingMessages[0].messageId).to.equal(
MessageChannel.getMessageId(utf8ToBytes(messagesA[0]))
);
expect(missingMessages[0].retrievalHint).to.be.undefined;
// Event should also reflect undefined retrievalHint
expect(emittedMissingMessages.length).to.equal(1);
expect(emittedMissingMessages[0].retrievalHint).to.be.undefined;
});
}); });
describe("Sweeping outgoing buffer", () => { describe("Sweeping outgoing buffer", () => {
@ -649,7 +1043,7 @@ describe("MessageChannel", function () {
expect(possiblyAcknowledged.length).to.equal(0); expect(possiblyAcknowledged.length).to.equal(0);
// Make sure messages sent by channel A are not in causal history // Make sure messages sent by channel A are not in causal history
const causalHistorySize = (channelA as any).causalHistorySize; const causalHistorySize = channelA["causalHistorySize"];
for (const m of messagesB.slice(0, causalHistorySize)) { for (const m of messagesB.slice(0, causalHistorySize)) {
await sendMessage(channelB, utf8ToBytes(m), callback); await sendMessage(channelB, utf8ToBytes(m), callback);
} }
@ -690,7 +1084,7 @@ describe("MessageChannel", function () {
it("should not be added to outgoing buffer, bloom filter, or local log", async () => { it("should not be added to outgoing buffer, bloom filter, or local log", async () => {
await channelA.pushOutgoingSyncMessage(); await channelA.pushOutgoingSyncMessage();
const outgoingBuffer = (channelA as any).outgoingBuffer as Message[]; const outgoingBuffer = channelA["outgoingBuffer"] as Message[];
expect(outgoingBuffer.length).to.equal(0); expect(outgoingBuffer.length).to.equal(0);
const bloomFilter = getBloomFilter(channelA); const bloomFilter = getBloomFilter(channelA);
@ -698,26 +1092,20 @@ describe("MessageChannel", function () {
bloomFilter.lookup(MessageChannel.getMessageId(new Uint8Array())) bloomFilter.lookup(MessageChannel.getMessageId(new Uint8Array()))
).to.equal(false); ).to.equal(false);
const localLog = (channelA as any).localHistory as { const localLog = channelA["localHistory"];
timestamp: number;
messageId: MessageId;
}[];
expect(localLog.length).to.equal(0); expect(localLog.length).to.equal(0);
}); });
it("should not be delivered", async () => { it("should not be delivered", async () => {
const timestampBefore = (channelB as any).lamportTimestamp; const timestampBefore = channelB["lamportTimestamp"];
await channelA.pushOutgoingSyncMessage(async (message) => { await channelA.pushOutgoingSyncMessage(async (message) => {
await receiveMessage(channelB, message); await receiveMessage(channelB, message);
return true; return true;
}); });
const timestampAfter = (channelB as any).lamportTimestamp; const timestampAfter = channelB["lamportTimestamp"];
expect(timestampAfter).to.equal(timestampBefore); expect(timestampAfter).to.equal(timestampBefore);
const localLog = (channelB as any).localHistory as { const localLog = channelB["localHistory"];
timestamp: number;
messageId: MessageId;
}[];
expect(localLog.length).to.equal(0); expect(localLog.length).to.equal(0);
const bloomFilter = getBloomFilter(channelB); const bloomFilter = getBloomFilter(channelB);
@ -739,8 +1127,8 @@ describe("MessageChannel", function () {
return true; return true;
}); });
const causalHistorySize = (channelA as any).causalHistorySize; const causalHistorySize = channelA["causalHistorySize"];
const outgoingBuffer = (channelA as any).outgoingBuffer as Message[]; const outgoingBuffer = channelA["outgoingBuffer"] as Message[];
expect(outgoingBuffer.length).to.equal( expect(outgoingBuffer.length).to.equal(
messagesA.length - causalHistorySize messagesA.length - causalHistorySize
); );
@ -753,7 +1141,7 @@ describe("MessageChannel", function () {
}); });
it("should be sent without a timestamp, causal history, or bloom filter", async () => { it("should be sent without a timestamp, causal history, or bloom filter", async () => {
const timestampBefore = (channelA as any).lamportTimestamp; const timestampBefore = channelA["lamportTimestamp"];
await channelA.pushOutgoingEphemeralMessage( await channelA.pushOutgoingEphemeralMessage(
new Uint8Array(), new Uint8Array(),
async (message) => { async (message) => {
@ -764,10 +1152,10 @@ describe("MessageChannel", function () {
} }
); );
const outgoingBuffer = (channelA as any).outgoingBuffer as Message[]; const outgoingBuffer = channelA["outgoingBuffer"] as Message[];
expect(outgoingBuffer.length).to.equal(0); expect(outgoingBuffer.length).to.equal(0);
const timestampAfter = (channelA as any).lamportTimestamp; const timestampAfter = channelA["lamportTimestamp"];
expect(timestampAfter).to.equal(timestampBefore); expect(timestampAfter).to.equal(timestampBefore);
}); });
@ -775,9 +1163,9 @@ describe("MessageChannel", function () {
const channelB = new MessageChannel(channelId, "bob"); const channelB = new MessageChannel(channelId, "bob");
// Track initial state // Track initial state
const localHistoryBefore = (channelB as any).localHistory.length; const localHistoryBefore = channelB["localHistory"].length;
const incomingBufferBefore = (channelB as any).incomingBuffer.length; const incomingBufferBefore = channelB["incomingBuffer"].length;
const timestampBefore = (channelB as any).lamportTimestamp; const timestampBefore = channelB["lamportTimestamp"];
await channelA.pushOutgoingEphemeralMessage( await channelA.pushOutgoingEphemeralMessage(
utf8ToBytes(messagesA[0]), utf8ToBytes(messagesA[0]),
@ -793,15 +1181,11 @@ describe("MessageChannel", function () {
// Verify ephemeral message behavior: // Verify ephemeral message behavior:
// 1. Not added to local history // 1. Not added to local history
expect((channelB as any).localHistory.length).to.equal( expect(channelB["localHistory"].length).to.equal(localHistoryBefore);
localHistoryBefore
);
// 2. Not added to incoming buffer // 2. Not added to incoming buffer
expect((channelB as any).incomingBuffer.length).to.equal( expect(channelB["incomingBuffer"].length).to.equal(incomingBufferBefore);
incomingBufferBefore
);
// 3. Doesn't update lamport timestamp // 3. Doesn't update lamport timestamp
expect((channelB as any).lamportTimestamp).to.equal(timestampBefore); expect(channelB["lamportTimestamp"]).to.equal(timestampBefore);
}); });
}); });
}); });

View File

@ -50,7 +50,7 @@ export interface MessageChannelOptions {
export type ILocalHistory = Pick< export type ILocalHistory = Pick<
Array<ContentMessage>, Array<ContentMessage>,
"some" | "push" | "slice" | "find" | "length" "some" | "push" | "slice" | "find" | "length" | "findIndex"
>; >;
export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> { export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
@ -61,7 +61,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
private outgoingBuffer: ContentMessage[]; private outgoingBuffer: ContentMessage[];
private possibleAcks: Map<MessageId, number>; private possibleAcks: Map<MessageId, number>;
private incomingBuffer: Array<ContentMessage | SyncMessage>; private incomingBuffer: Array<ContentMessage | SyncMessage>;
private localHistory: ILocalHistory; private readonly localHistory: ILocalHistory;
private timeReceived: Map<MessageId, number>; private timeReceived: Map<MessageId, number>;
private readonly causalHistorySize: number; private readonly causalHistorySize: number;
private readonly possibleAcksThreshold: number; private readonly possibleAcksThreshold: number;
@ -226,7 +226,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
* proper dependency resolution and causal ordering. * proper dependency resolution and causal ordering.
* *
* @param message - The message to receive and process * @param message - The message to receive and process
* * @param retrievalHint - The retrieval hint for the message, provided by the transport layer
* @example * @example
* ```typescript * ```typescript
* const channel = new MessageChannel("chat-room"); * const channel = new MessageChannel("chat-room");
@ -238,7 +238,12 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
* await channel.processTasks(); * await channel.processTasks();
* ``` * ```
*/ */
public pushIncomingMessage(message: Message): void { public pushIncomingMessage(
message: Message,
retrievalHint: Uint8Array | undefined
): void {
message.retrievalHint = retrievalHint;
this.tasks.push({ this.tasks.push({
command: Command.Receive, command: Command.Receive,
params: { params: {
@ -282,7 +287,9 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
this.senderId, this.senderId,
message.messageId, message.messageId,
"is missing dependencies", "is missing dependencies",
missingDependencies.map((ch) => ch.messageId) missingDependencies.map(({ messageId, retrievalHint }) => {
return { messageId, retrievalHint };
})
); );
// Optionally, if a message has not been received after a predetermined amount of time, // Optionally, if a message has not been received after a predetermined amount of time,
@ -395,7 +402,15 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
} }
private _pushIncomingMessage(message: Message): void { private _pushIncomingMessage(message: Message): void {
log.info(this.senderId, "incoming message", message.messageId); if (message.channelId !== this.channelId) {
log.warn("dropping message on different channel", message.channelId);
return;
}
log.info(
`${this.senderId} incoming message ${message.messageId}`,
`retrieval hint: ${bytesToHex(message.retrievalHint ?? new Uint8Array())}`
);
const isDuplicate = const isDuplicate =
message.content && message.content &&
message.content.length > 0 && message.content.length > 0 &&
@ -589,14 +604,10 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
* Return true if the message was "delivered" * Return true if the message was "delivered"
* *
* @param message * @param message
* @param retrievalHint
* @private * @private
*/ */
// See https://rfc.vac.dev/vac/raw/sds/#deliver-message // See https://rfc.vac.dev/vac/raw/sds/#deliver-message
private deliverMessage( private deliverMessage(message: ContentMessage): boolean {
message: ContentMessage,
retrievalHint?: Uint8Array
): boolean {
if (!isContentMessage(message)) { if (!isContentMessage(message)) {
// Messages with empty content are sync messages. // Messages with empty content are sync messages.
// Messages with no timestamp are ephemeral messages. // Messages with no timestamp are ephemeral messages.
@ -605,7 +616,12 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
return false; return false;
} }
log.info(this.senderId, "delivering message", message.messageId); log.info(
this.senderId,
"delivering message",
message.messageId,
message.retrievalHint
);
if (message.lamportTimestamp > this.lamportTimestamp) { if (message.lamportTimestamp > this.lamportTimestamp) {
this.lamportTimestamp = message.lamportTimestamp; this.lamportTimestamp = message.lamportTimestamp;
} }
@ -620,7 +636,9 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
return true; return true;
} }
message.retrievalHint = retrievalHint; if (!message.retrievalHint) {
log.warn("message delivered without a retrieval hint", message.messageId);
}
this.localHistory.push(message); this.localHistory.push(message);
return true; return true;