feat!: set peer-exchange with default bootstrap (#1469)

* set peer-exchange with default bootstrap

* only initialise protocols with bootstrap peers

* update package

* update package-lock

* refactor `getPeers` while setting up a protocol

* move codecs to `@waku/interfaces`

* lightpush: send messages to multiple peers

* only use multiple peers for LP and Filter

* fix: ts warnings

* lightpush: tests pass

* update breaking changes for new API

* move codecs back into protocol files

* refactor: `getPeers()`

* rm: log as an arg

* add tsdoc for getPeers

* add import

* add prettier rule to eslint

* add: peer exchange to sdk as a dep

* fix eslint error

* add try catch

* revert unecessary diff

* revert unecessary diff

* fix imports

* convert relaycodecs to array

* remove: peerId as an arg for protocol methods

* keep peerId as an arg for peer-exchange

* remove: peerId from getPeers()

* lightpush: extract hardcoded numPeers as a constant

* return all peers if numPeers is 0 and increase readability for random peers

* refactor considering more than 1 bootstrap peers can exist

* use `getPeers`

* change arg for `getPeers` to object

* address comments

* refactor tests for new API

* lightpush: make constant the class variable

* use `maxBootstrapPeers` instead of `includeBootstrap`

* refactor protocols for new API

* add tests for `getPeers`

* skip getPeers test

* rm: only from test

* move tests to `base_protocol.spec.ts`

* break down `getPeers` into a `filter` method

* return all bootstrap peers if arg is 0

* refactor test without stubbing

* address comments

* update test title

* move `filterPeers` to a separate file

* address comments & add more test

* make test title more verbose

* address comments

* remove ProtocolOptions

* chore: refactor tests for new API

* add defaults for getPeers

* address comments

* rm unneeded comment

* address comment: add diversity of node tags to test

* address comments

* fix: imports
This commit is contained in:
Danish Arora 2023-09-07 13:15:49 +05:30 committed by GitHub
parent 408b79d6a5
commit 81a52a8097
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 346 additions and 149 deletions

3
package-lock.json generated
View File

@ -27311,6 +27311,7 @@
"@waku/core": "0.0.22", "@waku/core": "0.0.22",
"@waku/dns-discovery": "0.0.16", "@waku/dns-discovery": "0.0.16",
"@waku/interfaces": "0.0.17", "@waku/interfaces": "0.0.17",
"@waku/peer-exchange": "^0.0.15",
"@waku/relay": "0.0.5", "@waku/relay": "0.0.5",
"@waku/utils": "0.0.10", "@waku/utils": "0.0.10",
"libp2p": "^0.46.8" "libp2p": "^0.46.8"
@ -27629,6 +27630,7 @@
"version": "0.0.10", "version": "0.0.10",
"license": "MIT OR Apache-2.0", "license": "MIT OR Apache-2.0",
"dependencies": { "dependencies": {
"@waku/interfaces": "^0.0.17",
"debug": "^4.3.4", "debug": "^4.3.4",
"uint8arrays": "^4.0.4" "uint8arrays": "^4.0.4"
}, },
@ -31390,6 +31392,7 @@
"@waku/core": "0.0.22", "@waku/core": "0.0.22",
"@waku/dns-discovery": "0.0.16", "@waku/dns-discovery": "0.0.16",
"@waku/interfaces": "0.0.17", "@waku/interfaces": "0.0.17",
"@waku/peer-exchange": "^0.0.15",
"@waku/relay": "0.0.5", "@waku/relay": "0.0.5",
"@waku/utils": "0.0.10", "@waku/utils": "0.0.10",
"cspell": "^7.3.2", "cspell": "^7.3.2",

View File

@ -15,15 +15,11 @@ export * as waku_filter from "./lib/filter/index.js";
export { wakuFilter, FilterCodecs } from "./lib/filter/index.js"; export { wakuFilter, FilterCodecs } from "./lib/filter/index.js";
export * as waku_light_push from "./lib/light_push/index.js"; export * as waku_light_push from "./lib/light_push/index.js";
export { wakuLightPush, LightPushCodec } from "./lib/light_push/index.js"; export { wakuLightPush } from "./lib/light_push/index.js";
export * as waku_store from "./lib/store/index.js"; export * as waku_store from "./lib/store/index.js";
export {
PageDirection, export { PageDirection, wakuStore, createCursor } from "./lib/store/index.js";
wakuStore,
StoreCodec,
createCursor
} from "./lib/store/index.js";
export { waitForRemotePeer } from "./lib/wait_for_remote_peer.js"; export { waitForRemotePeer } from "./lib/wait_for_remote_peer.js";

View File

@ -5,6 +5,7 @@ import { Peer, PeerStore } from "@libp2p/interface/peer-store";
import type { IBaseProtocol, Libp2pComponents } from "@waku/interfaces"; import type { IBaseProtocol, Libp2pComponents } from "@waku/interfaces";
import { getPeersForProtocol, selectPeerForProtocol } from "@waku/utils/libp2p"; import { getPeersForProtocol, selectPeerForProtocol } from "@waku/utils/libp2p";
import { filterPeers } from "./filterPeers.js";
import { StreamManager } from "./stream_manager.js"; import { StreamManager } from "./stream_manager.js";
/** /**
@ -60,4 +61,32 @@ export class BaseProtocol implements IBaseProtocol {
); );
return peer; return peer;
} }
/**
* Retrieves a list of peers based on the specified criteria.
*
* @param numPeers - The total number of peers to retrieve. If 0, all peers are returned.
* @param maxBootstrapPeers - The maximum number of bootstrap peers to retrieve.
* @returns A Promise that resolves to an array of peers based on the specified criteria.
*/
protected async getPeers(
{
numPeers,
maxBootstrapPeers
}: {
numPeers: number;
maxBootstrapPeers: number;
} = {
maxBootstrapPeers: 1,
numPeers: 0
}
): Promise<Peer[]> {
// Retrieve all peers that support the protocol
const allPeersForProtocol = await getPeersForProtocol(this.peerStore, [
this.multicodec
]);
// Filter the peers based on the specified criteria
return filterPeers(allPeersForProtocol, numPeers, maxBootstrapPeers);
}
} }

