js-waku/packages/sdk/src/light_push/retry_manager.ts
Arseniy Klempner 16253026c6
feat: implement lp-v3 error codes with backwards compatibility (#2501)
* feat: implement LightPush v3 protocol support

Add comprehensive LightPush v3 protocol implementation with:

Core Features:
- LightPush v3 protocol codec and multicodec detection
- Status code-based error handling and validation
- Protocol version inference and compatibility layers
- Enhanced error types with detailed failure information

Protocol Support:
- Automatic v3/v2 protocol negotiation and fallback
- Status code mapping to LightPush error types
- Protocol version tracking in SDK results
- Mixed protocol environment support

Testing Infrastructure:
- Comprehensive v3 error code handling tests
- Mock functions for v3/v2 response scenarios
- Protocol version detection and validation tests
- Backward compatibility verification

Implementation Details:
- Clean separation between v2 and v3 response handling
- Type-safe status code validation with isSuccess helper
- Enhanced failure reporting with protocol version context
- Proper error propagation through SDK layers

This implementation maintains full backward compatibility with v2
while providing enhanced functionality for v3 protocol features.

* feat: handle both light push protocols

* fix: unsubscribe test

* feat: consolidate lpv2/v3 types

* feat(tests): bump nwaku to 0.36.0

* fix: remove extraneous exports

* fix: add delay to tests

* fix: remove protocol result types

* feat: consolidate light push codec branching

* fix: revert nwaku image

* fix: remove multicodec

* fix: remove protocolversion

* feat: simplify v2/v3 branching logic to use two stream managers

* fix: remove unused utils

* fix: remove comments

* fix: revert store test

* fix: cleanup lightpush sdk

* fix: remove unused util

* fix: remove unused exports

* fix: rename file from public to protocol_handler

* fix: use proper type for sdk result

* fix: update return types in filter

* fix: rebase against latest master

* fix: use both lightpush codecs when waiting for peer

* fix: handle both lp codecs

* fix: remove unused code

* feat: use array for multicodec fields

* fix: add timestamp if missing in v3 rpc

* fix: resolve on either lp codec when waiting for peer

* fix: remove unused util

* fix: remove unnecessary abstraction

* feat: accept nwaku docker image as arg, test lp backwards compat

* fix: revert filter error

* feat: add legacy flag to enable lightpushv2 only

* Revert "feat: accept nwaku docker image as arg, test lp backwards compat"

This reverts commit 857e12cbc73305e5c51abd057665bd34708b2737.

* fix: remove unused test

* feat: improve lp3 (#2597)

* improve light push core

* move back to singular multicodec property, enable array prop only for light push

* implement v2/v3 interop e2e test, re-add useLegacy flag, ensure e2e runs for v2 and v3

* fix v2 v3 condition

* generate message package earlier

* add log, fix condition

---------

Co-authored-by: Sasha <118575614+weboko@users.noreply.github.com>
Co-authored-by: Sasha <oleksandr@status.im>
2025-09-05 00:52:37 +02:00

173 lines
4.0 KiB
TypeScript

import type { PeerId } from "@libp2p/interface";
import {
type IRoutingInfo,
type LightPushCoreResult,
Protocols
} from "@waku/interfaces";
import { Logger } from "@waku/utils";
import type { PeerManager } from "../peer_manager/index.js";
import { shouldPeerBeChanged, timeout } from "./utils.js";
/**
 * Construction options for `RetryManager`.
 */
type RetryManagerConfig = {
/** Period, in milliseconds, of the queue-processing timer; the constructor falls back to 1000 when this value is falsy. */
retryIntervalMs: number;
/** Peer source: asked for a LightPush peer before each attempt and asked to renew a peer after certain failures. */
peerManager: PeerManager;
};
/**
 * Performs one send attempt against the given peer.
 * The retry manager treats a resolved result whose `failure` field is set as a failed attempt.
 */
type AttemptCallback = (peerId: PeerId) => Promise<LightPushCoreResult>;
/**
 * A unit of work held in the retry queue.
 */
export type ScheduledTask = {
/** Remaining attempt budget; reduced by one every time the task is put back on the queue, and a task with a value of 0 or less is dropped without running. */
maxAttempts: number;
/** Routing information; its `pubsubTopic` is used when asking the peer manager for a peer. */
routingInfo: IRoutingInfo;
/** The send attempt to run once a peer has been selected. */
callback: AttemptCallback;
};
// Upper bound the queue processor compares the in-progress counter against
// before taking another task off the queue.
const MAX_CONCURRENT_TASKS = 5;
// Duration, in milliseconds, passed to `timeout()` and raced against each task callback.
const TASK_TIMEOUT_MS = 10_000;
// Module logger, namespaced "sdk:retry-manager".
const log = new Logger("sdk:retry-manager");
/**
 * Runs queued LightPush send attempts in the background.
 *
 * Tasks are appended with {@link RetryManager.push} and picked up by a timer
 * started with {@link RetryManager.start}. Every attempt asks the peer manager
 * for a LightPush peer on the task's pubsub topic, runs the task callback
 * against it (bounded by `TASK_TIMEOUT_MS`) and then puts the task back on the
 * queue with one attempt fewer until its attempt budget is used up.
 *
 * NOTE(review): a task is re-queued after a *successful* attempt as well, so
 * the callback may run up to `maxAttempts` times in total. This mirrors the
 * previous behaviour — confirm that the redundancy is intended.
 */
export class RetryManager {
  // Handle of the queue-processing timer; `null` while stopped.
  private intervalID: ReturnType<typeof setInterval> | null = null;
  private readonly retryIntervalMs: number;

  // Number of tasks that currently occupy a concurrency slot.
  private inProgress: number = 0;
  private readonly queue: ScheduledTask[] = [];

  private readonly peerManager: PeerManager;

  /**
   * @param config - peer manager to use and the queue polling period.
   */
  public constructor(config: RetryManagerConfig) {
    this.peerManager = config.peerManager;
    // `||` rather than `??` on purpose: a 0 ms period would make the timer spin,
    // so 0 falls back to the default just like a missing value does.
    this.retryIntervalMs = config.retryIntervalMs || 1000;
  }

  /**
   * Starts the periodic queue processing. Calling it while already started is
   * a no-op, so an earlier timer is never orphaned.
   */
  public start(): void {
    if (this.intervalID !== null) {
      return;
    }

    this.intervalID = setInterval(() => {
      this.processQueue();
    }, this.retryIntervalMs);
  }

  /**
   * Stops the periodic queue processing. Queued tasks stay in the queue and
   * attempts that were already scheduled still run.
   */
  public stop(): void {
    if (this.intervalID !== null) {
      clearInterval(this.intervalID);
      this.intervalID = null;
    }
  }

  /**
   * Appends a task to the queue; it is picked up on a later timer tick.
   *
   * @param callback - send attempt to run against a selected peer.
   * @param maxAttempts - attempt budget; a value of 0 or less never runs.
   * @param routingInfo - routing information used to select a peer.
   */
  public push(
    callback: AttemptCallback,
    maxAttempts: number,
    routingInfo: IRoutingInfo
  ): void {
    this.queue.push({
      maxAttempts,
      callback,
      routingInfo
    });
  }

  /**
   * Moves tasks from the queue into execution while concurrency slots are free.
   */
  private processQueue(): void {
    while (this.queue.length > 0 && this.inProgress < MAX_CONCURRENT_TASKS) {
      const task = this.queue.shift();

      if (task) {
        this.scheduleTask(task);
      }
    }
  }

  /**
   * Reserves a concurrency slot for the task and runs it shortly afterwards.
   */
  private scheduleTask(task: ScheduledTask): void {
    // Reserve the slot synchronously: processQueue's loop is synchronous, so
    // counting only once the task actually starts would let a single tick
    // schedule the whole queue regardless of MAX_CONCURRENT_TASKS.
    this.inProgress += 1;

    const delayedTask = (): void => {
      this.taskExecutor(task)
        .catch((err: unknown) => {
          // Nothing awaits this promise, so a rejection must be handled here.
          log.error("scheduleTask: task executor failed unexpectedly:", err);
        })
        .finally(() => {
          this.inProgress -= 1;
        });
    };

    // schedule execution ASAP
    // need to use setTimeout to avoid blocking main execution
    setTimeout(delayedTask, 100);
  }

  /**
   * Runs one attempt of the task and puts it back on the queue afterwards.
   *
   * The attempt counts as failed when the callback rejects, resolves with a
   * `failure`, or does not settle before the timeout. On failure the peer is
   * renewed when `shouldPeerBeChanged` says so.
   */
  private async taskExecutor(task: ScheduledTask): Promise<void> {
    if (task.maxAttempts <= 0) {
      log.warn("scheduleTask: max attempts has reached, removing from queue");
      return;
    }

    const peerId = (
      await this.peerManager.getPeers({
        protocol: Protocols.LightPush,
        pubsubTopic: task.routingInfo.pubsubTopic
      })
    )[0];

    if (!peerId) {
      log.warn("scheduleTask: no peers, putting back to queue");
      this.requeue(task);
      return;
    }

    try {
      const response = await Promise.race([
        timeout(TASK_TIMEOUT_MS),
        task.callback(peerId)
      ]);

      // If timeout resolves first, response will be void (undefined)
      // In this case, we should treat it as a timeout error
      if (response === undefined) {
        throw new Error("Task timeout");
      }

      if (response.failure) {
        throw Error(response.failure.error);
      }

      log.info("scheduleTask: executed successfully");
    } catch (err) {
      log.error("scheduleTask: task execution failed with error:", err);

      // The rejection value is not guaranteed to be an Error (or even an
      // object), so read `message` defensively instead of asserting a shape.
      const message =
        typeof err === "object" && err !== null && "message" in err
          ? String((err as { message: unknown }).message)
          : String(err);

      if (shouldPeerBeChanged(message)) {
        try {
          await this.peerManager.renewPeer(peerId, {
            protocol: Protocols.LightPush,
            pubsubTopic: task.routingInfo.pubsubTopic
          });
        } catch (renewErr) {
          // A failed renewal must not cost the task its remaining attempts.
          log.error("scheduleTask: failed to renew peer:", renewErr);
        }
      }
    }

    this.requeue(task);
  }

  /**
   * Puts the task back on the queue with one attempt fewer, or discards it
   * when no attempts would remain.
   */
  private requeue(task: ScheduledTask): void {
    const remainingAttempts = task.maxAttempts - 1;

    if (remainingAttempts <= 0) {
      log.warn("scheduleTask: discarded a task due to limit of max attempts");
      return;
    }

    this.queue.push({
      ...task,
      maxAttempts: remainingAttempts
    });
  }
}