mirror of
https://github.com/logos-messaging/js-waku.git
synced 2026-01-14 03:33:11 +00:00
fix: replace delay with promise to test reliable channel events
This commit is contained in:
parent
b8a9d132c1
commit
8774e942c2
@ -6,18 +6,37 @@ import {
|
||||
IDecoder,
|
||||
IEncoder
|
||||
} from "@waku/interfaces";
|
||||
import {
|
||||
createRoutingInfo,
|
||||
delay,
|
||||
MockWakuEvents,
|
||||
MockWakuNode
|
||||
} from "@waku/utils";
|
||||
import { createRoutingInfo, MockWakuEvents, MockWakuNode } from "@waku/utils";
|
||||
import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes";
|
||||
import { expect } from "chai";
|
||||
import { beforeEach, describe } from "mocha";
|
||||
|
||||
import { ReliableChannel } from "./index.js";
|
||||
|
||||
function waitForEvent<T>(
|
||||
emitter: TypedEventEmitter<any>,
|
||||
eventName: string,
|
||||
predicate?: (detail: T) => boolean,
|
||||
timeoutMs: number = 5000
|
||||
): Promise<T> {
|
||||
return new Promise((resolve, reject) => {
|
||||
const timeout = setTimeout(() => {
|
||||
emitter.removeEventListener(eventName, handler);
|
||||
reject(new Error(`Timeout waiting for event: ${eventName}`));
|
||||
}, timeoutMs);
|
||||
|
||||
const handler = (event: CustomEvent<T>): void => {
|
||||
if (!predicate || predicate(event.detail)) {
|
||||
clearTimeout(timeout);
|
||||
emitter.removeEventListener(eventName, handler);
|
||||
resolve(event.detail);
|
||||
}
|
||||
};
|
||||
|
||||
emitter.addEventListener(eventName, handler);
|
||||
});
|
||||
}
|
||||
|
||||
const TEST_CONTENT_TOPIC = "/my-tests/0/topic-name/proto";
|
||||
const TEST_NETWORK_CONFIG: AutoSharding = {
|
||||
clusterId: 0,
|
||||
@ -64,33 +83,29 @@ describe("Reliable Channel: Acks", () => {
|
||||
// Alice sets up message tracking
|
||||
const messageId = ReliableChannel.getMessageId(message);
|
||||
|
||||
let messageReceived = false;
|
||||
reliableChannelBob.addEventListener("message-received", (event) => {
|
||||
if (bytesToUtf8(event.detail.payload) === "first message in channel") {
|
||||
messageReceived = true;
|
||||
}
|
||||
});
|
||||
const messageReceivedPromise = waitForEvent<IDecodedMessage>(
|
||||
reliableChannelBob,
|
||||
"message-received",
|
||||
(msg) => bytesToUtf8(msg.payload) === "first message in channel"
|
||||
);
|
||||
|
||||
let messageAcknowledged = false;
|
||||
reliableChannelAlice.addEventListener("message-acknowledged", (event) => {
|
||||
if (event.detail === messageId) {
|
||||
messageAcknowledged = true;
|
||||
}
|
||||
});
|
||||
const messageAcknowledgedPromise = waitForEvent<string>(
|
||||
reliableChannelAlice,
|
||||
"message-acknowledged",
|
||||
(id) => id === messageId
|
||||
);
|
||||
|
||||
// Alice sends the message
|
||||
reliableChannelAlice.send(message);
|
||||
|
||||
// Wait for Bob to receive the message to ensure it uses it in causal history
|
||||
while (!messageReceived) {
|
||||
await delay(50);
|
||||
}
|
||||
// Bobs sends a message now, it should include first one in causal history
|
||||
reliableChannelBob.send(utf8ToBytes("second message in channel"));
|
||||
while (!messageAcknowledged) {
|
||||
await delay(50);
|
||||
}
|
||||
await messageReceivedPromise;
|
||||
|
||||
expect(messageAcknowledged).to.be.true;
|
||||
// Bob sends a message now, it should include first one in causal history
|
||||
reliableChannelBob.send(utf8ToBytes("second message in channel"));
|
||||
|
||||
// Wait for Alice to receive acknowledgment
|
||||
await messageAcknowledgedPromise;
|
||||
});
|
||||
|
||||
it("Re-sent message is acknowledged once other parties join.", async () => {
|
||||
@ -115,18 +130,17 @@ describe("Reliable Channel: Acks", () => {
|
||||
// acknowledged in this test.
|
||||
const message = utf8ToBytes("message to be acknowledged");
|
||||
const messageId = ReliableChannel.getMessageId(message);
|
||||
|
||||
let messageAcknowledged = false;
|
||||
reliableChannelAlice.addEventListener("message-acknowledged", (event) => {
|
||||
if (event.detail === messageId) {
|
||||
messageAcknowledged = true;
|
||||
}
|
||||
});
|
||||
|
||||
reliableChannelAlice.send(message);
|
||||
|
||||
// Wait a bit to ensure Bob does not receive the message
|
||||
await delay(100);
|
||||
|
||||
// Now Bob goes online
|
||||
// Now Bob goes online (no need to wait since Bob wasn't online to receive it)
|
||||
const mockWakuNodeBob = new MockWakuNode(commonEventEmitter);
|
||||
const reliableChannelBob = await ReliableChannel.create(
|
||||
mockWakuNodeBob,
|
||||
@ -141,47 +155,51 @@ describe("Reliable Channel: Acks", () => {
|
||||
}
|
||||
);
|
||||
|
||||
// Track when Bob receives the message
|
||||
let bobReceivedMessage = false;
|
||||
reliableChannelBob.addEventListener("message-received", (event) => {
|
||||
if (bytesToUtf8(event.detail.payload!) === "message to be acknowledged") {
|
||||
bobReceivedMessage = true;
|
||||
}
|
||||
});
|
||||
|
||||
// Some sync messages are exchanged
|
||||
await reliableChannelAlice["sendSyncMessage"]();
|
||||
await reliableChannelBob["sendSyncMessage"]();
|
||||
|
||||
// wait a bit to ensure messages are processed
|
||||
await delay(100);
|
||||
// Wait for Bob to receive "some message" to ensure sync messages were processed
|
||||
const bobReceivedSomeMessagePromise = waitForEvent<IDecodedMessage>(
|
||||
reliableChannelBob,
|
||||
"message-received",
|
||||
(msg) => bytesToUtf8(msg.payload) === "some message"
|
||||
);
|
||||
|
||||
// Some content messages are exchanged too
|
||||
reliableChannelAlice.send(utf8ToBytes("some message"));
|
||||
reliableChannelBob.send(utf8ToBytes("some other message"));
|
||||
|
||||
// wait a bit to ensure messages are processed
|
||||
await delay(100);
|
||||
// Wait for the "some message" to be received to ensure messages are processed
|
||||
await bobReceivedSomeMessagePromise;
|
||||
|
||||
// At this point, the message shouldn't be acknowledged yet as Bob
|
||||
// does not have a complete log
|
||||
expect(messageAcknowledged).to.be.false;
|
||||
|
||||
// Now Alice resends the message
|
||||
const bobReceivedMessagePromise = waitForEvent<IDecodedMessage>(
|
||||
reliableChannelBob,
|
||||
"message-received",
|
||||
(msg) => bytesToUtf8(msg.payload) === "message to be acknowledged"
|
||||
);
|
||||
|
||||
reliableChannelAlice.send(message);
|
||||
|
||||
// Wait for Bob to receive the message
|
||||
while (!bobReceivedMessage) {
|
||||
await delay(50);
|
||||
}
|
||||
await bobReceivedMessagePromise;
|
||||
|
||||
// Set up promise waiter for acknowledgment before Bob sends sync
|
||||
const messageAcknowledgedPromise = waitForEvent<string>(
|
||||
reliableChannelAlice,
|
||||
"message-acknowledged",
|
||||
(id) => id === messageId
|
||||
);
|
||||
|
||||
// Bob receives it, and should include it in its sync
|
||||
await reliableChannelBob["sendSyncMessage"]();
|
||||
while (!messageAcknowledged) {
|
||||
await delay(50);
|
||||
}
|
||||
|
||||
// The sync should acknowledge the message
|
||||
expect(messageAcknowledged).to.be.true;
|
||||
// Wait for acknowledgment
|
||||
await messageAcknowledgedPromise;
|
||||
});
|
||||
});
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user