View File

@ -1,5 +1,4 @@
import { Stream } from "@libp2p/interface/connection"; import { Stream } from "@libp2p/interface/connection";
import type { PeerId } from "@libp2p/interface/peer-id";
import type { Peer } from "@libp2p/interface/peer-store"; import type { Peer } from "@libp2p/interface/peer-store";
import type { IncomingStreamData } from "@libp2p/interface-internal/registrar"; import type { IncomingStreamData } from "@libp2p/interface-internal/registrar";
import type { import type {
@ -14,7 +13,6 @@ import type {
Libp2p, Libp2p,
PeerIdStr, PeerIdStr,
ProtocolCreateOptions, ProtocolCreateOptions,
ProtocolOptions,
PubSubTopic, PubSubTopic,
Unsubscribe Unsubscribe
} from "@waku/interfaces"; } from "@waku/interfaces";
@ -228,6 +226,7 @@ class Subscription {
class Filter extends BaseProtocol implements IReceiver { class Filter extends BaseProtocol implements IReceiver {
private readonly options: ProtocolCreateOptions; private readonly options: ProtocolCreateOptions;
private activeSubscriptions = new Map<string, Subscription>(); private activeSubscriptions = new Map<string, Subscription>();
private readonly NUM_PEERS_PROTOCOL = 1;
private getActiveSubscription( private getActiveSubscription(
pubSubTopic: PubSubTopic, pubSubTopic: PubSubTopic,
@ -257,14 +256,16 @@ class Filter extends BaseProtocol implements IReceiver {
this.options = options ?? {}; this.options = options ?? {};
} }
async createSubscription( async createSubscription(pubSubTopic?: string): Promise<Subscription> {
pubSubTopic?: string,
peerId?: PeerId
): Promise<Subscription> {
const _pubSubTopic = const _pubSubTopic =
pubSubTopic ?? this.options.pubSubTopic ?? DefaultPubSubTopic; pubSubTopic ?? this.options.pubSubTopic ?? DefaultPubSubTopic;
const peer = await this.getPeer(peerId); const peer = (
await this.getPeers({
maxBootstrapPeers: 1,
numPeers: this.NUM_PEERS_PROTOCOL
})
)[0];
const subscription = const subscription =
this.getActiveSubscription(_pubSubTopic, peer.id.toString()) ?? this.getActiveSubscription(_pubSubTopic, peer.id.toString()) ??
@ -278,10 +279,9 @@ class Filter extends BaseProtocol implements IReceiver {
} }
public toSubscriptionIterator<T extends IDecodedMessage>( public toSubscriptionIterator<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[], decoders: IDecoder<T> | IDecoder<T>[]
opts?: ProtocolOptions | undefined
): Promise<IAsyncIterator<T>> { ): Promise<IAsyncIterator<T>> {
return toAsyncIterator(this, decoders, opts); return toAsyncIterator(this, decoders);
} }
/** /**
@ -301,10 +301,9 @@ class Filter extends BaseProtocol implements IReceiver {
*/ */
async subscribe<T extends IDecodedMessage>( async subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[], decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>, callback: Callback<T>
opts?: ProtocolOptions
): Promise<Unsubscribe> { ): Promise<Unsubscribe> {
const subscription = await this.createSubscription(undefined, opts?.peerId); const subscription = await this.createSubscription();
await subscription.subscribe(decoders, callback); await subscription.subscribe(decoders, callback);

View File

@ -0,0 +1,144 @@
import { Peer } from "@libp2p/interface/peer-store";
import type { Tag } from "@libp2p/interface/peer-store";
import { createSecp256k1PeerId } from "@libp2p/peer-id-factory";
import { Tags } from "@waku/interfaces";
import { expect } from "chai";
import { filterPeers } from "./filterPeers.js";
describe("filterPeers function", function () {
it("should return all peers when numPeers is 0", async function () {
const peer1 = await createSecp256k1PeerId();
const peer2 = await createSecp256k1PeerId();
const peer3 = await createSecp256k1PeerId();
const mockPeers = [
{
id: peer1,
tags: new Map<string, Tag>([[Tags.BOOTSTRAP, { value: 100 }]])
},
{
id: peer2,
tags: new Map<string, Tag>([[Tags.BOOTSTRAP, { value: 100 }]])
},
{
id: peer3,
tags: new Map<string, Tag>([[Tags.PEER_EXCHANGE, { value: 100 }]])
}
] as unknown as Peer[];
const result = await filterPeers(mockPeers, 0, 10);
expect(result.length).to.deep.equal(mockPeers.length);
});
it("should return all non-bootstrap peers and no bootstrap peer when numPeers is 0 and maxBootstrapPeers is 0", async function () {
const peer1 = await createSecp256k1PeerId();
const peer2 = await createSecp256k1PeerId();
const peer3 = await createSecp256k1PeerId();
const peer4 = await createSecp256k1PeerId();
const mockPeers = [
{
id: peer1,
tags: new Map<string, Tag>([[Tags.BOOTSTRAP, { value: 100 }]])
},
{
id: peer2,
tags: new Map<string, Tag>([[Tags.BOOTSTRAP, { value: 100 }]])
},
{
id: peer3,
tags: new Map<string, Tag>([[Tags.PEER_EXCHANGE, { value: 100 }]])
},
{
id: peer4,
tags: new Map<string, Tag>([[Tags.PEER_EXCHANGE, { value: 100 }]])
}
] as unknown as Peer[];
const result = await filterPeers(mockPeers, 0, 0);
// result should have no bootstrap peers, and a total of 2 peers
expect(result.length).to.equal(2);
expect(
result.filter((peer: Peer) => peer.tags.has(Tags.BOOTSTRAP)).length
).to.equal(0);
});
it("should return one bootstrap peer, and all non-boostrap peers, when numPeers is 0 & maxBootstrap is 1", async function () {
const peer1 = await createSecp256k1PeerId();
const peer2 = await createSecp256k1PeerId();
const peer3 = await createSecp256k1PeerId();
const peer4 = await createSecp256k1PeerId();
const peer5 = await createSecp256k1PeerId();
const mockPeers = [
{
id: peer1,
tags: new Map<string, Tag>([[Tags.BOOTSTRAP, { value: 100 }]])
},
{
id: peer2,
tags: new Map<string, Tag>([[Tags.BOOTSTRAP, { value: 100 }]])
},
{
id: peer3,
tags: new Map<string, Tag>([[Tags.PEER_EXCHANGE, { value: 100 }]])
},
{
id: peer4,
tags: new Map<string, Tag>([[Tags.PEER_EXCHANGE, { value: 100 }]])
},
{
id: peer5,
tags: new Map<string, Tag>([[Tags.PEER_EXCHANGE, { value: 100 }]])
}
] as unknown as Peer[];
const result = await filterPeers(mockPeers, 0, 1);
// result should have 1 bootstrap peers, and a total of 4 peers
expect(result.length).to.equal(4);
expect(
result.filter((peer: Peer) => peer.tags.has(Tags.BOOTSTRAP)).length
).to.equal(1);
});
it("should return only bootstrap peers up to maxBootstrapPeers", async function () {
const peer1 = await createSecp256k1PeerId();
const peer2 = await createSecp256k1PeerId();
const peer3 = await createSecp256k1PeerId();
const peer4 = await createSecp256k1PeerId();
const peer5 = await createSecp256k1PeerId();
const mockPeers = [
{
id: peer1,
tags: new Map<string, Tag>([[Tags.BOOTSTRAP, { value: 100 }]])
},
{
id: peer2,
tags: new Map<string, Tag>([[Tags.BOOTSTRAP, { value: 100 }]])
},
{
id: peer3,
tags: new Map<string, Tag>([[Tags.BOOTSTRAP, { value: 100 }]])
},
{
id: peer4,
tags: new Map<string, Tag>([[Tags.PEER_EXCHANGE, { value: 100 }]])
},
{
id: peer5,
tags: new Map<string, Tag>([[Tags.PEER_EXCHANGE, { value: 100 }]])
}
] as unknown as Peer[];
const result = await filterPeers(mockPeers, 5, 2);
// check that result has at least 2 bootstrap peers and no more than 5 peers
expect(result.length).to.be.at.least(2);
expect(result.length).to.be.at.most(5);
expect(result.filter((peer: Peer) => peer.tags.has(Tags.BOOTSTRAP)).length);
});
});

View File

@ -0,0 +1,43 @@
import { Peer } from "@libp2p/interface/peer-store";
import { Tags } from "@waku/interfaces";
/**
* Retrieves a list of peers based on the specified criteria.
*
* @param peers - The list of peers to filter from.
* @param numPeers - The total number of peers to retrieve. If 0, all peers are returned.
* @param maxBootstrapPeers - The maximum number of bootstrap peers to retrieve.
* @returns A Promise that resolves to an array of peers based on the specified criteria.
*/
export async function filterPeers(
peers: Peer[],
numPeers: number,
maxBootstrapPeers: number
): Promise<Peer[]> {
// Collect the bootstrap peers up to the specified maximum
const bootstrapPeers = peers
.filter((peer) => peer.tags.has(Tags.BOOTSTRAP))
.slice(0, maxBootstrapPeers);
// Collect non-bootstrap peers
const nonBootstrapPeers = peers.filter(
(peer) => !peer.tags.has(Tags.BOOTSTRAP)
);
// If numPeers is 0, return all peers
if (numPeers === 0) {
return [...bootstrapPeers, ...nonBootstrapPeers];
}
// Initialize the list of selected peers with the bootstrap peers
const selectedPeers: Peer[] = [...bootstrapPeers];
// Fill up to numPeers with remaining random peers if needed
while (selectedPeers.length < numPeers && nonBootstrapPeers.length > 0) {
const randomIndex = Math.floor(Math.random() * nonBootstrapPeers.length);
const randomPeer = nonBootstrapPeers.splice(randomIndex, 1)[0];
selectedPeers.push(randomPeer);
}
return selectedPeers;
}

View File

@ -5,7 +5,6 @@ import {
IMessage, IMessage,
Libp2p, Libp2p,
ProtocolCreateOptions, ProtocolCreateOptions,
ProtocolOptions,
SendError, SendError,
SendResult SendResult
} from "@waku/interfaces"; } from "@waku/interfaces";
@ -42,6 +41,7 @@ type PreparePushMessageResult =
*/ */
class LightPush extends BaseProtocol implements ILightPush { class LightPush extends BaseProtocol implements ILightPush {
options: ProtocolCreateOptions; options: ProtocolCreateOptions;
private readonly NUM_PEERS_PROTOCOL = 1;
constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) { constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(LightPushCodec, libp2p.components); super(LightPushCodec, libp2p.components);
@ -80,11 +80,7 @@ class LightPush extends BaseProtocol implements ILightPush {
} }
} }
async send( async send(encoder: IEncoder, message: IMessage): Promise<SendResult> {
encoder: IEncoder,
message: IMessage,
opts?: ProtocolOptions
): Promise<SendResult> {
const { pubSubTopic = DefaultPubSubTopic } = this.options; const { pubSubTopic = DefaultPubSubTopic } = this.options;
const recipients: PeerId[] = []; const recipients: PeerId[] = [];
@ -97,12 +93,17 @@ class LightPush extends BaseProtocol implements ILightPush {
if (preparationError || !query) { if (preparationError || !query) {
return { return {
recipients, recipients,
error: preparationError errors: [preparationError]
}; };
} }
let error: undefined | SendError = undefined; const peers = await this.getPeers({
const peer = await this.getPeer(opts?.peerId); maxBootstrapPeers: 1,
numPeers: this.NUM_PEERS_PROTOCOL
});
const promises = peers.map(async (peer) => {
let error: SendError | undefined;
const stream = await this.getStream(peer); const stream = await this.getStream(peer);
try { try {
@ -113,7 +114,6 @@ class LightPush extends BaseProtocol implements ILightPush {
lp.decode, lp.decode,
async (source) => await all(source) async (source) => await all(source)
); );
try { try {
const bytes = new Uint8ArrayList(); const bytes = new Uint8ArrayList();
res.forEach((chunk) => { res.forEach((chunk) => {
@ -123,6 +123,7 @@ class LightPush extends BaseProtocol implements ILightPush {
const response = PushRpc.decode(bytes).response; const response = PushRpc.decode(bytes).response;
if (response?.isSuccess) { if (response?.isSuccess) {
recipients.some((recipient) => recipient.equals(peer.id)) ||
recipients.push(peer.id); recipients.push(peer.id);
} else { } else {
log("No response in PushRPC"); log("No response in PushRPC");
@ -137,9 +138,25 @@ class LightPush extends BaseProtocol implements ILightPush {
error = SendError.GENERIC_FAIL; error = SendError.GENERIC_FAIL;
} }
return { recipients, error };
});
const results = await Promise.allSettled(promises);
const errors = results
.filter(
(
result
): result is PromiseFulfilledResult<{
recipients: PeerId[];
error: SendError | undefined;
}> => result.status === "fulfilled"
)
.map((result) => result.value.error)
.filter((error) => error !== undefined) as SendError[];
return { return {
recipients, recipients,
error errors
}; };
} }
} }

View File

@ -1,5 +1,4 @@
import type { Stream } from "@libp2p/interface/connection"; import type { Stream } from "@libp2p/interface/connection";
import type { PeerId } from "@libp2p/interface/peer-id";
import { sha256 } from "@noble/hashes/sha256"; import { sha256 } from "@noble/hashes/sha256";
import { import {
Cursor, Cursor,
@ -40,10 +39,6 @@ export interface TimeFilter {
} }
export interface QueryOptions { export interface QueryOptions {
/**
* The peer to query. If undefined, a pseudo-random peer is selected from the connected Waku Store peers.
*/
peerId?: PeerId;
/** /**
* The direction in which pages are retrieved: * The direction in which pages are retrieved:
* - { @link PageDirection.BACKWARD }: Most recent page first. * - { @link PageDirection.BACKWARD }: Most recent page first.
@ -80,6 +75,7 @@ export interface QueryOptions {
*/ */
class Store extends BaseProtocol implements IStore { class Store extends BaseProtocol implements IStore {
options: ProtocolCreateOptions; options: ProtocolCreateOptions;
private readonly NUM_PEERS_PROTOCOL = 1;
constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) { constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(StoreCodec, libp2p.components); super(StoreCodec, libp2p.components);
@ -246,12 +242,14 @@ class Store extends BaseProtocol implements IStore {
{ contentTopics, startTime, endTime } { contentTopics, startTime, endTime }
); );
log("Querying history with the following options", { log("Querying history with the following options", options);
...options,
peerId: options?.peerId?.toString()
});
const peer = await this.getPeer(options?.peerId); const peer = (
await this.getPeers({
numPeers: this.NUM_PEERS_PROTOCOL,
maxBootstrapPeers: 1
})
)[0];
for await (const messages of paginate<T>( for await (const messages of paginate<T>(
this.getStream.bind(this, peer), this.getStream.bind(this, peer),

View File

@ -54,13 +54,6 @@ export type ProtocolCreateOptions = {
defaultBootstrap?: boolean; defaultBootstrap?: boolean;
}; };
export type ProtocolOptions = {
/**
* Optionally specify an PeerId for the protocol request. If not included, will use a random peer.
*/
peerId?: PeerId;
};
export type Callback<T extends IDecodedMessage> = ( export type Callback<T extends IDecodedMessage> = (
msg: T msg: T
) => void | Promise<void>; ) => void | Promise<void>;
@ -74,6 +67,6 @@ export enum SendError {
} }
export interface SendResult { export interface SendResult {
error?: SendError; errors?: SendError[];
recipients: PeerId[]; recipients: PeerId[];
} }

View File

@ -1,6 +1,6 @@
import type { IDecodedMessage, IDecoder } from "./message.js"; import type { IDecodedMessage, IDecoder } from "./message.js";
import type { IAsyncIterator, PubSubTopic, Unsubscribe } from "./misc.js"; import type { IAsyncIterator, PubSubTopic, Unsubscribe } from "./misc.js";
import type { Callback, ProtocolOptions } from "./protocols.js"; import type { Callback } from "./protocols.js";
type ContentTopic = string; type ContentTopic = string;
@ -8,12 +8,10 @@ export type ActiveSubscriptions = Map<PubSubTopic, ContentTopic[]>;
export interface IReceiver { export interface IReceiver {
toSubscriptionIterator: <T extends IDecodedMessage>( toSubscriptionIterator: <T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[], decoders: IDecoder<T> | IDecoder<T>[]
opts?: ProtocolOptions
) => Promise<IAsyncIterator<T>>; ) => Promise<IAsyncIterator<T>>;
subscribe: <T extends IDecodedMessage>( subscribe: <T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[], decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>, callback: Callback<T>
opts?: ProtocolOptions
) => Unsubscribe | Promise<Unsubscribe>; ) => Unsubscribe | Promise<Unsubscribe>;
} }

View File

@ -1,10 +1,6 @@
import type { IEncoder, IMessage } from "./message.js"; import type { IEncoder, IMessage } from "./message.js";
import type { ProtocolOptions, SendResult } from "./protocols.js"; import type { SendResult } from "./protocols.js";
export interface ISender { export interface ISender {
send: ( send: (encoder: IEncoder, message: IMessage) => Promise<SendResult>;
encoder: IEncoder,
message: IMessage,
opts?: ProtocolOptions
) => Promise<SendResult>;
} }

View File

@ -1,5 +1,5 @@
import type { IDecodedMessage, IDecoder } from "./message.js"; import type { IDecodedMessage, IDecoder } from "./message.js";
import type { IBaseProtocol, ProtocolOptions } from "./protocols.js"; import type { IBaseProtocol } from "./protocols.js";
export enum PageDirection { export enum PageDirection {
BACKWARD = "backward", BACKWARD = "backward",
@ -43,7 +43,7 @@ export type StoreQueryOptions = {
* Message. * Message.
*/ */
cursor?: Cursor; cursor?: Cursor;
} & ProtocolOptions; };
export interface IStore extends IBaseProtocol { export interface IStore extends IBaseProtocol {
queryWithOrderedCallback: <T extends IDecodedMessage>( queryWithOrderedCallback: <T extends IDecodedMessage>(

View File

@ -23,14 +23,11 @@ const log = debug("waku:peer-exchange");
* Implementation of the Peer Exchange protocol (https://rfc.vac.dev/spec/34/) * Implementation of the Peer Exchange protocol (https://rfc.vac.dev/spec/34/)
*/ */
export class WakuPeerExchange extends BaseProtocol implements IPeerExchange { export class WakuPeerExchange extends BaseProtocol implements IPeerExchange {
multicodec: string;
/** /**
* @param components - libp2p components * @param components - libp2p components
*/ */
constructor(components: Libp2pComponents) { constructor(components: Libp2pComponents) {
super(PeerExchangeCodec, components); super(PeerExchangeCodec, components);
this.multicodec = PeerExchangeCodec;
} }
/** /**

View File

@ -6,6 +6,7 @@ import {
} from "@chainsafe/libp2p-gossipsub"; } from "@chainsafe/libp2p-gossipsub";
import type { PeerIdStr, TopicStr } from "@chainsafe/libp2p-gossipsub/types"; import type { PeerIdStr, TopicStr } from "@chainsafe/libp2p-gossipsub/types";
import { SignaturePolicy } from "@chainsafe/libp2p-gossipsub/types"; import { SignaturePolicy } from "@chainsafe/libp2p-gossipsub/types";
import type { PeerId } from "@libp2p/interface/peer-id";
import type { PubSub } from "@libp2p/interface/pubsub"; import type { PubSub } from "@libp2p/interface/pubsub";
import { sha256 } from "@noble/hashes/sha256"; import { sha256 } from "@noble/hashes/sha256";
import { DefaultPubSubTopic } from "@waku/core"; import { DefaultPubSubTopic } from "@waku/core";
@ -20,7 +21,6 @@ import {
IRelay, IRelay,
Libp2p, Libp2p,
ProtocolCreateOptions, ProtocolCreateOptions,
ProtocolOptions,
SendError, SendError,
SendResult SendResult
} from "@waku/interfaces"; } from "@waku/interfaces";
@ -98,11 +98,12 @@ class Relay implements IRelay {
* Send Waku message. * Send Waku message.
*/ */
public async send(encoder: IEncoder, message: IMessage): Promise<SendResult> { public async send(encoder: IEncoder, message: IMessage): Promise<SendResult> {
const recipients: PeerId[] = [];
if (!isSizeValid(message.payload)) { if (!isSizeValid(message.payload)) {
log("Failed to send waku relay: message is bigger that 1MB"); log("Failed to send waku relay: message is bigger that 1MB");
return { return {
recipients: [], recipients,
error: SendError.SIZE_TOO_BIG errors: [SendError.SIZE_TOO_BIG]
}; };
} }
@ -110,8 +111,8 @@ class Relay implements IRelay {
if (!msg) { if (!msg) {
log("Failed to encode message, aborting publish"); log("Failed to encode message, aborting publish");
return { return {
recipients: [], recipients,
error: SendError.ENCODE_FAILED errors: [SendError.ENCODE_FAILED]
}; };
} }
@ -160,10 +161,9 @@ class Relay implements IRelay {
} }
public toSubscriptionIterator<T extends IDecodedMessage>( public toSubscriptionIterator<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[], decoders: IDecoder<T> | IDecoder<T>[]
opts?: ProtocolOptions | undefined
): Promise<IAsyncIterator<T>> { ): Promise<IAsyncIterator<T>> {
return toAsyncIterator(this, decoders, opts); return toAsyncIterator(this, decoders);
} }
public getActiveSubscriptions(): ActiveSubscriptions { public getActiveSubscriptions(): ActiveSubscriptions {

View File

@ -56,6 +56,7 @@
"@waku/core": "0.0.22", "@waku/core": "0.0.22",
"@waku/dns-discovery": "0.0.16", "@waku/dns-discovery": "0.0.16",
"@waku/interfaces": "0.0.17", "@waku/interfaces": "0.0.17",
"@waku/peer-exchange": "^0.0.15",
"libp2p": "^0.46.8" "libp2p": "^0.46.8"
}, },
"devDependencies": { "devDependencies": {

View File

@ -21,6 +21,7 @@ import type {
ProtocolCreateOptions, ProtocolCreateOptions,
RelayNode RelayNode
} from "@waku/interfaces"; } from "@waku/interfaces";
import { wakuPeerExchangeDiscovery } from "@waku/peer-exchange";
import { RelayCreateOptions, wakuGossipSub, wakuRelay } from "@waku/relay"; import { RelayCreateOptions, wakuGossipSub, wakuRelay } from "@waku/relay";
import { createLibp2p, Libp2pOptions } from "libp2p"; import { createLibp2p, Libp2pOptions } from "libp2p";
import { identifyService } from "libp2p/identify"; import { identifyService } from "libp2p/identify";
@ -45,7 +46,7 @@ export async function createLightNode(
const libp2pOptions = options?.libp2p ?? {}; const libp2pOptions = options?.libp2p ?? {};
const peerDiscovery = libp2pOptions.peerDiscovery ?? []; const peerDiscovery = libp2pOptions.peerDiscovery ?? [];
if (options?.defaultBootstrap) { if (options?.defaultBootstrap) {
peerDiscovery.push(defaultPeerDiscovery()); peerDiscovery.push(...defaultPeerDiscoveries());
Object.assign(libp2pOptions, { peerDiscovery }); Object.assign(libp2pOptions, { peerDiscovery });
} }
@ -78,7 +79,7 @@ export async function createRelayNode(
const libp2pOptions = options?.libp2p ?? {}; const libp2pOptions = options?.libp2p ?? {};
const peerDiscovery = libp2pOptions.peerDiscovery ?? []; const peerDiscovery = libp2pOptions.peerDiscovery ?? [];
if (options?.defaultBootstrap) { if (options?.defaultBootstrap) {
peerDiscovery.push(defaultPeerDiscovery()); peerDiscovery.push(...defaultPeerDiscoveries());
Object.assign(libp2pOptions, { peerDiscovery }); Object.assign(libp2pOptions, { peerDiscovery });
} }
@ -119,7 +120,7 @@ export async function createFullNode(
const libp2pOptions = options?.libp2p ?? {}; const libp2pOptions = options?.libp2p ?? {};
const peerDiscovery = libp2pOptions.peerDiscovery ?? []; const peerDiscovery = libp2pOptions.peerDiscovery ?? [];
if (options?.defaultBootstrap) { if (options?.defaultBootstrap) {
peerDiscovery.push(defaultPeerDiscovery()); peerDiscovery.push(...defaultPeerDiscoveries());
Object.assign(libp2pOptions, { peerDiscovery }); Object.assign(libp2pOptions, { peerDiscovery });
} }
@ -144,10 +145,14 @@ export async function createFullNode(
) as FullNode; ) as FullNode;
} }
export function defaultPeerDiscovery(): ( export function defaultPeerDiscoveries(): ((
components: Libp2pComponents components: Libp2pComponents
) => PeerDiscovery { ) => PeerDiscovery)[] {
return wakuDnsDiscovery([enrTree["PROD"]], DEFAULT_NODE_REQUIREMENTS); const discoveries = [
wakuDnsDiscovery([enrTree["PROD"]], DEFAULT_NODE_REQUIREMENTS),
wakuPeerExchangeDiscovery()
];
return discoveries;
} }
type PubsubService = { type PubsubService = {

View File

@ -108,7 +108,7 @@ describe("Waku Light Push [node only]", () => {
payload: generateRandomUint8Array(MB + 65536) payload: generateRandomUint8Array(MB + 65536)
}); });
expect(pushResponse.recipients.length).to.eq(0); expect(pushResponse.recipients.length).to.eq(0);
expect(pushResponse.error).to.eq(SendError.SIZE_TOO_BIG); expect(pushResponse.errors).to.include(SendError.SIZE_TOO_BIG);
}); });
}); });
@ -138,13 +138,9 @@ describe("Waku Light Push [node only] - custom pubsub topic", () => {
const messageText = "Light Push works!"; const messageText = "Light Push works!";
log("Send message via lightpush"); log("Send message via lightpush");
const pushResponse = await waku.lightPush.send( const pushResponse = await waku.lightPush.send(TestEncoder, {
TestEncoder, payload: utf8ToBytes(messageText)
{ payload: utf8ToBytes(messageText) }, });
{
peerId: nimPeerId
}
);
log("Ack received", pushResponse); log("Ack received", pushResponse);
expect(pushResponse.recipients[0].toString()).to.eq(nimPeerId.toString()); expect(pushResponse.recipients[0].toString()).to.eq(nimPeerId.toString());

View File

@ -382,13 +382,13 @@ describe("Waku Relay [node only]", () => {
payload: generateRandomUint8Array(1 * MB + 65536) payload: generateRandomUint8Array(1 * MB + 65536)
}); });
expect(sendResult.recipients.length).to.eq(0); expect(sendResult.recipients.length).to.eq(0);
expect(sendResult.error).to.eq(SendError.SIZE_TOO_BIG); expect(sendResult.errors).to.include(SendError.SIZE_TOO_BIG);
sendResult = await waku1.relay.send(TestEncoder, { sendResult = await waku1.relay.send(TestEncoder, {
payload: generateRandomUint8Array(2 * MB) payload: generateRandomUint8Array(2 * MB)
}); });
expect(sendResult.recipients.length).to.eq(0); expect(sendResult.recipients.length).to.eq(0);
expect(sendResult.error).to.eq(SendError.SIZE_TOO_BIG); expect(sendResult.errors).to.include(SendError.SIZE_TOO_BIG);
const waku2ReceivedMsg = await waku2ReceivedMsgPromise; const waku2ReceivedMsg = await waku2ReceivedMsgPromise;
expect(waku2ReceivedMsg?.payload?.length).to.eq(0); expect(waku2ReceivedMsg?.payload?.length).to.eq(0);

View File

@ -488,8 +488,6 @@ describe("Waku Store", () => {
await waku.dial(await nwaku.getMultiaddrWithId()); await waku.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Store]); await waitForRemotePeer(waku, [Protocols.Store]);
const nwakuPeerId = await nwaku.getPeerId();
const firstMessages: IMessage[] = []; const firstMessages: IMessage[] = [];
await waku.store.queryWithOrderedCallback( await waku.store.queryWithOrderedCallback(
[TestDecoder], [TestDecoder],
@ -499,7 +497,6 @@ describe("Waku Store", () => {
} }
}, },
{ {
peerId: nwakuPeerId,
timeFilter: { startTime, endTime: message1Timestamp } timeFilter: { startTime, endTime: message1Timestamp }
} }
); );
@ -511,7 +508,6 @@ describe("Waku Store", () => {
bothMessages.push(msg); bothMessages.push(msg);
}, },
{ {
peerId: nwakuPeerId,
timeFilter: { timeFilter: {
startTime, startTime,
endTime endTime

View File

@ -4,7 +4,7 @@ import {
DefaultPubSubTopic, DefaultPubSubTopic,
waitForRemotePeer waitForRemotePeer
} from "@waku/core"; } from "@waku/core";
import type { LightNode } from "@waku/interfaces"; import { LightNode } from "@waku/interfaces";
import { Protocols } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces";
import { createLightNode } from "@waku/sdk"; import { createLightNode } from "@waku/sdk";
import { toAsyncIterator } from "@waku/utils"; import { toAsyncIterator } from "@waku/utils";
@ -49,12 +49,9 @@ describe("Util: toAsyncIterator: Filter", () => {
const messageText = "hey, what's up?"; const messageText = "hey, what's up?";
const sent = { payload: utf8ToBytes(messageText) }; const sent = { payload: utf8ToBytes(messageText) };
const { iterator } = await toAsyncIterator( const { iterator } = await toAsyncIterator(waku.filter, TestDecoder, {
waku.filter, timeoutMs: 1000
TestDecoder, });
{},
{ timeoutMs: 1000 }
);
await waku.lightPush.send(TestEncoder, sent); await waku.lightPush.send(TestEncoder, sent);
const { value } = await iterator.next(); const { value } = await iterator.next();
@ -66,12 +63,9 @@ describe("Util: toAsyncIterator: Filter", () => {
it("handles multiple messages", async function () { it("handles multiple messages", async function () {
this.timeout(10000); this.timeout(10000);
const { iterator } = await toAsyncIterator( const { iterator } = await toAsyncIterator(waku.filter, TestDecoder, {
waku.filter, timeoutMs: 1000
TestDecoder, });
{},
{ timeoutMs: 1000 }
);
await waku.lightPush.send(TestEncoder, { await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes("Filtering works!") payload: utf8ToBytes("Filtering works!")
@ -89,12 +83,9 @@ describe("Util: toAsyncIterator: Filter", () => {
it("unsubscribes", async function () { it("unsubscribes", async function () {
this.timeout(10000); this.timeout(10000);
const { iterator, stop } = await toAsyncIterator( const { iterator, stop } = await toAsyncIterator(waku.filter, TestDecoder, {
waku.filter, timeoutMs: 1000
TestDecoder, });
{},
{ timeoutMs: 1000 }
);
await waku.lightPush.send(TestEncoder, { await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes("This should be received") payload: utf8ToBytes("This should be received")

View File

@ -66,7 +66,8 @@
}, },
"dependencies": { "dependencies": {
"debug": "^4.3.4", "debug": "^4.3.4",
"uint8arrays": "^4.0.4" "uint8arrays": "^4.0.4",
"@waku/interfaces": "^0.0.17"
}, },
"devDependencies": { "devDependencies": {
"@rollup/plugin-commonjs": "^25.0.4", "@rollup/plugin-commonjs": "^25.0.4",

View File

@ -1,5 +1,8 @@
export * from "./is_defined.js"; export * from "./is_defined.js";
export * from "./random_subset.js"; export * from "./random_subset.js";
export * from "./group_by.js";
export * from "./to_async_iterator.js";
export * from "./is_size_valid.js";
export function removeItemFromArray(arr: unknown[], value: unknown): unknown[] { export function removeItemFromArray(arr: unknown[], value: unknown): unknown[] {
const index = arr.indexOf(value); const index = arr.indexOf(value);
@ -8,6 +11,3 @@ export function removeItemFromArray(arr: unknown[], value: unknown): unknown[] {
} }
return arr; return arr;
} }
export * from "./group_by.js";
export * from "./to_async_iterator.js";
export * from "./is_size_valid.js";

View File

@ -3,7 +3,6 @@ import type {
IDecodedMessage, IDecodedMessage,
IDecoder, IDecoder,
IReceiver, IReceiver,
ProtocolOptions,
Unsubscribe Unsubscribe
} from "@waku/interfaces"; } from "@waku/interfaces";
@ -31,7 +30,6 @@ const FRAME_RATE = 60;
export async function toAsyncIterator<T extends IDecodedMessage>( export async function toAsyncIterator<T extends IDecodedMessage>(
receiver: IReceiver, receiver: IReceiver,
decoder: IDecoder<T> | IDecoder<T>[], decoder: IDecoder<T> | IDecoder<T>[],
options?: ProtocolOptions,
iteratorOptions?: IteratorOptions iteratorOptions?: IteratorOptions
): Promise<IAsyncIterator<T>> { ): Promise<IAsyncIterator<T>> {
const iteratorDelay = iteratorOptions?.iteratorDelay ?? FRAME_RATE; const iteratorDelay = iteratorOptions?.iteratorDelay ?? FRAME_RATE;
@ -39,13 +37,9 @@ export async function toAsyncIterator<T extends IDecodedMessage>(
const messages: T[] = []; const messages: T[] = [];
let unsubscribe: undefined | Unsubscribe; let unsubscribe: undefined | Unsubscribe;
unsubscribe = await receiver.subscribe( unsubscribe = await receiver.subscribe(decoder, (message: T) => {
decoder,
(message: T) => {
messages.push(message); messages.push(message);
}, });
options
);
const isWithTimeout = Number.isInteger(iteratorOptions?.timeoutMs); const isWithTimeout = Number.isInteger(iteratorOptions?.timeoutMs);
const timeoutMs = iteratorOptions?.timeoutMs ?? 0; const timeoutMs = iteratorOptions?.timeoutMs ?? 0;