feat: peer-exchange uses error codes (#1907)

* setup a generic protocol result type (DRY)

* metadata: use generic

* lightpush: use generic

* peer-exchange: use error codes + generic + update tests

* add issue link to skipped test

* tests: improve while loop readability
This commit is contained in:
Danish Arora 2024-03-13 19:33:50 +05:30 committed by GitHub
parent 5296bfbad8
commit 877fe1dc1d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 131 additions and 97 deletions

View File

@ -6,7 +6,8 @@ import {
IMessage,
Libp2p,
ProtocolCreateOptions,
ProtocolError
ProtocolError,
ProtocolResult
} from "@waku/interfaces";
import { PushResponse } from "@waku/proto";
import { isMessageSizeUnderCap } from "@waku/utils";
@ -25,25 +26,9 @@ const log = new Logger("light-push");
export const LightPushCodec = "/vac/waku/lightpush/2.0.0-beta1";
export { PushResponse };
type PreparePushMessageResult =
| {
query: PushRpc;
error: null;
}
| {
query: null;
error: ProtocolError;
};
type PreparePushMessageResult = ProtocolResult<"query", PushRpc>;
type CoreSendResult =
| {
success: null;
failure: Failure;
}
| {
success: PeerId;
failure: null;
};
type CoreSendResult = ProtocolResult<"success", PeerId, "failure", Failure>;
/**
* Implements the [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/).

View File

@ -1,10 +1,11 @@
import { BaseProtocol } from "@waku/core/lib/base_protocol";
import { EnrDecoder } from "@waku/enr";
import type {
import {
IPeerExchange,
Libp2pComponents,
PeerExchangeQueryParams,
PeerInfo,
PeerExchangeResult,
ProtocolError,
PubsubTopic
} from "@waku/interfaces";
import { isDefined } from "@waku/utils";
@ -34,18 +35,18 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange {
/**
* Make a peer exchange query to a peer
*/
async query(
params: PeerExchangeQueryParams
): Promise<PeerInfo[] | undefined> {
async query(params: PeerExchangeQueryParams): Promise<PeerExchangeResult> {
const { numPeers } = params;
const rpcQuery = PeerExchangeRPC.createRequest({
numPeers: BigInt(numPeers)
});
const peer = await this.peerStore.get(params.peerId);
if (!peer) {
throw new Error(`Peer ${params.peerId.toString()} not found`);
return {
peerInfos: null,
error: ProtocolError.NO_PEER_AVAILABLE
};
}
const stream = await this.getStream(peer);
@ -65,15 +66,17 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange {
});
const { response } = PeerExchangeRPC.decode(bytes);
if (!response) {
log.error(
"PeerExchangeRPC message did not contains a `response` field"
);
return;
return {
peerInfos: null,
error: ProtocolError.EMPTY_PAYLOAD
};
}
return Promise.all(
const peerInfos = await Promise.all(
response.peerInfos
.map((peerInfo) => peerInfo.enr)
.filter(isDefined)
@ -81,9 +84,17 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange {
return { ENR: await EnrDecoder.fromRLP(enr) };
})
);
return {
peerInfos,
error: null
};
} catch (err) {
log.error("Failed to decode push reply", err);
return;
return {
peerInfos: null,
error: ProtocolError.DECODE_FAILED
};
}
}
}

View File

