diff --git a/packages/proto/src/generated/sds_message.ts b/packages/proto/src/generated/sds_message.ts index 20ef8746ae..eba12d4acd 100644 --- a/packages/proto/src/generated/sds_message.ts +++ b/packages/proto/src/generated/sds_message.ts @@ -84,7 +84,7 @@ export interface SdsMessage { senderId: string messageId: string channelId: string - lamportTimestamp?: number + lamportTimestamp?: bigint causalHistory: HistoryEntry[] bloomFilter?: Uint8Array content?: Uint8Array @@ -117,7 +117,7 @@ export namespace SdsMessage { if (obj.lamportTimestamp != null) { w.uint32(80) - w.int32(obj.lamportTimestamp) + w.uint64(obj.lamportTimestamp) } if (obj.causalHistory != null) { @@ -167,7 +167,7 @@ export namespace SdsMessage { break } case 10: { - obj.lamportTimestamp = reader.int32() + obj.lamportTimestamp = reader.uint64() break } case 11: { diff --git a/packages/proto/src/lib/sds_message.proto b/packages/proto/src/lib/sds_message.proto index 5344a0d33a..c38e99b084 100644 --- a/packages/proto/src/lib/sds_message.proto +++ b/packages/proto/src/lib/sds_message.proto @@ -9,7 +9,7 @@ message SdsMessage { string sender_id = 1; // Participant ID of the message sender string message_id = 2; // Unique identifier of the message string channel_id = 3; // Identifier of the channel to which the message belongs - optional int32 lamport_timestamp = 10; // Logical timestamp for causal ordering in channel + optional uint64 lamport_timestamp = 10; // Logical timestamp for causal ordering in channel repeated HistoryEntry causal_history = 11; // List of preceding message IDs that this message causally depends on. Generally 2 or 3 message IDs are included. optional bytes bloom_filter = 12; // Bloom filter representing received message IDs in channel optional bytes content = 20; // Actual content of the message diff --git a/packages/sdk/src/reliable_channel/reliable_channel.spec.ts b/packages/sdk/src/reliable_channel/reliable_channel.spec.ts index 7b29a6b55d..ad69d35009 100644 --- a/packages/sdk/src/reliable_channel/reliable_channel.spec.ts +++ b/packages/sdk/src/reliable_channel/reliable_channel.spec.ts @@ -419,7 +419,7 @@ describe("Reliable Channel", () => { "MyChannel", "alice", [], - 1, + 1n, undefined, message ); @@ -532,7 +532,7 @@ describe("Reliable Channel", () => { "testChannel", "testSender", [], - 1, + 1n, undefined, messagePayload ); @@ -600,7 +600,7 @@ describe("Reliable Channel", () => { "testChannel", "testSender", [], - 1, + 1n, undefined, message1Payload ); @@ -610,7 +610,7 @@ describe("Reliable Channel", () => { "testChannel", "testSender", [], - 2, + 2n, undefined, message2Payload ); @@ -721,7 +721,7 @@ describe("Reliable Channel", () => { "differentChannel", senderId, [], - 1, + 1n, undefined, utf8ToBytes("different channel") ); @@ -731,7 +731,7 @@ describe("Reliable Channel", () => { channelId, senderId, [], - 2, + 2n, undefined, undefined ); @@ -741,7 +741,7 @@ describe("Reliable Channel", () => { channelId, senderId, [], - 3, + 3n, undefined, utf8ToBytes("after sync") ); @@ -824,7 +824,7 @@ describe("Reliable Channel", () => { channelId, senderId, [{ messageId: "previous-msg-id" }], - 1, + 1n, undefined, utf8ToBytes("content message") ); @@ -834,7 +834,7 @@ describe("Reliable Channel", () => { channelId, senderId, [], - 2, + 2n, undefined, utf8ToBytes("after content") ); @@ -905,7 +905,7 @@ describe("Reliable Channel", () => { "differentChannel1", senderId, [], - 1, + 1n, undefined, utf8ToBytes("different 1") ); @@ -915,7 +915,7 @@ describe("Reliable Channel", () => { "differentChannel2", senderId, [], - 2, + 2n, undefined, utf8ToBytes("different 2") ); @@ -925,7 +925,7 @@ describe("Reliable Channel", () => { "differentChannel3", senderId, [], - 3, + 3n, undefined, utf8ToBytes("different 3") ); @@ -1041,7 +1041,7 @@ describe("Reliable Channel", () => { "differentChannel", "sender", [], - 1, + 1n, undefined, utf8ToBytes("content") ); @@ -1060,7 +1060,7 @@ describe("Reliable Channel", () => { "testChannel", "sender", [], - 1, + 1n, undefined, undefined ); @@ -1079,7 +1079,7 @@ describe("Reliable Channel", () => { "testChannel", "sender", [], - 1, + 1n, undefined, utf8ToBytes("content") ); @@ -1098,7 +1098,7 @@ describe("Reliable Channel", () => { "testChannel", "sender", [{ messageId: "previous-msg-id" }], - 1, + 1n, undefined, utf8ToBytes("content") ); @@ -1117,7 +1117,7 @@ describe("Reliable Channel", () => { "testChannel", "sender", [{ messageId: "previous-msg-id" }], - 1, + 1n, undefined, undefined ); diff --git a/packages/sds/src/message_channel/lamport_timestamp.spec.ts b/packages/sds/src/message_channel/lamport_timestamp.spec.ts new file mode 100644 index 0000000000..57aec2666e --- /dev/null +++ b/packages/sds/src/message_channel/lamport_timestamp.spec.ts @@ -0,0 +1,56 @@ +import { expect } from "chai"; + +import { lamportTimestampIncrement } from "./message_channel.js"; + +describe("lamportTimestampIncrement", () => { + it("should increment timestamp by 1 when current time is not greater", () => { + const futureTimestamp = BigInt(Date.now()) + 1000n; + const result = lamportTimestampIncrement(futureTimestamp); + expect(result).to.equal(futureTimestamp + 1n); + }); + + it("should use current time when it's greater than incremented timestamp", () => { + const pastTimestamp = BigInt(Date.now()) - 1000n; + const result = lamportTimestampIncrement(pastTimestamp); + const now = BigInt(Date.now()); + // Result should be at least as large as now (within small tolerance for test execution time) + expect(result >= now - 10n).to.be.true; + expect(result <= now + 10n).to.be.true; + }); + + it("should handle timestamp equal to current time", () => { + const currentTimestamp = BigInt(Date.now()); + const result = lamportTimestampIncrement(currentTimestamp); + // Should increment by 1 since now is likely not greater than current + 1 + expect(result >= currentTimestamp + 1n).to.be.true; + }); + + it("should ensure monotonic increase", () => { + let timestamp = BigInt(Date.now()) + 5000n; + const results: bigint[] = []; + + for (let i = 0; i < 5; i++) { + timestamp = lamportTimestampIncrement(timestamp); + results.push(timestamp); + } + + // Verify all timestamps are strictly increasing + for (let i = 1; i < results.length; i++) { + expect(results[i] > results[i - 1]).to.be.true; + } + }); + + it("should handle very large timestamps", () => { + const largeTimestamp = BigInt(Number.MAX_SAFE_INTEGER) * 1000n; + const result = lamportTimestampIncrement(largeTimestamp); + expect(result).to.equal(largeTimestamp + 1n); + }); + + it("should jump to current time when timestamp is far in the past", () => { + const veryOldTimestamp = 1000n; // Very old timestamp (1 second after epoch) + const result = lamportTimestampIncrement(veryOldTimestamp); + const now = BigInt(Date.now()); + expect(result >= now - 10n).to.be.true; + expect(result <= now + 10n).to.be.true; + }); +}); diff --git a/packages/sds/src/message_channel/message.spec.ts b/packages/sds/src/message_channel/message.spec.ts index 11bb9b3735..680bf5cdb5 100644 --- a/packages/sds/src/message_channel/message.spec.ts +++ b/packages/sds/src/message_channel/message.spec.ts @@ -18,7 +18,7 @@ describe("Message serialization", () => { "my-channel", "me", [], - 0, + 0n, bloomFilter.toBytes(), undefined ); @@ -42,7 +42,7 @@ describe("Message serialization", () => { "my-channel", "me", [{ messageId: depMessageId, retrievalHint: depRetrievalHint }], - 0, + 0n, undefined, undefined ); @@ -63,7 +63,7 @@ describe("ContentMessage comparison with < operator", () => { "channel", "sender", [], - 100, // Lower timestamp + 100n, // Lower timestamp undefined, new Uint8Array([1]) ); @@ -73,7 +73,7 @@ describe("ContentMessage comparison with < operator", () => { "channel", "sender", [], - 200, // Higher timestamp + 200n, // Higher timestamp undefined, new Uint8Array([2]) ); @@ -89,7 +89,7 @@ describe("ContentMessage comparison with < operator", () => { "channel", "sender", [], - 100, // Same timestamp + 100n, // Same timestamp undefined, new Uint8Array([1]) ); @@ -99,7 +99,7 @@ describe("ContentMessage comparison with < operator", () => { "channel", "sender", [], - 100, // Same timestamp + 100n, // Same timestamp undefined, new Uint8Array([2]) ); diff --git a/packages/sds/src/message_channel/message.ts b/packages/sds/src/message_channel/message.ts index 638f6c71a0..78b99f9006 100644 --- a/packages/sds/src/message_channel/message.ts +++ b/packages/sds/src/message_channel/message.ts @@ -14,7 +14,7 @@ export class Message implements proto_sds_message.SdsMessage { public channelId: string, public senderId: string, public causalHistory: proto_sds_message.HistoryEntry[], - public lamportTimestamp?: number | undefined, + public lamportTimestamp?: bigint | undefined, public bloomFilter?: Uint8Array | undefined, public content?: Uint8Array | undefined, /** @@ -94,7 +94,7 @@ export class SyncMessage extends Message { public channelId: string, public senderId: string, public causalHistory: proto_sds_message.HistoryEntry[], - public lamportTimestamp: number, + public lamportTimestamp: bigint, public bloomFilter: Uint8Array | undefined, public content: undefined, /** @@ -116,12 +116,12 @@ export class SyncMessage extends Message { } function testSyncMessage(message: { - lamportTimestamp?: number; + lamportTimestamp?: bigint; content?: Uint8Array; }): boolean { return Boolean( "lamportTimestamp" in message && - typeof message.lamportTimestamp === "number" && + typeof message.lamportTimestamp === "bigint" && (message.content === undefined || message.content.length === 0) ); } @@ -169,7 +169,7 @@ export function isEphemeralMessage( } function testEphemeralMessage(message: { - lamportTimestamp?: number; + lamportTimestamp?: bigint; content?: Uint8Array; }): boolean { return Boolean( @@ -186,7 +186,7 @@ export class ContentMessage extends Message { public channelId: string, public senderId: string, public causalHistory: proto_sds_message.HistoryEntry[], - public lamportTimestamp: number, + public lamportTimestamp: bigint, public bloomFilter: Uint8Array | undefined, public content: Uint8Array, /** @@ -226,12 +226,12 @@ export function isContentMessage( } function testContentMessage(message: { - lamportTimestamp?: number; + lamportTimestamp?: bigint; content?: Uint8Array; -}): message is { lamportTimestamp: number; content: Uint8Array } { +}): message is { lamportTimestamp: bigint; content: Uint8Array } { return Boolean( "lamportTimestamp" in message && - typeof message.lamportTimestamp === "number" && + typeof message.lamportTimestamp === "bigint" && message.content && message.content.length ); diff --git a/packages/sds/src/message_channel/message_channel.spec.ts b/packages/sds/src/message_channel/message_channel.spec.ts index 604994c0f8..6098f4c5b7 100644 --- a/packages/sds/src/message_channel/message_channel.spec.ts +++ b/packages/sds/src/message_channel/message_channel.spec.ts @@ -75,7 +75,7 @@ describe("MessageChannel", function () { const timestampBefore = channelA["lamportTimestamp"]; await sendMessage(channelA, utf8ToBytes("message"), callback); const timestampAfter = channelA["lamportTimestamp"]; - expect(timestampAfter).to.equal(timestampBefore + 1); + expect(timestampAfter).to.equal(timestampBefore + 1n); }); it("should push the message to the outgoing buffer", async () => { @@ -95,7 +95,7 @@ describe("MessageChannel", function () { it("should insert message id into causal history", async () => { const payload = utf8ToBytes("message"); - const expectedTimestamp = channelA["lamportTimestamp"] + 1; + const expectedTimestamp = channelA["lamportTimestamp"] + 1n; const messageId = MessageChannel.getMessageId(payload); await sendMessage(channelA, payload, callback); const messageIdLog = channelA["localHistory"] as ILocalHistory; @@ -181,7 +181,7 @@ describe("MessageChannel", function () { return { success: true }; }); const timestampAfter = channelA["lamportTimestamp"]; - expect(timestampAfter).to.equal(timestampBefore + 1); + expect(timestampAfter).to.equal(timestampBefore + 1n); }); // TODO: test is failing in CI, investigate in https://github.com/waku-org/js-waku/issues/2648 @@ -201,7 +201,9 @@ describe("MessageChannel", function () { }); } const timestampAfter = testChannelA["lamportTimestamp"]; - expect(timestampAfter - timestampBefore).to.equal(messagesB.length); + expect(timestampAfter - timestampBefore).to.equal( + BigInt(messagesB.length) + ); }); // TODO: test is failing in CI, investigate in https://github.com/waku-org/js-waku/issues/2648 @@ -228,7 +230,7 @@ describe("MessageChannel", function () { const expectedLength = messagesA.length + messagesB.length; expect(channelA["lamportTimestamp"]).to.equal( - aTimestampBefore + expectedLength + aTimestampBefore + BigInt(expectedLength) ); expect(channelA["lamportTimestamp"]).to.equal( channelB["lamportTimestamp"] @@ -293,7 +295,7 @@ describe("MessageChannel", function () { channelA.channelId, "not-alice", [], - 1, + 1n, undefined, payload, testRetrievalHint @@ -335,7 +337,7 @@ describe("MessageChannel", function () { channelA.channelId, "bob", [], - startTimestamp + 3, // Higher timestamp + startTimestamp + 3n, // Higher timestamp undefined, message3Payload ) @@ -349,7 +351,7 @@ describe("MessageChannel", function () { channelA.channelId, "carol", [], - startTimestamp + 2, // Middle timestamp + startTimestamp + 2n, // Middle timestamp undefined, message2Payload ) @@ -363,7 +365,7 @@ describe("MessageChannel", function () { const first = localHistory.findIndex( ({ messageId, lamportTimestamp }) => { return ( - messageId === message1Id && lamportTimestamp === startTimestamp + 1 + messageId === message1Id && lamportTimestamp === startTimestamp + 1n ); } ); @@ -372,7 +374,7 @@ describe("MessageChannel", function () { const second = localHistory.findIndex( ({ messageId, lamportTimestamp }) => { return ( - messageId === message2Id && lamportTimestamp === startTimestamp + 2 + messageId === message2Id && lamportTimestamp === startTimestamp + 2n ); } ); @@ -381,7 +383,7 @@ describe("MessageChannel", function () { const third = localHistory.findIndex( ({ messageId, lamportTimestamp }) => { return ( - messageId === message3Id && lamportTimestamp === startTimestamp + 3 + messageId === message3Id && lamportTimestamp === startTimestamp + 3n ); } ); @@ -404,7 +406,7 @@ describe("MessageChannel", function () { channelA.channelId, "bob", [], - 5, // Same timestamp + 5n, // Same timestamp undefined, message2Payload ) @@ -417,7 +419,7 @@ describe("MessageChannel", function () { channelA.channelId, "carol", [], - 5, // Same timestamp + 5n, // Same timestamp undefined, message1Payload ) @@ -432,14 +434,14 @@ describe("MessageChannel", function () { const first = localHistory.findIndex( ({ messageId, lamportTimestamp }) => { - return messageId === expectedOrder[0] && lamportTimestamp == 5; + return messageId === expectedOrder[0] && lamportTimestamp == 5n; } ); expect(first).to.eq(0); const second = localHistory.findIndex( ({ messageId, lamportTimestamp }) => { - return messageId === expectedOrder[1] && lamportTimestamp == 5; + return messageId === expectedOrder[1] && lamportTimestamp == 5n; } ); expect(second).to.eq(1); diff --git a/packages/sds/src/message_channel/message_channel.ts b/packages/sds/src/message_channel/message_channel.ts index 6375e819f7..37dd1a6160 100644 --- a/packages/sds/src/message_channel/message_channel.ts +++ b/packages/sds/src/message_channel/message_channel.ts @@ -56,7 +56,7 @@ export type ILocalHistory = Pick< export class MessageChannel extends TypedEventEmitter { public readonly channelId: ChannelId; public readonly senderId: SenderId; - private lamportTimestamp: number; + private lamportTimestamp: bigint; private filter: DefaultBloomFilter; private outgoingBuffer: ContentMessage[]; private possibleAcks: Map; @@ -95,9 +95,8 @@ export class MessageChannel extends TypedEventEmitter { super(); this.channelId = channelId; this.senderId = senderId; - // SDS RFC says to use nanoseconds, but current time in nanosecond is > Number.MAX_SAFE_INTEGER - // So instead we are using milliseconds and proposing a spec change (TODO) - this.lamportTimestamp = Date.now(); + // Initialize channel lamport timestamp to current time in milliseconds. + this.lamportTimestamp = BigInt(Date.now()); this.filter = new DefaultBloomFilter(DEFAULT_BLOOM_FILTER_OPTIONS); this.outgoingBuffer = []; this.possibleAcks = new Map(); @@ -369,7 +368,7 @@ export class MessageChannel extends TypedEventEmitter { public async pushOutgoingSyncMessage( callback?: (message: SyncMessage) => Promise ): Promise { - this.lamportTimestamp++; + this.lamportTimestamp = lamportTimestampIncrement(this.lamportTimestamp); const message = new SyncMessage( // does not need to be secure randomness `sync-${Math.random().toString(36).substring(2)}`, @@ -526,7 +525,7 @@ export class MessageChannel extends TypedEventEmitter { retrievalHint?: Uint8Array; }> ): Promise { - this.lamportTimestamp++; + this.lamportTimestamp = lamportTimestampIncrement(this.lamportTimestamp); const messageId = MessageChannel.getMessageId(payload); @@ -724,3 +723,12 @@ export class MessageChannel extends TypedEventEmitter { }); } } + +export function lamportTimestampIncrement(lamportTimestamp: bigint): bigint { + const now = BigInt(Date.now()); + lamportTimestamp++; + if (now > lamportTimestamp) { + return now; + } + return lamportTimestamp; +}