feat(filter)!: return error codes instead of throwing errors (#1971)

* move protocol result type to interfaces

* chore: update type names for verbosity

* feat(filter-core): convert error throws to return types

* chore: update types & imports

* update Filter API

* chore: update createSubscription

* chore: update imports & rename

* chore: update all tests

* chore: resolve conflicts & merge (2/n)

* chore: resolve conflicts & merge (3/n)

* chore: resolve conflicts & merge (4/n)

* chore: resolve conflicts & merge (5/n)

* chore: resolve conflicts & merge (6/n)

* chore: use idiomatic approach

* chore: fix tests

* chore: address comments

* chore: fix test

* rm: only
This commit is contained in:
Danish Arora 2024-05-09 16:51:08 +05:30 committed by GitHub
parent 5df41b0adf
commit 4eb06c64eb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
29 changed files with 550 additions and 283 deletions

View File

@ -1,17 +1,20 @@
import type { Peer } from "@libp2p/interface"; import type { Peer, Stream } from "@libp2p/interface";
import type { IncomingStreamData } from "@libp2p/interface-internal"; import type { IncomingStreamData } from "@libp2p/interface-internal";
import type { import {
ContentTopic, type ContentTopic,
IBaseProtocolCore, type CoreProtocolResult,
Libp2p, type IBaseProtocolCore,
ProtocolCreateOptions, type Libp2p,
PubsubTopic type ProtocolCreateOptions,
ProtocolError,
type PubsubTopic
} from "@waku/interfaces"; } from "@waku/interfaces";
import { WakuMessage } from "@waku/proto"; import { WakuMessage } from "@waku/proto";
import { Logger } from "@waku/utils"; import { Logger } from "@waku/utils";
import all from "it-all"; import all from "it-all";
import * as lp from "it-length-prefixed"; import * as lp from "it-length-prefixed";
import { pipe } from "it-pipe"; import { pipe } from "it-pipe";
import { Uint8ArrayList } from "uint8arraylist";
import { BaseProtocol } from "../base_protocol.js"; import { BaseProtocol } from "../base_protocol.js";
@ -90,7 +93,7 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
pubsubTopic: PubsubTopic, pubsubTopic: PubsubTopic,
peer: Peer, peer: Peer,
contentTopics: ContentTopic[] contentTopics: ContentTopic[]
): Promise<void> { ): Promise<CoreProtocolResult> {
const stream = await this.getStream(peer); const stream = await this.getStream(peer);
const request = FilterSubscribeRpc.createSubscribeRequest( const request = FilterSubscribeRpc.createSubscribeRequest(
@ -98,45 +101,98 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
contentTopics contentTopics
); );
const res = await pipe( let res: Uint8ArrayList[] | undefined;
[request.encode()], try {
lp.encode, res = await pipe(
stream, [request.encode()],
lp.decode, lp.encode,
async (source) => await all(source) stream,
); lp.decode,
async (source) => await all(source)
if (!res || !res.length) {
throw Error(
`No response received for request ${request.requestId}: ${res}`
); );
} catch (error) {
log.error("Failed to send subscribe request", error);
return {
success: null,
failure: {
error: ProtocolError.GENERIC_FAIL,
peerId: peer.id
}
};
} }
const { statusCode, requestId, statusDesc } = const { statusCode, requestId, statusDesc } =
FilterSubscribeResponse.decode(res[0].slice()); FilterSubscribeResponse.decode(res[0].slice());
if (statusCode < 200 || statusCode >= 300) { if (statusCode < 200 || statusCode >= 300) {
throw new Error( log.error(
`Filter subscribe request ${requestId} failed with status code ${statusCode}: ${statusDesc}` `Filter subscribe request ${requestId} failed with status code ${statusCode}: ${statusDesc}`
); );
return {
failure: {
error: ProtocolError.REMOTE_PEER_REJECTED,
peerId: peer.id
},
success: null
};
} }
return {
failure: null,
success: peer.id
};
} }
async unsubscribe( async unsubscribe(
pubsubTopic: PubsubTopic, pubsubTopic: PubsubTopic,
peer: Peer, peer: Peer,
contentTopics: ContentTopic[] contentTopics: ContentTopic[]
): Promise<void> { ): Promise<CoreProtocolResult> {
const stream = await this.getStream(peer); let stream: Stream | undefined;
try {
stream = await this.getStream(peer);
} catch (error) {
log.error(
`Failed to get a stream for remote peer${peer.id.toString()}`,
error
);
return {
success: null,
failure: {
error: ProtocolError.REMOTE_PEER_FAULT,
peerId: peer.id
}
};
}
const unsubscribeRequest = FilterSubscribeRpc.createUnsubscribeRequest( const unsubscribeRequest = FilterSubscribeRpc.createUnsubscribeRequest(
pubsubTopic, pubsubTopic,
contentTopics contentTopics
); );
await pipe([unsubscribeRequest.encode()], lp.encode, stream.sink); try {
await pipe([unsubscribeRequest.encode()], lp.encode, stream.sink);
} catch (error) {
log.error("Failed to send unsubscribe request", error);
return {
success: null,
failure: {
error: ProtocolError.GENERIC_FAIL,
peerId: peer.id
}
};
}
return {
success: peer.id,
failure: null
};
} }
async unsubscribeAll(pubsubTopic: PubsubTopic, peer: Peer): Promise<void> { async unsubscribeAll(
pubsubTopic: PubsubTopic,
peer: Peer
): Promise<CoreProtocolResult> {
const stream = await this.getStream(peer); const stream = await this.getStream(peer);
const request = FilterSubscribeRpc.createUnsubscribeAllRequest(pubsubTopic); const request = FilterSubscribeRpc.createUnsubscribeAllRequest(pubsubTopic);
@ -150,53 +206,105 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
); );
if (!res || !res.length) { if (!res || !res.length) {
throw Error( return {
`No response received for request ${request.requestId}: ${res}` failure: {
); error: ProtocolError.REMOTE_PEER_FAULT,
peerId: peer.id
},
success: null
};
} }
const { statusCode, requestId, statusDesc } = const { statusCode, requestId, statusDesc } =
FilterSubscribeResponse.decode(res[0].slice()); FilterSubscribeResponse.decode(res[0].slice());
if (statusCode < 200 || statusCode >= 300) { if (statusCode < 200 || statusCode >= 300) {
throw new Error( log.error(
`Filter unsubscribe all request ${requestId} failed with status code ${statusCode}: ${statusDesc}` `Filter unsubscribe all request ${requestId} failed with status code ${statusCode}: ${statusDesc}`
); );
return {
failure: {
error: ProtocolError.REMOTE_PEER_REJECTED,
peerId: peer.id
},
success: null
};
} }
return {
failure: null,
success: peer.id
};
} }
async ping(peer: Peer): Promise<void> { async ping(peer: Peer): Promise<CoreProtocolResult> {
const stream = await this.getStream(peer); let stream: Stream | undefined;
try {
stream = await this.getStream(peer);
} catch (error) {
log.error(
`Failed to get a stream for remote peer${peer.id.toString()}`,
error
);
return {
success: null,
failure: {
error: ProtocolError.REMOTE_PEER_FAULT,
peerId: peer.id
}
};
}
const request = FilterSubscribeRpc.createSubscriberPingRequest(); const request = FilterSubscribeRpc.createSubscriberPingRequest();
let res: Uint8ArrayList[] | undefined;
try { try {
const res = await pipe( res = await pipe(
[request.encode()], [request.encode()],
lp.encode, lp.encode,
stream, stream,
lp.decode, lp.decode,
async (source) => await all(source) async (source) => await all(source)
); );
if (!res || !res.length) {
throw Error(
`No response received for request ${request.requestId}: ${res}`
);
}
const { statusCode, requestId, statusDesc } =
FilterSubscribeResponse.decode(res[0].slice());
if (statusCode < 200 || statusCode >= 300) {
throw new Error(
`Filter ping request ${requestId} failed with status code ${statusCode}: ${statusDesc}`
);
}
log.info(`Ping successful for peer ${peer.id.toString()}`);
} catch (error) { } catch (error) {
log.error("Error pinging: ", error); log.error("Failed to send ping request", error);
throw error; // Rethrow the actual error instead of wrapping it return {
success: null,
failure: {
error: ProtocolError.GENERIC_FAIL,
peerId: peer.id
}
};
} }
if (!res || !res.length) {
return {
success: null,
failure: {
error: ProtocolError.REMOTE_PEER_FAULT,
peerId: peer.id
}
};
}
const { statusCode, requestId, statusDesc } =
FilterSubscribeResponse.decode(res[0].slice());
if (statusCode < 200 || statusCode >= 300) {
log.error(
`Filter ping request ${requestId} failed with status code ${statusCode}: ${statusDesc}`
);
return {
success: null,
failure: {
error: ProtocolError.REMOTE_PEER_REJECTED,
peerId: peer.id
}
};
}
return {
success: peer.id,
failure: null
};
} }
} }

