use event emitting design instead of callback

This commit is contained in:
fryorcraken 2025-10-03 15:48:15 +10:00
parent 6e84d888c4
commit 7fdd15e725
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4
4 changed files with 150 additions and 22 deletions

View File

@ -55,7 +55,12 @@ export interface IWakuEvents {
[WakuEvent.Health]: CustomEvent<HealthStatus>;
}
export interface IMessageEmitterEvents {
[contentTopic: string]: CustomEvent<Uint8Array>;
}
export type IWakuEventEmitter = TypedEventEmitter<IWakuEvents>;
export type IMessageEmitter = TypedEventEmitter<IMessageEmitterEvents>;
export interface IWaku {
libp2p: Libp2p;
@ -79,6 +84,20 @@ export interface IWaku {
*/
events: IWakuEventEmitter;
/**
* Emits messages on their content topic. Messages may be coming from subscriptions
* or store queries (TODO). The payload is directly emitted
*
* @example
* ```typescript
* waku.messageEmitter.addEventListener("/some/0/content-topic/proto", (event) => {
* const payload: UInt8Array = event.detail
* MyDecoder.decode(payload);
* });
* ```
*/
messageEmitter: IMessageEmitter;
/**
* Returns a unique identifier for a node on the network.
*
@ -252,13 +271,7 @@ export interface IWaku {
*/
createEncoder(params: CreateEncoderParams): IEncoder;
subscribe(
contentTopics: ContentTopic[],
callback: (message: {
contentTopic: ContentTopic;
payload: Uint8Array;
}) => void | Promise<void>
): Promise<void>;
subscribe(contentTopics: ContentTopic[]): Promise<void>;
/**
* @returns {boolean} `true` if the node was started and `false` otherwise

View File

@ -6,7 +6,7 @@ import {
} from "@libp2p/interface";
import type { MultiaddrInput } from "@multiformats/multiaddr";
import { ConnectionManager, createDecoder, createEncoder } from "@waku/core";
import type {
import {
ContentTopic,
CreateDecoderParams,
CreateEncoderParams,
@ -16,6 +16,7 @@ import type {
IEncoder,
IFilter,
ILightPush,
IMessageEmitter,
IRelay,
IRoutingInfo,
IStore,
@ -55,6 +56,7 @@ export class WakuNode implements IWaku {
public lightPush?: ILightPush;
public readonly events: IWakuEventEmitter = new TypedEventEmitter();
public readonly messageEmitter: IMessageEmitter = new TypedEventEmitter();
private readonly networkConfig: NetworkConfig;
@ -135,15 +137,9 @@ export class WakuNode implements IWaku {
);
}
public async subscribe(
contentTopics: ContentTopic[],
callback: (message: {
contentTopic: ContentTopic;
payload: Uint8Array;
}) => void | Promise<void>
): Promise<void> {
public async subscribe(contentTopics: ContentTopic[]): Promise<void> {
// Group content topics via routing info in case they spread across several shards
const ctToRouting = new Map();
const ctToRouting: Map<IRoutingInfo, Set<ContentTopic>> = new Map();
for (const contentTopic of contentTopics) {
const routingInfo = this.createRoutingInfo(contentTopic);
pushOrInitMapSet(ctToRouting, routingInfo, contentTopic);
@ -152,12 +148,18 @@ export class WakuNode implements IWaku {
const promises = [];
if (this.filter) {
for (const [routingInfo, contentTopics] of ctToRouting) {
// TODO: Returned bool from subscribe should be used
promises.push(
this.filter.subscribe(contentTopics, routingInfo, callback)
this.filter.subscribe(
Array.from(contentTopics),
routingInfo,
this.emitIncomingMessages.bind(this, Array.from(contentTopics))
)
);
}
await Promise.all(promises);
return;
}
if (this.relay) {
@ -320,4 +322,20 @@ export class WakuNode implements IWaku {
): IRoutingInfo {
return createRoutingInfo(this.networkConfig, { contentTopic, shardId });
}
private emitIncomingMessages(
contentTopics: ContentTopic[],
message: {
contentTopic: ContentTopic;
payload: Uint8Array;
}
): void {
if (contentTopics.includes(message.contentTopic)) {
this.messageEmitter.dispatchEvent(
new CustomEvent<Uint8Array>(message.contentTopic, {
detail: message.payload
})
);
}
}
}

View File

@ -124,14 +124,14 @@ export class ServiceNodesFleet {
}
class MultipleNodesMessageCollector {
public callback: (msg: IDecodedMessage) => void = () => {};
protected messageList: Array<IDecodedMessage> = [];
public callback: (msg: Partial<IDecodedMessage>) => void = () => {};
protected messageList: Array<Partial<IDecodedMessage>> = [];
public constructor(
private messageCollectors: MessageCollector[],
private relayNodes?: ServiceNode[],
private strictChecking: boolean = false
) {
this.callback = (msg: IDecodedMessage): void => {
this.callback = (msg: Partial<IDecodedMessage>): void => {
log.info("Got a message");
this.messageList.push(msg);
};
@ -153,7 +153,9 @@ class MultipleNodesMessageCollector {
}
}
public getMessage(index: number): MessageRpcResponse | IDecodedMessage {
public getMessage(
index: number
): MessageRpcResponse | Partial<IDecodedMessage> {
return this.messageList[index];
}

View File

@ -29,8 +29,11 @@ import {
makeLogFileName,
NOISE_KEY_1,
NOISE_KEY_2,
runMultipleNodes,
ServiceNode,
tearDownNodes
ServiceNodesFleet,
tearDownNodes,
teardownNodesWithRedundancy
} from "../src/index.js";
const TestContentTopic = "/test/1/waku/utf8";
@ -291,3 +294,95 @@ describe("User Agent", function () {
);
});
});
describe("Waku API", function () {
describe("WakuNode.subscribe (light node)", function () {
this.timeout(100000);
let waku: LightNode;
let serviceNodes: ServiceNodesFleet;
const messageText = "some message";
const messagePayload = utf8ToBytes(messageText);
beforeEachCustom(this, async () => {
[serviceNodes, waku] = await runMultipleNodes(
this.ctx,
TestRoutingInfo,
undefined
);
});
afterEachCustom(this, async () => {
await teardownNodesWithRedundancy(serviceNodes, waku);
});
it("Subscribe and receive messages on 2 different content topics", async function () {
// Subscribe to the first content topic and send a message.
waku.messageEmitter.addEventListener(TestContentTopic, (event) => {
// TODO: fix the callback type
serviceNodes.messageCollector.callback({
contentTopic: TestContentTopic,
payload: event.detail
});
});
await waku.subscribe([TestContentTopic]);
await waku.lightPush.send(TestEncoder, { payload: messagePayload });
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true,
"Waiting for the first message"
);
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: TestContentTopic,
expectedPubsubTopic: TestRoutingInfo.pubsubTopic
});
// Modify subscription to include a new content topic and send a message.
const newMessageText = "Filtering still works!";
const newContentTopic = "/test/2/waku-filter/default";
const newRoutingInfo = createRoutingInfo(DefaultTestNetworkConfig, {
contentTopic: newContentTopic
});
const newEncoder = createPlainEncoder({
contentTopic: newContentTopic,
routingInfo: newRoutingInfo
});
// subscribe to second content topic
waku.messageEmitter.addEventListener(newContentTopic, (event) => {
// TODO: fix the callback type
serviceNodes.messageCollector.callback({
contentTopic: TestContentTopic,
payload: event.detail
});
});
await waku.subscribe([newContentTopic]);
await waku.lightPush.send(newEncoder, {
payload: utf8ToBytes(newMessageText)
});
expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq(
true,
"Waiting for the second message"
);
serviceNodes.messageCollector.verifyReceivedMessage(1, {
expectedContentTopic: newContentTopic,
expectedMessageText: newMessageText,
expectedPubsubTopic: TestRoutingInfo.pubsubTopic
});
// Send another message on the initial content topic to verify it still works.
const thirdMessageText = "Filtering still works on first subscription!";
const thirdMessagePayload = { payload: utf8ToBytes(thirdMessageText) };
await waku.lightPush.send(TestEncoder, thirdMessagePayload);
expect(await serviceNodes.messageCollector.waitForMessages(3)).to.eq(
true,
"Waiting for the third message"
);
serviceNodes.messageCollector.verifyReceivedMessage(2, {
expectedMessageText: thirdMessageText,
expectedContentTopic: TestContentTopic,
expectedPubsubTopic: TestRoutingInfo.pubsubTopic
});
});
});
});