diff --git a/packages/core/src/lib/light_push/light_push.ts b/packages/core/src/lib/light_push/light_push.ts index 6c2430e5a5..a528dc4ecb 100644 --- a/packages/core/src/lib/light_push/light_push.ts +++ b/packages/core/src/lib/light_push/light_push.ts @@ -3,9 +3,11 @@ import { type CoreProtocolResult, type IEncoder, type IMessage, + isSuccess as isV3Success, type Libp2p, ProtocolError, - type ThisOrThat + type ThisOrThat, + toProtocolError } from "@waku/interfaces"; import { PushResponse } from "@waku/proto"; import { isMessageSizeUnderCap } from "@waku/utils"; @@ -16,6 +18,7 @@ import { pipe } from "it-pipe"; import { Uint8ArrayList } from "uint8arraylist"; import { StreamManager } from "../stream_manager/index.js"; +import { selectOpenConnection } from "../stream_manager/utils.js"; import { PushRpc } from "./push_rpc.js"; import { isRLNResponseError } from "./utils.js"; @@ -23,6 +26,8 @@ import { isRLNResponseError } from "./utils.js"; const log = new Logger("light-push"); export const LightPushCodec = "/vac/waku/lightpush/2.0.0-beta1"; +export const LightPushCodecV3 = "/vac/waku/lightpush/3.0.0"; +export const LightPushCodecs = [LightPushCodecV3, LightPushCodec]; export { PushResponse }; type PreparePushMessageResult = ThisOrThat<"query", PushRpc>; @@ -34,11 +39,49 @@ export class LightPushCore { private readonly streamManager: StreamManager; public readonly multicodec = LightPushCodec; + public readonly multicodecs = LightPushCodecs; - public constructor(libp2p: Libp2p) { + public constructor(private libp2p: Libp2p) { this.streamManager = new StreamManager(LightPushCodec, libp2p.components); } + private async getProtocolStream( + peerId: PeerId + ): Promise<{ stream: Stream; protocol: string }> { + const peer = await this.libp2p.peerStore.get(peerId); + const protocols = peer.protocols; + + const supportsV3 = protocols.includes(LightPushCodecV3); + const supportsV2 = protocols.includes(LightPushCodec); + + if (!supportsV2 && !supportsV3) { + throw new Error("Peer does not support any Light Push protocol"); + } + + const protocol = supportsV3 ? LightPushCodecV3 : LightPushCodec; + + let stream: Stream; + try { + const connections = this.libp2p.getConnections(peerId); + const connection = selectOpenConnection(connections); + + if (!connection) { + throw new Error("No open connection to peer"); + } + + stream = await connection.newStream(protocol); + } catch (error) { + if (supportsV3 && supportsV2) { + log.warn("Failed to create v3 stream, falling back to v2", error); + stream = await this.streamManager.getStream(peerId); + return { stream, protocol: LightPushCodec }; + } + throw error; + } + + return { stream, protocol }; + } + private async preparePushMessage( encoder: IEncoder, message: IMessage @@ -96,17 +139,26 @@ export class LightPushCore { } let stream: Stream; + let protocol: string; try { - stream = await this.streamManager.getStream(peerId); + const result = await this.getProtocolStream(peerId); + stream = result.stream; + protocol = result.protocol; + log.info(`Using protocol ${protocol} for peer ${peerId.toString()}`); } catch (error) { log.error("Failed to get stream", error); - return { - success: null, - failure: { - error: ProtocolError.NO_STREAM_AVAILABLE, - peerId: peerId - } - }; + try { + stream = await this.streamManager.getStream(peerId); + protocol = LightPushCodec; + } catch (fallbackError) { + return { + success: null, + failure: { + error: ProtocolError.NO_STREAM_AVAILABLE, + peerId: peerId + } + }; + } } let res: Uint8ArrayList[] | undefined; @@ -160,6 +212,28 @@ export class LightPushCore { }; } + if (protocol === LightPushCodecV3 && response.statusCode !== undefined) { + if (!isV3Success(response.statusCode)) { + const error = toProtocolError(response.statusCode); + log.error( + `Remote peer rejected with v3 status code ${response.statusCode}: ${response.statusDesc || response.info}` + ); + return { + success: null, + failure: { + error, + peerId: peerId + } + }; + } + + if (response.relayPeerCount !== undefined) { + log.info(`Message relayed to ${response.relayPeerCount} peers`); + } + + return { success: peerId, failure: null }; + } + if (isRLNResponseError(response.info)) { log.error("Remote peer fault: RLN generation"); return {