fix!: SDS lamport timestamp overflow and keep it to current time (#2664)

* fix!: avoid SDS lamport timestamp overflow

The SDS timestamp is initialized to the current time in milliseconds, which is a 13 digits value (e.g. 1,759,223,090,052).

The maximum value for int32 is 2,147,483,647 (10 digits), which is clearly less than the timestamp.
Maximum value for uint32 is 4,294,967,295 (10 digits), which does not help with ms timestamp.

uint64 is BigInt in JavaScript, so best to be avoided unless strictly necessary as it creates complexity.
max uint64 is 18,446,744,073,709,551,615 (20 digits).

Using seconds instead of milliseconds would enable usage of uint32 valid until the year 2106.

The lamport timestamp is only initialized to current time for a new channel. The only scenario is when a user comes in a channel, and thinks it's new (did not get previous messages), and then starts sending messages. Meaning that there may be an initial timestamp conflict until the logs are consolidated, which is already handled by the protocol.

* change lamportTimestamp to uint64 in protobuf

* lamport timestamp remains close to current time
This commit is contained in:
fryorcraken 2025-10-02 09:07:10 +10:00 committed by GitHub
parent 593bc45225
commit c0ecb6abba
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 123 additions and 57 deletions

View File

@ -84,7 +84,7 @@ export interface SdsMessage {
senderId: string senderId: string
messageId: string messageId: string
channelId: string channelId: string
lamportTimestamp?: number lamportTimestamp?: bigint
causalHistory: HistoryEntry[] causalHistory: HistoryEntry[]
bloomFilter?: Uint8Array bloomFilter?: Uint8Array
content?: Uint8Array content?: Uint8Array
@ -117,7 +117,7 @@ export namespace SdsMessage {
if (obj.lamportTimestamp != null) { if (obj.lamportTimestamp != null) {
w.uint32(80) w.uint32(80)
w.int32(obj.lamportTimestamp) w.uint64(obj.lamportTimestamp)
} }
if (obj.causalHistory != null) { if (obj.causalHistory != null) {
@ -167,7 +167,7 @@ export namespace SdsMessage {
break break
} }
case 10: { case 10: {
obj.lamportTimestamp = reader.int32() obj.lamportTimestamp = reader.uint64()
break break
} }
case 11: { case 11: {

View File

@ -9,7 +9,7 @@ message SdsMessage {
string sender_id = 1; // Participant ID of the message sender string sender_id = 1; // Participant ID of the message sender
string message_id = 2; // Unique identifier of the message string message_id = 2; // Unique identifier of the message
string channel_id = 3; // Identifier of the channel to which the message belongs string channel_id = 3; // Identifier of the channel to which the message belongs
optional int32 lamport_timestamp = 10; // Logical timestamp for causal ordering in channel optional uint64 lamport_timestamp = 10; // Logical timestamp for causal ordering in channel
repeated HistoryEntry causal_history = 11; // List of preceding message IDs that this message causally depends on. Generally 2 or 3 message IDs are included. repeated HistoryEntry causal_history = 11; // List of preceding message IDs that this message causally depends on. Generally 2 or 3 message IDs are included.
optional bytes bloom_filter = 12; // Bloom filter representing received message IDs in channel optional bytes bloom_filter = 12; // Bloom filter representing received message IDs in channel
optional bytes content = 20; // Actual content of the message optional bytes content = 20; // Actual content of the message

View File

@ -419,7 +419,7 @@ describe("Reliable Channel", () => {
"MyChannel", "MyChannel",
"alice", "alice",
[], [],
1, 1n,
undefined, undefined,
message message
); );
@ -532,7 +532,7 @@ describe("Reliable Channel", () => {
"testChannel", "testChannel",
"testSender", "testSender",
[], [],
1, 1n,
undefined, undefined,
messagePayload messagePayload
); );
@ -600,7 +600,7 @@ describe("Reliable Channel", () => {
"testChannel", "testChannel",
"testSender", "testSender",
[], [],
1, 1n,
undefined, undefined,
message1Payload message1Payload
); );
@ -610,7 +610,7 @@ describe("Reliable Channel", () => {
"testChannel", "testChannel",
"testSender", "testSender",
[], [],
2, 2n,
undefined, undefined,
message2Payload message2Payload
); );
@ -721,7 +721,7 @@ describe("Reliable Channel", () => {
"differentChannel", "differentChannel",
senderId, senderId,
[], [],
1, 1n,
undefined, undefined,
utf8ToBytes("different channel") utf8ToBytes("different channel")
); );
@ -731,7 +731,7 @@ describe("Reliable Channel", () => {
channelId, channelId,
senderId, senderId,
[], [],
2, 2n,
undefined, undefined,
undefined undefined
); );
@ -741,7 +741,7 @@ describe("Reliable Channel", () => {
channelId, channelId,
senderId, senderId,
[], [],
3, 3n,
undefined, undefined,
utf8ToBytes("after sync") utf8ToBytes("after sync")
); );
@ -824,7 +824,7 @@ describe("Reliable Channel", () => {
channelId, channelId,
senderId, senderId,
[{ messageId: "previous-msg-id" }], [{ messageId: "previous-msg-id" }],
1, 1n,
undefined, undefined,
utf8ToBytes("content message") utf8ToBytes("content message")
); );
@ -834,7 +834,7 @@ describe("Reliable Channel", () => {
channelId, channelId,
senderId, senderId,
[], [],
2, 2n,
undefined, undefined,
utf8ToBytes("after content") utf8ToBytes("after content")
); );
@ -905,7 +905,7 @@ describe("Reliable Channel", () => {
"differentChannel1", "differentChannel1",
senderId, senderId,
[], [],
1, 1n,
undefined, undefined,
utf8ToBytes("different 1") utf8ToBytes("different 1")
); );
@ -915,7 +915,7 @@ describe("Reliable Channel", () => {
"differentChannel2", "differentChannel2",
senderId, senderId,
[], [],
2, 2n,
undefined, undefined,
utf8ToBytes("different 2") utf8ToBytes("different 2")
); );
@ -925,7 +925,7 @@ describe("Reliable Channel", () => {
"differentChannel3", "differentChannel3",
senderId, senderId,
[], [],
3, 3n,
undefined, undefined,
utf8ToBytes("different 3") utf8ToBytes("different 3")
); );
@ -1041,7 +1041,7 @@ describe("Reliable Channel", () => {
"differentChannel", "differentChannel",
"sender", "sender",
[], [],
1, 1n,
undefined, undefined,
utf8ToBytes("content") utf8ToBytes("content")
); );
@ -1060,7 +1060,7 @@ describe("Reliable Channel", () => {
"testChannel", "testChannel",
"sender", "sender",
[], [],
1, 1n,
undefined, undefined,
undefined undefined
); );
@ -1079,7 +1079,7 @@ describe("Reliable Channel", () => {
"testChannel", "testChannel",
"sender", "sender",
[], [],
1, 1n,
undefined, undefined,
utf8ToBytes("content") utf8ToBytes("content")
); );
@ -1098,7 +1098,7 @@ describe("Reliable Channel", () => {
"testChannel", "testChannel",
"sender", "sender",
[{ messageId: "previous-msg-id" }], [{ messageId: "previous-msg-id" }],
1, 1n,
undefined, undefined,
utf8ToBytes("content") utf8ToBytes("content")
); );
@ -1117,7 +1117,7 @@ describe("Reliable Channel", () => {
"testChannel", "testChannel",
"sender", "sender",
[{ messageId: "previous-msg-id" }], [{ messageId: "previous-msg-id" }],
1, 1n,
undefined, undefined,
undefined undefined
); );

View File

@ -0,0 +1,56 @@
import { expect } from "chai";
import { lamportTimestampIncrement } from "./message_channel.js";
describe("lamportTimestampIncrement", () => {
it("should increment timestamp by 1 when current time is not greater", () => {
const futureTimestamp = BigInt(Date.now()) + 1000n;
const result = lamportTimestampIncrement(futureTimestamp);
expect(result).to.equal(futureTimestamp + 1n);
});
it("should use current time when it's greater than incremented timestamp", () => {
const pastTimestamp = BigInt(Date.now()) - 1000n;
const result = lamportTimestampIncrement(pastTimestamp);
const now = BigInt(Date.now());
// Result should be at least as large as now (within small tolerance for test execution time)
expect(result >= now - 10n).to.be.true;
expect(result <= now + 10n).to.be.true;
});
it("should handle timestamp equal to current time", () => {
const currentTimestamp = BigInt(Date.now());
const result = lamportTimestampIncrement(currentTimestamp);
// Should increment by 1 since now is likely not greater than current + 1
expect(result >= currentTimestamp + 1n).to.be.true;
});
it("should ensure monotonic increase", () => {
let timestamp = BigInt(Date.now()) + 5000n;
const results: bigint[] = [];
for (let i = 0; i < 5; i++) {
timestamp = lamportTimestampIncrement(timestamp);
results.push(timestamp);
}
// Verify all timestamps are strictly increasing
for (let i = 1; i < results.length; i++) {
expect(results[i] > results[i - 1]).to.be.true;
}
});
it("should handle very large timestamps", () => {
const largeTimestamp = BigInt(Number.MAX_SAFE_INTEGER) * 1000n;
const result = lamportTimestampIncrement(largeTimestamp);
expect(result).to.equal(largeTimestamp + 1n);
});
it("should jump to current time when timestamp is far in the past", () => {
const veryOldTimestamp = 1000n; // Very old timestamp (1 second after epoch)
const result = lamportTimestampIncrement(veryOldTimestamp);
const now = BigInt(Date.now());
expect(result >= now - 10n).to.be.true;
expect(result <= now + 10n).to.be.true;
});
});

View File

@ -18,7 +18,7 @@ describe("Message serialization", () => {
"my-channel", "my-channel",
"me", "me",
[], [],
0, 0n,
bloomFilter.toBytes(), bloomFilter.toBytes(),
undefined undefined
); );
@ -42,7 +42,7 @@ describe("Message serialization", () => {
"my-channel", "my-channel",
"me", "me",
[{ messageId: depMessageId, retrievalHint: depRetrievalHint }], [{ messageId: depMessageId, retrievalHint: depRetrievalHint }],
0, 0n,
undefined, undefined,
undefined undefined
); );
@ -63,7 +63,7 @@ describe("ContentMessage comparison with < operator", () => {
"channel", "channel",
"sender", "sender",
[], [],
100, // Lower timestamp 100n, // Lower timestamp
undefined, undefined,
new Uint8Array([1]) new Uint8Array([1])
); );
@ -73,7 +73,7 @@ describe("ContentMessage comparison with < operator", () => {
"channel", "channel",
"sender", "sender",
[], [],
200, // Higher timestamp 200n, // Higher timestamp
undefined, undefined,
new Uint8Array([2]) new Uint8Array([2])
); );
@ -89,7 +89,7 @@ describe("ContentMessage comparison with < operator", () => {
"channel", "channel",
"sender", "sender",
[], [],
100, // Same timestamp 100n, // Same timestamp
undefined, undefined,
new Uint8Array([1]) new Uint8Array([1])
); );
@ -99,7 +99,7 @@ describe("ContentMessage comparison with < operator", () => {
"channel", "channel",
"sender", "sender",
[], [],
100, // Same timestamp 100n, // Same timestamp
undefined, undefined,
new Uint8Array([2]) new Uint8Array([2])
); );

View File

@ -14,7 +14,7 @@ export class Message implements proto_sds_message.SdsMessage {
public channelId: string, public channelId: string,
public senderId: string, public senderId: string,
public causalHistory: proto_sds_message.HistoryEntry[], public causalHistory: proto_sds_message.HistoryEntry[],
public lamportTimestamp?: number | undefined, public lamportTimestamp?: bigint | undefined,
public bloomFilter?: Uint8Array<ArrayBufferLike> | undefined, public bloomFilter?: Uint8Array<ArrayBufferLike> | undefined,
public content?: Uint8Array<ArrayBufferLike> | undefined, public content?: Uint8Array<ArrayBufferLike> | undefined,
/** /**
@ -94,7 +94,7 @@ export class SyncMessage extends Message {
public channelId: string, public channelId: string,
public senderId: string, public senderId: string,
public causalHistory: proto_sds_message.HistoryEntry[], public causalHistory: proto_sds_message.HistoryEntry[],
public lamportTimestamp: number, public lamportTimestamp: bigint,
public bloomFilter: Uint8Array<ArrayBufferLike> | undefined, public bloomFilter: Uint8Array<ArrayBufferLike> | undefined,
public content: undefined, public content: undefined,
/** /**
@ -116,12 +116,12 @@ export class SyncMessage extends Message {
} }
function testSyncMessage(message: { function testSyncMessage(message: {
lamportTimestamp?: number; lamportTimestamp?: bigint;
content?: Uint8Array; content?: Uint8Array;
}): boolean { }): boolean {
return Boolean( return Boolean(
"lamportTimestamp" in message && "lamportTimestamp" in message &&
typeof message.lamportTimestamp === "number" && typeof message.lamportTimestamp === "bigint" &&
(message.content === undefined || message.content.length === 0) (message.content === undefined || message.content.length === 0)
); );
} }
@ -169,7 +169,7 @@ export function isEphemeralMessage(
} }
function testEphemeralMessage(message: { function testEphemeralMessage(message: {
lamportTimestamp?: number; lamportTimestamp?: bigint;
content?: Uint8Array; content?: Uint8Array;
}): boolean { }): boolean {
return Boolean( return Boolean(
@ -186,7 +186,7 @@ export class ContentMessage extends Message {
public channelId: string, public channelId: string,
public senderId: string, public senderId: string,
public causalHistory: proto_sds_message.HistoryEntry[], public causalHistory: proto_sds_message.HistoryEntry[],
public lamportTimestamp: number, public lamportTimestamp: bigint,
public bloomFilter: Uint8Array<ArrayBufferLike> | undefined, public bloomFilter: Uint8Array<ArrayBufferLike> | undefined,
public content: Uint8Array<ArrayBufferLike>, public content: Uint8Array<ArrayBufferLike>,
/** /**
@ -226,12 +226,12 @@ export function isContentMessage(
} }
function testContentMessage(message: { function testContentMessage(message: {
lamportTimestamp?: number; lamportTimestamp?: bigint;
content?: Uint8Array; content?: Uint8Array;
}): message is { lamportTimestamp: number; content: Uint8Array } { }): message is { lamportTimestamp: bigint; content: Uint8Array } {
return Boolean( return Boolean(
"lamportTimestamp" in message && "lamportTimestamp" in message &&
typeof message.lamportTimestamp === "number" && typeof message.lamportTimestamp === "bigint" &&
message.content && message.content &&
message.content.length message.content.length
); );

View File

@ -75,7 +75,7 @@ describe("MessageChannel", function () {
const timestampBefore = channelA["lamportTimestamp"]; const timestampBefore = channelA["lamportTimestamp"];
await sendMessage(channelA, utf8ToBytes("message"), callback); await sendMessage(channelA, utf8ToBytes("message"), callback);
const timestampAfter = channelA["lamportTimestamp"]; const timestampAfter = channelA["lamportTimestamp"];
expect(timestampAfter).to.equal(timestampBefore + 1); expect(timestampAfter).to.equal(timestampBefore + 1n);
}); });
it("should push the message to the outgoing buffer", async () => { it("should push the message to the outgoing buffer", async () => {
@ -95,7 +95,7 @@ describe("MessageChannel", function () {
it("should insert message id into causal history", async () => { it("should insert message id into causal history", async () => {
const payload = utf8ToBytes("message"); const payload = utf8ToBytes("message");
const expectedTimestamp = channelA["lamportTimestamp"] + 1; const expectedTimestamp = channelA["lamportTimestamp"] + 1n;
const messageId = MessageChannel.getMessageId(payload); const messageId = MessageChannel.getMessageId(payload);
await sendMessage(channelA, payload, callback); await sendMessage(channelA, payload, callback);
const messageIdLog = channelA["localHistory"] as ILocalHistory; const messageIdLog = channelA["localHistory"] as ILocalHistory;
@ -181,7 +181,7 @@ describe("MessageChannel", function () {
return { success: true }; return { success: true };
}); });
const timestampAfter = channelA["lamportTimestamp"]; const timestampAfter = channelA["lamportTimestamp"];
expect(timestampAfter).to.equal(timestampBefore + 1); expect(timestampAfter).to.equal(timestampBefore + 1n);
}); });
// TODO: test is failing in CI, investigate in https://github.com/waku-org/js-waku/issues/2648 // TODO: test is failing in CI, investigate in https://github.com/waku-org/js-waku/issues/2648
@ -201,7 +201,9 @@ describe("MessageChannel", function () {
}); });
} }
const timestampAfter = testChannelA["lamportTimestamp"]; const timestampAfter = testChannelA["lamportTimestamp"];
expect(timestampAfter - timestampBefore).to.equal(messagesB.length); expect(timestampAfter - timestampBefore).to.equal(
BigInt(messagesB.length)
);
}); });
// TODO: test is failing in CI, investigate in https://github.com/waku-org/js-waku/issues/2648 // TODO: test is failing in CI, investigate in https://github.com/waku-org/js-waku/issues/2648
@ -228,7 +230,7 @@ describe("MessageChannel", function () {
const expectedLength = messagesA.length + messagesB.length; const expectedLength = messagesA.length + messagesB.length;
expect(channelA["lamportTimestamp"]).to.equal( expect(channelA["lamportTimestamp"]).to.equal(
aTimestampBefore + expectedLength aTimestampBefore + BigInt(expectedLength)
); );
expect(channelA["lamportTimestamp"]).to.equal( expect(channelA["lamportTimestamp"]).to.equal(
channelB["lamportTimestamp"] channelB["lamportTimestamp"]
@ -293,7 +295,7 @@ describe("MessageChannel", function () {
channelA.channelId, channelA.channelId,
"not-alice", "not-alice",
[], [],
1, 1n,
undefined, undefined,
payload, payload,
testRetrievalHint testRetrievalHint
@ -335,7 +337,7 @@ describe("MessageChannel", function () {
channelA.channelId, channelA.channelId,
"bob", "bob",
[], [],
startTimestamp + 3, // Higher timestamp startTimestamp + 3n, // Higher timestamp
undefined, undefined,
message3Payload message3Payload
) )
@ -349,7 +351,7 @@ describe("MessageChannel", function () {
channelA.channelId, channelA.channelId,
"carol", "carol",
[], [],
startTimestamp + 2, // Middle timestamp startTimestamp + 2n, // Middle timestamp
undefined, undefined,
message2Payload message2Payload
) )
@ -363,7 +365,7 @@ describe("MessageChannel", function () {
const first = localHistory.findIndex( const first = localHistory.findIndex(
({ messageId, lamportTimestamp }) => { ({ messageId, lamportTimestamp }) => {
return ( return (
messageId === message1Id && lamportTimestamp === startTimestamp + 1 messageId === message1Id && lamportTimestamp === startTimestamp + 1n
); );
} }
); );
@ -372,7 +374,7 @@ describe("MessageChannel", function () {
const second = localHistory.findIndex( const second = localHistory.findIndex(
({ messageId, lamportTimestamp }) => { ({ messageId, lamportTimestamp }) => {
return ( return (
messageId === message2Id && lamportTimestamp === startTimestamp + 2 messageId === message2Id && lamportTimestamp === startTimestamp + 2n
); );
} }
); );
@ -381,7 +383,7 @@ describe("MessageChannel", function () {
const third = localHistory.findIndex( const third = localHistory.findIndex(
({ messageId, lamportTimestamp }) => { ({ messageId, lamportTimestamp }) => {
return ( return (
messageId === message3Id && lamportTimestamp === startTimestamp + 3 messageId === message3Id && lamportTimestamp === startTimestamp + 3n
); );
} }
); );
@ -404,7 +406,7 @@ describe("MessageChannel", function () {
channelA.channelId, channelA.channelId,
"bob", "bob",
[], [],
5, // Same timestamp 5n, // Same timestamp
undefined, undefined,
message2Payload message2Payload
) )
@ -417,7 +419,7 @@ describe("MessageChannel", function () {
channelA.channelId, channelA.channelId,
"carol", "carol",
[], [],
5, // Same timestamp 5n, // Same timestamp
undefined, undefined,
message1Payload message1Payload
) )
@ -432,14 +434,14 @@ describe("MessageChannel", function () {
const first = localHistory.findIndex( const first = localHistory.findIndex(
({ messageId, lamportTimestamp }) => { ({ messageId, lamportTimestamp }) => {
return messageId === expectedOrder[0] && lamportTimestamp == 5; return messageId === expectedOrder[0] && lamportTimestamp == 5n;
} }
); );
expect(first).to.eq(0); expect(first).to.eq(0);
const second = localHistory.findIndex( const second = localHistory.findIndex(
({ messageId, lamportTimestamp }) => { ({ messageId, lamportTimestamp }) => {
return messageId === expectedOrder[1] && lamportTimestamp == 5; return messageId === expectedOrder[1] && lamportTimestamp == 5n;
} }
); );
expect(second).to.eq(1); expect(second).to.eq(1);

View File

@ -56,7 +56,7 @@ export type ILocalHistory = Pick<
export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> { export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
public readonly channelId: ChannelId; public readonly channelId: ChannelId;
public readonly senderId: SenderId; public readonly senderId: SenderId;
private lamportTimestamp: number; private lamportTimestamp: bigint;
private filter: DefaultBloomFilter; private filter: DefaultBloomFilter;
private outgoingBuffer: ContentMessage[]; private outgoingBuffer: ContentMessage[];
private possibleAcks: Map<MessageId, number>; private possibleAcks: Map<MessageId, number>;
@ -95,9 +95,8 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
super(); super();
this.channelId = channelId; this.channelId = channelId;
this.senderId = senderId; this.senderId = senderId;
// SDS RFC says to use nanoseconds, but current time in nanosecond is > Number.MAX_SAFE_INTEGER // Initialize channel lamport timestamp to current time in milliseconds.
// So instead we are using milliseconds and proposing a spec change (TODO) this.lamportTimestamp = BigInt(Date.now());
this.lamportTimestamp = Date.now();
this.filter = new DefaultBloomFilter(DEFAULT_BLOOM_FILTER_OPTIONS); this.filter = new DefaultBloomFilter(DEFAULT_BLOOM_FILTER_OPTIONS);
this.outgoingBuffer = []; this.outgoingBuffer = [];
this.possibleAcks = new Map(); this.possibleAcks = new Map();
@ -369,7 +368,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
public async pushOutgoingSyncMessage( public async pushOutgoingSyncMessage(
callback?: (message: SyncMessage) => Promise<boolean> callback?: (message: SyncMessage) => Promise<boolean>
): Promise<boolean> { ): Promise<boolean> {
this.lamportTimestamp++; this.lamportTimestamp = lamportTimestampIncrement(this.lamportTimestamp);
const message = new SyncMessage( const message = new SyncMessage(
// does not need to be secure randomness // does not need to be secure randomness
`sync-${Math.random().toString(36).substring(2)}`, `sync-${Math.random().toString(36).substring(2)}`,
@ -526,7 +525,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
retrievalHint?: Uint8Array; retrievalHint?: Uint8Array;
}> }>
): Promise<void> { ): Promise<void> {
this.lamportTimestamp++; this.lamportTimestamp = lamportTimestampIncrement(this.lamportTimestamp);
const messageId = MessageChannel.getMessageId(payload); const messageId = MessageChannel.getMessageId(payload);
@ -724,3 +723,12 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
}); });
} }
} }
export function lamportTimestampIncrement(lamportTimestamp: bigint): bigint {
const now = BigInt(Date.now());
lamportTimestamp++;
if (now > lamportTimestamp) {
return now;
}
return lamportTimestamp;
}