feat(lightpush): peer management for protocols (#2003)

* chore: make `dropConnection` to be a public function

* feat: peers are maintained for protocols
- passes `ConnectionManager` to ProtocolSDK to allow disconnecting from within protocol
- maintains `numPeersToUse` for each protocol within BaseProtocolSDK

* fix: pass options to protocols

* chore: update interfaces to allow public access

* chore: improve logging on protocol

* fix: renew peer upon failure

* chore(tests): allow DefaultPubsubTopic

* feat(lightpush): write peer management tests

* chore: rename test

* feat: add lock to `maintainPeers()` to handle parallelisation of requests
fixes parallelisation of lightpush.send() requests

* fix: concurrent lightpush requests

* fix: test & improve peers fetching

* chore: use getter

* address comments

* chore: smaller improvements

* feat: attempt to improve time for first lightpush.send()

* chore: use `window.interval` for type-safety

* chore: remove delays

* feat: add autoRetry

* feat: `forceUseAllPeers` to wait for all connected peers to be resoled
This commit is contained in:
Danish Arora 2024-06-19 01:52:16 -04:00 committed by GitHub
parent 69f9045bed
commit 93e78c3b87
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 541 additions and 156 deletions

View File

@ -89,6 +89,34 @@ export class ConnectionManager
return instance; return instance;
} }
stop(): void {
this.keepAliveManager.stopAll();
this.libp2p.removeEventListener(
"peer:connect",
this.onEventHandlers["peer:connect"]
);
this.libp2p.removeEventListener(
"peer:disconnect",
this.onEventHandlers["peer:disconnect"]
);
this.libp2p.removeEventListener(
"peer:discovery",
this.onEventHandlers["peer:discovery"]
);
}
async dropConnection(peerId: PeerId): Promise<void> {
try {
this.keepAliveManager.stop(peerId);
await this.libp2p.hangUp(peerId);
log.info(`Dropped connection with peer ${peerId.toString()}`);
} catch (error) {
log.error(
`Error dropping connection with peer ${peerId.toString()} - ${error}`
);
}
}
public async getPeersByDiscovery(): Promise<PeersByDiscoveryResult> { public async getPeersByDiscovery(): Promise<PeersByDiscoveryResult> {
const peersDiscovered = await this.libp2p.peerStore.all(); const peersDiscovered = await this.libp2p.peerStore.all();
const peersConnected = this.libp2p const peersConnected = this.libp2p
@ -200,22 +228,6 @@ export class ConnectionManager
this.startPeerDisconnectionListener(); this.startPeerDisconnectionListener();
} }
stop(): void {
this.keepAliveManager.stopAll();
this.libp2p.removeEventListener(
"peer:connect",
this.onEventHandlers["peer:connect"]
);
this.libp2p.removeEventListener(
"peer:disconnect",
this.onEventHandlers["peer:disconnect"]
);
this.libp2p.removeEventListener(
"peer:discovery",
this.onEventHandlers["peer:discovery"]
);
}
private async dialPeer(peerId: PeerId): Promise<void> { private async dialPeer(peerId: PeerId): Promise<void> {
this.currentActiveParallelDialCount += 1; this.currentActiveParallelDialCount += 1;
let dialAttempt = 0; let dialAttempt = 0;
@ -298,18 +310,6 @@ export class ConnectionManager
} }
} }
private async dropConnection(peerId: PeerId): Promise<void> {
try {
this.keepAliveManager.stop(peerId);
await this.libp2p.hangUp(peerId);
log.info(`Dropped connection with peer ${peerId.toString()}`);
} catch (error) {
log.error(
`Error dropping connection with peer ${peerId.toString()} - ${error}`
);
}
}
private processDialQueue(): void { private processDialQueue(): void {
if ( if (
this.pendingPeerDialQueue.length > 0 && this.pendingPeerDialQueue.length > 0 &&

View File

@ -61,6 +61,7 @@ export interface IConnectionStateEvents {
export interface IConnectionManager export interface IConnectionManager
extends TypedEventEmitter<IPeersByDiscoveryEvents & IConnectionStateEvents> { extends TypedEventEmitter<IPeersByDiscoveryEvents & IConnectionStateEvents> {
dropConnection(peerId: PeerId): Promise<void>;
getPeersByDiscovery(): Promise<PeersByDiscoveryResult>; getPeersByDiscovery(): Promise<PeersByDiscoveryResult>;
stop(): void; stop(): void;
} }

View File

@ -25,7 +25,9 @@ export type IBaseProtocolCore = {
}; };
export type IBaseProtocolSDK = { export type IBaseProtocolSDK = {
numPeers: number; renewPeer: (peerToDisconnect: PeerId) => Promise<void>;
readonly connectedPeers: Peer[];
readonly numPeersToUse: number;
}; };
export type ContentTopicInfo = { export type ContentTopicInfo = {

View File

@ -2,5 +2,35 @@ import type { IEncoder, IMessage } from "./message.js";
import { SDKProtocolResult } from "./protocols.js"; import { SDKProtocolResult } from "./protocols.js";
export interface ISender { export interface ISender {
send: (encoder: IEncoder, message: IMessage) => Promise<SDKProtocolResult>; send: (
encoder: IEncoder,
message: IMessage,
sendOptions?: SendOptions
) => Promise<SDKProtocolResult>;
} }
/**
* Options for using LightPush
*/
export type SendOptions = {
/**
* Optional flag to enable auto-retry with exponential backoff
*/
autoRetry?: boolean;
/**
* Optional flag to force using all available peers
*/
forceUseAllPeers?: boolean;
/**
* Optional maximum number of attempts for exponential backoff
*/
maxAttempts?: number;
/**
* Optional initial delay in milliseconds for exponential backoff
*/
initialDelay?: number;
/**
* Optional maximum delay in milliseconds for exponential backoff
*/
maxDelay?: number;
};

View File

@ -1,8 +1,5 @@
import { type Libp2pComponents, type LightNode } from "@waku/interfaces"; import { type Libp2pComponents, type LightNode } from "@waku/interfaces";
import { wakuFilter } from "../protocols/filter.js";
import { wakuLightPush } from "../protocols/light_push.js";
import { wakuStore } from "../protocols/store.js";
import { createLibp2pAndUpdateOptions } from "../utils/libp2p.js"; import { createLibp2pAndUpdateOptions } from "../utils/libp2p.js";
import { CreateWakuNodeOptions, WakuNode, WakuOptions } from "../waku.js"; import { CreateWakuNodeOptions, WakuNode, WakuOptions } from "../waku.js";
@ -18,15 +15,9 @@ export async function createLightNode(
): Promise<LightNode> { ): Promise<LightNode> {
const libp2p = await createLibp2pAndUpdateOptions(options); const libp2p = await createLibp2pAndUpdateOptions(options);
const store = wakuStore(options); return new WakuNode(options as WakuOptions, libp2p, {
const lightPush = wakuLightPush(options); store: true,
const filter = wakuFilter(options); lightpush: true,
filter: true
return new WakuNode( }) as LightNode;
options as WakuOptions,
libp2p,
store,
lightPush,
filter
) as LightNode;
} }

View File

@ -1,15 +1,220 @@
import { IBaseProtocolSDK } from "@waku/interfaces"; import type { Peer, PeerId } from "@libp2p/interface";
import { ConnectionManager } from "@waku/core";
import { BaseProtocol } from "@waku/core/lib/base_protocol";
import { IBaseProtocolSDK, SendOptions } from "@waku/interfaces";
import { delay, Logger } from "@waku/utils";
interface Options { interface Options {
numPeersToUse?: number; numPeersToUse?: number;
maintainPeersInterval?: number;
} }
const DEFAULT_NUM_PEERS_TO_USE = 3; const DEFAULT_NUM_PEERS_TO_USE = 3;
const DEFAULT_MAINTAIN_PEERS_INTERVAL = 30_000;
export class BaseProtocolSDK implements IBaseProtocolSDK { export class BaseProtocolSDK implements IBaseProtocolSDK {
public readonly numPeers: number; public readonly numPeersToUse: number;
private peers: Peer[] = [];
private maintainPeersIntervalId: ReturnType<
typeof window.setInterval
> | null = null;
log: Logger;
constructor(options: Options) { private maintainPeersLock = false;
this.numPeers = options?.numPeersToUse ?? DEFAULT_NUM_PEERS_TO_USE;
constructor(
protected core: BaseProtocol,
private connectionManager: ConnectionManager,
options: Options
) {
this.log = new Logger(`sdk:${core.multicodec}`);
this.numPeersToUse = options?.numPeersToUse ?? DEFAULT_NUM_PEERS_TO_USE;
const maintainPeersInterval =
options?.maintainPeersInterval ?? DEFAULT_MAINTAIN_PEERS_INTERVAL;
void this.startMaintainPeersInterval(maintainPeersInterval);
}
get connectedPeers(): Peer[] {
return this.peers;
}
/**
* Disconnects from a peer and tries to find a new one to replace it.
* @param peerToDisconnect The peer to disconnect from.
*/
public async renewPeer(peerToDisconnect: PeerId): Promise<void> {
this.log.info(`Renewing peer ${peerToDisconnect}`);
try {
await this.connectionManager.dropConnection(peerToDisconnect);
this.peers = this.peers.filter((peer) => peer.id !== peerToDisconnect);
this.log.info(
`Peer ${peerToDisconnect} disconnected and removed from the peer list`
);
await this.findAndAddPeers(1);
} catch (error) {
this.log.info(
"Peer renewal failed, relying on the interval to find a new peer"
);
}
}
/**
* Stops the maintain peers interval.
*/
public stopMaintainPeersInterval(): void {
if (this.maintainPeersIntervalId) {
clearInterval(this.maintainPeersIntervalId);
this.maintainPeersIntervalId = null;
this.log.info("Maintain peers interval stopped");
}
}
/**
* Checks if there are peers to send a message to.
* If `forceUseAllPeers` is `false` (default) and there are connected peers, returns `true`.
* If `forceUseAllPeers` is `true` or there are no connected peers, tries to find new peers from the ConnectionManager.
* If `autoRetry` is `false`, returns `false` if no peers are found.
* If `autoRetry` is `true`, tries to find new peers from the ConnectionManager with exponential backoff.
* Returns `true` if peers are found, `false` otherwise.
* @param options Optional options object
* @param options.autoRetry Optional flag to enable auto-retry with exponential backoff (default: false)
* @param options.forceUseAllPeers Optional flag to force using all available peers (default: false)
* @param options.initialDelay Optional initial delay in milliseconds for exponential backoff (default: 10)
* @param options.maxAttempts Optional maximum number of attempts for exponential backoff (default: 3)
* @param options.maxDelay Optional maximum delay in milliseconds for exponential backoff (default: 100)
*/
protected hasPeers = async (
options: Partial<SendOptions> = {}
): Promise<boolean> => {
const {
autoRetry = false,
forceUseAllPeers = false,
initialDelay = 10,
maxAttempts = 3,
maxDelay = 100
} = options;
if (!forceUseAllPeers && this.connectedPeers.length > 0) return true;
let attempts = 0;
while (attempts < maxAttempts) {
attempts++;
if (await this.maintainPeers()) {
if (this.peers.length < this.numPeersToUse) {
this.log.warn(
`Found only ${this.peers.length} peers, expected ${this.numPeersToUse}`
);
}
return true;
}
if (!autoRetry) return false;
const delayMs = Math.min(
initialDelay * Math.pow(2, attempts - 1),
maxDelay
);
await delay(delayMs);
}
this.log.error("Failed to find peers to send message to");
return false;
};
/**
* Starts an interval to maintain the peers list to `numPeersToUse`.
* @param interval The interval in milliseconds to maintain the peers.
*/
private async startMaintainPeersInterval(interval: number): Promise<void> {
this.log.info("Starting maintain peers interval");
try {
await this.maintainPeers();
this.maintainPeersIntervalId = setInterval(() => {
this.maintainPeers().catch((error) => {
this.log.error("Error during maintain peers interval:", error);
});
}, interval);
this.log.info(
`Maintain peers interval started with interval ${interval}ms`
);
} catch (error) {
this.log.error("Error starting maintain peers interval:", error);
throw error;
}
}
/**
* Maintains the peers list to `numPeersToUse`.
*/
private async maintainPeers(): Promise<boolean> {
if (this.maintainPeersLock) {
return false;
}
this.maintainPeersLock = true;
this.log.info(`Maintaining peers, current count: ${this.peers.length}`);
try {
const numPeersToAdd = this.numPeersToUse - this.peers.length;
if (numPeersToAdd > 0) {
await this.findAndAddPeers(numPeersToAdd);
}
this.log.info(
`Peer maintenance completed, current count: ${this.peers.length}`
);
} finally {
this.maintainPeersLock = false;
}
return true;
}
/**
* Finds and adds new peers to the peers list.
* @param numPeers The number of peers to find and add.
*/
private async findAndAddPeers(numPeers: number): Promise<void> {
this.log.info(`Finding and adding ${numPeers} new peers`);
try {
const additionalPeers = await this.findAdditionalPeers(numPeers);
this.peers = [...this.peers, ...additionalPeers];
this.log.info(
`Added ${additionalPeers.length} new peers, total peers: ${this.peers.length}`
);
} catch (error) {
this.log.error("Error finding and adding new peers:", error);
throw error;
}
}
/**
* Finds additional peers.
* Attempts to find peers without using bootstrap peers first,
* If no peers are found,
* tries with bootstrap peers.
* @param numPeers The number of peers to find.
*/
private async findAdditionalPeers(numPeers: number): Promise<Peer[]> {
this.log.info(`Finding ${numPeers} additional peers`);
try {
let newPeers = await this.core.getPeers({
maxBootstrapPeers: 0,
numPeers: numPeers
});
if (newPeers.length === 0) {
this.log.warn("No new peers found, trying with bootstrap peers");
newPeers = await this.core.getPeers({
maxBootstrapPeers: numPeers,
numPeers: numPeers
});
}
newPeers = newPeers.filter(
(peer) => this.peers.some((p) => p.id === peer.id) === false
);
return newPeers;
} catch (error) {
this.log.error("Error finding additional peers:", error);
throw error;
}
} }
} }

View File

@ -1,5 +1,5 @@
import type { Peer } from "@libp2p/interface"; import type { Peer } from "@libp2p/interface";
import { FilterCore } from "@waku/core"; import { ConnectionManager, FilterCore } from "@waku/core";
import { import {
type Callback, type Callback,
type ContentTopic, type ContentTopic,
@ -261,26 +261,34 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
public readonly protocol: FilterCore; public readonly protocol: FilterCore;
private activeSubscriptions = new Map<string, SubscriptionManager>(); private activeSubscriptions = new Map<string, SubscriptionManager>();
private async handleIncomingMessage(
pubsubTopic: PubsubTopic,
wakuMessage: WakuMessage
): Promise<void> {
const subscription = this.getActiveSubscription(pubsubTopic);
if (!subscription) {
log.error(`No subscription locally registered for topic ${pubsubTopic}`);
return;
}
await subscription.processIncomingMessage(wakuMessage); constructor(
} connectionManager: ConnectionManager,
libp2p: Libp2p,
options?: ProtocolCreateOptions
) {
super(
new FilterCore(
async (pubsubTopic: PubsubTopic, wakuMessage: WakuMessage) => {
const subscription = this.getActiveSubscription(pubsubTopic);
if (!subscription) {
log.error(
`No subscription locally registered for topic ${pubsubTopic}`
);
return;
}
constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) { await subscription.processIncomingMessage(wakuMessage);
super({ numPeersToUse: options?.numPeersToUse }); },
this.protocol = new FilterCore( libp2p,
this.handleIncomingMessage.bind(this), options
libp2p, ),
options connectionManager,
{ numPeersToUse: options?.numPeersToUse }
); );
this.protocol = this.core as FilterCore;
this.activeSubscriptions = new Map(); this.activeSubscriptions = new Map();
} }
@ -430,9 +438,10 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
} }
export function wakuFilter( export function wakuFilter(
init: ProtocolCreateOptions connectionManager: ConnectionManager,
init?: ProtocolCreateOptions
): (libp2p: Libp2p) => IFilterSDK { ): (libp2p: Libp2p) => IFilterSDK {
return (libp2p: Libp2p) => new FilterSDK(libp2p, init); return (libp2p: Libp2p) => new FilterSDK(connectionManager, libp2p, init);
} }
async function pushMessage<T extends IDecodedMessage>( async function pushMessage<T extends IDecodedMessage>(

View File

@ -1,5 +1,5 @@
import type { PeerId } from "@libp2p/interface"; import type { PeerId } from "@libp2p/interface";
import { LightPushCore } from "@waku/core"; import { ConnectionManager, LightPushCore } from "@waku/core";
import { import {
Failure, Failure,
type IEncoder, type IEncoder,
@ -8,7 +8,8 @@ import {
type Libp2p, type Libp2p,
type ProtocolCreateOptions, type ProtocolCreateOptions,
ProtocolError, ProtocolError,
SDKProtocolResult SDKProtocolResult,
SendOptions
} from "@waku/interfaces"; } from "@waku/interfaces";
import { ensurePubsubTopicIsConfigured, Logger } from "@waku/utils"; import { ensurePubsubTopicIsConfigured, Logger } from "@waku/utils";
@ -19,12 +20,28 @@ const log = new Logger("sdk:light-push");
class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK { class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK {
public readonly protocol: LightPushCore; public readonly protocol: LightPushCore;
constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) { constructor(
super({ numPeersToUse: options?.numPeersToUse }); connectionManager: ConnectionManager,
this.protocol = new LightPushCore(libp2p, options); libp2p: Libp2p,
options?: ProtocolCreateOptions
) {
super(new LightPushCore(libp2p, options), connectionManager, {
numPeersToUse: options?.numPeersToUse
});
this.protocol = this.core as LightPushCore;
} }
async send(encoder: IEncoder, message: IMessage): Promise<SDKProtocolResult> { async send(
encoder: IEncoder,
message: IMessage,
_options?: SendOptions
): Promise<SDKProtocolResult> {
const options = {
autoRetry: true,
..._options
} as SendOptions;
const successes: PeerId[] = []; const successes: PeerId[] = [];
const failures: Failure[] = []; const failures: Failure[] = [];
@ -43,15 +60,19 @@ class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK {
}; };
} }
const peers = await this.protocol.getPeers(); const hasPeers = await this.hasPeers(options);
if (!peers.length) { if (!hasPeers) {
return { return {
successes, successes,
failures: [{ error: ProtocolError.NO_PEER_AVAILABLE }] failures: [
{
error: ProtocolError.NO_PEER_AVAILABLE
}
]
}; };
} }
const sendPromises = peers.map((peer) => const sendPromises = this.connectedPeers.map((peer) =>
this.protocol.send(encoder, message, peer) this.protocol.send(encoder, message, peer)
); );
@ -64,12 +85,15 @@ class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK {
successes.push(success); successes.push(success);
} }
if (failure) { if (failure) {
if (failure.peerId) {
await this.renewPeer(failure.peerId);
}
failures.push(failure); failures.push(failure);
} }
} else { } else {
log.error("Failed to send message to peer", result.reason); log.error("Failed to send message to peer", result.reason);
failures.push({ error: ProtocolError.GENERIC_FAIL }); failures.push({ error: ProtocolError.GENERIC_FAIL });
// TODO: handle renewing faulty peers with new peers (https://github.com/waku-org/js-waku/issues/1463)
} }
} }
@ -81,7 +105,8 @@ class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK {
} }
export function wakuLightPush( export function wakuLightPush(
connectionManager: ConnectionManager,
init: Partial<ProtocolCreateOptions> = {} init: Partial<ProtocolCreateOptions> = {}
): (libp2p: Libp2p) => ILightPushSDK { ): (libp2p: Libp2p) => ILightPushSDK {
return (libp2p: Libp2p) => new LightPushSDK(libp2p, init); return (libp2p: Libp2p) => new LightPushSDK(connectionManager, libp2p, init);
} }

View File

@ -1,5 +1,5 @@
import { sha256 } from "@noble/hashes/sha256"; import { sha256 } from "@noble/hashes/sha256";
import { StoreCore, waku_store } from "@waku/core"; import { ConnectionManager, StoreCore, waku_store } from "@waku/core";
import { import {
Cursor, Cursor,
IDecodedMessage, IDecodedMessage,
@ -25,11 +25,17 @@ const log = new Logger("waku:store:protocol");
export class StoreSDK extends BaseProtocolSDK implements IStoreSDK { export class StoreSDK extends BaseProtocolSDK implements IStoreSDK {
public readonly protocol: StoreCore; public readonly protocol: StoreCore;
constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) { constructor(
connectionManager: ConnectionManager,
libp2p: Libp2p,
options?: ProtocolCreateOptions
) {
// TODO: options.numPeersToUse is disregarded: https://github.com/waku-org/js-waku/issues/1685 // TODO: options.numPeersToUse is disregarded: https://github.com/waku-org/js-waku/issues/1685
super({ numPeersToUse: DEFAULT_NUM_PEERS }); super(new StoreCore(libp2p, options), connectionManager, {
numPeersToUse: DEFAULT_NUM_PEERS
});
this.protocol = new StoreCore(libp2p, options); this.protocol = this.core as StoreCore;
} }
/** /**
@ -67,7 +73,7 @@ export class StoreSDK extends BaseProtocolSDK implements IStoreSDK {
const peer = ( const peer = (
await this.protocol.getPeers({ await this.protocol.getPeers({
numPeers: this.numPeers, numPeers: this.numPeersToUse,
maxBootstrapPeers: 1 maxBootstrapPeers: 1
}) })
)[0]; )[0];
@ -315,7 +321,8 @@ export class StoreSDK extends BaseProtocolSDK implements IStoreSDK {
} }
export function wakuStore( export function wakuStore(
connectionManager: ConnectionManager,
init: Partial<ProtocolCreateOptions> = {} init: Partial<ProtocolCreateOptions> = {}
): (libp2p: Libp2p) => IStoreSDK { ): (libp2p: Libp2p) => IStoreSDK {
return (libp2p: Libp2p) => new StoreSDK(libp2p, init); return (libp2p: Libp2p) => new StoreSDK(connectionManager, libp2p, init);
} }

View File

@ -1,9 +1,6 @@
import { type FullNode, type RelayNode } from "@waku/interfaces"; import { type FullNode, type RelayNode } from "@waku/interfaces";
import { RelayCreateOptions, wakuRelay } from "@waku/relay"; import { RelayCreateOptions } from "@waku/relay";
import { wakuFilter } from "../protocols/filter.js";
import { wakuLightPush } from "../protocols/light_push.js";
import { wakuStore } from "../protocols/store.js";
import { createLibp2pAndUpdateOptions } from "../utils/libp2p.js"; import { createLibp2pAndUpdateOptions } from "../utils/libp2p.js";
import { CreateWakuNodeOptions, WakuNode, WakuOptions } from "../waku.js"; import { CreateWakuNodeOptions, WakuNode, WakuOptions } from "../waku.js";
@ -24,16 +21,9 @@ export async function createRelayNode(
): Promise<RelayNode> { ): Promise<RelayNode> {
const libp2p = await createLibp2pAndUpdateOptions(options); const libp2p = await createLibp2pAndUpdateOptions(options);
const relay = wakuRelay(options?.pubsubTopics || []); return new WakuNode(options as WakuOptions, libp2p, {
relay: true
return new WakuNode( }) as RelayNode;
options as WakuOptions,
libp2p,
undefined,
undefined,
undefined,
relay
) as RelayNode;
} }
/** /**
@ -56,17 +46,10 @@ export async function createFullNode(
): Promise<FullNode> { ): Promise<FullNode> {
const libp2p = await createLibp2pAndUpdateOptions(options); const libp2p = await createLibp2pAndUpdateOptions(options);
const store = wakuStore(options); return new WakuNode(options as WakuOptions, libp2p, {
const lightPush = wakuLightPush(options); filter: true,
const filter = wakuFilter(options); lightpush: true,
const relay = wakuRelay(options?.pubsubTopics || []); relay: true,
store: true
return new WakuNode( }) as FullNode;
options as WakuOptions,
libp2p,
store,
lightPush,
filter,
relay
) as FullNode;
} }

View File

@ -16,8 +16,12 @@ import type {
Waku Waku
} from "@waku/interfaces"; } from "@waku/interfaces";
import { Protocols } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces";
import { wakuRelay } from "@waku/relay";
import { Logger } from "@waku/utils"; import { Logger } from "@waku/utils";
import { wakuFilter } from "./protocols/filter.js";
import { wakuLightPush } from "./protocols/light_push.js";
import { wakuStore } from "./protocols/store.js";
import { subscribeToContentTopic } from "./utils/content_topic.js"; import { subscribeToContentTopic } from "./utils/content_topic.js";
export const DefaultPingKeepAliveValueSecs = 5 * 60; export const DefaultPingKeepAliveValueSecs = 5 * 60;
@ -53,6 +57,13 @@ export interface WakuOptions {
export type CreateWakuNodeOptions = ProtocolCreateOptions & export type CreateWakuNodeOptions = ProtocolCreateOptions &
Partial<WakuOptions>; Partial<WakuOptions>;
type ProtocolsEnabled = {
filter?: boolean;
lightpush?: boolean;
store?: boolean;
relay?: boolean;
};
export class WakuNode implements Waku { export class WakuNode implements Waku {
public libp2p: Libp2p; public libp2p: Libp2p;
public relay?: IRelay; public relay?: IRelay;
@ -65,10 +76,7 @@ export class WakuNode implements Waku {
constructor( constructor(
options: WakuOptions, options: WakuOptions,
libp2p: Libp2p, libp2p: Libp2p,
store?: (libp2p: Libp2p) => IStoreSDK, protocolsEnabled: ProtocolsEnabled
lightPush?: (libp2p: Libp2p) => ILightPushSDK,
filter?: (libp2p: Libp2p) => IFilterSDK,
relay?: (libp2p: Libp2p) => IRelay
) { ) {
if (options.pubsubTopics.length == 0) { if (options.pubsubTopics.length == 0) {
throw new Error("At least one pubsub topic must be provided"); throw new Error("At least one pubsub topic must be provided");
@ -77,19 +85,13 @@ export class WakuNode implements Waku {
this.libp2p = libp2p; this.libp2p = libp2p;
if (store) { protocolsEnabled = {
this.store = store(libp2p); filter: false,
} lightpush: false,
if (filter) { store: false,
this.filter = filter(libp2p); relay: false,
} ...protocolsEnabled
if (lightPush) { };
this.lightPush = lightPush(libp2p);
}
if (relay) {
this.relay = relay(libp2p);
}
const pingKeepAlive = const pingKeepAlive =
options.pingKeepAlive || DefaultPingKeepAliveValueSecs; options.pingKeepAlive || DefaultPingKeepAliveValueSecs;
@ -107,6 +109,26 @@ export class WakuNode implements Waku {
this.relay this.relay
); );
if (protocolsEnabled.store) {
const store = wakuStore(this.connectionManager, options);
this.store = store(libp2p);
}
if (protocolsEnabled.lightpush) {
const lightPush = wakuLightPush(this.connectionManager, options);
this.lightPush = lightPush(libp2p);
}
if (protocolsEnabled.filter) {
const filter = wakuFilter(this.connectionManager, options);
this.filter = filter(libp2p);
}
if (protocolsEnabled.relay) {
const relay = wakuRelay(this.pubsubTopics);
this.relay = relay(libp2p);
}
log.info( log.info(
"Waku node created", "Waku node created",
peerId, peerId,

View File

@ -1,5 +1,6 @@
import { createDecoder, createEncoder, waitForRemotePeer } from "@waku/core"; import { createDecoder, createEncoder, waitForRemotePeer } from "@waku/core";
import { import {
DefaultPubsubTopic,
ISubscriptionSDK, ISubscriptionSDK,
LightNode, LightNode,
ProtocolCreateOptions, ProtocolCreateOptions,
@ -68,12 +69,14 @@ export async function validatePingError(
export async function runMultipleNodes( export async function runMultipleNodes(
context: Context, context: Context,
shardInfo: ShardingParams, shardInfo?: ShardingParams,
strictChecking: boolean = false, strictChecking: boolean = false,
numServiceNodes = 3, numServiceNodes = 3,
withoutFilter = false withoutFilter = false
): Promise<[ServiceNodesFleet, LightNode]> { ): Promise<[ServiceNodesFleet, LightNode]> {
const pubsubTopics = shardInfoToPubsubTopics(shardInfo); const pubsubTopics = shardInfo
? shardInfoToPubsubTopics(shardInfo)
: [DefaultPubsubTopic];
// create numServiceNodes nodes // create numServiceNodes nodes
const serviceNodes = await ServiceNodesFleet.createAndRun( const serviceNodes = await ServiceNodesFleet.createAndRun(
context, context,

View File

@ -0,0 +1,93 @@
import { DefaultPubsubTopic, LightNode } from "@waku/interfaces";
import { createEncoder, utf8ToBytes } from "@waku/sdk";
import { expect } from "chai";
import { describe } from "mocha";
import {
afterEachCustom,
beforeEachCustom,
ServiceNodesFleet
} from "../../src/index.js";
import {
runMultipleNodes,
teardownNodesWithRedundancy
} from "../filter/utils.js";
describe("Waku Light Push: Peer Management: E2E", function () {
this.timeout(15000);
let waku: LightNode;
let serviceNodes: ServiceNodesFleet;
beforeEachCustom(this, async () => {
[serviceNodes, waku] = await runMultipleNodes(
this.ctx,
undefined,
undefined,
5
);
});
afterEachCustom(this, async () => {
await teardownNodesWithRedundancy(serviceNodes, waku);
});
const encoder = createEncoder({
pubsubTopic: DefaultPubsubTopic,
contentTopic: "/test"
});
it("Number of peers are maintained correctly", async function () {
const { successes, failures } = await waku.lightPush.send(encoder, {
payload: utf8ToBytes("Hello_World")
});
expect(successes.length).to.be.greaterThan(0);
expect(successes.length).to.be.equal(waku.lightPush.numPeersToUse);
if (failures) {
expect(failures.length).to.equal(0);
}
});
it("Failed peers are renewed", async function () {
// send a lightpush request -- should have all successes
const response1 = await waku.lightPush.send(encoder, {
payload: utf8ToBytes("Hello_World")
});
expect(response1.successes.length).to.be.equal(
waku.lightPush.numPeersToUse
);
if (response1.failures) {
expect(response1.failures.length).to.equal(0);
}
// disconnect from one peer to force a failure
const peerToDisconnect = response1.successes[0];
await waku.connectionManager.dropConnection(peerToDisconnect);
// send another lightpush request -- should have all successes except the one that was disconnected
const response2 = await waku.lightPush.send(encoder, {
payload: utf8ToBytes("Hello_World")
});
// check that the peer that was disconnected is not in the new successes
expect(response2.successes).to.not.include(peerToDisconnect);
expect(response2.failures).to.have.length(1);
expect(response2.failures?.[0].peerId).to.equal(peerToDisconnect);
// send another lightpush request -- renewal should have triggerred and new peer should be used instead of the disconnected one
const response3 = await waku.lightPush.send(encoder, {
payload: utf8ToBytes("Hello_World")
});
expect(response3.successes.length).to.be.equal(
waku.lightPush.numPeersToUse
);
expect(response3.successes).to.not.include(peerToDisconnect);
if (response3.failures) {
expect(response3.failures.length).to.equal(0);
}
});
});

View File

@ -52,12 +52,12 @@ describe("Waku Light Push : Multiple PubsubTopics", function () {
contentTopic: customContentTopic2 contentTopic: customContentTopic2
}); });
let nimPeerId: PeerId; let node1PeerId: PeerId;
beforeEachCustom(this, async () => { beforeEachCustom(this, async () => {
[nwaku, waku] = await runNodes(this.ctx, shardInfo); [nwaku, waku] = await runNodes(this.ctx, shardInfo);
messageCollector = new MessageCollector(nwaku); messageCollector = new MessageCollector(nwaku);
nimPeerId = await nwaku.getPeerId(); node1PeerId = await nwaku.getPeerId();
}); });
afterEachCustom(this, async () => { afterEachCustom(this, async () => {
@ -69,7 +69,7 @@ describe("Waku Light Push : Multiple PubsubTopics", function () {
payload: utf8ToBytes(messageText) payload: utf8ToBytes(messageText)
}); });
expect(pushResponse.successes[0].toString()).to.eq(nimPeerId.toString()); expect(pushResponse.successes[0].toString()).to.eq(node1PeerId.toString());
expect( expect(
await messageCollector.waitForMessages(1, { await messageCollector.waitForMessages(1, {
@ -89,8 +89,8 @@ describe("Waku Light Push : Multiple PubsubTopics", function () {
const pushResponse2 = await waku.lightPush.send(customEncoder2, { const pushResponse2 = await waku.lightPush.send(customEncoder2, {
payload: utf8ToBytes("M2") payload: utf8ToBytes("M2")
}); });
expect(pushResponse1.successes[0].toString()).to.eq(nimPeerId.toString()); expect(pushResponse1.successes[0].toString()).to.eq(node1PeerId.toString());
expect(pushResponse2.successes[0].toString()).to.eq(nimPeerId.toString()); expect(pushResponse2.successes[0].toString()).to.eq(node1PeerId.toString());
const messageCollector2 = new MessageCollector(nwaku); const messageCollector2 = new MessageCollector(nwaku);
@ -195,12 +195,12 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () {
pubsubTopicShardInfo: pubsubTopicToSingleShardInfo(autoshardingPubsubTopic2) pubsubTopicShardInfo: pubsubTopicToSingleShardInfo(autoshardingPubsubTopic2)
}); });
let nimPeerId: PeerId; let node1PeerId: PeerId;
beforeEachCustom(this, async () => { beforeEachCustom(this, async () => {
[nwaku, waku] = await runNodes(this.ctx, shardInfo); [nwaku, waku] = await runNodes(this.ctx, shardInfo);
messageCollector = new MessageCollector(nwaku); messageCollector = new MessageCollector(nwaku);
nimPeerId = await nwaku.getPeerId(); node1PeerId = await nwaku.getPeerId();
}); });
afterEachCustom(this, async () => { afterEachCustom(this, async () => {
@ -213,7 +213,7 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () {
}); });
expect(pushResponse.failures).to.be.empty; expect(pushResponse.failures).to.be.empty;
expect(pushResponse.successes[0].toString()).to.eq(nimPeerId.toString()); expect(pushResponse.successes[0].toString()).to.eq(node1PeerId.toString());
expect( expect(
await messageCollector.waitForMessagesAutosharding(1, { await messageCollector.waitForMessagesAutosharding(1, {
@ -233,8 +233,8 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () {
const pushResponse2 = await waku.lightPush.send(customEncoder2, { const pushResponse2 = await waku.lightPush.send(customEncoder2, {
payload: utf8ToBytes("M2") payload: utf8ToBytes("M2")
}); });
expect(pushResponse1.successes[0].toString()).to.eq(nimPeerId.toString()); expect(pushResponse1.successes[0].toString()).to.eq(node1PeerId.toString());
expect(pushResponse2.successes[0].toString()).to.eq(nimPeerId.toString()); expect(pushResponse2.successes[0].toString()).to.eq(node1PeerId.toString());
const messageCollector2 = new MessageCollector(nwaku); const messageCollector2 = new MessageCollector(nwaku);
@ -352,13 +352,13 @@ describe("Waku Light Push (named sharding): Multiple PubsubTopics", function ()
] ]
}; };
let nimPeerId: PeerId; let node1PeerId: PeerId;
beforeEachCustom(this, async () => { beforeEachCustom(this, async () => {
ctx = this.ctx; ctx = this.ctx;
[nwaku, waku] = await runNodes(ctx, testShardInfo); [nwaku, waku] = await runNodes(ctx, testShardInfo);
messageCollector = new MessageCollector(nwaku); messageCollector = new MessageCollector(nwaku);
nimPeerId = await nwaku.getPeerId(); node1PeerId = await nwaku.getPeerId();
}); });
afterEachCustom(this, async () => { afterEachCustom(this, async () => {
@ -370,7 +370,7 @@ describe("Waku Light Push (named sharding): Multiple PubsubTopics", function ()
payload: utf8ToBytes(messageText) payload: utf8ToBytes(messageText)
}); });
expect(pushResponse.successes[0].toString()).to.eq(nimPeerId.toString()); expect(pushResponse.successes[0].toString()).to.eq(node1PeerId.toString());
expect( expect(
await messageCollector.waitForMessages(1, { await messageCollector.waitForMessages(1, {
@ -390,8 +390,8 @@ describe("Waku Light Push (named sharding): Multiple PubsubTopics", function ()
const pushResponse2 = await waku.lightPush.send(customEncoder2, { const pushResponse2 = await waku.lightPush.send(customEncoder2, {
payload: utf8ToBytes("M2") payload: utf8ToBytes("M2")
}); });
expect(pushResponse1.successes[0].toString()).to.eq(nimPeerId.toString()); expect(pushResponse1.successes[0].toString()).to.eq(node1PeerId.toString());
expect(pushResponse2.successes[0].toString()).to.eq(nimPeerId.toString()); expect(pushResponse2.successes[0].toString()).to.eq(node1PeerId.toString());
const messageCollector2 = new MessageCollector(nwaku); const messageCollector2 = new MessageCollector(nwaku);
@ -421,19 +421,27 @@ describe("Waku Light Push (named sharding): Multiple PubsubTopics", function ()
it("Light push messages to 2 nwaku nodes each with different pubsubtopics", async function () { it("Light push messages to 2 nwaku nodes each with different pubsubtopics", async function () {
// Set up and start a new nwaku node with Default PubsubTopic // Set up and start a new nwaku node with Default PubsubTopic
[nwaku2, waku2] = await runNodes(ctx, shardInfo2); [nwaku2] = await runNodes(ctx, shardInfo2);
await nwaku2.ensureSubscriptions([autoshardingPubsubTopic2]); await nwaku2.ensureSubscriptions([autoshardingPubsubTopic2]);
await waku.dial(await nwaku2.getMultiaddrWithId()); await waku.dial(await nwaku2.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.LightPush]); await waitForRemotePeer(waku, [Protocols.LightPush]);
const messageCollector2 = new MessageCollector(nwaku2); const messageCollector2 = new MessageCollector(nwaku2);
await waku.lightPush.send(customEncoder1, { const { failures: f1 } = await waku.lightPush.send(customEncoder1, {
payload: utf8ToBytes("M1") payload: utf8ToBytes("M1")
}); });
await waku.lightPush.send(customEncoder2, { const { failures: f2 } = await waku.lightPush.send(
payload: utf8ToBytes("M2") customEncoder2,
}); {
payload: utf8ToBytes("M2")
},
{ forceUseAllPeers: true }
);
expect(f1).to.be.empty;
expect(f2).to.be.empty;
await messageCollector.waitForMessages(1, { await messageCollector.waitForMessages(1, {
pubsubTopic: autoshardingPubsubTopic1 pubsubTopic: autoshardingPubsubTopic1

View File

@ -1,4 +1,3 @@
import { wakuFilter } from "@waku/sdk";
import { import {
bytesToUtf8, bytesToUtf8,
createEncoder, createEncoder,
@ -123,9 +122,9 @@ describe.skip("SDK: Creating by Content Topic", function () {
pubsubTopics: shardInfo.pubsubTopics pubsubTopics: shardInfo.pubsubTopics
}, },
await defaultLibp2p(shardInfo.shardInfo, undefined, {}, undefined), await defaultLibp2p(shardInfo.shardInfo, undefined, {}, undefined),
undefined, {
undefined, filter: true
wakuFilter({ pubsubTopics: shardInfo.pubsubTopics }) }
); );
await wakuContentTopic.subscribeToContentTopic( await wakuContentTopic.subscribeToContentTopic(
ContentTopic, ContentTopic,

View File

@ -304,10 +304,13 @@ describe("Waku Store, general", function () {
for await (const msg of query) { for await (const msg of query) {
if (msg) { if (msg) {
messages.push(msg as DecodedMessage); messages.push(msg as DecodedMessage);
console.log(bytesToUtf8(msg.payload!));
} }
} }
} }
console.log(messages.length);
// Messages are ordered from oldest to latest within a page (1 page query) // Messages are ordered from oldest to latest within a page (1 page query)
expect(bytesToUtf8(messages[0].payload!)).to.eq(asymText); expect(bytesToUtf8(messages[0].payload!)).to.eq(asymText);
expect(bytesToUtf8(messages[1].payload!)).to.eq(symText); expect(bytesToUtf8(messages[1].payload!)).to.eq(symText);

View File

@ -0,0 +1,3 @@
export async function delay(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}

View File

@ -7,6 +7,7 @@ export * from "./is_size_valid.js";
export * from "./sharding.js"; export * from "./sharding.js";
export * from "./push_or_init_map.js"; export * from "./push_or_init_map.js";
export * from "./relay_shard_codec.js"; export * from "./relay_shard_codec.js";
export * from "./delay.js";
export function removeItemFromArray(arr: unknown[], value: unknown): unknown[] { export function removeItemFromArray(arr: unknown[], value: unknown): unknown[] {
const index = arr.indexOf(value); const index = arr.indexOf(value);