Merge pull request #1723 from waku-org/adklempner/autoshard-encoder-decoder

feat: add support for autosharded pubsub topics
This commit is contained in:
Arseniy Klempner 2023-12-21 10:43:47 -08:00 committed by GitHub
commit 197926d52f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
44 changed files with 1351 additions and 122 deletions

View File

@ -1,5 +1,4 @@
export { DefaultUserAgent } from "./lib/waku.js";
export { DefaultPubsubTopic } from "./lib/constants.js";
export { createEncoder, createDecoder } from "./lib/message/version_0.js";
export type {
Encoder,

View File

@ -6,12 +6,12 @@ import type {
IBaseProtocol,
Libp2pComponents,
PubsubTopic,
ShardInfo
ShardingParams
} from "@waku/interfaces";
import { DefaultPubsubTopic } from "@waku/interfaces";
import { shardInfoToPubsubTopics } from "@waku/utils";
import { getPeersForProtocol, selectPeerForProtocol } from "@waku/utils/libp2p";
import { DefaultPubsubTopic } from "./constants.js";
import { filterPeers } from "./filterPeers.js";
import { StreamManager } from "./stream_manager.js";
@ -97,7 +97,7 @@ export class BaseProtocol implements IBaseProtocol {
return filterPeers(allPeersForProtocol, numPeers, maxBootstrapPeers);
}
initializePubsubTopic(shardInfo?: ShardInfo): PubsubTopic[] {
initializePubsubTopic(shardInfo?: ShardingParams): PubsubTopic[] {
return shardInfo
? shardInfoToPubsubTopics(shardInfo)
: [DefaultPubsubTopic];

View File

@ -17,6 +17,7 @@ import type {
SingleShardInfo,
Unsubscribe
} from "@waku/interfaces";
import { DefaultPubsubTopic } from "@waku/interfaces";
import { WakuMessage } from "@waku/proto";
import {
ensurePubsubTopicIsConfigured,
@ -30,7 +31,6 @@ import * as lp from "it-length-prefixed";
import { pipe } from "it-pipe";
import { BaseProtocol } from "../base_protocol.js";
import { DefaultPubsubTopic } from "../constants.js";
import {
FilterPushRpc,

View File

@ -11,9 +11,7 @@ import type {
SingleShardInfo
} from "@waku/interfaces";
import { proto_message as proto } from "@waku/proto";
import { Logger, singleShardInfoToPubsubTopic } from "@waku/utils";
import { DefaultPubsubTopic } from "../constants.js";
import { determinePubsubTopic, Logger } from "@waku/utils";
const log = new Logger("message:version-0");
const OneMillion = BigInt(1_000_000);
@ -128,9 +126,7 @@ export function createEncoder({
return new Encoder(
contentTopic,
ephemeral,
pubsubTopicShardInfo
? singleShardInfoToPubsubTopic(pubsubTopicShardInfo)
: DefaultPubsubTopic,
determinePubsubTopic(contentTopic, pubsubTopicShardInfo),
metaSetter
);
}
@ -193,9 +189,7 @@ export function createDecoder(
pubsubTopicShardInfo?: SingleShardInfo
): Decoder {
return new Decoder(
pubsubTopicShardInfo
? singleShardInfoToPubsubTopic(pubsubTopicShardInfo)
: DefaultPubsubTopic,
determinePubsubTopic(contentTopic, pubsubTopicShardInfo),
contentTopic
);
}

View File

@ -1,7 +1,12 @@
import type { PeerId } from "@libp2p/interface/peer-id";
import { IncomingStreamData } from "@libp2p/interface/stream-handler";
import { encodeRelayShard } from "@waku/enr";
import type { IMetadata, Libp2pComponents, ShardInfo } from "@waku/interfaces";
import type {
IMetadata,
Libp2pComponents,
ShardInfo,
ShardingParams
} from "@waku/interfaces";
import { proto_metadata } from "@waku/proto";
import { Logger } from "@waku/utils";
import all from "it-all";
@ -16,9 +21,9 @@ const log = new Logger("metadata");
export const MetadataCodec = "/vac/waku/metadata/1.0.0";
class Metadata extends BaseProtocol {
private readonly shardInfo: ShardInfo;
private readonly shardInfo: ShardingParams;
private libp2pComponents: Libp2pComponents;
constructor(shardInfo: ShardInfo, libp2p: Libp2pComponents) {
constructor(shardInfo: ShardingParams, libp2p: Libp2pComponents) {
super(MetadataCodec, libp2p.components);
this.libp2pComponents = libp2p;
this.shardInfo = shardInfo;
@ -99,7 +104,7 @@ class Metadata extends BaseProtocol {
}
export function wakuMetadata(
shardInfo: ShardInfo
shardInfo: ShardingParams
): (components: Libp2pComponents) => IMetadata {
return (components: Libp2pComponents) => new Metadata(shardInfo, components);
}

View File

@ -8,14 +8,13 @@ import type {
IStore,
Libp2p,
PubsubTopic,
ShardInfo,
ShardingParams,
Waku
} from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import { DefaultPubsubTopic, Protocols } from "@waku/interfaces";
import { Logger, shardInfoToPubsubTopics } from "@waku/utils";
import { ConnectionManager } from "./connection_manager.js";
import { DefaultPubsubTopic } from "./constants.js";
export const DefaultPingKeepAliveValueSecs = 5 * 60;
export const DefaultRelayKeepAliveValueSecs = 5 * 60;
@ -57,7 +56,7 @@ export class WakuNode implements Waku {
constructor(
options: WakuOptions,
libp2p: Libp2p,
pubsubShardInfo?: ShardInfo,
pubsubShardInfo?: ShardingParams,
store?: (libp2p: Libp2p) => IStore,
lightPush?: (libp2p: Libp2p) => ILightPush,
filter?: (libp2p: Libp2p) => IFilter,

View File

@ -15,3 +15,4 @@ export * from "./libp2p.js";
export * from "./keep_alive_manager.js";
export * from "./dns_discovery.js";
export * from "./metadata.js";
export * from "./constants.js";

View File

@ -2,7 +2,10 @@ import type { PubsubTopic } from "./misc.js";
export interface SingleShardInfo {
clusterId: number;
shard: number;
/**
* Specifying this field indicates to the encoder/decoder that static sharding must be used.
*/
shard?: number;
}
export interface IRateLimitProof {

View File

@ -21,6 +21,13 @@ export interface IBaseProtocol {
removeLibp2pEventListener: Libp2p["removeEventListener"];
}
export type ContentTopicInfo = {
clusterId: number;
contentTopics: string[];
};
export type ShardingParams = ShardInfo | ContentTopicInfo;
export type ProtocolCreateOptions = {
/**
* Waku supports usage of multiple pubsub topics. This is achieved through static sharding for now, and auto-sharding in the future.
@ -39,7 +46,7 @@ export type ProtocolCreateOptions = {
* See [Waku v2 Topic Usage Recommendations](https://rfc.vac.dev/spec/23/) for details.
*
*/
shardInfo?: ShardInfo;
shardInfo?: ShardingParams;
/**
* You can pass options to the `Libp2p` instance used by {@link @waku/core!WakuNode} using the `libp2p` property.
* This property is the same type as the one passed to [`Libp2p.create`](https://github.com/libp2p/js-libp2p/blob/master/doc/API.md#create)

View File

@ -1,4 +1,3 @@
import { DefaultPubsubTopic } from "@waku/core";
import { Decoder as DecoderV0 } from "@waku/core/lib/message/version_0";
import type {
EncoderOptions as BaseEncoderOptions,
@ -11,7 +10,7 @@ import type {
SingleShardInfo
} from "@waku/interfaces";
import { WakuMessage } from "@waku/proto";
import { Logger, singleShardInfoToPubsubTopic } from "@waku/utils";
import { determinePubsubTopic, Logger } from "@waku/utils";
import { generatePrivateKey } from "./crypto/utils.js";
import { DecodedMessage } from "./decoded_message.js";
@ -107,9 +106,7 @@ export function createEncoder({
metaSetter
}: EncoderOptions): Encoder {
return new Encoder(
pubsubTopicShardInfo
? singleShardInfoToPubsubTopic(pubsubTopicShardInfo)
: DefaultPubsubTopic,
determinePubsubTopic(contentTopic, pubsubTopicShardInfo),
contentTopic,
publicKey,
sigPrivKey,
@ -200,9 +197,7 @@ export function createDecoder(
pubsubTopicShardInfo?: SingleShardInfo
): Decoder {
return new Decoder(
pubsubTopicShardInfo
? singleShardInfoToPubsubTopic(pubsubTopicShardInfo)
: DefaultPubsubTopic,
determinePubsubTopic(contentTopic, pubsubTopicShardInfo),
contentTopic,
privateKey
);

View File

@ -1,4 +1,3 @@
import { DefaultPubsubTopic } from "@waku/core";
import { Decoder as DecoderV0 } from "@waku/core/lib/message/version_0";
import type {
EncoderOptions as BaseEncoderOptions,
@ -11,7 +10,7 @@ import type {
SingleShardInfo
} from "@waku/interfaces";
import { WakuMessage } from "@waku/proto";
import { Logger, singleShardInfoToPubsubTopic } from "@waku/utils";
import { determinePubsubTopic, Logger } from "@waku/utils";
import { generateSymmetricKey } from "./crypto/utils.js";
import { DecodedMessage } from "./decoded_message.js";
@ -107,9 +106,7 @@ export function createEncoder({
metaSetter
}: EncoderOptions): Encoder {
return new Encoder(
pubsubTopicShardInfo
? singleShardInfoToPubsubTopic(pubsubTopicShardInfo)
: DefaultPubsubTopic,
determinePubsubTopic(contentTopic, pubsubTopicShardInfo),
contentTopic,
symKey,
sigPrivKey,
@ -200,9 +197,7 @@ export function createDecoder(
pubsubTopicShardInfo?: SingleShardInfo
): Decoder {
return new Decoder(
pubsubTopicShardInfo
? singleShardInfoToPubsubTopic(pubsubTopicShardInfo)
: DefaultPubsubTopic,
determinePubsubTopic(contentTopic, pubsubTopicShardInfo),
contentTopic,
symKey
);

View File

@ -9,7 +9,7 @@ import { SignaturePolicy } from "@chainsafe/libp2p-gossipsub/types";
import type { PeerId } from "@libp2p/interface/peer-id";
import type { PubSub as Libp2pPubsub } from "@libp2p/interface/pubsub";
import { sha256 } from "@noble/hashes/sha256";
import { DefaultPubsubTopic } from "@waku/core";
import { DefaultPubsubTopic } from "@waku/interfaces";
import {
ActiveSubscriptions,
Callback,

View File

@ -1,4 +1,4 @@
import { DefaultPubsubTopic } from "@waku/core";
import { DefaultPubsubTopic } from "@waku/interfaces";
import type {
IDecodedMessage,
IDecoder,

View File

@ -23,7 +23,7 @@ import type {
LightNode,
ProtocolCreateOptions,
RelayNode,
ShardInfo
ShardingParams
} from "@waku/interfaces";
import { wakuPeerExchangeDiscovery } from "@waku/peer-exchange";
import { RelayCreateOptions, wakuGossipSub, wakuRelay } from "@waku/relay";
@ -180,7 +180,7 @@ type MetadataService = {
};
export async function defaultLibp2p(
shardInfo?: ShardInfo,
shardInfo?: ShardingParams,
wakuGossipSub?: PubsubService["pubsub"],
options?: Partial<CreateLibp2pOptions>,
userAgent?: string

View File

@ -1,4 +1,5 @@
import { DecodedMessage, DefaultPubsubTopic } from "@waku/core";
import { DecodedMessage } from "@waku/core";
import { DefaultPubsubTopic } from "@waku/interfaces";
import { Logger } from "@waku/utils";
import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes";
import { AssertionError, expect } from "chai";
@ -103,6 +104,49 @@ export class MessageCollector {
}
}
async waitForMessagesAutosharding(
numMessages: number,
options?: {
contentTopic: string;
timeoutDuration?: number;
exact?: boolean;
}
): Promise<boolean> {
const startTime = Date.now();
const timeoutDuration = options?.timeoutDuration || 400;
const exact = options?.exact || false;
while (this.count < numMessages) {
if (this.nwaku) {
try {
this.list = await this.nwaku.messagesAutosharding(
options!.contentTopic
);
} catch (error) {
log.error(`Can't retrieve messages because of ${error}`);
await delay(10);
}
}
if (Date.now() - startTime > timeoutDuration * numMessages) {
return false;
}
await delay(10);
}
if (exact) {
if (this.count == numMessages) {
return true;
} else {
log.warn(`Was expecting exactly ${numMessages} messages`);
return false;
}
} else {
return true;
}
}
// Verifies a received message against expected values.
verifyReceivedMessage(
index: number,

View File

@ -1,7 +1,7 @@
import type { PeerId } from "@libp2p/interface/peer-id";
import { peerIdFromString } from "@libp2p/peer-id";
import { Multiaddr, multiaddr } from "@multiformats/multiaddr";
import { DefaultPubsubTopic } from "@waku/core";
import { DefaultPubsubTopic } from "@waku/interfaces";
import { isDefined } from "@waku/utils";
import { Logger } from "@waku/utils";
import { bytesToHex, hexToBytes } from "@waku/utils/bytes";
@ -216,6 +216,16 @@ export class NimGoNode {
]);
}
async ensureSubscriptionsAutosharding(
contentTopics: string[]
): Promise<boolean> {
this.checkProcess();
return this.rpcCall<boolean>("post_waku_v2_relay_v1_auto_subscriptions", [
contentTopics
]);
}
async sendMessage(
message: MessageRpcQuery,
pubsubTopic: string = DefaultPubsubTopic
@ -232,6 +242,18 @@ export class NimGoNode {
]);
}
async sendMessageAutosharding(message: MessageRpcQuery): Promise<boolean> {
this.checkProcess();
if (typeof message.timestamp === "undefined") {
message.timestamp = BigInt(new Date().valueOf()) * OneMillion;
}
return this.rpcCall<boolean>("post_waku_v2_relay_v1_auto_message", [
message
]);
}
async messages(
pubsubTopic: string = DefaultPubsubTopic
): Promise<MessageRpcResponse[]> {
@ -245,6 +267,19 @@ export class NimGoNode {
return msgs.filter(isDefined);
}
async messagesAutosharding(
contentTopic: string
): Promise<MessageRpcResponse[]> {
this.checkProcess();
const msgs = await this.rpcCall<MessageRpcResponse[]>(
"get_waku_v2_relay_v1_auto_messages",
[contentTopic]
);
return msgs.filter(isDefined);
}
async getAsymmetricKeyPair(): Promise<KeyPair> {
this.checkProcess();

View File

@ -1,5 +1,6 @@
import { createDecoder, createEncoder, waitForRemotePeer } from "@waku/core";
import type {
ContentTopicInfo,
IFilterSubscription,
LightNode,
ShardInfo,
@ -7,6 +8,7 @@ import type {
} from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import {
contentTopicToPubsubTopic,
pubsubTopicToSingleShardInfo,
singleShardInfoToPubsubTopic
} from "@waku/utils";
@ -178,3 +180,177 @@ describe("Waku Filter V2: Multiple PubsubTopics", function () {
}
});
});
describe("Waku Filter V2 (Autosharding): Multiple PubsubTopics", function () {
// Set the timeout for all tests in this suite. Can be overwritten at test level
this.timeout(30000);
let waku: LightNode;
let nwaku: NimGoNode;
let nwaku2: NimGoNode;
let subscription: IFilterSubscription;
let messageCollector: MessageCollector;
const customContentTopic1 = "/waku/2/content/utf8";
const customContentTopic2 = "/myapp/1/latest/proto";
const autoshardingPubsubTopic1 = contentTopicToPubsubTopic(
customContentTopic1,
3
);
const autoshardingPubsubTopic2 = contentTopicToPubsubTopic(
customContentTopic2,
3
);
const contentTopicInfo: ContentTopicInfo = {
clusterId: 3,
contentTopics: [customContentTopic1, customContentTopic2]
};
const customEncoder1 = createEncoder({
contentTopic: customContentTopic1,
pubsubTopicShardInfo: {
clusterId: 3
}
});
const customDecoder1 = createDecoder(customContentTopic1, { clusterId: 3 });
const customEncoder2 = createEncoder({
contentTopic: customContentTopic2,
pubsubTopicShardInfo: {
clusterId: 3
}
});
const customDecoder2 = createDecoder(customContentTopic2, { clusterId: 3 });
this.beforeEach(async function () {
this.timeout(15000);
[nwaku, waku] = await runNodes(
this,
[autoshardingPubsubTopic1, autoshardingPubsubTopic2],
contentTopicInfo
);
subscription = await waku.filter.createSubscription(
pubsubTopicToSingleShardInfo(autoshardingPubsubTopic1)
);
messageCollector = new MessageCollector();
});
this.afterEach(async function () {
this.timeout(15000);
await tearDownNodes([nwaku, nwaku2], waku);
});
it("Subscribe and receive messages on autosharded pubsubtopic", async function () {
await subscription.subscribe([customDecoder1], messageCollector.callback);
await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") });
expect(
await messageCollector.waitForMessagesAutosharding(1, {
contentTopic: customContentTopic1
})
).to.eq(true);
messageCollector.verifyReceivedMessage(0, {
expectedContentTopic: customContentTopic1,
expectedPubsubTopic: autoshardingPubsubTopic1,
expectedMessageText: "M1"
});
});
it("Subscribe and receive messages on 2 different pubsubtopics", async function () {
await subscription.subscribe([customDecoder1], messageCollector.callback);
// Subscribe from the same lightnode to the 2nd pubsubtopic
const subscription2 = await waku.filter.createSubscription(
pubsubTopicToSingleShardInfo(autoshardingPubsubTopic2)
);
const messageCollector2 = new MessageCollector();
await subscription2.subscribe([customDecoder2], messageCollector2.callback);
await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") });
await waku.lightPush.send(customEncoder2, { payload: utf8ToBytes("M2") });
expect(
await messageCollector.waitForMessagesAutosharding(1, {
contentTopic: customContentTopic1
})
).to.eq(true);
expect(
await messageCollector2.waitForMessagesAutosharding(1, {
contentTopic: customContentTopic2
})
).to.eq(true);
messageCollector.verifyReceivedMessage(0, {
expectedContentTopic: customContentTopic1,
expectedPubsubTopic: autoshardingPubsubTopic1,
expectedMessageText: "M1"
});
messageCollector2.verifyReceivedMessage(0, {
expectedContentTopic: customContentTopic2,
expectedPubsubTopic: autoshardingPubsubTopic2,
expectedMessageText: "M2"
});
});
it("Subscribe and receive messages from 2 nwaku nodes each with different pubsubtopics", async function () {
await subscription.subscribe([customDecoder1], messageCollector.callback);
// Set up and start a new nwaku node with customPubsubTopic1
nwaku2 = new NimGoNode(makeLogFileName(this) + "2");
await nwaku2.start({
filter: true,
lightpush: true,
relay: true,
pubsubTopic: [autoshardingPubsubTopic2]
});
await waku.dial(await nwaku2.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]);
// Subscribe from the same lightnode to the new nwaku on the new pubsubtopic
const subscription2 = await waku.filter.createSubscription(
pubsubTopicToSingleShardInfo(autoshardingPubsubTopic2),
await nwaku2.getPeerId()
);
await nwaku2.ensureSubscriptionsAutosharding([customContentTopic2]);
const messageCollector2 = new MessageCollector();
await subscription2.subscribe([customDecoder2], messageCollector2.callback);
// Making sure that messages are send and reveiced for both subscriptions
// While loop is done because of https://github.com/waku-org/js-waku/issues/1606
while (
!(await messageCollector.waitForMessagesAutosharding(1, {
contentTopic: customContentTopic1
})) ||
!(await messageCollector2.waitForMessagesAutosharding(1, {
contentTopic: customContentTopic2
}))
) {
await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") });
await waku.lightPush.send(customEncoder2, { payload: utf8ToBytes("M2") });
}
messageCollector.verifyReceivedMessage(0, {
expectedContentTopic: customContentTopic1,
expectedPubsubTopic: autoshardingPubsubTopic1,
expectedMessageText: "M1"
});
messageCollector2.verifyReceivedMessage(0, {
expectedContentTopic: customContentTopic2,
expectedPubsubTopic: autoshardingPubsubTopic2,
expectedMessageText: "M2"
});
});
it("Should fail to subscribe with decoder with wrong pubsubTopic", async function () {
// this subscription object is set up with the `customPubsubTopic` but we're passing it a Decoder with the `DefaultPubsubTopic`
try {
await subscription.subscribe([customDecoder2], messageCollector.callback);
} catch (error) {
expect((error as Error).message).to.include(
"Pubsub topic not configured"
);
}
});
});

View File

@ -1,4 +1,4 @@
import { DefaultPubsubTopic } from "@waku/core";
import { DefaultPubsubTopic } from "@waku/interfaces";
import type { IFilterSubscription, LightNode } from "@waku/interfaces";
import { utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai";

View File

@ -1,5 +1,6 @@
import { DefaultPubsubTopic, waitForRemotePeer } from "@waku/core";
import { waitForRemotePeer } from "@waku/core";
import type { IFilterSubscription, LightNode } from "@waku/interfaces";
import { DefaultPubsubTopic } from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import { utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai";

View File

@ -1,10 +1,6 @@
import {
createDecoder,
createEncoder,
DefaultPubsubTopic,
waitForRemotePeer
} from "@waku/core";
import { createDecoder, createEncoder, waitForRemotePeer } from "@waku/core";
import type { IFilterSubscription, LightNode } from "@waku/interfaces";
import { DefaultPubsubTopic } from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import {
ecies,

View File

@ -1,5 +1,6 @@
import { createDecoder, createEncoder, DefaultPubsubTopic } from "@waku/core";
import { createDecoder, createEncoder } from "@waku/core";
import type { IFilterSubscription, LightNode } from "@waku/interfaces";
import { DefaultPubsubTopic } from "@waku/interfaces";
import { utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai";

View File

@ -1,14 +1,10 @@
import { createDecoder, createEncoder, waitForRemotePeer } from "@waku/core";
import {
createDecoder,
createEncoder,
DefaultPubsubTopic,
waitForRemotePeer
} from "@waku/core";
import {
IFilterSubscription,
LightNode,
Protocols,
ShardInfo
ShardingParams
} from "@waku/interfaces";
import { createLightNode } from "@waku/sdk";
import { Logger } from "@waku/utils";
@ -50,7 +46,7 @@ export async function runNodes(
context: Context,
//TODO: change this to use `ShardInfo` instead of `string[]`
pubsubTopics: string[],
shardInfo?: ShardInfo
shardInfo?: ShardingParams
): Promise<[NimGoNode, LightNode]> {
const nwaku = new NimGoNode(makeLogFileName(context));

View File

@ -1,5 +1,10 @@
import { createEncoder, DefaultPubsubTopic } from "@waku/core";
import { IRateLimitProof, LightNode, SendError } from "@waku/interfaces";
import { createEncoder } from "@waku/core";
import {
DefaultPubsubTopic,
IRateLimitProof,
LightNode,
SendError
} from "@waku/interfaces";
import { utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai";

View File

@ -1,13 +1,17 @@
import type { PeerId } from "@libp2p/interface/peer-id";
import { createEncoder, waitForRemotePeer } from "@waku/core";
import {
ContentTopicInfo,
LightNode,
Protocols,
SendResult,
ShardInfo,
SingleShardInfo
} from "@waku/interfaces";
import { singleShardInfoToPubsubTopic } from "@waku/utils";
import {
contentTopicToPubsubTopic,
singleShardInfoToPubsubTopic
} from "@waku/utils";
import { utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai";
@ -116,7 +120,7 @@ describe("Waku Light Push : Multiple PubsubTopics", function () {
messageCollector2.verifyReceivedMessage(0, {
expectedMessageText: "M2",
expectedContentTopic: customContentTopic2,
expectedPubsubTopic: customPubsubTopic2
expectedPubsubTopic: customPubsubTopic1
});
});
@ -169,3 +173,161 @@ describe("Waku Light Push : Multiple PubsubTopics", function () {
});
});
});
describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () {
this.timeout(30000);
let waku: LightNode;
let nwaku: NimGoNode;
let nwaku2: NimGoNode;
let messageCollector: MessageCollector;
// When using lightpush, we have to use a cluster id of 1 because that is the default cluster id for autosharding
// With a different cluster id, we never find a viable peer
const clusterId = 1;
const customContentTopic1 = "/waku/2/content/utf8";
const customContentTopic2 = "/myapp/1/latest/proto";
const autoshardingPubsubTopic1 = contentTopicToPubsubTopic(
customContentTopic1,
clusterId
);
const autoshardingPubsubTopic2 = contentTopicToPubsubTopic(
customContentTopic2,
clusterId
);
const contentTopicInfo: ContentTopicInfo = {
clusterId,
contentTopics: [customContentTopic1, customContentTopic2]
};
const customEncoder1 = createEncoder({
contentTopic: customContentTopic1,
pubsubTopicShardInfo: {
clusterId
}
});
const customEncoder2 = createEncoder({
contentTopic: customContentTopic2,
pubsubTopicShardInfo: { clusterId }
});
let nimPeerId: PeerId;
this.beforeEach(async function () {
this.timeout(15000);
[nwaku, waku] = await runNodes(
this,
[autoshardingPubsubTopic1, autoshardingPubsubTopic2],
contentTopicInfo
);
messageCollector = new MessageCollector(nwaku);
nimPeerId = await nwaku.getPeerId();
});
this.afterEach(async function () {
this.timeout(15000);
await tearDownNodes([nwaku, nwaku2], waku);
});
it("Push message on custom pubsubTopic", async function () {
const pushResponse = await waku.lightPush.send(customEncoder1, {
payload: utf8ToBytes(messageText)
});
expect(pushResponse.errors).to.be.empty;
expect(pushResponse.recipients[0].toString()).to.eq(nimPeerId.toString());
expect(
await messageCollector.waitForMessagesAutosharding(1, {
contentTopic: customContentTopic1
})
).to.eq(true);
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: messageText,
expectedContentTopic: customContentTopic1
});
});
it("Subscribe and receive messages on 2 different pubsubtopics", async function () {
const pushResponse1 = await waku.lightPush.send(customEncoder1, {
payload: utf8ToBytes("M1")
});
const pushResponse2 = await waku.lightPush.send(customEncoder2, {
payload: utf8ToBytes("M2")
});
expect(pushResponse1.recipients[0].toString()).to.eq(nimPeerId.toString());
expect(pushResponse2.recipients[0].toString()).to.eq(nimPeerId.toString());
const messageCollector2 = new MessageCollector(nwaku);
expect(
await messageCollector.waitForMessagesAutosharding(1, {
contentTopic: customContentTopic1
})
).to.eq(true);
expect(
await messageCollector2.waitForMessagesAutosharding(1, {
contentTopic: customContentTopic2
})
).to.eq(true);
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: "M1",
expectedContentTopic: customContentTopic1,
expectedPubsubTopic: autoshardingPubsubTopic1
});
messageCollector2.verifyReceivedMessage(0, {
expectedMessageText: "M2",
expectedContentTopic: customContentTopic2,
expectedPubsubTopic: autoshardingPubsubTopic2
});
});
it("Light push messages to 2 nwaku nodes each with different pubsubtopics", async function () {
// Set up and start a new nwaku node with Default PubsubTopic
nwaku2 = new NimGoNode(makeLogFileName(this) + "2");
await nwaku2.start({
filter: true,
lightpush: true,
relay: true,
pubsubTopic: [autoshardingPubsubTopic2]
});
await nwaku2.ensureSubscriptionsAutosharding([customContentTopic2]);
await waku.dial(await nwaku2.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.LightPush]);
const messageCollector2 = new MessageCollector(nwaku2);
let pushResponse1: SendResult;
let pushResponse2: SendResult;
// Making sure that we send messages to both nwaku nodes
// While loop is done because of https://github.com/waku-org/js-waku/issues/1606
while (
!(await messageCollector.waitForMessagesAutosharding(1, {
contentTopic: customContentTopic1
})) ||
!(await messageCollector2.waitForMessagesAutosharding(1, {
contentTopic: customContentTopic2
})) ||
pushResponse1!.recipients[0].toString() ===
pushResponse2!.recipients[0].toString()
) {
pushResponse1 = await waku.lightPush.send(customEncoder1, {
payload: utf8ToBytes("M1")
});
pushResponse2 = await waku.lightPush.send(customEncoder2, {
payload: utf8ToBytes("M2")
});
}
messageCollector.verifyReceivedMessage(0, {
expectedMessageText: "M1",
expectedContentTopic: customContentTopic1,
expectedPubsubTopic: autoshardingPubsubTopic1
});
messageCollector2.verifyReceivedMessage(0, {
expectedMessageText: "M2",
expectedContentTopic: customContentTopic2,
expectedPubsubTopic: autoshardingPubsubTopic2
});
});
});

View File

@ -1,9 +1,10 @@
import { createEncoder, waitForRemotePeer } from "@waku/core";
import {
createEncoder,
DefaultPubsubTopic,
waitForRemotePeer
} from "@waku/core";
import { LightNode, Protocols, ShardInfo } from "@waku/interfaces";
LightNode,
Protocols,
ShardingParams
} from "@waku/interfaces";
import { createLightNode, utf8ToBytes } from "@waku/sdk";
import { Logger } from "@waku/utils";
@ -19,7 +20,7 @@ export const messagePayload = { payload: utf8ToBytes(messageText) };
export async function runNodes(
context: Mocha.Context,
pubsubTopics: string[],
shardInfo?: ShardInfo
shardInfo?: ShardingParams
): Promise<[NimGoNode, LightNode]> {
const nwaku = new NimGoNode(makeLogFileName(context));
await nwaku.start(
@ -44,6 +45,13 @@ export async function runNodes(
if (waku) {
await waku.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.LightPush]);
if (
shardInfo &&
"contentTopics" in shardInfo &&
shardInfo.contentTopics.length > 0
) {
await nwaku.ensureSubscriptionsAutosharding(shardInfo.contentTopics);
}
await nwaku.ensureSubscriptions(pubsubTopics);
return [nwaku, waku];
} else {

View File

@ -1,11 +1,6 @@
import type { PeerId } from "@libp2p/interface/peer-id";
import {
DecodedMessage,
DefaultPubsubTopic,
waitForRemotePeer
} from "@waku/core";
import { RelayNode } from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import { DecodedMessage, waitForRemotePeer } from "@waku/core";
import { DefaultPubsubTopic, Protocols, RelayNode } from "@waku/interfaces";
import { createRelayNode } from "@waku/sdk";
import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai";

View File

@ -4,10 +4,18 @@ import {
DecodedMessage,
waitForRemotePeer
} from "@waku/core";
import { RelayNode, ShardInfo, SingleShardInfo } from "@waku/interfaces";
import {
ContentTopicInfo,
RelayNode,
ShardInfo,
SingleShardInfo
} from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import { createRelayNode } from "@waku/sdk";
import { singleShardInfoToPubsubTopic } from "@waku/utils";
import {
contentTopicToPubsubTopic,
singleShardInfoToPubsubTopic
} from "@waku/utils";
import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai";
@ -305,3 +313,328 @@ describe("Waku Relay, multiple pubsub topics", function () {
expect(waku2ReceivedMsg.pubsubTopic).to.eq(customPubsubTopic1);
});
});
describe("Waku Relay (Autosharding), multiple pubsub topics", function () {
this.timeout(15000);
let waku1: RelayNode;
let waku2: RelayNode;
let waku3: RelayNode;
const customContentTopic1 = "/waku/2/content/utf8";
const customContentTopic2 = "/myapp/1/latest/proto";
const autoshardingPubsubTopic1 = contentTopicToPubsubTopic(
customContentTopic1,
3
);
const autoshardingPubsubTopic2 = contentTopicToPubsubTopic(
customContentTopic2,
3
);
const contentTopicInfo1: ContentTopicInfo = {
clusterId: 3,
contentTopics: [customContentTopic1]
};
const contentTopicInfo2: ContentTopicInfo = {
clusterId: 3,
contentTopics: [customContentTopic2]
};
const customEncoder1 = createEncoder({
contentTopic: customContentTopic1,
pubsubTopicShardInfo: {
clusterId: 3
}
});
const customDecoder1 = createDecoder(customContentTopic1, { clusterId: 3 });
const customEncoder2 = createEncoder({
contentTopic: customContentTopic2,
pubsubTopicShardInfo: { clusterId: 3 }
});
const customDecoder2 = createDecoder(customContentTopic2, { clusterId: 3 });
const contentTopicInfoBothShards: ContentTopicInfo = {
clusterId: 3,
contentTopics: [customContentTopic1, customContentTopic2]
};
afterEach(async function () {
this.timeout(15000);
await tearDownNodes([], [waku1, waku2, waku3]);
});
[
{
pubsub: autoshardingPubsubTopic1,
shardInfo: contentTopicInfo1,
encoder: customEncoder1,
decoder: customDecoder1
},
{
pubsub: autoshardingPubsubTopic2,
shardInfo: contentTopicInfo2,
encoder: customEncoder2,
decoder: customDecoder2
}
].forEach((testItem) => {
it(`3 nodes on ${testItem.pubsub} topic`, async function () {
const [msgCollector1, msgCollector2, msgCollector3] = Array(3)
.fill(null)
.map(() => new MessageCollector());
[waku1, waku2, waku3] = await Promise.all([
createRelayNode({
shardInfo: testItem.shardInfo,
staticNoiseKey: NOISE_KEY_1
}).then((waku) => waku.start().then(() => waku)),
createRelayNode({
shardInfo: testItem.shardInfo,
staticNoiseKey: NOISE_KEY_2,
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }
}).then((waku) => waku.start().then(() => waku)),
createRelayNode({
shardInfo: testItem.shardInfo,
staticNoiseKey: NOISE_KEY_3
}).then((waku) => waku.start().then(() => waku))
]);
await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, {
multiaddrs: waku2.libp2p.getMultiaddrs()
});
await waku3.libp2p.peerStore.merge(waku2.libp2p.peerId, {
multiaddrs: waku2.libp2p.getMultiaddrs()
});
await Promise.all([
waku1.dial(waku2.libp2p.peerId),
waku3.dial(waku2.libp2p.peerId)
]);
await Promise.all([
waitForRemotePeer(waku1, [Protocols.Relay]),
waitForRemotePeer(waku2, [Protocols.Relay]),
waitForRemotePeer(waku3, [Protocols.Relay])
]);
await waku1.relay.subscribe([testItem.decoder], msgCollector1.callback);
await waku2.relay.subscribe([testItem.decoder], msgCollector2.callback);
await waku3.relay.subscribe([testItem.decoder], msgCollector3.callback);
// The nodes are setup in such a way that all messages send should be relayed to the other nodes in the network
const relayResponse1 = await waku1.relay.send(testItem.encoder, {
payload: utf8ToBytes("M1")
});
const relayResponse2 = await waku2.relay.send(testItem.encoder, {
payload: utf8ToBytes("M2")
});
const relayResponse3 = await waku3.relay.send(testItem.encoder, {
payload: utf8ToBytes("M3")
});
expect(relayResponse1.recipients[0].toString()).to.eq(
waku2.libp2p.peerId.toString()
);
expect(relayResponse3.recipients[0].toString()).to.eq(
waku2.libp2p.peerId.toString()
);
expect(relayResponse2.recipients.map((r) => r.toString())).to.include(
waku1.libp2p.peerId.toString()
);
expect(relayResponse2.recipients.map((r) => r.toString())).to.include(
waku3.libp2p.peerId.toString()
);
expect(
await msgCollector1.waitForMessagesAutosharding(2, {
contentTopic: testItem.encoder.contentTopic,
exact: true
})
).to.eq(true);
expect(
await msgCollector2.waitForMessagesAutosharding(2, {
contentTopic: testItem.encoder.contentTopic,
exact: true
})
).to.eq(true);
expect(
await msgCollector3.waitForMessagesAutosharding(2, {
contentTopic: testItem.encoder.contentTopic,
exact: true
})
).to.eq(true);
expect(
msgCollector1.hasMessage(testItem.encoder.contentTopic, "M2")
).to.eq(true);
expect(
msgCollector1.hasMessage(testItem.encoder.contentTopic, "M3")
).to.eq(true);
expect(
msgCollector2.hasMessage(testItem.encoder.contentTopic, "M1")
).to.eq(true);
expect(
msgCollector2.hasMessage(testItem.encoder.contentTopic, "M3")
).to.eq(true);
expect(
msgCollector3.hasMessage(testItem.encoder.contentTopic, "M1")
).to.eq(true);
expect(
msgCollector3.hasMessage(testItem.encoder.contentTopic, "M2")
).to.eq(true);
});
});
it("Nodes with multiple pubsub topic", async function () {
const [msgCollector1, msgCollector2, msgCollector3] = Array(3)
.fill(null)
.map(() => new MessageCollector());
// Waku1 and waku2 are using multiple pubsub topis
[waku1, waku2, waku3] = await Promise.all([
createRelayNode({
shardInfo: contentTopicInfoBothShards,
staticNoiseKey: NOISE_KEY_1
}).then((waku) => waku.start().then(() => waku)),
createRelayNode({
shardInfo: contentTopicInfoBothShards,
staticNoiseKey: NOISE_KEY_2,
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }
}).then((waku) => waku.start().then(() => waku)),
createRelayNode({
shardInfo: contentTopicInfo1,
staticNoiseKey: NOISE_KEY_3
}).then((waku) => waku.start().then(() => waku))
]);
await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, {
multiaddrs: waku2.libp2p.getMultiaddrs()
});
await waku3.libp2p.peerStore.merge(waku2.libp2p.peerId, {
multiaddrs: waku2.libp2p.getMultiaddrs()
});
await Promise.all([
waku1.dial(waku2.libp2p.peerId),
waku3.dial(waku2.libp2p.peerId)
]);
await Promise.all([
waitForRemotePeer(waku1, [Protocols.Relay]),
waitForRemotePeer(waku2, [Protocols.Relay]),
waitForRemotePeer(waku3, [Protocols.Relay])
]);
await waku1.relay.subscribe(
[customDecoder1, customDecoder2],
msgCollector1.callback
);
await waku2.relay.subscribe(
[customDecoder1, customDecoder2],
msgCollector2.callback
);
await waku3.relay.subscribe([customDecoder1], msgCollector3.callback);
// The nodes are setup in such a way that all messages send should be relayed to the other nodes in the network
// However onlt waku1 and waku2 are receiving messages on the CustomPubSubTopic
await waku1.relay.send(customEncoder1, { payload: utf8ToBytes("M1") });
await waku1.relay.send(customEncoder2, { payload: utf8ToBytes("M2") });
await waku2.relay.send(customEncoder1, { payload: utf8ToBytes("M3") });
await waku2.relay.send(customEncoder2, { payload: utf8ToBytes("M4") });
await waku3.relay.send(customEncoder1, { payload: utf8ToBytes("M5") });
await waku3.relay.send(customEncoder2, { payload: utf8ToBytes("M6") });
expect(
await msgCollector1.waitForMessagesAutosharding(3, {
contentTopic: customContentTopic1,
exact: true
})
).to.eq(true);
expect(
await msgCollector1.waitForMessagesAutosharding(3, {
contentTopic: customContentTopic2,
exact: true
})
).to.eq(true);
expect(
await msgCollector2.waitForMessagesAutosharding(3, {
contentTopic: customContentTopic1,
exact: true
})
).to.eq(true);
expect(
await msgCollector2.waitForMessagesAutosharding(3, {
contentTopic: customContentTopic2,
exact: true
})
).to.eq(true);
expect(
await msgCollector3.waitForMessagesAutosharding(2, {
contentTopic: customContentTopic1,
exact: true
})
).to.eq(true);
expect(msgCollector1.hasMessage(customContentTopic1, "M3")).to.eq(true);
expect(msgCollector1.hasMessage(customContentTopic2, "M4")).to.eq(true);
expect(msgCollector1.hasMessage(customContentTopic1, "M5")).to.eq(true);
expect(msgCollector2.hasMessage(customContentTopic1, "M1")).to.eq(true);
expect(msgCollector2.hasMessage(customContentTopic2, "M2")).to.eq(true);
expect(msgCollector2.hasMessage(customContentTopic1, "M5")).to.eq(true);
expect(msgCollector3.hasMessage(customContentTopic1, "M1")).to.eq(true);
expect(msgCollector3.hasMessage(customContentTopic1, "M3")).to.eq(true);
});
it("n1 and n2 uses a custom pubsub, n3 uses the default pubsub", async function () {
[waku1, waku2, waku3] = await Promise.all([
createRelayNode({
shardInfo: contentTopicInfo1,
staticNoiseKey: NOISE_KEY_1
}).then((waku) => waku.start().then(() => waku)),
createRelayNode({
shardInfo: contentTopicInfo1,
staticNoiseKey: NOISE_KEY_2,
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }
}).then((waku) => waku.start().then(() => waku)),
createRelayNode({
staticNoiseKey: NOISE_KEY_3
}).then((waku) => waku.start().then(() => waku))
]);
await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, {
multiaddrs: waku2.libp2p.getMultiaddrs()
});
await waku3.libp2p.peerStore.merge(waku2.libp2p.peerId, {
multiaddrs: waku2.libp2p.getMultiaddrs()
});
await Promise.all([
waku1.dial(waku2.libp2p.peerId),
waku3.dial(waku2.libp2p.peerId)
]);
await Promise.all([
waitForRemotePeer(waku1, [Protocols.Relay]),
waitForRemotePeer(waku2, [Protocols.Relay])
]);
const messageText = "Communicating using a custom pubsub topic";
const waku2ReceivedMsgPromise: Promise<DecodedMessage> = new Promise(
(resolve) => {
void waku2.relay.subscribe([customDecoder1], resolve);
}
);
// The promise **fails** if we receive a message on the default
// pubsub topic.
const waku3NoMsgPromise: Promise<DecodedMessage> = new Promise(
(resolve, reject) => {
void waku3.relay.subscribe([TestDecoder], reject);
setTimeout(resolve, 1000);
}
);
await waku1.relay.send(customEncoder1, {
payload: utf8ToBytes(messageText)
});
const waku2ReceivedMsg = await waku2ReceivedMsgPromise;
await waku3NoMsgPromise;
expect(bytesToUtf8(waku2ReceivedMsg.payload!)).to.eq(messageText);
expect(waku2ReceivedMsg.pubsubTopic).to.eq(autoshardingPubsubTopic1);
});
});

View File

@ -1,5 +1,5 @@
import { createDecoder, createEncoder, DefaultPubsubTopic } from "@waku/core";
import { RelayNode } from "@waku/interfaces";
import { createDecoder, createEncoder } from "@waku/core";
import { DefaultPubsubTopic, RelayNode } from "@waku/interfaces";
import { createRelayNode } from "@waku/sdk";
import { utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai";

View File

@ -1,8 +1,17 @@
import { bootstrap } from "@libp2p/bootstrap";
import type { PeerId } from "@libp2p/interface/peer-id";
import { wakuPeerExchangeDiscovery } from "@waku/peer-exchange";
import { createLightNode, LightNode, ShardInfo, Tags } from "@waku/sdk";
import { singleShardInfoToPubsubTopic } from "@waku/utils";
import {
ContentTopicInfo,
createLightNode,
LightNode,
ShardInfo,
Tags
} from "@waku/sdk";
import {
contentTopicToPubsubTopic,
singleShardInfoToPubsubTopic
} from "@waku/utils";
import chai, { expect } from "chai";
import chaiAsPromised from "chai-as-promised";
import Sinon, { SinonSpy } from "sinon";
@ -184,3 +193,176 @@ describe("Static Sharding: Peer Management", function () {
});
});
});
describe("Autosharding: Peer Management", function () {
const ContentTopic = "/waku/2/content/test.js";
describe("Peer Exchange", function () {
let waku: LightNode;
let nwaku1: NimGoNode;
let nwaku2: NimGoNode;
let nwaku3: NimGoNode;
let dialPeerSpy: SinonSpy;
beforeEach(async function () {
this.timeout(15000);
nwaku1 = new NimGoNode(makeLogFileName(this) + "1_auto");
nwaku2 = new NimGoNode(makeLogFileName(this) + "2_auto");
nwaku3 = new NimGoNode(makeLogFileName(this) + "3_auto");
});
afterEach(async function () {
this.timeout(15000);
await tearDownNodes([nwaku1, nwaku2, nwaku3], waku);
dialPeerSpy && dialPeerSpy.restore();
});
it("all px service nodes subscribed to the shard topic should be dialed", async function () {
this.timeout(100_000);
const pubsubTopics = [contentTopicToPubsubTopic(ContentTopic, 1)];
const contentTopicInfo: ContentTopicInfo = {
clusterId: 1,
contentTopics: [ContentTopic]
};
await nwaku1.start({
pubsubTopic: pubsubTopics,
discv5Discovery: true,
peerExchange: true,
relay: true
});
const enr1 = (await nwaku1.info()).enrUri;
await nwaku2.start({
pubsubTopic: pubsubTopics,
discv5Discovery: true,
peerExchange: true,
discv5BootstrapNode: enr1,
relay: true
});
const enr2 = (await nwaku2.info()).enrUri;
await nwaku3.start({
pubsubTopic: pubsubTopics,
discv5Discovery: true,
peerExchange: true,
discv5BootstrapNode: enr2,
relay: true
});
const nwaku3Ma = await nwaku3.getMultiaddrWithId();
waku = await createLightNode({
shardInfo: contentTopicInfo,
libp2p: {
peerDiscovery: [
bootstrap({ list: [nwaku3Ma.toString()] }),
wakuPeerExchangeDiscovery()
]
}
});
await waku.start();
dialPeerSpy = Sinon.spy((waku as any).connectionManager, "dialPeer");
const pxPeersDiscovered = new Set<PeerId>();
await new Promise<void>((resolve) => {
waku.libp2p.addEventListener("peer:discovery", (evt) => {
return void (async () => {
const peerId = evt.detail.id;
const peer = await waku.libp2p.peerStore.get(peerId);
const tags = Array.from(peer.tags.keys());
if (tags.includes(Tags.PEER_EXCHANGE)) {
pxPeersDiscovered.add(peerId);
if (pxPeersDiscovered.size === 2) {
resolve();
}
}
})();
});
});
await delay(1000);
expect(dialPeerSpy.callCount).to.equal(3);
});
it("px service nodes not subscribed to the shard should not be dialed", async function () {
this.timeout(100_000);
const pubsubTopicsToDial = [contentTopicToPubsubTopic(ContentTopic, 1)];
const contentTopicInfoToDial: ContentTopicInfo = {
clusterId: 1,
contentTopics: [ContentTopic]
};
const pubsubTopicsToIgnore = [contentTopicToPubsubTopic(ContentTopic, 2)];
// this service node is not subscribed to the shard
await nwaku1.start({
pubsubTopic: pubsubTopicsToIgnore,
relay: true,
discv5Discovery: true,
peerExchange: true
});
const enr1 = (await nwaku1.info()).enrUri;
await nwaku2.start({
pubsubTopic: pubsubTopicsToDial,
relay: true,
discv5Discovery: true,
peerExchange: true,
discv5BootstrapNode: enr1
});
const enr2 = (await nwaku2.info()).enrUri;
await nwaku3.start({
relay: true,
discv5Discovery: true,
peerExchange: true,
discv5BootstrapNode: enr2
});
const nwaku3Ma = await nwaku3.getMultiaddrWithId();
waku = await createLightNode({
shardInfo: contentTopicInfoToDial,
libp2p: {
peerDiscovery: [
bootstrap({ list: [nwaku3Ma.toString()] }),
wakuPeerExchangeDiscovery()
]
}
});
dialPeerSpy = Sinon.spy((waku as any).connectionManager, "dialPeer");
await waku.start();
const pxPeersDiscovered = new Set<PeerId>();
await new Promise<void>((resolve) => {
waku.libp2p.addEventListener("peer:discovery", (evt) => {
return void (async () => {
const peerId = evt.detail.id;
const peer = await waku.libp2p.peerStore.get(peerId);
const tags = Array.from(peer.tags.keys());
if (tags.includes(Tags.PEER_EXCHANGE)) {
pxPeersDiscovered.add(peerId);
if (pxPeersDiscovered.size === 1) {
resolve();
}
}
})();
});
});
await delay(1000);
expect(dialPeerSpy.callCount).to.equal(2);
});
});
});

View File

@ -20,6 +20,7 @@ const shardInfoBothShards: ShardInfo = { clusterId: 0, shards: [2, 3] };
const singleShardInfo1: SingleShardInfo = { clusterId: 0, shard: 2 };
const singleShardInfo2: SingleShardInfo = { clusterId: 0, shard: 3 };
const ContentTopic = "/waku/2/content/test.js";
const ContentTopic2 = "/myapp/1/latest/proto";
describe("Static Sharding: Running Nodes", () => {
let waku: LightNode;
@ -93,3 +94,51 @@ describe("Static Sharding: Running Nodes", () => {
}
});
});
describe("Autosharding: Running Nodes", () => {
let waku: LightNode;
let nwaku: NimGoNode;
beforeEach(async function () {
this.timeout(15_000);
nwaku = new NimGoNode(makeLogFileName(this));
await nwaku.start({ store: true, lightpush: true, relay: true });
});
afterEach(async function () {
this.timeout(15000);
await tearDownNodes(nwaku, waku);
});
it("configure the node with multiple pubsub topics", async function () {
this.timeout(15_000);
waku = await createLightNode({
shardInfo: {
...shardInfoBothShards,
// For autosharding, we configure multiple pubsub topics by using two content topics that hash to different shards
contentTopics: [ContentTopic, ContentTopic2]
}
});
const encoder1 = createEncoder({
contentTopic: ContentTopic,
pubsubTopicShardInfo: { clusterId: 0 }
});
const encoder2 = createEncoder({
contentTopic: ContentTopic,
pubsubTopicShardInfo: { clusterId: 0 }
});
const request1 = await waku.lightPush.send(encoder1, {
payload: utf8ToBytes("Hello World")
});
const request2 = await waku.lightPush.send(encoder2, {
payload: utf8ToBytes("Hello World")
});
expect(request1.recipients.length).to.eq(0);
expect(request2.recipients.length).to.eq(0);
});
});

View File

@ -1,5 +1,6 @@
import { createCursor, DecodedMessage, DefaultPubsubTopic } from "@waku/core";
import { createCursor, DecodedMessage } from "@waku/core";
import type { LightNode } from "@waku/interfaces";
import { DefaultPubsubTopic } from "@waku/interfaces";
import { bytesToUtf8 } from "@waku/utils/bytes";
import { expect } from "chai";

View File

@ -1,4 +1,4 @@
import { DefaultPubsubTopic } from "@waku/core";
import { DefaultPubsubTopic } from "@waku/interfaces";
import { IMessage, type LightNode } from "@waku/interfaces";
import { expect } from "chai";

View File

@ -1,11 +1,6 @@
import {
createDecoder,
DecodedMessage,
DefaultPubsubTopic,
waitForRemotePeer
} from "@waku/core";
import { createDecoder, DecodedMessage, waitForRemotePeer } from "@waku/core";
import type { IMessage, LightNode } from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import { DefaultPubsubTopic, Protocols } from "@waku/interfaces";
import {
generatePrivateKey,
generateSymmetricKey,

View File

@ -1,6 +1,7 @@
import { waitForRemotePeer } from "@waku/core";
import type { IMessage, LightNode } from "@waku/interfaces";
import { createDecoder, waitForRemotePeer } from "@waku/core";
import type { ContentTopicInfo, IMessage, LightNode } from "@waku/interfaces";
import { createLightNode, Protocols } from "@waku/sdk";
import { contentTopicToPubsubTopic } from "@waku/utils";
import { expect } from "chai";
import {
@ -19,6 +20,7 @@ import {
customShardedPubsubTopic2,
processQueriedMessages,
sendMessages,
sendMessagesAutosharding,
shardInfo1,
shardInfoBothShards,
startAndConnectLightNode,
@ -169,3 +171,153 @@ describe("Waku Store, custom pubsub topic", function () {
}
});
});
describe("Waku Store (Autosharding), custom pubsub topic", function () {
this.timeout(15000);
let waku: LightNode;
let nwaku: NimGoNode;
let nwaku2: NimGoNode;
const customContentTopic1 = "/waku/2/content/utf8";
const customContentTopic2 = "/myapp/1/latest/proto";
const clusterId = 1;
const autoshardingPubsubTopic1 = contentTopicToPubsubTopic(
customContentTopic1,
clusterId
);
const autoshardingPubsubTopic2 = contentTopicToPubsubTopic(
customContentTopic2,
clusterId
);
const contentTopicInfo1: ContentTopicInfo = {
clusterId,
contentTopics: [customContentTopic1]
};
const customDecoder1 = createDecoder(customContentTopic1, {
clusterId
});
const customDecoder2 = createDecoder(customContentTopic2, {
clusterId
});
const contentTopicInfoBothShards: ContentTopicInfo = {
clusterId,
contentTopics: [customContentTopic1, customContentTopic2]
};
beforeEach(async function () {
this.timeout(15000);
nwaku = new NimGoNode(makeLogFileName(this));
await nwaku.start({
store: true,
pubsubTopic: [autoshardingPubsubTopic1, autoshardingPubsubTopic2],
relay: true
});
await nwaku.ensureSubscriptionsAutosharding([
customContentTopic1,
customContentTopic2
]);
});
afterEach(async function () {
this.timeout(15000);
await tearDownNodes([nwaku, nwaku2], waku);
});
it("Generator, custom pubsub topic", async function () {
await sendMessagesAutosharding(nwaku, totalMsgs, customContentTopic1);
waku = await startAndConnectLightNode(nwaku, [], contentTopicInfo1);
const messages = await processQueriedMessages(
waku,
[customDecoder1],
autoshardingPubsubTopic1
);
expect(messages?.length).eq(totalMsgs);
const result = messages?.findIndex((msg) => {
return msg.payload![0]! === 0;
});
expect(result).to.not.eq(-1);
});
it("Generator, 2 different pubsubtopics", async function () {
this.timeout(10000);
const totalMsgs = 10;
await sendMessagesAutosharding(nwaku, totalMsgs, customContentTopic1);
await sendMessagesAutosharding(nwaku, totalMsgs, customContentTopic2);
waku = await startAndConnectLightNode(
nwaku,
[],
contentTopicInfoBothShards
);
const customMessages = await processQueriedMessages(
waku,
[customDecoder1],
autoshardingPubsubTopic1
);
expect(customMessages?.length).eq(totalMsgs);
const result1 = customMessages?.findIndex((msg) => {
return msg.payload![0]! === 0;
});
expect(result1).to.not.eq(-1);
const testMessages = await processQueriedMessages(
waku,
[customDecoder2],
autoshardingPubsubTopic2
);
expect(testMessages?.length).eq(totalMsgs);
const result2 = testMessages?.findIndex((msg) => {
return msg.payload![0]! === 0;
});
expect(result2).to.not.eq(-1);
});
it("Generator, 2 nwaku nodes each with different pubsubtopics", async function () {
this.timeout(10000);
// Set up and start a new nwaku node with Default Pubsubtopic
nwaku2 = new NimGoNode(makeLogFileName(this) + "2");
await nwaku2.start({
store: true,
pubsubTopic: [autoshardingPubsubTopic2],
relay: true
});
await nwaku2.ensureSubscriptionsAutosharding([customContentTopic2]);
const totalMsgs = 10;
await sendMessagesAutosharding(nwaku, totalMsgs, customContentTopic1);
await sendMessagesAutosharding(nwaku2, totalMsgs, customContentTopic2);
waku = await createLightNode({
staticNoiseKey: NOISE_KEY_1,
shardInfo: contentTopicInfoBothShards
});
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waku.dial(await nwaku2.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Store]);
let customMessages: IMessage[] = [];
let testMessages: IMessage[] = [];
while (
customMessages.length != totalMsgs ||
testMessages.length != totalMsgs
) {
customMessages = await processQueriedMessages(
waku,
[customDecoder1],
autoshardingPubsubTopic1
);
testMessages = await processQueriedMessages(
waku,
[customDecoder2],
autoshardingPubsubTopic2
);
}
});
});

View File

@ -1,5 +1,6 @@
import { DecodedMessage, DefaultPubsubTopic, PageDirection } from "@waku/core";
import { DecodedMessage, PageDirection } from "@waku/core";
import type { IMessage, LightNode } from "@waku/interfaces";
import { DefaultPubsubTopic } from "@waku/interfaces";
import { expect } from "chai";
import { makeLogFileName, NimGoNode, tearDownNodes } from "../../src/index.js";

View File

@ -1,4 +1,4 @@
import { DefaultPubsubTopic } from "@waku/core";
import { DefaultPubsubTopic } from "@waku/interfaces";
import type { LightNode } from "@waku/interfaces";
import { expect } from "chai";

View File

@ -1,5 +1,6 @@
import { DecodedMessage, DefaultPubsubTopic, PageDirection } from "@waku/core";
import { DecodedMessage, PageDirection } from "@waku/core";
import type { IMessage, LightNode } from "@waku/interfaces";
import { DefaultPubsubTopic } from "@waku/interfaces";
import { makeLogFileName, NimGoNode, tearDownNodes } from "../../src/index.js";

View File

@ -3,10 +3,15 @@ import {
createEncoder,
DecodedMessage,
Decoder,
DefaultPubsubTopic,
waitForRemotePeer
} from "@waku/core";
import { LightNode, Protocols, ShardInfo } from "@waku/interfaces";
import {
DefaultPubsubTopic,
LightNode,
Protocols,
ShardInfo,
ShardingParams
} from "@waku/interfaces";
import { createLightNode } from "@waku/sdk";
import { Logger, singleShardInfoToPubsubTopic } from "@waku/utils";
import { expect } from "chai";
@ -61,6 +66,24 @@ export async function sendMessages(
}
}
export async function sendMessagesAutosharding(
instance: NimGoNode,
numMessages: number,
contentTopic: string
): Promise<void> {
for (let i = 0; i < numMessages; i++) {
expect(
await instance.sendMessageAutosharding(
NimGoNode.toMessageRpcQuery({
payload: new Uint8Array([i]),
contentTopic: contentTopic
})
)
).to.eq(true);
await delay(1); // to ensure each timestamp is unique.
}
}
export async function processQueriedMessages(
instance: LightNode,
decoders: Array<Decoder>,
@ -81,7 +104,7 @@ export async function processQueriedMessages(
export async function startAndConnectLightNode(
instance: NimGoNode,
pubsubTopics: string[] = [DefaultPubsubTopic],
shardInfo?: ShardInfo
shardInfo?: ShardingParams
): Promise<LightNode> {
const waku = await createLightNode({
...((pubsubTopics.length !== 1 ||

View File

@ -1,14 +1,8 @@
import type { PeerStore } from "@libp2p/interface/peer-store";
import type { Peer } from "@libp2p/interface/peer-store";
import { createSecp256k1PeerId } from "@libp2p/peer-id-factory";
import {
createDecoder,
createEncoder,
DefaultPubsubTopic,
waitForRemotePeer
} from "@waku/core";
import { LightNode } from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import { createDecoder, createEncoder, waitForRemotePeer } from "@waku/core";
import { DefaultPubsubTopic, LightNode, Protocols } from "@waku/interfaces";
import { createLightNode } from "@waku/sdk";
import { toAsyncIterator } from "@waku/utils";
import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes";

View File

@ -1,6 +1,6 @@
import { DefaultPubsubTopic, waitForRemotePeer } from "@waku/core";
import { waitForRemotePeer } from "@waku/core";
import type { LightNode, RelayNode } from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import { DefaultPubsubTopic, Protocols } from "@waku/interfaces";
import { createLightNode, createRelayNode } from "@waku/sdk";
import { expect } from "chai";

View File

@ -68,6 +68,7 @@
},
"dependencies": {
"@noble/hashes": "^1.3.2",
"@waku/interfaces": "0.0.20",
"chai": "^4.3.10",
"debug": "^4.3.4",
"uint8arrays": "^4.0.4"
@ -77,7 +78,6 @@
"@rollup/plugin-json": "^6.0.0",
"@rollup/plugin-node-resolve": "^15.2.3",
"@waku/build-utils": "*",
"@waku/interfaces": "0.0.20",
"cspell": "^7.3.2",
"npm-run-all": "^4.1.5",
"rollup": "^4.6.0"

View File

@ -1,6 +1,11 @@
import { expect } from "chai";
import { contentTopicToShardIndex, ensureValidContentTopic } from "./sharding";
import {
contentTopicsByPubsubTopic,
contentTopicToPubsubTopic,
contentTopicToShardIndex,
ensureValidContentTopic
} from "./sharding";
const testInvalidCases = (
contentTopics: string[],
@ -91,10 +96,35 @@ describe("contentTopicToShardIndex", () => {
it("converts content topics to expected shard index", () => {
const contentTopics: [string, number][] = [
["/toychat/2/huilong/proto", 3],
["/myapp/1/latest/proto", 0]
["/myapp/1/latest/proto", 0],
["/waku/2/content/test.js", 1]
];
for (const [topic, shard] of contentTopics) {
expect(contentTopicToShardIndex(topic)).to.eq(shard);
}
});
it("topics with same application and version share the same shard", () => {
const contentTopics: [string, string][] = [
["/toychat/2/huilong/proto", "/toychat/2/othertopic/otherencoding"],
["/myapp/1/latest/proto", "/myapp/1/new/proto"],
["/waku/2/content/test.js", "/waku/2/users/proto"]
];
for (const [topic1, topic2] of contentTopics) {
expect(contentTopicToShardIndex(topic1)).to.eq(
contentTopicToShardIndex(topic2)
);
}
});
});
describe("contentTopicsByPubsubTopic", () => {
it("groups content topics by expected pubsub topic", () => {
const contentTopics = ["/toychat/2/huilong/proto", "/myapp/1/latest/proto"];
const grouped = contentTopicsByPubsubTopic(contentTopics);
for (const contentTopic of contentTopics) {
const pubsubTopic = contentTopicToPubsubTopic(contentTopic);
expect(grouped.get(pubsubTopic)?.includes(contentTopic)).to.be.true;
}
});
});

View File

@ -1,5 +1,10 @@
import { sha256 } from "@noble/hashes/sha256";
import type { PubsubTopic, ShardInfo, SingleShardInfo } from "@waku/interfaces";
import {
DefaultPubsubTopic,
PubsubTopic,
ShardingParams,
SingleShardInfo
} from "@waku/interfaces";
import { concat, utf8ToBytes } from "../bytes/index.js";
@ -13,14 +18,20 @@ export const singleShardInfoToPubsubTopic = (
};
export const shardInfoToPubsubTopics = (
shardInfo: ShardInfo
shardInfo: ShardingParams
): PubsubTopic[] => {
if (shardInfo.clusterId === undefined || shardInfo.shards === undefined)
throw new Error("Invalid shard");
return shardInfo.shards.map(
(index) => `/waku/2/rs/${shardInfo.clusterId}/${index}`
);
if (shardInfo.clusterId === undefined)
throw new Error("Cluster ID must be specified");
if ("contentTopics" in shardInfo) {
return shardInfo.contentTopics.map((contentTopic) =>
contentTopicToPubsubTopic(contentTopic, shardInfo.clusterId)
);
} else {
if (shardInfo.shards === undefined) throw new Error("Invalid shard");
return shardInfo.shards.map(
(index) => `/waku/2/rs/${shardInfo.clusterId}/${index}`
);
}
};
export const pubsubTopicToSingleShardInfo = (
@ -140,3 +151,43 @@ export function contentTopicToPubsubTopic(
const shardIndex = contentTopicToShardIndex(contentTopic, networkShards);
return `/waku/2/rs/${clusterId}/${shardIndex}`;
}
/**
* Given an array of content topics, groups them together by their Pubsub topic as derived using the algorithm for autosharding.
* If any of the content topics are not properly formatted, the function will throw an error.
*/
export function contentTopicsByPubsubTopic(
contentTopics: string[],
clusterId: number = 1,
networkShards: number = 8
): Map<string, Array<string>> {
const groupedContentTopics = new Map();
for (const contentTopic of contentTopics) {
const pubsubTopic = contentTopicToPubsubTopic(
contentTopic,
clusterId,
networkShards
);
let topics = groupedContentTopics.get(pubsubTopic);
if (!topics) {
groupedContentTopics.set(pubsubTopic, []);
topics = groupedContentTopics.get(pubsubTopic);
}
topics.push(contentTopic);
}
return groupedContentTopics;
}
/**
* Used when creating encoders/decoders to determine which pubsub topic to use
*/
export function determinePubsubTopic(
contentTopic: string,
pubsubTopicShardInfo?: SingleShardInfo
): string {
return pubsubTopicShardInfo
? pubsubTopicShardInfo.shard
? singleShardInfoToPubsubTopic(pubsubTopicShardInfo)
: contentTopicToPubsubTopic(contentTopic, pubsubTopicShardInfo.clusterId)
: DefaultPubsubTopic;
}