From ee2d4176f8cca45a51b7dac0009f0eb01952f540 Mon Sep 17 00:00:00 2001 From: Arseniy Klempner Date: Thu, 8 Feb 2024 12:09:10 -0800 Subject: [PATCH] feat: create node and subscription by content topic --- .size-limit.cjs | 4 +- packages/core/src/index.ts | 4 - packages/core/src/lib/wait_for_remote_peer.ts | 2 +- packages/interfaces/src/protocols.ts | 4 +- packages/sdk/src/content_topic.ts | 121 ++++++++++++ packages/sdk/src/create.ts | 12 +- packages/sdk/src/index.ts | 9 +- packages/sdk/src/relay/index.ts | 2 +- packages/{core/src/lib => sdk/src}/waku.ts | 19 +- .../tests/tests/sdk/content_topic.spec.ts | 175 ++++++++++++++++++ packages/tests/tests/waku.node.spec.ts | 9 +- 11 files changed, 329 insertions(+), 32 deletions(-) create mode 100644 packages/sdk/src/content_topic.ts rename packages/{core/src/lib => sdk/src}/waku.ts (92%) create mode 100644 packages/tests/tests/sdk/content_topic.spec.ts diff --git a/.size-limit.cjs b/.size-limit.cjs index 9be27499f..588546904 100644 --- a/.size-limit.cjs +++ b/.size-limit.cjs @@ -1,7 +1,7 @@ module.exports = [ { - name: "Waku core", - path: "packages/core/bundle/index.js", + name: "Waku node", + path: "packages/sdk/bundle/index.js", import: "{ WakuNode }", }, { diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index d20bdc977..a0b837116 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -1,4 +1,3 @@ -export { DefaultUserAgent } from "./lib/waku.js"; export { createEncoder, createDecoder } from "./lib/message/version_0.js"; export type { Encoder, @@ -7,9 +6,6 @@ export type { } from "./lib/message/version_0.js"; export * as message from "./lib/message/index.js"; -export * as waku from "./lib/waku.js"; -export { WakuNode, WakuOptions } from "./lib/waku.js"; - export * as waku_filter from "./lib/filter/index.js"; export { wakuFilter, FilterCodecs } from "./lib/filter/index.js"; diff --git a/packages/core/src/lib/wait_for_remote_peer.ts b/packages/core/src/lib/wait_for_remote_peer.ts index 2ddcd6576..ae5d5cf84 100644 --- a/packages/core/src/lib/wait_for_remote_peer.ts +++ b/packages/core/src/lib/wait_for_remote_peer.ts @@ -8,7 +8,7 @@ const log = new Logger("wait-for-remote-peer"); /** * Wait for a remote peer to be ready given the passed protocols. * Must be used after attempting to connect to nodes, using - * {@link @waku/core!WakuNode.dial} or a bootstrap method with + * {@link @waku/sdk!WakuNode.dial} or a bootstrap method with * {@link @waku/sdk!createLightNode}. * * If the passed protocols is a GossipSub protocol, then it resolves only once diff --git a/packages/interfaces/src/protocols.ts b/packages/interfaces/src/protocols.ts index 444682027..9b3dedfca 100644 --- a/packages/interfaces/src/protocols.ts +++ b/packages/interfaces/src/protocols.ts @@ -62,11 +62,11 @@ export type ProtocolCreateOptions = { */ shardInfo?: Partial; /** - * You can pass options to the `Libp2p` instance used by {@link @waku/core!WakuNode} using the `libp2p` property. + * You can pass options to the `Libp2p` instance used by {@link @waku/sdk!WakuNode} using the `libp2p` property. * This property is the same type as the one passed to [`Libp2p.create`](https://github.com/libp2p/js-libp2p/blob/master/doc/API.md#create) * apart that we made the `modules` property optional and partial, * allowing its omission and letting Waku set good defaults. - * Notes that some values are overridden by {@link @waku/core!WakuNode} to ensure it implements the Waku protocol. + * Notes that some values are overridden by {@link @waku/sdk!WakuNode} to ensure it implements the Waku protocol. */ libp2p?: Partial; /** diff --git a/packages/sdk/src/content_topic.ts b/packages/sdk/src/content_topic.ts new file mode 100644 index 000000000..c4a5c789b --- /dev/null +++ b/packages/sdk/src/content_topic.ts @@ -0,0 +1,121 @@ +import type { Multiaddr } from "@multiformats/multiaddr"; +import { createDecoder, DecodedMessage, waitForRemotePeer } from "@waku/core"; +import { + Callback, + IDecoder, + IFilterSubscription, + LightNode, + Protocols +} from "@waku/interfaces"; +import { + contentTopicToPubsubTopic, + shardInfoToPubsubTopics +} from "@waku/utils"; + +import { createLightNode } from "./create.js"; + +interface CreateTopicOptions { + waku?: LightNode; + peer: Multiaddr; +} + +// Given a Waku node, peer Multiaddr, and content topic, creates a decoder and +// subscription for that content topic. +async function prepareSubscription( + waku: LightNode, + contentTopic: string, + peer: Multiaddr +): Promise<{ + decoder: IDecoder; + subscription: IFilterSubscription; +}> { + // Validate that the Waku node matches assumptions + if (!waku.filter) { + throw new Error("Filter protocol missing from Waku node"); + } + const { shardInfo } = waku.libp2p.components.metadata; + if (!shardInfo) { + throw new Error("Shard info missing from Waku node."); + } + + // Validate content topic and ensure node is configured for its corresponding pubsub topic + const pubsubTopics = shardInfoToPubsubTopics(shardInfo); + const pubsubTopic = contentTopicToPubsubTopic(contentTopic); + if (!pubsubTopics.includes(pubsubTopic)) + throw new Error( + "Content topic does not match any pubsub topic in shard info." + ); + + await waku.dial(peer); + await waitForRemotePeer(waku, [Protocols.Filter]); + + // Create decoder and subscription + let decoder = createDecoder(contentTopic, pubsubTopic); + if (decoder) decoder = decoder ?? decoder; + const subscription = await waku.filter.createSubscription(pubsubTopic); + + return { decoder, subscription }; +} + +/** + * Creates a subscription and streams all new messages for a content topic. + * Will create a light node configured for the content topic with default settings if a node is not provided in `opts`. + * Assumes node is using autosharding. + * @param contentTopic + * @param opts + */ +export async function streamContentTopic( + contentTopic: string, + opts: CreateTopicOptions +): Promise<[ReadableStream, LightNode]> { + opts.waku = + opts.waku ?? + (await createLightNode({ + shardInfo: { contentTopics: [contentTopic] } + })); + const { decoder, subscription } = await prepareSubscription( + opts.waku, + contentTopic, + opts.peer + ); + + // Create a ReadableStream that receives any messages for the content topic + const messageStream = new ReadableStream({ + async start(controller) { + await subscription.subscribe(decoder, (message) => { + controller.enqueue(message); + }); + }, + cancel() { + return subscription.unsubscribe([contentTopic]); + } + }); + return [messageStream, opts.waku]; +} + +/** + * Subscribes to new messages for a content topic via callback function. + * Will create a light node configured for the content topic with default settings if a node is not provided in `opts`. + * Assumes node is using autosharding. + * @param contentTopic + * @param callback Called every time a new message is received on the content topic + * @param opts + */ +export async function subscribeToContentTopic( + contentTopic: string, + callback: Callback, + opts: CreateTopicOptions +): Promise<{ subscription: IFilterSubscription; waku: LightNode }> { + opts.waku = + opts.waku ?? + (await createLightNode({ + shardInfo: { contentTopics: [contentTopic] } + })); + const { decoder, subscription } = await prepareSubscription( + opts.waku, + contentTopic, + opts.peer + ); + await subscription.subscribe(decoder, callback); + return { subscription, waku: opts.waku }; +} diff --git a/packages/sdk/src/create.ts b/packages/sdk/src/create.ts index c852890ca..3cdb809eb 100644 --- a/packages/sdk/src/create.ts +++ b/packages/sdk/src/create.ts @@ -6,15 +6,7 @@ import { mplex } from "@libp2p/mplex"; import { ping } from "@libp2p/ping"; import { webSockets } from "@libp2p/websockets"; import { all as filterAll } from "@libp2p/websockets/filters"; -import { - DefaultUserAgent, - wakuFilter, - wakuLightPush, - wakuMetadata, - WakuNode, - WakuOptions, - wakuStore -} from "@waku/core"; +import { wakuFilter, wakuLightPush, wakuMetadata, wakuStore } from "@waku/core"; import { enrTree, wakuDnsDiscovery } from "@waku/dns-discovery"; import { type CreateLibp2pOptions, @@ -34,6 +26,8 @@ import { RelayCreateOptions, wakuGossipSub, wakuRelay } from "@waku/relay"; import { ensureShardingConfigured } from "@waku/utils"; import { createLibp2p } from "libp2p"; +import { DefaultUserAgent, WakuNode, WakuOptions } from "./waku.js"; + const DEFAULT_NODE_REQUIREMENTS = { lightPush: 1, filter: 1, diff --git a/packages/sdk/src/index.ts b/packages/sdk/src/index.ts index fbe6f66cc..723105f1d 100644 --- a/packages/sdk/src/index.ts +++ b/packages/sdk/src/index.ts @@ -1,9 +1,4 @@ -export { - waitForRemotePeer, - createEncoder, - createDecoder, - WakuNode -} from "@waku/core"; +export { waitForRemotePeer, createEncoder, createDecoder } from "@waku/core"; export { DecodedMessage, Decoder, @@ -12,6 +7,8 @@ export { export { utf8ToBytes, bytesToUtf8 } from "@waku/utils/bytes"; +export * from "./content_topic.js"; +export * from "./waku.js"; export * from "./create.js"; export * as waku from "@waku/core"; export * as utils from "@waku/utils"; diff --git a/packages/sdk/src/relay/index.ts b/packages/sdk/src/relay/index.ts index 6a993852f..9e90697c6 100644 --- a/packages/sdk/src/relay/index.ts +++ b/packages/sdk/src/relay/index.ts @@ -1,4 +1,3 @@ -import { WakuNode, WakuOptions } from "@waku/core"; import { DefaultPubsubTopic, type ProtocolCreateOptions, @@ -8,6 +7,7 @@ import { RelayCreateOptions, wakuGossipSub, wakuRelay } from "@waku/relay"; import { ensureShardingConfigured } from "@waku/utils"; import { defaultLibp2p, defaultPeerDiscoveries } from "../create.js"; +import { WakuNode, WakuOptions } from "../waku.js"; /** * Create a Waku node that uses Waku Relay to send and receive messages, diff --git a/packages/core/src/lib/waku.ts b/packages/sdk/src/waku.ts similarity index 92% rename from packages/core/src/lib/waku.ts rename to packages/sdk/src/waku.ts index dabf3ca5d..a4649b5be 100644 --- a/packages/core/src/lib/waku.ts +++ b/packages/sdk/src/waku.ts @@ -1,19 +1,23 @@ import type { Stream } from "@libp2p/interface"; import { isPeerId, PeerId } from "@libp2p/interface"; import { multiaddr, Multiaddr, MultiaddrInput } from "@multiformats/multiaddr"; +import { ConnectionManager, DecodedMessage } from "@waku/core"; import type { + Callback, IFilter, + IFilterSubscription, ILightPush, IRelay, IStore, Libp2p, + LightNode, PubsubTopic, Waku } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; import { Logger } from "@waku/utils"; -import { ConnectionManager } from "./connection_manager.js"; +import { subscribeToContentTopic } from "./content_topic.js"; export const DefaultPingKeepAliveValueSecs = 5 * 60; export const DefaultRelayKeepAliveValueSecs = 5 * 60; @@ -180,6 +184,19 @@ export class WakuNode implements Waku { await this.libp2p.stop(); } + async subscribeToContentTopic( + contentTopic: string, + peer: Multiaddr, + callback: Callback + ): Promise { + return ( + await subscribeToContentTopic(contentTopic, callback, { + waku: this as LightNode, + peer + }) + ).subscription; + } + isStarted(): boolean { return this.libp2p.status == "started"; } diff --git a/packages/tests/tests/sdk/content_topic.spec.ts b/packages/tests/tests/sdk/content_topic.spec.ts new file mode 100644 index 000000000..32776a7d7 --- /dev/null +++ b/packages/tests/tests/sdk/content_topic.spec.ts @@ -0,0 +1,175 @@ +import { wakuFilter } from "@waku/core"; +import { + bytesToUtf8, + createEncoder, + createLightNode, + DEFAULT_CLUSTER_ID, + defaultLibp2p, + LightNode, + Protocols, + streamContentTopic, + subscribeToContentTopic, + utf8ToBytes, + waitForRemotePeer, + WakuNode +} from "@waku/sdk"; +import { + contentTopicToPubsubTopic, + ensureShardingConfigured, + pubsubTopicToSingleShardInfo +} from "@waku/utils"; +import { expect } from "chai"; + +import { makeLogFileName, ServiceNode, tearDownNodes } from "../../src"; + +describe("SDK: Creating by Content Topic", function () { + const ContentTopic = "/myapp/1/latest/proto"; + const testMessage = "Test123"; + let nwaku: ServiceNode; + let waku: LightNode; + let waku2: LightNode; + + beforeEach(async function () { + this.timeout(15000); + nwaku = new ServiceNode(makeLogFileName(this) + "1"); + await nwaku.start({ + pubsubTopic: [contentTopicToPubsubTopic(ContentTopic)], + lightpush: true, + relay: true, + filter: true, + discv5Discovery: true, + peerExchange: true, + clusterId: DEFAULT_CLUSTER_ID + }); + }); + + afterEach(async function () { + this.timeout(15000); + await tearDownNodes(nwaku, [waku, waku2]); + }); + + it("given a content topic, creates a waku node and filter subscription", async function () { + const expectedPubsubTopic = contentTopicToPubsubTopic(ContentTopic); + + waku = ( + await subscribeToContentTopic(ContentTopic, () => {}, { + peer: await nwaku.getMultiaddrWithId() + }) + ).waku; + + expect((waku as WakuNode).pubsubTopics).to.include(expectedPubsubTopic); + }); + + it("given a waku node and content topic, creates a filter subscription", async function () { + const expectedPubsubTopic = contentTopicToPubsubTopic(ContentTopic); + + waku = await createLightNode({ + shardInfo: { contentTopics: [ContentTopic] } + }); + await subscribeToContentTopic(ContentTopic, () => {}, { + waku, + peer: await nwaku.getMultiaddrWithId() + }); + + expect((waku as WakuNode).pubsubTopics).to.include(expectedPubsubTopic); + }); + + it("receives messages sent to provided content topic through callback", async function () { + const messages: string[] = []; + waku = ( + await subscribeToContentTopic( + ContentTopic, + (msg) => { + messages.push(bytesToUtf8(msg.payload)); + }, + { + peer: await nwaku.getMultiaddrWithId() + } + ) + ).waku; + + waku2 = await createLightNode({ + shardInfo: { contentTopics: [ContentTopic] } + }); + await waku2.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku2, [Protocols.LightPush]); + const encoder = createEncoder({ + pubsubTopicShardInfo: pubsubTopicToSingleShardInfo( + contentTopicToPubsubTopic(ContentTopic) + ), + contentTopic: ContentTopic + }); + await waku2.lightPush?.send(encoder, { + payload: utf8ToBytes(testMessage) + }); + + expect(messages[0]).to.be.eq(testMessage); + }); + + it("receives messages sent to provided content topic through callback (Waku class)", async function () { + const messages: string[] = []; + const shardInfo = ensureShardingConfigured({ + contentTopics: [ContentTopic] + }); + const wakuContentTopic = new WakuNode( + { + pubsubTopics: shardInfo.pubsubTopics + }, + await defaultLibp2p(shardInfo.shardInfo, undefined, {}, undefined), + undefined, + undefined, + wakuFilter({ pubsubTopics: shardInfo.pubsubTopics }) + ); + await wakuContentTopic.subscribeToContentTopic( + ContentTopic, + await nwaku.getMultiaddrWithId(), + (msg) => { + messages.push(bytesToUtf8(msg.payload)); + } + ); + + waku2 = await createLightNode({ + shardInfo: { contentTopics: [ContentTopic] } + }); + await waku2.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku2, [Protocols.LightPush]); + const encoder = createEncoder({ + pubsubTopicShardInfo: pubsubTopicToSingleShardInfo( + contentTopicToPubsubTopic(ContentTopic) + ), + contentTopic: ContentTopic + }); + await waku2.lightPush?.send(encoder, { + payload: utf8ToBytes(testMessage) + }); + + expect(messages[0]).to.be.eq(testMessage); + }); + + it("receives messages sent to provided content topic through stream", async function () { + let stream; + [stream, waku] = await streamContentTopic(ContentTopic, { + peer: await nwaku.getMultiaddrWithId() + }); + + waku2 = await createLightNode({ + shardInfo: { contentTopics: [ContentTopic] } + }); + await waku2.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku2, [Protocols.LightPush]); + + const encoder = createEncoder({ + pubsubTopicShardInfo: pubsubTopicToSingleShardInfo( + contentTopicToPubsubTopic(ContentTopic) + ), + contentTopic: ContentTopic + }); + await waku2.lightPush?.send(encoder, { + payload: utf8ToBytes(testMessage) + }); + + const reader = stream.getReader(); + const { value: message } = await reader.read(); + expect(bytesToUtf8(message!.payload)).to.be.eq(testMessage); + }); +}); diff --git a/packages/tests/tests/waku.node.spec.ts b/packages/tests/tests/waku.node.spec.ts index b19a06bca..1dc556ebd 100644 --- a/packages/tests/tests/waku.node.spec.ts +++ b/packages/tests/tests/waku.node.spec.ts @@ -1,10 +1,6 @@ import { bootstrap } from "@libp2p/bootstrap"; import type { PeerId } from "@libp2p/interface"; -import { - DecodedMessage, - DefaultUserAgent, - waitForRemotePeer -} from "@waku/core"; +import { DecodedMessage, waitForRemotePeer } from "@waku/core"; import type { LightNode, RelayNode, Waku } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; import { generateSymmetricKey } from "@waku/message-encryption"; @@ -14,7 +10,8 @@ import { } from "@waku/message-encryption/symmetric"; import { createLightNode, - createEncoder as createPlainEncoder + createEncoder as createPlainEncoder, + DefaultUserAgent } from "@waku/sdk"; import { createRelayNode } from "@waku/sdk/relay"; import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes";