mirror of
https://github.com/logos-messaging/js-waku.git
synced 2026-01-13 19:23:14 +00:00
implement TTL update for open connections, add re-bootstrapping in case reaches zero peers
This commit is contained in:
parent
50c7cd16d1
commit
22c10fca53
@ -1,101 +0,0 @@
|
||||
import { PeerId } from "@libp2p/interface";
|
||||
import { Libp2p, Tags } from "@waku/interfaces";
|
||||
import { Logger } from "@waku/utils";
|
||||
|
||||
type BootstrapTriggerConstructorOptions = {
|
||||
libp2p: Libp2p;
|
||||
};
|
||||
|
||||
interface IBootstrapTrigger {
|
||||
start(): void;
|
||||
stop(): void;
|
||||
}
|
||||
|
||||
const log = new Logger("bootstrap-trigger");
|
||||
|
||||
const DEFAULT_BOOTSTRAP_TIMEOUT_MS = 1000;
|
||||
|
||||
export class BootstrapTrigger implements IBootstrapTrigger {
|
||||
private readonly libp2p: Libp2p;
|
||||
private bootstrapTimeout: NodeJS.Timeout | null = null;
|
||||
|
||||
public constructor(options: BootstrapTriggerConstructorOptions) {
|
||||
this.libp2p = options.libp2p;
|
||||
}
|
||||
|
||||
public start(): void {
|
||||
log.info("Starting bootstrap trigger");
|
||||
this.libp2p.addEventListener("peer:disconnect", this.onPeerDisconnectEvent);
|
||||
}
|
||||
|
||||
public stop(): void {
|
||||
log.info("Stopping bootstrap trigger");
|
||||
this.libp2p.removeEventListener(
|
||||
"peer:disconnect",
|
||||
this.onPeerDisconnectEvent
|
||||
);
|
||||
|
||||
if (this.bootstrapTimeout) {
|
||||
clearTimeout(this.bootstrapTimeout);
|
||||
this.bootstrapTimeout = null;
|
||||
log.info("Cleared pending bootstrap timeout");
|
||||
}
|
||||
}
|
||||
|
||||
private onPeerDisconnectEvent = (event: CustomEvent<PeerId>): void => {
|
||||
const peerId = event.detail;
|
||||
const connections = this.libp2p.getConnections();
|
||||
log.info(
|
||||
`Peer disconnected: ${peerId.toString()}, remaining connections: ${connections.length}`
|
||||
);
|
||||
|
||||
if (connections.length !== 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
log.info(
|
||||
`Last peer disconnected, scheduling bootstrap in ${DEFAULT_BOOTSTRAP_TIMEOUT_MS} milliseconds`
|
||||
);
|
||||
|
||||
if (this.bootstrapTimeout) {
|
||||
clearTimeout(this.bootstrapTimeout);
|
||||
}
|
||||
|
||||
this.bootstrapTimeout = setTimeout(() => {
|
||||
log.info("Triggering bootstrap after timeout");
|
||||
this.triggerBootstrap();
|
||||
this.bootstrapTimeout = null;
|
||||
}, DEFAULT_BOOTSTRAP_TIMEOUT_MS);
|
||||
};
|
||||
|
||||
private triggerBootstrap(): void {
|
||||
log.info("Triggering bootstrap discovery");
|
||||
|
||||
const bootstrapComponents = Object.values(this.libp2p.components.components)
|
||||
.filter((c) => !!c)
|
||||
.filter((c: unknown) =>
|
||||
[`@waku/${Tags.BOOTSTRAP}`, `@waku/${Tags.PEER_CACHE}`].includes(
|
||||
(c as { [Symbol.toStringTag]: string })?.[Symbol.toStringTag]
|
||||
)
|
||||
);
|
||||
|
||||
if (bootstrapComponents.length === 0) {
|
||||
log.warn("No bootstrap components found to trigger");
|
||||
return;
|
||||
}
|
||||
|
||||
log.info(
|
||||
`Found ${bootstrapComponents.length} bootstrap components, starting them`
|
||||
);
|
||||
|
||||
bootstrapComponents.forEach((component) => {
|
||||
try {
|
||||
(component as { stop: () => void })?.stop?.();
|
||||
(component as { start: () => void })?.start?.();
|
||||
log.info("Successfully started bootstrap component");
|
||||
} catch (error) {
|
||||
log.error("Failed to start bootstrap component", error);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
@ -9,9 +9,11 @@ import {
|
||||
WakuEvent
|
||||
} from "@waku/interfaces";
|
||||
import { Logger } from "@waku/utils";
|
||||
import { numberToBytes } from "@waku/utils/bytes";
|
||||
|
||||
import { Dialer } from "./dialer.js";
|
||||
import { NetworkMonitor } from "./network_monitor.js";
|
||||
import { isAddressesSupported } from "./utils.js";
|
||||
|
||||
const log = new Logger("connection-limiter");
|
||||
|
||||
@ -123,6 +125,7 @@ export class ConnectionLimiter implements IConnectionLimiter {
|
||||
private async maintainConnections(): Promise<void> {
|
||||
await this.maintainConnectionsCount();
|
||||
await this.maintainBootstrapConnections();
|
||||
await this.maintainTTLConnectedPeers();
|
||||
}
|
||||
|
||||
private async onDisconnectedEvent(): Promise<void> {
|
||||
@ -145,13 +148,15 @@ export class ConnectionLimiter implements IConnectionLimiter {
|
||||
const peers = await this.getPrioritizedPeers();
|
||||
|
||||
if (peers.length === 0) {
|
||||
log.info(`No peers to dial, node is utilizing all known peers`);
|
||||
log.info(`No peers to dial, skipping`);
|
||||
await this.triggerBootstrap();
|
||||
return;
|
||||
}
|
||||
|
||||
const promises = peers
|
||||
.slice(0, this.options.maxConnections - connections.length)
|
||||
.map((p) => this.dialer.dial(p.id));
|
||||
|
||||
await Promise.all(promises);
|
||||
|
||||
return;
|
||||
@ -210,6 +215,28 @@ export class ConnectionLimiter implements IConnectionLimiter {
|
||||
}
|
||||
}
|
||||
|
||||
private async maintainTTLConnectedPeers(): Promise<void> {
|
||||
log.info(`Maintaining TTL connected peers`);
|
||||
|
||||
const promises = this.libp2p.getConnections().map(async (c) => {
|
||||
try {
|
||||
await this.libp2p.peerStore.merge(c.remotePeer, {
|
||||
metadata: {
|
||||
ttl: numberToBytes(Date.now())
|
||||
}
|
||||
});
|
||||
log.info(`TTL updated for connected peer ${c.remotePeer.toString()}`);
|
||||
} catch (error) {
|
||||
log.error(
|
||||
`Unexpected error while maintaining TTL connected peer`,
|
||||
error
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
await Promise.all(promises);
|
||||
}
|
||||
|
||||
private async dialPeersFromStore(): Promise<void> {
|
||||
log.info(`Dialing peers from store`);
|
||||
|
||||
@ -218,6 +245,7 @@ export class ConnectionLimiter implements IConnectionLimiter {
|
||||
|
||||
if (peers.length === 0) {
|
||||
log.info(`No peers to dial, skipping`);
|
||||
await this.triggerBootstrap();
|
||||
return;
|
||||
}
|
||||
|
||||
@ -248,10 +276,9 @@ export class ConnectionLimiter implements IConnectionLimiter {
|
||||
const notConnectedPeers = allPeers.filter(
|
||||
(p) =>
|
||||
!allConnections.some((c) => c.remotePeer.equals(p.id)) &&
|
||||
p.addresses.some(
|
||||
(a) =>
|
||||
a.multiaddr.toString().includes("wss") ||
|
||||
a.multiaddr.toString().includes("ws")
|
||||
isAddressesSupported(
|
||||
this.libp2p,
|
||||
p.addresses.map((a) => a.multiaddr)
|
||||
)
|
||||
);
|
||||
|
||||
@ -267,7 +294,19 @@ export class ConnectionLimiter implements IConnectionLimiter {
|
||||
p.tags.has(Tags.PEER_CACHE)
|
||||
);
|
||||
|
||||
return [...bootstrapPeers, ...peerExchangePeers, ...localStorePeers];
|
||||
const restPeers = notConnectedPeers.filter(
|
||||
(p) =>
|
||||
!p.tags.has(Tags.BOOTSTRAP) &&
|
||||
!p.tags.has(Tags.PEER_EXCHANGE) &&
|
||||
!p.tags.has(Tags.PEER_CACHE)
|
||||
);
|
||||
|
||||
return [
|
||||
...bootstrapPeers,
|
||||
...peerExchangePeers,
|
||||
...localStorePeers,
|
||||
...restPeers
|
||||
];
|
||||
}
|
||||
|
||||
private async getBootstrapPeers(): Promise<Peer[]> {
|
||||
@ -291,4 +330,41 @@ export class ConnectionLimiter implements IConnectionLimiter {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Triggers the bootstrap or peer cache discovery if they are mounted.
|
||||
* @returns void
|
||||
*/
|
||||
private async triggerBootstrap(): Promise<void> {
|
||||
log.info("Triggering bootstrap discovery");
|
||||
|
||||
const bootstrapComponents = Object.values(this.libp2p.components.components)
|
||||
.filter((c) => !!c)
|
||||
.filter((c: unknown) =>
|
||||
[`@waku/${Tags.BOOTSTRAP}`, `@waku/${Tags.PEER_CACHE}`].includes(
|
||||
(c as { [Symbol.toStringTag]: string })?.[Symbol.toStringTag]
|
||||
)
|
||||
);
|
||||
|
||||
if (bootstrapComponents.length === 0) {
|
||||
log.warn("No bootstrap components found to trigger");
|
||||
return;
|
||||
}
|
||||
|
||||
log.info(
|
||||
`Found ${bootstrapComponents.length} bootstrap components, starting them`
|
||||
);
|
||||
|
||||
const promises = bootstrapComponents.map(async (component) => {
|
||||
try {
|
||||
await (component as { stop: () => Promise<void> })?.stop?.();
|
||||
await (component as { start: () => Promise<void> })?.start?.();
|
||||
log.info("Successfully started bootstrap component");
|
||||
} catch (error) {
|
||||
log.error("Failed to start bootstrap component", error);
|
||||
}
|
||||
});
|
||||
|
||||
await Promise.all(promises);
|
||||
}
|
||||
}
|
||||
|
||||
@ -52,6 +52,12 @@ describe("ConnectionManager", () => {
|
||||
dialProtocol: sinon.stub().resolves({} as Stream),
|
||||
hangUp: sinon.stub().resolves(),
|
||||
getPeers: sinon.stub().returns([]),
|
||||
getConnections: sinon.stub().returns([]),
|
||||
addEventListener: sinon.stub(),
|
||||
removeEventListener: sinon.stub(),
|
||||
components: {
|
||||
components: {}
|
||||
},
|
||||
peerStore: {
|
||||
get: sinon.stub().resolves(null),
|
||||
merge: sinon.stub().resolves()
|
||||
|
||||
@ -11,7 +11,6 @@ import {
|
||||
import { Libp2p } from "@waku/interfaces";
|
||||
import { Logger } from "@waku/utils";
|
||||
|
||||
import { BootstrapTrigger } from "./bootstrap_trigger.js";
|
||||
import { ConnectionLimiter } from "./connection_limiter.js";
|
||||
import { Dialer } from "./dialer.js";
|
||||
import { DiscoveryDialer } from "./discovery_dialer.js";
|
||||
@ -46,7 +45,6 @@ export class ConnectionManager implements IConnectionManager {
|
||||
private readonly shardReader: ShardReader;
|
||||
private readonly networkMonitor: NetworkMonitor;
|
||||
private readonly connectionLimiter: ConnectionLimiter;
|
||||
private readonly bootstrapTrigger: BootstrapTrigger;
|
||||
|
||||
private readonly options: ConnectionManagerOptions;
|
||||
private libp2p: Libp2p;
|
||||
@ -66,10 +64,6 @@ export class ConnectionManager implements IConnectionManager {
|
||||
...options.config
|
||||
};
|
||||
|
||||
this.bootstrapTrigger = new BootstrapTrigger({
|
||||
libp2p: options.libp2p
|
||||
});
|
||||
|
||||
this.keepAliveManager = new KeepAliveManager({
|
||||
relay: options.relay,
|
||||
libp2p: options.libp2p,
|
||||
@ -116,7 +110,6 @@ export class ConnectionManager implements IConnectionManager {
|
||||
this.discoveryDialer.start();
|
||||
this.keepAliveManager.start();
|
||||
this.connectionLimiter.start();
|
||||
this.bootstrapTrigger.start();
|
||||
}
|
||||
|
||||
public stop(): void {
|
||||
@ -125,7 +118,6 @@ export class ConnectionManager implements IConnectionManager {
|
||||
this.discoveryDialer.stop();
|
||||
this.keepAliveManager.stop();
|
||||
this.connectionLimiter.stop();
|
||||
this.bootstrapTrigger.stop();
|
||||
}
|
||||
|
||||
public isConnected(): boolean {
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
import { isPeerId, type Peer, type PeerId } from "@libp2p/interface";
|
||||
import { peerIdFromString } from "@libp2p/peer-id";
|
||||
import { Multiaddr, multiaddr, MultiaddrInput } from "@multiformats/multiaddr";
|
||||
import { Libp2p } from "@waku/interfaces";
|
||||
import { bytesToUtf8 } from "@waku/utils/bytes";
|
||||
|
||||
/**
|
||||
@ -49,3 +50,25 @@ export const mapToPeerId = (input: PeerId | MultiaddrInput): PeerId => {
|
||||
? input
|
||||
: peerIdFromString(multiaddr(input).getPeerId()!);
|
||||
};
|
||||
|
||||
/**
|
||||
* Checks if the address is supported by the libp2p instance.
|
||||
* @param libp2p - The libp2p instance.
|
||||
* @param addresses - The addresses to check.
|
||||
* @returns True if the addresses are supported, false otherwise.
|
||||
*/
|
||||
export const isAddressesSupported = (
|
||||
libp2p: Libp2p,
|
||||
addresses: Multiaddr[]
|
||||
): boolean => {
|
||||
const transports =
|
||||
libp2p?.components?.transportManager?.getTransports() || [];
|
||||
|
||||
if (transports.length === 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return transports
|
||||
.map((transport) => transport.dialFilter(addresses))
|
||||
.some((supportedAddresses) => supportedAddresses.length > 0);
|
||||
};
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user