chore: remove content_topic specific API (#2081)

This commit is contained in:
Sasha 2024-07-23 17:25:36 +02:00 committed by GitHub
parent a739ada33a
commit 04bd518210
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 1 additions and 325 deletions

View File

@ -7,7 +7,6 @@ export {
export { utf8ToBytes, bytesToUtf8 } from "@waku/utils/bytes";
export * from "./utils/content_topic.js";
export * from "./waku.js";
export { createLightNode, defaultLibp2p } from "./create/index.js";

View File

@ -1,125 +0,0 @@
import type { Multiaddr } from "@multiformats/multiaddr";
import { createDecoder, DecodedMessage, waitForRemotePeer } from "@waku/core";
import {
Callback,
IDecoder,
ISubscriptionSDK,
LightNode,
Protocols
} from "@waku/interfaces";
import {
contentTopicToPubsubTopic,
shardInfoToPubsubTopics
} from "@waku/utils";
import { createLightNode } from "../create/index.js";
interface CreateTopicOptions {
waku?: LightNode;
peer: Multiaddr;
}
// Given a Waku node, peer Multiaddr, and content topic, creates a decoder and
// subscription for that content topic.
async function prepareSubscription(
waku: LightNode,
contentTopic: string,
peer: Multiaddr
): Promise<{
decoder: IDecoder<DecodedMessage>;
subscription: ISubscriptionSDK;
}> {
// Validate that the Waku node matches assumptions
if (!waku.filter) {
throw new Error("Filter protocol missing from Waku node");
}
const { shardInfo } = waku.libp2p.components.metadata;
if (!shardInfo) {
throw new Error("Shard info missing from Waku node.");
}
// Validate content topic and ensure node is configured for its corresponding pubsub topic
const pubsubTopics = shardInfoToPubsubTopics(shardInfo);
const pubsubTopic = contentTopicToPubsubTopic(contentTopic);
if (!pubsubTopics.includes(pubsubTopic))
throw new Error(
"Content topic does not match any pubsub topic in shard info."
);
await waku.dial(peer);
await waitForRemotePeer(waku, [Protocols.Filter]);
// Create decoder and subscription
let decoder = createDecoder(contentTopic, pubsubTopic);
if (decoder) decoder = decoder ?? decoder;
const { subscription, error } =
await waku.filter.createSubscription(pubsubTopic);
if (error)
throw new Error("Failed to create subscription for content topic.");
return { decoder, subscription };
}
/**
* Creates a subscription and streams all new messages for a content topic.
* Will create a light node configured for the content topic with default settings if a node is not provided in `opts`.
* Assumes node is using autosharding.
* @param contentTopic
* @param opts
*/
export async function streamContentTopic(
contentTopic: string,
opts: CreateTopicOptions
): Promise<[ReadableStream<DecodedMessage>, LightNode]> {
opts.waku =
opts.waku ??
(await createLightNode({
shardInfo: { contentTopics: [contentTopic] }
}));
const { decoder, subscription } = await prepareSubscription(
opts.waku,
contentTopic,
opts.peer
);
// Create a ReadableStream that receives any messages for the content topic
const messageStream = new ReadableStream<DecodedMessage>({
async start(controller) {
await subscription.subscribe(decoder, (message) => {
controller.enqueue(message);
});
},
async cancel() {
await subscription.unsubscribe([contentTopic]);
}
});
return [messageStream, opts.waku];
}
/**
* Subscribes to new messages for a content topic via callback function.
* Will create a light node configured for the content topic with default settings if a node is not provided in `opts`.
* Assumes node is using autosharding.
* @param contentTopic
* @param callback Called every time a new message is received on the content topic
* @param opts
*/
export async function subscribeToContentTopic(
contentTopic: string,
callback: Callback<DecodedMessage>,
opts: CreateTopicOptions
): Promise<{ subscription: ISubscriptionSDK; waku: LightNode }> {
opts.waku =
opts.waku ??
(await createLightNode({
shardInfo: { contentTopics: [contentTopic] }
}));
const { decoder, subscription } = await prepareSubscription(
opts.waku,
contentTopic,
opts.peer
);
await subscription.subscribe(decoder, callback);
return { subscription, waku: opts.waku };
}

View File

@ -1,16 +1,13 @@
import type { Stream } from "@libp2p/interface";
import { isPeerId, PeerId } from "@libp2p/interface";
import { multiaddr, Multiaddr, MultiaddrInput } from "@multiformats/multiaddr";
import { ConnectionManager, DecodedMessage } from "@waku/core";
import { ConnectionManager } from "@waku/core";
import type {
Callback,
IFilterSDK,
ILightPushSDK,
IRelay,
IStoreSDK,
ISubscriptionSDK,
Libp2p,
LightNode,
ProtocolCreateOptions,
PubsubTopic,
Waku
@ -22,7 +19,6 @@ import { Logger } from "@waku/utils";
import { wakuFilter } from "./protocols/filter.js";
import { wakuLightPush } from "./protocols/light_push.js";
import { wakuStore } from "./protocols/store.js";
import { subscribeToContentTopic } from "./utils/content_topic.js";
export const DefaultPingKeepAliveValueSecs = 5 * 60;
export const DefaultRelayKeepAliveValueSecs = 5 * 60;
@ -211,19 +207,6 @@ export class WakuNode implements Waku {
await this.libp2p.stop();
}
public async subscribeToContentTopic(
contentTopic: string,
peer: Multiaddr,
callback: Callback<DecodedMessage>
): Promise<ISubscriptionSDK> {
return (
await subscribeToContentTopic(contentTopic, callback, {
waku: this as LightNode,
peer
})
).subscription;
}
public isStarted(): boolean {
return this.libp2p.status == "started";
}

View File

@ -1,181 +0,0 @@
import {
bytesToUtf8,
createEncoder,
createLightNode,
defaultLibp2p,
LightNode,
Protocols,
streamContentTopic,
subscribeToContentTopic,
utf8ToBytes,
waitForRemotePeer,
WakuNode
} from "@waku/sdk";
import {
contentTopicToPubsubTopic,
ensureShardingConfigured,
pubsubTopicToSingleShardInfo
} from "@waku/utils";
import { expect } from "chai";
import { makeLogFileName, ServiceNode, tearDownNodes } from "../../src";
// skipped: https://github.com/waku-org/js-waku/issues/1914
describe.skip("SDK: Creating by Content Topic", function () {
const ContentTopic = "/myapp/1/latest/proto";
const testMessage = "Test123";
const clusterId = 2;
let nwaku: ServiceNode;
let waku: LightNode;
let waku2: LightNode;
beforeEach(async function () {
this.timeout(15000);
nwaku = new ServiceNode(makeLogFileName(this) + "1");
await nwaku.start({
pubsubTopic: [contentTopicToPubsubTopic(ContentTopic, clusterId)],
lightpush: true,
relay: true,
filter: true,
discv5Discovery: true,
peerExchange: true,
clusterId: clusterId
});
});
afterEach(async function () {
this.timeout(15000);
await tearDownNodes(nwaku, [waku, waku2]);
});
it("given a content topic, creates a waku node and filter subscription", async function () {
const expectedPubsubTopic = contentTopicToPubsubTopic(
ContentTopic,
clusterId
);
waku = (
await subscribeToContentTopic(ContentTopic, () => {}, {
peer: await nwaku.getMultiaddrWithId()
})
).waku;
expect((waku as WakuNode).pubsubTopics).to.include(expectedPubsubTopic);
});
it("given a waku node and content topic, creates a filter subscription", async function () {
const expectedPubsubTopic = contentTopicToPubsubTopic(
ContentTopic,
clusterId
);
waku = await createLightNode({
shardInfo: { contentTopics: [ContentTopic] }
});
await subscribeToContentTopic(ContentTopic, () => {}, {
waku,
peer: await nwaku.getMultiaddrWithId()
});
expect((waku as WakuNode).pubsubTopics).to.include(expectedPubsubTopic);
});
it("receives messages sent to provided content topic through callback", async function () {
const messages: string[] = [];
waku = (
await subscribeToContentTopic(
ContentTopic,
(msg) => {
messages.push(bytesToUtf8(msg.payload));
},
{
peer: await nwaku.getMultiaddrWithId()
}
)
).waku;
waku2 = await createLightNode({
shardInfo: { contentTopics: [ContentTopic] }
});
await waku2.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku2, [Protocols.LightPush]);
const encoder = createEncoder({
pubsubTopicShardInfo: pubsubTopicToSingleShardInfo(
contentTopicToPubsubTopic(ContentTopic, clusterId)
),
contentTopic: ContentTopic
});
await waku2.lightPush?.send(encoder, {
payload: utf8ToBytes(testMessage)
});
expect(messages[0]).to.be.eq(testMessage);
});
it("receives messages sent to provided content topic through callback (Waku class)", async function () {
const messages: string[] = [];
const shardInfo = ensureShardingConfigured({
contentTopics: [ContentTopic]
});
const wakuContentTopic = new WakuNode(
{
pubsubTopics: shardInfo.pubsubTopics
},
await defaultLibp2p(shardInfo.shardInfo, undefined, {}, undefined),
{
filter: true
}
);
await wakuContentTopic.subscribeToContentTopic(
ContentTopic,
await nwaku.getMultiaddrWithId(),
(msg) => {
messages.push(bytesToUtf8(msg.payload));
}
);
waku2 = await createLightNode({
shardInfo: { contentTopics: [ContentTopic] }
});
await waku2.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku2, [Protocols.LightPush]);
const encoder = createEncoder({
pubsubTopicShardInfo: pubsubTopicToSingleShardInfo(
contentTopicToPubsubTopic(ContentTopic, clusterId)
),
contentTopic: ContentTopic
});
await waku2.lightPush?.send(encoder, {
payload: utf8ToBytes(testMessage)
});
expect(messages[0]).to.be.eq(testMessage);
});
it("receives messages sent to provided content topic through stream", async function () {
let stream;
[stream, waku] = await streamContentTopic(ContentTopic, {
peer: await nwaku.getMultiaddrWithId()
});
waku2 = await createLightNode({
shardInfo: { contentTopics: [ContentTopic] }
});
await waku2.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku2, [Protocols.LightPush]);
const encoder = createEncoder({
pubsubTopicShardInfo: pubsubTopicToSingleShardInfo(
contentTopicToPubsubTopic(ContentTopic, clusterId)
),
contentTopic: ContentTopic
});
await waku2.lightPush?.send(encoder, {
payload: utf8ToBytes(testMessage)
});
const reader = stream.getReader();
const { value: message } = await reader.read();
expect(bytesToUtf8(message!.payload)).to.be.eq(testMessage);
});
});