diff --git a/packages/core/src/lib/connection_manager.ts b/packages/core/src/lib/connection_manager.ts index 1692154f5c..95f209e739 100644 --- a/packages/core/src/lib/connection_manager.ts +++ b/packages/core/src/lib/connection_manager.ts @@ -89,6 +89,34 @@ export class ConnectionManager return instance; } + stop(): void { + this.keepAliveManager.stopAll(); + this.libp2p.removeEventListener( + "peer:connect", + this.onEventHandlers["peer:connect"] + ); + this.libp2p.removeEventListener( + "peer:disconnect", + this.onEventHandlers["peer:disconnect"] + ); + this.libp2p.removeEventListener( + "peer:discovery", + this.onEventHandlers["peer:discovery"] + ); + } + + async dropConnection(peerId: PeerId): Promise { + try { + this.keepAliveManager.stop(peerId); + await this.libp2p.hangUp(peerId); + log.info(`Dropped connection with peer ${peerId.toString()}`); + } catch (error) { + log.error( + `Error dropping connection with peer ${peerId.toString()} - ${error}` + ); + } + } + public async getPeersByDiscovery(): Promise { const peersDiscovered = await this.libp2p.peerStore.all(); const peersConnected = this.libp2p @@ -200,22 +228,6 @@ export class ConnectionManager this.startPeerDisconnectionListener(); } - stop(): void { - this.keepAliveManager.stopAll(); - this.libp2p.removeEventListener( - "peer:connect", - this.onEventHandlers["peer:connect"] - ); - this.libp2p.removeEventListener( - "peer:disconnect", - this.onEventHandlers["peer:disconnect"] - ); - this.libp2p.removeEventListener( - "peer:discovery", - this.onEventHandlers["peer:discovery"] - ); - } - private async dialPeer(peerId: PeerId): Promise { this.currentActiveParallelDialCount += 1; let dialAttempt = 0; @@ -298,18 +310,6 @@ export class ConnectionManager } } - private async dropConnection(peerId: PeerId): Promise { - try { - this.keepAliveManager.stop(peerId); - await this.libp2p.hangUp(peerId); - log.info(`Dropped connection with peer ${peerId.toString()}`); - } catch (error) { - log.error( - `Error dropping connection with peer ${peerId.toString()} - ${error}` - ); - } - } - private processDialQueue(): void { if ( this.pendingPeerDialQueue.length > 0 && diff --git a/packages/interfaces/src/connection_manager.ts b/packages/interfaces/src/connection_manager.ts index 339a338684..8c57e8210f 100644 --- a/packages/interfaces/src/connection_manager.ts +++ b/packages/interfaces/src/connection_manager.ts @@ -61,6 +61,7 @@ export interface IConnectionStateEvents { export interface IConnectionManager extends TypedEventEmitter { + dropConnection(peerId: PeerId): Promise; getPeersByDiscovery(): Promise; stop(): void; } diff --git a/packages/interfaces/src/protocols.ts b/packages/interfaces/src/protocols.ts index b80b68860e..0371587ae7 100644 --- a/packages/interfaces/src/protocols.ts +++ b/packages/interfaces/src/protocols.ts @@ -25,7 +25,9 @@ export type IBaseProtocolCore = { }; export type IBaseProtocolSDK = { - numPeers: number; + renewPeer: (peerToDisconnect: PeerId) => Promise; + readonly connectedPeers: Peer[]; + readonly numPeersToUse: number; }; export type ContentTopicInfo = { diff --git a/packages/interfaces/src/sender.ts b/packages/interfaces/src/sender.ts index 2dbe72def9..672395e84c 100644 --- a/packages/interfaces/src/sender.ts +++ b/packages/interfaces/src/sender.ts @@ -2,5 +2,35 @@ import type { IEncoder, IMessage } from "./message.js"; import { SDKProtocolResult } from "./protocols.js"; export interface ISender { - send: (encoder: IEncoder, message: IMessage) => Promise; + send: ( + encoder: IEncoder, + message: IMessage, + sendOptions?: SendOptions + ) => Promise; } + +/** + * Options for using LightPush + */ +export type SendOptions = { + /** + * Optional flag to enable auto-retry with exponential backoff + */ + autoRetry?: boolean; + /** + * Optional flag to force using all available peers + */ + forceUseAllPeers?: boolean; + /** + * Optional maximum number of attempts for exponential backoff + */ + maxAttempts?: number; + /** + * Optional initial delay in milliseconds for exponential backoff + */ + initialDelay?: number; + /** + * Optional maximum delay in milliseconds for exponential backoff + */ + maxDelay?: number; +}; diff --git a/packages/sdk/src/light-node/index.ts b/packages/sdk/src/light-node/index.ts index 079f488063..713aec7d1f 100644 --- a/packages/sdk/src/light-node/index.ts +++ b/packages/sdk/src/light-node/index.ts @@ -1,8 +1,5 @@ import { type Libp2pComponents, type LightNode } from "@waku/interfaces"; -import { wakuFilter } from "../protocols/filter.js"; -import { wakuLightPush } from "../protocols/light_push.js"; -import { wakuStore } from "../protocols/store.js"; import { createLibp2pAndUpdateOptions } from "../utils/libp2p.js"; import { CreateWakuNodeOptions, WakuNode, WakuOptions } from "../waku.js"; @@ -18,15 +15,9 @@ export async function createLightNode( ): Promise { const libp2p = await createLibp2pAndUpdateOptions(options); - const store = wakuStore(options); - const lightPush = wakuLightPush(options); - const filter = wakuFilter(options); - - return new WakuNode( - options as WakuOptions, - libp2p, - store, - lightPush, - filter - ) as LightNode; + return new WakuNode(options as WakuOptions, libp2p, { + store: true, + lightpush: true, + filter: true + }) as LightNode; } diff --git a/packages/sdk/src/protocols/base_protocol.ts b/packages/sdk/src/protocols/base_protocol.ts index 6de20f5bf8..9688859cea 100644 --- a/packages/sdk/src/protocols/base_protocol.ts +++ b/packages/sdk/src/protocols/base_protocol.ts @@ -1,15 +1,220 @@ -import { IBaseProtocolSDK } from "@waku/interfaces"; +import type { Peer, PeerId } from "@libp2p/interface"; +import { ConnectionManager } from "@waku/core"; +import { BaseProtocol } from "@waku/core/lib/base_protocol"; +import { IBaseProtocolSDK, SendOptions } from "@waku/interfaces"; +import { delay, Logger } from "@waku/utils"; interface Options { numPeersToUse?: number; + maintainPeersInterval?: number; } const DEFAULT_NUM_PEERS_TO_USE = 3; +const DEFAULT_MAINTAIN_PEERS_INTERVAL = 30_000; export class BaseProtocolSDK implements IBaseProtocolSDK { - public readonly numPeers: number; + public readonly numPeersToUse: number; + private peers: Peer[] = []; + private maintainPeersIntervalId: ReturnType< + typeof window.setInterval + > | null = null; + log: Logger; - constructor(options: Options) { - this.numPeers = options?.numPeersToUse ?? DEFAULT_NUM_PEERS_TO_USE; + private maintainPeersLock = false; + + constructor( + protected core: BaseProtocol, + private connectionManager: ConnectionManager, + options: Options + ) { + this.log = new Logger(`sdk:${core.multicodec}`); + this.numPeersToUse = options?.numPeersToUse ?? DEFAULT_NUM_PEERS_TO_USE; + const maintainPeersInterval = + options?.maintainPeersInterval ?? DEFAULT_MAINTAIN_PEERS_INTERVAL; + + void this.startMaintainPeersInterval(maintainPeersInterval); + } + + get connectedPeers(): Peer[] { + return this.peers; + } + + /** + * Disconnects from a peer and tries to find a new one to replace it. + * @param peerToDisconnect The peer to disconnect from. + */ + public async renewPeer(peerToDisconnect: PeerId): Promise { + this.log.info(`Renewing peer ${peerToDisconnect}`); + try { + await this.connectionManager.dropConnection(peerToDisconnect); + this.peers = this.peers.filter((peer) => peer.id !== peerToDisconnect); + this.log.info( + `Peer ${peerToDisconnect} disconnected and removed from the peer list` + ); + + await this.findAndAddPeers(1); + } catch (error) { + this.log.info( + "Peer renewal failed, relying on the interval to find a new peer" + ); + } + } + + /** + * Stops the maintain peers interval. + */ + public stopMaintainPeersInterval(): void { + if (this.maintainPeersIntervalId) { + clearInterval(this.maintainPeersIntervalId); + this.maintainPeersIntervalId = null; + this.log.info("Maintain peers interval stopped"); + } + } + + /** + * Checks if there are peers to send a message to. + * If `forceUseAllPeers` is `false` (default) and there are connected peers, returns `true`. + * If `forceUseAllPeers` is `true` or there are no connected peers, tries to find new peers from the ConnectionManager. + * If `autoRetry` is `false`, returns `false` if no peers are found. + * If `autoRetry` is `true`, tries to find new peers from the ConnectionManager with exponential backoff. + * Returns `true` if peers are found, `false` otherwise. + * @param options Optional options object + * @param options.autoRetry Optional flag to enable auto-retry with exponential backoff (default: false) + * @param options.forceUseAllPeers Optional flag to force using all available peers (default: false) + * @param options.initialDelay Optional initial delay in milliseconds for exponential backoff (default: 10) + * @param options.maxAttempts Optional maximum number of attempts for exponential backoff (default: 3) + * @param options.maxDelay Optional maximum delay in milliseconds for exponential backoff (default: 100) + */ + protected hasPeers = async ( + options: Partial = {} + ): Promise => { + const { + autoRetry = false, + forceUseAllPeers = false, + initialDelay = 10, + maxAttempts = 3, + maxDelay = 100 + } = options; + + if (!forceUseAllPeers && this.connectedPeers.length > 0) return true; + + let attempts = 0; + while (attempts < maxAttempts) { + attempts++; + if (await this.maintainPeers()) { + if (this.peers.length < this.numPeersToUse) { + this.log.warn( + `Found only ${this.peers.length} peers, expected ${this.numPeersToUse}` + ); + } + return true; + } + if (!autoRetry) return false; + const delayMs = Math.min( + initialDelay * Math.pow(2, attempts - 1), + maxDelay + ); + await delay(delayMs); + } + + this.log.error("Failed to find peers to send message to"); + return false; + }; + + /** + * Starts an interval to maintain the peers list to `numPeersToUse`. + * @param interval The interval in milliseconds to maintain the peers. + */ + private async startMaintainPeersInterval(interval: number): Promise { + this.log.info("Starting maintain peers interval"); + try { + await this.maintainPeers(); + this.maintainPeersIntervalId = setInterval(() => { + this.maintainPeers().catch((error) => { + this.log.error("Error during maintain peers interval:", error); + }); + }, interval); + this.log.info( + `Maintain peers interval started with interval ${interval}ms` + ); + } catch (error) { + this.log.error("Error starting maintain peers interval:", error); + throw error; + } + } + + /** + * Maintains the peers list to `numPeersToUse`. + */ + private async maintainPeers(): Promise { + if (this.maintainPeersLock) { + return false; + } + + this.maintainPeersLock = true; + this.log.info(`Maintaining peers, current count: ${this.peers.length}`); + try { + const numPeersToAdd = this.numPeersToUse - this.peers.length; + if (numPeersToAdd > 0) { + await this.findAndAddPeers(numPeersToAdd); + } + this.log.info( + `Peer maintenance completed, current count: ${this.peers.length}` + ); + } finally { + this.maintainPeersLock = false; + } + return true; + } + + /** + * Finds and adds new peers to the peers list. + * @param numPeers The number of peers to find and add. + */ + private async findAndAddPeers(numPeers: number): Promise { + this.log.info(`Finding and adding ${numPeers} new peers`); + try { + const additionalPeers = await this.findAdditionalPeers(numPeers); + this.peers = [...this.peers, ...additionalPeers]; + this.log.info( + `Added ${additionalPeers.length} new peers, total peers: ${this.peers.length}` + ); + } catch (error) { + this.log.error("Error finding and adding new peers:", error); + throw error; + } + } + + /** + * Finds additional peers. + * Attempts to find peers without using bootstrap peers first, + * If no peers are found, + * tries with bootstrap peers. + * @param numPeers The number of peers to find. + */ + private async findAdditionalPeers(numPeers: number): Promise { + this.log.info(`Finding ${numPeers} additional peers`); + try { + let newPeers = await this.core.getPeers({ + maxBootstrapPeers: 0, + numPeers: numPeers + }); + + if (newPeers.length === 0) { + this.log.warn("No new peers found, trying with bootstrap peers"); + newPeers = await this.core.getPeers({ + maxBootstrapPeers: numPeers, + numPeers: numPeers + }); + } + + newPeers = newPeers.filter( + (peer) => this.peers.some((p) => p.id === peer.id) === false + ); + return newPeers; + } catch (error) { + this.log.error("Error finding additional peers:", error); + throw error; + } } } diff --git a/packages/sdk/src/protocols/filter.ts b/packages/sdk/src/protocols/filter.ts index ebd76e2f1c..559279c314 100644 --- a/packages/sdk/src/protocols/filter.ts +++ b/packages/sdk/src/protocols/filter.ts @@ -1,5 +1,5 @@ import type { Peer } from "@libp2p/interface"; -import { FilterCore } from "@waku/core"; +import { ConnectionManager, FilterCore } from "@waku/core"; import { type Callback, type ContentTopic, @@ -261,26 +261,34 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK { public readonly protocol: FilterCore; private activeSubscriptions = new Map(); - private async handleIncomingMessage( - pubsubTopic: PubsubTopic, - wakuMessage: WakuMessage - ): Promise { - const subscription = this.getActiveSubscription(pubsubTopic); - if (!subscription) { - log.error(`No subscription locally registered for topic ${pubsubTopic}`); - return; - } - await subscription.processIncomingMessage(wakuMessage); - } + constructor( + connectionManager: ConnectionManager, + libp2p: Libp2p, + options?: ProtocolCreateOptions + ) { + super( + new FilterCore( + async (pubsubTopic: PubsubTopic, wakuMessage: WakuMessage) => { + const subscription = this.getActiveSubscription(pubsubTopic); + if (!subscription) { + log.error( + `No subscription locally registered for topic ${pubsubTopic}` + ); + return; + } - constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) { - super({ numPeersToUse: options?.numPeersToUse }); - this.protocol = new FilterCore( - this.handleIncomingMessage.bind(this), - libp2p, - options + await subscription.processIncomingMessage(wakuMessage); + }, + libp2p, + options + ), + connectionManager, + { numPeersToUse: options?.numPeersToUse } ); + + this.protocol = this.core as FilterCore; + this.activeSubscriptions = new Map(); } @@ -430,9 +438,10 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK { } export function wakuFilter( - init: ProtocolCreateOptions + connectionManager: ConnectionManager, + init?: ProtocolCreateOptions ): (libp2p: Libp2p) => IFilterSDK { - return (libp2p: Libp2p) => new FilterSDK(libp2p, init); + return (libp2p: Libp2p) => new FilterSDK(connectionManager, libp2p, init); } async function pushMessage( diff --git a/packages/sdk/src/protocols/light_push.ts b/packages/sdk/src/protocols/light_push.ts index 67e2117df2..ae52e06e49 100644 --- a/packages/sdk/src/protocols/light_push.ts +++ b/packages/sdk/src/protocols/light_push.ts @@ -1,5 +1,5 @@ import type { PeerId } from "@libp2p/interface"; -import { LightPushCore } from "@waku/core"; +import { ConnectionManager, LightPushCore } from "@waku/core"; import { Failure, type IEncoder, @@ -8,7 +8,8 @@ import { type Libp2p, type ProtocolCreateOptions, ProtocolError, - SDKProtocolResult + SDKProtocolResult, + SendOptions } from "@waku/interfaces"; import { ensurePubsubTopicIsConfigured, Logger } from "@waku/utils"; @@ -19,12 +20,28 @@ const log = new Logger("sdk:light-push"); class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK { public readonly protocol: LightPushCore; - constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) { - super({ numPeersToUse: options?.numPeersToUse }); - this.protocol = new LightPushCore(libp2p, options); + constructor( + connectionManager: ConnectionManager, + libp2p: Libp2p, + options?: ProtocolCreateOptions + ) { + super(new LightPushCore(libp2p, options), connectionManager, { + numPeersToUse: options?.numPeersToUse + }); + + this.protocol = this.core as LightPushCore; } - async send(encoder: IEncoder, message: IMessage): Promise { + async send( + encoder: IEncoder, + message: IMessage, + _options?: SendOptions + ): Promise { + const options = { + autoRetry: true, + ..._options + } as SendOptions; + const successes: PeerId[] = []; const failures: Failure[] = []; @@ -43,15 +60,19 @@ class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK { }; } - const peers = await this.protocol.getPeers(); - if (!peers.length) { + const hasPeers = await this.hasPeers(options); + if (!hasPeers) { return { successes, - failures: [{ error: ProtocolError.NO_PEER_AVAILABLE }] + failures: [ + { + error: ProtocolError.NO_PEER_AVAILABLE + } + ] }; } - const sendPromises = peers.map((peer) => + const sendPromises = this.connectedPeers.map((peer) => this.protocol.send(encoder, message, peer) ); @@ -64,12 +85,15 @@ class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK { successes.push(success); } if (failure) { + if (failure.peerId) { + await this.renewPeer(failure.peerId); + } + failures.push(failure); } } else { log.error("Failed to send message to peer", result.reason); failures.push({ error: ProtocolError.GENERIC_FAIL }); - // TODO: handle renewing faulty peers with new peers (https://github.com/waku-org/js-waku/issues/1463) } } @@ -81,7 +105,8 @@ class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK { } export function wakuLightPush( + connectionManager: ConnectionManager, init: Partial = {} ): (libp2p: Libp2p) => ILightPushSDK { - return (libp2p: Libp2p) => new LightPushSDK(libp2p, init); + return (libp2p: Libp2p) => new LightPushSDK(connectionManager, libp2p, init); } diff --git a/packages/sdk/src/protocols/store.ts b/packages/sdk/src/protocols/store.ts index 774fe6a168..647667a949 100644 --- a/packages/sdk/src/protocols/store.ts +++ b/packages/sdk/src/protocols/store.ts @@ -1,5 +1,5 @@ import { sha256 } from "@noble/hashes/sha256"; -import { StoreCore, waku_store } from "@waku/core"; +import { ConnectionManager, StoreCore, waku_store } from "@waku/core"; import { Cursor, IDecodedMessage, @@ -25,11 +25,17 @@ const log = new Logger("waku:store:protocol"); export class StoreSDK extends BaseProtocolSDK implements IStoreSDK { public readonly protocol: StoreCore; - constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) { + constructor( + connectionManager: ConnectionManager, + libp2p: Libp2p, + options?: ProtocolCreateOptions + ) { // TODO: options.numPeersToUse is disregarded: https://github.com/waku-org/js-waku/issues/1685 - super({ numPeersToUse: DEFAULT_NUM_PEERS }); + super(new StoreCore(libp2p, options), connectionManager, { + numPeersToUse: DEFAULT_NUM_PEERS + }); - this.protocol = new StoreCore(libp2p, options); + this.protocol = this.core as StoreCore; } /** @@ -67,7 +73,7 @@ export class StoreSDK extends BaseProtocolSDK implements IStoreSDK { const peer = ( await this.protocol.getPeers({ - numPeers: this.numPeers, + numPeers: this.numPeersToUse, maxBootstrapPeers: 1 }) )[0]; @@ -315,7 +321,8 @@ export class StoreSDK extends BaseProtocolSDK implements IStoreSDK { } export function wakuStore( + connectionManager: ConnectionManager, init: Partial = {} ): (libp2p: Libp2p) => IStoreSDK { - return (libp2p: Libp2p) => new StoreSDK(libp2p, init); + return (libp2p: Libp2p) => new StoreSDK(connectionManager, libp2p, init); } diff --git a/packages/sdk/src/relay-node/index.ts b/packages/sdk/src/relay-node/index.ts index 2ae2f6b402..3ef337c460 100644 --- a/packages/sdk/src/relay-node/index.ts +++ b/packages/sdk/src/relay-node/index.ts @@ -1,9 +1,6 @@ import { type FullNode, type RelayNode } from "@waku/interfaces"; -import { RelayCreateOptions, wakuRelay } from "@waku/relay"; +import { RelayCreateOptions } from "@waku/relay"; -import { wakuFilter } from "../protocols/filter.js"; -import { wakuLightPush } from "../protocols/light_push.js"; -import { wakuStore } from "../protocols/store.js"; import { createLibp2pAndUpdateOptions } from "../utils/libp2p.js"; import { CreateWakuNodeOptions, WakuNode, WakuOptions } from "../waku.js"; @@ -24,16 +21,9 @@ export async function createRelayNode( ): Promise { const libp2p = await createLibp2pAndUpdateOptions(options); - const relay = wakuRelay(options?.pubsubTopics || []); - - return new WakuNode( - options as WakuOptions, - libp2p, - undefined, - undefined, - undefined, - relay - ) as RelayNode; + return new WakuNode(options as WakuOptions, libp2p, { + relay: true + }) as RelayNode; } /** @@ -56,17 +46,10 @@ export async function createFullNode( ): Promise { const libp2p = await createLibp2pAndUpdateOptions(options); - const store = wakuStore(options); - const lightPush = wakuLightPush(options); - const filter = wakuFilter(options); - const relay = wakuRelay(options?.pubsubTopics || []); - - return new WakuNode( - options as WakuOptions, - libp2p, - store, - lightPush, - filter, - relay - ) as FullNode; + return new WakuNode(options as WakuOptions, libp2p, { + filter: true, + lightpush: true, + relay: true, + store: true + }) as FullNode; } diff --git a/packages/sdk/src/waku.ts b/packages/sdk/src/waku.ts index f11dd06ec7..6044ea7794 100644 --- a/packages/sdk/src/waku.ts +++ b/packages/sdk/src/waku.ts @@ -16,8 +16,12 @@ import type { Waku } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; +import { wakuRelay } from "@waku/relay"; import { Logger } from "@waku/utils"; +import { wakuFilter } from "./protocols/filter.js"; +import { wakuLightPush } from "./protocols/light_push.js"; +import { wakuStore } from "./protocols/store.js"; import { subscribeToContentTopic } from "./utils/content_topic.js"; export const DefaultPingKeepAliveValueSecs = 5 * 60; @@ -53,6 +57,13 @@ export interface WakuOptions { export type CreateWakuNodeOptions = ProtocolCreateOptions & Partial; +type ProtocolsEnabled = { + filter?: boolean; + lightpush?: boolean; + store?: boolean; + relay?: boolean; +}; + export class WakuNode implements Waku { public libp2p: Libp2p; public relay?: IRelay; @@ -65,10 +76,7 @@ export class WakuNode implements Waku { constructor( options: WakuOptions, libp2p: Libp2p, - store?: (libp2p: Libp2p) => IStoreSDK, - lightPush?: (libp2p: Libp2p) => ILightPushSDK, - filter?: (libp2p: Libp2p) => IFilterSDK, - relay?: (libp2p: Libp2p) => IRelay + protocolsEnabled: ProtocolsEnabled ) { if (options.pubsubTopics.length == 0) { throw new Error("At least one pubsub topic must be provided"); @@ -77,19 +85,13 @@ export class WakuNode implements Waku { this.libp2p = libp2p; - if (store) { - this.store = store(libp2p); - } - if (filter) { - this.filter = filter(libp2p); - } - if (lightPush) { - this.lightPush = lightPush(libp2p); - } - - if (relay) { - this.relay = relay(libp2p); - } + protocolsEnabled = { + filter: false, + lightpush: false, + store: false, + relay: false, + ...protocolsEnabled + }; const pingKeepAlive = options.pingKeepAlive || DefaultPingKeepAliveValueSecs; @@ -107,6 +109,26 @@ export class WakuNode implements Waku { this.relay ); + if (protocolsEnabled.store) { + const store = wakuStore(this.connectionManager, options); + this.store = store(libp2p); + } + + if (protocolsEnabled.lightpush) { + const lightPush = wakuLightPush(this.connectionManager, options); + this.lightPush = lightPush(libp2p); + } + + if (protocolsEnabled.filter) { + const filter = wakuFilter(this.connectionManager, options); + this.filter = filter(libp2p); + } + + if (protocolsEnabled.relay) { + const relay = wakuRelay(this.pubsubTopics); + this.relay = relay(libp2p); + } + log.info( "Waku node created", peerId, diff --git a/packages/tests/tests/filter/utils.ts b/packages/tests/tests/filter/utils.ts index 86b7ae7ea4..093d2e2ef0 100644 --- a/packages/tests/tests/filter/utils.ts +++ b/packages/tests/tests/filter/utils.ts @@ -1,5 +1,6 @@ import { createDecoder, createEncoder, waitForRemotePeer } from "@waku/core"; import { + DefaultPubsubTopic, ISubscriptionSDK, LightNode, ProtocolCreateOptions, @@ -68,12 +69,14 @@ export async function validatePingError( export async function runMultipleNodes( context: Context, - shardInfo: ShardingParams, + shardInfo?: ShardingParams, strictChecking: boolean = false, numServiceNodes = 3, withoutFilter = false ): Promise<[ServiceNodesFleet, LightNode]> { - const pubsubTopics = shardInfoToPubsubTopics(shardInfo); + const pubsubTopics = shardInfo + ? shardInfoToPubsubTopics(shardInfo) + : [DefaultPubsubTopic]; // create numServiceNodes nodes const serviceNodes = await ServiceNodesFleet.createAndRun( context, diff --git a/packages/tests/tests/light-push/peer_management.spec.ts b/packages/tests/tests/light-push/peer_management.spec.ts new file mode 100644 index 0000000000..9323495366 --- /dev/null +++ b/packages/tests/tests/light-push/peer_management.spec.ts @@ -0,0 +1,93 @@ +import { DefaultPubsubTopic, LightNode } from "@waku/interfaces"; +import { createEncoder, utf8ToBytes } from "@waku/sdk"; +import { expect } from "chai"; +import { describe } from "mocha"; + +import { + afterEachCustom, + beforeEachCustom, + ServiceNodesFleet +} from "../../src/index.js"; +import { + runMultipleNodes, + teardownNodesWithRedundancy +} from "../filter/utils.js"; + +describe("Waku Light Push: Peer Management: E2E", function () { + this.timeout(15000); + let waku: LightNode; + let serviceNodes: ServiceNodesFleet; + + beforeEachCustom(this, async () => { + [serviceNodes, waku] = await runMultipleNodes( + this.ctx, + undefined, + undefined, + 5 + ); + }); + + afterEachCustom(this, async () => { + await teardownNodesWithRedundancy(serviceNodes, waku); + }); + + const encoder = createEncoder({ + pubsubTopic: DefaultPubsubTopic, + contentTopic: "/test" + }); + + it("Number of peers are maintained correctly", async function () { + const { successes, failures } = await waku.lightPush.send(encoder, { + payload: utf8ToBytes("Hello_World") + }); + + expect(successes.length).to.be.greaterThan(0); + expect(successes.length).to.be.equal(waku.lightPush.numPeersToUse); + + if (failures) { + expect(failures.length).to.equal(0); + } + }); + + it("Failed peers are renewed", async function () { + // send a lightpush request -- should have all successes + const response1 = await waku.lightPush.send(encoder, { + payload: utf8ToBytes("Hello_World") + }); + + expect(response1.successes.length).to.be.equal( + waku.lightPush.numPeersToUse + ); + if (response1.failures) { + expect(response1.failures.length).to.equal(0); + } + + // disconnect from one peer to force a failure + const peerToDisconnect = response1.successes[0]; + await waku.connectionManager.dropConnection(peerToDisconnect); + + // send another lightpush request -- should have all successes except the one that was disconnected + const response2 = await waku.lightPush.send(encoder, { + payload: utf8ToBytes("Hello_World") + }); + + // check that the peer that was disconnected is not in the new successes + expect(response2.successes).to.not.include(peerToDisconnect); + expect(response2.failures).to.have.length(1); + expect(response2.failures?.[0].peerId).to.equal(peerToDisconnect); + + // send another lightpush request -- renewal should have triggerred and new peer should be used instead of the disconnected one + const response3 = await waku.lightPush.send(encoder, { + payload: utf8ToBytes("Hello_World") + }); + + expect(response3.successes.length).to.be.equal( + waku.lightPush.numPeersToUse + ); + + expect(response3.successes).to.not.include(peerToDisconnect); + if (response3.failures) { + expect(response3.failures.length).to.equal(0); + } + }); +}); diff --git a/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts b/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts index be2b6fdec8..b971f9b12c 100644 --- a/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts @@ -52,12 +52,12 @@ describe("Waku Light Push : Multiple PubsubTopics", function () { contentTopic: customContentTopic2 }); - let nimPeerId: PeerId; + let node1PeerId: PeerId; beforeEachCustom(this, async () => { [nwaku, waku] = await runNodes(this.ctx, shardInfo); messageCollector = new MessageCollector(nwaku); - nimPeerId = await nwaku.getPeerId(); + node1PeerId = await nwaku.getPeerId(); }); afterEachCustom(this, async () => { @@ -69,7 +69,7 @@ describe("Waku Light Push : Multiple PubsubTopics", function () { payload: utf8ToBytes(messageText) }); - expect(pushResponse.successes[0].toString()).to.eq(nimPeerId.toString()); + expect(pushResponse.successes[0].toString()).to.eq(node1PeerId.toString()); expect( await messageCollector.waitForMessages(1, { @@ -89,8 +89,8 @@ describe("Waku Light Push : Multiple PubsubTopics", function () { const pushResponse2 = await waku.lightPush.send(customEncoder2, { payload: utf8ToBytes("M2") }); - expect(pushResponse1.successes[0].toString()).to.eq(nimPeerId.toString()); - expect(pushResponse2.successes[0].toString()).to.eq(nimPeerId.toString()); + expect(pushResponse1.successes[0].toString()).to.eq(node1PeerId.toString()); + expect(pushResponse2.successes[0].toString()).to.eq(node1PeerId.toString()); const messageCollector2 = new MessageCollector(nwaku); @@ -195,12 +195,12 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () { pubsubTopicShardInfo: pubsubTopicToSingleShardInfo(autoshardingPubsubTopic2) }); - let nimPeerId: PeerId; + let node1PeerId: PeerId; beforeEachCustom(this, async () => { [nwaku, waku] = await runNodes(this.ctx, shardInfo); messageCollector = new MessageCollector(nwaku); - nimPeerId = await nwaku.getPeerId(); + node1PeerId = await nwaku.getPeerId(); }); afterEachCustom(this, async () => { @@ -213,7 +213,7 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () { }); expect(pushResponse.failures).to.be.empty; - expect(pushResponse.successes[0].toString()).to.eq(nimPeerId.toString()); + expect(pushResponse.successes[0].toString()).to.eq(node1PeerId.toString()); expect( await messageCollector.waitForMessagesAutosharding(1, { @@ -233,8 +233,8 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () { const pushResponse2 = await waku.lightPush.send(customEncoder2, { payload: utf8ToBytes("M2") }); - expect(pushResponse1.successes[0].toString()).to.eq(nimPeerId.toString()); - expect(pushResponse2.successes[0].toString()).to.eq(nimPeerId.toString()); + expect(pushResponse1.successes[0].toString()).to.eq(node1PeerId.toString()); + expect(pushResponse2.successes[0].toString()).to.eq(node1PeerId.toString()); const messageCollector2 = new MessageCollector(nwaku); @@ -352,13 +352,13 @@ describe("Waku Light Push (named sharding): Multiple PubsubTopics", function () ] }; - let nimPeerId: PeerId; + let node1PeerId: PeerId; beforeEachCustom(this, async () => { ctx = this.ctx; [nwaku, waku] = await runNodes(ctx, testShardInfo); messageCollector = new MessageCollector(nwaku); - nimPeerId = await nwaku.getPeerId(); + node1PeerId = await nwaku.getPeerId(); }); afterEachCustom(this, async () => { @@ -370,7 +370,7 @@ describe("Waku Light Push (named sharding): Multiple PubsubTopics", function () payload: utf8ToBytes(messageText) }); - expect(pushResponse.successes[0].toString()).to.eq(nimPeerId.toString()); + expect(pushResponse.successes[0].toString()).to.eq(node1PeerId.toString()); expect( await messageCollector.waitForMessages(1, { @@ -390,8 +390,8 @@ describe("Waku Light Push (named sharding): Multiple PubsubTopics", function () const pushResponse2 = await waku.lightPush.send(customEncoder2, { payload: utf8ToBytes("M2") }); - expect(pushResponse1.successes[0].toString()).to.eq(nimPeerId.toString()); - expect(pushResponse2.successes[0].toString()).to.eq(nimPeerId.toString()); + expect(pushResponse1.successes[0].toString()).to.eq(node1PeerId.toString()); + expect(pushResponse2.successes[0].toString()).to.eq(node1PeerId.toString()); const messageCollector2 = new MessageCollector(nwaku); @@ -421,19 +421,27 @@ describe("Waku Light Push (named sharding): Multiple PubsubTopics", function () it("Light push messages to 2 nwaku nodes each with different pubsubtopics", async function () { // Set up and start a new nwaku node with Default PubsubTopic - [nwaku2, waku2] = await runNodes(ctx, shardInfo2); + [nwaku2] = await runNodes(ctx, shardInfo2); + await nwaku2.ensureSubscriptions([autoshardingPubsubTopic2]); await waku.dial(await nwaku2.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.LightPush]); const messageCollector2 = new MessageCollector(nwaku2); - await waku.lightPush.send(customEncoder1, { + const { failures: f1 } = await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") }); - await waku.lightPush.send(customEncoder2, { - payload: utf8ToBytes("M2") - }); + const { failures: f2 } = await waku.lightPush.send( + customEncoder2, + { + payload: utf8ToBytes("M2") + }, + { forceUseAllPeers: true } + ); + + expect(f1).to.be.empty; + expect(f2).to.be.empty; await messageCollector.waitForMessages(1, { pubsubTopic: autoshardingPubsubTopic1 diff --git a/packages/tests/tests/sdk/content_topic.spec.ts b/packages/tests/tests/sdk/content_topic.spec.ts index fe28ed3d8f..a65804fcad 100644 --- a/packages/tests/tests/sdk/content_topic.spec.ts +++ b/packages/tests/tests/sdk/content_topic.spec.ts @@ -1,4 +1,3 @@ -import { wakuFilter } from "@waku/sdk"; import { bytesToUtf8, createEncoder, @@ -123,9 +122,9 @@ describe.skip("SDK: Creating by Content Topic", function () { pubsubTopics: shardInfo.pubsubTopics }, await defaultLibp2p(shardInfo.shardInfo, undefined, {}, undefined), - undefined, - undefined, - wakuFilter({ pubsubTopics: shardInfo.pubsubTopics }) + { + filter: true + } ); await wakuContentTopic.subscribeToContentTopic( ContentTopic, diff --git a/packages/tests/tests/store/index.node.spec.ts b/packages/tests/tests/store/index.node.spec.ts index 595c0abc2d..05a981e8bd 100644 --- a/packages/tests/tests/store/index.node.spec.ts +++ b/packages/tests/tests/store/index.node.spec.ts @@ -304,10 +304,13 @@ describe("Waku Store, general", function () { for await (const msg of query) { if (msg) { messages.push(msg as DecodedMessage); + console.log(bytesToUtf8(msg.payload!)); } } } + console.log(messages.length); + // Messages are ordered from oldest to latest within a page (1 page query) expect(bytesToUtf8(messages[0].payload!)).to.eq(asymText); expect(bytesToUtf8(messages[1].payload!)).to.eq(symText); diff --git a/packages/utils/src/common/delay.ts b/packages/utils/src/common/delay.ts new file mode 100644 index 0000000000..6069f04bf7 --- /dev/null +++ b/packages/utils/src/common/delay.ts @@ -0,0 +1,3 @@ +export async function delay(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} diff --git a/packages/utils/src/common/index.ts b/packages/utils/src/common/index.ts index 906bfb948d..025a06f03c 100644 --- a/packages/utils/src/common/index.ts +++ b/packages/utils/src/common/index.ts @@ -7,6 +7,7 @@ export * from "./is_size_valid.js"; export * from "./sharding.js"; export * from "./push_or_init_map.js"; export * from "./relay_shard_codec.js"; +export * from "./delay.js"; export function removeItemFromArray(arr: unknown[], value: unknown): unknown[] { const index = arr.indexOf(value);