feat(static-sharding)!: allow multiple pubSubTopics (#1586)

* `ProtocolCreateOptions` now has `pubSubTopic` as `pubSubTopic[]`

* chore: update encoder & decoder to support `PubSubTopic`

* feat(protocols): allow multiple `PubSubTopic[]`

* feat(relay): allow multiple `PubSubTopic[]`

* chore(tests): update for new API

* chore: minor fixes

* chore: make store more robust

* fix(relay): correctly set types

* chore(address comments): update terminology around configured pubsub topics

* chore(address comments): minor refactoring

* chore(relay): split `subscribe` into smaller functions for readability & modularity

* chore(address comments): refactor `waitForGossipSubPeerInMesh`

* chore(store): only allow to query one `pubSubTopic`

* fix: `store` bug

* feat(tests): add some basic tests

* sharding utils

* address comments

* feat(relay): re-add API for `getMeshPeers`

* update error message

Co-authored-by: fryorcraken <110212804+fryorcraken@users.noreply.github.com>

* refactor for new API

* feat: simplify handling of observers (#1614)

* refactor: simplify handling of observers

* refactor: Remove redundant PubSubTopic from Observer

* use `??` instead of `||`

* update `pubsubTopic` to `pubSubTopic`

* update `interval` typo

* change occurence of `pubsubTopic` to `pubSubTopic`

* relay: rm `getAllMeshPeers` and make `pubSubTopics` public

* relay: use `push_or_init_map` and move to `utils`

* fix: update API for tests

* fix: relay waitForRemotePeer

---------

Co-authored-by: fryorcraken <110212804+fryorcraken@users.noreply.github.com>
This commit is contained in:
Danish Arora 2023-09-27 15:28:07 +05:30 committed by GitHub
parent 077d9a76d3
commit a3c45b6e1a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 393 additions and 192 deletions

View File

@ -39,6 +39,7 @@
"exponentiate",
"extip",
"fanout",
"sharded",
"floodsub",
"fontsource",
"globby",

26
package-lock.json generated
View File

@ -6506,7 +6506,8 @@
},
"node_modules/chai": {
"version": "4.3.8",
"license": "MIT",
"resolved": "https://registry.npmjs.org/chai/-/chai-4.3.8.tgz",
"integrity": "sha512-vX4YvVVtxlfSZ2VecZgFUTU5qPCYsobVI2O9FmwEXBhDigYGQA6jRXCycIs1yJnnWbZ6/+a2zNIF5DfVCcJBFQ==",
"dependencies": {
"assertion-error": "^1.1.0",
"check-error": "^1.0.2",
@ -26180,7 +26181,7 @@
"version": "0.0.11",
"license": "MIT OR Apache-2.0",
"dependencies": {
"@waku/interfaces": "0.0.18",
"chai": "^4.3.8",
"debug": "^4.3.4",
"uint8arrays": "^4.0.4"
},
@ -26189,7 +26190,7 @@
"@rollup/plugin-json": "^6.0.0",
"@rollup/plugin-node-resolve": "^15.1.0",
"@waku/build-utils": "*",
"@waku/interfaces": "0.0.17",
"@waku/interfaces": "0.0.18",
"cspell": "^7.3.2",
"npm-run-all": "^4.1.5",
"rollup": "^3.29.2"
@ -26197,14 +26198,6 @@
"engines": {
"node": ">=18"
}
},
"packages/utils/node_modules/@waku/interfaces": {
"version": "0.0.17",
"dev": true,
"license": "MIT OR Apache-2.0",
"engines": {
"node": ">=16"
}
}
},
"dependencies": {
@ -29401,18 +29394,13 @@
"@rollup/plugin-json": "^6.0.0",
"@rollup/plugin-node-resolve": "^15.1.0",
"@waku/build-utils": "*",
"@waku/interfaces": "0.0.17",
"@waku/interfaces": "0.0.18",
"chai": "^4.3.8",
"cspell": "^7.3.2",
"debug": "^4.3.4",
"npm-run-all": "^4.1.5",
"rollup": "^3.29.2",
"uint8arrays": "^4.0.4"
},
"dependencies": {
"@waku/interfaces": {
"version": "0.0.17",
"dev": true
}
}
},
"@webassemblyjs/ast": {
@ -30521,6 +30509,8 @@
},
"chai": {
"version": "4.3.8",
"resolved": "https://registry.npmjs.org/chai/-/chai-4.3.8.tgz",
"integrity": "sha512-vX4YvVVtxlfSZ2VecZgFUTU5qPCYsobVI2O9FmwEXBhDigYGQA6jRXCycIs1yJnnWbZ6/+a2zNIF5DfVCcJBFQ==",
"requires": {
"assertion-error": "^1.1.0",
"check-error": "^1.0.2",

View File

@ -17,7 +17,11 @@ import type {
Unsubscribe
} from "@waku/interfaces";
import { WakuMessage } from "@waku/proto";
import { groupByContentTopic, toAsyncIterator } from "@waku/utils";
import {
ensurePubsubTopicIsConfigured,
groupByContentTopic,
toAsyncIterator
} from "@waku/utils";
import debug from "debug";
import all from "it-all";
import * as lp from "it-length-prefixed";
@ -230,7 +234,7 @@ class Subscription {
}
class Filter extends BaseProtocol implements IReceiver {
private readonly options: ProtocolCreateOptions;
private readonly pubSubTopics: PubSubTopic[] = [];
private activeSubscriptions = new Map<string, Subscription>();
private readonly NUM_PEERS_PROTOCOL = 1;
@ -253,19 +257,22 @@ class Filter extends BaseProtocol implements IReceiver {
constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(FilterCodecs.SUBSCRIBE, libp2p.components);
this.pubSubTopics = options?.pubSubTopics || [DefaultPubSubTopic];
libp2p.handle(FilterCodecs.PUSH, this.onRequest.bind(this)).catch((e) => {
log("Failed to register ", FilterCodecs.PUSH, e);
});
this.activeSubscriptions = new Map();
this.options = options ?? {};
}
async createSubscription(pubSubTopic?: string): Promise<Subscription> {
const _pubSubTopic =
pubSubTopic ?? this.options.pubSubTopic ?? DefaultPubSubTopic;
async createSubscription(
pubSubTopic: string = DefaultPubSubTopic
): Promise<Subscription> {
ensurePubsubTopicIsConfigured(pubSubTopic, this.pubSubTopics);
//TODO: get a relevant peer for the topic/shard
// https://github.com/waku-org/js-waku/pull/1586#discussion_r1336428230
const peer = (
await this.getPeers({
maxBootstrapPeers: 1,
@ -274,11 +281,11 @@ class Filter extends BaseProtocol implements IReceiver {
)[0];
const subscription =
this.getActiveSubscription(_pubSubTopic, peer.id.toString()) ??
this.getActiveSubscription(pubSubTopic, peer.id.toString()) ??
this.setActiveSubscription(
_pubSubTopic,
pubSubTopic,
peer.id.toString(),
new Subscription(_pubSubTopic, peer, this.getStream.bind(this, peer))
new Subscription(pubSubTopic, peer, this.getStream.bind(this, peer))
);
return subscription;

View File

@ -1,6 +1,6 @@
import type { PeerId } from "@libp2p/interface/peer-id";
import type { PeerStore } from "@libp2p/interface/peer-store";
import type { IRelay } from "@waku/interfaces";
import type { IRelay, PeerIdStr } from "@waku/interfaces";
import type { KeepAliveOptions } from "@waku/interfaces";
import { utf8ToBytes } from "@waku/utils/bytes";
import debug from "debug";
@ -13,7 +13,7 @@ const log = debug("waku:keep-alive");
export class KeepAliveManager {
private pingKeepAliveTimers: Map<string, ReturnType<typeof setInterval>>;
private relayKeepAliveTimers: Map<PeerId, ReturnType<typeof setInterval>>;
private relayKeepAliveTimers: Map<PeerId, ReturnType<typeof setInterval>[]>;
private options: KeepAliveOptions;
private relay?: IRelay;
@ -66,17 +66,12 @@ export class KeepAliveManager {
const relay = this.relay;
if (relay && relayPeriodSecs !== 0) {
const encoder = createEncoder({
contentTopic: RelayPingContentTopic,
ephemeral: true
});
const interval = setInterval(() => {
log("Sending Waku Relay ping message");
relay
.send(encoder, { payload: new Uint8Array([1]) })
.catch((e) => log("Failed to send relay ping", e));
}, relayPeriodSecs * 1000);
this.relayKeepAliveTimers.set(peerId, interval);
const intervals = this.scheduleRelayPings(
relay,
relayPeriodSecs,
peerId.toString()
);
this.relayKeepAliveTimers.set(peerId, intervals);
}
}
@ -89,7 +84,7 @@ export class KeepAliveManager {
}
if (this.relayKeepAliveTimers.has(peerId)) {
clearInterval(this.relayKeepAliveTimers.get(peerId));
this.relayKeepAliveTimers.get(peerId)?.map(clearInterval);
this.relayKeepAliveTimers.delete(peerId);
}
}
@ -105,4 +100,32 @@ export class KeepAliveManager {
this.pingKeepAliveTimers.clear();
this.relayKeepAliveTimers.clear();
}
private scheduleRelayPings(
relay: IRelay,
relayPeriodSecs: number,
peerIdStr: PeerIdStr
): NodeJS.Timeout[] {
// send a ping message to each PubSubTopic the peer is part of
const intervals: NodeJS.Timeout[] = [];
for (const topic of relay.pubSubTopics) {
const meshPeers = relay.getMeshPeers(topic);
if (!meshPeers.includes(peerIdStr)) continue;
const encoder = createEncoder({
pubSubTopic: topic,
contentTopic: RelayPingContentTopic,
ephemeral: true
});
const interval = setInterval(() => {
log("Sending Waku Relay ping message");
relay
.send(encoder, { payload: new Uint8Array([1]) })
.catch((e) => log("Failed to send relay ping", e));
}, relayPeriodSecs * 1000);
intervals.push(interval);
}
return intervals;
}
}

View File

@ -6,11 +6,12 @@ import {
IMessage,
Libp2p,
ProtocolCreateOptions,
PubSubTopic,
SendError,
SendResult
} from "@waku/interfaces";
import { PushResponse } from "@waku/proto";
import { isSizeValid } from "@waku/utils";
import { ensurePubsubTopicIsConfigured, isSizeValid } from "@waku/utils";
import debug from "debug";
import all from "it-all";
import * as lp from "it-length-prefixed";
@ -41,12 +42,12 @@ type PreparePushMessageResult =
* Implements the [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/).
*/
class LightPush extends BaseProtocol implements ILightPush {
options: ProtocolCreateOptions;
private readonly pubSubTopics: PubSubTopic[];
private readonly NUM_PEERS_PROTOCOL = 1;
constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(LightPushCodec, libp2p.components);
this.options = options || {};
this.pubSubTopics = options?.pubSubTopics ?? [DefaultPubSubTopic];
}
private async preparePushMessage(
@ -82,7 +83,9 @@ class LightPush extends BaseProtocol implements ILightPush {
}
async send(encoder: IEncoder, message: IMessage): Promise<SendResult> {
const { pubSubTopic = DefaultPubSubTopic } = this.options;
const { pubSubTopic } = encoder;
ensurePubsubTopicIsConfigured(pubSubTopic, this.pubSubTopics);
const recipients: PeerId[] = [];
const { query, error: preparationError } = await this.preparePushMessage(
@ -98,6 +101,7 @@ class LightPush extends BaseProtocol implements ILightPush {
};
}
//TODO: get a relevant peer for the topic/shard
const peers = await this.getPeers({
maxBootstrapPeers: 1,
numPeers: this.NUM_PEERS_PROTOCOL

View File

@ -6,11 +6,14 @@ import type {
IMessage,
IMetaSetter,
IProtoMessage,
IRateLimitProof
IRateLimitProof,
PubSubTopic
} from "@waku/interfaces";
import { proto_message as proto } from "@waku/proto";
import debug from "debug";
import { DefaultPubSubTopic } from "../constants.js";
const log = debug("waku:message:version-0");
const OneMillion = BigInt(1_000_000);
@ -73,6 +76,7 @@ export class Encoder implements IEncoder {
constructor(
public contentTopic: string,
public ephemeral: boolean = false,
public pubSubTopic: PubSubTopic,
public metaSetter?: IMetaSetter
) {
if (!contentTopic || contentTopic === "") {
@ -115,15 +119,19 @@ export class Encoder implements IEncoder {
* messages.
*/
export function createEncoder({
pubSubTopic = DefaultPubSubTopic,
contentTopic,
ephemeral,
metaSetter
}: EncoderOptions): Encoder {
return new Encoder(contentTopic, ephemeral, metaSetter);
return new Encoder(contentTopic, ephemeral, pubSubTopic, metaSetter);
}
export class Decoder implements IDecoder<DecodedMessage> {
constructor(public contentTopic: string) {
constructor(
public pubSubTopic: PubSubTopic,
public contentTopic: string
) {
if (!contentTopic || contentTopic === "") {
throw new Error("Content topic must be specified");
}
@ -173,6 +181,9 @@ export class Decoder implements IDecoder<DecodedMessage> {
*
* @param contentTopic The resulting decoder will only decode messages with this content topic.
*/
export function createDecoder(contentTopic: string): Decoder {
return new Decoder(contentTopic);
export function createDecoder(
contentTopic: string,
pubsubTopic: PubSubTopic = DefaultPubSubTopic
): Decoder {
return new Decoder(pubsubTopic, contentTopic);
}

View File

@ -6,10 +6,11 @@ import {
IDecoder,
IStore,
Libp2p,
ProtocolCreateOptions
ProtocolCreateOptions,
PubSubTopic
} from "@waku/interfaces";
import { proto_store as proto } from "@waku/proto";
import { isDefined } from "@waku/utils";
import { ensurePubsubTopicIsConfigured, isDefined } from "@waku/utils";
import { concat, utf8ToBytes } from "@waku/utils/bytes";
import debug from "debug";
import all from "it-all";
@ -74,12 +75,12 @@ export interface QueryOptions {
* The Waku Store protocol can be used to retrieved historical messages.
*/
class Store extends BaseProtocol implements IStore {
options: ProtocolCreateOptions;
private readonly pubSubTopics: PubSubTopic[];
private readonly NUM_PEERS_PROTOCOL = 1;
constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(StoreCodec, libp2p.components);
this.options = options ?? {};
this.pubSubTopics = options?.pubSubTopics ?? [DefaultPubSubTopic];
}
/**
@ -206,12 +207,20 @@ class Store extends BaseProtocol implements IStore {
* @throws If not able to reach a Waku Store peer to query,
* or if an error is encountered when processing the reply,
* or if two decoders with the same content topic are passed.
*
* This API only supports querying a single pubsub topic at a time.
* If multiple decoders are provided, they must all have the same pubsub topic.
* @throws If multiple decoders with different pubsub topics are provided.
* @throws If no decoders are provided.
* @throws If no decoders are found for the provided pubsub topic.
*/
async *queryGenerator<T extends IDecodedMessage>(
decoders: IDecoder<T>[],
options?: QueryOptions
): AsyncGenerator<Promise<T | undefined>[]> {
const { pubSubTopic = DefaultPubSubTopic } = this.options;
if (decoders.length === 0) {
throw new Error("No decoders provided");
}
let startTime, endTime;
@ -220,6 +229,23 @@ class Store extends BaseProtocol implements IStore {
endTime = options.timeFilter.endTime;
}
// convert array to set to remove duplicates
const uniquePubSubTopicsInQuery = Array.from(
new Set(decoders.map((decoder) => decoder.pubSubTopic))
);
// If multiple pubsub topics are provided, throw an error
if (uniquePubSubTopicsInQuery.length > 1) {
throw new Error(
"API does not support querying multiple pubsub topics at once"
);
}
// we can be certain that there is only one pubsub topic in the query
const pubSubTopicForQuery = uniquePubSubTopicsInQuery[0];
ensurePubsubTopicIsConfigured(pubSubTopicForQuery, this.pubSubTopics);
const decodersAsMap = new Map();
decoders.forEach((dec) => {
if (decodersAsMap.has(dec.contentTopic)) {
@ -230,11 +256,17 @@ class Store extends BaseProtocol implements IStore {
decodersAsMap.set(dec.contentTopic, dec);
});
const contentTopics = decoders.map((dec) => dec.contentTopic);
const contentTopics = decoders
.filter((decoder) => decoder.pubSubTopic === pubSubTopicForQuery)
.map((dec) => dec.contentTopic);
if (contentTopics.length === 0) {
throw new Error("No decoders found for topic " + pubSubTopicForQuery);
}
const queryOpts = Object.assign(
{
pubSubTopic: pubSubTopic,
pubSubTopic: pubSubTopicForQuery,
pageDirection: PageDirection.BACKWARD,
pageSize: DefaultPageSize
},

View File

@ -96,15 +96,18 @@ async function waitForConnectedPeer(protocol: IBaseProtocol): Promise<void> {
}
/**
* Wait for a peer with the given protocol to be connected and in the gossipsub
* mesh.
* Wait for at least one peer with the given protocol to be connected and in the gossipsub
* mesh for all pubSubTopics.
*/
async function waitForGossipSubPeerInMesh(waku: IRelay): Promise<void> {
let peers = waku.getMeshPeers();
const pubSubTopics = waku.pubSubTopics;
while (peers.length == 0) {
await pEvent(waku.gossipSub, "gossipsub:heartbeat");
peers = waku.getMeshPeers();
for (const topic of pubSubTopics) {
while (peers.length == 0) {
await pEvent(waku.gossipSub, "gossipsub:heartbeat");
peers = waku.getMeshPeers(topic);
}
}
}

View File

@ -1,3 +1,5 @@
import type { PubSubTopic } from "./misc.js";
export interface IRateLimitProof {
proof: Uint8Array;
merkleRoot: Uint8Array;
@ -36,6 +38,7 @@ export interface IMetaSetter {
}
export interface EncoderOptions {
pubSubTopic?: PubSubTopic;
/** The content topic to set on outgoing messages. */
contentTopic: string;
/**
@ -52,6 +55,7 @@ export interface EncoderOptions {
}
export interface IEncoder {
pubSubTopic: PubSubTopic;
contentTopic: string;
ephemeral: boolean;
toWire: (message: IMessage) => Promise<Uint8Array | undefined>;
@ -61,7 +65,7 @@ export interface IEncoder {
export interface IDecodedMessage {
payload: Uint8Array;
contentTopic: string;
pubSubTopic: string;
pubSubTopic: PubSubTopic;
timestamp: Date | undefined;
rateLimitProof: IRateLimitProof | undefined;
ephemeral: boolean | undefined;
@ -69,6 +73,7 @@ export interface IDecodedMessage {
}
export interface IDecoder<T extends IDecodedMessage> {
pubSubTopic: PubSubTopic;
contentTopic: string;
fromWireToProtoObj: (bytes: Uint8Array) => Promise<IProtoMessage | undefined>;
fromProtoObj: (

View File

@ -4,6 +4,7 @@ import type { Peer, PeerStore } from "@libp2p/interface/peer-store";
import type { Libp2pOptions } from "libp2p";
import type { IDecodedMessage } from "./message.js";
import type { PubSubTopic } from "./misc.js";
export enum Protocols {
Relay = "relay",
@ -22,18 +23,23 @@ export interface IBaseProtocol {
export type ProtocolCreateOptions = {
/**
* Waku supports usage of multiple pubsub topics, but this is still in early stages.
* Waku implements sharding to achieve scalability
* The format of the sharded topic is `/waku/2/rs/<shard_cluster_index>/<shard_number>`
* To learn more about the sharding specifications implemented, see [Relay Sharding](https://rfc.vac.dev/spec/51/).
* The PubSub Topic to use. Defaults to {@link @waku/core!DefaultPubSubTopic }.
*
* One and only one pubsub topic is used by Waku. This is used by:
* If no pubsub topic is specified, the default pubsub topic is used.
* The set of pubsub topics that are used to initialize the Waku node, will need to be used by the protocols as well
* You cannot currently add or remove pubsub topics after initialization.
* This is used by:
* - WakuRelay to receive, route and send messages,
* - WakuLightPush to send messages,
* - WakuStore to retrieve messages.
*
* The usage of the default pubsub topic is recommended.
* See [Waku v2 Topic Usage Recommendations](https://rfc.vac.dev/spec/23/) for details.
*
*/
pubSubTopic?: string;
pubSubTopics?: PubSubTopic[];
/**
* You can pass options to the `Libp2p` instance used by {@link @waku/core!WakuNode} using the `libp2p` property.
* This property is the same type as the one passed to [`Libp2p.create`](https://github.com/libp2p/js-libp2p/blob/master/doc/API.md#create)
@ -71,6 +77,11 @@ export enum SendError {
* Compressing the message or using an alternative strategy for large messages is recommended.
*/
SIZE_TOO_BIG = "Size is too big",
/**
* The PubSubTopic passed to the send function is not configured on the Waku node.
* Please ensure that the PubSubTopic is used when initializing the Waku node.
*/
TOPIC_NOT_CONFIGURED = "Topic not configured",
/**
* Failure to find a peer with suitable protocols. This may due to a connection issue.
* Mitigation can be: retrying after a given time period, display connectivity issue

View File

@ -1,6 +1,7 @@
import type { GossipSub } from "@chainsafe/libp2p-gossipsub";
import type { PeerIdStr, TopicStr } from "@chainsafe/libp2p-gossipsub/types";
import { PubSubTopic } from "./misc.js";
import { IReceiver } from "./receiver.js";
import type { ISender } from "./sender.js";
@ -12,6 +13,7 @@ import type { ISender } from "./sender.js";
* @property getMeshPeers - Function to retrieve the mesh peers for a given topic or all topics if none is specified. Returns an array of peer IDs as strings.
*/
export interface IRelayAPI {
readonly pubSubTopics: Set<PubSubTopic>;
readonly gossipSub: GossipSub;
start: () => Promise<void>;
getMeshPeers: (topic?: TopicStr) => PeerIdStr[];

View File

@ -1,5 +1,6 @@
import { DefaultPubSubTopic } from "@waku/core";
import { Decoder as DecoderV0 } from "@waku/core/lib/message/version_0";
import { IMetaSetter } from "@waku/interfaces";
import { IMetaSetter, PubSubTopic } from "@waku/interfaces";
import type {
EncoderOptions as BaseEncoderOptions,
IDecoder,
@ -32,6 +33,7 @@ const log = debug("waku:message-encryption:ecies");
class Encoder implements IEncoder {
constructor(
public pubSubTopic: PubSubTopic,
public contentTopic: string,
private publicKey: Uint8Array,
private sigPrivKey?: Uint8Array,
@ -95,6 +97,7 @@ export interface EncoderOptions extends BaseEncoderOptions {
* in [26/WAKU2-PAYLOAD](https://rfc.vac.dev/spec/26/).
*/
export function createEncoder({
pubSubTopic = DefaultPubSubTopic,
contentTopic,
publicKey,
sigPrivKey,
@ -102,6 +105,7 @@ export function createEncoder({
metaSetter
}: EncoderOptions): Encoder {
return new Encoder(
pubSubTopic,
contentTopic,
publicKey,
sigPrivKey,
@ -112,10 +116,11 @@ export function createEncoder({
class Decoder extends DecoderV0 implements IDecoder<DecodedMessage> {
constructor(
pubSubTopic: PubSubTopic,
contentTopic: string,
private privateKey: Uint8Array
) {
super(contentTopic);
super(pubSubTopic, contentTopic);
}
async fromProtoObj(
@ -183,7 +188,8 @@ class Decoder extends DecoderV0 implements IDecoder<DecodedMessage> {
*/
export function createDecoder(
contentTopic: string,
privateKey: Uint8Array
privateKey: Uint8Array,
pubSubTopic: PubSubTopic = DefaultPubSubTopic
): Decoder {
return new Decoder(contentTopic, privateKey);
return new Decoder(pubSubTopic, contentTopic, privateKey);
}

View File

@ -1,3 +1,4 @@
import { DefaultPubSubTopic } from "@waku/core";
import { Decoder as DecoderV0 } from "@waku/core/lib/message/version_0";
import type {
EncoderOptions as BaseEncoderOptions,
@ -5,7 +6,8 @@ import type {
IEncoder,
IMessage,
IMetaSetter,
IProtoMessage
IProtoMessage,
PubSubTopic
} from "@waku/interfaces";
import { WakuMessage } from "@waku/proto";
import debug from "debug";
@ -27,6 +29,7 @@ const log = debug("waku:message-encryption:symmetric");
class Encoder implements IEncoder {
constructor(
public pubSubTopic: PubSubTopic,
public contentTopic: string,
private symKey: Uint8Array,
private sigPrivKey?: Uint8Array,
@ -90,21 +93,30 @@ export interface EncoderOptions extends BaseEncoderOptions {
* in [26/WAKU2-PAYLOAD](https://rfc.vac.dev/spec/26/).
*/
export function createEncoder({
pubSubTopic = DefaultPubSubTopic,
contentTopic,
symKey,
sigPrivKey,
ephemeral = false,
metaSetter
}: EncoderOptions): Encoder {
return new Encoder(contentTopic, symKey, sigPrivKey, ephemeral, metaSetter);
return new Encoder(
pubSubTopic,
contentTopic,
symKey,
sigPrivKey,
ephemeral,
metaSetter
);
}
class Decoder extends DecoderV0 implements IDecoder<DecodedMessage> {
constructor(
pubSubTopic: PubSubTopic,
contentTopic: string,
private symKey: Uint8Array
) {
super(contentTopic);
super(pubSubTopic, contentTopic);
}
async fromProtoObj(
@ -172,7 +184,8 @@ class Decoder extends DecoderV0 implements IDecoder<DecodedMessage> {
*/
export function createDecoder(
contentTopic: string,
symKey: Uint8Array
symKey: Uint8Array,
pubSubTopic: PubSubTopic = DefaultPubSubTopic
): Decoder {
return new Decoder(contentTopic, symKey);
return new Decoder(pubSubTopic, contentTopic, symKey);
}

View File

@ -21,10 +21,12 @@ import {
IRelay,
Libp2p,
ProtocolCreateOptions,
PubSubTopic,
SendError,
SendResult
} from "@waku/interfaces";
import { groupByContentTopic, isSizeValid, toAsyncIterator } from "@waku/utils";
import { isSizeValid, toAsyncIterator } from "@waku/utils";
import { pushOrInitMapSet } from "@waku/utils";
import debug from "debug";
import { RelayCodecs } from "./constants.js";
@ -46,7 +48,7 @@ export type ContentTopic = string;
* Throws if libp2p.pubsub does not support Waku Relay
*/
class Relay implements IRelay {
private readonly pubSubTopic: string;
public readonly pubSubTopics: Set<PubSubTopic>;
private defaultDecoder: IDecoder<IDecodedMessage>;
public static multicodec: string = RelayCodecs[0];
@ -56,7 +58,7 @@ class Relay implements IRelay {
* observers called when receiving new message.
* Observers under key `""` are always called.
*/
private observers: Map<ContentTopic, Set<unknown>>;
private observers: Map<PubSubTopic, Map<ContentTopic, Set<unknown>>>;
constructor(libp2p: Libp2p, options?: Partial<RelayCreateOptions>) {
if (!this.isRelayPubSub(libp2p.services.pubsub)) {
@ -66,21 +68,22 @@ class Relay implements IRelay {
}
this.gossipSub = libp2p.services.pubsub as GossipSub;
this.pubSubTopic = options?.pubSubTopic ?? DefaultPubSubTopic;
this.pubSubTopics = new Set(options?.pubSubTopics ?? [DefaultPubSubTopic]);
if (this.gossipSub.isStarted()) {
this.gossipSubSubscribe(this.pubSubTopic);
this.subscribeToAllTopics();
}
this.observers = new Map();
// Default PubSubTopic decoder
// TODO: User might want to decide what decoder should be used (e.g. for RLN)
this.defaultDecoder = new TopicOnlyDecoder();
}
/**
* Mounts the gossipsub protocol onto the libp2p node
* and subscribes to the default topic.
* and subscribes to all the topics.
*
* @override
* @returns {void}
@ -91,7 +94,7 @@ class Relay implements IRelay {
}
await this.gossipSub.start();
this.gossipSubSubscribe(this.pubSubTopic);
this.subscribeToAllTopics();
}
/**
@ -99,6 +102,16 @@ class Relay implements IRelay {
*/
public async send(encoder: IEncoder, message: IMessage): Promise<SendResult> {
const recipients: PeerId[] = [];
const { pubSubTopic } = encoder;
if (!this.pubSubTopics.has(pubSubTopic)) {
log("Failed to send waku relay: topic not configured");
return {
recipients,
errors: [SendError.TOPIC_NOT_CONFIGURED]
};
}
if (!isSizeValid(message.payload)) {
log("Failed to send waku relay: message is bigger that 1MB");
return {
@ -116,50 +129,49 @@ class Relay implements IRelay {
};
}
return this.gossipSub.publish(this.pubSubTopic, msg);
return this.gossipSub.publish(pubSubTopic, msg);
}
/**
* Add an observer and associated Decoder to process incoming messages on a given content topic.
*
* @returns Function to delete the observer
*/
public subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>
): () => void {
const contentTopicToObservers = Array.isArray(decoders)
? toObservers(decoders, callback)
: toObservers([decoders], callback);
const observers: Array<[PubSubTopic, Observer<T>]> = [];
for (const contentTopic of contentTopicToObservers.keys()) {
const currObservers = this.observers.get(contentTopic) || new Set();
const newObservers =
contentTopicToObservers.get(contentTopic) || new Set();
for (const decoder of Array.isArray(decoders) ? decoders : [decoders]) {
const { pubSubTopic } = decoder;
const ctObs: Map<ContentTopic, Set<Observer<T>>> = this.observers.get(
pubSubTopic
) ?? new Map();
const observer = { pubSubTopic, decoder, callback };
pushOrInitMapSet(ctObs, decoder.contentTopic, observer);
this.observers.set(contentTopic, union(currObservers, newObservers));
this.observers.set(pubSubTopic, ctObs);
observers.push([pubSubTopic, observer]);
}
return () => {
for (const contentTopic of contentTopicToObservers.keys()) {
const currentObservers = this.observers.get(contentTopic) || new Set();
const observersToRemove =
contentTopicToObservers.get(contentTopic) || new Set();
const nextObservers = leftMinusJoin(
currentObservers,
observersToRemove
);
if (nextObservers.size) {
this.observers.set(contentTopic, nextObservers);
} else {
this.observers.delete(contentTopic);
}
}
this.removeObservers(observers);
};
}
private removeObservers<T extends IDecodedMessage>(
observers: Array<[PubSubTopic, Observer<T>]>
): void {
for (const [pubSubTopic, observer] of observers) {
const ctObs = this.observers.get(pubSubTopic);
if (!ctObs) continue;
const contentTopic = observer.decoder.contentTopic;
const _obs = ctObs.get(contentTopic);
if (!_obs) continue;
_obs.delete(observer);
ctObs.set(contentTopic, _obs);
this.observers.set(pubSubTopic, ctObs);
}
}
public toSubscriptionIterator<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[]
): Promise<IAsyncIterator<T>> {
@ -168,12 +180,20 @@ class Relay implements IRelay {
public getActiveSubscriptions(): ActiveSubscriptions {
const map = new Map();
map.set(this.pubSubTopic, this.observers.keys());
for (const pubSubTopic of this.pubSubTopics) {
map.set(pubSubTopic, Array.from(this.observers.keys()));
}
return map;
}
public getMeshPeers(topic?: TopicStr): PeerIdStr[] {
return this.gossipSub.getMeshPeers(topic ?? this.pubSubTopic);
public getMeshPeers(topic: TopicStr = DefaultPubSubTopic): PeerIdStr[] {
return this.gossipSub.getMeshPeers(topic);
}
private subscribeToAllTopics(): void {
for (const pubSubTopic of this.pubSubTopics) {
this.gossipSubSubscribe(pubSubTopic);
}
}
private async processIncomingMessage<T extends IDecodedMessage>(
@ -186,12 +206,20 @@ class Relay implements IRelay {
return;
}
const observers = this.observers.get(topicOnlyMsg.contentTopic) as Set<
// Retrieve the map of content topics for the given pubSubTopic
const contentTopicMap = this.observers.get(pubSubTopic);
if (!contentTopicMap) {
return;
}
// Retrieve the set of observers for the given contentTopic
const observers = contentTopicMap.get(topicOnlyMsg.contentTopic) as Set<
Observer<T>
>;
if (!observers) {
return;
}
await Promise.all(
Array.from(observers).map(({ decoder, callback }) => {
return (async () => {
@ -241,7 +269,7 @@ class Relay implements IRelay {
}
private isRelayPubSub(pubsub: PubSub | undefined): boolean {
return pubsub?.multicodecs?.includes(Relay.multicodec) || false;
return pubsub?.multicodecs?.includes(Relay.multicodec) ?? false;
}
}
@ -267,46 +295,3 @@ export function wakuGossipSub(
return pubsub;
};
}
function toObservers<T extends IDecodedMessage>(
decoders: IDecoder<T>[],
callback: Callback<T>
): Map<ContentTopic, Set<Observer<T>>> {
const contentTopicToDecoders = Array.from(
groupByContentTopic(decoders).entries()
);
const contentTopicToObserversEntries = contentTopicToDecoders.map(
([contentTopic, decoders]) =>
[
contentTopic,
new Set(
decoders.map(
(decoder) =>
({
decoder,
callback
}) as Observer<T>
)
)
] as [ContentTopic, Set<Observer<T>>]
);
return new Map(contentTopicToObserversEntries);
}
function union(left: Set<unknown>, right: Set<unknown>): Set<unknown> {
for (const val of right.values()) {
left.add(val);
}
return left;
}
function leftMinusJoin(left: Set<unknown>, right: Set<unknown>): Set<unknown> {
for (const val of right.values()) {
if (left.has(val)) {
left.delete(val);
}
}
return left;
}

View File

@ -1,3 +1,4 @@
import { DefaultPubSubTopic } from "@waku/core";
import type {
IDecodedMessage,
IDecoder,
@ -26,6 +27,7 @@ export class TopicOnlyMessage implements IDecodedMessage {
}
export class TopicOnlyDecoder implements IDecoder<TopicOnlyMessage> {
pubSubTopic = DefaultPubSubTopic;
public contentTopic = "";
fromWireToProtoObj(bytes: Uint8Array): Promise<IProtoMessage | undefined> {

View File

@ -1,15 +1,11 @@
import { createEncoder } from "@waku/core";
import { LightNode } from "@waku/interfaces";
import { utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai";
import { MessageCollector, NimGoNode, tearDownNodes } from "../../src/index.js";
import {
messageText,
runNodes,
TestContentTopic,
TestEncoder
} from "./utils.js";
import { messageText, runNodes, TestContentTopic } from "./utils.js";
describe("Waku Light Push [node only] - custom pubsub topic", function () {
this.timeout(15000);
@ -36,7 +32,12 @@ describe("Waku Light Push [node only] - custom pubsub topic", function () {
it("Push message", async function () {
const nimPeerId = await nwaku.getPeerId();
const pushResponse = await waku.lightPush.send(TestEncoder, {
const testEncoder = createEncoder({
contentTopic: TestContentTopic,
pubSubTopic: customPubSubTopic
});
const pushResponse = await waku.lightPush.send(testEncoder, {
payload: utf8ToBytes(messageText)
});

View File

@ -30,7 +30,7 @@ export async function runNodes(
let waku: LightNode | undefined;
try {
waku = await createLightNode({
pubSubTopic,
pubSubTopics: pubSubTopic ? [pubSubTopic] : undefined,
staticNoiseKey: NOISE_KEY_1
});
await waku.start();

View File

@ -259,6 +259,15 @@ describe("Waku Relay [node only]", () => {
let waku1: RelayNode;
let waku2: RelayNode;
let waku3: RelayNode;
const pubSubTopic = "/some/pubsub/topic";
const CustomTopicEncoder = createEncoder({
contentTopic: TestContentTopic,
pubSubTopic: pubSubTopic
});
const CustomTopicDecoder = createDecoder(TestContentTopic, pubSubTopic);
afterEach(async function () {
!!waku1 &&
waku1.stop().catch((e) => console.log("Waku failed to stop", e));
@ -271,17 +280,15 @@ describe("Waku Relay [node only]", () => {
it("Publish", async function () {
this.timeout(10000);
const pubSubTopic = "/some/pubsub/topic";
// 1 and 2 uses a custom pubsub
// 3 uses the default pubsub
[waku1, waku2, waku3] = await Promise.all([
createRelayNode({
pubSubTopic: pubSubTopic,
pubSubTopics: [pubSubTopic],
staticNoiseKey: NOISE_KEY_1
}).then((waku) => waku.start().then(() => waku)),
createRelayNode({
pubSubTopic: pubSubTopic,
pubSubTopics: [pubSubTopic],
staticNoiseKey: NOISE_KEY_2,
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }
}).then((waku) => waku.start().then(() => waku)),
@ -310,7 +317,7 @@ describe("Waku Relay [node only]", () => {
const waku2ReceivedMsgPromise: Promise<DecodedMessage> = new Promise(
(resolve) => {
void waku2.relay.subscribe([TestDecoder], resolve);
void waku2.relay.subscribe([CustomTopicDecoder], resolve);
}
);
@ -323,7 +330,7 @@ describe("Waku Relay [node only]", () => {
}
);
await waku1.relay.send(TestEncoder, {
await waku1.relay.send(CustomTopicEncoder, {
payload: utf8ToBytes(messageText)
});
@ -338,16 +345,14 @@ describe("Waku Relay [node only]", () => {
this.timeout(10000);
const MB = 1024 ** 2;
const pubSubTopic = "/some/pubsub/topic";
// 1 and 2 uses a custom pubsub
[waku1, waku2] = await Promise.all([
createRelayNode({
pubSubTopic: pubSubTopic,
pubSubTopics: [pubSubTopic],
staticNoiseKey: NOISE_KEY_1
}).then((waku) => waku.start().then(() => waku)),
createRelayNode({
pubSubTopic: pubSubTopic,
pubSubTopics: [pubSubTopic],
staticNoiseKey: NOISE_KEY_2,
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }
}).then((waku) => waku.start().then(() => waku))
@ -365,7 +370,7 @@ describe("Waku Relay [node only]", () => {
const waku2ReceivedMsgPromise: Promise<DecodedMessage> = new Promise(
(resolve) => {
void waku2.relay.subscribe([TestDecoder], () =>
void waku2.relay.subscribe([CustomTopicDecoder], () =>
resolve({
payload: new Uint8Array([])
} as DecodedMessage)
@ -373,18 +378,18 @@ describe("Waku Relay [node only]", () => {
}
);
let sendResult = await waku1.relay.send(TestEncoder, {
let sendResult = await waku1.relay.send(CustomTopicEncoder, {
payload: generateRandomUint8Array(1 * MB)
});
expect(sendResult.recipients.length).to.eq(1);
sendResult = await waku1.relay.send(TestEncoder, {
sendResult = await waku1.relay.send(CustomTopicEncoder, {
payload: generateRandomUint8Array(1 * MB + 65536)
});
expect(sendResult.recipients.length).to.eq(0);
expect(sendResult.errors).to.include(SendError.SIZE_TOO_BIG);
sendResult = await waku1.relay.send(TestEncoder, {
sendResult = await waku1.relay.send(CustomTopicEncoder, {
payload: generateRandomUint8Array(2 * MB)
});
expect(sendResult.recipients.length).to.eq(0);

View File

@ -0,0 +1,79 @@
import { createLightNode, LightNode, utf8ToBytes } from "@waku/sdk";
import { createEncoder } from "@waku/sdk";
import chai, { expect } from "chai";
import chaiAsPromised from "chai-as-promised";
import { makeLogFileName } from "../src/log_file.js";
import { NimGoNode } from "../src/node/node.js";
const PubSubTopic1 = "/waku/2/rs/0/2";
const PubSubTopic2 = "/waku/2/rs/0/3";
const ContentTopic = "/waku/2/content/test";
chai.use(chaiAsPromised);
describe("Static Sharding", () => {
let waku: LightNode;
let nwaku: NimGoNode;
beforeEach(async function () {
this.timeout(15_000);
nwaku = new NimGoNode(makeLogFileName(this));
await nwaku.start({ store: true, lightpush: true, relay: true });
});
afterEach(async function () {
!!nwaku &&
nwaku.stop().catch((e) => console.log("Nwaku failed to stop", e));
!!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e));
});
it("configure the node with multiple pubsub topics", async function () {
this.timeout(15_000);
waku = await createLightNode({
pubSubTopics: [PubSubTopic1, PubSubTopic2]
});
const encoder1 = createEncoder({
contentTopic: ContentTopic,
pubSubTopic: PubSubTopic1
});
const encoder2 = createEncoder({
contentTopic: ContentTopic,
pubSubTopic: PubSubTopic2
});
const request1 = waku.lightPush.send(encoder1, {
payload: utf8ToBytes("Hello World")
});
const request2 = waku.lightPush.send(encoder2, {
payload: utf8ToBytes("Hello World")
});
await expect(request1).to.be.fulfilled;
await expect(request2).to.be.fulfilled;
});
it("using a protocol with unconfigured pubsub topic should fail", async function () {
this.timeout(15_000);
waku = await createLightNode({
pubSubTopics: [PubSubTopic1]
});
// use a pubsub topic that is not configured
const encoder = createEncoder({
contentTopic: ContentTopic,
pubSubTopic: PubSubTopic2
});
// the following request should throw an error
const request = waku.lightPush.send(encoder, {
payload: utf8ToBytes("Hello World")
});
await expect(request).to.be.rejectedWith(Error);
});
});

View File

@ -566,6 +566,11 @@ describe("Waku Store, custom pubsub topic", () => {
let waku: LightNode;
let nwaku: NimGoNode;
const CustomPubSubTestDecoder = createDecoder(
TestContentTopic,
customPubSubTopic
);
beforeEach(async function () {
this.timeout(15_000);
nwaku = new NimGoNode(makeLogFileName(this));
@ -600,7 +605,7 @@ describe("Waku Store, custom pubsub topic", () => {
waku = await createLightNode({
staticNoiseKey: NOISE_KEY_1,
pubSubTopic: customPubSubTopic
pubSubTopics: [customPubSubTopic]
});
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
@ -608,7 +613,9 @@ describe("Waku Store, custom pubsub topic", () => {
const messages: IMessage[] = [];
let promises: Promise<void>[] = [];
for await (const msgPromises of waku.store.queryGenerator([TestDecoder])) {
for await (const msgPromises of waku.store.queryGenerator([
CustomPubSubTestDecoder
])) {
const _promises = msgPromises.map(async (promise) => {
const msg = await promise;
if (msg) {

View File

@ -1,4 +1,4 @@
import { waitForRemotePeer } from "@waku/core";
import { DefaultPubSubTopic, waitForRemotePeer } from "@waku/core";
import type { LightNode, RelayNode } from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import { createLightNode, createRelayNode } from "@waku/sdk";
@ -39,7 +39,7 @@ describe("Wait for remote peer", function () {
await waku1.dial(multiAddrWithId);
await delay(1000);
await waitForRemotePeer(waku1, [Protocols.Relay]);
const peers = waku1.relay.getMeshPeers();
const peers = waku1.relay.getMeshPeers(DefaultPubSubTopic);
const nimPeerId = multiAddrWithId.getPeerId();
expect(nimPeerId).to.not.be.undefined;
@ -262,7 +262,7 @@ describe("Wait for remote peer", function () {
await waku1.dial(multiAddrWithId);
await waitForRemotePeer(waku1);
const peers = waku1.relay.getMeshPeers();
const peers = waku1.relay.getMeshPeers(DefaultPubSubTopic);
const nimPeerId = multiAddrWithId.getPeerId();

View File

@ -65,16 +65,16 @@
"node": ">=18"
},
"dependencies": {
"chai": "^4.3.8",
"debug": "^4.3.4",
"uint8arrays": "^4.0.4",
"@waku/interfaces": "0.0.18"
"uint8arrays": "^4.0.4"
},
"devDependencies": {
"@rollup/plugin-commonjs": "^25.0.4",
"@rollup/plugin-json": "^6.0.0",
"@rollup/plugin-node-resolve": "^15.1.0",
"@waku/build-utils": "*",
"@waku/interfaces": "0.0.17",
"@waku/interfaces": "0.0.18",
"cspell": "^7.3.2",
"npm-run-all": "^4.1.5",
"rollup": "^3.29.2"

View File

@ -3,6 +3,8 @@ export * from "./random_subset.js";
export * from "./group_by.js";
export * from "./to_async_iterator.js";
export * from "./is_size_valid.js";
export * from "./sharding.js";
export * from "./push_or_init_map.js";
export function removeItemFromArray(arr: unknown[], value: unknown): unknown[] {
const index = arr.indexOf(value);

View File

@ -0,0 +1,12 @@
import type { PubSubTopic } from "@waku/interfaces";
export function ensurePubsubTopicIsConfigured(
pubsubTopic: PubSubTopic,
configuredTopics: PubSubTopic[]
): void {
if (!configuredTopics.includes(pubsubTopic)) {
throw new Error(
`PubSub topic ${pubsubTopic} has not been configured on this instance. Configured topics are: ${configuredTopics}. Please update your configuration by passing in the topic during Waku node instantiation.`
);
}
}