@ -7,7 +7,12 @@ import type {
PeerId,
PeerInfo
} from "@libp2p/interface";
import { Libp2pComponents, PubsubTopic, Tags } from "@waku/interfaces";
import {
Libp2pComponents,
PeerExchangeResult,
PubsubTopic,
Tags
} from "@waku/interfaces";
import { encodeRelayShard, Logger } from "@waku/utils";
import { PeerExchangeCodec, WakuPeerExchange } from "./waku_peer_exchange.js";
@ -160,15 +165,15 @@ export class PeerExchangeDiscovery
}, queryInterval * currentAttempt);
};
private async query(peerId: PeerId): Promise<void> {
const peerInfos = await this.peerExchange.query({
private async query(peerId: PeerId): Promise<PeerExchangeResult> {
const { error, peerInfos } = await this.peerExchange.query({
numPeers: DEFAULT_PEER_EXCHANGE_REQUEST_NODES,
peerId
});
if (!peerInfos) {
log.error("Peer exchange query failed, no peer info returned");
return;
if (error) {
log.error("Peer exchange query failed", error);
return { error, peerInfos: null };
}
for (const _peerInfo of peerInfos) {
@ -214,6 +219,8 @@ export class PeerExchangeDiscovery
})
);
}
return { error: null, peerInfos };
}
private abortQueriesForPeer(peerIdStr: string): void {

View File

@ -1,21 +1,13 @@
import type { PeerId } from "@libp2p/interface";
import type { ShardInfo } from "./enr.js";
import { type ShardInfo } from "./enr.js";
import type {
IBaseProtocolCore,
ProtocolError,
ProtocolResult,
ShardingParams
} from "./protocols.js";
export type QueryResult =
| {
shardInfo: ShardInfo;
error: null;
}
| {
shardInfo: null;
error: ProtocolError;
};
export type QueryResult = ProtocolResult<"shardInfo", ShardInfo>;
// IMetadata always has shardInfo defined while it is optionally undefined in IBaseProtocol
export interface IMetadata extends Omit<IBaseProtocolCore, "shardInfo"> {

View File

@ -3,12 +3,14 @@ import type { PeerStore } from "@libp2p/interface";
import type { ConnectionManager } from "@libp2p/interface-internal";
import { IEnr } from "./enr.js";
import { IBaseProtocolCore } from "./protocols.js";
import { IBaseProtocolCore, ProtocolResult } from "./protocols.js";
export interface IPeerExchange extends IBaseProtocolCore {
query(params: PeerExchangeQueryParams): Promise<PeerInfo[] | undefined>;
query(params: PeerExchangeQueryParams): Promise<PeerExchangeResult>;
}
export type PeerExchangeResult = ProtocolResult<"peerInfos", PeerInfo[]>;
export interface PeerExchangeQueryParams {
numPeers: number;
peerId: PeerId;

View File

@ -101,6 +101,27 @@ export type Callback<T extends IDecodedMessage> = (
msg: T
) => void | Promise<void>;
// SK = success key name
// SV = success value type
// EK = error key name (default: "error")
// EV = error value type (default: ProtocolError)
export type ProtocolResult<
SK extends string,
SV,
EK extends string = "error",
EV = ProtocolError
> =
| ({
[key in SK]: SV;
} & {
[key in EK]: null;
})
| ({
[key in SK]: null;
} & {
[key in EK]: EV;
});
export enum ProtocolError {
/** Could not determine the origin of the fault. Best to check connectivity and try again */
GENERIC_FAIL = "Generic error",
@ -146,7 +167,12 @@ export enum ProtocolError {
* is logged. Review message validity, or mitigation for `NO_PEER_AVAILABLE`
* or `DECODE_FAILED` can be used.
*/
REMOTE_PEER_REJECTED = "Remote peer rejected"
REMOTE_PEER_REJECTED = "Remote peer rejected",
/**
* The protocol request timed out without a response. This may be due to a connection issue.
* Mitigation can be: retrying after a given time period
*/
REQUEST_TIMEOUT = "Request timeout"
}
export interface Failure {

View File

@ -7,8 +7,8 @@ import {
WakuPeerExchange,
wakuPeerExchangeDiscovery
} from "@waku/discovery";
import type { LightNode, PeerInfo } from "@waku/interfaces";
import { createLightNode, Libp2pComponents } from "@waku/sdk";
import type { LightNode, PeerExchangeResult } from "@waku/interfaces";
import { createLightNode, Libp2pComponents, ProtocolError } from "@waku/sdk";
import { Logger, singleShardInfoToPubsubTopic } from "@waku/utils";
import { expect } from "chai";
@ -38,7 +38,7 @@ describe("Peer Exchange Query", function () {
let components: Libp2pComponents;
let peerExchange: WakuPeerExchange;
let numPeersToRequest: number;
let peerInfos: PeerInfo[];
let queryResult: PeerExchangeResult;
beforeEachCustom(
this,
@ -85,57 +85,77 @@ describe("Peer Exchange Query", function () {
peerExchange = new WakuPeerExchange(components, pubsubTopic);
numPeersToRequest = 2;
// querying the connected peer
peerInfos = [];
const startTime = Date.now();
while (!peerInfos || peerInfos.length != numPeersToRequest) {
if (Date.now() - startTime > 100000) {
while (true) {
if (Date.now() - startTime > 100_000) {
log.error("Timeout reached, exiting the loop.");
break;
}
await delay(2000);
try {
peerInfos = await Promise.race([
queryResult = await Promise.race([
peerExchange.query({
peerId: nwaku3PeerId,
numPeers: numPeersToRequest
}) as Promise<PeerInfo[]>,
new Promise<PeerInfo[]>((resolve) =>
setTimeout(() => resolve([]), 5000)
}),
new Promise<PeerExchangeResult>((resolve) =>
setTimeout(
() =>
resolve({
peerInfos: null,
error: ProtocolError.REQUEST_TIMEOUT
}),
5000
)
)
]);
if (peerInfos.length === 0) {
log.warn("Query timed out, retrying...");
const hasErrors = queryResult?.error !== null;
const hasPeerInfos =
queryResult?.peerInfos &&
queryResult.peerInfos.length === numPeersToRequest;
if (hasErrors) {
if (queryResult.error === ProtocolError.REQUEST_TIMEOUT) {
log.warn("Query timed out, retrying...");
} else {
log.error("Error encountered, retrying...", queryResult.error);
}
continue;
}
if (!hasPeerInfos) {
log.warn(
"Peer info not available or does not match the requested number of peers, retrying..."
);
continue;
}
break;
} catch (error) {
log.warn("Error encountered, retrying...");
log.warn("Error encountered, retrying...", error);
}
}
},
120000
120_000
);
afterEachCustom(this, async () => {
await tearDownNodes([nwaku1, nwaku2, nwaku3], waku);
});
// slow and flaky in CI
// slow and flaky in CI: https://github.com/waku-org/js-waku/issues/1911
it.skip("connected peers and dial", async function () {
expect(peerInfos[0].ENR).to.not.be.null;
expect(peerInfos[0].ENR?.peerInfo?.multiaddrs).to.not.be.null;
expect(queryResult.error).to.be.null;
const peerWsMA = peerInfos[0].ENR?.peerInfo?.multiaddrs[2];
expect(queryResult.peerInfos?.[0].ENR).to.not.be.null;
expect(queryResult.peerInfos?.[0].ENR?.peerInfo?.multiaddrs).to.not.be.null;
const peerWsMA = queryResult.peerInfos?.[0].ENR?.peerInfo?.multiaddrs[2];
const localPeerWsMAAsString = peerWsMA
?.toString()
.replace(/\/ip4\/[\d.]+\//, "/ip4/127.0.0.1/");
const localPeerWsMA = multiaddr(localPeerWsMAAsString);
let foundNodePeerId: PeerId | undefined = undefined;
const doesPeerIdExistInResponse = peerInfos.some(({ ENR }) => {
const doesPeerIdExistInResponse = queryResult.peerInfos?.some(({ ENR }) => {
foundNodePeerId = ENR?.peerInfo?.id;
return ENR?.peerInfo?.id.toString() === nwaku1PeerId.toString();
});
@ -148,43 +168,34 @@ describe("Peer Exchange Query", function () {
await waitForRemotePeerWithCodec(waku, PeerExchangeCodec, foundNodePeerId);
});
// slow and flaky in CI
// slow and flaky in CI: https://github.com/waku-org/js-waku/issues/1911
it.skip("more peers than existing", async function () {
const peerInfo = await peerExchange.query({
const result = await peerExchange.query({
peerId: nwaku3PeerId,
numPeers: 5
});
expect(peerInfo?.length).to.be.eq(numPeersToRequest);
expect(result.error).to.be.null;
expect(result.peerInfos?.length).to.be.eq(numPeersToRequest);
});
// slow and flaky in CI
// slow and flaky in CI: https://github.com/waku-org/js-waku/issues/1911
it.skip("less peers than existing", async function () {
const peerInfo = await peerExchange.query({
const result = await peerExchange.query({
peerId: nwaku3PeerId,
numPeers: 1
});
expect(peerInfo?.length).to.be.eq(1);
expect(result.error).to.be.null;
expect(result.peerInfos?.length).to.be.eq(1);
});
// slow and flaky in CI
// slow and flaky in CI: https://github.com/waku-org/js-waku/issues/1911
it.skip("non connected peers", async function () {
// querying the non connected peer
try {
await peerExchange.query({
peerId: nwaku1PeerId,
numPeers: numPeersToRequest
});
throw new Error("Query on not connected peer succeeded unexpectedly.");
} catch (error) {
if (
!(
error instanceof Error &&
(error.message === "Not Found" ||
error.message === "Failed to get a connection to the peer")
)
) {
throw error;
}
}
const result = await peerExchange.query({
peerId: nwaku1PeerId,
numPeers: numPeersToRequest
});
expect(result.error).to.be.eq(ProtocolError.NO_PEER_AVAILABLE);
expect(result.peerInfos).to.be.null;
});
});