Danish Arora 24c24cc27d
feat!: ConnectionManager and KeepAliveManager (#1135)
* initialises ConnectionManager and KeepAliveManager
ports from previous PR and makes necessary improvements and reductions

* address: comments

* map a ConnectionManager instance with a WakuNode

* abstract event listeners logic

* minor fix

* minor cleaning

* instantiate KeepAliveManager instead of extending

* fix build and enable all tests

* fix CI

* address review

* refine event handlers
- only removes the previously attached callback from
the event handlers while shutting down service
- removes the requirement of passing around
`keepAliveOptions` and `relay` inside of
`ConnectionManager`

* add verbosity to interface

* make `dialPeer()` more readable

* use set to push tags to avoid duplicates

* fix: merge build

* remove: logging function

* rename startService and stopService

* remove: future TODO

added that as part of future refactor

* use the new libp2p api

* initialise options in constructor//fix TS error

* remove stale export

* address principal review

* reset test timeout to master
2023-02-20 15:30:59 +05:30

204 lines
5.5 KiB
TypeScript

import type { Stream } from "@libp2p/interface-connection";
import type { Libp2p } from "@libp2p/interface-libp2p";
import type { PeerId } from "@libp2p/interface-peer-id";
import type { PubSub } from "@libp2p/interface-pubsub";
import type { Multiaddr } from "@multiformats/multiaddr";
import type {
IFilter,
ILightPush,
IRelay,
IStore,
Waku,
} from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import debug from "debug";
import { ConnectionManager } from "./connection_manager.js";
import * as relayConstants from "./relay/constants.js";
export const DefaultPingKeepAliveValueSecs = 0;
export const DefaultRelayKeepAliveValueSecs = 5 * 60;
export const DefaultUserAgent = "js-waku";
const log = debug("waku:waku");
export interface WakuOptions {
/**
* Set keep alive frequency in seconds: Waku will send a `/ipfs/ping/1.0.0`
* request to each peer after the set number of seconds. Set to 0 to disable.
*
* @default {@link @waku/core.DefaultPingKeepAliveValueSecs}
*/
pingKeepAlive?: number;
/**
* Set keep alive frequency in seconds: Waku will send a ping message over
* relay to each peer after the set number of seconds. Set to 0 to disable.
*
* @default {@link @waku/core.DefaultRelayKeepAliveValueSecs}
*/
relayKeepAlive?: number;
/**
* Set the user agent string to be used in identification of the node.
* @default {@link @waku/core.DefaultUserAgent}
*/
userAgent?: string;
}
export class WakuNode implements Waku {
public libp2p: Libp2p;
public relay?: IRelay;
public store?: IStore;
public filter?: IFilter;
public lightPush?: ILightPush;
public connectionManager: ConnectionManager;
constructor(
options: WakuOptions,
libp2p: Libp2p,
store?: (libp2p: Libp2p) => IStore,
lightPush?: (libp2p: Libp2p) => ILightPush,
filter?: (libp2p: Libp2p) => IFilter
) {
this.libp2p = libp2p;
if (store) {
this.store = store(libp2p);
}
if (filter) {
this.filter = filter(libp2p);
}
if (lightPush) {
this.lightPush = lightPush(libp2p);
}
if (isRelay(libp2p.pubsub)) {
this.relay = libp2p.pubsub;
}
const pingKeepAlive =
options.pingKeepAlive || DefaultPingKeepAliveValueSecs;
const relayKeepAlive = this.relay
? options.relayKeepAlive || DefaultRelayKeepAliveValueSecs
: 0;
const peerId = this.libp2p.peerId.toString();
this.connectionManager = ConnectionManager.create(
peerId,
libp2p,
{ pingKeepAlive, relayKeepAlive },
this.relay
);
log(
"Waku node created",
peerId,
`relay: ${!!this.relay}, store: ${!!this.store}, light push: ${!!this
.lightPush}, filter: ${!!this.filter}`
);
}
/**
* Dials to the provided peer.
*
* @param peer The peer to dial
* @param protocols Waku protocols we expect from the peer; Defaults to mounted protocols
*/
async dial(
peer: PeerId | Multiaddr,
protocols?: Protocols[]
): Promise<Stream> {
const _protocols = protocols ?? [];
if (typeof protocols === "undefined") {
this.relay && _protocols.push(Protocols.Relay);
this.store && _protocols.push(Protocols.Store);
this.filter && _protocols.push(Protocols.Filter);
this.lightPush && _protocols.push(Protocols.LightPush);
}
const codecs: string[] = [];
if (_protocols.includes(Protocols.Relay)) {
if (this.relay) {
this.relay.multicodecs.forEach((codec) => codecs.push(codec));
} else {
log(
"Relay codec not included in dial codec: protocol not mounted locally"
);
}
}
if (_protocols.includes(Protocols.Store)) {
if (this.store) {
codecs.push(this.store.multicodec);
} else {
log(
"Store codec not included in dial codec: protocol not mounted locally"
);
}
}
if (_protocols.includes(Protocols.LightPush)) {
if (this.lightPush) {
codecs.push(this.lightPush.multicodec);
} else {
log(
"Light Push codec not included in dial codec: protocol not mounted locally"
);
}
}
if (_protocols.includes(Protocols.Filter)) {
if (this.filter) {
codecs.push(this.filter.multicodec);
} else {
log(
"Filter codec not included in dial codec: protocol not mounted locally"
);
}
}
log(`Dialing to ${peer.toString()} with protocols ${_protocols}`);
return this.libp2p.dialProtocol(peer, codecs);
}
async start(): Promise<void> {
await this.libp2p.start();
}
async stop(): Promise<void> {
this.connectionManager.stop();
await this.libp2p.stop();
}
isStarted(): boolean {
return this.libp2p.isStarted();
}
/**
* Return the local multiaddr with peer id on which libp2p is listening.
*
* @throws if libp2p is not listening on localhost.
*/
getLocalMultiaddrWithID(): string {
const localMultiaddr = this.libp2p
.getMultiaddrs()
.find((addr) => addr.toString().match(/127\.0\.0\.1/));
if (!localMultiaddr || localMultiaddr.toString() === "") {
throw "Not listening on localhost";
}
return localMultiaddr + "/p2p/" + this.libp2p.peerId.toString();
}
}
function isRelay(pubsub: PubSub): pubsub is IRelay {
if (pubsub) {
try {
return pubsub.multicodecs.includes(
relayConstants.RelayCodecs[relayConstants.RelayCodecs.length - 1]
);
// Exception is expected if `libp2p` was not instantiated with pubsub
// eslint-disable-next-line no-empty
} catch (e) {}
}
return false;
}