From 5dd9f77ce3e57db1f9a796a2266d9359cf6a9046 Mon Sep 17 00:00:00 2001 From: fryorcraken Date: Wed, 1 Oct 2025 21:15:46 +1000 Subject: [PATCH] change lamportTimestamp to uint64 in protobuf --- packages/proto/src/generated/sds_message.ts | 6 ++-- packages/proto/src/lib/sds_message.proto | 2 +- .../reliable_channel/reliable_channel.spec.ts | 8 ++--- .../sds/src/message_channel/message.spec.ts | 12 +++---- packages/sds/src/message_channel/message.ts | 18 +++++------ .../message_channel/message_channel.spec.ts | 32 ++++++++++--------- .../src/message_channel/message_channel.ts | 8 ++--- 7 files changed, 44 insertions(+), 42 deletions(-) diff --git a/packages/proto/src/generated/sds_message.ts b/packages/proto/src/generated/sds_message.ts index f91ef23cfa..eba12d4acd 100644 --- a/packages/proto/src/generated/sds_message.ts +++ b/packages/proto/src/generated/sds_message.ts @@ -84,7 +84,7 @@ export interface SdsMessage { senderId: string messageId: string channelId: string - lamportTimestamp?: number + lamportTimestamp?: bigint causalHistory: HistoryEntry[] bloomFilter?: Uint8Array content?: Uint8Array @@ -117,7 +117,7 @@ export namespace SdsMessage { if (obj.lamportTimestamp != null) { w.uint32(80) - w.uint32(obj.lamportTimestamp) + w.uint64(obj.lamportTimestamp) } if (obj.causalHistory != null) { @@ -167,7 +167,7 @@ export namespace SdsMessage { break } case 10: { - obj.lamportTimestamp = reader.uint32() + obj.lamportTimestamp = reader.uint64() break } case 11: { diff --git a/packages/proto/src/lib/sds_message.proto b/packages/proto/src/lib/sds_message.proto index 626ae12525..c38e99b084 100644 --- a/packages/proto/src/lib/sds_message.proto +++ b/packages/proto/src/lib/sds_message.proto @@ -9,7 +9,7 @@ message SdsMessage { string sender_id = 1; // Participant ID of the message sender string message_id = 2; // Unique identifier of the message string channel_id = 3; // Identifier of the channel to which the message belongs - optional uint32 lamport_timestamp = 10; // Logical timestamp for causal ordering in channel + optional uint64 lamport_timestamp = 10; // Logical timestamp for causal ordering in channel repeated HistoryEntry causal_history = 11; // List of preceding message IDs that this message causally depends on. Generally 2 or 3 message IDs are included. optional bytes bloom_filter = 12; // Bloom filter representing received message IDs in channel optional bytes content = 20; // Actual content of the message diff --git a/packages/sdk/src/reliable_channel/reliable_channel.spec.ts b/packages/sdk/src/reliable_channel/reliable_channel.spec.ts index 7b29a6b55d..e1928ceef8 100644 --- a/packages/sdk/src/reliable_channel/reliable_channel.spec.ts +++ b/packages/sdk/src/reliable_channel/reliable_channel.spec.ts @@ -419,7 +419,7 @@ describe("Reliable Channel", () => { "MyChannel", "alice", [], - 1, + 1n, undefined, message ); @@ -532,7 +532,7 @@ describe("Reliable Channel", () => { "testChannel", "testSender", [], - 1, + 1n, undefined, messagePayload ); @@ -600,7 +600,7 @@ describe("Reliable Channel", () => { "testChannel", "testSender", [], - 1, + 1n, undefined, message1Payload ); @@ -610,7 +610,7 @@ describe("Reliable Channel", () => { "testChannel", "testSender", [], - 2, + 2n, undefined, message2Payload ); diff --git a/packages/sds/src/message_channel/message.spec.ts b/packages/sds/src/message_channel/message.spec.ts index 11bb9b3735..680bf5cdb5 100644 --- a/packages/sds/src/message_channel/message.spec.ts +++ b/packages/sds/src/message_channel/message.spec.ts @@ -18,7 +18,7 @@ describe("Message serialization", () => { "my-channel", "me", [], - 0, + 0n, bloomFilter.toBytes(), undefined ); @@ -42,7 +42,7 @@ describe("Message serialization", () => { "my-channel", "me", [{ messageId: depMessageId, retrievalHint: depRetrievalHint }], - 0, + 0n, undefined, undefined ); @@ -63,7 +63,7 @@ describe("ContentMessage comparison with < operator", () => { "channel", "sender", [], - 100, // Lower timestamp + 100n, // Lower timestamp undefined, new Uint8Array([1]) ); @@ -73,7 +73,7 @@ describe("ContentMessage comparison with < operator", () => { "channel", "sender", [], - 200, // Higher timestamp + 200n, // Higher timestamp undefined, new Uint8Array([2]) ); @@ -89,7 +89,7 @@ describe("ContentMessage comparison with < operator", () => { "channel", "sender", [], - 100, // Same timestamp + 100n, // Same timestamp undefined, new Uint8Array([1]) ); @@ -99,7 +99,7 @@ describe("ContentMessage comparison with < operator", () => { "channel", "sender", [], - 100, // Same timestamp + 100n, // Same timestamp undefined, new Uint8Array([2]) ); diff --git a/packages/sds/src/message_channel/message.ts b/packages/sds/src/message_channel/message.ts index 638f6c71a0..78b99f9006 100644 --- a/packages/sds/src/message_channel/message.ts +++ b/packages/sds/src/message_channel/message.ts @@ -14,7 +14,7 @@ export class Message implements proto_sds_message.SdsMessage { public channelId: string, public senderId: string, public causalHistory: proto_sds_message.HistoryEntry[], - public lamportTimestamp?: number | undefined, + public lamportTimestamp?: bigint | undefined, public bloomFilter?: Uint8Array | undefined, public content?: Uint8Array | undefined, /** @@ -94,7 +94,7 @@ export class SyncMessage extends Message { public channelId: string, public senderId: string, public causalHistory: proto_sds_message.HistoryEntry[], - public lamportTimestamp: number, + public lamportTimestamp: bigint, public bloomFilter: Uint8Array | undefined, public content: undefined, /** @@ -116,12 +116,12 @@ export class SyncMessage extends Message { } function testSyncMessage(message: { - lamportTimestamp?: number; + lamportTimestamp?: bigint; content?: Uint8Array; }): boolean { return Boolean( "lamportTimestamp" in message && - typeof message.lamportTimestamp === "number" && + typeof message.lamportTimestamp === "bigint" && (message.content === undefined || message.content.length === 0) ); } @@ -169,7 +169,7 @@ export function isEphemeralMessage( } function testEphemeralMessage(message: { - lamportTimestamp?: number; + lamportTimestamp?: bigint; content?: Uint8Array; }): boolean { return Boolean( @@ -186,7 +186,7 @@ export class ContentMessage extends Message { public channelId: string, public senderId: string, public causalHistory: proto_sds_message.HistoryEntry[], - public lamportTimestamp: number, + public lamportTimestamp: bigint, public bloomFilter: Uint8Array | undefined, public content: Uint8Array, /** @@ -226,12 +226,12 @@ export function isContentMessage( } function testContentMessage(message: { - lamportTimestamp?: number; + lamportTimestamp?: bigint; content?: Uint8Array; -}): message is { lamportTimestamp: number; content: Uint8Array } { +}): message is { lamportTimestamp: bigint; content: Uint8Array } { return Boolean( "lamportTimestamp" in message && - typeof message.lamportTimestamp === "number" && + typeof message.lamportTimestamp === "bigint" && message.content && message.content.length ); diff --git a/packages/sds/src/message_channel/message_channel.spec.ts b/packages/sds/src/message_channel/message_channel.spec.ts index 604994c0f8..6098f4c5b7 100644 --- a/packages/sds/src/message_channel/message_channel.spec.ts +++ b/packages/sds/src/message_channel/message_channel.spec.ts @@ -75,7 +75,7 @@ describe("MessageChannel", function () { const timestampBefore = channelA["lamportTimestamp"]; await sendMessage(channelA, utf8ToBytes("message"), callback); const timestampAfter = channelA["lamportTimestamp"]; - expect(timestampAfter).to.equal(timestampBefore + 1); + expect(timestampAfter).to.equal(timestampBefore + 1n); }); it("should push the message to the outgoing buffer", async () => { @@ -95,7 +95,7 @@ describe("MessageChannel", function () { it("should insert message id into causal history", async () => { const payload = utf8ToBytes("message"); - const expectedTimestamp = channelA["lamportTimestamp"] + 1; + const expectedTimestamp = channelA["lamportTimestamp"] + 1n; const messageId = MessageChannel.getMessageId(payload); await sendMessage(channelA, payload, callback); const messageIdLog = channelA["localHistory"] as ILocalHistory; @@ -181,7 +181,7 @@ describe("MessageChannel", function () { return { success: true }; }); const timestampAfter = channelA["lamportTimestamp"]; - expect(timestampAfter).to.equal(timestampBefore + 1); + expect(timestampAfter).to.equal(timestampBefore + 1n); }); // TODO: test is failing in CI, investigate in https://github.com/waku-org/js-waku/issues/2648 @@ -201,7 +201,9 @@ describe("MessageChannel", function () { }); } const timestampAfter = testChannelA["lamportTimestamp"]; - expect(timestampAfter - timestampBefore).to.equal(messagesB.length); + expect(timestampAfter - timestampBefore).to.equal( + BigInt(messagesB.length) + ); }); // TODO: test is failing in CI, investigate in https://github.com/waku-org/js-waku/issues/2648 @@ -228,7 +230,7 @@ describe("MessageChannel", function () { const expectedLength = messagesA.length + messagesB.length; expect(channelA["lamportTimestamp"]).to.equal( - aTimestampBefore + expectedLength + aTimestampBefore + BigInt(expectedLength) ); expect(channelA["lamportTimestamp"]).to.equal( channelB["lamportTimestamp"] @@ -293,7 +295,7 @@ describe("MessageChannel", function () { channelA.channelId, "not-alice", [], - 1, + 1n, undefined, payload, testRetrievalHint @@ -335,7 +337,7 @@ describe("MessageChannel", function () { channelA.channelId, "bob", [], - startTimestamp + 3, // Higher timestamp + startTimestamp + 3n, // Higher timestamp undefined, message3Payload ) @@ -349,7 +351,7 @@ describe("MessageChannel", function () { channelA.channelId, "carol", [], - startTimestamp + 2, // Middle timestamp + startTimestamp + 2n, // Middle timestamp undefined, message2Payload ) @@ -363,7 +365,7 @@ describe("MessageChannel", function () { const first = localHistory.findIndex( ({ messageId, lamportTimestamp }) => { return ( - messageId === message1Id && lamportTimestamp === startTimestamp + 1 + messageId === message1Id && lamportTimestamp === startTimestamp + 1n ); } ); @@ -372,7 +374,7 @@ describe("MessageChannel", function () { const second = localHistory.findIndex( ({ messageId, lamportTimestamp }) => { return ( - messageId === message2Id && lamportTimestamp === startTimestamp + 2 + messageId === message2Id && lamportTimestamp === startTimestamp + 2n ); } ); @@ -381,7 +383,7 @@ describe("MessageChannel", function () { const third = localHistory.findIndex( ({ messageId, lamportTimestamp }) => { return ( - messageId === message3Id && lamportTimestamp === startTimestamp + 3 + messageId === message3Id && lamportTimestamp === startTimestamp + 3n ); } ); @@ -404,7 +406,7 @@ describe("MessageChannel", function () { channelA.channelId, "bob", [], - 5, // Same timestamp + 5n, // Same timestamp undefined, message2Payload ) @@ -417,7 +419,7 @@ describe("MessageChannel", function () { channelA.channelId, "carol", [], - 5, // Same timestamp + 5n, // Same timestamp undefined, message1Payload ) @@ -432,14 +434,14 @@ describe("MessageChannel", function () { const first = localHistory.findIndex( ({ messageId, lamportTimestamp }) => { - return messageId === expectedOrder[0] && lamportTimestamp == 5; + return messageId === expectedOrder[0] && lamportTimestamp == 5n; } ); expect(first).to.eq(0); const second = localHistory.findIndex( ({ messageId, lamportTimestamp }) => { - return messageId === expectedOrder[1] && lamportTimestamp == 5; + return messageId === expectedOrder[1] && lamportTimestamp == 5n; } ); expect(second).to.eq(1); diff --git a/packages/sds/src/message_channel/message_channel.ts b/packages/sds/src/message_channel/message_channel.ts index 9b1ab0e885..a416d53198 100644 --- a/packages/sds/src/message_channel/message_channel.ts +++ b/packages/sds/src/message_channel/message_channel.ts @@ -56,7 +56,7 @@ export type ILocalHistory = Pick< export class MessageChannel extends TypedEventEmitter { public readonly channelId: ChannelId; public readonly senderId: SenderId; - private lamportTimestamp: number; + private lamportTimestamp: bigint; private filter: DefaultBloomFilter; private outgoingBuffer: ContentMessage[]; private possibleAcks: Map; @@ -96,7 +96,7 @@ export class MessageChannel extends TypedEventEmitter { this.channelId = channelId; this.senderId = senderId; // Initialize channel lamport timestamp to current time in seconds. - this.lamportTimestamp = Date.now() / 1000; + this.lamportTimestamp = BigInt(Math.floor(Date.now() / 1000)); this.filter = new DefaultBloomFilter(DEFAULT_BLOOM_FILTER_OPTIONS); this.outgoingBuffer = []; this.possibleAcks = new Map(); @@ -368,7 +368,7 @@ export class MessageChannel extends TypedEventEmitter { public async pushOutgoingSyncMessage( callback?: (message: SyncMessage) => Promise ): Promise { - this.lamportTimestamp++; + this.lamportTimestamp = this.lamportTimestamp + 1n; const message = new SyncMessage( // does not need to be secure randomness `sync-${Math.random().toString(36).substring(2)}`, @@ -525,7 +525,7 @@ export class MessageChannel extends TypedEventEmitter { retrievalHint?: Uint8Array; }> ): Promise { - this.lamportTimestamp++; + this.lamportTimestamp = this.lamportTimestamp + 1n; const messageId = MessageChannel.getMessageId(payload);