feat(lightPush): improve peer usage and improve readability (#2155)

* fix comment of default number of peers

* export default number of peers from base protocol sdk

* rename to light_push, move class to separate file

* move waitForRemotePeer to sdk package

* add todo to move waitForGossipSubPeerInMesh into @waku/relay

* clean up waitForRemotePeer, split metadata await from event and optimise, decouple from protocol implementations

* simplify and rename ILightPush interface

* use only connected peers in light push based on connections instead of peer renewal mechanism

* improve readability of result processing in light push

* fix check & update tests

* address tests, add new test cases, fix racing condition in StreamManager

* use libp2p.getPeers
This commit is contained in:
Sasha 2024-10-04 10:50:58 +02:00 committed by GitHub
parent b93134a517
commit 1d68526e72
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
36 changed files with 309 additions and 236 deletions

4
package-lock.json generated
View File

@ -39089,7 +39089,6 @@
"it-all": "^3.0.4",
"it-length-prefixed": "^9.0.4",
"it-pipe": "^3.0.1",
"p-event": "^6.0.1",
"uint8arraylist": "^2.4.3",
"uuid": "^9.0.0"
},
@ -39410,7 +39409,8 @@
"@waku/message-hash": "0.1.16",
"@waku/proto": "^0.0.8",
"@waku/utils": "0.0.20",
"libp2p": "^1.8.1"
"libp2p": "^1.8.1",
"p-event": "^6.0.1"
},
"devDependencies": {
"@rollup/plugin-commonjs": "^25.0.7",

View File

@ -77,7 +77,6 @@
"it-all": "^3.0.4",
"it-length-prefixed": "^9.0.4",
"it-pipe": "^3.0.1",
"p-event": "^6.0.1",
"uint8arraylist": "^2.4.3",
"uuid": "^9.0.0"
},

View File

@ -13,9 +13,7 @@ export * as waku_light_push from "./lib/light_push/index.js";
export { LightPushCodec, LightPushCore } from "./lib/light_push/index.js";
export * as waku_store from "./lib/store/index.js";
export { StoreCore } from "./lib/store/index.js";
export { waitForRemotePeer } from "./lib/wait_for_remote_peer.js";
export { StoreCore, StoreCodec } from "./lib/store/index.js";
export { ConnectionManager } from "./lib/connection_manager.js";

View File

