diff --git a/packages/core/src/lib/connection_manager/connection_manager.ts b/packages/core/src/lib/connection_manager/connection_manager.ts index 93d85dc5b2..995bfdde15 100644 --- a/packages/core/src/lib/connection_manager/connection_manager.ts +++ b/packages/core/src/lib/connection_manager/connection_manager.ts @@ -4,7 +4,6 @@ import { type Peer, type PeerId, type PeerInfo, - type PeerStore, type Stream, TypedEventEmitter } from "@libp2p/interface"; @@ -574,12 +573,9 @@ export class ConnectionManager return false; } - const isSameShard = await this.isPeerTopicConfigured(peerId); + const isSameShard = await this.isPeerOnSameShard(peerId); if (!isSameShard) { - const shardInfo = await this.getPeerShardInfo( - peerId, - this.libp2p.peerStore - ); + const shardInfo = await this.getPeerShardInfo(peerId); log.warn( `Discovered peer ${peerId.toString()} with ShardInfo ${shardInfo} is not part of any of the configured pubsub topics (${ @@ -666,28 +662,40 @@ export class ConnectionManager } } - private async isPeerTopicConfigured(peerId: PeerId): Promise { - const shardInfo = await this.getPeerShardInfo( - peerId, - this.libp2p.peerStore - ); + public async isPeerOnSameShard(peerId: PeerId): Promise { + const shardInfo = await this.getPeerShardInfo(peerId); - // If there's no shard information, simply return true - if (!shardInfo) return true; + if (!shardInfo) { + return true; + } const pubsubTopics = shardInfoToPubsubTopics(shardInfo); const isTopicConfigured = pubsubTopics.some((topic) => this.pubsubTopics.includes(topic) ); + return isTopicConfigured; } - private async getPeerShardInfo( + public async isPeerOnPubsubTopic( peerId: PeerId, - peerStore: PeerStore + pubsubTopic: string + ): Promise { + const shardInfo = await this.getPeerShardInfo(peerId); + + if (!shardInfo) { + return true; + } + + const pubsubTopics = shardInfoToPubsubTopics(shardInfo); + return pubsubTopics.some((t) => t === pubsubTopic); + } + + private async getPeerShardInfo( + peerId: PeerId ): Promise { - const peer = await peerStore.get(peerId); + const peer = await this.libp2p.peerStore.get(peerId); const shardInfoBytes = peer.metadata.get("shardInfo"); if (!shardInfoBytes) return undefined; return decodeRelayShard(shardInfoBytes); diff --git a/packages/interfaces/src/store.ts b/packages/interfaces/src/store.ts index cf2ee15e3b..014842aaa6 100644 --- a/packages/interfaces/src/store.ts +++ b/packages/interfaces/src/store.ts @@ -99,5 +99,9 @@ export type IStore = { }; export type StoreProtocolOptions = { - peer: string; + /** + * List of Multi-addresses of peers to be prioritized for Store protocol queries. + * @default [] + */ + peers: string[]; }; diff --git a/packages/sdk/src/create/libp2p.ts b/packages/sdk/src/create/libp2p.ts index a05c488d43..8f91d3660b 100644 --- a/packages/sdk/src/create/libp2p.ts +++ b/packages/sdk/src/create/libp2p.ts @@ -106,8 +106,13 @@ export async function createLibp2pAndUpdateOptions( peerDiscovery.push(...getPeerDiscoveries(options.discovery)); } - if (options?.bootstrapPeers) { - peerDiscovery.push(bootstrap({ list: options.bootstrapPeers })); + const bootstrapPeers = [ + ...(options.bootstrapPeers || []), + ...(options.store?.peers || []) + ]; + + if (bootstrapPeers.length) { + peerDiscovery.push(bootstrap({ list: bootstrapPeers })); } libp2pOptions.peerDiscovery = peerDiscovery; diff --git a/packages/sdk/src/store/store.ts b/packages/sdk/src/store/store.ts index f95db353d2..07bcbe6515 100644 --- a/packages/sdk/src/store/store.ts +++ b/packages/sdk/src/store/store.ts @@ -1,5 +1,12 @@ -import type { PeerId } from "@libp2p/interface"; -import { ConnectionManager, messageHash, StoreCore } from "@waku/core"; +import type { Peer, PeerId } from "@libp2p/interface"; +import { peerIdFromString } from "@libp2p/peer-id"; +import { multiaddr } from "@multiformats/multiaddr"; +import { + ConnectionManager, + messageHash, + StoreCodec, + StoreCore +} from "@waku/core"; import { IDecodedMessage, IDecoder, @@ -28,14 +35,14 @@ type StoreConstructorParams = { */ export class Store implements IStore { private readonly options: Partial; - private readonly peerManager: PeerManager; + private readonly libp2p: Libp2p; private readonly connectionManager: ConnectionManager; private readonly protocol: StoreCore; public constructor(params: StoreConstructorParams) { this.options = params.options || {}; - this.peerManager = params.peerManager; this.connectionManager = params.connectionManager; + this.libp2p = params.libp2p; this.protocol = new StoreCore( params.connectionManager.pubsubTopics, @@ -93,7 +100,7 @@ export class Store implements IStore { ...options }; - const peer = await this.getPeerToUse(); + const peer = await this.getPeerToUse(pubsubTopic); if (!peer) { log.error("No peers available to query"); @@ -260,32 +267,81 @@ export class Store implements IStore { }; } - private async getPeerToUse(): Promise { - let peerId: PeerId | undefined; + private async getPeerToUse(pubsubTopic: string): Promise { + const peers = await this.filterConnectedPeers(pubsubTopic); - if (this.options?.peer) { - const connectedPeers = await this.connectionManager.getConnectedPeers(); + const peer = this.options.peers + ? await this.getPeerFromConfigurationOrFirst(peers, this.options.peers) + : peers[0]?.id; - const peer = connectedPeers.find( - (p) => p.id.toString() === this.options?.peer + return peer; + } + + private async getPeerFromConfigurationOrFirst( + peers: Peer[], + configPeers: string[] + ): Promise { + const storeConfigPeers = configPeers.map(multiaddr); + const missing = []; + + for (const peer of storeConfigPeers) { + const matchedPeer = peers.find( + (p) => p.id.toString() === peer.getPeerId()?.toString() ); - peerId = peer?.id; - if (!peerId) { + if (matchedPeer) { + return matchedPeer.id; + } + + missing.push(peer); + } + + while (missing.length) { + const toDial = missing.pop(); + + if (!toDial) { + return; + } + + try { + const conn = await this.libp2p.dial(toDial); + + if (conn) { + return peerIdFromString(toDial.getPeerId() as string); + } + } catch (e) { log.warn( - `Passed node to use for Store not found: ${this.options.peer}. Attempting to use random peers.` + `Failed to dial peer from options.peers list for Store protocol. Peer:${toDial.getPeerId()}, error:${e}` ); } } - const peerIds = this.peerManager.getPeers(); + log.warn( + `Passed node to use for Store not found: ${configPeers.toString()}. Attempting to use first available peers.` + ); - if (peerIds.length > 0) { - // TODO(weboko): implement smart way of getting a peer https://github.com/waku-org/js-waku/issues/2243 - return peerIds[Math.floor(Math.random() * peerIds.length)]; + return peers[0]?.id; + } + + private async filterConnectedPeers(pubsubTopic: string): Promise { + const peers = await this.connectionManager.getConnectedPeers(); + const result: Peer[] = []; + + for (const peer of peers) { + const isStoreCodec = peer.protocols.includes(StoreCodec); + const isSameShard = await this.connectionManager.isPeerOnSameShard( + peer.id + ); + const isSamePubsub = await this.connectionManager.isPeerOnPubsubTopic( + peer.id, + pubsubTopic + ); + + if (isStoreCodec && isSameShard && isSamePubsub) { + result.push(peer); + } } - log.error("No peers available to use."); - return; + return result; } } diff --git a/packages/sdk/src/waku/waku.ts b/packages/sdk/src/waku/waku.ts index 7d1b8e3206..39d805a602 100644 --- a/packages/sdk/src/waku/waku.ts +++ b/packages/sdk/src/waku/waku.ts @@ -1,11 +1,6 @@ import type { Peer, PeerId, Stream } from "@libp2p/interface"; import { MultiaddrInput } from "@multiformats/multiaddr"; -import { - ConnectionManager, - createDecoder, - createEncoder, - StoreCodec -} from "@waku/core"; +import { ConnectionManager, createDecoder, createEncoder } from "@waku/core"; import type { CreateDecoderParams, CreateEncoderParams, @@ -103,21 +98,11 @@ export class WakuNode implements IWaku { this.health = new HealthIndicator({ libp2p }); if (protocolsEnabled.store) { - if (options.store?.peer) { - this.connectionManager - .rawDialPeerWithProtocols(options.store.peer, [StoreCodec]) - .catch((e) => { - log.error("Failed to dial store peer", e); - }); - } - this.store = new Store({ libp2p, connectionManager: this.connectionManager, peerManager: this.peerManager, - options: { - peer: options.store?.peer - } + options: options?.store }); } diff --git a/packages/tests/tests/connection-mananger/dials.spec.ts b/packages/tests/tests/connection-mananger/dials.spec.ts index 48282f2986..3bcd5c7622 100644 --- a/packages/tests/tests/connection-mananger/dials.spec.ts +++ b/packages/tests/tests/connection-mananger/dials.spec.ts @@ -17,21 +17,21 @@ describe("Dials", function () { let dialPeerStub: SinonStub; let getConnectionsStub: SinonStub; let getTagNamesForPeerStub: SinonStub; - let isPeerTopicConfigured: SinonStub; + let isPeerOnSameShard: SinonStub; let waku: LightNode; beforeEachCustom(this, async () => { waku = await createLightNode(); - isPeerTopicConfigured = sinon.stub( + isPeerOnSameShard = sinon.stub( waku.connectionManager as any, - "isPeerTopicConfigured" + "isPeerOnSameShard" ); - isPeerTopicConfigured.resolves(true); + isPeerOnSameShard.resolves(true); }); afterEachCustom(this, async () => { await tearDownNodes([], waku); - isPeerTopicConfigured.restore(); + isPeerOnSameShard.restore(); sinon.restore(); }); diff --git a/packages/tests/tests/multiaddr.node.spec.ts b/packages/tests/tests/multiaddr.node.spec.ts index 885f6e29ba..d9f5e9a4a0 100644 --- a/packages/tests/tests/multiaddr.node.spec.ts +++ b/packages/tests/tests/multiaddr.node.spec.ts @@ -20,7 +20,7 @@ describe("multiaddr: dialing", function () { let waku: IWaku; let nwaku: ServiceNode; let dialPeerSpy: SinonSpy; - let isPeerTopicConfigured: SinonStub; + let isPeerOnSameShard: SinonStub; afterEachCustom(this, async () => { await tearDownNodes(nwaku, waku); @@ -63,11 +63,11 @@ describe("multiaddr: dialing", function () { peerId = await nwaku.getPeerId(); multiaddr = await nwaku.getMultiaddrWithId(); - isPeerTopicConfigured = Sinon.stub( + isPeerOnSameShard = Sinon.stub( waku.connectionManager as any, - "isPeerTopicConfigured" + "isPeerOnSameShard" ); - isPeerTopicConfigured.resolves(true); + isPeerOnSameShard.resolves(true); dialPeerSpy = Sinon.spy(waku.connectionManager as any, "dialPeer"); }); diff --git a/packages/tests/tests/store/index.node.spec.ts b/packages/tests/tests/store/index.node.spec.ts index d9a055fb3e..c95acffc86 100644 --- a/packages/tests/tests/store/index.node.spec.ts +++ b/packages/tests/tests/store/index.node.spec.ts @@ -304,13 +304,10 @@ describe("Waku Store, general", function () { for await (const msg of query) { if (msg) { messages.push(msg as DecodedMessage); - console.log(bytesToUtf8(msg.payload!)); } } } - console.log(messages.length); - // Messages are ordered from oldest to latest within a page (1 page query) expect(bytesToUtf8(messages[0].payload!)).to.eq(asymText); expect(bytesToUtf8(messages[1].payload!)).to.eq(symText); diff --git a/packages/tests/tests/store/multiple_pubsub.spec.ts b/packages/tests/tests/store/multiple_pubsub.spec.ts index 3788fb62b1..caf4204765 100644 --- a/packages/tests/tests/store/multiple_pubsub.spec.ts +++ b/packages/tests/tests/store/multiple_pubsub.spec.ts @@ -105,15 +105,25 @@ describe("Waku Store, custom pubsub topic", function () { it("Generator, 2 nwaku nodes each with different pubsubtopics", async function () { this.timeout(10000); + await tearDownNodes([nwaku], []); + + // make sure each nwaku node operates on dedicated shard only + nwaku = new ServiceNode(makeLogFileName(this) + "1"); + await nwaku.start({ + store: true, + clusterId: TestShardInfo.clusterId, + shard: [TestShardInfo.shards[0]], + relay: true + }); + // Set up and start a new nwaku node with Default Pubsubtopic nwaku2 = new ServiceNode(makeLogFileName(this) + "2"); await nwaku2.start({ store: true, clusterId: TestShardInfo.clusterId, - shard: TestShardInfo.shards, + shard: [TestShardInfo.shards[1]], relay: true }); - await nwaku2.ensureSubscriptions([TestDecoder2.pubsubTopic]); const totalMsgs = 10; await sendMessages( @@ -129,6 +139,7 @@ describe("Waku Store, custom pubsub topic", function () { TestDecoder2.pubsubTopic ); + await waku.dial(await nwaku.getMultiaddrWithId()); await waku.dial(await nwaku2.getMultiaddrWithId()); await waku.waitForPeers([Protocols.Store]); @@ -366,6 +377,17 @@ describe("Waku Store (named sharding), custom pubsub topic", function () { it("Generator, 2 nwaku nodes each with different pubsubtopics", async function () { this.timeout(10000); + await tearDownNodes([nwaku], []); + + // make sure each nwaku node operates on dedicated shard only + nwaku = new ServiceNode(makeLogFileName(this) + "1"); + await nwaku.start({ + store: true, + clusterId: TestShardInfo.clusterId, + shard: [TestShardInfo.shards[0]], + relay: true + }); + // Set up and start a new nwaku node with Default Pubsubtopic nwaku2 = new ServiceNode(makeLogFileName(this) + "2"); await nwaku2.start({ @@ -390,6 +412,7 @@ describe("Waku Store (named sharding), custom pubsub topic", function () { TestDecoder2.pubsubTopic ); + await waku.dial(await nwaku.getMultiaddrWithId()); await waku.dial(await nwaku2.getMultiaddrWithId()); await waku.waitForPeers([Protocols.Store]); diff --git a/tsconfig.json b/tsconfig.json index 6399f3d42e..d79e055766 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -1,7 +1,7 @@ { "compilerOptions": { "incremental": true, - "target": "ES2023", + "target": "ES2022", "moduleResolution": "Bundler", "module": "esnext", "declaration": true, @@ -38,7 +38,7 @@ // "experimentalDecorators": true /* Enables experimental support for ES7 decorators. */, // "emitDecoratorMetadata": true /* Enables experimental support for emitting type metadata for decorators. */, - "lib": ["es2023", "dom"], + "lib": ["es2022", "dom"], "types": ["node", "mocha"], "typeRoots": ["node_modules/@types"] },