From 1882023c58c830fc31921fe786bce734536ac1da Mon Sep 17 00:00:00 2001 From: Danish Arora <35004822+danisharora099@users.noreply.github.com> Date: Tue, 12 Mar 2024 16:40:08 +0530 Subject: [PATCH] feat(metadata): use error codes (#1904) --- packages/core/src/lib/light_push/index.ts | 22 ++--- packages/core/src/lib/metadata/index.ts | 82 +++++++++++++------ packages/interfaces/src/metadata.ts | 20 ++++- packages/interfaces/src/protocols.ts | 4 +- packages/relay/src/index.ts | 8 +- packages/sdk/src/protocols/light_push.ts | 8 +- .../tests/tests/light-push/index.node.spec.ts | 8 +- .../light-push/single_node/index.node.spec.ts | 8 +- packages/tests/tests/metadata.spec.ts | 38 +++++++-- .../tests/tests/relay/publish.node.spec.ts | 6 +- .../tests/sharding/running_nodes.spec.ts | 4 +- packages/utils/src/common/sharding.ts | 2 +- 12 files changed, 138 insertions(+), 72 deletions(-) diff --git a/packages/core/src/lib/light_push/index.ts b/packages/core/src/lib/light_push/index.ts index 5c34160889..3e5dcd6be0 100644 --- a/packages/core/src/lib/light_push/index.ts +++ b/packages/core/src/lib/light_push/index.ts @@ -6,7 +6,7 @@ import { IMessage, Libp2p, ProtocolCreateOptions, - SendError + ProtocolError } from "@waku/interfaces"; import { PushResponse } from "@waku/proto"; import { isMessageSizeUnderCap } from "@waku/utils"; @@ -32,7 +32,7 @@ type PreparePushMessageResult = } | { query: null; - error: SendError; + error: ProtocolError; }; type CoreSendResult = @@ -66,12 +66,12 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore { try { if (!message.payload || message.payload.length === 0) { log.error("Failed to send waku light push: payload is empty"); - return { query: null, error: SendError.EMPTY_PAYLOAD }; + return { query: null, error: ProtocolError.EMPTY_PAYLOAD }; } if (!(await isMessageSizeUnderCap(encoder, message))) { log.error("Failed to send waku light push: message is bigger than 1MB"); - return { query: null, error: SendError.SIZE_TOO_BIG }; + return { query: null, error: ProtocolError.SIZE_TOO_BIG }; } const protoMessage = await encoder.toProtoObj(message); @@ -79,7 +79,7 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore { log.error("Failed to encode to protoMessage, aborting push"); return { query: null, - error: SendError.ENCODE_FAILED + error: ProtocolError.ENCODE_FAILED }; } @@ -90,7 +90,7 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore { return { query: null, - error: SendError.GENERIC_FAIL + error: ProtocolError.GENERIC_FAIL }; } } @@ -126,7 +126,7 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore { return { success: null, failure: { - error: SendError.REMOTE_PEER_FAULT, + error: ProtocolError.REMOTE_PEER_FAULT, peerId: peer.id } }; @@ -146,7 +146,7 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore { return { success: null, failure: { - error: SendError.GENERIC_FAIL, + error: ProtocolError.GENERIC_FAIL, peerId: peer.id } }; @@ -165,7 +165,7 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore { return { success: null, failure: { - error: SendError.DECODE_FAILED, + error: ProtocolError.DECODE_FAILED, peerId: peer.id } }; @@ -176,7 +176,7 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore { return { success: null, failure: { - error: SendError.REMOTE_PEER_FAULT, + error: ProtocolError.REMOTE_PEER_FAULT, peerId: peer.id } }; @@ -187,7 +187,7 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore { return { success: null, failure: { - error: SendError.REMOTE_PEER_REJECTED, + error: ProtocolError.REMOTE_PEER_REJECTED, peerId: peer.id } }; diff --git a/packages/core/src/lib/metadata/index.ts b/packages/core/src/lib/metadata/index.ts index 0860294197..4fed198168 100644 --- a/packages/core/src/lib/metadata/index.ts +++ b/packages/core/src/lib/metadata/index.ts @@ -1,10 +1,12 @@ import type { PeerId } from "@libp2p/interface"; import { IncomingStreamData } from "@libp2p/interface"; -import type { - IMetadata, - Libp2pComponents, - PeerIdStr, - ShardInfo +import { + type IMetadata, + type Libp2pComponents, + type PeerIdStr, + ProtocolError, + QueryResult, + type ShardInfo } from "@waku/interfaces"; import { proto_metadata } from "@waku/proto"; import { encodeRelayShard, Logger, shardInfoToPubsubTopics } from "@waku/utils"; @@ -21,7 +23,7 @@ export const MetadataCodec = "/vac/waku/metadata/1.0.0"; class Metadata extends BaseProtocol implements IMetadata { private libp2pComponents: Libp2pComponents; - handshakesConfirmed: Set = new Set(); + handshakesConfirmed: Map = new Map(); constructor( public shardInfo: ShardInfo, @@ -57,13 +59,13 @@ class Metadata extends BaseProtocol implements IMetadata { async (source) => await all(source) ); - const remoteShardInfoResponse = - this.decodeMetadataResponse(encodedResponse); + const { error, shardInfo } = this.decodeMetadataResponse(encodedResponse); - await this.savePeerShardInfo( - connection.remotePeer, - remoteShardInfoResponse - ); + if (error) { + return; + } + + await this.savePeerShardInfo(connection.remotePeer, shardInfo); } catch (error) { log.error("Error handling metadata request", error); } @@ -72,12 +74,15 @@ class Metadata extends BaseProtocol implements IMetadata { /** * Make a metadata query to a peer */ - async query(peerId: PeerId): Promise { + async query(peerId: PeerId): Promise { const request = proto_metadata.WakuMetadataRequest.encode(this.shardInfo); const peer = await this.peerStore.get(peerId); if (!peer) { - throw new Error(`Peer ${peerId.toString()} not found`); + return { + shardInfo: null, + error: ProtocolError.NO_PEER_AVAILABLE + }; } const stream = await this.getStream(peer); @@ -90,22 +95,38 @@ class Metadata extends BaseProtocol implements IMetadata { async (source) => await all(source) ); - const decodedResponse = this.decodeMetadataResponse(encodedResponse); + const { error, shardInfo } = this.decodeMetadataResponse(encodedResponse); - await this.savePeerShardInfo(peerId, decodedResponse); + if (error) { + return { + shardInfo: null, + error + }; + } - return decodedResponse; + await this.savePeerShardInfo(peerId, shardInfo); + + return { + shardInfo, + error: null + }; } - public async confirmOrAttemptHandshake(peerId: PeerId): Promise { - if (this.handshakesConfirmed.has(peerId.toString())) return; + public async confirmOrAttemptHandshake(peerId: PeerId): Promise { + const shardInfo = this.handshakesConfirmed.get(peerId.toString()); + if (shardInfo) { + return { + shardInfo, + error: null + }; + } - await this.query(peerId); - - return; + return await this.query(peerId); } - private decodeMetadataResponse(encodedResponse: Uint8ArrayList[]): ShardInfo { + private decodeMetadataResponse( + encodedResponse: Uint8ArrayList[] + ): QueryResult { const bytes = new Uint8ArrayList(); encodedResponse.forEach((chunk) => { @@ -115,9 +136,18 @@ class Metadata extends BaseProtocol implements IMetadata { bytes ) as ShardInfo; - if (!response) log.error("Error decoding metadata response"); + if (!response) { + log.error("Error decoding metadata response"); + return { + shardInfo: null, + error: ProtocolError.DECODE_FAILED + }; + } - return response; + return { + shardInfo: response, + error: null + }; } private async savePeerShardInfo( @@ -131,7 +161,7 @@ class Metadata extends BaseProtocol implements IMetadata { } }); - this.handshakesConfirmed.add(peerId.toString()); + this.handshakesConfirmed.set(peerId.toString(), shardInfo); } } diff --git a/packages/interfaces/src/metadata.ts b/packages/interfaces/src/metadata.ts index a847fce52d..492d3b4168 100644 --- a/packages/interfaces/src/metadata.ts +++ b/packages/interfaces/src/metadata.ts @@ -1,11 +1,25 @@ import type { PeerId } from "@libp2p/interface"; import type { ShardInfo } from "./enr.js"; -import type { IBaseProtocolCore, ShardingParams } from "./protocols.js"; +import type { + IBaseProtocolCore, + ProtocolError, + ShardingParams +} from "./protocols.js"; + +export type QueryResult = + | { + shardInfo: ShardInfo; + error: null; + } + | { + shardInfo: null; + error: ProtocolError; + }; // IMetadata always has shardInfo defined while it is optionally undefined in IBaseProtocol export interface IMetadata extends Omit { shardInfo: ShardingParams; - confirmOrAttemptHandshake(peerId: PeerId): Promise; - query(peerId: PeerId): Promise; + confirmOrAttemptHandshake(peerId: PeerId): Promise; + query(peerId: PeerId): Promise; } diff --git a/packages/interfaces/src/protocols.ts b/packages/interfaces/src/protocols.ts index 8d64e83b46..b1a161058b 100644 --- a/packages/interfaces/src/protocols.ts +++ b/packages/interfaces/src/protocols.ts @@ -101,7 +101,7 @@ export type Callback = ( msg: T ) => void | Promise; -export enum SendError { +export enum ProtocolError { /** Could not determine the origin of the fault. Best to check connectivity and try again */ GENERIC_FAIL = "Generic error", /** @@ -150,7 +150,7 @@ export enum SendError { } export interface Failure { - error: SendError; + error: ProtocolError; peerId?: PeerId; } diff --git a/packages/relay/src/index.ts b/packages/relay/src/index.ts index e2e524fd3e..bbe10447b9 100644 --- a/packages/relay/src/index.ts +++ b/packages/relay/src/index.ts @@ -20,8 +20,8 @@ import { IRelay, Libp2p, ProtocolCreateOptions, + ProtocolError, PubsubTopic, - SendError, SendResult } from "@waku/interfaces"; import { isWireSizeUnderCap, toAsyncIterator } from "@waku/utils"; @@ -109,7 +109,7 @@ class Relay implements IRelay { successes, failures: [ { - error: SendError.TOPIC_NOT_CONFIGURED + error: ProtocolError.TOPIC_NOT_CONFIGURED } ] }; @@ -122,7 +122,7 @@ class Relay implements IRelay { successes, failures: [ { - error: SendError.ENCODE_FAILED + error: ProtocolError.ENCODE_FAILED } ] }; @@ -134,7 +134,7 @@ class Relay implements IRelay { successes, failures: [ { - error: SendError.SIZE_TOO_BIG + error: ProtocolError.SIZE_TOO_BIG } ] }; diff --git a/packages/sdk/src/protocols/light_push.ts b/packages/sdk/src/protocols/light_push.ts index aca7d78b8a..0584878d4a 100644 --- a/packages/sdk/src/protocols/light_push.ts +++ b/packages/sdk/src/protocols/light_push.ts @@ -7,7 +7,7 @@ import { type IMessage, type Libp2p, type ProtocolCreateOptions, - SendError, + ProtocolError, type SendResult } from "@waku/interfaces"; import { ensurePubsubTopicIsConfigured, Logger } from "@waku/utils"; @@ -37,7 +37,7 @@ export class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK { return { failures: [ { - error: SendError.TOPIC_NOT_CONFIGURED + error: ProtocolError.TOPIC_NOT_CONFIGURED } ], successes: [] @@ -48,7 +48,7 @@ export class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK { if (!peers.length) { return { successes, - failures: [{ error: SendError.NO_PEER_AVAILABLE }] + failures: [{ error: ProtocolError.NO_PEER_AVAILABLE }] }; } @@ -69,7 +69,7 @@ export class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK { } } else { log.error("Failed to send message to peer", result.reason); - failures.push({ error: SendError.GENERIC_FAIL }); + failures.push({ error: ProtocolError.GENERIC_FAIL }); // TODO: handle renewing faulty peers with new peers (https://github.com/waku-org/js-waku/issues/1463) } } diff --git a/packages/tests/tests/light-push/index.node.spec.ts b/packages/tests/tests/light-push/index.node.spec.ts index a4055d10cc..896a9b27fe 100644 --- a/packages/tests/tests/light-push/index.node.spec.ts +++ b/packages/tests/tests/light-push/index.node.spec.ts @@ -3,7 +3,7 @@ import { DefaultPubsubTopic, IRateLimitProof, LightNode, - SendError + ProtocolError } from "@waku/interfaces"; import { utf8ToBytes } from "@waku/sdk"; import { expect } from "chai"; @@ -97,7 +97,7 @@ const runTests = (strictNodeCheck: boolean): void => { expect(pushResponse.successes.length).to.eq(0); console.log("validated 1"); expect(pushResponse.failures?.map((failure) => failure.error)).to.include( - SendError.EMPTY_PAYLOAD + ProtocolError.EMPTY_PAYLOAD ); console.log("validated 2"); expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( @@ -190,7 +190,7 @@ const runTests = (strictNodeCheck: boolean): void => { expect(pushResponse.successes.length).to.eq(0); expect( pushResponse.failures?.map((failure) => failure.error) - ).to.include(SendError.REMOTE_PEER_REJECTED); + ).to.include(ProtocolError.REMOTE_PEER_REJECTED); expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( false ); @@ -262,7 +262,7 @@ const runTests = (strictNodeCheck: boolean): void => { }); expect(pushResponse.successes.length).to.eq(0); expect(pushResponse.failures?.map((failure) => failure.error)).to.include( - SendError.SIZE_TOO_BIG + ProtocolError.SIZE_TOO_BIG ); expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( false diff --git a/packages/tests/tests/light-push/single_node/index.node.spec.ts b/packages/tests/tests/light-push/single_node/index.node.spec.ts index fb1779ae91..f6d5c02b59 100644 --- a/packages/tests/tests/light-push/single_node/index.node.spec.ts +++ b/packages/tests/tests/light-push/single_node/index.node.spec.ts @@ -3,7 +3,7 @@ import { DefaultPubsubTopic, IRateLimitProof, LightNode, - SendError + ProtocolError } from "@waku/interfaces"; import { utf8ToBytes } from "@waku/sdk"; import { expect } from "chai"; @@ -85,7 +85,7 @@ describe("Waku Light Push: Single Node", function () { expect(pushResponse.successes.length).to.eq(0); expect(pushResponse.failures?.map((failure) => failure.error)).to.include( - SendError.EMPTY_PAYLOAD + ProtocolError.EMPTY_PAYLOAD ); expect(await messageCollector.waitForMessages(1)).to.eq(false); }); @@ -167,7 +167,7 @@ describe("Waku Light Push: Single Node", function () { } else { expect(pushResponse.successes.length).to.eq(0); expect(pushResponse.failures?.map((failure) => failure.error)).to.include( - SendError.REMOTE_PEER_REJECTED + ProtocolError.REMOTE_PEER_REJECTED ); expect(await messageCollector.waitForMessages(1)).to.eq(false); } @@ -234,7 +234,7 @@ describe("Waku Light Push: Single Node", function () { }); expect(pushResponse.successes.length).to.eq(0); expect(pushResponse.failures?.map((failure) => failure.error)).to.include( - SendError.SIZE_TOO_BIG + ProtocolError.SIZE_TOO_BIG ); expect(await messageCollector.waitForMessages(1)).to.eq(false); }); diff --git a/packages/tests/tests/metadata.spec.ts b/packages/tests/tests/metadata.spec.ts index 172accb1bb..d121f14c62 100644 --- a/packages/tests/tests/metadata.spec.ts +++ b/packages/tests/tests/metadata.spec.ts @@ -52,11 +52,22 @@ describe("Metadata Protocol", function () { await waku.start(); await waku.libp2p.dialProtocol(nwaku1Ma, MetadataCodec); - const shardInfoRes = - await waku.libp2p.services.metadata?.query(nwaku1PeerId); + if (!waku.libp2p.services.metadata) { + expect(waku.libp2p.services.metadata).to.not.be.undefined; + return; + } + + const { error, shardInfo: shardInfoRes } = + await waku.libp2p.services.metadata.query(nwaku1PeerId); + + if (error) { + expect(error).to.be.null; + return; + } + expect(shardInfoRes).to.not.be.undefined; - expect(shardInfoRes?.clusterId).to.equal(shardInfo.clusterId); - expect(shardInfoRes?.shards).to.deep.equal(shardInfo.shards); + expect(shardInfoRes.clusterId).to.equal(shardInfo.clusterId); + expect(shardInfoRes.shards).to.deep.equal(shardInfo.shards); const activeConnections = waku.libp2p.getConnections(); expect(activeConnections.length).to.equal(1); @@ -89,11 +100,22 @@ describe("Metadata Protocol", function () { await waku.start(); await waku.libp2p.dialProtocol(nwaku1Ma, MetadataCodec); - const shardInfoRes = - await waku.libp2p.services.metadata?.query(nwaku1PeerId); + if (!waku.libp2p.services.metadata) { + expect(waku.libp2p.services.metadata).to.not.be.undefined; + return; + } + + const { error, shardInfo: shardInfoRes } = + await waku.libp2p.services.metadata.query(nwaku1PeerId); + + if (error) { + expect(error).to.be.null; + return; + } + expect(shardInfoRes).to.not.be.undefined; - expect(shardInfoRes?.clusterId).to.equal(shardInfo1.clusterId); - expect(shardInfoRes?.shards).to.deep.equal(shardInfo1.shards); + expect(shardInfoRes.clusterId).to.equal(shardInfo1.clusterId); + expect(shardInfoRes.shards).to.deep.equal(shardInfo1.shards); const activeConnections = waku.libp2p.getConnections(); expect(activeConnections.length).to.equal(1); diff --git a/packages/tests/tests/relay/publish.node.spec.ts b/packages/tests/tests/relay/publish.node.spec.ts index 50b6fb7340..86caea8fb8 100644 --- a/packages/tests/tests/relay/publish.node.spec.ts +++ b/packages/tests/tests/relay/publish.node.spec.ts @@ -1,5 +1,5 @@ import { createEncoder } from "@waku/core"; -import { IRateLimitProof, RelayNode, SendError } from "@waku/interfaces"; +import { IRateLimitProof, ProtocolError, RelayNode } from "@waku/interfaces"; import { createRelayNode } from "@waku/sdk/relay"; import { utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; @@ -135,7 +135,7 @@ describe("Waku Relay, Publish", function () { payload: utf8ToBytes("") }); expect(pushResponse.failures?.[0].error).to.eq( - SendError.TOPIC_NOT_CONFIGURED + ProtocolError.TOPIC_NOT_CONFIGURED ); await delay(400); expect(await messageCollector.waitForMessages(1)).to.eq(false); @@ -148,7 +148,7 @@ describe("Waku Relay, Publish", function () { }); expect(pushResponse.successes.length).to.eq(0); expect(pushResponse.failures?.map((failure) => failure.error)).to.include( - SendError.SIZE_TOO_BIG + ProtocolError.SIZE_TOO_BIG ); await delay(400); expect(await messageCollector.waitForMessages(1)).to.eq(false); diff --git a/packages/tests/tests/sharding/running_nodes.spec.ts b/packages/tests/tests/sharding/running_nodes.spec.ts index 24beb3f519..1d51141990 100644 --- a/packages/tests/tests/sharding/running_nodes.spec.ts +++ b/packages/tests/tests/sharding/running_nodes.spec.ts @@ -1,7 +1,7 @@ import { LightNode, + ProtocolError, Protocols, - SendError, ShardInfo, SingleShardInfo } from "@waku/interfaces"; @@ -93,7 +93,7 @@ describe("Static Sharding: Running Nodes", function () { } const errors = failures?.map((failure) => failure.error); - expect(errors).to.include(SendError.TOPIC_NOT_CONFIGURED); + expect(errors).to.include(ProtocolError.TOPIC_NOT_CONFIGURED); }); }); diff --git a/packages/utils/src/common/sharding.ts b/packages/utils/src/common/sharding.ts index c2911234cd..3ad5a77780 100644 --- a/packages/utils/src/common/sharding.ts +++ b/packages/utils/src/common/sharding.ts @@ -100,7 +100,7 @@ export const pubsubTopicToSingleShardInfo = ( }; //TODO: move part of BaseProtocol instead of utils -// return `SendError.TOPIC_NOT_CONFIGURED` instead of throwing +// return `ProtocolError.TOPIC_NOT_CONFIGURED` instead of throwing export function ensurePubsubTopicIsConfigured( pubsubTopic: PubsubTopic, configuredTopics: PubsubTopic[]