change lamportTimestamp to uint64 in protobuf

This commit is contained in:
fryorcraken 2025-10-01 21:15:46 +10:00
parent c25278d7e6
commit 5dd9f77ce3
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4
7 changed files with 44 additions and 42 deletions

View File

@ -84,7 +84,7 @@ export interface SdsMessage {
senderId: string
messageId: string
channelId: string
lamportTimestamp?: number
lamportTimestamp?: bigint
causalHistory: HistoryEntry[]
bloomFilter?: Uint8Array
content?: Uint8Array
@ -117,7 +117,7 @@ export namespace SdsMessage {
if (obj.lamportTimestamp != null) {
w.uint32(80)
w.uint32(obj.lamportTimestamp)
w.uint64(obj.lamportTimestamp)
}
if (obj.causalHistory != null) {
@ -167,7 +167,7 @@ export namespace SdsMessage {
break
}
case 10: {
obj.lamportTimestamp = reader.uint32()
obj.lamportTimestamp = reader.uint64()
break
}
case 11: {

View File

@ -9,7 +9,7 @@ message SdsMessage {
string sender_id = 1; // Participant ID of the message sender
string message_id = 2; // Unique identifier of the message
string channel_id = 3; // Identifier of the channel to which the message belongs
optional uint32 lamport_timestamp = 10; // Logical timestamp for causal ordering in channel
optional uint64 lamport_timestamp = 10; // Logical timestamp for causal ordering in channel
repeated HistoryEntry causal_history = 11; // List of preceding message IDs that this message causally depends on. Generally 2 or 3 message IDs are included.
optional bytes bloom_filter = 12; // Bloom filter representing received message IDs in channel
optional bytes content = 20; // Actual content of the message

View File

@ -419,7 +419,7 @@ describe("Reliable Channel", () => {
"MyChannel",
"alice",
[],
1,
1n,
undefined,
message
);
@ -532,7 +532,7 @@ describe("Reliable Channel", () => {
"testChannel",
"testSender",
[],
1,
1n,
undefined,
messagePayload
);
@ -600,7 +600,7 @@ describe("Reliable Channel", () => {
"testChannel",
"testSender",
[],
1,
1n,
undefined,
message1Payload
);
@ -610,7 +610,7 @@ describe("Reliable Channel", () => {
"testChannel",
"testSender",
[],
2,
2n,
undefined,
message2Payload
);

View File

@ -18,7 +18,7 @@ describe("Message serialization", () => {
"my-channel",
"me",
[],
0,
0n,
bloomFilter.toBytes(),
undefined
);
@ -42,7 +42,7 @@ describe("Message serialization", () => {
"my-channel",
"me",
[{ messageId: depMessageId, retrievalHint: depRetrievalHint }],
0,
0n,
undefined,
undefined
);
@ -63,7 +63,7 @@ describe("ContentMessage comparison with < operator", () => {
"channel",
"sender",
[],
100, // Lower timestamp
100n, // Lower timestamp
undefined,
new Uint8Array([1])
);
@ -73,7 +73,7 @@ describe("ContentMessage comparison with < operator", () => {
"channel",
"sender",
[],
200, // Higher timestamp
200n, // Higher timestamp
undefined,
new Uint8Array([2])
);
@ -89,7 +89,7 @@ describe("ContentMessage comparison with < operator", () => {
"channel",
"sender",
[],
100, // Same timestamp
100n, // Same timestamp
undefined,
new Uint8Array([1])
);
@ -99,7 +99,7 @@ describe("ContentMessage comparison with < operator", () => {
"channel",
"sender",
[],
100, // Same timestamp
100n, // Same timestamp
undefined,
new Uint8Array([2])
);

View File

@ -14,7 +14,7 @@ export class Message implements proto_sds_message.SdsMessage {
public channelId: string,
public senderId: string,
public causalHistory: proto_sds_message.HistoryEntry[],
public lamportTimestamp?: number | undefined,
public lamportTimestamp?: bigint | undefined,
public bloomFilter?: Uint8Array<ArrayBufferLike> | undefined,
public content?: Uint8Array<ArrayBufferLike> | undefined,
/**
@ -94,7 +94,7 @@ export class SyncMessage extends Message {
public channelId: string,
public senderId: string,
public causalHistory: proto_sds_message.HistoryEntry[],
public lamportTimestamp: number,
public lamportTimestamp: bigint,
public bloomFilter: Uint8Array<ArrayBufferLike> | undefined,
public content: undefined,
/**
@ -116,12 +116,12 @@ export class SyncMessage extends Message {
}
function testSyncMessage(message: {
lamportTimestamp?: number;
lamportTimestamp?: bigint;
content?: Uint8Array;
}): boolean {
return Boolean(
"lamportTimestamp" in message &&
typeof message.lamportTimestamp === "number" &&
typeof message.lamportTimestamp === "bigint" &&
(message.content === undefined || message.content.length === 0)
);
}
@ -169,7 +169,7 @@ export function isEphemeralMessage(
}
function testEphemeralMessage(message: {
lamportTimestamp?: number;
lamportTimestamp?: bigint;
content?: Uint8Array;
}): boolean {
return Boolean(
@ -186,7 +186,7 @@ export class ContentMessage extends Message {
public channelId: string,
public senderId: string,
public causalHistory: proto_sds_message.HistoryEntry[],
public lamportTimestamp: number,
public lamportTimestamp: bigint,
public bloomFilter: Uint8Array<ArrayBufferLike> | undefined,
public content: Uint8Array<ArrayBufferLike>,
/**
@ -226,12 +226,12 @@ export function isContentMessage(
}
function testContentMessage(message: {
lamportTimestamp?: number;
lamportTimestamp?: bigint;
content?: Uint8Array;
}): message is { lamportTimestamp: number; content: Uint8Array } {
}): message is { lamportTimestamp: bigint; content: Uint8Array } {
return Boolean(
"lamportTimestamp" in message &&
typeof message.lamportTimestamp === "number" &&
typeof message.lamportTimestamp === "bigint" &&
message.content &&
message.content.length
);

View File

@ -75,7 +75,7 @@ describe("MessageChannel", function () {
const timestampBefore = channelA["lamportTimestamp"];
await sendMessage(channelA, utf8ToBytes("message"), callback);
const timestampAfter = channelA["lamportTimestamp"];
expect(timestampAfter).to.equal(timestampBefore + 1);
expect(timestampAfter).to.equal(timestampBefore + 1n);
});
it("should push the message to the outgoing buffer", async () => {
@ -95,7 +95,7 @@ describe("MessageChannel", function () {
it("should insert message id into causal history", async () => {
const payload = utf8ToBytes("message");
const expectedTimestamp = channelA["lamportTimestamp"] + 1;
const expectedTimestamp = channelA["lamportTimestamp"] + 1n;
const messageId = MessageChannel.getMessageId(payload);
await sendMessage(channelA, payload, callback);
const messageIdLog = channelA["localHistory"] as ILocalHistory;
@ -181,7 +181,7 @@ describe("MessageChannel", function () {
return { success: true };
});
const timestampAfter = channelA["lamportTimestamp"];
expect(timestampAfter).to.equal(timestampBefore + 1);
expect(timestampAfter).to.equal(timestampBefore + 1n);
});
// TODO: test is failing in CI, investigate in https://github.com/waku-org/js-waku/issues/2648
@ -201,7 +201,9 @@ describe("MessageChannel", function () {
});
}
const timestampAfter = testChannelA["lamportTimestamp"];
expect(timestampAfter - timestampBefore).to.equal(messagesB.length);
expect(timestampAfter - timestampBefore).to.equal(
BigInt(messagesB.length)
);
});
// TODO: test is failing in CI, investigate in https://github.com/waku-org/js-waku/issues/2648
@ -228,7 +230,7 @@ describe("MessageChannel", function () {
const expectedLength = messagesA.length + messagesB.length;
expect(channelA["lamportTimestamp"]).to.equal(
aTimestampBefore + expectedLength
aTimestampBefore + BigInt(expectedLength)
);
expect(channelA["lamportTimestamp"]).to.equal(
channelB["lamportTimestamp"]
@ -293,7 +295,7 @@ describe("MessageChannel", function () {
channelA.channelId,
"not-alice",
[],
1,
1n,
undefined,
payload,
testRetrievalHint
@ -335,7 +337,7 @@ describe("MessageChannel", function () {
channelA.channelId,
"bob",
[],
startTimestamp + 3, // Higher timestamp
startTimestamp + 3n, // Higher timestamp
undefined,
message3Payload
)
@ -349,7 +351,7 @@ describe("MessageChannel", function () {
channelA.channelId,
"carol",
[],
startTimestamp + 2, // Middle timestamp
startTimestamp + 2n, // Middle timestamp
undefined,
message2Payload
)
@ -363,7 +365,7 @@ describe("MessageChannel", function () {
const first = localHistory.findIndex(
({ messageId, lamportTimestamp }) => {
return (
messageId === message1Id && lamportTimestamp === startTimestamp + 1
messageId === message1Id && lamportTimestamp === startTimestamp + 1n
);
}
);
@ -372,7 +374,7 @@ describe("MessageChannel", function () {
const second = localHistory.findIndex(
({ messageId, lamportTimestamp }) => {
return (
messageId === message2Id && lamportTimestamp === startTimestamp + 2
messageId === message2Id && lamportTimestamp === startTimestamp + 2n
);
}
);
@ -381,7 +383,7 @@ describe("MessageChannel", function () {
const third = localHistory.findIndex(
({ messageId, lamportTimestamp }) => {
return (
messageId === message3Id && lamportTimestamp === startTimestamp + 3
messageId === message3Id && lamportTimestamp === startTimestamp + 3n
);
}
);
@ -404,7 +406,7 @@ describe("MessageChannel", function () {
channelA.channelId,
"bob",
[],
5, // Same timestamp
5n, // Same timestamp
undefined,
message2Payload
)
@ -417,7 +419,7 @@ describe("MessageChannel", function () {
channelA.channelId,
"carol",
[],
5, // Same timestamp
5n, // Same timestamp
undefined,
message1Payload
)
@ -432,14 +434,14 @@ describe("MessageChannel", function () {
const first = localHistory.findIndex(
({ messageId, lamportTimestamp }) => {
return messageId === expectedOrder[0] && lamportTimestamp == 5;
return messageId === expectedOrder[0] && lamportTimestamp == 5n;
}
);
expect(first).to.eq(0);
const second = localHistory.findIndex(
({ messageId, lamportTimestamp }) => {
return messageId === expectedOrder[1] && lamportTimestamp == 5;
return messageId === expectedOrder[1] && lamportTimestamp == 5n;
}
);
expect(second).to.eq(1);

View File

@ -56,7 +56,7 @@ export type ILocalHistory = Pick<
export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
public readonly channelId: ChannelId;
public readonly senderId: SenderId;
private lamportTimestamp: number;
private lamportTimestamp: bigint;
private filter: DefaultBloomFilter;
private outgoingBuffer: ContentMessage[];
private possibleAcks: Map<MessageId, number>;
@ -96,7 +96,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
this.channelId = channelId;
this.senderId = senderId;
// Initialize channel lamport timestamp to current time in seconds.
this.lamportTimestamp = Date.now() / 1000;
this.lamportTimestamp = BigInt(Math.floor(Date.now() / 1000));
this.filter = new DefaultBloomFilter(DEFAULT_BLOOM_FILTER_OPTIONS);
this.outgoingBuffer = [];
this.possibleAcks = new Map();
@ -368,7 +368,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
public async pushOutgoingSyncMessage(
callback?: (message: SyncMessage) => Promise<boolean>
): Promise<boolean> {
this.lamportTimestamp++;
this.lamportTimestamp = this.lamportTimestamp + 1n;
const message = new SyncMessage(
// does not need to be secure randomness
`sync-${Math.random().toString(36).substring(2)}`,
@ -525,7 +525,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
retrievalHint?: Uint8Array;
}>
): Promise<void> {
this.lamportTimestamp++;
this.lamportTimestamp = this.lamportTimestamp + 1n;
const messageId = MessageChannel.getMessageId(payload);