From c0ecb6abbaae0544f352b89293f59f274600a916 Mon Sep 17 00:00:00 2001 From: fryorcraken <110212804+fryorcraken@users.noreply.github.com> Date: Thu, 2 Oct 2025 09:07:10 +1000 Subject: [PATCH] fix!: SDS lamport timestamp overflow and keep it to current time (#2664) * fix!: avoid SDS lamport timestamp overflow The SDS timestamp is initialized to the current time in milliseconds, which is a 13 digits value (e.g. 1,759,223,090,052). The maximum value for int32 is 2,147,483,647 (10 digits), which is clearly less than the timestamp. Maximum value for uint32 is 4,294,967,295 (10 digits), which does not help with ms timestamp. uint64 is BigInt in JavaScript, so best to be avoided unless strictly necessary as it creates complexity. max uint64 is 18,446,744,073,709,551,615 (20 digits). Using seconds instead of milliseconds would enable usage of uint32 valid until the year 2106. The lamport timestamp is only initialized to current time for a new channel. The only scenario is when a user comes in a channel, and thinks it's new (did not get previous messages), and then starts sending messages. Meaning that there may be an initial timestamp conflict until the logs are consolidated, which is already handled by the protocol. * change lamportTimestamp to uint64 in protobuf * lamport timestamp remains close to current time --- packages/proto/src/generated/sds_message.ts | 6 +- packages/proto/src/lib/sds_message.proto | 2 +- .../reliable_channel/reliable_channel.spec.ts | 34 +++++------ .../message_channel/lamport_timestamp.spec.ts | 56 +++++++++++++++++++ .../sds/src/message_channel/message.spec.ts | 12 ++-- packages/sds/src/message_channel/message.ts | 18 +++--- .../message_channel/message_channel.spec.ts | 32 ++++++----- .../src/message_channel/message_channel.ts | 20 +++++-- 8 files changed, 123 insertions(+), 57 deletions(-) create mode 100644 packages/sds/src/message_channel/lamport_timestamp.spec.ts diff --git a/packages/proto/src/generated/sds_message.ts b/packages/proto/src/generated/sds_message.ts index 20ef8746ae..eba12d4acd 100644 --- a/packages/proto/src/generated/sds_message.ts +++ b/packages/proto/src/generated/sds_message.ts @@ -84,7 +84,7 @@ export interface SdsMessage { senderId: string messageId: string channelId: string - lamportTimestamp?: number + lamportTimestamp?: bigint causalHistory: HistoryEntry[] bloomFilter?: Uint8Array content?: Uint8Array @@ -117,7 +117,7 @@ export namespace SdsMessage { if (obj.lamportTimestamp != null) { w.uint32(80) - w.int32(obj.lamportTimestamp) + w.uint64(obj.lamportTimestamp) } if (obj.causalHistory != null) { @@ -167,7 +167,7 @@ export namespace SdsMessage { break } case 10: { - obj.lamportTimestamp = reader.int32() + obj.lamportTimestamp = reader.uint64() break } case 11: { diff --git a/packages/proto/src/lib/sds_message.proto b/packages/proto/src/lib/sds_message.proto index 5344a0d33a..c38e99b084 100644 --- a/packages/proto/src/lib/sds_message.proto +++ b/packages/proto/src/lib/sds_message.proto @@ -9,7 +9,7 @@ message SdsMessage { string sender_id = 1; // Participant ID of the message sender string message_id = 2; // Unique identifier of the message string channel_id = 3; // Identifier of the channel to which the message belongs - optional int32 lamport_timestamp = 10; // Logical timestamp for causal ordering in channel + optional uint64 lamport_timestamp = 10; // Logical timestamp for causal ordering in channel repeated HistoryEntry causal_history = 11; // List of preceding message IDs that this message causally depends on. Generally 2 or 3 message IDs are included. optional bytes bloom_filter = 12; // Bloom filter representing received message IDs in channel optional bytes content = 20; // Actual content of the message diff --git a/packages/sdk/src/reliable_channel/reliable_channel.spec.ts b/packages/sdk/src/reliable_channel/reliable_channel.spec.ts index 7b29a6b55d..ad69d35009 100644 --- a/packages/sdk/src/reliable_channel/reliable_channel.spec.ts +++ b/packages/sdk/src/reliable_channel/reliable_channel.spec.ts @@ -419,7 +419,7 @@ describe("Reliable Channel", () => { "MyChannel", "alice", [], - 1, + 1n, undefined, message ); @@ -532,7 +532,7 @@ describe("Reliable Channel", () => { "testChannel", "testSender", [], - 1, + 1n, undefined, messagePayload ); @@ -600,7 +600,7 @@ describe("Reliable Channel", () => { "testChannel", "testSender", [], - 1, + 1n, undefined, message1Payload ); @@ -610,7 +610,7 @@ describe("Reliable Channel", () => { "testChannel", "testSender", [], - 2, + 2n, undefined, message2Payload ); @@ -721,7 +721,7 @@ describe("Reliable Channel", () => { "differentChannel", senderId, [], - 1, + 1n, undefined, utf8ToBytes("different channel") ); @@ -731,7 +731,7 @@ describe("Reliable Channel", () => { channelId, senderId, [], - 2, + 2n, undefined, undefined ); @@ -741,7 +741,7 @@ describe("Reliable Channel", () => { channelId, senderId, [], - 3, + 3n, undefined, utf8ToBytes("after sync") ); @@ -824,7 +824,7 @@ describe("Reliable Channel", () => { channelId, senderId, [{ messageId: "previous-msg-id" }], - 1, + 1n, undefined, utf8ToBytes("content message") ); @@ -834,7 +834,7 @@ describe("Reliable Channel", () => { channelId, senderId, [], - 2, + 2n, undefined, utf8ToBytes("after content") ); @@ -905,7 +905,7 @@ describe("Reliable Channel", () => { "differentChannel1", senderId, [], - 1, + 1n, undefined, utf8ToBytes("different 1") ); @@ -915,7 +915,7 @@ describe("Reliable Channel", () => { "differentChannel2", senderId, [], - 2, + 2n, undefined, utf8ToBytes("different 2") ); @@ -925,7 +925,7 @@ describe("Reliable Channel", () => { "differentChannel3", senderId, [], - 3, + 3n, undefined, utf8ToBytes("different 3") ); @@ -1041,7 +1041,7 @@ describe("Reliable Channel", () => { "differentChannel", "sender", [], - 1, + 1n, undefined, utf8ToBytes("content") ); @@ -1060,7 +1060,7 @@ describe("Reliable Channel", () => { "testChannel", "sender", [], - 1, + 1n, undefined, undefined ); @@ -1079,7 +1079,7 @@ describe("Reliable Channel", () => { "testChannel", "sender", [], - 1, + 1n, undefined, utf8ToBytes("content") ); @@ -1098,7 +1098,7 @@ describe("Reliable Channel", () => { "testChannel", "sender", [{ messageId: "previous-msg-id" }], - 1, + 1n, undefined, utf8ToBytes("content") ); @@ -1117,7 +1117,7 @@ describe("Reliable Channel", () => { "testChannel", "sender", [{ messageId: "previous-msg-id" }], - 1, + 1n, undefined, undefined ); diff --git a/packages/sds/src/message_channel/lamport_timestamp.spec.ts b/packages/sds/src/message_channel/lamport_timestamp.spec.ts new file mode 100644 index 0000000000..57aec2666e --- /dev/null +++ b/packages/sds/src/message_channel/lamport_timestamp.spec.ts @@ -0,0 +1,56 @@ +import { expect } from "chai"; + +import { lamportTimestampIncrement } from "./message_channel.js"; + +describe("lamportTimestampIncrement", () => { + it("should increment timestamp by 1 when current time is not greater", () => { + const futureTimestamp = BigInt(Date.now()) + 1000n; + const result = lamportTimestampIncrement(futureTimestamp); + expect(result).to.equal(futureTimestamp + 1n); + }); + + it("should use current time when it's greater than incremented timestamp", () => { + const pastTimestamp = BigInt(Date.now()) - 1000n; + const result = lamportTimestampIncrement(pastTimestamp); + const now = BigInt(Date.now()); + // Result should be at least as large as now (within small tolerance for test execution time) + expect(result >= now - 10n).to.be.true; + expect(result <= now + 10n).to.be.true; + }); + + it("should handle timestamp equal to current time", () => { + const currentTimestamp = BigInt(Date.now()); + const result = lamportTimestampIncrement(currentTimestamp); + // Should increment by 1 since now is likely not greater than current + 1 + expect(result >= currentTimestamp + 1n).to.be.true; + }); + + it("should ensure monotonic increase", () => { + let timestamp = BigInt(Date.now()) + 5000n; + const results: bigint[] = []; + + for (let i = 0; i < 5; i++) { + timestamp = lamportTimestampIncrement(timestamp); + results.push(timestamp); + } + + // Verify all timestamps are strictly increasing + for (let i = 1; i < results.length; i++) { + expect(results[i] > results[i - 1]).to.be.true; + } + }); + + it("should handle very large timestamps", () => { + const largeTimestamp = BigInt(Number.MAX_SAFE_INTEGER) * 1000n; + const result = lamportTimestampIncrement(largeTimestamp); + expect(result).to.equal(largeTimestamp + 1n); + }); + + it("should jump to current time when timestamp is far in the past", () => { + const veryOldTimestamp = 1000n; // Very old timestamp (1 second after epoch) + const result = lamportTimestampIncrement(veryOldTimestamp); + const now = BigInt(Date.now()); + expect(result >= now - 10n).to.be.true; + expect(result <= now + 10n).to.be.true; + }); +}); diff --git a/packages/sds/src/message_channel/message.spec.ts b/packages/sds/src/message_channel/message.spec.ts index 11bb9b3735..680bf5cdb5 100644 --- a/packages/sds/src/message_channel/message.spec.ts +++ b/packages/sds/src/message_channel/message.spec.ts @@ -18,7 +18,7 @@ describe("Message serialization", () => { "my-channel", "me", [], - 0, + 0n, bloomFilter.toBytes(), undefined ); @@ -42,7 +42,7 @@ describe("Message serialization", () => { "my-channel", "me", [{ messageId: depMessageId, retrievalHint: depRetrievalHint }], - 0, + 0n, undefined, undefined ); @@ -63,7 +63,7 @@ describe("ContentMessage comparison with < operator", () => { "channel", "sender", [], - 100, // Lower timestamp + 100n, // Lower timestamp undefined, new Uint8Array([1]) ); @@ -73,7 +73,7 @@ describe("ContentMessage comparison with < operator", () => { "channel", "sender", [], - 200, // Higher timestamp + 200n, // Higher timestamp undefined, new Uint8Array([2]) ); @@ -89,7 +89,7 @@ describe("ContentMessage comparison with < operator", () => { "channel", "sender", [], - 100, // Same timestamp + 100n, // Same timestamp undefined, new Uint8Array([1]) ); @@ -99,7 +99,7 @@ describe("ContentMessage comparison with < operator", () => { "channel", "sender", [], - 100, // Same timestamp + 100n, // Same timestamp undefined, new Uint8Array([2]) ); diff --git a/packages/sds/src/message_channel/message.ts b/packages/sds/src/message_channel/message.ts index 638f6c71a0..78b99f9006 100644 --- a/packages/sds/src/message_channel/message.ts +++ b/packages/sds/src/message_channel/message.ts @@ -14,7 +14,7 @@ export class Message implements proto_sds_message.SdsMessage { public channelId: string, public senderId: string, public causalHistory: proto_sds_message.HistoryEntry[], - public lamportTimestamp?: number | undefined, + public lamportTimestamp?: bigint | undefined, public bloomFilter?: Uint8Array | undefined, public content?: Uint8Array | undefined, /** @@ -94,7 +94,7 @@ export class SyncMessage extends Message { public channelId: string, public senderId: string, public causalHistory: proto_sds_message.HistoryEntry[], - public lamportTimestamp: number, + public lamportTimestamp: bigint, public bloomFilter: Uint8Array | undefined, public content: undefined, /** @@ -116,12 +116,12 @@ export class SyncMessage extends Message { } function testSyncMessage(message: { - lamportTimestamp?: number; + lamportTimestamp?: bigint; content?: Uint8Array; }): boolean { return Boolean( "lamportTimestamp" in message && - typeof message.lamportTimestamp === "number" && + typeof message.lamportTimestamp === "bigint" && (message.content === undefined || message.content.length === 0) ); } @@ -169,7 +169,7 @@ export function isEphemeralMessage( } function testEphemeralMessage(message: { - lamportTimestamp?: number; + lamportTimestamp?: bigint; content?: Uint8Array; }): boolean { return Boolean( @@ -186,7 +186,7 @@ export class ContentMessage extends Message { public channelId: string, public senderId: string, public causalHistory: proto_sds_message.HistoryEntry[], - public lamportTimestamp: number, + public lamportTimestamp: bigint, public bloomFilter: Uint8Array | undefined, public content: Uint8Array, /** @@ -226,12 +226,12 @@ export function isContentMessage( } function testContentMessage(message: { - lamportTimestamp?: number; + lamportTimestamp?: bigint; content?: Uint8Array; -}): message is { lamportTimestamp: number; content: Uint8Array } { +}): message is { lamportTimestamp: bigint; content: Uint8Array } { return Boolean( "lamportTimestamp" in message && - typeof message.lamportTimestamp === "number" && + typeof message.lamportTimestamp === "bigint" && message.content && message.content.length ); diff --git a/packages/sds/src/message_channel/message_channel.spec.ts b/packages/sds/src/message_channel/message_channel.spec.ts index 604994c0f8..6098f4c5b7 100644 --- a/packages/sds/src/message_channel/message_channel.spec.ts +++ b/packages/sds/src/message_channel/message_channel.spec.ts @@ -75,7 +75,7 @@ describe("MessageChannel", function () { const timestampBefore = channelA["lamportTimestamp"]; await sendMessage(channelA, utf8ToBytes("message"), callback); const timestampAfter = channelA["lamportTimestamp"]; - expect(timestampAfter).to.equal(timestampBefore + 1); + expect(timestampAfter).to.equal(timestampBefore + 1n); }); it("should push the message to the outgoing buffer", async () => { @@ -95,7 +95,7 @@ describe("MessageChannel", function () { it("should insert message id into causal history", async () => { const payload = utf8ToBytes("message"); - const expectedTimestamp = channelA["lamportTimestamp"] + 1; + const expectedTimestamp = channelA["lamportTimestamp"] + 1n; const messageId = MessageChannel.getMessageId(payload); await sendMessage(channelA, payload, callback); const messageIdLog = channelA["localHistory"] as ILocalHistory; @@ -181,7 +181,7 @@ describe("MessageChannel", function () { return { success: true }; }); const timestampAfter = channelA["lamportTimestamp"]; - expect(timestampAfter).to.equal(timestampBefore + 1); + expect(timestampAfter).to.equal(timestampBefore + 1n); }); // TODO: test is failing in CI, investigate in https://github.com/waku-org/js-waku/issues/2648 @@ -201,7 +201,9 @@ describe("MessageChannel", function () { }); } const timestampAfter = testChannelA["lamportTimestamp"]; - expect(timestampAfter - timestampBefore).to.equal(messagesB.length); + expect(timestampAfter - timestampBefore).to.equal( + BigInt(messagesB.length) + ); }); // TODO: test is failing in CI, investigate in https://github.com/waku-org/js-waku/issues/2648 @@ -228,7 +230,7 @@ describe("MessageChannel", function () { const expectedLength = messagesA.length + messagesB.length; expect(channelA["lamportTimestamp"]).to.equal( - aTimestampBefore + expectedLength + aTimestampBefore + BigInt(expectedLength) ); expect(channelA["lamportTimestamp"]).to.equal( channelB["lamportTimestamp"] @@ -293,7 +295,7 @@ describe("MessageChannel", function () { channelA.channelId, "not-alice", [], - 1, + 1n, undefined, payload, testRetrievalHint @@ -335,7 +337,7 @@ describe("MessageChannel", function () { channelA.channelId, "bob", [], - startTimestamp + 3, // Higher timestamp + startTimestamp + 3n, // Higher timestamp undefined, message3Payload ) @@ -349,7 +351,7 @@ describe("MessageChannel", function () { channelA.channelId, "carol", [], - startTimestamp + 2, // Middle timestamp + startTimestamp + 2n, // Middle timestamp undefined, message2Payload ) @@ -363,7 +365,7 @@ describe("MessageChannel", function () { const first = localHistory.findIndex( ({ messageId, lamportTimestamp }) => { return ( - messageId === message1Id && lamportTimestamp === startTimestamp + 1 + messageId === message1Id && lamportTimestamp === startTimestamp + 1n ); } ); @@ -372,7 +374,7 @@ describe("MessageChannel", function () { const second = localHistory.findIndex( ({ messageId, lamportTimestamp }) => { return ( - messageId === message2Id && lamportTimestamp === startTimestamp + 2 + messageId === message2Id && lamportTimestamp === startTimestamp + 2n ); } ); @@ -381,7 +383,7 @@ describe("MessageChannel", function () { const third = localHistory.findIndex( ({ messageId, lamportTimestamp }) => { return ( - messageId === message3Id && lamportTimestamp === startTimestamp + 3 + messageId === message3Id && lamportTimestamp === startTimestamp + 3n ); } ); @@ -404,7 +406,7 @@ describe("MessageChannel", function () { channelA.channelId, "bob", [], - 5, // Same timestamp + 5n, // Same timestamp undefined, message2Payload ) @@ -417,7 +419,7 @@ describe("MessageChannel", function () { channelA.channelId, "carol", [], - 5, // Same timestamp + 5n, // Same timestamp undefined, message1Payload ) @@ -432,14 +434,14 @@ describe("MessageChannel", function () { const first = localHistory.findIndex( ({ messageId, lamportTimestamp }) => { - return messageId === expectedOrder[0] && lamportTimestamp == 5; + return messageId === expectedOrder[0] && lamportTimestamp == 5n; } ); expect(first).to.eq(0); const second = localHistory.findIndex( ({ messageId, lamportTimestamp }) => { - return messageId === expectedOrder[1] && lamportTimestamp == 5; + return messageId === expectedOrder[1] && lamportTimestamp == 5n; } ); expect(second).to.eq(1); diff --git a/packages/sds/src/message_channel/message_channel.ts b/packages/sds/src/message_channel/message_channel.ts index 6375e819f7..37dd1a6160 100644 --- a/packages/sds/src/message_channel/message_channel.ts +++ b/packages/sds/src/message_channel/message_channel.ts @@ -56,7 +56,7 @@ export type ILocalHistory = Pick< export class MessageChannel extends TypedEventEmitter { public readonly channelId: ChannelId; public readonly senderId: SenderId; - private lamportTimestamp: number; + private lamportTimestamp: bigint; private filter: DefaultBloomFilter; private outgoingBuffer: ContentMessage[]; private possibleAcks: Map; @@ -95,9 +95,8 @@ export class MessageChannel extends TypedEventEmitter { super(); this.channelId = channelId; this.senderId = senderId; - // SDS RFC says to use nanoseconds, but current time in nanosecond is > Number.MAX_SAFE_INTEGER - // So instead we are using milliseconds and proposing a spec change (TODO) - this.lamportTimestamp = Date.now(); + // Initialize channel lamport timestamp to current time in milliseconds. + this.lamportTimestamp = BigInt(Date.now()); this.filter = new DefaultBloomFilter(DEFAULT_BLOOM_FILTER_OPTIONS); this.outgoingBuffer = []; this.possibleAcks = new Map(); @@ -369,7 +368,7 @@ export class MessageChannel extends TypedEventEmitter { public async pushOutgoingSyncMessage( callback?: (message: SyncMessage) => Promise ): Promise { - this.lamportTimestamp++; + this.lamportTimestamp = lamportTimestampIncrement(this.lamportTimestamp); const message = new SyncMessage( // does not need to be secure randomness `sync-${Math.random().toString(36).substring(2)}`, @@ -526,7 +525,7 @@ export class MessageChannel extends TypedEventEmitter { retrievalHint?: Uint8Array; }> ): Promise { - this.lamportTimestamp++; + this.lamportTimestamp = lamportTimestampIncrement(this.lamportTimestamp); const messageId = MessageChannel.getMessageId(payload); @@ -724,3 +723,12 @@ export class MessageChannel extends TypedEventEmitter { }); } } + +export function lamportTimestampIncrement(lamportTimestamp: bigint): bigint { + const now = BigInt(Date.now()); + lamportTimestamp++; + if (now > lamportTimestamp) { + return now; + } + return lamportTimestamp; +}