chore: store uses the node

This commit is contained in:
Danish Arora 2024-10-24 19:55:15 +05:30
parent 728b3c5f2d
commit b6339f7542
No known key found for this signature in database
GPG Key ID: 1C6EF37CDAE1426E
4 changed files with 43 additions and 19 deletions

View File

@ -3,6 +3,7 @@ import type { Peer, Stream } from "@libp2p/interface";
import type { import type {
IBaseProtocolCore, IBaseProtocolCore,
Libp2pComponents, Libp2pComponents,
PeerIdStr,
PubsubTopic PubsubTopic
} from "@waku/interfaces"; } from "@waku/interfaces";
import { Logger } from "@waku/utils"; import { Logger } from "@waku/utils";
@ -75,15 +76,30 @@ export class BaseProtocol implements IBaseProtocolCore {
public async getPeers( public async getPeers(
{ {
numPeers, numPeers,
maxBootstrapPeers maxBootstrapPeers,
peerIdStr
}: { }: {
numPeers: number; numPeers: number;
maxBootstrapPeers: number; maxBootstrapPeers: number;
peerIdStr?: PeerIdStr;
} = { } = {
maxBootstrapPeers: 0, maxBootstrapPeers: 0,
numPeers: 0 numPeers: 0
} }
): Promise<Peer[]> { ): Promise<Peer[]> {
if (peerIdStr) {
const peer = (await this.connectedPeers()).find(
(p) => p.id.toString() === peerIdStr
);
if (peer) {
return [peer];
}
this.log.warn(
`Passed node to use for ${this.multicodec} not found: ${peerIdStr}. Attempting to use random peers.`
);
return this.getPeers({ numPeers, maxBootstrapPeers });
}
// Retrieve all connected peers that support the protocol & shard (if configured) // Retrieve all connected peers that support the protocol & shard (if configured)
const allAvailableConnectedPeers = await this.connectedPeers(); const allAvailableConnectedPeers = await this.connectedPeers();

View File

@ -110,10 +110,8 @@ export type ProtocolCreateOptions = {
* List of nodes' multiaddrs as strings to use for each protocol. If not specified, random nodes will be used. * List of nodes' multiaddrs as strings to use for each protocol. If not specified, random nodes will be used.
* This should be used only if you know what you are doing. * This should be used only if you know what you are doing.
*/ */
nodesToUse?: { nodeToUse?: {
store?: string[]; store?: string;
filter?: string[];
lightpush?: string[];
}; };
}; };

View File

@ -23,7 +23,11 @@ const log = new Logger("waku:store:sdk");
export class Store extends BaseProtocolSDK implements IStore { export class Store extends BaseProtocolSDK implements IStore {
public readonly protocol: StoreCore; public readonly protocol: StoreCore;
public constructor(connectionManager: ConnectionManager, libp2p: Libp2p) { public constructor(
connectionManager: ConnectionManager,
libp2p: Libp2p,
private readonly peerIdStrToUse?: string
) {
super( super(
new StoreCore(connectionManager.configuredPubsubTopics, libp2p), new StoreCore(connectionManager.configuredPubsubTopics, libp2p),
connectionManager, connectionManager,
@ -61,9 +65,11 @@ export class Store extends BaseProtocolSDK implements IStore {
const peer = ( const peer = (
await this.protocol.getPeers({ await this.protocol.getPeers({
numPeers: this.numPeersToUse, numPeers: this.numPeersToUse,
maxBootstrapPeers: 1 maxBootstrapPeers: 1,
peerIdStr: this.peerIdStrToUse
}) })
)[0]; )[0];
if (!peer) { if (!peer) {
log.error("No peers available to query"); log.error("No peers available to query");
throw new Error("No peers available to query"); throw new Error("No peers available to query");
@ -237,9 +243,10 @@ export class Store extends BaseProtocolSDK implements IStore {
* @returns A function that takes a Libp2p instance and returns a StoreSDK instance. * @returns A function that takes a Libp2p instance and returns a StoreSDK instance.
*/ */
export function wakuStore( export function wakuStore(
connectionManager: ConnectionManager connectionManager: ConnectionManager,
peerIdStrToUse?: string
): (libp2p: Libp2p) => IStore { ): (libp2p: Libp2p) => IStore {
return (libp2p: Libp2p) => { return (libp2p: Libp2p) => {
return new Store(connectionManager, libp2p); return new Store(connectionManager, libp2p, peerIdStrToUse);
}; };
} }

View File

@ -1,7 +1,7 @@
import type { Stream } from "@libp2p/interface"; import type { Stream } from "@libp2p/interface";
import { isPeerId, PeerId } from "@libp2p/interface"; import { isPeerId, PeerId } from "@libp2p/interface";
import { multiaddr, Multiaddr, MultiaddrInput } from "@multiformats/multiaddr"; import { multiaddr, Multiaddr, MultiaddrInput } from "@multiformats/multiaddr";
import { ConnectionManager, getHealthManager } from "@waku/core"; import { ConnectionManager, getHealthManager, StoreCodec } from "@waku/core";
import type { import type {
IFilter, IFilter,
IHealthManager, IHealthManager,
@ -10,6 +10,7 @@ import type {
IStore, IStore,
IWaku, IWaku,
Libp2p, Libp2p,
PeerIdStr,
ProtocolCreateOptions, ProtocolCreateOptions,
PubsubTopic PubsubTopic
} from "@waku/interfaces"; } from "@waku/interfaces";
@ -106,17 +107,15 @@ export class WakuNode implements IWaku {
this.health = getHealthManager(); this.health = getHealthManager();
if (protocolsEnabled.store) { if (protocolsEnabled.store) {
const store = wakuStore(this.connectionManager); let peerIdStr: PeerIdStr | undefined;
this.store = store(libp2p); if (options.nodeToUse?.store) {
this.dialMultiaddr(options.nodeToUse.store, StoreCodec).catch((e) => {
if (options.nodesToUse?.store) {
this.dialMultiaddr(
options.nodesToUse.store[0],
this.store.protocol.multicodec
).catch((e) => {
log.error("Failed to dial store peer", e); log.error("Failed to dial store peer", e);
}); });
} }
const store = wakuStore(this.connectionManager, peerIdStr);
this.store = store(libp2p);
} }
if (protocolsEnabled.lightpush) { if (protocolsEnabled.lightpush) {
@ -236,9 +235,13 @@ export class WakuNode implements IWaku {
private async dialMultiaddr( private async dialMultiaddr(
multiaddrStr: string, multiaddrStr: string,
protocol: string protocol: string
): Promise<void> { ): Promise<PeerIdStr> {
const ma = multiaddr(multiaddrStr); const ma = multiaddr(multiaddrStr);
if (!ma.getPeerId()) {
throw new Error("Failed to dial multiaddr: missing peer ID");
}
await this.libp2p.dialProtocol(ma, [protocol]); await this.libp2p.dialProtocol(ma, [protocol]);
return ma.getPeerId()!;
} }
private mapToPeerIdOrMultiaddr( private mapToPeerIdOrMultiaddr(