diff --git a/package-lock.json b/package-lock.json index 2a7e4435fa..913acc2b8e 100644 --- a/package-lock.json +++ b/package-lock.json @@ -27311,6 +27311,7 @@ "@waku/core": "0.0.22", "@waku/dns-discovery": "0.0.16", "@waku/interfaces": "0.0.17", + "@waku/peer-exchange": "^0.0.15", "@waku/relay": "0.0.5", "@waku/utils": "0.0.10", "libp2p": "^0.46.8" @@ -27629,6 +27630,7 @@ "version": "0.0.10", "license": "MIT OR Apache-2.0", "dependencies": { + "@waku/interfaces": "^0.0.17", "debug": "^4.3.4", "uint8arrays": "^4.0.4" }, @@ -31390,6 +31392,7 @@ "@waku/core": "0.0.22", "@waku/dns-discovery": "0.0.16", "@waku/interfaces": "0.0.17", + "@waku/peer-exchange": "^0.0.15", "@waku/relay": "0.0.5", "@waku/utils": "0.0.10", "cspell": "^7.3.2", diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 8efdf07e23..51035e909d 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -15,15 +15,11 @@ export * as waku_filter from "./lib/filter/index.js"; export { wakuFilter, FilterCodecs } from "./lib/filter/index.js"; export * as waku_light_push from "./lib/light_push/index.js"; -export { wakuLightPush, LightPushCodec } from "./lib/light_push/index.js"; +export { wakuLightPush } from "./lib/light_push/index.js"; export * as waku_store from "./lib/store/index.js"; -export { - PageDirection, - wakuStore, - StoreCodec, - createCursor -} from "./lib/store/index.js"; + +export { PageDirection, wakuStore, createCursor } from "./lib/store/index.js"; export { waitForRemotePeer } from "./lib/wait_for_remote_peer.js"; diff --git a/packages/core/src/lib/base_protocol.ts b/packages/core/src/lib/base_protocol.ts index 22497b425b..1546fda480 100644 --- a/packages/core/src/lib/base_protocol.ts +++ b/packages/core/src/lib/base_protocol.ts @@ -5,6 +5,7 @@ import { Peer, PeerStore } from "@libp2p/interface/peer-store"; import type { IBaseProtocol, Libp2pComponents } from "@waku/interfaces"; import { getPeersForProtocol, selectPeerForProtocol } from "@waku/utils/libp2p"; +import { filterPeers } from "./filterPeers.js"; import { StreamManager } from "./stream_manager.js"; /** @@ -60,4 +61,32 @@ export class BaseProtocol implements IBaseProtocol { ); return peer; } + + /** + * Retrieves a list of peers based on the specified criteria. + * + * @param numPeers - The total number of peers to retrieve. If 0, all peers are returned. + * @param maxBootstrapPeers - The maximum number of bootstrap peers to retrieve. + * @returns A Promise that resolves to an array of peers based on the specified criteria. + */ + protected async getPeers( + { + numPeers, + maxBootstrapPeers + }: { + numPeers: number; + maxBootstrapPeers: number; + } = { + maxBootstrapPeers: 1, + numPeers: 0 + } + ): Promise { + // Retrieve all peers that support the protocol + const allPeersForProtocol = await getPeersForProtocol(this.peerStore, [ + this.multicodec + ]); + + // Filter the peers based on the specified criteria + return filterPeers(allPeersForProtocol, numPeers, maxBootstrapPeers); + } } diff --git a/packages/core/src/lib/filter/index.ts b/packages/core/src/lib/filter/index.ts index dae27919b2..06adb7442a 100644 --- a/packages/core/src/lib/filter/index.ts +++ b/packages/core/src/lib/filter/index.ts @@ -1,5 +1,4 @@ import { Stream } from "@libp2p/interface/connection"; -import type { PeerId } from "@libp2p/interface/peer-id"; import type { Peer } from "@libp2p/interface/peer-store"; import type { IncomingStreamData } from "@libp2p/interface-internal/registrar"; import type { @@ -14,7 +13,6 @@ import type { Libp2p, PeerIdStr, ProtocolCreateOptions, - ProtocolOptions, PubSubTopic, Unsubscribe } from "@waku/interfaces"; @@ -228,6 +226,7 @@ class Subscription { class Filter extends BaseProtocol implements IReceiver { private readonly options: ProtocolCreateOptions; private activeSubscriptions = new Map(); + private readonly NUM_PEERS_PROTOCOL = 1; private getActiveSubscription( pubSubTopic: PubSubTopic, @@ -257,14 +256,16 @@ class Filter extends BaseProtocol implements IReceiver { this.options = options ?? {}; } - async createSubscription( - pubSubTopic?: string, - peerId?: PeerId - ): Promise { + async createSubscription(pubSubTopic?: string): Promise { const _pubSubTopic = pubSubTopic ?? this.options.pubSubTopic ?? DefaultPubSubTopic; - const peer = await this.getPeer(peerId); + const peer = ( + await this.getPeers({ + maxBootstrapPeers: 1, + numPeers: this.NUM_PEERS_PROTOCOL + }) + )[0]; const subscription = this.getActiveSubscription(_pubSubTopic, peer.id.toString()) ?? @@ -278,10 +279,9 @@ class Filter extends BaseProtocol implements IReceiver { } public toSubscriptionIterator( - decoders: IDecoder | IDecoder[], - opts?: ProtocolOptions | undefined + decoders: IDecoder | IDecoder[] ): Promise> { - return toAsyncIterator(this, decoders, opts); + return toAsyncIterator(this, decoders); } /** @@ -301,10 +301,9 @@ class Filter extends BaseProtocol implements IReceiver { */ async subscribe( decoders: IDecoder | IDecoder[], - callback: Callback, - opts?: ProtocolOptions + callback: Callback ): Promise { - const subscription = await this.createSubscription(undefined, opts?.peerId); + const subscription = await this.createSubscription(); await subscription.subscribe(decoders, callback); diff --git a/packages/core/src/lib/filterPeers.spec.ts b/packages/core/src/lib/filterPeers.spec.ts new file mode 100644 index 0000000000..de51da5593 --- /dev/null +++ b/packages/core/src/lib/filterPeers.spec.ts @@ -0,0 +1,144 @@ +import { Peer } from "@libp2p/interface/peer-store"; +import type { Tag } from "@libp2p/interface/peer-store"; +import { createSecp256k1PeerId } from "@libp2p/peer-id-factory"; +import { Tags } from "@waku/interfaces"; +import { expect } from "chai"; + +import { filterPeers } from "./filterPeers.js"; + +describe("filterPeers function", function () { + it("should return all peers when numPeers is 0", async function () { + const peer1 = await createSecp256k1PeerId(); + const peer2 = await createSecp256k1PeerId(); + const peer3 = await createSecp256k1PeerId(); + + const mockPeers = [ + { + id: peer1, + tags: new Map([[Tags.BOOTSTRAP, { value: 100 }]]) + }, + { + id: peer2, + tags: new Map([[Tags.BOOTSTRAP, { value: 100 }]]) + }, + { + id: peer3, + tags: new Map([[Tags.PEER_EXCHANGE, { value: 100 }]]) + } + ] as unknown as Peer[]; + + const result = await filterPeers(mockPeers, 0, 10); + expect(result.length).to.deep.equal(mockPeers.length); + }); + + it("should return all non-bootstrap peers and no bootstrap peer when numPeers is 0 and maxBootstrapPeers is 0", async function () { + const peer1 = await createSecp256k1PeerId(); + const peer2 = await createSecp256k1PeerId(); + const peer3 = await createSecp256k1PeerId(); + const peer4 = await createSecp256k1PeerId(); + + const mockPeers = [ + { + id: peer1, + tags: new Map([[Tags.BOOTSTRAP, { value: 100 }]]) + }, + { + id: peer2, + tags: new Map([[Tags.BOOTSTRAP, { value: 100 }]]) + }, + { + id: peer3, + tags: new Map([[Tags.PEER_EXCHANGE, { value: 100 }]]) + }, + { + id: peer4, + tags: new Map([[Tags.PEER_EXCHANGE, { value: 100 }]]) + } + ] as unknown as Peer[]; + + const result = await filterPeers(mockPeers, 0, 0); + + // result should have no bootstrap peers, and a total of 2 peers + expect(result.length).to.equal(2); + expect( + result.filter((peer: Peer) => peer.tags.has(Tags.BOOTSTRAP)).length + ).to.equal(0); + }); + + it("should return one bootstrap peer, and all non-boostrap peers, when numPeers is 0 & maxBootstrap is 1", async function () { + const peer1 = await createSecp256k1PeerId(); + const peer2 = await createSecp256k1PeerId(); + const peer3 = await createSecp256k1PeerId(); + const peer4 = await createSecp256k1PeerId(); + const peer5 = await createSecp256k1PeerId(); + + const mockPeers = [ + { + id: peer1, + tags: new Map([[Tags.BOOTSTRAP, { value: 100 }]]) + }, + { + id: peer2, + tags: new Map([[Tags.BOOTSTRAP, { value: 100 }]]) + }, + { + id: peer3, + tags: new Map([[Tags.PEER_EXCHANGE, { value: 100 }]]) + }, + { + id: peer4, + tags: new Map([[Tags.PEER_EXCHANGE, { value: 100 }]]) + }, + { + id: peer5, + tags: new Map([[Tags.PEER_EXCHANGE, { value: 100 }]]) + } + ] as unknown as Peer[]; + + const result = await filterPeers(mockPeers, 0, 1); + + // result should have 1 bootstrap peers, and a total of 4 peers + expect(result.length).to.equal(4); + expect( + result.filter((peer: Peer) => peer.tags.has(Tags.BOOTSTRAP)).length + ).to.equal(1); + }); + + it("should return only bootstrap peers up to maxBootstrapPeers", async function () { + const peer1 = await createSecp256k1PeerId(); + const peer2 = await createSecp256k1PeerId(); + const peer3 = await createSecp256k1PeerId(); + const peer4 = await createSecp256k1PeerId(); + const peer5 = await createSecp256k1PeerId(); + + const mockPeers = [ + { + id: peer1, + tags: new Map([[Tags.BOOTSTRAP, { value: 100 }]]) + }, + { + id: peer2, + tags: new Map([[Tags.BOOTSTRAP, { value: 100 }]]) + }, + { + id: peer3, + tags: new Map([[Tags.BOOTSTRAP, { value: 100 }]]) + }, + { + id: peer4, + tags: new Map([[Tags.PEER_EXCHANGE, { value: 100 }]]) + }, + { + id: peer5, + tags: new Map([[Tags.PEER_EXCHANGE, { value: 100 }]]) + } + ] as unknown as Peer[]; + + const result = await filterPeers(mockPeers, 5, 2); + + // check that result has at least 2 bootstrap peers and no more than 5 peers + expect(result.length).to.be.at.least(2); + expect(result.length).to.be.at.most(5); + expect(result.filter((peer: Peer) => peer.tags.has(Tags.BOOTSTRAP)).length); + }); +}); diff --git a/packages/core/src/lib/filterPeers.ts b/packages/core/src/lib/filterPeers.ts new file mode 100644 index 0000000000..298c9f15bc --- /dev/null +++ b/packages/core/src/lib/filterPeers.ts @@ -0,0 +1,43 @@ +import { Peer } from "@libp2p/interface/peer-store"; +import { Tags } from "@waku/interfaces"; + +/** + * Retrieves a list of peers based on the specified criteria. + * + * @param peers - The list of peers to filter from. + * @param numPeers - The total number of peers to retrieve. If 0, all peers are returned. + * @param maxBootstrapPeers - The maximum number of bootstrap peers to retrieve. + * @returns A Promise that resolves to an array of peers based on the specified criteria. + */ +export async function filterPeers( + peers: Peer[], + numPeers: number, + maxBootstrapPeers: number +): Promise { + // Collect the bootstrap peers up to the specified maximum + const bootstrapPeers = peers + .filter((peer) => peer.tags.has(Tags.BOOTSTRAP)) + .slice(0, maxBootstrapPeers); + + // Collect non-bootstrap peers + const nonBootstrapPeers = peers.filter( + (peer) => !peer.tags.has(Tags.BOOTSTRAP) + ); + + // If numPeers is 0, return all peers + if (numPeers === 0) { + return [...bootstrapPeers, ...nonBootstrapPeers]; + } + + // Initialize the list of selected peers with the bootstrap peers + const selectedPeers: Peer[] = [...bootstrapPeers]; + + // Fill up to numPeers with remaining random peers if needed + while (selectedPeers.length < numPeers && nonBootstrapPeers.length > 0) { + const randomIndex = Math.floor(Math.random() * nonBootstrapPeers.length); + const randomPeer = nonBootstrapPeers.splice(randomIndex, 1)[0]; + selectedPeers.push(randomPeer); + } + + return selectedPeers; +} diff --git a/packages/core/src/lib/light_push/index.ts b/packages/core/src/lib/light_push/index.ts index 51174b1ad8..d9379c1690 100644 --- a/packages/core/src/lib/light_push/index.ts +++ b/packages/core/src/lib/light_push/index.ts @@ -5,7 +5,6 @@ import { IMessage, Libp2p, ProtocolCreateOptions, - ProtocolOptions, SendError, SendResult } from "@waku/interfaces"; @@ -42,6 +41,7 @@ type PreparePushMessageResult = */ class LightPush extends BaseProtocol implements ILightPush { options: ProtocolCreateOptions; + private readonly NUM_PEERS_PROTOCOL = 1; constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) { super(LightPushCodec, libp2p.components); @@ -80,11 +80,7 @@ class LightPush extends BaseProtocol implements ILightPush { } } - async send( - encoder: IEncoder, - message: IMessage, - opts?: ProtocolOptions - ): Promise { + async send(encoder: IEncoder, message: IMessage): Promise { const { pubSubTopic = DefaultPubSubTopic } = this.options; const recipients: PeerId[] = []; @@ -97,49 +93,70 @@ class LightPush extends BaseProtocol implements ILightPush { if (preparationError || !query) { return { recipients, - error: preparationError + errors: [preparationError] }; } - let error: undefined | SendError = undefined; - const peer = await this.getPeer(opts?.peerId); - const stream = await this.getStream(peer); + const peers = await this.getPeers({ + maxBootstrapPeers: 1, + numPeers: this.NUM_PEERS_PROTOCOL + }); - try { - const res = await pipe( - [query.encode()], - lp.encode, - stream, - lp.decode, - async (source) => await all(source) - ); + const promises = peers.map(async (peer) => { + let error: SendError | undefined; + const stream = await this.getStream(peer); try { - const bytes = new Uint8ArrayList(); - res.forEach((chunk) => { - bytes.append(chunk); - }); + const res = await pipe( + [query.encode()], + lp.encode, + stream, + lp.decode, + async (source) => await all(source) + ); + try { + const bytes = new Uint8ArrayList(); + res.forEach((chunk) => { + bytes.append(chunk); + }); - const response = PushRpc.decode(bytes).response; + const response = PushRpc.decode(bytes).response; - if (response?.isSuccess) { - recipients.push(peer.id); - } else { - log("No response in PushRPC"); - error = SendError.NO_RPC_RESPONSE; + if (response?.isSuccess) { + recipients.some((recipient) => recipient.equals(peer.id)) || + recipients.push(peer.id); + } else { + log("No response in PushRPC"); + error = SendError.NO_RPC_RESPONSE; + } + } catch (err) { + log("Failed to decode push reply", err); + error = SendError.DECODE_FAILED; } } catch (err) { - log("Failed to decode push reply", err); - error = SendError.DECODE_FAILED; + log("Failed to send waku light push request", err); + error = SendError.GENERIC_FAIL; } - } catch (err) { - log("Failed to send waku light push request", err); - error = SendError.GENERIC_FAIL; - } + + return { recipients, error }; + }); + + const results = await Promise.allSettled(promises); + const errors = results + .filter( + ( + result + ): result is PromiseFulfilledResult<{ + recipients: PeerId[]; + error: SendError | undefined; + }> => result.status === "fulfilled" + ) + .map((result) => result.value.error) + .filter((error) => error !== undefined) as SendError[]; return { recipients, - error + errors }; } } diff --git a/packages/core/src/lib/store/index.ts b/packages/core/src/lib/store/index.ts index b8aa405d95..67bd856e5d 100644 --- a/packages/core/src/lib/store/index.ts +++ b/packages/core/src/lib/store/index.ts @@ -1,5 +1,4 @@ import type { Stream } from "@libp2p/interface/connection"; -import type { PeerId } from "@libp2p/interface/peer-id"; import { sha256 } from "@noble/hashes/sha256"; import { Cursor, @@ -40,10 +39,6 @@ export interface TimeFilter { } export interface QueryOptions { - /** - * The peer to query. If undefined, a pseudo-random peer is selected from the connected Waku Store peers. - */ - peerId?: PeerId; /** * The direction in which pages are retrieved: * - { @link PageDirection.BACKWARD }: Most recent page first. @@ -80,6 +75,7 @@ export interface QueryOptions { */ class Store extends BaseProtocol implements IStore { options: ProtocolCreateOptions; + private readonly NUM_PEERS_PROTOCOL = 1; constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) { super(StoreCodec, libp2p.components); @@ -246,12 +242,14 @@ class Store extends BaseProtocol implements IStore { { contentTopics, startTime, endTime } ); - log("Querying history with the following options", { - ...options, - peerId: options?.peerId?.toString() - }); + log("Querying history with the following options", options); - const peer = await this.getPeer(options?.peerId); + const peer = ( + await this.getPeers({ + numPeers: this.NUM_PEERS_PROTOCOL, + maxBootstrapPeers: 1 + }) + )[0]; for await (const messages of paginate( this.getStream.bind(this, peer), diff --git a/packages/interfaces/src/protocols.ts b/packages/interfaces/src/protocols.ts index 4c0783eaec..c354b1b740 100644 --- a/packages/interfaces/src/protocols.ts +++ b/packages/interfaces/src/protocols.ts @@ -54,13 +54,6 @@ export type ProtocolCreateOptions = { defaultBootstrap?: boolean; }; -export type ProtocolOptions = { - /** - * Optionally specify an PeerId for the protocol request. If not included, will use a random peer. - */ - peerId?: PeerId; -}; - export type Callback = ( msg: T ) => void | Promise; @@ -74,6 +67,6 @@ export enum SendError { } export interface SendResult { - error?: SendError; + errors?: SendError[]; recipients: PeerId[]; } diff --git a/packages/interfaces/src/receiver.ts b/packages/interfaces/src/receiver.ts index 64cd4f810a..9dbcc2fde4 100644 --- a/packages/interfaces/src/receiver.ts +++ b/packages/interfaces/src/receiver.ts @@ -1,6 +1,6 @@ import type { IDecodedMessage, IDecoder } from "./message.js"; import type { IAsyncIterator, PubSubTopic, Unsubscribe } from "./misc.js"; -import type { Callback, ProtocolOptions } from "./protocols.js"; +import type { Callback } from "./protocols.js"; type ContentTopic = string; @@ -8,12 +8,10 @@ export type ActiveSubscriptions = Map; export interface IReceiver { toSubscriptionIterator: ( - decoders: IDecoder | IDecoder[], - opts?: ProtocolOptions + decoders: IDecoder | IDecoder[] ) => Promise>; subscribe: ( decoders: IDecoder | IDecoder[], - callback: Callback, - opts?: ProtocolOptions + callback: Callback ) => Unsubscribe | Promise; } diff --git a/packages/interfaces/src/sender.ts b/packages/interfaces/src/sender.ts index efdc2f4531..ab7e6d1b8c 100644 --- a/packages/interfaces/src/sender.ts +++ b/packages/interfaces/src/sender.ts @@ -1,10 +1,6 @@ import type { IEncoder, IMessage } from "./message.js"; -import type { ProtocolOptions, SendResult } from "./protocols.js"; +import type { SendResult } from "./protocols.js"; export interface ISender { - send: ( - encoder: IEncoder, - message: IMessage, - opts?: ProtocolOptions - ) => Promise; + send: (encoder: IEncoder, message: IMessage) => Promise; } diff --git a/packages/interfaces/src/store.ts b/packages/interfaces/src/store.ts index aea825b3b2..9127d35d4d 100644 --- a/packages/interfaces/src/store.ts +++ b/packages/interfaces/src/store.ts @@ -1,5 +1,5 @@ import type { IDecodedMessage, IDecoder } from "./message.js"; -import type { IBaseProtocol, ProtocolOptions } from "./protocols.js"; +import type { IBaseProtocol } from "./protocols.js"; export enum PageDirection { BACKWARD = "backward", @@ -43,7 +43,7 @@ export type StoreQueryOptions = { * Message. */ cursor?: Cursor; -} & ProtocolOptions; +}; export interface IStore extends IBaseProtocol { queryWithOrderedCallback: ( diff --git a/packages/peer-exchange/src/waku_peer_exchange.ts b/packages/peer-exchange/src/waku_peer_exchange.ts index 4a088f51d7..823793e3e0 100644 --- a/packages/peer-exchange/src/waku_peer_exchange.ts +++ b/packages/peer-exchange/src/waku_peer_exchange.ts @@ -23,14 +23,11 @@ const log = debug("waku:peer-exchange"); * Implementation of the Peer Exchange protocol (https://rfc.vac.dev/spec/34/) */ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange { - multicodec: string; - /** * @param components - libp2p components */ constructor(components: Libp2pComponents) { super(PeerExchangeCodec, components); - this.multicodec = PeerExchangeCodec; } /** diff --git a/packages/relay/src/index.ts b/packages/relay/src/index.ts index e8a73233d5..20dc7d2f50 100644 --- a/packages/relay/src/index.ts +++ b/packages/relay/src/index.ts @@ -6,6 +6,7 @@ import { } from "@chainsafe/libp2p-gossipsub"; import type { PeerIdStr, TopicStr } from "@chainsafe/libp2p-gossipsub/types"; import { SignaturePolicy } from "@chainsafe/libp2p-gossipsub/types"; +import type { PeerId } from "@libp2p/interface/peer-id"; import type { PubSub } from "@libp2p/interface/pubsub"; import { sha256 } from "@noble/hashes/sha256"; import { DefaultPubSubTopic } from "@waku/core"; @@ -20,7 +21,6 @@ import { IRelay, Libp2p, ProtocolCreateOptions, - ProtocolOptions, SendError, SendResult } from "@waku/interfaces"; @@ -98,11 +98,12 @@ class Relay implements IRelay { * Send Waku message. */ public async send(encoder: IEncoder, message: IMessage): Promise { + const recipients: PeerId[] = []; if (!isSizeValid(message.payload)) { log("Failed to send waku relay: message is bigger that 1MB"); return { - recipients: [], - error: SendError.SIZE_TOO_BIG + recipients, + errors: [SendError.SIZE_TOO_BIG] }; } @@ -110,8 +111,8 @@ class Relay implements IRelay { if (!msg) { log("Failed to encode message, aborting publish"); return { - recipients: [], - error: SendError.ENCODE_FAILED + recipients, + errors: [SendError.ENCODE_FAILED] }; } @@ -160,10 +161,9 @@ class Relay implements IRelay { } public toSubscriptionIterator( - decoders: IDecoder | IDecoder[], - opts?: ProtocolOptions | undefined + decoders: IDecoder | IDecoder[] ): Promise> { - return toAsyncIterator(this, decoders, opts); + return toAsyncIterator(this, decoders); } public getActiveSubscriptions(): ActiveSubscriptions { diff --git a/packages/sdk/package.json b/packages/sdk/package.json index f2e546a0b0..734019b3a2 100644 --- a/packages/sdk/package.json +++ b/packages/sdk/package.json @@ -56,6 +56,7 @@ "@waku/core": "0.0.22", "@waku/dns-discovery": "0.0.16", "@waku/interfaces": "0.0.17", + "@waku/peer-exchange": "^0.0.15", "libp2p": "^0.46.8" }, "devDependencies": { diff --git a/packages/sdk/src/create.ts b/packages/sdk/src/create.ts index 3ef8077735..0b78fd39fc 100644 --- a/packages/sdk/src/create.ts +++ b/packages/sdk/src/create.ts @@ -21,6 +21,7 @@ import type { ProtocolCreateOptions, RelayNode } from "@waku/interfaces"; +import { wakuPeerExchangeDiscovery } from "@waku/peer-exchange"; import { RelayCreateOptions, wakuGossipSub, wakuRelay } from "@waku/relay"; import { createLibp2p, Libp2pOptions } from "libp2p"; import { identifyService } from "libp2p/identify"; @@ -45,7 +46,7 @@ export async function createLightNode( const libp2pOptions = options?.libp2p ?? {}; const peerDiscovery = libp2pOptions.peerDiscovery ?? []; if (options?.defaultBootstrap) { - peerDiscovery.push(defaultPeerDiscovery()); + peerDiscovery.push(...defaultPeerDiscoveries()); Object.assign(libp2pOptions, { peerDiscovery }); } @@ -78,7 +79,7 @@ export async function createRelayNode( const libp2pOptions = options?.libp2p ?? {}; const peerDiscovery = libp2pOptions.peerDiscovery ?? []; if (options?.defaultBootstrap) { - peerDiscovery.push(defaultPeerDiscovery()); + peerDiscovery.push(...defaultPeerDiscoveries()); Object.assign(libp2pOptions, { peerDiscovery }); } @@ -119,7 +120,7 @@ export async function createFullNode( const libp2pOptions = options?.libp2p ?? {}; const peerDiscovery = libp2pOptions.peerDiscovery ?? []; if (options?.defaultBootstrap) { - peerDiscovery.push(defaultPeerDiscovery()); + peerDiscovery.push(...defaultPeerDiscoveries()); Object.assign(libp2pOptions, { peerDiscovery }); } @@ -144,10 +145,14 @@ export async function createFullNode( ) as FullNode; } -export function defaultPeerDiscovery(): ( +export function defaultPeerDiscoveries(): (( components: Libp2pComponents -) => PeerDiscovery { - return wakuDnsDiscovery([enrTree["PROD"]], DEFAULT_NODE_REQUIREMENTS); +) => PeerDiscovery)[] { + const discoveries = [ + wakuDnsDiscovery([enrTree["PROD"]], DEFAULT_NODE_REQUIREMENTS), + wakuPeerExchangeDiscovery() + ]; + return discoveries; } type PubsubService = { diff --git a/packages/tests/tests/light_push.node.spec.ts b/packages/tests/tests/light_push.node.spec.ts index 9529f27cd3..5b14f78676 100644 --- a/packages/tests/tests/light_push.node.spec.ts +++ b/packages/tests/tests/light_push.node.spec.ts @@ -108,7 +108,7 @@ describe("Waku Light Push [node only]", () => { payload: generateRandomUint8Array(MB + 65536) }); expect(pushResponse.recipients.length).to.eq(0); - expect(pushResponse.error).to.eq(SendError.SIZE_TOO_BIG); + expect(pushResponse.errors).to.include(SendError.SIZE_TOO_BIG); }); }); @@ -138,13 +138,9 @@ describe("Waku Light Push [node only] - custom pubsub topic", () => { const messageText = "Light Push works!"; log("Send message via lightpush"); - const pushResponse = await waku.lightPush.send( - TestEncoder, - { payload: utf8ToBytes(messageText) }, - { - peerId: nimPeerId - } - ); + const pushResponse = await waku.lightPush.send(TestEncoder, { + payload: utf8ToBytes(messageText) + }); log("Ack received", pushResponse); expect(pushResponse.recipients[0].toString()).to.eq(nimPeerId.toString()); diff --git a/packages/tests/tests/relay.node.spec.ts b/packages/tests/tests/relay.node.spec.ts index f8cc8ae5c7..9c046f9de6 100644 --- a/packages/tests/tests/relay.node.spec.ts +++ b/packages/tests/tests/relay.node.spec.ts @@ -382,13 +382,13 @@ describe("Waku Relay [node only]", () => { payload: generateRandomUint8Array(1 * MB + 65536) }); expect(sendResult.recipients.length).to.eq(0); - expect(sendResult.error).to.eq(SendError.SIZE_TOO_BIG); + expect(sendResult.errors).to.include(SendError.SIZE_TOO_BIG); sendResult = await waku1.relay.send(TestEncoder, { payload: generateRandomUint8Array(2 * MB) }); expect(sendResult.recipients.length).to.eq(0); - expect(sendResult.error).to.eq(SendError.SIZE_TOO_BIG); + expect(sendResult.errors).to.include(SendError.SIZE_TOO_BIG); const waku2ReceivedMsg = await waku2ReceivedMsgPromise; expect(waku2ReceivedMsg?.payload?.length).to.eq(0); diff --git a/packages/tests/tests/store.node.spec.ts b/packages/tests/tests/store.node.spec.ts index cde93d098c..53736af4bd 100644 --- a/packages/tests/tests/store.node.spec.ts +++ b/packages/tests/tests/store.node.spec.ts @@ -488,8 +488,6 @@ describe("Waku Store", () => { await waku.dial(await nwaku.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.Store]); - const nwakuPeerId = await nwaku.getPeerId(); - const firstMessages: IMessage[] = []; await waku.store.queryWithOrderedCallback( [TestDecoder], @@ -499,7 +497,6 @@ describe("Waku Store", () => { } }, { - peerId: nwakuPeerId, timeFilter: { startTime, endTime: message1Timestamp } } ); @@ -511,7 +508,6 @@ describe("Waku Store", () => { bothMessages.push(msg); }, { - peerId: nwakuPeerId, timeFilter: { startTime, endTime diff --git a/packages/tests/tests/utils.spec.ts b/packages/tests/tests/utils.spec.ts index 006170de95..42935d60ec 100644 --- a/packages/tests/tests/utils.spec.ts +++ b/packages/tests/tests/utils.spec.ts @@ -4,7 +4,7 @@ import { DefaultPubSubTopic, waitForRemotePeer } from "@waku/core"; -import type { LightNode } from "@waku/interfaces"; +import { LightNode } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; import { createLightNode } from "@waku/sdk"; import { toAsyncIterator } from "@waku/utils"; @@ -49,12 +49,9 @@ describe("Util: toAsyncIterator: Filter", () => { const messageText = "hey, what's up?"; const sent = { payload: utf8ToBytes(messageText) }; - const { iterator } = await toAsyncIterator( - waku.filter, - TestDecoder, - {}, - { timeoutMs: 1000 } - ); + const { iterator } = await toAsyncIterator(waku.filter, TestDecoder, { + timeoutMs: 1000 + }); await waku.lightPush.send(TestEncoder, sent); const { value } = await iterator.next(); @@ -66,12 +63,9 @@ describe("Util: toAsyncIterator: Filter", () => { it("handles multiple messages", async function () { this.timeout(10000); - const { iterator } = await toAsyncIterator( - waku.filter, - TestDecoder, - {}, - { timeoutMs: 1000 } - ); + const { iterator } = await toAsyncIterator(waku.filter, TestDecoder, { + timeoutMs: 1000 + }); await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("Filtering works!") @@ -89,12 +83,9 @@ describe("Util: toAsyncIterator: Filter", () => { it("unsubscribes", async function () { this.timeout(10000); - const { iterator, stop } = await toAsyncIterator( - waku.filter, - TestDecoder, - {}, - { timeoutMs: 1000 } - ); + const { iterator, stop } = await toAsyncIterator(waku.filter, TestDecoder, { + timeoutMs: 1000 + }); await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("This should be received") diff --git a/packages/utils/package.json b/packages/utils/package.json index 8461f6483b..6693ac8186 100644 --- a/packages/utils/package.json +++ b/packages/utils/package.json @@ -66,7 +66,8 @@ }, "dependencies": { "debug": "^4.3.4", - "uint8arrays": "^4.0.4" + "uint8arrays": "^4.0.4", + "@waku/interfaces": "^0.0.17" }, "devDependencies": { "@rollup/plugin-commonjs": "^25.0.4", diff --git a/packages/utils/src/common/index.ts b/packages/utils/src/common/index.ts index 5429451207..f73240fbeb 100644 --- a/packages/utils/src/common/index.ts +++ b/packages/utils/src/common/index.ts @@ -1,5 +1,8 @@ export * from "./is_defined.js"; export * from "./random_subset.js"; +export * from "./group_by.js"; +export * from "./to_async_iterator.js"; +export * from "./is_size_valid.js"; export function removeItemFromArray(arr: unknown[], value: unknown): unknown[] { const index = arr.indexOf(value); @@ -8,6 +11,3 @@ export function removeItemFromArray(arr: unknown[], value: unknown): unknown[] { } return arr; } -export * from "./group_by.js"; -export * from "./to_async_iterator.js"; -export * from "./is_size_valid.js"; diff --git a/packages/utils/src/common/to_async_iterator.ts b/packages/utils/src/common/to_async_iterator.ts index 3dd9df8d51..489ef234ef 100644 --- a/packages/utils/src/common/to_async_iterator.ts +++ b/packages/utils/src/common/to_async_iterator.ts @@ -3,7 +3,6 @@ import type { IDecodedMessage, IDecoder, IReceiver, - ProtocolOptions, Unsubscribe } from "@waku/interfaces"; @@ -31,7 +30,6 @@ const FRAME_RATE = 60; export async function toAsyncIterator( receiver: IReceiver, decoder: IDecoder | IDecoder[], - options?: ProtocolOptions, iteratorOptions?: IteratorOptions ): Promise> { const iteratorDelay = iteratorOptions?.iteratorDelay ?? FRAME_RATE; @@ -39,13 +37,9 @@ export async function toAsyncIterator( const messages: T[] = []; let unsubscribe: undefined | Unsubscribe; - unsubscribe = await receiver.subscribe( - decoder, - (message: T) => { - messages.push(message); - }, - options - ); + unsubscribe = await receiver.subscribe(decoder, (message: T) => { + messages.push(message); + }); const isWithTimeout = Number.isInteger(iteratorOptions?.timeoutMs); const timeoutMs = iteratorOptions?.timeoutMs ?? 0;