feat: migrate to latest LightPush version (#2281)

* create retry manager

* update tests

* add retry manager tests, update peer manager

* fix start & merge with master

* return send to many logic

* add new error handling

* add sections to protocol errors

* fix check

* up test

* add waku.start in test

* fix check and test

* improve name
This commit is contained in:
Sasha 2025-02-25 22:40:03 +01:00 committed by GitHub
parent 0ede57f387
commit f199d92d60
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 577 additions and 225 deletions

View File

@ -20,7 +20,7 @@ import { Uint8ArrayList } from "uint8arraylist";
import { BaseProtocol } from "../base_protocol.js";
import { PushRpc } from "./push_rpc.js";
import { isRLNResponseError, matchRLNErrorMessage } from "./utils.js";
import { isRLNResponseError } from "./utils.js";
const log = new Logger("light-push");
@ -120,11 +120,12 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore {
async (source) => await all(source)
);
} catch (err) {
// can fail only because of `stream` abortion
log.error("Failed to send waku light push request", err);
return {
success: null,
failure: {
error: ProtocolError.GENERIC_FAIL,
error: ProtocolError.STREAM_ABORTED,
peerId: peerId
}
};
@ -161,12 +162,11 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore {
}
if (isRLNResponseError(response.info)) {
const rlnErrorCase = matchRLNErrorMessage(response.info!);
log.error("Remote peer rejected the message: ", rlnErrorCase);
log.error("Remote peer fault: RLN generation");
return {
success: null,
failure: {
error: rlnErrorCase,
error: ProtocolError.RLN_PROOF_GENERATION,
peerId: peerId
}
};

View File

@ -1,31 +1,23 @@
import { ProtocolError } from "@waku/interfaces";
// should match nwaku
// https://github.com/waku-org/nwaku/blob/c3cb06ac6c03f0f382d3941ea53b330f6a8dd127/waku/waku_rln_relay/rln_relay.nim#L309
// https://github.com/waku-org/nwaku/blob/c3cb06ac6c03f0f382d3941ea53b330f6a8dd127/tests/waku_rln_relay/rln/waku_rln_relay_utils.nim#L20
const RLN_GENERATION_PREFIX_ERROR = "could not generate rln-v2 proof";
const RLN_MESSAGE_ID_PREFIX_ERROR =
"could not get new message id to generate an rln proof";
// rare case on nwaku side
// https://github.com/waku-org/nwaku/blob/a4e92a3d02448fd708857b7b6cac2a7faa7eb4f9/waku/waku_lightpush/callbacks.nim#L49
// https://github.com/waku-org/nwaku/blob/a4e92a3d02448fd708857b7b6cac2a7faa7eb4f9/waku/node/waku_node.nim#L1117
const RLN_REMOTE_VALIDATION = "RLN validation failed";
export const isRLNResponseError = (info?: string): boolean => {
if (!info) {
return false;
}
return info.includes(RLN_GENERATION_PREFIX_ERROR);
};
export const matchRLNErrorMessage = (info: string): ProtocolError => {
const rlnErrorMap: { [key: string]: ProtocolError } = {
[ProtocolError.RLN_IDENTITY_MISSING]: ProtocolError.RLN_IDENTITY_MISSING,
[ProtocolError.RLN_MEMBERSHIP_INDEX]: ProtocolError.RLN_MEMBERSHIP_INDEX,
[ProtocolError.RLN_LIMIT_MISSING]: ProtocolError.RLN_LIMIT_MISSING
};
const infoLowerCase = info.toLowerCase();
for (const errorKey in rlnErrorMap) {
if (infoLowerCase.includes(errorKey.toLowerCase())) {
return rlnErrorMap[errorKey];
}
}
return ProtocolError.RLN_PROOF_GENERATION;
return (
info.includes(RLN_GENERATION_PREFIX_ERROR) ||
info.includes(RLN_MESSAGE_ID_PREFIX_ERROR) ||
info.includes(RLN_REMOTE_VALIDATION)
);
};

View File

@ -1,4 +1,23 @@
import { IBaseProtocolCore } from "./protocols.js";
import type { ISender } from "./sender.js";
import type { ISender, ISendOptions } from "./sender.js";
export type ILightPush = ISender & { protocol: IBaseProtocolCore };
export type LightPushProtocolOptions = ISendOptions & {
/**
* The interval in milliseconds to wait before retrying a failed push.
* @default 1000
*/
retryIntervalMs: number;
/**
* Number of peers to send message to.
*
* @default 1
*/
numPeersToUse?: number;
};
export type ILightPush = ISender & {
start: () => void;
stop: () => void;
protocol: IBaseProtocolCore;
};

View File

@ -4,9 +4,10 @@ import type { PeerId } from "@libp2p/interface";
import type { ConnectionManagerOptions } from "./connection_manager.js";
import type { FilterProtocolOptions } from "./filter.js";
import type { CreateLibp2pOptions } from "./libp2p.js";
import type { LightPushProtocolOptions } from "./light_push.js";
import type { IDecodedMessage } from "./message.js";
import { ThisAndThat, ThisOrThat } from "./misc.js";
import { AutoSharding, StaticSharding } from "./sharding.js";
import type { ThisAndThat, ThisOrThat } from "./misc.js";
import type { AutoSharding, StaticSharding } from "./sharding.js";
import type { StoreProtocolOptions } from "./store.js";
export enum Protocols {
@ -62,9 +63,7 @@ export type CreateNodeOptions = {
/**
* Number of peers to connect to, for the usage of the protocol.
* This is used by:
* - Light Push to send messages,
* - Filter to retrieve messages.
* This is used by Filter to retrieve messages.
*
* @default 2.
*/
@ -101,8 +100,15 @@ export type CreateNodeOptions = {
/**
* Options for the Store protocol.
* If not specified - default values are applied.
*/
store?: Partial<StoreProtocolOptions>;
/**
* Options for the LightPush protocol.
* If not specified - default values are applied.
*/
lightPush?: Partial<LightPushProtocolOptions>;
};
export type Callback<T extends IDecodedMessage> = (
@ -110,43 +116,27 @@ export type Callback<T extends IDecodedMessage> = (
) => void | Promise<void>;
export enum ProtocolError {
/** Could not determine the origin of the fault. Best to check connectivity and try again */
GENERIC_FAIL = "Generic error",
//
// GENERAL ERRORS SECTION
//
/**
* Failure to protobuf encode the message. This is not recoverable and needs
* further investigation.
* Could not determine the origin of the fault. Best to check connectivity and try again
* */
GENERIC_FAIL = "Generic error",
/**
* The remote peer rejected the message. Information provided by the remote peer
* is logged. Review message validity, or mitigation for `NO_PEER_AVAILABLE`
* or `DECODE_FAILED` can be used.
*/
ENCODE_FAILED = "Failed to encode",
REMOTE_PEER_REJECTED = "Remote peer rejected",
/**
* Failure to protobuf decode the message. May be due to a remote peer issue;
* sending messages via several peers mitigates this error.
*/
DECODE_FAILED = "Failed to decode",
/**
* The message payload is empty, making the message invalid. Ensure that a non-empty
* payload is set on the outgoing message.
*/
EMPTY_PAYLOAD = "Payload is empty",
/**
* The message size is above the maximum message size allowed on the Waku Network.
* Compressing the message or using an alternative strategy for large messages is recommended.
*/
SIZE_TOO_BIG = "Size is too big",
/**
* The PubsubTopic passed to the send function is not configured on the Waku node.
* Please ensure that the PubsubTopic is used when initializing the Waku node.
*/
TOPIC_NOT_CONFIGURED = "Topic not configured",
/**
* The pubsub topic configured on the decoder does not match the pubsub topic setup on the protocol.
* Ensure that the pubsub topic used for decoder creation is the same as the one used for protocol.
*/
TOPIC_DECODER_MISMATCH = "Topic decoder mismatch",
/**
* The topics passed in the decoders do not match each other, or don't exist at all.
* Ensure that all the pubsub topics used in the decoders are valid and match each other.
*/
INVALID_DECODER_TOPICS = "Invalid decoder topics",
/**
* Failure to find a peer with suitable protocols. This may be due to a connection issue.
* Mitigation can be: retrying after a given time period, display connectivity issue
@ -154,47 +144,71 @@ export enum ProtocolError {
* on the connection manager before retrying.
*/
NO_PEER_AVAILABLE = "No peer available",
/**
* Failure to find a stream to the peer. This may be because the connection with the peer is not still alive.
* Mitigation can be: retrying after a given time period, or mitigation for `NO_PEER_AVAILABLE` can be used.
*/
NO_STREAM_AVAILABLE = "No stream available",
/**
* The remote peer did not behave as expected. Mitigation for `NO_PEER_AVAILABLE`
* or `DECODE_FAILED` can be used.
*/
NO_RESPONSE = "No response received",
//
// SEND ERRORS SECTION
//
/**
* The remote peer rejected the message. Information provided by the remote peer
* is logged. Review message validity, or mitigation for `NO_PEER_AVAILABLE`
* or `DECODE_FAILED` can be used.
* Failure to protobuf encode the message. This is not recoverable and needs
* further investigation.
*/
REMOTE_PEER_REJECTED = "Remote peer rejected",
ENCODE_FAILED = "Failed to encode",
/**
* The protocol request timed out without a response. This may be due to a connection issue.
* Mitigation can be: retrying after a given time period
* The message payload is empty, making the message invalid. Ensure that a non-empty
* payload is set on the outgoing message.
*/
REQUEST_TIMEOUT = "Request timeout",
EMPTY_PAYLOAD = "Payload is empty",
/**
* Missing credentials info message.
* nwaku: https://github.com/waku-org/nwaku/blob/c3cb06ac6c03f0f382d3941ea53b330f6a8dd127/waku/waku_rln_relay/group_manager/group_manager_base.nim#L186
* The message size is above the maximum message size allowed on the Waku Network.
* Compressing the message or using an alternative strategy for large messages is recommended.
*/
RLN_IDENTITY_MISSING = "Identity credentials are not set",
SIZE_TOO_BIG = "Size is too big",
/**
* Membership index missing info message.
* nwaku: https://github.com/waku-org/nwaku/blob/c3cb06ac6c03f0f382d3941ea53b330f6a8dd127/waku/waku_rln_relay/group_manager/group_manager_base.nim#L188
* The PubsubTopic passed to the send function is not configured on the Waku node.
* Please ensure that the PubsubTopic is used when initializing the Waku node.
*/
RLN_MEMBERSHIP_INDEX = "Membership index is not set",
TOPIC_NOT_CONFIGURED = "Topic not configured",
/**
* Message limit is missing.
* nwaku: https://github.com/waku-org/nwaku/blob/c3cb06ac6c03f0f382d3941ea53b330f6a8dd127/waku/waku_rln_relay/group_manager/group_manager_base.nim#L190
* Fails when the stream to the remote peer is aborted before the light push request completes.
*/
RLN_LIMIT_MISSING = "User message limit is not set",
STREAM_ABORTED = "Stream aborted",
/**
* General proof generation error message.
* nwaku: https://github.com/waku-org/nwaku/blob/c3cb06ac6c03f0f382d3941ea53b330f6a8dd127/waku/waku_rln_relay/group_manager/group_manager_base.nim#L201C19-L201C42
*/
RLN_PROOF_GENERATION = "Proof generation failed"
RLN_PROOF_GENERATION = "Proof generation failed",
//
// RECEIVE ERRORS SECTION
//
/**
* The pubsub topic configured on the decoder does not match the pubsub topic setup on the protocol.
* Ensure that the pubsub topic used for decoder creation is the same as the one used for protocol.
*/
TOPIC_DECODER_MISMATCH = "Topic decoder mismatch",
/**
* The topics passed in the decoders do not match each other, or don't exist at all.
* Ensure that all the pubsub topics used in the decoders are valid and match each other.
*/
INVALID_DECODER_TOPICS = "Invalid decoder topics"
}
export interface Failure {

View File

@ -1,12 +1,13 @@
import type { IEncoder, IMessage } from "./message.js";
import { SDKProtocolResult } from "./protocols.js";
export type ISenderOptions = {
export type ISendOptions = {
/**
* Enables retrying a message that failed to be sent.
* @default false
* @default true
*/
autoRetry?: boolean;
/**
* Sets number of attempts if `autoRetry` is enabled.
* @default 3
@ -18,6 +19,6 @@ export interface ISender {
send: (
encoder: IEncoder,
message: IMessage,
sendOptions?: ISenderOptions
sendOptions?: ISendOptions
) => Promise<SDKProtocolResult>;
}

View File

@ -6,7 +6,7 @@ import {
} from "@chainsafe/libp2p-gossipsub";
import type { PeerIdStr, TopicStr } from "@chainsafe/libp2p-gossipsub/types";
import { SignaturePolicy } from "@chainsafe/libp2p-gossipsub/types";
import type { PubSub as Libp2pPubsub, PeerId } from "@libp2p/interface";
import type { PubSub as Libp2pPubsub } from "@libp2p/interface";
import { sha256 } from "@noble/hashes/sha256";
import {
ActiveSubscriptions,
@ -123,13 +123,11 @@ export class Relay implements IRelay {
encoder: IEncoder,
message: IMessage
): Promise<SDKProtocolResult> {
const successes: PeerId[] = [];
const { pubsubTopic } = encoder;
if (!this.pubsubTopics.has(pubsubTopic)) {
log.error("Failed to send waku relay: topic not configured");
return {
successes,
successes: [],
failures: [
{
error: ProtocolError.TOPIC_NOT_CONFIGURED
@ -142,7 +140,7 @@ export class Relay implements IRelay {
if (!msg) {
log.error("Failed to encode message, aborting publish");
return {
successes,
successes: [],
failures: [
{
error: ProtocolError.ENCODE_FAILED
@ -154,7 +152,7 @@ export class Relay implements IRelay {
if (!isWireSizeUnderCap(msg)) {
log.error("Failed to send waku relay: message is bigger that 1MB");
return {
successes,
successes: [],
failures: [
{
error: ProtocolError.SIZE_TOO_BIG

View File

@ -167,7 +167,7 @@ export class Filter implements IFilter {
ensurePubsubTopicIsConfigured(pubsubTopic, this.protocol.pubsubTopics);
const peerIds = await this.peerManager.getPeers();
const peerIds = this.peerManager.getPeers();
if (peerIds.length === 0) {
return {
error: ProtocolError.NO_PEER_AVAILABLE,

View File

@ -110,7 +110,7 @@ export class SubscriptionMonitor {
*/
public async getPeers(): Promise<PeerId[]> {
if (!this.isStarted) {
this.peerIds = await this.peerManager.getPeers();
this.peerIds = this.peerManager.getPeers();
}
return this.peerIds;
@ -221,7 +221,7 @@ export class SubscriptionMonitor {
return;
}
this.peerIds = await this.peerManager.getPeers();
this.peerIds = this.peerManager.getPeers();
await Promise.all(this.peerIds.map((id) => this.subscribe(id)));
}
@ -232,7 +232,7 @@ export class SubscriptionMonitor {
return;
}
this.peerIds = await this.peerManager.getPeers();
this.peerIds = this.peerManager.getPeers();
// we trigger subscribe for peer that was used before
// it will expectedly fail and we will initiate addition of a new peer
@ -257,7 +257,7 @@ export class SubscriptionMonitor {
return;
}
peerId = await this.peerManager.requestRenew(peerId);
peerId = this.peerManager.requestRenew(peerId);
}
}
@ -269,7 +269,7 @@ export class SubscriptionMonitor {
const response = await this.filter.ping(peerId);
if (response.failure && renewOnFirstFail) {
const newPeer = await this.peerManager.requestRenew(peerId);
const newPeer = this.peerManager.requestRenew(peerId);
await this.subscribe(newPeer);
return;
}
@ -286,7 +286,7 @@ export class SubscriptionMonitor {
const madeAttempts = this.pingFailedAttempts.get(peerIdStr) || 0;
if (madeAttempts >= this.config.pingsBeforePeerRenewed) {
const newPeer = await this.peerManager.requestRenew(peerId);
const newPeer = this.peerManager.requestRenew(peerId);
await this.subscribe(newPeer);
}
}

View File

@ -8,7 +8,7 @@ import {
import { Libp2p, ProtocolError } from "@waku/interfaces";
import { utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai";
import sinon from "sinon";
import sinon, { SinonSpy } from "sinon";
import { PeerManager } from "../peer_manager/index.js";
@ -60,7 +60,7 @@ describe("LightPush SDK", () => {
lightPush = mockLightPush({ libp2p, numPeersToUse: 2 });
let sendSpy = sinon.spy(
(_encoder: any, _message: any, peerId: PeerId) =>
({ success: peerId }) as any
Promise.resolve({ success: peerId }) as any
);
lightPush.protocol.send = sendSpy;
@ -68,39 +68,62 @@ describe("LightPush SDK", () => {
payload: utf8ToBytes("test")
});
expect(sendSpy.calledTwice).to.be.true;
expect(result.successes?.length).to.be.eq(2);
expect(sendSpy.calledTwice, "1").to.be.true;
expect(result.successes?.length, "2").to.be.eq(2);
// check if setting another value works
lightPush = mockLightPush({ libp2p, numPeersToUse: 3 });
sendSpy = sinon.spy(
(_encoder: any, _message: any, peerId: PeerId) =>
({ success: peerId }) as any
Promise.resolve({ success: peerId }) as any
);
lightPush.protocol.send = sendSpy;
result = await lightPush.send(encoder, { payload: utf8ToBytes("test") });
expect(sendSpy.calledThrice).to.be.true;
expect(result.successes?.length).to.be.eq(3);
expect(sendSpy.calledThrice, "3").to.be.true;
expect(result.successes?.length, "4").to.be.eq(3);
});
it("should retry on failure if specified", async () => {
it("should retry on complete failure if specified", async () => {
libp2p = mockLibp2p({
peers: [mockPeer("1"), mockPeer("2")]
});
lightPush = mockLightPush({ libp2p });
let sendSpy = sinon.spy((_encoder: any, _message: any, peerId: PeerId) => {
if (peerId.toString() === "1") {
return { success: peerId };
}
return { failure: { error: "problem" } };
});
const sendSpy = sinon.spy((_encoder: any, _message: any, _peerId: PeerId) =>
Promise.resolve({ failure: { error: "problem" } })
);
lightPush.protocol.send = sendSpy as any;
const attemptRetriesSpy = sinon.spy(lightPush["attemptRetries"]);
lightPush["attemptRetries"] = attemptRetriesSpy;
const retryPushSpy = (lightPush as any)["retryManager"].push as SinonSpy;
const result = await lightPush.send(
encoder,
{ payload: utf8ToBytes("test") },
{ autoRetry: true }
);
expect(retryPushSpy.callCount).to.be.eq(1);
expect(result.failures?.length).to.be.eq(2);
});
it("should not retry if at least one success", async () => {
libp2p = mockLibp2p({
peers: [mockPeer("1"), mockPeer("2")]
});
lightPush = mockLightPush({ libp2p });
const sendSpy = sinon.spy(
(_encoder: any, _message: any, peerId: PeerId) => {
if (peerId.toString() === "1") {
return Promise.resolve({ success: peerId });
}
return Promise.resolve({ failure: { error: "problem" } });
}
);
lightPush.protocol.send = sendSpy as any;
const retryPushSpy = (lightPush as any)["retryManager"].push as SinonSpy;
const result = await lightPush.send(
encoder,
@ -108,19 +131,9 @@ describe("LightPush SDK", () => {
{ autoRetry: true }
);
expect(attemptRetriesSpy.callCount).to.be.eq(1);
expect(retryPushSpy.callCount).to.be.eq(0);
expect(result.successes?.length).to.be.eq(1);
expect(result.failures?.length).to.be.eq(1);
sendSpy = sinon.spy(() => ({ failure: { error: "problem" } })) as any;
await lightPush["attemptRetries"](sendSpy as any);
expect(sendSpy.callCount).to.be.eq(3);
sendSpy = sinon.spy(() => ({ failure: { error: "problem" } })) as any;
await lightPush["attemptRetries"](sendSpy as any, 2);
expect(sendSpy.callCount).to.be.eq(2);
});
});
@ -154,7 +167,7 @@ type MockLightPushOptions = {
};
function mockLightPush(options: MockLightPushOptions): LightPush {
return new LightPush({
const lightPush = new LightPush({
connectionManager: {
pubsubTopics: options.pubsubTopics || [PUBSUB_TOPIC]
} as ConnectionManager,
@ -164,8 +177,17 @@ function mockLightPush(options: MockLightPushOptions): LightPush {
.getPeers()
.slice(0, options.numPeersToUse || options.libp2p.getPeers().length)
} as unknown as PeerManager,
libp2p: options.libp2p
libp2p: options.libp2p,
options: {
numPeersToUse: options.numPeersToUse
}
});
(lightPush as any)["retryManager"] = {
push: sinon.spy()
};
return lightPush;
}
function mockPeer(id: string): Peer {

View File

@ -6,59 +6,84 @@ import {
type IEncoder,
ILightPush,
type IMessage,
type ISenderOptions,
type ISendOptions,
type Libp2p,
LightPushProtocolOptions,
ProtocolError,
SDKProtocolResult
} from "@waku/interfaces";
import { ensurePubsubTopicIsConfigured, Logger } from "@waku/utils";
import { Logger } from "@waku/utils";
import { PeerManager } from "../peer_manager/index.js";
import { RetryManager } from "./retry_manager.js";
const log = new Logger("sdk:light-push");
const DEFAULT_MAX_ATTEMPTS = 3;
const DEFAULT_SEND_OPTIONS: ISenderOptions = {
autoRetry: false,
maxAttempts: DEFAULT_MAX_ATTEMPTS
const DEFAULT_SEND_OPTIONS: LightPushProtocolOptions = {
autoRetry: true,
retryIntervalMs: 1000,
maxAttempts: DEFAULT_MAX_ATTEMPTS,
numPeersToUse: 1
};
type RetryCallback = (peerId: PeerId) => Promise<CoreProtocolResult>;
type LightPushConstructorParams = {
connectionManager: ConnectionManager;
peerManager: PeerManager;
libp2p: Libp2p;
options?: Partial<LightPushProtocolOptions>;
};
export class LightPush implements ILightPush {
private readonly config: LightPushProtocolOptions;
private readonly retryManager: RetryManager;
private peerManager: PeerManager;
public readonly protocol: LightPushCore;
public constructor(params: LightPushConstructorParams) {
this.config = {
...DEFAULT_SEND_OPTIONS,
...(params.options || {})
} as LightPushProtocolOptions;
this.peerManager = params.peerManager;
this.protocol = new LightPushCore(
params.connectionManager.pubsubTopics,
params.libp2p
);
this.retryManager = new RetryManager({
peerManager: params.peerManager,
retryIntervalMs: this.config.retryIntervalMs
});
}
public start(): void {
this.retryManager.start();
}
public stop(): void {
this.retryManager.stop();
}
public async send(
encoder: IEncoder,
message: IMessage,
options: ISenderOptions = DEFAULT_SEND_OPTIONS
options: ISendOptions = {}
): Promise<SDKProtocolResult> {
const successes: PeerId[] = [];
const failures: Failure[] = [];
options = {
...this.config,
...options
};
const { pubsubTopic } = encoder;
try {
ensurePubsubTopicIsConfigured(pubsubTopic, this.protocol.pubsubTopics);
} catch (error) {
log.error("Failed to send waku light push: pubsub topic not configured");
log.info("send: attempting to send a message to pubsubTopic:", pubsubTopic);
if (!this.protocol.pubsubTopics.includes(pubsubTopic)) {
return {
successes,
successes: [],
failures: [
{
error: ProtocolError.TOPIC_NOT_CONFIGURED
@ -67,10 +92,13 @@ export class LightPush implements ILightPush {
};
}
const peerIds = await this.peerManager.getPeers();
const peerIds = this.peerManager
.getPeers()
.slice(0, this.config.numPeersToUse);
if (peerIds.length === 0) {
return {
successes,
successes: [],
failures: [
{
error: ProtocolError.NO_PEER_AVAILABLE
@ -79,65 +107,35 @@ export class LightPush implements ILightPush {
};
}
const results = await Promise.allSettled(
peerIds.map((id) => this.protocol.send(encoder, message, id))
const coreResults: CoreProtocolResult[] = await Promise.all(
peerIds.map((peerId) =>
this.protocol.send(encoder, message, peerId).catch((_e) => ({
success: null,
failure: {
error: ProtocolError.GENERIC_FAIL
}
}))
)
);
for (const result of results) {
if (result.status !== "fulfilled") {
log.error("Failed unexpectedly while sending:", result.reason);
failures.push({ error: ProtocolError.GENERIC_FAIL });
continue;
}
const { failure, success } = result.value;
if (success) {
successes.push(success);
continue;
}
if (failure) {
failures.push(failure);
if (options?.autoRetry) {
void this.attemptRetries(
(id: PeerId) => this.protocol.send(encoder, message, id),
options.maxAttempts
);
}
}
}
return {
successes,
failures
const results: SDKProtocolResult = {
successes: coreResults
.filter((v) => v.success)
.map((v) => v.success) as PeerId[],
failures: coreResults
.filter((v) => v.failure)
.map((v) => v.failure) as Failure[]
};
}
private async attemptRetries(
fn: RetryCallback,
maxAttempts?: number
): Promise<void> {
maxAttempts = maxAttempts || DEFAULT_MAX_ATTEMPTS;
const peerIds = await this.peerManager.getPeers();
if (peerIds.length === 0) {
log.warn("Cannot retry with no connected peers.");
return;
}
for (let i = 0; i < maxAttempts; i++) {
const id = peerIds[i % peerIds.length]; // always present as we checked for the length already
const response = await fn(id);
if (response.success) {
return;
}
log.info(
`Attempted retry for peer:${id} failed with:${response?.failure?.error}`
if (options.autoRetry && results.successes.length === 0) {
const sendCallback = (peerId: PeerId): Promise<CoreProtocolResult> =>
this.protocol.send(encoder, message, peerId);
this.retryManager.push(
sendCallback.bind(this),
options.maxAttempts || DEFAULT_MAX_ATTEMPTS
);
}
return results;
}
}

View File

@ -0,0 +1,129 @@
import type { PeerId } from "@libp2p/interface";
import { type CoreProtocolResult, ProtocolError } from "@waku/interfaces";
import { expect } from "chai";
import sinon from "sinon";
import { PeerManager } from "../peer_manager/index.js";
import { RetryManager, ScheduledTask } from "./retry_manager.js";
// Unit tests for RetryManager. Timers are faked with sinon so that the
// interval- and timeout-driven logic can be advanced deterministically.
describe("RetryManager", () => {
let retryManager: RetryManager;
let peerManager: PeerManager;
let mockPeerId: PeerId;
let clock: sinon.SinonFakeTimers;
beforeEach(() => {
// Fake timers must be installed before the manager is started.
clock = sinon.useFakeTimers();
mockPeerId = { toString: () => "test-peer-id" } as PeerId;
// Minimal PeerManager stand-in: always offers a single peer and records
// calls to requestRenew.
peerManager = {
getPeers: () => [mockPeerId],
requestRenew: sinon.spy(),
start: sinon.spy(),
stop: sinon.spy()
} as unknown as PeerManager;
retryManager = new RetryManager({
peerManager,
retryIntervalMs: 100
});
});
afterEach(() => {
clock.restore();
retryManager.stop();
sinon.restore();
});
it("should start and stop interval correctly", () => {
const setIntervalSpy = sinon.spy(global, "setInterval");
const clearIntervalSpy = sinon.spy(global, "clearInterval");
retryManager.start();
expect(setIntervalSpy.calledOnce).to.be.true;
retryManager.stop();
expect(clearIntervalSpy.calledOnce).to.be.true;
});
it("should process tasks in queue", async () => {
const successCallback = sinon.spy(
async (peerId: PeerId): Promise<CoreProtocolResult> => ({
success: peerId,
failure: null
})
);
retryManager.push(successCallback, 3);
retryManager.start();
// Advance the fake clock far enough for the queue interval (100 ms here)
// and the timer that runs the dequeued task to fire.
clock.tick(1000);
expect(successCallback.calledOnce, "called").to.be.true;
expect(successCallback.calledWith(mockPeerId), "called with peer").to.be
.true;
});
it("should retry failed tasks", async () => {
const failingCallback = sinon.spy(
async (): Promise<CoreProtocolResult> => ({
success: null,
failure: { error: "test error" as any }
})
);
// Reach into private state: the executor is invoked directly so the test
// does not depend on timers.
const queue = (retryManager as any)["queue"] as ScheduledTask[];
const task = { callback: failingCallback, maxAttempts: 2 };
await (retryManager as any)["taskExecutor"](task);
expect(failingCallback.calledOnce, "executed callback").to.be.true;
// The failed task must be back on the queue with one attempt fewer.
expect(
queue.some((t) => t.maxAttempts === 1),
"task attempt decreased"
).to.be.true;
});
it("should request peer renewal on specific errors", async () => {
// The thrown error's message is the ProtocolError value, which is what the
// executor inspects to decide whether the peer should be renewed.
const errorCallback = sinon.spy(async (): Promise<CoreProtocolResult> => {
throw new Error(ProtocolError.NO_PEER_AVAILABLE);
});
await (retryManager as any)["taskExecutor"]({
callback: errorCallback,
maxAttempts: 1
});
expect((peerManager.requestRenew as sinon.SinonSpy).calledOnce).to.be.true;
expect((peerManager.requestRenew as sinon.SinonSpy).calledWith(mockPeerId))
.to.be.true;
});
it("should handle task timeouts", async () => {
// The callback needs 15 s of (fake) time, longer than the 11 s the clock is
// advanced below, so the executor has to finish without the callback result.
const slowCallback = sinon.spy(async (): Promise<CoreProtocolResult> => {
await new Promise((resolve) => setTimeout(resolve, 15000));
return { success: mockPeerId, failure: null };
});
const task = { callback: slowCallback, maxAttempts: 1 };
const executionPromise = (retryManager as any)["taskExecutor"](task);
clock.tick(11000);
await executionPromise;
expect(slowCallback.calledOnce).to.be.true;
});
it("should respect max attempts limit", async () => {
const failingCallback = sinon.spy(async (): Promise<CoreProtocolResult> => {
throw new Error("test error" as any);
});
// A task with no attempts left is still executed once; the assertions
// check that and that the task object itself is not mutated.
const task = { callback: failingCallback, maxAttempts: 0 };
await (retryManager as any)["taskExecutor"](task);
expect(failingCallback.calledOnce).to.be.true;
expect(task.maxAttempts).to.equal(0);
});
});

View File

@ -0,0 +1,138 @@
import type { PeerId } from "@libp2p/interface";
import type { CoreProtocolResult } from "@waku/interfaces";
import { Logger } from "@waku/utils";
import type { PeerManager } from "../peer_manager/index.js";
import { shouldPeerBeChanged, timeout } from "./utils.js";
/**
 * Construction parameters for RetryManager.
 */
type RetryManagerConfig = {
// Period, in milliseconds, of the timer that processes the retry queue.
retryIntervalMs: number;
// Supplies the peer used for each attempt and handles peer renewal requests.
peerManager: PeerManager;
};
/**
 * A single send attempt against the given peer.
 */
type AttemptCallback = (peerId: PeerId) => Promise<CoreProtocolResult>;
/**
 * A unit of work held in the retry queue.
 */
export type ScheduledTask = {
// Remaining retry budget; a re-queued task carries this value minus one.
maxAttempts: number;
// Performs the attempt.
callback: AttemptCallback;
};
// Limit compared against the in-progress counter before dequeuing more tasks.
const MAX_CONCURRENT_TASKS = 5;
// Time limit, in milliseconds, raced against each task callback.
const TASK_TIMEOUT_MS = 10_000;
const log = new Logger("sdk:retry-manager");
/**
 * Keeps a queue of failed send attempts and re-executes them on a timer.
 *
 * Every `retryIntervalMs` the queue is drained, up to MAX_CONCURRENT_TASKS
 * tasks at a time. Each task is run against the first peer offered by the
 * PeerManager and is raced against TASK_TIMEOUT_MS.
 *
 * - A task that succeeds is finished and is NOT put back on the queue.
 * - A task that fails (failure result, thrown error or timeout) is re-queued
 *   with `maxAttempts - 1`, unless it has no attempts left, in which case it
 *   is discarded.
 * - If no peer is available when the task is about to run, it is dropped.
 */
export class RetryManager {
  private intervalID: number | null = null;
  private readonly retryIntervalMs: number;

  // Number of tasks whose callback is currently awaiting a result.
  private inProgress: number = 0;
  // Number of tasks handed to setTimeout that have not started executing yet.
  // Counted separately so the concurrency cap holds between scheduling and start.
  private scheduled: number = 0;
  private readonly queue: ScheduledTask[] = [];

  private readonly peerManager: PeerManager;

  public constructor(config: RetryManagerConfig) {
    this.peerManager = config.peerManager;
    this.retryIntervalMs = config.retryIntervalMs;
  }

  /**
   * Starts periodic processing of the queue. Calling it again while already
   * started is a no-op, so a previous interval is never leaked.
   */
  public start(): void {
    if (this.intervalID !== null) {
      return;
    }

    this.intervalID = setInterval(() => {
      this.processQueue();
    }, this.retryIntervalMs) as unknown as number;
  }

  /**
   * Stops periodic processing. Tasks already scheduled or running are not cancelled.
   */
  public stop(): void {
    if (this.intervalID !== null) {
      clearInterval(this.intervalID);
      this.intervalID = null;
    }
  }

  /**
   * Adds a task to the retry queue.
   *
   * @param callback performs one attempt against the provided peer
   * @param maxAttempts how many times the task may be re-queued after a failure
   */
  public push(callback: AttemptCallback, maxAttempts: number): void {
    this.queue.push({
      maxAttempts,
      callback
    });
  }

  /**
   * Moves tasks from the queue to execution while there is free capacity.
   */
  private processQueue(): void {
    if (this.queue.length === 0) {
      log.info("processQueue: queue is empty");
      return;
    }

    while (
      this.queue.length > 0 &&
      this.inProgress + this.scheduled < MAX_CONCURRENT_TASKS
    ) {
      const task = this.queue.shift();

      if (task) {
        this.scheduleTask(task);
      }
    }
  }

  /**
   * Defers execution of a task so the caller is not blocked.
   */
  private scheduleTask(task: ScheduledTask): void {
    // reserve a slot now; the in-progress counter only grows once the task starts
    this.scheduled += 1;

    setTimeout(() => {
      this.scheduled -= 1;

      // taskExecutor handles task failures itself; this guards against
      // anything unexpected so the rejection is never left unhandled
      void this.taskExecutor(task).catch((err: unknown) => {
        log.error("scheduleTask: task execution failed with error:", err);
      });
    }, 100);
  }

  /**
   * Runs a single attempt of the task and decides whether it has to be retried.
   */
  private async taskExecutor(task: ScheduledTask): Promise<void> {
    const peerId = this.peerManager.getPeers()[0];

    if (!peerId) {
      log.warn("scheduleTask: no peers, skipping");
      return;
    }

    this.inProgress += 1;

    try {
      const response = await Promise.race([
        timeout(TASK_TIMEOUT_MS),
        task.callback(peerId)
      ]);

      // a failure result is handled the same way as a thrown error
      if (response && response.failure) {
        throw Error(response.failure.error);
      }

      // success: the task is done and must not be queued again
      log.info("scheduleTask: executed successfully");
    } catch (err: unknown) {
      const message = err instanceof Error ? err.message : String(err);
      log.error("scheduleTask: task execution failed with error:", err);

      if (shouldPeerBeChanged(message)) {
        this.peerManager.requestRenew(peerId);
      }

      // `<=` so that a negative budget cannot cause endless retries
      if (task.maxAttempts <= 0) {
        log.warn("scheduleTask: discarded a task due to limit of max attempts");
        return;
      }

      this.queue.push({
        ...task,
        maxAttempts: task.maxAttempts - 1
      });
    } finally {
      this.inProgress -= 1;
    }
  }
}

View File

@ -0,0 +1,23 @@
import { ProtocolError } from "@waku/interfaces";
/**
 * Tells whether a failure indicates that the peer used for the attempt
 * should be replaced with another one.
 *
 * @param failure error value (or message) reported for the attempt
 * @returns `true` for REMOTE_PEER_REJECTED, NO_RESPONSE, RLN_PROOF_GENERATION
 * and NO_PEER_AVAILABLE; `false` for anything else
 */
export const shouldPeerBeChanged = (
  failure: string | ProtocolError
): boolean => {
  switch (failure) {
    case ProtocolError.REMOTE_PEER_REJECTED:
    case ProtocolError.NO_RESPONSE:
    case ProtocolError.RLN_PROOF_GENERATION:
    case ProtocolError.NO_PEER_AVAILABLE:
      return true;
    default:
      return false;
  }
};
/**
 * Creates a promise that never resolves and rejects with
 * `Error("Task timeout")` once the given delay has elapsed.
 *
 * @param timeout delay in milliseconds before the rejection
 */
export const timeout = (timeout: number): Promise<void> =>
  new Promise<void>((_resolve, reject) => {
    const onElapsed = (): void => {
      reject(new Error("Task timeout"));
    };

    setTimeout(onElapsed, timeout);
  });

View File

@ -36,7 +36,7 @@ describe("PeerManager", () => {
];
sinon.stub(libp2p, "getConnections").returns(connections);
const peers = await peerManager.getPeers();
const peers = peerManager.getPeers();
expect(peers.length).to.equal(2);
});
@ -48,7 +48,7 @@ describe("PeerManager", () => {
];
sinon.stub(libp2p, "getConnections").returns(connections);
const peerId = await peerManager.requestRenew("1");
const peerId = peerManager.requestRenew("1");
expect(peerId).to.not.be.undefined;
expect(peerId).to.not.equal("1");
});
@ -59,11 +59,20 @@ describe("PeerManager", () => {
peerManager["lockPeerIfNeeded"] = connectSpy;
peerManager["requestRenew"] = disconnectSpy;
peerManager.start();
libp2p.dispatchEvent(new CustomEvent("peer:connect", { detail: "1" }));
libp2p.dispatchEvent(new CustomEvent("peer:disconnect", { detail: "1" }));
expect(connectSpy.calledOnce).to.be.true;
expect(disconnectSpy.calledOnce).to.be.true;
const removeEventListenerSpy = sinon.spy(libp2p.removeEventListener);
libp2p.removeEventListener = removeEventListenerSpy;
peerManager.stop();
expect(removeEventListenerSpy.callCount).to.eq(2);
});
});

View File

@ -29,7 +29,9 @@ export class PeerManager {
params?.config?.numPeersToUse || DEFAULT_NUM_PEERS_TO_USE;
this.libp2p = params.libp2p;
}
public start(): void {
this.startConnectionListener();
}
@ -37,13 +39,11 @@ export class PeerManager {
this.stopConnectionListener();
}
public async getPeers(): Promise<PeerId[]> {
return Promise.all(this.getLockedConnections().map((c) => c.remotePeer));
public getPeers(): PeerId[] {
return this.getLockedConnections().map((c) => c.remotePeer);
}
public async requestRenew(
peerId: PeerId | string
): Promise<PeerId | undefined> {
public requestRenew(peerId: PeerId | string): PeerId | undefined {
const lockedConnections = this.getLockedConnections();
const neededPeers = this.numPeersToUse - lockedConnections.length;
@ -51,15 +51,13 @@ export class PeerManager {
return;
}
const result = await Promise.all(
this.getUnlockedConnections()
.filter((c) => !c.remotePeer.equals(peerId))
.slice(0, neededPeers)
.map((c) => this.lockConnection(c))
.map((c) => c.remotePeer)
);
const connections = this.getUnlockedConnections()
.filter((c) => !c.remotePeer.equals(peerId))
.slice(0, neededPeers)
.map((c) => this.lockConnection(c))
.map((c) => c.remotePeer);
const newPeerId = result[0];
const newPeerId = connections[0];
if (!newPeerId) {
log.warn(
@ -87,15 +85,15 @@ export class PeerManager {
private onConnected(event: CustomEvent<PeerId>): void {
const peerId = event.detail;
void this.lockPeerIfNeeded(peerId);
this.lockPeerIfNeeded(peerId);
}
private onDisconnected(event: CustomEvent<PeerId>): void {
const peerId = event.detail;
void this.requestRenew(peerId);
this.requestRenew(peerId);
}
private async lockPeerIfNeeded(peerId: PeerId): Promise<void> {
private lockPeerIfNeeded(peerId: PeerId): void {
const lockedConnections = this.getLockedConnections();
const neededPeers = this.numPeersToUse - lockedConnections.length;

View File

@ -254,7 +254,7 @@ export class Store implements IStore {
}
}
const peerIds = await this.peerManager.getPeers();
const peerIds = this.peerManager.getPeers();
if (peerIds.length > 0) {
// TODO(weboko): implement smart way of getting a peer https://github.com/waku-org/js-waku/issues/2243

View File

@ -100,7 +100,8 @@ export class WakuNode implements IWaku {
this.lightPush = new LightPush({
libp2p,
peerManager: this.peerManager,
connectionManager: this.connectionManager
connectionManager: this.connectionManager,
options: options?.lightPush
});
}
@ -190,10 +191,13 @@ export class WakuNode implements IWaku {
/**
 * Starts the node: awaits `libp2p.start()`, then starts the peer manager,
 * the health component and, if one is set, light push.
 */
public async start(): Promise<void> {
await this.libp2p.start();
this.peerManager.start();
this.health.start();
// `lightPush` may be absent; optional chaining turns this into a no-op then.
this.lightPush?.start();
}
public async stop(): Promise<void> {
this.lightPush?.stop();
this.health.stop();
this.peerManager.stop();
this.connectionManager.stop();

View File

@ -45,7 +45,8 @@ export async function runNodes<T>(
const waku_options: CreateNodeOptions = {
staticNoiseKey: NOISE_KEY_1,
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } },
networkConfig: shardInfo
networkConfig: shardInfo,
lightPush: { numPeersToUse: 2 }
};
log.info("Starting js waku node with :", JSON.stringify(waku_options));

View File

@ -40,7 +40,8 @@ export async function runMultipleNodes(
libp2p: {
addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] }
},
networkConfig
networkConfig,
lightPush: { numPeersToUse: numServiceNodes }
};
const waku = await createLightNode(wakuOptions);

View File

@ -104,7 +104,7 @@ describe("Peer Exchange Query", function () {
() =>
resolve({
peerInfos: null,
error: ProtocolError.REQUEST_TIMEOUT
error: ProtocolError.GENERIC_FAIL
}),
5000
)
@ -115,11 +115,7 @@ describe("Peer Exchange Query", function () {
queryResult?.peerInfos &&
queryResult.peerInfos.length === numPeersToRequest;
if (hasErrors) {
if (queryResult.error === ProtocolError.REQUEST_TIMEOUT) {
log.warn("Query timed out, retrying...");
} else {
log.error("Error encountered, retrying...", queryResult.error);
}
log.error("Error encountered, retrying...", queryResult.error);
continue;
}
if (!hasPeerInfos) {

View File

@ -55,6 +55,7 @@ describe("Autosharding: Running Nodes", function () {
contentTopics: [ContentTopic]
}
});
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waku.waitForPeers([Protocols.LightPush]);
@ -97,6 +98,7 @@ describe("Autosharding: Running Nodes", function () {
contentTopics: [ContentTopic]
}
});
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waku.waitForPeers([Protocols.LightPush]);
@ -153,7 +155,7 @@ describe("Autosharding: Running Nodes", function () {
contentTopics: [ContentTopic]
}
});
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waku.waitForPeers([Protocols.LightPush]);
@ -217,6 +219,7 @@ describe("Autosharding: Running Nodes", function () {
contentTopics: [ContentTopic, ContentTopic2]
}
});
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waku.waitForPeers([Protocols.LightPush]);
@ -274,6 +277,7 @@ describe("Autosharding: Running Nodes", function () {
contentTopics: [ContentTopic]
}
});
await waku.start();
// use a content topic that is not configured
const encoder = createEncoder({

View File

@ -56,6 +56,7 @@ describe("Static Sharding: Running Nodes", function () {
waku = await createLightNode({
networkConfig: shardInfo
});
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waku.waitForPeers([Protocols.LightPush]);
@ -96,6 +97,7 @@ describe("Static Sharding: Running Nodes", function () {
waku = await createLightNode({
networkConfig: shardInfo
});
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waku.waitForPeers([Protocols.LightPush]);
@ -145,6 +147,7 @@ describe("Static Sharding: Running Nodes", function () {
waku = await createLightNode({
networkConfig: shardInfo
});
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waku.waitForPeers([Protocols.LightPush]);
@ -211,6 +214,7 @@ describe("Static Sharding: Running Nodes", function () {
waku = await createLightNode({
networkConfig: shardInfoBothShards
});
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waku.waitForPeers([Protocols.LightPush]);
@ -250,6 +254,7 @@ describe("Static Sharding: Running Nodes", function () {
waku = await createLightNode({
networkConfig: shardInfoFirstShard
});
await waku.start();
// use a pubsub topic that is not configured
const encoder = createEncoder({