feat: implement a simple connection management

This commit is contained in:
fryorcraken.eth 2022-09-05 21:48:27 +10:00
parent 9dd00fc026
commit ac48792e0e
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4
5 changed files with 42 additions and 13 deletions

View File

@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
### Added
- Simple connection management that selects the most recent connection for store, light push and filter requests.
## [0.25.0] - 2022-09-5
### Changed

View File

@ -0,0 +1,24 @@
import { Connection } from "@libp2p/interface-connection";
export function selectConnection(
connections: Connection[]
): Connection | undefined {
if (!connections.length) return;
if (connections.length === 1) return connections[0];
let latestConnection: Connection | undefined;
connections.forEach((connection) => {
if (connection.stat.status === "OPEN") {
if (!latestConnection) {
latestConnection = connection;
} else if (
connection.stat.timeline.open > latestConnection.stat.timeline.open
) {
latestConnection = connection;
}
}
});
return latestConnection;
}

View File

@ -10,6 +10,7 @@ import type { Libp2p } from "libp2p";
import { WakuMessage as WakuMessageProto } from "../../proto/message";
import { DefaultPubSubTopic } from "../constants";
import { selectConnection } from "../select_connection";
import { getPeersForProtocol, selectRandomPeer } from "../select_peer";
import { hexToBytes } from "../utils";
import { DecryptionMethod, WakuMessage } from "../waku_message";
@ -216,17 +217,14 @@ export class WakuFilter {
}
}
// Should be able to remove any at next libp2p release >0.37.3
private async newStream(peer: Peer): Promise<Stream> {
const connections = this.libp2p.connectionManager.getConnections(peer.id);
if (!connections) {
const connection = selectConnection(connections);
if (!connection) {
throw new Error("Failed to get a connection to the peer");
}
// TODO: Appropriate connection selection
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore: tsc is confused by the @libp2p/interface-connection type to use
return connections[0].newStream(FilterCodec);
return connection.newStream(FilterCodec);
}
private async getPeer(peerId?: PeerId): Promise<Peer> {

View File

@ -8,6 +8,7 @@ import { Uint8ArrayList } from "uint8arraylist";
import { PushResponse } from "../../proto/light_push";
import { DefaultPubSubTopic } from "../constants";
import { selectConnection } from "../select_connection";
import { getPeersForProtocol, selectRandomPeer } from "../select_peer";
import { WakuMessage } from "../waku_message";
@ -59,10 +60,11 @@ export class WakuLightPush {
throw "Peer does not register waku light push protocol";
const connections = this.libp2p.connectionManager.getConnections(peer.id);
if (!connections) throw "Failed to get a connection to the peer";
const connection = selectConnection(connections);
// TODO: Appropriate connection management
const stream = await connections[0].newStream(LightPushCodec);
if (!connection) throw "Failed to get a connection to the peer";
const stream = await connection.newStream(LightPushCodec);
try {
const pubSubTopic = opts?.pubSubTopic
? opts.pubSubTopic

View File

@ -10,6 +10,7 @@ import { Uint8ArrayList } from "uint8arraylist";
import * as protoV2Beta4 from "../../proto/store_v2beta4";
import { HistoryResponse } from "../../proto/store_v2beta4";
import { DefaultPubSubTopic, StoreCodecs } from "../constants";
import { selectConnection } from "../select_connection";
import { getPeersForProtocol, selectRandomPeer } from "../select_peer";
import { hexToBytes } from "../utils";
import { DecryptionMethod, WakuMessage } from "../waku_message";
@ -171,8 +172,9 @@ export class WakuStore {
Object.assign(opts, { storeCodec });
const connections = this.libp2p.connectionManager.getConnections(peer.id);
if (!connections || !connections.length)
throw "Failed to get a connection to the peer";
const connection = selectConnection(connections);
if (!connection) throw "Failed to get a connection to the peer";
const decryptionKeys = Array.from(this.decryptionKeys).map(
([key, { method, contentTopics }]) => {
@ -199,8 +201,7 @@ export class WakuStore {
const messages: WakuMessage[] = [];
let cursor = undefined;
while (true) {
// TODO: Some connection selection logic?
const stream = await connections[0].newStream(storeCodec);
const stream = await connection.newStream(storeCodec);
const queryOpts = Object.assign(opts, { cursor });
const historyRpcQuery = HistoryRPC.createQuery(queryOpts);
dbg("Querying store peer", connections[0].remoteAddr.toString());