diff --git a/packages/core/src/lib/light_push/light_push.ts b/packages/core/src/lib/light_push/light_push.ts index ca132a5607..738308b809 100644 --- a/packages/core/src/lib/light_push/light_push.ts +++ b/packages/core/src/lib/light_push/light_push.ts @@ -20,7 +20,7 @@ import { Uint8ArrayList } from "uint8arraylist"; import { BaseProtocol } from "../base_protocol.js"; import { PushRpc } from "./push_rpc.js"; -import { isRLNResponseError, matchRLNErrorMessage } from "./utils.js"; +import { isRLNResponseError } from "./utils.js"; const log = new Logger("light-push"); @@ -120,11 +120,12 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore { async (source) => await all(source) ); } catch (err) { + // can fail only because of `stream` abortion log.error("Failed to send waku light push request", err); return { success: null, failure: { - error: ProtocolError.GENERIC_FAIL, + error: ProtocolError.STREAM_ABORTED, peerId: peerId } }; @@ -161,12 +162,11 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore { } if (isRLNResponseError(response.info)) { - const rlnErrorCase = matchRLNErrorMessage(response.info!); - log.error("Remote peer rejected the message: ", rlnErrorCase); + log.error("Remote peer fault: RLN generation"); return { success: null, failure: { - error: rlnErrorCase, + error: ProtocolError.RLN_PROOF_GENERATION, peerId: peerId } }; diff --git a/packages/core/src/lib/light_push/utils.ts b/packages/core/src/lib/light_push/utils.ts index 0867d02f82..9f33572cff 100644 --- a/packages/core/src/lib/light_push/utils.ts +++ b/packages/core/src/lib/light_push/utils.ts @@ -1,31 +1,23 @@ -import { ProtocolError } from "@waku/interfaces"; - // should match nwaku // https://github.com/waku-org/nwaku/blob/c3cb06ac6c03f0f382d3941ea53b330f6a8dd127/waku/waku_rln_relay/rln_relay.nim#L309 // https://github.com/waku-org/nwaku/blob/c3cb06ac6c03f0f382d3941ea53b330f6a8dd127/tests/waku_rln_relay/rln/waku_rln_relay_utils.nim#L20 const RLN_GENERATION_PREFIX_ERROR = "could not generate rln-v2 proof"; +const RLN_MESSAGE_ID_PREFIX_ERROR = + "could not get new message id to generate an rln proof"; + +// rare case on nwaku side +// https://github.com/waku-org/nwaku/blob/a4e92a3d02448fd708857b7b6cac2a7faa7eb4f9/waku/waku_lightpush/callbacks.nim#L49 +// https://github.com/waku-org/nwaku/blob/a4e92a3d02448fd708857b7b6cac2a7faa7eb4f9/waku/node/waku_node.nim#L1117 +const RLN_REMOTE_VALIDATION = "RLN validation failed"; export const isRLNResponseError = (info?: string): boolean => { if (!info) { return false; } - return info.includes(RLN_GENERATION_PREFIX_ERROR); -}; - -export const matchRLNErrorMessage = (info: string): ProtocolError => { - const rlnErrorMap: { [key: string]: ProtocolError } = { - [ProtocolError.RLN_IDENTITY_MISSING]: ProtocolError.RLN_IDENTITY_MISSING, - [ProtocolError.RLN_MEMBERSHIP_INDEX]: ProtocolError.RLN_MEMBERSHIP_INDEX, - [ProtocolError.RLN_LIMIT_MISSING]: ProtocolError.RLN_LIMIT_MISSING - }; - - const infoLowerCase = info.toLowerCase(); - for (const errorKey in rlnErrorMap) { - if (infoLowerCase.includes(errorKey.toLowerCase())) { - return rlnErrorMap[errorKey]; - } - } - - return ProtocolError.RLN_PROOF_GENERATION; + return ( + info.includes(RLN_GENERATION_PREFIX_ERROR) || + info.includes(RLN_MESSAGE_ID_PREFIX_ERROR) || + info.includes(RLN_REMOTE_VALIDATION) + ); }; diff --git a/packages/interfaces/src/light_push.ts b/packages/interfaces/src/light_push.ts index b9350c6b6b..6a9eb9d83d 100644 --- a/packages/interfaces/src/light_push.ts +++ b/packages/interfaces/src/light_push.ts @@ -1,4 +1,23 @@ import { IBaseProtocolCore } from "./protocols.js"; -import type { ISender } from "./sender.js"; +import type { ISender, ISendOptions } from "./sender.js"; -export type ILightPush = ISender & { protocol: IBaseProtocolCore }; +export type LightPushProtocolOptions = ISendOptions & { + /** + * The interval in milliseconds to wait before retrying a failed push. + * @default 1000 + */ + retryIntervalMs: number; + + /** + * Number of peers to send message to. + * + * @default 1 + */ + numPeersToUse?: number; +}; + +export type ILightPush = ISender & { + start: () => void; + stop: () => void; + protocol: IBaseProtocolCore; +}; diff --git a/packages/interfaces/src/protocols.ts b/packages/interfaces/src/protocols.ts index 40fc66542d..67ec66e9d1 100644 --- a/packages/interfaces/src/protocols.ts +++ b/packages/interfaces/src/protocols.ts @@ -4,9 +4,10 @@ import type { PeerId } from "@libp2p/interface"; import type { ConnectionManagerOptions } from "./connection_manager.js"; import type { FilterProtocolOptions } from "./filter.js"; import type { CreateLibp2pOptions } from "./libp2p.js"; +import type { LightPushProtocolOptions } from "./light_push.js"; import type { IDecodedMessage } from "./message.js"; -import { ThisAndThat, ThisOrThat } from "./misc.js"; -import { AutoSharding, StaticSharding } from "./sharding.js"; +import type { ThisAndThat, ThisOrThat } from "./misc.js"; +import type { AutoSharding, StaticSharding } from "./sharding.js"; import type { StoreProtocolOptions } from "./store.js"; export enum Protocols { @@ -62,9 +63,7 @@ export type CreateNodeOptions = { /** * Number of peers to connect to, for the usage of the protocol. - * This is used by: - * - Light Push to send messages, - * - Filter to retrieve messages. + * This is used by Filter to retrieve messages. * * @default 2. */ @@ -101,8 +100,15 @@ export type CreateNodeOptions = { /** * Options for the Store protocol. + * If not specified - default values are applied. */ store?: Partial; + + /** + * Options for the LightPush protocol. + * If not specified - default values are applied. + */ + lightPush?: Partial; }; export type Callback = ( @@ -110,43 +116,27 @@ export type Callback = ( ) => void | Promise; export enum ProtocolError { - /** Could not determine the origin of the fault. Best to check connectivity and try again */ - GENERIC_FAIL = "Generic error", + // + // GENERAL ERRORS SECTION + // /** - * Failure to protobuf encode the message. This is not recoverable and needs - * further investigation. + * Could not determine the origin of the fault. Best to check connectivity and try again + * */ + GENERIC_FAIL = "Generic error", + + /** + * The remote peer rejected the message. Information provided by the remote peer + * is logged. Review message validity, or mitigation for `NO_PEER_AVAILABLE` + * or `DECODE_FAILED` can be used. */ - ENCODE_FAILED = "Failed to encode", + REMOTE_PEER_REJECTED = "Remote peer rejected", + /** * Failure to protobuf decode the message. May be due to a remote peer issue, * ensuring that messages are sent via several peer enable mitigation of this error. */ DECODE_FAILED = "Failed to decode", - /** - * The message payload is empty, making the message invalid. Ensure that a non-empty - * payload is set on the outgoing message. - */ - EMPTY_PAYLOAD = "Payload is empty", - /** - * The message size is above the maximum message size allowed on the Waku Network. - * Compressing the message or using an alternative strategy for large messages is recommended. - */ - SIZE_TOO_BIG = "Size is too big", - /** - * The PubsubTopic passed to the send function is not configured on the Waku node. - * Please ensure that the PubsubTopic is used when initializing the Waku node. - */ - TOPIC_NOT_CONFIGURED = "Topic not configured", - /** - * The pubsub topic configured on the decoder does not match the pubsub topic setup on the protocol. - * Ensure that the pubsub topic used for decoder creation is the same as the one used for protocol. - */ - TOPIC_DECODER_MISMATCH = "Topic decoder mismatch", - /** - * The topics passed in the decoders do not match each other, or don't exist at all. - * Ensure that all the pubsub topics used in the decoders are valid and match each other. - */ - INVALID_DECODER_TOPICS = "Invalid decoder topics", + /** * Failure to find a peer with suitable protocols. This may due to a connection issue. * Mitigation can be: retrying after a given time period, display connectivity issue @@ -154,47 +144,71 @@ export enum ProtocolError { * on the connection manager before retrying. */ NO_PEER_AVAILABLE = "No peer available", + /** * Failure to find a stream to the peer. This may be because the connection with the peer is not still alive. * Mitigation can be: retrying after a given time period, or mitigation for `NO_PEER_AVAILABLE` can be used. */ NO_STREAM_AVAILABLE = "No stream available", + /** * The remote peer did not behave as expected. Mitigation for `NO_PEER_AVAILABLE` * or `DECODE_FAILED` can be used. */ NO_RESPONSE = "No response received", + + // + // SEND ERRORS SECTION + // /** - * The remote peer rejected the message. Information provided by the remote peer - * is logged. Review message validity, or mitigation for `NO_PEER_AVAILABLE` - * or `DECODE_FAILED` can be used. + * Failure to protobuf encode the message. This is not recoverable and needs + * further investigation. */ - REMOTE_PEER_REJECTED = "Remote peer rejected", + ENCODE_FAILED = "Failed to encode", + /** - * The protocol request timed out without a response. This may be due to a connection issue. - * Mitigation can be: retrying after a given time period + * The message payload is empty, making the message invalid. Ensure that a non-empty + * payload is set on the outgoing message. */ - REQUEST_TIMEOUT = "Request timeout", + EMPTY_PAYLOAD = "Payload is empty", + /** - * Missing credentials info message. - * nwaku: https://github.com/waku-org/nwaku/blob/c3cb06ac6c03f0f382d3941ea53b330f6a8dd127/waku/waku_rln_relay/group_manager/group_manager_base.nim#L186 + * The message size is above the maximum message size allowed on the Waku Network. + * Compressing the message or using an alternative strategy for large messages is recommended. */ - RLN_IDENTITY_MISSING = "Identity credentials are not set", + SIZE_TOO_BIG = "Size is too big", + /** - * Membership index missing info message. - * nwaku: https://github.com/waku-org/nwaku/blob/c3cb06ac6c03f0f382d3941ea53b330f6a8dd127/waku/waku_rln_relay/group_manager/group_manager_base.nim#L188 + * The PubsubTopic passed to the send function is not configured on the Waku node. + * Please ensure that the PubsubTopic is used when initializing the Waku node. */ - RLN_MEMBERSHIP_INDEX = "Membership index is not set", + TOPIC_NOT_CONFIGURED = "Topic not configured", + /** - * Message limit is missing. - * nwaku: https://github.com/waku-org/nwaku/blob/c3cb06ac6c03f0f382d3941ea53b330f6a8dd127/waku/waku_rln_relay/group_manager/group_manager_base.nim#L190 + * Fails when */ - RLN_LIMIT_MISSING = "User message limit is not set", + STREAM_ABORTED = "Stream aborted", + /** * General proof generation error message. * nwaku: https://github.com/waku-org/nwaku/blob/c3cb06ac6c03f0f382d3941ea53b330f6a8dd127/waku/waku_rln_relay/group_manager/group_manager_base.nim#L201C19-L201C42 */ - RLN_PROOF_GENERATION = "Proof generation failed" + RLN_PROOF_GENERATION = "Proof generation failed", + + // + // RECEIVE ERRORS SECTION + // + /** + * The pubsub topic configured on the decoder does not match the pubsub topic setup on the protocol. + * Ensure that the pubsub topic used for decoder creation is the same as the one used for protocol. + */ + TOPIC_DECODER_MISMATCH = "Topic decoder mismatch", + + /** + * The topics passed in the decoders do not match each other, or don't exist at all. + * Ensure that all the pubsub topics used in the decoders are valid and match each other. + */ + INVALID_DECODER_TOPICS = "Invalid decoder topics" } export interface Failure { diff --git a/packages/interfaces/src/sender.ts b/packages/interfaces/src/sender.ts index 19270d5ebd..0c924b3f3f 100644 --- a/packages/interfaces/src/sender.ts +++ b/packages/interfaces/src/sender.ts @@ -1,12 +1,13 @@ import type { IEncoder, IMessage } from "./message.js"; import { SDKProtocolResult } from "./protocols.js"; -export type ISenderOptions = { +export type ISendOptions = { /** * Enables retry of a message that was failed to be sent. - * @default false + * @default true */ autoRetry?: boolean; + /** * Sets number of attempts if `autoRetry` is enabled. * @default 3 @@ -18,6 +19,6 @@ export interface ISender { send: ( encoder: IEncoder, message: IMessage, - sendOptions?: ISenderOptions + sendOptions?: ISendOptions ) => Promise; } diff --git a/packages/relay/src/relay.ts b/packages/relay/src/relay.ts index 5f46640f98..363ce9bc44 100644 --- a/packages/relay/src/relay.ts +++ b/packages/relay/src/relay.ts @@ -6,7 +6,7 @@ import { } from "@chainsafe/libp2p-gossipsub"; import type { PeerIdStr, TopicStr } from "@chainsafe/libp2p-gossipsub/types"; import { SignaturePolicy } from "@chainsafe/libp2p-gossipsub/types"; -import type { PubSub as Libp2pPubsub, PeerId } from "@libp2p/interface"; +import type { PubSub as Libp2pPubsub } from "@libp2p/interface"; import { sha256 } from "@noble/hashes/sha256"; import { ActiveSubscriptions, @@ -123,13 +123,11 @@ export class Relay implements IRelay { encoder: IEncoder, message: IMessage ): Promise { - const successes: PeerId[] = []; - const { pubsubTopic } = encoder; if (!this.pubsubTopics.has(pubsubTopic)) { log.error("Failed to send waku relay: topic not configured"); return { - successes, + successes: [], failures: [ { error: ProtocolError.TOPIC_NOT_CONFIGURED @@ -142,7 +140,7 @@ export class Relay implements IRelay { if (!msg) { log.error("Failed to encode message, aborting publish"); return { - successes, + successes: [], failures: [ { error: ProtocolError.ENCODE_FAILED @@ -154,7 +152,7 @@ export class Relay implements IRelay { if (!isWireSizeUnderCap(msg)) { log.error("Failed to send waku relay: message is bigger that 1MB"); return { - successes, + successes: [], failures: [ { error: ProtocolError.SIZE_TOO_BIG diff --git a/packages/sdk/src/filter/filter.ts b/packages/sdk/src/filter/filter.ts index 351172150d..3325703d16 100644 --- a/packages/sdk/src/filter/filter.ts +++ b/packages/sdk/src/filter/filter.ts @@ -167,7 +167,7 @@ export class Filter implements IFilter { ensurePubsubTopicIsConfigured(pubsubTopic, this.protocol.pubsubTopics); - const peerIds = await this.peerManager.getPeers(); + const peerIds = this.peerManager.getPeers(); if (peerIds.length === 0) { return { error: ProtocolError.NO_PEER_AVAILABLE, diff --git a/packages/sdk/src/filter/subscription_monitor.ts b/packages/sdk/src/filter/subscription_monitor.ts index 116a8e8022..da6df2c004 100644 --- a/packages/sdk/src/filter/subscription_monitor.ts +++ b/packages/sdk/src/filter/subscription_monitor.ts @@ -110,7 +110,7 @@ export class SubscriptionMonitor { */ public async getPeers(): Promise { if (!this.isStarted) { - this.peerIds = await this.peerManager.getPeers(); + this.peerIds = this.peerManager.getPeers(); } return this.peerIds; @@ -221,7 +221,7 @@ export class SubscriptionMonitor { return; } - this.peerIds = await this.peerManager.getPeers(); + this.peerIds = this.peerManager.getPeers(); await Promise.all(this.peerIds.map((id) => this.subscribe(id))); } @@ -232,7 +232,7 @@ export class SubscriptionMonitor { return; } - this.peerIds = await this.peerManager.getPeers(); + this.peerIds = this.peerManager.getPeers(); // we trigger subscribe for peer that was used before // it will expectedly fail and we will initiate addition of a new peer @@ -257,7 +257,7 @@ export class SubscriptionMonitor { return; } - peerId = await this.peerManager.requestRenew(peerId); + peerId = this.peerManager.requestRenew(peerId); } } @@ -269,7 +269,7 @@ export class SubscriptionMonitor { const response = await this.filter.ping(peerId); if (response.failure && renewOnFirstFail) { - const newPeer = await this.peerManager.requestRenew(peerId); + const newPeer = this.peerManager.requestRenew(peerId); await this.subscribe(newPeer); return; } @@ -286,7 +286,7 @@ export class SubscriptionMonitor { const madeAttempts = this.pingFailedAttempts.get(peerIdStr) || 0; if (madeAttempts >= this.config.pingsBeforePeerRenewed) { - const newPeer = await this.peerManager.requestRenew(peerId); + const newPeer = this.peerManager.requestRenew(peerId); await this.subscribe(newPeer); } } diff --git a/packages/sdk/src/light_push/light_push.spec.ts b/packages/sdk/src/light_push/light_push.spec.ts index b56f46a256..d566380212 100644 --- a/packages/sdk/src/light_push/light_push.spec.ts +++ b/packages/sdk/src/light_push/light_push.spec.ts @@ -8,7 +8,7 @@ import { import { Libp2p, ProtocolError } from "@waku/interfaces"; import { utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; -import sinon from "sinon"; +import sinon, { SinonSpy } from "sinon"; import { PeerManager } from "../peer_manager/index.js"; @@ -60,7 +60,7 @@ describe("LightPush SDK", () => { lightPush = mockLightPush({ libp2p, numPeersToUse: 2 }); let sendSpy = sinon.spy( (_encoder: any, _message: any, peerId: PeerId) => - ({ success: peerId }) as any + Promise.resolve({ success: peerId }) as any ); lightPush.protocol.send = sendSpy; @@ -68,39 +68,62 @@ describe("LightPush SDK", () => { payload: utf8ToBytes("test") }); - expect(sendSpy.calledTwice).to.be.true; - expect(result.successes?.length).to.be.eq(2); + expect(sendSpy.calledTwice, "1").to.be.true; + expect(result.successes?.length, "2").to.be.eq(2); // check if setting another value works lightPush = mockLightPush({ libp2p, numPeersToUse: 3 }); sendSpy = sinon.spy( (_encoder: any, _message: any, peerId: PeerId) => - ({ success: peerId }) as any + Promise.resolve({ success: peerId }) as any ); lightPush.protocol.send = sendSpy; result = await lightPush.send(encoder, { payload: utf8ToBytes("test") }); - expect(sendSpy.calledThrice).to.be.true; - expect(result.successes?.length).to.be.eq(3); + expect(sendSpy.calledThrice, "3").to.be.true; + expect(result.successes?.length, "4").to.be.eq(3); }); - it("should retry on failure if specified", async () => { + it("should retry on complete failure if specified", async () => { libp2p = mockLibp2p({ peers: [mockPeer("1"), mockPeer("2")] }); lightPush = mockLightPush({ libp2p }); - let sendSpy = sinon.spy((_encoder: any, _message: any, peerId: PeerId) => { - if (peerId.toString() === "1") { - return { success: peerId }; - } - - return { failure: { error: "problem" } }; - }); + const sendSpy = sinon.spy((_encoder: any, _message: any, _peerId: PeerId) => + Promise.resolve({ failure: { error: "problem" } }) + ); lightPush.protocol.send = sendSpy as any; - const attemptRetriesSpy = sinon.spy(lightPush["attemptRetries"]); - lightPush["attemptRetries"] = attemptRetriesSpy; + + const retryPushSpy = (lightPush as any)["retryManager"].push as SinonSpy; + const result = await lightPush.send( + encoder, + { payload: utf8ToBytes("test") }, + { autoRetry: true } + ); + + expect(retryPushSpy.callCount).to.be.eq(1); + expect(result.failures?.length).to.be.eq(2); + }); + + it("should not retry if at least one success", async () => { + libp2p = mockLibp2p({ + peers: [mockPeer("1"), mockPeer("2")] + }); + + lightPush = mockLightPush({ libp2p }); + const sendSpy = sinon.spy( + (_encoder: any, _message: any, peerId: PeerId) => { + if (peerId.toString() === "1") { + return Promise.resolve({ success: peerId }); + } + + return Promise.resolve({ failure: { error: "problem" } }); + } + ); + lightPush.protocol.send = sendSpy as any; + const retryPushSpy = (lightPush as any)["retryManager"].push as SinonSpy; const result = await lightPush.send( encoder, @@ -108,19 +131,9 @@ describe("LightPush SDK", () => { { autoRetry: true } ); - expect(attemptRetriesSpy.callCount).to.be.eq(1); + expect(retryPushSpy.callCount).to.be.eq(0); expect(result.successes?.length).to.be.eq(1); expect(result.failures?.length).to.be.eq(1); - - sendSpy = sinon.spy(() => ({ failure: { error: "problem" } })) as any; - await lightPush["attemptRetries"](sendSpy as any); - - expect(sendSpy.callCount).to.be.eq(3); - - sendSpy = sinon.spy(() => ({ failure: { error: "problem" } })) as any; - await lightPush["attemptRetries"](sendSpy as any, 2); - - expect(sendSpy.callCount).to.be.eq(2); }); }); @@ -154,7 +167,7 @@ type MockLightPushOptions = { }; function mockLightPush(options: MockLightPushOptions): LightPush { - return new LightPush({ + const lightPush = new LightPush({ connectionManager: { pubsubTopics: options.pubsubTopics || [PUBSUB_TOPIC] } as ConnectionManager, @@ -164,8 +177,17 @@ function mockLightPush(options: MockLightPushOptions): LightPush { .getPeers() .slice(0, options.numPeersToUse || options.libp2p.getPeers().length) } as unknown as PeerManager, - libp2p: options.libp2p + libp2p: options.libp2p, + options: { + numPeersToUse: options.numPeersToUse + } }); + + (lightPush as any)["retryManager"] = { + push: sinon.spy() + }; + + return lightPush; } function mockPeer(id: string): Peer { diff --git a/packages/sdk/src/light_push/light_push.ts b/packages/sdk/src/light_push/light_push.ts index c4339e9641..476b1b6c90 100644 --- a/packages/sdk/src/light_push/light_push.ts +++ b/packages/sdk/src/light_push/light_push.ts @@ -6,59 +6,84 @@ import { type IEncoder, ILightPush, type IMessage, - type ISenderOptions, + type ISendOptions, type Libp2p, + LightPushProtocolOptions, ProtocolError, SDKProtocolResult } from "@waku/interfaces"; -import { ensurePubsubTopicIsConfigured, Logger } from "@waku/utils"; +import { Logger } from "@waku/utils"; import { PeerManager } from "../peer_manager/index.js"; +import { RetryManager } from "./retry_manager.js"; + const log = new Logger("sdk:light-push"); const DEFAULT_MAX_ATTEMPTS = 3; -const DEFAULT_SEND_OPTIONS: ISenderOptions = { - autoRetry: false, - maxAttempts: DEFAULT_MAX_ATTEMPTS +const DEFAULT_SEND_OPTIONS: LightPushProtocolOptions = { + autoRetry: true, + retryIntervalMs: 1000, + maxAttempts: DEFAULT_MAX_ATTEMPTS, + numPeersToUse: 1 }; -type RetryCallback = (peerId: PeerId) => Promise; - type LightPushConstructorParams = { connectionManager: ConnectionManager; peerManager: PeerManager; libp2p: Libp2p; + options?: Partial; }; export class LightPush implements ILightPush { + private readonly config: LightPushProtocolOptions; + private readonly retryManager: RetryManager; private peerManager: PeerManager; public readonly protocol: LightPushCore; public constructor(params: LightPushConstructorParams) { + this.config = { + ...DEFAULT_SEND_OPTIONS, + ...(params.options || {}) + } as LightPushProtocolOptions; + this.peerManager = params.peerManager; this.protocol = new LightPushCore( params.connectionManager.pubsubTopics, params.libp2p ); + this.retryManager = new RetryManager({ + peerManager: params.peerManager, + retryIntervalMs: this.config.retryIntervalMs + }); + } + + public start(): void { + this.retryManager.start(); + } + + public stop(): void { + this.retryManager.stop(); } public async send( encoder: IEncoder, message: IMessage, - options: ISenderOptions = DEFAULT_SEND_OPTIONS + options: ISendOptions = {} ): Promise { - const successes: PeerId[] = []; - const failures: Failure[] = []; + options = { + ...this.config, + ...options + }; const { pubsubTopic } = encoder; - try { - ensurePubsubTopicIsConfigured(pubsubTopic, this.protocol.pubsubTopics); - } catch (error) { - log.error("Failed to send waku light push: pubsub topic not configured"); + + log.info("send: attempting to send a message to pubsubTopic:", pubsubTopic); + + if (!this.protocol.pubsubTopics.includes(pubsubTopic)) { return { - successes, + successes: [], failures: [ { error: ProtocolError.TOPIC_NOT_CONFIGURED @@ -67,10 +92,13 @@ export class LightPush implements ILightPush { }; } - const peerIds = await this.peerManager.getPeers(); + const peerIds = this.peerManager + .getPeers() + .slice(0, this.config.numPeersToUse); + if (peerIds.length === 0) { return { - successes, + successes: [], failures: [ { error: ProtocolError.NO_PEER_AVAILABLE @@ -79,65 +107,35 @@ export class LightPush implements ILightPush { }; } - const results = await Promise.allSettled( - peerIds.map((id) => this.protocol.send(encoder, message, id)) + const coreResults: CoreProtocolResult[] = await Promise.all( + peerIds.map((peerId) => + this.protocol.send(encoder, message, peerId).catch((_e) => ({ + success: null, + failure: { + error: ProtocolError.GENERIC_FAIL + } + })) + ) ); - for (const result of results) { - if (result.status !== "fulfilled") { - log.error("Failed unexpectedly while sending:", result.reason); - failures.push({ error: ProtocolError.GENERIC_FAIL }); - continue; - } - - const { failure, success } = result.value; - - if (success) { - successes.push(success); - continue; - } - - if (failure) { - failures.push(failure); - - if (options?.autoRetry) { - void this.attemptRetries( - (id: PeerId) => this.protocol.send(encoder, message, id), - options.maxAttempts - ); - } - } - } - - return { - successes, - failures + const results: SDKProtocolResult = { + successes: coreResults + .filter((v) => v.success) + .map((v) => v.success) as PeerId[], + failures: coreResults + .filter((v) => v.failure) + .map((v) => v.failure) as Failure[] }; - } - private async attemptRetries( - fn: RetryCallback, - maxAttempts?: number - ): Promise { - maxAttempts = maxAttempts || DEFAULT_MAX_ATTEMPTS; - const peerIds = await this.peerManager.getPeers(); - - if (peerIds.length === 0) { - log.warn("Cannot retry with no connected peers."); - return; - } - - for (let i = 0; i < maxAttempts; i++) { - const id = peerIds[i % peerIds.length]; // always present as we checked for the length already - const response = await fn(id); - - if (response.success) { - return; - } - - log.info( - `Attempted retry for peer:${id} failed with:${response?.failure?.error}` + if (options.autoRetry && results.successes.length === 0) { + const sendCallback = (peerId: PeerId): Promise => + this.protocol.send(encoder, message, peerId); + this.retryManager.push( + sendCallback.bind(this), + options.maxAttempts || DEFAULT_MAX_ATTEMPTS ); } + + return results; } } diff --git a/packages/sdk/src/light_push/retry_manager.spec.ts b/packages/sdk/src/light_push/retry_manager.spec.ts new file mode 100644 index 0000000000..3d3daabbdd --- /dev/null +++ b/packages/sdk/src/light_push/retry_manager.spec.ts @@ -0,0 +1,129 @@ +import type { PeerId } from "@libp2p/interface"; +import { type CoreProtocolResult, ProtocolError } from "@waku/interfaces"; +import { expect } from "chai"; +import sinon from "sinon"; + +import { PeerManager } from "../peer_manager/index.js"; + +import { RetryManager, ScheduledTask } from "./retry_manager.js"; + +describe("RetryManager", () => { + let retryManager: RetryManager; + let peerManager: PeerManager; + let mockPeerId: PeerId; + let clock: sinon.SinonFakeTimers; + + beforeEach(() => { + clock = sinon.useFakeTimers(); + + mockPeerId = { toString: () => "test-peer-id" } as PeerId; + peerManager = { + getPeers: () => [mockPeerId], + requestRenew: sinon.spy(), + start: sinon.spy(), + stop: sinon.spy() + } as unknown as PeerManager; + + retryManager = new RetryManager({ + peerManager, + retryIntervalMs: 100 + }); + }); + + afterEach(() => { + clock.restore(); + retryManager.stop(); + sinon.restore(); + }); + + it("should start and stop interval correctly", () => { + const setIntervalSpy = sinon.spy(global, "setInterval"); + const clearIntervalSpy = sinon.spy(global, "clearInterval"); + + retryManager.start(); + expect(setIntervalSpy.calledOnce).to.be.true; + + retryManager.stop(); + expect(clearIntervalSpy.calledOnce).to.be.true; + }); + + it("should process tasks in queue", async () => { + const successCallback = sinon.spy( + async (peerId: PeerId): Promise => ({ + success: peerId, + failure: null + }) + ); + + retryManager.push(successCallback, 3); + retryManager.start(); + + clock.tick(1000); + + expect(successCallback.calledOnce, "called").to.be.true; + expect(successCallback.calledWith(mockPeerId), "called with peer").to.be + .true; + }); + + it("should retry failed tasks", async () => { + const failingCallback = sinon.spy( + async (): Promise => ({ + success: null, + failure: { error: "test error" as any } + }) + ); + + const queue = (retryManager as any)["queue"] as ScheduledTask[]; + + const task = { callback: failingCallback, maxAttempts: 2 }; + await (retryManager as any)["taskExecutor"](task); + + expect(failingCallback.calledOnce, "executed callback").to.be.true; + expect( + queue.some((t) => t.maxAttempts === 1), + "task attempt decreased" + ).to.be.true; + }); + + it("should request peer renewal on specific errors", async () => { + const errorCallback = sinon.spy(async (): Promise => { + throw new Error(ProtocolError.NO_PEER_AVAILABLE); + }); + + await (retryManager as any)["taskExecutor"]({ + callback: errorCallback, + maxAttempts: 1 + }); + + expect((peerManager.requestRenew as sinon.SinonSpy).calledOnce).to.be.true; + expect((peerManager.requestRenew as sinon.SinonSpy).calledWith(mockPeerId)) + .to.be.true; + }); + + it("should handle task timeouts", async () => { + const slowCallback = sinon.spy(async (): Promise => { + await new Promise((resolve) => setTimeout(resolve, 15000)); + return { success: mockPeerId, failure: null }; + }); + + const task = { callback: slowCallback, maxAttempts: 1 }; + const executionPromise = (retryManager as any)["taskExecutor"](task); + + clock.tick(11000); + await executionPromise; + + expect(slowCallback.calledOnce).to.be.true; + }); + + it("should respect max attempts limit", async () => { + const failingCallback = sinon.spy(async (): Promise => { + throw new Error("test error" as any); + }); + + const task = { callback: failingCallback, maxAttempts: 0 }; + await (retryManager as any)["taskExecutor"](task); + + expect(failingCallback.calledOnce).to.be.true; + expect(task.maxAttempts).to.equal(0); + }); +}); diff --git a/packages/sdk/src/light_push/retry_manager.ts b/packages/sdk/src/light_push/retry_manager.ts new file mode 100644 index 0000000000..3067847fa7 --- /dev/null +++ b/packages/sdk/src/light_push/retry_manager.ts @@ -0,0 +1,138 @@ +import type { PeerId } from "@libp2p/interface"; +import type { CoreProtocolResult } from "@waku/interfaces"; +import { Logger } from "@waku/utils"; + +import type { PeerManager } from "../peer_manager/index.js"; + +import { shouldPeerBeChanged, timeout } from "./utils.js"; + +type RetryManagerConfig = { + retryIntervalMs: number; + peerManager: PeerManager; +}; + +type AttemptCallback = (peerId: PeerId) => Promise; + +export type ScheduledTask = { + maxAttempts: number; + callback: AttemptCallback; +}; + +const MAX_CONCURRENT_TASKS = 5; +const TASK_TIMEOUT_MS = 10_000; + +const log = new Logger("sdk:retry-manager"); + +export class RetryManager { + private intervalID: number | null = null; + private readonly retryIntervalMs: number; + + private inProgress: number = 0; + private readonly queue: ScheduledTask[] = []; + + private readonly peerManager: PeerManager; + + public constructor(config: RetryManagerConfig) { + this.peerManager = config.peerManager; + this.retryIntervalMs = config.retryIntervalMs; + } + + public start(): void { + this.intervalID = setInterval(() => { + this.processQueue(); + }, this.retryIntervalMs) as unknown as number; + } + + public stop(): void { + if (this.intervalID) { + clearInterval(this.intervalID); + this.intervalID = null; + } + } + + public push(callback: AttemptCallback, maxAttempts: number): void { + this.queue.push({ + maxAttempts, + callback + }); + } + + private processQueue(): void { + if (this.queue.length === 0) { + log.info("processQueue: queue is empty"); + return; + } + + while (this.queue.length && this.inProgress < MAX_CONCURRENT_TASKS) { + const task = this.queue.shift(); + + if (task) { + this.scheduleTask(task); + } + } + } + + private scheduleTask(task: ScheduledTask): void { + const delayedTask = async (): Promise => { + return this.taskExecutor(task); + }; + + // schedule execution ASAP + // need to use setTimeout to avoid blocking main execution + setTimeout(delayedTask as () => void, 100); + } + + private async taskExecutor(task: ScheduledTask): Promise { + const peerId = this.peerManager.getPeers()[0]; + + if (!peerId) { + log.warn("scheduleTask: no peers, skipping"); + return; + } + + try { + this.inProgress += 1; + + const response = await Promise.race([ + timeout(TASK_TIMEOUT_MS), + task.callback(peerId) + ]); + + if (response?.failure) { + throw Error(response.failure.error); + } + + log.info("scheduleTask: executed successfully"); + + if (task.maxAttempts === 0) { + log.warn("scheduleTask: discarded a task due to limit of max attempts"); + return; + } + + this.queue.push({ + ...task, + maxAttempts: task.maxAttempts - 1 + }); + } catch (_err) { + const error = _err as unknown as { message: string }; + + log.error("scheduleTask: task execution failed with error:", error); + + if (shouldPeerBeChanged(error.message)) { + this.peerManager.requestRenew(peerId); + } + + if (task.maxAttempts === 0) { + log.warn("scheduleTask: discarded a task due to limit of max attempts"); + return; + } + + this.queue.push({ + ...task, + maxAttempts: task.maxAttempts - 1 + }); + } finally { + this.inProgress -= 1; + } + } +} diff --git a/packages/sdk/src/light_push/utils.ts b/packages/sdk/src/light_push/utils.ts new file mode 100644 index 0000000000..85afd07a39 --- /dev/null +++ b/packages/sdk/src/light_push/utils.ts @@ -0,0 +1,23 @@ +import { ProtocolError } from "@waku/interfaces"; + +export const shouldPeerBeChanged = ( + failure: string | ProtocolError +): boolean => { + const toBeChanged = + failure === ProtocolError.REMOTE_PEER_REJECTED || + failure === ProtocolError.NO_RESPONSE || + failure === ProtocolError.RLN_PROOF_GENERATION || + failure === ProtocolError.NO_PEER_AVAILABLE; + + if (toBeChanged) { + return true; + } + + return false; +}; + +export const timeout = (timeout: number): Promise => { + return new Promise((_, reject) => + setTimeout(() => reject(new Error("Task timeout")), timeout) + ); +}; diff --git a/packages/sdk/src/peer_manager/peer_manager.spec.ts b/packages/sdk/src/peer_manager/peer_manager.spec.ts index e496f4ccfc..231629cf88 100644 --- a/packages/sdk/src/peer_manager/peer_manager.spec.ts +++ b/packages/sdk/src/peer_manager/peer_manager.spec.ts @@ -36,7 +36,7 @@ describe("PeerManager", () => { ]; sinon.stub(libp2p, "getConnections").returns(connections); - const peers = await peerManager.getPeers(); + const peers = peerManager.getPeers(); expect(peers.length).to.equal(2); }); @@ -48,7 +48,7 @@ describe("PeerManager", () => { ]; sinon.stub(libp2p, "getConnections").returns(connections); - const peerId = await peerManager.requestRenew("1"); + const peerId = peerManager.requestRenew("1"); expect(peerId).to.not.be.undefined; expect(peerId).to.not.equal("1"); }); @@ -59,11 +59,20 @@ describe("PeerManager", () => { peerManager["lockPeerIfNeeded"] = connectSpy; peerManager["requestRenew"] = disconnectSpy; + peerManager.start(); + libp2p.dispatchEvent(new CustomEvent("peer:connect", { detail: "1" })); libp2p.dispatchEvent(new CustomEvent("peer:disconnect", { detail: "1" })); expect(connectSpy.calledOnce).to.be.true; expect(disconnectSpy.calledOnce).to.be.true; + + const removeEventListenerSpy = sinon.spy(libp2p.removeEventListener); + libp2p.removeEventListener = removeEventListenerSpy; + + peerManager.stop(); + + expect(removeEventListenerSpy.callCount).to.eq(2); }); }); diff --git a/packages/sdk/src/peer_manager/peer_manager.ts b/packages/sdk/src/peer_manager/peer_manager.ts index 3a7c704df2..9a8f2d9d84 100644 --- a/packages/sdk/src/peer_manager/peer_manager.ts +++ b/packages/sdk/src/peer_manager/peer_manager.ts @@ -29,7 +29,9 @@ export class PeerManager { params?.config?.numPeersToUse || DEFAULT_NUM_PEERS_TO_USE; this.libp2p = params.libp2p; + } + public start(): void { this.startConnectionListener(); } @@ -37,13 +39,11 @@ export class PeerManager { this.stopConnectionListener(); } - public async getPeers(): Promise { - return Promise.all(this.getLockedConnections().map((c) => c.remotePeer)); + public getPeers(): PeerId[] { + return this.getLockedConnections().map((c) => c.remotePeer); } - public async requestRenew( - peerId: PeerId | string - ): Promise { + public requestRenew(peerId: PeerId | string): PeerId | undefined { const lockedConnections = this.getLockedConnections(); const neededPeers = this.numPeersToUse - lockedConnections.length; @@ -51,15 +51,13 @@ export class PeerManager { return; } - const result = await Promise.all( - this.getUnlockedConnections() - .filter((c) => !c.remotePeer.equals(peerId)) - .slice(0, neededPeers) - .map((c) => this.lockConnection(c)) - .map((c) => c.remotePeer) - ); + const connections = this.getUnlockedConnections() + .filter((c) => !c.remotePeer.equals(peerId)) + .slice(0, neededPeers) + .map((c) => this.lockConnection(c)) + .map((c) => c.remotePeer); - const newPeerId = result[0]; + const newPeerId = connections[0]; if (!newPeerId) { log.warn( @@ -87,15 +85,15 @@ export class PeerManager { private onConnected(event: CustomEvent): void { const peerId = event.detail; - void this.lockPeerIfNeeded(peerId); + this.lockPeerIfNeeded(peerId); } private onDisconnected(event: CustomEvent): void { const peerId = event.detail; - void this.requestRenew(peerId); + this.requestRenew(peerId); } - private async lockPeerIfNeeded(peerId: PeerId): Promise { + private lockPeerIfNeeded(peerId: PeerId): void { const lockedConnections = this.getLockedConnections(); const neededPeers = this.numPeersToUse - lockedConnections.length; diff --git a/packages/sdk/src/store/store.ts b/packages/sdk/src/store/store.ts index cf4e712c5b..3e026f8900 100644 --- a/packages/sdk/src/store/store.ts +++ b/packages/sdk/src/store/store.ts @@ -254,7 +254,7 @@ export class Store implements IStore { } } - const peerIds = await this.peerManager.getPeers(); + const peerIds = this.peerManager.getPeers(); if (peerIds.length > 0) { // TODO(weboko): implement smart way of getting a peer https://github.com/waku-org/js-waku/issues/2243 diff --git a/packages/sdk/src/waku/waku.ts b/packages/sdk/src/waku/waku.ts index 55e28003ae..d1b0a43a9c 100644 --- a/packages/sdk/src/waku/waku.ts +++ b/packages/sdk/src/waku/waku.ts @@ -100,7 +100,8 @@ export class WakuNode implements IWaku { this.lightPush = new LightPush({ libp2p, peerManager: this.peerManager, - connectionManager: this.connectionManager + connectionManager: this.connectionManager, + options: options?.lightPush }); } @@ -190,10 +191,13 @@ export class WakuNode implements IWaku { public async start(): Promise { await this.libp2p.start(); + this.peerManager.start(); this.health.start(); + this.lightPush?.start(); } public async stop(): Promise { + this.lightPush?.stop(); this.health.stop(); this.peerManager.stop(); this.connectionManager.stop(); diff --git a/packages/tests/src/lib/runNodes.ts b/packages/tests/src/lib/runNodes.ts index 1a196208c4..f387e90493 100644 --- a/packages/tests/src/lib/runNodes.ts +++ b/packages/tests/src/lib/runNodes.ts @@ -45,7 +45,8 @@ export async function runNodes( const waku_options: CreateNodeOptions = { staticNoiseKey: NOISE_KEY_1, libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }, - networkConfig: shardInfo + networkConfig: shardInfo, + lightPush: { numPeersToUse: 2 } }; log.info("Starting js waku node with :", JSON.stringify(waku_options)); diff --git a/packages/tests/src/utils/nodes.ts b/packages/tests/src/utils/nodes.ts index c3e8cc7a9b..6053cfdf63 100644 --- a/packages/tests/src/utils/nodes.ts +++ b/packages/tests/src/utils/nodes.ts @@ -40,7 +40,8 @@ export async function runMultipleNodes( libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }, - networkConfig + networkConfig, + lightPush: { numPeersToUse: numServiceNodes } }; const waku = await createLightNode(wakuOptions); diff --git a/packages/tests/tests/peer-exchange/query.spec.ts b/packages/tests/tests/peer-exchange/query.spec.ts index 8dd70aad28..57be183659 100644 --- a/packages/tests/tests/peer-exchange/query.spec.ts +++ b/packages/tests/tests/peer-exchange/query.spec.ts @@ -104,7 +104,7 @@ describe("Peer Exchange Query", function () { () => resolve({ peerInfos: null, - error: ProtocolError.REQUEST_TIMEOUT + error: ProtocolError.GENERIC_FAIL }), 5000 ) @@ -115,11 +115,7 @@ describe("Peer Exchange Query", function () { queryResult?.peerInfos && queryResult.peerInfos.length === numPeersToRequest; if (hasErrors) { - if (queryResult.error === ProtocolError.REQUEST_TIMEOUT) { - log.warn("Query timed out, retrying..."); - } else { - log.error("Error encountered, retrying...", queryResult.error); - } + log.error("Error encountered, retrying...", queryResult.error); continue; } if (!hasPeerInfos) { diff --git a/packages/tests/tests/sharding/auto_sharding.spec.ts b/packages/tests/tests/sharding/auto_sharding.spec.ts index e2cdca6d41..4d02260761 100644 --- a/packages/tests/tests/sharding/auto_sharding.spec.ts +++ b/packages/tests/tests/sharding/auto_sharding.spec.ts @@ -55,6 +55,7 @@ describe("Autosharding: Running Nodes", function () { contentTopics: [ContentTopic] } }); + await waku.start(); await waku.dial(await nwaku.getMultiaddrWithId()); await waku.waitForPeers([Protocols.LightPush]); @@ -97,6 +98,7 @@ describe("Autosharding: Running Nodes", function () { contentTopics: [ContentTopic] } }); + await waku.start(); await waku.dial(await nwaku.getMultiaddrWithId()); await waku.waitForPeers([Protocols.LightPush]); @@ -153,7 +155,7 @@ describe("Autosharding: Running Nodes", function () { contentTopics: [ContentTopic] } }); - + await waku.start(); await waku.dial(await nwaku.getMultiaddrWithId()); await waku.waitForPeers([Protocols.LightPush]); @@ -217,6 +219,7 @@ describe("Autosharding: Running Nodes", function () { contentTopics: [ContentTopic, ContentTopic2] } }); + await waku.start(); await waku.dial(await nwaku.getMultiaddrWithId()); await waku.waitForPeers([Protocols.LightPush]); @@ -274,6 +277,7 @@ describe("Autosharding: Running Nodes", function () { contentTopics: [ContentTopic] } }); + await waku.start(); // use a content topic that is not configured const encoder = createEncoder({ diff --git a/packages/tests/tests/sharding/static_sharding.spec.ts b/packages/tests/tests/sharding/static_sharding.spec.ts index f4cdccb4ee..0178dac497 100644 --- a/packages/tests/tests/sharding/static_sharding.spec.ts +++ b/packages/tests/tests/sharding/static_sharding.spec.ts @@ -56,6 +56,7 @@ describe("Static Sharding: Running Nodes", function () { waku = await createLightNode({ networkConfig: shardInfo }); + await waku.start(); await waku.dial(await nwaku.getMultiaddrWithId()); await waku.waitForPeers([Protocols.LightPush]); @@ -96,6 +97,7 @@ describe("Static Sharding: Running Nodes", function () { waku = await createLightNode({ networkConfig: shardInfo }); + await waku.start(); await waku.dial(await nwaku.getMultiaddrWithId()); await waku.waitForPeers([Protocols.LightPush]); @@ -145,6 +147,7 @@ describe("Static Sharding: Running Nodes", function () { waku = await createLightNode({ networkConfig: shardInfo }); + await waku.start(); await waku.dial(await nwaku.getMultiaddrWithId()); await waku.waitForPeers([Protocols.LightPush]); @@ -211,6 +214,7 @@ describe("Static Sharding: Running Nodes", function () { waku = await createLightNode({ networkConfig: shardInfoBothShards }); + await waku.start(); await waku.dial(await nwaku.getMultiaddrWithId()); await waku.waitForPeers([Protocols.LightPush]); @@ -250,6 +254,7 @@ describe("Static Sharding: Running Nodes", function () { waku = await createLightNode({ networkConfig: shardInfoFirstShard }); + await waku.start(); // use a pubsub topic that is not configured const encoder = createEncoder({