diff --git a/CHANGELOG.md b/CHANGELOG.md index a31c267449..a3886e5d51 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Simple connection management that selects the most recent connection for store, light push and filter requests. + ## [0.25.0] - 2022-09-5 ### Changed diff --git a/src/lib/select_connection.ts b/src/lib/select_connection.ts new file mode 100644 index 0000000000..05977c7ab5 --- /dev/null +++ b/src/lib/select_connection.ts @@ -0,0 +1,24 @@ +import { Connection } from "@libp2p/interface-connection"; + +export function selectConnection( + connections: Connection[] +): Connection | undefined { + if (!connections.length) return; + if (connections.length === 1) return connections[0]; + + let latestConnection: Connection | undefined; + + connections.forEach((connection) => { + if (connection.stat.status === "OPEN") { + if (!latestConnection) { + latestConnection = connection; + } else if ( + connection.stat.timeline.open > latestConnection.stat.timeline.open + ) { + latestConnection = connection; + } + } + }); + + return latestConnection; +} diff --git a/src/lib/waku_filter/index.ts b/src/lib/waku_filter/index.ts index bef0827954..c63bfd0aae 100644 --- a/src/lib/waku_filter/index.ts +++ b/src/lib/waku_filter/index.ts @@ -10,6 +10,7 @@ import type { Libp2p } from "libp2p"; import { WakuMessage as WakuMessageProto } from "../../proto/message"; import { DefaultPubSubTopic } from "../constants"; +import { selectConnection } from "../select_connection"; import { getPeersForProtocol, selectRandomPeer } from "../select_peer"; import { hexToBytes } from "../utils"; import { DecryptionMethod, WakuMessage } from "../waku_message"; @@ -216,17 +217,14 @@ export class WakuFilter { } } - // Should be able to remove any at next libp2p release >0.37.3 private async newStream(peer: Peer): Promise { const connections = this.libp2p.connectionManager.getConnections(peer.id); - if (!connections) { + const connection = selectConnection(connections); + if (!connection) { throw new Error("Failed to get a connection to the peer"); } - // TODO: Appropriate connection selection - // eslint-disable-next-line @typescript-eslint/ban-ts-comment - // @ts-ignore: tsc is confused by the @libp2p/interface-connection type to use - return connections[0].newStream(FilterCodec); + return connection.newStream(FilterCodec); } private async getPeer(peerId?: PeerId): Promise { diff --git a/src/lib/waku_light_push/index.ts b/src/lib/waku_light_push/index.ts index 623069d67e..101bb514f0 100644 --- a/src/lib/waku_light_push/index.ts +++ b/src/lib/waku_light_push/index.ts @@ -8,6 +8,7 @@ import { Uint8ArrayList } from "uint8arraylist"; import { PushResponse } from "../../proto/light_push"; import { DefaultPubSubTopic } from "../constants"; +import { selectConnection } from "../select_connection"; import { getPeersForProtocol, selectRandomPeer } from "../select_peer"; import { WakuMessage } from "../waku_message"; @@ -59,10 +60,11 @@ export class WakuLightPush { throw "Peer does not register waku light push protocol"; const connections = this.libp2p.connectionManager.getConnections(peer.id); - if (!connections) throw "Failed to get a connection to the peer"; + const connection = selectConnection(connections); - // TODO: Appropriate connection management - const stream = await connections[0].newStream(LightPushCodec); + if (!connection) throw "Failed to get a connection to the peer"; + + const stream = await connection.newStream(LightPushCodec); try { const pubSubTopic = opts?.pubSubTopic ? opts.pubSubTopic diff --git a/src/lib/waku_store/index.ts b/src/lib/waku_store/index.ts index 19534a7584..d9486516be 100644 --- a/src/lib/waku_store/index.ts +++ b/src/lib/waku_store/index.ts @@ -10,6 +10,7 @@ import { Uint8ArrayList } from "uint8arraylist"; import * as protoV2Beta4 from "../../proto/store_v2beta4"; import { HistoryResponse } from "../../proto/store_v2beta4"; import { DefaultPubSubTopic, StoreCodecs } from "../constants"; +import { selectConnection } from "../select_connection"; import { getPeersForProtocol, selectRandomPeer } from "../select_peer"; import { hexToBytes } from "../utils"; import { DecryptionMethod, WakuMessage } from "../waku_message"; @@ -171,8 +172,9 @@ export class WakuStore { Object.assign(opts, { storeCodec }); const connections = this.libp2p.connectionManager.getConnections(peer.id); - if (!connections || !connections.length) - throw "Failed to get a connection to the peer"; + const connection = selectConnection(connections); + + if (!connection) throw "Failed to get a connection to the peer"; const decryptionKeys = Array.from(this.decryptionKeys).map( ([key, { method, contentTopics }]) => { @@ -199,8 +201,7 @@ export class WakuStore { const messages: WakuMessage[] = []; let cursor = undefined; while (true) { - // TODO: Some connection selection logic? - const stream = await connections[0].newStream(storeCodec); + const stream = await connection.newStream(storeCodec); const queryOpts = Object.assign(opts, { cursor }); const historyRpcQuery = HistoryRPC.createQuery(queryOpts); dbg("Querying store peer", connections[0].remoteAddr.toString());