feat: implement lp-v3 error codes with backwards compatibility (#2501)

* feat: implement LightPush v3 protocol support

Add comprehensive LightPush v3 protocol implementation with:

Core Features:
- LightPush v3 protocol codec and multicodec detection
- Status code-based error handling and validation
- Protocol version inference and compatibility layers
- Enhanced error types with detailed failure information

Protocol Support:
- Automatic v3/v2 protocol negotiation and fallback
- Status code mapping to LightPush error types
- Protocol version tracking in SDK results
- Mixed protocol environment support

Testing Infrastructure:
- Comprehensive v3 error code handling tests
- Mock functions for v3/v2 response scenarios
- Protocol version detection and validation tests
- Backward compatibility verification

Implementation Details:
- Clean separation between v2 and v3 response handling
- Type-safe status code validation with isSuccess helper
- Enhanced failure reporting with protocol version context
- Proper error propagation through SDK layers

This implementation maintains full backward compatibility with v2
while providing enhanced functionality for v3 protocol features.

* feat: handle both light push protocols

* fix: unsubscribe test

* feat: consolidate lpv2/v3 types

* feat(tests): bump nwaku to 0.36.0

* fix: remove extraneous exports

* fix: add delay to tests

* fix: remove protocol result types

* feat: consolidate light push codec branching

* fix: revert nwaku image

* fix: remove multicodec

* fix: remove protocolversion

* feat: simplify v2/v3 branching logic to use two stream managers

* fix: remove unused utils

* fix: remove comments

* fix: revert store test

* fix: cleanup lightpush sdk

* fix: remove unused util

* fix: remove unused exports

* fix: rename file from public to protocol_handler

* fix: use proper type for sdk result

* fix: update return types in filter

* fix: rebase against latest master

* fix: use both lightpush codecs when waiting for peer

* fix: handle both lp codecs

* fix: remove unused code

* feat: use array for multicodec fields

* fix: add timestamp if missing in v3 rpc

* fix: resolve on either lp codec when waiting for peer

* fix: remove unused util

* fix: remove unnecessary abstraction

* feat: accept nwaku docker image as arg, test lp backwards compat

* fix: revert filter error

* feat: add legacy flag to enable lightpushv2 only

* Revert "feat: accept nwaku docker image as arg, test lp backwards compat"

This reverts commit 857e12cbc73305e5c51abd057665bd34708b2737.

* fix: remove unused test

* feat: improve lp3 (#2597)

* improve light push core

* move back to singualar multicodec property, enable array prop only for light push

* implement v2/v3 interop e2e test, re-add useLegacy flag, ensure e2e runs for v2 and v3

* fix v2 v3 condition

* generate message package earlier

* add log, fix condition

---------

Co-authored-by: Sasha <118575614+weboko@users.noreply.github.com>
Co-authored-by: Sasha <oleksandr@status.im>
This commit is contained in:
Arseniy Klempner 2025-09-04 15:52:37 -07:00 committed by GitHub
parent a7f30b1211
commit 16253026c6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
27 changed files with 1049 additions and 361 deletions

View File

@ -10,7 +10,11 @@ export * as waku_filter from "./lib/filter/index.js";
export { FilterCore, FilterCodecs } from "./lib/filter/index.js";
export * as waku_light_push from "./lib/light_push/index.js";
export { LightPushCodec, LightPushCore } from "./lib/light_push/index.js";
export {
LightPushCore,
LightPushCodec,
LightPushCodecV2
} from "./lib/light_push/index.js";
export * as waku_store from "./lib/store/index.js";
export { StoreCore, StoreCodec } from "./lib/store/index.js";

View File

@ -2,9 +2,9 @@ import type { PeerId } from "@libp2p/interface";
import type { IncomingStreamData } from "@libp2p/interface-internal";
import {
type ContentTopic,
type CoreProtocolResult,
type FilterCoreResult,
FilterError,
type Libp2p,
ProtocolError,
type PubsubTopic
} from "@waku/interfaces";
import { WakuMessage } from "@waku/proto";
@ -72,14 +72,14 @@ export class FilterCore {
pubsubTopic: PubsubTopic,
peerId: PeerId,
contentTopics: ContentTopic[]
): Promise<CoreProtocolResult> {
): Promise<FilterCoreResult> {
const stream = await this.streamManager.getStream(peerId);
if (!stream) {
return {
success: null,
failure: {
error: ProtocolError.NO_STREAM_AVAILABLE,
error: FilterError.NO_STREAM_AVAILABLE,
peerId: peerId
}
};
@ -108,7 +108,7 @@ export class FilterCore {
return {
success: null,
failure: {
error: ProtocolError.GENERIC_FAIL,
error: FilterError.GENERIC_FAIL,
peerId: peerId
}
};
@ -123,7 +123,7 @@ export class FilterCore {
);
return {
failure: {
error: ProtocolError.REMOTE_PEER_REJECTED,
error: FilterError.REMOTE_PEER_REJECTED,
peerId: peerId
},
success: null
@ -140,7 +140,7 @@ export class FilterCore {
pubsubTopic: PubsubTopic,
peerId: PeerId,
contentTopics: ContentTopic[]
): Promise<CoreProtocolResult> {
): Promise<FilterCoreResult> {
const stream = await this.streamManager.getStream(peerId);
if (!stream) {
@ -148,7 +148,7 @@ export class FilterCore {
return {
success: null,
failure: {
error: ProtocolError.NO_STREAM_AVAILABLE,
error: FilterError.NO_STREAM_AVAILABLE,
peerId: peerId
}
};
@ -166,7 +166,7 @@ export class FilterCore {
return {
success: null,
failure: {
error: ProtocolError.GENERIC_FAIL,
error: FilterError.GENERIC_FAIL,
peerId: peerId
}
};
@ -181,7 +181,7 @@ export class FilterCore {
public async unsubscribeAll(
pubsubTopic: PubsubTopic,
peerId: PeerId
): Promise<CoreProtocolResult> {
): Promise<FilterCoreResult> {
const stream = await this.streamManager.getStream(peerId);
if (!stream) {
@ -189,7 +189,7 @@ export class FilterCore {
return {
success: null,
failure: {
error: ProtocolError.NO_STREAM_AVAILABLE,
error: FilterError.NO_STREAM_AVAILABLE,
peerId: peerId
}
};
@ -208,7 +208,7 @@ export class FilterCore {
if (!res || !res.length) {
return {
failure: {
error: ProtocolError.NO_RESPONSE,
error: FilterError.NO_RESPONSE,
peerId: peerId
},
success: null
@ -224,7 +224,7 @@ export class FilterCore {
);
return {
failure: {
error: ProtocolError.REMOTE_PEER_REJECTED,
error: FilterError.REMOTE_PEER_REJECTED,
peerId: peerId
},
success: null
@ -237,7 +237,7 @@ export class FilterCore {
};
}
public async ping(peerId: PeerId): Promise<CoreProtocolResult> {
public async ping(peerId: PeerId): Promise<FilterCoreResult> {
const stream = await this.streamManager.getStream(peerId);
if (!stream) {
@ -245,7 +245,7 @@ export class FilterCore {
return {
success: null,
failure: {
error: ProtocolError.NO_STREAM_AVAILABLE,
error: FilterError.NO_STREAM_AVAILABLE,
peerId: peerId
}
};
@ -267,7 +267,7 @@ export class FilterCore {
return {
success: null,
failure: {
error: ProtocolError.GENERIC_FAIL,
error: FilterError.GENERIC_FAIL,
peerId: peerId
}
};
@ -277,7 +277,7 @@ export class FilterCore {
return {
success: null,
failure: {
error: ProtocolError.NO_RESPONSE,
error: FilterError.NO_RESPONSE,
peerId: peerId
}
};
@ -293,7 +293,7 @@ export class FilterCore {
return {
success: null,
failure: {
error: ProtocolError.REMOTE_PEER_REJECTED,
error: FilterError.REMOTE_PEER_REJECTED,
peerId: peerId
}
};

View File

@ -0,0 +1,7 @@
export const CODECS = {
v2: "/vac/waku/lightpush/2.0.0-beta1",
v3: "/vac/waku/lightpush/3.0.0"
} as const;
export const LightPushCodecV2 = CODECS.v2;
export const LightPushCodec = CODECS.v3;

View File

@ -1 +1,2 @@
export { LightPushCore, LightPushCodec, PushResponse } from "./light_push.js";
export { LightPushCore } from "./light_push.js";
export { LightPushCodec, LightPushCodecV2 } from "./constants.js";

View File

@ -1,14 +1,11 @@
import type { PeerId } from "@libp2p/interface";
import type { PeerId, Stream } from "@libp2p/interface";
import {
type CoreProtocolResult,
type IEncoder,
type IMessage,
type Libp2p,
ProtocolError,
type ThisOrThat
type LightPushCoreResult,
LightPushError
} from "@waku/interfaces";
import { PushResponse } from "@waku/proto";
import { isMessageSizeUnderCap } from "@waku/utils";
import { Logger } from "@waku/utils";
import all from "it-all";
import * as lp from "it-length-prefixed";
@ -17,92 +14,71 @@ import { Uint8ArrayList } from "uint8arraylist";
import { StreamManager } from "../stream_manager/index.js";
import { PushRpc } from "./push_rpc.js";
import { isRLNResponseError } from "./utils.js";
import { CODECS } from "./constants.js";
import { ProtocolHandler } from "./protocol_handler.js";
const log = new Logger("light-push");
export const LightPushCodec = "/vac/waku/lightpush/2.0.0-beta1";
export { PushResponse };
type PreparePushMessageResult = ThisOrThat<"query", PushRpc>;
/**
* Implements the [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/).
*/
export class LightPushCore {
private readonly streamManager: StreamManager;
private readonly streamManagerV2: StreamManager;
public readonly multicodec = LightPushCodec;
public readonly multicodec = [CODECS.v3, CODECS.v2];
public constructor(libp2p: Libp2p) {
this.streamManager = new StreamManager(LightPushCodec, libp2p.components);
}
private async preparePushMessage(
encoder: IEncoder,
message: IMessage
): Promise<PreparePushMessageResult> {
try {
if (!message.payload || message.payload.length === 0) {
log.error("Failed to send waku light push: payload is empty");
return { query: null, error: ProtocolError.EMPTY_PAYLOAD };
}
if (!(await isMessageSizeUnderCap(encoder, message))) {
log.error("Failed to send waku light push: message is bigger than 1MB");
return { query: null, error: ProtocolError.SIZE_TOO_BIG };
}
const protoMessage = await encoder.toProtoObj(message);
if (!protoMessage) {
log.error("Failed to encode to protoMessage, aborting push");
return {
query: null,
error: ProtocolError.ENCODE_FAILED
};
}
const query = PushRpc.createRequest(protoMessage, encoder.pubsubTopic);
return { query, error: null };
} catch (error) {
log.error("Failed to prepare push message", error);
return {
query: null,
error: ProtocolError.GENERIC_FAIL
};
}
public constructor(private libp2p: Libp2p) {
this.streamManagerV2 = new StreamManager(CODECS.v2, libp2p.components);
this.streamManager = new StreamManager(CODECS.v3, libp2p.components);
}
public async send(
encoder: IEncoder,
message: IMessage,
peerId: PeerId
): Promise<CoreProtocolResult> {
const { query, error: preparationError } = await this.preparePushMessage(
encoder,
message
peerId: PeerId,
useLegacy: boolean = false
): Promise<LightPushCoreResult> {
const protocol = await this.getProtocol(peerId, useLegacy);
log.info(
`Sending light push request to peer:${peerId.toString()}, protocol:${protocol}`
);
if (preparationError || !query) {
if (!protocol) {
return {
success: null,
failure: {
error: preparationError,
error: LightPushError.GENERIC_FAIL,
peerId
}
};
}
const stream = await this.streamManager.getStream(peerId);
const { rpc, error: prepError } = await ProtocolHandler.preparePushMessage(
encoder,
message,
protocol
);
if (prepError) {
return {
success: null,
failure: {
error: prepError,
peerId
}
};
}
const stream = await this.getStream(peerId, protocol);
if (!stream) {
log.error(`Failed to get a stream for remote peer:${peerId.toString()}`);
return {
success: null,
failure: {
error: ProtocolError.NO_STREAM_AVAILABLE,
error: LightPushError.NO_STREAM_AVAILABLE,
peerId: peerId
}
};
@ -111,76 +87,74 @@ export class LightPushCore {
let res: Uint8ArrayList[] | undefined;
try {
res = await pipe(
[query.encode()],
[rpc.encode()],
lp.encode,
stream,
lp.decode,
async (source) => await all(source)
);
} catch (err) {
// can fail only because of `stream` abortion
log.error("Failed to send waku light push request", err);
return {
success: null,
failure: {
error: ProtocolError.STREAM_ABORTED,
error: LightPushError.STREAM_ABORTED,
peerId: peerId
}
};
}
const bytes = new Uint8ArrayList();
res.forEach((chunk) => {
bytes.append(chunk);
});
res.forEach((chunk) => bytes.append(chunk));
let response: PushResponse | undefined;
if (bytes.length === 0) {
return {
success: null,
failure: {
error: LightPushError.NO_RESPONSE,
peerId: peerId
}
};
}
return ProtocolHandler.handleResponse(bytes, protocol, peerId);
}
private async getProtocol(
peerId: PeerId,
useLegacy: boolean
): Promise<string | undefined> {
try {
response = PushRpc.decode(bytes).response;
} catch (err) {
log.error("Failed to decode push reply", err);
return {
success: null,
failure: {
error: ProtocolError.DECODE_FAILED,
peerId: peerId
}
};
}
const peer = await this.libp2p.peerStore.get(peerId);
if (!response) {
log.error("Remote peer fault: No response in PushRPC");
return {
success: null,
failure: {
error: ProtocolError.NO_RESPONSE,
peerId: peerId
}
};
if (
useLegacy ||
(!peer.protocols.includes(CODECS.v3) &&
peer.protocols.includes(CODECS.v2))
) {
return CODECS.v2;
} else if (peer.protocols.includes(CODECS.v3)) {
return CODECS.v3;
} else {
throw new Error("No supported protocol found");
}
} catch (error) {
log.error("Failed to get protocol", error);
return undefined;
}
}
if (isRLNResponseError(response.info)) {
log.error("Remote peer fault: RLN generation");
return {
success: null,
failure: {
error: ProtocolError.RLN_PROOF_GENERATION,
peerId: peerId
}
};
private async getStream(
peerId: PeerId,
protocol: string
): Promise<Stream | undefined> {
switch (protocol) {
case CODECS.v2:
return this.streamManagerV2.getStream(peerId);
case CODECS.v3:
return this.streamManager.getStream(peerId);
default:
return undefined;
}
if (!response.isSuccess) {
log.error("Remote peer rejected the message: ", response.info);
return {
success: null,
failure: {
error: ProtocolError.REMOTE_PEER_REJECTED,
peerId: peerId
}
};
}
return { success: peerId, failure: null };
}
}

View File

@ -0,0 +1,191 @@
import type { PeerId } from "@libp2p/interface";
import type { IEncoder, IMessage, LightPushCoreResult } from "@waku/interfaces";
import { LightPushError, LightPushStatusCode } from "@waku/interfaces";
import { PushResponse, WakuMessage } from "@waku/proto";
import { isMessageSizeUnderCap, Logger } from "@waku/utils";
import { Uint8ArrayList } from "uint8arraylist";
import { CODECS } from "./constants.js";
import { PushRpcV2 } from "./push_rpc.js";
import { PushRpc } from "./push_rpc_v3.js";
import { isRLNResponseError } from "./utils.js";
type VersionedPushRpc =
| ({ version: "v2" } & PushRpcV2)
| ({ version: "v3" } & PushRpc);
type PreparePushMessageResult =
| { rpc: VersionedPushRpc; error: null }
| { rpc: null; error: LightPushError };
const log = new Logger("light-push:protocol-handler");
export class ProtocolHandler {
public static async preparePushMessage(
encoder: IEncoder,
message: IMessage,
protocol: string
): Promise<PreparePushMessageResult> {
try {
if (!message.payload || message.payload.length === 0) {
log.error("Failed to send waku light push: payload is empty");
return { rpc: null, error: LightPushError.EMPTY_PAYLOAD };
}
if (!(await isMessageSizeUnderCap(encoder, message))) {
log.error("Failed to send waku light push: message is bigger than 1MB");
return { rpc: null, error: LightPushError.SIZE_TOO_BIG };
}
const protoMessage = await encoder.toProtoObj(message);
if (!protoMessage) {
log.error("Failed to encode to protoMessage, aborting push");
return { rpc: null, error: LightPushError.ENCODE_FAILED };
}
if (protocol === CODECS.v3) {
log.info("Creating v3 RPC message");
return {
rpc: ProtocolHandler.createV3Rpc(protoMessage, encoder.pubsubTopic),
error: null
};
}
log.info("Creating v2 RPC message");
return {
rpc: ProtocolHandler.createV2Rpc(protoMessage, encoder.pubsubTopic),
error: null
};
} catch (err) {
log.error("Failed to prepare push message", err);
return { rpc: null, error: LightPushError.GENERIC_FAIL };
}
}
/**
* Decode and evaluate a LightPush response according to the protocol version
*/
public static handleResponse(
bytes: Uint8ArrayList,
protocol: string,
peerId: PeerId
): LightPushCoreResult {
if (protocol === CODECS.v3) {
return ProtocolHandler.handleV3Response(bytes, peerId);
}
return ProtocolHandler.handleV2Response(bytes, peerId);
}
private static handleV3Response(
bytes: Uint8ArrayList,
peerId: PeerId
): LightPushCoreResult {
try {
const decodedRpcV3 = PushRpc.decodeResponse(bytes);
const statusCode = decodedRpcV3.statusCode;
const statusDesc = decodedRpcV3.statusDesc;
if (statusCode !== LightPushStatusCode.SUCCESS) {
const error = LightPushError.REMOTE_PEER_REJECTED;
log.error(
`Remote peer rejected with v3 status code ${statusCode}: ${statusDesc}`
);
return {
success: null,
failure: {
error,
peerId: peerId
}
};
}
if (decodedRpcV3.relayPeerCount !== undefined) {
log.info(`Message relayed to ${decodedRpcV3.relayPeerCount} peers`);
}
return { success: peerId, failure: null };
} catch (err) {
return {
success: null,
failure: {
error: LightPushError.DECODE_FAILED,
peerId: peerId
}
};
}
}
private static handleV2Response(
bytes: Uint8ArrayList,
peerId: PeerId
): LightPushCoreResult {
let response: PushResponse | undefined;
try {
const decodedRpc = PushRpcV2.decode(bytes);
response = decodedRpc.response;
} catch (err) {
return {
success: null,
failure: {
error: LightPushError.DECODE_FAILED,
peerId: peerId
}
};
}
if (!response) {
return {
success: null,
failure: {
error: LightPushError.NO_RESPONSE,
peerId: peerId
}
};
}
if (isRLNResponseError(response.info)) {
log.error("Remote peer fault: RLN generation");
return {
success: null,
failure: {
error: LightPushError.RLN_PROOF_GENERATION,
peerId: peerId
}
};
}
if (!response.isSuccess) {
log.error("Remote peer rejected the message: ", response.info);
return {
success: null,
failure: {
error: LightPushError.REMOTE_PEER_REJECTED,
peerId: peerId
}
};
}
return { success: peerId, failure: null };
}
private static createV2Rpc(
message: WakuMessage,
pubsubTopic: string
): VersionedPushRpc {
const v2Rpc = PushRpcV2.createRequest(message, pubsubTopic);
return Object.assign(v2Rpc, { version: "v2" as const });
}
private static createV3Rpc(
message: WakuMessage,
pubsubTopic: string
): VersionedPushRpc {
if (!message.timestamp) {
message.timestamp = BigInt(Date.now()) * BigInt(1_000_000);
}
const v3Rpc = PushRpc.createRequest(message, pubsubTopic);
return Object.assign(v3Rpc, { version: "v3" as const });
}
}

View File

@ -2,14 +2,14 @@ import { proto_lightpush as proto } from "@waku/proto";
import type { Uint8ArrayList } from "uint8arraylist";
import { v4 as uuid } from "uuid";
export class PushRpc {
export class PushRpcV2 {
public constructor(public proto: proto.PushRpc) {}
public static createRequest(
message: proto.WakuMessage,
pubsubTopic: string
): PushRpc {
return new PushRpc({
): PushRpcV2 {
return new PushRpcV2({
requestId: uuid(),
request: {
message: message,
@ -19,9 +19,9 @@ export class PushRpc {
});
}
public static decode(bytes: Uint8ArrayList): PushRpc {
public static decode(bytes: Uint8ArrayList): PushRpcV2 {
const res = proto.PushRpc.decode(bytes);
return new PushRpc(res);
return new PushRpcV2(res);
}
public encode(): Uint8Array {

View File

@ -0,0 +1,162 @@
import { proto_lightpush as proto } from "@waku/proto";
import type { Uint8ArrayList } from "uint8arraylist";
import { v4 as uuid } from "uuid";
/**
* LightPush v3 protocol RPC handler.
* Implements the v3 message format with correct field numbers:
* - requestId: 1
* - pubsubTopic: 20
* - message: 21
*/
export class PushRpc {
public constructor(
public proto: proto.LightPushRequestV3 | proto.LightPushResponseV3
) {}
/**
* Create a v3 request message with proper field numbering
*/
public static createRequest(
message: proto.WakuMessage,
pubsubTopic: string
): PushRpc {
return new PushRpc({
requestId: uuid(),
pubsubTopic: pubsubTopic,
message: message
});
}
/**
* Create a v3 response message with status code handling
*/
public static createResponse(
requestId: string,
statusCode: number,
statusDesc?: string,
relayPeerCount?: number
): PushRpc {
return new PushRpc({
requestId,
statusCode,
statusDesc,
relayPeerCount
});
}
/**
* Decode v3 request message
*/
public static decodeRequest(bytes: Uint8ArrayList): PushRpc {
const res = proto.LightPushRequestV3.decode(bytes);
return new PushRpc(res);
}
/**
* Decode v3 response message
*/
public static decodeResponse(bytes: Uint8ArrayList): PushRpc {
const res = proto.LightPushResponseV3.decode(bytes);
return new PushRpc(res);
}
/**
* Encode message to bytes
*/
public encode(): Uint8Array {
if (this.isRequest()) {
return proto.LightPushRequestV3.encode(
this.proto as proto.LightPushRequestV3
);
} else {
return proto.LightPushResponseV3.encode(
this.proto as proto.LightPushResponseV3
);
}
}
/**
* Get request data (if this is a request message)
*/
public get request(): proto.LightPushRequestV3 | undefined {
return this.isRequest()
? (this.proto as proto.LightPushRequestV3)
: undefined;
}
/**
* Get response data (if this is a response message)
*/
public get response(): proto.LightPushResponseV3 | undefined {
return this.isResponse()
? (this.proto as proto.LightPushResponseV3)
: undefined;
}
/**
* Get the request ID
*/
public get requestId(): string {
return this.proto.requestId;
}
/**
* Get the pubsub topic (only available in requests)
*/
public get pubsubTopic(): string | undefined {
return this.isRequest()
? (this.proto as proto.LightPushRequestV3).pubsubTopic
: undefined;
}
/**
* Get the message (only available in requests)
*/
public get message(): proto.WakuMessage | undefined {
return this.isRequest()
? (this.proto as proto.LightPushRequestV3).message
: undefined;
}
/**
* Get the status code (only available in responses)
*/
public get statusCode(): number | undefined {
return this.isResponse()
? (this.proto as proto.LightPushResponseV3).statusCode
: undefined;
}
/**
* Get the status description (only available in responses)
*/
public get statusDesc(): string | undefined {
return this.isResponse()
? (this.proto as proto.LightPushResponseV3).statusDesc
: undefined;
}
/**
* Get the relay peer count (only available in responses)
*/
public get relayPeerCount(): number | undefined {
return this.isResponse()
? (this.proto as proto.LightPushResponseV3).relayPeerCount
: undefined;
}
/**
* Check if this is a request message
*/
private isRequest(): boolean {
return "pubsubTopic" in this.proto && "message" in this.proto;
}
/**
* Check if this is a response message
*/
private isResponse(): boolean {
return "statusCode" in this.proto;
}
}

View File

@ -13,7 +13,7 @@ export class StreamManager {
private streamPool: Map<string, Promise<void>> = new Map();
public constructor(
private multicodec: string,
private readonly multicodec: string,
private readonly libp2p: Libp2pComponents
) {
this.log = new Logger(`stream-manager:${multicodec}`);

View File

@ -1,4 +1,6 @@
import type { ISender, ISendOptions } from "./sender.js";
import { IEncoder, IMessage } from "./message.js";
import { LightPushSDKResult } from "./protocols.js";
import type { ISendOptions } from "./sender.js";
export type LightPushProtocolOptions = ISendOptions & {
/**
@ -15,8 +17,40 @@ export type LightPushProtocolOptions = ISendOptions & {
numPeersToUse?: number;
};
export type ILightPush = ISender & {
readonly multicodec: string;
export type ILightPush = {
readonly multicodec: string[];
start: () => void;
stop: () => void;
send: (
encoder: IEncoder,
message: IMessage,
options?: ISendOptions
) => Promise<LightPushSDKResult>;
};
export enum LightPushStatusCode {
SUCCESS = 200,
BAD_REQUEST = 400,
PAYLOAD_TOO_LARGE = 413,
INVALID_MESSAGE = 420,
UNSUPPORTED_TOPIC = 421,
TOO_MANY_REQUESTS = 429,
INTERNAL_ERROR = 500,
UNAVAILABLE = 503,
NO_RLN_PROOF = 504,
NO_PEERS = 505
}
export const StatusDescriptions: Record<LightPushStatusCode, string> = {
[LightPushStatusCode.SUCCESS]: "Message sent successfully",
[LightPushStatusCode.BAD_REQUEST]: "Bad request format",
[LightPushStatusCode.PAYLOAD_TOO_LARGE]:
"Message payload exceeds maximum size",
[LightPushStatusCode.INVALID_MESSAGE]: "Message validation failed",
[LightPushStatusCode.UNSUPPORTED_TOPIC]: "Unsupported pubsub topic",
[LightPushStatusCode.TOO_MANY_REQUESTS]: "Rate limit exceeded",
[LightPushStatusCode.INTERNAL_ERROR]: "Internal server error",
[LightPushStatusCode.UNAVAILABLE]: "Service temporarily unavailable",
[LightPushStatusCode.NO_RLN_PROOF]: "RLN proof generation failed",
[LightPushStatusCode.NO_PEERS]: "No relay peers available"
};

View File

@ -130,117 +130,123 @@ export type Callback<T extends IDecodedMessage> = (
msg: T
) => void | Promise<void>;
export enum ProtocolError {
//
// GENERAL ERRORS SECTION
//
/**
* Could not determine the origin of the fault. Best to check connectivity and try again
* */
export enum LightPushError {
GENERIC_FAIL = "Generic error",
/**
* The remote peer rejected the message. Information provided by the remote peer
* is logged. Review message validity, or mitigation for `NO_PEER_AVAILABLE`
* or `DECODE_FAILED` can be used.
*/
REMOTE_PEER_REJECTED = "Remote peer rejected",
/**
* Failure to protobuf decode the message. May be due to a remote peer issue,
* ensuring that messages are sent via several peer enable mitigation of this error.
*/
DECODE_FAILED = "Failed to decode",
/**
* Failure to find a peer with suitable protocols. This may due to a connection issue.
* Mitigation can be: retrying after a given time period, display connectivity issue
* to user or listening for `peer:connected:bootstrap` or `peer:connected:peer-exchange`
* on the connection manager before retrying.
*/
NO_PEER_AVAILABLE = "No peer available",
/**
* Failure to find a stream to the peer. This may be because the connection with the peer is not still alive.
* Mitigation can be: retrying after a given time period, or mitigation for `NO_PEER_AVAILABLE` can be used.
*/
NO_STREAM_AVAILABLE = "No stream available",
/**
* The remote peer did not behave as expected. Mitigation for `NO_PEER_AVAILABLE`
* or `DECODE_FAILED` can be used.
*/
NO_RESPONSE = "No response received",
//
// SEND ERRORS SECTION
//
/**
* Failure to protobuf encode the message. This is not recoverable and needs
* further investigation.
*/
ENCODE_FAILED = "Failed to encode",
/**
* The message payload is empty, making the message invalid. Ensure that a non-empty
* payload is set on the outgoing message.
*/
EMPTY_PAYLOAD = "Payload is empty",
/**
* The message size is above the maximum message size allowed on the Waku Network.
* Compressing the message or using an alternative strategy for large messages is recommended.
*/
SIZE_TOO_BIG = "Size is too big",
/**
* The PubsubTopic passed to the send function is not configured on the Waku node.
* Please ensure that the PubsubTopic is used when initializing the Waku node.
*/
TOPIC_NOT_CONFIGURED = "Topic not configured",
/**
* Fails when
*/
STREAM_ABORTED = "Stream aborted",
/**
* General proof generation error message.
* nwaku: https://github.com/waku-org/nwaku/blob/c3cb06ac6c03f0f382d3941ea53b330f6a8dd127/waku/waku_rln_relay/group_manager/group_manager_base.nim#L201C19-L201C42
*/
ENCODE_FAILED = "Failed to encode",
EMPTY_PAYLOAD = "Payload is empty",
SIZE_TOO_BIG = "Size is too big",
TOPIC_NOT_CONFIGURED = "Topic not configured",
RLN_PROOF_GENERATION = "Proof generation failed",
REMOTE_PEER_REJECTED = "Remote peer rejected",
//
// RECEIVE ERRORS SECTION
//
/**
* The pubsub topic configured on the decoder does not match the pubsub topic setup on the protocol.
* Ensure that the pubsub topic used for decoder creation is the same as the one used for protocol.
*/
TOPIC_DECODER_MISMATCH = "Topic decoder mismatch",
/**
* The topics passed in the decoders do not match each other, or don't exist at all.
* Ensure that all the pubsub topics used in the decoders are valid and match each other.
*/
INVALID_DECODER_TOPICS = "Invalid decoder topics"
BAD_REQUEST = "Bad request format",
PAYLOAD_TOO_LARGE = "Message payload exceeds maximum size",
INVALID_MESSAGE = "Message validation failed",
UNSUPPORTED_TOPIC = "Unsupported pubsub topic",
TOO_MANY_REQUESTS = "Rate limit exceeded",
INTERNAL_ERROR = "Internal server error",
UNAVAILABLE = "Service temporarily unavailable",
NO_RLN_PROOF = "RLN proof generation failed",
NO_PEERS = "No relay peers available"
}
export interface Failure {
error: ProtocolError;
export enum FilterError {
// General errors
GENERIC_FAIL = "Generic error",
DECODE_FAILED = "Failed to decode",
NO_PEER_AVAILABLE = "No peer available",
NO_STREAM_AVAILABLE = "No stream available",
NO_RESPONSE = "No response received",
STREAM_ABORTED = "Stream aborted",
// Filter specific errors
REMOTE_PEER_REJECTED = "Remote peer rejected",
TOPIC_NOT_CONFIGURED = "Topic not configured",
SUBSCRIPTION_FAILED = "Subscription failed",
UNSUBSCRIBE_FAILED = "Unsubscribe failed",
PING_FAILED = "Ping failed",
TOPIC_DECODER_MISMATCH = "Topic decoder mismatch",
INVALID_DECODER_TOPICS = "Invalid decoder topics",
SUBSCRIPTION_LIMIT_EXCEEDED = "Subscription limit exceeded",
INVALID_CONTENT_TOPIC = "Invalid content topic",
PUSH_MESSAGE_FAILED = "Push message failed",
EMPTY_MESSAGE = "Empty message received",
MISSING_PUBSUB_TOPIC = "Pubsub topic missing from push message"
}
export interface LightPushFailure {
error: LightPushError;
peerId?: PeerId;
}
export type CoreProtocolResult = ThisOrThat<
export interface FilterFailure {
error: FilterError;
peerId?: PeerId;
}
export type LightPushCoreResult = ThisOrThat<
"success",
PeerId,
"failure",
Failure
LightPushFailure
>;
export type FilterCoreResult = ThisOrThat<
"success",
PeerId,
"failure",
FilterFailure
>;
export type LightPushSDKResult = ThisAndThat<
"successes",
PeerId[],
"failures",
LightPushFailure[]
>;
export type FilterSDKResult = ThisAndThat<
"successes",
PeerId[],
"failures",
FilterFailure[]
>;
/**
* @deprecated replace usage by specific result types
*/
export type SDKProtocolResult = ThisAndThat<
"successes",
PeerId[],
"failures",
Failure[]
Array<{
error: ProtocolError;
peerId?: PeerId;
}>
>;
/**
* @deprecated replace usage by specific result types
*/
export enum ProtocolError {
GENERIC_FAIL = "Generic error",
REMOTE_PEER_REJECTED = "Remote peer rejected",
DECODE_FAILED = "Failed to decode",
NO_PEER_AVAILABLE = "No peer available",
NO_STREAM_AVAILABLE = "No stream available",
NO_RESPONSE = "No response received",
ENCODE_FAILED = "Failed to encode",
EMPTY_PAYLOAD = "Payload is empty",
SIZE_TOO_BIG = "Size is too big",
TOPIC_NOT_CONFIGURED = "Topic not configured",
STREAM_ABORTED = "Stream aborted",
RLN_PROOF_GENERATION = "Proof generation failed",
TOPIC_DECODER_MISMATCH = "Topic decoder mismatch",
INVALID_DECODER_TOPICS = "Invalid decoder topics"
}

View File

@ -1,5 +1,5 @@
import type { IEncoder, IMessage } from "./message.js";
import { SDKProtocolResult } from "./protocols.js";
import { LightPushSDKResult } from "./protocols.js";
export type ISendOptions = {
/**
@ -13,6 +13,13 @@ export type ISendOptions = {
* @default 3
*/
maxAttempts?: number;
/**
* Use v2 of the light push protocol.
* This parameter will be removed in the future.
* @default false
*/
useLegacy?: boolean;
};
export interface ISender {
@ -20,5 +27,5 @@ export interface ISender {
encoder: IEncoder,
message: IMessage,
sendOptions?: ISendOptions
) => Promise<SDKProtocolResult>;
) => Promise<LightPushSDKResult>;
}

View File

@ -39,4 +39,4 @@ message LightPushResponseV3 {
uint32 status_code = 10;
optional string status_desc = 11;
optional uint32 relay_peer_count = 12;
}
}

View File

@ -19,9 +19,9 @@ import {
IRelay,
type IRoutingInfo,
Libp2p,
ProtocolError,
PubsubTopic,
SDKProtocolResult
LightPushError,
LightPushSDKResult,
PubsubTopic
} from "@waku/interfaces";
import { isWireSizeUnderCap, toAsyncIterator } from "@waku/utils";
import { pushOrInitMapSet } from "@waku/utils";
@ -127,7 +127,7 @@ export class Relay implements IRelay {
public async send(
encoder: IEncoder,
message: IMessage
): Promise<SDKProtocolResult> {
): Promise<LightPushSDKResult> {
const { pubsubTopic } = encoder;
if (!this.pubsubTopics.has(pubsubTopic)) {
log.error("Failed to send waku relay: topic not configured");
@ -135,7 +135,7 @@ export class Relay implements IRelay {
successes: [],
failures: [
{
error: ProtocolError.TOPIC_NOT_CONFIGURED
error: LightPushError.TOPIC_NOT_CONFIGURED
}
]
};
@ -148,7 +148,7 @@ export class Relay implements IRelay {
successes: [],
failures: [
{
error: ProtocolError.ENCODE_FAILED
error: LightPushError.ENCODE_FAILED
}
]
};
@ -160,7 +160,7 @@ export class Relay implements IRelay {
successes: [],
failures: [
{
error: ProtocolError.SIZE_TOO_BIG
error: LightPushError.SIZE_TOO_BIG
}
]
};

View File

@ -1,6 +1,11 @@
import { Peer, PeerId } from "@libp2p/interface";
import { createEncoder, Encoder, LightPushCodec } from "@waku/core";
import { Libp2p, ProtocolError } from "@waku/interfaces";
import {
createEncoder,
Encoder,
LightPushCodec,
LightPushCodecV2
} from "@waku/core";
import { Libp2p, LightPushError, LightPushStatusCode } from "@waku/interfaces";
import { createRoutingInfo } from "@waku/utils";
import { utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai";
@ -40,8 +45,8 @@ describe("LightPush SDK", () => {
const failures = result.failures ?? [];
expect(failures.length).to.be.eq(1);
expect(failures.some((v) => v.error === ProtocolError.NO_PEER_AVAILABLE)).to
.be.true;
expect(failures.some((v) => v.error === LightPushError.NO_PEER_AVAILABLE))
.to.be.true;
});
it("should send to specified number of peers of used peers", async () => {
@ -127,6 +132,45 @@ describe("LightPush SDK", () => {
expect(result.successes?.length).to.be.eq(1);
expect(result.failures?.length).to.be.eq(1);
});
describe("v3 protocol support", () => {
it("should work with v3 peers", async () => {
libp2p = mockLibp2p({
peers: [mockV3Peer("1"), mockV3Peer("2")]
});
});
it("should work with mixed v2 and v3 peers", async () => {
libp2p = mockLibp2p({
peers: [mockV2AndV3Peer("1"), mockPeer("2"), mockV3Peer("3")]
});
// Mock responses for different protocol versions
const v3Response = mockV3SuccessResponse(5);
const v2Response = mockV2SuccessResponse();
const v3ErrorResponse = mockV3ErrorResponse(
LightPushStatusCode.PAYLOAD_TOO_LARGE
);
const v2ErrorResponse = mockV2ErrorResponse("Message too large");
expect(v3Response.statusCode).to.eq(LightPushStatusCode.SUCCESS);
expect(v3Response.relayPeerCount).to.eq(5);
expect(v2Response.isSuccess).to.be.true;
expect(v3ErrorResponse.statusCode).to.eq(
LightPushStatusCode.PAYLOAD_TOO_LARGE
);
expect(v2ErrorResponse.isSuccess).to.be.false;
});
it("should handle v3 RLN errors", async () => {
const v3RLNError = mockV3RLNErrorResponse();
const v2RLNError = mockV2RLNErrorResponse();
expect(v3RLNError.statusCode).to.eq(LightPushStatusCode.NO_RLN_PROOF);
expect(v3RLNError.statusDesc).to.include("RLN proof generation failed");
expect(v2RLNError.info).to.include("RLN proof generation failed");
});
});
});
type MockLibp2pOptions = {
@ -136,7 +180,16 @@ type MockLibp2pOptions = {
function mockLibp2p(options?: MockLibp2pOptions): Libp2p {
const peers = options?.peers || [];
const peerStore = {
get: (id: any) => Promise.resolve(peers.find((p) => p.id === id))
get: (id: any) => {
const peer = peers.find((p) => p.id === id);
if (peer) {
return Promise.resolve({
...peer,
protocols: peer.protocols || [LightPushCodec]
});
}
return Promise.resolve(undefined);
}
};
return {
@ -179,9 +232,92 @@ function mockLightPush(options: MockLightPushOptions): LightPush {
return lightPush;
}
function mockPeer(id: string): Peer {
function mockPeer(id: string, protocols: string[] = [LightPushCodec]): Peer {
return {
id,
protocols: [LightPushCodec]
} as unknown as Peer;
id: { toString: () => id } as PeerId,
protocols: protocols,
metadata: new Map(),
addresses: [],
tags: new Map()
};
}
// V3-specific mock functions
function mockV3Peer(id: string): Peer {
return mockPeer(id, [LightPushCodec]);
}
function mockV2AndV3Peer(id: string): Peer {
return mockPeer(id, [LightPushCodec, LightPushCodecV2]);
}
function mockV3SuccessResponse(relayPeerCount?: number): {
statusCode: LightPushStatusCode;
statusDesc: string;
relayPeerCount?: number;
isSuccess: boolean;
} {
return {
statusCode: LightPushStatusCode.SUCCESS,
statusDesc: "Message sent successfully",
relayPeerCount,
isSuccess: true
};
}
function mockV3ErrorResponse(
statusCode: LightPushStatusCode,
statusDesc?: string
): {
statusCode: LightPushStatusCode;
statusDesc: string;
isSuccess: boolean;
} {
return {
statusCode,
statusDesc: statusDesc || "Error occurred",
isSuccess: false
};
}
function mockV2SuccessResponse(): {
isSuccess: boolean;
info: string;
} {
return {
isSuccess: true,
info: "Message sent successfully"
};
}
function mockV2ErrorResponse(info?: string): {
isSuccess: boolean;
info: string;
} {
return {
isSuccess: false,
info: info || "Error occurred"
};
}
function mockV3RLNErrorResponse(): {
statusCode: LightPushStatusCode;
statusDesc: string;
isSuccess: boolean;
} {
return {
statusCode: LightPushStatusCode.NO_RLN_PROOF,
statusDesc: "RLN proof generation failed",
isSuccess: false
};
}
function mockV2RLNErrorResponse(): {
isSuccess: boolean;
info: string;
} {
return {
isSuccess: false,
info: "RLN proof generation failed"
};
}

View File

@ -1,17 +1,17 @@
import type { PeerId } from "@libp2p/interface";
import { LightPushCore } from "@waku/core";
import {
type CoreProtocolResult,
Failure,
type IEncoder,
ILightPush,
type IMessage,
type ISendOptions,
type Libp2p,
LightPushCoreResult,
LightPushError,
LightPushFailure,
type LightPushProtocolOptions,
ProtocolError,
Protocols,
SDKProtocolResult
LightPushSDKResult,
Protocols
} from "@waku/interfaces";
import { Logger } from "@waku/utils";
@ -55,7 +55,7 @@ export class LightPush implements ILightPush {
});
}
public get multicodec(): string {
public get multicodec(): string[] {
return this.protocol.multicodec;
}
@ -71,8 +71,9 @@ export class LightPush implements ILightPush {
encoder: IEncoder,
message: IMessage,
options: ISendOptions = {}
): Promise<SDKProtocolResult> {
): Promise<LightPushSDKResult> {
options = {
useLegacy: false,
...this.config,
...options
};
@ -82,45 +83,48 @@ export class LightPush implements ILightPush {
log.info("send: attempting to send a message to pubsubTopic:", pubsubTopic);
const peerIds = await this.peerManager.getPeers({
protocol: Protocols.LightPush,
protocol: options.useLegacy ? "light-push-v2" : Protocols.LightPush,
pubsubTopic: encoder.pubsubTopic
});
const coreResults: CoreProtocolResult[] =
const coreResults =
peerIds?.length > 0
? await Promise.all(
peerIds.map((peerId) =>
this.protocol.send(encoder, message, peerId).catch((_e) => ({
success: null,
failure: {
error: ProtocolError.GENERIC_FAIL
}
}))
this.protocol
.send(encoder, message, peerId, options.useLegacy)
.catch((_e) => ({
success: null,
failure: {
error: LightPushError.GENERIC_FAIL
}
}))
)
)
: [];
const results: SDKProtocolResult = coreResults.length
const results: LightPushSDKResult = coreResults.length
? {
successes: coreResults
.filter((v) => v.success)
.map((v) => v.success) as PeerId[],
failures: coreResults
.filter((v) => v.failure)
.map((v) => v.failure) as Failure[]
.map((v) => v.failure) as LightPushFailure[]
}
: {
successes: [],
failures: [
{
error: ProtocolError.NO_PEER_AVAILABLE
error: LightPushError.NO_PEER_AVAILABLE
}
]
};
if (options.autoRetry && results.successes.length === 0) {
const sendCallback = (peerId: PeerId): Promise<CoreProtocolResult> =>
this.protocol.send(encoder, message, peerId);
const sendCallback = (peerId: PeerId): Promise<LightPushCoreResult> =>
this.protocol.send(encoder, message, peerId, options.useLegacy);
this.retryManager.push(
sendCallback.bind(this),
options.maxAttempts || DEFAULT_MAX_ATTEMPTS,

View File

@ -1,6 +1,7 @@
import type { PeerId } from "@libp2p/interface";
import {
type CoreProtocolResult,
type LightPushCoreResult,
LightPushError,
ProtocolError,
Protocols
} from "@waku/interfaces";
@ -59,7 +60,7 @@ describe("RetryManager", () => {
it("should process tasks in queue", async () => {
const successCallback = sinon.spy(
async (peerId: PeerId): Promise<CoreProtocolResult> => ({
async (peerId: PeerId): Promise<LightPushCoreResult> => ({
success: peerId,
failure: null
})
@ -112,9 +113,9 @@ describe("RetryManager", () => {
it("should retry failed tasks", async () => {
const failingCallback = sinon.spy(
async (): Promise<CoreProtocolResult> => ({
async (): Promise<LightPushCoreResult> => ({
success: null,
failure: { error: "test error" as any }
failure: { error: LightPushError.GENERIC_FAIL }
})
);
@ -135,7 +136,7 @@ describe("RetryManager", () => {
});
it("should request peer renewal on specific errors", async () => {
const errorCallback = sinon.spy(async (): Promise<CoreProtocolResult> => {
const errorCallback = sinon.spy(async (): Promise<LightPushCoreResult> => {
throw new Error(ProtocolError.NO_PEER_AVAILABLE);
});
@ -155,7 +156,7 @@ describe("RetryManager", () => {
});
it("should handle task timeouts", async () => {
const slowCallback = sinon.spy(async (): Promise<CoreProtocolResult> => {
const slowCallback = sinon.spy(async (): Promise<LightPushCoreResult> => {
await new Promise((resolve) => setTimeout(resolve, 15000));
return { success: mockPeerId, failure: null };
});
@ -174,9 +175,11 @@ describe("RetryManager", () => {
});
it("should not execute task if max attempts is 0", async () => {
const failingCallback = sinon.spy(async (): Promise<CoreProtocolResult> => {
throw new Error("test error" as any);
});
const failingCallback = sinon.spy(
async (): Promise<LightPushCoreResult> => {
throw new Error("test error" as any);
}
);
const task = {
callback: failingCallback,
@ -209,7 +212,7 @@ describe("RetryManager", () => {
called++;
return Promise.resolve({
success: null,
failure: { error: ProtocolError.GENERIC_FAIL }
failure: { error: LightPushError.GENERIC_FAIL }
});
});
retryManager.push(failCallback, 2, TestRoutingInfo);

View File

@ -1,7 +1,7 @@
import type { PeerId } from "@libp2p/interface";
import {
type CoreProtocolResult,
type IRoutingInfo,
type LightPushCoreResult,
Protocols
} from "@waku/interfaces";
import { Logger } from "@waku/utils";
@ -15,7 +15,7 @@ type RetryManagerConfig = {
peerManager: PeerManager;
};
type AttemptCallback = (peerId: PeerId) => Promise<CoreProtocolResult>;
type AttemptCallback = (peerId: PeerId) => Promise<LightPushCoreResult>;
export type ScheduledTask = {
maxAttempts: number;
@ -123,7 +123,13 @@ export class RetryManager {
task.callback(peerId)
]);
if (response?.failure) {
// If timeout resolves first, response will be void (undefined)
// In this case, we should treat it as a timeout error
if (response === undefined) {
throw new Error("Task timeout");
}
if (response.failure) {
throw Error(response.failure.error);
}

View File

@ -1,13 +1,13 @@
import { ProtocolError } from "@waku/interfaces";
import { LightPushError } from "@waku/interfaces";
export const shouldPeerBeChanged = (
failure: string | ProtocolError
failure: string | LightPushError
): boolean => {
const toBeChanged =
failure === ProtocolError.REMOTE_PEER_REJECTED ||
failure === ProtocolError.NO_RESPONSE ||
failure === ProtocolError.RLN_PROOF_GENERATION ||
failure === ProtocolError.NO_PEER_AVAILABLE;
failure === LightPushError.REMOTE_PEER_REJECTED ||
failure === LightPushError.NO_RESPONSE ||
failure === LightPushError.RLN_PROOF_GENERATION ||
failure === LightPushError.NO_PEER_AVAILABLE;
if (toBeChanged) {
return true;

View File

@ -85,7 +85,8 @@ describe("PeerManager", () => {
_clusterId: ClusterId,
_shardId: ShardId
) => true,
isPeerOnTopic: async (_id: PeerId, _topic: string) => true
isPeerOnTopic: async (_id: PeerId, _topic: string) => true,
hasShardInfo: async (_id: PeerId) => true
} as unknown as IConnectionManager;
peerManager = new PeerManager({
libp2p,

View File

@ -4,7 +4,12 @@ import {
PeerId,
TypedEventEmitter
} from "@libp2p/interface";
import { FilterCodecs, LightPushCodec, StoreCodec } from "@waku/core";
import {
FilterCodecs,
LightPushCodec,
LightPushCodecV2,
StoreCodec
} from "@waku/core";
import {
CONNECTION_LOCKED_TAG,
type IConnectionManager,
@ -28,8 +33,10 @@ type PeerManagerParams = {
connectionManager: IConnectionManager;
};
type SupportedProtocols = Protocols | "light-push-v2";
type GetPeersParams = {
protocol: Protocols;
protocol: SupportedProtocols;
pubsubTopic: string;
};
@ -119,7 +126,7 @@ export class PeerManager {
for (const peer of connectedPeers) {
const hasProtocol = this.hasPeerProtocol(peer, params.protocol);
const hasSamePubsub = await this.connectionManager.isPeerOnTopic(
const hasSamePubsub = await this.isPeerOnPubsub(
peer.id,
params.pubsubTopic
);
@ -204,12 +211,19 @@ export class PeerManager {
private async onConnected(event: CustomEvent<IdentifyResult>): Promise<void> {
const result = event.detail;
if (
result.protocols.includes(this.matchProtocolToCodec(Protocols.Filter))
) {
const isFilterPeer = result.protocols.includes(
this.getProtocolCodecs(Protocols.Filter)
);
const isStorePeer = result.protocols.includes(
this.getProtocolCodecs(Protocols.Store)
);
if (isFilterPeer) {
this.dispatchFilterPeerConnect(result.peerId);
}
if (result.protocols.includes(this.matchProtocolToCodec(Protocols.Store))) {
if (isStorePeer) {
this.dispatchStorePeerConnect(result.peerId);
}
}
@ -230,8 +244,8 @@ export class PeerManager {
}
}
private hasPeerProtocol(peer: Peer, protocol: Protocols): boolean {
return peer.protocols.includes(this.matchProtocolToCodec(protocol));
private hasPeerProtocol(peer: Peer, protocol: SupportedProtocols): boolean {
return peer.protocols.includes(this.getProtocolCodecs(protocol));
}
private lockPeer(id: PeerId): void {
@ -289,14 +303,18 @@ export class PeerManager {
);
}
private matchProtocolToCodec(protocol: Protocols): string {
const protocolToCodec = {
private getProtocolCodecs(protocol: SupportedProtocols): string {
if (protocol === Protocols.Relay) {
throw new Error("Relay protocol is not supported");
}
const protocolToCodecs = {
[Protocols.Filter]: FilterCodecs.SUBSCRIBE,
[Protocols.LightPush]: LightPushCodec,
[Protocols.Store]: StoreCodec,
[Protocols.Relay]: ""
"light-push-v2": LightPushCodecV2
};
return protocolToCodec[protocol];
return protocolToCodecs[protocol];
}
}

View File

@ -1,5 +1,10 @@
import type { Connection, Peer, PeerStore } from "@libp2p/interface";
import { FilterCodecs, LightPushCodec, StoreCodec } from "@waku/core";
import {
FilterCodecs,
LightPushCodec,
LightPushCodecV2,
StoreCodec
} from "@waku/core";
import { IRelay, Protocols } from "@waku/interfaces";
import { expect } from "chai";
import sinon from "sinon";
@ -114,7 +119,10 @@ describe("waitForRemotePeer", () => {
err = e as Error;
}
expect(addEventListenerSpy.calledOnceWith("peer:identify")).to.be.true;
expect(addEventListenerSpy.calledTwice).to.be.true;
addEventListenerSpy
.getCalls()
.forEach((c) => expect(c.firstArg).to.equal("peer:identify"));
expect(err).not.to.be.undefined;
expect(err!.message).to.be.eq("Timed out waiting for a remote peer.");
@ -148,9 +156,12 @@ describe("waitForRemotePeer", () => {
});
it("should wait for LightPush peer to be connected", async () => {
let call = 0;
const addEventListenerSpy = sinon.spy(
(_type: string, _cb: (e: any) => void) => {
_cb({ detail: { protocols: [LightPushCodec] } });
const proto = call === 0 ? LightPushCodec : LightPushCodecV2;
call++;
_cb({ detail: { protocols: [proto] } });
}
);
eventTarget.addEventListener = addEventListenerSpy;
@ -174,7 +185,10 @@ describe("waitForRemotePeer", () => {
err = e as Error;
}
expect(addEventListenerSpy.calledOnceWith("peer:identify")).to.be.true;
expect(addEventListenerSpy.calledTwice).to.be.true;
addEventListenerSpy
.getCalls()
.forEach((c) => expect(c.firstArg).to.equal("peer:identify"));
expect(err).to.be.undefined;
// check with metadata serivice
@ -196,8 +210,10 @@ describe("waitForRemotePeer", () => {
err = e as Error;
}
expect(addEventListenerSpy.calledTwice).to.be.true;
expect(addEventListenerSpy.lastCall.calledWith("peer:identify")).to.be.true;
expect(addEventListenerSpy.callCount).to.equal(4);
addEventListenerSpy
.getCalls()
.forEach((c) => expect(c.firstArg).to.equal("peer:identify"));
expect(err).to.be.undefined;
});

View File

@ -1,5 +1,10 @@
import type { IdentifyResult } from "@libp2p/interface";
import { FilterCodecs, LightPushCodec, StoreCodec } from "@waku/core";
import {
FilterCodecs,
LightPushCodec,
LightPushCodecV2,
StoreCodec
} from "@waku/core";
import type { IWaku, Libp2p } from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import { Logger } from "@waku/utils";
@ -82,6 +87,13 @@ export async function waitForRemotePeer(
type EventListener = (_: CustomEvent<IdentifyResult>) => void;
function protocolToPeerPromise(
codecs: string[],
libp2p: Libp2p
): Promise<void>[] {
return codecs.map((codec) => waitForConnectedPeer(codec, libp2p));
}
/**
* Waits for required peers to be connected.
*/
@ -96,15 +108,21 @@ async function waitForProtocols(
}
if (waku.store && protocols.includes(Protocols.Store)) {
promises.push(waitForConnectedPeer(StoreCodec, waku.libp2p));
promises.push(...protocolToPeerPromise([StoreCodec], waku.libp2p));
}
if (waku.lightPush && protocols.includes(Protocols.LightPush)) {
promises.push(waitForConnectedPeer(LightPushCodec, waku.libp2p));
const lpPromises = protocolToPeerPromise(
[LightPushCodec, LightPushCodecV2],
waku.libp2p
);
promises.push(Promise.any(lpPromises));
}
if (waku.filter && protocols.includes(Protocols.Filter)) {
promises.push(waitForConnectedPeer(FilterCodecs.SUBSCRIBE, waku.libp2p));
promises.push(
...protocolToPeerPromise([FilterCodecs.SUBSCRIBE], waku.libp2p)
);
}
return Promise.all(promises);
@ -246,15 +264,17 @@ function getEnabledProtocols(waku: IWaku): Protocols[] {
function mapProtocolsToCodecs(protocols: Protocols[]): Map<string, boolean> {
const codecs: Map<string, boolean> = new Map();
const protocolToCodec: Record<string, string> = {
[Protocols.Filter]: FilterCodecs.SUBSCRIBE,
[Protocols.LightPush]: LightPushCodec,
[Protocols.Store]: StoreCodec
const protocolToCodec: Record<string, string[]> = {
[Protocols.Filter]: [FilterCodecs.SUBSCRIBE],
[Protocols.LightPush]: [LightPushCodec, LightPushCodecV2],
[Protocols.Store]: [StoreCodec]
};
for (const protocol of protocols) {
if (protocolToCodec[protocol]) {
codecs.set(protocolToCodec[protocol], false);
protocolToCodec[protocol].forEach((codec) => {
codecs.set(codec, false);
});
}
}

View File

@ -182,7 +182,7 @@ export class WakuNode implements IWaku {
}
if (_protocols.includes(Protocols.LightPush)) {
if (this.lightPush) {
codecs.push(this.lightPush.multicodec);
codecs.push(...this.lightPush.multicodec);
} else {
log.error(
"Light Push codec not included in dial codec: protocol not mounted locally"

View File

@ -1,5 +1,5 @@
import { createEncoder } from "@waku/core";
import { IRateLimitProof, LightNode, ProtocolError } from "@waku/interfaces";
import { IRateLimitProof, LightNode, LightPushError } from "@waku/interfaces";
import { utf8ToBytes } from "@waku/sdk";
import { expect } from "chai";
@ -21,9 +21,9 @@ import {
TestRoutingInfo
} from "./utils.js";
const runTests = (strictNodeCheck: boolean): void => {
const runTests = (strictNodeCheck: boolean, useLegacy: boolean): void => {
const numServiceNodes = 2;
describe(`Waku Light Push: Multiple Nodes: Strict Check: ${strictNodeCheck}`, function () {
describe(`Waku Light Push (legacy=${useLegacy ? "v2" : "v3"}): Multiple Nodes: Strict Check: ${strictNodeCheck}`, function () {
// Set the timeout for all tests in this suite. Can be overwritten at test level
this.timeout(15000);
let waku: LightNode;
@ -36,7 +36,8 @@ const runTests = (strictNodeCheck: boolean): void => {
{ lightpush: true, filter: true },
strictNodeCheck,
numServiceNodes,
true
true,
{ lightPush: { useLegacy } }
);
});
@ -95,7 +96,7 @@ const runTests = (strictNodeCheck: boolean): void => {
expect(pushResponse.successes.length).to.eq(0);
expect(pushResponse.failures?.map((failure) => failure.error)).to.include(
ProtocolError.EMPTY_PAYLOAD
LightPushError.EMPTY_PAYLOAD
);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
@ -174,7 +175,7 @@ const runTests = (strictNodeCheck: boolean): void => {
expect(pushResponse.successes.length).to.eq(0);
expect(pushResponse.failures?.map((failure) => failure.error)).to.include(
ProtocolError.REMOTE_PEER_REJECTED
LightPushError.REMOTE_PEER_REJECTED
);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
false
@ -248,7 +249,7 @@ const runTests = (strictNodeCheck: boolean): void => {
});
expect(pushResponse.successes.length).to.eq(0);
expect(pushResponse.failures?.map((failure) => failure.error)).to.include(
ProtocolError.SIZE_TOO_BIG
LightPushError.SIZE_TOO_BIG
);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
false
@ -257,4 +258,6 @@ const runTests = (strictNodeCheck: boolean): void => {
});
};
[true, false].map(runTests);
[true, false].forEach((strictNodeCheck) => {
[true, false].forEach((legacy) => runTests(strictNodeCheck, legacy));
});

View File

@ -1,5 +1,5 @@
import { createEncoder } from "@waku/core";
import { LightNode, Protocols } from "@waku/interfaces";
import { IWaku, Protocols } from "@waku/interfaces";
import { createRoutingInfo } from "@waku/utils";
import { utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai";
@ -28,7 +28,7 @@ describe("Waku Light Push (Autosharding): Multiple Shards", function () {
this.timeout(30000);
const numServiceNodes = 2;
let waku: LightNode;
let waku: IWaku;
let serviceNodes: ServiceNodesFleet;
const customContentTopic2 = "/test/2/waku-light-push/utf8";
@ -48,6 +48,7 @@ describe("Waku Light Push (Autosharding): Multiple Shards", function () {
{
lightpush: true,
filter: true,
relay: true,
contentTopic: [TestEncoder.contentTopic, customEncoder2.contentTopic]
},
false,
@ -60,45 +61,56 @@ describe("Waku Light Push (Autosharding): Multiple Shards", function () {
await teardownNodesWithRedundancy(serviceNodes, waku);
});
it("Subscribe and receive messages on 2 different pubsubtopics", async function () {
if (customRoutingInfo2.pubsubTopic === TestEncoder.pubsubTopic)
throw "Invalid test, both encoder uses same shard";
[true, false].forEach((useLegacy) => {
it(`Subscribe and receive messages on 2 different pubsubtopics with ${useLegacy ? "v2" : "v3"} protocol`, async function () {
if (customRoutingInfo2.pubsubTopic === TestEncoder.pubsubTopic)
throw "Invalid test, both encoder uses same shard";
const pushResponse1 = await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes("M1")
});
const pushResponse2 = await waku.lightPush.send(customEncoder2, {
payload: utf8ToBytes("M2")
});
const pushResponse1 = await waku.lightPush!.send(
TestEncoder,
{
payload: utf8ToBytes("M1")
},
{ useLegacy }
);
expect(pushResponse1.successes.length).to.eq(numServiceNodes);
expect(pushResponse2.successes.length).to.eq(numServiceNodes);
const pushResponse2 = await waku.lightPush!.send(
customEncoder2,
{
payload: utf8ToBytes("M2")
},
{ useLegacy }
);
const messageCollector1 = new MessageCollector(serviceNodes.nodes[0]);
const messageCollector2 = new MessageCollector(serviceNodes.nodes[1]);
expect(pushResponse1?.successes.length).to.eq(numServiceNodes);
expect(pushResponse2?.successes.length).to.eq(numServiceNodes);
expect(
await messageCollector1.waitForMessagesAutosharding(1, {
contentTopic: TestEncoder.contentTopic
})
).to.eq(true);
const messageCollector1 = new MessageCollector(serviceNodes.nodes[0]);
const messageCollector2 = new MessageCollector(serviceNodes.nodes[1]);
expect(
await messageCollector2.waitForMessagesAutosharding(1, {
contentTopic: customEncoder2.contentTopic
})
).to.eq(true);
expect(
await messageCollector1.waitForMessagesAutosharding(1, {
contentTopic: TestEncoder.contentTopic
})
).to.eq(true);
messageCollector1.verifyReceivedMessage(0, {
expectedMessageText: "M1",
expectedContentTopic: TestEncoder.contentTopic,
expectedPubsubTopic: TestEncoder.pubsubTopic
});
expect(
await messageCollector2.waitForMessagesAutosharding(1, {
contentTopic: customEncoder2.contentTopic
})
).to.eq(true);
messageCollector2.verifyReceivedMessage(0, {
expectedMessageText: "M2",
expectedContentTopic: customEncoder2.contentTopic,
expectedPubsubTopic: customEncoder2.pubsubTopic
messageCollector1.verifyReceivedMessage(0, {
expectedMessageText: "M1",
expectedContentTopic: TestEncoder.contentTopic,
expectedPubsubTopic: TestEncoder.pubsubTopic
});
messageCollector2.verifyReceivedMessage(0, {
expectedMessageText: "M2",
expectedContentTopic: customEncoder2.contentTopic,
expectedPubsubTopic: customEncoder2.pubsubTopic
});
});
});
@ -122,10 +134,10 @@ describe("Waku Light Push (Autosharding): Multiple Shards", function () {
const messageCollector2 = new MessageCollector(nwaku2);
await waku.lightPush.send(TestEncoder, {
await waku.lightPush!.send(TestEncoder, {
payload: utf8ToBytes("M1")
});
await waku.lightPush.send(customEncoder2, {
await waku.lightPush!.send(customEncoder2, {
payload: utf8ToBytes("M2")
});

View File

@ -0,0 +1,83 @@
import { LightNode } from "@waku/interfaces";
import { createLightNode, utf8ToBytes } from "@waku/sdk";
import { expect } from "chai";
import {
afterEachCustom,
beforeEachCustom,
NOISE_KEY_2,
runMultipleNodes,
ServiceNodesFleet,
teardownNodesWithRedundancy
} from "../../src/index.js";
import { DEFAULT_DISCOVERIES_ENABLED } from "../../src/lib/runNodes.js";
import { TestContentTopic, TestEncoder, TestRoutingInfo } from "./utils.js";
describe(`Waku Light Push V2 and V3 interop`, function () {
this.timeout(15000);
let waku: LightNode;
let waku2: LightNode;
let serviceNodes: ServiceNodesFleet;
beforeEachCustom(this, async () => {
[serviceNodes, waku] = await runMultipleNodes(
this.ctx,
TestRoutingInfo,
{ lightpush: true, filter: true, relay: true },
true,
2,
true
);
waku2 = await createLightNode({
staticNoiseKey: NOISE_KEY_2,
libp2p: {
addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] }
},
networkConfig: TestRoutingInfo.networkConfig,
lightPush: { numPeersToUse: 1 },
discovery: DEFAULT_DISCOVERIES_ENABLED
});
await waku2.dial(await serviceNodes.nodes[1].getMultiaddrWithId());
});
afterEachCustom(this, async () => {
await teardownNodesWithRedundancy(serviceNodes, [waku, waku2]);
});
it(`Push messages througth V2 and V3 from 2 js-waku and receives`, async function () {
let pushResponse = await waku.lightPush.send(
TestEncoder,
{
payload: utf8ToBytes("v2")
},
{ useLegacy: true }
);
expect(pushResponse.successes.length).to.eq(2);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(true);
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: "v2",
expectedContentTopic: TestContentTopic,
expectedPubsubTopic: TestRoutingInfo.pubsubTopic
});
pushResponse = await waku2.lightPush.send(
TestEncoder,
{
payload: utf8ToBytes("v3")
},
{ useLegacy: false }
);
expect(pushResponse.successes.length).to.eq(1);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(true);
serviceNodes.messageCollector.verifyReceivedMessage(0, {
expectedMessageText: "v3",
expectedContentTopic: TestContentTopic,
expectedPubsubTopic: TestRoutingInfo.pubsubTopic
});
});
});