mirror of
https://github.com/waku-org/js-waku.git
synced 2025-02-21 16:48:29 +00:00
chore: improvements
This commit is contained in:
parent
75fcca4cd9
commit
8caa220e8f
@ -66,8 +66,11 @@ export class BaseProtocol implements IBaseProtocolCore {
|
||||
public async connectedPeers(): Promise<Peer[]> {
|
||||
const peers = await this.allPeers();
|
||||
return peers.filter((peer) => {
|
||||
return (
|
||||
this.components.connectionManager.getConnections(peer.id).length > 0
|
||||
const connections = this.components.connectionManager.getConnections(
|
||||
peer.id
|
||||
);
|
||||
return connections.some((c) =>
|
||||
c.streams.some((s) => s.protocol === this.multicodec)
|
||||
);
|
||||
});
|
||||
}
|
||||
|
@ -25,7 +25,7 @@ export type IBaseProtocolCore = {
|
||||
|
||||
export type IBaseProtocolSDK = {
|
||||
readonly connectedPeers: Peer[];
|
||||
renewPeer: (peerToDisconnect: PeerId) => Promise<Peer>;
|
||||
renewPeer: (peerToDisconnect: PeerId) => Promise<Peer | undefined>;
|
||||
readonly numPeersToUse: number;
|
||||
};
|
||||
|
||||
|
@ -56,14 +56,9 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
|
||||
* @param peerToDisconnect The peer to disconnect from.
|
||||
* @returns The new peer that was found and connected to.
|
||||
*/
|
||||
public async renewPeer(peerToDisconnect: PeerId): Promise<Peer> {
|
||||
public async renewPeer(peerToDisconnect: PeerId): Promise<Peer | undefined> {
|
||||
this.log.info(`Renewing peer ${peerToDisconnect}`);
|
||||
|
||||
const peer = (await this.findAndAddPeers(1))[0];
|
||||
if (!peer) {
|
||||
throw Error("Failed to find a new peer to replace the disconnected one.");
|
||||
}
|
||||
|
||||
const updatedPeers = this.peers.filter(
|
||||
(peer) => !peer.id.equals(peerToDisconnect)
|
||||
);
|
||||
@ -74,9 +69,17 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
|
||||
`Peer ${peerToDisconnect} disconnected and removed from the peer list`
|
||||
);
|
||||
|
||||
const newPeer = await this.findAndAddPeers(1);
|
||||
if (newPeer.length === 0) {
|
||||
this.log.error(
|
||||
"Failed to find a new peer to replace the disconnected one."
|
||||
);
|
||||
return undefined;
|
||||
}
|
||||
|
||||
this.renewPeersLocker.lock(peerToDisconnect);
|
||||
|
||||
return peer;
|
||||
return newPeer[0];
|
||||
}
|
||||
|
||||
/**
|
||||
@ -171,6 +174,7 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
|
||||
}
|
||||
|
||||
this.maintainPeersLock = true;
|
||||
await this.confirmPeers();
|
||||
this.log.info(`Maintaining peers, current count: ${this.peers.length}`);
|
||||
try {
|
||||
const numPeersToAdd = this.numPeersToUse - this.peers.length;
|
||||
@ -187,6 +191,25 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
|
||||
return true;
|
||||
}
|
||||
|
||||
private async confirmPeers(): Promise<void> {
|
||||
const connectedPeers = await this.core.connectedPeers();
|
||||
const currentPeers = this.peers;
|
||||
const peersToAdd = connectedPeers.filter(
|
||||
(p) => !currentPeers.some((cp) => cp.id.equals(p.id))
|
||||
);
|
||||
const peersToRemove = currentPeers.filter(
|
||||
(p) => !connectedPeers.some((cp) => cp.id.equals(p.id))
|
||||
);
|
||||
|
||||
peersToAdd.forEach((p) => this.peers.push(p));
|
||||
peersToRemove.forEach((p) => {
|
||||
const index = this.peers.findIndex((cp) => cp.id.equals(p.id));
|
||||
if (index !== -1) this.peers.splice(index, 1);
|
||||
});
|
||||
|
||||
this.updatePeers(this.peers);
|
||||
}
|
||||
|
||||
/**
|
||||
* Finds and adds new peers to the peers list.
|
||||
* @param numPeers The number of peers to find and add.
|
||||
|
@ -41,7 +41,9 @@ export class SubscriptionManager implements ISubscription {
|
||||
private readonly protocol: FilterCore,
|
||||
private readonly connectionManager: ConnectionManager,
|
||||
private readonly getPeers: () => Peer[],
|
||||
private readonly renewPeer: (peerToDisconnect: PeerId) => Promise<Peer>
|
||||
private readonly renewPeer: (
|
||||
peerToDisconnect: PeerId
|
||||
) => Promise<Peer | undefined>
|
||||
) {
|
||||
this.pubsubTopic = pubsubTopic;
|
||||
this.subscriptionCallbacks = new Map();
|
||||
@ -299,9 +301,9 @@ export class SubscriptionManager implements ISubscription {
|
||||
}
|
||||
|
||||
this.keepAliveTimer = setInterval(() => {
|
||||
void this.ping().catch((error) => {
|
||||
log.error("Error in keep-alive ping cycle:", error);
|
||||
});
|
||||
void this.ping()
|
||||
.then(() => log.info("Keep-alive ping successful"))
|
||||
.catch((error) => log.error("Error in keep-alive ping cycle:", error));
|
||||
}, interval) as unknown as number;
|
||||
}
|
||||
|
||||
|
@ -18,7 +18,7 @@ export class ReliabilityMonitorManager {
|
||||
public static createReceiverMonitor(
|
||||
pubsubTopic: PubsubTopic,
|
||||
getPeers: () => Peer[],
|
||||
renewPeer: (peerId: PeerId) => Promise<Peer>,
|
||||
renewPeer: (peerId: PeerId) => Promise<Peer | undefined>,
|
||||
getContentTopics: () => ContentTopic[],
|
||||
protocolSubscribe: (
|
||||
pubsubTopic: PubsubTopic,
|
||||
@ -42,7 +42,7 @@ export class ReliabilityMonitorManager {
|
||||
}
|
||||
|
||||
public static createSenderMonitor(
|
||||
renewPeer: (peerId: PeerId) => Promise<Peer>
|
||||
renewPeer: (peerId: PeerId) => Promise<Peer | undefined>
|
||||
): SenderReliabilityMonitor {
|
||||
if (!ReliabilityMonitorManager.senderMonitor) {
|
||||
ReliabilityMonitorManager.senderMonitor = new SenderReliabilityMonitor(
|
||||
|
@ -32,7 +32,7 @@ export class ReceiverReliabilityMonitor {
|
||||
public constructor(
|
||||
private readonly pubsubTopic: PubsubTopic,
|
||||
private getPeers: () => Peer[],
|
||||
private renewPeer: (peerId: PeerId) => Promise<Peer>,
|
||||
private renewPeer: (peerId: PeerId) => Promise<Peer | undefined>,
|
||||
private getContentTopics: () => ContentTopic[],
|
||||
private protocolSubscribe: (
|
||||
pubsubTopic: PubsubTopic,
|
||||
@ -163,15 +163,21 @@ export class ReceiverReliabilityMonitor {
|
||||
private async renewAndSubscribePeer(
|
||||
peerId: PeerId
|
||||
): Promise<Peer | undefined> {
|
||||
const peerIdStr = peerId.toString();
|
||||
try {
|
||||
if (this.peerRenewalLocks.has(peerId.toString())) {
|
||||
log.info(`Peer ${peerId.toString()} is already being renewed.`);
|
||||
if (this.peerRenewalLocks.has(peerIdStr)) {
|
||||
log.info(`Peer ${peerIdStr} is already being renewed.`);
|
||||
return;
|
||||
}
|
||||
|
||||
this.peerRenewalLocks.add(peerId.toString());
|
||||
this.peerRenewalLocks.add(peerIdStr);
|
||||
|
||||
const newPeer = await this.renewPeer(peerId);
|
||||
if (!newPeer) {
|
||||
log.warn(`Failed to renew peer ${peerIdStr}: No new peer found.`);
|
||||
return;
|
||||
}
|
||||
|
||||
await this.protocolSubscribe(
|
||||
this.pubsubTopic,
|
||||
newPeer,
|
||||
@ -181,16 +187,16 @@ export class ReceiverReliabilityMonitor {
|
||||
this.receivedMessagesHashes.nodes[newPeer.id.toString()] = new Set();
|
||||
this.missedMessagesByPeer.set(newPeer.id.toString(), 0);
|
||||
|
||||
this.peerFailures.delete(peerId.toString());
|
||||
this.missedMessagesByPeer.delete(peerId.toString());
|
||||
delete this.receivedMessagesHashes.nodes[peerId.toString()];
|
||||
this.peerFailures.delete(peerIdStr);
|
||||
this.missedMessagesByPeer.delete(peerIdStr);
|
||||
delete this.receivedMessagesHashes.nodes[peerIdStr];
|
||||
|
||||
return newPeer;
|
||||
} catch (error) {
|
||||
log.warn(`Failed to renew peer ${peerId.toString()}: ${error}.`);
|
||||
log.error(`Failed to renew peer ${peerIdStr}: ${error}.`);
|
||||
return;
|
||||
} finally {
|
||||
this.peerRenewalLocks.delete(peerId.toString());
|
||||
this.peerRenewalLocks.delete(peerIdStr);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -11,7 +11,9 @@ export class SenderReliabilityMonitor {
|
||||
private readonly maxAttemptsBeforeRenewal =
|
||||
DEFAULT_MAX_ATTEMPTS_BEFORE_RENEWAL;
|
||||
|
||||
public constructor(private renewPeer: (peerId: PeerId) => Promise<Peer>) {}
|
||||
public constructor(
|
||||
private renewPeer: (peerId: PeerId) => Promise<Peer | undefined>
|
||||
) {}
|
||||
|
||||
public async attemptRetriesOrRenew(
|
||||
peerId: PeerId,
|
||||
@ -42,13 +44,19 @@ export class SenderReliabilityMonitor {
|
||||
} else {
|
||||
try {
|
||||
const newPeer = await this.renewPeer(peerId);
|
||||
log.info(
|
||||
`Renewed peer ${peerId.toString()} to ${newPeer.id.toString()}`
|
||||
);
|
||||
if (newPeer) {
|
||||
log.info(
|
||||
`Renewed peer ${peerId.toString()} to ${newPeer.id.toString()}`
|
||||
);
|
||||
|
||||
this.attempts.delete(peerIdStr);
|
||||
this.attempts.set(newPeer.id.toString(), 0);
|
||||
await protocolSend();
|
||||
this.attempts.delete(peerIdStr);
|
||||
this.attempts.set(newPeer.id.toString(), 0);
|
||||
await protocolSend();
|
||||
} else {
|
||||
log.error(
|
||||
`Failed to renew peer ${peerId.toString()}: New peer is undefined`
|
||||
);
|
||||
}
|
||||
} catch (error) {
|
||||
log.error(`Failed to renew peer ${peerId.toString()}: ${error}`);
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user