From fe41e8d2e634b6cdcb0579fb2cdf521c4b93c109 Mon Sep 17 00:00:00 2001 From: Sasha Date: Mon, 13 Oct 2025 00:15:56 +0200 Subject: [PATCH] refactor to new interface --- packages/interfaces/src/index.ts | 1 + packages/interfaces/src/waku.ts | 3 + packages/interfaces/src/webrtc.ts | 93 +++++++++ packages/sdk/src/waku/waku.ts | 18 ++ packages/sdk/src/webrtc/index.ts | 2 +- packages/sdk/src/webrtc/webrtc.ts | 303 +++++++++--------------------- 6 files changed, 210 insertions(+), 210 deletions(-) create mode 100644 packages/interfaces/src/webrtc.ts diff --git a/packages/interfaces/src/index.ts b/packages/interfaces/src/index.ts index ac79a2ed34..f7739613b6 100644 --- a/packages/interfaces/src/index.ts +++ b/packages/interfaces/src/index.ts @@ -18,3 +18,4 @@ export * from "./constants.js"; export * from "./sharding.js"; export * from "./health_status.js"; export * from "./discovery.js"; +export * from "./webrtc.js"; diff --git a/packages/interfaces/src/waku.ts b/packages/interfaces/src/waku.ts index 71914755f7..00f79ef656 100644 --- a/packages/interfaces/src/waku.ts +++ b/packages/interfaces/src/waku.ts @@ -15,6 +15,7 @@ import type { Protocols } from "./protocols.js"; import type { IRelay } from "./relay.js"; import type { ShardId } from "./sharding.js"; import type { IStore } from "./store.js"; +import type { IWebRTC } from "./webrtc.js"; export type CreateDecoderParams = { contentTopic: string; @@ -62,6 +63,7 @@ export interface IWaku { store?: IStore; filter?: IFilter; lightPush?: ILightPush; + webRTC?: IWebRTC; /** * Emits events related to the Waku node. @@ -272,6 +274,7 @@ export interface LightNode extends IWaku { store: IStore; filter: IFilter; lightPush: ILightPush; + webRTC: IWebRTC; } export interface RelayNode extends IWaku { diff --git a/packages/interfaces/src/webrtc.ts b/packages/interfaces/src/webrtc.ts new file mode 100644 index 0000000000..61de12764d --- /dev/null +++ b/packages/interfaces/src/webrtc.ts @@ -0,0 +1,93 @@ +import type { PeerId, TypedEventEmitter } from "@libp2p/interface"; + +export enum WebRTCEvent { + InboundRequest = "webRTC:inbound-request", + Connected = "webRTC:connected", + Closed = "webRTC:closed", + Rejected = "webRTC:rejected" +} + +export interface IWebRTCEvents { + /** + * Used to listen to incoming WebRTC connection request. + * + * @example + * ```typescript + * waku.addEventListener(WebRTCEvent.Inbound, (event) => { + * const requesterPeerId = event.detail; + * + * if (requesterPeerId.equals(expectedPeerId)) { + * waku.webRTC.accept(requesterPeerId); + * } else { + * waku.webRTC.hangUp(requesterPeerId); + * } + * }); + */ + [WebRTCEvent.InboundRequest]: CustomEvent; + + /** + * Used to listen to get notified when a WebRTC connection is established. + * + * @example + * ```typescript + * waku.addEventListener(WebRTCEvent.Connected, (event) => { + * const connection = event.detail; // RTCPeerConnection + * }); + * ``` + */ + [WebRTCEvent.Connected]: CustomEvent; +} + +export type PeerIdOrString = PeerId | string; + +export type WebRTCDialOptions = { + peerId: PeerIdOrString; + timeoutMs?: number; +}; + +export interface IWebRTC { + /** + * Used to listen to incoming WebRTC connection request or progress of established connections. + */ + events: TypedEventEmitter; + + /** + * Starts the listening to incoming WebRTC connection requests. + */ + start(): Promise; + + /** + * Stops the listening to incoming WebRTC connection requests. + */ + stop(): Promise; + + /** + * Dials a peer using Waku WebRTC protocol. + */ + dial(options: WebRTCDialOptions): Promise; + + /** + * Accepts a WebRTC connection request from a peer. + */ + accept(peerId: PeerIdOrString): void; + + /** + * Hang up a WebRTC connection to a peer or incoming connection request. + */ + hangUp(peerId: PeerIdOrString): void; + + /** + * Checks if a WebRTC connection is established to a peer. + */ + isConnected(peerId: PeerIdOrString): boolean; + + /** + * Gets the list of connected peers using Waku WebRTC protocol. + */ + getConnectedPeers(): PeerId[]; + + /** + * Gets map of WebRTC connections by peer ID. + */ + getConnections(): Record; +} diff --git a/packages/sdk/src/waku/waku.ts b/packages/sdk/src/waku/waku.ts index 7336b06df7..8fdcd49eb8 100644 --- a/packages/sdk/src/waku/waku.ts +++ b/packages/sdk/src/waku/waku.ts @@ -20,6 +20,7 @@ import type { IStore, IWaku, IWakuEventEmitter, + IWebRTC, Libp2p, NetworkConfig } from "@waku/interfaces"; @@ -35,6 +36,7 @@ import { HealthIndicator } from "../health_indicator/index.js"; import { LightPush } from "../light_push/index.js"; import { PeerManager } from "../peer_manager/index.js"; import { Store } from "../store/index.js"; +import { WebRTC } from "../webrtc/index.js"; import { waitForRemotePeer } from "./wait_for_remote_peer.js"; @@ -52,6 +54,7 @@ export class WakuNode implements IWaku { public store?: IStore; public filter?: IFilter; public lightPush?: ILightPush; + public webRTC?: IWebRTC; public readonly events: IWakuEventEmitter = new TypedEventEmitter(); @@ -126,6 +129,21 @@ export class WakuNode implements IWaku { }); } + if (this.lightPush && this.filter) { + const webRtcContentTopic = WebRTC.buildContentTopic(this.libp2p.peerId); + + this.webRTC = new WebRTC({ + lightPush: this.lightPush, + filter: this.filter, + encoder: this.createEncoder({ + contentTopic: webRtcContentTopic + }), + decoder: this.createDecoder({ + contentTopic: webRtcContentTopic + }) + }); + } + log.info( "Waku node created", peerId, diff --git a/packages/sdk/src/webrtc/index.ts b/packages/sdk/src/webrtc/index.ts index b080278a78..6eca21c36f 100644 --- a/packages/sdk/src/webrtc/index.ts +++ b/packages/sdk/src/webrtc/index.ts @@ -1 +1 @@ -export WebRTC from "./webrtc.js"; \ No newline at end of file +export { WebRTC } from "./webrtc.js"; diff --git a/packages/sdk/src/webrtc/webrtc.ts b/packages/sdk/src/webrtc/webrtc.ts index 50dee778e3..71057f9f07 100644 --- a/packages/sdk/src/webrtc/webrtc.ts +++ b/packages/sdk/src/webrtc/webrtc.ts @@ -1,236 +1,121 @@ -/* eslint-disable eslint-comments/no-unlimited-disable */ -/* eslint-disable */ -import type { IDecodedMessage, IDecoder, IEncoder, LightNode } from "@waku/sdk"; -import { createDecoder, createEncoder, bytesToUtf8, utf8ToBytes } from "@waku/sdk"; -import type { MediaStreams } from "./media"; -import { AudioSignal, SignalType } from "./audiosignal"; +import { PeerId, TypedEventEmitter } from "@libp2p/interface"; +import type { + IDecodedMessage, + IDecoder, + IEncoder, + IFilter, + ILightPush, + IWebRTC, + IWebRTCEvents, + PeerIdOrString, + WebRTCDialOptions +} from "@waku/interfaces"; -type WakuRTCParams = { - node: LightNode; - config?: RTCConfiguration; +type WebRTCConstructorOptions = { + lightPush: ILightPush; + filter: IFilter; + decoder: IDecoder; + encoder: IEncoder; }; -const DEFAULT_STUN = "stun:stun.l.google.com:19302"; -const DEFAULT_CONTENT_TOPIC = "/waku-phone/1/sig01/proto"; +export class WebRTC implements IWebRTC { + private readonly lightPush: ILightPush; + private readonly filter: IFilter; -export class WakuRTC { - private started = false; - private inProgress = false; - - private readonly node: LightNode; + private readonly decoder: IDecoder; private readonly encoder: IEncoder; - private readonly decoder: IDecoder; - public readonly rtcConnection: RTCPeerConnection; - private iceCandidates: RTCIceCandidate[] = []; + public readonly events: TypedEventEmitter = + new TypedEventEmitter(); - private inboundChannel: RTCDataChannel | undefined; - private readonly outboundChannel: RTCDataChannel; + private isStarted = false; - private filterUnsubscribe: undefined | (() => void); - public mediaStreams: MediaStreams | undefined; - public audioSignal: AudioSignal | undefined; - public isFree: boolean = true; - private inCallwith: string= ''; + public static buildContentTopic(peerId: PeerId): string { + return `/js-waku-webrtc/1/${peerId.toString()}/proto`; + } - public constructor(params: WakuRTCParams) { - this.node = params.node; + public constructor(options: WebRTCConstructorOptions) { + this.lightPush = options.lightPush; + this.filter = options.filter; - this.encoder = createEncoder({ - contentTopic: DEFAULT_CONTENT_TOPIC, - pubsubTopicShardInfo: { - clusterId: 42, - shard: 0 - } - }); - this.decoder = createDecoder(DEFAULT_CONTENT_TOPIC, { clusterId: 42 , shard: 0}); + this.decoder = options.decoder; + this.encoder = options.encoder; - this.rtcConnection = new RTCPeerConnection({ - iceServers: [{ urls: DEFAULT_STUN }], - ...(params.config || {}) - }); - this.outboundChannel = this.rtcConnection.createDataChannel("outbound"); - this.onICECandidate = this.onICECandidate.bind(this); - this.onInboundChannel = this.onInboundChannel.bind(this); + this.handleInboundRequest = this.handleInboundRequest.bind(this); } public async start(): Promise { - if (this.started || this.inProgress) { + if (this.isStarted) { return; } - this.inProgress = true; - - this.rtcConnection.addEventListener("datachannel", this.onInboundChannel); - this.rtcConnection.addEventListener("icecandidate", this.onICECandidate); - - try { - this.filterUnsubscribe = await this.node.filter.subscribeWithUnsubscribe(this.decoder, this.onWakuMessage.bind(this)); - } catch(e) { - console.error("Error while Filter subscribe:", e); - } - - this.inProgress = false; - this.started = true; + this.isStarted = true; + await this.subscribeToInboundRequests(); } public async stop(): Promise { - if (!this.started || this.inProgress) { + if (!this.isStarted) { return; } - this.inProgress = true; + this.isStarted = false; + await this.unsubscribeFromInboundRequests(); + } - this.rtcConnection.removeEventListener("datachannel", this.onInboundChannel); - this.rtcConnection.removeEventListener("icecandidate", this.onICECandidate); + public async dial(options: WebRTCDialOptions): Promise { + // TODO: implement + } - try { - this?.filterUnsubscribe?.(); - } catch(e) { - console.error("Error while Filter unsubscribe:", e); + public accept(peerId: PeerIdOrString): void { + // TODO: implement + } + + public hangUp(peerId: PeerIdOrString): void { + // TODO: implement + } + + public isConnected(peerId: PeerIdOrString): boolean { + // TODO: implement + return false; + } + + public getConnectedPeers(): PeerId[] { + // TODO: implement + return []; + } + + public getConnections(): Record { + // TODO: implement + return {}; + } + + private async subscribeToInboundRequests(): Promise { + await this.filter.subscribe(this.decoder, this.handleInboundRequest); + } + + private async unsubscribeFromInboundRequests(): Promise { + await this.filter.unsubscribe(this.decoder); + } + + private handleInboundRequest(message: IDecodedMessage): void { + /* + const decryptedMessage = decrypt(message.payload, this.privateKey); + switch (decryptedMessage.type) { + case "dial": + break; + case "ack": + break; + case "answer": + break; + case "reject": + break; + case "candidate": + break; + case "close": + break; + default: + break; } - - this.inProgress = false; - this.started = false; + */ } - - public async initiateConnection(peerId: string): Promise { - this.audioSignal?.playSignal(SignalType.RINGING); - - this.inCallwith = peerId; - await this.sendWakuMessage("call", ''); - } - - public async hangupCall(): Promise { - await this.sendWakuMessage("bye", ""); - this.inCallwith = ''; - this.isFree = true; - } - - private onInboundChannel(event: RTCDataChannelEvent): void { - this.inboundChannel = event.channel; - this.inboundChannel.addEventListener("message", (event) => { - console.log("Received message:", event.data); - }); - } - - private async onICECandidate(event: RTCPeerConnectionIceEvent): Promise { - if (!event.candidate) { - return; - } - - this.iceCandidates.push(event.candidate); - await this.sendWakuMessage("candidate", this.iceCandidates); - } - - private async onWakuMessage(message: IDecodedMessage): Promise { - const payload = bytesToUtf8(message.payload); - const data = JSON.parse(payload); - - if (data.receiver !== this.node.peerId.toString() || - data.sender === this.node.peerId.toString()) { - return; - } - console.log("received a waku message with payload:", data); - - if (data.type === "call") { - await this.onConnectionRequestMessage(data.receiver, data.sender); - } else if (data.type === "candidate") { - await this.onCandidateMessage(data.payload); - } else if (data.type === "offer") { - await this.onOfferMessage(data.payload); - } else if (data.type === "answer") { - await this.onAnswerMessage(data.payload); - } else if (data.type === "ready") { - this.onReadyMessage(); - }else if (data.type === "bye"){ - this.onByeMessage(); - } else if (data.type === "busy"){ - this.onBusyMessage(); - } - } - - private async onBusyMessage() { - this.audioSignal?.playSignal(SignalType.BUSY, 5000); - this.isFree = true; - this.inCallwith = ''; - this.rtcConnection.close(); - } - - private async onByeMessage() { - if (this.isFree){ - return; - } - this.isFree = true; - this.inCallwith = ''; - this.rtcConnection.close(); - this.mediaStreams?.stopStreams(); - } - - private async onCandidateMessage(candidates: RTCIceCandidate[]): Promise { - for (const candidate of candidates) { - await this.rtcConnection.addIceCandidate( - new RTCIceCandidate(candidate) - ); - } - } - - private async onOfferMessage(offer: RTCSessionDescriptionInit): Promise { - await this.rtcConnection.setRemoteDescription( - new RTCSessionDescription(offer) - ); - - const answer = await this.rtcConnection.createAnswer(); - this.rtcConnection.setLocalDescription(answer); - - await this.sendWakuMessage("answer", answer); - this.audioSignal?.stopSignal(); - } - - private async onAnswerMessage(answer: RTCSessionDescriptionInit) { - await this.rtcConnection.setRemoteDescription( - new RTCSessionDescription(answer) - ); - } - - private async onConnectionRequestMessage(peerId: string, remotePeerId: string): Promise { - if(!this.isFree) { - await this.sendWakuMessage("busy",'', remotePeerId); - return; - } - //this.mediaStreams?.setupLocalStream(); - //this.mediaStreams?.setupRemoteStream(); - this.isFree = false; - this.inCallwith = remotePeerId; - const offer = await this.rtcConnection.createOffer(); - await this.rtcConnection.setLocalDescription(offer); - - await this.sendWakuMessage("offer", offer); - - if (this.iceCandidates.length) { - await this.sendWakuMessage("candidate", this.iceCandidates); - } - } - - private async onReadyMessage() { - console.log("RTC: partner is ready"); - } - - private async sendWakuMessage(type: string, payload: any, remotePeerId:string=this.inCallwith): Promise { - const response = await this.node.lightPush.send(this.encoder, { - payload: utf8ToBytes(JSON.stringify({ - type, - payload, - sender: this.node.peerId.toString(), - receiver: remotePeerId - })) - }); - - console.log(`sendWakuMessage of type:${type}, with ${response} , receiver ${remotePeerId}`); - } - - public sendChatMessage(message: string): void { - this.outboundChannel.send(message); - } - -} \ No newline at end of file +}