mirror of
https://github.com/logos-messaging/logos-messaging-js.git
synced 2026-01-17 07:23:09 +00:00
add dialTimeout, change dialingQueue to Map
This commit is contained in:
parent
74ad13ba24
commit
4526a9a963
@ -65,7 +65,8 @@ describe("ConnectionLimiter", () => {
|
||||
enableAutoRecovery: true,
|
||||
maxDialingPeers: 3,
|
||||
failedDialCooldown: 60,
|
||||
dialCooldown: 10
|
||||
dialCooldown: 10,
|
||||
dialTimeout: 30
|
||||
};
|
||||
|
||||
function createLimiter(
|
||||
|
||||
@ -29,6 +29,7 @@ const DEFAULT_MAX_CONNECTIONS = 10;
|
||||
const DEFAULT_MAX_DIALING_PEERS = 3;
|
||||
const DEFAULT_FAILED_DIAL_COOLDOWN_SEC = 60;
|
||||
const DEFAULT_DIAL_COOLDOWN_SEC = 10;
|
||||
const DEFAULT_DIAL_TIMEOUT_SEC = 30;
|
||||
|
||||
type ConnectionManagerConstructorOptions = {
|
||||
libp2p: Libp2p;
|
||||
@ -61,6 +62,7 @@ export class ConnectionManager implements IConnectionManager {
|
||||
maxDialingPeers: DEFAULT_MAX_DIALING_PEERS,
|
||||
failedDialCooldown: DEFAULT_FAILED_DIAL_COOLDOWN_SEC,
|
||||
dialCooldown: DEFAULT_DIAL_COOLDOWN_SEC,
|
||||
dialTimeout: DEFAULT_DIAL_TIMEOUT_SEC,
|
||||
...options.config
|
||||
};
|
||||
|
||||
|
||||
@ -39,6 +39,7 @@ describe("Dialer", () => {
|
||||
maxDialingPeers: 3,
|
||||
failedDialCooldown: 60,
|
||||
dialCooldown: 10,
|
||||
dialTimeout: 30,
|
||||
maxConnections: 10,
|
||||
enableAutoRecovery: true
|
||||
};
|
||||
|
||||
@ -23,7 +23,7 @@ export class Dialer implements IDialer {
|
||||
private readonly shardReader: ShardReader;
|
||||
private readonly options: ConnectionManagerOptions;
|
||||
|
||||
private dialingQueue: PeerId[] = [];
|
||||
private dialingQueue: Map<string, PeerId> = new Map();
|
||||
private dialHistory: Map<string, number> = new Map();
|
||||
private failedDials: Map<string, number> = new Map();
|
||||
private dialingInterval: NodeJS.Timeout | null = null;
|
||||
@ -70,7 +70,7 @@ export class Dialer implements IDialer {
|
||||
return;
|
||||
}
|
||||
|
||||
const isEmptyQueue = this.dialingQueue.length === 0;
|
||||
const isEmptyQueue = this.dialingQueue.size === 0;
|
||||
const isNotDialing = !this.isProcessing && !this.isImmediateDialing;
|
||||
|
||||
// If queue is empty and we're not currently processing, dial immediately
|
||||
@ -81,29 +81,28 @@ export class Dialer implements IDialer {
|
||||
this.isImmediateDialing = false;
|
||||
log.info("Released immediate dial lock");
|
||||
} else {
|
||||
this.dialingQueue.push(peerId);
|
||||
this.dialingQueue.set(peerId.toString(), peerId);
|
||||
log.info(
|
||||
`Added peer to dialing queue, queue size: ${this.dialingQueue.length}`
|
||||
`Added peer to dialing queue, queue size: ${this.dialingQueue.size}`
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
private async processQueue(): Promise<void> {
|
||||
if (this.dialingQueue.length === 0 || this.isProcessing) {
|
||||
if (this.dialingQueue.size === 0 || this.isProcessing) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.isProcessing = true;
|
||||
|
||||
try {
|
||||
const peersToDial = this.dialingQueue.slice(
|
||||
0,
|
||||
this.options.maxDialingPeers
|
||||
);
|
||||
this.dialingQueue = this.dialingQueue.slice(peersToDial.length);
|
||||
const allPeers = Array.from(this.dialingQueue.values());
|
||||
const peersToDial = allPeers.slice(0, this.options.maxDialingPeers);
|
||||
|
||||
peersToDial.forEach((peer) => this.dialingQueue.delete(peer.toString()));
|
||||
|
||||
log.info(
|
||||
`Processing dial queue: dialing ${peersToDial.length} peers, ${this.dialingQueue.length} remaining in queue`
|
||||
`Processing dial queue: dialing ${peersToDial.length} peers, ${this.dialingQueue.size} remaining in queue`
|
||||
);
|
||||
|
||||
await Promise.all(peersToDial.map((peerId) => this.dialPeer(peerId)));
|
||||
@ -116,7 +115,19 @@ export class Dialer implements IDialer {
|
||||
try {
|
||||
log.info(`Dialing peer from queue: ${peerId}`);
|
||||
|
||||
await this.libp2p.dial(peerId);
|
||||
await Promise.race([
|
||||
this.libp2p.dial(peerId),
|
||||
new Promise<never>((_, reject) =>
|
||||
setTimeout(
|
||||
() =>
|
||||
reject(
|
||||
new Error(`Dial timeout after ${this.options.dialTimeout}s`)
|
||||
),
|
||||
this.options.dialTimeout * 1000
|
||||
)
|
||||
)
|
||||
]);
|
||||
|
||||
this.dialHistory.set(peerId.toString(), Date.now());
|
||||
this.failedDials.delete(peerId.toString());
|
||||
|
||||
|
||||
@ -74,6 +74,13 @@ export type ConnectionManagerOptions = {
|
||||
* @default 10 seconds
|
||||
*/
|
||||
dialCooldown: number;
|
||||
|
||||
/**
|
||||
* Time to wait for a dial attempt to complete before timing out.
|
||||
*
|
||||
* @default 30 seconds
|
||||
*/
|
||||
dialTimeout: number;
|
||||
};
|
||||
|
||||
export interface IConnectionManager {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user