feat: Waku API's subscribe

Introduced a new `subscribe` function to the Waku API.
This commit is contained in:
fryorcraken 2025-10-13 16:35:01 +11:00
parent e92f6a2409
commit 4e6983f6df
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4
10 changed files with 452 additions and 73 deletions

View File

@ -105,6 +105,7 @@ export interface IEncoder {
export interface IDecoder<T extends IDecodedMessage> {
contentTopic: string;
pubsubTopic: PubsubTopic;
routingInfo: IRoutingInfo;
fromWireToProtoObj: (bytes: Uint8Array) => Promise<IProtoMessage | undefined>;
fromProtoObj: (
pubsubTopic: string,

View File

@ -11,6 +11,7 @@ import type { HealthStatus } from "./health_status.js";
import type { Libp2p } from "./libp2p.js";
import type { ILightPush } from "./light_push.js";
import { IDecodedMessage, IDecoder, IEncoder } from "./message.js";
import { ContentTopic } from "./misc.js";
import type { Protocols } from "./protocols.js";
import type { IRelay } from "./relay.js";
import type { ShardId } from "./sharding.js";
@ -27,7 +28,8 @@ export type CreateEncoderParams = CreateDecoderParams & {
export enum WakuEvent {
Connection = "waku:connection",
Health = "waku:health"
Health = "waku:health",
SubscribeError = "subscribe:error"
}
export interface IWakuEvents {
@ -52,9 +54,25 @@ export interface IWakuEvents {
* });
*/
[WakuEvent.Health]: CustomEvent<HealthStatus>;
/**
* Emitted when there is an irrecoverable error when subscribing.
*
* @example
* ```typescript
* waku.addEventListener(WakuEvent.SubscribeError, (event) => {
* console.log(event.detail);
* });
*/
[WakuEvent.SubscribeError]: CustomEvent<string>;
}
export interface IMessageEmitterEvents {
[contentTopic: string]: CustomEvent<Uint8Array>;
}
export type IWakuEventEmitter = TypedEventEmitter<IWakuEvents>;
export type IMessageEmitter = TypedEventEmitter<IMessageEmitterEvents>;
export interface IWaku {
libp2p: Libp2p;
@ -78,6 +96,20 @@ export interface IWaku {
*/
events: IWakuEventEmitter;
/**
* Emits messages on their content topic. Messages may be coming from subscriptions
* or store queries (TODO). The payload is directly emitted
*
* @example
* ```typescript
* waku.messageEmitter.addEventListener("/some/0/content-topic/proto", (event) => {
* const payload: UInt8Array = event.detail
* MyDecoder.decode(payload);
* });
* ```
*/
messageEmitter: IMessageEmitter;
/**
* Returns a unique identifier for a node on the network.
*
@ -251,6 +283,8 @@ export interface IWaku {
*/
createEncoder(params: CreateEncoderParams): IEncoder;
subscribe(contentTopics: ContentTopic[]): void;
/**
* @returns {boolean} `true` if the node was started and `false` otherwise
*/

View File

@ -74,3 +74,32 @@ export async function sign(
export function keccak256(input: Uint8Array): Uint8Array {
return new Uint8Array(sha3.keccak256.arrayBuffer(input));
}
/**
* Compare two public keys, can be used to verify that a given signature matches
* expectations.
*
* @param publicKeyA - The first public key to compare
* @param publicKeyB - The second public key to compare
* @returns true if the public keys are the same
*/
export function comparePublicKeys(
publicKeyA: Uint8Array | undefined,
publicKeyB: Uint8Array | undefined
): boolean {
if (!publicKeyA || !publicKeyB) {
return false;
}
if (publicKeyA.length !== publicKeyB.length) {
return false;
}
for (let i = 0; i < publicKeyA.length; i++) {
if (publicKeyA[i] !== publicKeyB[i]) {
return false;
}
}
return true;
}

View File

@ -1,11 +1,17 @@
import {
comparePublicKeys,
generatePrivateKey,
generateSymmetricKey,
getPublicKey
} from "./crypto/index.js";
import { DecodedMessage } from "./decoded_message.js";
export { generatePrivateKey, generateSymmetricKey, getPublicKey };
export {
generatePrivateKey,
generateSymmetricKey,
getPublicKey,
comparePublicKeys
};
export type { DecodedMessage };
export * as ecies from "./ecies.js";

View File

@ -206,3 +206,105 @@ export function createDecoder(
): Decoder {
return new Decoder(contentTopic, routingInfo, symKey);
}
/**
* Result of decrypting a message with AES symmetric encryption.
*/
export interface SymmetricDecryptionResult {
/** The decrypted payload */
payload: Uint8Array;
/** The signature if the message was signed */
signature?: Uint8Array;
/** The recovered public key if the message was signed */
signaturePublicKey?: Uint8Array;
}
/**
* AES symmetric encryption.
*
*
* Follows [26/WAKU2-PAYLOAD](https://rfc.vac.dev/spec/26/) encryption standard.
*/
export class SymmetricEncryption {
/**
* Creates an AES Symmetric encryption instance.
*
* @param symKey - The symmetric key for encryption (32 bytes recommended)
* @param sigPrivKey - Optional private key to sign messages before encryption
*/
public constructor(
private symKey: Uint8Array,
private sigPrivKey?: Uint8Array
) {}
/**
* Encrypts a byte array payload.
*
* The encryption process:
* 1. Optionally signs the payload with the private key
* 2. Adds padding to obscure payload size
* 3. Encrypts using AES-256-GCM
*
* @param payload - The data to encrypt
* @returns The encrypted payload
*/
public async encrypt(payload: Uint8Array): Promise<Uint8Array> {
const preparedPayload = await preCipher(payload, this.sigPrivKey);
return encryptSymmetric(preparedPayload, this.symKey);
}
}
/**
* AES symmetric decryption.
*
* Follows [26/WAKU2-PAYLOAD](https://rfc.vac.dev/spec/26/) encryption standard.
*/
export class SymmetricDecryption {
/**
* Creates an AES Symmetric decryption instance.
*
* @param symKey - The symmetric key for decryption (must match encryption key)
*/
public constructor(private symKey: Uint8Array) {}
/**
* Decrypts an encrypted byte array payload.
*
* The decryption process:
* 1. Decrypts using AES-256-GCM
* 2. Removes padding
* 3. Verifies and recovers signature if present
*
* @param encryptedPayload - The encrypted data (from [[SymmetricEncryption.encrypt]])
* @returns Object containing the decrypted payload and signature info, or undefined if decryption fails
*/
public async decrypt(
encryptedPayload: Uint8Array
): Promise<SymmetricDecryptionResult | undefined> {
try {
const decryptedData = await decryptSymmetric(
encryptedPayload,
this.symKey
);
if (!decryptedData) {
return undefined;
}
const result = postCipher(decryptedData);
if (!result) {
return undefined;
}
return {
payload: result.payload,
signature: result.sig?.signature,
signaturePublicKey: result.sig?.publicKey
};
} catch (error) {
log.error("Failed to decrypt payload", error);
return undefined;
}
}
}

View File

@ -1,4 +1,4 @@
import { IDecodedMessage, ProtocolError } from "@waku/interfaces";
import { ProtocolError } from "@waku/interfaces";
import type { HistoryEntry, MessageId } from "@waku/sds";
export const ReliableChannelEvent = {
@ -56,8 +56,7 @@ export interface ReliableChannelEvents {
possibleAckCount: number;
}>;
"message-acknowledged": CustomEvent<MessageId>;
// TODO probably T extends IDecodedMessage?
"message-received": CustomEvent<IDecodedMessage>;
"message-received": CustomEvent<Uint8Array>;
"irretrievable-message": CustomEvent<HistoryEntry>;
"sending-message-irrecoverable-error": CustomEvent<{
messageId: MessageId;

View File

@ -1,7 +1,7 @@
import { TypedEventEmitter } from "@libp2p/interface";
import { messageHash } from "@waku/core";
import {
type Callback,
type ContentTopic,
type IDecodedMessage,
type IDecoder,
type IEncoder,
@ -23,6 +23,7 @@ import {
SyncMessage
} from "@waku/sds";
import { Logger } from "@waku/utils";
import { bytesToHex } from "@waku/utils/bytes";
import {
QueryOnConnect,
@ -131,10 +132,7 @@ export class ReliableChannel<
sendOptions?: ISendOptions
) => Promise<LightPushSDKResult>;
private readonly _subscribe: (
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>
) => Promise<boolean>;
private readonly _subscribe: (contentTopics: ContentTopic[]) => void;
private readonly _retrieve?: <T extends IDecodedMessage>(
decoders: IDecoder<T>[],
@ -156,6 +154,7 @@ export class ReliableChannel<
public node: IWaku,
public messageChannel: MessageChannel,
private encoder: IEncoder,
// TODO: remove once _retrieve is aligned
private decoder: IDecoder<T>,
options?: ReliableChannelOptions
) {
@ -168,15 +167,7 @@ export class ReliableChannel<
throw "No protocol available to send messages";
}
if (node.filter) {
this._subscribe = node.filter.subscribe.bind(node.filter);
} else if (node.relay) {
// TODO: Why do relay and filter have different interfaces?
// this._subscribe = node.relay.subscribeWithUnsubscribe;
throw "Not implemented";
} else {
throw "No protocol available to receive messages";
}
this._subscribe = node.subscribe.bind(node);
if (node.store) {
this._retrieve = node.store.queryGenerator.bind(node.store);
@ -220,7 +211,7 @@ export class ReliableChannel<
options?.retrieveFrequencyMs,
this._retrieve,
async (msg: T) => {
await this.processIncomingMessage(msg);
await this.processIncomingMessage(msg.payload);
}
);
}
@ -280,7 +271,7 @@ export class ReliableChannel<
const autoStart = options?.autoStart ?? true;
if (autoStart) {
await messageChannel.start();
messageChannel.start();
}
return messageChannel;
@ -324,10 +315,11 @@ export class ReliableChannel<
const messageId = ReliableChannel.getMessageId(messagePayload);
// TODO: should the encoder give me the message hash?
// Encoding now to fail early, used later to get message hash
const protoMessage = await this.encoder.toProtoObj(wakuMessage);
if (!protoMessage) {
const retrievalHint = await computeRetrievalHint(
messagePayload,
this.encoder
);
if (!retrievalHint) {
this.safeSendEvent("sending-message-irrecoverable-error", {
detail: {
messageId: messageId,
@ -336,10 +328,6 @@ export class ReliableChannel<
});
return { success: false };
}
const retrievalHint = messageHash(
this.encoder.pubsubTopic,
protoMessage
);
this.safeSendEvent("sending-message", {
detail: messageId
@ -381,11 +369,14 @@ export class ReliableChannel<
});
}
private async subscribe(): Promise<boolean> {
private subscribe(): void {
this.assertStarted();
return this._subscribe(this.decoder, async (message: T) => {
await this.processIncomingMessage(message);
});
this.node.messageEmitter.addEventListener(
this.decoder.contentTopic,
(event) => void this.processIncomingMessage(event.detail)
);
this._subscribe([this.decoder.contentTopic]);
}
/**
@ -393,14 +384,12 @@ export class ReliableChannel<
* @param msg
* @private
*/
private async processIncomingMessage<T extends IDecodedMessage>(
msg: T
): Promise<void> {
private async processIncomingMessage(payload: Uint8Array): Promise<void> {
// New message arrives, we need to unwrap it first
const sdsMessage = SdsMessage.decode(msg.payload);
const sdsMessage = SdsMessage.decode(payload);
if (!sdsMessage) {
log.error("could not SDS decode message", msg);
log.error("could not SDS decode message");
return;
}
@ -412,8 +401,14 @@ export class ReliableChannel<
return;
}
const retrievalHint = msg.hash;
log.info(`processing message ${sdsMessage.messageId}:${msg.hashStr}`);
const retrievalHint = await computeRetrievalHint(payload, this.encoder);
if (!retrievalHint) {
log.error("could not compute retrieval hint");
return;
}
log.info(
`processing message ${sdsMessage.messageId}:${bytesToHex(retrievalHint)}`
);
// SDS Message decoded, let's pass it to the channel so we can learn about
// missing messages or the status of previous outgoing messages
this.messageChannel.pushIncomingMessage(sdsMessage, retrievalHint);
@ -422,25 +417,8 @@ export class ReliableChannel<
if (sdsMessage.content && sdsMessage.content.length > 0) {
// Now, process the message with callback
// Overrides msg.payload with unwrapped payload
// TODO: can we do better?
const { payload: _p, ...allButPayload } = msg;
const unwrappedMessage = Object.assign(allButPayload, {
payload: sdsMessage.content,
hash: msg.hash,
hashStr: msg.hashStr,
version: msg.version,
contentTopic: msg.contentTopic,
pubsubTopic: msg.pubsubTopic,
timestamp: msg.timestamp,
rateLimitProof: msg.rateLimitProof,
ephemeral: msg.ephemeral,
meta: msg.meta
});
this.safeSendEvent("message-received", {
detail: unwrappedMessage as unknown as T
detail: sdsMessage.content
});
}
@ -451,7 +429,7 @@ export class ReliableChannel<
messages: T[]
): Promise<void> {
for (const message of messages) {
await this.processIncomingMessage(message);
await this.processIncomingMessage(message.payload);
}
}
@ -472,8 +450,8 @@ export class ReliableChannel<
}
}
public async start(): Promise<boolean> {
if (this._started) return true;
public start(): void {
if (this._started) return;
this._started = true;
this.setupEventListeners();
this.restartSync();
@ -482,7 +460,7 @@ export class ReliableChannel<
this.missingMessageRetriever?.start();
this.queryOnConnect?.start();
}
return this.subscribe();
this.subscribe();
}
public stop(): void {
@ -689,3 +667,16 @@ export class ReliableChannel<
}
}
}
async function computeRetrievalHint(
payload: Uint8Array,
encoder: IEncoder
): Promise<Uint8Array | undefined> {
// TODO: should the encoder give me the message hash?
// Encoding now to fail early, used later to get message hash
const protoMessage = await encoder.toProtoObj({ payload });
if (!protoMessage) {
return undefined;
}
return messageHash(encoder.pubsubTopic, protoMessage);
}

View File

@ -5,8 +5,14 @@ import {
TypedEventEmitter
} from "@libp2p/interface";
import type { MultiaddrInput } from "@multiformats/multiaddr";
import { ConnectionManager, createDecoder, createEncoder } from "@waku/core";
import type {
import {
ConnectionManager,
createDecoder,
createEncoder,
DecodedMessage
} from "@waku/core";
import {
ContentTopic,
CreateDecoderParams,
CreateEncoderParams,
CreateNodeOptions,
@ -15,20 +21,22 @@ import type {
IEncoder,
IFilter,
ILightPush,
IMessageEmitter,
IRelay,
IRoutingInfo,
IStore,
IWaku,
IWakuEventEmitter,
Libp2p,
NetworkConfig
NetworkConfig,
PubsubTopic
} from "@waku/interfaces";
import {
DefaultNetworkConfig,
HealthStatus,
Protocols
} from "@waku/interfaces";
import { createRoutingInfo, Logger } from "@waku/utils";
import { createRoutingInfo, Logger, pushOrInitMapSet } from "@waku/utils";
import { Filter } from "../filter/index.js";
import { HealthIndicator } from "../health_indicator/index.js";
@ -54,6 +62,7 @@ export class WakuNode implements IWaku {
public lightPush?: ILightPush;
public readonly events: IWakuEventEmitter = new TypedEventEmitter();
public readonly messageEmitter: IMessageEmitter = new TypedEventEmitter();
private readonly networkConfig: NetworkConfig;
@ -134,6 +143,41 @@ export class WakuNode implements IWaku {
);
}
public subscribe(contentTopics: ContentTopic[]): void {
// Group decoders by pubsubTopics in case they spread across several shards
const ctToDecoders: Map<
PubsubTopic,
Set<IDecoder<DecodedMessage>>
> = new Map();
for (const contentTopic of contentTopics) {
const decoder = this.createDecoder({ contentTopic });
pushOrInitMapSet(ctToDecoders, decoder.pubsubTopic, decoder);
}
if (this.filter) {
for (const [_, decoders] of ctToDecoders) {
void this.filter
.subscribe(
Array.from(decoders),
this.emitIncomingMessages.bind(this, Array.from(contentTopics))
)
.then((_) => {
// TODO: emit irrecoverable errors
})
.catch((_) => {
// TODO: emit irrecoverable errors
});
}
return;
}
if (this.relay) {
throw "not implemented";
}
throw "no subscribe protocol available";
}
public get peerId(): PeerId {
return this.libp2p.peerId;
}
@ -288,4 +332,20 @@ export class WakuNode implements IWaku {
): IRoutingInfo {
return createRoutingInfo(this.networkConfig, { contentTopic, shardId });
}
private emitIncomingMessages(
contentTopics: ContentTopic[],
message: {
contentTopic: ContentTopic;
payload: Uint8Array;
}
): void {
if (contentTopics.includes(message.contentTopic)) {
this.messageEmitter.dispatchEvent(
new CustomEvent<Uint8Array>(message.contentTopic, {
detail: message.payload
})
);
}
}
}

View File

@ -124,14 +124,14 @@ export class ServiceNodesFleet {
}
class MultipleNodesMessageCollector {
public callback: (msg: IDecodedMessage) => void = () => {};
protected messageList: Array<IDecodedMessage> = [];
public callback: (msg: Partial<IDecodedMessage>) => void = () => {};
protected messageList: Array<Partial<IDecodedMessage>> = [];
public constructor(
private messageCollectors: MessageCollector[],
private relayNodes?: ServiceNode[],
private strictChecking: boolean = false
) {
this.callback = (msg: IDecodedMessage): void => {
this.callback = (msg: Partial<IDecodedMessage>): void => {
log.info("Got a message");
this.messageList.push(msg);
};
@ -153,7 +153,9 @@ class MultipleNodesMessageCollector {
}
}
public getMessage(index: number): MessageRpcResponse | IDecodedMessage {
public getMessage(
index: number
): MessageRpcResponse | Partial<IDecodedMessage> {
return this.messageList[index];
}

View File

@ -7,10 +7,17 @@ import type {
RelayNode
} from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import { generateSymmetricKey } from "@waku/message-encryption";
import {
comparePublicKeys,
generatePrivateKey,
generateSymmetricKey,
getPublicKey
} from "@waku/message-encryption";
import {
createDecoder,
createEncoder
createEncoder,
SymmetricDecryption,
SymmetricDecryptionResult
} from "@waku/message-encryption/symmetric";
import { createRelayNode } from "@waku/relay";
import {
@ -29,8 +36,11 @@ import {
makeLogFileName,
NOISE_KEY_1,
NOISE_KEY_2,
runMultipleNodes,
ServiceNode,
tearDownNodes
ServiceNodesFleet,
tearDownNodes,
teardownNodesWithRedundancy
} from "../src/index.js";
const TestContentTopic = "/test/1/waku/utf8";
@ -291,3 +301,148 @@ describe("User Agent", function () {
);
});
});
describe("Waku API", function () {
describe("WakuNode.subscribe (light node)", function () {
this.timeout(100000);
let waku: LightNode;
let serviceNodes: ServiceNodesFleet;
const messageText = "some message";
const messagePayload = utf8ToBytes(messageText);
beforeEachCustom(this, async () => {
[serviceNodes, waku] = await runMultipleNodes(
this.ctx,
TestRoutingInfo,
undefined
);
});
afterEachCustom(this, async () => {
await teardownNodesWithRedundancy(serviceNodes, waku);
});
it("Subscribe and receive messages on 2 different content topics", async function () {
// Subscribe to the first content topic and send a message.
waku.messageEmitter.addEventListener(TestContentTopic, (event) => {
// TODO: fix the callback type
serviceNodes.messageCollector.callback({
contentTopic: TestContentTopic,
payload: event.detail
});
});
waku.subscribe([TestContentTopic]);
await waku.lightPush.send(TestEncoder, { payload: messagePayload });
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true,
"Waiting for the first message"
);
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: TestContentTopic,
expectedPubsubTopic: TestRoutingInfo.pubsubTopic
});
// Modify subscription to include a new content topic and send a message.
const newMessageText = "Filtering still works!";
const newContentTopic = "/test/2/waku-filter/default";
const newRoutingInfo = createRoutingInfo(DefaultTestNetworkConfig, {
contentTopic: newContentTopic
});
const newEncoder = createPlainEncoder({
contentTopic: newContentTopic,
routingInfo: newRoutingInfo
});
// subscribe to second content topic
waku.messageEmitter.addEventListener(newContentTopic, (event) => {
// TODO: fix the callback type
serviceNodes.messageCollector.callback({
contentTopic: TestContentTopic,
payload: event.detail
});
});
waku.subscribe([newContentTopic]);
await waku.lightPush.send(newEncoder, {
payload: utf8ToBytes(newMessageText)
});
expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq(
true,
"Waiting for the second message"
);
serviceNodes.messageCollector.verifyReceivedMessage(1, {
expectedContentTopic: newContentTopic,
expectedMessageText: newMessageText,
expectedPubsubTopic: TestRoutingInfo.pubsubTopic
});
// Send another message on the initial content topic to verify it still works.
const thirdMessageText = "Filtering still works on first subscription!";
const thirdMessagePayload = { payload: utf8ToBytes(thirdMessageText) };
await waku.lightPush.send(TestEncoder, thirdMessagePayload);
expect(await serviceNodes.messageCollector.waitForMessages(3)).to.eq(
true,
"Waiting for the third message"
);
serviceNodes.messageCollector.verifyReceivedMessage(2, {
expectedMessageText: thirdMessageText,
expectedContentTopic: TestContentTopic,
expectedPubsubTopic: TestRoutingInfo.pubsubTopic
});
});
it("Subscribe and receive messages encrypted with AES", async function () {
const symKey = generateSymmetricKey();
const senderPrivKey = generatePrivateKey();
// TODO: For now, still using encoder
const newEncoder = createEncoder({
contentTopic: TestContentTopic,
routingInfo: TestRoutingInfo,
symKey,
sigPrivKey: senderPrivKey
});
// Setup payload decryption
const symDecryption = new SymmetricDecryption(symKey);
// subscribe to second content topic
waku.messageEmitter.addEventListener(TestContentTopic, (event) => {
const encryptedPayload = event.detail;
void symDecryption
.decrypt(encryptedPayload)
.then((decryptionResult: SymmetricDecryptionResult | undefined) => {
if (!decryptionResult) return;
serviceNodes.messageCollector.callback({
contentTopic: TestContentTopic,
payload: decryptionResult.payload
});
// TODO: probably best to adapt the message collector
expect(decryptionResult?.signature).to.not.be.undefined;
expect(
comparePublicKeys(
getPublicKey(senderPrivKey),
decryptionResult?.signaturePublicKey
)
);
// usually best to ignore decryption failure
});
});
waku.subscribe([TestContentTopic]);
await waku.lightPush.send(newEncoder, {
payload: utf8ToBytes(messageText)
});
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
true,
"Waiting for the message"
);
serviceNodes.messageCollector.verifyReceivedMessage(1, {
expectedContentTopic: TestContentTopic,
expectedMessageText: messageText,
expectedPubsubTopic: TestRoutingInfo.pubsubTopic
});
});
});
});