diff --git a/packages/sdk/src/reliable_channel/reliable_channel_acks.spec.ts b/packages/sdk/src/reliable_channel/reliable_channel_acks.spec.ts index 9cce421c59..88b5974372 100644 --- a/packages/sdk/src/reliable_channel/reliable_channel_acks.spec.ts +++ b/packages/sdk/src/reliable_channel/reliable_channel_acks.spec.ts @@ -6,18 +6,37 @@ import { IDecoder, IEncoder } from "@waku/interfaces"; -import { - createRoutingInfo, - delay, - MockWakuEvents, - MockWakuNode -} from "@waku/utils"; +import { createRoutingInfo, MockWakuEvents, MockWakuNode } from "@waku/utils"; import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; import { beforeEach, describe } from "mocha"; import { ReliableChannel } from "./index.js"; +function waitForEvent( + emitter: TypedEventEmitter, + eventName: string, + predicate?: (detail: T) => boolean, + timeoutMs: number = 5000 +): Promise { + return new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + emitter.removeEventListener(eventName, handler); + reject(new Error(`Timeout waiting for event: ${eventName}`)); + }, timeoutMs); + + const handler = (event: CustomEvent): void => { + if (!predicate || predicate(event.detail)) { + clearTimeout(timeout); + emitter.removeEventListener(eventName, handler); + resolve(event.detail); + } + }; + + emitter.addEventListener(eventName, handler); + }); +} + const TEST_CONTENT_TOPIC = "/my-tests/0/topic-name/proto"; const TEST_NETWORK_CONFIG: AutoSharding = { clusterId: 0, @@ -64,33 +83,29 @@ describe("Reliable Channel: Acks", () => { // Alice sets up message tracking const messageId = ReliableChannel.getMessageId(message); - let messageReceived = false; - reliableChannelBob.addEventListener("message-received", (event) => { - if (bytesToUtf8(event.detail.payload) === "first message in channel") { - messageReceived = true; - } - }); + const messageReceivedPromise = waitForEvent( + reliableChannelBob, + "message-received", + (msg) => bytesToUtf8(msg.payload) === "first message in channel" + ); - let messageAcknowledged = false; - reliableChannelAlice.addEventListener("message-acknowledged", (event) => { - if (event.detail === messageId) { - messageAcknowledged = true; - } - }); + const messageAcknowledgedPromise = waitForEvent( + reliableChannelAlice, + "message-acknowledged", + (id) => id === messageId + ); + // Alice sends the message reliableChannelAlice.send(message); // Wait for Bob to receive the message to ensure it uses it in causal history - while (!messageReceived) { - await delay(50); - } - // Bobs sends a message now, it should include first one in causal history - reliableChannelBob.send(utf8ToBytes("second message in channel")); - while (!messageAcknowledged) { - await delay(50); - } + await messageReceivedPromise; - expect(messageAcknowledged).to.be.true; + // Bob sends a message now, it should include first one in causal history + reliableChannelBob.send(utf8ToBytes("second message in channel")); + + // Wait for Alice to receive acknowledgment + await messageAcknowledgedPromise; }); it("Re-sent message is acknowledged once other parties join.", async () => { @@ -115,18 +130,17 @@ describe("Reliable Channel: Acks", () => { // acknowledged in this test. const message = utf8ToBytes("message to be acknowledged"); const messageId = ReliableChannel.getMessageId(message); + let messageAcknowledged = false; reliableChannelAlice.addEventListener("message-acknowledged", (event) => { if (event.detail === messageId) { messageAcknowledged = true; } }); + reliableChannelAlice.send(message); - // Wait a bit to ensure Bob does not receive the message - await delay(100); - - // Now Bob goes online + // Now Bob goes online (no need to wait since Bob wasn't online to receive it) const mockWakuNodeBob = new MockWakuNode(commonEventEmitter); const reliableChannelBob = await ReliableChannel.create( mockWakuNodeBob, @@ -141,47 +155,51 @@ describe("Reliable Channel: Acks", () => { } ); - // Track when Bob receives the message - let bobReceivedMessage = false; - reliableChannelBob.addEventListener("message-received", (event) => { - if (bytesToUtf8(event.detail.payload!) === "message to be acknowledged") { - bobReceivedMessage = true; - } - }); - // Some sync messages are exchanged await reliableChannelAlice["sendSyncMessage"](); await reliableChannelBob["sendSyncMessage"](); - // wait a bit to ensure messages are processed - await delay(100); + // Wait for Bob to receive "some message" to ensure sync messages were processed + const bobReceivedSomeMessagePromise = waitForEvent( + reliableChannelBob, + "message-received", + (msg) => bytesToUtf8(msg.payload) === "some message" + ); // Some content messages are exchanged too reliableChannelAlice.send(utf8ToBytes("some message")); reliableChannelBob.send(utf8ToBytes("some other message")); - // wait a bit to ensure messages are processed - await delay(100); + // Wait for the "some message" to be received to ensure messages are processed + await bobReceivedSomeMessagePromise; // At this point, the message shouldn't be acknowledged yet as Bob // does not have a complete log expect(messageAcknowledged).to.be.false; // Now Alice resends the message + const bobReceivedMessagePromise = waitForEvent( + reliableChannelBob, + "message-received", + (msg) => bytesToUtf8(msg.payload) === "message to be acknowledged" + ); + reliableChannelAlice.send(message); // Wait for Bob to receive the message - while (!bobReceivedMessage) { - await delay(50); - } + await bobReceivedMessagePromise; + + // Set up promise waiter for acknowledgment before Bob sends sync + const messageAcknowledgedPromise = waitForEvent( + reliableChannelAlice, + "message-acknowledged", + (id) => id === messageId + ); // Bob receives it, and should include it in its sync await reliableChannelBob["sendSyncMessage"](); - while (!messageAcknowledged) { - await delay(50); - } - // The sync should acknowledge the message - expect(messageAcknowledged).to.be.true; + // Wait for acknowledgment + await messageAcknowledgedPromise; }); });