feat(store): allow specifying node to use (#2192)

* feat: API allows using specific nodes for protocol

* chore: dial provided now

* chore: store uses the node

* chore: update API

* chore: use protocol-level class for getting peer

* chore: use ConnectionManager for dial ops

* chore: address comments

* chore: fix type error

* chore: Waku.dial() proxies through ConnectionManager.dialPeer

* chore: fix dial

* chore: add ts-doc for dialPeer()

* chore: remove log

* chore: reduce ts-doc for an internal function

* chore: address comments

* chore: return types from connmanager.dialpeer()

* chore: reduce diff by not introducing breaking changes
This commit is contained in:
Danish Arora 2025-01-28 17:57:49 +05:30 committed by GitHub
parent a73dd4f083
commit 4153396014
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 170 additions and 37 deletions

View File

@ -1,5 +1,14 @@
import type { Peer, PeerId, PeerInfo, PeerStore } from "@libp2p/interface";
import { TypedEventEmitter } from "@libp2p/interface";
import {
type Connection,
isPeerId,
type Peer,
type PeerId,
type PeerInfo,
type PeerStore,
type Stream,
TypedEventEmitter
} from "@libp2p/interface";
import { Multiaddr, multiaddr, MultiaddrInput } from "@multiformats/multiaddr";
import {
ConnectionManagerOptions,
DiscoveryTrigger,
@ -219,15 +228,60 @@ export class ConnectionManager
this.startNetworkStatusListener();
}
private async dialPeer(peerId: PeerId): Promise<void> {
/**
* Attempts to establish a connection with a peer and set up specified protocols.
* The method handles both PeerId and Multiaddr inputs, manages connection attempts,
* and maintains the connection state.
*
* The dialing process includes:
* 1. Converting input to dialable peer info
* 2. Managing parallel dial attempts
* 3. Attempting to establish protocol-specific connections
* 4. Handling connection failures and retries
* 5. Updating the peer store and connection state
*
* @param {PeerId | MultiaddrInput} peer - The peer to connect to, either as a PeerId or multiaddr
* @param {string[]} [protocolCodecs] - Optional array of protocol-specific codec strings to establish
* (e.g., for LightPush, Filter, Store protocols)
*
* @throws {Error} If the multiaddr is missing a peer ID
* @throws {Error} If the maximum dial attempts are reached and the peer cannot be dialed
* @throws {Error} If there's an error deleting an undialable peer from the peer store
*
* @example
* ```typescript
* // Dial using PeerId
* await connectionManager.dialPeer(peerId);
*
* // Dial using multiaddr with specific protocols
* await connectionManager.dialPeer(multiaddr, [
* "/vac/waku/relay/2.0.0",
* "/vac/waku/lightpush/2.0.0-beta1"
* ]);
* ```
*
* @remarks
* - The method implements exponential backoff through multiple dial attempts
* - Maintains a queue for parallel dial attempts (limited by maxParallelDials)
* - Integrates with the KeepAliveManager for connection maintenance
* - Updates the peer store and connection state after successful/failed attempts
* - If all dial attempts fail, triggers DNS discovery as a fallback
*/
public async dialPeer(peer: PeerId | MultiaddrInput): Promise<Connection> {
let connection: Connection | undefined;
let peerId: PeerId | undefined;
const peerDialInfo = this.getDialablePeerInfo(peer);
const peerIdStr = isPeerId(peerDialInfo)
? peerDialInfo.toString()
: peerDialInfo.getPeerId()!;
this.currentActiveParallelDialCount += 1;
let dialAttempt = 0;
while (dialAttempt < this.options.maxDialAttemptsForPeer) {
try {
log.info(
`Dialing peer ${peerId.toString()} on attempt ${dialAttempt + 1}`
);
await this.libp2p.dial(peerId);
log.info(`Dialing peer ${peerDialInfo} on attempt ${dialAttempt + 1}`);
connection = await this.libp2p.dial(peerDialInfo);
peerId = connection.remotePeer;
const tags = await this.getTagNamesForPeer(peerId);
// add tag to connection describing discovery mechanism
@ -246,21 +300,17 @@ export class ConnectionManager
} catch (error) {
if (error instanceof AggregateError) {
// Handle AggregateError
log.error(
`Error dialing peer ${peerId.toString()} - ${error.errors}`
);
log.error(`Error dialing peer ${peerIdStr} - ${error.errors}`);
} else {
// Handle generic error
log.error(
`Error dialing peer ${peerId.toString()} - ${
(error as any).message
}`
`Error dialing peer ${peerIdStr} - ${(error as any).message}`
);
}
this.dialErrorsForPeer.set(peerId.toString(), error);
this.dialErrorsForPeer.set(peerIdStr, error);
dialAttempt++;
this.dialAttemptsForPeer.set(peerId.toString(), dialAttempt);
this.dialAttemptsForPeer.set(peerIdStr, dialAttempt);
}
}
@ -271,7 +321,7 @@ export class ConnectionManager
// If max dial attempts reached and dialing failed, delete the peer
if (dialAttempt === this.options.maxDialAttemptsForPeer) {
try {
const error = this.dialErrorsForPeer.get(peerId.toString());
const error = this.dialErrorsForPeer.get(peerIdStr);
if (error) {
let errorMessage;
@ -288,21 +338,65 @@ export class ConnectionManager
}
log.info(
`Deleting undialable peer ${peerId.toString()} from peer store. Reason: ${errorMessage}`
`Deleting undialable peer ${peerIdStr} from peer store. Reason: ${errorMessage}`
);
}
this.dialErrorsForPeer.delete(peerId.toString());
await this.libp2p.peerStore.delete(peerId);
this.dialErrorsForPeer.delete(peerIdStr);
if (peerId) {
await this.libp2p.peerStore.delete(peerId);
}
// if it was last available peer - attempt DNS discovery
await this.attemptDnsDiscovery();
} catch (error) {
throw new Error(
`Error deleting undialable peer ${peerId.toString()} from peer store - ${error}`
`Error deleting undialable peer ${peerIdStr} from peer store - ${error}`
);
}
}
if (!connection) {
throw new Error(`Failed to dial peer ${peerDialInfo}`);
}
return connection;
}
/**
* Dial a peer with specific protocols.
* This method is a raw proxy to the libp2p dialProtocol method.
* @param peer - The peer to connect to, either as a PeerId or multiaddr
* @param protocolCodecs - Optional array of protocol-specific codec strings to establish
* @returns A stream to the peer
*/
public async rawDialPeerWithProtocols(
peer: PeerId | MultiaddrInput,
protocolCodecs: string[]
): Promise<Stream> {
const peerDialInfo = this.getDialablePeerInfo(peer);
return await this.libp2p.dialProtocol(peerDialInfo, protocolCodecs);
}
/**
* Internal utility to extract a PeerId or Multiaddr from a peer input.
* This is used internally by the connection manager to handle different peer input formats.
* @internal
*/
private getDialablePeerInfo(
peer: PeerId | MultiaddrInput
): PeerId | Multiaddr {
if (isPeerId(peer)) {
return peer;
} else {
// peer is of MultiaddrInput type
const ma = multiaddr(peer);
const peerIdStr = ma.getPeerId();
if (!peerIdStr) {
throw new Error("Failed to dial multiaddr: missing peer ID");
}
return ma;
}
}
private async attemptDnsDiscovery(): Promise<void> {

View File

@ -28,6 +28,10 @@ export type IBaseProtocolSDK = {
readonly numPeersToUse: number;
};
export type StoreProtocolOptions = {
peer: string;
};
export type NetworkConfig = StaticSharding | AutoSharding;
//TODO: merge this with ProtocolCreateOptions or establish distinction: https://github.com/waku-org/js-waku/issues/2048
@ -106,6 +110,10 @@ export type ProtocolCreateOptions = {
* List of peers to use to bootstrap the node. Ignored if defaultBootstrap is set to true.
*/
bootstrapPeers?: string[];
/**
* Options for the Store protocol.
*/
store?: Partial<StoreProtocolOptions>;
};
export type Callback<T extends IDecodedMessage> = (

View File

@ -1,3 +1,4 @@
import type { Peer } from "@libp2p/interface";
import { ConnectionManager, StoreCore } from "@waku/core";
import {
IDecodedMessage,
@ -5,7 +6,8 @@ import {
IStore,
Libp2p,
QueryRequestParams,
StoreCursor
StoreCursor,
StoreProtocolOptions
} from "@waku/interfaces";
import { messageHash } from "@waku/message-hash";
import { ensurePubsubTopicIsConfigured, isDefined, Logger } from "@waku/utils";
@ -23,7 +25,11 @@ const log = new Logger("waku:store:sdk");
export class Store extends BaseProtocolSDK implements IStore {
public readonly protocol: StoreCore;
public constructor(connectionManager: ConnectionManager, libp2p: Libp2p) {
public constructor(
connectionManager: ConnectionManager,
libp2p: Libp2p,
private options?: Partial<StoreProtocolOptions>
) {
super(
new StoreCore(connectionManager.configuredPubsubTopics, libp2p),
connectionManager,
@ -58,12 +64,8 @@ export class Store extends BaseProtocolSDK implements IStore {
...options
};
const peer = (
await this.protocol.getPeers({
numPeers: this.numPeersToUse,
maxBootstrapPeers: 1
})
)[0];
const peer = await this.getPeerToUse();
if (!peer) {
log.error("No peers available to query");
throw new Error("No peers available to query");
@ -228,6 +230,26 @@ export class Store extends BaseProtocolSDK implements IStore {
decodersAsMap
};
}
private async getPeerToUse(): Promise<Peer | null> {
const peer = this.connectedPeers.find(
(p) => p.id.toString() === this.options?.peer
);
if (peer) {
return peer;
}
log.warn(
`Passed node to use for Store not found: ${this.options?.peer}. Attempting to use random peers.`
);
return (
await this.protocol.getPeers({
numPeers: this.numPeersToUse,
maxBootstrapPeers: 1
})
)[0];
}
}
/**
@ -237,9 +259,10 @@ export class Store extends BaseProtocolSDK implements IStore {
* @returns A function that takes a Libp2p instance and returns a StoreSDK instance.
*/
export function wakuStore(
connectionManager: ConnectionManager
connectionManager: ConnectionManager,
options?: Partial<StoreProtocolOptions>
): (libp2p: Libp2p) => IStore {
return (libp2p: Libp2p) => {
return new Store(connectionManager, libp2p);
return new Store(connectionManager, libp2p, options);
};
}

View File

@ -1,7 +1,6 @@
import type { Stream } from "@libp2p/interface";
import { isPeerId, PeerId } from "@libp2p/interface";
import { isPeerId, PeerId, type Stream } from "@libp2p/interface";
import { multiaddr, Multiaddr, MultiaddrInput } from "@multiformats/multiaddr";
import { ConnectionManager, getHealthManager } from "@waku/core";
import { ConnectionManager, getHealthManager, StoreCodec } from "@waku/core";
import type {
IFilter,
IHealthManager,
@ -106,7 +105,17 @@ export class WakuNode implements IWaku {
this.health = getHealthManager();
if (protocolsEnabled.store) {
const store = wakuStore(this.connectionManager);
if (options.store?.peer) {
this.connectionManager
.rawDialPeerWithProtocols(options.store.peer, [StoreCodec])
.catch((e) => {
log.error("Failed to dial store peer", e);
});
}
const store = wakuStore(this.connectionManager, {
peer: options.store?.peer
});
this.store = store(libp2p);
}
@ -145,7 +154,6 @@ export class WakuNode implements IWaku {
protocols?: Protocols[]
): Promise<Stream> {
const _protocols = protocols ?? [];
const peerId = this.mapToPeerIdOrMultiaddr(peer);
if (typeof protocols === "undefined") {
this.relay && _protocols.push(Protocols.Relay);
@ -194,9 +202,9 @@ export class WakuNode implements IWaku {
}
}
const peerId = this.mapToPeerIdOrMultiaddr(peer);
log.info(`Dialing to ${peerId.toString()} with protocols ${_protocols}`);
return this.libp2p.dialProtocol(peerId, codecs);
return await this.connectionManager.rawDialPeerWithProtocols(peer, codecs);
}
public async start(): Promise<void> {