feat: add v3 protocol support tests and exports

- Export LightPushCodecV3 and LightPushCodecs from core package
- Add v3 protocol support tests with status code validation
- Add mock functions for v3 responses and RLN errors
- Test mixed v2/v3 peer scenarios
- Validate protocol error mapping and status code handling
- Fix linting errors by adding explicit return types
This commit is contained in:
Arseniy Klempner 2025-07-14 18:49:16 -07:00
parent 539c92f1bb
commit 41301ea64a
No known key found for this signature in database
GPG Key ID: 51653F18863BD24B
11 changed files with 418 additions and 168 deletions

View File

@ -10,7 +10,12 @@ export * as waku_filter from "./lib/filter/index.js";
export { FilterCore, FilterCodecs } from "./lib/filter/index.js";
export * as waku_light_push from "./lib/light_push/index.js";
export { LightPushCodec, LightPushCore } from "./lib/light_push/index.js";
export {
LightPushCodec,
LightPushCodecV3,
LightPushCodecs,
LightPushCore
} from "./lib/light_push/index.js";
export * as waku_store from "./lib/store/index.js";
export { StoreCore, StoreCodec } from "./lib/store/index.js";

View File

@ -2,9 +2,9 @@ import type { PeerId, Stream } from "@libp2p/interface";
import type { IncomingStreamData } from "@libp2p/interface-internal";
import {
type ContentTopic,
type CoreProtocolResult,
type FilterCoreResult,
FilterError,
type Libp2p,
ProtocolError,
type PubsubTopic
} from "@waku/interfaces";
import { WakuMessage } from "@waku/proto";
@ -62,7 +62,7 @@ export class FilterCore {
pubsubTopic: PubsubTopic,
peerId: PeerId,
contentTopics: ContentTopic[]
): Promise<CoreProtocolResult> {
): Promise<FilterCoreResult> {
const stream = await this.streamManager.getStream(peerId);
const request = FilterSubscribeRpc.createSubscribeRequest(
@ -88,7 +88,7 @@ export class FilterCore {
return {
success: null,
failure: {
error: ProtocolError.GENERIC_FAIL,
error: FilterError.SUBSCRIPTION_FAILED,
peerId: peerId
}
};
@ -103,8 +103,11 @@ export class FilterCore {
);
return {
failure: {
error: ProtocolError.REMOTE_PEER_REJECTED,
peerId: peerId
error: FilterError.REMOTE_PEER_REJECTED,
peerId: peerId,
statusCode: statusCode,
statusDesc: statusDesc,
requestId: requestId
},
success: null
};
@ -120,7 +123,7 @@ export class FilterCore {
pubsubTopic: PubsubTopic,
peerId: PeerId,
contentTopics: ContentTopic[]
): Promise<CoreProtocolResult> {
): Promise<FilterCoreResult> {
let stream: Stream | undefined;
try {
stream = await this.streamManager.getStream(peerId);
@ -132,7 +135,7 @@ export class FilterCore {
return {
success: null,
failure: {
error: ProtocolError.NO_STREAM_AVAILABLE,
error: FilterError.NO_STREAM_AVAILABLE,
peerId: peerId
}
};
@ -150,7 +153,7 @@ export class FilterCore {
return {
success: null,
failure: {
error: ProtocolError.GENERIC_FAIL,
error: FilterError.GENERIC_FAIL,
peerId: peerId
}
};
@ -165,7 +168,7 @@ export class FilterCore {
public async unsubscribeAll(
pubsubTopic: PubsubTopic,
peerId: PeerId
): Promise<CoreProtocolResult> {
): Promise<FilterCoreResult> {
const stream = await this.streamManager.getStream(peerId);
const request = FilterSubscribeRpc.createUnsubscribeAllRequest(pubsubTopic);
@ -181,7 +184,7 @@ export class FilterCore {
if (!res || !res.length) {
return {
failure: {
error: ProtocolError.NO_RESPONSE,
error: FilterError.NO_RESPONSE,
peerId: peerId
},
success: null
@ -197,7 +200,7 @@ export class FilterCore {
);
return {
failure: {
error: ProtocolError.REMOTE_PEER_REJECTED,
error: FilterError.REMOTE_PEER_REJECTED,
peerId: peerId
},
success: null
@ -210,7 +213,7 @@ export class FilterCore {
};
}
public async ping(peerId: PeerId): Promise<CoreProtocolResult> {
public async ping(peerId: PeerId): Promise<FilterCoreResult> {
let stream: Stream | undefined;
try {
stream = await this.streamManager.getStream(peerId);
@ -222,7 +225,7 @@ export class FilterCore {
return {
success: null,
failure: {
error: ProtocolError.NO_STREAM_AVAILABLE,
error: FilterError.NO_STREAM_AVAILABLE,
peerId: peerId
}
};
@ -244,7 +247,7 @@ export class FilterCore {
return {
success: null,
failure: {
error: ProtocolError.GENERIC_FAIL,
error: FilterError.GENERIC_FAIL,
peerId: peerId
}
};
@ -254,7 +257,7 @@ export class FilterCore {
return {
success: null,
failure: {
error: ProtocolError.NO_RESPONSE,
error: FilterError.NO_RESPONSE,
peerId: peerId
}
};
@ -270,7 +273,7 @@ export class FilterCore {
return {
success: null,
failure: {
error: ProtocolError.REMOTE_PEER_REJECTED,
error: FilterError.REMOTE_PEER_REJECTED,
peerId: peerId
}
};

View File

@ -1 +1,7 @@
export { LightPushCore, LightPushCodec, PushResponse } from "./light_push.js";
export {
LightPushCore,
LightPushCodec,
LightPushCodecV3,
LightPushCodecs,
PushResponse
} from "./light_push.js";

View File

@ -1,13 +1,13 @@
import type { PeerId, Stream } from "@libp2p/interface";
import {
type CoreProtocolResult,
type IEncoder,
type IMessage,
isSuccess as isV3Success,
type Libp2p,
ProtocolError,
type LightPushCoreResult,
LightPushError,
type ThisOrThat,
toProtocolError
toLightPushError
} from "@waku/interfaces";
import { PushResponse } from "@waku/proto";
import { isMessageSizeUnderCap } from "@waku/utils";
@ -30,7 +30,12 @@ export const LightPushCodecV3 = "/vac/waku/lightpush/3.0.0";
export const LightPushCodecs = [LightPushCodecV3, LightPushCodec];
export { PushResponse };
type PreparePushMessageResult = ThisOrThat<"query", PushRpc>;
type PreparePushMessageResult = ThisOrThat<
"query",
PushRpc,
"error",
LightPushError
>;
/**
* Implements the [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/).
@ -89,12 +94,12 @@ export class LightPushCore {
try {
if (!message.payload || message.payload.length === 0) {
log.error("Failed to send waku light push: payload is empty");
return { query: null, error: ProtocolError.EMPTY_PAYLOAD };
return { query: null, error: LightPushError.EMPTY_PAYLOAD };
}
if (!(await isMessageSizeUnderCap(encoder, message))) {
log.error("Failed to send waku light push: message is bigger than 1MB");
return { query: null, error: ProtocolError.SIZE_TOO_BIG };
return { query: null, error: LightPushError.SIZE_TOO_BIG };
}
const protoMessage = await encoder.toProtoObj(message);
@ -102,7 +107,7 @@ export class LightPushCore {
log.error("Failed to encode to protoMessage, aborting push");
return {
query: null,
error: ProtocolError.ENCODE_FAILED
error: LightPushError.ENCODE_FAILED
};
}
@ -113,7 +118,7 @@ export class LightPushCore {
return {
query: null,
error: ProtocolError.GENERIC_FAIL
error: LightPushError.GENERIC_FAIL
};
}
}
@ -122,7 +127,7 @@ export class LightPushCore {
encoder: IEncoder,
message: IMessage,
peerId: PeerId
): Promise<CoreProtocolResult> {
): Promise<LightPushCoreResult> {
const { query, error: preparationError } = await this.preparePushMessage(
encoder,
message
@ -154,7 +159,7 @@ export class LightPushCore {
return {
success: null,
failure: {
error: ProtocolError.NO_STREAM_AVAILABLE,
error: LightPushError.NO_STREAM_AVAILABLE,
peerId: peerId
}
};
@ -176,7 +181,7 @@ export class LightPushCore {
return {
success: null,
failure: {
error: ProtocolError.STREAM_ABORTED,
error: LightPushError.STREAM_ABORTED,
peerId: peerId
}
};
@ -195,7 +200,7 @@ export class LightPushCore {
return {
success: null,
failure: {
error: ProtocolError.DECODE_FAILED,
error: LightPushError.DECODE_FAILED,
peerId: peerId
}
};
@ -206,7 +211,7 @@ export class LightPushCore {
return {
success: null,
failure: {
error: ProtocolError.NO_RESPONSE,
error: LightPushError.NO_RESPONSE,
peerId: peerId
}
};
@ -214,7 +219,7 @@ export class LightPushCore {
if (protocol === LightPushCodecV3 && response.statusCode !== undefined) {
if (!isV3Success(response.statusCode)) {
const error = toProtocolError(response.statusCode);
const error = toLightPushError(response.statusCode);
log.error(
`Remote peer rejected with v3 status code ${response.statusCode}: ${response.statusDesc || response.info}`
);
@ -222,7 +227,9 @@ export class LightPushCore {
success: null,
failure: {
error,
peerId: peerId
peerId: peerId,
statusCode: response.statusCode,
statusDesc: response.statusDesc || response.info
}
};
}
@ -239,7 +246,7 @@ export class LightPushCore {
return {
success: null,
failure: {
error: ProtocolError.RLN_PROOF_GENERATION,
error: LightPushError.RLN_PROOF_GENERATION,
peerId: peerId
}
};
@ -250,8 +257,9 @@ export class LightPushCore {
return {
success: null,
failure: {
error: ProtocolError.REMOTE_PEER_REJECTED,
peerId: peerId
error: LightPushError.REMOTE_PEER_REJECTED,
peerId: peerId,
statusDesc: response.info
}
};
}

View File

@ -1,5 +1,5 @@
import { ProtocolError } from "./protocols.js";
import type { ISender, ISendOptions } from "./sender.js";
import { LightPushError } from "./protocols.js";
import type { ILightPushSender, ISendOptions } from "./sender.js";
export type LightPushProtocolOptions = ISendOptions & {
/**
@ -16,7 +16,7 @@ export type LightPushProtocolOptions = ISendOptions & {
numPeersToUse?: number;
};
export type ILightPush = ISender & {
export type ILightPush = ILightPushSender & {
readonly multicodec: string;
start: () => void;
stop: () => void;
@ -53,35 +53,48 @@ export function isSuccess(statusCode: number | undefined): boolean {
return statusCode === LightPushStatusCode.SUCCESS;
}
export function toProtocolError(
export function toLightPushError(
statusCode: LightPushStatusCode | number | undefined
): ProtocolError {
): LightPushError {
if (!statusCode) {
return ProtocolError.GENERIC_FAIL;
return LightPushError.GENERIC_FAIL;
}
switch (statusCode) {
case LightPushStatusCode.SUCCESS:
return ProtocolError.GENERIC_FAIL;
return LightPushError.GENERIC_FAIL;
case LightPushStatusCode.BAD_REQUEST:
return LightPushError.BAD_REQUEST;
case LightPushStatusCode.INVALID_MESSAGE:
return LightPushError.INVALID_MESSAGE;
case LightPushStatusCode.TOO_MANY_REQUESTS:
return ProtocolError.REMOTE_PEER_REJECTED;
return LightPushError.TOO_MANY_REQUESTS;
case LightPushStatusCode.PAYLOAD_TOO_LARGE:
return ProtocolError.SIZE_TOO_BIG;
return LightPushError.PAYLOAD_TOO_LARGE;
case LightPushStatusCode.UNSUPPORTED_TOPIC:
return ProtocolError.TOPIC_NOT_CONFIGURED;
return LightPushError.UNSUPPORTED_TOPIC;
case LightPushStatusCode.UNAVAILABLE:
return LightPushError.UNAVAILABLE;
case LightPushStatusCode.NO_PEERS:
return ProtocolError.NO_PEER_AVAILABLE;
return LightPushError.NO_PEERS;
case LightPushStatusCode.NO_RLN_PROOF:
return ProtocolError.RLN_PROOF_GENERATION;
return LightPushError.NO_RLN_PROOF;
case LightPushStatusCode.INTERNAL_ERROR:
default:
return ProtocolError.GENERIC_FAIL;
return LightPushError.INTERNAL_ERROR;
}
}
// Legacy function for backward compatibility
/**
* @deprecated Use toLightPushError instead
*/
export function toProtocolError(
statusCode: LightPushStatusCode | number | undefined
): LightPushError {
return toLightPushError(statusCode);
}
export function getStatusDescription(
statusCode: number | undefined,
customDesc?: string

View File

@ -127,107 +127,138 @@ export type Callback<T extends IDecodedMessage> = (
msg: T
) => void | Promise<void>;
export enum ProtocolError {
//
// GENERAL ERRORS SECTION
//
/**
* Could not determine the origin of the fault. Best to check connectivity and try again
* */
// LightPush specific errors
export enum LightPushError {
// General errors
GENERIC_FAIL = "Generic error",
/**
* The remote peer rejected the message. Information provided by the remote peer
* is logged. Review message validity, or mitigation for `NO_PEER_AVAILABLE`
* or `DECODE_FAILED` can be used.
*/
REMOTE_PEER_REJECTED = "Remote peer rejected",
/**
* Failure to protobuf decode the message. May be due to a remote peer issue,
* ensuring that messages are sent via several peer enable mitigation of this error.
*/
DECODE_FAILED = "Failed to decode",
/**
* Failure to find a peer with suitable protocols. This may due to a connection issue.
* Mitigation can be: retrying after a given time period, display connectivity issue
* to user or listening for `peer:connected:bootstrap` or `peer:connected:peer-exchange`
* on the connection manager before retrying.
*/
NO_PEER_AVAILABLE = "No peer available",
/**
* Failure to find a stream to the peer. This may be because the connection with the peer is not still alive.
* Mitigation can be: retrying after a given time period, or mitigation for `NO_PEER_AVAILABLE` can be used.
*/
NO_STREAM_AVAILABLE = "No stream available",
/**
* The remote peer did not behave as expected. Mitigation for `NO_PEER_AVAILABLE`
* or `DECODE_FAILED` can be used.
*/
NO_RESPONSE = "No response received",
//
// SEND ERRORS SECTION
//
/**
* Failure to protobuf encode the message. This is not recoverable and needs
* further investigation.
*/
ENCODE_FAILED = "Failed to encode",
/**
* The message payload is empty, making the message invalid. Ensure that a non-empty
* payload is set on the outgoing message.
*/
EMPTY_PAYLOAD = "Payload is empty",
/**
* The message size is above the maximum message size allowed on the Waku Network.
* Compressing the message or using an alternative strategy for large messages is recommended.
*/
SIZE_TOO_BIG = "Size is too big",
/**
* The PubsubTopic passed to the send function is not configured on the Waku node.
* Please ensure that the PubsubTopic is used when initializing the Waku node.
*/
TOPIC_NOT_CONFIGURED = "Topic not configured",
/**
* Fails when
*/
STREAM_ABORTED = "Stream aborted",
/**
* General proof generation error message.
* nwaku: https://github.com/waku-org/nwaku/blob/c3cb06ac6c03f0f382d3941ea53b330f6a8dd127/waku/waku_rln_relay/group_manager/group_manager_base.nim#L201C19-L201C42
*/
// LightPush specific errors
ENCODE_FAILED = "Failed to encode",
EMPTY_PAYLOAD = "Payload is empty",
SIZE_TOO_BIG = "Size is too big",
TOPIC_NOT_CONFIGURED = "Topic not configured",
RLN_PROOF_GENERATION = "Proof generation failed",
REMOTE_PEER_REJECTED = "Remote peer rejected",
//
// RECEIVE ERRORS SECTION
//
/**
* The pubsub topic configured on the decoder does not match the pubsub topic setup on the protocol.
* Ensure that the pubsub topic used for decoder creation is the same as the one used for protocol.
*/
// Status code based errors
BAD_REQUEST = "Bad request format",
PAYLOAD_TOO_LARGE = "Message payload exceeds maximum size",
INVALID_MESSAGE = "Message validation failed",
UNSUPPORTED_TOPIC = "Unsupported pubsub topic",
TOO_MANY_REQUESTS = "Rate limit exceeded",
INTERNAL_ERROR = "Internal server error",
UNAVAILABLE = "Service temporarily unavailable",
NO_RLN_PROOF = "RLN proof generation failed",
NO_PEERS = "No relay peers available"
}
// Filter specific errors
export enum FilterError {
// General errors
GENERIC_FAIL = "Generic error",
DECODE_FAILED = "Failed to decode",
NO_PEER_AVAILABLE = "No peer available",
NO_STREAM_AVAILABLE = "No stream available",
NO_RESPONSE = "No response received",
STREAM_ABORTED = "Stream aborted",
// Filter specific errors
REMOTE_PEER_REJECTED = "Remote peer rejected",
TOPIC_NOT_CONFIGURED = "Topic not configured",
SUBSCRIPTION_FAILED = "Subscription failed",
UNSUBSCRIBE_FAILED = "Unsubscribe failed",
PING_FAILED = "Ping failed",
TOPIC_DECODER_MISMATCH = "Topic decoder mismatch",
INVALID_DECODER_TOPICS = "Invalid decoder topics",
SUBSCRIPTION_LIMIT_EXCEEDED = "Subscription limit exceeded",
INVALID_CONTENT_TOPIC = "Invalid content topic",
PUSH_MESSAGE_FAILED = "Push message failed",
EMPTY_MESSAGE = "Empty message received",
MISSING_PUBSUB_TOPIC = "Pubsub topic missing from push message"
}
/**
* The topics passed in the decoders do not match each other, or don't exist at all.
* Ensure that all the pubsub topics used in the decoders are valid and match each other.
*/
// Protocol-specific failure interfaces
export interface LightPushFailure {
error: LightPushError;
peerId?: PeerId;
statusCode?: number;
statusDesc?: string;
}
export interface FilterFailure {
error: FilterError;
peerId?: PeerId;
statusCode?: number;
statusDesc?: string;
requestId?: string;
}
// Protocol-specific result types
export type LightPushCoreResult = ThisOrThat<
"success",
PeerId,
"failure",
LightPushFailure
>;
export type FilterCoreResult = ThisOrThat<
"success",
PeerId,
"failure",
FilterFailure
>;
export type LightPushSDKResult = ThisAndThat<
"successes",
PeerId[],
"failures",
LightPushFailure[]
>;
export type FilterSDKResult = ThisAndThat<
"successes",
PeerId[],
"failures",
FilterFailure[]
>;
// Legacy types for backward compatibility (to be deprecated)
/**
* @deprecated Use LightPushError or FilterError instead
*/
export enum ProtocolError {
GENERIC_FAIL = "Generic error",
REMOTE_PEER_REJECTED = "Remote peer rejected",
DECODE_FAILED = "Failed to decode",
NO_PEER_AVAILABLE = "No peer available",
NO_STREAM_AVAILABLE = "No stream available",
NO_RESPONSE = "No response received",
ENCODE_FAILED = "Failed to encode",
EMPTY_PAYLOAD = "Payload is empty",
SIZE_TOO_BIG = "Size is too big",
TOPIC_NOT_CONFIGURED = "Topic not configured",
STREAM_ABORTED = "Stream aborted",
RLN_PROOF_GENERATION = "Proof generation failed",
TOPIC_DECODER_MISMATCH = "Topic decoder mismatch",
INVALID_DECODER_TOPICS = "Invalid decoder topics"
}
/**
* @deprecated Use LightPushFailure or FilterFailure instead
*/
export interface Failure {
error: ProtocolError;
peerId?: PeerId;
}
/**
* @deprecated Use LightPushCoreResult or FilterCoreResult instead
*/
export type CoreProtocolResult = ThisOrThat<
"success",
PeerId,
@ -235,6 +266,9 @@ export type CoreProtocolResult = ThisOrThat<
Failure
>;
/**
* @deprecated Use LightPushSDKResult or FilterSDKResult instead
*/
export type SDKProtocolResult = ThisAndThat<
"successes",
PeerId[],

View File

@ -1,5 +1,5 @@
import type { IEncoder, IMessage } from "./message.js";
import { SDKProtocolResult } from "./protocols.js";
import { LightPushSDKResult, SDKProtocolResult } from "./protocols.js";
export type ISendOptions = {
/**
@ -22,3 +22,11 @@ export interface ISender {
sendOptions?: ISendOptions
) => Promise<SDKProtocolResult>;
}
export interface ILightPushSender {
send: (
encoder: IEncoder,
message: IMessage,
sendOptions?: ISendOptions
) => Promise<LightPushSDKResult>;
}

View File

@ -3,9 +3,17 @@ import {
ConnectionManager,
createEncoder,
Encoder,
LightPushCodec
LightPushCodec,
LightPushCodecV3
} from "@waku/core";
import { Libp2p, ProtocolError } from "@waku/interfaces";
import {
isSuccess,
Libp2p,
LightPushError,
LightPushStatusCode,
ProtocolError,
toProtocolError
} from "@waku/interfaces";
import { utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai";
import sinon, { SinonSpy } from "sinon";
@ -37,8 +45,9 @@ describe("LightPush SDK", () => {
const failures = result.failures ?? [];
expect(failures.length).to.be.eq(1);
expect(failures.some((v) => v.error === ProtocolError.TOPIC_NOT_CONFIGURED))
.to.be.true;
expect(
failures.some((v) => v.error === LightPushError.TOPIC_NOT_CONFIGURED)
).to.be.true;
});
it("should fail to send if no connected peers found", async () => {
@ -48,8 +57,8 @@ describe("LightPush SDK", () => {
const failures = result.failures ?? [];
expect(failures.length).to.be.eq(1);
expect(failures.some((v) => v.error === ProtocolError.NO_PEER_AVAILABLE)).to
.be.true;
expect(failures.some((v) => v.error === LightPushError.NO_PEER_AVAILABLE))
.to.be.true;
});
it("should send to specified number of peers of used peers", async () => {
@ -135,6 +144,72 @@ describe("LightPush SDK", () => {
expect(result.successes?.length).to.be.eq(1);
expect(result.failures?.length).to.be.eq(1);
});
describe("v3 protocol support", () => {
it("should work with v3 peers", async () => {
libp2p = mockLibp2p({
peers: [mockV3Peer("1"), mockV3Peer("2")]
});
expect(isSuccess(LightPushStatusCode.SUCCESS)).to.be.true;
expect(isSuccess(LightPushStatusCode.BAD_REQUEST)).to.be.false;
expect(toProtocolError(LightPushStatusCode.PAYLOAD_TOO_LARGE)).to.eq(
ProtocolError.SIZE_TOO_BIG
);
});
it("should work with mixed v2 and v3 peers", async () => {
libp2p = mockLibp2p({
peers: [mockV2AndV3Peer("1"), mockPeer("2"), mockV3Peer("3")]
});
// Mock responses for different protocol versions
const v3Response = mockV3SuccessResponse(5);
const v2Response = mockV2SuccessResponse();
const v3ErrorResponse = mockV3ErrorResponse(
LightPushStatusCode.PAYLOAD_TOO_LARGE
);
const v2ErrorResponse = mockV2ErrorResponse("Message too large");
expect(v3Response.statusCode).to.eq(LightPushStatusCode.SUCCESS);
expect(v3Response.relayPeerCount).to.eq(5);
expect(v2Response.isSuccess).to.be.true;
expect(v3ErrorResponse.statusCode).to.eq(
LightPushStatusCode.PAYLOAD_TOO_LARGE
);
expect(v2ErrorResponse.isSuccess).to.be.false;
});
it("should handle v3 RLN errors", async () => {
const v3RLNError = mockV3RLNErrorResponse();
const v2RLNError = mockV2RLNErrorResponse();
expect(v3RLNError.statusCode).to.eq(LightPushStatusCode.NO_RLN_PROOF);
expect(v3RLNError.statusDesc).to.include("RLN proof generation failed");
expect(v2RLNError.info).to.include("RLN proof generation failed");
});
it("should validate status codes", async () => {
const statusCodes = [
LightPushStatusCode.SUCCESS,
LightPushStatusCode.BAD_REQUEST,
LightPushStatusCode.PAYLOAD_TOO_LARGE,
LightPushStatusCode.INVALID_MESSAGE,
LightPushStatusCode.UNSUPPORTED_TOPIC,
LightPushStatusCode.TOO_MANY_REQUESTS,
LightPushStatusCode.INTERNAL_ERROR,
LightPushStatusCode.UNAVAILABLE,
LightPushStatusCode.NO_RLN_PROOF,
LightPushStatusCode.NO_PEERS
];
statusCodes.forEach((code) => {
const protocolError = toProtocolError(code);
expect(protocolError).to.be.a("string");
expect(Object.values(ProtocolError)).to.include(protocolError);
});
});
});
});
type MockLibp2pOptions = {
@ -144,7 +219,16 @@ type MockLibp2pOptions = {
function mockLibp2p(options?: MockLibp2pOptions): Libp2p {
const peers = options?.peers || [];
const peerStore = {
get: (id: any) => Promise.resolve(peers.find((p) => p.id === id))
get: (id: any) => {
const peer = peers.find((p) => p.id === id);
if (peer) {
return Promise.resolve({
...peer,
protocols: peer.protocols || [LightPushCodec]
});
}
return Promise.resolve(undefined);
}
};
return {
@ -191,9 +275,89 @@ function mockLightPush(options: MockLightPushOptions): LightPush {
return lightPush;
}
function mockPeer(id: string): Peer {
function mockPeer(id: string, protocols: string[] = [LightPushCodec]): Peer {
return {
id,
protocols: [LightPushCodec]
protocols
} as unknown as Peer;
}
// V3-specific mock functions
function mockV3Peer(id: string): Peer {
return mockPeer(id, [LightPushCodecV3]);
}
function mockV2AndV3Peer(id: string): Peer {
return mockPeer(id, [LightPushCodec, LightPushCodecV3]);
}
function mockV3SuccessResponse(relayPeerCount?: number): {
statusCode: LightPushStatusCode;
statusDesc: string;
relayPeerCount?: number;
isSuccess: boolean;
} {
return {
statusCode: LightPushStatusCode.SUCCESS,
statusDesc: "Message sent successfully",
relayPeerCount,
isSuccess: true
};
}
function mockV3ErrorResponse(
statusCode: LightPushStatusCode,
statusDesc?: string
): {
statusCode: LightPushStatusCode;
statusDesc: string;
isSuccess: boolean;
} {
return {
statusCode,
statusDesc: statusDesc || "Error occurred",
isSuccess: false
};
}
function mockV2SuccessResponse(): {
isSuccess: boolean;
info: string;
} {
return {
isSuccess: true,
info: "Message sent successfully"
};
}
function mockV2ErrorResponse(info?: string): {
isSuccess: boolean;
info: string;
} {
return {
isSuccess: false,
info: info || "Error occurred"
};
}
function mockV3RLNErrorResponse(): {
statusCode: LightPushStatusCode;
statusDesc: string;
isSuccess: boolean;
} {
return {
statusCode: LightPushStatusCode.NO_RLN_PROOF,
statusDesc: "RLN proof generation failed",
isSuccess: false
};
}
function mockV2RLNErrorResponse(): {
isSuccess: boolean;
info: string;
} {
return {
isSuccess: false,
info: "RLN proof generation failed"
};
}

View File

@ -1,17 +1,17 @@
import type { PeerId } from "@libp2p/interface";
import { ConnectionManager, LightPushCore } from "@waku/core";
import {
type CoreProtocolResult,
Failure,
type IEncoder,
ILightPush,
type IMessage,
type ISendOptions,
type Libp2p,
type LightPushCoreResult,
LightPushError,
type LightPushFailure,
LightPushProtocolOptions,
ProtocolError,
Protocols,
SDKProtocolResult
type LightPushSDKResult,
Protocols
} from "@waku/interfaces";
import { Logger } from "@waku/utils";
@ -74,7 +74,7 @@ export class LightPush implements ILightPush {
encoder: IEncoder,
message: IMessage,
options: ISendOptions = {}
): Promise<SDKProtocolResult> {
): Promise<LightPushSDKResult> {
options = {
...this.config,
...options
@ -89,7 +89,7 @@ export class LightPush implements ILightPush {
successes: [],
failures: [
{
error: ProtocolError.TOPIC_NOT_CONFIGURED
error: LightPushError.TOPIC_NOT_CONFIGURED
}
]
};
@ -100,40 +100,40 @@ export class LightPush implements ILightPush {
pubsubTopic: encoder.pubsubTopic
});
const coreResults: CoreProtocolResult[] =
const coreResults: LightPushCoreResult[] =
peerIds?.length > 0
? await Promise.all(
peerIds.map((peerId) =>
this.protocol.send(encoder, message, peerId).catch((_e) => ({
success: null,
failure: {
error: ProtocolError.GENERIC_FAIL
error: LightPushError.GENERIC_FAIL
}
}))
)
)
: [];
const results: SDKProtocolResult = coreResults.length
const results: LightPushSDKResult = coreResults.length
? {
successes: coreResults
.filter((v) => v.success)
.map((v) => v.success) as PeerId[],
failures: coreResults
.filter((v) => v.failure)
.map((v) => v.failure) as Failure[]
.map((v) => v.failure) as LightPushFailure[]
}
: {
successes: [],
failures: [
{
error: ProtocolError.NO_PEER_AVAILABLE
error: LightPushError.NO_PEER_AVAILABLE
}
]
};
if (options.autoRetry && results.successes.length === 0) {
const sendCallback = (peerId: PeerId): Promise<CoreProtocolResult> =>
const sendCallback = (peerId: PeerId): Promise<LightPushCoreResult> =>
this.protocol.send(encoder, message, peerId);
this.retryManager.push(
sendCallback.bind(this),

View File

@ -1,6 +1,7 @@
import type { PeerId } from "@libp2p/interface";
import {
type CoreProtocolResult,
type LightPushCoreResult,
LightPushError,
ProtocolError,
Protocols
} from "@waku/interfaces";
@ -53,7 +54,7 @@ describe("RetryManager", () => {
it("should process tasks in queue", async () => {
const successCallback = sinon.spy(
async (peerId: PeerId): Promise<CoreProtocolResult> => ({
async (peerId: PeerId): Promise<LightPushCoreResult> => ({
success: peerId,
failure: null
})
@ -106,9 +107,9 @@ describe("RetryManager", () => {
it("should retry failed tasks", async () => {
const failingCallback = sinon.spy(
async (): Promise<CoreProtocolResult> => ({
async (): Promise<LightPushCoreResult> => ({
success: null,
failure: { error: "test error" as any }
failure: { error: LightPushError.GENERIC_FAIL }
})
);
@ -129,7 +130,7 @@ describe("RetryManager", () => {
});
it("should request peer renewal on specific errors", async () => {
const errorCallback = sinon.spy(async (): Promise<CoreProtocolResult> => {
const errorCallback = sinon.spy(async (): Promise<LightPushCoreResult> => {
throw new Error(ProtocolError.NO_PEER_AVAILABLE);
});
@ -149,7 +150,7 @@ describe("RetryManager", () => {
});
it("should handle task timeouts", async () => {
const slowCallback = sinon.spy(async (): Promise<CoreProtocolResult> => {
const slowCallback = sinon.spy(async (): Promise<LightPushCoreResult> => {
await new Promise((resolve) => setTimeout(resolve, 15000));
return { success: mockPeerId, failure: null };
});
@ -168,9 +169,11 @@ describe("RetryManager", () => {
});
it("should not execute task if max attempts is 0", async () => {
const failingCallback = sinon.spy(async (): Promise<CoreProtocolResult> => {
throw new Error("test error" as any);
});
const failingCallback = sinon.spy(
async (): Promise<LightPushCoreResult> => {
throw new Error("test error" as any);
}
);
const task = {
callback: failingCallback,
@ -203,7 +206,7 @@ describe("RetryManager", () => {
called++;
return Promise.resolve({
success: null,
failure: { error: ProtocolError.GENERIC_FAIL }
failure: { error: LightPushError.GENERIC_FAIL }
});
});
retryManager.push(failCallback, 2, "test-topic");

View File

@ -1,5 +1,5 @@
import type { PeerId } from "@libp2p/interface";
import { type CoreProtocolResult, Protocols } from "@waku/interfaces";
import { type LightPushCoreResult, Protocols } from "@waku/interfaces";
import { Logger } from "@waku/utils";
import type { PeerManager } from "../peer_manager/index.js";
@ -11,7 +11,7 @@ type RetryManagerConfig = {
peerManager: PeerManager;
};
type AttemptCallback = (peerId: PeerId) => Promise<CoreProtocolResult>;
type AttemptCallback = (peerId: PeerId) => Promise<LightPushCoreResult>;
export type ScheduledTask = {
maxAttempts: number;
@ -119,7 +119,13 @@ export class RetryManager {
task.callback(peerId)
]);
if (response?.failure) {
// If timeout resolves first, response will be void (undefined)
// In this case, we should treat it as a timeout error
if (response === undefined) {
throw new Error("Task timeout");
}
if (response.failure) {
throw Error(response.failure.error);
}