feat: add light push v3 protocol detection and handling

This commit is contained in:
Arseniy Klempner 2025-07-11 16:53:46 -07:00
parent b7bdecc760
commit 539c92f1bb
No known key found for this signature in database
GPG Key ID: 51653F18863BD24B

View File

@ -3,9 +3,11 @@ import {
type CoreProtocolResult,
type IEncoder,
type IMessage,
isSuccess as isV3Success,
type Libp2p,
ProtocolError,
type ThisOrThat
type ThisOrThat,
toProtocolError
} from "@waku/interfaces";
import { PushResponse } from "@waku/proto";
import { isMessageSizeUnderCap } from "@waku/utils";
@ -16,6 +18,7 @@ import { pipe } from "it-pipe";
import { Uint8ArrayList } from "uint8arraylist";
import { StreamManager } from "../stream_manager/index.js";
import { selectOpenConnection } from "../stream_manager/utils.js";
import { PushRpc } from "./push_rpc.js";
import { isRLNResponseError } from "./utils.js";
@ -23,6 +26,8 @@ import { isRLNResponseError } from "./utils.js";
const log = new Logger("light-push");
export const LightPushCodec = "/vac/waku/lightpush/2.0.0-beta1";
export const LightPushCodecV3 = "/vac/waku/lightpush/3.0.0";
export const LightPushCodecs = [LightPushCodecV3, LightPushCodec];
export { PushResponse };
type PreparePushMessageResult = ThisOrThat<"query", PushRpc>;
@ -34,11 +39,49 @@ export class LightPushCore {
private readonly streamManager: StreamManager;
public readonly multicodec = LightPushCodec;
public readonly multicodecs = LightPushCodecs;
public constructor(libp2p: Libp2p) {
public constructor(private libp2p: Libp2p) {
this.streamManager = new StreamManager(LightPushCodec, libp2p.components);
}
private async getProtocolStream(
peerId: PeerId
): Promise<{ stream: Stream; protocol: string }> {
const peer = await this.libp2p.peerStore.get(peerId);
const protocols = peer.protocols;
const supportsV3 = protocols.includes(LightPushCodecV3);
const supportsV2 = protocols.includes(LightPushCodec);
if (!supportsV2 && !supportsV3) {
throw new Error("Peer does not support any Light Push protocol");
}
const protocol = supportsV3 ? LightPushCodecV3 : LightPushCodec;
let stream: Stream;
try {
const connections = this.libp2p.getConnections(peerId);
const connection = selectOpenConnection(connections);
if (!connection) {
throw new Error("No open connection to peer");
}
stream = await connection.newStream(protocol);
} catch (error) {
if (supportsV3 && supportsV2) {
log.warn("Failed to create v3 stream, falling back to v2", error);
stream = await this.streamManager.getStream(peerId);
return { stream, protocol: LightPushCodec };
}
throw error;
}
return { stream, protocol };
}
private async preparePushMessage(
encoder: IEncoder,
message: IMessage
@ -96,17 +139,26 @@ export class LightPushCore {
}
let stream: Stream;
let protocol: string;
try {
stream = await this.streamManager.getStream(peerId);
const result = await this.getProtocolStream(peerId);
stream = result.stream;
protocol = result.protocol;
log.info(`Using protocol ${protocol} for peer ${peerId.toString()}`);
} catch (error) {
log.error("Failed to get stream", error);
return {
success: null,
failure: {
error: ProtocolError.NO_STREAM_AVAILABLE,
peerId: peerId
}
};
try {
stream = await this.streamManager.getStream(peerId);
protocol = LightPushCodec;
} catch (fallbackError) {
return {
success: null,
failure: {
error: ProtocolError.NO_STREAM_AVAILABLE,
peerId: peerId
}
};
}
}
let res: Uint8ArrayList[] | undefined;
@ -160,6 +212,28 @@ export class LightPushCore {
};
}
if (protocol === LightPushCodecV3 && response.statusCode !== undefined) {
if (!isV3Success(response.statusCode)) {
const error = toProtocolError(response.statusCode);
log.error(
`Remote peer rejected with v3 status code ${response.statusCode}: ${response.statusDesc || response.info}`
);
return {
success: null,
failure: {
error,
peerId: peerId
}
};
}
if (response.relayPeerCount !== undefined) {
log.info(`Message relayed to ${response.relayPeerCount} peers`);
}
return { success: peerId, failure: null };
}
if (isRLNResponseError(response.info)) {
log.error("Remote peer fault: RLN generation");
return {