feat: ConnectionManager extends EventEmitter & exposed on the Waku interface (& minor improvements) (#1447)

* move KeepAliveOptions to dedicated interface file

* update export for KeepAlive

* expose `ConnectionManager` on the waku node

* update ConnectionManager test to use the exposed API

* rm: only for the test
This commit is contained in:
Danish Arora 2023-07-31 13:54:39 +05:30 committed by GitHub
parent 30fcacea84
commit 0b8936f1f1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 46 additions and 35 deletions

View File

@ -29,7 +29,4 @@ export { waitForRemotePeer } from "./lib/wait_for_remote_peer.js";
export { ConnectionManager } from "./lib/connection_manager.js";
export {
KeepAliveManager,
KeepAliveOptions,
} from "./lib/keep_alive_manager.js";
export { KeepAliveManager } from "./lib/keep_alive_manager.js";

View File

@ -5,14 +5,16 @@ import { CustomEvent, EventEmitter } from "@libp2p/interfaces/events";
import {
ConnectionManagerOptions,
EPeersByDiscoveryEvents,
IConnectionManager,
IPeersByDiscoveryEvents,
IRelay,
KeepAliveOptions,
PeersByDiscoveryResult,
} from "@waku/interfaces";
import { Libp2p, Tags } from "@waku/interfaces";
import debug from "debug";
import { KeepAliveManager, KeepAliveOptions } from "./keep_alive_manager.js";
import { KeepAliveManager } from "./keep_alive_manager.js";
const log = debug("waku:connection-manager");
@ -20,7 +22,10 @@ export const DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED = 1;
export const DEFAULT_MAX_DIAL_ATTEMPTS_FOR_PEER = 3;
export const DEFAULT_MAX_PARALLEL_DIALS = 3;
export class ConnectionManager extends EventEmitter<IPeersByDiscoveryEvents> {
export class ConnectionManager
extends EventEmitter<IPeersByDiscoveryEvents>
implements IConnectionManager
{
private static instances = new Map<string, ConnectionManager>();
private keepAliveManager: KeepAliveManager;
private options: ConnectionManagerOptions;
@ -217,7 +222,7 @@ export class ConnectionManager extends EventEmitter<IPeersByDiscoveryEvents> {
}
}
async dropConnection(peerId: PeerId): Promise<void> {
private async dropConnection(peerId: PeerId): Promise<void> {
try {
this.keepAliveManager.stop(peerId);
await this.libp2p.hangUp(peerId);

View File

@ -1,5 +1,6 @@
import type { PeerId } from "@libp2p/interface-peer-id";
import type { IRelay } from "@waku/interfaces";
import type { KeepAliveOptions } from "@waku/interfaces";
import debug from "debug";
import type { PingService } from "libp2p/ping";
@ -8,11 +9,6 @@ import { createEncoder } from "../index.js";
export const RelayPingContentTopic = "/relay-ping/1/ping/null";
const log = debug("waku:keep-alive");
export interface KeepAliveOptions {
pingKeepAlive: number;
relayKeepAlive: number;
}
export class KeepAliveManager {
private pingKeepAliveTimers: Map<string, ReturnType<typeof setInterval>>;
private relayKeepAliveTimers: Map<PeerId, ReturnType<typeof setInterval>>;

View File

@ -1,5 +1,6 @@
import type { PeerId } from "@libp2p/interface-peer-id";
import type { Peer } from "@libp2p/interface-peer-store";
import type { EventEmitter } from "@libp2p/interfaces/events";
export enum Tags {
BOOTSTRAP = "bootstrap",
@ -47,3 +48,9 @@ export interface PeersByDiscoveryResult {
[Tags.PEER_EXCHANGE]: Peer[];
};
}
export interface IConnectionManager
extends EventEmitter<IPeersByDiscoveryEvents> {
getPeersByDiscovery(): Promise<PeersByDiscoveryResult>;
stop(): void;
}

View File

@ -12,3 +12,4 @@ export * from "./sender.js";
export * from "./receiver.js";
export * from "./misc.js";
export * from "./libp2p.js";
export * from "./keep_alive_manager.js";

View File

@ -0,0 +1,4 @@
export interface KeepAliveOptions {
pingKeepAlive: number;
relayKeepAlive: number;
}

View File

@ -2,6 +2,7 @@ import type { Stream } from "@libp2p/interface-connection";
import type { PeerId } from "@libp2p/interface-peer-id";
import type { Multiaddr } from "@multiformats/multiaddr";
import { IConnectionManager } from "./connection_manager.js";
import type { IFilter } from "./filter.js";
import type { Libp2p } from "./libp2p.js";
import type { ILightPush } from "./light_push.js";
@ -16,6 +17,8 @@ export interface Waku {
filter?: IFilter;
lightPush?: ILightPush;
connectionManager: IConnectionManager;
dial(peer: PeerId | Multiaddr, protocols?: Protocols[]): Promise<Stream>;
start(): Promise<void>;

View File

@ -1,6 +1,5 @@
import { CustomEvent } from "@libp2p/interfaces/events";
import { createSecp256k1PeerId } from "@libp2p/peer-id-factory";
import { ConnectionManager, KeepAliveOptions } from "@waku/core";
import { EPeersByDiscoveryEvents, LightNode, Tags } from "@waku/interfaces";
import { createLightNode } from "@waku/sdk";
import { expect } from "chai";
@ -8,26 +7,14 @@ import sinon, { SinonSpy, SinonStub } from "sinon";
import { delay } from "../dist/delay.js";
const KEEP_ALIVE_OPTIONS: KeepAliveOptions = {
pingKeepAlive: 0,
relayKeepAlive: 5 * 1000,
};
const TEST_TIMEOUT = 10_000;
const DELAY_MS = 1_000;
describe("ConnectionManager", function () {
let connectionManager: ConnectionManager | undefined;
let waku: LightNode;
let peerId: string;
beforeEach(async function () {
waku = await createLightNode();
peerId = Math.random().toString(36).substring(7);
connectionManager = ConnectionManager.create(
peerId,
waku.libp2p,
KEEP_ALIVE_OPTIONS
);
});
afterEach(async () => {
@ -51,7 +38,7 @@ describe("ConnectionManager", function () {
});
const peerDiscoveryBootstrap = new Promise<boolean>((resolve) => {
connectionManager!.addEventListener(
waku.connectionManager.addEventListener(
EPeersByDiscoveryEvents.PEER_DISCOVERY_BOOTSTRAP,
({ detail: receivedPeerId }) => {
resolve(receivedPeerId.toString() === peerIdBootstrap.toString());
@ -59,7 +46,9 @@ describe("ConnectionManager", function () {
);
});
waku.libp2p.dispatchEvent(new CustomEvent("peer", { detail: peerId }));
waku.libp2p.dispatchEvent(
new CustomEvent("peer", { detail: await createSecp256k1PeerId() })
);
expect(await peerDiscoveryBootstrap).to.eq(true);
});
@ -77,7 +66,7 @@ describe("ConnectionManager", function () {
});
const peerDiscoveryPeerExchange = new Promise<boolean>((resolve) => {
connectionManager!.addEventListener(
waku.connectionManager.addEventListener(
EPeersByDiscoveryEvents.PEER_DISCOVERY_PEER_EXCHANGE,
({ detail: receivedPeerId }) => {
resolve(receivedPeerId.toString() === peerIdPx.toString());
@ -109,7 +98,7 @@ describe("ConnectionManager", function () {
});
const peerConnectedBootstrap = new Promise<boolean>((resolve) => {
connectionManager!.addEventListener(
waku.connectionManager.addEventListener(
EPeersByDiscoveryEvents.PEER_CONNECT_BOOTSTRAP,
({ detail: receivedPeerId }) => {
resolve(receivedPeerId.toString() === peerIdBootstrap.toString());
@ -136,7 +125,7 @@ describe("ConnectionManager", function () {
});
const peerConnectedPeerExchange = new Promise<boolean>((resolve) => {
connectionManager!.addEventListener(
waku.connectionManager.addEventListener(
EPeersByDiscoveryEvents.PEER_CONNECT_PEER_EXCHANGE,
({ detail: receivedPeerId }) => {
resolve(receivedPeerId.toString() === peerIdPx.toString());
@ -157,8 +146,14 @@ describe("ConnectionManager", function () {
let dialPeerStub: SinonStub;
let getConnectionsStub: SinonStub;
let getTagNamesForPeerStub: SinonStub;
let waku: LightNode;
afterEach(() => {
this.beforeEach(async function () {
waku = await createLightNode();
});
afterEach(async () => {
await waku.stop();
sinon.restore();
});
@ -166,7 +161,10 @@ describe("ConnectionManager", function () {
let attemptDialSpy: SinonSpy;
beforeEach(function () {
attemptDialSpy = sinon.spy(connectionManager as any, "attemptDial");
attemptDialSpy = sinon.spy(
waku.connectionManager as any,
"attemptDial"
);
});
afterEach(function () {
@ -196,14 +194,14 @@ describe("ConnectionManager", function () {
describe("dialPeer method", function () {
beforeEach(function () {
getConnectionsStub = sinon.stub(
(connectionManager as any).libp2p,
(waku.connectionManager as any).libp2p,
"getConnections"
);
getTagNamesForPeerStub = sinon.stub(
connectionManager as any,
waku.connectionManager as any,
"getTagNamesForPeer"
);
dialPeerStub = sinon.stub(connectionManager as any, "dialPeer");
dialPeerStub = sinon.stub(waku.connectionManager as any, "dialPeer");
});
afterEach(function () {