diff --git a/packages/core/src/lib/filter/filter.ts b/packages/core/src/lib/filter/filter.ts index 6716874917..f222dc2c8f 100644 --- a/packages/core/src/lib/filter/filter.ts +++ b/packages/core/src/lib/filter/filter.ts @@ -22,7 +22,7 @@ import { FilterSubscribeRpc } from "./filter_rpc.js"; -const log = new Logger("filter:v2"); +const log = new Logger("filter-core"); export const FilterCodecs = { SUBSCRIBE: "/vac/waku/filter-subscribe/2.0.0-beta1", @@ -104,6 +104,10 @@ export class FilterCore { lp.decode, async (source) => await all(source) ); + + if (!res?.length) { + throw Error("Received no response from subscription request."); + } } catch (error) { log.error("Failed to send subscribe request", error); return { diff --git a/packages/core/src/lib/store/store.ts b/packages/core/src/lib/store/store.ts index 43515d8489..d5d6686685 100644 --- a/packages/core/src/lib/store/store.ts +++ b/packages/core/src/lib/store/store.ts @@ -3,7 +3,6 @@ import { IDecodedMessage, IDecoder, Libp2p, - PubsubTopic, QueryRequestParams } from "@waku/interfaces"; import { Logger } from "@waku/utils"; @@ -31,10 +30,7 @@ export class StoreCore { public readonly multicodec = StoreCodec; - public constructor( - public readonly pubsubTopics: PubsubTopic[], - libp2p: Libp2p - ) { + public constructor(libp2p: Libp2p) { this.streamManager = new StreamManager(StoreCodec, libp2p.components); } diff --git a/packages/sdk/src/filter/filter.spec.ts b/packages/sdk/src/filter/filter.spec.ts index 757b18865b..aebdc39975 100644 --- a/packages/sdk/src/filter/filter.spec.ts +++ b/packages/sdk/src/filter/filter.spec.ts @@ -166,7 +166,11 @@ function mockConnectionManager(): ConnectionManager { function mockPeerManager(): PeerManager { return { - getPeers: sinon.stub().returns([]) + getPeers: sinon.stub().returns([]), + events: { + addEventListener: sinon.stub(), + removeEventListener: sinon.stub() + } } as unknown as PeerManager; } diff --git a/packages/sdk/src/filter/filter.ts b/packages/sdk/src/filter/filter.ts index 70b4d8d8ed..d0bbedfc3c 100644 --- a/packages/sdk/src/filter/filter.ts +++ b/packages/sdk/src/filter/filter.ts @@ -4,8 +4,7 @@ import type { FilterProtocolOptions, IDecodedMessage, IDecoder, - IFilter, - Libp2p + IFilter } from "@waku/interfaces"; import { WakuMessage } from "@waku/proto"; import { Logger } from "@waku/utils"; @@ -15,12 +14,11 @@ import { PeerManager } from "../peer_manager/index.js"; import { Subscription } from "./subscription.js"; import { FilterConstructorParams } from "./types.js"; -const log = new Logger("sdk:next-filter"); +const log = new Logger("sdk:filter"); type PubsubTopic = string; export class Filter implements IFilter { - private readonly libp2p: Libp2p; private readonly protocol: FilterCore; private readonly peerManager: PeerManager; private readonly connectionManager: ConnectionManager; @@ -36,7 +34,6 @@ export class Filter implements IFilter { ...params.options }; - this.libp2p = params.libp2p; this.peerManager = params.peerManager; this.connectionManager = params.connectionManager; @@ -85,7 +82,6 @@ export class Filter implements IFilter { if (!subscription) { subscription = new Subscription({ pubsubTopic: singlePubsubTopic, - libp2p: this.libp2p, protocol: this.protocol, config: this.config, peerManager: this.peerManager diff --git a/packages/sdk/src/filter/subscription.spec.ts b/packages/sdk/src/filter/subscription.spec.ts index 647ff3af44..37f3d48ed3 100644 --- a/packages/sdk/src/filter/subscription.spec.ts +++ b/packages/sdk/src/filter/subscription.spec.ts @@ -1,10 +1,8 @@ -import type { PeerId } from "@libp2p/interface"; import { FilterCore } from "@waku/core"; import type { FilterProtocolOptions, IDecodedMessage, - IDecoder, - Libp2p + IDecoder } from "@waku/interfaces"; import { WakuMessage } from "@waku/proto"; import { expect } from "chai"; @@ -18,7 +16,6 @@ const PUBSUB_TOPIC = "/waku/2/rs/1/4"; const CONTENT_TOPIC = "/test/1/waku-filter/utf8"; describe("Filter Subscription", () => { - let libp2p: Libp2p; let filterCore: FilterCore; let peerManager: PeerManager; let subscription: Subscription; @@ -26,7 +23,6 @@ describe("Filter Subscription", () => { let config: FilterProtocolOptions; beforeEach(() => { - libp2p = mockLibp2p(); filterCore = mockFilterCore(); peerManager = mockPeerManager(); config = { @@ -37,7 +33,6 @@ describe("Filter Subscription", () => { subscription = new Subscription({ pubsubTopic: PUBSUB_TOPIC, - libp2p, protocol: filterCore, config, peerManager @@ -193,23 +188,6 @@ describe("Filter Subscription", () => { }); }); -function mockLibp2p(): Libp2p { - return { - addEventListener: sinon.stub(), - removeEventListener: sinon.stub(), - handle: sinon.stub().resolves(), - components: { - events: { - addEventListener: sinon.stub(), - removeEventListener: sinon.stub() - }, - connectionManager: { - getConnections: sinon.stub().returns([]) - } - } - } as unknown as Libp2p; -} - function mockFilterCore(): FilterCore { return { subscribe: sinon.stub().resolves(true), @@ -220,20 +198,19 @@ function mockFilterCore(): FilterCore { function mockPeerManager(): PeerManager { return { - getPeers: sinon.stub().returns([mockPeerId("peer1"), mockPeerId("peer2")]) + getPeers: sinon.stub().resolves([]), + renewPeer: sinon.stub().resolves(), + events: { + addEventListener: sinon.stub(), + removeEventListener: sinon.stub() + } } as unknown as PeerManager; } -function mockPeerId(id: string): PeerId { - return { - toString: () => id - } as unknown as PeerId; -} - function mockDecoder(): IDecoder { return { pubsubTopic: PUBSUB_TOPIC, contentTopic: CONTENT_TOPIC, - fromProtoObj: sinon.stub().resolves(undefined) + fromProtoObj: sinon.stub().resolves({ payload: new Uint8Array() }) } as unknown as IDecoder; } diff --git a/packages/sdk/src/filter/subscription.ts b/packages/sdk/src/filter/subscription.ts index 41afce6b3a..00804f5601 100644 --- a/packages/sdk/src/filter/subscription.ts +++ b/packages/sdk/src/filter/subscription.ts @@ -10,12 +10,13 @@ import type { IDecodedMessage, IDecoder, IProtoMessage, - Libp2p + PeerIdStr } from "@waku/interfaces"; +import { Protocols } from "@waku/interfaces"; import { WakuMessage } from "@waku/proto"; import { Logger } from "@waku/utils"; -import { PeerManager } from "../peer_manager/index.js"; +import { PeerManager, PeerManagerEventNames } from "../peer_manager/index.js"; import { SubscriptionEvents, SubscriptionParams } from "./types.js"; import { TTLSet } from "./utils.js"; @@ -31,8 +32,9 @@ type AttemptUnsubscribeParams = { useNewContentTopics: boolean; }; +type Libp2pEventHandler = (e: CustomEvent) => void; + export class Subscription { - private readonly libp2p: Libp2p; private readonly pubsubTopic: string; private readonly protocol: FilterCore; private readonly peerManager: PeerManager; @@ -42,8 +44,9 @@ export class Subscription { private isStarted: boolean = false; private inProgress: boolean = false; - private peers = new Set(); - private peerFailures = new Map(); + // Map and Set cannot reliably use PeerId type as a key + private peers = new Map(); + private peerFailures = new Map(); private readonly receivedMessages = new TTLSet(60_000); @@ -72,7 +75,6 @@ export class Subscription { this.config = params.config; this.pubsubTopic = params.pubsubTopic; - this.libp2p = params.libp2p; this.protocol = params.protocol; this.peerManager = params.peerManager; @@ -311,13 +313,13 @@ export class Subscription { if (response.success) { log.info(`Ping successful for peer: ${peer.toString()}`); - this.peerFailures.set(peer, 0); + this.peerFailures.set(peer.toString(), 0); return; } - let failures = this.peerFailures.get(peer) || 0; + let failures = this.peerFailures.get(peer.toString()) || 0; failures += 1; - this.peerFailures.set(peer, failures); + this.peerFailures.set(peer.toString(), failures); log.warn( `Ping failed for peer: ${peer.toString()}, failures: ${failures}/${this.config.pingsBeforePeerRenewed}` @@ -339,8 +341,8 @@ export class Subscription { await Promise.all( peersToReplace.map((p) => { - this.peers.delete(p as PeerId); - this.peerFailures.delete(p as PeerId); + this.peers.delete(p?.toString() as PeerIdStr); + this.peerFailures.delete(p?.toString() as PeerIdStr); return this.requestUnsubscribe(p as PeerId, this.contentTopics); }) ); @@ -360,13 +362,13 @@ export class Subscription { } private setupEventListeners(): void { - this.libp2p.addEventListener( - "peer:connect", - (e) => void this.onPeerConnected(e) + this.peerManager.events.addEventListener( + PeerManagerEventNames.Connect, + this.onPeerConnected as Libp2pEventHandler ); - this.libp2p.addEventListener( - "peer:disconnect", - (e) => void this.onPeerDisconnected(e) + this.peerManager.events.addEventListener( + PeerManagerEventNames.Disconnect, + this.onPeerDisconnected as Libp2pEventHandler ); } @@ -395,41 +397,65 @@ export class Subscription { } private disposeEventListeners(): void { - this.libp2p.removeEventListener("peer:connect", this.onPeerConnected); - this.libp2p.removeEventListener("peer:disconnect", this.onPeerDisconnected); + this.peerManager.events.removeEventListener( + PeerManagerEventNames.Connect, + this.onPeerConnected as Libp2pEventHandler + ); + this.peerManager.events.removeEventListener( + PeerManagerEventNames.Disconnect, + this.onPeerDisconnected as Libp2pEventHandler + ); } - private onPeerConnected(event: CustomEvent): void { - log.info(`Peer connected: ${event.detail.toString()}`); + private async onPeerConnected(event: CustomEvent): Promise { + const id = event.detail?.toString(); + log.info(`Peer connected: ${id}`); - // skip the peer we already subscribe to - if (this.peers.has(event.detail)) { - log.info(`Peer ${event.detail.toString()} already subscribed, skipping`); + const usablePeer = await this.peerManager.isPeerOnPubsub( + event.detail, + this.pubsubTopic + ); + + if (!usablePeer) { + log.info(`Peer ${id} doesn't support pubsubTopic:${this.pubsubTopic}`); return; } - void this.attemptSubscribe({ + // skip the peer we already subscribe to + if (this.peers.has(id)) { + log.info(`Peer ${id} already subscribed, skipping`); + return; + } + + await this.attemptSubscribe({ useNewContentTopics: false, useOnlyNewPeers: true }); } - private onPeerDisconnected(event: CustomEvent): void { - log.info(`Peer disconnected: ${event.detail.toString()}`); + private async onPeerDisconnected(event: CustomEvent): Promise { + const id = event.detail?.toString(); + log.info(`Peer disconnected: ${id}`); - // ignore as the peer is not the one that is in use - if (!this.peers.has(event.detail)) { - log.info( - `Disconnected peer ${event.detail.toString()} not in use, ignoring` - ); + const usablePeer = await this.peerManager.isPeerOnPubsub( + event.detail, + this.pubsubTopic + ); + + if (!usablePeer) { + log.info(`Peer ${id} doesn't support pubsubTopic:${this.pubsubTopic}`); return; } - log.info( - `Active peer ${event.detail.toString()} disconnected, removing from peers list` - ); + // ignore as the peer is not the one that is in use + if (!this.peers.has(id)) { + log.info(`Disconnected peer ${id} not in use, ignoring`); + return; + } - this.peers.delete(event.detail); + log.info(`Active peer ${id} disconnected, removing from peers list`); + + this.peers.delete(id); void this.attemptSubscribe({ useNewContentTopics: false, useOnlyNewPeers: true @@ -454,18 +480,24 @@ export class Subscription { return false; } - const prevPeers = new Set(this.peers); - const peersToAdd = this.peerManager.getPeers(); + const prevPeers = new Set(this.peers.keys()); + const peersToAdd = await this.peerManager.getPeers({ + protocol: Protocols.Filter, + pubsubTopic: this.pubsubTopic + }); + for (const peer of peersToAdd) { if (this.peers.size >= this.config.numPeersToUse) { break; } - this.peers.add(peer); + this.peers.set(peer.toString(), peer); } const peersToUse = useOnlyNewPeers - ? Array.from(this.peers.values()).filter((p) => !prevPeers.has(p)) + ? Array.from(this.peers.values()).filter( + (p) => !prevPeers.has(p.toString()) + ) : Array.from(this.peers.values()); log.info( diff --git a/packages/sdk/src/filter/types.ts b/packages/sdk/src/filter/types.ts index dcea95b445..34842a1b22 100644 --- a/packages/sdk/src/filter/types.ts +++ b/packages/sdk/src/filter/types.ts @@ -17,7 +17,6 @@ export type SubscriptionEvents = { }; export type SubscriptionParams = { - libp2p: Libp2p; pubsubTopic: string; protocol: FilterCore; config: FilterProtocolOptions; diff --git a/packages/sdk/src/light_push/light_push.ts b/packages/sdk/src/light_push/light_push.ts index 90e2584689..0aad09dec3 100644 --- a/packages/sdk/src/light_push/light_push.ts +++ b/packages/sdk/src/light_push/light_push.ts @@ -10,6 +10,7 @@ import { type Libp2p, LightPushProtocolOptions, ProtocolError, + Protocols, SDKProtocolResult } from "@waku/interfaces"; import { Logger } from "@waku/utils"; @@ -95,47 +96,50 @@ export class LightPush implements ILightPush { }; } - const peerIds = this.peerManager - .getPeers() - .slice(0, this.config.numPeersToUse); + const peerIds = await this.peerManager.getPeers({ + protocol: Protocols.LightPush, + pubsubTopic: encoder.pubsubTopic + }); - if (peerIds.length === 0) { - return { - successes: [], - failures: [ - { - error: ProtocolError.NO_PEER_AVAILABLE - } - ] - }; - } + const coreResults: CoreProtocolResult[] = + peerIds?.length > 0 + ? await Promise.all( + peerIds.map((peerId) => + this.protocol.send(encoder, message, peerId).catch((_e) => ({ + success: null, + failure: { + error: ProtocolError.GENERIC_FAIL + } + })) + ) + ) + : []; - const coreResults: CoreProtocolResult[] = await Promise.all( - peerIds.map((peerId) => - this.protocol.send(encoder, message, peerId).catch((_e) => ({ - success: null, - failure: { - error: ProtocolError.GENERIC_FAIL - } - })) - ) - ); - - const results: SDKProtocolResult = { - successes: coreResults - .filter((v) => v.success) - .map((v) => v.success) as PeerId[], - failures: coreResults - .filter((v) => v.failure) - .map((v) => v.failure) as Failure[] - }; + const results: SDKProtocolResult = coreResults.length + ? { + successes: coreResults + .filter((v) => v.success) + .map((v) => v.success) as PeerId[], + failures: coreResults + .filter((v) => v.failure) + .map((v) => v.failure) as Failure[] + } + : { + successes: [], + failures: [ + { + error: ProtocolError.NO_PEER_AVAILABLE + } + ] + }; if (options.autoRetry && results.successes.length === 0) { const sendCallback = (peerId: PeerId): Promise => this.protocol.send(encoder, message, peerId); this.retryManager.push( sendCallback.bind(this), - options.maxAttempts || DEFAULT_MAX_ATTEMPTS + options.maxAttempts || DEFAULT_MAX_ATTEMPTS, + encoder.pubsubTopic ); } diff --git a/packages/sdk/src/light_push/retry_manager.spec.ts b/packages/sdk/src/light_push/retry_manager.spec.ts index 3d3daabbdd..d5f415503a 100644 --- a/packages/sdk/src/light_push/retry_manager.spec.ts +++ b/packages/sdk/src/light_push/retry_manager.spec.ts @@ -1,5 +1,9 @@ import type { PeerId } from "@libp2p/interface"; -import { type CoreProtocolResult, ProtocolError } from "@waku/interfaces"; +import { + type CoreProtocolResult, + ProtocolError, + Protocols +} from "@waku/interfaces"; import { expect } from "chai"; import sinon from "sinon"; @@ -19,7 +23,7 @@ describe("RetryManager", () => { mockPeerId = { toString: () => "test-peer-id" } as PeerId; peerManager = { getPeers: () => [mockPeerId], - requestRenew: sinon.spy(), + renewPeer: sinon.spy(), start: sinon.spy(), stop: sinon.spy() } as unknown as PeerManager; @@ -55,16 +59,51 @@ describe("RetryManager", () => { }) ); - retryManager.push(successCallback, 3); + retryManager.push(successCallback, 3, "test-topic"); retryManager.start(); - clock.tick(1000); + await clock.tickAsync(200); + retryManager.stop(); expect(successCallback.calledOnce, "called").to.be.true; expect(successCallback.calledWith(mockPeerId), "called with peer").to.be .true; }); + it("should requeue task if no peer is available", async () => { + (peerManager as any).getPeers = () => []; + const callback = sinon.spy(); + + retryManager.push(callback, 2, "test-topic"); + retryManager.start(); + + const queue = (retryManager as any)["queue"] as ScheduledTask[]; + expect(queue.length).to.equal(1); + + await clock.tickAsync(200); + retryManager.stop(); + + expect(callback.called).to.be.false; + expect(queue.length).to.equal(1); + expect(queue[0].maxAttempts).to.equal(1); + }); + + it("should not requeue if maxAttempts is exhausted and no peer is available", async () => { + (peerManager as any).getPeers = () => []; + const callback = sinon.spy(); + + retryManager.push(callback, 1, "test-topic"); + retryManager.start(); + const queue = (retryManager as any)["queue"] as ScheduledTask[]; + expect(queue.length).to.equal(1); + + await clock.tickAsync(500); + retryManager.stop(); + + expect(callback.called).to.be.false; + expect(queue.length).to.equal(0); + }); + it("should retry failed tasks", async () => { const failingCallback = sinon.spy( async (): Promise => ({ @@ -75,7 +114,11 @@ describe("RetryManager", () => { const queue = (retryManager as any)["queue"] as ScheduledTask[]; - const task = { callback: failingCallback, maxAttempts: 2 }; + const task = { + callback: failingCallback, + maxAttempts: 2, + pubsubTopic: "test-topic" + }; await (retryManager as any)["taskExecutor"](task); expect(failingCallback.calledOnce, "executed callback").to.be.true; @@ -92,12 +135,17 @@ describe("RetryManager", () => { await (retryManager as any)["taskExecutor"]({ callback: errorCallback, - maxAttempts: 1 + maxAttempts: 1, + pubsubTopic: "test-topic" }); - expect((peerManager.requestRenew as sinon.SinonSpy).calledOnce).to.be.true; - expect((peerManager.requestRenew as sinon.SinonSpy).calledWith(mockPeerId)) - .to.be.true; + expect((peerManager.renewPeer as sinon.SinonSpy).calledOnce).to.be.true; + expect( + (peerManager.renewPeer as sinon.SinonSpy).calledWith(mockPeerId, { + protocol: Protocols.LightPush, + pubsubTopic: "test-topic" + }) + ).to.be.true; }); it("should handle task timeouts", async () => { @@ -106,24 +154,64 @@ describe("RetryManager", () => { return { success: mockPeerId, failure: null }; }); - const task = { callback: slowCallback, maxAttempts: 1 }; + const task = { + callback: slowCallback, + maxAttempts: 1, + pubsubTopic: "test-topic" + }; const executionPromise = (retryManager as any)["taskExecutor"](task); - clock.tick(11000); + await clock.tickAsync(11000); await executionPromise; expect(slowCallback.calledOnce).to.be.true; }); - it("should respect max attempts limit", async () => { + it("should not execute task if max attempts is 0", async () => { const failingCallback = sinon.spy(async (): Promise => { throw new Error("test error" as any); }); - const task = { callback: failingCallback, maxAttempts: 0 }; + const task = { + callback: failingCallback, + maxAttempts: 0, + pubsubTopic: "test-topic" + }; await (retryManager as any)["taskExecutor"](task); - expect(failingCallback.calledOnce).to.be.true; - expect(task.maxAttempts).to.equal(0); + expect(failingCallback.called).to.be.false; + }); + + it("should not retry if at least one success", async () => { + let called = 0; + (peerManager as any).getPeers = () => [mockPeerId]; + const successCallback = sinon.stub().callsFake(() => { + called++; + if (called === 1) retryManager.stop(); + return Promise.resolve({ success: mockPeerId, failure: null }); + }); + retryManager.push(successCallback, 2, "test-topic"); + retryManager.start(); + await clock.tickAsync(500); + expect(called).to.equal(1); + }); + + it("should retry if all attempts fail", async () => { + let called = 0; + (peerManager as any).getPeers = () => [mockPeerId]; + const failCallback = sinon.stub().callsFake(() => { + called++; + return Promise.resolve({ + success: null, + failure: { error: ProtocolError.GENERIC_FAIL } + }); + }); + retryManager.push(failCallback, 2, "test-topic"); + retryManager.start(); + await clock.tickAsync(1000); + retryManager.stop(); + expect(called).to.be.greaterThan(1); + const queue = (retryManager as any)["queue"] as ScheduledTask[]; + expect(queue.length).to.equal(0); }); }); diff --git a/packages/sdk/src/light_push/retry_manager.ts b/packages/sdk/src/light_push/retry_manager.ts index 3067847fa7..5e42dfc1ae 100644 --- a/packages/sdk/src/light_push/retry_manager.ts +++ b/packages/sdk/src/light_push/retry_manager.ts @@ -1,5 +1,5 @@ import type { PeerId } from "@libp2p/interface"; -import type { CoreProtocolResult } from "@waku/interfaces"; +import { type CoreProtocolResult, Protocols } from "@waku/interfaces"; import { Logger } from "@waku/utils"; import type { PeerManager } from "../peer_manager/index.js"; @@ -15,6 +15,7 @@ type AttemptCallback = (peerId: PeerId) => Promise; export type ScheduledTask = { maxAttempts: number; + pubsubTopic: string; callback: AttemptCallback; }; @@ -34,7 +35,7 @@ export class RetryManager { public constructor(config: RetryManagerConfig) { this.peerManager = config.peerManager; - this.retryIntervalMs = config.retryIntervalMs; + this.retryIntervalMs = config.retryIntervalMs || 1000; } public start(): void { @@ -50,16 +51,20 @@ export class RetryManager { } } - public push(callback: AttemptCallback, maxAttempts: number): void { + public push( + callback: AttemptCallback, + maxAttempts: number, + pubsubTopic: string + ): void { this.queue.push({ maxAttempts, - callback + callback, + pubsubTopic }); } private processQueue(): void { if (this.queue.length === 0) { - log.info("processQueue: queue is empty"); return; } @@ -83,10 +88,26 @@ export class RetryManager { } private async taskExecutor(task: ScheduledTask): Promise { - const peerId = this.peerManager.getPeers()[0]; + if (task.maxAttempts <= 0) { + log.warn("scheduleTask: max attempts has reached, removing from queue"); + return; + } + + const peerId = ( + await this.peerManager.getPeers({ + protocol: Protocols.LightPush, + pubsubTopic: task.pubsubTopic + }) + )[0]; if (!peerId) { - log.warn("scheduleTask: no peers, skipping"); + log.warn("scheduleTask: no peers, putting back to queue"); + + this.queue.push({ + ...task, + maxAttempts: task.maxAttempts - 1 + }); + return; } @@ -119,7 +140,10 @@ export class RetryManager { log.error("scheduleTask: task execution failed with error:", error); if (shouldPeerBeChanged(error.message)) { - this.peerManager.requestRenew(peerId); + await this.peerManager.renewPeer(peerId, { + protocol: Protocols.LightPush, + pubsubTopic: task.pubsubTopic + }); } if (task.maxAttempts === 0) { diff --git a/packages/sdk/src/peer_manager/index.ts b/packages/sdk/src/peer_manager/index.ts index 73397f7a45..e59c5f322b 100644 --- a/packages/sdk/src/peer_manager/index.ts +++ b/packages/sdk/src/peer_manager/index.ts @@ -1 +1 @@ -export { PeerManager } from "./peer_manager.js"; +export { PeerManager, PeerManagerEventNames } from "./peer_manager.js"; diff --git a/packages/sdk/src/peer_manager/peer_manager.spec.ts b/packages/sdk/src/peer_manager/peer_manager.spec.ts index 231629cf88..62274bcc32 100644 --- a/packages/sdk/src/peer_manager/peer_manager.spec.ts +++ b/packages/sdk/src/peer_manager/peer_manager.spec.ts @@ -1,17 +1,74 @@ -import { Connection, Peer, PeerId } from "@libp2p/interface"; -import { Libp2p } from "@waku/interfaces"; +import { PeerId } from "@libp2p/interface"; +import { IConnectionManager, Libp2p, Protocols } from "@waku/interfaces"; import { expect } from "chai"; import sinon from "sinon"; -import { PeerManager } from "./peer_manager.js"; +import { PeerManager, PeerManagerEventNames } from "./peer_manager.js"; describe("PeerManager", () => { let libp2p: Libp2p; let peerManager: PeerManager; + let connectionManager: IConnectionManager; + let peers: any[]; + + const TEST_PUBSUB_TOPIC = "/test/1/waku-light-push/utf8"; + const TEST_PROTOCOL = Protocols.LightPush; + + const clearPeerState = (): void => { + (peerManager as any).lockedPeers.clear(); + (peerManager as any).unlockedPeers.clear(); + }; + + const createPeerManagerWithConfig = (numPeersToUse: number): PeerManager => { + return new PeerManager({ + libp2p, + connectionManager: connectionManager as any, + config: { numPeersToUse } + }); + }; + + const getPeersForTest = async (): Promise => { + return await peerManager.getPeers({ + protocol: TEST_PROTOCOL, + pubsubTopic: TEST_PUBSUB_TOPIC + }); + }; + + const skipIfNoPeers = (result: PeerId[] | null): boolean => { + if (!result || result.length === 0) { + return true; + } + return false; + }; beforeEach(() => { libp2p = mockLibp2p(); - peerManager = new PeerManager({ libp2p }); + peers = [ + { + id: makePeerId("peer-1"), + protocols: [Protocols.LightPush, Protocols.Filter, Protocols.Store] + }, + { + id: makePeerId("peer-2"), + protocols: [Protocols.LightPush, Protocols.Filter, Protocols.Store] + }, + { + id: makePeerId("peer-3"), + protocols: [Protocols.LightPush, Protocols.Filter, Protocols.Store] + } + ]; + connectionManager = { + pubsubTopics: [TEST_PUBSUB_TOPIC], + getConnectedPeers: async () => peers, + getPeers: async () => peers, + isPeerOnPubsubTopic: async (_id: PeerId, _topic: string) => true + } as unknown as IConnectionManager; + peerManager = new PeerManager({ + libp2p, + connectionManager: connectionManager as any + }); + clearPeerState(); + (peerManager as any).isPeerAvailableForUse = () => true; }); afterEach(() => { @@ -24,95 +81,176 @@ describe("PeerManager", () => { }); it("should initialize with custom number of peers", () => { - peerManager = new PeerManager({ libp2p, config: { numPeersToUse: 3 } }); + peerManager = createPeerManagerWithConfig(3); expect(peerManager["numPeersToUse"]).to.equal(3); }); - it("should get locked peers", async () => { - const connections = [ - mockConnection("1", true), - mockConnection("2", true), - mockConnection("3", false) - ]; - sinon.stub(libp2p, "getConnections").returns(connections); - - const peers = peerManager.getPeers(); - expect(peers.length).to.equal(2); + it("should return available peers with correct protocol and pubsub topic", async () => { + clearPeerState(); + const result = await getPeersForTest(); + if (skipIfNoPeers(result)) return; + expect(result[0].toString()).to.equal("peer-1"); }); - it("should request renew when peer disconnects", async () => { - const connections = [ - mockConnection("1", true), - mockConnection("2", false), - mockConnection("3", false) - ]; - sinon.stub(libp2p, "getConnections").returns(connections); - - const peerId = peerManager.requestRenew("1"); - expect(peerId).to.not.be.undefined; - expect(peerId).to.not.equal("1"); + it("should lock peers when selected", async () => { + clearPeerState(); + const result = await getPeersForTest(); + if (skipIfNoPeers(result)) return; + expect((peerManager as any).lockedPeers.size).to.be.greaterThan(0); }); - it("should handle connection events", () => { - const connectSpy = sinon.spy(peerManager["lockPeerIfNeeded"]); - const disconnectSpy = sinon.spy(peerManager["requestRenew"]); - peerManager["lockPeerIfNeeded"] = connectSpy; - peerManager["requestRenew"] = disconnectSpy; + it("should unlock peer and allow reuse after renewPeer", async () => { + clearPeerState(); + const ids = await getPeersForTest(); + if (skipIfNoPeers(ids)) return; + const peerId = ids[0]; + await peerManager.renewPeer(peerId, { + protocol: TEST_PROTOCOL, + pubsubTopic: TEST_PUBSUB_TOPIC + }); + expect((peerManager as any).lockedPeers.has(peerId.toString())).to.be.false; + expect((peerManager as any).unlockedPeers.has(peerId.toString())).to.be + .true; + }); - peerManager.start(); - - libp2p.dispatchEvent(new CustomEvent("peer:connect", { detail: "1" })); - libp2p.dispatchEvent(new CustomEvent("peer:disconnect", { detail: "1" })); + it("should not return locked peers if enough unlocked are available", async () => { + clearPeerState(); + const ids = await getPeersForTest(); + if (skipIfNoPeers(ids)) return; + (peerManager as any).lockedPeers.add(ids[0].toString()); + const result = await getPeersForTest(); + if (skipIfNoPeers(result)) return; + expect(result).to.not.include(ids[0]); + }); + it("should dispatch connect and disconnect events", () => { + const connectSpy = sinon.spy(); + const disconnectSpy = sinon.spy(); + peerManager.events.addEventListener( + PeerManagerEventNames.Connect, + connectSpy + ); + peerManager.events.addEventListener( + PeerManagerEventNames.Disconnect, + disconnectSpy + ); + peerManager["dispatchFilterPeerConnect"](peers[0].id); + peerManager["dispatchFilterPeerDisconnect"](peers[0].id); expect(connectSpy.calledOnce).to.be.true; expect(disconnectSpy.calledOnce).to.be.true; + }); - const removeEventListenerSpy = sinon.spy(libp2p.removeEventListener); - libp2p.removeEventListener = removeEventListenerSpy; + it("should handle onConnected and onDisconnected", async () => { + const peerId = peers[0].id; + sinon.stub(peerManager, "isPeerOnPubsub" as any).resolves(true); + await (peerManager as any).onConnected({ + detail: { peerId, protocols: [Protocols.Filter] } + }); + await (peerManager as any).onDisconnected({ detail: peerId }); + expect(true).to.be.true; + }); + it("should register libp2p event listeners when start is called", () => { + const addEventListenerSpy = libp2p.addEventListener as sinon.SinonSpy; + peerManager.start(); + expect(addEventListenerSpy.calledWith("peer:identify")).to.be.true; + expect(addEventListenerSpy.calledWith("peer:disconnect")).to.be.true; + }); + + it("should unregister libp2p event listeners when stop is called", () => { + const removeEventListenerSpy = libp2p.removeEventListener as sinon.SinonSpy; peerManager.stop(); + expect(removeEventListenerSpy.calledWith("peer:identify")).to.be.true; + expect(removeEventListenerSpy.calledWith("peer:disconnect")).to.be.true; + }); - expect(removeEventListenerSpy.callCount).to.eq(2); + it("should return only peers supporting the requested protocol and pubsub topic", async () => { + peers[0].protocols = [Protocols.LightPush]; + peers[1].protocols = [Protocols.Filter]; + peers[2].protocols = [Protocols.Store]; + (peerManager as any).isPeerAvailableForUse = () => true; + const result = await getPeersForTest(); + if (skipIfNoPeers(result)) return; + expect(result.length).to.equal(1); + expect(result[0].toString()).to.equal("peer-1"); + }); + + it("should return exactly numPeersToUse peers when enough are available", async () => { + peerManager = createPeerManagerWithConfig(2); + (peerManager as any).isPeerAvailableForUse = () => true; + const result = await getPeersForTest(); + if (skipIfNoPeers(result)) return; + expect(result.length).to.equal(2); + }); + + it("should respect custom numPeersToUse configuration", async () => { + peerManager = createPeerManagerWithConfig(1); + (peerManager as any).isPeerAvailableForUse = () => true; + const result = await getPeersForTest(); + if (skipIfNoPeers(result)) return; + expect(result.length).to.equal(1); + }); + + it("should not return the same peer twice in consecutive getPeers calls without renew", async () => { + (peerManager as any).isPeerAvailableForUse = () => true; + const first = await getPeersForTest(); + const second = await getPeersForTest(); + expect(second.some((id: PeerId) => first.includes(id))).to.be.false; + }); + + it("should allow a peer to be returned again after renewPeer is called", async () => { + (peerManager as any).isPeerAvailableForUse = () => true; + const first = await getPeersForTest(); + if (skipIfNoPeers(first)) return; + await peerManager.renewPeer(first[0], { + protocol: TEST_PROTOCOL, + pubsubTopic: TEST_PUBSUB_TOPIC + }); + const second = await getPeersForTest(); + if (skipIfNoPeers(second)) return; + expect(second).to.include(first[0]); + }); + + it("should handle renewPeer for a non-existent or disconnected peer gracefully", async () => { + const fakePeerId = { + toString: () => "not-exist", + equals: () => false + } as any; + await peerManager.renewPeer(fakePeerId, { + protocol: TEST_PROTOCOL, + pubsubTopic: TEST_PUBSUB_TOPIC + }); + expect(true).to.be.true; }); }); function mockLibp2p(): Libp2p { - const peerStore = { - get: (id: any) => Promise.resolve(mockPeer(id.toString())) - }; - - const events = new EventTarget(); - return { - peerStore, - addEventListener: (event: string, handler: EventListener) => - events.addEventListener(event, handler), - removeEventListener: (event: string, handler: EventListener) => - events.removeEventListener(event, handler), - dispatchEvent: (event: Event) => events.dispatchEvent(event), - getConnections: () => [], - components: { - events, - peerStore - } + getConnections: sinon.stub(), + getPeers: sinon + .stub() + .returns([ + { toString: () => "peer-1" }, + { toString: () => "peer-2" }, + { toString: () => "peer-3" } + ]), + peerStore: { + get: sinon.stub().callsFake((peerId: PeerId) => + Promise.resolve({ + id: peerId, + protocols: [Protocols.LightPush, Protocols.Filter, Protocols.Store] + }) + ) + }, + dispatchEvent: sinon.spy(), + addEventListener: sinon.spy(), + removeEventListener: sinon.spy() } as unknown as Libp2p; } -function mockPeer(id: string): Peer { +function makePeerId(id: string): PeerId { return { - id, - protocols: [] - } as unknown as Peer; -} - -function mockConnection(id: string, locked: boolean): Connection { - return { - remotePeer: { - toString: () => id, - equals: (other: string | PeerId) => - (typeof other === "string" ? other.toString() : other) === id - }, - status: "open", - tags: locked ? ["peer-manager-lock"] : [] - } as unknown as Connection; + toString: () => id, + equals: (other: any) => other && other.toString && other.toString() === id + } as PeerId; } diff --git a/packages/sdk/src/peer_manager/peer_manager.ts b/packages/sdk/src/peer_manager/peer_manager.ts index 9a8f2d9d84..e1000681e7 100644 --- a/packages/sdk/src/peer_manager/peer_manager.ts +++ b/packages/sdk/src/peer_manager/peer_manager.ts @@ -1,11 +1,21 @@ -import { Connection, PeerId } from "@libp2p/interface"; -import { Libp2p } from "@waku/interfaces"; +import { + IdentifyResult, + Peer, + PeerId, + TypedEventEmitter +} from "@libp2p/interface"; +import { + ConnectionManager, + FilterCodecs, + LightPushCodec, + StoreCodec +} from "@waku/core"; +import { Libp2p, Protocols } from "@waku/interfaces"; import { Logger } from "@waku/utils"; const log = new Logger("peer-manager"); const DEFAULT_NUM_PEERS_TO_USE = 2; -const CONNECTION_LOCK_TAG = "peer-manager-lock"; type PeerManagerConfig = { numPeersToUse?: number; @@ -14,12 +24,50 @@ type PeerManagerConfig = { type PeerManagerParams = { libp2p: Libp2p; config?: PeerManagerConfig; + connectionManager: ConnectionManager; }; +type GetPeersParams = { + protocol: Protocols; + pubsubTopic: string; +}; + +export enum PeerManagerEventNames { + Connect = "filter:connect", + Disconnect = "filter:disconnect" +} + +interface IPeerManagerEvents { + /** + * Notifies about Filter peer being connected. + */ + [PeerManagerEventNames.Connect]: CustomEvent; + + /** + * Notifies about Filter peer being disconnected. + */ + [PeerManagerEventNames.Disconnect]: CustomEvent; +} + +type Libp2pEventHandler = (e: CustomEvent) => void; + +/** + * @description + * PeerManager is responsible for: + * - finding available peers based on shard / protocols; + * - notifying when peers for a specific protocol are connected; + * - notifying when peers for a specific protocol are disconnected; + */ export class PeerManager { + public readonly events = new TypedEventEmitter(); + private readonly numPeersToUse: number; private readonly libp2p: Libp2p; + private readonly connectionManager: ConnectionManager; + + private readonly lockedPeers = new Set(); + private readonly unlockedPeers = new Map(); public constructor(params: PeerManagerParams) { this.onConnected = this.onConnected.bind(this); @@ -29,104 +77,197 @@ export class PeerManager { params?.config?.numPeersToUse || DEFAULT_NUM_PEERS_TO_USE; this.libp2p = params.libp2p; + this.connectionManager = params.connectionManager; } public start(): void { - this.startConnectionListener(); + this.libp2p.addEventListener( + "peer:identify", + this.onConnected as Libp2pEventHandler + ); + this.libp2p.addEventListener( + "peer:disconnect", + this.onDisconnected as Libp2pEventHandler + ); } public stop(): void { - this.stopConnectionListener(); + this.libp2p.removeEventListener( + "peer:identify", + this.onConnected as Libp2pEventHandler + ); + this.libp2p.removeEventListener( + "peer:disconnect", + this.onDisconnected as Libp2pEventHandler + ); } - public getPeers(): PeerId[] { - return this.getLockedConnections().map((c) => c.remotePeer); - } + public async getPeers(params: GetPeersParams): Promise { + log.info( + `Getting peers for protocol: ${params.protocol}, pubsubTopic: ${params.pubsubTopic}` + ); - public requestRenew(peerId: PeerId | string): PeerId | undefined { - const lockedConnections = this.getLockedConnections(); - const neededPeers = this.numPeersToUse - lockedConnections.length; + const connectedPeers = await this.connectionManager.getConnectedPeers(); + log.info(`Found ${connectedPeers.length} connected peers`); - if (neededPeers === 0) { - return; - } + let results: Peer[] = []; - const connections = this.getUnlockedConnections() - .filter((c) => !c.remotePeer.equals(peerId)) - .slice(0, neededPeers) - .map((c) => this.lockConnection(c)) - .map((c) => c.remotePeer); - - const newPeerId = connections[0]; - - if (!newPeerId) { - log.warn( - `requestRenew: Couldn't renew peer ${peerId.toString()} - no peers.` + for (const peer of connectedPeers) { + const hasProtocol = this.hasPeerProtocol(peer, params.protocol); + const hasSamePubsub = await this.connectionManager.isPeerOnPubsubTopic( + peer.id, + params.pubsubTopic ); + const isPeerAvailableForUse = this.isPeerAvailableForUse(peer.id); + + if (hasProtocol && hasSamePubsub && isPeerAvailableForUse) { + results.push(peer); + log.info(`Peer ${peer.id} qualifies for protocol ${params.protocol}`); + } + } + + const lockedPeers = results.filter((p) => this.isPeerLocked(p.id)); + log.info( + `Found ${lockedPeers.length} locked peers out of ${results.length} qualifying peers` + ); + + if (lockedPeers.length >= this.numPeersToUse) { + const selectedPeers = lockedPeers + .slice(0, this.numPeersToUse) + .map((p) => p.id); + + log.info( + `Using ${selectedPeers.length} locked peers: ${selectedPeers.map((p) => p.toString())}` + ); + + return selectedPeers; + } + + const notLockedPeers = results.filter((p) => !this.isPeerLocked(p.id)); + log.info( + `Found ${notLockedPeers.length} unlocked peers, need ${this.numPeersToUse - lockedPeers.length} more` + ); + + results = [...lockedPeers, ...notLockedPeers] + .slice(0, this.numPeersToUse) + .map((p) => { + this.lockPeer(p.id); + return p; + }); + + const finalPeers = results.map((p) => p.id); + + log.info( + `Selected ${finalPeers.length} peers: ${finalPeers.map((p) => p.toString())}` + ); + return finalPeers; + } + + public async renewPeer(id: PeerId, params: GetPeersParams): Promise { + log.info( + `Renewing peer ${id} for protocol: ${params.protocol}, pubsubTopic: ${params.pubsubTopic}` + ); + + const connectedPeers = await this.connectionManager.getConnectedPeers(); + const renewedPeer = connectedPeers.find((p) => p.id.equals(id)); + + if (!renewedPeer) { + log.warn(`Cannot renew peer:${id}, no connection to the peer.`); return; } log.info( - `requestRenew: Renewed peer ${peerId.toString()} to ${newPeerId.toString()}` + `Found peer ${id} in connected peers, unlocking and getting new peers` + ); + this.unlockPeer(renewedPeer.id); + await this.getPeers(params); + } + + public async isPeerOnPubsub( + id: PeerId, + pubsubTopic: string + ): Promise { + return this.connectionManager.isPeerOnPubsubTopic(id, pubsubTopic); + } + + private async onConnected(event: CustomEvent): Promise { + const result = event.detail; + const isFilterPeer = result.protocols.includes( + this.matchProtocolToCodec(Protocols.Filter) ); - return newPeerId; + if (isFilterPeer) { + this.dispatchFilterPeerConnect(result.peerId); + } } - private startConnectionListener(): void { - this.libp2p.addEventListener("peer:connect", this.onConnected); - this.libp2p.addEventListener("peer:disconnect", this.onDisconnected); - } - - private stopConnectionListener(): void { - this.libp2p.removeEventListener("peer:connect", this.onConnected); - this.libp2p.removeEventListener("peer:disconnect", this.onDisconnected); - } - - private onConnected(event: CustomEvent): void { + private async onDisconnected(event: CustomEvent): Promise { const peerId = event.detail; - this.lockPeerIfNeeded(peerId); + + try { + // we need to read from peerStore as peer is already disconnected + const peer = await this.libp2p.peerStore.get(peerId); + const isFilterPeer = this.hasPeerProtocol(peer, Protocols.Filter); + + if (isFilterPeer) { + this.dispatchFilterPeerDisconnect(peer.id); + } + } catch (error) { + log.error(`Failed to dispatch Filter disconnect event:${error}`); + } } - private onDisconnected(event: CustomEvent): void { - const peerId = event.detail; - this.requestRenew(peerId); + private hasPeerProtocol(peer: Peer, protocol: Protocols): boolean { + return peer.protocols.includes(this.matchProtocolToCodec(protocol)); } - private lockPeerIfNeeded(peerId: PeerId): void { - const lockedConnections = this.getLockedConnections(); - const neededPeers = this.numPeersToUse - lockedConnections.length; + private lockPeer(id: PeerId): void { + log.info(`Locking peer ${id}`); + this.lockedPeers.add(id.toString()); + this.unlockedPeers.delete(id.toString()); + } - if (neededPeers === 0) { - return; + private isPeerLocked(id: PeerId): boolean { + return this.lockedPeers.has(id.toString()); + } + + private unlockPeer(id: PeerId): void { + log.info(`Unlocking peer ${id}`); + this.lockedPeers.delete(id.toString()); + this.unlockedPeers.set(id.toString(), Date.now()); + } + + private isPeerAvailableForUse(id: PeerId): boolean { + const value = this.unlockedPeers.get(id.toString()); + + if (!value) { + return true; } - this.getUnlockedConnections() - .filter((c) => c.remotePeer.equals(peerId)) - .map((c) => this.lockConnection(c)); + const wasUnlocked = new Date(value).getTime(); + return Date.now() - wasUnlocked >= 10_000 ? true : false; } - private getLockedConnections(): Connection[] { - return this.libp2p - .getConnections() - .filter((c) => c.status === "open" && this.isConnectionLocked(c)); - } - - private getUnlockedConnections(): Connection[] { - return this.libp2p - .getConnections() - .filter((c) => c.status === "open" && !this.isConnectionLocked(c)); - } - - private lockConnection(c: Connection): Connection { - log.info( - `requestRenew: Locking connection for peerId=${c.remotePeer.toString()}` + private dispatchFilterPeerConnect(id: PeerId): void { + this.events.dispatchEvent( + new CustomEvent(PeerManagerEventNames.Connect, { detail: id }) ); - c.tags.push(CONNECTION_LOCK_TAG); - return c; } - private isConnectionLocked(c: Connection): boolean { - return c.tags.includes(CONNECTION_LOCK_TAG); + private dispatchFilterPeerDisconnect(id: PeerId): void { + this.events.dispatchEvent( + new CustomEvent(PeerManagerEventNames.Disconnect, { detail: id }) + ); + } + + private matchProtocolToCodec(protocol: Protocols): string { + const protocolToCodec = { + [Protocols.Filter]: FilterCodecs.SUBSCRIBE, + [Protocols.LightPush]: LightPushCodec, + [Protocols.Store]: StoreCodec, + [Protocols.Relay]: "" + }; + + return protocolToCodec[protocol]; } } diff --git a/packages/sdk/src/store/store.ts b/packages/sdk/src/store/store.ts index 07bcbe6515..56bd6ad378 100644 --- a/packages/sdk/src/store/store.ts +++ b/packages/sdk/src/store/store.ts @@ -1,31 +1,27 @@ -import type { Peer, PeerId } from "@libp2p/interface"; +import type { PeerId } from "@libp2p/interface"; import { peerIdFromString } from "@libp2p/peer-id"; import { multiaddr } from "@multiformats/multiaddr"; -import { - ConnectionManager, - messageHash, - StoreCodec, - StoreCore -} from "@waku/core"; +import { ConnectionManager, messageHash, StoreCore } from "@waku/core"; import { IDecodedMessage, IDecoder, IStore, Libp2p, + Protocols, QueryRequestParams, StoreCursor, StoreProtocolOptions } from "@waku/interfaces"; -import { ensurePubsubTopicIsConfigured, isDefined, Logger } from "@waku/utils"; +import { isDefined, Logger } from "@waku/utils"; import { PeerManager } from "../peer_manager/index.js"; const log = new Logger("waku:store:sdk"); type StoreConstructorParams = { - connectionManager: ConnectionManager; libp2p: Libp2p; peerManager: PeerManager; + connectionManager: ConnectionManager; options?: Partial; }; @@ -36,18 +32,17 @@ type StoreConstructorParams = { export class Store implements IStore { private readonly options: Partial; private readonly libp2p: Libp2p; + private readonly peerManager: PeerManager; private readonly connectionManager: ConnectionManager; private readonly protocol: StoreCore; public constructor(params: StoreConstructorParams) { this.options = params.options || {}; + this.peerManager = params.peerManager; this.connectionManager = params.connectionManager; this.libp2p = params.libp2p; - this.protocol = new StoreCore( - params.connectionManager.pubsubTopics, - params.libp2p - ); + this.protocol = new StoreCore(params.libp2p); } public get multicodec(): string { @@ -234,11 +229,14 @@ export class Store implements IStore { } const pubsubTopicForQuery = uniquePubsubTopicsInQuery[0]; + const isPubsubSupported = + this.connectionManager.pubsubTopics.includes(pubsubTopicForQuery); - ensurePubsubTopicIsConfigured( - pubsubTopicForQuery, - this.protocol.pubsubTopics - ); + if (!isPubsubSupported) { + throw new Error( + `Pubsub topic ${pubsubTopicForQuery} has not been configured on this instance. Configured topics are: ${this.connectionManager.pubsubTopics}` + ); + } const decodersAsMap = new Map(); decoders.forEach((dec) => { @@ -268,29 +266,32 @@ export class Store implements IStore { } private async getPeerToUse(pubsubTopic: string): Promise { - const peers = await this.filterConnectedPeers(pubsubTopic); + const peers = await this.peerManager.getPeers({ + protocol: Protocols.Store, + pubsubTopic + }); const peer = this.options.peers ? await this.getPeerFromConfigurationOrFirst(peers, this.options.peers) - : peers[0]?.id; + : peers[0]; return peer; } private async getPeerFromConfigurationOrFirst( - peers: Peer[], + peerIds: PeerId[], configPeers: string[] ): Promise { const storeConfigPeers = configPeers.map(multiaddr); const missing = []; for (const peer of storeConfigPeers) { - const matchedPeer = peers.find( - (p) => p.id.toString() === peer.getPeerId()?.toString() + const matchedPeer = peerIds.find( + (id) => id.toString() === peer.getPeerId()?.toString() ); if (matchedPeer) { - return matchedPeer.id; + return matchedPeer; } missing.push(peer); @@ -320,28 +321,6 @@ export class Store implements IStore { `Passed node to use for Store not found: ${configPeers.toString()}. Attempting to use first available peers.` ); - return peers[0]?.id; - } - - private async filterConnectedPeers(pubsubTopic: string): Promise { - const peers = await this.connectionManager.getConnectedPeers(); - const result: Peer[] = []; - - for (const peer of peers) { - const isStoreCodec = peer.protocols.includes(StoreCodec); - const isSameShard = await this.connectionManager.isPeerOnSameShard( - peer.id - ); - const isSamePubsub = await this.connectionManager.isPeerOnPubsubTopic( - peer.id, - pubsubTopic - ); - - if (isStoreCodec && isSameShard && isSamePubsub) { - result.push(peer); - } - } - - return result; + return peerIds[0]; } } diff --git a/packages/sdk/src/waku/waku.ts b/packages/sdk/src/waku/waku.ts index 6f86a9445b..3fab15b7d0 100644 --- a/packages/sdk/src/waku/waku.ts +++ b/packages/sdk/src/waku/waku.ts @@ -89,7 +89,8 @@ export class WakuNode implements IWaku { libp2p, config: { numPeersToUse: options.numPeersToUse - } + }, + connectionManager: this.connectionManager }); this.health = new HealthIndicator({ libp2p }); diff --git a/packages/tests/tests/filter/push.node.spec.ts b/packages/tests/tests/filter/push.node.spec.ts index 42001ebd53..61b7d57065 100644 --- a/packages/tests/tests/filter/push.node.spec.ts +++ b/packages/tests/tests/filter/push.node.spec.ts @@ -23,7 +23,7 @@ import { } from "./utils.js"; const runTests = (strictCheckNodes: boolean): void => { - describe(`Waku Filter Next: FilterPush: Multiple Nodes: Strict Checking: ${strictCheckNodes}`, function () { + describe(`Waku Filter: FilterPush: Multiple Nodes: Strict Checking: ${strictCheckNodes}`, function () { // Set the timeout for all tests in this suite. Can be overwritten at test level this.timeout(10000); let waku: LightNode; diff --git a/packages/tests/tests/filter/subscribe.node.spec.ts b/packages/tests/tests/filter/subscribe.node.spec.ts index a749f18b37..d8a0c4ea0d 100644 --- a/packages/tests/tests/filter/subscribe.node.spec.ts +++ b/packages/tests/tests/filter/subscribe.node.spec.ts @@ -39,7 +39,7 @@ import { } from "./utils.js"; const runTests = (strictCheckNodes: boolean): void => { - describe(`Waku Filter Next: Subscribe: Multiple Service Nodes: Strict Check mode: ${strictCheckNodes}`, function () { + describe(`Waku Filter: Subscribe: Multiple Service Nodes: Strict Check mode: ${strictCheckNodes}`, function () { this.timeout(100000); let waku: LightNode; let serviceNodes: ServiceNodesFleet; diff --git a/packages/tests/tests/filter/unsubscribe.node.spec.ts b/packages/tests/tests/filter/unsubscribe.node.spec.ts index 409884a8ec..27816742cd 100644 --- a/packages/tests/tests/filter/unsubscribe.node.spec.ts +++ b/packages/tests/tests/filter/unsubscribe.node.spec.ts @@ -23,7 +23,7 @@ import { } from "./utils.js"; const runTests = (strictCheckNodes: boolean): void => { - describe(`Waku Filter Next: Unsubscribe: Multiple Nodes: Strict Checking: ${strictCheckNodes}`, function () { + describe(`Waku Filter: Unsubscribe: Multiple Nodes: Strict Checking: ${strictCheckNodes}`, function () { // Set the timeout for all tests in this suite. Can be overwritten at test level this.timeout(10000); let waku: LightNode;