diff --git a/packages/core/src/lib/base_protocol.ts b/packages/core/src/lib/base_protocol.ts index 4143056599..234a43f60f 100644 --- a/packages/core/src/lib/base_protocol.ts +++ b/packages/core/src/lib/base_protocol.ts @@ -1,5 +1,5 @@ import type { Libp2p } from "@libp2p/interface"; -import type { Peer, Stream } from "@libp2p/interface"; +import type { PeerId, Stream } from "@libp2p/interface"; import type { IBaseProtocolCore, Libp2pComponents, @@ -38,7 +38,7 @@ export class BaseProtocol implements IBaseProtocolCore { ); } - protected async getStream(peer: Peer): Promise { - return this.streamManager.getStream(peer); + protected async getStream(peerId: PeerId): Promise { + return this.streamManager.getStream(peerId); } } diff --git a/packages/core/src/lib/filter/filter.ts b/packages/core/src/lib/filter/filter.ts index 09bbef9e62..5c672a28fc 100644 --- a/packages/core/src/lib/filter/filter.ts +++ b/packages/core/src/lib/filter/filter.ts @@ -1,4 +1,4 @@ -import type { Peer, Stream } from "@libp2p/interface"; +import type { PeerId, Stream } from "@libp2p/interface"; import type { IncomingStreamData } from "@libp2p/interface-internal"; import { type ContentTopic, @@ -53,10 +53,10 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore { public async subscribe( pubsubTopic: PubsubTopic, - peer: Peer, + peerId: PeerId, contentTopics: ContentTopic[] ): Promise { - const stream = await this.getStream(peer); + const stream = await this.getStream(peerId); const request = FilterSubscribeRpc.createSubscribeRequest( pubsubTopic, @@ -78,7 +78,7 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore { success: null, failure: { error: ProtocolError.GENERIC_FAIL, - peerId: peer.id + peerId: peerId } }; } @@ -93,7 +93,7 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore { return { failure: { error: ProtocolError.REMOTE_PEER_REJECTED, - peerId: peer.id + peerId: peerId }, success: null }; @@ -101,28 +101,28 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore { return { failure: null, - success: peer.id + success: peerId }; } public async unsubscribe( pubsubTopic: PubsubTopic, - peer: Peer, + peerId: PeerId, contentTopics: ContentTopic[] ): Promise { let stream: Stream | undefined; try { - stream = await this.getStream(peer); + stream = await this.getStream(peerId); } catch (error) { log.error( - `Failed to get a stream for remote peer${peer.id.toString()}`, + `Failed to get a stream for remote peer${peerId.toString()}`, error ); return { success: null, failure: { error: ProtocolError.NO_STREAM_AVAILABLE, - peerId: peer.id + peerId: peerId } }; } @@ -140,22 +140,22 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore { success: null, failure: { error: ProtocolError.GENERIC_FAIL, - peerId: peer.id + peerId: peerId } }; } return { - success: peer.id, + success: peerId, failure: null }; } public async unsubscribeAll( pubsubTopic: PubsubTopic, - peer: Peer + peerId: PeerId ): Promise { - const stream = await this.getStream(peer); + const stream = await this.getStream(peerId); const request = FilterSubscribeRpc.createUnsubscribeAllRequest(pubsubTopic); @@ -171,7 +171,7 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore { return { failure: { error: ProtocolError.NO_RESPONSE, - peerId: peer.id + peerId: peerId }, success: null }; @@ -187,7 +187,7 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore { return { failure: { error: ProtocolError.REMOTE_PEER_REJECTED, - peerId: peer.id + peerId: peerId }, success: null }; @@ -195,24 +195,24 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore { return { failure: null, - success: peer.id + success: peerId }; } - public async ping(peer: Peer): Promise { + public async ping(peerId: PeerId): Promise { let stream: Stream | undefined; try { - stream = await this.getStream(peer); + stream = await this.getStream(peerId); } catch (error) { log.error( - `Failed to get a stream for remote peer${peer.id.toString()}`, + `Failed to get a stream for remote peer${peerId.toString()}`, error ); return { success: null, failure: { error: ProtocolError.NO_STREAM_AVAILABLE, - peerId: peer.id + peerId: peerId } }; } @@ -234,7 +234,7 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore { success: null, failure: { error: ProtocolError.GENERIC_FAIL, - peerId: peer.id + peerId: peerId } }; } @@ -244,7 +244,7 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore { success: null, failure: { error: ProtocolError.NO_RESPONSE, - peerId: peer.id + peerId: peerId } }; } @@ -260,12 +260,12 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore { success: null, failure: { error: ProtocolError.REMOTE_PEER_REJECTED, - peerId: peer.id + peerId: peerId } }; } return { - success: peer.id, + success: peerId, failure: null }; } diff --git a/packages/core/src/lib/light_push/light_push.ts b/packages/core/src/lib/light_push/light_push.ts index d649df7a21..ca132a5607 100644 --- a/packages/core/src/lib/light_push/light_push.ts +++ b/packages/core/src/lib/light_push/light_push.ts @@ -1,4 +1,4 @@ -import type { Peer, Stream } from "@libp2p/interface"; +import type { PeerId, Stream } from "@libp2p/interface"; import { type CoreProtocolResult, type IBaseProtocolCore, @@ -76,11 +76,10 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore { } } - // TODO(weboko): use peer.id as parameter instead public async send( encoder: IEncoder, message: IMessage, - peer: Peer + peerId: PeerId ): Promise { const { query, error: preparationError } = await this.preparePushMessage( encoder, @@ -92,21 +91,21 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore { success: null, failure: { error: preparationError, - peerId: peer.id + peerId } }; } let stream: Stream; try { - stream = await this.getStream(peer); + stream = await this.getStream(peerId); } catch (error) { log.error("Failed to get stream", error); return { success: null, failure: { error: ProtocolError.NO_STREAM_AVAILABLE, - peerId: peer.id + peerId: peerId } }; } @@ -126,7 +125,7 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore { success: null, failure: { error: ProtocolError.GENERIC_FAIL, - peerId: peer.id + peerId: peerId } }; } @@ -145,7 +144,7 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore { success: null, failure: { error: ProtocolError.DECODE_FAILED, - peerId: peer.id + peerId: peerId } }; } @@ -156,7 +155,7 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore { success: null, failure: { error: ProtocolError.NO_RESPONSE, - peerId: peer.id + peerId: peerId } }; } @@ -168,7 +167,7 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore { success: null, failure: { error: rlnErrorCase, - peerId: peer.id + peerId: peerId } }; } @@ -179,11 +178,11 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore { success: null, failure: { error: ProtocolError.REMOTE_PEER_REJECTED, - peerId: peer.id + peerId: peerId } }; } - return { success: peer.id, failure: null }; + return { success: peerId, failure: null }; } } diff --git a/packages/core/src/lib/metadata/metadata.ts b/packages/core/src/lib/metadata/metadata.ts index 2b3bfe2532..f64596c0a4 100644 --- a/packages/core/src/lib/metadata/metadata.ts +++ b/packages/core/src/lib/metadata/metadata.ts @@ -55,7 +55,7 @@ class Metadata extends BaseProtocol implements IMetadata { let stream; try { - stream = await this.getStream(peer); + stream = await this.getStream(peerId); } catch (error) { log.error("Failed to get stream", error); return { diff --git a/packages/core/src/lib/store/store.ts b/packages/core/src/lib/store/store.ts index 830a7b5100..f3b5fdba37 100644 --- a/packages/core/src/lib/store/store.ts +++ b/packages/core/src/lib/store/store.ts @@ -1,4 +1,4 @@ -import type { Peer } from "@libp2p/interface"; +import type { PeerId } from "@libp2p/interface"; import { IDecodedMessage, IDecoder, @@ -38,7 +38,7 @@ export class StoreCore extends BaseProtocol implements IStoreCore { public async *queryPerPage( queryOpts: QueryRequestParams, decoders: Map>, - peer: Peer + peerId: PeerId ): AsyncGenerator[]> { if ( queryOpts.contentTopics.toString() !== @@ -58,7 +58,7 @@ export class StoreCore extends BaseProtocol implements IStoreCore { let stream; try { - stream = await this.getStream(peer); + stream = await this.getStream(peerId); } catch (e) { log.error("Failed to get stream", e); break; diff --git a/packages/core/src/lib/stream_manager/stream_manager.spec.ts b/packages/core/src/lib/stream_manager/stream_manager.spec.ts index e66b199d2e..3896ab3053 100644 --- a/packages/core/src/lib/stream_manager/stream_manager.spec.ts +++ b/packages/core/src/lib/stream_manager/stream_manager.spec.ts @@ -36,7 +36,7 @@ describe("StreamManager", () => { streamManager["getConnections"] = (_peerId: PeerId | undefined) => [con1]; - const stream = await streamManager.getStream(mockPeer); + const stream = await streamManager.getStream(mockPeer.id); expect(stream).not.to.be.undefined; expect(stream?.id).to.be.eq("1"); @@ -48,7 +48,7 @@ describe("StreamManager", () => { let error: Error | undefined; try { - await streamManager.getStream(mockPeer); + await streamManager.getStream(mockPeer.id); } catch (e) { error = e as Error; } @@ -76,7 +76,7 @@ describe("StreamManager", () => { con1.newStream = newStreamSpy; streamManager["getConnections"] = (_peerId: PeerId | undefined) => [con1]; - const stream = await streamManager.getStream(mockPeer); + const stream = await streamManager.getStream(mockPeer.id); expect(stream).not.to.be.undefined; expect(stream?.id).to.be.eq("2"); @@ -102,8 +102,8 @@ describe("StreamManager", () => { streamManager["getConnections"] = (_peerId: PeerId | undefined) => [con1]; const [stream1, stream2] = await Promise.all([ - streamManager.getStream(mockPeer), - streamManager.getStream(mockPeer) + streamManager.getStream(mockPeer.id), + streamManager.getStream(mockPeer.id) ]); const expected = ["1", "2"].toString(); diff --git a/packages/core/src/lib/stream_manager/stream_manager.ts b/packages/core/src/lib/stream_manager/stream_manager.ts index e1da56453d..e533c3f0d7 100644 --- a/packages/core/src/lib/stream_manager/stream_manager.ts +++ b/packages/core/src/lib/stream_manager/stream_manager.ts @@ -21,39 +21,38 @@ export class StreamManager { this.addEventListener("peer:update", this.handlePeerUpdateStreamPool); } - public async getStream(peer: Peer): Promise { - const peerId = peer.id.toString(); - - const scheduledStream = this.streamPool.get(peerId); + public async getStream(peerId: PeerId): Promise { + const peerIdStr = peerId.toString(); + const scheduledStream = this.streamPool.get(peerIdStr); if (scheduledStream) { - this.streamPool.delete(peerId); + this.streamPool.delete(peerIdStr); await scheduledStream; } - let stream = this.getOpenStreamForCodec(peer.id); + let stream = this.getOpenStreamForCodec(peerId); if (stream) { this.log.info( - `Found existing stream peerId=${peer.id.toString()} multicodec=${this.multicodec}` + `Found existing stream peerId=${peerIdStr} multicodec=${this.multicodec}` ); - this.lockStream(peer.id.toString(), stream); + this.lockStream(peerIdStr, stream); return stream; } - stream = await this.createStream(peer); - this.lockStream(peer.id.toString(), stream); + stream = await this.createStream(peerId); + this.lockStream(peerIdStr, stream); return stream; } - private async createStream(peer: Peer, retries = 0): Promise { - const connections = this.getConnections(peer.id); + private async createStream(peerId: PeerId, retries = 0): Promise { + const connections = this.getConnections(peerId); const connection = selectOpenConnection(connections); if (!connection) { throw new Error( - `Failed to get a connection to the peer peerId=${peer.id.toString()} multicodec=${this.multicodec}` + `Failed to get a connection to the peer peerId=${peerId.toString()} multicodec=${this.multicodec}` ); } @@ -63,11 +62,11 @@ export class StreamManager { for (let i = 0; i < retries + 1; i++) { try { this.log.info( - `Attempting to create a stream for peerId=${peer.id.toString()} multicodec=${this.multicodec}` + `Attempting to create a stream for peerId=${peerId.toString()} multicodec=${this.multicodec}` ); stream = await connection.newStream(this.multicodec); this.log.info( - `Created stream for peerId=${peer.id.toString()} multicodec=${this.multicodec}` + `Created stream for peerId=${peerId.toString()} multicodec=${this.multicodec}` ); break; } catch (error) { @@ -77,8 +76,7 @@ export class StreamManager { if (!stream) { throw new Error( - `Failed to create a new stream for ${peer.id.toString()} -- ` + - lastError + `Failed to create a new stream for ${peerId.toString()} -- ` + lastError ); } @@ -97,7 +95,7 @@ export class StreamManager { try { this.ongoingCreation.add(peerId); - await this.createStream(peer); + await this.createStream(peer.id); } catch (error) { this.log.error(`Failed to createStreamWithLock:`, error); } finally { diff --git a/packages/discovery/src/peer-exchange/waku_peer_exchange.ts b/packages/discovery/src/peer-exchange/waku_peer_exchange.ts index 43520e9bde..5c2ab30615 100644 --- a/packages/discovery/src/peer-exchange/waku_peer_exchange.ts +++ b/packages/discovery/src/peer-exchange/waku_peer_exchange.ts @@ -57,7 +57,7 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange { let stream; try { - stream = await this.getStream(peer); + stream = await this.getStream(peerId); } catch (err) { log.error("Failed to get stream", err); return { diff --git a/packages/sdk/src/filter/filter.ts b/packages/sdk/src/filter/filter.ts index 1f7c513b7f..13203e66c0 100644 --- a/packages/sdk/src/filter/filter.ts +++ b/packages/sdk/src/filter/filter.ts @@ -157,8 +157,8 @@ class Filter implements IFilter { ensurePubsubTopicIsConfigured(pubsubTopic, this.protocol.pubsubTopics); - const peers = await this.peerManager.getPeers(); - if (peers.length === 0) { + const peerIds = await this.peerManager.getPeers(); + if (peerIds.length === 0) { return { error: ProtocolError.NO_PEER_AVAILABLE, subscription: null @@ -166,8 +166,8 @@ class Filter implements IFilter { } log.info( - `Creating filter subscription with ${peers.length} peers: `, - peers.map((peer) => peer.id.toString()) + `Creating filter subscription with ${peerIds.length} peers: `, + peerIds.map((id) => id.toString()) ); const subscription = diff --git a/packages/sdk/src/filter/subscription_monitor.ts b/packages/sdk/src/filter/subscription_monitor.ts index 3cf65088d1..116a8e8022 100644 --- a/packages/sdk/src/filter/subscription_monitor.ts +++ b/packages/sdk/src/filter/subscription_monitor.ts @@ -1,4 +1,4 @@ -import type { EventHandler, Peer, PeerId } from "@libp2p/interface"; +import type { EventHandler, PeerId } from "@libp2p/interface"; import { FilterCore } from "@waku/core"; import type { FilterProtocolOptions, @@ -31,7 +31,7 @@ export class SubscriptionMonitor { * Cached peers that are in use by subscription. * Needed to understand if they disconnect later or not. */ - public peers: Peer[] = []; + public peerIds: PeerId[] = []; private isStarted: boolean = false; @@ -108,12 +108,12 @@ export class SubscriptionMonitor { * Method to get peers that are used by particular subscription or, if initially called, peers that can be used by subscription. * @returns array of peers */ - public async getPeers(): Promise { + public async getPeers(): Promise { if (!this.isStarted) { - this.peers = await this.peerManager.getPeers(); + this.peerIds = await this.peerManager.getPeers(); } - return this.peers; + return this.peerIds; } /** @@ -168,7 +168,7 @@ export class SubscriptionMonitor { return; } - await Promise.all(this.peers.map((peer) => this.ping(peer, true))); + await Promise.all(this.peerIds.map((id) => this.ping(id, true))); this.startKeepAlive(); } @@ -178,7 +178,7 @@ export class SubscriptionMonitor { } this.keepAliveIntervalId = setInterval(() => { - void this.peers.map((peer) => this.ping(peer)); + void this.peerIds.map((id) => this.ping(id)); }, this.config.keepAliveIntervalMs) as unknown as number; } @@ -216,40 +216,40 @@ export class SubscriptionMonitor { // this method keeps track of new connections and will trigger subscribe request if needed private async onPeerConnected(_event: CustomEvent): Promise { // TODO(weboko): use config.numOfUsedPeers instead of this.peers - const hasSomePeers = this.peers.length > 0; + const hasSomePeers = this.peerIds.length > 0; if (hasSomePeers) { return; } - this.peers = await this.peerManager.getPeers(); - await Promise.all(this.peers.map((peer) => this.subscribe(peer))); + this.peerIds = await this.peerManager.getPeers(); + await Promise.all(this.peerIds.map((id) => this.subscribe(id))); } // this method keeps track of disconnects and will trigger subscribe request if needed private async onPeerDisconnected(event: CustomEvent): Promise { - const hasNotBeenUsed = !this.peers.find((p) => p.id.equals(event.detail)); + const hasNotBeenUsed = !this.peerIds.find((id) => id.equals(event.detail)); if (hasNotBeenUsed) { return; } - this.peers = await this.peerManager.getPeers(); + this.peerIds = await this.peerManager.getPeers(); // we trigger subscribe for peer that was used before // it will expectedly fail and we will initiate addition of a new peer - await Promise.all(this.peers.map((peer) => this.subscribe(peer))); + await Promise.all(this.peerIds.map((id) => this.subscribe(id))); } - private async subscribe(_peer: Peer | undefined): Promise { - let peer: Peer | undefined = _peer; + private async subscribe(_peerId: PeerId | undefined): Promise { + let peerId: PeerId | undefined = _peerId; for (let i = 0; i < MAX_SUBSCRIBE_ATTEMPTS; i++) { - if (!peer) { + if (!peerId) { return; } const response = await this.filter.subscribe( this.pubsubTopic, - peer, + peerId, Array.from(this.activeSubscriptions.keys()) ); @@ -257,19 +257,19 @@ export class SubscriptionMonitor { return; } - peer = await this.peerManager.requestRenew(peer.id); + peerId = await this.peerManager.requestRenew(peerId); } } private async ping( - peer: Peer, + peerId: PeerId, renewOnFirstFail: boolean = false ): Promise { - const peerIdStr = peer.id.toString(); - const response = await this.filter.ping(peer); + const peerIdStr = peerId.toString(); + const response = await this.filter.ping(peerId); if (response.failure && renewOnFirstFail) { - const newPeer = await this.peerManager.requestRenew(peer.id); + const newPeer = await this.peerManager.requestRenew(peerId); await this.subscribe(newPeer); return; } @@ -286,7 +286,7 @@ export class SubscriptionMonitor { const madeAttempts = this.pingFailedAttempts.get(peerIdStr) || 0; if (madeAttempts >= this.config.pingsBeforePeerRenewed) { - const newPeer = await this.peerManager.requestRenew(peer.id); + const newPeer = await this.peerManager.requestRenew(peerId); await this.subscribe(newPeer); } } diff --git a/packages/sdk/src/light_push/light_push.spec.ts b/packages/sdk/src/light_push/light_push.spec.ts index a461c89828..f6d52a8f3d 100644 --- a/packages/sdk/src/light_push/light_push.spec.ts +++ b/packages/sdk/src/light_push/light_push.spec.ts @@ -1,4 +1,4 @@ -import { Peer } from "@libp2p/interface"; +import { Peer, PeerId } from "@libp2p/interface"; import { ConnectionManager, createEncoder, @@ -59,8 +59,8 @@ describe("LightPush SDK", () => { lightPush = mockLightPush({ libp2p, numPeersToUse: 2 }); let sendSpy = sinon.spy( - (_encoder: any, _message: any, peer: Peer) => - ({ success: peer.id }) as any + (_encoder: any, _message: any, peerId: PeerId) => + ({ success: peerId }) as any ); lightPush.protocol.send = sendSpy; @@ -74,8 +74,8 @@ describe("LightPush SDK", () => { // check if setting another value works lightPush = mockLightPush({ libp2p, numPeersToUse: 3 }); sendSpy = sinon.spy( - (_encoder: any, _message: any, peer: Peer) => - ({ success: peer.id }) as any + (_encoder: any, _message: any, peerId: PeerId) => + ({ success: peerId }) as any ); lightPush.protocol.send = sendSpy; @@ -91,9 +91,9 @@ describe("LightPush SDK", () => { }); lightPush = mockLightPush({ libp2p }); - let sendSpy = sinon.spy((_encoder: any, _message: any, peer: Peer) => { - if (peer.id.toString() === "1") { - return { success: peer.id }; + let sendSpy = sinon.spy((_encoder: any, _message: any, peerId: PeerId) => { + if (peerId.toString() === "1") { + return { success: peerId }; } return { failure: { error: "problem" } }; @@ -108,7 +108,7 @@ describe("LightPush SDK", () => { { autoRetry: true } ); - expect(attemptRetriesSpy.calledOnce).to.be.true; + expect(attemptRetriesSpy.callCount).to.be.eq(1); expect(result.successes?.length).to.be.eq(1); expect(result.failures?.length).to.be.eq(1); @@ -162,7 +162,6 @@ function mockLightPush(options: MockLightPushOptions): LightPush { getPeers: () => options.libp2p .getPeers() - .map((id) => mockPeer(id.toString())) .slice(0, options.numPeersToUse || options.libp2p.getPeers().length) } as unknown as PeerManager, options.libp2p diff --git a/packages/sdk/src/light_push/light_push.ts b/packages/sdk/src/light_push/light_push.ts index 0c71cc8012..5b3f6b67be 100644 --- a/packages/sdk/src/light_push/light_push.ts +++ b/packages/sdk/src/light_push/light_push.ts @@ -1,4 +1,4 @@ -import type { Peer, PeerId } from "@libp2p/interface"; +import type { PeerId } from "@libp2p/interface"; import { ConnectionManager, getHealthManager, LightPushCore } from "@waku/core"; import { type CoreProtocolResult, @@ -23,7 +23,7 @@ const DEFAULT_SEND_OPTIONS: ISenderOptions = { maxAttempts: DEFAULT_MAX_ATTEMPTS }; -type RetryCallback = (peer: Peer) => Promise; +type RetryCallback = (peerId: PeerId) => Promise; export class LightPush implements ILightPush { public readonly protocol: LightPushCore; @@ -59,8 +59,8 @@ export class LightPush implements ILightPush { }; } - const peers = await this.peerManager.getPeers(); - if (peers.length === 0) { + const peerIds = await this.peerManager.getPeers(); + if (peerIds.length === 0) { return { successes, failures: [ @@ -72,7 +72,7 @@ export class LightPush implements ILightPush { } const results = await Promise.allSettled( - peers.map((peer) => this.protocol.send(encoder, message, peer)) + peerIds.map((id) => this.protocol.send(encoder, message, id)) ); for (const result of results) { @@ -94,7 +94,7 @@ export class LightPush implements ILightPush { if (options?.autoRetry) { void this.attemptRetries( - (peer: Peer) => this.protocol.send(encoder, message, peer), + (id: PeerId) => this.protocol.send(encoder, message, id), options.maxAttempts ); } @@ -117,23 +117,23 @@ export class LightPush implements ILightPush { maxAttempts?: number ): Promise { maxAttempts = maxAttempts || DEFAULT_MAX_ATTEMPTS; - const connectedPeers = await this.peerManager.getPeers(); + const peerIds = await this.peerManager.getPeers(); - if (connectedPeers.length === 0) { + if (peerIds.length === 0) { log.warn("Cannot retry with no connected peers."); return; } for (let i = 0; i < maxAttempts; i++) { - const peer = connectedPeers[i % connectedPeers.length]; // always present as we checked for the length already - const response = await fn(peer); + const id = peerIds[i % peerIds.length]; // always present as we checked for the length already + const response = await fn(id); if (response.success) { return; } log.info( - `Attempted retry for peer:${peer.id} failed with:${response?.failure?.error}` + `Attempted retry for peer:${id} failed with:${response?.failure?.error}` ); } } diff --git a/packages/sdk/src/peer_manager/peer_manager.spec.ts b/packages/sdk/src/peer_manager/peer_manager.spec.ts index 2930d90405..e496f4ccfc 100644 --- a/packages/sdk/src/peer_manager/peer_manager.spec.ts +++ b/packages/sdk/src/peer_manager/peer_manager.spec.ts @@ -48,9 +48,9 @@ describe("PeerManager", () => { ]; sinon.stub(libp2p, "getConnections").returns(connections); - const peer = await peerManager.requestRenew("1"); - expect(peer).to.not.be.undefined; - expect(peer?.id).to.not.equal("1"); + const peerId = await peerManager.requestRenew("1"); + expect(peerId).to.not.be.undefined; + expect(peerId).to.not.equal("1"); }); it("should handle connection events", () => { diff --git a/packages/sdk/src/peer_manager/peer_manager.ts b/packages/sdk/src/peer_manager/peer_manager.ts index 4b874ce89c..3a7c704df2 100644 --- a/packages/sdk/src/peer_manager/peer_manager.ts +++ b/packages/sdk/src/peer_manager/peer_manager.ts @@ -1,4 +1,4 @@ -import { Connection, Peer, PeerId } from "@libp2p/interface"; +import { Connection, PeerId } from "@libp2p/interface"; import { Libp2p } from "@waku/interfaces"; import { Logger } from "@waku/utils"; @@ -37,15 +37,13 @@ export class PeerManager { this.stopConnectionListener(); } - public async getPeers(): Promise { - return Promise.all( - this.getLockedConnections().map((c) => this.mapConnectionToPeer(c)) - ); + public async getPeers(): Promise { + return Promise.all(this.getLockedConnections().map((c) => c.remotePeer)); } public async requestRenew( peerId: PeerId | string - ): Promise { + ): Promise { const lockedConnections = this.getLockedConnections(); const neededPeers = this.numPeersToUse - lockedConnections.length; @@ -58,12 +56,12 @@ export class PeerManager { .filter((c) => !c.remotePeer.equals(peerId)) .slice(0, neededPeers) .map((c) => this.lockConnection(c)) - .map((c) => this.mapConnectionToPeer(c)) + .map((c) => c.remotePeer) ); - const newPeer = result[0]; + const newPeerId = result[0]; - if (!newPeer) { + if (!newPeerId) { log.warn( `requestRenew: Couldn't renew peer ${peerId.toString()} - no peers.` ); @@ -71,10 +69,10 @@ export class PeerManager { } log.info( - `requestRenew: Renewed peer ${peerId.toString()} to ${newPeer.id.toString()}` + `requestRenew: Renewed peer ${peerId.toString()} to ${newPeerId.toString()}` ); - return newPeer; + return newPeerId; } private startConnectionListener(): void { @@ -133,9 +131,4 @@ export class PeerManager { private isConnectionLocked(c: Connection): boolean { return c.tags.includes(CONNECTION_LOCK_TAG); } - - private async mapConnectionToPeer(c: Connection): Promise { - const peerId = c.remotePeer; - return this.libp2p.peerStore.get(peerId); - } } diff --git a/packages/sdk/src/store/store.ts b/packages/sdk/src/store/store.ts index dcbdd53295..2a14659875 100644 --- a/packages/sdk/src/store/store.ts +++ b/packages/sdk/src/store/store.ts @@ -1,4 +1,4 @@ -import type { Peer } from "@libp2p/interface"; +import type { PeerId } from "@libp2p/interface"; import { ConnectionManager, StoreCore } from "@waku/core"; import { IDecodedMessage, @@ -223,26 +223,29 @@ export class Store implements IStore { }; } - private async getPeerToUse(): Promise { - let peer: Peer | undefined; + private async getPeerToUse(): Promise { + let peerId: PeerId | undefined; if (this.options?.peer) { const connectedPeers = await this.connectionManager.getConnectedPeers(); - peer = connectedPeers.find((p) => p.id.toString() === this.options?.peer); + const peer = connectedPeers.find( + (p) => p.id.toString() === this.options?.peer + ); + peerId = peer?.id; - if (!peer) { + if (!peerId) { log.warn( `Passed node to use for Store not found: ${this.options.peer}. Attempting to use random peers.` ); } } - const peers = await this.peerManager.getPeers(); + const peerIds = await this.peerManager.getPeers(); - if (peers.length > 0) { + if (peerIds.length > 0) { // TODO(weboko): implement smart way of getting a peer https://github.com/waku-org/js-waku/issues/2243 - return peers[Math.floor(Math.random() * peers.length)]; + return peerIds[Math.floor(Math.random() * peerIds.length)]; } log.error("No peers available to use.");