From 415339601476925874904b19be43f6e055a45004 Mon Sep 17 00:00:00 2001 From: Danish Arora <35004822+danisharora099@users.noreply.github.com> Date: Tue, 28 Jan 2025 17:57:49 +0530 Subject: [PATCH] feat(store): allow specifying node to use (#2192) * feat: API allows using specific nodes for protocol * chore: dial provided now * chore: store uses the node * chore: update API * chore: use protocol-level class for getting peer * chore: use ConnectionManager for dial ops * chore: address comments * chore: fix type error * chore: Waku.dial() proxies through ConnectionManager.dialPeer * chore: fix dial * chore: add ts-doc for dialPeer() * chore: remove log * chore: reduce ts-doc for an internal function * chore: address comments * chore: return types from connmanager.dialpeer() * chore: reduce diff by not introducing breaking changes --- packages/core/src/lib/connection_manager.ts | 134 +++++++++++++++++--- packages/interfaces/src/protocols.ts | 8 ++ packages/sdk/src/protocols/store/index.ts | 43 +++++-- packages/sdk/src/waku/waku.ts | 22 +++- 4 files changed, 170 insertions(+), 37 deletions(-) diff --git a/packages/core/src/lib/connection_manager.ts b/packages/core/src/lib/connection_manager.ts index 64bd35434f..ba48a0cdc7 100644 --- a/packages/core/src/lib/connection_manager.ts +++ b/packages/core/src/lib/connection_manager.ts @@ -1,5 +1,14 @@ -import type { Peer, PeerId, PeerInfo, PeerStore } from "@libp2p/interface"; -import { TypedEventEmitter } from "@libp2p/interface"; +import { + type Connection, + isPeerId, + type Peer, + type PeerId, + type PeerInfo, + type PeerStore, + type Stream, + TypedEventEmitter +} from "@libp2p/interface"; +import { Multiaddr, multiaddr, MultiaddrInput } from "@multiformats/multiaddr"; import { ConnectionManagerOptions, DiscoveryTrigger, @@ -219,15 +228,60 @@ export class ConnectionManager this.startNetworkStatusListener(); } - private async dialPeer(peerId: PeerId): Promise { + /** + * Attempts to establish a connection with a peer and set up specified protocols. + * The method handles both PeerId and Multiaddr inputs, manages connection attempts, + * and maintains the connection state. + * + * The dialing process includes: + * 1. Converting input to dialable peer info + * 2. Managing parallel dial attempts + * 3. Attempting to establish protocol-specific connections + * 4. Handling connection failures and retries + * 5. Updating the peer store and connection state + * + * @param {PeerId | MultiaddrInput} peer - The peer to connect to, either as a PeerId or multiaddr + * @param {string[]} [protocolCodecs] - Optional array of protocol-specific codec strings to establish + * (e.g., for LightPush, Filter, Store protocols) + * + * @throws {Error} If the multiaddr is missing a peer ID + * @throws {Error} If the maximum dial attempts are reached and the peer cannot be dialed + * @throws {Error} If there's an error deleting an undialable peer from the peer store + * + * @example + * ```typescript + * // Dial using PeerId + * await connectionManager.dialPeer(peerId); + * + * // Dial using multiaddr with specific protocols + * await connectionManager.dialPeer(multiaddr, [ + * "/vac/waku/relay/2.0.0", + * "/vac/waku/lightpush/2.0.0-beta1" + * ]); + * ``` + * + * @remarks + * - The method implements exponential backoff through multiple dial attempts + * - Maintains a queue for parallel dial attempts (limited by maxParallelDials) + * - Integrates with the KeepAliveManager for connection maintenance + * - Updates the peer store and connection state after successful/failed attempts + * - If all dial attempts fail, triggers DNS discovery as a fallback + */ + public async dialPeer(peer: PeerId | MultiaddrInput): Promise { + let connection: Connection | undefined; + let peerId: PeerId | undefined; + const peerDialInfo = this.getDialablePeerInfo(peer); + const peerIdStr = isPeerId(peerDialInfo) + ? peerDialInfo.toString() + : peerDialInfo.getPeerId()!; + this.currentActiveParallelDialCount += 1; let dialAttempt = 0; while (dialAttempt < this.options.maxDialAttemptsForPeer) { try { - log.info( - `Dialing peer ${peerId.toString()} on attempt ${dialAttempt + 1}` - ); - await this.libp2p.dial(peerId); + log.info(`Dialing peer ${peerDialInfo} on attempt ${dialAttempt + 1}`); + connection = await this.libp2p.dial(peerDialInfo); + peerId = connection.remotePeer; const tags = await this.getTagNamesForPeer(peerId); // add tag to connection describing discovery mechanism @@ -246,21 +300,17 @@ export class ConnectionManager } catch (error) { if (error instanceof AggregateError) { // Handle AggregateError - log.error( - `Error dialing peer ${peerId.toString()} - ${error.errors}` - ); + log.error(`Error dialing peer ${peerIdStr} - ${error.errors}`); } else { // Handle generic error log.error( - `Error dialing peer ${peerId.toString()} - ${ - (error as any).message - }` + `Error dialing peer ${peerIdStr} - ${(error as any).message}` ); } - this.dialErrorsForPeer.set(peerId.toString(), error); + this.dialErrorsForPeer.set(peerIdStr, error); dialAttempt++; - this.dialAttemptsForPeer.set(peerId.toString(), dialAttempt); + this.dialAttemptsForPeer.set(peerIdStr, dialAttempt); } } @@ -271,7 +321,7 @@ export class ConnectionManager // If max dial attempts reached and dialing failed, delete the peer if (dialAttempt === this.options.maxDialAttemptsForPeer) { try { - const error = this.dialErrorsForPeer.get(peerId.toString()); + const error = this.dialErrorsForPeer.get(peerIdStr); if (error) { let errorMessage; @@ -288,21 +338,65 @@ export class ConnectionManager } log.info( - `Deleting undialable peer ${peerId.toString()} from peer store. Reason: ${errorMessage}` + `Deleting undialable peer ${peerIdStr} from peer store. Reason: ${errorMessage}` ); } - this.dialErrorsForPeer.delete(peerId.toString()); - await this.libp2p.peerStore.delete(peerId); + this.dialErrorsForPeer.delete(peerIdStr); + if (peerId) { + await this.libp2p.peerStore.delete(peerId); + } // if it was last available peer - attempt DNS discovery await this.attemptDnsDiscovery(); } catch (error) { throw new Error( - `Error deleting undialable peer ${peerId.toString()} from peer store - ${error}` + `Error deleting undialable peer ${peerIdStr} from peer store - ${error}` ); } } + + if (!connection) { + throw new Error(`Failed to dial peer ${peerDialInfo}`); + } + + return connection; + } + + /** + * Dial a peer with specific protocols. + * This method is a raw proxy to the libp2p dialProtocol method. + * @param peer - The peer to connect to, either as a PeerId or multiaddr + * @param protocolCodecs - Optional array of protocol-specific codec strings to establish + * @returns A stream to the peer + */ + public async rawDialPeerWithProtocols( + peer: PeerId | MultiaddrInput, + protocolCodecs: string[] + ): Promise { + const peerDialInfo = this.getDialablePeerInfo(peer); + return await this.libp2p.dialProtocol(peerDialInfo, protocolCodecs); + } + + /** + * Internal utility to extract a PeerId or Multiaddr from a peer input. + * This is used internally by the connection manager to handle different peer input formats. + * @internal + */ + private getDialablePeerInfo( + peer: PeerId | MultiaddrInput + ): PeerId | Multiaddr { + if (isPeerId(peer)) { + return peer; + } else { + // peer is of MultiaddrInput type + const ma = multiaddr(peer); + const peerIdStr = ma.getPeerId(); + if (!peerIdStr) { + throw new Error("Failed to dial multiaddr: missing peer ID"); + } + return ma; + } } private async attemptDnsDiscovery(): Promise { diff --git a/packages/interfaces/src/protocols.ts b/packages/interfaces/src/protocols.ts index 74462efa13..9cec07405f 100644 --- a/packages/interfaces/src/protocols.ts +++ b/packages/interfaces/src/protocols.ts @@ -28,6 +28,10 @@ export type IBaseProtocolSDK = { readonly numPeersToUse: number; }; +export type StoreProtocolOptions = { + peer: string; +}; + export type NetworkConfig = StaticSharding | AutoSharding; //TODO: merge this with ProtocolCreateOptions or establish distinction: https://github.com/waku-org/js-waku/issues/2048 @@ -106,6 +110,10 @@ export type ProtocolCreateOptions = { * List of peers to use to bootstrap the node. Ignored if defaultBootstrap is set to true. */ bootstrapPeers?: string[]; + /** + * Options for the Store protocol. + */ + store?: Partial; }; export type Callback = ( diff --git a/packages/sdk/src/protocols/store/index.ts b/packages/sdk/src/protocols/store/index.ts index 01ef45d58c..bfc308f771 100644 --- a/packages/sdk/src/protocols/store/index.ts +++ b/packages/sdk/src/protocols/store/index.ts @@ -1,3 +1,4 @@ +import type { Peer } from "@libp2p/interface"; import { ConnectionManager, StoreCore } from "@waku/core"; import { IDecodedMessage, @@ -5,7 +6,8 @@ import { IStore, Libp2p, QueryRequestParams, - StoreCursor + StoreCursor, + StoreProtocolOptions } from "@waku/interfaces"; import { messageHash } from "@waku/message-hash"; import { ensurePubsubTopicIsConfigured, isDefined, Logger } from "@waku/utils"; @@ -23,7 +25,11 @@ const log = new Logger("waku:store:sdk"); export class Store extends BaseProtocolSDK implements IStore { public readonly protocol: StoreCore; - public constructor(connectionManager: ConnectionManager, libp2p: Libp2p) { + public constructor( + connectionManager: ConnectionManager, + libp2p: Libp2p, + private options?: Partial + ) { super( new StoreCore(connectionManager.configuredPubsubTopics, libp2p), connectionManager, @@ -58,12 +64,8 @@ export class Store extends BaseProtocolSDK implements IStore { ...options }; - const peer = ( - await this.protocol.getPeers({ - numPeers: this.numPeersToUse, - maxBootstrapPeers: 1 - }) - )[0]; + const peer = await this.getPeerToUse(); + if (!peer) { log.error("No peers available to query"); throw new Error("No peers available to query"); @@ -228,6 +230,26 @@ export class Store extends BaseProtocolSDK implements IStore { decodersAsMap }; } + + private async getPeerToUse(): Promise { + const peer = this.connectedPeers.find( + (p) => p.id.toString() === this.options?.peer + ); + if (peer) { + return peer; + } + + log.warn( + `Passed node to use for Store not found: ${this.options?.peer}. Attempting to use random peers.` + ); + + return ( + await this.protocol.getPeers({ + numPeers: this.numPeersToUse, + maxBootstrapPeers: 1 + }) + )[0]; + } } /** @@ -237,9 +259,10 @@ export class Store extends BaseProtocolSDK implements IStore { * @returns A function that takes a Libp2p instance and returns a StoreSDK instance. */ export function wakuStore( - connectionManager: ConnectionManager + connectionManager: ConnectionManager, + options?: Partial ): (libp2p: Libp2p) => IStore { return (libp2p: Libp2p) => { - return new Store(connectionManager, libp2p); + return new Store(connectionManager, libp2p, options); }; } diff --git a/packages/sdk/src/waku/waku.ts b/packages/sdk/src/waku/waku.ts index eea596d7a9..0889b038d9 100644 --- a/packages/sdk/src/waku/waku.ts +++ b/packages/sdk/src/waku/waku.ts @@ -1,7 +1,6 @@ -import type { Stream } from "@libp2p/interface"; -import { isPeerId, PeerId } from "@libp2p/interface"; +import { isPeerId, PeerId, type Stream } from "@libp2p/interface"; import { multiaddr, Multiaddr, MultiaddrInput } from "@multiformats/multiaddr"; -import { ConnectionManager, getHealthManager } from "@waku/core"; +import { ConnectionManager, getHealthManager, StoreCodec } from "@waku/core"; import type { IFilter, IHealthManager, @@ -106,7 +105,17 @@ export class WakuNode implements IWaku { this.health = getHealthManager(); if (protocolsEnabled.store) { - const store = wakuStore(this.connectionManager); + if (options.store?.peer) { + this.connectionManager + .rawDialPeerWithProtocols(options.store.peer, [StoreCodec]) + .catch((e) => { + log.error("Failed to dial store peer", e); + }); + } + + const store = wakuStore(this.connectionManager, { + peer: options.store?.peer + }); this.store = store(libp2p); } @@ -145,7 +154,6 @@ export class WakuNode implements IWaku { protocols?: Protocols[] ): Promise { const _protocols = protocols ?? []; - const peerId = this.mapToPeerIdOrMultiaddr(peer); if (typeof protocols === "undefined") { this.relay && _protocols.push(Protocols.Relay); @@ -194,9 +202,9 @@ export class WakuNode implements IWaku { } } + const peerId = this.mapToPeerIdOrMultiaddr(peer); log.info(`Dialing to ${peerId.toString()} with protocols ${_protocols}`); - - return this.libp2p.dialProtocol(peerId, codecs); + return await this.connectionManager.rawDialPeerWithProtocols(peer, codecs); } public async start(): Promise {