fix(sds): initialize lamport timestamp with current time (#2610)

This commit is contained in:
fryorcraken 2025-09-11 18:06:54 +10:00 committed by GitHub
parent 4d5c152f5b
commit cb3af8cd4d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 26 additions and 11 deletions

View File

@ -154,8 +154,8 @@ describe("MessageChannel", function () {
});
// Causal history should only contain the last N messages as defined by causalHistorySize
const causalHistory = outgoingBuffer[outgoingBuffer.length - 1]
.causalHistory as HistoryEntry[];
const causalHistory =
outgoingBuffer[outgoingBuffer.length - 1].causalHistory;
expect(causalHistory.length).to.equal(causalHistorySize);
const expectedCausalHistory = messages
@ -185,6 +185,8 @@ describe("MessageChannel", function () {
});
it("should update lamport timestamp if greater than current timestamp and dependencies are met", async () => {
const timestampBefore = channelA["lamportTimestamp"];
for (const m of messagesA) {
await sendMessage(channelA, utf8ToBytes(m), callback);
}
@ -195,11 +197,12 @@ describe("MessageChannel", function () {
});
}
const timestampAfter = channelA["lamportTimestamp"];
expect(timestampAfter).to.equal(messagesB.length);
expect(timestampAfter - timestampBefore).to.equal(messagesB.length);
});
it("should maintain proper timestamps if all messages received", async () => {
let timestamp = 0;
const aTimestampBefore = channelA["lamportTimestamp"];
let timestamp = channelB["lamportTimestamp"];
for (const m of messagesA) {
await sendMessage(channelA, utf8ToBytes(m), async (message) => {
timestamp++;
@ -219,7 +222,9 @@ describe("MessageChannel", function () {
}
const expectedLength = messagesA.length + messagesB.length;
expect(channelA["lamportTimestamp"]).to.equal(expectedLength);
expect(channelA["lamportTimestamp"]).to.equal(
aTimestampBefore + expectedLength
);
expect(channelA["lamportTimestamp"]).to.equal(
channelB["lamportTimestamp"]
);
@ -312,6 +317,8 @@ describe("MessageChannel", function () {
const message2Id = MessageChannel.getMessageId(message2Payload);
const message3Id = MessageChannel.getMessageId(message3Payload);
const startTimestamp = channelA["lamportTimestamp"];
// Send own message first (timestamp will be 1)
await sendMessage(channelA, message1Payload, callback);
@ -323,7 +330,7 @@ describe("MessageChannel", function () {
channelA.channelId,
"bob",
[],
3, // Higher timestamp
startTimestamp + 3, // Higher timestamp
undefined,
message3Payload
)
@ -337,7 +344,7 @@ describe("MessageChannel", function () {
channelA.channelId,
"carol",
[],
2, // Middle timestamp
startTimestamp + 2, // Middle timestamp
undefined,
message2Payload
)
@ -350,21 +357,27 @@ describe("MessageChannel", function () {
const first = localHistory.findIndex(
({ messageId, lamportTimestamp }) => {
return messageId === message1Id && lamportTimestamp === 1;
return (
messageId === message1Id && lamportTimestamp === startTimestamp + 1
);
}
);
expect(first).to.eq(0);
const second = localHistory.findIndex(
({ messageId, lamportTimestamp }) => {
return messageId === message2Id && lamportTimestamp === 2;
return (
messageId === message2Id && lamportTimestamp === startTimestamp + 2
);
}
);
expect(second).to.eq(1);
const third = localHistory.findIndex(
({ messageId, lamportTimestamp }) => {
return messageId === message3Id && lamportTimestamp === 3;
return (
messageId === message3Id && lamportTimestamp === startTimestamp + 3
);
}
);
expect(third).to.eq(2);

View File

@ -95,7 +95,9 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
super();
this.channelId = channelId;
this.senderId = senderId;
this.lamportTimestamp = 0;
// SDS RFC says to use nanoseconds, but current time in nanosecond is > Number.MAX_SAFE_INTEGER
// So instead we are using milliseconds and proposing a spec change (TODO)
this.lamportTimestamp = Date.now();
this.filter = new DefaultBloomFilter(DEFAULT_BLOOM_FILTER_OPTIONS);
this.outgoingBuffer = [];
this.possibleAcks = new Map();