From b6339f75425d40893da6e2d7485fc594856ec494 Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Thu, 24 Oct 2024 19:55:15 +0530 Subject: [PATCH] chore: store uses the node --- packages/core/src/lib/base_protocol.ts | 18 +++++++++++++++++- packages/interfaces/src/protocols.ts | 6 ++---- packages/sdk/src/protocols/store/index.ts | 15 +++++++++++---- packages/sdk/src/waku/waku.ts | 23 +++++++++++++---------- 4 files changed, 43 insertions(+), 19 deletions(-) diff --git a/packages/core/src/lib/base_protocol.ts b/packages/core/src/lib/base_protocol.ts index 740298ae0a..45ae85de39 100644 --- a/packages/core/src/lib/base_protocol.ts +++ b/packages/core/src/lib/base_protocol.ts @@ -3,6 +3,7 @@ import type { Peer, Stream } from "@libp2p/interface"; import type { IBaseProtocolCore, Libp2pComponents, + PeerIdStr, PubsubTopic } from "@waku/interfaces"; import { Logger } from "@waku/utils"; @@ -75,15 +76,30 @@ export class BaseProtocol implements IBaseProtocolCore { public async getPeers( { numPeers, - maxBootstrapPeers + maxBootstrapPeers, + peerIdStr }: { numPeers: number; maxBootstrapPeers: number; + peerIdStr?: PeerIdStr; } = { maxBootstrapPeers: 0, numPeers: 0 } ): Promise { + if (peerIdStr) { + const peer = (await this.connectedPeers()).find( + (p) => p.id.toString() === peerIdStr + ); + if (peer) { + return [peer]; + } + this.log.warn( + `Passed node to use for ${this.multicodec} not found: ${peerIdStr}. Attempting to use random peers.` + ); + return this.getPeers({ numPeers, maxBootstrapPeers }); + } + // Retrieve all connected peers that support the protocol & shard (if configured) const allAvailableConnectedPeers = await this.connectedPeers(); diff --git a/packages/interfaces/src/protocols.ts b/packages/interfaces/src/protocols.ts index 34d03d4824..904d711cdb 100644 --- a/packages/interfaces/src/protocols.ts +++ b/packages/interfaces/src/protocols.ts @@ -110,10 +110,8 @@ export type ProtocolCreateOptions = { * List of nodes' multiaddrs as strings to use for each protocol. If not specified, random nodes will be used. * This should be used only if you know what you are doing. */ - nodesToUse?: { - store?: string[]; - filter?: string[]; - lightpush?: string[]; + nodeToUse?: { + store?: string; }; }; diff --git a/packages/sdk/src/protocols/store/index.ts b/packages/sdk/src/protocols/store/index.ts index 01ef45d58c..b12958184a 100644 --- a/packages/sdk/src/protocols/store/index.ts +++ b/packages/sdk/src/protocols/store/index.ts @@ -23,7 +23,11 @@ const log = new Logger("waku:store:sdk"); export class Store extends BaseProtocolSDK implements IStore { public readonly protocol: StoreCore; - public constructor(connectionManager: ConnectionManager, libp2p: Libp2p) { + public constructor( + connectionManager: ConnectionManager, + libp2p: Libp2p, + private readonly peerIdStrToUse?: string + ) { super( new StoreCore(connectionManager.configuredPubsubTopics, libp2p), connectionManager, @@ -61,9 +65,11 @@ export class Store extends BaseProtocolSDK implements IStore { const peer = ( await this.protocol.getPeers({ numPeers: this.numPeersToUse, - maxBootstrapPeers: 1 + maxBootstrapPeers: 1, + peerIdStr: this.peerIdStrToUse }) )[0]; + if (!peer) { log.error("No peers available to query"); throw new Error("No peers available to query"); @@ -237,9 +243,10 @@ export class Store extends BaseProtocolSDK implements IStore { * @returns A function that takes a Libp2p instance and returns a StoreSDK instance. */ export function wakuStore( - connectionManager: ConnectionManager + connectionManager: ConnectionManager, + peerIdStrToUse?: string ): (libp2p: Libp2p) => IStore { return (libp2p: Libp2p) => { - return new Store(connectionManager, libp2p); + return new Store(connectionManager, libp2p, peerIdStrToUse); }; } diff --git a/packages/sdk/src/waku/waku.ts b/packages/sdk/src/waku/waku.ts index 03a30e54aa..b2c5ebef86 100644 --- a/packages/sdk/src/waku/waku.ts +++ b/packages/sdk/src/waku/waku.ts @@ -1,7 +1,7 @@ import type { Stream } from "@libp2p/interface"; import { isPeerId, PeerId } from "@libp2p/interface"; import { multiaddr, Multiaddr, MultiaddrInput } from "@multiformats/multiaddr"; -import { ConnectionManager, getHealthManager } from "@waku/core"; +import { ConnectionManager, getHealthManager, StoreCodec } from "@waku/core"; import type { IFilter, IHealthManager, @@ -10,6 +10,7 @@ import type { IStore, IWaku, Libp2p, + PeerIdStr, ProtocolCreateOptions, PubsubTopic } from "@waku/interfaces"; @@ -106,17 +107,15 @@ export class WakuNode implements IWaku { this.health = getHealthManager(); if (protocolsEnabled.store) { - const store = wakuStore(this.connectionManager); - this.store = store(libp2p); - - if (options.nodesToUse?.store) { - this.dialMultiaddr( - options.nodesToUse.store[0], - this.store.protocol.multicodec - ).catch((e) => { + let peerIdStr: PeerIdStr | undefined; + if (options.nodeToUse?.store) { + this.dialMultiaddr(options.nodeToUse.store, StoreCodec).catch((e) => { log.error("Failed to dial store peer", e); }); } + + const store = wakuStore(this.connectionManager, peerIdStr); + this.store = store(libp2p); } if (protocolsEnabled.lightpush) { @@ -236,9 +235,13 @@ export class WakuNode implements IWaku { private async dialMultiaddr( multiaddrStr: string, protocol: string - ): Promise { + ): Promise { const ma = multiaddr(multiaddrStr); + if (!ma.getPeerId()) { + throw new Error("Failed to dial multiaddr: missing peer ID"); + } await this.libp2p.dialProtocol(ma, [protocol]); + return ma.getPeerId()!; } private mapToPeerIdOrMultiaddr(