From 5fae073ebd67ea3b11e50bdef3a92e328067d1dc Mon Sep 17 00:00:00 2001 From: Danish Arora <35004822+danisharora099@users.noreply.github.com> Date: Tue, 24 Jan 2023 22:50:16 +0530 Subject: [PATCH] fix: edge case with peer-exchange (#1125) * address comments * rename to maxRetries * use timeout instead of interval * remove: only from test --- .../src/waku_peer_exchange_discovery.ts | 87 ++++++++++++++----- .../tests/tests/peer_exchange.node.spec.ts | 2 +- 2 files changed, 64 insertions(+), 25 deletions(-) diff --git a/packages/peer-exchange/src/waku_peer_exchange_discovery.ts b/packages/peer-exchange/src/waku_peer_exchange_discovery.ts index 88d9b0862a..db5da153aa 100644 --- a/packages/peer-exchange/src/waku_peer_exchange_discovery.ts +++ b/packages/peer-exchange/src/waku_peer_exchange_discovery.ts @@ -15,7 +15,8 @@ import { PeerExchangeCodec, WakuPeerExchange } from "./waku_peer_exchange.js"; const log = debug("waku:peer-exchange-discovery"); const DEFAULT_PEER_EXCHANGE_REQUEST_NODES = 10; -const PEER_EXCHANGE_QUERY_INTERVAL = 5 * 60 * 1000; +const DEFAULT_PEER_EXCHANGE_QUERY_INTERVAL_MS = 10 * 1000; +const DEFAULT_MAX_RETRIES = 3; export interface Options { /** @@ -32,6 +33,16 @@ export interface Options { * Cause the bootstrap peer tag to be removed after this number of ms (default: 2 minutes) */ tagTTL?: number; + /** + * The interval between queries to a peer (default: 10 seconds) + * The interval will increase by a factor of an incrementing number (starting at 1) + * until it reaches the maximum attempts before backoff + */ + queryInterval?: number; + /** + * The number of attempts before the queries to a peer are aborted (default: 3) + */ + maxRetries?: number; } const DEFAULT_BOOTSTRAP_TAG_NAME = "peer-exchange"; @@ -46,21 +57,23 @@ export class PeerExchangeDiscovery private readonly peerExchange: WakuPeerExchange; private readonly options: Options; private isStarted: boolean; - private intervals: Map = new Map(); + private queryingPeers: Set = new Set(); + private queryAttempts: Map = new Map(); private readonly eventHandler = async ( event: CustomEvent ): Promise => { const { protocols, peerId } = event.detail; - if (!protocols.includes(PeerExchangeCodec) || this.intervals.get(peerId)) + if ( + !protocols.includes(PeerExchangeCodec) || + this.queryingPeers.has(peerId.toString()) + ) return; - await this.query(peerId); - const interval = setInterval(async () => { - await this.query(peerId); - }, PEER_EXCHANGE_QUERY_INTERVAL); - - this.intervals.set(peerId, interval); + this.queryingPeers.add(peerId.toString()); + this.startRecurringQueries(peerId).catch((error) => + log(`Error querying peer ${error}`) + ); }; constructor(components: PeerExchangeComponents, options: Options = {}) { @@ -94,7 +107,7 @@ export class PeerExchangeDiscovery if (!this.isStarted) return; log("Stopping peer exchange node discovery"); this.isStarted = false; - this.intervals.forEach((interval) => clearInterval(interval)); + this.queryingPeers.clear(); this.components.peerStore.removeEventListener( "change:protocols", this.eventHandler @@ -109,6 +122,30 @@ export class PeerExchangeDiscovery return "@waku/peer-exchange"; } + private readonly startRecurringQueries = async ( + peerId: PeerId + ): Promise => { + const peerIdStr = peerId.toString(); + const { + queryInterval = DEFAULT_PEER_EXCHANGE_QUERY_INTERVAL_MS, + maxRetries = DEFAULT_MAX_RETRIES, + } = this.options; + + await this.query(peerId); + + const currentAttempt = this.queryAttempts.get(peerIdStr) ?? 1; + + if (currentAttempt > maxRetries) { + this.abortQueriesForPeer(peerIdStr); + return; + } + + setTimeout(async () => { + this.queryAttempts.set(peerIdStr, currentAttempt + 1); + await this.startRecurringQueries(peerId); + }, queryInterval * currentAttempt); + }; + private query(peerId: PeerId): Promise { return this.peerExchange.query( { @@ -125,23 +162,19 @@ export class PeerExchangeDiscovery continue; } - const { peerId, multiaddrs } = ENR; + const { peerId } = ENR; + const multiaddrs = ENR.getFullMultiaddrs(); - if (!peerId) { - log("no peerId"); - continue; - } - if (!multiaddrs || multiaddrs.length === 0) { - log("no multiaddrs"); - continue; - } + if (!peerId || !multiaddrs || multiaddrs.length === 0) continue; - // check if peer is already in peerStore - const existingPeer = await this.components.peerStore.get(peerId); - if (existingPeer) { - log("peer already in peerStore"); + if (await this.components.peerStore.has(peerId)) continue; + + if ( + (await this.components.peerStore.getTags(peerId)).find( + ({ name }) => name === DEFAULT_BOOTSTRAP_TAG_NAME + ) + ) continue; - } await this.components.peerStore.tagPeer( peerId, @@ -165,6 +198,12 @@ export class PeerExchangeDiscovery } ); } + + private abortQueriesForPeer(peerIdStr: string): void { + log(`Aborting queries for peer: ${peerIdStr}`); + this.queryingPeers.delete(peerIdStr); + this.queryAttempts.delete(peerIdStr); + } } export function wakuPeerExchangeDiscovery(): ( diff --git a/packages/tests/tests/peer_exchange.node.spec.ts b/packages/tests/tests/peer_exchange.node.spec.ts index 6043a6f73f..1629c496e3 100644 --- a/packages/tests/tests/peer_exchange.node.spec.ts +++ b/packages/tests/tests/peer_exchange.node.spec.ts @@ -31,7 +31,7 @@ describe("Peer Exchange", () => { }); await waku.start(); - await delay(1000); + await delay(100000); await waitForRemotePeer(waku, [Protocols.PeerExchange]); const pxPeers = await waku.peerExchange.peers();