diff --git a/packages/sdk/src/reliable_channel/reliable_channel_sync.spec.ts b/packages/sdk/src/reliable_channel/reliable_channel_sync.spec.ts index 75dbb2eda0..226d5b8c6a 100644 --- a/packages/sdk/src/reliable_channel/reliable_channel_sync.spec.ts +++ b/packages/sdk/src/reliable_channel/reliable_channel_sync.spec.ts @@ -56,6 +56,19 @@ describe("Reliable Channel: Sync", () => { } ); + // Send a message to have a history + const sentMsgId = reliableChannel.send(utf8ToBytes("some message")); + let messageSent = false; + reliableChannel.addEventListener("message-sent", (event) => { + if (event.detail === sentMsgId) { + messageSent = true; + } + }); + + while (!messageSent) { + await delay(50); + } + let syncMessageSent = false; reliableChannel.messageChannel.addEventListener( MessageChannelEvent.OutSyncSent, @@ -131,6 +144,19 @@ describe("Reliable Channel: Sync", () => { return 1; }; // will wait a full second + // Send a message to have a history + const sentMsgId = reliableChannelAlice.send(utf8ToBytes("some message")); + let messageSent = false; + reliableChannelAlice.addEventListener("message-sent", (event) => { + if (event.detail === sentMsgId) { + messageSent = true; + } + }); + + while (!messageSent) { + await delay(50); + } + let syncMessageSent = false; reliableChannelBob.messageChannel.addEventListener( MessageChannelEvent.OutSyncSent, @@ -191,6 +217,19 @@ describe("Reliable Channel: Sync", () => { return 1; }; // will wait a full second + // Send a message to have a history + const sentMsgId = reliableChannelAlice.send(utf8ToBytes("some message")); + let messageSent = false; + reliableChannelAlice.addEventListener("message-sent", (event) => { + if (event.detail === sentMsgId) { + messageSent = true; + } + }); + + while (!messageSent) { + await delay(50); + } + let syncMessageSent = false; reliableChannelBob.messageChannel.addEventListener( MessageChannelEvent.OutSyncSent, @@ -232,6 +271,19 @@ describe("Reliable Channel: Sync", () => { return 1; }; // will wait a full second + // Send a message to have a history + const sentMsgId = reliableChannel.send(utf8ToBytes("some message")); + let messageSent = false; + reliableChannel.addEventListener("message-sent", (event) => { + if (event.detail === sentMsgId) { + messageSent = true; + } + }); + + while (!messageSent) { + await delay(50); + } + let syncMessageSent = false; reliableChannel.messageChannel.addEventListener( MessageChannelEvent.OutSyncSent, @@ -273,6 +325,19 @@ describe("Reliable Channel: Sync", () => { return 1; }; // will wait a full second + // Send a message to have a history + const sentMsgId = reliableChannel.send(utf8ToBytes("some message")); + let messageSent = false; + reliableChannel.addEventListener("message-sent", (event) => { + if (event.detail === sentMsgId) { + messageSent = true; + } + }); + + while (!messageSent) { + await delay(50); + } + let syncMessageSent = false; reliableChannel.messageChannel.addEventListener( MessageChannelEvent.OutSyncSent, diff --git a/packages/sds/src/message_channel/message_channel.spec.ts b/packages/sds/src/message_channel/message_channel.spec.ts index 6098f4c5b7..91184f04d8 100644 --- a/packages/sds/src/message_channel/message_channel.spec.ts +++ b/packages/sds/src/message_channel/message_channel.spec.ts @@ -647,11 +647,12 @@ describe("MessageChannel", function () { }); // And be sends a sync message - await channelB.pushOutgoingSyncMessage(async (message) => { + const res = await channelB.pushOutgoingSyncMessage(async (message) => { await receiveMessage(channelA, message); return true; }); + expect(res).to.be.true; expect(messageAcked).to.be.true; }); }); @@ -1089,17 +1090,41 @@ describe("MessageChannel", function () { causalHistorySize: 2 }); channelB = new MessageChannel(channelId, "bob", { causalHistorySize: 2 }); + const message = utf8ToBytes("first message in channel"); + channelA["localHistory"].push( + new ContentMessage( + MessageChannel.getMessageId(message), + "MyChannel", + "alice", + [], + 1n, + undefined, + message + ) + ); }); it("should be sent with empty content", async () => { - await channelA.pushOutgoingSyncMessage(async (message) => { + const res = await channelA.pushOutgoingSyncMessage(async (message) => { expect(message.content).to.be.undefined; return true; }); + expect(res).to.be.true; + }); + + it("should not be sent when there is no history", async () => { + const channelC = new MessageChannel(channelId, "carol", { + causalHistorySize: 2 + }); + const res = await channelC.pushOutgoingSyncMessage(async (_msg) => { + throw "callback was called when it's not expected"; + }); + expect(res).to.be.false; }); it("should not be added to outgoing buffer, bloom filter, or local log", async () => { - await channelA.pushOutgoingSyncMessage(); + const res = await channelA.pushOutgoingSyncMessage(); + expect(res).to.be.true; const outgoingBuffer = channelA["outgoingBuffer"] as Message[]; expect(outgoingBuffer.length).to.equal(0); @@ -1110,15 +1135,16 @@ describe("MessageChannel", function () { ).to.equal(false); const localLog = channelA["localHistory"]; - expect(localLog.length).to.equal(0); + expect(localLog.length).to.equal(1); // beforeEach adds one message }); it("should not be delivered", async () => { const timestampBefore = channelB["lamportTimestamp"]; - await channelA.pushOutgoingSyncMessage(async (message) => { + const res = await channelA.pushOutgoingSyncMessage(async (message) => { await receiveMessage(channelB, message); return true; }); + expect(res).to.be.true; const timestampAfter = channelB["lamportTimestamp"]; expect(timestampAfter).to.equal(timestampBefore); @@ -1132,20 +1158,23 @@ describe("MessageChannel", function () { }); it("should update ack status of messages in outgoing buffer", async () => { + const channelC = new MessageChannel(channelId, "carol", { + causalHistorySize: 2 + }); for (const m of messagesA) { - await sendMessage(channelA, utf8ToBytes(m), async (message) => { + await sendMessage(channelC, utf8ToBytes(m), async (message) => { await receiveMessage(channelB, message); return { success: true }; }); } await sendSyncMessage(channelB, async (message) => { - await receiveMessage(channelA, message); + await receiveMessage(channelC, message); return true; }); - const causalHistorySize = channelA["causalHistorySize"]; - const outgoingBuffer = channelA["outgoingBuffer"] as Message[]; + const causalHistorySize = channelC["causalHistorySize"]; + const outgoingBuffer = channelC["outgoingBuffer"] as Message[]; expect(outgoingBuffer.length).to.equal( messagesA.length - causalHistorySize ); diff --git a/packages/sds/src/message_channel/message_channel.ts b/packages/sds/src/message_channel/message_channel.ts index 37dd1a6160..3df21f160a 100644 --- a/packages/sds/src/message_channel/message_channel.ts +++ b/packages/sds/src/message_channel/message_channel.ts @@ -384,6 +384,14 @@ export class MessageChannel extends TypedEventEmitter { undefined ); + if (!message.causalHistory || message.causalHistory.length === 0) { + log.info( + this.senderId, + "no causal history in sync message, aborting sending" + ); + return false; + } + if (callback) { try { await callback(message); @@ -400,7 +408,8 @@ export class MessageChannel extends TypedEventEmitter { throw error; } } - return false; + // No problem encountered so returning true + return true; } private _pushIncomingMessage(message: Message): void {