feat: fix peer renewal, change Filter keep alive (#2065)

* move util, create stream manager folder

* change default keep alive, improve peer renewal

* address nit
This commit is contained in:
Sasha 2024-07-16 18:35:24 +02:00 committed by GitHub
parent 09a81302c1
commit 00635b7afe
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 139 additions and 88 deletions

View File

@ -22,6 +22,6 @@ export { waitForRemotePeer } from "./lib/wait_for_remote_peer.js";
export { ConnectionManager } from "./lib/connection_manager.js";
export { KeepAliveManager } from "./lib/keep_alive_manager.js";
export { StreamManager } from "./lib/stream_manager.js";
export { StreamManager } from "./lib/stream_manager/index.js";
export { MetadataCodec, wakuMetadata } from "./lib/metadata/index.js";

View File

@ -14,7 +14,7 @@ import {
} from "@waku/utils/libp2p";
import { filterPeersByDiscovery } from "./filterPeers.js";
import { StreamManager } from "./stream_manager.js";
import { StreamManager } from "./stream_manager/index.js";
/**
* A class with predefined helpers, to be used as a base to implement Waku
@ -47,6 +47,7 @@ export class BaseProtocol implements IBaseProtocolCore {
this.addLibp2pEventListener
);
}
protected async getStream(peer: Peer): Promise<Stream> {
return this.streamManager.getStream(peer);
}

View File

@ -89,7 +89,7 @@ export class ConnectionManager
return instance;
}
stop(): void {
public stop(): void {
this.keepAliveManager.stopAll();
this.libp2p.removeEventListener(
"peer:connect",
@ -105,7 +105,7 @@ export class ConnectionManager
);
}
async dropConnection(peerId: PeerId): Promise<void> {
public async dropConnection(peerId: PeerId): Promise<void> {
try {
this.keepAliveManager.stop(peerId);
await this.libp2p.hangUp(peerId);
@ -187,7 +187,11 @@ export class ConnectionManager
...options
};
this.keepAliveManager = new KeepAliveManager(keepAliveOptions, relay);
this.keepAliveManager = new KeepAliveManager({
relay,
libp2p,
options: keepAliveOptions
});
this.run()
.then(() => log.info(`Connection Manager is now running`))
@ -250,6 +254,7 @@ export class ConnectionManager
this.dialAttemptsForPeer.set(peerId.toString(), -1);
// Dialing succeeded, break the loop
this.keepAliveManager.start(peerId);
break;
} catch (error) {
if (error instanceof AggregateError) {
@ -356,7 +361,7 @@ export class ConnectionManager
);
}
private async attemptDial(peerId: PeerId): Promise<void> {
public async attemptDial(peerId: PeerId): Promise<void> {
if (!(await this.shouldDialPeer(peerId))) return;
if (this.currentActiveParallelDialCount >= this.options.maxParallelDials) {
@ -364,9 +369,7 @@ export class ConnectionManager
return;
}
this.dialPeer(peerId).catch((err) => {
log.error(`Error dialing peer ${peerId.toString()} : ${err}`);
});
await this.dialPeer(peerId);
}
private onEventHandlers = {
@ -389,11 +392,7 @@ export class ConnectionManager
const peerId = evt.detail;
this.keepAliveManager.start(
peerId,
this.libp2p.services.ping,
this.libp2p.peerStore
);
this.keepAliveManager.start(peerId);
const isBootstrap = (await this.getTagNamesForPeer(peerId)).includes(
Tags.BOOTSTRAP
@ -449,38 +448,40 @@ export class ConnectionManager
* @returns true if the peer should be dialed, false otherwise
*/
private async shouldDialPeer(peerId: PeerId): Promise<boolean> {
// if we're already connected to the peer, don't dial
const isConnected = this.libp2p.getConnections(peerId).length > 0;
if (isConnected) {
log.warn(`Already connected to peer ${peerId.toString()}. Not dialing.`);
return false;
}
// if the peer is not part of any of the configured pubsub topics, don't dial
if (!(await this.isPeerTopicConfigured(peerId))) {
const isSameShard = await this.isPeerTopicConfigured(peerId);
if (!isSameShard) {
const shardInfo = await this.getPeerShardInfo(
peerId,
this.libp2p.peerStore
);
log.warn(
`Discovered peer ${peerId.toString()} with ShardInfo ${shardInfo} is not part of any of the configured pubsub topics (${
this.configuredPubsubTopics
}).
Not dialing.`
);
return false;
}
// if the peer is not dialable based on bootstrap status, don't dial
if (!(await this.isPeerDialableBasedOnBootstrapStatus(peerId))) {
const isPreferredBasedOnBootstrap =
await this.isPeerDialableBasedOnBootstrapStatus(peerId);
if (!isPreferredBasedOnBootstrap) {
log.warn(
`Peer ${peerId.toString()} is not dialable based on bootstrap status. Not dialing.`
);
return false;
}
// If the peer is already already has an active dial attempt, or has been dialed before, don't dial it
if (this.dialAttemptsForPeer.has(peerId.toString())) {
const hasBeenDialed = this.dialAttemptsForPeer.has(peerId.toString());
if (hasBeenDialed) {
log.warn(
`Peer ${peerId.toString()} has already been attempted dial before, or already has a dial attempt in progress, skipping dial`
);
@ -502,19 +503,17 @@ export class ConnectionManager
const isBootstrap = tagNames.some((tagName) => tagName === Tags.BOOTSTRAP);
if (isBootstrap) {
const currentBootstrapConnections = this.libp2p
.getConnections()
.filter((conn) => {
return conn.tags.find((name) => name === Tags.BOOTSTRAP);
}).length;
if (currentBootstrapConnections < this.options.maxBootstrapPeersAllowed)
return true;
} else {
if (!isBootstrap) {
return true;
}
return false;
const currentBootstrapConnections = this.libp2p
.getConnections()
.filter((conn) => {
return conn.tags.find((name) => name === Tags.BOOTSTRAP);
}).length;
return currentBootstrapConnections < this.options.maxBootstrapPeersAllowed;
}
private async dispatchDiscoveryEvent(peerId: PeerId): Promise<void> {

View File

@ -1,6 +1,5 @@
import type { PeerId, PeerStore } from "@libp2p/interface";
import type { PingService } from "@libp2p/ping";
import type { IRelay, PeerIdStr } from "@waku/interfaces";
import type { PeerId } from "@libp2p/interface";
import type { IRelay, Libp2p, PeerIdStr } from "@waku/interfaces";
import type { KeepAliveOptions } from "@waku/interfaces";
import { Logger, pubsubTopicToSingleShardInfo } from "@waku/utils";
import { utf8ToBytes } from "@waku/utils/bytes";
@ -10,24 +9,30 @@ import { createEncoder } from "./message/version_0.js";
export const RelayPingContentTopic = "/relay-ping/1/ping/null";
const log = new Logger("keep-alive");
export class KeepAliveManager {
private pingKeepAliveTimers: Map<string, ReturnType<typeof setInterval>>;
private relayKeepAliveTimers: Map<PeerId, ReturnType<typeof setInterval>[]>;
private options: KeepAliveOptions;
private relay?: IRelay;
type CreateKeepAliveManagerOptions = {
options: KeepAliveOptions;
libp2p: Libp2p;
relay?: IRelay;
};
constructor(options: KeepAliveOptions, relay?: IRelay) {
this.pingKeepAliveTimers = new Map();
this.relayKeepAliveTimers = new Map();
export class KeepAliveManager {
private readonly relay?: IRelay;
private readonly libp2p: Libp2p;
private readonly options: KeepAliveOptions;
private pingKeepAliveTimers: Map<string, ReturnType<typeof setInterval>> =
new Map();
private relayKeepAliveTimers: Map<PeerId, ReturnType<typeof setInterval>[]> =
new Map();
constructor({ options, relay, libp2p }: CreateKeepAliveManagerOptions) {
this.options = options;
this.relay = relay;
this.libp2p = libp2p;
}
public start(
peerId: PeerId,
libp2pPing: PingService,
peerStore: PeerStore
): void {
public start(peerId: PeerId): void {
// Just in case a timer already exists for this peer
this.stop(peerId);
@ -46,7 +51,7 @@ export class KeepAliveManager {
// ping the peer for keep alive
// also update the peer store with the latency
try {
ping = await libp2pPing.ping(peerId);
ping = await this.libp2p.services.ping.ping(peerId);
log.info(`Ping succeeded (${peerIdStr})`, ping);
} catch (error) {
log.error(`Ping failed for peer (${peerIdStr}).
@ -56,7 +61,7 @@ export class KeepAliveManager {
}
try {
await peerStore.merge(peerId, {
await this.libp2p.peerStore.merge(peerId, {
metadata: {
ping: utf8ToBytes(ping.toString())
}

View File

@ -0,0 +1 @@
export { StreamManager } from "./stream_manager.js";

View File

@ -2,7 +2,8 @@ import type { PeerUpdate, Stream } from "@libp2p/interface";
import type { Peer, PeerId } from "@libp2p/interface";
import { Libp2p } from "@waku/interfaces";
import { Logger } from "@waku/utils";
import { selectConnection } from "@waku/utils/libp2p";
import { selectConnection } from "./utils.js";
const CONNECTION_TIMEOUT = 5_000;
const RETRY_BACKOFF_BASE = 1_000;

View File

@ -0,0 +1,22 @@
import type { Connection } from "@libp2p/interface";
export function selectConnection(
connections: Connection[]
): Connection | undefined {
if (!connections.length) return;
if (connections.length === 1) return connections[0];
let latestConnection: Connection | undefined;
connections.forEach((connection) => {
if (connection.status === "open") {
if (!latestConnection) {
latestConnection = connection;
} else if (connection.timeline.open > latestConnection.timeline.open) {
latestConnection = connection;
}
}
});
return latestConnection;
}

View File

@ -9,6 +9,7 @@ interface Options {
maintainPeersInterval?: number;
}
const RENEW_TIME_LOCK_DURATION = 30 * 1000;
const DEFAULT_NUM_PEERS_TO_USE = 3;
const DEFAULT_MAINTAIN_PEERS_INTERVAL = 30_000;
@ -21,6 +22,9 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
log: Logger;
private maintainPeersLock = false;
private readonly renewPeersLocker = new RenewPeerLocker(
RENEW_TIME_LOCK_DURATION
);
constructor(
protected core: BaseProtocol,
@ -46,11 +50,6 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
*/
public async renewPeer(peerToDisconnect: PeerId): Promise<Peer> {
this.log.info(`Renewing peer ${peerToDisconnect}`);
await this.connectionManager.dropConnection(peerToDisconnect);
this.peers = this.peers.filter((peer) => peer.id !== peerToDisconnect);
this.log.info(
`Peer ${peerToDisconnect} disconnected and removed from the peer list`
);
const peer = (await this.findAndAddPeers(1))[0];
if (!peer) {
@ -59,6 +58,14 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
);
}
await this.connectionManager.dropConnection(peerToDisconnect);
this.peers = this.peers.filter((peer) => !peer.id.equals(peerToDisconnect));
this.log.info(
`Peer ${peerToDisconnect} disconnected and removed from the peer list`
);
this.renewPeersLocker.lock(peerToDisconnect);
return peer;
}
@ -163,6 +170,7 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
this.log.info(
`Peer maintenance completed, current count: ${this.peers.length}`
);
this.renewPeersLocker.cleanUnlocked();
} finally {
this.maintainPeersLock = false;
}
@ -177,6 +185,12 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
this.log.info(`Finding and adding ${numPeers} new peers`);
try {
const additionalPeers = await this.findAdditionalPeers(numPeers);
const dials = additionalPeers.map((peer) =>
this.connectionManager.attemptDial(peer.id)
);
await Promise.all(dials);
this.peers = [...this.peers, ...additionalPeers];
this.log.info(
`Added ${additionalPeers.length} new peers, total peers: ${this.peers.length}`
@ -198,22 +212,19 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
private async findAdditionalPeers(numPeers: number): Promise<Peer[]> {
this.log.info(`Finding ${numPeers} additional peers`);
try {
let newPeers = await this.core.getPeers({
maxBootstrapPeers: 0,
numPeers: 0
});
let newPeers = await this.core.allPeers();
if (newPeers.length === 0) {
this.log.warn("No new peers found, trying with bootstrap peers");
newPeers = await this.core.getPeers({
maxBootstrapPeers: numPeers,
numPeers: 0
});
this.log.warn("No new peers found.");
}
newPeers = newPeers
.filter((peer) => this.peers.some((p) => p.id === peer.id) === false)
.filter(
(peer) => this.peers.some((p) => p.id.equals(peer.id)) === false
)
.filter((peer) => !this.renewPeersLocker.isLocked(peer.id))
.slice(0, numPeers);
return newPeers;
} catch (error) {
this.log.error("Error finding additional peers:", error);
@ -221,3 +232,35 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
}
}
}
class RenewPeerLocker {
private readonly peers: Map<string, number> = new Map();
constructor(private lockDuration: number) {}
public lock(id: PeerId): void {
this.peers.set(id.toString(), Date.now());
}
public isLocked(id: PeerId): boolean {
const time = this.peers.get(id.toString());
if (time && !this.isTimeUnlocked(time)) {
return true;
}
return false;
}
public cleanUnlocked(): void {
Object.entries(this.peers).forEach(([id, lock]) => {
if (this.isTimeUnlocked(lock)) {
this.peers.delete(id.toString());
}
});
}
private isTimeUnlocked(time: number): boolean {
return Date.now() - time >= this.lockDuration;
}
}

View File

@ -41,11 +41,11 @@ type SubscriptionCallback<T extends IDecodedMessage> = {
const log = new Logger("sdk:filter");
const MINUTE = 60 * 1000;
const DEFAULT_MAX_PINGS = 3;
const DEFAULT_KEEP_ALIVE = 30 * 1000;
const DEFAULT_SUBSCRIBE_OPTIONS = {
keepAlive: MINUTE
keepAlive: DEFAULT_KEEP_ALIVE
};
export class SubscriptionManager implements ISubscriptionSDK {
private readonly pubsubTopic: PubsubTopic;

View File

@ -105,24 +105,3 @@ export async function getConnectedPeersForProtocolAndShard(
const peersWithNulls = await Promise.all(peerPromises);
return peersWithNulls.filter((peer): peer is Peer => peer !== null);
}
export function selectConnection(
connections: Connection[]
): Connection | undefined {
if (!connections.length) return;
if (connections.length === 1) return connections[0];
let latestConnection: Connection | undefined;
connections.forEach((connection) => {
if (connection.status === "open") {
if (!latestConnection) {
latestConnection = connection;
} else if (connection.timeline.open > latestConnection.timeline.open) {
latestConnection = connection;
}
}
});
return latestConnection;
}