feat(metadata): use error codes (#1904)

This commit is contained in:
Danish Arora 2024-03-12 16:40:08 +05:30 committed by GitHub
parent 1e86c3d63e
commit 1882023c58
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 138 additions and 72 deletions

View File

@ -6,7 +6,7 @@ import {
IMessage,
Libp2p,
ProtocolCreateOptions,
SendError
ProtocolError
} from "@waku/interfaces";
import { PushResponse } from "@waku/proto";
import { isMessageSizeUnderCap } from "@waku/utils";
@ -32,7 +32,7 @@ type PreparePushMessageResult =
}
| {
query: null;
error: SendError;
error: ProtocolError;
};
type CoreSendResult =
@ -66,12 +66,12 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore {
try {
if (!message.payload || message.payload.length === 0) {
log.error("Failed to send waku light push: payload is empty");
return { query: null, error: SendError.EMPTY_PAYLOAD };
return { query: null, error: ProtocolError.EMPTY_PAYLOAD };
}
if (!(await isMessageSizeUnderCap(encoder, message))) {
log.error("Failed to send waku light push: message is bigger than 1MB");
return { query: null, error: SendError.SIZE_TOO_BIG };
return { query: null, error: ProtocolError.SIZE_TOO_BIG };
}
const protoMessage = await encoder.toProtoObj(message);
@ -79,7 +79,7 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore {
log.error("Failed to encode to protoMessage, aborting push");
return {
query: null,
error: SendError.ENCODE_FAILED
error: ProtocolError.ENCODE_FAILED
};
}
@ -90,7 +90,7 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore {
return {
query: null,
error: SendError.GENERIC_FAIL
error: ProtocolError.GENERIC_FAIL
};
}
}
@ -126,7 +126,7 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore {
return {
success: null,
failure: {
error: SendError.REMOTE_PEER_FAULT,
error: ProtocolError.REMOTE_PEER_FAULT,
peerId: peer.id
}
};
@ -146,7 +146,7 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore {
return {
success: null,
failure: {
error: SendError.GENERIC_FAIL,
error: ProtocolError.GENERIC_FAIL,
peerId: peer.id
}
};
@ -165,7 +165,7 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore {
return {
success: null,
failure: {
error: SendError.DECODE_FAILED,
error: ProtocolError.DECODE_FAILED,
peerId: peer.id
}
};
@ -176,7 +176,7 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore {
return {
success: null,
failure: {
error: SendError.REMOTE_PEER_FAULT,
error: ProtocolError.REMOTE_PEER_FAULT,
peerId: peer.id
}
};
@ -187,7 +187,7 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore {
return {
success: null,
failure: {
error: SendError.REMOTE_PEER_REJECTED,
error: ProtocolError.REMOTE_PEER_REJECTED,
peerId: peer.id
}
};

View File

@ -1,10 +1,12 @@
import type { PeerId } from "@libp2p/interface";
import { IncomingStreamData } from "@libp2p/interface";
import type {
IMetadata,
Libp2pComponents,
PeerIdStr,
ShardInfo
import {
type IMetadata,
type Libp2pComponents,
type PeerIdStr,
ProtocolError,
QueryResult,
type ShardInfo
} from "@waku/interfaces";
import { proto_metadata } from "@waku/proto";
import { encodeRelayShard, Logger, shardInfoToPubsubTopics } from "@waku/utils";
@ -21,7 +23,7 @@ export const MetadataCodec = "/vac/waku/metadata/1.0.0";
class Metadata extends BaseProtocol implements IMetadata {
private libp2pComponents: Libp2pComponents;
handshakesConfirmed: Set<PeerIdStr> = new Set();
handshakesConfirmed: Map<PeerIdStr, ShardInfo> = new Map();
constructor(
public shardInfo: ShardInfo,
@ -57,13 +59,13 @@ class Metadata extends BaseProtocol implements IMetadata {
async (source) => await all(source)
);
const remoteShardInfoResponse =
this.decodeMetadataResponse(encodedResponse);
const { error, shardInfo } = this.decodeMetadataResponse(encodedResponse);
await this.savePeerShardInfo(
connection.remotePeer,
remoteShardInfoResponse
);
if (error) {
return;
}
await this.savePeerShardInfo(connection.remotePeer, shardInfo);
} catch (error) {
log.error("Error handling metadata request", error);
}
@ -72,12 +74,15 @@ class Metadata extends BaseProtocol implements IMetadata {
/**
* Make a metadata query to a peer
*/
async query(peerId: PeerId): Promise<ShardInfo> {
async query(peerId: PeerId): Promise<QueryResult> {
const request = proto_metadata.WakuMetadataRequest.encode(this.shardInfo);
const peer = await this.peerStore.get(peerId);
if (!peer) {
throw new Error(`Peer ${peerId.toString()} not found`);
return {
shardInfo: null,
error: ProtocolError.NO_PEER_AVAILABLE
};
}
const stream = await this.getStream(peer);
@ -90,22 +95,38 @@ class Metadata extends BaseProtocol implements IMetadata {
async (source) => await all(source)
);
const decodedResponse = this.decodeMetadataResponse(encodedResponse);
const { error, shardInfo } = this.decodeMetadataResponse(encodedResponse);
await this.savePeerShardInfo(peerId, decodedResponse);
if (error) {
return {
shardInfo: null,
error
};
}
return decodedResponse;
await this.savePeerShardInfo(peerId, shardInfo);
return {
shardInfo,
error: null
};
}
public async confirmOrAttemptHandshake(peerId: PeerId): Promise<void> {
if (this.handshakesConfirmed.has(peerId.toString())) return;
public async confirmOrAttemptHandshake(peerId: PeerId): Promise<QueryResult> {
const shardInfo = this.handshakesConfirmed.get(peerId.toString());
if (shardInfo) {
return {
shardInfo,
error: null
};
}
await this.query(peerId);
return;
return await this.query(peerId);
}
private decodeMetadataResponse(encodedResponse: Uint8ArrayList[]): ShardInfo {
private decodeMetadataResponse(
encodedResponse: Uint8ArrayList[]
): QueryResult {
const bytes = new Uint8ArrayList();
encodedResponse.forEach((chunk) => {
@ -115,9 +136,18 @@ class Metadata extends BaseProtocol implements IMetadata {
bytes
) as ShardInfo;
if (!response) log.error("Error decoding metadata response");
if (!response) {
log.error("Error decoding metadata response");
return {
shardInfo: null,
error: ProtocolError.DECODE_FAILED
};
}
return response;
return {
shardInfo: response,
error: null
};
}
private async savePeerShardInfo(
@ -131,7 +161,7 @@ class Metadata extends BaseProtocol implements IMetadata {
}
});
this.handshakesConfirmed.add(peerId.toString());
this.handshakesConfirmed.set(peerId.toString(), shardInfo);
}
}

View File

@ -1,11 +1,25 @@
import type { PeerId } from "@libp2p/interface";
import type { ShardInfo } from "./enr.js";
import type { IBaseProtocolCore, ShardingParams } from "./protocols.js";
import type {
IBaseProtocolCore,
ProtocolError,
ShardingParams
} from "./protocols.js";
export type QueryResult =
| {
shardInfo: ShardInfo;
error: null;
}
| {
shardInfo: null;
error: ProtocolError;
};
// IMetadata always has shardInfo defined while it is optionally undefined in IBaseProtocol
export interface IMetadata extends Omit<IBaseProtocolCore, "shardInfo"> {
shardInfo: ShardingParams;
confirmOrAttemptHandshake(peerId: PeerId): Promise<void>;
query(peerId: PeerId): Promise<ShardInfo | undefined>;
confirmOrAttemptHandshake(peerId: PeerId): Promise<QueryResult>;
query(peerId: PeerId): Promise<QueryResult>;
}

View File

@ -101,7 +101,7 @@ export type Callback<T extends IDecodedMessage> = (
msg: T
) => void | Promise<void>;
export enum SendError {
export enum ProtocolError {
/** Could not determine the origin of the fault. Best to check connectivity and try again */
GENERIC_FAIL = "Generic error",
/**
@ -150,7 +150,7 @@ export enum SendError {
}
export interface Failure {
error: SendError;
error: ProtocolError;
peerId?: PeerId;
}

View File

@ -20,8 +20,8 @@ import {
IRelay,
Libp2p,
ProtocolCreateOptions,
ProtocolError,
PubsubTopic,
SendError,
SendResult
} from "@waku/interfaces";
import { isWireSizeUnderCap, toAsyncIterator } from "@waku/utils";
@ -109,7 +109,7 @@ class Relay implements IRelay {
successes,
failures: [
{
error: SendError.TOPIC_NOT_CONFIGURED
error: ProtocolError.TOPIC_NOT_CONFIGURED
}
]
};
@ -122,7 +122,7 @@ class Relay implements IRelay {
successes,
failures: [
{
error: SendError.ENCODE_FAILED
error: ProtocolError.ENCODE_FAILED
}
]
};
@ -134,7 +134,7 @@ class Relay implements IRelay {
successes,
failures: [
{
error: SendError.SIZE_TOO_BIG
error: ProtocolError.SIZE_TOO_BIG
}
]
};

View File

@ -7,7 +7,7 @@ import {
type IMessage,
type Libp2p,
type ProtocolCreateOptions,
SendError,
ProtocolError,
type SendResult
} from "@waku/interfaces";
import { ensurePubsubTopicIsConfigured, Logger } from "@waku/utils";
@ -37,7 +37,7 @@ export class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK {
return {
failures: [
{
error: SendError.TOPIC_NOT_CONFIGURED
error: ProtocolError.TOPIC_NOT_CONFIGURED
}
],
successes: []
@ -48,7 +48,7 @@ export class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK {
if (!peers.length) {
return {
successes,
failures: [{ error: SendError.NO_PEER_AVAILABLE }]
failures: [{ error: ProtocolError.NO_PEER_AVAILABLE }]
};
}
@ -69,7 +69,7 @@ export class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK {
}
} else {
log.error("Failed to send message to peer", result.reason);
failures.push({ error: SendError.GENERIC_FAIL });
failures.push({ error: ProtocolError.GENERIC_FAIL });
// TODO: handle renewing faulty peers with new peers (https://github.com/waku-org/js-waku/issues/1463)
}
}

View File

@ -3,7 +3,7 @@ import {
DefaultPubsubTopic,
IRateLimitProof,
LightNode,
SendError
ProtocolError
} from "@waku/interfaces";
import { utf8ToBytes } from "@waku/sdk";
import { expect } from "chai";
@ -97,7 +97,7 @@ const runTests = (strictNodeCheck: boolean): void => {
expect(pushResponse.successes.length).to.eq(0);
console.log("validated 1");
expect(pushResponse.failures?.map((failure) => failure.error)).to.include(
SendError.EMPTY_PAYLOAD
ProtocolError.EMPTY_PAYLOAD
);
console.log("validated 2");
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
@ -190,7 +190,7 @@ const runTests = (strictNodeCheck: boolean): void => {
expect(pushResponse.successes.length).to.eq(0);
expect(
pushResponse.failures?.map((failure) => failure.error)
).to.include(SendError.REMOTE_PEER_REJECTED);
).to.include(ProtocolError.REMOTE_PEER_REJECTED);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
false
);
@ -262,7 +262,7 @@ const runTests = (strictNodeCheck: boolean): void => {
});
expect(pushResponse.successes.length).to.eq(0);
expect(pushResponse.failures?.map((failure) => failure.error)).to.include(
SendError.SIZE_TOO_BIG
ProtocolError.SIZE_TOO_BIG
);
expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
false

View File

@ -3,7 +3,7 @@ import {
DefaultPubsubTopic,
IRateLimitProof,
LightNode,
SendError
ProtocolError
} from "@waku/interfaces";
import { utf8ToBytes } from "@waku/sdk";
import { expect } from "chai";
@ -85,7 +85,7 @@ describe("Waku Light Push: Single Node", function () {
expect(pushResponse.successes.length).to.eq(0);
expect(pushResponse.failures?.map((failure) => failure.error)).to.include(
SendError.EMPTY_PAYLOAD
ProtocolError.EMPTY_PAYLOAD
);
expect(await messageCollector.waitForMessages(1)).to.eq(false);
});
@ -167,7 +167,7 @@ describe("Waku Light Push: Single Node", function () {
} else {
expect(pushResponse.successes.length).to.eq(0);
expect(pushResponse.failures?.map((failure) => failure.error)).to.include(
SendError.REMOTE_PEER_REJECTED
ProtocolError.REMOTE_PEER_REJECTED
);
expect(await messageCollector.waitForMessages(1)).to.eq(false);
}
@ -234,7 +234,7 @@ describe("Waku Light Push: Single Node", function () {
});
expect(pushResponse.successes.length).to.eq(0);
expect(pushResponse.failures?.map((failure) => failure.error)).to.include(
SendError.SIZE_TOO_BIG
ProtocolError.SIZE_TOO_BIG
);
expect(await messageCollector.waitForMessages(1)).to.eq(false);
});

View File

@ -52,11 +52,22 @@ describe("Metadata Protocol", function () {
await waku.start();
await waku.libp2p.dialProtocol(nwaku1Ma, MetadataCodec);
const shardInfoRes =
await waku.libp2p.services.metadata?.query(nwaku1PeerId);
if (!waku.libp2p.services.metadata) {
expect(waku.libp2p.services.metadata).to.not.be.undefined;
return;
}
const { error, shardInfo: shardInfoRes } =
await waku.libp2p.services.metadata.query(nwaku1PeerId);
if (error) {
expect(error).to.be.null;
return;
}
expect(shardInfoRes).to.not.be.undefined;
expect(shardInfoRes?.clusterId).to.equal(shardInfo.clusterId);
expect(shardInfoRes?.shards).to.deep.equal(shardInfo.shards);
expect(shardInfoRes.clusterId).to.equal(shardInfo.clusterId);
expect(shardInfoRes.shards).to.deep.equal(shardInfo.shards);
const activeConnections = waku.libp2p.getConnections();
expect(activeConnections.length).to.equal(1);
@ -89,11 +100,22 @@ describe("Metadata Protocol", function () {
await waku.start();
await waku.libp2p.dialProtocol(nwaku1Ma, MetadataCodec);
const shardInfoRes =
await waku.libp2p.services.metadata?.query(nwaku1PeerId);
if (!waku.libp2p.services.metadata) {
expect(waku.libp2p.services.metadata).to.not.be.undefined;
return;
}
const { error, shardInfo: shardInfoRes } =
await waku.libp2p.services.metadata.query(nwaku1PeerId);
if (error) {
expect(error).to.be.null;
return;
}
expect(shardInfoRes).to.not.be.undefined;
expect(shardInfoRes?.clusterId).to.equal(shardInfo1.clusterId);
expect(shardInfoRes?.shards).to.deep.equal(shardInfo1.shards);
expect(shardInfoRes.clusterId).to.equal(shardInfo1.clusterId);
expect(shardInfoRes.shards).to.deep.equal(shardInfo1.shards);
const activeConnections = waku.libp2p.getConnections();
expect(activeConnections.length).to.equal(1);

View File

@ -1,5 +1,5 @@
import { createEncoder } from "@waku/core";
import { IRateLimitProof, RelayNode, SendError } from "@waku/interfaces";
import { IRateLimitProof, ProtocolError, RelayNode } from "@waku/interfaces";
import { createRelayNode } from "@waku/sdk/relay";
import { utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai";
@ -135,7 +135,7 @@ describe("Waku Relay, Publish", function () {
payload: utf8ToBytes("")
});
expect(pushResponse.failures?.[0].error).to.eq(
SendError.TOPIC_NOT_CONFIGURED
ProtocolError.TOPIC_NOT_CONFIGURED
);
await delay(400);
expect(await messageCollector.waitForMessages(1)).to.eq(false);
@ -148,7 +148,7 @@ describe("Waku Relay, Publish", function () {
});
expect(pushResponse.successes.length).to.eq(0);
expect(pushResponse.failures?.map((failure) => failure.error)).to.include(
SendError.SIZE_TOO_BIG
ProtocolError.SIZE_TOO_BIG
);
await delay(400);
expect(await messageCollector.waitForMessages(1)).to.eq(false);

View File

@ -1,7 +1,7 @@
import {
LightNode,
ProtocolError,
Protocols,
SendError,
ShardInfo,
SingleShardInfo
} from "@waku/interfaces";
@ -93,7 +93,7 @@ describe("Static Sharding: Running Nodes", function () {
}
const errors = failures?.map((failure) => failure.error);
expect(errors).to.include(SendError.TOPIC_NOT_CONFIGURED);
expect(errors).to.include(ProtocolError.TOPIC_NOT_CONFIGURED);
});
});

View File

@ -100,7 +100,7 @@ export const pubsubTopicToSingleShardInfo = (
};
//TODO: move part of BaseProtocol instead of utils
// return `SendError.TOPIC_NOT_CONFIGURED` instead of throwing
// return `ProtocolError.TOPIC_NOT_CONFIGURED` instead of throwing
export function ensurePubsubTopicIsConfigured(
pubsubTopic: PubsubTopic,
configuredTopics: PubsubTopic[]