diff --git a/packages/sds/src/message_channel/message_channel.spec.ts b/packages/sds/src/message_channel/message_channel.spec.ts index 6098f4c5b7..98e963a9bc 100644 --- a/packages/sds/src/message_channel/message_channel.spec.ts +++ b/packages/sds/src/message_channel/message_channel.spec.ts @@ -1089,13 +1089,36 @@ describe("MessageChannel", function () { causalHistorySize: 2 }); channelB = new MessageChannel(channelId, "bob", { causalHistorySize: 2 }); + const message = utf8ToBytes("first message in channel"); + channelA["localHistory"].push( + new ContentMessage( + MessageChannel.getMessageId(message), + "MyChannel", + "alice", + [], + 1n, + undefined, + message + ) + ); }); it("should be sent with empty content", async () => { - await channelA.pushOutgoingSyncMessage(async (message) => { + const res = await channelA.pushOutgoingSyncMessage(async (message) => { expect(message.content).to.be.undefined; return true; }); + expect(res).to.be.true; + }); + + it("should not be sent when there is no history", async () => { + const channelC = new MessageChannel(channelId, "carol", { + causalHistorySize: 2 + }); + const res = await channelC.pushOutgoingSyncMessage(async (_msg) => { + throw "callback was called when it's not expected"; + }); + expect(res).to.be.false; }); it("should not be added to outgoing buffer, bloom filter, or local log", async () => { @@ -1110,15 +1133,16 @@ describe("MessageChannel", function () { ).to.equal(false); const localLog = channelA["localHistory"]; - expect(localLog.length).to.equal(0); + expect(localLog.length).to.equal(1); // beforeEach adds one message }); it("should not be delivered", async () => { const timestampBefore = channelB["lamportTimestamp"]; - await channelA.pushOutgoingSyncMessage(async (message) => { + const res = await channelA.pushOutgoingSyncMessage(async (message) => { await receiveMessage(channelB, message); return true; }); + expect(res).to.be.true; const timestampAfter = channelB["lamportTimestamp"]; expect(timestampAfter).to.equal(timestampBefore); @@ -1132,20 +1156,23 @@ describe("MessageChannel", function () { }); it("should update ack status of messages in outgoing buffer", async () => { + const channelC = new MessageChannel(channelId, "carol", { + causalHistorySize: 2 + }); for (const m of messagesA) { - await sendMessage(channelA, utf8ToBytes(m), async (message) => { + await sendMessage(channelC, utf8ToBytes(m), async (message) => { await receiveMessage(channelB, message); return { success: true }; }); } await sendSyncMessage(channelB, async (message) => { - await receiveMessage(channelA, message); + await receiveMessage(channelC, message); return true; }); - const causalHistorySize = channelA["causalHistorySize"]; - const outgoingBuffer = channelA["outgoingBuffer"] as Message[]; + const causalHistorySize = channelC["causalHistorySize"]; + const outgoingBuffer = channelC["outgoingBuffer"] as Message[]; expect(outgoingBuffer.length).to.equal( messagesA.length - causalHistorySize ); diff --git a/packages/sds/src/message_channel/message_channel.ts b/packages/sds/src/message_channel/message_channel.ts index 37dd1a6160..f81f8bad20 100644 --- a/packages/sds/src/message_channel/message_channel.ts +++ b/packages/sds/src/message_channel/message_channel.ts @@ -384,6 +384,14 @@ export class MessageChannel extends TypedEventEmitter { undefined ); + if (!message.causalHistory || message.causalHistory.length === 0) { + log.info( + this.senderId, + "no causal history in sync message, aborting sending" + ); + return false; + } + if (callback) { try { await callback(message); @@ -400,6 +408,7 @@ export class MessageChannel extends TypedEventEmitter { throw error; } } + // Why returning false if no callback is set? return false; }