From 166f086faa8760dd6b91353b580fd347c5193b20 Mon Sep 17 00:00:00 2001 From: fryorcraken Date: Wed, 1 Oct 2025 21:25:58 +1000 Subject: [PATCH] lamport timestamp remains close to current time --- .../reliable_channel/reliable_channel.spec.ts | 26 ++++----- .../message_channel/lamport_timestamp.spec.ts | 56 +++++++++++++++++++ .../src/message_channel/message_channel.ts | 17 ++++-- 3 files changed, 82 insertions(+), 17 deletions(-) create mode 100644 packages/sds/src/message_channel/lamport_timestamp.spec.ts diff --git a/packages/sdk/src/reliable_channel/reliable_channel.spec.ts b/packages/sdk/src/reliable_channel/reliable_channel.spec.ts index e1928ceef8..ad69d35009 100644 --- a/packages/sdk/src/reliable_channel/reliable_channel.spec.ts +++ b/packages/sdk/src/reliable_channel/reliable_channel.spec.ts @@ -721,7 +721,7 @@ describe("Reliable Channel", () => { "differentChannel", senderId, [], - 1, + 1n, undefined, utf8ToBytes("different channel") ); @@ -731,7 +731,7 @@ describe("Reliable Channel", () => { channelId, senderId, [], - 2, + 2n, undefined, undefined ); @@ -741,7 +741,7 @@ describe("Reliable Channel", () => { channelId, senderId, [], - 3, + 3n, undefined, utf8ToBytes("after sync") ); @@ -824,7 +824,7 @@ describe("Reliable Channel", () => { channelId, senderId, [{ messageId: "previous-msg-id" }], - 1, + 1n, undefined, utf8ToBytes("content message") ); @@ -834,7 +834,7 @@ describe("Reliable Channel", () => { channelId, senderId, [], - 2, + 2n, undefined, utf8ToBytes("after content") ); @@ -905,7 +905,7 @@ describe("Reliable Channel", () => { "differentChannel1", senderId, [], - 1, + 1n, undefined, utf8ToBytes("different 1") ); @@ -915,7 +915,7 @@ describe("Reliable Channel", () => { "differentChannel2", senderId, [], - 2, + 2n, undefined, utf8ToBytes("different 2") ); @@ -925,7 +925,7 @@ describe("Reliable Channel", () => { "differentChannel3", senderId, [], - 3, + 3n, undefined, utf8ToBytes("different 3") ); @@ -1041,7 +1041,7 @@ describe("Reliable Channel", () => { "differentChannel", "sender", [], - 1, + 1n, undefined, utf8ToBytes("content") ); @@ -1060,7 +1060,7 @@ describe("Reliable Channel", () => { "testChannel", "sender", [], - 1, + 1n, undefined, undefined ); @@ -1079,7 +1079,7 @@ describe("Reliable Channel", () => { "testChannel", "sender", [], - 1, + 1n, undefined, utf8ToBytes("content") ); @@ -1098,7 +1098,7 @@ describe("Reliable Channel", () => { "testChannel", "sender", [{ messageId: "previous-msg-id" }], - 1, + 1n, undefined, utf8ToBytes("content") ); @@ -1117,7 +1117,7 @@ describe("Reliable Channel", () => { "testChannel", "sender", [{ messageId: "previous-msg-id" }], - 1, + 1n, undefined, undefined ); diff --git a/packages/sds/src/message_channel/lamport_timestamp.spec.ts b/packages/sds/src/message_channel/lamport_timestamp.spec.ts new file mode 100644 index 0000000000..57aec2666e --- /dev/null +++ b/packages/sds/src/message_channel/lamport_timestamp.spec.ts @@ -0,0 +1,56 @@ +import { expect } from "chai"; + +import { lamportTimestampIncrement } from "./message_channel.js"; + +describe("lamportTimestampIncrement", () => { + it("should increment timestamp by 1 when current time is not greater", () => { + const futureTimestamp = BigInt(Date.now()) + 1000n; + const result = lamportTimestampIncrement(futureTimestamp); + expect(result).to.equal(futureTimestamp + 1n); + }); + + it("should use current time when it's greater than incremented timestamp", () => { + const pastTimestamp = BigInt(Date.now()) - 1000n; + const result = lamportTimestampIncrement(pastTimestamp); + const now = BigInt(Date.now()); + // Result should be at least as large as now (within small tolerance for test execution time) + expect(result >= now - 10n).to.be.true; + expect(result <= now + 10n).to.be.true; + }); + + it("should handle timestamp equal to current time", () => { + const currentTimestamp = BigInt(Date.now()); + const result = lamportTimestampIncrement(currentTimestamp); + // Should increment by 1 since now is likely not greater than current + 1 + expect(result >= currentTimestamp + 1n).to.be.true; + }); + + it("should ensure monotonic increase", () => { + let timestamp = BigInt(Date.now()) + 5000n; + const results: bigint[] = []; + + for (let i = 0; i < 5; i++) { + timestamp = lamportTimestampIncrement(timestamp); + results.push(timestamp); + } + + // Verify all timestamps are strictly increasing + for (let i = 1; i < results.length; i++) { + expect(results[i] > results[i - 1]).to.be.true; + } + }); + + it("should handle very large timestamps", () => { + const largeTimestamp = BigInt(Number.MAX_SAFE_INTEGER) * 1000n; + const result = lamportTimestampIncrement(largeTimestamp); + expect(result).to.equal(largeTimestamp + 1n); + }); + + it("should jump to current time when timestamp is far in the past", () => { + const veryOldTimestamp = 1000n; // Very old timestamp (1 second after epoch) + const result = lamportTimestampIncrement(veryOldTimestamp); + const now = BigInt(Date.now()); + expect(result >= now - 10n).to.be.true; + expect(result <= now + 10n).to.be.true; + }); +}); diff --git a/packages/sds/src/message_channel/message_channel.ts b/packages/sds/src/message_channel/message_channel.ts index a416d53198..37dd1a6160 100644 --- a/packages/sds/src/message_channel/message_channel.ts +++ b/packages/sds/src/message_channel/message_channel.ts @@ -95,8 +95,8 @@ export class MessageChannel extends TypedEventEmitter { super(); this.channelId = channelId; this.senderId = senderId; - // Initialize channel lamport timestamp to current time in seconds. - this.lamportTimestamp = BigInt(Math.floor(Date.now() / 1000)); + // Initialize channel lamport timestamp to current time in milliseconds. + this.lamportTimestamp = BigInt(Date.now()); this.filter = new DefaultBloomFilter(DEFAULT_BLOOM_FILTER_OPTIONS); this.outgoingBuffer = []; this.possibleAcks = new Map(); @@ -368,7 +368,7 @@ export class MessageChannel extends TypedEventEmitter { public async pushOutgoingSyncMessage( callback?: (message: SyncMessage) => Promise ): Promise { - this.lamportTimestamp = this.lamportTimestamp + 1n; + this.lamportTimestamp = lamportTimestampIncrement(this.lamportTimestamp); const message = new SyncMessage( // does not need to be secure randomness `sync-${Math.random().toString(36).substring(2)}`, @@ -525,7 +525,7 @@ export class MessageChannel extends TypedEventEmitter { retrievalHint?: Uint8Array; }> ): Promise { - this.lamportTimestamp = this.lamportTimestamp + 1n; + this.lamportTimestamp = lamportTimestampIncrement(this.lamportTimestamp); const messageId = MessageChannel.getMessageId(payload); @@ -723,3 +723,12 @@ export class MessageChannel extends TypedEventEmitter { }); } } + +export function lamportTimestampIncrement(lamportTimestamp: bigint): bigint { + const now = BigInt(Date.now()); + lamportTimestamp++; + if (now > lamportTimestamp) { + return now; + } + return lamportTimestamp; +}