make ConnectionManager use ctor

This commit is contained in:
Sasha 2024-10-19 17:16:10 +02:00
parent c93b58ed8c
commit 7efbd26609
No known key found for this signature in database
6 changed files with 29 additions and 52 deletions

View File

@ -27,11 +27,21 @@ export const DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED = 1;
export const DEFAULT_MAX_DIAL_ATTEMPTS_FOR_PEER = 3;
export const DEFAULT_MAX_PARALLEL_DIALS = 3;
type ConnectionManagerConstructorOptions = {
libp2p: Libp2p;
keepAliveOptions: KeepAliveOptions;
pubsubTopics: PubsubTopic[];
relay?: IRelay;
config?: Partial<ConnectionManagerOptions>;
};
export class ConnectionManager
extends TypedEventEmitter<IPeersByDiscoveryEvents & IConnectionStateEvents>
implements IConnectionManager
{
private static instances = new Map<string, ConnectionManager>();
// TODO(weboko): make it private
public readonly pubsubTopics: PubsubTopic[];
private keepAliveManager: KeepAliveManager;
private options: ConnectionManagerOptions;
private libp2p: Libp2p;
@ -51,29 +61,6 @@ export class ConnectionManager
return this.isP2PNetworkConnected;
}
public static create(
peerId: string,
libp2p: Libp2p,
keepAliveOptions: KeepAliveOptions,
pubsubTopics: PubsubTopic[],
relay?: IRelay,
options?: ConnectionManagerOptions
): ConnectionManager {
let instance = ConnectionManager.instances.get(peerId);
if (!instance) {
instance = new ConnectionManager(
libp2p,
keepAliveOptions,
pubsubTopics,
relay,
options
);
ConnectionManager.instances.set(peerId, instance);
}
return instance;
}
public stop(): void {
this.keepAliveManager.stopAll();
this.libp2p.removeEventListener(
@ -156,27 +143,21 @@ export class ConnectionManager
};
}
private constructor(
libp2p: Libp2p,
keepAliveOptions: KeepAliveOptions,
public readonly configuredPubsubTopics: PubsubTopic[],
relay?: IRelay,
options?: Partial<ConnectionManagerOptions>
) {
public constructor(options: ConnectionManagerConstructorOptions) {
super();
this.libp2p = libp2p;
this.configuredPubsubTopics = configuredPubsubTopics;
this.libp2p = options.libp2p;
this.pubsubTopics = options.pubsubTopics;
this.options = {
maxDialAttemptsForPeer: DEFAULT_MAX_DIAL_ATTEMPTS_FOR_PEER,
maxBootstrapPeersAllowed: DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED,
maxParallelDials: DEFAULT_MAX_PARALLEL_DIALS,
...options
...options.config
};
this.keepAliveManager = new KeepAliveManager({
relay,
libp2p,
options: keepAliveOptions
relay: options.relay,
libp2p: options.libp2p,
options: options.keepAliveOptions
});
this.startEventListeners()
@ -478,7 +459,7 @@ export class ConnectionManager
log.warn(
`Discovered peer ${peerId.toString()} with ShardInfo ${shardInfo} is not part of any of the configured pubsub topics (${
this.configuredPubsubTopics
this.pubsubTopics
}).
Not dialing.`
);
@ -573,7 +554,7 @@ export class ConnectionManager
const pubsubTopics = shardInfoToPubsubTopics(shardInfo);
const isTopicConfigured = pubsubTopics.some((topic) =>
this.configuredPubsubTopics.includes(topic)
this.pubsubTopics.includes(topic)
);
return isTopicConfigured;
}

View File

@ -63,7 +63,7 @@ export interface IConnectionStateEvents {
export interface IConnectionManager
extends TypedEventEmitter<IPeersByDiscoveryEvents & IConnectionStateEvents> {
configuredPubsubTopics: PubsubTopic[];
pubsubTopics: PubsubTopic[];
dropConnection(peerId: PeerId): Promise<void>;
getPeersByDiscovery(): Promise<PeersByDiscoveryResult>;
stop(): void;

View File

@ -56,7 +56,7 @@ class Filter extends BaseProtocolSDK implements IFilter {
await subscription.processIncomingMessage(wakuMessage, peerIdStr);
},
connectionManager.configuredPubsubTopics,
connectionManager.pubsubTopics,
libp2p
),
connectionManager,

View File

@ -41,10 +41,7 @@ export class LightPush implements ILightPush {
options?: ProtocolCreateOptions
) {
this.numPeersToUse = options?.numPeersToUse ?? DEFAULT_NUM_PEERS_TO_USE;
this.protocol = new LightPushCore(
connectionManager.configuredPubsubTopics,
libp2p
);
this.protocol = new LightPushCore(connectionManager.pubsubTopics, libp2p);
}
public async send(

View File

@ -25,7 +25,7 @@ export class Store extends BaseProtocolSDK implements IStore {
public constructor(connectionManager: ConnectionManager, libp2p: Libp2p) {
super(
new StoreCore(connectionManager.configuredPubsubTopics, libp2p),
new StoreCore(connectionManager.pubsubTopics, libp2p),
connectionManager,
{
numPeersToUse: DEFAULT_NUM_PEERS

View File

@ -95,13 +95,12 @@ export class WakuNode implements IWaku {
const peerId = this.libp2p.peerId.toString();
this.connectionManager = ConnectionManager.create(
peerId,
this.connectionManager = new ConnectionManager({
libp2p,
{ pingKeepAlive, relayKeepAlive },
this.pubsubTopics,
this.relay
);
keepAliveOptions: { pingKeepAlive, relayKeepAlive },
pubsubTopics: this.pubsubTopics,
relay: this.relay
});
this.health = getHealthManager();