View File

@ -1,13 +1,13 @@
import type { Peer, PeerId, Stream } from "@libp2p/interface"; import type { Peer, Stream } from "@libp2p/interface";
import { import {
Failure, type CoreProtocolResult,
IBaseProtocolCore, type IBaseProtocolCore,
IEncoder, type IEncoder,
IMessage, type IMessage,
Libp2p, type Libp2p,
ProtocolCreateOptions, type ProtocolCreateOptions,
ProtocolError, ProtocolError,
ProtocolResult type ThisOrThat
} from "@waku/interfaces"; } from "@waku/interfaces";
import { PushResponse } from "@waku/proto"; import { PushResponse } from "@waku/proto";
import { isMessageSizeUnderCap } from "@waku/utils"; import { isMessageSizeUnderCap } from "@waku/utils";
@ -26,9 +26,7 @@ const log = new Logger("light-push");
export const LightPushCodec = "/vac/waku/lightpush/2.0.0-beta1"; export const LightPushCodec = "/vac/waku/lightpush/2.0.0-beta1";
export { PushResponse }; export { PushResponse };
type PreparePushMessageResult = ProtocolResult<"query", PushRpc>; type PreparePushMessageResult = ThisOrThat<"query", PushRpc>;
type CoreSendResult = ProtocolResult<"success", PeerId, "failure", Failure>;
/** /**
* Implements the [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/). * Implements the [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/).
@ -84,7 +82,7 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore {
encoder: IEncoder, encoder: IEncoder,
message: IMessage, message: IMessage,
peer: Peer peer: Peer
): Promise<CoreSendResult> { ): Promise<CoreProtocolResult> {
const { query, error: preparationError } = await this.preparePushMessage( const { query, error: preparationError } = await this.preparePushMessage(
encoder, encoder,
message message

View File

@ -3,9 +3,9 @@ import { IncomingStreamData } from "@libp2p/interface";
import { import {
type IMetadata, type IMetadata,
type Libp2pComponents, type Libp2pComponents,
type MetadataQueryResult,
type PeerIdStr, type PeerIdStr,
ProtocolError, ProtocolError,
QueryResult,
type ShardInfo type ShardInfo
} from "@waku/interfaces"; } from "@waku/interfaces";
import { proto_metadata } from "@waku/proto"; import { proto_metadata } from "@waku/proto";
@ -74,7 +74,7 @@ class Metadata extends BaseProtocol implements IMetadata {
/** /**
* Make a metadata query to a peer * Make a metadata query to a peer
*/ */
async query(peerId: PeerId): Promise<QueryResult> { async query(peerId: PeerId): Promise<MetadataQueryResult> {
const request = proto_metadata.WakuMetadataRequest.encode(this.shardInfo); const request = proto_metadata.WakuMetadataRequest.encode(this.shardInfo);
const peer = await this.peerStore.get(peerId); const peer = await this.peerStore.get(peerId);
@ -112,7 +112,9 @@ class Metadata extends BaseProtocol implements IMetadata {
}; };
} }
public async confirmOrAttemptHandshake(peerId: PeerId): Promise<QueryResult> { public async confirmOrAttemptHandshake(
peerId: PeerId
): Promise<MetadataQueryResult> {
const shardInfo = this.handshakesConfirmed.get(peerId.toString()); const shardInfo = this.handshakesConfirmed.get(peerId.toString());
if (shardInfo) { if (shardInfo) {
return { return {
@ -126,7 +128,7 @@ class Metadata extends BaseProtocol implements IMetadata {
private decodeMetadataResponse( private decodeMetadataResponse(
encodedResponse: Uint8ArrayList[] encodedResponse: Uint8ArrayList[]
): QueryResult { ): MetadataQueryResult {
const bytes = new Uint8ArrayList(); const bytes = new Uint8ArrayList();
encodedResponse.forEach((chunk) => { encodedResponse.forEach((chunk) => {

View File

@ -4,7 +4,7 @@ import {
IPeerExchange, IPeerExchange,
Libp2pComponents, Libp2pComponents,
PeerExchangeQueryParams, PeerExchangeQueryParams,
PeerExchangeResult, PeerExchangeQueryResult,
ProtocolError, ProtocolError,
PubsubTopic PubsubTopic
} from "@waku/interfaces"; } from "@waku/interfaces";
@ -35,7 +35,9 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange {
/** /**
* Make a peer exchange query to a peer * Make a peer exchange query to a peer
*/ */
async query(params: PeerExchangeQueryParams): Promise<PeerExchangeResult> { async query(
params: PeerExchangeQueryParams
): Promise<PeerExchangeQueryResult> {
const { numPeers } = params; const { numPeers } = params;
const rpcQuery = PeerExchangeRPC.createRequest({ const rpcQuery = PeerExchangeRPC.createRequest({
numPeers: BigInt(numPeers) numPeers: BigInt(numPeers)

View File

@ -8,8 +8,8 @@ import type {
PeerInfo PeerInfo
} from "@libp2p/interface"; } from "@libp2p/interface";
import { import {
Libp2pComponents, type Libp2pComponents,
PeerExchangeResult, type PeerExchangeQueryResult,
PubsubTopic, PubsubTopic,
Tags Tags
} from "@waku/interfaces"; } from "@waku/interfaces";
@ -165,7 +165,7 @@ export class PeerExchangeDiscovery
}, queryInterval * currentAttempt); }, queryInterval * currentAttempt);
}; };
private async query(peerId: PeerId): Promise<PeerExchangeResult> { private async query(peerId: PeerId): Promise<PeerExchangeQueryResult> {
const { error, peerInfos } = await this.peerExchange.query({ const { error, peerInfos } = await this.peerExchange.query({
numPeers: DEFAULT_PEER_EXCHANGE_REQUEST_NODES, numPeers: DEFAULT_PEER_EXCHANGE_REQUEST_NODES,
peerId peerId

View File

@ -1,9 +1,11 @@
import type { IDecodedMessage, IDecoder } from "./message.js"; import type { IDecodedMessage, IDecoder } from "./message.js";
import type { ContentTopic, PubsubTopic } from "./misc.js"; import type { ContentTopic, PubsubTopic, ThisOrThat } from "./misc.js";
import type { import type {
Callback, Callback,
IBaseProtocolCore, IBaseProtocolCore,
IBaseProtocolSDK, IBaseProtocolSDK,
ProtocolError,
SDKProtocolResult,
ShardingParams ShardingParams
} from "./protocols.js"; } from "./protocols.js";
import type { IReceiver } from "./receiver.js"; import type { IReceiver } from "./receiver.js";
@ -12,18 +14,20 @@ export type SubscribeOptions = {
keepAlive?: number; keepAlive?: number;
}; };
export interface IFilterSubscription { export type IFilter = IReceiver & IBaseProtocolCore;
export interface ISubscriptionSDK {
subscribe<T extends IDecodedMessage>( subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[], decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>, callback: Callback<T>,
options?: SubscribeOptions options?: SubscribeOptions
): Promise<void>; ): Promise<SDKProtocolResult>;
unsubscribe(contentTopics: ContentTopic[]): Promise<void>; unsubscribe(contentTopics: ContentTopic[]): Promise<SDKProtocolResult>;
ping(): Promise<void>; ping(): Promise<SDKProtocolResult>;
unsubscribeAll(): Promise<void>; unsubscribeAll(): Promise<SDKProtocolResult>;
} }
export type IFilterSDK = IReceiver & export type IFilterSDK = IReceiver &
@ -31,5 +35,12 @@ export type IFilterSDK = IReceiver &
createSubscription( createSubscription(
pubsubTopicShardInfo?: ShardingParams | PubsubTopic, pubsubTopicShardInfo?: ShardingParams | PubsubTopic,
options?: SubscribeOptions options?: SubscribeOptions
): Promise<IFilterSubscription>; ): Promise<CreateSubscriptionResult>;
}; };
export type CreateSubscriptionResult = ThisOrThat<
"subscription",
ISubscriptionSDK,
"error",
ProtocolError
>;

View File

@ -1,17 +1,14 @@
import type { PeerId } from "@libp2p/interface"; import type { PeerId } from "@libp2p/interface";
import { type ShardInfo } from "./enr.js"; import { type ShardInfo } from "./enr.js";
import type { import { ThisOrThat } from "./misc.js";
IBaseProtocolCore, import type { IBaseProtocolCore, ShardingParams } from "./protocols.js";
ProtocolResult,
ShardingParams
} from "./protocols.js";
export type QueryResult = ProtocolResult<"shardInfo", ShardInfo>; export type MetadataQueryResult = ThisOrThat<"shardInfo", ShardInfo>;
// IMetadata always has shardInfo defined while it is optionally undefined in IBaseProtocol // IMetadata always has shardInfo defined while it is optionally undefined in IBaseProtocol
export interface IMetadata extends Omit<IBaseProtocolCore, "shardInfo"> { export interface IMetadata extends Omit<IBaseProtocolCore, "shardInfo"> {
shardInfo: ShardingParams; shardInfo: ShardingParams;
confirmOrAttemptHandshake(peerId: PeerId): Promise<QueryResult>; confirmOrAttemptHandshake(peerId: PeerId): Promise<MetadataQueryResult>;
query(peerId: PeerId): Promise<QueryResult>; query(peerId: PeerId): Promise<MetadataQueryResult>;
} }

View File

@ -1,4 +1,5 @@
import type { IDecodedMessage } from "./message.js"; import type { IDecodedMessage } from "./message.js";
import { ProtocolError } from "./protocols.js";
export interface IAsyncIterator<T extends IDecodedMessage> { export interface IAsyncIterator<T extends IDecodedMessage> {
iterator: AsyncIterator<T>; iterator: AsyncIterator<T>;
@ -11,3 +12,23 @@ export type PubsubTopic = string;
export type ContentTopic = string; export type ContentTopic = string;
export type PeerIdStr = string; export type PeerIdStr = string;
// SK = success key name
// SV = success value type
// EK = error key name (default: "error")
// EV = error value type (default: ProtocolError)
export type ThisOrThat<
SK extends string,
SV,
EK extends string = "error",
EV = ProtocolError
> =
| ({ [key in SK]: SV } & { [key in EK]: null })
| ({ [key in SK]: null } & { [key in EK]: EV });
export type ThisAndThat<
SK extends string,
SV,
EK extends string = "error",
EV = ProtocolError
> = { [key in SK]: SV } & { [key in EK]: EV };

View File

@ -3,13 +3,14 @@ import type { PeerStore } from "@libp2p/interface";
import type { ConnectionManager } from "@libp2p/interface-internal"; import type { ConnectionManager } from "@libp2p/interface-internal";
import { IEnr } from "./enr.js"; import { IEnr } from "./enr.js";
import { IBaseProtocolCore, ProtocolResult } from "./protocols.js"; import { ThisOrThat } from "./misc.js";
import { IBaseProtocolCore } from "./protocols.js";
export interface IPeerExchange extends IBaseProtocolCore { export interface IPeerExchange extends IBaseProtocolCore {
query(params: PeerExchangeQueryParams): Promise<PeerExchangeResult>; query(params: PeerExchangeQueryParams): Promise<PeerExchangeQueryResult>;
} }
export type PeerExchangeResult = ProtocolResult<"peerInfos", PeerInfo[]>; export type PeerExchangeQueryResult = ThisOrThat<"peerInfos", PeerInfo[]>;
export interface PeerExchangeQueryParams { export interface PeerExchangeQueryParams {
numPeers: number; numPeers: number;

View File

@ -5,7 +5,7 @@ import type { Peer, PeerStore } from "@libp2p/interface";
import type { ShardInfo } from "./enr.js"; import type { ShardInfo } from "./enr.js";
import type { CreateLibp2pOptions } from "./libp2p.js"; import type { CreateLibp2pOptions } from "./libp2p.js";
import type { IDecodedMessage } from "./message.js"; import type { IDecodedMessage } from "./message.js";
import { PubsubTopic } from "./misc.js"; import { PubsubTopic, ThisAndThat, ThisOrThat } from "./misc.js";
export enum Protocols { export enum Protocols {
Relay = "relay", Relay = "relay",
@ -107,27 +107,6 @@ export type Callback<T extends IDecodedMessage> = (
msg: T msg: T
) => void | Promise<void>; ) => void | Promise<void>;
// SK = success key name
// SV = success value type
// EK = error key name (default: "error")
// EV = error value type (default: ProtocolError)
export type ProtocolResult<
SK extends string,
SV,
EK extends string = "error",
EV = ProtocolError
> =
| ({
[key in SK]: SV;
} & {
[key in EK]: null;
})
| ({
[key in SK]: null;
} & {
[key in EK]: EV;
});
export enum ProtocolError { export enum ProtocolError {
/** Could not determine the origin of the fault. Best to check connectivity and try again */ /** Could not determine the origin of the fault. Best to check connectivity and try again */
GENERIC_FAIL = "Generic error", GENERIC_FAIL = "Generic error",
@ -156,6 +135,11 @@ export enum ProtocolError {
* Please ensure that the PubsubTopic is used when initializing the Waku node. * Please ensure that the PubsubTopic is used when initializing the Waku node.
*/ */
TOPIC_NOT_CONFIGURED = "Topic not configured", TOPIC_NOT_CONFIGURED = "Topic not configured",
/**
* The pubsub topic configured on the decoder does not match the pubsub topic setup on the protocol.
* Ensure that the pubsub topic used for decoder creation is the same as the one used for protocol.
*/
TOPIC_DECODER_MISMATCH = "Topic decoder mismatch",
/** /**
* Failure to find a peer with suitable protocols. This may due to a connection issue. * Failure to find a peer with suitable protocols. This may due to a connection issue.
* Mitigation can be: retrying after a given time period, display connectivity issue * Mitigation can be: retrying after a given time period, display connectivity issue
@ -186,7 +170,16 @@ export interface Failure {
peerId?: PeerId; peerId?: PeerId;
} }
export interface SendResult { export type CoreProtocolResult = ThisOrThat<
failures?: Failure[]; "success",
successes: PeerId[]; PeerId,
} "failure",
Failure
>;
export type SDKProtocolResult = ThisAndThat<
"successes",
PeerId[],
"failures",
Failure[]
>;

View File

@ -1,6 +1,6 @@
import type { IEncoder, IMessage } from "./message.js"; import type { IEncoder, IMessage } from "./message.js";
import type { SendResult } from "./protocols.js"; import { SDKProtocolResult } from "./protocols.js";
export interface ISender { export interface ISender {
send: (encoder: IEncoder, message: IMessage) => Promise<SendResult>; send: (encoder: IEncoder, message: IMessage) => Promise<SDKProtocolResult>;
} }

View File

@ -22,7 +22,7 @@ import {
ProtocolCreateOptions, ProtocolCreateOptions,
ProtocolError, ProtocolError,
PubsubTopic, PubsubTopic,
SendResult SDKProtocolResult
} from "@waku/interfaces"; } from "@waku/interfaces";
import { isWireSizeUnderCap, toAsyncIterator } from "@waku/utils"; import { isWireSizeUnderCap, toAsyncIterator } from "@waku/utils";
import { pushOrInitMapSet } from "@waku/utils"; import { pushOrInitMapSet } from "@waku/utils";
@ -99,7 +99,10 @@ class Relay implements IRelay {
/** /**
* Send Waku message. * Send Waku message.
*/ */
public async send(encoder: IEncoder, message: IMessage): Promise<SendResult> { public async send(
encoder: IEncoder,
message: IMessage
): Promise<SDKProtocolResult> {
const successes: PeerId[] = []; const successes: PeerId[] = [];
const { pubsubTopic } = encoder; const { pubsubTopic } = encoder;
@ -142,7 +145,8 @@ class Relay implements IRelay {
const { recipients } = await this.gossipSub.publish(pubsubTopic, msg); const { recipients } = await this.gossipSub.publish(pubsubTopic, msg);
return { return {
successes: recipients successes: recipients,
failures: []
}; };
} }

View File

@ -1,19 +1,24 @@
import type { Peer } from "@libp2p/interface"; import type { Peer } from "@libp2p/interface";
import { FilterCore } from "@waku/core"; import { FilterCore } from "@waku/core";
import type { import {
Callback, type Callback,
ContentTopic, type ContentTopic,
IAsyncIterator, CoreProtocolResult,
IDecodedMessage, CreateSubscriptionResult,
IDecoder, type IAsyncIterator,
IFilterSDK, type IDecodedMessage,
IProtoMessage, type IDecoder,
Libp2p, type IFilterSDK,
ProtocolCreateOptions, type IProtoMessage,
PubsubTopic, type ISubscriptionSDK,
ShardingParams, type Libp2p,
type ProtocolCreateOptions,
ProtocolError,
type PubsubTopic,
SDKProtocolResult,
type ShardingParams,
SubscribeOptions, SubscribeOptions,
Unsubscribe type Unsubscribe
} from "@waku/interfaces"; } from "@waku/interfaces";
import { messageHashStr } from "@waku/message-hash"; import { messageHashStr } from "@waku/message-hash";
import { WakuMessage } from "@waku/proto"; import { WakuMessage } from "@waku/proto";
@ -38,8 +43,7 @@ const MINUTE = 60 * 1000;
const DEFAULT_SUBSCRIBE_OPTIONS = { const DEFAULT_SUBSCRIBE_OPTIONS = {
keepAlive: MINUTE keepAlive: MINUTE
}; };
export class SubscriptionManager implements ISubscriptionSDK {
export class SubscriptionManager {
private readonly pubsubTopic: PubsubTopic; private readonly pubsubTopic: PubsubTopic;
readonly peers: Peer[]; readonly peers: Peer[];
readonly receivedMessagesHashStr: string[] = []; readonly receivedMessagesHashStr: string[] = [];
@ -64,28 +68,33 @@ export class SubscriptionManager {
decoders: IDecoder<T> | IDecoder<T>[], decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>, callback: Callback<T>,
options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS
): Promise<void> { ): Promise<SDKProtocolResult> {
const decodersArray = Array.isArray(decoders) ? decoders : [decoders]; const decodersArray = Array.isArray(decoders) ? decoders : [decoders];
// check that all decoders are configured for the same pubsub topic as this subscription // check that all decoders are configured for the same pubsub topic as this subscription
decodersArray.forEach((decoder) => { for (const decoder of decodersArray) {
if (decoder.pubsubTopic !== this.pubsubTopic) { if (decoder.pubsubTopic !== this.pubsubTopic) {
throw new Error( return {
`Pubsub topic not configured: decoder is configured for pubsub topic ${decoder.pubsubTopic} but this subscription is for pubsub topic ${this.pubsubTopic}. Please create a new Subscription for the different pubsub topic.` failures: [
); {
error: ProtocolError.TOPIC_DECODER_MISMATCH
}
],
successes: []
};
} }
}); }
const decodersGroupedByCT = groupByContentTopic(decodersArray); const decodersGroupedByCT = groupByContentTopic(decodersArray);
const contentTopics = Array.from(decodersGroupedByCT.keys()); const contentTopics = Array.from(decodersGroupedByCT.keys());
const promises = this.peers.map(async (peer) => { const promises = this.peers.map(async (peer) =>
await this.protocol.subscribe(this.pubsubTopic, peer, contentTopics); this.protocol.subscribe(this.pubsubTopic, peer, contentTopics)
}); );
const results = await Promise.allSettled(promises); const results = await Promise.allSettled(promises);
this.handleErrors(results, "subscribe"); const finalResult = this.handleResult(results, "subscribe");
// Save the callback functions by content topics so they // Save the callback functions by content topics so they
// can easily be removed (reciprocally replaced) if `unsubscribe` (reciprocally `subscribe`) // can easily be removed (reciprocally replaced) if `unsubscribe` (reciprocally `subscribe`)
@ -106,50 +115,59 @@ export class SubscriptionManager {
if (options?.keepAlive) { if (options?.keepAlive) {
this.startKeepAlivePings(options.keepAlive); this.startKeepAlivePings(options.keepAlive);
} }
return finalResult;
} }
async unsubscribe(contentTopics: ContentTopic[]): Promise<void> { async unsubscribe(contentTopics: ContentTopic[]): Promise<SDKProtocolResult> {
const promises = this.peers.map(async (peer) => { const promises = this.peers.map(async (peer) => {
await this.protocol.unsubscribe(this.pubsubTopic, peer, contentTopics); const response = await this.protocol.unsubscribe(
this.pubsubTopic,
peer,
contentTopics
);
contentTopics.forEach((contentTopic: string) => { contentTopics.forEach((contentTopic: string) => {
this.subscriptionCallbacks.delete(contentTopic); this.subscriptionCallbacks.delete(contentTopic);
}); });
return response;
}); });
const results = await Promise.allSettled(promises); const results = await Promise.allSettled(promises);
const finalResult = this.handleResult(results, "unsubscribe");
this.handleErrors(results, "unsubscribe");
if (this.subscriptionCallbacks.size === 0 && this.keepAliveTimer) { if (this.subscriptionCallbacks.size === 0 && this.keepAliveTimer) {
this.stopKeepAlivePings(); this.stopKeepAlivePings();
} }
return finalResult;
} }
async ping(): Promise<void> { async ping(): Promise<SDKProtocolResult> {
const promises = this.peers.map(async (peer) => { const promises = this.peers.map(async (peer) => this.protocol.ping(peer));
await this.protocol.ping(peer);
});
const results = await Promise.allSettled(promises); const results = await Promise.allSettled(promises);
this.handleErrors(results, "ping"); return this.handleResult(results, "ping");
} }
async unsubscribeAll(): Promise<void> { async unsubscribeAll(): Promise<SDKProtocolResult> {
const promises = this.peers.map(async (peer) => { const promises = this.peers.map(async (peer) =>
await this.protocol.unsubscribeAll(this.pubsubTopic, peer); this.protocol.unsubscribeAll(this.pubsubTopic, peer)
}); );
const results = await Promise.allSettled(promises); const results = await Promise.allSettled(promises);
this.subscriptionCallbacks.clear(); this.subscriptionCallbacks.clear();
this.handleErrors(results, "unsubscribeAll"); const finalResult = this.handleResult(results, "unsubscribeAll");
if (this.keepAliveTimer) { if (this.keepAliveTimer) {
this.stopKeepAlivePings(); this.stopKeepAlivePings();
} }
return finalResult;
} }
async processIncomingMessage(message: WakuMessage): Promise<void> { async processIncomingMessage(message: WakuMessage): Promise<void> {
@ -178,40 +196,32 @@ export class SubscriptionManager {
await pushMessage(subscriptionCallback, this.pubsubTopic, message); await pushMessage(subscriptionCallback, this.pubsubTopic, message);
} }
// Filter out only the rejected promises and extract & handle their reasons private handleResult(
private handleErrors( results: PromiseSettledResult<CoreProtocolResult>[],
results: PromiseSettledResult<void>[],
type: "ping" | "subscribe" | "unsubscribe" | "unsubscribeAll" type: "ping" | "subscribe" | "unsubscribe" | "unsubscribeAll"
): void { ): SDKProtocolResult {
const errors = results const result: SDKProtocolResult = { failures: [], successes: [] };
.filter(
(result): result is PromiseRejectedResult =>
result.status === "rejected"
)
.map((rejectedResult) => rejectedResult.reason);
if (errors.length === this.peers.length) { for (const promiseResult of results) {
const errorCounts = new Map<string, number>(); if (promiseResult.status === "rejected") {
// TODO: streamline error logging with https://github.com/orgs/waku-org/projects/2/views/1?pane=issue&itemId=42849952 log.error(
errors.forEach((error) => { `Failed to resolve ${type} promise successfully: `,
const message = error instanceof Error ? error.message : String(error); promiseResult.reason
errorCounts.set(message, (errorCounts.get(message) || 0) + 1); );
}); result.failures.push({ error: ProtocolError.GENERIC_FAIL });
} else {
const uniqueErrorMessages = Array.from( const coreResult = promiseResult.value;
errorCounts, if (coreResult.failure) {
([message, count]) => `${message} (occurred ${count} times)` result.failures.push(coreResult.failure);
).join(", "); } else {
throw new Error(`Error ${type} all peers: ${uniqueErrorMessages}`); result.successes.push(coreResult.success);
} else if (errors.length > 0) { }
// TODO: handle renewing faulty peers with new peers (https://github.com/waku-org/js-waku/issues/1463) }
log.warn(
`Some ${type} failed. These will be refreshed with new peers`,
errors
);
} else {
log.info(`${type} successful for all peers`);
} }
// TODO: handle renewing faulty peers with new peers (https://github.com/waku-org/js-waku/issues/1463)
return result;
} }
private startKeepAlivePings(interval: number): void { private startKeepAlivePings(interval: number): void {
@ -297,7 +307,7 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
*/ */
async createSubscription( async createSubscription(
pubsubTopicShardInfo: ShardingParams | PubsubTopic pubsubTopicShardInfo: ShardingParams | PubsubTopic
): Promise<SubscriptionManager> { ): Promise<CreateSubscriptionResult> {
const pubsubTopic = const pubsubTopic =
typeof pubsubTopicShardInfo == "string" typeof pubsubTopicShardInfo == "string"
? pubsubTopicShardInfo ? pubsubTopicShardInfo
@ -305,9 +315,21 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
ensurePubsubTopicIsConfigured(pubsubTopic, this.protocol.pubsubTopics); ensurePubsubTopicIsConfigured(pubsubTopic, this.protocol.pubsubTopics);
const peers = await this.protocol.getPeers(); let peers: Peer[] = [];
try {
peers = await this.protocol.getPeers();
} catch (error) {
log.error("Error getting peers to initiate subscription: ", error);
return {
error: ProtocolError.GENERIC_FAIL,
subscription: null
};
}
if (peers.length === 0) { if (peers.length === 0) {
throw new Error("No peer found to initiate subscription."); return {
error: ProtocolError.NO_PEER_AVAILABLE,
subscription: null
};
} }
log.info( log.info(
@ -322,7 +344,10 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
new SubscriptionManager(pubsubTopic, peers, this.protocol) new SubscriptionManager(pubsubTopic, peers, this.protocol)
); );
return subscription; return {
error: null,
subscription
};
} }
//TODO: remove this dependency on IReceiver //TODO: remove this dependency on IReceiver
@ -346,21 +371,27 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
callback: Callback<T>, callback: Callback<T>,
options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS
): Promise<Unsubscribe> { ): Promise<Unsubscribe> {
const pubsubTopics = this.getPubsubTopics<T>(decoders); const uniquePubsubTopics = this.getUniquePubsubTopics<T>(decoders);
if (pubsubTopics.length === 0) { if (uniquePubsubTopics.length === 0) {
throw Error( throw Error(
"Failed to subscribe: no pubsubTopic found on decoders provided." "Failed to subscribe: no pubsubTopic found on decoders provided."
); );
} }
if (pubsubTopics.length > 1) { if (uniquePubsubTopics.length > 1) {
throw Error( throw Error(
"Failed to subscribe: all decoders should have the same pubsub topic. Use createSubscription to be more agile." "Failed to subscribe: all decoders should have the same pubsub topic. Use createSubscription to be more agile."
); );
} }
const subscription = await this.createSubscription(pubsubTopics[0]); const { subscription, error } = await this.createSubscription(
uniquePubsubTopics[0]
);
if (error) {
throw Error(`Failed to create subscription: ${error}`);
}
await subscription.subscribe(decoders, callback, options); await subscription.subscribe(decoders, callback, options);
@ -381,7 +412,7 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
return toAsyncIterator(this, decoders); return toAsyncIterator(this, decoders);
} }
private getPubsubTopics<T extends IDecodedMessage>( private getUniquePubsubTopics<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[] decoders: IDecoder<T> | IDecoder<T>[]
): string[] { ): string[] {
if (!Array.isArray(decoders)) { if (!Array.isArray(decoders)) {

View File

@ -8,7 +8,7 @@ import {
type Libp2p, type Libp2p,
type ProtocolCreateOptions, type ProtocolCreateOptions,
ProtocolError, ProtocolError,
type SendResult SDKProtocolResult
} from "@waku/interfaces"; } from "@waku/interfaces";
import { ensurePubsubTopicIsConfigured, Logger } from "@waku/utils"; import { ensurePubsubTopicIsConfigured, Logger } from "@waku/utils";
@ -24,7 +24,7 @@ class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK {
this.protocol = new LightPushCore(libp2p, options); this.protocol = new LightPushCore(libp2p, options);
} }
async send(encoder: IEncoder, message: IMessage): Promise<SendResult> { async send(encoder: IEncoder, message: IMessage): Promise<SDKProtocolResult> {
const successes: PeerId[] = []; const successes: PeerId[] = [];
const failures: Failure[] = []; const failures: Failure[] = [];

View File

@ -3,7 +3,7 @@ import { createDecoder, DecodedMessage, waitForRemotePeer } from "@waku/core";
import { import {
Callback, Callback,
IDecoder, IDecoder,
IFilterSubscription, ISubscriptionSDK,
LightNode, LightNode,
Protocols Protocols
} from "@waku/interfaces"; } from "@waku/interfaces";
@ -27,7 +27,7 @@ async function prepareSubscription(
peer: Multiaddr peer: Multiaddr
): Promise<{ ): Promise<{
decoder: IDecoder<DecodedMessage>; decoder: IDecoder<DecodedMessage>;
subscription: IFilterSubscription; subscription: ISubscriptionSDK;
}> { }> {
// Validate that the Waku node matches assumptions // Validate that the Waku node matches assumptions
if (!waku.filter) { if (!waku.filter) {
@ -52,7 +52,10 @@ async function prepareSubscription(
// Create decoder and subscription // Create decoder and subscription
let decoder = createDecoder(contentTopic, pubsubTopic); let decoder = createDecoder(contentTopic, pubsubTopic);
if (decoder) decoder = decoder ?? decoder; if (decoder) decoder = decoder ?? decoder;
const subscription = await waku.filter.createSubscription(pubsubTopic); const { subscription, error } =
await waku.filter.createSubscription(pubsubTopic);
if (error)
throw new Error("Failed to create subscription for content topic.");
return { decoder, subscription }; return { decoder, subscription };
} }
@ -86,10 +89,11 @@ export async function streamContentTopic(
controller.enqueue(message); controller.enqueue(message);
}); });
}, },
cancel() { async cancel() {
return subscription.unsubscribe([contentTopic]); await subscription.unsubscribe([contentTopic]);
} }
}); });
return [messageStream, opts.waku]; return [messageStream, opts.waku];
} }
@ -105,7 +109,7 @@ export async function subscribeToContentTopic(
contentTopic: string, contentTopic: string,
callback: Callback<DecodedMessage>, callback: Callback<DecodedMessage>,
opts: CreateTopicOptions opts: CreateTopicOptions
): Promise<{ subscription: IFilterSubscription; waku: LightNode }> { ): Promise<{ subscription: ISubscriptionSDK; waku: LightNode }> {
opts.waku = opts.waku =
opts.waku ?? opts.waku ??
(await createLightNode({ (await createLightNode({

View File

@ -5,10 +5,10 @@ import { ConnectionManager, DecodedMessage } from "@waku/core";
import type { import type {
Callback, Callback,
IFilterSDK, IFilterSDK,
IFilterSubscription,
ILightPushSDK, ILightPushSDK,
IRelay, IRelay,
IStoreSDK, IStoreSDK,
ISubscriptionSDK,
Libp2p, Libp2p,
LightNode, LightNode,
ProtocolCreateOptions, ProtocolCreateOptions,
@ -193,7 +193,7 @@ export class WakuNode implements Waku {
contentTopic: string, contentTopic: string,
peer: Multiaddr, peer: Multiaddr,
callback: Callback<DecodedMessage> callback: Callback<DecodedMessage>
): Promise<IFilterSubscription> { ): Promise<ISubscriptionSDK> {
return ( return (
await subscribeToContentTopic(contentTopic, callback, { await subscribeToContentTopic(contentTopic, callback, {
waku: this as LightNode, waku: this as LightNode,

View File

@ -50,8 +50,11 @@ function clean(str: string): string {
return str.replace(/ /g, "_").replace(/[':()/]/g, ""); return str.replace(/ /g, "_").replace(/[':()/]/g, "");
} }
export function makeLogFileName(ctx: Context): string { export function makeLogFileName(ctx: Context | undefined): string {
const unitTest = ctx?.currentTest ? ctx!.currentTest : ctx.test; if (!ctx) {
return "unknown";
}
const unitTest = ctx.currentTest ? ctx.currentTest : ctx.test;
let name = clean(unitTest!.title); let name = clean(unitTest!.title);
let suite = unitTest?.parent; let suite = unitTest?.parent;

View File

@ -4,7 +4,7 @@ import {
DecodedMessage, DecodedMessage,
waitForRemotePeer waitForRemotePeer
} from "@waku/core"; } from "@waku/core";
import { IFilterSubscription, Protocols } from "@waku/interfaces"; import { ISubscriptionSDK, Protocols } from "@waku/interfaces";
import type { LightNode } from "@waku/interfaces"; import type { LightNode } from "@waku/interfaces";
import { import {
generatePrivateKey, generatePrivateKey,
@ -83,7 +83,7 @@ describe("Waku Message Ephemeral field", function () {
let waku: LightNode; let waku: LightNode;
let nwaku: ServiceNode; let nwaku: ServiceNode;
let subscription: IFilterSubscription; let subscription: ISubscriptionSDK;
afterEachCustom(this, async () => { afterEachCustom(this, async () => {
await tearDownNodes(nwaku, waku); await tearDownNodes(nwaku, waku);
@ -123,9 +123,10 @@ describe("Waku Message Ephemeral field", function () {
Protocols.Store Protocols.Store
]); ]);
subscription = await waku.filter.createSubscription( const { error, subscription: _subscription } =
TestEncoder.pubsubTopic await waku.filter.createSubscription(TestEncoder.pubsubTopic);
); if (error) throw error;
subscription = _subscription;
}); });
it("Ephemeral messages are not stored", async function () { it("Ephemeral messages are not stored", async function () {

View File

@ -1,4 +1,4 @@
import { IFilterSubscription, LightNode } from "@waku/interfaces"; import { ISubscriptionSDK, LightNode } from "@waku/interfaces";
import { utf8ToBytes } from "@waku/sdk"; import { utf8ToBytes } from "@waku/sdk";
import { expect } from "chai"; import { expect } from "chai";
@ -24,11 +24,14 @@ const runTests = (strictCheckNodes: boolean): void => {
this.timeout(10000); this.timeout(10000);
let waku: LightNode; let waku: LightNode;
let serviceNodes: ServiceNodesFleet; let serviceNodes: ServiceNodesFleet;
let subscription: IFilterSubscription; let subscription: ISubscriptionSDK;
beforeEachCustom(this, async () => { beforeEachCustom(this, async () => {
[serviceNodes, waku] = await runMultipleNodes(this.ctx, TestShardInfo); [serviceNodes, waku] = await runMultipleNodes(this.ctx, TestShardInfo);
subscription = await waku.filter.createSubscription(TestShardInfo); const { error, subscription: _subscription } =
await waku.filter.createSubscription(TestShardInfo);
if (error) throw error;
subscription = _subscription;
}); });
afterEachCustom(this, async () => { afterEachCustom(this, async () => {

View File

@ -1,5 +1,5 @@
import { waitForRemotePeer } from "@waku/core"; import { waitForRemotePeer } from "@waku/core";
import { IFilterSubscription, LightNode, Protocols } from "@waku/interfaces"; import { ISubscriptionSDK, LightNode, Protocols } from "@waku/interfaces";
import { utf8ToBytes } from "@waku/sdk"; import { utf8ToBytes } from "@waku/sdk";
import { expect } from "chai"; import { expect } from "chai";
@ -29,11 +29,15 @@ const runTests = (strictCheckNodes: boolean): void => {
this.timeout(10000); this.timeout(10000);
let waku: LightNode; let waku: LightNode;
let serviceNodes: ServiceNodesFleet; let serviceNodes: ServiceNodesFleet;
let subscription: IFilterSubscription; let subscription: ISubscriptionSDK;
beforeEachCustom(this, async () => { beforeEachCustom(this, async () => {
[serviceNodes, waku] = await runMultipleNodes(this.ctx, TestShardInfo); [serviceNodes, waku] = await runMultipleNodes(this.ctx, TestShardInfo);
subscription = await waku.filter.createSubscription(TestShardInfo);
const { error, subscription: _subscription } =
await waku.filter.createSubscription(TestShardInfo);
if (error) throw error;
subscription = _subscription;
}); });
afterEachCustom(this, async () => { afterEachCustom(this, async () => {
@ -238,7 +242,10 @@ const runTests = (strictCheckNodes: boolean): void => {
await waku.dial(await node.getMultiaddrWithId()); await waku.dial(await node.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]); await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]);
} }
subscription = await waku.filter.createSubscription(TestShardInfo); const { error, subscription: _subscription } =
await waku.filter.createSubscription(TestShardInfo);
if (error) throw error;
subscription = _subscription;
await subscription.subscribe( await subscription.subscribe(
[TestDecoder], [TestDecoder],
serviceNodes.messageCollector.callback serviceNodes.messageCollector.callback

View File

@ -1,7 +1,7 @@
import { createDecoder, createEncoder, waitForRemotePeer } from "@waku/core"; import { createDecoder, createEncoder, waitForRemotePeer } from "@waku/core";
import type { import type {
ContentTopicInfo, ContentTopicInfo,
IFilterSubscription, ISubscriptionSDK,
LightNode, LightNode,
ShardInfo, ShardInfo,
SingleShardInfo SingleShardInfo
@ -32,7 +32,7 @@ describe("Waku Filter V2: Multiple PubsubTopics", function () {
let waku: LightNode; let waku: LightNode;
let nwaku: ServiceNode; let nwaku: ServiceNode;
let nwaku2: ServiceNode; let nwaku2: ServiceNode;
let subscription: IFilterSubscription; let subscription: ISubscriptionSDK;
let messageCollector: MessageCollector; let messageCollector: MessageCollector;
const customPubsubTopic1 = singleShardInfoToPubsubTopic({ const customPubsubTopic1 = singleShardInfoToPubsubTopic({
@ -61,7 +61,12 @@ describe("Waku Filter V2: Multiple PubsubTopics", function () {
beforeEachCustom(this, async () => { beforeEachCustom(this, async () => {
[nwaku, waku] = await runNodes(this.ctx, shardInfo); [nwaku, waku] = await runNodes(this.ctx, shardInfo);
subscription = await waku.filter.createSubscription(shardInfo);
const { error, subscription: _subscription } =
await waku.filter.createSubscription(shardInfo);
if (error) throw error;
subscription = _subscription;
messageCollector = new MessageCollector(); messageCollector = new MessageCollector();
}); });
@ -84,8 +89,11 @@ describe("Waku Filter V2: Multiple PubsubTopics", function () {
await subscription.subscribe([customDecoder1], messageCollector.callback); await subscription.subscribe([customDecoder1], messageCollector.callback);
// Subscribe from the same lightnode to the 2nd pubsubtopic // Subscribe from the same lightnode to the 2nd pubsubtopic
const subscription2 = const { error, subscription: subscription2 } =
await waku.filter.createSubscription(customPubsubTopic2); await waku.filter.createSubscription(customPubsubTopic2);
if (error) {
throw error;
}
const messageCollector2 = new MessageCollector(); const messageCollector2 = new MessageCollector();
@ -126,8 +134,13 @@ describe("Waku Filter V2: Multiple PubsubTopics", function () {
await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]); await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]);
// Subscribe from the same lightnode to the new nwaku on the new pubsubtopic // Subscribe from the same lightnode to the new nwaku on the new pubsubtopic
const subscription2 = const { error, subscription: subscription2 } =
await waku.filter.createSubscription(customPubsubTopic2); await waku.filter.createSubscription(customPubsubTopic2);
if (error) {
throw error;
}
await nwaku2.ensureSubscriptions([customPubsubTopic2]); await nwaku2.ensureSubscriptions([customPubsubTopic2]);
const messageCollector2 = new MessageCollector(); const messageCollector2 = new MessageCollector();
@ -180,7 +193,7 @@ describe("Waku Filter V2 (Autosharding): Multiple PubsubTopics", function () {
let waku: LightNode; let waku: LightNode;
let nwaku: ServiceNode; let nwaku: ServiceNode;
let nwaku2: ServiceNode; let nwaku2: ServiceNode;
let subscription: IFilterSubscription; let subscription: ISubscriptionSDK;
let messageCollector: MessageCollector; let messageCollector: MessageCollector;
const customContentTopic1 = "/waku/2/content/utf8"; const customContentTopic1 = "/waku/2/content/utf8";
@ -222,9 +235,10 @@ describe("Waku Filter V2 (Autosharding): Multiple PubsubTopics", function () {
beforeEachCustom(this, async () => { beforeEachCustom(this, async () => {
[nwaku, waku] = await runNodes(this.ctx, contentTopicInfo); [nwaku, waku] = await runNodes(this.ctx, contentTopicInfo);
subscription = await waku.filter.createSubscription( const { error, subscription: _subscription } =
autoshardingPubsubTopic1 await waku.filter.createSubscription(autoshardingPubsubTopic1);
); if (error) throw error;
subscription = _subscription;
messageCollector = new MessageCollector(); messageCollector = new MessageCollector();
}); });
@ -251,9 +265,12 @@ describe("Waku Filter V2 (Autosharding): Multiple PubsubTopics", function () {
await subscription.subscribe([customDecoder1], messageCollector.callback); await subscription.subscribe([customDecoder1], messageCollector.callback);
// Subscribe from the same lightnode to the 2nd pubsubtopic // Subscribe from the same lightnode to the 2nd pubsubtopic
const subscription2 = await waku.filter.createSubscription( const { error, subscription: subscription2 } =
autoshardingPubsubTopic2 await waku.filter.createSubscription(autoshardingPubsubTopic2);
);
if (error) {
throw error;
}
const messageCollector2 = new MessageCollector(); const messageCollector2 = new MessageCollector();
@ -303,9 +320,13 @@ describe("Waku Filter V2 (Autosharding): Multiple PubsubTopics", function () {
await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]); await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]);
// Subscribe from the same lightnode to the new nwaku on the new pubsubtopic // Subscribe from the same lightnode to the new nwaku on the new pubsubtopic
const subscription2 = await waku.filter.createSubscription( const { error, subscription: subscription2 } =
autoshardingPubsubTopic2 await waku.filter.createSubscription(autoshardingPubsubTopic2);
);
if (error) {
throw error;
}
await nwaku2.ensureSubscriptionsAutosharding([customContentTopic2]); await nwaku2.ensureSubscriptionsAutosharding([customContentTopic2]);
const messageCollector2 = new MessageCollector(); const messageCollector2 = new MessageCollector();
@ -357,7 +378,7 @@ describe("Waku Filter V2 (Named sharding): Multiple PubsubTopics", function () {
let waku: LightNode; let waku: LightNode;
let nwaku: ServiceNode; let nwaku: ServiceNode;
let nwaku2: ServiceNode; let nwaku2: ServiceNode;
let subscription: IFilterSubscription; let subscription: ISubscriptionSDK;
let messageCollector: MessageCollector; let messageCollector: MessageCollector;
const customPubsubTopic1 = singleShardInfoToPubsubTopic({ const customPubsubTopic1 = singleShardInfoToPubsubTopic({
@ -387,7 +408,11 @@ describe("Waku Filter V2 (Named sharding): Multiple PubsubTopics", function () {
beforeEachCustom(this, async () => { beforeEachCustom(this, async () => {
[nwaku, waku] = await runNodes(this.ctx, shardInfo); [nwaku, waku] = await runNodes(this.ctx, shardInfo);
subscription = await waku.filter.createSubscription(customPubsubTopic1); const { error, subscription: _subscription } =
await waku.filter.createSubscription(customPubsubTopic1);
if (error) throw error;
subscription = _subscription;
messageCollector = new MessageCollector(); messageCollector = new MessageCollector();
}); });
@ -410,8 +435,11 @@ describe("Waku Filter V2 (Named sharding): Multiple PubsubTopics", function () {
await subscription.subscribe([customDecoder1], messageCollector.callback); await subscription.subscribe([customDecoder1], messageCollector.callback);
// Subscribe from the same lightnode to the 2nd pubsubtopic // Subscribe from the same lightnode to the 2nd pubsubtopic
const subscription2 = const { error, subscription: subscription2 } =
await waku.filter.createSubscription(customPubsubTopic2); await waku.filter.createSubscription(customPubsubTopic2);
if (error) {
throw error;
}
const messageCollector2 = new MessageCollector(); const messageCollector2 = new MessageCollector();
@ -452,8 +480,11 @@ describe("Waku Filter V2 (Named sharding): Multiple PubsubTopics", function () {
await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]); await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]);
// Subscribe from the same lightnode to the new nwaku on the new pubsubtopic // Subscribe from the same lightnode to the new nwaku on the new pubsubtopic
const subscription2 = const { error, subscription: subscription2 } =
await waku.filter.createSubscription(customPubsubTopic2); await waku.filter.createSubscription(customPubsubTopic2);
if (error) {
throw error;
}
await nwaku2.ensureSubscriptions([customPubsubTopic2]); await nwaku2.ensureSubscriptions([customPubsubTopic2]);
const messageCollector2 = new MessageCollector(); const messageCollector2 = new MessageCollector();

View File

@ -1,4 +1,4 @@
import { IFilterSubscription, LightNode } from "@waku/interfaces"; import { ISubscriptionSDK, LightNode } from "@waku/interfaces";
import { utf8ToBytes } from "@waku/sdk"; import { utf8ToBytes } from "@waku/sdk";
import { expect } from "chai"; import { expect } from "chai";
@ -24,12 +24,16 @@ describe("Waku Filter V2: Ping", function () {
this.timeout(10000); this.timeout(10000);
let waku: LightNode; let waku: LightNode;
let nwaku: ServiceNode; let nwaku: ServiceNode;
let subscription: IFilterSubscription; let subscription: ISubscriptionSDK;
let messageCollector: MessageCollector; let messageCollector: MessageCollector;
beforeEachCustom(this, async () => { beforeEachCustom(this, async () => {
[nwaku, waku] = await runNodes(this.ctx, TestShardInfo); [nwaku, waku] = await runNodes(this.ctx, TestShardInfo);
subscription = await waku.filter.createSubscription(TestShardInfo);
const { error, subscription: _subscription } =
await waku.filter.createSubscription(TestShardInfo);
if (error) throw error;
subscription = _subscription;
messageCollector = new MessageCollector(); messageCollector = new MessageCollector();
}); });

View File

@ -1,5 +1,5 @@
import { waitForRemotePeer } from "@waku/core"; import { waitForRemotePeer } from "@waku/core";
import { IFilterSubscription, LightNode, Protocols } from "@waku/interfaces"; import { ISubscriptionSDK, LightNode, Protocols } from "@waku/interfaces";
import { utf8ToBytes } from "@waku/sdk"; import { utf8ToBytes } from "@waku/sdk";
import { expect } from "chai"; import { expect } from "chai";
@ -28,12 +28,17 @@ describe("Waku Filter V2: FilterPush", function () {
this.timeout(10000); this.timeout(10000);
let waku: LightNode; let waku: LightNode;
let nwaku: ServiceNode; let nwaku: ServiceNode;
let subscription: IFilterSubscription; let subscription: ISubscriptionSDK;
let messageCollector: MessageCollector; let messageCollector: MessageCollector;
beforeEachCustom(this, async () => { beforeEachCustom(this, async () => {
[nwaku, waku] = await runNodes(this.ctx, TestShardInfo); [nwaku, waku] = await runNodes(this.ctx, TestShardInfo);
subscription = await waku.filter.createSubscription(TestShardInfo);
const { error, subscription: _subscription } =
await waku.filter.createSubscription(TestShardInfo);
if (error) throw error;
subscription = _subscription;
messageCollector = new MessageCollector(nwaku); messageCollector = new MessageCollector(nwaku);
}); });
@ -219,7 +224,10 @@ describe("Waku Filter V2: FilterPush", function () {
// Redo the connection and create a new subscription // Redo the connection and create a new subscription
await waku.dial(await nwaku.getMultiaddrWithId()); await waku.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]); await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]);
subscription = await waku.filter.createSubscription(); const { error, subscription: _subscription } =
await waku.filter.createSubscription();
if (error) throw error;
subscription = _subscription;
await subscription.subscribe([TestDecoder], messageCollector.callback); await subscription.subscribe([TestDecoder], messageCollector.callback);
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") }); await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") });

View File

@ -1,5 +1,5 @@
import { createDecoder, createEncoder, waitForRemotePeer } from "@waku/core"; import { createDecoder, createEncoder, waitForRemotePeer } from "@waku/core";
import { IFilterSubscription, LightNode, Protocols } from "@waku/interfaces"; import { ISubscriptionSDK, LightNode, Protocols } from "@waku/interfaces";
import { import {
ecies, ecies,
generatePrivateKey, generatePrivateKey,
@ -40,14 +40,18 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () {
let waku2: LightNode; let waku2: LightNode;
let nwaku: ServiceNode; let nwaku: ServiceNode;
let nwaku2: ServiceNode; let nwaku2: ServiceNode;
let subscription: IFilterSubscription; let subscription: ISubscriptionSDK;
let messageCollector: MessageCollector; let messageCollector: MessageCollector;
let ctx: Context; let ctx: Context;
beforeEachCustom(this, async () => { beforeEachCustom(this, async () => {
ctx = this.ctx;
[nwaku, waku] = await runNodes(this.ctx, TestShardInfo); [nwaku, waku] = await runNodes(this.ctx, TestShardInfo);
subscription = await waku.filter.createSubscription(TestShardInfo);
const { error, subscription: _subscription } =
await waku.filter.createSubscription(TestShardInfo);
if (error) throw error;
subscription = _subscription;
messageCollector = new MessageCollector(); messageCollector = new MessageCollector();
await nwaku.ensureSubscriptions([TestPubsubTopic]); await nwaku.ensureSubscriptions([TestPubsubTopic]);
}); });
@ -282,10 +286,15 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () {
const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic }); const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic });
try { try {
await subscription.subscribe(td.decoders, messageCollector.callback); const { failures, successes } = await subscription.subscribe(
throw new Error( td.decoders,
`Subscribe to ${topicCount} topics was successful but was expected to fail with a specific error.` messageCollector.callback
); );
if (failures.length === 0 || successes.length > 0) {
throw new Error(
`Subscribe to ${topicCount} topics was successful but was expected to fail with a specific error.`
);
}
} catch (err) { } catch (err) {
if ( if (
err instanceof Error && err instanceof Error &&
@ -387,7 +396,11 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () {
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
// Create a second subscription on a different topic // Create a second subscription on a different topic
const subscription2 = await waku.filter.createSubscription(TestShardInfo); const { error, subscription: subscription2 } =
await waku.filter.createSubscription(TestShardInfo);
if (error) {
throw error;
}
const newContentTopic = "/test/2/waku-filter/default"; const newContentTopic = "/test/2/waku-filter/default";
const newEncoder = createEncoder({ const newEncoder = createEncoder({
contentTopic: newContentTopic, contentTopic: newContentTopic,
@ -419,7 +432,11 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () {
[nwaku2, waku2] = await runNodes(ctx, TestShardInfo); [nwaku2, waku2] = await runNodes(ctx, TestShardInfo);
await waku.dial(await nwaku2.getMultiaddrWithId()); await waku.dial(await nwaku2.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]); await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]);
const subscription2 = await waku.filter.createSubscription(TestShardInfo); const { error, subscription: subscription2 } =
await waku.filter.createSubscription(TestShardInfo);
if (error) {
throw error;
}
await nwaku2.ensureSubscriptions([TestPubsubTopic]); await nwaku2.ensureSubscriptions([TestPubsubTopic]);
// Send a message using the new subscription // Send a message using the new subscription
const newContentTopic = "/test/2/waku-filter/default"; const newContentTopic = "/test/2/waku-filter/default";

View File

@ -1,5 +1,5 @@
import { createDecoder, createEncoder } from "@waku/core"; import { createDecoder, createEncoder } from "@waku/core";
import { IFilterSubscription } from "@waku/interfaces"; import { ISubscriptionSDK } from "@waku/interfaces";
import { LightNode } from "@waku/interfaces"; import { LightNode } from "@waku/interfaces";
import { utf8ToBytes } from "@waku/sdk"; import { utf8ToBytes } from "@waku/sdk";
import { expect } from "chai"; import { expect } from "chai";
@ -28,12 +28,16 @@ describe("Waku Filter V2: Unsubscribe", function () {
this.timeout(10000); this.timeout(10000);
let waku: LightNode; let waku: LightNode;
let nwaku: ServiceNode; let nwaku: ServiceNode;
let subscription: IFilterSubscription; let subscription: ISubscriptionSDK;
let messageCollector: MessageCollector; let messageCollector: MessageCollector;
beforeEachCustom(this, async () => { beforeEachCustom(this, async () => {
[nwaku, waku] = await runNodes(this.ctx, TestShardInfo); [nwaku, waku] = await runNodes(this.ctx, TestShardInfo);
subscription = await waku.filter.createSubscription(TestShardInfo);
const { error, subscription: _subscription } =
await waku.filter.createSubscription(TestShardInfo);
if (error) throw error;
subscription = _subscription;
messageCollector = new MessageCollector(); messageCollector = new MessageCollector();
await nwaku.ensureSubscriptions([TestPubsubTopic]); await nwaku.ensureSubscriptions([TestPubsubTopic]);
}); });

View File

@ -1,5 +1,5 @@
import { createDecoder, createEncoder } from "@waku/core"; import { createDecoder, createEncoder } from "@waku/core";
import { IFilterSubscription, LightNode } from "@waku/interfaces"; import { ISubscriptionSDK, LightNode } from "@waku/interfaces";
import { import {
ecies, ecies,
generatePrivateKey, generatePrivateKey,
@ -36,7 +36,7 @@ const runTests = (strictCheckNodes: boolean): void => {
this.timeout(100000); this.timeout(100000);
let waku: LightNode; let waku: LightNode;
let serviceNodes: ServiceNodesFleet; let serviceNodes: ServiceNodesFleet;
let subscription: IFilterSubscription; let subscription: ISubscriptionSDK;
beforeEachCustom(this, async () => { beforeEachCustom(this, async () => {
[serviceNodes, waku] = await runMultipleNodes( [serviceNodes, waku] = await runMultipleNodes(
@ -44,7 +44,12 @@ const runTests = (strictCheckNodes: boolean): void => {
TestShardInfo, TestShardInfo,
strictCheckNodes strictCheckNodes
); );
subscription = await waku.filter.createSubscription(TestShardInfo); const { error, subscription: _subscription } =
await waku.filter.createSubscription(TestShardInfo);
if (!error) {
subscription = _subscription;
}
}); });
afterEachCustom(this, async () => { afterEachCustom(this, async () => {
@ -330,13 +335,15 @@ const runTests = (strictCheckNodes: boolean): void => {
const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic }); const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic });
try { try {
await subscription.subscribe( const { failures, successes } = await subscription.subscribe(
td.decoders, td.decoders,
serviceNodes.messageCollector.callback serviceNodes.messageCollector.callback
); );
throw new Error( if (failures.length === 0 || successes.length > 0) {
`Subscribe to ${topicCount} topics was successful but was expected to fail with a specific error.` throw new Error(
); `Subscribe to ${topicCount} topics was successful but was expected to fail with a specific error.`
);
}
} catch (err) { } catch (err) {
if ( if (
err instanceof Error && err instanceof Error &&
@ -461,7 +468,11 @@ const runTests = (strictCheckNodes: boolean): void => {
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
// Create a second subscription on a different topic // Create a second subscription on a different topic
const subscription2 = await waku.filter.createSubscription(TestShardInfo); const { error, subscription: subscription2 } =
await waku.filter.createSubscription(TestShardInfo);
if (error) {
throw error;
}
const newContentTopic = "/test/2/waku-filter/default"; const newContentTopic = "/test/2/waku-filter/default";
const newEncoder = createEncoder({ const newEncoder = createEncoder({
contentTopic: newContentTopic, contentTopic: newContentTopic,

View File

@ -1,5 +1,5 @@
import { createDecoder, createEncoder } from "@waku/core"; import { createDecoder, createEncoder } from "@waku/core";
import { IFilterSubscription, LightNode } from "@waku/interfaces"; import { ISubscriptionSDK, LightNode } from "@waku/interfaces";
import { utf8ToBytes } from "@waku/sdk"; import { utf8ToBytes } from "@waku/sdk";
import { expect } from "chai"; import { expect } from "chai";
@ -28,18 +28,22 @@ const runTests = (strictCheckNodes: boolean): void => {
this.timeout(10000); this.timeout(10000);
let waku: LightNode; let waku: LightNode;
let serviceNodes: ServiceNodesFleet; let serviceNodes: ServiceNodesFleet;
let subscription: IFilterSubscription; let subscription: ISubscriptionSDK;
beforeEachCustom(this, async () => { beforeEachCustom(this, async () => {
[serviceNodes, waku] = await runMultipleNodes(this.ctx, { [serviceNodes, waku] = await runMultipleNodes(this.ctx, {
contentTopics: [TestContentTopic], contentTopics: [TestContentTopic],
clusterId: ClusterId clusterId: ClusterId
}); });
const { error, subscription: _subscription } =
await waku.filter.createSubscription({
contentTopics: [TestContentTopic],
clusterId: ClusterId
});
subscription = await waku.filter.createSubscription({ if (!error) {
contentTopics: [TestContentTopic], subscription = _subscription;
clusterId: ClusterId }
});
}); });
afterEachCustom(this, async () => { afterEachCustom(this, async () => {

View File

@ -1,6 +1,6 @@
import { createDecoder, createEncoder, waitForRemotePeer } from "@waku/core"; import { createDecoder, createEncoder, waitForRemotePeer } from "@waku/core";
import { import {
IFilterSubscription, ISubscriptionSDK,
LightNode, LightNode,
ProtocolCreateOptions, ProtocolCreateOptions,
Protocols, Protocols,
@ -45,13 +45,15 @@ export const messagePayload = { payload: utf8ToBytes(messageText) };
// Utility to validate errors related to pings in the subscription. // Utility to validate errors related to pings in the subscription.
export async function validatePingError( export async function validatePingError(
subscription: IFilterSubscription subscription: ISubscriptionSDK
): Promise<void> { ): Promise<void> {
try { try {
await subscription.ping(); const { failures, successes } = await subscription.ping();
throw new Error( if (failures.length === 0 || successes.length > 0) {
"Ping was successful but was expected to fail with a specific error." throw new Error(
); "Ping was successful but was expected to fail with a specific error."
);
}
} catch (err) { } catch (err) {
if ( if (
err instanceof Error && err instanceof Error &&

View File

@ -7,7 +7,7 @@ import {
WakuPeerExchange, WakuPeerExchange,
wakuPeerExchangeDiscovery wakuPeerExchangeDiscovery
} from "@waku/discovery"; } from "@waku/discovery";
import type { LightNode, PeerExchangeResult } from "@waku/interfaces"; import type { LightNode, PeerExchangeQueryResult } from "@waku/interfaces";
import { createLightNode, Libp2pComponents, ProtocolError } from "@waku/sdk"; import { createLightNode, Libp2pComponents, ProtocolError } from "@waku/sdk";
import { Logger, singleShardInfoToPubsubTopic } from "@waku/utils"; import { Logger, singleShardInfoToPubsubTopic } from "@waku/utils";
import { expect } from "chai"; import { expect } from "chai";
@ -38,7 +38,7 @@ describe("Peer Exchange Query", function () {
let components: Libp2pComponents; let components: Libp2pComponents;
let peerExchange: WakuPeerExchange; let peerExchange: WakuPeerExchange;
let numPeersToRequest: number; let numPeersToRequest: number;
let queryResult: PeerExchangeResult; let queryResult: PeerExchangeQueryResult;
beforeEachCustom( beforeEachCustom(
this, this,
@ -99,7 +99,7 @@ describe("Peer Exchange Query", function () {
peerId: nwaku3PeerId, peerId: nwaku3PeerId,
numPeers: numPeersToRequest numPeers: numPeersToRequest
}), }),
new Promise<PeerExchangeResult>((resolve) => new Promise<PeerExchangeQueryResult>((resolve) =>
setTimeout( setTimeout(
() => () =>
resolve({ resolve({