diff --git a/packages/sdk/src/reliable_channel/reliable_channel.spec.ts b/packages/sdk/src/reliable_channel/reliable_channel.spec.ts index ad69d35009..d586d4409b 100644 --- a/packages/sdk/src/reliable_channel/reliable_channel.spec.ts +++ b/packages/sdk/src/reliable_channel/reliable_channel.spec.ts @@ -25,6 +25,8 @@ import { expect } from "chai"; import { beforeEach, describe } from "mocha"; import sinon from "sinon"; +import { waitForEvent } from "./test_utils.js"; + import { ReliableChannel } from "./index.js"; const TEST_CONTENT_TOPIC = "/my-tests/0/topic-name/proto"; @@ -63,18 +65,12 @@ describe("Reliable Channel", () => { // Setting up message tracking const messageId = reliableChannel.send(message); - let messageSending = false; - reliableChannel.addEventListener("sending-message", (event) => { - if (event.detail === messageId) { - messageSending = true; - } - }); - while (!messageSending) { - await delay(50); - } - - expect(messageSending).to.be.true; + await waitForEvent( + reliableChannel, + "sending-message", + (id) => id === messageId + ); }); it("Outgoing message is emitted as sent", async () => { @@ -90,19 +86,11 @@ describe("Reliable Channel", () => { const messageId = reliableChannel.send(message); - // Setting up message tracking - let messageSent = false; - reliableChannel.addEventListener("message-sent", (event) => { - if (event.detail === messageId) { - messageSent = true; - } - }); - - while (!messageSent) { - await delay(50); - } - - expect(messageSent).to.be.true; + await waitForEvent( + reliableChannel, + "message-sent", + (id) => id === messageId + ); }); it("Encoder error raises irrecoverable error", async () => { @@ -130,22 +118,11 @@ describe("Reliable Channel", () => { encoder.contentTopic = "..."; const messageId = reliableChannel.send(message); - // Setting up message tracking - let irrecoverableError = false; - reliableChannel.addEventListener( + await waitForEvent<{ messageId: string; error: any }>( + reliableChannel, "sending-message-irrecoverable-error", - (event) => { - if (event.detail.messageId === messageId) { - irrecoverableError = true; - } - } + (detail) => detail.messageId === messageId ); - - while (!irrecoverableError) { - await delay(50); - } - - expect(irrecoverableError).to.be.true; }); it("Outgoing message is not emitted as acknowledged from own outgoing messages", async () => { @@ -205,39 +182,32 @@ describe("Reliable Channel", () => { // Alice sets up message tracking for first message const firstMessageId = ReliableChannel.getMessageId(messages[0]); - let firstMessagePossiblyAcknowledged = false; - reliableChannelAlice.addEventListener( - "message-possibly-acknowledged", - (event) => { - if (event.detail.messageId === firstMessageId) { - firstMessagePossiblyAcknowledged = true; - } - } + + const bobReceivedThirdPromise = waitForEvent( + reliableChannelBob, + "message-received", + (msg) => bytesToUtf8(msg.payload) === "third" ); - let messageReceived = false; - reliableChannelBob.addEventListener("message-received", (event) => { - if (bytesToUtf8(event.detail.payload) === "third") { - messageReceived = true; - } - }); + const firstMessagePossiblyAckPromise = waitForEvent<{ + messageId: string; + possibleAckCount: number; + }>( + reliableChannelAlice, + "message-possibly-acknowledged", + (detail) => detail.messageId === firstMessageId + ); for (const m of messages) { reliableChannelAlice.send(m); } // Wait for Bob to receive last message to ensure it is all included in filter - while (!messageReceived) { - await delay(50); - } + await bobReceivedThirdPromise; - // Bobs sends a message now, it should include first one in bloom filter + // Bob sends a message now, it should include first one in bloom filter reliableChannelBob.send(utf8ToBytes("message back")); - while (!firstMessagePossiblyAcknowledged) { - await delay(50); - } - - expect(firstMessagePossiblyAcknowledged).to.be.true; + await firstMessagePossiblyAckPromise; }); it("Outgoing message is acknowledged", async () => { @@ -264,31 +234,23 @@ describe("Reliable Channel", () => { const messageId = reliableChannelAlice.send(message); - // Alice sets up message tracking - let messageAcknowledged = false; - reliableChannelAlice.addEventListener("message-acknowledged", (event) => { - if (event.detail === messageId) { - messageAcknowledged = true; - } - }); + const bobReceivedPromise = waitForEvent( + reliableChannelBob, + "message-received" + ); - let bobReceivedMessage = false; - reliableChannelBob.addEventListener("message-received", () => { - bobReceivedMessage = true; - }); + const messageAcknowledgedPromise = waitForEvent( + reliableChannelAlice, + "message-acknowledged", + (id) => id === messageId + ); // Wait for bob to receive the message to ensure it's included in causal history - while (!bobReceivedMessage) { - await delay(50); - } + await bobReceivedPromise; - // Bobs sends a message now, it should include first one in causal history + // Bob sends a message now, it should include first one in causal history reliableChannelBob.send(utf8ToBytes("second message in channel")); - while (!messageAcknowledged) { - await delay(50); - } - - expect(messageAcknowledged).to.be.true; + await messageAcknowledgedPromise; }); it("Incoming message is emitted as received", async () => { @@ -300,19 +262,17 @@ describe("Reliable Channel", () => { decoder ); - let receivedMessage: IDecodedMessage; - reliableChannel.addEventListener("message-received", (event) => { - receivedMessage = event.detail; - }); - const message = utf8ToBytes("message in channel"); - reliableChannel.send(message); - while (!receivedMessage!) { - await delay(50); - } + const receivedPromise = waitForEvent( + reliableChannel, + "message-received" + ); - expect(bytesToUtf8(receivedMessage!.payload)).to.eq(bytesToUtf8(message)); + reliableChannel.send(message); + const receivedMessage = await receivedPromise; + + expect(bytesToUtf8(receivedMessage.payload)).to.eq(bytesToUtf8(message)); }); describe("Retries", () => { @@ -355,20 +315,29 @@ describe("Reliable Channel", () => { } }); - reliableChannelAlice.send(message); + // Wait for first message + const firstMessagePromise = waitForEvent( + reliableChannelBob, + "message-received", + (msg) => bytesToUtf8(msg.payload) === msgTxt + ); - while (messageCount < 1) { - await delay(10); - } + reliableChannelAlice.send(message); + await firstMessagePromise; expect(messageCount).to.equal(1, "Bob received Alice's message once"); + // Wait for retry - Bob should receive the same message again + const retryMessagePromise = waitForEvent( + reliableChannelBob, + "message-received", + (msg) => bytesToUtf8(msg.payload) === msgTxt + ); + // No response from Bob should trigger a retry from Alice - while (messageCount < 2) { - await delay(10); - } + await retryMessagePromise; expect(messageCount).to.equal(2, "retried once"); - // Bobs sends a message now, it should include first one in causal history + // Bob sends a message now, it should include first one in causal history reliableChannelBob.send(utf8ToBytes("second message in channel")); // Wait long enough to confirm no retry is executed diff --git a/packages/sdk/src/reliable_channel/reliable_channel_acks.spec.ts b/packages/sdk/src/reliable_channel/reliable_channel_acks.spec.ts index 88b5974372..0c66f75c6c 100644 --- a/packages/sdk/src/reliable_channel/reliable_channel_acks.spec.ts +++ b/packages/sdk/src/reliable_channel/reliable_channel_acks.spec.ts @@ -11,32 +11,10 @@ import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; import { beforeEach, describe } from "mocha"; +import { waitForEvent } from "./test_utils.js"; + import { ReliableChannel } from "./index.js"; -function waitForEvent( - emitter: TypedEventEmitter, - eventName: string, - predicate?: (detail: T) => boolean, - timeoutMs: number = 5000 -): Promise { - return new Promise((resolve, reject) => { - const timeout = setTimeout(() => { - emitter.removeEventListener(eventName, handler); - reject(new Error(`Timeout waiting for event: ${eventName}`)); - }, timeoutMs); - - const handler = (event: CustomEvent): void => { - if (!predicate || predicate(event.detail)) { - clearTimeout(timeout); - emitter.removeEventListener(eventName, handler); - resolve(event.detail); - } - }; - - emitter.addEventListener(eventName, handler); - }); -} - const TEST_CONTENT_TOPIC = "/my-tests/0/topic-name/proto"; const TEST_NETWORK_CONFIG: AutoSharding = { clusterId: 0, diff --git a/packages/sdk/src/reliable_channel/reliable_channel_encryption.spec.ts b/packages/sdk/src/reliable_channel/reliable_channel_encryption.spec.ts index 628c99bfaa..24770f1fce 100644 --- a/packages/sdk/src/reliable_channel/reliable_channel_encryption.spec.ts +++ b/packages/sdk/src/reliable_channel/reliable_channel_encryption.spec.ts @@ -25,6 +25,8 @@ import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; import { beforeEach, describe } from "mocha"; +import { waitForEvent } from "./test_utils.js"; + import { ReliableChannel } from "./index.js"; const TEST_CONTENT_TOPIC = "/my-tests/0/topic-name/proto"; @@ -70,19 +72,15 @@ describe("Reliable Channel: Encryption", () => { // Setting up message tracking const messageId = ReliableChannel.getMessageId(message); - let messageSending = false; - reliableChannel.addEventListener("sending-message", (event) => { - if (event.detail === messageId) { - messageSending = true; - } - }); + + const sendingPromise = waitForEvent( + reliableChannel, + "sending-message", + (id) => id === messageId + ); reliableChannel.send(message); - while (!messageSending) { - await delay(50); - } - - expect(messageSending).to.be.true; + await sendingPromise; }); it("Outgoing message is emitted as sent", async () => { @@ -98,19 +96,15 @@ describe("Reliable Channel: Encryption", () => { // Setting up message tracking const messageId = ReliableChannel.getMessageId(message); - let messageSent = false; - reliableChannel.addEventListener("message-sent", (event) => { - if (event.detail === messageId) { - messageSent = true; - } - }); + + const sentPromise = waitForEvent( + reliableChannel, + "message-sent", + (id) => id === messageId + ); reliableChannel.send(message); - while (!messageSent) { - await delay(50); - } - - expect(messageSent).to.be.true; + await sentPromise; }); it("Encoder error raises irrecoverable error", async () => { @@ -137,23 +131,16 @@ describe("Reliable Channel: Encryption", () => { // Setting up message tracking const messageId = ReliableChannel.getMessageId(message); - let irrecoverableError = false; - reliableChannel.addEventListener( + + const errorPromise = waitForEvent<{ messageId: string; error: any }>( + reliableChannel, "sending-message-irrecoverable-error", - (event) => { - if (event.detail.messageId === messageId) { - irrecoverableError = true; - } - } + (detail) => detail.messageId === messageId ); encoder.contentTopic = "..."; reliableChannel.send(message); - while (!irrecoverableError) { - await delay(50); - } - - expect(irrecoverableError).to.be.true; + await errorPromise; }); it("Outgoing message is not emitted as acknowledged from own outgoing messages", async () => { @@ -216,21 +203,21 @@ describe("Reliable Channel: Encryption", () => { // Alice sets up message tracking for first message const firstMessageId = ReliableChannel.getMessageId(messages[0]); - let firstMessagePossiblyAcknowledged = false; - reliableChannelAlice.addEventListener( - "message-possibly-acknowledged", - (event) => { - if (event.detail.messageId === firstMessageId) { - firstMessagePossiblyAcknowledged = true; - } - } - ); let bobMessageReceived = 0; reliableChannelAlice.addEventListener("message-received", () => { bobMessageReceived++; }); + const firstMessagePossiblyAckPromise = waitForEvent<{ + messageId: string; + possibleAckCount: number; + }>( + reliableChannelAlice, + "message-possibly-acknowledged", + (detail) => detail.messageId === firstMessageId + ); + for (const m of messages) { reliableChannelAlice.send(m); } @@ -240,13 +227,9 @@ describe("Reliable Channel: Encryption", () => { await delay(50); } - // Bobs sends a message now, it should include first one in bloom filter + // Bob sends a message now, it should include first one in bloom filter reliableChannelBob.send(utf8ToBytes("message back")); - while (!firstMessagePossiblyAcknowledged) { - await delay(50); - } - - expect(firstMessagePossiblyAcknowledged).to.be.true; + await firstMessagePossiblyAckPromise; }); it("Outgoing message is acknowledged", async () => { @@ -273,32 +256,26 @@ describe("Reliable Channel: Encryption", () => { // Alice sets up message tracking const messageId = ReliableChannel.getMessageId(message); - let messageAcknowledged = false; - reliableChannelAlice.addEventListener("message-acknowledged", (event) => { - if (event.detail === messageId) { - messageAcknowledged = true; - } - }); - let bobReceivedMessage = false; - reliableChannelBob.addEventListener("message-received", () => { - bobReceivedMessage = true; - }); + const bobReceivedPromise = waitForEvent( + reliableChannelBob, + "message-received" + ); + + const messageAcknowledgedPromise = waitForEvent( + reliableChannelAlice, + "message-acknowledged", + (id) => id === messageId + ); reliableChannelAlice.send(message); // Wait for Bob to receive the message - while (!bobReceivedMessage) { - await delay(50); - } + await bobReceivedPromise; - // Bobs sends a message now, it should include first one in causal history + // Bob sends a message now, it should include first one in causal history reliableChannelBob.send(utf8ToBytes("second message in channel")); - while (!messageAcknowledged) { - await delay(50); - } - - expect(messageAcknowledged).to.be.true; + await messageAcknowledgedPromise; }); it("Incoming message is emitted as received", async () => { @@ -310,18 +287,16 @@ describe("Reliable Channel: Encryption", () => { decoder ); - let receivedMessage: IDecodedMessage; - reliableChannel.addEventListener("message-received", (event) => { - receivedMessage = event.detail; - }); - const message = utf8ToBytes("message in channel"); - reliableChannel.send(message); - while (!receivedMessage!) { - await delay(50); - } + const receivedPromise = waitForEvent( + reliableChannel, + "message-received" + ); - expect(bytesToUtf8(receivedMessage!.payload)).to.eq(bytesToUtf8(message)); + reliableChannel.send(message); + const receivedMessage = await receivedPromise; + + expect(bytesToUtf8(receivedMessage.payload)).to.eq(bytesToUtf8(message)); }); }); diff --git a/packages/sdk/src/reliable_channel/reliable_channel_sync.spec.ts b/packages/sdk/src/reliable_channel/reliable_channel_sync.spec.ts index 226d5b8c6a..298a7e4341 100644 --- a/packages/sdk/src/reliable_channel/reliable_channel_sync.spec.ts +++ b/packages/sdk/src/reliable_channel/reliable_channel_sync.spec.ts @@ -18,6 +18,8 @@ import { utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; import { beforeEach, describe } from "mocha"; +import { waitForEvent } from "./test_utils.js"; + import { ReliableChannel } from "./index.js"; const TEST_CONTENT_TOPIC = "/my-tests/0/topic-name/proto"; @@ -58,16 +60,12 @@ describe("Reliable Channel: Sync", () => { // Send a message to have a history const sentMsgId = reliableChannel.send(utf8ToBytes("some message")); - let messageSent = false; - reliableChannel.addEventListener("message-sent", (event) => { - if (event.detail === sentMsgId) { - messageSent = true; - } - }); - while (!messageSent) { - await delay(50); - } + await waitForEvent( + reliableChannel, + "message-sent", + (id) => id === sentMsgId + ); let syncMessageSent = false; reliableChannel.messageChannel.addEventListener( @@ -146,16 +144,12 @@ describe("Reliable Channel: Sync", () => { // Send a message to have a history const sentMsgId = reliableChannelAlice.send(utf8ToBytes("some message")); - let messageSent = false; - reliableChannelAlice.addEventListener("message-sent", (event) => { - if (event.detail === sentMsgId) { - messageSent = true; - } - }); - while (!messageSent) { - await delay(50); - } + await waitForEvent( + reliableChannelAlice, + "message-sent", + (id) => id === sentMsgId + ); let syncMessageSent = false; reliableChannelBob.messageChannel.addEventListener( @@ -165,10 +159,11 @@ describe("Reliable Channel: Sync", () => { } ); - while (!syncMessageSent) { - // Bob will send a sync message as soon as it started, we are waiting for this one - await delay(100); - } + // Bob will send a sync message as soon as it started, we are waiting for this one + await waitForEvent( + reliableChannelBob.messageChannel, + MessageChannelEvent.OutSyncSent + ); // Let's reset the tracker syncMessageSent = false; // We should be faster than Bob as Bob will "randomly" wait a full second @@ -219,16 +214,12 @@ describe("Reliable Channel: Sync", () => { // Send a message to have a history const sentMsgId = reliableChannelAlice.send(utf8ToBytes("some message")); - let messageSent = false; - reliableChannelAlice.addEventListener("message-sent", (event) => { - if (event.detail === sentMsgId) { - messageSent = true; - } - }); - while (!messageSent) { - await delay(50); - } + await waitForEvent( + reliableChannelAlice, + "message-sent", + (id) => id === sentMsgId + ); let syncMessageSent = false; reliableChannelBob.messageChannel.addEventListener( @@ -238,10 +229,11 @@ describe("Reliable Channel: Sync", () => { } ); - while (!syncMessageSent) { - // Bob will send a sync message as soon as it started, we are waiting for this one - await delay(100); - } + // Bob will send a sync message as soon as it started, we are waiting for this one + await waitForEvent( + reliableChannelBob.messageChannel, + MessageChannelEvent.OutSyncSent + ); // Let's reset the tracker syncMessageSent = false; // We should be faster than Bob as Bob will "randomly" wait a full second @@ -273,16 +265,12 @@ describe("Reliable Channel: Sync", () => { // Send a message to have a history const sentMsgId = reliableChannel.send(utf8ToBytes("some message")); - let messageSent = false; - reliableChannel.addEventListener("message-sent", (event) => { - if (event.detail === sentMsgId) { - messageSent = true; - } - }); - while (!messageSent) { - await delay(50); - } + await waitForEvent( + reliableChannel, + "message-sent", + (id) => id === sentMsgId + ); let syncMessageSent = false; reliableChannel.messageChannel.addEventListener( @@ -292,10 +280,11 @@ describe("Reliable Channel: Sync", () => { } ); - while (!syncMessageSent) { - // Will send a sync message as soon as it started, we are waiting for this one - await delay(100); - } + // Will send a sync message as soon as it started, we are waiting for this one + await waitForEvent( + reliableChannel.messageChannel, + MessageChannelEvent.OutSyncSent + ); // Let's reset the tracker syncMessageSent = false; // We should be faster than automated sync as it will "randomly" wait a full second @@ -327,16 +316,12 @@ describe("Reliable Channel: Sync", () => { // Send a message to have a history const sentMsgId = reliableChannel.send(utf8ToBytes("some message")); - let messageSent = false; - reliableChannel.addEventListener("message-sent", (event) => { - if (event.detail === sentMsgId) { - messageSent = true; - } - }); - while (!messageSent) { - await delay(50); - } + await waitForEvent( + reliableChannel, + "message-sent", + (id) => id === sentMsgId + ); let syncMessageSent = false; reliableChannel.messageChannel.addEventListener( @@ -346,10 +331,11 @@ describe("Reliable Channel: Sync", () => { } ); - while (!syncMessageSent) { - // Will send a sync message as soon as it started, we are waiting for this one - await delay(100); - } + // Will send a sync message as soon as it started, we are waiting for this one + await waitForEvent( + reliableChannel.messageChannel, + MessageChannelEvent.OutSyncSent + ); // Let's reset the tracker syncMessageSent = false; // We should be faster than automated sync as it will "randomly" wait a full second diff --git a/packages/sdk/src/reliable_channel/test_utils.ts b/packages/sdk/src/reliable_channel/test_utils.ts new file mode 100644 index 0000000000..62f92bdbcd --- /dev/null +++ b/packages/sdk/src/reliable_channel/test_utils.ts @@ -0,0 +1,44 @@ +import { TypedEventEmitter } from "@libp2p/interface"; + +/** + * Helper function to wait for an event with an optional predicate. + * Useful for replacing delay-based polling in tests. + * + * @param emitter - The event emitter to listen to + * @param eventName - The name of the event to wait for + * @param predicate - Optional function to filter events by detail + * @param timeoutMs - Timeout in milliseconds (default: 5000) + * @returns Promise that resolves with the event detail + * + * @example + * ```typescript + * const messageId = await waitForEvent( + * channel, + * "message-acknowledged", + * (id) => id === expectedId + * ); + * ``` + */ +export function waitForEvent( + emitter: TypedEventEmitter, + eventName: string, + predicate?: (detail: T) => boolean, + timeoutMs: number = 5000 +): Promise { + return new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + emitter.removeEventListener(eventName, handler); + reject(new Error(`Timeout waiting for event: ${eventName}`)); + }, timeoutMs); + + const handler = (event: CustomEvent): void => { + if (!predicate || predicate(event.detail)) { + clearTimeout(timeout); + emitter.removeEventListener(eventName, handler); + resolve(event.detail); + } + }; + + emitter.addEventListener(eventName, handler); + }); +}