mirror of https://github.com/status-im/js-waku.git
clean up waitForRemotePeer, split metadata await from event and optimise, decouple from protocol implementations
This commit is contained in:
parent
ad56eae445
commit
7580eeb110
|
@ -13,7 +13,7 @@ export * as waku_light_push from "./lib/light_push/index.js";
|
|||
export { LightPushCodec, LightPushCore } from "./lib/light_push/index.js";
|
||||
|
||||
export * as waku_store from "./lib/store/index.js";
|
||||
export { StoreCore } from "./lib/store/index.js";
|
||||
export { StoreCore, StoreCodec } from "./lib/store/index.js";
|
||||
|
||||
export { ConnectionManager } from "./lib/connection_manager.js";
|
||||
|
||||
|
|
|
@ -1,13 +1,10 @@
|
|||
import type { IdentifyResult } from "@libp2p/interface";
|
||||
import type {
|
||||
IBaseProtocolCore,
|
||||
IMetadata,
|
||||
IRelay,
|
||||
Waku
|
||||
} from "@waku/interfaces";
|
||||
import { FilterCodecs, LightPushCodec, StoreCodec } from "@waku/core";
|
||||
import type { IRelay, Libp2p, Waku } from "@waku/interfaces";
|
||||
import { Protocols } from "@waku/interfaces";
|
||||
import { Logger } from "@waku/utils";
|
||||
import { pEvent } from "p-event";
|
||||
|
||||
const log = new Logger("wait-for-remote-peer");
|
||||
|
||||
/**
|
||||
|
@ -35,42 +32,48 @@ export async function waitForRemotePeer(
|
|||
timeoutMs?: number
|
||||
): Promise<void> {
|
||||
protocols = protocols ?? getEnabledProtocols(waku);
|
||||
const connections = waku.libp2p.getConnections();
|
||||
|
||||
if (!waku.isStarted()) return Promise.reject("Waku node is not started");
|
||||
if (!waku.isStarted()) {
|
||||
throw Error("Waku node is not started");
|
||||
}
|
||||
|
||||
if (connections.length > 0) {
|
||||
const success = await waitForMetadata(waku.libp2p);
|
||||
|
||||
if (success) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
const promises = [];
|
||||
|
||||
if (protocols.includes(Protocols.Relay)) {
|
||||
if (!waku.relay)
|
||||
throw new Error("Cannot wait for Relay peer: protocol not mounted");
|
||||
if (!waku.relay) {
|
||||
throw Error("Cannot wait for Relay peer: protocol not mounted");
|
||||
}
|
||||
promises.push(waitForGossipSubPeerInMesh(waku.relay));
|
||||
}
|
||||
|
||||
if (protocols.includes(Protocols.Store)) {
|
||||
if (!waku.store)
|
||||
throw new Error("Cannot wait for Store peer: protocol not mounted");
|
||||
promises.push(
|
||||
waitForConnectedPeer(waku.store.protocol, waku.libp2p.services.metadata)
|
||||
);
|
||||
if (!waku.store) {
|
||||
throw Error("Cannot wait for Store peer: protocol not mounted");
|
||||
}
|
||||
promises.push(waitForConnectedPeer(StoreCodec, waku.libp2p));
|
||||
}
|
||||
|
||||
if (protocols.includes(Protocols.LightPush)) {
|
||||
if (!waku.lightPush)
|
||||
throw new Error("Cannot wait for LightPush peer: protocol not mounted");
|
||||
promises.push(
|
||||
waitForConnectedPeer(
|
||||
waku.lightPush.protocol,
|
||||
waku.libp2p.services.metadata
|
||||
)
|
||||
);
|
||||
if (!waku.lightPush) {
|
||||
throw Error("Cannot wait for LightPush peer: protocol not mounted");
|
||||
}
|
||||
promises.push(waitForConnectedPeer(LightPushCodec, waku.libp2p));
|
||||
}
|
||||
|
||||
if (protocols.includes(Protocols.Filter)) {
|
||||
if (!waku.filter)
|
||||
if (!waku.filter) {
|
||||
throw new Error("Cannot wait for Filter peer: protocol not mounted");
|
||||
promises.push(
|
||||
waitForConnectedPeer(waku.filter.protocol, waku.libp2p.services.metadata)
|
||||
);
|
||||
}
|
||||
promises.push(waitForConnectedPeer(FilterCodecs.SUBSCRIBE, waku.libp2p));
|
||||
}
|
||||
|
||||
if (timeoutMs) {
|
||||
|
@ -84,71 +87,86 @@ export async function waitForRemotePeer(
|
|||
}
|
||||
}
|
||||
|
||||
type EventListener = (_: CustomEvent<IdentifyResult>) => void;
|
||||
|
||||
/**
|
||||
* Wait for a peer with the given protocol to be connected.
|
||||
* If sharding is enabled on the node, it will also wait for the peer to be confirmed by the metadata service.
|
||||
*/
|
||||
async function waitForConnectedPeer(
|
||||
protocol: IBaseProtocolCore,
|
||||
metadataService?: IMetadata
|
||||
codec: string,
|
||||
libp2p: Libp2p
|
||||
): Promise<void> {
|
||||
const codec = protocol.multicodec;
|
||||
const peers = await protocol.connectedPeers();
|
||||
log.info(`Waiting for ${codec} peer.`);
|
||||
|
||||
if (peers.length) {
|
||||
if (!metadataService) {
|
||||
log.info(`${codec} peer found: `, peers[0].id.toString());
|
||||
return;
|
||||
}
|
||||
|
||||
// once a peer is connected, we need to confirm the metadata handshake with at least one of those peers if sharding is enabled
|
||||
try {
|
||||
await Promise.any(
|
||||
peers.map((peer) => metadataService.confirmOrAttemptHandshake(peer.id))
|
||||
);
|
||||
return;
|
||||
} catch (e) {
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
if ((e as any).code === "ERR_CONNECTION_BEING_CLOSED")
|
||||
log.error(
|
||||
`Connection with the peer was closed and possibly because it's on a different shard. Error: ${e}`
|
||||
);
|
||||
|
||||
log.error(`Error waiting for handshake confirmation: ${e}`);
|
||||
}
|
||||
}
|
||||
|
||||
log.info(`Waiting for ${codec} peer`);
|
||||
|
||||
// else we'll just wait for the next peer to connect
|
||||
await new Promise<void>((resolve) => {
|
||||
const cb = (evt: CustomEvent<IdentifyResult>): void => {
|
||||
const cb = (async (evt: CustomEvent<IdentifyResult>): Promise<void> => {
|
||||
if (evt.detail?.protocols?.includes(codec)) {
|
||||
if (metadataService) {
|
||||
metadataService
|
||||
.confirmOrAttemptHandshake(evt.detail.peerId)
|
||||
.then(() => {
|
||||
protocol.removeLibp2pEventListener("peer:identify", cb);
|
||||
resolve();
|
||||
})
|
||||
.catch((e) => {
|
||||
if (e.code === "ERR_CONNECTION_BEING_CLOSED")
|
||||
log.error(
|
||||
`Connection with the peer was closed and possibly because it's on a different shard. Error: ${e}`
|
||||
);
|
||||
const metadataService = libp2p.services.metadata;
|
||||
|
||||
log.error(`Error waiting for handshake confirmation: ${e}`);
|
||||
});
|
||||
} else {
|
||||
protocol.removeLibp2pEventListener("peer:identify", cb);
|
||||
if (!metadataService) {
|
||||
libp2p.removeEventListener("peer:identify", cb);
|
||||
resolve();
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
await metadataService.confirmOrAttemptHandshake(evt.detail.peerId);
|
||||
|
||||
libp2p.removeEventListener("peer:identify", cb);
|
||||
resolve();
|
||||
} catch (e) {
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
if ((e as any).code === "ERR_CONNECTION_BEING_CLOSED") {
|
||||
log.error(
|
||||
"Connection closed. Some peers can be on different shard."
|
||||
);
|
||||
}
|
||||
|
||||
log.error(`Error waiting for metadata: ${e}`);
|
||||
}
|
||||
}
|
||||
};
|
||||
protocol.addLibp2pEventListener("peer:identify", cb);
|
||||
}) as EventListener;
|
||||
|
||||
libp2p.addEventListener("peer:identify", cb);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Waits for the metadata from the remote peer.
|
||||
*/
|
||||
async function waitForMetadata(libp2p: Libp2p): Promise<boolean> {
|
||||
const connections = libp2p.getConnections();
|
||||
const metadataService = libp2p.services.metadata;
|
||||
|
||||
if (!connections.length || !metadataService) {
|
||||
log.info(
|
||||
`Skipping waitForMetadata due to missing connections:${connections.length} or metadataService:${!!metadataService}`
|
||||
);
|
||||
return false;
|
||||
}
|
||||
|
||||
try {
|
||||
// confirm at least with one connected peer
|
||||
await Promise.any(
|
||||
connections
|
||||
.map((c) => c.remotePeer)
|
||||
.map((peer) => metadataService.confirmOrAttemptHandshake(peer))
|
||||
);
|
||||
|
||||
return true;
|
||||
} catch (e) {
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
if ((e as any).code === "ERR_CONNECTION_BEING_CLOSED") {
|
||||
log.error("Connection closed. Some peers can be on different shard.");
|
||||
}
|
||||
|
||||
log.error(`Error waiting for metadata: ${e}`);
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
// TODO: move to @waku/relay and use in `node.connect()` API https://github.com/waku-org/js-waku/issues/1761
|
||||
/**
|
||||
* Wait for at least one peer with the given protocol to be connected and in the gossipsub
|
||||
|
|
Loading…
Reference in New Issue