fix: edge case with peer-exchange (#1125)

* address comments

* rename to maxRetries

* use timeout instead of interval

* remove: only from test
This commit is contained in:
Danish Arora 2023-01-24 22:50:16 +05:30 committed by GitHub
parent 4d38aaa731
commit 5fae073ebd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 64 additions and 25 deletions

View File

@ -15,7 +15,8 @@ import { PeerExchangeCodec, WakuPeerExchange } from "./waku_peer_exchange.js";
const log = debug("waku:peer-exchange-discovery"); const log = debug("waku:peer-exchange-discovery");
const DEFAULT_PEER_EXCHANGE_REQUEST_NODES = 10; const DEFAULT_PEER_EXCHANGE_REQUEST_NODES = 10;
const PEER_EXCHANGE_QUERY_INTERVAL = 5 * 60 * 1000; const DEFAULT_PEER_EXCHANGE_QUERY_INTERVAL_MS = 10 * 1000;
const DEFAULT_MAX_RETRIES = 3;
export interface Options { export interface Options {
/** /**
@ -32,6 +33,16 @@ export interface Options {
* Cause the bootstrap peer tag to be removed after this number of ms (default: 2 minutes) * Cause the bootstrap peer tag to be removed after this number of ms (default: 2 minutes)
*/ */
tagTTL?: number; tagTTL?: number;
/**
* The interval between queries to a peer (default: 10 seconds)
* The interval will increase by a factor of an incrementing number (starting at 1)
* until it reaches the maximum attempts before backoff
*/
queryInterval?: number;
/**
* The number of attempts before the queries to a peer are aborted (default: 3)
*/
maxRetries?: number;
} }
const DEFAULT_BOOTSTRAP_TAG_NAME = "peer-exchange"; const DEFAULT_BOOTSTRAP_TAG_NAME = "peer-exchange";
@ -46,21 +57,23 @@ export class PeerExchangeDiscovery
private readonly peerExchange: WakuPeerExchange; private readonly peerExchange: WakuPeerExchange;
private readonly options: Options; private readonly options: Options;
private isStarted: boolean; private isStarted: boolean;
private intervals: Map<PeerId, NodeJS.Timeout> = new Map(); private queryingPeers: Set<string> = new Set();
private queryAttempts: Map<string, number> = new Map();
private readonly eventHandler = async ( private readonly eventHandler = async (
event: CustomEvent<PeerProtocolsChangeData> event: CustomEvent<PeerProtocolsChangeData>
): Promise<void> => { ): Promise<void> => {
const { protocols, peerId } = event.detail; const { protocols, peerId } = event.detail;
if (!protocols.includes(PeerExchangeCodec) || this.intervals.get(peerId)) if (
!protocols.includes(PeerExchangeCodec) ||
this.queryingPeers.has(peerId.toString())
)
return; return;
await this.query(peerId); this.queryingPeers.add(peerId.toString());
const interval = setInterval(async () => { this.startRecurringQueries(peerId).catch((error) =>
await this.query(peerId); log(`Error querying peer ${error}`)
}, PEER_EXCHANGE_QUERY_INTERVAL); );
this.intervals.set(peerId, interval);
}; };
constructor(components: PeerExchangeComponents, options: Options = {}) { constructor(components: PeerExchangeComponents, options: Options = {}) {
@ -94,7 +107,7 @@ export class PeerExchangeDiscovery
if (!this.isStarted) return; if (!this.isStarted) return;
log("Stopping peer exchange node discovery"); log("Stopping peer exchange node discovery");
this.isStarted = false; this.isStarted = false;
this.intervals.forEach((interval) => clearInterval(interval)); this.queryingPeers.clear();
this.components.peerStore.removeEventListener( this.components.peerStore.removeEventListener(
"change:protocols", "change:protocols",
this.eventHandler this.eventHandler
@ -109,6 +122,30 @@ export class PeerExchangeDiscovery
return "@waku/peer-exchange"; return "@waku/peer-exchange";
} }
private readonly startRecurringQueries = async (
peerId: PeerId
): Promise<void> => {
const peerIdStr = peerId.toString();
const {
queryInterval = DEFAULT_PEER_EXCHANGE_QUERY_INTERVAL_MS,
maxRetries = DEFAULT_MAX_RETRIES,
} = this.options;
await this.query(peerId);
const currentAttempt = this.queryAttempts.get(peerIdStr) ?? 1;
if (currentAttempt > maxRetries) {
this.abortQueriesForPeer(peerIdStr);
return;
}
setTimeout(async () => {
this.queryAttempts.set(peerIdStr, currentAttempt + 1);
await this.startRecurringQueries(peerId);
}, queryInterval * currentAttempt);
};
private query(peerId: PeerId): Promise<void> { private query(peerId: PeerId): Promise<void> {
return this.peerExchange.query( return this.peerExchange.query(
{ {
@ -125,23 +162,19 @@ export class PeerExchangeDiscovery
continue; continue;
} }
const { peerId, multiaddrs } = ENR; const { peerId } = ENR;
const multiaddrs = ENR.getFullMultiaddrs();
if (!peerId) { if (!peerId || !multiaddrs || multiaddrs.length === 0) continue;
log("no peerId");
continue;
}
if (!multiaddrs || multiaddrs.length === 0) {
log("no multiaddrs");
continue;
}
// check if peer is already in peerStore if (await this.components.peerStore.has(peerId)) continue;
const existingPeer = await this.components.peerStore.get(peerId);
if (existingPeer) { if (
log("peer already in peerStore"); (await this.components.peerStore.getTags(peerId)).find(
({ name }) => name === DEFAULT_BOOTSTRAP_TAG_NAME
)
)
continue; continue;
}
await this.components.peerStore.tagPeer( await this.components.peerStore.tagPeer(
peerId, peerId,
@ -165,6 +198,12 @@ export class PeerExchangeDiscovery
} }
); );
} }
private abortQueriesForPeer(peerIdStr: string): void {
log(`Aborting queries for peer: ${peerIdStr}`);
this.queryingPeers.delete(peerIdStr);
this.queryAttempts.delete(peerIdStr);
}
} }
export function wakuPeerExchangeDiscovery(): ( export function wakuPeerExchangeDiscovery(): (

View File

@ -31,7 +31,7 @@ describe("Peer Exchange", () => {
}); });
await waku.start(); await waku.start();
await delay(1000); await delay(100000);
await waitForRemotePeer(waku, [Protocols.PeerExchange]); await waitForRemotePeer(waku, [Protocols.PeerExchange]);
const pxPeers = await waku.peerExchange.peers(); const pxPeers = await waku.peerExchange.peers();