mirror of
https://github.com/logos-messaging/js-waku.git
synced 2026-01-04 23:03:07 +00:00
feat!: ConnectionManager and KeepAliveManager (#1135)
* initialises ConnectionManager and KeepAliveManager ports from previous PR and makes necessary improvements and reductions * address: comments * map a ConnectionManager instance with a WakuNode * abstract event listeners logic * minor fix * minor cleaning * instantiate KeepAliveManager instead of extending * fix build and enable all tests * fix CI * address review * refine event handlers - only removes the previously attached callback from the event handlers while shutting down service - removes the requirement of passing around `keepAliveOptions` and `relay` inside of `ConnectionManager` * add verbosity to interface * make `dialPeer()` more readable * use set to push tags to avoid duplicates * fix: merge build * remove: logging function * rename startService and stopService * remove: future TODO added that as part of future refactor * use the new libp2p api * initialise options in constructor//fix TS error * remove stale export * address principal review * reset test timeout to master
This commit is contained in:
parent
35b276804c
commit
24c24cc27d
@ -4,8 +4,8 @@
|
||||
"language": "en",
|
||||
"words": [
|
||||
"abortable",
|
||||
"Alives",
|
||||
"ahadns",
|
||||
"Alives",
|
||||
"asym",
|
||||
"backoff",
|
||||
"backoffs",
|
||||
@ -20,6 +20,7 @@
|
||||
"codecov",
|
||||
"commitlint",
|
||||
"dependabot",
|
||||
"dialable",
|
||||
"dingpu",
|
||||
"discv",
|
||||
"Dlazy",
|
||||
@ -97,6 +98,7 @@
|
||||
"supercrypto",
|
||||
"transpiled",
|
||||
"typedoc",
|
||||
"undialable",
|
||||
"unencrypted",
|
||||
"unmarshal",
|
||||
"unmount",
|
||||
|
||||
31882
package-lock.json
generated
31882
package-lock.json
generated
File diff suppressed because it is too large
Load Diff
@ -29,3 +29,10 @@ export {
|
||||
} from "./lib/store/index.js";
|
||||
|
||||
export { waitForRemotePeer } from "./lib/wait_for_remote_peer.js";
|
||||
|
||||
export { ConnectionManager } from "./lib/connection_manager.js";
|
||||
|
||||
export {
|
||||
KeepAliveManager,
|
||||
KeepAliveOptions,
|
||||
} from "./lib/keep_alive_manager.js";
|
||||
|
||||
220
packages/core/src/lib/connection_manager.ts
Normal file
220
packages/core/src/lib/connection_manager.ts
Normal file
@ -0,0 +1,220 @@
|
||||
import type { Connection } from "@libp2p/interface-connection";
|
||||
import type { Libp2p } from "@libp2p/interface-libp2p";
|
||||
import type { PeerId } from "@libp2p/interface-peer-id";
|
||||
import type { PeerInfo } from "@libp2p/interface-peer-info";
|
||||
import type { ConnectionManagerOptions, IRelay } from "@waku/interfaces";
|
||||
import { Tags } from "@waku/interfaces";
|
||||
import debug from "debug";
|
||||
|
||||
import { KeepAliveManager, KeepAliveOptions } from "./keep_alive_manager.js";
|
||||
|
||||
const log = debug("waku:connection-manager");
|
||||
|
||||
export const DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED = 1;
|
||||
export const DEFAULT_MAX_DIAL_ATTEMPTS_FOR_PEER = 3;
|
||||
|
||||
export class ConnectionManager {
|
||||
private static instances = new Map<string, ConnectionManager>();
|
||||
private keepAliveManager: KeepAliveManager;
|
||||
private options: ConnectionManagerOptions;
|
||||
private libp2pComponents: Libp2p;
|
||||
private dialAttemptsForPeer: Map<string, number> = new Map();
|
||||
|
||||
public static create(
|
||||
peerId: string,
|
||||
libp2p: Libp2p,
|
||||
keepAliveOptions: KeepAliveOptions,
|
||||
relay?: IRelay,
|
||||
options?: ConnectionManagerOptions
|
||||
): ConnectionManager {
|
||||
let instance = ConnectionManager.instances.get(peerId);
|
||||
if (!instance) {
|
||||
instance = new ConnectionManager(
|
||||
libp2p,
|
||||
keepAliveOptions,
|
||||
relay,
|
||||
options
|
||||
);
|
||||
ConnectionManager.instances.set(peerId, instance);
|
||||
}
|
||||
|
||||
return instance;
|
||||
}
|
||||
|
||||
private constructor(
|
||||
libp2pComponents: Libp2p,
|
||||
keepAliveOptions: KeepAliveOptions,
|
||||
relay?: IRelay,
|
||||
options?: Partial<ConnectionManagerOptions>
|
||||
) {
|
||||
this.libp2pComponents = libp2pComponents;
|
||||
this.options = {
|
||||
maxDialAttemptsForPeer: DEFAULT_MAX_DIAL_ATTEMPTS_FOR_PEER,
|
||||
maxBootstrapPeersAllowed: DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED,
|
||||
...options,
|
||||
};
|
||||
|
||||
this.keepAliveManager = new KeepAliveManager(keepAliveOptions, relay);
|
||||
|
||||
this.run()
|
||||
.then(() => log(`Connection Manager is now running`))
|
||||
.catch((error) => log(`Unexpected error while running service`, error));
|
||||
}
|
||||
|
||||
private async run(): Promise<void> {
|
||||
// start event listeners
|
||||
this.startPeerDiscoveryListener();
|
||||
this.startPeerConnectionListener();
|
||||
this.startPeerDisconnectionListener();
|
||||
}
|
||||
|
||||
stop(): void {
|
||||
this.keepAliveManager.stopAll();
|
||||
this.libp2pComponents.removeEventListener(
|
||||
"peer:connect",
|
||||
this.onEventHandlers["peer:connect"]
|
||||
);
|
||||
this.libp2pComponents.removeEventListener(
|
||||
"peer:disconnect",
|
||||
this.onEventHandlers["peer:disconnect"]
|
||||
);
|
||||
this.libp2pComponents.removeEventListener(
|
||||
"peer:discovery",
|
||||
this.onEventHandlers["peer:discovery"]
|
||||
);
|
||||
}
|
||||
|
||||
private async dialPeer(peerId: PeerId): Promise<void> {
|
||||
let dialAttempt = 0;
|
||||
while (dialAttempt <= this.options.maxDialAttemptsForPeer) {
|
||||
try {
|
||||
log(`Dialing peer ${peerId.toString()}`);
|
||||
await this.libp2pComponents.dial(peerId);
|
||||
|
||||
const tags = await this.getTagNamesForPeer(peerId);
|
||||
// add tag to connection describing discovery mechanism
|
||||
// don't add duplicate tags
|
||||
this.libp2pComponents
|
||||
.getConnections(peerId)
|
||||
.forEach(
|
||||
(conn) => (conn.tags = Array.from(new Set([...conn.tags, ...tags])))
|
||||
);
|
||||
|
||||
this.dialAttemptsForPeer.delete(peerId.toString());
|
||||
return;
|
||||
} catch (error) {
|
||||
log(`
|
||||
Error dialing peer ${peerId.toString()}`);
|
||||
dialAttempt = this.dialAttemptsForPeer.get(peerId.toString()) ?? 1;
|
||||
this.dialAttemptsForPeer.set(peerId.toString(), dialAttempt + 1);
|
||||
|
||||
if (dialAttempt <= this.options.maxDialAttemptsForPeer) {
|
||||
log(`Reattempting dial (${dialAttempt})`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
log(`Deleting undialable peer ${peerId.toString()} from peer store`);
|
||||
return await this.libp2pComponents.peerStore.delete(peerId);
|
||||
} catch (error) {
|
||||
throw `Error deleting undialable peer ${peerId.toString()} from peer store - ${error}`;
|
||||
}
|
||||
}
|
||||
|
||||
private startPeerDiscoveryListener(): void {
|
||||
this.libp2pComponents.peerStore.addEventListener(
|
||||
"peer",
|
||||
this.onEventHandlers["peer:discovery"]
|
||||
);
|
||||
}
|
||||
|
||||
private startPeerConnectionListener(): void {
|
||||
this.libp2pComponents.addEventListener(
|
||||
"peer:connect",
|
||||
this.onEventHandlers["peer:connect"]
|
||||
);
|
||||
}
|
||||
|
||||
private startPeerDisconnectionListener(): void {
|
||||
// TODO: ensure that these following issues are updated and confirmed
|
||||
/**
|
||||
* NOTE: Event is not being emitted on closing nor losing a connection.
|
||||
* @see https://github.com/libp2p/js-libp2p/issues/939
|
||||
* @see https://github.com/status-im/js-waku/issues/252
|
||||
*
|
||||
* >This event will be triggered anytime we are disconnected from another peer,
|
||||
* >regardless of the circumstances of that disconnection.
|
||||
* >If we happen to have multiple connections to a peer,
|
||||
* >this event will **only** be triggered when the last connection is closed.
|
||||
* @see https://github.com/libp2p/js-libp2p/blob/bad9e8c0ff58d60a78314077720c82ae331cc55b/doc/API.md?plain=1#L2100
|
||||
*/
|
||||
this.libp2pComponents.addEventListener(
|
||||
"peer:disconnect",
|
||||
this.onEventHandlers["peer:disconnect"]
|
||||
);
|
||||
}
|
||||
|
||||
private onEventHandlers = {
|
||||
"peer:discovery": async (evt: CustomEvent<PeerInfo>): Promise<void> => {
|
||||
const { id: peerId } = evt.detail;
|
||||
if (!(await this.shouldDialPeer(peerId))) return;
|
||||
|
||||
this.dialPeer(peerId).catch((err) =>
|
||||
log(`Error dialing peer ${peerId.toString()} : ${err}`)
|
||||
);
|
||||
},
|
||||
"peer:connect": (evt: CustomEvent<Connection>): void => {
|
||||
{
|
||||
this.keepAliveManager.start(
|
||||
evt.detail.remotePeer,
|
||||
this.libp2pComponents.ping.bind(this)
|
||||
);
|
||||
}
|
||||
},
|
||||
"peer:disconnect": () => {
|
||||
return (evt: CustomEvent<Connection>): void => {
|
||||
this.keepAliveManager.stop(evt.detail.remotePeer);
|
||||
};
|
||||
},
|
||||
};
|
||||
|
||||
/**
|
||||
* Checks if the peer is dialable based on the following conditions:
|
||||
* 1. If the peer is a bootstrap peer, it is only dialable if the number of current bootstrap connections is less than the max allowed.
|
||||
* 2. If the peer is not a bootstrap peer
|
||||
*/
|
||||
private async shouldDialPeer(peerId: PeerId): Promise<boolean> {
|
||||
const isConnected = this.libp2pComponents.getConnections(peerId).length > 0;
|
||||
|
||||
if (isConnected) return false;
|
||||
|
||||
const isBootstrap = (await this.getTagNamesForPeer(peerId)).some(
|
||||
(tagName) => tagName === Tags.BOOTSTRAP
|
||||
);
|
||||
|
||||
if (isBootstrap) {
|
||||
const currentBootstrapConnections = this.libp2pComponents
|
||||
.getConnections()
|
||||
.filter((conn) => {
|
||||
conn.tags.find((name) => name === Tags.BOOTSTRAP);
|
||||
}).length;
|
||||
if (currentBootstrapConnections < this.options.maxBootstrapPeersAllowed)
|
||||
return true;
|
||||
} else {
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetches the tag names for a given peer
|
||||
*/
|
||||
private async getTagNamesForPeer(peerId: PeerId): Promise<string[]> {
|
||||
const tags = (await this.libp2pComponents.peerStore.getTags(peerId)).map(
|
||||
(tag) => tag.name
|
||||
);
|
||||
return tags;
|
||||
}
|
||||
}
|
||||
89
packages/core/src/lib/keep_alive_manager.ts
Normal file
89
packages/core/src/lib/keep_alive_manager.ts
Normal file
@ -0,0 +1,89 @@
|
||||
import type { PeerId } from "@libp2p/interface-peer-id";
|
||||
import type { IRelay } from "@waku/interfaces";
|
||||
import debug from "debug";
|
||||
import type { Libp2p } from "libp2p";
|
||||
|
||||
import { createEncoder } from "../index.js";
|
||||
|
||||
import { RelayPingContentTopic } from "./relay/constants.js";
|
||||
|
||||
const log = debug("waku:keep-alive");
|
||||
|
||||
export interface KeepAliveOptions {
|
||||
pingKeepAlive: number;
|
||||
relayKeepAlive: number;
|
||||
}
|
||||
|
||||
export class KeepAliveManager {
|
||||
private pingKeepAliveTimers: Map<string, ReturnType<typeof setInterval>>;
|
||||
private relayKeepAliveTimers: Map<PeerId, ReturnType<typeof setInterval>>;
|
||||
private options: KeepAliveOptions;
|
||||
private relay?: IRelay;
|
||||
|
||||
constructor(options: KeepAliveOptions, relay?: IRelay) {
|
||||
this.pingKeepAliveTimers = new Map();
|
||||
this.relayKeepAliveTimers = new Map();
|
||||
this.options = options;
|
||||
this.relay = relay;
|
||||
}
|
||||
|
||||
public start(peerId: PeerId, libp2pPing: Libp2p["ping"]): void {
|
||||
// Just in case a timer already exist for this peer
|
||||
this.stop(peerId);
|
||||
|
||||
const { pingKeepAlive: pingPeriodSecs, relayKeepAlive: relayPeriodSecs } =
|
||||
this.options;
|
||||
|
||||
const peerIdStr = peerId.toString();
|
||||
|
||||
if (pingPeriodSecs !== 0) {
|
||||
const interval = setInterval(() => {
|
||||
libp2pPing(peerId).catch((e) => {
|
||||
log(`Ping failed (${peerIdStr})`, e);
|
||||
});
|
||||
}, pingPeriodSecs * 1000);
|
||||
this.pingKeepAliveTimers.set(peerIdStr, interval);
|
||||
}
|
||||
|
||||
const relay = this.relay;
|
||||
if (relay && relayPeriodSecs !== 0) {
|
||||
const encoder = createEncoder({
|
||||
contentTopic: RelayPingContentTopic,
|
||||
ephemeral: true,
|
||||
});
|
||||
const interval = setInterval(() => {
|
||||
log("Sending Waku Relay ping message");
|
||||
relay
|
||||
.send(encoder, { payload: new Uint8Array() })
|
||||
.catch((e) => log("Failed to send relay ping", e));
|
||||
}, relayPeriodSecs * 1000);
|
||||
this.relayKeepAliveTimers.set(peerId, interval);
|
||||
}
|
||||
}
|
||||
|
||||
public stop(peerId: PeerId): void {
|
||||
const peerIdStr = peerId.toString();
|
||||
|
||||
if (this.pingKeepAliveTimers.has(peerIdStr)) {
|
||||
clearInterval(this.pingKeepAliveTimers.get(peerIdStr));
|
||||
this.pingKeepAliveTimers.delete(peerIdStr);
|
||||
}
|
||||
|
||||
if (this.relayKeepAliveTimers.has(peerId)) {
|
||||
clearInterval(this.relayKeepAliveTimers.get(peerId));
|
||||
this.relayKeepAliveTimers.delete(peerId);
|
||||
}
|
||||
}
|
||||
|
||||
public stopAll(): void {
|
||||
for (const timer of [
|
||||
...Object.values(this.pingKeepAliveTimers),
|
||||
...Object.values(this.relayKeepAliveTimers),
|
||||
]) {
|
||||
clearInterval(timer);
|
||||
}
|
||||
|
||||
this.pingKeepAliveTimers.clear();
|
||||
this.relayKeepAliveTimers.clear();
|
||||
}
|
||||
}
|
||||
@ -13,9 +13,8 @@ import type {
|
||||
import { Protocols } from "@waku/interfaces";
|
||||
import debug from "debug";
|
||||
|
||||
import { createEncoder } from "./message/version_0.js";
|
||||
import { ConnectionManager } from "./connection_manager.js";
|
||||
import * as relayConstants from "./relay/constants.js";
|
||||
import { RelayPingContentTopic } from "./relay/constants.js";
|
||||
|
||||
export const DefaultPingKeepAliveValueSecs = 0;
|
||||
export const DefaultRelayKeepAliveValueSecs = 5 * 60;
|
||||
@ -51,13 +50,7 @@ export class WakuNode implements Waku {
|
||||
public store?: IStore;
|
||||
public filter?: IFilter;
|
||||
public lightPush?: ILightPush;
|
||||
|
||||
private pingKeepAliveTimers: {
|
||||
[peer: string]: ReturnType<typeof setInterval>;
|
||||
};
|
||||
private relayKeepAliveTimers: {
|
||||
[peer: string]: ReturnType<typeof setInterval>;
|
||||
};
|
||||
public connectionManager: ConnectionManager;
|
||||
|
||||
constructor(
|
||||
options: WakuOptions,
|
||||
@ -82,49 +75,27 @@ export class WakuNode implements Waku {
|
||||
this.relay = libp2p.pubsub;
|
||||
}
|
||||
|
||||
log(
|
||||
"Waku node created",
|
||||
this.libp2p.peerId.toString(),
|
||||
`relay: ${!!this.relay}, store: ${!!this.store}, light push: ${!!this
|
||||
.lightPush}, filter: ${!!this.filter}} `
|
||||
);
|
||||
|
||||
this.pingKeepAliveTimers = {};
|
||||
this.relayKeepAliveTimers = {};
|
||||
|
||||
const pingKeepAlive =
|
||||
options.pingKeepAlive || DefaultPingKeepAliveValueSecs;
|
||||
const relayKeepAlive = this.relay
|
||||
? options.relayKeepAlive || DefaultRelayKeepAliveValueSecs
|
||||
: 0;
|
||||
|
||||
libp2p.addEventListener("peer:connect", (evt) => {
|
||||
this.startKeepAlive(evt.detail.remotePeer, pingKeepAlive, relayKeepAlive);
|
||||
});
|
||||
const peerId = this.libp2p.peerId.toString();
|
||||
|
||||
/**
|
||||
* NOTE: Event is not being emitted on closing nor losing a connection.
|
||||
* @see https://github.com/libp2p/js-libp2p/issues/939
|
||||
* @see https://github.com/status-im/js-waku/issues/252
|
||||
*
|
||||
* >This event will be triggered anytime we are disconnected from another peer,
|
||||
* >regardless of the circumstances of that disconnection.
|
||||
* >If we happen to have multiple connections to a peer,
|
||||
* >this event will **only** be triggered when the last connection is closed.
|
||||
* @see https://github.com/libp2p/js-libp2p/blob/bad9e8c0ff58d60a78314077720c82ae331cc55b/doc/API.md?plain=1#L2100
|
||||
*/
|
||||
libp2p.addEventListener("peer:disconnect", (evt) => {
|
||||
this.stopKeepAlive(evt.detail.remotePeer);
|
||||
});
|
||||
this.connectionManager = ConnectionManager.create(
|
||||
peerId,
|
||||
libp2p,
|
||||
{ pingKeepAlive, relayKeepAlive },
|
||||
this.relay
|
||||
);
|
||||
|
||||
// Trivial handling of discovered peers, to be refined.
|
||||
libp2p.addEventListener("peer:discovery", (evt) => {
|
||||
const peerId = evt.detail.id;
|
||||
log(`Found peer ${peerId.toString()}, dialing.`);
|
||||
libp2p.dial(peerId).catch((err) => {
|
||||
log(`Fail to dial ${peerId}`, err);
|
||||
});
|
||||
});
|
||||
log(
|
||||
"Waku node created",
|
||||
peerId,
|
||||
`relay: ${!!this.relay}, store: ${!!this.store}, light push: ${!!this
|
||||
.lightPush}, filter: ${!!this.filter}`
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -194,7 +165,7 @@ export class WakuNode implements Waku {
|
||||
}
|
||||
|
||||
async stop(): Promise<void> {
|
||||
this.stopAllKeepAlives();
|
||||
this.connectionManager.stop();
|
||||
await this.libp2p.stop();
|
||||
}
|
||||
|
||||
@ -216,65 +187,6 @@ export class WakuNode implements Waku {
|
||||
}
|
||||
return localMultiaddr + "/p2p/" + this.libp2p.peerId.toString();
|
||||
}
|
||||
|
||||
private startKeepAlive(
|
||||
peerId: PeerId,
|
||||
pingPeriodSecs: number,
|
||||
relayPeriodSecs: number
|
||||
): void {
|
||||
// Just in case a timer already exist for this peer
|
||||
this.stopKeepAlive(peerId);
|
||||
|
||||
const peerIdStr = peerId.toString();
|
||||
|
||||
if (pingPeriodSecs !== 0) {
|
||||
this.pingKeepAliveTimers[peerIdStr] = setInterval(() => {
|
||||
this.libp2p.ping(peerId).catch((e) => {
|
||||
log(`Ping failed (${peerIdStr})`, e);
|
||||
});
|
||||
}, pingPeriodSecs * 1000);
|
||||
}
|
||||
|
||||
const relay = this.relay;
|
||||
if (relay && relayPeriodSecs !== 0) {
|
||||
const encoder = createEncoder({
|
||||
contentTopic: RelayPingContentTopic,
|
||||
ephemeral: true,
|
||||
});
|
||||
this.relayKeepAliveTimers[peerIdStr] = setInterval(() => {
|
||||
log("Sending Waku Relay ping message");
|
||||
relay
|
||||
.send(encoder, { payload: new Uint8Array() })
|
||||
.catch((e) => log("Failed to send relay ping", e));
|
||||
}, relayPeriodSecs * 1000);
|
||||
}
|
||||
}
|
||||
|
||||
private stopKeepAlive(peerId: PeerId): void {
|
||||
const peerIdStr = peerId.toString();
|
||||
|
||||
if (this.pingKeepAliveTimers[peerIdStr]) {
|
||||
clearInterval(this.pingKeepAliveTimers[peerIdStr]);
|
||||
delete this.pingKeepAliveTimers[peerIdStr];
|
||||
}
|
||||
|
||||
if (this.relayKeepAliveTimers[peerIdStr]) {
|
||||
clearInterval(this.relayKeepAliveTimers[peerIdStr]);
|
||||
delete this.relayKeepAliveTimers[peerIdStr];
|
||||
}
|
||||
}
|
||||
|
||||
private stopAllKeepAlives(): void {
|
||||
for (const timer of [
|
||||
...Object.values(this.pingKeepAliveTimers),
|
||||
...Object.values(this.relayKeepAliveTimers),
|
||||
]) {
|
||||
clearInterval(timer);
|
||||
}
|
||||
|
||||
this.pingKeepAliveTimers = {};
|
||||
this.relayKeepAliveTimers = {};
|
||||
}
|
||||
}
|
||||
|
||||
function isRelay(pubsub: PubSub): pubsub is IRelay {
|
||||
|
||||
17
packages/interfaces/src/connection_manager.ts
Normal file
17
packages/interfaces/src/connection_manager.ts
Normal file
@ -0,0 +1,17 @@
|
||||
export enum Tags {
|
||||
BOOTSTRAP = "bootstrap",
|
||||
PEER_EXCHANGE = "peer-exchange",
|
||||
}
|
||||
|
||||
export interface ConnectionManagerOptions {
|
||||
/**
|
||||
* Number of attempts before a peer is considered non-dialable
|
||||
* This is used to not spam a peer with dial attempts when it is not dialable
|
||||
*/
|
||||
maxDialAttemptsForPeer: number;
|
||||
/**
|
||||
* Max number of bootstrap peers allowed to be connected to, initially
|
||||
* This is used to increase intention of dialing non-bootstrap peers, found using other discovery mechanisms (like Peer Exchange)
|
||||
*/
|
||||
maxBootstrapPeersAllowed: number;
|
||||
}
|
||||
@ -7,3 +7,4 @@ export * from "./protocols.js";
|
||||
export * from "./relay.js";
|
||||
export * from "./store.js";
|
||||
export * from "./waku.js";
|
||||
export * from "./connection_manager.js";
|
||||
|
||||
@ -31,7 +31,7 @@ describe("Peer Exchange", () => {
|
||||
});
|
||||
|
||||
it("Auto discovery", async function () {
|
||||
this.timeout(60_000);
|
||||
this.timeout(50_000);
|
||||
|
||||
waku = await createLightNode({
|
||||
libp2p: {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user