chore: improve StreamManager (#1994)

* chore: improve stream manager

* chore: some cleaning

* chore: update API

* chore: rename `getConnectionStatus` to `isConnected`

* chore: use throw/catch

* chore: remove await

* remove redundant function and rename

* chore: increase test timeout
This commit is contained in:
Danish Arora 2024-05-14 16:31:38 +05:30 committed by GitHub
parent 6f188ef379
commit e49e7289ae
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 143 additions and 76 deletions

78
package-lock.json generated
View File

@ -36494,14 +36494,14 @@
},
"packages/core": {
"name": "@waku/core",
"version": "0.0.28",
"version": "0.0.29",
"license": "MIT OR Apache-2.0",
"dependencies": {
"@libp2p/ping": "^1.0.12",
"@waku/enr": "^0.0.22",
"@waku/interfaces": "0.0.23",
"@waku/proto": "0.0.6",
"@waku/utils": "0.0.16",
"@waku/enr": "^0.0.23",
"@waku/interfaces": "0.0.24",
"@waku/proto": "0.0.7",
"@waku/utils": "0.0.17",
"debug": "^4.3.4",
"it-all": "^3.0.4",
"it-length-prefixed": "^9.0.4",
@ -36552,14 +36552,14 @@
},
"packages/discovery": {
"name": "@waku/discovery",
"version": "0.0.1",
"version": "0.0.2",
"license": "MIT OR Apache-2.0",
"dependencies": {
"@waku/core": "0.0.28",
"@waku/enr": "0.0.22",
"@waku/interfaces": "0.0.23",
"@waku/proto": "^0.0.6",
"@waku/utils": "0.0.16",
"@waku/core": "0.0.29",
"@waku/enr": "0.0.23",
"@waku/interfaces": "0.0.24",
"@waku/proto": "^0.0.7",
"@waku/utils": "0.0.17",
"debug": "^4.3.4",
"dns-query": "^0.11.2",
"hi-base32": "^0.5.1",
@ -36606,7 +36606,7 @@
},
"packages/enr": {
"name": "@waku/enr",
"version": "0.0.22",
"version": "0.0.23",
"license": "MIT OR Apache-2.0",
"dependencies": {
"@ethersproject/rlp": "^5.7.0",
@ -36614,7 +36614,7 @@
"@libp2p/peer-id": "^4.0.4",
"@multiformats/multiaddr": "^12.0.0",
"@noble/secp256k1": "^1.7.1",
"@waku/utils": "0.0.16",
"@waku/utils": "0.0.17",
"debug": "^4.3.4",
"js-sha3": "^0.9.2"
},
@ -36626,7 +36626,7 @@
"@types/chai": "^4.3.11",
"@types/mocha": "^10.0.6",
"@waku/build-utils": "*",
"@waku/interfaces": "0.0.23",
"@waku/interfaces": "0.0.24",
"chai": "^4.3.10",
"cspell": "^8.6.1",
"fast-check": "^3.15.1",
@ -36655,10 +36655,10 @@
},
"packages/interfaces": {
"name": "@waku/interfaces",
"version": "0.0.23",
"version": "0.0.24",
"license": "MIT OR Apache-2.0",
"dependencies": {
"@waku/proto": "^0.0.6"
"@waku/proto": "^0.0.7"
},
"devDependencies": {
"@chainsafe/libp2p-gossipsub": "^12.0.0",
@ -36673,14 +36673,14 @@
},
"packages/message-encryption": {
"name": "@waku/message-encryption",
"version": "0.0.26",
"version": "0.0.27",
"license": "MIT OR Apache-2.0",
"dependencies": {
"@noble/secp256k1": "^1.7.1",
"@waku/core": "0.0.28",
"@waku/interfaces": "0.0.23",
"@waku/proto": "0.0.6",
"@waku/utils": "0.0.16",
"@waku/core": "0.0.29",
"@waku/interfaces": "0.0.24",
"@waku/proto": "0.0.7",
"@waku/utils": "0.0.17",
"debug": "^4.3.4",
"js-sha3": "^0.9.2",
"uint8arrays": "^5.0.1"
@ -36717,11 +36717,11 @@
},
"packages/message-hash": {
"name": "@waku/message-hash",
"version": "0.1.12",
"version": "0.1.13",
"license": "MIT OR Apache-2.0",
"dependencies": {
"@noble/hashes": "^1.3.2",
"@waku/utils": "0.0.16"
"@waku/utils": "0.0.17"
},
"devDependencies": {
"@rollup/plugin-commonjs": "^25.0.7",
@ -36731,7 +36731,7 @@
"@types/debug": "^4.1.12",
"@types/mocha": "^10.0.6",
"@waku/build-utils": "*",
"@waku/interfaces": "0.0.23",
"@waku/interfaces": "0.0.24",
"chai": "^4.3.10",
"cspell": "^8.6.1",
"fast-check": "^3.15.1",
@ -36757,7 +36757,7 @@
},
"packages/proto": {
"name": "@waku/proto",
"version": "0.0.6",
"version": "0.0.7",
"license": "MIT OR Apache-2.0",
"dependencies": {
"protons-runtime": "^5.4.0"
@ -36799,15 +36799,15 @@
},
"packages/relay": {
"name": "@waku/relay",
"version": "0.0.11",
"version": "0.0.12",
"license": "MIT OR Apache-2.0",
"dependencies": {
"@chainsafe/libp2p-gossipsub": "^12.0.0",
"@noble/hashes": "^1.3.2",
"@waku/core": "0.0.28",
"@waku/interfaces": "0.0.23",
"@waku/proto": "0.0.6",
"@waku/utils": "0.0.16",
"@waku/core": "0.0.29",
"@waku/interfaces": "0.0.24",
"@waku/proto": "0.0.7",
"@waku/utils": "0.0.17",
"chai": "^4.3.10",
"debug": "^4.3.4",
"fast-check": "^3.15.1"
@ -36840,7 +36840,7 @@
},
"packages/sdk": {
"name": "@waku/sdk",
"version": "0.0.24",
"version": "0.0.25",
"license": "MIT OR Apache-2.0",
"dependencies": {
"@chainsafe/libp2p-noise": "^14.1.0",
@ -36850,12 +36850,12 @@
"@libp2p/ping": "^1.0.12",
"@libp2p/websockets": "^8.0.11",
"@noble/hashes": "^1.3.3",
"@waku/core": "0.0.28",
"@waku/discovery": "0.0.1",
"@waku/interfaces": "0.0.23",
"@waku/proto": "^0.0.6",
"@waku/relay": "0.0.11",
"@waku/utils": "0.0.16",
"@waku/core": "0.0.29",
"@waku/discovery": "0.0.2",
"@waku/interfaces": "0.0.24",
"@waku/proto": "^0.0.7",
"@waku/relay": "0.0.12",
"@waku/utils": "0.0.17",
"libp2p": "^1.1.2"
},
"devDependencies": {
@ -36940,11 +36940,11 @@
},
"packages/utils": {
"name": "@waku/utils",
"version": "0.0.16",
"version": "0.0.17",
"license": "MIT OR Apache-2.0",
"dependencies": {
"@noble/hashes": "^1.3.2",
"@waku/interfaces": "0.0.23",
"@waku/interfaces": "0.0.24",
"chai": "^4.3.10",
"debug": "^4.3.4",
"uint8arrays": "^5.0.1"

View File

@ -98,18 +98,15 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore {
};
}
let stream: Stream | undefined;
let stream: Stream;
try {
stream = await this.getStream(peer);
} catch (err) {
log.error(
`Failed to get a stream for remote peer${peer.id.toString()}`,
err
);
} catch (error) {
log.error("Failed to get stream", error);
return {
success: null,
failure: {
error: ProtocolError.REMOTE_PEER_FAULT,
error: ProtocolError.NO_STREAM_AVAILABLE,
peerId: peer.id
}
};

View File

@ -85,7 +85,16 @@ class Metadata extends BaseProtocol implements IMetadata {
};
}
const stream = await this.getStream(peer);
let stream;
try {
stream = await this.getStream(peer);
} catch (error) {
log.error("Failed to get stream", error);
return {
shardInfo: null,
error: ProtocolError.NO_STREAM_AVAILABLE
};
}
const encodedResponse = await pipe(
[request],

View File

@ -92,7 +92,13 @@ export class StoreCore extends BaseProtocol implements IStoreCore {
const historyRpcQuery = HistoryRpc.createQuery(queryOpts);
const stream = await this.getStream(peer);
let stream;
try {
stream = await this.getStream(peer);
} catch (e) {
log.error("Failed to get stream", e);
break;
}
const res = await pipe(
[historyRpcQuery.encode()],

View File

@ -1,11 +1,15 @@
import type { PeerUpdate, Stream } from "@libp2p/interface";
import { Peer } from "@libp2p/interface";
import type { Peer, PeerId } from "@libp2p/interface";
import { Libp2p } from "@waku/interfaces";
import { Logger } from "@waku/utils";
import { selectConnection } from "@waku/utils/libp2p";
const CONNECTION_TIMEOUT = 5_000;
const RETRY_BACKOFF_BASE = 1_000;
const MAX_RETRIES = 3;
export class StreamManager {
private streamPool: Map<string, Promise<Stream | void>>;
private readonly streamPool: Map<string, Promise<Stream | void>>;
private readonly log: Logger;
constructor(
@ -14,12 +18,8 @@ export class StreamManager {
public addEventListener: Libp2p["addEventListener"]
) {
this.log = new Logger(`stream-manager:${multicodec}`);
this.addEventListener(
"peer:update",
this.handlePeerUpdateStreamPool.bind(this)
);
this.getStream = this.getStream.bind(this);
this.streamPool = new Map();
this.addEventListener("peer:update", this.handlePeerUpdateStreamPool);
}
public async getStream(peer: Peer): Promise<Stream> {
@ -27,47 +27,88 @@ export class StreamManager {
const streamPromise = this.streamPool.get(peerIdStr);
if (!streamPromise) {
return this.newStream(peer); // fallback by creating a new stream on the spot
return this.createStream(peer);
}
// We have the stream, let's remove it from the map
this.streamPool.delete(peerIdStr);
this.prepareStream(peer);
this.prepareNewStream(peer);
const stream = await streamPromise;
if (!stream || stream.status === "closed") {
return this.newStream(peer); // fallback by creating a new stream on the spot
try {
const stream = await streamPromise;
if (stream && stream.status !== "closed") {
return stream;
}
} catch (error) {
this.log.warn(`Failed to get stream for ${peerIdStr} -- `, error);
this.log.warn("Attempting to create a new stream for the peer");
}
return stream;
return this.createStream(peer);
}
private async newStream(peer: Peer): Promise<Stream> {
private async createStream(peer: Peer, retries = 0): Promise<Stream> {
const connections = this.getConnections(peer.id);
const connection = selectConnection(connections);
if (!connection) {
throw new Error("Failed to get a connection to the peer");
}
return connection.newStream(this.multicodec);
try {
return await connection.newStream(this.multicodec);
} catch (error) {
if (retries < MAX_RETRIES) {
const backoff = RETRY_BACKOFF_BASE * Math.pow(2, retries);
await new Promise((resolve) => setTimeout(resolve, backoff));
return this.createStream(peer, retries + 1);
}
throw new Error(
`Failed to create a new stream for ${peer.id.toString()} -- ` + error
);
}
}
private prepareNewStream(peer: Peer): void {
const streamPromise = this.newStream(peer).catch(() => {
// No error thrown as this call is not triggered by the user
private prepareStream(peer: Peer): void {
const timeoutPromise = new Promise<void>((resolve) =>
setTimeout(resolve, CONNECTION_TIMEOUT)
);
const streamPromise = Promise.race([
this.createStream(peer),
timeoutPromise.then(() => {
throw new Error("Connection timeout");
})
]).catch((error) => {
this.log.error(
`Failed to prepare a new stream for ${peer.id.toString()}`
`Failed to prepare a new stream for ${peer.id.toString()} -- `,
error
);
});
this.streamPool.set(peer.id.toString(), streamPromise);
}
private handlePeerUpdateStreamPool = (evt: CustomEvent<PeerUpdate>): void => {
const peer = evt.detail.peer;
const { peer } = evt.detail;
if (peer.protocols.includes(this.multicodec)) {
this.log.info(`Preemptively opening a stream to ${peer.id.toString()}`);
this.prepareNewStream(peer);
const isConnected = this.isConnectedTo(peer.id);
if (isConnected) {
this.log.info(`Preemptively opening a stream to ${peer.id.toString()}`);
this.prepareStream(peer);
} else {
const peerIdStr = peer.id.toString();
this.streamPool.delete(peerIdStr);
this.log.info(
`Removed pending stream for disconnected peer ${peerIdStr}`
);
}
}
};
private isConnectedTo(peerId: PeerId): boolean {
const connections = this.getConnections(peerId);
return connections.some((connection) => connection.status === "open");
}
}

View File

@ -51,7 +51,16 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange {
};
}
const stream = await this.getStream(peer);
let stream;
try {
stream = await this.getStream(peer);
} catch (err) {
log.error("Failed to get stream", err);
return {
peerInfos: null,
error: ProtocolError.NO_STREAM_AVAILABLE
};
}
const res = await pipe(
[rpcQuery.encode()],

View File

@ -147,6 +147,11 @@ export enum ProtocolError {
* on the connection manager before retrying.
*/
NO_PEER_AVAILABLE = "No peer available",
/**
* Failure to find a stream to the peer. This may be because the connection with the peer is not still alive.
* Mitigation can be: retrying after a given time period, or mitigation for `NO_PEER_AVAILABLE` can be used.
*/
NO_STREAM_AVAILABLE = "No stream available",
/**
* The remote peer did not behave as expected. Mitigation for `NO_PEER_AVAILABLE`
* or `DECODE_FAILED` can be used.

View File

@ -249,7 +249,7 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () {
});
it("Subscribe to 100 topics (new limit) at once and receives messages", async function () {
this.timeout(50000);
this.timeout(100_000);
const topicCount = 100;
const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic });

View File

@ -293,7 +293,7 @@ const runTests = (strictCheckNodes: boolean): void => {
});
it("Subscribe to 100 topics (new limit) at once and receives messages", async function () {
this.timeout(50000);
this.timeout(100_000);
const topicCount = 100;
const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic });