diff --git a/packages/core/src/lib/connection_manager/connection_manager.spec.ts b/packages/core/src/lib/connection_manager/connection_manager.spec.ts index ec1d53921f..45d64781f6 100644 --- a/packages/core/src/lib/connection_manager/connection_manager.spec.ts +++ b/packages/core/src/lib/connection_manager/connection_manager.spec.ts @@ -15,7 +15,7 @@ import { ConnectionManager } from "./connection_manager.js"; import { DiscoveryDialer } from "./discovery_dialer.js"; import { KeepAliveManager } from "./keep_alive_manager.js"; import { NetworkMonitor } from "./network_monitor.js"; -import { ShardReader } from "./shard_reader.js"; +import { IShardReader, ShardReader } from "./shard_reader.js"; describe("ConnectionManager", () => { let libp2p: Libp2p; @@ -30,7 +30,7 @@ describe("ConnectionManager", () => { // Mock internal components let mockKeepAliveManager: sinon.SinonStubbedInstance; let mockDiscoveryDialer: sinon.SinonStubbedInstance; - let mockShardReader: sinon.SinonStubbedInstance; + let mockShardReader: sinon.SinonStubbedInstance; let mockNetworkMonitor: sinon.SinonStubbedInstance; let mockConnectionLimiter: sinon.SinonStubbedInstance; @@ -87,7 +87,7 @@ describe("ConnectionManager", () => { mockShardReader = { isPeerOnTopic: sinon.stub().resolves(true) - } as unknown as sinon.SinonStubbedInstance; + } as unknown as sinon.SinonStubbedInstance; mockNetworkMonitor = { start: sinon.stub(), diff --git a/packages/core/src/lib/connection_manager/connection_manager.ts b/packages/core/src/lib/connection_manager/connection_manager.ts index 0f3f83e159..f5d6ded196 100644 --- a/packages/core/src/lib/connection_manager/connection_manager.ts +++ b/packages/core/src/lib/connection_manager/connection_manager.ts @@ -1,7 +1,6 @@ import { type Peer, type PeerId, type Stream } from "@libp2p/interface"; import { MultiaddrInput } from "@multiformats/multiaddr"; import { - ClusterId, ConnectionManagerOptions, IConnectionManager, IRelay, @@ -47,7 +46,7 @@ export class ConnectionManager implements IConnectionManager { private readonly networkMonitor: NetworkMonitor; private readonly connectionLimiter: ConnectionLimiter; - private options: ConnectionManagerOptions; + private readonly options: ConnectionManagerOptions; private libp2p: Libp2p; public constructor(options: ConnectionManagerConstructorOptions) { @@ -200,9 +199,8 @@ export class ConnectionManager implements IConnectionManager { public async isPeerOnShard( peerId: PeerId, - clusterId: ClusterId, shardId: ShardId ): Promise { - return this.shardReader.isPeerOnShard(peerId, clusterId, shardId); + return this.shardReader.isPeerOnShard(peerId, shardId); } } diff --git a/packages/core/src/lib/connection_manager/shard_reader.spec.ts b/packages/core/src/lib/connection_manager/shard_reader.spec.ts index 30bb19ac26..79f058a1d7 100644 --- a/packages/core/src/lib/connection_manager/shard_reader.spec.ts +++ b/packages/core/src/lib/connection_manager/shard_reader.spec.ts @@ -177,7 +177,6 @@ describe("ShardReader", function () { const result = await shardReader.isPeerOnShard( testPeerId, - testClusterId, testShardIndex ); @@ -192,9 +191,13 @@ describe("ShardReader", function () { mockPeerStore.get.resolves(mockPeer); - const result = await shardReader.isPeerOnShard( + const shardReaderCluster5 = new ShardReader({ + libp2p: mockLibp2p as any, + networkConfig: { clusterId: 5 } + }); + + const result = await shardReaderCluster5.isPeerOnShard( testPeerId, - 5, testShardIndex ); @@ -211,7 +214,6 @@ describe("ShardReader", function () { const result = await shardReader.isPeerOnShard( testPeerId, - testClusterId, testShardIndex + 100 ); @@ -223,7 +225,6 @@ describe("ShardReader", function () { const result = await shardReader.isPeerOnShard( testPeerId, - testClusterId, testShardIndex ); diff --git a/packages/core/src/lib/connection_manager/shard_reader.ts b/packages/core/src/lib/connection_manager/shard_reader.ts index 2dade0de99..0867795ca4 100644 --- a/packages/core/src/lib/connection_manager/shard_reader.ts +++ b/packages/core/src/lib/connection_manager/shard_reader.ts @@ -20,7 +20,7 @@ type ShardReaderConstructorOptions = { networkConfig: NetworkConfig; }; -interface IShardReader { +export interface IShardReader { hasShardInfo(id: PeerId): Promise; isPeerOnCluster(id: PeerId): Promise; isPeerOnShard( @@ -66,7 +66,8 @@ export class ShardReader implements IShardReader { ): Promise { try { const { clusterId, shard } = pubsubTopicToSingleShardInfo(pubsubTopic); - return await this.isPeerOnShard(id, clusterId, shard); + if (clusterId !== this.clusterId) return false; + return await this.isPeerOnShard(id, shard); } catch (error) { log.error( `Error comparing pubsub topic ${pubsubTopic} with shard info for ${id}`, @@ -76,14 +77,10 @@ export class ShardReader implements IShardReader { } } - public async isPeerOnShard( - id: PeerId, - clusterId: ClusterId, - shard: ShardId - ): Promise { + public async isPeerOnShard(id: PeerId, shard: ShardId): Promise { const peerShardInfo = await this.getRelayShards(id); log.info( - `Checking if peer on same shard: this { clusterId: ${clusterId}, shardId: ${shard} },` + + `Checking if peer on same shard: this { clusterId: ${this.clusterId}, shardId: ${shard} },` + `${id} { clusterId: ${peerShardInfo?.clusterId}, shards: ${peerShardInfo?.shards} }` ); if (!peerShardInfo) { @@ -91,7 +88,7 @@ export class ShardReader implements IShardReader { } return ( - peerShardInfo.clusterId === clusterId && + peerShardInfo.clusterId === this.clusterId && peerShardInfo.shards.includes(shard) ); } diff --git a/packages/interfaces/src/connection_manager.ts b/packages/interfaces/src/connection_manager.ts index 301ca94155..d863bfe3af 100644 --- a/packages/interfaces/src/connection_manager.ts +++ b/packages/interfaces/src/connection_manager.ts @@ -1,6 +1,8 @@ import type { Peer, PeerId, Stream } from "@libp2p/interface"; import type { MultiaddrInput } from "@multiformats/multiaddr"; +import { ShardId } from "./sharding.js"; + // Peer tags export enum Tags { BOOTSTRAP = "bootstrap", @@ -161,4 +163,14 @@ export interface IConnectionManager { * @returns Promise resolving to true if the peer has shard info, false otherwise */ hasShardInfo(peerId: PeerId): Promise; + + /** + * Returns true if the passed peer is on the passed pubsub topic + */ + isPeerOnTopic(peerId: PeerId, pubsubTopic: string): Promise; + + /** + * Returns true if the passed peer is on the passed shard + */ + isPeerOnShard(peerId: PeerId, shardId: ShardId): Promise; } diff --git a/packages/sdk/src/peer_manager/peer_manager.ts b/packages/sdk/src/peer_manager/peer_manager.ts index a42baf7215..48c7a1efe2 100644 --- a/packages/sdk/src/peer_manager/peer_manager.ts +++ b/packages/sdk/src/peer_manager/peer_manager.ts @@ -4,14 +4,10 @@ import { PeerId, TypedEventEmitter } from "@libp2p/interface"; -import { - ConnectionManager, - FilterCodecs, - LightPushCodec, - StoreCodec -} from "@waku/core"; +import { FilterCodecs, LightPushCodec, StoreCodec } from "@waku/core"; import { CONNECTION_LOCKED_TAG, + type IConnectionManager, Libp2p, Libp2pEventHandler, Protocols @@ -29,7 +25,7 @@ type PeerManagerConfig = { type PeerManagerParams = { libp2p: Libp2p; config?: PeerManagerConfig; - connectionManager: ConnectionManager; + connectionManager: IConnectionManager; }; type GetPeersParams = { @@ -67,7 +63,7 @@ export class PeerManager { private readonly numPeersToUse: number; private readonly libp2p: Libp2p; - private readonly connectionManager: ConnectionManager; + private readonly connectionManager: IConnectionManager; private readonly lockedPeers = new Set(); private readonly unlockedPeers = new Map();