logos-messaging-js/packages/tests/src/lib/message_collector.ts
Sasha f3627c46a4
feat!: use ShardingParams on subscriptions, make Decoder/Encoder auto sharding friendly by default (#1958)
* fix: use pubsubTopic from current ones if not set

* fix: improve type on dial method

* enforce same pubusb on filter.subscribe, make content topic to pubsub mapping default for decoder / encoder

* fix mapping problem

* update tests

* add error handling

* fix typo

* up lock

* rm lock

* up lock

* remove only

* fix content topic

* fix ephemeral test

* fix filter unsubscribe test

* up utils

* fix subscribe test

* up interfaces and filter api

* remove only

* up ping test

* fix subscribe test

* fix push test

* fix lightPush

* fix multiple pubsub

* remove only, fix subscribe filter test

* remove only

* fix cluster ID selection and named sharding subscription test

* fix unsubscribe test

* fix light push test

* fix light push test

* fix push test

* fix relay publish

* create runNode and fix relay tests

* generalize runNodes, fix some tests

* fix store tests

* fix toAsyncIterator tests

* remove only

* fix lightPush

* use generics

* try fix test

* run failing tests

* remove only

* address failed tests, remove DefaultPubsubTopic dependency in some tests
2024-04-28 11:15:17 +02:00

275 lines
8.1 KiB
TypeScript

import { DecodedMessage } from "@waku/core";
import { DefaultPubsubTopic } from "@waku/interfaces";
import { Logger } from "@waku/utils";
import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes";
import { AssertionError, expect } from "chai";
import { equals } from "uint8arrays/equals";
import { MessageRpcResponse } from "../types.js";
import { base64ToUtf8 } from "../utils/base64_utf8.js";
import { delay } from "../utils/delay.js";
import { ServiceNode } from "./service_node.js";
const log = new Logger("test:message-collector");
/**
* Class responsible for collecting messages.
* It provides utility methods to interact with the collected messages,
* and offers a way to wait for incoming messages.
*/
export class MessageCollector {
list: Array<MessageRpcResponse | DecodedMessage> = [];
callback: (msg: DecodedMessage) => void = () => {};
constructor(private nwaku?: ServiceNode) {
if (!this.nwaku) {
this.callback = (msg: DecodedMessage): void => {
log.info("Got a message");
this.list.push(msg);
};
}
}
get count(): number {
return this.list.length;
}
getMessage(index: number): MessageRpcResponse | DecodedMessage {
return this.list[index];
}
hasMessage(topic: string, text: string): boolean {
return this.list.some((message) => {
if (message.contentTopic !== topic) {
return false;
}
if (typeof message.payload === "string") {
return message.payload === text;
} else if (message.payload instanceof Uint8Array) {
log.info(`Checking payload: ${bytesToUtf8(message.payload)}`);
return equals(message.payload, utf8ToBytes(text));
}
return false;
});
}
// Type guard to determine if a message is of type MessageRpcResponse
isMessageRpcResponse(
message: MessageRpcResponse | DecodedMessage
): message is MessageRpcResponse {
return (
("payload" in message && typeof message.payload === "string") ||
!!this.nwaku
);
}
async waitForMessages(
numMessages: number,
options?: {
pubsubTopic?: string;
timeoutDuration?: number;
exact?: boolean;
}
): Promise<boolean> {
const startTime = Date.now();
const pubsubTopic = this.getPubsubTopicToUse(options?.pubsubTopic);
const timeoutDuration = options?.timeoutDuration || 400;
const exact = options?.exact || false;
while (this.count < numMessages) {
if (this.nwaku) {
try {
this.list = await this.nwaku.messages(pubsubTopic);
} catch (error) {
log.error(`Can't retrieve messages because of ${error}`);
await delay(10);
}
}
if (Date.now() - startTime > timeoutDuration * numMessages) {
return false;
}
await delay(10);
}
if (exact) {
if (this.count == numMessages) {
return true;
} else {
log.warn(`Was expecting exactly ${numMessages} messages`);
return false;
}
} else {
return true;
}
}
async waitForMessagesAutosharding(
numMessages: number,
options?: {
contentTopic: string;
timeoutDuration?: number;
exact?: boolean;
}
): Promise<boolean> {
const startTime = Date.now();
const timeoutDuration = options?.timeoutDuration || 400;
const exact = options?.exact || false;
while (this.count < numMessages) {
if (this.nwaku) {
try {
this.list = await this.nwaku.messagesAutosharding(
options!.contentTopic
);
} catch (error) {
log.error(`Can't retrieve messages because of ${error}`);
await delay(10);
}
}
if (Date.now() - startTime > timeoutDuration * numMessages) {
return false;
}
await delay(10);
}
if (exact) {
if (this.count == numMessages) {
return true;
} else {
log.warn(`Was expecting exactly ${numMessages} messages`);
return false;
}
} else {
return true;
}
}
// Verifies a received message against expected values.
verifyReceivedMessage(
index: number,
options: {
expectedMessageText: string | Uint8Array | undefined;
expectedContentTopic?: string;
expectedPubsubTopic?: string;
expectedVersion?: number;
expectedMeta?: Uint8Array;
expectedEphemeral?: boolean;
expectedTimestamp?: bigint | number;
checkTimestamp?: boolean; // Used to determine if we need to check the timestamp
}
): void {
expect(this.list.length).to.be.greaterThan(
index,
`The message list does not have a message at index ${index}`
);
const message = this.getMessage(index);
expect(message.contentTopic).to.eq(
options.expectedContentTopic,
`Message content topic mismatch. Expected: ${options.expectedContentTopic}. Got: ${message.contentTopic}`
);
expect(message.version).to.eq(
options.expectedVersion || 0,
`Message version mismatch. Expected: ${
options.expectedVersion || 0
}. Got: ${message.version}`
);
if (message.ephemeral !== undefined) {
expect(message.ephemeral).to.eq(
options.expectedEphemeral !== undefined
? options.expectedEphemeral
: false,
`Message ephemeral value mismatch. Expected: ${
options.expectedEphemeral !== undefined
? options.expectedEphemeral
: false
}. Got: ${message.ephemeral}`
);
}
const shouldCheckTimestamp =
options.checkTimestamp !== undefined ? options.checkTimestamp : true;
if (shouldCheckTimestamp && message.timestamp) {
// In we send timestamp in the request we assert that it matches the timestamp in the response +- 1 sec
// We take the 1s deviation because there are some ms diffs in timestamps, probably because of conversions
let timestampAsNumber: number;
if (message.timestamp instanceof Date) {
timestampAsNumber = message.timestamp.getTime();
} else {
timestampAsNumber = Number(message.timestamp) / 1_000_000;
}
let lowerBound: number;
let upperBound: number;
// Define the bounds based on the expectedTimestamp
if (options.expectedTimestamp !== undefined) {
lowerBound = Number(options.expectedTimestamp) - 1000;
upperBound = Number(options.expectedTimestamp) + 1000;
} else {
upperBound = Date.now();
lowerBound = upperBound - 10000;
}
if (timestampAsNumber < lowerBound || timestampAsNumber > upperBound) {
throw new AssertionError(
`Message timestamp not within the expected range. Expected between: ${lowerBound} and ${upperBound}. Got: ${timestampAsNumber}`
);
}
}
if (this.isMessageRpcResponse(message)) {
// nwaku message specific assertions
const receivedMessageText = message.payload
? base64ToUtf8(message.payload)
: undefined;
expect(receivedMessageText).to.eq(
options.expectedMessageText,
`Message text mismatch. Expected: ${options.expectedMessageText}. Got: ${receivedMessageText}`
);
} else {
const pubsubTopicToUse = this.getPubsubTopicToUse(
options.expectedPubsubTopic
);
// js-waku message specific assertions
expect(message.pubsubTopic).to.eq(
pubsubTopicToUse,
`Message pub/sub topic mismatch. Expected: ${pubsubTopicToUse}. Got: ${message.pubsubTopic}`
);
expect(bytesToUtf8(message.payload)).to.eq(
options.expectedMessageText,
`Message text mismatch. Expected: ${
options.expectedMessageText
}. Got: ${bytesToUtf8(message.payload)}`
);
expect([
options.expectedMeta,
undefined,
new Uint8Array(0)
]).to.deep.include(
message.meta,
`Message meta mismatch. Expected: ${
options.expectedMeta
? JSON.stringify(options.expectedMeta)
: "undefined or " + JSON.stringify(new Uint8Array(0))
}. Got: ${JSON.stringify(message.meta)}`
);
}
}
private getPubsubTopicToUse(pubsubTopic: string | undefined): string {
return pubsubTopic || this.nwaku?.pubsubTopics?.[0] || DefaultPubsubTopic;
}
}