diff --git a/packages/core/src/lib/light_push/index.ts b/packages/core/src/lib/light_push/index.ts index d9379c1690..f4ffcb7fc2 100644 --- a/packages/core/src/lib/light_push/index.ts +++ b/packages/core/src/lib/light_push/index.ts @@ -1,3 +1,4 @@ +import type { Stream } from "@libp2p/interface/connection"; import type { PeerId } from "@libp2p/interface/peer-id"; import { IEncoder, @@ -102,43 +103,63 @@ class LightPush extends BaseProtocol implements ILightPush { numPeers: this.NUM_PEERS_PROTOCOL }); - const promises = peers.map(async (peer) => { - let error: SendError | undefined; - const stream = await this.getStream(peer); + if (!peers.length) { + return { + recipients, + errors: [SendError.NO_PEER_AVAILABLE] + }; + } + const promises = peers.map(async (peer) => { + let stream: Stream | undefined; try { - const res = await pipe( + stream = await this.getStream(peer); + } catch (err) { + log(`Failed to get a stream for remote peer${peer.id.toString()}`, err); + return { recipients, error: SendError.REMOTE_PEER_FAULT }; + } + + let res: Uint8ArrayList[] | undefined; + try { + res = await pipe( [query.encode()], lp.encode, stream, lp.decode, async (source) => await all(source) ); - try { - const bytes = new Uint8ArrayList(); - res.forEach((chunk) => { - bytes.append(chunk); - }); - - const response = PushRpc.decode(bytes).response; - - if (response?.isSuccess) { - recipients.some((recipient) => recipient.equals(peer.id)) || - recipients.push(peer.id); - } else { - log("No response in PushRPC"); - error = SendError.NO_RPC_RESPONSE; - } - } catch (err) { - log("Failed to decode push reply", err); - error = SendError.DECODE_FAILED; - } } catch (err) { log("Failed to send waku light push request", err); - error = SendError.GENERIC_FAIL; + return { recipients, error: SendError.GENERIC_FAIL }; } - return { recipients, error }; + const bytes = new Uint8ArrayList(); + res.forEach((chunk) => { + bytes.append(chunk); + }); + + let response: PushResponse | undefined; + try { + response = PushRpc.decode(bytes).response; + } catch (err) { + log("Failed to decode push reply", err); + return { recipients, error: SendError.DECODE_FAILED }; + } + + if (!response) { + log("Remote peer fault: No response in PushRPC"); + return { recipients, error: SendError.REMOTE_PEER_FAULT }; + } + + if (!response.isSuccess) { + log("Remote peer rejected the message: ", response.info); + return { recipients, error: SendError.REMOTE_PEER_REJECTED }; + } + + recipients.some((recipient) => recipient.equals(peer.id)) || + recipients.push(peer.id); + + return { recipients }; }); const results = await Promise.allSettled(promises); diff --git a/packages/interfaces/src/protocols.ts b/packages/interfaces/src/protocols.ts index c354b1b740..d84172f6e7 100644 --- a/packages/interfaces/src/protocols.ts +++ b/packages/interfaces/src/protocols.ts @@ -59,11 +59,36 @@ export type Callback = ( ) => void | Promise; export enum SendError { + /** Could not determine the origin of the fault. Best to check connectivity and try again */ GENERIC_FAIL = "Generic error", + /** Failure to protobuf encode the message. This is not recoverable and needs + * further investigation. */ ENCODE_FAILED = "Failed to encode", + /** Failure to protobuf decode the message. May be due to a remote peer issue, + * ensuring that messages are sent via several peer enable mitigation of this error.. */ DECODE_FAILED = "Failed to decode", + /** The message size is above the maximum message size allowed on the Waku Network. + * Compressing the message or using an alternative strategy for large messages is recommended. + */ SIZE_TOO_BIG = "Size is too big", - NO_RPC_RESPONSE = "No RPC response" + /** + * Failure to find a peer with suitable protocols. This may due to a connection issue. + * Mitigation can be: retrying after a given time period, display connectivity issue + * to user or listening for `peer:connected:bootstrap` or `peer:connected:peer-exchange` + * on the connection manager before retrying. + */ + NO_PEER_AVAILABLE = "No peer available", + /** + * The remote peer did not behave as expected. Mitigation for `NO_PEER_AVAILABLE` + * or `DECODE_FAILED` can be used. + */ + REMOTE_PEER_FAULT = "Remote peer fault", + /** + * The remote peer rejected the message. Information provided by the remote peer + * is logged. Review message validity, or mitigation for `NO_PEER_AVAILABLE` + * or `DECODE_FAILED` can be used. + */ + REMOTE_PEER_REJECTED = "Remote peer rejected" } export interface SendResult { diff --git a/packages/tests/tests/light-push/index.spec.ts b/packages/tests/tests/light-push/index.spec.ts index 3334cd76fb..e43207bd9f 100644 --- a/packages/tests/tests/light-push/index.spec.ts +++ b/packages/tests/tests/light-push/index.spec.ts @@ -86,7 +86,8 @@ describe("Waku Light Push [node only]", function () { }); } else { expect(pushResponse.recipients.length).to.eq(0); - expect(pushResponse.errors).to.include(SendError.NO_RPC_RESPONSE); + // This should be `REMOTE_PEER_REJECTED`, tracked with https://github.com/waku-org/nwaku/issues/1641 + expect(pushResponse.errors).to.include(SendError.REMOTE_PEER_FAULT); expect(await messageCollector.waitForMessages(1)).to.eq(false); } }); @@ -158,7 +159,8 @@ describe("Waku Light Push [node only]", function () { }); } else { expect(pushResponse.recipients.length).to.eq(0); - expect(pushResponse.errors).to.include(SendError.NO_RPC_RESPONSE); + // Should be `REMOTE_PEER_REJECTED`, tracked with https://github.com/waku-org/nwaku/issues/2059 + expect(pushResponse.errors).to.include(SendError.REMOTE_PEER_FAULT); expect(await messageCollector.waitForMessages(1)).to.eq(false); } });