fix: refactor delays in reliable channel tests

This commit is contained in:
Arseniy Klempner 2025-10-20 19:04:37 -07:00
parent 8774e942c2
commit 0c4ad5f2b8
No known key found for this signature in database
GPG Key ID: 51653F18863BD24B
5 changed files with 214 additions and 262 deletions

View File

@ -25,6 +25,8 @@ import { expect } from "chai";
import { beforeEach, describe } from "mocha";
import sinon from "sinon";
import { waitForEvent } from "./test_utils.js";
import { ReliableChannel } from "./index.js";
const TEST_CONTENT_TOPIC = "/my-tests/0/topic-name/proto";
@ -63,18 +65,12 @@ describe("Reliable Channel", () => {
// Setting up message tracking
const messageId = reliableChannel.send(message);
let messageSending = false;
reliableChannel.addEventListener("sending-message", (event) => {
if (event.detail === messageId) {
messageSending = true;
}
});
while (!messageSending) {
await delay(50);
}
expect(messageSending).to.be.true;
await waitForEvent<string>(
reliableChannel,
"sending-message",
(id) => id === messageId
);
});
it("Outgoing message is emitted as sent", async () => {
@ -90,19 +86,11 @@ describe("Reliable Channel", () => {
const messageId = reliableChannel.send(message);
// Setting up message tracking
let messageSent = false;
reliableChannel.addEventListener("message-sent", (event) => {
if (event.detail === messageId) {
messageSent = true;
}
});
while (!messageSent) {
await delay(50);
}
expect(messageSent).to.be.true;
await waitForEvent<string>(
reliableChannel,
"message-sent",
(id) => id === messageId
);
});
it("Encoder error raises irrecoverable error", async () => {
@ -130,22 +118,11 @@ describe("Reliable Channel", () => {
encoder.contentTopic = "...";
const messageId = reliableChannel.send(message);
// Setting up message tracking
let irrecoverableError = false;
reliableChannel.addEventListener(
await waitForEvent<{ messageId: string; error: any }>(
reliableChannel,
"sending-message-irrecoverable-error",
(event) => {
if (event.detail.messageId === messageId) {
irrecoverableError = true;
}
}
(detail) => detail.messageId === messageId
);
while (!irrecoverableError) {
await delay(50);
}
expect(irrecoverableError).to.be.true;
});
it("Outgoing message is not emitted as acknowledged from own outgoing messages", async () => {
@ -205,39 +182,32 @@ describe("Reliable Channel", () => {
// Alice sets up message tracking for first message
const firstMessageId = ReliableChannel.getMessageId(messages[0]);
let firstMessagePossiblyAcknowledged = false;
reliableChannelAlice.addEventListener(
"message-possibly-acknowledged",
(event) => {
if (event.detail.messageId === firstMessageId) {
firstMessagePossiblyAcknowledged = true;
}
}
const bobReceivedThirdPromise = waitForEvent<IDecodedMessage>(
reliableChannelBob,
"message-received",
(msg) => bytesToUtf8(msg.payload) === "third"
);
let messageReceived = false;
reliableChannelBob.addEventListener("message-received", (event) => {
if (bytesToUtf8(event.detail.payload) === "third") {
messageReceived = true;
}
});
const firstMessagePossiblyAckPromise = waitForEvent<{
messageId: string;
possibleAckCount: number;
}>(
reliableChannelAlice,
"message-possibly-acknowledged",
(detail) => detail.messageId === firstMessageId
);
for (const m of messages) {
reliableChannelAlice.send(m);
}
// Wait for Bob to receive last message to ensure it is all included in filter
while (!messageReceived) {
await delay(50);
}
await bobReceivedThirdPromise;
// Bobs sends a message now, it should include first one in bloom filter
// Bob sends a message now, it should include first one in bloom filter
reliableChannelBob.send(utf8ToBytes("message back"));
while (!firstMessagePossiblyAcknowledged) {
await delay(50);
}
expect(firstMessagePossiblyAcknowledged).to.be.true;
await firstMessagePossiblyAckPromise;
});
it("Outgoing message is acknowledged", async () => {
@ -264,31 +234,23 @@ describe("Reliable Channel", () => {
const messageId = reliableChannelAlice.send(message);
// Alice sets up message tracking
let messageAcknowledged = false;
reliableChannelAlice.addEventListener("message-acknowledged", (event) => {
if (event.detail === messageId) {
messageAcknowledged = true;
}
});
const bobReceivedPromise = waitForEvent<IDecodedMessage>(
reliableChannelBob,
"message-received"
);
let bobReceivedMessage = false;
reliableChannelBob.addEventListener("message-received", () => {
bobReceivedMessage = true;
});
const messageAcknowledgedPromise = waitForEvent<string>(
reliableChannelAlice,
"message-acknowledged",
(id) => id === messageId
);
// Wait for bob to receive the message to ensure it's included in causal history
while (!bobReceivedMessage) {
await delay(50);
}
await bobReceivedPromise;
// Bobs sends a message now, it should include first one in causal history
// Bob sends a message now, it should include first one in causal history
reliableChannelBob.send(utf8ToBytes("second message in channel"));
while (!messageAcknowledged) {
await delay(50);
}
expect(messageAcknowledged).to.be.true;
await messageAcknowledgedPromise;
});
it("Incoming message is emitted as received", async () => {
@ -300,19 +262,17 @@ describe("Reliable Channel", () => {
decoder
);
let receivedMessage: IDecodedMessage;
reliableChannel.addEventListener("message-received", (event) => {
receivedMessage = event.detail;
});
const message = utf8ToBytes("message in channel");
reliableChannel.send(message);
while (!receivedMessage!) {
await delay(50);
}
const receivedPromise = waitForEvent<IDecodedMessage>(
reliableChannel,
"message-received"
);
expect(bytesToUtf8(receivedMessage!.payload)).to.eq(bytesToUtf8(message));
reliableChannel.send(message);
const receivedMessage = await receivedPromise;
expect(bytesToUtf8(receivedMessage.payload)).to.eq(bytesToUtf8(message));
});
describe("Retries", () => {
@ -355,20 +315,29 @@ describe("Reliable Channel", () => {
}
});
reliableChannelAlice.send(message);
// Wait for first message
const firstMessagePromise = waitForEvent<IDecodedMessage>(
reliableChannelBob,
"message-received",
(msg) => bytesToUtf8(msg.payload) === msgTxt
);
while (messageCount < 1) {
await delay(10);
}
reliableChannelAlice.send(message);
await firstMessagePromise;
expect(messageCount).to.equal(1, "Bob received Alice's message once");
// Wait for retry - Bob should receive the same message again
const retryMessagePromise = waitForEvent<IDecodedMessage>(
reliableChannelBob,
"message-received",
(msg) => bytesToUtf8(msg.payload) === msgTxt
);
// No response from Bob should trigger a retry from Alice
while (messageCount < 2) {
await delay(10);
}
await retryMessagePromise;
expect(messageCount).to.equal(2, "retried once");
// Bobs sends a message now, it should include first one in causal history
// Bob sends a message now, it should include first one in causal history
reliableChannelBob.send(utf8ToBytes("second message in channel"));
// Wait long enough to confirm no retry is executed

View File

@ -11,32 +11,10 @@ import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai";
import { beforeEach, describe } from "mocha";
import { waitForEvent } from "./test_utils.js";
import { ReliableChannel } from "./index.js";
function waitForEvent<T>(
emitter: TypedEventEmitter<any>,
eventName: string,
predicate?: (detail: T) => boolean,
timeoutMs: number = 5000
): Promise<T> {
return new Promise((resolve, reject) => {
const timeout = setTimeout(() => {
emitter.removeEventListener(eventName, handler);
reject(new Error(`Timeout waiting for event: ${eventName}`));
}, timeoutMs);
const handler = (event: CustomEvent<T>): void => {
if (!predicate || predicate(event.detail)) {
clearTimeout(timeout);
emitter.removeEventListener(eventName, handler);
resolve(event.detail);
}
};
emitter.addEventListener(eventName, handler);
});
}
const TEST_CONTENT_TOPIC = "/my-tests/0/topic-name/proto";
const TEST_NETWORK_CONFIG: AutoSharding = {
clusterId: 0,

View File

@ -25,6 +25,8 @@ import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai";
import { beforeEach, describe } from "mocha";
import { waitForEvent } from "./test_utils.js";
import { ReliableChannel } from "./index.js";
const TEST_CONTENT_TOPIC = "/my-tests/0/topic-name/proto";
@ -70,19 +72,15 @@ describe("Reliable Channel: Encryption", () => {
// Setting up message tracking
const messageId = ReliableChannel.getMessageId(message);
let messageSending = false;
reliableChannel.addEventListener("sending-message", (event) => {
if (event.detail === messageId) {
messageSending = true;
}
});
const sendingPromise = waitForEvent<string>(
reliableChannel,
"sending-message",
(id) => id === messageId
);
reliableChannel.send(message);
while (!messageSending) {
await delay(50);
}
expect(messageSending).to.be.true;
await sendingPromise;
});
it("Outgoing message is emitted as sent", async () => {
@ -98,19 +96,15 @@ describe("Reliable Channel: Encryption", () => {
// Setting up message tracking
const messageId = ReliableChannel.getMessageId(message);
let messageSent = false;
reliableChannel.addEventListener("message-sent", (event) => {
if (event.detail === messageId) {
messageSent = true;
}
});
const sentPromise = waitForEvent<string>(
reliableChannel,
"message-sent",
(id) => id === messageId
);
reliableChannel.send(message);
while (!messageSent) {
await delay(50);
}
expect(messageSent).to.be.true;
await sentPromise;
});
it("Encoder error raises irrecoverable error", async () => {
@ -137,23 +131,16 @@ describe("Reliable Channel: Encryption", () => {
// Setting up message tracking
const messageId = ReliableChannel.getMessageId(message);
let irrecoverableError = false;
reliableChannel.addEventListener(
const errorPromise = waitForEvent<{ messageId: string; error: any }>(
reliableChannel,
"sending-message-irrecoverable-error",
(event) => {
if (event.detail.messageId === messageId) {
irrecoverableError = true;
}
}
(detail) => detail.messageId === messageId
);
encoder.contentTopic = "...";
reliableChannel.send(message);
while (!irrecoverableError) {
await delay(50);
}
expect(irrecoverableError).to.be.true;
await errorPromise;
});
it("Outgoing message is not emitted as acknowledged from own outgoing messages", async () => {
@ -216,21 +203,21 @@ describe("Reliable Channel: Encryption", () => {
// Alice sets up message tracking for first message
const firstMessageId = ReliableChannel.getMessageId(messages[0]);
let firstMessagePossiblyAcknowledged = false;
reliableChannelAlice.addEventListener(
"message-possibly-acknowledged",
(event) => {
if (event.detail.messageId === firstMessageId) {
firstMessagePossiblyAcknowledged = true;
}
}
);
let bobMessageReceived = 0;
reliableChannelAlice.addEventListener("message-received", () => {
bobMessageReceived++;
});
const firstMessagePossiblyAckPromise = waitForEvent<{
messageId: string;
possibleAckCount: number;
}>(
reliableChannelAlice,
"message-possibly-acknowledged",
(detail) => detail.messageId === firstMessageId
);
for (const m of messages) {
reliableChannelAlice.send(m);
}
@ -240,13 +227,9 @@ describe("Reliable Channel: Encryption", () => {
await delay(50);
}
// Bobs sends a message now, it should include first one in bloom filter
// Bob sends a message now, it should include first one in bloom filter
reliableChannelBob.send(utf8ToBytes("message back"));
while (!firstMessagePossiblyAcknowledged) {
await delay(50);
}
expect(firstMessagePossiblyAcknowledged).to.be.true;
await firstMessagePossiblyAckPromise;
});
it("Outgoing message is acknowledged", async () => {
@ -273,32 +256,26 @@ describe("Reliable Channel: Encryption", () => {
// Alice sets up message tracking
const messageId = ReliableChannel.getMessageId(message);
let messageAcknowledged = false;
reliableChannelAlice.addEventListener("message-acknowledged", (event) => {
if (event.detail === messageId) {
messageAcknowledged = true;
}
});
let bobReceivedMessage = false;
reliableChannelBob.addEventListener("message-received", () => {
bobReceivedMessage = true;
});
const bobReceivedPromise = waitForEvent<IDecodedMessage>(
reliableChannelBob,
"message-received"
);
const messageAcknowledgedPromise = waitForEvent<string>(
reliableChannelAlice,
"message-acknowledged",
(id) => id === messageId
);
reliableChannelAlice.send(message);
// Wait for Bob to receive the message
while (!bobReceivedMessage) {
await delay(50);
}
await bobReceivedPromise;
// Bobs sends a message now, it should include first one in causal history
// Bob sends a message now, it should include first one in causal history
reliableChannelBob.send(utf8ToBytes("second message in channel"));
while (!messageAcknowledged) {
await delay(50);
}
expect(messageAcknowledged).to.be.true;
await messageAcknowledgedPromise;
});
it("Incoming message is emitted as received", async () => {
@ -310,18 +287,16 @@ describe("Reliable Channel: Encryption", () => {
decoder
);
let receivedMessage: IDecodedMessage;
reliableChannel.addEventListener("message-received", (event) => {
receivedMessage = event.detail;
});
const message = utf8ToBytes("message in channel");
reliableChannel.send(message);
while (!receivedMessage!) {
await delay(50);
}
const receivedPromise = waitForEvent<IDecodedMessage>(
reliableChannel,
"message-received"
);
expect(bytesToUtf8(receivedMessage!.payload)).to.eq(bytesToUtf8(message));
reliableChannel.send(message);
const receivedMessage = await receivedPromise;
expect(bytesToUtf8(receivedMessage.payload)).to.eq(bytesToUtf8(message));
});
});

View File

@ -18,6 +18,8 @@ import { utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai";
import { beforeEach, describe } from "mocha";
import { waitForEvent } from "./test_utils.js";
import { ReliableChannel } from "./index.js";
const TEST_CONTENT_TOPIC = "/my-tests/0/topic-name/proto";
@ -58,16 +60,12 @@ describe("Reliable Channel: Sync", () => {
// Send a message to have a history
const sentMsgId = reliableChannel.send(utf8ToBytes("some message"));
let messageSent = false;
reliableChannel.addEventListener("message-sent", (event) => {
if (event.detail === sentMsgId) {
messageSent = true;
}
});
while (!messageSent) {
await delay(50);
}
await waitForEvent<string>(
reliableChannel,
"message-sent",
(id) => id === sentMsgId
);
let syncMessageSent = false;
reliableChannel.messageChannel.addEventListener(
@ -146,16 +144,12 @@ describe("Reliable Channel: Sync", () => {
// Send a message to have a history
const sentMsgId = reliableChannelAlice.send(utf8ToBytes("some message"));
let messageSent = false;
reliableChannelAlice.addEventListener("message-sent", (event) => {
if (event.detail === sentMsgId) {
messageSent = true;
}
});
while (!messageSent) {
await delay(50);
}
await waitForEvent<string>(
reliableChannelAlice,
"message-sent",
(id) => id === sentMsgId
);
let syncMessageSent = false;
reliableChannelBob.messageChannel.addEventListener(
@ -165,10 +159,11 @@ describe("Reliable Channel: Sync", () => {
}
);
while (!syncMessageSent) {
// Bob will send a sync message as soon as it started, we are waiting for this one
await delay(100);
}
// Bob will send a sync message as soon as it started, we are waiting for this one
await waitForEvent(
reliableChannelBob.messageChannel,
MessageChannelEvent.OutSyncSent
);
// Let's reset the tracker
syncMessageSent = false;
// We should be faster than Bob as Bob will "randomly" wait a full second
@ -219,16 +214,12 @@ describe("Reliable Channel: Sync", () => {
// Send a message to have a history
const sentMsgId = reliableChannelAlice.send(utf8ToBytes("some message"));
let messageSent = false;
reliableChannelAlice.addEventListener("message-sent", (event) => {
if (event.detail === sentMsgId) {
messageSent = true;
}
});
while (!messageSent) {
await delay(50);
}
await waitForEvent<string>(
reliableChannelAlice,
"message-sent",
(id) => id === sentMsgId
);
let syncMessageSent = false;
reliableChannelBob.messageChannel.addEventListener(
@ -238,10 +229,11 @@ describe("Reliable Channel: Sync", () => {
}
);
while (!syncMessageSent) {
// Bob will send a sync message as soon as it started, we are waiting for this one
await delay(100);
}
// Bob will send a sync message as soon as it started, we are waiting for this one
await waitForEvent(
reliableChannelBob.messageChannel,
MessageChannelEvent.OutSyncSent
);
// Let's reset the tracker
syncMessageSent = false;
// We should be faster than Bob as Bob will "randomly" wait a full second
@ -273,16 +265,12 @@ describe("Reliable Channel: Sync", () => {
// Send a message to have a history
const sentMsgId = reliableChannel.send(utf8ToBytes("some message"));
let messageSent = false;
reliableChannel.addEventListener("message-sent", (event) => {
if (event.detail === sentMsgId) {
messageSent = true;
}
});
while (!messageSent) {
await delay(50);
}
await waitForEvent<string>(
reliableChannel,
"message-sent",
(id) => id === sentMsgId
);
let syncMessageSent = false;
reliableChannel.messageChannel.addEventListener(
@ -292,10 +280,11 @@ describe("Reliable Channel: Sync", () => {
}
);
while (!syncMessageSent) {
// Will send a sync message as soon as it started, we are waiting for this one
await delay(100);
}
// Will send a sync message as soon as it started, we are waiting for this one
await waitForEvent(
reliableChannel.messageChannel,
MessageChannelEvent.OutSyncSent
);
// Let's reset the tracker
syncMessageSent = false;
// We should be faster than automated sync as it will "randomly" wait a full second
@ -327,16 +316,12 @@ describe("Reliable Channel: Sync", () => {
// Send a message to have a history
const sentMsgId = reliableChannel.send(utf8ToBytes("some message"));
let messageSent = false;
reliableChannel.addEventListener("message-sent", (event) => {
if (event.detail === sentMsgId) {
messageSent = true;
}
});
while (!messageSent) {
await delay(50);
}
await waitForEvent<string>(
reliableChannel,
"message-sent",
(id) => id === sentMsgId
);
let syncMessageSent = false;
reliableChannel.messageChannel.addEventListener(
@ -346,10 +331,11 @@ describe("Reliable Channel: Sync", () => {
}
);
while (!syncMessageSent) {
// Will send a sync message as soon as it started, we are waiting for this one
await delay(100);
}
// Will send a sync message as soon as it started, we are waiting for this one
await waitForEvent(
reliableChannel.messageChannel,
MessageChannelEvent.OutSyncSent
);
// Let's reset the tracker
syncMessageSent = false;
// We should be faster than automated sync as it will "randomly" wait a full second

View File

@ -0,0 +1,44 @@
import { TypedEventEmitter } from "@libp2p/interface";
/**
* Helper function to wait for an event with an optional predicate.
* Useful for replacing delay-based polling in tests.
*
* @param emitter - The event emitter to listen to
* @param eventName - The name of the event to wait for
* @param predicate - Optional function to filter events by detail
* @param timeoutMs - Timeout in milliseconds (default: 5000)
* @returns Promise that resolves with the event detail
*
* @example
* ```typescript
* const messageId = await waitForEvent<string>(
* channel,
* "message-acknowledged",
* (id) => id === expectedId
* );
* ```
*/
export function waitForEvent<T>(
emitter: TypedEventEmitter<any>,
eventName: string,
predicate?: (detail: T) => boolean,
timeoutMs: number = 5000
): Promise<T> {
return new Promise((resolve, reject) => {
const timeout = setTimeout(() => {
emitter.removeEventListener(eventName, handler);
reject(new Error(`Timeout waiting for event: ${eventName}`));
}, timeoutMs);
const handler = (event: CustomEvent<T>): void => {
if (!predicate || predicate(event.detail)) {
clearTimeout(timeout);
emitter.removeEventListener(eventName, handler);
resolve(event.detail);
}
};
emitter.addEventListener(eventName, handler);
});
}