From 16253026c6e30052d87d9975b58480951de469d8 Mon Sep 17 00:00:00 2001 From: Arseniy Klempner Date: Thu, 4 Sep 2025 15:52:37 -0700 Subject: [PATCH] feat: implement lp-v3 error codes with backwards compatibility (#2501) * feat: implement LightPush v3 protocol support Add comprehensive LightPush v3 protocol implementation with: Core Features: - LightPush v3 protocol codec and multicodec detection - Status code-based error handling and validation - Protocol version inference and compatibility layers - Enhanced error types with detailed failure information Protocol Support: - Automatic v3/v2 protocol negotiation and fallback - Status code mapping to LightPush error types - Protocol version tracking in SDK results - Mixed protocol environment support Testing Infrastructure: - Comprehensive v3 error code handling tests - Mock functions for v3/v2 response scenarios - Protocol version detection and validation tests - Backward compatibility verification Implementation Details: - Clean separation between v2 and v3 response handling - Type-safe status code validation with isSuccess helper - Enhanced failure reporting with protocol version context - Proper error propagation through SDK layers This implementation maintains full backward compatibility with v2 while providing enhanced functionality for v3 protocol features. * feat: handle both light push protocols * fix: unsubscribe test * feat: consolidate lpv2/v3 types * feat(tests): bump nwaku to 0.36.0 * fix: remove extraneous exports * fix: add delay to tests * fix: remove protocol result types * feat: consolidate light push codec branching * fix: revert nwaku image * fix: remove multicodec * fix: remove protocolversion * feat: simplify v2/v3 branching logic to use two stream managers * fix: remove unused utils * fix: remove comments * fix: revert store test * fix: cleanup lightpush sdk * fix: remove unused util * fix: remove unused exports * fix: rename file from public to protocol_handler * fix: use proper type for sdk result * fix: update return types in filter * fix: rebase against latest master * fix: use both lightpush codecs when waiting for peer * fix: handle both lp codecs * fix: remove unused code * feat: use array for multicodec fields * fix: add timestamp if missing in v3 rpc * fix: resolve on either lp codec when waiting for peer * fix: remove unused util * fix: remove unnecessary abstraction * feat: accept nwaku docker image as arg, test lp backwards compat * fix: revert filter error * feat: add legacy flag to enable lightpushv2 only * Revert "feat: accept nwaku docker image as arg, test lp backwards compat" This reverts commit 857e12cbc73305e5c51abd057665bd34708b2737. * fix: remove unused test * feat: improve lp3 (#2597) * improve light push core * move back to singualar multicodec property, enable array prop only for light push * implement v2/v3 interop e2e test, re-add useLegacy flag, ensure e2e runs for v2 and v3 * fix v2 v3 condition * generate message package earlier * add log, fix condition --------- Co-authored-by: Sasha <118575614+weboko@users.noreply.github.com> Co-authored-by: Sasha --- packages/core/src/index.ts | 6 +- packages/core/src/lib/filter/filter.ts | 36 ++-- packages/core/src/lib/light_push/constants.ts | 7 + packages/core/src/lib/light_push/index.ts | 3 +- .../core/src/lib/light_push/light_push.ts | 194 ++++++++---------- .../src/lib/light_push/protocol_handler.ts | 191 +++++++++++++++++ packages/core/src/lib/light_push/push_rpc.ts | 10 +- .../core/src/lib/light_push/push_rpc_v3.ts | 162 +++++++++++++++ .../src/lib/stream_manager/stream_manager.ts | 2 +- packages/interfaces/src/light_push.ts | 40 +++- packages/interfaces/src/protocols.ts | 186 +++++++++-------- packages/interfaces/src/sender.ts | 11 +- packages/proto/src/lib/light_push.proto | 2 +- packages/relay/src/relay.ts | 14 +- .../sdk/src/light_push/light_push.spec.ts | 154 +++++++++++++- packages/sdk/src/light_push/light_push.ts | 44 ++-- .../sdk/src/light_push/retry_manager.spec.ts | 23 ++- packages/sdk/src/light_push/retry_manager.ts | 12 +- packages/sdk/src/light_push/utils.ts | 12 +- .../sdk/src/peer_manager/peer_manager.spec.ts | 3 +- packages/sdk/src/peer_manager/peer_manager.ts | 44 ++-- .../sdk/src/waku/wait_for_remote_peer.spec.ts | 28 ++- packages/sdk/src/waku/wait_for_remote_peer.ts | 38 +++- packages/sdk/src/waku/waku.ts | 2 +- .../tests/tests/light-push/index.node.spec.ts | 19 +- .../light-push/multiple_pubsub.node.spec.ts | 84 ++++---- .../tests/tests/light-push/v2_interop.spec.ts | 83 ++++++++ 27 files changed, 1049 insertions(+), 361 deletions(-) create mode 100644 packages/core/src/lib/light_push/constants.ts create mode 100644 packages/core/src/lib/light_push/protocol_handler.ts create mode 100644 packages/core/src/lib/light_push/push_rpc_v3.ts create mode 100644 packages/tests/tests/light-push/v2_interop.spec.ts diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index c8ac89ffe5..8021ac0a91 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -10,7 +10,11 @@ export * as waku_filter from "./lib/filter/index.js"; export { FilterCore, FilterCodecs } from "./lib/filter/index.js"; export * as waku_light_push from "./lib/light_push/index.js"; -export { LightPushCodec, LightPushCore } from "./lib/light_push/index.js"; +export { + LightPushCore, + LightPushCodec, + LightPushCodecV2 +} from "./lib/light_push/index.js"; export * as waku_store from "./lib/store/index.js"; export { StoreCore, StoreCodec } from "./lib/store/index.js"; diff --git a/packages/core/src/lib/filter/filter.ts b/packages/core/src/lib/filter/filter.ts index f76203fbfb..0b8a32b92e 100644 --- a/packages/core/src/lib/filter/filter.ts +++ b/packages/core/src/lib/filter/filter.ts @@ -2,9 +2,9 @@ import type { PeerId } from "@libp2p/interface"; import type { IncomingStreamData } from "@libp2p/interface-internal"; import { type ContentTopic, - type CoreProtocolResult, + type FilterCoreResult, + FilterError, type Libp2p, - ProtocolError, type PubsubTopic } from "@waku/interfaces"; import { WakuMessage } from "@waku/proto"; @@ -72,14 +72,14 @@ export class FilterCore { pubsubTopic: PubsubTopic, peerId: PeerId, contentTopics: ContentTopic[] - ): Promise { + ): Promise { const stream = await this.streamManager.getStream(peerId); if (!stream) { return { success: null, failure: { - error: ProtocolError.NO_STREAM_AVAILABLE, + error: FilterError.NO_STREAM_AVAILABLE, peerId: peerId } }; @@ -108,7 +108,7 @@ export class FilterCore { return { success: null, failure: { - error: ProtocolError.GENERIC_FAIL, + error: FilterError.GENERIC_FAIL, peerId: peerId } }; @@ -123,7 +123,7 @@ export class FilterCore { ); return { failure: { - error: ProtocolError.REMOTE_PEER_REJECTED, + error: FilterError.REMOTE_PEER_REJECTED, peerId: peerId }, success: null @@ -140,7 +140,7 @@ export class FilterCore { pubsubTopic: PubsubTopic, peerId: PeerId, contentTopics: ContentTopic[] - ): Promise { + ): Promise { const stream = await this.streamManager.getStream(peerId); if (!stream) { @@ -148,7 +148,7 @@ export class FilterCore { return { success: null, failure: { - error: ProtocolError.NO_STREAM_AVAILABLE, + error: FilterError.NO_STREAM_AVAILABLE, peerId: peerId } }; @@ -166,7 +166,7 @@ export class FilterCore { return { success: null, failure: { - error: ProtocolError.GENERIC_FAIL, + error: FilterError.GENERIC_FAIL, peerId: peerId } }; @@ -181,7 +181,7 @@ export class FilterCore { public async unsubscribeAll( pubsubTopic: PubsubTopic, peerId: PeerId - ): Promise { + ): Promise { const stream = await this.streamManager.getStream(peerId); if (!stream) { @@ -189,7 +189,7 @@ export class FilterCore { return { success: null, failure: { - error: ProtocolError.NO_STREAM_AVAILABLE, + error: FilterError.NO_STREAM_AVAILABLE, peerId: peerId } }; @@ -208,7 +208,7 @@ export class FilterCore { if (!res || !res.length) { return { failure: { - error: ProtocolError.NO_RESPONSE, + error: FilterError.NO_RESPONSE, peerId: peerId }, success: null @@ -224,7 +224,7 @@ export class FilterCore { ); return { failure: { - error: ProtocolError.REMOTE_PEER_REJECTED, + error: FilterError.REMOTE_PEER_REJECTED, peerId: peerId }, success: null @@ -237,7 +237,7 @@ export class FilterCore { }; } - public async ping(peerId: PeerId): Promise { + public async ping(peerId: PeerId): Promise { const stream = await this.streamManager.getStream(peerId); if (!stream) { @@ -245,7 +245,7 @@ export class FilterCore { return { success: null, failure: { - error: ProtocolError.NO_STREAM_AVAILABLE, + error: FilterError.NO_STREAM_AVAILABLE, peerId: peerId } }; @@ -267,7 +267,7 @@ export class FilterCore { return { success: null, failure: { - error: ProtocolError.GENERIC_FAIL, + error: FilterError.GENERIC_FAIL, peerId: peerId } }; @@ -277,7 +277,7 @@ export class FilterCore { return { success: null, failure: { - error: ProtocolError.NO_RESPONSE, + error: FilterError.NO_RESPONSE, peerId: peerId } }; @@ -293,7 +293,7 @@ export class FilterCore { return { success: null, failure: { - error: ProtocolError.REMOTE_PEER_REJECTED, + error: FilterError.REMOTE_PEER_REJECTED, peerId: peerId } }; diff --git a/packages/core/src/lib/light_push/constants.ts b/packages/core/src/lib/light_push/constants.ts new file mode 100644 index 0000000000..339e5f9090 --- /dev/null +++ b/packages/core/src/lib/light_push/constants.ts @@ -0,0 +1,7 @@ +export const CODECS = { + v2: "/vac/waku/lightpush/2.0.0-beta1", + v3: "/vac/waku/lightpush/3.0.0" +} as const; + +export const LightPushCodecV2 = CODECS.v2; +export const LightPushCodec = CODECS.v3; diff --git a/packages/core/src/lib/light_push/index.ts b/packages/core/src/lib/light_push/index.ts index 4c5c37dccb..87655dbb35 100644 --- a/packages/core/src/lib/light_push/index.ts +++ b/packages/core/src/lib/light_push/index.ts @@ -1 +1,2 @@ -export { LightPushCore, LightPushCodec, PushResponse } from "./light_push.js"; +export { LightPushCore } from "./light_push.js"; +export { LightPushCodec, LightPushCodecV2 } from "./constants.js"; diff --git a/packages/core/src/lib/light_push/light_push.ts b/packages/core/src/lib/light_push/light_push.ts index 6de027121b..eb3b517eeb 100644 --- a/packages/core/src/lib/light_push/light_push.ts +++ b/packages/core/src/lib/light_push/light_push.ts @@ -1,14 +1,11 @@ -import type { PeerId } from "@libp2p/interface"; +import type { PeerId, Stream } from "@libp2p/interface"; import { - type CoreProtocolResult, type IEncoder, type IMessage, type Libp2p, - ProtocolError, - type ThisOrThat + type LightPushCoreResult, + LightPushError } from "@waku/interfaces"; -import { PushResponse } from "@waku/proto"; -import { isMessageSizeUnderCap } from "@waku/utils"; import { Logger } from "@waku/utils"; import all from "it-all"; import * as lp from "it-length-prefixed"; @@ -17,92 +14,71 @@ import { Uint8ArrayList } from "uint8arraylist"; import { StreamManager } from "../stream_manager/index.js"; -import { PushRpc } from "./push_rpc.js"; -import { isRLNResponseError } from "./utils.js"; +import { CODECS } from "./constants.js"; +import { ProtocolHandler } from "./protocol_handler.js"; const log = new Logger("light-push"); -export const LightPushCodec = "/vac/waku/lightpush/2.0.0-beta1"; -export { PushResponse }; - -type PreparePushMessageResult = ThisOrThat<"query", PushRpc>; - /** * Implements the [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/). */ export class LightPushCore { private readonly streamManager: StreamManager; + private readonly streamManagerV2: StreamManager; - public readonly multicodec = LightPushCodec; + public readonly multicodec = [CODECS.v3, CODECS.v2]; - public constructor(libp2p: Libp2p) { - this.streamManager = new StreamManager(LightPushCodec, libp2p.components); - } - - private async preparePushMessage( - encoder: IEncoder, - message: IMessage - ): Promise { - try { - if (!message.payload || message.payload.length === 0) { - log.error("Failed to send waku light push: payload is empty"); - return { query: null, error: ProtocolError.EMPTY_PAYLOAD }; - } - - if (!(await isMessageSizeUnderCap(encoder, message))) { - log.error("Failed to send waku light push: message is bigger than 1MB"); - return { query: null, error: ProtocolError.SIZE_TOO_BIG }; - } - - const protoMessage = await encoder.toProtoObj(message); - if (!protoMessage) { - log.error("Failed to encode to protoMessage, aborting push"); - return { - query: null, - error: ProtocolError.ENCODE_FAILED - }; - } - - const query = PushRpc.createRequest(protoMessage, encoder.pubsubTopic); - return { query, error: null }; - } catch (error) { - log.error("Failed to prepare push message", error); - - return { - query: null, - error: ProtocolError.GENERIC_FAIL - }; - } + public constructor(private libp2p: Libp2p) { + this.streamManagerV2 = new StreamManager(CODECS.v2, libp2p.components); + this.streamManager = new StreamManager(CODECS.v3, libp2p.components); } public async send( encoder: IEncoder, message: IMessage, - peerId: PeerId - ): Promise { - const { query, error: preparationError } = await this.preparePushMessage( - encoder, - message + peerId: PeerId, + useLegacy: boolean = false + ): Promise { + const protocol = await this.getProtocol(peerId, useLegacy); + + log.info( + `Sending light push request to peer:${peerId.toString()}, protocol:${protocol}` ); - if (preparationError || !query) { + if (!protocol) { return { success: null, failure: { - error: preparationError, + error: LightPushError.GENERIC_FAIL, peerId } }; } - const stream = await this.streamManager.getStream(peerId); + const { rpc, error: prepError } = await ProtocolHandler.preparePushMessage( + encoder, + message, + protocol + ); + + if (prepError) { + return { + success: null, + failure: { + error: prepError, + peerId + } + }; + } + + const stream = await this.getStream(peerId, protocol); if (!stream) { log.error(`Failed to get a stream for remote peer:${peerId.toString()}`); return { success: null, failure: { - error: ProtocolError.NO_STREAM_AVAILABLE, + error: LightPushError.NO_STREAM_AVAILABLE, peerId: peerId } }; @@ -111,76 +87,74 @@ export class LightPushCore { let res: Uint8ArrayList[] | undefined; try { res = await pipe( - [query.encode()], + [rpc.encode()], lp.encode, stream, lp.decode, async (source) => await all(source) ); } catch (err) { - // can fail only because of `stream` abortion log.error("Failed to send waku light push request", err); return { success: null, failure: { - error: ProtocolError.STREAM_ABORTED, + error: LightPushError.STREAM_ABORTED, peerId: peerId } }; } const bytes = new Uint8ArrayList(); - res.forEach((chunk) => { - bytes.append(chunk); - }); + res.forEach((chunk) => bytes.append(chunk)); - let response: PushResponse | undefined; + if (bytes.length === 0) { + return { + success: null, + failure: { + error: LightPushError.NO_RESPONSE, + peerId: peerId + } + }; + } + + return ProtocolHandler.handleResponse(bytes, protocol, peerId); + } + + private async getProtocol( + peerId: PeerId, + useLegacy: boolean + ): Promise { try { - response = PushRpc.decode(bytes).response; - } catch (err) { - log.error("Failed to decode push reply", err); - return { - success: null, - failure: { - error: ProtocolError.DECODE_FAILED, - peerId: peerId - } - }; - } + const peer = await this.libp2p.peerStore.get(peerId); - if (!response) { - log.error("Remote peer fault: No response in PushRPC"); - return { - success: null, - failure: { - error: ProtocolError.NO_RESPONSE, - peerId: peerId - } - }; + if ( + useLegacy || + (!peer.protocols.includes(CODECS.v3) && + peer.protocols.includes(CODECS.v2)) + ) { + return CODECS.v2; + } else if (peer.protocols.includes(CODECS.v3)) { + return CODECS.v3; + } else { + throw new Error("No supported protocol found"); + } + } catch (error) { + log.error("Failed to get protocol", error); + return undefined; } + } - if (isRLNResponseError(response.info)) { - log.error("Remote peer fault: RLN generation"); - return { - success: null, - failure: { - error: ProtocolError.RLN_PROOF_GENERATION, - peerId: peerId - } - }; + private async getStream( + peerId: PeerId, + protocol: string + ): Promise { + switch (protocol) { + case CODECS.v2: + return this.streamManagerV2.getStream(peerId); + case CODECS.v3: + return this.streamManager.getStream(peerId); + default: + return undefined; } - - if (!response.isSuccess) { - log.error("Remote peer rejected the message: ", response.info); - return { - success: null, - failure: { - error: ProtocolError.REMOTE_PEER_REJECTED, - peerId: peerId - } - }; - } - - return { success: peerId, failure: null }; } } diff --git a/packages/core/src/lib/light_push/protocol_handler.ts b/packages/core/src/lib/light_push/protocol_handler.ts new file mode 100644 index 0000000000..429664f32d --- /dev/null +++ b/packages/core/src/lib/light_push/protocol_handler.ts @@ -0,0 +1,191 @@ +import type { PeerId } from "@libp2p/interface"; +import type { IEncoder, IMessage, LightPushCoreResult } from "@waku/interfaces"; +import { LightPushError, LightPushStatusCode } from "@waku/interfaces"; +import { PushResponse, WakuMessage } from "@waku/proto"; +import { isMessageSizeUnderCap, Logger } from "@waku/utils"; +import { Uint8ArrayList } from "uint8arraylist"; + +import { CODECS } from "./constants.js"; +import { PushRpcV2 } from "./push_rpc.js"; +import { PushRpc } from "./push_rpc_v3.js"; +import { isRLNResponseError } from "./utils.js"; + +type VersionedPushRpc = + | ({ version: "v2" } & PushRpcV2) + | ({ version: "v3" } & PushRpc); + +type PreparePushMessageResult = + | { rpc: VersionedPushRpc; error: null } + | { rpc: null; error: LightPushError }; + +const log = new Logger("light-push:protocol-handler"); + +export class ProtocolHandler { + public static async preparePushMessage( + encoder: IEncoder, + message: IMessage, + protocol: string + ): Promise { + try { + if (!message.payload || message.payload.length === 0) { + log.error("Failed to send waku light push: payload is empty"); + return { rpc: null, error: LightPushError.EMPTY_PAYLOAD }; + } + + if (!(await isMessageSizeUnderCap(encoder, message))) { + log.error("Failed to send waku light push: message is bigger than 1MB"); + return { rpc: null, error: LightPushError.SIZE_TOO_BIG }; + } + + const protoMessage = await encoder.toProtoObj(message); + if (!protoMessage) { + log.error("Failed to encode to protoMessage, aborting push"); + return { rpc: null, error: LightPushError.ENCODE_FAILED }; + } + + if (protocol === CODECS.v3) { + log.info("Creating v3 RPC message"); + return { + rpc: ProtocolHandler.createV3Rpc(protoMessage, encoder.pubsubTopic), + error: null + }; + } + + log.info("Creating v2 RPC message"); + return { + rpc: ProtocolHandler.createV2Rpc(protoMessage, encoder.pubsubTopic), + error: null + }; + } catch (err) { + log.error("Failed to prepare push message", err); + return { rpc: null, error: LightPushError.GENERIC_FAIL }; + } + } + + /** + * Decode and evaluate a LightPush response according to the protocol version + */ + public static handleResponse( + bytes: Uint8ArrayList, + protocol: string, + peerId: PeerId + ): LightPushCoreResult { + if (protocol === CODECS.v3) { + return ProtocolHandler.handleV3Response(bytes, peerId); + } + + return ProtocolHandler.handleV2Response(bytes, peerId); + } + + private static handleV3Response( + bytes: Uint8ArrayList, + peerId: PeerId + ): LightPushCoreResult { + try { + const decodedRpcV3 = PushRpc.decodeResponse(bytes); + const statusCode = decodedRpcV3.statusCode; + const statusDesc = decodedRpcV3.statusDesc; + + if (statusCode !== LightPushStatusCode.SUCCESS) { + const error = LightPushError.REMOTE_PEER_REJECTED; + log.error( + `Remote peer rejected with v3 status code ${statusCode}: ${statusDesc}` + ); + return { + success: null, + failure: { + error, + peerId: peerId + } + }; + } + + if (decodedRpcV3.relayPeerCount !== undefined) { + log.info(`Message relayed to ${decodedRpcV3.relayPeerCount} peers`); + } + + return { success: peerId, failure: null }; + } catch (err) { + return { + success: null, + failure: { + error: LightPushError.DECODE_FAILED, + peerId: peerId + } + }; + } + } + + private static handleV2Response( + bytes: Uint8ArrayList, + peerId: PeerId + ): LightPushCoreResult { + let response: PushResponse | undefined; + try { + const decodedRpc = PushRpcV2.decode(bytes); + response = decodedRpc.response; + } catch (err) { + return { + success: null, + failure: { + error: LightPushError.DECODE_FAILED, + peerId: peerId + } + }; + } + + if (!response) { + return { + success: null, + failure: { + error: LightPushError.NO_RESPONSE, + peerId: peerId + } + }; + } + + if (isRLNResponseError(response.info)) { + log.error("Remote peer fault: RLN generation"); + return { + success: null, + failure: { + error: LightPushError.RLN_PROOF_GENERATION, + peerId: peerId + } + }; + } + + if (!response.isSuccess) { + log.error("Remote peer rejected the message: ", response.info); + return { + success: null, + failure: { + error: LightPushError.REMOTE_PEER_REJECTED, + peerId: peerId + } + }; + } + + return { success: peerId, failure: null }; + } + + private static createV2Rpc( + message: WakuMessage, + pubsubTopic: string + ): VersionedPushRpc { + const v2Rpc = PushRpcV2.createRequest(message, pubsubTopic); + return Object.assign(v2Rpc, { version: "v2" as const }); + } + + private static createV3Rpc( + message: WakuMessage, + pubsubTopic: string + ): VersionedPushRpc { + if (!message.timestamp) { + message.timestamp = BigInt(Date.now()) * BigInt(1_000_000); + } + + const v3Rpc = PushRpc.createRequest(message, pubsubTopic); + return Object.assign(v3Rpc, { version: "v3" as const }); + } +} diff --git a/packages/core/src/lib/light_push/push_rpc.ts b/packages/core/src/lib/light_push/push_rpc.ts index 7b726e3e49..71fadde1fa 100644 --- a/packages/core/src/lib/light_push/push_rpc.ts +++ b/packages/core/src/lib/light_push/push_rpc.ts @@ -2,14 +2,14 @@ import { proto_lightpush as proto } from "@waku/proto"; import type { Uint8ArrayList } from "uint8arraylist"; import { v4 as uuid } from "uuid"; -export class PushRpc { +export class PushRpcV2 { public constructor(public proto: proto.PushRpc) {} public static createRequest( message: proto.WakuMessage, pubsubTopic: string - ): PushRpc { - return new PushRpc({ + ): PushRpcV2 { + return new PushRpcV2({ requestId: uuid(), request: { message: message, @@ -19,9 +19,9 @@ export class PushRpc { }); } - public static decode(bytes: Uint8ArrayList): PushRpc { + public static decode(bytes: Uint8ArrayList): PushRpcV2 { const res = proto.PushRpc.decode(bytes); - return new PushRpc(res); + return new PushRpcV2(res); } public encode(): Uint8Array { diff --git a/packages/core/src/lib/light_push/push_rpc_v3.ts b/packages/core/src/lib/light_push/push_rpc_v3.ts new file mode 100644 index 0000000000..d4a42b42f4 --- /dev/null +++ b/packages/core/src/lib/light_push/push_rpc_v3.ts @@ -0,0 +1,162 @@ +import { proto_lightpush as proto } from "@waku/proto"; +import type { Uint8ArrayList } from "uint8arraylist"; +import { v4 as uuid } from "uuid"; + +/** + * LightPush v3 protocol RPC handler. + * Implements the v3 message format with correct field numbers: + * - requestId: 1 + * - pubsubTopic: 20 + * - message: 21 + */ +export class PushRpc { + public constructor( + public proto: proto.LightPushRequestV3 | proto.LightPushResponseV3 + ) {} + + /** + * Create a v3 request message with proper field numbering + */ + public static createRequest( + message: proto.WakuMessage, + pubsubTopic: string + ): PushRpc { + return new PushRpc({ + requestId: uuid(), + pubsubTopic: pubsubTopic, + message: message + }); + } + + /** + * Create a v3 response message with status code handling + */ + public static createResponse( + requestId: string, + statusCode: number, + statusDesc?: string, + relayPeerCount?: number + ): PushRpc { + return new PushRpc({ + requestId, + statusCode, + statusDesc, + relayPeerCount + }); + } + + /** + * Decode v3 request message + */ + public static decodeRequest(bytes: Uint8ArrayList): PushRpc { + const res = proto.LightPushRequestV3.decode(bytes); + return new PushRpc(res); + } + + /** + * Decode v3 response message + */ + public static decodeResponse(bytes: Uint8ArrayList): PushRpc { + const res = proto.LightPushResponseV3.decode(bytes); + return new PushRpc(res); + } + + /** + * Encode message to bytes + */ + public encode(): Uint8Array { + if (this.isRequest()) { + return proto.LightPushRequestV3.encode( + this.proto as proto.LightPushRequestV3 + ); + } else { + return proto.LightPushResponseV3.encode( + this.proto as proto.LightPushResponseV3 + ); + } + } + + /** + * Get request data (if this is a request message) + */ + public get request(): proto.LightPushRequestV3 | undefined { + return this.isRequest() + ? (this.proto as proto.LightPushRequestV3) + : undefined; + } + + /** + * Get response data (if this is a response message) + */ + public get response(): proto.LightPushResponseV3 | undefined { + return this.isResponse() + ? (this.proto as proto.LightPushResponseV3) + : undefined; + } + + /** + * Get the request ID + */ + public get requestId(): string { + return this.proto.requestId; + } + + /** + * Get the pubsub topic (only available in requests) + */ + public get pubsubTopic(): string | undefined { + return this.isRequest() + ? (this.proto as proto.LightPushRequestV3).pubsubTopic + : undefined; + } + + /** + * Get the message (only available in requests) + */ + public get message(): proto.WakuMessage | undefined { + return this.isRequest() + ? (this.proto as proto.LightPushRequestV3).message + : undefined; + } + + /** + * Get the status code (only available in responses) + */ + public get statusCode(): number | undefined { + return this.isResponse() + ? (this.proto as proto.LightPushResponseV3).statusCode + : undefined; + } + + /** + * Get the status description (only available in responses) + */ + public get statusDesc(): string | undefined { + return this.isResponse() + ? (this.proto as proto.LightPushResponseV3).statusDesc + : undefined; + } + + /** + * Get the relay peer count (only available in responses) + */ + public get relayPeerCount(): number | undefined { + return this.isResponse() + ? (this.proto as proto.LightPushResponseV3).relayPeerCount + : undefined; + } + + /** + * Check if this is a request message + */ + private isRequest(): boolean { + return "pubsubTopic" in this.proto && "message" in this.proto; + } + + /** + * Check if this is a response message + */ + private isResponse(): boolean { + return "statusCode" in this.proto; + } +} diff --git a/packages/core/src/lib/stream_manager/stream_manager.ts b/packages/core/src/lib/stream_manager/stream_manager.ts index 20ac373ac8..63584c5086 100644 --- a/packages/core/src/lib/stream_manager/stream_manager.ts +++ b/packages/core/src/lib/stream_manager/stream_manager.ts @@ -13,7 +13,7 @@ export class StreamManager { private streamPool: Map> = new Map(); public constructor( - private multicodec: string, + private readonly multicodec: string, private readonly libp2p: Libp2pComponents ) { this.log = new Logger(`stream-manager:${multicodec}`); diff --git a/packages/interfaces/src/light_push.ts b/packages/interfaces/src/light_push.ts index e58d48f092..4a8790d90a 100644 --- a/packages/interfaces/src/light_push.ts +++ b/packages/interfaces/src/light_push.ts @@ -1,4 +1,6 @@ -import type { ISender, ISendOptions } from "./sender.js"; +import { IEncoder, IMessage } from "./message.js"; +import { LightPushSDKResult } from "./protocols.js"; +import type { ISendOptions } from "./sender.js"; export type LightPushProtocolOptions = ISendOptions & { /** @@ -15,8 +17,40 @@ export type LightPushProtocolOptions = ISendOptions & { numPeersToUse?: number; }; -export type ILightPush = ISender & { - readonly multicodec: string; +export type ILightPush = { + readonly multicodec: string[]; start: () => void; stop: () => void; + send: ( + encoder: IEncoder, + message: IMessage, + options?: ISendOptions + ) => Promise; +}; + +export enum LightPushStatusCode { + SUCCESS = 200, + BAD_REQUEST = 400, + PAYLOAD_TOO_LARGE = 413, + INVALID_MESSAGE = 420, + UNSUPPORTED_TOPIC = 421, + TOO_MANY_REQUESTS = 429, + INTERNAL_ERROR = 500, + UNAVAILABLE = 503, + NO_RLN_PROOF = 504, + NO_PEERS = 505 +} + +export const StatusDescriptions: Record = { + [LightPushStatusCode.SUCCESS]: "Message sent successfully", + [LightPushStatusCode.BAD_REQUEST]: "Bad request format", + [LightPushStatusCode.PAYLOAD_TOO_LARGE]: + "Message payload exceeds maximum size", + [LightPushStatusCode.INVALID_MESSAGE]: "Message validation failed", + [LightPushStatusCode.UNSUPPORTED_TOPIC]: "Unsupported pubsub topic", + [LightPushStatusCode.TOO_MANY_REQUESTS]: "Rate limit exceeded", + [LightPushStatusCode.INTERNAL_ERROR]: "Internal server error", + [LightPushStatusCode.UNAVAILABLE]: "Service temporarily unavailable", + [LightPushStatusCode.NO_RLN_PROOF]: "RLN proof generation failed", + [LightPushStatusCode.NO_PEERS]: "No relay peers available" }; diff --git a/packages/interfaces/src/protocols.ts b/packages/interfaces/src/protocols.ts index 6cbea0a51b..0fb60c182f 100644 --- a/packages/interfaces/src/protocols.ts +++ b/packages/interfaces/src/protocols.ts @@ -130,117 +130,123 @@ export type Callback = ( msg: T ) => void | Promise; -export enum ProtocolError { - // - // GENERAL ERRORS SECTION - // - /** - * Could not determine the origin of the fault. Best to check connectivity and try again - * */ +export enum LightPushError { GENERIC_FAIL = "Generic error", - - /** - * The remote peer rejected the message. Information provided by the remote peer - * is logged. Review message validity, or mitigation for `NO_PEER_AVAILABLE` - * or `DECODE_FAILED` can be used. - */ - REMOTE_PEER_REJECTED = "Remote peer rejected", - - /** - * Failure to protobuf decode the message. May be due to a remote peer issue, - * ensuring that messages are sent via several peer enable mitigation of this error. - */ DECODE_FAILED = "Failed to decode", - - /** - * Failure to find a peer with suitable protocols. This may due to a connection issue. - * Mitigation can be: retrying after a given time period, display connectivity issue - * to user or listening for `peer:connected:bootstrap` or `peer:connected:peer-exchange` - * on the connection manager before retrying. - */ NO_PEER_AVAILABLE = "No peer available", - - /** - * Failure to find a stream to the peer. This may be because the connection with the peer is not still alive. - * Mitigation can be: retrying after a given time period, or mitigation for `NO_PEER_AVAILABLE` can be used. - */ NO_STREAM_AVAILABLE = "No stream available", - - /** - * The remote peer did not behave as expected. Mitigation for `NO_PEER_AVAILABLE` - * or `DECODE_FAILED` can be used. - */ NO_RESPONSE = "No response received", - - // - // SEND ERRORS SECTION - // - /** - * Failure to protobuf encode the message. This is not recoverable and needs - * further investigation. - */ - ENCODE_FAILED = "Failed to encode", - - /** - * The message payload is empty, making the message invalid. Ensure that a non-empty - * payload is set on the outgoing message. - */ - EMPTY_PAYLOAD = "Payload is empty", - - /** - * The message size is above the maximum message size allowed on the Waku Network. - * Compressing the message or using an alternative strategy for large messages is recommended. - */ - SIZE_TOO_BIG = "Size is too big", - - /** - * The PubsubTopic passed to the send function is not configured on the Waku node. - * Please ensure that the PubsubTopic is used when initializing the Waku node. - */ - TOPIC_NOT_CONFIGURED = "Topic not configured", - - /** - * Fails when - */ STREAM_ABORTED = "Stream aborted", - /** - * General proof generation error message. - * nwaku: https://github.com/waku-org/nwaku/blob/c3cb06ac6c03f0f382d3941ea53b330f6a8dd127/waku/waku_rln_relay/group_manager/group_manager_base.nim#L201C19-L201C42 - */ + ENCODE_FAILED = "Failed to encode", + EMPTY_PAYLOAD = "Payload is empty", + SIZE_TOO_BIG = "Size is too big", + TOPIC_NOT_CONFIGURED = "Topic not configured", RLN_PROOF_GENERATION = "Proof generation failed", + REMOTE_PEER_REJECTED = "Remote peer rejected", - // - // RECEIVE ERRORS SECTION - // - /** - * The pubsub topic configured on the decoder does not match the pubsub topic setup on the protocol. - * Ensure that the pubsub topic used for decoder creation is the same as the one used for protocol. - */ - TOPIC_DECODER_MISMATCH = "Topic decoder mismatch", - - /** - * The topics passed in the decoders do not match each other, or don't exist at all. - * Ensure that all the pubsub topics used in the decoders are valid and match each other. - */ - INVALID_DECODER_TOPICS = "Invalid decoder topics" + BAD_REQUEST = "Bad request format", + PAYLOAD_TOO_LARGE = "Message payload exceeds maximum size", + INVALID_MESSAGE = "Message validation failed", + UNSUPPORTED_TOPIC = "Unsupported pubsub topic", + TOO_MANY_REQUESTS = "Rate limit exceeded", + INTERNAL_ERROR = "Internal server error", + UNAVAILABLE = "Service temporarily unavailable", + NO_RLN_PROOF = "RLN proof generation failed", + NO_PEERS = "No relay peers available" } -export interface Failure { - error: ProtocolError; +export enum FilterError { + // General errors + GENERIC_FAIL = "Generic error", + DECODE_FAILED = "Failed to decode", + NO_PEER_AVAILABLE = "No peer available", + NO_STREAM_AVAILABLE = "No stream available", + NO_RESPONSE = "No response received", + STREAM_ABORTED = "Stream aborted", + + // Filter specific errors + REMOTE_PEER_REJECTED = "Remote peer rejected", + TOPIC_NOT_CONFIGURED = "Topic not configured", + SUBSCRIPTION_FAILED = "Subscription failed", + UNSUBSCRIBE_FAILED = "Unsubscribe failed", + PING_FAILED = "Ping failed", + TOPIC_DECODER_MISMATCH = "Topic decoder mismatch", + INVALID_DECODER_TOPICS = "Invalid decoder topics", + SUBSCRIPTION_LIMIT_EXCEEDED = "Subscription limit exceeded", + INVALID_CONTENT_TOPIC = "Invalid content topic", + PUSH_MESSAGE_FAILED = "Push message failed", + EMPTY_MESSAGE = "Empty message received", + MISSING_PUBSUB_TOPIC = "Pubsub topic missing from push message" +} + +export interface LightPushFailure { + error: LightPushError; peerId?: PeerId; } -export type CoreProtocolResult = ThisOrThat< +export interface FilterFailure { + error: FilterError; + peerId?: PeerId; +} + +export type LightPushCoreResult = ThisOrThat< "success", PeerId, "failure", - Failure + LightPushFailure >; +export type FilterCoreResult = ThisOrThat< + "success", + PeerId, + "failure", + FilterFailure +>; + +export type LightPushSDKResult = ThisAndThat< + "successes", + PeerId[], + "failures", + LightPushFailure[] +>; + +export type FilterSDKResult = ThisAndThat< + "successes", + PeerId[], + "failures", + FilterFailure[] +>; + +/** + * @deprecated replace usage by specific result types + */ export type SDKProtocolResult = ThisAndThat< "successes", PeerId[], "failures", - Failure[] + Array<{ + error: ProtocolError; + peerId?: PeerId; + }> >; + +/** + * @deprecated replace usage by specific result types + */ +export enum ProtocolError { + GENERIC_FAIL = "Generic error", + REMOTE_PEER_REJECTED = "Remote peer rejected", + DECODE_FAILED = "Failed to decode", + NO_PEER_AVAILABLE = "No peer available", + NO_STREAM_AVAILABLE = "No stream available", + NO_RESPONSE = "No response received", + ENCODE_FAILED = "Failed to encode", + EMPTY_PAYLOAD = "Payload is empty", + SIZE_TOO_BIG = "Size is too big", + TOPIC_NOT_CONFIGURED = "Topic not configured", + STREAM_ABORTED = "Stream aborted", + RLN_PROOF_GENERATION = "Proof generation failed", + TOPIC_DECODER_MISMATCH = "Topic decoder mismatch", + INVALID_DECODER_TOPICS = "Invalid decoder topics" +} diff --git a/packages/interfaces/src/sender.ts b/packages/interfaces/src/sender.ts index 0c924b3f3f..da4fc5f003 100644 --- a/packages/interfaces/src/sender.ts +++ b/packages/interfaces/src/sender.ts @@ -1,5 +1,5 @@ import type { IEncoder, IMessage } from "./message.js"; -import { SDKProtocolResult } from "./protocols.js"; +import { LightPushSDKResult } from "./protocols.js"; export type ISendOptions = { /** @@ -13,6 +13,13 @@ export type ISendOptions = { * @default 3 */ maxAttempts?: number; + + /** + * Use v2 of the light push protocol. + * This parameter will be removed in the future. + * @default false + */ + useLegacy?: boolean; }; export interface ISender { @@ -20,5 +27,5 @@ export interface ISender { encoder: IEncoder, message: IMessage, sendOptions?: ISendOptions - ) => Promise; + ) => Promise; } diff --git a/packages/proto/src/lib/light_push.proto b/packages/proto/src/lib/light_push.proto index b980115ab9..9ceba1ab2f 100644 --- a/packages/proto/src/lib/light_push.proto +++ b/packages/proto/src/lib/light_push.proto @@ -39,4 +39,4 @@ message LightPushResponseV3 { uint32 status_code = 10; optional string status_desc = 11; optional uint32 relay_peer_count = 12; -} \ No newline at end of file +} diff --git a/packages/relay/src/relay.ts b/packages/relay/src/relay.ts index 6f6dd98fa7..cd1336ef72 100644 --- a/packages/relay/src/relay.ts +++ b/packages/relay/src/relay.ts @@ -19,9 +19,9 @@ import { IRelay, type IRoutingInfo, Libp2p, - ProtocolError, - PubsubTopic, - SDKProtocolResult + LightPushError, + LightPushSDKResult, + PubsubTopic } from "@waku/interfaces"; import { isWireSizeUnderCap, toAsyncIterator } from "@waku/utils"; import { pushOrInitMapSet } from "@waku/utils"; @@ -127,7 +127,7 @@ export class Relay implements IRelay { public async send( encoder: IEncoder, message: IMessage - ): Promise { + ): Promise { const { pubsubTopic } = encoder; if (!this.pubsubTopics.has(pubsubTopic)) { log.error("Failed to send waku relay: topic not configured"); @@ -135,7 +135,7 @@ export class Relay implements IRelay { successes: [], failures: [ { - error: ProtocolError.TOPIC_NOT_CONFIGURED + error: LightPushError.TOPIC_NOT_CONFIGURED } ] }; @@ -148,7 +148,7 @@ export class Relay implements IRelay { successes: [], failures: [ { - error: ProtocolError.ENCODE_FAILED + error: LightPushError.ENCODE_FAILED } ] }; @@ -160,7 +160,7 @@ export class Relay implements IRelay { successes: [], failures: [ { - error: ProtocolError.SIZE_TOO_BIG + error: LightPushError.SIZE_TOO_BIG } ] }; diff --git a/packages/sdk/src/light_push/light_push.spec.ts b/packages/sdk/src/light_push/light_push.spec.ts index faabc16459..c0a9d0848d 100644 --- a/packages/sdk/src/light_push/light_push.spec.ts +++ b/packages/sdk/src/light_push/light_push.spec.ts @@ -1,6 +1,11 @@ import { Peer, PeerId } from "@libp2p/interface"; -import { createEncoder, Encoder, LightPushCodec } from "@waku/core"; -import { Libp2p, ProtocolError } from "@waku/interfaces"; +import { + createEncoder, + Encoder, + LightPushCodec, + LightPushCodecV2 +} from "@waku/core"; +import { Libp2p, LightPushError, LightPushStatusCode } from "@waku/interfaces"; import { createRoutingInfo } from "@waku/utils"; import { utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; @@ -40,8 +45,8 @@ describe("LightPush SDK", () => { const failures = result.failures ?? []; expect(failures.length).to.be.eq(1); - expect(failures.some((v) => v.error === ProtocolError.NO_PEER_AVAILABLE)).to - .be.true; + expect(failures.some((v) => v.error === LightPushError.NO_PEER_AVAILABLE)) + .to.be.true; }); it("should send to specified number of peers of used peers", async () => { @@ -127,6 +132,45 @@ describe("LightPush SDK", () => { expect(result.successes?.length).to.be.eq(1); expect(result.failures?.length).to.be.eq(1); }); + + describe("v3 protocol support", () => { + it("should work with v3 peers", async () => { + libp2p = mockLibp2p({ + peers: [mockV3Peer("1"), mockV3Peer("2")] + }); + }); + + it("should work with mixed v2 and v3 peers", async () => { + libp2p = mockLibp2p({ + peers: [mockV2AndV3Peer("1"), mockPeer("2"), mockV3Peer("3")] + }); + + // Mock responses for different protocol versions + const v3Response = mockV3SuccessResponse(5); + const v2Response = mockV2SuccessResponse(); + const v3ErrorResponse = mockV3ErrorResponse( + LightPushStatusCode.PAYLOAD_TOO_LARGE + ); + const v2ErrorResponse = mockV2ErrorResponse("Message too large"); + + expect(v3Response.statusCode).to.eq(LightPushStatusCode.SUCCESS); + expect(v3Response.relayPeerCount).to.eq(5); + expect(v2Response.isSuccess).to.be.true; + expect(v3ErrorResponse.statusCode).to.eq( + LightPushStatusCode.PAYLOAD_TOO_LARGE + ); + expect(v2ErrorResponse.isSuccess).to.be.false; + }); + + it("should handle v3 RLN errors", async () => { + const v3RLNError = mockV3RLNErrorResponse(); + const v2RLNError = mockV2RLNErrorResponse(); + + expect(v3RLNError.statusCode).to.eq(LightPushStatusCode.NO_RLN_PROOF); + expect(v3RLNError.statusDesc).to.include("RLN proof generation failed"); + expect(v2RLNError.info).to.include("RLN proof generation failed"); + }); + }); }); type MockLibp2pOptions = { @@ -136,7 +180,16 @@ type MockLibp2pOptions = { function mockLibp2p(options?: MockLibp2pOptions): Libp2p { const peers = options?.peers || []; const peerStore = { - get: (id: any) => Promise.resolve(peers.find((p) => p.id === id)) + get: (id: any) => { + const peer = peers.find((p) => p.id === id); + if (peer) { + return Promise.resolve({ + ...peer, + protocols: peer.protocols || [LightPushCodec] + }); + } + return Promise.resolve(undefined); + } }; return { @@ -179,9 +232,92 @@ function mockLightPush(options: MockLightPushOptions): LightPush { return lightPush; } -function mockPeer(id: string): Peer { +function mockPeer(id: string, protocols: string[] = [LightPushCodec]): Peer { return { - id, - protocols: [LightPushCodec] - } as unknown as Peer; + id: { toString: () => id } as PeerId, + protocols: protocols, + metadata: new Map(), + addresses: [], + tags: new Map() + }; +} + +// V3-specific mock functions +function mockV3Peer(id: string): Peer { + return mockPeer(id, [LightPushCodec]); +} + +function mockV2AndV3Peer(id: string): Peer { + return mockPeer(id, [LightPushCodec, LightPushCodecV2]); +} + +function mockV3SuccessResponse(relayPeerCount?: number): { + statusCode: LightPushStatusCode; + statusDesc: string; + relayPeerCount?: number; + isSuccess: boolean; +} { + return { + statusCode: LightPushStatusCode.SUCCESS, + statusDesc: "Message sent successfully", + relayPeerCount, + isSuccess: true + }; +} + +function mockV3ErrorResponse( + statusCode: LightPushStatusCode, + statusDesc?: string +): { + statusCode: LightPushStatusCode; + statusDesc: string; + isSuccess: boolean; +} { + return { + statusCode, + statusDesc: statusDesc || "Error occurred", + isSuccess: false + }; +} + +function mockV2SuccessResponse(): { + isSuccess: boolean; + info: string; +} { + return { + isSuccess: true, + info: "Message sent successfully" + }; +} + +function mockV2ErrorResponse(info?: string): { + isSuccess: boolean; + info: string; +} { + return { + isSuccess: false, + info: info || "Error occurred" + }; +} + +function mockV3RLNErrorResponse(): { + statusCode: LightPushStatusCode; + statusDesc: string; + isSuccess: boolean; +} { + return { + statusCode: LightPushStatusCode.NO_RLN_PROOF, + statusDesc: "RLN proof generation failed", + isSuccess: false + }; +} + +function mockV2RLNErrorResponse(): { + isSuccess: boolean; + info: string; +} { + return { + isSuccess: false, + info: "RLN proof generation failed" + }; } diff --git a/packages/sdk/src/light_push/light_push.ts b/packages/sdk/src/light_push/light_push.ts index 947ce1528b..669c77e38c 100644 --- a/packages/sdk/src/light_push/light_push.ts +++ b/packages/sdk/src/light_push/light_push.ts @@ -1,17 +1,17 @@ import type { PeerId } from "@libp2p/interface"; import { LightPushCore } from "@waku/core"; import { - type CoreProtocolResult, - Failure, type IEncoder, ILightPush, type IMessage, type ISendOptions, type Libp2p, + LightPushCoreResult, + LightPushError, + LightPushFailure, type LightPushProtocolOptions, - ProtocolError, - Protocols, - SDKProtocolResult + LightPushSDKResult, + Protocols } from "@waku/interfaces"; import { Logger } from "@waku/utils"; @@ -55,7 +55,7 @@ export class LightPush implements ILightPush { }); } - public get multicodec(): string { + public get multicodec(): string[] { return this.protocol.multicodec; } @@ -71,8 +71,9 @@ export class LightPush implements ILightPush { encoder: IEncoder, message: IMessage, options: ISendOptions = {} - ): Promise { + ): Promise { options = { + useLegacy: false, ...this.config, ...options }; @@ -82,45 +83,48 @@ export class LightPush implements ILightPush { log.info("send: attempting to send a message to pubsubTopic:", pubsubTopic); const peerIds = await this.peerManager.getPeers({ - protocol: Protocols.LightPush, + protocol: options.useLegacy ? "light-push-v2" : Protocols.LightPush, pubsubTopic: encoder.pubsubTopic }); - const coreResults: CoreProtocolResult[] = + const coreResults = peerIds?.length > 0 ? await Promise.all( peerIds.map((peerId) => - this.protocol.send(encoder, message, peerId).catch((_e) => ({ - success: null, - failure: { - error: ProtocolError.GENERIC_FAIL - } - })) + this.protocol + .send(encoder, message, peerId, options.useLegacy) + .catch((_e) => ({ + success: null, + failure: { + error: LightPushError.GENERIC_FAIL + } + })) ) ) : []; - const results: SDKProtocolResult = coreResults.length + const results: LightPushSDKResult = coreResults.length ? { successes: coreResults .filter((v) => v.success) .map((v) => v.success) as PeerId[], failures: coreResults .filter((v) => v.failure) - .map((v) => v.failure) as Failure[] + .map((v) => v.failure) as LightPushFailure[] } : { successes: [], failures: [ { - error: ProtocolError.NO_PEER_AVAILABLE + error: LightPushError.NO_PEER_AVAILABLE } ] }; if (options.autoRetry && results.successes.length === 0) { - const sendCallback = (peerId: PeerId): Promise => - this.protocol.send(encoder, message, peerId); + const sendCallback = (peerId: PeerId): Promise => + this.protocol.send(encoder, message, peerId, options.useLegacy); + this.retryManager.push( sendCallback.bind(this), options.maxAttempts || DEFAULT_MAX_ATTEMPTS, diff --git a/packages/sdk/src/light_push/retry_manager.spec.ts b/packages/sdk/src/light_push/retry_manager.spec.ts index c9eb95eea8..4ac5f9972e 100644 --- a/packages/sdk/src/light_push/retry_manager.spec.ts +++ b/packages/sdk/src/light_push/retry_manager.spec.ts @@ -1,6 +1,7 @@ import type { PeerId } from "@libp2p/interface"; import { - type CoreProtocolResult, + type LightPushCoreResult, + LightPushError, ProtocolError, Protocols } from "@waku/interfaces"; @@ -59,7 +60,7 @@ describe("RetryManager", () => { it("should process tasks in queue", async () => { const successCallback = sinon.spy( - async (peerId: PeerId): Promise => ({ + async (peerId: PeerId): Promise => ({ success: peerId, failure: null }) @@ -112,9 +113,9 @@ describe("RetryManager", () => { it("should retry failed tasks", async () => { const failingCallback = sinon.spy( - async (): Promise => ({ + async (): Promise => ({ success: null, - failure: { error: "test error" as any } + failure: { error: LightPushError.GENERIC_FAIL } }) ); @@ -135,7 +136,7 @@ describe("RetryManager", () => { }); it("should request peer renewal on specific errors", async () => { - const errorCallback = sinon.spy(async (): Promise => { + const errorCallback = sinon.spy(async (): Promise => { throw new Error(ProtocolError.NO_PEER_AVAILABLE); }); @@ -155,7 +156,7 @@ describe("RetryManager", () => { }); it("should handle task timeouts", async () => { - const slowCallback = sinon.spy(async (): Promise => { + const slowCallback = sinon.spy(async (): Promise => { await new Promise((resolve) => setTimeout(resolve, 15000)); return { success: mockPeerId, failure: null }; }); @@ -174,9 +175,11 @@ describe("RetryManager", () => { }); it("should not execute task if max attempts is 0", async () => { - const failingCallback = sinon.spy(async (): Promise => { - throw new Error("test error" as any); - }); + const failingCallback = sinon.spy( + async (): Promise => { + throw new Error("test error" as any); + } + ); const task = { callback: failingCallback, @@ -209,7 +212,7 @@ describe("RetryManager", () => { called++; return Promise.resolve({ success: null, - failure: { error: ProtocolError.GENERIC_FAIL } + failure: { error: LightPushError.GENERIC_FAIL } }); }); retryManager.push(failCallback, 2, TestRoutingInfo); diff --git a/packages/sdk/src/light_push/retry_manager.ts b/packages/sdk/src/light_push/retry_manager.ts index 0fc156efe4..3363149670 100644 --- a/packages/sdk/src/light_push/retry_manager.ts +++ b/packages/sdk/src/light_push/retry_manager.ts @@ -1,7 +1,7 @@ import type { PeerId } from "@libp2p/interface"; import { - type CoreProtocolResult, type IRoutingInfo, + type LightPushCoreResult, Protocols } from "@waku/interfaces"; import { Logger } from "@waku/utils"; @@ -15,7 +15,7 @@ type RetryManagerConfig = { peerManager: PeerManager; }; -type AttemptCallback = (peerId: PeerId) => Promise; +type AttemptCallback = (peerId: PeerId) => Promise; export type ScheduledTask = { maxAttempts: number; @@ -123,7 +123,13 @@ export class RetryManager { task.callback(peerId) ]); - if (response?.failure) { + // If timeout resolves first, response will be void (undefined) + // In this case, we should treat it as a timeout error + if (response === undefined) { + throw new Error("Task timeout"); + } + + if (response.failure) { throw Error(response.failure.error); } diff --git a/packages/sdk/src/light_push/utils.ts b/packages/sdk/src/light_push/utils.ts index 85afd07a39..a011c84a4f 100644 --- a/packages/sdk/src/light_push/utils.ts +++ b/packages/sdk/src/light_push/utils.ts @@ -1,13 +1,13 @@ -import { ProtocolError } from "@waku/interfaces"; +import { LightPushError } from "@waku/interfaces"; export const shouldPeerBeChanged = ( - failure: string | ProtocolError + failure: string | LightPushError ): boolean => { const toBeChanged = - failure === ProtocolError.REMOTE_PEER_REJECTED || - failure === ProtocolError.NO_RESPONSE || - failure === ProtocolError.RLN_PROOF_GENERATION || - failure === ProtocolError.NO_PEER_AVAILABLE; + failure === LightPushError.REMOTE_PEER_REJECTED || + failure === LightPushError.NO_RESPONSE || + failure === LightPushError.RLN_PROOF_GENERATION || + failure === LightPushError.NO_PEER_AVAILABLE; if (toBeChanged) { return true; diff --git a/packages/sdk/src/peer_manager/peer_manager.spec.ts b/packages/sdk/src/peer_manager/peer_manager.spec.ts index 60174a9078..f4d92301db 100644 --- a/packages/sdk/src/peer_manager/peer_manager.spec.ts +++ b/packages/sdk/src/peer_manager/peer_manager.spec.ts @@ -85,7 +85,8 @@ describe("PeerManager", () => { _clusterId: ClusterId, _shardId: ShardId ) => true, - isPeerOnTopic: async (_id: PeerId, _topic: string) => true + isPeerOnTopic: async (_id: PeerId, _topic: string) => true, + hasShardInfo: async (_id: PeerId) => true } as unknown as IConnectionManager; peerManager = new PeerManager({ libp2p, diff --git a/packages/sdk/src/peer_manager/peer_manager.ts b/packages/sdk/src/peer_manager/peer_manager.ts index 5b62292b24..cc378558b3 100644 --- a/packages/sdk/src/peer_manager/peer_manager.ts +++ b/packages/sdk/src/peer_manager/peer_manager.ts @@ -4,7 +4,12 @@ import { PeerId, TypedEventEmitter } from "@libp2p/interface"; -import { FilterCodecs, LightPushCodec, StoreCodec } from "@waku/core"; +import { + FilterCodecs, + LightPushCodec, + LightPushCodecV2, + StoreCodec +} from "@waku/core"; import { CONNECTION_LOCKED_TAG, type IConnectionManager, @@ -28,8 +33,10 @@ type PeerManagerParams = { connectionManager: IConnectionManager; }; +type SupportedProtocols = Protocols | "light-push-v2"; + type GetPeersParams = { - protocol: Protocols; + protocol: SupportedProtocols; pubsubTopic: string; }; @@ -119,7 +126,7 @@ export class PeerManager { for (const peer of connectedPeers) { const hasProtocol = this.hasPeerProtocol(peer, params.protocol); - const hasSamePubsub = await this.connectionManager.isPeerOnTopic( + const hasSamePubsub = await this.isPeerOnPubsub( peer.id, params.pubsubTopic ); @@ -204,12 +211,19 @@ export class PeerManager { private async onConnected(event: CustomEvent): Promise { const result = event.detail; - if ( - result.protocols.includes(this.matchProtocolToCodec(Protocols.Filter)) - ) { + + const isFilterPeer = result.protocols.includes( + this.getProtocolCodecs(Protocols.Filter) + ); + const isStorePeer = result.protocols.includes( + this.getProtocolCodecs(Protocols.Store) + ); + + if (isFilterPeer) { this.dispatchFilterPeerConnect(result.peerId); } - if (result.protocols.includes(this.matchProtocolToCodec(Protocols.Store))) { + + if (isStorePeer) { this.dispatchStorePeerConnect(result.peerId); } } @@ -230,8 +244,8 @@ export class PeerManager { } } - private hasPeerProtocol(peer: Peer, protocol: Protocols): boolean { - return peer.protocols.includes(this.matchProtocolToCodec(protocol)); + private hasPeerProtocol(peer: Peer, protocol: SupportedProtocols): boolean { + return peer.protocols.includes(this.getProtocolCodecs(protocol)); } private lockPeer(id: PeerId): void { @@ -289,14 +303,18 @@ export class PeerManager { ); } - private matchProtocolToCodec(protocol: Protocols): string { - const protocolToCodec = { + private getProtocolCodecs(protocol: SupportedProtocols): string { + if (protocol === Protocols.Relay) { + throw new Error("Relay protocol is not supported"); + } + + const protocolToCodecs = { [Protocols.Filter]: FilterCodecs.SUBSCRIBE, [Protocols.LightPush]: LightPushCodec, [Protocols.Store]: StoreCodec, - [Protocols.Relay]: "" + "light-push-v2": LightPushCodecV2 }; - return protocolToCodec[protocol]; + return protocolToCodecs[protocol]; } } diff --git a/packages/sdk/src/waku/wait_for_remote_peer.spec.ts b/packages/sdk/src/waku/wait_for_remote_peer.spec.ts index b3fb33d3e7..9b0073b379 100644 --- a/packages/sdk/src/waku/wait_for_remote_peer.spec.ts +++ b/packages/sdk/src/waku/wait_for_remote_peer.spec.ts @@ -1,5 +1,10 @@ import type { Connection, Peer, PeerStore } from "@libp2p/interface"; -import { FilterCodecs, LightPushCodec, StoreCodec } from "@waku/core"; +import { + FilterCodecs, + LightPushCodec, + LightPushCodecV2, + StoreCodec +} from "@waku/core"; import { IRelay, Protocols } from "@waku/interfaces"; import { expect } from "chai"; import sinon from "sinon"; @@ -114,7 +119,10 @@ describe("waitForRemotePeer", () => { err = e as Error; } - expect(addEventListenerSpy.calledOnceWith("peer:identify")).to.be.true; + expect(addEventListenerSpy.calledTwice).to.be.true; + addEventListenerSpy + .getCalls() + .forEach((c) => expect(c.firstArg).to.equal("peer:identify")); expect(err).not.to.be.undefined; expect(err!.message).to.be.eq("Timed out waiting for a remote peer."); @@ -148,9 +156,12 @@ describe("waitForRemotePeer", () => { }); it("should wait for LightPush peer to be connected", async () => { + let call = 0; const addEventListenerSpy = sinon.spy( (_type: string, _cb: (e: any) => void) => { - _cb({ detail: { protocols: [LightPushCodec] } }); + const proto = call === 0 ? LightPushCodec : LightPushCodecV2; + call++; + _cb({ detail: { protocols: [proto] } }); } ); eventTarget.addEventListener = addEventListenerSpy; @@ -174,7 +185,10 @@ describe("waitForRemotePeer", () => { err = e as Error; } - expect(addEventListenerSpy.calledOnceWith("peer:identify")).to.be.true; + expect(addEventListenerSpy.calledTwice).to.be.true; + addEventListenerSpy + .getCalls() + .forEach((c) => expect(c.firstArg).to.equal("peer:identify")); expect(err).to.be.undefined; // check with metadata serivice @@ -196,8 +210,10 @@ describe("waitForRemotePeer", () => { err = e as Error; } - expect(addEventListenerSpy.calledTwice).to.be.true; - expect(addEventListenerSpy.lastCall.calledWith("peer:identify")).to.be.true; + expect(addEventListenerSpy.callCount).to.equal(4); + addEventListenerSpy + .getCalls() + .forEach((c) => expect(c.firstArg).to.equal("peer:identify")); expect(err).to.be.undefined; }); diff --git a/packages/sdk/src/waku/wait_for_remote_peer.ts b/packages/sdk/src/waku/wait_for_remote_peer.ts index ae10d6be8a..37178ae989 100644 --- a/packages/sdk/src/waku/wait_for_remote_peer.ts +++ b/packages/sdk/src/waku/wait_for_remote_peer.ts @@ -1,5 +1,10 @@ import type { IdentifyResult } from "@libp2p/interface"; -import { FilterCodecs, LightPushCodec, StoreCodec } from "@waku/core"; +import { + FilterCodecs, + LightPushCodec, + LightPushCodecV2, + StoreCodec +} from "@waku/core"; import type { IWaku, Libp2p } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; import { Logger } from "@waku/utils"; @@ -82,6 +87,13 @@ export async function waitForRemotePeer( type EventListener = (_: CustomEvent) => void; +function protocolToPeerPromise( + codecs: string[], + libp2p: Libp2p +): Promise[] { + return codecs.map((codec) => waitForConnectedPeer(codec, libp2p)); +} + /** * Waits for required peers to be connected. */ @@ -96,15 +108,21 @@ async function waitForProtocols( } if (waku.store && protocols.includes(Protocols.Store)) { - promises.push(waitForConnectedPeer(StoreCodec, waku.libp2p)); + promises.push(...protocolToPeerPromise([StoreCodec], waku.libp2p)); } if (waku.lightPush && protocols.includes(Protocols.LightPush)) { - promises.push(waitForConnectedPeer(LightPushCodec, waku.libp2p)); + const lpPromises = protocolToPeerPromise( + [LightPushCodec, LightPushCodecV2], + waku.libp2p + ); + promises.push(Promise.any(lpPromises)); } if (waku.filter && protocols.includes(Protocols.Filter)) { - promises.push(waitForConnectedPeer(FilterCodecs.SUBSCRIBE, waku.libp2p)); + promises.push( + ...protocolToPeerPromise([FilterCodecs.SUBSCRIBE], waku.libp2p) + ); } return Promise.all(promises); @@ -246,15 +264,17 @@ function getEnabledProtocols(waku: IWaku): Protocols[] { function mapProtocolsToCodecs(protocols: Protocols[]): Map { const codecs: Map = new Map(); - const protocolToCodec: Record = { - [Protocols.Filter]: FilterCodecs.SUBSCRIBE, - [Protocols.LightPush]: LightPushCodec, - [Protocols.Store]: StoreCodec + const protocolToCodec: Record = { + [Protocols.Filter]: [FilterCodecs.SUBSCRIBE], + [Protocols.LightPush]: [LightPushCodec, LightPushCodecV2], + [Protocols.Store]: [StoreCodec] }; for (const protocol of protocols) { if (protocolToCodec[protocol]) { - codecs.set(protocolToCodec[protocol], false); + protocolToCodec[protocol].forEach((codec) => { + codecs.set(codec, false); + }); } } diff --git a/packages/sdk/src/waku/waku.ts b/packages/sdk/src/waku/waku.ts index a568474b1a..7336b06df7 100644 --- a/packages/sdk/src/waku/waku.ts +++ b/packages/sdk/src/waku/waku.ts @@ -182,7 +182,7 @@ export class WakuNode implements IWaku { } if (_protocols.includes(Protocols.LightPush)) { if (this.lightPush) { - codecs.push(this.lightPush.multicodec); + codecs.push(...this.lightPush.multicodec); } else { log.error( "Light Push codec not included in dial codec: protocol not mounted locally" diff --git a/packages/tests/tests/light-push/index.node.spec.ts b/packages/tests/tests/light-push/index.node.spec.ts index 6733c2dbb9..755dc178ea 100644 --- a/packages/tests/tests/light-push/index.node.spec.ts +++ b/packages/tests/tests/light-push/index.node.spec.ts @@ -1,5 +1,5 @@ import { createEncoder } from "@waku/core"; -import { IRateLimitProof, LightNode, ProtocolError } from "@waku/interfaces"; +import { IRateLimitProof, LightNode, LightPushError } from "@waku/interfaces"; import { utf8ToBytes } from "@waku/sdk"; import { expect } from "chai"; @@ -21,9 +21,9 @@ import { TestRoutingInfo } from "./utils.js"; -const runTests = (strictNodeCheck: boolean): void => { +const runTests = (strictNodeCheck: boolean, useLegacy: boolean): void => { const numServiceNodes = 2; - describe(`Waku Light Push: Multiple Nodes: Strict Check: ${strictNodeCheck}`, function () { + describe(`Waku Light Push (legacy=${useLegacy ? "v2" : "v3"}): Multiple Nodes: Strict Check: ${strictNodeCheck}`, function () { // Set the timeout for all tests in this suite. Can be overwritten at test level this.timeout(15000); let waku: LightNode; @@ -36,7 +36,8 @@ const runTests = (strictNodeCheck: boolean): void => { { lightpush: true, filter: true }, strictNodeCheck, numServiceNodes, - true + true, + { lightPush: { useLegacy } } ); }); @@ -95,7 +96,7 @@ const runTests = (strictNodeCheck: boolean): void => { expect(pushResponse.successes.length).to.eq(0); expect(pushResponse.failures?.map((failure) => failure.error)).to.include( - ProtocolError.EMPTY_PAYLOAD + LightPushError.EMPTY_PAYLOAD ); expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( @@ -174,7 +175,7 @@ const runTests = (strictNodeCheck: boolean): void => { expect(pushResponse.successes.length).to.eq(0); expect(pushResponse.failures?.map((failure) => failure.error)).to.include( - ProtocolError.REMOTE_PEER_REJECTED + LightPushError.REMOTE_PEER_REJECTED ); expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( false @@ -248,7 +249,7 @@ const runTests = (strictNodeCheck: boolean): void => { }); expect(pushResponse.successes.length).to.eq(0); expect(pushResponse.failures?.map((failure) => failure.error)).to.include( - ProtocolError.SIZE_TOO_BIG + LightPushError.SIZE_TOO_BIG ); expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( false @@ -257,4 +258,6 @@ const runTests = (strictNodeCheck: boolean): void => { }); }; -[true, false].map(runTests); +[true, false].forEach((strictNodeCheck) => { + [true, false].forEach((legacy) => runTests(strictNodeCheck, legacy)); +}); diff --git a/packages/tests/tests/light-push/multiple_pubsub.node.spec.ts b/packages/tests/tests/light-push/multiple_pubsub.node.spec.ts index a71218faf0..5bafa45c71 100644 --- a/packages/tests/tests/light-push/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/light-push/multiple_pubsub.node.spec.ts @@ -1,5 +1,5 @@ import { createEncoder } from "@waku/core"; -import { LightNode, Protocols } from "@waku/interfaces"; +import { IWaku, Protocols } from "@waku/interfaces"; import { createRoutingInfo } from "@waku/utils"; import { utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; @@ -28,7 +28,7 @@ describe("Waku Light Push (Autosharding): Multiple Shards", function () { this.timeout(30000); const numServiceNodes = 2; - let waku: LightNode; + let waku: IWaku; let serviceNodes: ServiceNodesFleet; const customContentTopic2 = "/test/2/waku-light-push/utf8"; @@ -48,6 +48,7 @@ describe("Waku Light Push (Autosharding): Multiple Shards", function () { { lightpush: true, filter: true, + relay: true, contentTopic: [TestEncoder.contentTopic, customEncoder2.contentTopic] }, false, @@ -60,45 +61,56 @@ describe("Waku Light Push (Autosharding): Multiple Shards", function () { await teardownNodesWithRedundancy(serviceNodes, waku); }); - it("Subscribe and receive messages on 2 different pubsubtopics", async function () { - if (customRoutingInfo2.pubsubTopic === TestEncoder.pubsubTopic) - throw "Invalid test, both encoder uses same shard"; + [true, false].forEach((useLegacy) => { + it(`Subscribe and receive messages on 2 different pubsubtopics with ${useLegacy ? "v2" : "v3"} protocol`, async function () { + if (customRoutingInfo2.pubsubTopic === TestEncoder.pubsubTopic) + throw "Invalid test, both encoder uses same shard"; - const pushResponse1 = await waku.lightPush.send(TestEncoder, { - payload: utf8ToBytes("M1") - }); - const pushResponse2 = await waku.lightPush.send(customEncoder2, { - payload: utf8ToBytes("M2") - }); + const pushResponse1 = await waku.lightPush!.send( + TestEncoder, + { + payload: utf8ToBytes("M1") + }, + { useLegacy } + ); - expect(pushResponse1.successes.length).to.eq(numServiceNodes); - expect(pushResponse2.successes.length).to.eq(numServiceNodes); + const pushResponse2 = await waku.lightPush!.send( + customEncoder2, + { + payload: utf8ToBytes("M2") + }, + { useLegacy } + ); - const messageCollector1 = new MessageCollector(serviceNodes.nodes[0]); - const messageCollector2 = new MessageCollector(serviceNodes.nodes[1]); + expect(pushResponse1?.successes.length).to.eq(numServiceNodes); + expect(pushResponse2?.successes.length).to.eq(numServiceNodes); - expect( - await messageCollector1.waitForMessagesAutosharding(1, { - contentTopic: TestEncoder.contentTopic - }) - ).to.eq(true); + const messageCollector1 = new MessageCollector(serviceNodes.nodes[0]); + const messageCollector2 = new MessageCollector(serviceNodes.nodes[1]); - expect( - await messageCollector2.waitForMessagesAutosharding(1, { - contentTopic: customEncoder2.contentTopic - }) - ).to.eq(true); + expect( + await messageCollector1.waitForMessagesAutosharding(1, { + contentTopic: TestEncoder.contentTopic + }) + ).to.eq(true); - messageCollector1.verifyReceivedMessage(0, { - expectedMessageText: "M1", - expectedContentTopic: TestEncoder.contentTopic, - expectedPubsubTopic: TestEncoder.pubsubTopic - }); + expect( + await messageCollector2.waitForMessagesAutosharding(1, { + contentTopic: customEncoder2.contentTopic + }) + ).to.eq(true); - messageCollector2.verifyReceivedMessage(0, { - expectedMessageText: "M2", - expectedContentTopic: customEncoder2.contentTopic, - expectedPubsubTopic: customEncoder2.pubsubTopic + messageCollector1.verifyReceivedMessage(0, { + expectedMessageText: "M1", + expectedContentTopic: TestEncoder.contentTopic, + expectedPubsubTopic: TestEncoder.pubsubTopic + }); + + messageCollector2.verifyReceivedMessage(0, { + expectedMessageText: "M2", + expectedContentTopic: customEncoder2.contentTopic, + expectedPubsubTopic: customEncoder2.pubsubTopic + }); }); }); @@ -122,10 +134,10 @@ describe("Waku Light Push (Autosharding): Multiple Shards", function () { const messageCollector2 = new MessageCollector(nwaku2); - await waku.lightPush.send(TestEncoder, { + await waku.lightPush!.send(TestEncoder, { payload: utf8ToBytes("M1") }); - await waku.lightPush.send(customEncoder2, { + await waku.lightPush!.send(customEncoder2, { payload: utf8ToBytes("M2") }); diff --git a/packages/tests/tests/light-push/v2_interop.spec.ts b/packages/tests/tests/light-push/v2_interop.spec.ts new file mode 100644 index 0000000000..f67d4c1a3a --- /dev/null +++ b/packages/tests/tests/light-push/v2_interop.spec.ts @@ -0,0 +1,83 @@ +import { LightNode } from "@waku/interfaces"; +import { createLightNode, utf8ToBytes } from "@waku/sdk"; +import { expect } from "chai"; + +import { + afterEachCustom, + beforeEachCustom, + NOISE_KEY_2, + runMultipleNodes, + ServiceNodesFleet, + teardownNodesWithRedundancy +} from "../../src/index.js"; +import { DEFAULT_DISCOVERIES_ENABLED } from "../../src/lib/runNodes.js"; + +import { TestContentTopic, TestEncoder, TestRoutingInfo } from "./utils.js"; + +describe(`Waku Light Push V2 and V3 interop`, function () { + this.timeout(15000); + let waku: LightNode; + let waku2: LightNode; + let serviceNodes: ServiceNodesFleet; + + beforeEachCustom(this, async () => { + [serviceNodes, waku] = await runMultipleNodes( + this.ctx, + TestRoutingInfo, + { lightpush: true, filter: true, relay: true }, + true, + 2, + true + ); + + waku2 = await createLightNode({ + staticNoiseKey: NOISE_KEY_2, + libp2p: { + addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } + }, + networkConfig: TestRoutingInfo.networkConfig, + lightPush: { numPeersToUse: 1 }, + discovery: DEFAULT_DISCOVERIES_ENABLED + }); + + await waku2.dial(await serviceNodes.nodes[1].getMultiaddrWithId()); + }); + + afterEachCustom(this, async () => { + await teardownNodesWithRedundancy(serviceNodes, [waku, waku2]); + }); + + it(`Push messages througth V2 and V3 from 2 js-waku and receives`, async function () { + let pushResponse = await waku.lightPush.send( + TestEncoder, + { + payload: utf8ToBytes("v2") + }, + { useLegacy: true } + ); + expect(pushResponse.successes.length).to.eq(2); + + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(true); + serviceNodes.messageCollector.verifyReceivedMessage(0, { + expectedMessageText: "v2", + expectedContentTopic: TestContentTopic, + expectedPubsubTopic: TestRoutingInfo.pubsubTopic + }); + + pushResponse = await waku2.lightPush.send( + TestEncoder, + { + payload: utf8ToBytes("v3") + }, + { useLegacy: false } + ); + expect(pushResponse.successes.length).to.eq(1); + + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(true); + serviceNodes.messageCollector.verifyReceivedMessage(0, { + expectedMessageText: "v3", + expectedContentTopic: TestContentTopic, + expectedPubsubTopic: TestRoutingInfo.pubsubTopic + }); + }); +});