diff --git a/packages/sds/src/message_channel/message_channel.spec.ts b/packages/sds/src/message_channel/message_channel.spec.ts index 1a4e357b6e..fd7844bbf4 100644 --- a/packages/sds/src/message_channel/message_channel.spec.ts +++ b/packages/sds/src/message_channel/message_channel.spec.ts @@ -154,8 +154,8 @@ describe("MessageChannel", function () { }); // Causal history should only contain the last N messages as defined by causalHistorySize - const causalHistory = outgoingBuffer[outgoingBuffer.length - 1] - .causalHistory as HistoryEntry[]; + const causalHistory = + outgoingBuffer[outgoingBuffer.length - 1].causalHistory; expect(causalHistory.length).to.equal(causalHistorySize); const expectedCausalHistory = messages @@ -185,6 +185,8 @@ describe("MessageChannel", function () { }); it("should update lamport timestamp if greater than current timestamp and dependencies are met", async () => { + const timestampBefore = channelA["lamportTimestamp"]; + for (const m of messagesA) { await sendMessage(channelA, utf8ToBytes(m), callback); } @@ -195,11 +197,12 @@ describe("MessageChannel", function () { }); } const timestampAfter = channelA["lamportTimestamp"]; - expect(timestampAfter).to.equal(messagesB.length); + expect(timestampAfter - timestampBefore).to.equal(messagesB.length); }); it("should maintain proper timestamps if all messages received", async () => { - let timestamp = 0; + const aTimestampBefore = channelA["lamportTimestamp"]; + let timestamp = channelB["lamportTimestamp"]; for (const m of messagesA) { await sendMessage(channelA, utf8ToBytes(m), async (message) => { timestamp++; @@ -219,7 +222,9 @@ describe("MessageChannel", function () { } const expectedLength = messagesA.length + messagesB.length; - expect(channelA["lamportTimestamp"]).to.equal(expectedLength); + expect(channelA["lamportTimestamp"]).to.equal( + aTimestampBefore + expectedLength + ); expect(channelA["lamportTimestamp"]).to.equal( channelB["lamportTimestamp"] ); @@ -312,6 +317,8 @@ describe("MessageChannel", function () { const message2Id = MessageChannel.getMessageId(message2Payload); const message3Id = MessageChannel.getMessageId(message3Payload); + const startTimestamp = channelA["lamportTimestamp"]; + // Send own message first (timestamp will be 1) await sendMessage(channelA, message1Payload, callback); @@ -323,7 +330,7 @@ describe("MessageChannel", function () { channelA.channelId, "bob", [], - 3, // Higher timestamp + startTimestamp + 3, // Higher timestamp undefined, message3Payload ) @@ -337,7 +344,7 @@ describe("MessageChannel", function () { channelA.channelId, "carol", [], - 2, // Middle timestamp + startTimestamp + 2, // Middle timestamp undefined, message2Payload ) @@ -350,21 +357,27 @@ describe("MessageChannel", function () { const first = localHistory.findIndex( ({ messageId, lamportTimestamp }) => { - return messageId === message1Id && lamportTimestamp === 1; + return ( + messageId === message1Id && lamportTimestamp === startTimestamp + 1 + ); } ); expect(first).to.eq(0); const second = localHistory.findIndex( ({ messageId, lamportTimestamp }) => { - return messageId === message2Id && lamportTimestamp === 2; + return ( + messageId === message2Id && lamportTimestamp === startTimestamp + 2 + ); } ); expect(second).to.eq(1); const third = localHistory.findIndex( ({ messageId, lamportTimestamp }) => { - return messageId === message3Id && lamportTimestamp === 3; + return ( + messageId === message3Id && lamportTimestamp === startTimestamp + 3 + ); } ); expect(third).to.eq(2); diff --git a/packages/sds/src/message_channel/message_channel.ts b/packages/sds/src/message_channel/message_channel.ts index a4492f8708..6375e819f7 100644 --- a/packages/sds/src/message_channel/message_channel.ts +++ b/packages/sds/src/message_channel/message_channel.ts @@ -95,7 +95,9 @@ export class MessageChannel extends TypedEventEmitter { super(); this.channelId = channelId; this.senderId = senderId; - this.lamportTimestamp = 0; + // SDS RFC says to use nanoseconds, but current time in nanosecond is > Number.MAX_SAFE_INTEGER + // So instead we are using milliseconds and proposing a spec change (TODO) + this.lamportTimestamp = Date.now(); this.filter = new DefaultBloomFilter(DEFAULT_BLOOM_FILTER_OPTIONS); this.outgoingBuffer = []; this.possibleAcks = new Map();