fix: peer renewal connection drop & stream management (#2145)

* fix: peer renewal connection drop

* fix stream manager

* fix over iteration during stream creation

* remove timeout and use only open peers

* add logs

* refactor code, add tests

* debug test

* up debug

* remove debug, supress check for timestamps

* remove only

* add more debug

* remove debug

* remove check for timestamps
This commit is contained in:
Sasha 2024-10-01 12:54:55 +02:00 committed by GitHub
parent 3e821591c9
commit b93134a517
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 350 additions and 96 deletions

View File

@ -26,6 +26,7 @@
}
],
"@typescript-eslint/explicit-member-accessibility": "error",
"@typescript-eslint/no-unused-vars": ["warn", { "argsIgnorePattern": "^_" }],
"prettier/prettier": [
"error",
{

3
package-lock.json generated
View File

@ -39111,7 +39111,8 @@
"mocha": "^10.3.0",
"npm-run-all": "^4.1.5",
"process": "^0.11.10",
"rollup": "^4.12.0"
"rollup": "^4.12.0",
"sinon": "^18.0.0"
},
"engines": {
"node": ">=20"

View File

@ -92,6 +92,7 @@
"@types/uuid": "^9.0.8",
"@waku/build-utils": "*",
"chai": "^4.3.10",
"sinon": "^18.0.0",
"cspell": "^8.6.1",
"fast-check": "^3.19.0",
"ignore-loader": "^0.1.2",

View File

@ -0,0 +1,161 @@
import { Connection, Peer, PeerId, Stream } from "@libp2p/interface";
import { expect } from "chai";
import sinon from "sinon";
import { StreamManager } from "./stream_manager.js";
const MULTICODEC = "/test";
describe("StreamManager", () => {
let eventTarget: EventTarget;
let streamManager: StreamManager;
const mockPeer: Peer = {
id: {
toString() {
return "1";
}
}
} as unknown as Peer;
beforeEach(() => {
eventTarget = new EventTarget();
streamManager = new StreamManager(
MULTICODEC,
() => [],
eventTarget.addEventListener.bind(eventTarget)
);
});
it("should return usable stream attached to connection", async () => {
for (const writeStatus of ["ready", "writing"]) {
const con1 = createMockConnection();
con1.streams = [
createMockStream({ id: "1", protocol: MULTICODEC, writeStatus })
];
streamManager["getConnections"] = (_peerId: PeerId | undefined) => [con1];
const stream = await streamManager.getStream(mockPeer);
expect(stream).not.to.be.undefined;
expect(stream?.id).to.be.eq("1");
}
});
it("should throw if no connection provided", async () => {
streamManager["getConnections"] = (_peerId: PeerId | undefined) => [];
let error: Error | undefined;
try {
await streamManager.getStream(mockPeer);
} catch (e) {
error = e as Error;
}
expect(error).not.to.be.undefined;
expect(error?.message).to.include(mockPeer.id.toString());
expect(error?.message).to.include(MULTICODEC);
});
it("should create a new stream if no existing for protocol found", async () => {
for (const writeStatus of ["done", "closed", "closing"]) {
const con1 = createMockConnection();
con1.streams = [
createMockStream({ id: "1", protocol: MULTICODEC, writeStatus })
];
const newStreamSpy = sinon.spy(async (_protocol, _options) =>
createMockStream({
id: "2",
protocol: MULTICODEC,
writeStatus: "writable"
})
);
con1.newStream = newStreamSpy;
streamManager["getConnections"] = (_peerId: PeerId | undefined) => [con1];
const stream = await streamManager.getStream(mockPeer);
expect(stream).not.to.be.undefined;
expect(stream?.id).to.be.eq("2");
expect(newStreamSpy.calledOnce).to.be.true;
expect(newStreamSpy.calledWith(MULTICODEC)).to.be.true;
}
});
it("peer:update - should do nothing if another protocol hit", async () => {
const scheduleNewStreamSpy = sinon.spy();
streamManager["scheduleNewStream"] = scheduleNewStreamSpy;
eventTarget.dispatchEvent(
new CustomEvent("peer:update", { detail: { peer: { protocols: [] } } })
);
expect(scheduleNewStreamSpy.calledOnce).to.be.false;
});
it("peer:update - should schedule stream creation IF protocol hit AND no stream found on connection", async () => {
const scheduleNewStreamSpy = sinon.spy();
streamManager["scheduleNewStream"] = scheduleNewStreamSpy;
eventTarget.dispatchEvent(
new CustomEvent("peer:update", {
detail: { peer: { protocols: [MULTICODEC] } }
})
);
expect(scheduleNewStreamSpy.calledOnce).to.be.true;
});
it("peer:update - should not schedule stream creation IF protocol hit AND stream found on connection", async () => {
const con1 = createMockConnection();
con1.streams = [
createMockStream({
id: "1",
protocol: MULTICODEC,
writeStatus: "writable"
})
];
streamManager["getConnections"] = (_id) => [con1];
const scheduleNewStreamSpy = sinon.spy();
streamManager["scheduleNewStream"] = scheduleNewStreamSpy;
eventTarget.dispatchEvent(
new CustomEvent("peer:update", {
detail: { peer: { protocols: [MULTICODEC] } }
})
);
expect(scheduleNewStreamSpy.calledOnce).to.be.false;
});
});
type MockConnectionOptions = {
status?: string;
open?: number;
};
function createMockConnection(options: MockConnectionOptions = {}): Connection {
return {
status: options.status || "open",
timeline: {
open: options.open || 1
}
} as Connection;
}
type MockStreamOptions = {
id?: string;
protocol?: string;
writeStatus?: string;
};
function createMockStream(options: MockStreamOptions): Stream {
return {
id: options.id,
protocol: options.protocol,
writeStatus: options.writeStatus || "ready"
} as Stream;
}

View File

@ -1,47 +1,41 @@
import type { PeerUpdate, Stream } from "@libp2p/interface";
import type { Peer, PeerId } from "@libp2p/interface";
import { Libp2p } from "@waku/interfaces";
import type { Peer, PeerId, PeerUpdate, Stream } from "@libp2p/interface";
import type { Libp2p } from "@waku/interfaces";
import { Logger } from "@waku/utils";
import { selectConnection } from "./utils.js";
const CONNECTION_TIMEOUT = 5_000;
const RETRY_BACKOFF_BASE = 1_000;
const MAX_RETRIES = 3;
import { selectOpenConnection } from "./utils.js";
export class StreamManager {
private readonly streamPool: Map<string, Promise<Stream | void>>;
private readonly log: Logger;
private ongoingCreation: Set<string> = new Set();
private streamPool: Map<string, Promise<void>> = new Map();
public constructor(
public multicodec: string,
public getConnections: Libp2p["getConnections"],
public addEventListener: Libp2p["addEventListener"]
private multicodec: string,
private getConnections: Libp2p["getConnections"],
private addEventListener: Libp2p["addEventListener"]
) {
this.log = new Logger(`stream-manager:${multicodec}`);
this.streamPool = new Map();
this.addEventListener("peer:update", this.handlePeerUpdateStreamPool);
}
public async getStream(peer: Peer): Promise<Stream> {
const peerIdStr = peer.id.toString();
const streamPromise = this.streamPool.get(peerIdStr);
const peerId = peer.id.toString();
if (!streamPromise) {
return this.createStream(peer);
const scheduledStream = this.streamPool.get(peerId);
if (scheduledStream) {
this.streamPool.delete(peerId);
await scheduledStream;
}
this.streamPool.delete(peerIdStr);
this.prepareStream(peer);
const stream = this.getOpenStreamForCodec(peer.id);
try {
const stream = await streamPromise;
if (stream && stream.status !== "closed") {
return stream;
}
} catch (error) {
this.log.warn(`Failed to get stream for ${peerIdStr} -- `, error);
this.log.warn("Attempting to create a new stream for the peer");
if (stream) {
this.log.info(
`Found existing stream peerId=${peer.id.toString()} multicodec=${this.multicodec}`
);
return stream;
}
return this.createStream(peer);
@ -49,67 +43,112 @@ export class StreamManager {
private async createStream(peer: Peer, retries = 0): Promise<Stream> {
const connections = this.getConnections(peer.id);
const connection = selectConnection(connections);
const connection = selectOpenConnection(connections);
if (!connection) {
throw new Error("Failed to get a connection to the peer");
throw new Error(
`Failed to get a connection to the peer peerId=${peer.id.toString()} multicodec=${this.multicodec}`
);
}
let lastError: unknown;
let stream: Stream | undefined;
for (let i = 0; i < retries + 1; i++) {
try {
this.log.info(
`Attempting to create a stream for peerId=${peer.id.toString()} multicodec=${this.multicodec}`
);
stream = await connection.newStream(this.multicodec);
this.log.info(
`Created stream for peerId=${peer.id.toString()} multicodec=${this.multicodec}`
);
break;
} catch (error) {
lastError = error;
}
}
if (!stream) {
throw new Error(
`Failed to create a new stream for ${peer.id.toString()} -- ` +
lastError
);
}
return stream;
}
private async createStreamWithLock(peer: Peer): Promise<void> {
const peerId = peer.id.toString();
if (this.ongoingCreation.has(peerId)) {
this.log.info(
`Skipping creation of a stream due to lock for peerId=${peerId} multicodec=${this.multicodec}`
);
return;
}
try {
return await connection.newStream(this.multicodec);
this.ongoingCreation.add(peerId);
await this.createStream(peer);
} catch (error) {
if (retries < MAX_RETRIES) {
const backoff = RETRY_BACKOFF_BASE * Math.pow(2, retries);
await new Promise((resolve) => setTimeout(resolve, backoff));
return this.createStream(peer, retries + 1);
}
throw new Error(
`Failed to create a new stream for ${peer.id.toString()} -- ` + error
);
this.log.error(`Failed to createStreamWithLock:`, error);
} finally {
this.ongoingCreation.delete(peerId);
}
}
private prepareStream(peer: Peer): void {
const timeoutPromise = new Promise<void>((resolve) =>
setTimeout(resolve, CONNECTION_TIMEOUT)
);
const streamPromise = Promise.race([
this.createStream(peer),
timeoutPromise.then(() => {
throw new Error("Connection timeout");
})
]).catch((error) => {
this.log.error(
`Failed to prepare a new stream for ${peer.id.toString()} -- `,
error
);
});
this.streamPool.set(peer.id.toString(), streamPromise);
return;
}
private handlePeerUpdateStreamPool = (evt: CustomEvent<PeerUpdate>): void => {
const { peer } = evt.detail;
if (peer.protocols.includes(this.multicodec)) {
const isConnected = this.isConnectedTo(peer.id);
if (isConnected) {
this.log.info(`Preemptively opening a stream to ${peer.id.toString()}`);
this.prepareStream(peer);
} else {
const peerIdStr = peer.id.toString();
this.streamPool.delete(peerIdStr);
this.log.info(
`Removed pending stream for disconnected peer ${peerIdStr}`
);
}
if (!peer.protocols.includes(this.multicodec)) {
return;
}
const stream = this.getOpenStreamForCodec(peer.id);
if (stream) {
return;
}
this.scheduleNewStream(peer);
};
private isConnectedTo(peerId: PeerId): boolean {
private scheduleNewStream(peer: Peer): void {
this.log.info(
`Scheduling creation of a stream for peerId=${peer.id.toString()} multicodec=${this.multicodec}`
);
// abandon previous attempt
if (this.streamPool.has(peer.id.toString())) {
this.streamPool.delete(peer.id.toString());
}
this.streamPool.set(peer.id.toString(), this.createStreamWithLock(peer));
}
private getOpenStreamForCodec(peerId: PeerId): Stream | undefined {
const connections = this.getConnections(peerId);
return connections.some((connection) => connection.status === "open");
const connection = selectOpenConnection(connections);
if (!connection) {
return;
}
const stream = connection.streams.find(
(s) => s.protocol === this.multicodec
);
const isStreamUnusable = ["done", "closed", "closing"].includes(
stream?.writeStatus || ""
);
if (isStreamUnusable) {
return;
}
return stream;
}
}

View File

@ -0,0 +1,65 @@
import { Connection } from "@libp2p/interface";
import { expect } from "chai";
import { selectOpenConnection } from "./utils.js";
describe("selectOpenConnection", () => {
it("returns nothing if no connections present", () => {
const connection = selectOpenConnection([]);
expect(connection).to.be.undefined;
});
it("returns only open connection if one present", () => {
let expectedCon = createMockConnection({ id: "1", status: "closed" });
let actualCon = selectOpenConnection([expectedCon]);
expect(actualCon).to.be.undefined;
expectedCon = createMockConnection({ id: "1", status: "open" });
actualCon = selectOpenConnection([expectedCon]);
expect(actualCon).not.to.be.undefined;
expect(actualCon?.id).to.be.eq("1");
});
it("should return no connections if no open connection provided", () => {
const closedCon1 = createMockConnection({ status: "closed" });
const closedCon2 = createMockConnection({ status: "closed" });
const actualCon = selectOpenConnection([closedCon1, closedCon2]);
expect(actualCon).to.be.undefined;
});
it("should select older connection if present", () => {
const con1 = createMockConnection({
status: "open",
open: 10
});
const con2 = createMockConnection({
status: "open",
open: 15
});
const actualCon = selectOpenConnection([con1, con2]);
expect(actualCon).not.to.be.undefined;
expect(actualCon?.timeline.open).to.be.eq(15);
});
});
type MockConnectionOptions = {
id?: string;
status?: string;
open?: number;
};
function createMockConnection(options: MockConnectionOptions = {}): Connection {
return {
id: options.id,
status: options.status,
timeline: {
open: options.open
}
} as Connection;
}

View File

@ -1,22 +1,10 @@
import type { Connection } from "@libp2p/interface";
export function selectConnection(
export function selectOpenConnection(
connections: Connection[]
): Connection | undefined {
if (!connections.length) return;
if (connections.length === 1) return connections[0];
let latestConnection: Connection | undefined;
connections.forEach((connection) => {
if (connection.status === "open") {
if (!latestConnection) {
latestConnection = connection;
} else if (connection.timeline.open > latestConnection.timeline.open) {
latestConnection = connection;
}
}
});
return latestConnection;
return connections
.filter((c) => c.status === "open")
.sort((left, right) => right.timeline.open - left.timeline.open)
.at(0);
}

View File

@ -59,19 +59,16 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
public async renewPeer(peerToDisconnect: PeerId): Promise<Peer> {
this.log.info(`Renewing peer ${peerToDisconnect}`);
await this.connectionManager.dropConnection(peerToDisconnect);
const peer = (await this.findAndAddPeers(1))[0];
if (!peer) {
this.log.error(
"Failed to find a new peer to replace the disconnected one."
);
throw Error("Failed to find a new peer to replace the disconnected one.");
}
const updatedPeers = this.peers.filter(
(peer) => !peer.id.equals(peerToDisconnect)
);
this.updatePeers(updatedPeers);
await this.connectionManager.dropConnection(peerToDisconnect);
this.log.info(
`Peer ${peerToDisconnect} disconnected and removed from the peer list`

View File

@ -291,7 +291,8 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () {
messageCollector.verifyReceivedMessage(index, {
expectedContentTopic: topic,
expectedMessageText: `Message for Topic ${index + 1}`,
expectedPubsubTopic: TestPubsubTopic
expectedPubsubTopic: TestPubsubTopic,
checkTimestamp: false
});
});
});