reintroduce and deprecate named sharding (#1751)

Co-authored-by: danisharora099 <danisharora099@gmail.com>
This commit is contained in:
Arseniy Klempner 2024-01-09 23:34:30 -08:00 committed by GitHub
parent 528803f4c8
commit 69406bf1f0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 949 additions and 70 deletions

View File

@ -5,8 +5,8 @@ import { Peer, PeerStore } from "@libp2p/interface/peer-store";
import type {
IBaseProtocol,
Libp2pComponents,
PubsubTopic,
ShardingParams
ProtocolCreateOptions,
PubsubTopic
} from "@waku/interfaces";
import { DefaultPubsubTopic } from "@waku/interfaces";
import { shardInfoToPubsubTopics } from "@waku/utils";
@ -103,9 +103,12 @@ export class BaseProtocol implements IBaseProtocol {
return filterPeers(allPeersForProtocol, numPeers, maxBootstrapPeers);
}
initializePubsubTopic(shardInfo?: ShardingParams): PubsubTopic[] {
return shardInfo
? shardInfoToPubsubTopics(shardInfo)
: [DefaultPubsubTopic];
initializePubsubTopic(options?: ProtocolCreateOptions): PubsubTopic[] {
return (
options?.pubsubTopics ??
(options?.shardInfo
? shardInfoToPubsubTopics(options.shardInfo)
: [DefaultPubsubTopic])
);
}
}

View File

@ -281,7 +281,7 @@ class Filter extends BaseProtocol implements IReceiver {
constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(FilterCodecs.SUBSCRIBE, libp2p.components);
this.pubsubTopics = this.initializePubsubTopic(options?.shardInfo);
this.pubsubTopics = this.initializePubsubTopic(options);
libp2p.handle(FilterCodecs.PUSH, this.onRequest.bind(this)).catch((e) => {
log.error("Failed to register ", FilterCodecs.PUSH, e);
@ -291,11 +291,12 @@ class Filter extends BaseProtocol implements IReceiver {
}
async createSubscription(
pubsubTopicShardInfo?: SingleShardInfo
pubsubTopicShardInfo: SingleShardInfo | PubsubTopic = DefaultPubsubTopic
): Promise<Subscription> {
const pubsubTopic = pubsubTopicShardInfo
? singleShardInfoToPubsubTopic(pubsubTopicShardInfo)
: DefaultPubsubTopic;
const pubsubTopic =
typeof pubsubTopicShardInfo == "string"
? pubsubTopicShardInfo
: singleShardInfoToPubsubTopic(pubsubTopicShardInfo);
ensurePubsubTopicIsConfigured(pubsubTopic, this.pubsubTopics);

View File

@ -49,7 +49,7 @@ class LightPush extends BaseProtocol implements ILightPush {
constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(LightPushCodec, libp2p.components);
this.pubsubTopics = this.initializePubsubTopic(options?.shardInfo);
this.pubsubTopics = this.initializePubsubTopic(options);
}
private async preparePushMessage(

View File

@ -118,6 +118,7 @@ export class Encoder implements IEncoder {
* messages.
*/
export function createEncoder({
pubsubTopic,
pubsubTopicShardInfo,
contentTopic,
ephemeral,
@ -126,7 +127,7 @@ export function createEncoder({
return new Encoder(
contentTopic,
ephemeral,
determinePubsubTopic(contentTopic, pubsubTopicShardInfo),
determinePubsubTopic(contentTopic, pubsubTopic ?? pubsubTopicShardInfo),
metaSetter
);
}
@ -186,7 +187,7 @@ export class Decoder implements IDecoder<DecodedMessage> {
*/
export function createDecoder(
contentTopic: string,
pubsubTopicShardInfo?: SingleShardInfo
pubsubTopicShardInfo?: SingleShardInfo | PubsubTopic
): Decoder {
return new Decoder(
determinePubsubTopic(contentTopic, pubsubTopicShardInfo),

View File

@ -79,7 +79,7 @@ class Store extends BaseProtocol implements IStore {
constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(StoreCodec, libp2p.components);
this.pubsubTopics = this.initializePubsubTopic(options?.shardInfo);
this.pubsubTopics = this.initializePubsubTopic(options);
}
/**

View File

@ -55,6 +55,7 @@ export class WakuNode implements Waku {
constructor(
options: WakuOptions,
pubsubTopics: PubsubTopic[] = [],
libp2p: Libp2p,
pubsubShardInfo?: ShardingParams,
store?: (libp2p: Libp2p) => IStore,
@ -63,7 +64,8 @@ export class WakuNode implements Waku {
relay?: (libp2p: Libp2p) => IRelay
) {
if (!pubsubShardInfo) {
this.pubsubTopics = [DefaultPubsubTopic];
this.pubsubTopics =
pubsubTopics.length > 0 ? pubsubTopics : [DefaultPubsubTopic];
} else {
this.pubsubTopics = shardInfoToPubsubTopics(pubsubShardInfo);
}

View File

@ -1,7 +1,7 @@
import type { PeerId } from "@libp2p/interface/peer-id";
import type { IDecodedMessage, IDecoder, SingleShardInfo } from "./message.js";
import type { ContentTopic } from "./misc.js";
import type { ContentTopic, PubsubTopic } from "./misc.js";
import type { Callback, IBaseProtocol } from "./protocols.js";
import type { IReceiver } from "./receiver.js";
@ -25,7 +25,7 @@ export interface IFilterSubscription {
export type IFilter = IReceiver &
IBaseProtocol & {
createSubscription(
pubsubTopicShardInfo?: SingleShardInfo,
pubsubTopicShardInfo?: SingleShardInfo | PubsubTopic,
peerId?: PeerId
): Promise<IFilterSubscription>;
};

View File

@ -46,6 +46,10 @@ export interface IMetaSetter {
}
export interface EncoderOptions {
/**
* @deprecated
*/
pubsubTopic?: PubsubTopic;
pubsubTopicShardInfo?: SingleShardInfo;
/** The content topic to set on outgoing messages. */
contentTopic: string;

View File

@ -5,6 +5,7 @@ import type { Peer, PeerStore } from "@libp2p/interface/peer-store";
import type { ShardInfo } from "./enr.js";
import type { CreateLibp2pOptions } from "./libp2p.js";
import type { IDecodedMessage } from "./message.js";
import { PubsubTopic } from "./misc.js";
export enum Protocols {
Relay = "relay",
@ -26,9 +27,20 @@ export type ContentTopicInfo = {
contentTopics: string[];
};
export type ShardingParams = ShardInfo | ContentTopicInfo;
export type ApplicationInfo = {
clusterId: number;
application: string;
version: string;
};
export type ShardingParams = ShardInfo | ContentTopicInfo | ApplicationInfo;
export type ProtocolCreateOptions = {
/**
* @deprecated
* Waku will stop supporting named sharding. Only static sharding and autosharding will be supported moving forward.
*/
pubsubTopics?: PubsubTopic[];
/**
* Waku supports usage of multiple pubsub topics. This is achieved through static sharding for now, and auto-sharding in the future.
* The format to specify a shard is:

View File

@ -1,13 +1,14 @@
import { Decoder as DecoderV0 } from "@waku/core/lib/message/version_0";
import type {
EncoderOptions as BaseEncoderOptions,
IDecoder,
IEncoder,
IMessage,
IMetaSetter,
IProtoMessage,
PubsubTopic,
SingleShardInfo
import {
type EncoderOptions as BaseEncoderOptions,
DefaultPubsubTopic,
type IDecoder,
type IEncoder,
type IMessage,
type IMetaSetter,
type IProtoMessage,
type PubsubTopic,
type SingleShardInfo
} from "@waku/interfaces";
import { WakuMessage } from "@waku/proto";
import { determinePubsubTopic, Logger } from "@waku/utils";
@ -79,6 +80,10 @@ class Encoder implements IEncoder {
}
export interface EncoderOptions extends BaseEncoderOptions {
/**
* @deprecated
*/
pubsubTopic?: PubsubTopic;
/** The public key to encrypt the payload for. */
publicKey: Uint8Array;
/** An optional private key to be used to sign the payload before encryption. */
@ -98,6 +103,7 @@ export interface EncoderOptions extends BaseEncoderOptions {
* in [26/WAKU2-PAYLOAD](https://rfc.vac.dev/spec/26/).
*/
export function createEncoder({
pubsubTopic,
pubsubTopicShardInfo,
contentTopic,
publicKey,
@ -106,7 +112,7 @@ export function createEncoder({
metaSetter
}: EncoderOptions): Encoder {
return new Encoder(
determinePubsubTopic(contentTopic, pubsubTopicShardInfo),
determinePubsubTopic(contentTopic, pubsubTopic ?? pubsubTopicShardInfo),
contentTopic,
publicKey,
sigPrivKey,
@ -194,7 +200,7 @@ class Decoder extends DecoderV0 implements IDecoder<DecodedMessage> {
export function createDecoder(
contentTopic: string,
privateKey: Uint8Array,
pubsubTopicShardInfo?: SingleShardInfo
pubsubTopicShardInfo: SingleShardInfo | PubsubTopic = DefaultPubsubTopic
): Decoder {
return new Decoder(
determinePubsubTopic(contentTopic, pubsubTopicShardInfo),

View File

@ -1,13 +1,14 @@
import { Decoder as DecoderV0 } from "@waku/core/lib/message/version_0";
import type {
EncoderOptions as BaseEncoderOptions,
IDecoder,
IEncoder,
IMessage,
IMetaSetter,
IProtoMessage,
PubsubTopic,
SingleShardInfo
import {
type EncoderOptions as BaseEncoderOptions,
DefaultPubsubTopic,
type IDecoder,
type IEncoder,
type IMessage,
type IMetaSetter,
type IProtoMessage,
type PubsubTopic,
type SingleShardInfo
} from "@waku/interfaces";
import { WakuMessage } from "@waku/proto";
import { determinePubsubTopic, Logger } from "@waku/utils";
@ -98,6 +99,7 @@ export interface EncoderOptions extends BaseEncoderOptions {
* in [26/WAKU2-PAYLOAD](https://rfc.vac.dev/spec/26/).
*/
export function createEncoder({
pubsubTopic = DefaultPubsubTopic,
pubsubTopicShardInfo,
contentTopic,
symKey,
@ -106,7 +108,7 @@ export function createEncoder({
metaSetter
}: EncoderOptions): Encoder {
return new Encoder(
determinePubsubTopic(contentTopic, pubsubTopicShardInfo),
determinePubsubTopic(contentTopic, pubsubTopic ?? pubsubTopicShardInfo),
contentTopic,
symKey,
sigPrivKey,
@ -194,7 +196,7 @@ class Decoder extends DecoderV0 implements IDecoder<DecodedMessage> {
export function createDecoder(
contentTopic: string,
symKey: Uint8Array,
pubsubTopicShardInfo?: SingleShardInfo
pubsubTopicShardInfo: SingleShardInfo | PubsubTopic = DefaultPubsubTopic
): Decoder {
return new Decoder(
determinePubsubTopic(contentTopic, pubsubTopicShardInfo),

View File

@ -9,10 +9,10 @@ import { SignaturePolicy } from "@chainsafe/libp2p-gossipsub/types";
import type { PeerId } from "@libp2p/interface/peer-id";
import type { PubSub as Libp2pPubsub } from "@libp2p/interface/pubsub";
import { sha256 } from "@noble/hashes/sha256";
import { DefaultPubsubTopic } from "@waku/interfaces";
import {
ActiveSubscriptions,
Callback,
DefaultPubsubTopic,
IAsyncIterator,
IDecodedMessage,
IDecoder,
@ -72,9 +72,11 @@ class Relay implements IRelay {
}
this.gossipSub = libp2p.services.pubsub as GossipSub;
this.pubsubTopics = options?.shardInfo
? new Set(shardInfoToPubsubTopics(options.shardInfo))
: new Set([DefaultPubsubTopic]);
this.pubsubTopics = new Set(
options?.shardInfo
? shardInfoToPubsubTopics(options.shardInfo)
: options?.pubsubTopics ?? [DefaultPubsubTopic]
);
if (this.gossipSub.isStarted()) {
this.subscribeToAllTopics();

View File

@ -39,16 +39,31 @@ const DEFAULT_NODE_REQUIREMENTS = {
export { Libp2pComponents };
const ensureShardingConfigured = (shardInfo: ShardingParams): void => {
if (
("shards" in shardInfo && shardInfo.shards.length < 1) ||
("contentTopics" in shardInfo && shardInfo.contentTopics.length < 1)
) {
throw new Error(
"Missing required configuration options for static sharding or autosharding."
);
}
};
/**
* Create a Waku node that uses Waku Light Push, Filter and Store to send and
* receive messages, enabling low resource consumption.
* Uses Waku Filter V2 by default.
* Create a Waku node configured to use autosharding or static sharding.
*/
export async function createLightNode(
options?: ProtocolCreateOptions & WakuOptions
export async function createNode(
options?: ProtocolCreateOptions & WakuOptions & Partial<RelayCreateOptions>
): Promise<LightNode> {
options = options ?? {};
if (!options.shardInfo) {
throw new Error("Shard info must be set");
}
ensureShardingConfigured(options.shardInfo);
const libp2pOptions = options?.libp2p ?? {};
const peerDiscovery = libp2pOptions.peerDiscovery ?? [];
if (options?.defaultBootstrap) {
@ -57,8 +72,8 @@ export async function createLightNode(
}
const libp2p = await defaultLibp2p(
options.shardInfo,
undefined,
wakuGossipSub(options),
libp2pOptions,
options?.userAgent
);
@ -69,6 +84,50 @@ export async function createLightNode(
return new WakuNode(
options ?? {},
[],
libp2p,
options.shardInfo,
store,
lightPush,
filter
) as LightNode;
}
/**
* Create a Waku node that uses Waku Light Push, Filter and Store to send and
* receive messages, enabling low resource consumption.
* Uses Waku Filter V2 by default.
*/
export async function createLightNode(
options?: ProtocolCreateOptions & WakuOptions
): Promise<LightNode> {
options = options ?? {};
if (options.shardInfo) {
ensureShardingConfigured(options.shardInfo);
}
const libp2pOptions = options?.libp2p ?? {};
const peerDiscovery = libp2pOptions.peerDiscovery ?? [];
if (options?.defaultBootstrap) {
peerDiscovery.push(...defaultPeerDiscoveries());
Object.assign(libp2pOptions, { peerDiscovery });
}
const libp2p = await defaultLibp2p(
options.shardInfo,
wakuGossipSub(options),
libp2pOptions,
options?.userAgent
);
const store = wakuStore(options);
const lightPush = wakuLightPush(options);
const filter = wakuFilter(options);
return new WakuNode(
options ?? {},
options.pubsubTopics,
libp2p,
options.shardInfo,
store,
@ -86,6 +145,10 @@ export async function createRelayNode(
): Promise<RelayNode> {
options = options ?? {};
if (options.shardInfo) {
ensureShardingConfigured(options.shardInfo);
}
const libp2pOptions = options?.libp2p ?? {};
const peerDiscovery = libp2pOptions.peerDiscovery ?? [];
if (options?.defaultBootstrap) {
@ -104,6 +167,7 @@ export async function createRelayNode(
return new WakuNode(
options,
options.pubsubTopics,
libp2p,
options.shardInfo,
undefined,
@ -131,6 +195,10 @@ export async function createFullNode(
): Promise<FullNode> {
options = options ?? {};
if (options.shardInfo) {
ensureShardingConfigured(options.shardInfo);
}
const libp2pOptions = options?.libp2p ?? {};
const peerDiscovery = libp2pOptions.peerDiscovery ?? [];
if (options?.defaultBootstrap) {
@ -152,6 +220,7 @@ export async function createFullNode(
return new WakuNode(
options ?? {},
options.pubsubTopics,
libp2p,
options.shardInfo,
store,

View File

@ -354,3 +354,154 @@ describe("Waku Filter V2 (Autosharding): Multiple PubsubTopics", function () {
}
});
});
describe("Waku Filter V2 (Named sharding): Multiple PubsubTopics", function () {
// Set the timeout for all tests in this suite. Can be overwritten at test level
this.timeout(30000);
let waku: LightNode;
let nwaku: NimGoNode;
let nwaku2: NimGoNode;
let subscription: IFilterSubscription;
let messageCollector: MessageCollector;
const customPubsubTopic1 = singleShardInfoToPubsubTopic({
clusterId: 3,
shard: 1
});
const customPubsubTopic2 = singleShardInfoToPubsubTopic({
clusterId: 3,
shard: 2
});
const customContentTopic1 = "/test/2/waku-filter";
const customContentTopic2 = "/test/3/waku-filter";
const customEncoder1 = createEncoder({
pubsubTopic: customPubsubTopic1,
contentTopic: customContentTopic1
});
const customDecoder1 = createDecoder(customContentTopic1, customPubsubTopic1);
const customEncoder2 = createEncoder({
pubsubTopic: customPubsubTopic2,
contentTopic: customContentTopic2
});
const customDecoder2 = createDecoder(customContentTopic2, customPubsubTopic2);
this.beforeEach(async function () {
this.timeout(15000);
[nwaku, waku] = await runNodes(this, [
customPubsubTopic1,
customPubsubTopic2
]);
subscription = await waku.filter.createSubscription(customPubsubTopic1);
messageCollector = new MessageCollector();
});
this.afterEach(async function () {
this.timeout(15000);
await tearDownNodes([nwaku, nwaku2], waku);
});
it("Subscribe and receive messages on custom pubsubtopic", async function () {
await subscription.subscribe([customDecoder1], messageCollector.callback);
await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") });
expect(await messageCollector.waitForMessages(1)).to.eq(true);
messageCollector.verifyReceivedMessage(0, {
expectedContentTopic: customContentTopic1,
expectedPubsubTopic: customPubsubTopic1,
expectedMessageText: "M1"
});
});
it("Subscribe and receive messages on 2 different pubsubtopics", async function () {
await subscription.subscribe([customDecoder1], messageCollector.callback);
// Subscribe from the same lightnode to the 2nd pubsubtopic
const subscription2 = await waku.filter.createSubscription(
pubsubTopicToSingleShardInfo(customPubsubTopic2)
);
const messageCollector2 = new MessageCollector();
await subscription2.subscribe([customDecoder2], messageCollector2.callback);
await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") });
await waku.lightPush.send(customEncoder2, { payload: utf8ToBytes("M2") });
expect(await messageCollector.waitForMessages(1)).to.eq(true);
expect(await messageCollector2.waitForMessages(1)).to.eq(true);
messageCollector.verifyReceivedMessage(0, {
expectedContentTopic: customContentTopic1,
expectedPubsubTopic: customPubsubTopic1,
expectedMessageText: "M1"
});
messageCollector2.verifyReceivedMessage(0, {
expectedContentTopic: customContentTopic2,
expectedPubsubTopic: customPubsubTopic2,
expectedMessageText: "M2"
});
});
it("Subscribe and receive messages from 2 nwaku nodes each with different pubsubtopics", async function () {
await subscription.subscribe([customDecoder1], messageCollector.callback);
// Set up and start a new nwaku node with customPubsubTopic1
nwaku2 = new NimGoNode(makeLogFileName(this) + "2");
await nwaku2.start({
filter: true,
lightpush: true,
relay: true,
pubsubTopic: [customPubsubTopic2]
});
await waku.dial(await nwaku2.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]);
// Subscribe from the same lightnode to the new nwaku on the new pubsubtopic
const subscription2 = await waku.filter.createSubscription(
pubsubTopicToSingleShardInfo(customPubsubTopic2),
await nwaku2.getPeerId()
);
await nwaku2.ensureSubscriptions([customPubsubTopic2]);
const messageCollector2 = new MessageCollector();
await subscription2.subscribe([customDecoder2], messageCollector2.callback);
// Making sure that messages are send and reveiced for both subscriptions
// While loop is done because of https://github.com/waku-org/js-waku/issues/1606
while (
!(await messageCollector.waitForMessages(1, {
pubsubTopic: customPubsubTopic1
})) ||
!(await messageCollector2.waitForMessages(1, {
pubsubTopic: customPubsubTopic2
}))
) {
await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") });
await waku.lightPush.send(customEncoder2, { payload: utf8ToBytes("M2") });
}
messageCollector.verifyReceivedMessage(0, {
expectedContentTopic: customContentTopic1,
expectedPubsubTopic: customPubsubTopic1,
expectedMessageText: "M1"
});
messageCollector2.verifyReceivedMessage(0, {
expectedContentTopic: customContentTopic2,
expectedPubsubTopic: customPubsubTopic2,
expectedMessageText: "M2"
});
});
it("Should fail to subscribe with decoder with wrong pubsubTopic", async function () {
// this subscription object is set up with the `customPubsubTopic` but we're passing it a Decoder with the `DefaultPubsubTopic`
try {
await subscription.subscribe([customDecoder2], messageCollector.callback);
} catch (error) {
expect((error as Error).message).to.include(
"Pubsub topic not configured"
);
}
});
});

View File

@ -63,6 +63,7 @@ export async function runNodes(
const waku_options = {
staticNoiseKey: NOISE_KEY_1,
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } },
pubsubTopics: shardInfo ? undefined : pubsubTopics,
...((pubsubTopics.length !== 1 ||
pubsubTopics[0] !== DefaultPubsubTopic) && {
shardInfo: shardInfo

View File

@ -184,7 +184,7 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () {
// When using lightpush, we have to use a cluster id of 1 because that is the default cluster id for autosharding
// With a different cluster id, we never find a viable peer
const clusterId = 1;
const customContentTopic1 = "/waku/2/content/utf8";
const customContentTopic1 = "/waku/2/content/test.js";
const customContentTopic2 = "/myapp/1/latest/proto";
const autoshardingPubsubTopic1 = contentTopicToPubsubTopic(
customContentTopic1,
@ -194,19 +194,17 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () {
customContentTopic2,
clusterId
);
const contentTopicInfo: ContentTopicInfo = {
const shardInfo: ContentTopicInfo = {
clusterId,
contentTopics: [customContentTopic1, customContentTopic2]
};
const customEncoder1 = createEncoder({
contentTopic: customContentTopic1,
pubsubTopicShardInfo: {
clusterId
}
pubsubTopicShardInfo: shardInfo
});
const customEncoder2 = createEncoder({
contentTopic: customContentTopic2,
pubsubTopicShardInfo: { clusterId }
pubsubTopicShardInfo: shardInfo
});
let nimPeerId: PeerId;
@ -216,7 +214,7 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () {
[nwaku, waku] = await runNodes(
this,
[autoshardingPubsubTopic1, autoshardingPubsubTopic2],
contentTopicInfo
shardInfo
);
messageCollector = new MessageCollector(nwaku);
nimPeerId = await nwaku.getPeerId();
@ -331,3 +329,160 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () {
});
});
});
describe("Waku Light Push (named sharding): Multiple PubsubTopics", function () {
this.timeout(30000);
let waku: LightNode;
let nwaku: NimGoNode;
let nwaku2: NimGoNode;
let messageCollector: MessageCollector;
// When using lightpush, we have to use a cluster id of 1 because that is the default cluster id for autosharding
// With a different cluster id, we never find a viable peer
const clusterId = 1;
const customContentTopic1 = "/waku/2/content/utf8";
const customContentTopic2 = "/myapp/1/latest/proto";
const autoshardingPubsubTopic1 = contentTopicToPubsubTopic(
customContentTopic1,
clusterId
);
const autoshardingPubsubTopic2 = contentTopicToPubsubTopic(
customContentTopic2,
clusterId
);
const contentTopicInfo: ContentTopicInfo = {
clusterId,
contentTopics: [customContentTopic1, customContentTopic2]
};
const customEncoder1 = createEncoder({
contentTopic: customContentTopic1,
pubsubTopicShardInfo: {
clusterId
}
});
const customEncoder2 = createEncoder({
contentTopic: customContentTopic2,
pubsubTopicShardInfo: { clusterId }
});
let nimPeerId: PeerId;
this.beforeEach(async function () {
this.timeout(15000);
[nwaku, waku] = await runNodes(
this,
[autoshardingPubsubTopic1, autoshardingPubsubTopic2],
contentTopicInfo
);
messageCollector = new MessageCollector(nwaku);
nimPeerId = await nwaku.getPeerId();
});
this.afterEach(async function () {
this.timeout(15000);
await tearDownNodes([nwaku, nwaku2], waku);
});
it("Push message on custom pubsubTopic", async function () {
const pushResponse = await waku.lightPush.send(customEncoder1, {
payload: utf8ToBytes(messageText)
});
expect(pushResponse.recipients[0].toString()).to.eq(nimPeerId.toString());
expect(
await messageCollector.waitForMessages(1, {
pubsubTopic: autoshardingPubsubTopic1
})
).to.eq(true);
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: customContentTopic1
});
});
it("Subscribe and receive messages on 2 different pubsubtopics", async function () {
const pushResponse1 = await waku.lightPush.send(customEncoder1, {
payload: utf8ToBytes("M1")
});
const pushResponse2 = await waku.lightPush.send(customEncoder2, {
payload: utf8ToBytes("M2")
});
expect(pushResponse1.recipients[0].toString()).to.eq(nimPeerId.toString());
expect(pushResponse2.recipients[0].toString()).to.eq(nimPeerId.toString());
const messageCollector2 = new MessageCollector(nwaku);
expect(
await messageCollector.waitForMessages(1, {
pubsubTopic: autoshardingPubsubTopic1
})
).to.eq(true);
expect(
await messageCollector2.waitForMessages(1, {
pubsubTopic: autoshardingPubsubTopic2
})
).to.eq(true);
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: "M1",
expectedContentTopic: customContentTopic1,
expectedPubsubTopic: autoshardingPubsubTopic1
});
messageCollector2.verifyReceivedMessage(0, {
expectedMessageText: "M2",
expectedContentTopic: customContentTopic2,
expectedPubsubTopic: autoshardingPubsubTopic2
});
});
it("Light push messages to 2 nwaku nodes each with different pubsubtopics", async function () {
// Set up and start a new nwaku node with Default PubsubTopic
nwaku2 = new NimGoNode(makeLogFileName(this) + "2");
await nwaku2.start({
filter: true,
lightpush: true,
relay: true,
pubsubTopic: [autoshardingPubsubTopic2]
});
await nwaku2.ensureSubscriptions([autoshardingPubsubTopic2]);
await waku.dial(await nwaku2.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.LightPush]);
const messageCollector2 = new MessageCollector(nwaku2);
let pushResponse1: SendResult;
let pushResponse2: SendResult;
// Making sure that we send messages to both nwaku nodes
// While loop is done because of https://github.com/waku-org/js-waku/issues/1606
while (
!(await messageCollector.waitForMessages(1, {
pubsubTopic: autoshardingPubsubTopic1
})) ||
!(await messageCollector2.waitForMessages(1, {
pubsubTopic: autoshardingPubsubTopic2
})) ||
pushResponse1!.recipients[0].toString() ===
pushResponse2!.recipients[0].toString()
) {
pushResponse1 = await waku.lightPush.send(customEncoder1, {
payload: utf8ToBytes("M1")
});
pushResponse2 = await waku.lightPush.send(customEncoder2, {
payload: utf8ToBytes("M2")
});
}
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: "M1",
expectedContentTopic: customContentTopic1,
expectedPubsubTopic: autoshardingPubsubTopic1
});
messageCollector2.verifyReceivedMessage(0, {
expectedMessageText: "M2",
expectedContentTopic: customContentTopic2,
expectedPubsubTopic: autoshardingPubsubTopic2
});
});
});

View File

@ -35,6 +35,7 @@ export async function runNodes(
pubsubTopics[0] !== DefaultPubsubTopic) && {
shardInfo: shardInfo
}),
pubsubTopics: shardInfo ? undefined : pubsubTopics,
staticNoiseKey: NOISE_KEY_1
});
await waku.start();

View File

@ -638,3 +638,276 @@ describe("Waku Relay (Autosharding), multiple pubsub topics", function () {
expect(waku2ReceivedMsg.pubsubTopic).to.eq(autoshardingPubsubTopic1);
});
});
describe("Waku Relay (named sharding), multiple pubsub topics", function () {
this.timeout(15000);
let waku1: RelayNode;
let waku2: RelayNode;
let waku3: RelayNode;
const customPubsubTopic1 = singleShardInfoToPubsubTopic({
clusterId: 3,
shard: 1
});
const customPubsubTopic2 = singleShardInfoToPubsubTopic({
clusterId: 3,
shard: 2
});
const customContentTopic1 = "/test/2/waku-relay/utf8";
const customContentTopic2 = "/test/3/waku-relay/utf8";
const customEncoder1 = createEncoder({
pubsubTopic: customPubsubTopic1,
contentTopic: customContentTopic1
});
const customDecoder1 = createDecoder(customContentTopic1, customPubsubTopic1);
const customEncoder2 = createEncoder({
pubsubTopic: customPubsubTopic2,
contentTopic: customContentTopic2
});
const customDecoder2 = createDecoder(customContentTopic2, customPubsubTopic2);
afterEach(async function () {
this.timeout(15000);
await tearDownNodes([], [waku1, waku2, waku3]);
});
[
{
pubsub: customPubsubTopic1,
encoder: customEncoder1,
decoder: customDecoder1
},
{
pubsub: customPubsubTopic2,
encoder: customEncoder2,
decoder: customDecoder2
}
].forEach((testItem) => {
it(`3 nodes on ${testItem.pubsub} topic`, async function () {
const [msgCollector1, msgCollector2, msgCollector3] = Array(3)
.fill(null)
.map(() => new MessageCollector());
[waku1, waku2, waku3] = await Promise.all([
createRelayNode({
pubsubTopics: [testItem.pubsub],
staticNoiseKey: NOISE_KEY_1
}).then((waku) => waku.start().then(() => waku)),
createRelayNode({
pubsubTopics: [testItem.pubsub],
staticNoiseKey: NOISE_KEY_2,
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }
}).then((waku) => waku.start().then(() => waku)),
createRelayNode({
pubsubTopics: [testItem.pubsub],
staticNoiseKey: NOISE_KEY_3
}).then((waku) => waku.start().then(() => waku))
]);
await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, {
multiaddrs: waku2.libp2p.getMultiaddrs()
});
await waku3.libp2p.peerStore.merge(waku2.libp2p.peerId, {
multiaddrs: waku2.libp2p.getMultiaddrs()
});
await Promise.all([
waku1.dial(waku2.libp2p.peerId),
waku3.dial(waku2.libp2p.peerId)
]);
await Promise.all([
waitForRemotePeer(waku1, [Protocols.Relay]),
waitForRemotePeer(waku2, [Protocols.Relay]),
waitForRemotePeer(waku3, [Protocols.Relay])
]);
await waku1.relay.subscribe([testItem.decoder], msgCollector1.callback);
await waku2.relay.subscribe([testItem.decoder], msgCollector2.callback);
await waku3.relay.subscribe([testItem.decoder], msgCollector3.callback);
// The nodes are setup in such a way that all messages send should be relayed to the other nodes in the network
const relayResponse1 = await waku1.relay.send(testItem.encoder, {
payload: utf8ToBytes("M1")
});
const relayResponse2 = await waku2.relay.send(testItem.encoder, {
payload: utf8ToBytes("M2")
});
const relayResponse3 = await waku3.relay.send(testItem.encoder, {
payload: utf8ToBytes("M3")
});
expect(relayResponse1.recipients[0].toString()).to.eq(
waku2.libp2p.peerId.toString()
);
expect(relayResponse3.recipients[0].toString()).to.eq(
waku2.libp2p.peerId.toString()
);
expect(relayResponse2.recipients.map((r) => r.toString())).to.include(
waku1.libp2p.peerId.toString()
);
expect(relayResponse2.recipients.map((r) => r.toString())).to.include(
waku3.libp2p.peerId.toString()
);
expect(await msgCollector1.waitForMessages(2, { exact: true })).to.eq(
true
);
expect(await msgCollector2.waitForMessages(2, { exact: true })).to.eq(
true
);
expect(await msgCollector3.waitForMessages(2, { exact: true })).to.eq(
true
);
expect(
msgCollector1.hasMessage(testItem.encoder.contentTopic, "M2")
).to.eq(true);
expect(
msgCollector1.hasMessage(testItem.encoder.contentTopic, "M3")
).to.eq(true);
expect(
msgCollector2.hasMessage(testItem.encoder.contentTopic, "M1")
).to.eq(true);
expect(
msgCollector2.hasMessage(testItem.encoder.contentTopic, "M3")
).to.eq(true);
expect(
msgCollector3.hasMessage(testItem.encoder.contentTopic, "M1")
).to.eq(true);
expect(
msgCollector3.hasMessage(testItem.encoder.contentTopic, "M2")
).to.eq(true);
});
});
it("Nodes with multiple pubsub topic", async function () {
const [msgCollector1, msgCollector2, msgCollector3] = Array(3)
.fill(null)
.map(() => new MessageCollector());
// Waku1 and waku2 are using multiple pubsub topis
[waku1, waku2, waku3] = await Promise.all([
createRelayNode({
pubsubTopics: [customPubsubTopic1, customPubsubTopic2],
staticNoiseKey: NOISE_KEY_1
}).then((waku) => waku.start().then(() => waku)),
createRelayNode({
pubsubTopics: [customPubsubTopic1, customPubsubTopic2],
staticNoiseKey: NOISE_KEY_2,
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }
}).then((waku) => waku.start().then(() => waku)),
createRelayNode({
pubsubTopics: [customPubsubTopic1],
staticNoiseKey: NOISE_KEY_3
}).then((waku) => waku.start().then(() => waku))
]);
await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, {
multiaddrs: waku2.libp2p.getMultiaddrs()
});
await waku3.libp2p.peerStore.merge(waku2.libp2p.peerId, {
multiaddrs: waku2.libp2p.getMultiaddrs()
});
await Promise.all([
waku1.dial(waku2.libp2p.peerId),
waku3.dial(waku2.libp2p.peerId)
]);
await Promise.all([
waitForRemotePeer(waku1, [Protocols.Relay]),
waitForRemotePeer(waku2, [Protocols.Relay]),
waitForRemotePeer(waku3, [Protocols.Relay])
]);
await waku1.relay.subscribe(
[customDecoder1, customDecoder2],
msgCollector1.callback
);
await waku2.relay.subscribe(
[customDecoder1, customDecoder2],
msgCollector2.callback
);
await waku3.relay.subscribe([customDecoder1], msgCollector3.callback);
// The nodes are setup in such a way that all messages send should be relayed to the other nodes in the network
// However onlt waku1 and waku2 are receiving messages on the CustomPubSubTopic
await waku1.relay.send(customEncoder1, { payload: utf8ToBytes("M1") });
await waku1.relay.send(customEncoder2, { payload: utf8ToBytes("M2") });
await waku2.relay.send(customEncoder1, { payload: utf8ToBytes("M3") });
await waku2.relay.send(customEncoder2, { payload: utf8ToBytes("M4") });
await waku3.relay.send(customEncoder1, { payload: utf8ToBytes("M5") });
await waku3.relay.send(customEncoder2, { payload: utf8ToBytes("M6") });
expect(await msgCollector1.waitForMessages(3, { exact: true })).to.eq(true);
expect(await msgCollector2.waitForMessages(3, { exact: true })).to.eq(true);
expect(await msgCollector3.waitForMessages(2, { exact: true })).to.eq(true);
expect(msgCollector1.hasMessage(customContentTopic1, "M3")).to.eq(true);
expect(msgCollector1.hasMessage(customContentTopic2, "M4")).to.eq(true);
expect(msgCollector1.hasMessage(customContentTopic1, "M5")).to.eq(true);
expect(msgCollector2.hasMessage(customContentTopic1, "M1")).to.eq(true);
expect(msgCollector2.hasMessage(customContentTopic2, "M2")).to.eq(true);
expect(msgCollector2.hasMessage(customContentTopic1, "M5")).to.eq(true);
expect(msgCollector3.hasMessage(customContentTopic1, "M1")).to.eq(true);
expect(msgCollector3.hasMessage(customContentTopic1, "M3")).to.eq(true);
});
it("n1 and n2 uses a custom pubsub, n3 uses the default pubsub", async function () {
[waku1, waku2, waku3] = await Promise.all([
createRelayNode({
pubsubTopics: [customPubsubTopic1],
staticNoiseKey: NOISE_KEY_1
}).then((waku) => waku.start().then(() => waku)),
createRelayNode({
pubsubTopics: [customPubsubTopic1],
staticNoiseKey: NOISE_KEY_2,
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }
}).then((waku) => waku.start().then(() => waku)),
createRelayNode({
staticNoiseKey: NOISE_KEY_3
}).then((waku) => waku.start().then(() => waku))
]);
await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, {
multiaddrs: waku2.libp2p.getMultiaddrs()
});
await waku3.libp2p.peerStore.merge(waku2.libp2p.peerId, {
multiaddrs: waku2.libp2p.getMultiaddrs()
});
await Promise.all([
waku1.dial(waku2.libp2p.peerId),
waku3.dial(waku2.libp2p.peerId)
]);
await Promise.all([
waitForRemotePeer(waku1, [Protocols.Relay]),
waitForRemotePeer(waku2, [Protocols.Relay])
]);
const messageText = "Communicating using a custom pubsub topic";
const waku2ReceivedMsgPromise: Promise<DecodedMessage> = new Promise(
(resolve) => {
void waku2.relay.subscribe([customDecoder1], resolve);
}
);
// The promise **fails** if we receive a message on the default
// pubsub topic.
const waku3NoMsgPromise: Promise<DecodedMessage> = new Promise(
(resolve, reject) => {
void waku3.relay.subscribe([TestDecoder], reject);
setTimeout(resolve, 1000);
}
);
await waku1.relay.send(customEncoder1, {
payload: utf8ToBytes(messageText)
});
const waku2ReceivedMsg = await waku2ReceivedMsgPromise;
await waku3NoMsgPromise;
expect(bytesToUtf8(waku2ReceivedMsg.payload!)).to.eq(messageText);
expect(waku2ReceivedMsg.pubsubTopic).to.eq(customPubsubTopic1);
});
});

View File

@ -321,3 +321,163 @@ describe("Waku Store (Autosharding), custom pubsub topic", function () {
}
});
});
describe("Waku Store (named sharding), custom pubsub topic", function () {
this.timeout(15000);
let waku: LightNode;
let nwaku: NimGoNode;
let nwaku2: NimGoNode;
const customDecoder1 = createDecoder(
customContentTopic1,
customShardedPubsubTopic1
);
const customDecoder2 = createDecoder(
customContentTopic2,
customShardedPubsubTopic2
);
beforeEach(async function () {
this.timeout(15000);
nwaku = new NimGoNode(makeLogFileName(this));
await nwaku.start({
store: true,
pubsubTopic: [customShardedPubsubTopic1, customShardedPubsubTopic2],
relay: true
});
await nwaku.ensureSubscriptions([
customShardedPubsubTopic1,
customShardedPubsubTopic2
]);
});
afterEach(async function () {
this.timeout(15000);
await tearDownNodes([nwaku, nwaku2], waku);
});
it("Generator, custom pubsub topic", async function () {
await sendMessages(
nwaku,
totalMsgs,
customContentTopic1,
customShardedPubsubTopic1
);
waku = await startAndConnectLightNode(nwaku, [
customShardedPubsubTopic1,
customShardedPubsubTopic2
]);
const messages = await processQueriedMessages(
waku,
[customDecoder1],
customShardedPubsubTopic1
);
expect(messages?.length).eq(totalMsgs);
const result = messages?.findIndex((msg) => {
return msg.payload![0]! === 0;
});
expect(result).to.not.eq(-1);
});
it("Generator, 2 different pubsubtopics", async function () {
this.timeout(10000);
const totalMsgs = 10;
await sendMessages(
nwaku,
totalMsgs,
customContentTopic1,
customShardedPubsubTopic1
);
await sendMessages(
nwaku,
totalMsgs,
customContentTopic2,
customShardedPubsubTopic2
);
waku = await startAndConnectLightNode(nwaku, [
customShardedPubsubTopic1,
customShardedPubsubTopic2
]);
const customMessages = await processQueriedMessages(
waku,
[customDecoder1],
customShardedPubsubTopic1
);
expect(customMessages?.length).eq(totalMsgs);
const result1 = customMessages?.findIndex((msg) => {
return msg.payload![0]! === 0;
});
expect(result1).to.not.eq(-1);
const testMessages = await processQueriedMessages(
waku,
[customDecoder2],
customShardedPubsubTopic2
);
expect(testMessages?.length).eq(totalMsgs);
const result2 = testMessages?.findIndex((msg) => {
return msg.payload![0]! === 0;
});
expect(result2).to.not.eq(-1);
});
it("Generator, 2 nwaku nodes each with different pubsubtopics", async function () {
this.timeout(10000);
// Set up and start a new nwaku node with Default Pubsubtopic
nwaku2 = new NimGoNode(makeLogFileName(this) + "2");
await nwaku2.start({
store: true,
pubsubTopic: [customShardedPubsubTopic2],
relay: true
});
await nwaku2.ensureSubscriptions([customShardedPubsubTopic2]);
const totalMsgs = 10;
await sendMessages(
nwaku,
totalMsgs,
customContentTopic1,
customShardedPubsubTopic1
);
await sendMessages(
nwaku2,
totalMsgs,
customContentTopic2,
customShardedPubsubTopic2
);
waku = await createLightNode({
staticNoiseKey: NOISE_KEY_1,
pubsubTopics: [customShardedPubsubTopic1, customShardedPubsubTopic2]
});
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waku.dial(await nwaku2.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Store]);
let customMessages: IMessage[] = [];
let testMessages: IMessage[] = [];
while (
customMessages.length != totalMsgs ||
testMessages.length != totalMsgs
) {
customMessages = await processQueriedMessages(
waku,
[customDecoder1],
customShardedPubsubTopic1
);
testMessages = await processQueriedMessages(
waku,
[customDecoder2],
customShardedPubsubTopic2
);
}
});
});

View File

@ -111,6 +111,7 @@ export async function startAndConnectLightNode(
pubsubTopics[0] !== DefaultPubsubTopic) && {
shardInfo: shardInfo
}),
pubsubTopics: shardInfo ? undefined : pubsubTopics,
staticNoiseKey: NOISE_KEY_1
});
await waku.start();

View File

@ -128,3 +128,14 @@ describe("contentTopicsByPubsubTopic", () => {
}
});
});
describe("contentTopicsByPubsubTopic", () => {
it("groups content topics by expected pubsub topic", () => {
const contentTopics = ["/toychat/2/huilong/proto", "/myapp/1/latest/proto"];
const grouped = contentTopicsByPubsubTopic(contentTopics);
for (const contentTopic of contentTopics) {
const pubsubTopic = contentTopicToPubsubTopic(contentTopic);
expect(grouped.get(pubsubTopic)?.includes(contentTopic)).to.be.true;
}
});
});

View File

@ -23,14 +23,31 @@ export const shardInfoToPubsubTopics = (
if (shardInfo.clusterId === undefined)
throw new Error("Cluster ID must be specified");
if ("contentTopics" in shardInfo) {
return shardInfo.contentTopics.map((contentTopic) =>
// Autosharding: explicitly defined content topics
return Array.from(
new Set(
shardInfo.contentTopics.map((contentTopic) =>
contentTopicToPubsubTopic(contentTopic, shardInfo.clusterId)
)
)
);
} else if ("shards" in shardInfo) {
// Static sharding
if (shardInfo.shards === undefined) throw new Error("Invalid shard");
return Array.from(
new Set(
shardInfo.shards.map(
(index) => `/waku/2/rs/${shardInfo.clusterId}/${index}`
)
)
);
} else {
if (shardInfo.shards === undefined) throw new Error("Invalid shard");
return shardInfo.shards.map(
(index) => `/waku/2/rs/${shardInfo.clusterId}/${index}`
);
// Autosharding: single shard from application and version
return [
contentTopicToPubsubTopic(
`/${shardInfo.application}/${shardInfo.version}/default/default`
)
];
}
};
@ -183,11 +200,18 @@ export function contentTopicsByPubsubTopic(
*/
export function determinePubsubTopic(
contentTopic: string,
pubsubTopicShardInfo?: SingleShardInfo
pubsubTopicShardInfo: SingleShardInfo | PubsubTopic = DefaultPubsubTopic
): string {
if (typeof pubsubTopicShardInfo == "string") {
return pubsubTopicShardInfo;
} else {
return pubsubTopicShardInfo
? pubsubTopicShardInfo.shard
? singleShardInfoToPubsubTopic(pubsubTopicShardInfo)
: contentTopicToPubsubTopic(contentTopic, pubsubTopicShardInfo.clusterId)
: contentTopicToPubsubTopic(
contentTopic,
pubsubTopicShardInfo.clusterId
)
: DefaultPubsubTopic;
}
}