@ -86,6 +86,32 @@ describe("StreamManager", () => {
}
});
it("should return different streams if requested simultaniously", async () => {
const con1 = createMockConnection();
con1.streams = [createMockStream({ id: "1", protocol: MULTICODEC })];
const newStreamSpy = sinon.spy(async (_protocol, _options) =>
createMockStream({
id: "2",
protocol: MULTICODEC,
writeStatus: "writable"
})
);
con1.newStream = newStreamSpy;
streamManager["getConnections"] = (_peerId: PeerId | undefined) => [con1];
const [stream1, stream2] = await Promise.all([
streamManager.getStream(mockPeer),
streamManager.getStream(mockPeer)
]);
const expected = ["1", "2"].toString();
const actual = [stream1.id, stream2.id].sort().toString();
expect(actual).to.be.eq(expected);
});
it("peer:update - should do nothing if another protocol hit", async () => {
const scheduleNewStreamSpy = sinon.spy();
streamManager["scheduleNewStream"] = scheduleNewStreamSpy;
@ -156,6 +182,7 @@ function createMockStream(options: MockStreamOptions): Stream {
return {
id: options.id,
protocol: options.protocol,
writeStatus: options.writeStatus || "ready"
writeStatus: options.writeStatus || "ready",
metadata: {}
} as Stream;
}

View File

@ -4,6 +4,8 @@ import { Logger } from "@waku/utils";
import { selectOpenConnection } from "./utils.js";
const STREAM_LOCK_KEY = "consumed";
export class StreamManager {
private readonly log: Logger;
@ -29,16 +31,20 @@ export class StreamManager {
await scheduledStream;
}
const stream = this.getOpenStreamForCodec(peer.id);
let stream = this.getOpenStreamForCodec(peer.id);
if (stream) {
this.log.info(
`Found existing stream peerId=${peer.id.toString()} multicodec=${this.multicodec}`
);
this.lockStream(peer.id.toString(), stream);
return stream;
}
return this.createStream(peer);
stream = await this.createStream(peer);
this.lockStream(peer.id.toString(), stream);
return stream;
}
private async createStream(peer: Peer, retries = 0): Promise<Stream> {
@ -142,13 +148,26 @@ export class StreamManager {
(s) => s.protocol === this.multicodec
);
if (!stream) {
return;
}
const isStreamUnusable = ["done", "closed", "closing"].includes(
stream?.writeStatus || ""
stream.writeStatus || ""
);
if (isStreamUnusable) {
if (isStreamUnusable || this.isStreamLocked(stream)) {
return;
}
return stream;
}
private lockStream(peerId: string, stream: Stream): void {
this.log.info(`Locking stream for peerId:${peerId}\tstreamId:${stream.id}`);
stream.metadata[STREAM_LOCK_KEY] = true;
}
private isStreamLocked(stream: Stream): boolean {
return !!stream.metadata[STREAM_LOCK_KEY];
}
}

View File

@ -1,5 +1,5 @@
import { IBaseProtocolCore, IBaseProtocolSDK } from "./protocols.js";
import type { ISender } from "./sender.js";
export type ILightPushSDK = ISender &
export type ILightPush = ISender &
IBaseProtocolSDK & { protocol: IBaseProtocolCore };

View File

@ -102,7 +102,7 @@ export type ProtocolCreateOptions = {
* This is used by:
* - Light Push to send messages,
* - Filter to retrieve messages.
* Defaults to 3.
* Defaults to 2.
*/
numPeersToUse?: number;
/**

View File

@ -5,7 +5,7 @@ import { IConnectionManager } from "./connection_manager.js";
import type { IFilterSDK } from "./filter.js";
import { IHealthManager } from "./health_manager.js";
import type { Libp2p } from "./libp2p.js";
import type { ILightPushSDK } from "./light_push.js";
import type { ILightPush } from "./light_push.js";
import { Protocols } from "./protocols.js";
import type { IRelay } from "./relay.js";
import type { IStoreSDK } from "./store.js";
@ -15,7 +15,7 @@ export interface Waku {
relay?: IRelay;
store?: IStoreSDK;
filter?: IFilterSDK;
lightPush?: ILightPushSDK;
lightPush?: ILightPush;
connectionManager: IConnectionManager;
@ -36,7 +36,7 @@ export interface LightNode extends Waku {
relay: undefined;
store: IStoreSDK;
filter: IFilterSDK;
lightPush: ILightPushSDK;
lightPush: ILightPush;
}
export interface RelayNode extends Waku {

View File

@ -70,7 +70,8 @@
"@waku/proto": "^0.0.8",
"@waku/utils": "0.0.20",
"@waku/message-hash": "0.1.16",
"libp2p": "^1.8.1"
"libp2p": "^1.8.1",
"p-event": "^6.0.1"
},
"devDependencies": {
"@rollup/plugin-commonjs": "^25.0.7",

View File

@ -1,4 +1,4 @@
export { waitForRemotePeer, createEncoder, createDecoder } from "@waku/core";
export { createEncoder, createDecoder } from "@waku/core";
export {
DecodedMessage,
Decoder,
@ -14,10 +14,12 @@ export {
defaultLibp2p,
createLibp2pAndUpdateOptions
} from "./create/index.js";
export { wakuLightPush } from "./protocols/lightpush/index.js";
export { wakuLightPush } from "./protocols/light_push/index.js";
export { wakuFilter } from "./protocols/filter/index.js";
export { wakuStore } from "./protocols/store/index.js";
export { waitForRemotePeer } from "./wait_for_remote_peer.js";
export * as waku from "@waku/core";
export * as utils from "@waku/utils";
export * from "@waku/interfaces";

View File

@ -14,11 +14,11 @@ interface Options {
}
const RENEW_TIME_LOCK_DURATION = 30 * 1000;
const DEFAULT_NUM_PEERS_TO_USE = 2;
export const DEFAULT_NUM_PEERS_TO_USE = 2;
const DEFAULT_MAINTAIN_PEERS_INTERVAL = 30_000;
export class BaseProtocolSDK implements IBaseProtocolSDK {
private healthManager: IHealthManager;
protected healthManager: IHealthManager;
public readonly numPeersToUse: number;
private peers: Peer[] = [];
private maintainPeersIntervalId: ReturnType<

View File

@ -0,0 +1 @@
export { wakuLightPush } from "./light_push.js";

View File

@ -1,9 +1,9 @@
import type { PeerId } from "@libp2p/interface";
import { ConnectionManager, LightPushCore } from "@waku/core";
import type { Peer, PeerId } from "@libp2p/interface";
import { ConnectionManager, LightPushCodec, LightPushCore } from "@waku/core";
import {
Failure,
type IEncoder,
ILightPushSDK,
ILightPush,
type IMessage,
type Libp2p,
type ProtocolCreateOptions,
@ -19,14 +19,14 @@ import { BaseProtocolSDK } from "../base_protocol.js";
const log = new Logger("sdk:light-push");
class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK {
class LightPush extends BaseProtocolSDK implements ILightPush {
public readonly protocol: LightPushCore;
private readonly reliabilityMonitor: SenderReliabilityMonitor;
public constructor(
connectionManager: ConnectionManager,
libp2p: Libp2p,
private libp2p: Libp2p,
options?: ProtocolCreateOptions
) {
super(
@ -49,11 +49,6 @@ class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK {
message: IMessage,
_options?: ProtocolUseOptions
): Promise<SDKProtocolResult> {
const options = {
autoRetry: true,
..._options
} as ProtocolUseOptions;
const successes: PeerId[] = [];
const failures: Failure[] = [];
@ -63,17 +58,17 @@ class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK {
} catch (error) {
log.error("Failed to send waku light push: pubsub topic not configured");
return {
successes,
failures: [
{
error: ProtocolError.TOPIC_NOT_CONFIGURED
}
],
successes: []
]
};
}
const hasPeers = await this.hasPeers(options);
if (!hasPeers) {
const peers = await this.getConnectedPeers();
if (peers.length === 0) {
return {
successes,
failures: [
@ -84,53 +79,75 @@ class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK {
};
}
const sendPromises = this.connectedPeers.map((peer) =>
this.protocol.send(encoder, message, peer)
const results = await Promise.allSettled(
peers.map((peer) => this.protocol.send(encoder, message, peer))
);
const results = await Promise.allSettled(sendPromises);
for (const result of results) {
if (result.status === "fulfilled") {
const { failure, success } = result.value;
if (success) {
successes.push(success);
}
if (failure) {
failures.push(failure);
if (failure.peerId) {
const peer = this.connectedPeers.find((connectedPeer) =>
connectedPeer.id.equals(failure.peerId)
);
if (peer) {
log.info(`
Failed to send message to peer ${failure.peerId}.
Retrying the message with the same peer in the background.
If this fails, the peer will be renewed.
`);
void this.reliabilityMonitor.attemptRetriesOrRenew(
failure.peerId,
() => this.protocol.send(encoder, message, peer)
);
}
}
}
} else {
if (result.status !== "fulfilled") {
log.error("Failed unexpectedly while sending:", result.reason);
failures.push({ error: ProtocolError.GENERIC_FAIL });
continue;
}
const { failure, success } = result.value;
if (success) {
successes.push(success);
continue;
}
if (failure) {
failures.push(failure);
const connectedPeer = this.connectedPeers.find((connectedPeer) =>
connectedPeer.id.equals(failure.peerId)
);
if (connectedPeer) {
void this.reliabilityMonitor.attemptRetriesOrRenew(
connectedPeer.id,
() => this.protocol.send(encoder, message, connectedPeer)
);
}
}
}
this.healthManager.updateProtocolHealth(LightPushCodec, successes.length);
return {
successes,
failures
};
}
private async getConnectedPeers(): Promise<Peer[]> {
const peerIDs = this.libp2p.getPeers();
if (peerIDs.length === 0) {
return [];
}
const peers = await Promise.all(
peerIDs.map(async (id) => {
try {
return await this.libp2p.peerStore.get(id);
} catch (e) {
return null;
}
})
);
return peers
.filter((p) => !!p)
.filter((p) => (p as Peer).protocols.includes(LightPushCodec))
.slice(0, this.numPeersToUse) as Peer[];
}
}
export function wakuLightPush(
connectionManager: ConnectionManager,
init: Partial<ProtocolCreateOptions> = {}
): (libp2p: Libp2p) => ILightPushSDK {
return (libp2p: Libp2p) => new LightPushSDK(connectionManager, libp2p, init);
): (libp2p: Libp2p) => ILightPush {
return (libp2p: Libp2p) => new LightPush(connectionManager, libp2p, init);
}

View File

@ -1,16 +1,12 @@
import type { IdentifyResult } from "@libp2p/interface";
import type {
IBaseProtocolCore,
IMetadata,
IRelay,
Waku
} from "@waku/interfaces";
import { FilterCodecs, LightPushCodec, StoreCodec } from "@waku/core";
import type { IRelay, Libp2p, Waku } from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import { Logger } from "@waku/utils";
import { pEvent } from "p-event";
const log = new Logger("wait-for-remote-peer");
//TODO: move this function within the Waku class: https://github.com/waku-org/js-waku/issues/1761
/**
* Wait for a remote peer to be ready given the passed protocols.
* Must be used after attempting to connect to nodes, using
@ -36,42 +32,48 @@ export async function waitForRemotePeer(
timeoutMs?: number
): Promise<void> {
protocols = protocols ?? getEnabledProtocols(waku);
const connections = waku.libp2p.getConnections();
if (!waku.isStarted()) return Promise.reject("Waku node is not started");
if (!waku.isStarted()) {
throw Error("Waku node is not started");
}
if (connections.length > 0 && !protocols.includes(Protocols.Relay)) {
const success = await waitForMetadata(waku.libp2p);
if (success) {
return;
}
}
const promises = [];
if (protocols.includes(Protocols.Relay)) {
if (!waku.relay)
throw new Error("Cannot wait for Relay peer: protocol not mounted");
if (!waku.relay) {
throw Error("Cannot wait for Relay peer: protocol not mounted");
}
promises.push(waitForGossipSubPeerInMesh(waku.relay));
}
if (protocols.includes(Protocols.Store)) {
if (!waku.store)
throw new Error("Cannot wait for Store peer: protocol not mounted");
promises.push(
waitForConnectedPeer(waku.store.protocol, waku.libp2p.services.metadata)
);
if (!waku.store) {
throw Error("Cannot wait for Store peer: protocol not mounted");
}
promises.push(waitForConnectedPeer(StoreCodec, waku.libp2p));
}
if (protocols.includes(Protocols.LightPush)) {
if (!waku.lightPush)
throw new Error("Cannot wait for LightPush peer: protocol not mounted");
promises.push(
waitForConnectedPeer(
waku.lightPush.protocol,
waku.libp2p.services.metadata
)
);
if (!waku.lightPush) {
throw Error("Cannot wait for LightPush peer: protocol not mounted");
}
promises.push(waitForConnectedPeer(LightPushCodec, waku.libp2p));
}
if (protocols.includes(Protocols.Filter)) {
if (!waku.filter)
if (!waku.filter) {
throw new Error("Cannot wait for Filter peer: protocol not mounted");
promises.push(
waitForConnectedPeer(waku.filter.protocol, waku.libp2p.services.metadata)
);
}
promises.push(waitForConnectedPeer(FilterCodecs.SUBSCRIBE, waku.libp2p));
}
if (timeoutMs) {
@ -85,71 +87,87 @@ export async function waitForRemotePeer(
}
}
//TODO: move this function within protocol SDK class: https://github.com/waku-org/js-waku/issues/1761
type EventListener = (_: CustomEvent<IdentifyResult>) => void;
/**
* Wait for a peer with the given protocol to be connected.
* If sharding is enabled on the node, it will also wait for the peer to be confirmed by the metadata service.
*/
async function waitForConnectedPeer(
protocol: IBaseProtocolCore,
metadataService?: IMetadata
codec: string,
libp2p: Libp2p
): Promise<void> {
const codec = protocol.multicodec;
const peers = await protocol.connectedPeers();
log.info(`Waiting for ${codec} peer.`);
if (peers.length) {
if (!metadataService) {
log.info(`${codec} peer found: `, peers[0].id.toString());
return;
}
// once a peer is connected, we need to confirm the metadata handshake with at least one of those peers if sharding is enabled
try {
await Promise.any(
peers.map((peer) => metadataService.confirmOrAttemptHandshake(peer.id))
);
return;
} catch (e) {
if ((e as any).code === "ERR_CONNECTION_BEING_CLOSED")
log.error(
`Connection with the peer was closed and possibly because it's on a different shard. Error: ${e}`
);
log.error(`Error waiting for handshake confirmation: ${e}`);
}
}
log.info(`Waiting for ${codec} peer`);
// else we'll just wait for the next peer to connect
await new Promise<void>((resolve) => {
const cb = (evt: CustomEvent<IdentifyResult>): void => {
const cb = (async (evt: CustomEvent<IdentifyResult>): Promise<void> => {
if (evt.detail?.protocols?.includes(codec)) {
if (metadataService) {
metadataService
.confirmOrAttemptHandshake(evt.detail.peerId)
.then(() => {
protocol.removeLibp2pEventListener("peer:identify", cb);
resolve();
})
.catch((e) => {
if (e.code === "ERR_CONNECTION_BEING_CLOSED")
log.error(
`Connection with the peer was closed and possibly because it's on a different shard. Error: ${e}`
);
const metadataService = libp2p.services.metadata;
log.error(`Error waiting for handshake confirmation: ${e}`);
});
} else {
protocol.removeLibp2pEventListener("peer:identify", cb);
if (!metadataService) {
libp2p.removeEventListener("peer:identify", cb);
resolve();
return;
}
try {
await metadataService.confirmOrAttemptHandshake(evt.detail.peerId);
libp2p.removeEventListener("peer:identify", cb);
resolve();
} catch (e) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
if ((e as any).code === "ERR_CONNECTION_BEING_CLOSED") {
log.error(
"Connection closed. Some peers can be on different shard."
);
}
log.error(`Error waiting for metadata: ${e}`);
}
}
};
protocol.addLibp2pEventListener("peer:identify", cb);
}) as EventListener;
libp2p.addEventListener("peer:identify", cb);
});
}
/**
* Waits for the metadata from the remote peer.
*/
async function waitForMetadata(libp2p: Libp2p): Promise<boolean> {
const connections = libp2p.getConnections();
const metadataService = libp2p.services.metadata;
if (!connections.length || !metadataService) {
log.info(
`Skipping waitForMetadata due to missing connections:${connections.length} or metadataService:${!!metadataService}`
);
return false;
}
try {
// confirm at least with one connected peer
await Promise.any(
connections
.map((c) => c.remotePeer)
.map((peer) => metadataService.confirmOrAttemptHandshake(peer))
);
return true;
} catch (e) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
if ((e as any).code === "ERR_CONNECTION_BEING_CLOSED") {
log.error("Connection closed. Some peers can be on different shard.");
}
log.error(`Error waiting for metadata: ${e}`);
}
return false;
}
// TODO: move to @waku/relay and use in `node.connect()` API https://github.com/waku-org/js-waku/issues/1761
/**
* Wait for at least one peer with the given protocol to be connected and in the gossipsub
* mesh for all pubsubTopics.

View File

@ -5,7 +5,7 @@ import { ConnectionManager, getHealthManager } from "@waku/core";
import type {
IFilterSDK,
IHealthManager,
ILightPushSDK,
ILightPush,
IRelay,
IStoreSDK,
Libp2p,
@ -17,7 +17,7 @@ import { Protocols } from "@waku/interfaces";
import { Logger } from "@waku/utils";
import { wakuFilter } from "./protocols/filter/index.js";
import { wakuLightPush } from "./protocols/lightpush/index.js";
import { wakuLightPush } from "./protocols/light_push/index.js";
import { wakuStore } from "./protocols/store/index.js";
import { ReliabilityMonitorManager } from "./reliability_monitor/index.js";
@ -64,7 +64,7 @@ export class WakuNode implements Waku {
public relay?: IRelay;
public store?: IStoreSDK;
public filter?: IFilterSDK;
public lightPush?: ILightPushSDK;
public lightPush?: ILightPush;
public connectionManager: ConnectionManager;
public readonly health: IHealthManager;

View File

@ -1,11 +1,10 @@
import { waitForRemotePeer } from "@waku/core";
import {
NetworkConfig,
ProtocolCreateOptions,
Protocols
} from "@waku/interfaces";
import { createRelayNode } from "@waku/relay";
import { createLightNode, WakuNode } from "@waku/sdk";
import { createLightNode, waitForRemotePeer, WakuNode } from "@waku/sdk";
import {
derivePubsubTopicsFromNetworkConfig,
Logger,

View File

@ -1,4 +1,3 @@
import { waitForRemotePeer } from "@waku/core";
import {
DefaultNetworkConfig,
LightNode,
@ -7,7 +6,7 @@ import {
Protocols,
Waku
} from "@waku/interfaces";
import { createLightNode } from "@waku/sdk";
import { createLightNode, waitForRemotePeer } from "@waku/sdk";
import { derivePubsubTopicsFromNetworkConfig, isDefined } from "@waku/utils";
import { Context } from "mocha";
import pRetry from "p-retry";

View File

@ -1,8 +1,8 @@
import { waitForRemotePeer } from "@waku/core";
import { EnrDecoder } from "@waku/enr";
import type { RelayNode } from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import { createRelayNode } from "@waku/relay";
import { waitForRemotePeer } from "@waku/sdk";
import { expect } from "chai";
import {

View File

@ -1,9 +1,4 @@
import {
createDecoder,
createEncoder,
DecodedMessage,
waitForRemotePeer
} from "@waku/core";
import { createDecoder, createEncoder, DecodedMessage } from "@waku/core";
import { Protocols } from "@waku/interfaces";
import type { LightNode } from "@waku/interfaces";
import {
@ -19,7 +14,7 @@ import {
createDecoder as createSymDecoder,
createEncoder as createSymEncoder
} from "@waku/message-encryption/symmetric";
import { createLightNode } from "@waku/sdk";
import { createLightNode, waitForRemotePeer } from "@waku/sdk";
import { contentTopicToPubsubTopic, Logger } from "@waku/utils";
import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai";

View File

@ -275,8 +275,5 @@ describe("Waku Filter: Peer Management: E2E", function () {
await sendMessage();
expect(waku.filter.connectedPeers.length).to.equal(2);
expect(
waku.filter.connectedPeers.map((p) => p.id.toString())
).to.not.include(nodeWithouDiscoveryPeerIdStr);
});
});

View File

@ -1,6 +1,5 @@
import { waitForRemotePeer } from "@waku/core";
import { LightNode, Protocols } from "@waku/interfaces";
import { utf8ToBytes } from "@waku/sdk";
import { utf8ToBytes, waitForRemotePeer } from "@waku/sdk";
import { expect } from "chai";
import {

View File

@ -1,4 +1,4 @@
import { createDecoder, createEncoder, waitForRemotePeer } from "@waku/core";
import { createDecoder, createEncoder } from "@waku/core";
import type {
ContentTopicInfo,
LightNode,
@ -6,6 +6,7 @@ import type {
SingleShardInfo
} from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import { waitForRemotePeer } from "@waku/sdk";
import {
contentTopicToPubsubTopic,
contentTopicToShardIndex,

View File

@ -1,6 +1,5 @@
import { waitForRemotePeer } from "@waku/core";
import { LightNode, Protocols } from "@waku/interfaces";
import { utf8ToBytes } from "@waku/sdk";
import { utf8ToBytes, waitForRemotePeer } from "@waku/sdk";
import { expect } from "chai";
import {

View File

@ -1,4 +1,4 @@
import { createDecoder, createEncoder, waitForRemotePeer } from "@waku/core";
import { createDecoder, createEncoder } from "@waku/core";
import { LightNode, Protocols } from "@waku/interfaces";
import {
ecies,
@ -7,7 +7,7 @@ import {
getPublicKey,
symmetric
} from "@waku/message-encryption";
import { utf8ToBytes } from "@waku/sdk";
import { utf8ToBytes, waitForRemotePeer } from "@waku/sdk";
import { expect } from "chai";
import type { Context } from "mocha";

View File

@ -1,4 +1,4 @@
import { createDecoder, createEncoder, waitForRemotePeer } from "@waku/core";
import { createDecoder, createEncoder } from "@waku/core";
import {
DefaultNetworkConfig,
ISubscriptionSDK,
@ -8,7 +8,7 @@ import {
Protocols,
Waku
} from "@waku/interfaces";
import { createLightNode } from "@waku/sdk";
import { createLightNode, waitForRemotePeer } from "@waku/sdk";
import {
contentTopicToPubsubTopic,
derivePubsubTopicsFromNetworkConfig,

View File

@ -1,6 +1,6 @@
import type { Connection, Peer, PeerStore } from "@libp2p/interface";
import { createSecp256k1PeerId } from "@libp2p/peer-id-factory";
import { LightPushCodec, waitForRemotePeer } from "@waku/core";
import { LightPushCodec } from "@waku/core";
import {
ContentTopicInfo,
createLightNode,
@ -9,7 +9,8 @@ import {
Protocols,
ShardInfo,
Tags,
utf8ToBytes
utf8ToBytes,
waitForRemotePeer
} from "@waku/sdk";
import {
encodeRelayShard,

View File

@ -1,4 +1,4 @@
import { HealthStatus, LightNode, Protocols } from "@waku/interfaces";
import { HealthStatus, LightNode, Protocols, Waku } from "@waku/interfaces";
import { createLightNode } from "@waku/sdk";
import { shardInfoToPubsubTopics } from "@waku/utils";
import { expect } from "chai";
@ -34,8 +34,7 @@ describe("Node Health Status Matrix Tests", function () {
peerCounts.forEach((lightPushPeers) => {
peerCounts.forEach((filterPeers) => {
const expectedHealth = getExpectedNodeHealth(lightPushPeers, filterPeers);
it(`LightPush: ${lightPushPeers} peers, Filter: ${filterPeers} peers - Expected: ${expectedHealth}`, async function () {
it(`LightPush: ${lightPushPeers} peers, Filter: ${filterPeers} peers`, async function () {
this.timeout(10_000);
[waku, serviceNodes] = await setupTestEnvironment(
@ -59,6 +58,10 @@ describe("Node Health Status Matrix Tests", function () {
);
const filterHealth = waku.health.getProtocolStatus(Protocols.Filter);
lightPushPeers = await getPeerCounBasedOnConnections(
waku,
waku.lightPush.protocol.multicodec
);
expect(lightPushHealth?.status).to.equal(
getExpectedProtocolStatus(lightPushPeers)
);
@ -66,6 +69,10 @@ describe("Node Health Status Matrix Tests", function () {
getExpectedProtocolStatus(filterPeers)
);
const expectedHealth = getExpectedNodeHealth(
lightPushPeers,
filterPeers
);
const nodeHealth = waku.health.getHealthStatus();
expect(nodeHealth).to.equal(expectedHealth);
});
@ -79,6 +86,21 @@ function getExpectedProtocolStatus(peerCount: number): HealthStatus {
return HealthStatus.SufficientlyHealthy;
}
async function getPeerCounBasedOnConnections(
waku: Waku,
codec: string
): Promise<number> {
const peerIDs = waku.libp2p
.getConnections()
.map((c) => c.remotePeer.toString());
const peers = await waku.libp2p.peerStore.all();
return peers
.filter((peer) => peerIDs.includes(peer.id.toString()))
.filter((peer) => peer.protocols.includes(codec)).length;
}
function getExpectedNodeHealth(
lightPushPeers: number,
filterPeers: number

View File

@ -1,6 +1,5 @@
import { LightNode } from "@waku/interfaces";
import { createEncoder, utf8ToBytes } from "@waku/sdk";
import { delay } from "@waku/utils";
import { expect } from "chai";
import { describe } from "mocha";
@ -15,7 +14,7 @@ import {
} from "../../src/index.js";
import { TestContentTopic } from "../filter/utils.js";
describe("Waku Light Push: Peer Management: E2E", function () {
describe("Waku Light Push: Connection Management: E2E", function () {
this.timeout(15000);
let waku: LightNode;
let serviceNodes: ServiceNodesFleet;
@ -39,64 +38,44 @@ describe("Waku Light Push: Peer Management: E2E", function () {
contentTopic: TestContentTopic
});
it("Number of peers are maintained correctly", async function () {
it("should push to needed amount of connections", async function () {
const { successes, failures } = await waku.lightPush.send(encoder, {
payload: utf8ToBytes("Hello_World")
});
expect(successes.length).to.be.greaterThan(0);
expect(successes.length).to.be.equal(waku.lightPush.numPeersToUse);
if (failures) {
expect(failures.length).to.equal(0);
}
expect(failures?.length || 0).to.equal(0);
});
it("Failed peers are renewed", async function () {
// send a lightpush request -- should have all successes
const response1 = await waku.lightPush.send(encoder, {
payload: utf8ToBytes("Hello_World")
});
expect(response1.successes.length).to.be.equal(
waku.lightPush.numPeersToUse
it("should push to available amount of connection if less than required", async function () {
const connections = waku.libp2p.getConnections();
await Promise.all(
connections
.slice(0, connections.length - 1)
.map((c) => waku.connectionManager.dropConnection(c.remotePeer))
);
if (response1.failures) {
expect(response1.failures.length).to.equal(0);
}
// disconnect from one peer to force a failure
const peerToDisconnect = response1.successes[0];
await waku.connectionManager.dropConnection(peerToDisconnect);
// send another lightpush request -- should have all successes except the one that was disconnected
const response2 = await waku.lightPush.send(encoder, {
const { successes, failures } = await waku.lightPush.send(encoder, {
payload: utf8ToBytes("Hello_World")
});
// check that the peer that was disconnected is not in the new successes
expect(response2.successes).to.not.include(peerToDisconnect);
expect(response2.failures).to.have.length(1);
expect(response2.failures?.[0].peerId).to.equal(peerToDisconnect);
expect(successes.length).to.be.equal(1);
expect(failures?.length || 0).to.equal(0);
});
// send another lightpush request
// reattempts to send should be triggerred
// then renewal should happen
// so one failure should exist
const response3 = await waku.lightPush.send(encoder, {
payload: utf8ToBytes("Hello_World")
});
// wait for reattempts to finish as they are async and not awaited
await delay(500);
// doing -1 because the peer that was disconnected is not in the successes
expect(response3.successes.length).to.be.equal(
waku.lightPush.numPeersToUse - 1
it("should fail to send if no connections available", async function () {
const connections = waku.libp2p.getConnections();
await Promise.all(
connections.map((c) =>
waku.connectionManager.dropConnection(c.remotePeer)
)
);
// and exists in failure instead
expect(response3.failures).to.have.length(1);
expect(response3.successes).to.not.include(peerToDisconnect);
const { successes, failures } = await waku.lightPush.send(encoder, {
payload: utf8ToBytes("Hello_World")
});
expect(successes.length).to.be.equal(0);
expect(failures?.length).to.equal(1);
});
});

View File

@ -1,5 +1,5 @@
import type { PeerId } from "@libp2p/interface";
import { createEncoder, waitForRemotePeer } from "@waku/core";
import { createEncoder } from "@waku/core";
import {
ContentTopicInfo,
LightNode,
@ -8,6 +8,7 @@ import {
ShardInfo,
SingleShardInfo
} from "@waku/interfaces";
import { waitForRemotePeer } from "@waku/sdk";
import {
contentTopicToPubsubTopic,
contentTopicToShardIndex,

View File

@ -1,7 +1,8 @@
import type { PeerId } from "@libp2p/interface";
import { DecodedMessage, waitForRemotePeer } from "@waku/core";
import { DecodedMessage } from "@waku/core";
import { Protocols, RelayNode } from "@waku/interfaces";
import { createRelayNode } from "@waku/relay";
import { waitForRemotePeer } from "@waku/sdk";
import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai";

View File

@ -1,9 +1,4 @@
import {
createDecoder,
createEncoder,
DecodedMessage,
waitForRemotePeer
} from "@waku/core";
import { createDecoder, createEncoder, DecodedMessage } from "@waku/core";
import {
ContentTopicInfo,
RelayNode,
@ -12,6 +7,7 @@ import {
} from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import { createRelayNode } from "@waku/relay";
import { waitForRemotePeer } from "@waku/sdk";
import {
contentTopicToPubsubTopic,
pubsubTopicToSingleShardInfo,

View File

@ -1,4 +1,4 @@
import { createDecoder, createEncoder, waitForRemotePeer } from "@waku/core";
import { createDecoder, createEncoder } from "@waku/core";
import {
NetworkConfig,
Protocols,
@ -6,6 +6,7 @@ import {
ShardInfo
} from "@waku/interfaces";
import { createRelayNode } from "@waku/relay";
import { waitForRemotePeer } from "@waku/sdk";
import { contentTopicToPubsubTopic, Logger } from "@waku/utils";
import { Context } from "mocha";

View File

@ -1,4 +1,4 @@
import { createDecoder, DecodedMessage, waitForRemotePeer } from "@waku/core";
import { createDecoder, DecodedMessage } from "@waku/core";
import type { IMessage, LightNode } from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import {
@ -14,6 +14,7 @@ import {
createDecoder as createSymDecoder,
createEncoder as createSymEncoder
} from "@waku/message-encryption/symmetric";
import { waitForRemotePeer } from "@waku/sdk";
import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai";
import { equals } from "uint8arrays/equals";

View File

@ -1,6 +1,6 @@
import { createDecoder, waitForRemotePeer } from "@waku/core";
import { createDecoder } from "@waku/core";
import type { ContentTopicInfo, IMessage, LightNode } from "@waku/interfaces";
import { createLightNode, Protocols } from "@waku/sdk";
import { createLightNode, Protocols, waitForRemotePeer } from "@waku/sdk";
import {
contentTopicToPubsubTopic,
pubsubTopicToSingleShardInfo

View File

@ -1,8 +1,7 @@
import { waitForRemotePeer } from "@waku/core";
import type { LightNode, RelayNode } from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import { createRelayNode } from "@waku/relay";
import { createLightNode } from "@waku/sdk";
import { createLightNode, waitForRemotePeer } from "@waku/sdk";
import { expect } from "chai";
import {

View File

@ -1,6 +1,6 @@
import { bootstrap } from "@libp2p/bootstrap";
import type { PeerId } from "@libp2p/interface";
import { DecodedMessage, waitForRemotePeer } from "@waku/core";
import { DecodedMessage } from "@waku/core";
import type { LightNode, RelayNode, Waku } from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import { generateSymmetricKey } from "@waku/message-encryption";
@ -12,7 +12,8 @@ import { createRelayNode } from "@waku/relay";
import {
createLightNode,
createEncoder as createPlainEncoder,
DefaultUserAgent
DefaultUserAgent,
waitForRemotePeer
} from "@waku/sdk";
import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai";