chore: trimming down BaseProtocolCore

chore: rm tests for remove internal util

chore: update HealthManager updates

chore: update tests

rm: only
This commit is contained in:
Danish Arora 2024-09-23 10:14:50 +05:30
parent 1aa57649ae
commit 31cf78af11
No known key found for this signature in database
GPG Key ID: 1C6EF37CDAE1426E
5 changed files with 28 additions and 431 deletions

View File

@ -1,5 +1,5 @@
import type { Libp2p } from "@libp2p/interface";
import type { Peer, PeerStore, Stream } from "@libp2p/interface";
import type { Peer, Stream } from "@libp2p/interface";
import type {
IBaseProtocolCore,
Libp2pComponents,
@ -46,18 +46,15 @@ export class BaseProtocol implements IBaseProtocolCore {
return this.streamManager.getStream(peer);
}
public get peerStore(): PeerStore {
return this.components.peerStore;
}
//TODO: move to SDK
/**
* Returns known peers from the address book (`libp2p.peerStore`) that support
* the class protocol. Waku may or may not be currently connected to these
* peers.
*/
public async allPeers(): Promise<Peer[]> {
return getPeersForProtocol(this.peerStore, [this.multicodec]);
}
// public async allPeers(): Promise<Peer[]> {
// return getPeersForProtocol(this.peerStore, [this.multicodec]);
// }
public async connectedPeers(withOpenStreams = false): Promise<Peer[]> {
const peers = await this.allPeers();
@ -91,7 +88,7 @@ export class BaseProtocol implements IBaseProtocolCore {
const connectedPeersForProtocolAndShard =
await getConnectedPeersForProtocolAndShard(
this.components.connectionManager.getConnections(),
this.peerStore,
this.components.peerStore,
[this.multicodec],
pubsubTopicsToShardInfo(this.pubsubTopics)
);
@ -105,7 +102,7 @@ export class BaseProtocol implements IBaseProtocolCore {
// Sort the peers by latency
const sortedFilteredPeers = await sortPeersByLatency(
this.peerStore,
this.components.peerStore,
filteredPeers
);

View File

@ -1,6 +1,6 @@
import type { Libp2p } from "@libp2p/interface";
import type { PeerId } from "@libp2p/interface";
import type { Peer, PeerStore } from "@libp2p/interface";
import type { Peer } from "@libp2p/interface";
import type { CreateLibp2pOptions } from "./libp2p.js";
import type { IDecodedMessage } from "./message.js";

View File

@ -14,7 +14,6 @@ interface Options {
numPeersToUse?: number;
maintainPeersInterval?: number;
}
///TODO: update HealthManager
const DEFAULT_NUM_PEERS_TO_USE = 2;
const DEFAULT_MAINTAIN_PEERS_INTERVAL = 30_000;

View File

@ -1,11 +1,14 @@
import { Peer, PeerId } from "@libp2p/interface";
import { ConnectionManager } from "@waku/core";
import { ConnectionManager, getHealthManager } from "@waku/core";
import { BaseProtocol } from "@waku/core/lib/base_protocol";
import { IHealthManager } from "@waku/interfaces";
import { Logger } from "@waku/utils";
import { Mutex } from "async-mutex";
export class PeerManager {
private peers: Map<string, Peer> = new Map();
private healthManager: IHealthManager;
private readMutex = new Mutex();
private writeMutex = new Mutex();
private writeLockHolder: string | null = null;
@ -14,7 +17,10 @@ export class PeerManager {
private readonly connectionManager: ConnectionManager,
private readonly core: BaseProtocol,
private readonly log: Logger
) {}
) {
this.healthManager = getHealthManager();
this.healthManager.updateProtocolHealth(this.core.multicodec, 0);
}
public getWriteLockHolder(): string | null {
return this.writeLockHolder;
@ -30,6 +36,10 @@ export class PeerManager {
await this.connectionManager.attemptDial(peer.id);
this.peers.set(peer.id.toString(), peer);
this.log.info(`Added and dialed peer: ${peer.id.toString()}`);
this.healthManager.updateProtocolHealth(
this.core.multicodec,
this.peers.size
);
this.writeLockHolder = null;
});
}
@ -39,6 +49,10 @@ export class PeerManager {
this.writeLockHolder = `removePeer: ${peerId.toString()}`;
this.peers.delete(peerId.toString());
this.log.info(`Removed peer: ${peerId.toString()}`);
this.healthManager.updateProtocolHealth(
this.core.multicodec,
this.peers.size
);
this.writeLockHolder = null;
});
}

View File

@ -1,22 +1,13 @@
import type { Connection, Peer, PeerStore } from "@libp2p/interface";
import { createSecp256k1PeerId } from "@libp2p/peer-id-factory";
import { LightPushCodec, waitForRemotePeer } from "@waku/core";
import {
ContentTopicInfo,
createLightNode,
Libp2pComponents,
type LightNode,
Protocols,
ShardInfo,
Tags,
utf8ToBytes
} from "@waku/sdk";
import {
encodeRelayShard,
ensureShardingConfigured,
shardInfoToPubsubTopics
} from "@waku/utils";
import { getConnectedPeersForProtocolAndShard } from "@waku/utils/libp2p";
import { encodeRelayShard } from "@waku/utils";
import { expect } from "chai";
import fc from "fast-check";
import Sinon from "sinon";
@ -24,414 +15,9 @@ import Sinon from "sinon";
import {
afterEachCustom,
beforeEachCustom,
DefaultTestShardInfo,
delay,
makeLogFileName,
ServiceNode,
tearDownNodes
DefaultTestShardInfo
} from "../src/index.js";
describe("getConnectedPeersForProtocolAndShard", function () {
let waku: LightNode;
let serviceNode1: ServiceNode;
let serviceNode2: ServiceNode;
const contentTopic = "/test/2/waku-light-push/utf8";
const autoshardingClusterId = 6;
beforeEachCustom(this, async () => {
serviceNode1 = new ServiceNode(makeLogFileName(this.ctx) + "1");
serviceNode2 = new ServiceNode(makeLogFileName(this.ctx) + "2");
});
afterEachCustom(this, async () => {
await tearDownNodes([serviceNode1, serviceNode2], waku);
});
it("same cluster, same shard: nodes connect", async function () {
this.timeout(15000);
const shardInfo: ShardInfo = {
clusterId: 2,
shards: [2]
};
await serviceNode1.start({
discv5Discovery: true,
peerExchange: true,
clusterId: shardInfo.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo),
lightpush: true,
relay: true
});
const serviceNodeMa = await serviceNode1.getMultiaddrWithId();
waku = await createLightNode({ networkConfig: shardInfo });
await waku.start();
await waku.libp2p.dialProtocol(serviceNodeMa, LightPushCodec);
await waitForRemotePeer(waku, [Protocols.LightPush]);
const peers = await getConnectedPeersForProtocolAndShard(
waku.libp2p.getConnections(),
waku.libp2p.peerStore,
waku.libp2p.getProtocols(),
ensureShardingConfigured(shardInfo).shardInfo
);
expect(peers.length).to.be.greaterThan(0);
});
it("same cluster, different shard: nodes don't connect", async function () {
this.timeout(15000);
const shardInfo1: ShardInfo = {
clusterId: 2,
shards: [1]
};
const shardInfo2: ShardInfo = {
clusterId: 2,
shards: [2]
};
// Separate shard
await serviceNode1.start({
discv5Discovery: true,
peerExchange: true,
clusterId: shardInfo1.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo1),
lightpush: true,
relay: true
});
// Same shard
await serviceNode2.start({
discv5Discovery: true,
peerExchange: true,
clusterId: shardInfo2.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo2),
lightpush: true,
relay: true
});
const serviceNode1Ma = await serviceNode1.getMultiaddrWithId();
const serviceNode2Ma = await serviceNode2.getMultiaddrWithId();
waku = await createLightNode({ networkConfig: shardInfo2 });
await waku.libp2p.dialProtocol(serviceNode1Ma, LightPushCodec);
await waku.libp2p.dialProtocol(serviceNode2Ma, LightPushCodec);
await waku.start();
await waitForRemotePeer(waku, [Protocols.LightPush]);
const peers = await getConnectedPeersForProtocolAndShard(
waku.libp2p.getConnections(),
waku.libp2p.peerStore,
waku.libp2p.getProtocols(),
ensureShardingConfigured(shardInfo2).shardInfo
);
expect(peers.length).to.be.equal(1);
});
it("different cluster, same shard: nodes don't connect", async function () {
this.timeout(15000);
const shardInfo1: ShardInfo = {
clusterId: 2,
shards: [1]
};
const shardInfo2: ShardInfo = {
clusterId: 3,
shards: [1]
};
// we start one node in a separate cluster
await serviceNode1.start({
discv5Discovery: true,
peerExchange: true,
clusterId: shardInfo1.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo1),
lightpush: true,
relay: true
});
// and another node in the same cluster cluster as our node
await serviceNode2.start({
discv5Discovery: true,
peerExchange: true,
clusterId: shardInfo2.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo2),
lightpush: true,
relay: true
});
const serviceNode1Ma = await serviceNode1.getMultiaddrWithId();
const serviceNode2Ma = await serviceNode2.getMultiaddrWithId();
waku = await createLightNode({ networkConfig: shardInfo2 });
await waku.libp2p.dialProtocol(serviceNode1Ma, LightPushCodec);
await delay(500);
await waku.libp2p.dialProtocol(serviceNode2Ma, LightPushCodec);
await waku.start();
await waitForRemotePeer(waku, [Protocols.LightPush]);
const peers = await getConnectedPeersForProtocolAndShard(
waku.libp2p.getConnections(),
waku.libp2p.peerStore,
waku.libp2p.getProtocols(),
shardInfo2
);
expect(peers.length).to.be.equal(1);
});
it("different cluster, different shard: nodes don't connect", async function () {
this.timeout(15000);
const shardInfo1: ShardInfo = {
clusterId: 2,
shards: [1]
};
const shardInfo2: ShardInfo = {
clusterId: 3,
shards: [2]
};
// we start one node in a separate cluster
await serviceNode1.start({
discv5Discovery: true,
peerExchange: true,
clusterId: shardInfo1.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo1),
lightpush: true,
relay: true
});
// and another node in the same cluster cluster as our node
const serviceNode2 = new ServiceNode(makeLogFileName(this) + "2");
await serviceNode2.start({
discv5Discovery: true,
peerExchange: true,
clusterId: shardInfo2.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo2),
lightpush: true,
relay: true
});
const serviceNodeMa1 = await serviceNode1.getMultiaddrWithId();
const serviceNodeMa2 = await serviceNode2.getMultiaddrWithId();
waku = await createLightNode({ networkConfig: shardInfo2 });
await waku.libp2p.dialProtocol(serviceNodeMa1, LightPushCodec);
await delay(500);
await waku.libp2p.dialProtocol(serviceNodeMa2, LightPushCodec);
await waku.start();
await waitForRemotePeer(waku, [Protocols.LightPush]);
const peers = await getConnectedPeersForProtocolAndShard(
waku.libp2p.getConnections(),
waku.libp2p.peerStore,
waku.libp2p.getProtocols(),
shardInfo2
);
expect(peers.length).to.be.equal(1);
});
it("same cluster, same shard: nodes connect (autosharding)", async function () {
this.timeout(15000);
const shardInfo: ContentTopicInfo = {
clusterId: autoshardingClusterId,
contentTopics: [contentTopic]
};
await serviceNode1.start({
discv5Discovery: true,
peerExchange: true,
clusterId: shardInfo.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo),
contentTopic: [contentTopic],
lightpush: true,
relay: true
});
const serviceNodeMa = await serviceNode1.getMultiaddrWithId();
waku = await createLightNode({ networkConfig: shardInfo });
await waku.start();
await waku.libp2p.dialProtocol(serviceNodeMa, LightPushCodec);
await waitForRemotePeer(waku, [Protocols.LightPush]);
const peers = await getConnectedPeersForProtocolAndShard(
waku.libp2p.getConnections(),
waku.libp2p.peerStore,
waku.libp2p.getProtocols(),
ensureShardingConfigured(shardInfo).shardInfo
);
expect(peers.length).to.be.greaterThan(0);
});
it("same cluster, different shard: nodes connect (autosharding)", async function () {
this.timeout(15000);
const shardInfo1: ContentTopicInfo = {
clusterId: autoshardingClusterId,
contentTopics: [contentTopic]
};
const shardInfo2: ContentTopicInfo = {
clusterId: autoshardingClusterId,
contentTopics: ["/test/5/waku-light-push/utf8"]
};
// Separate shard
await serviceNode1.start({
discv5Discovery: true,
peerExchange: true,
clusterId: shardInfo1.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo1),
contentTopic: [contentTopic],
lightpush: true,
relay: true
});
// Same shard
await serviceNode2.start({
discv5Discovery: true,
peerExchange: true,
clusterId: shardInfo2.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo2),
contentTopic: [contentTopic],
lightpush: true,
relay: true
});
const serviceNode1Ma = await serviceNode1.getMultiaddrWithId();
const serviceNode2Ma = await serviceNode2.getMultiaddrWithId();
waku = await createLightNode({ networkConfig: shardInfo2 });
await waku.libp2p.dialProtocol(serviceNode1Ma, LightPushCodec);
await waku.libp2p.dialProtocol(serviceNode2Ma, LightPushCodec);
await waku.start();
await waitForRemotePeer(waku, [Protocols.LightPush]);
const peers = await getConnectedPeersForProtocolAndShard(
waku.libp2p.getConnections(),
waku.libp2p.peerStore,
waku.libp2p.getProtocols(),
ensureShardingConfigured(shardInfo2).shardInfo
);
expect(peers.length).to.be.equal(1);
});
it("different cluster, same shard: nodes don't connect (autosharding)", async function () {
this.timeout(15000);
const shardInfo1: ContentTopicInfo = {
clusterId: autoshardingClusterId,
contentTopics: [contentTopic]
};
const shardInfo2: ContentTopicInfo = {
clusterId: 2,
contentTopics: [contentTopic]
};
// we start one node in a separate cluster
await serviceNode1.start({
discv5Discovery: true,
peerExchange: true,
clusterId: shardInfo1.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo1),
contentTopic: [contentTopic],
lightpush: true,
relay: true
});
// and another node in the same cluster cluster as our node
await serviceNode2.start({
discv5Discovery: true,
peerExchange: true,
clusterId: shardInfo2.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo2),
lightpush: true,
relay: true
});
const serviceNode1Ma = await serviceNode1.getMultiaddrWithId();
const serviceNode2Ma = await serviceNode2.getMultiaddrWithId();
waku = await createLightNode({ networkConfig: shardInfo2 });
await waku.libp2p.dialProtocol(serviceNode1Ma, LightPushCodec);
await delay(500);
await waku.libp2p.dialProtocol(serviceNode2Ma, LightPushCodec);
await waku.start();
await waitForRemotePeer(waku, [Protocols.LightPush]);
const peers = await getConnectedPeersForProtocolAndShard(
waku.libp2p.getConnections(),
waku.libp2p.peerStore,
waku.libp2p.getProtocols(),
ensureShardingConfigured(shardInfo2).shardInfo
);
expect(peers.length).to.be.equal(1);
});
it("different cluster, different shard: nodes don't connect (autosharding)", async function () {
this.timeout(15000);
const shardInfo1: ContentTopicInfo = {
clusterId: autoshardingClusterId,
contentTopics: [contentTopic]
};
const shardInfo2: ContentTopicInfo = {
clusterId: 2,
contentTopics: ["/test/5/waku-light-push/utf8"]
};
// we start one node in a separate cluster
await serviceNode1.start({
discv5Discovery: true,
peerExchange: true,
clusterId: shardInfo1.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo1),
contentTopic: [contentTopic],
lightpush: true,
relay: true
});
// and another node in the same cluster cluster as our node
const serviceNode2 = new ServiceNode(makeLogFileName(this) + "2");
await serviceNode2.start({
discv5Discovery: true,
peerExchange: true,
clusterId: shardInfo2.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo2),
lightpush: true,
relay: true
});
const serviceNodeMa1 = await serviceNode1.getMultiaddrWithId();
const serviceNodeMa2 = await serviceNode2.getMultiaddrWithId();
waku = await createLightNode({ networkConfig: shardInfo2 });
await waku.libp2p.dialProtocol(serviceNodeMa1, LightPushCodec);
await delay(500);
await waku.libp2p.dialProtocol(serviceNodeMa2, LightPushCodec);
await waku.start();
await waitForRemotePeer(waku, [Protocols.LightPush]);
const peers = await getConnectedPeersForProtocolAndShard(
waku.libp2p.getConnections(),
waku.libp2p.peerStore,
waku.libp2p.getProtocols(),
ensureShardingConfigured(shardInfo2).shardInfo
);
expect(peers.length).to.be.equal(1);
});
});
describe("getPeers", function () {
let peerStore: PeerStore;
let connectionManager: Libp2pComponents["connectionManager"];
@ -566,7 +152,8 @@ describe("getPeers", function () {
for (const peer of allPeers) {
connections.push({
status: "open",
remotePeer: peer.id
remotePeer: peer.id,
streams: [{ protocol: waku.lightPush.protocol.multicodec }]
} as unknown as Connection);
}
return connections;