diff --git a/package-lock.json b/package-lock.json index ac187457c4..038a43df53 100644 --- a/package-lock.json +++ b/package-lock.json @@ -6,9 +6,9 @@ "": { "name": "@waku/root", "workspaces": [ + "packages/interfaces", "packages/utils", "packages/proto", - "packages/interfaces", "packages/enr", "packages/core", "packages/message-hash", @@ -29895,6 +29895,7 @@ "@typescript-eslint/eslint-plugin": "^5.57.0", "@typescript-eslint/parser": "^5.51.0", "@waku/build-utils": "*", + "@waku/interfaces": "0.0.11", "cspell": "^6.31.1", "eslint": "^8.35.0", "eslint-config-prettier": "^8.6.0", @@ -35019,6 +35020,7 @@ "@typescript-eslint/eslint-plugin": "^5.57.0", "@typescript-eslint/parser": "^5.51.0", "@waku/build-utils": "*", + "@waku/interfaces": "0.0.11", "cspell": "^6.31.1", "debug": "^4.3.4", "eslint": "^8.35.0", diff --git a/package.json b/package.json index 23bd821123..9b5b3ac587 100644 --- a/package.json +++ b/package.json @@ -3,9 +3,9 @@ "private": true, "type": "module", "workspaces": [ + "packages/interfaces", "packages/utils", "packages/proto", - "packages/interfaces", "packages/enr", "packages/core", "packages/message-hash", diff --git a/packages/core/src/lib/filter/index.ts b/packages/core/src/lib/filter/index.ts index 0a03989ffb..c6002cc178 100644 --- a/packages/core/src/lib/filter/index.ts +++ b/packages/core/src/lib/filter/index.ts @@ -4,6 +4,7 @@ import type { IncomingStreamData } from "@libp2p/interface-registrar"; import type { ActiveSubscriptions, Callback, + IAsyncIterator, IDecodedMessage, IDecoder, IFilter, @@ -11,6 +12,7 @@ import type { ProtocolOptions, } from "@waku/interfaces"; import { WakuMessage as WakuMessageProto } from "@waku/proto"; +import { toAsyncIterator } from "@waku/utils"; import debug from "debug"; import all from "it-all"; import * as lp from "it-length-prefixed"; @@ -124,6 +126,13 @@ class Filter extends BaseProtocol implements IFilter { }; } + public toSubscriptionIterator( + decoders: IDecoder | IDecoder[], + opts?: ProtocolOptions | undefined + ): Promise> { + return toAsyncIterator(this, decoders, opts); + } + public getActiveSubscriptions(): ActiveSubscriptions { const map: ActiveSubscriptions = new Map(); const subscriptions = this.subscriptions as Map< diff --git a/packages/core/src/lib/relay/index.ts b/packages/core/src/lib/relay/index.ts index 4ec9b62375..303d1bd0da 100644 --- a/packages/core/src/lib/relay/index.ts +++ b/packages/core/src/lib/relay/index.ts @@ -12,14 +12,17 @@ import { sha256 } from "@noble/hashes/sha256"; import type { ActiveSubscriptions, Callback, + IAsyncIterator, IDecodedMessage, IDecoder, IEncoder, IMessage, IRelay, ProtocolCreateOptions, + ProtocolOptions, SendResult, } from "@waku/interfaces"; +import { toAsyncIterator } from "@waku/utils"; import debug from "debug"; import { DefaultPubSubTopic } from "../constants.js"; @@ -146,6 +149,13 @@ class Relay implements IRelay { }; } + public toSubscriptionIterator( + decoders: IDecoder | IDecoder[], + opts?: ProtocolOptions | undefined + ): Promise> { + return toAsyncIterator(this, decoders, opts); + } + public getActiveSubscriptions(): ActiveSubscriptions { const map = new Map(); map.set(this.pubSubTopic, this.observers.keys()); diff --git a/packages/interfaces/src/index.ts b/packages/interfaces/src/index.ts index 24a4d26cfb..c1f22589e3 100644 --- a/packages/interfaces/src/index.ts +++ b/packages/interfaces/src/index.ts @@ -10,3 +10,4 @@ export * from "./waku.js"; export * from "./connection_manager.js"; export * from "./sender.js"; export * from "./receiver.js"; +export * from "./misc.js"; diff --git a/packages/interfaces/src/misc.ts b/packages/interfaces/src/misc.ts new file mode 100644 index 0000000000..445bee1d37 --- /dev/null +++ b/packages/interfaces/src/misc.ts @@ -0,0 +1,11 @@ +import type { IDecodedMessage } from "./message.js"; + +export interface IAsyncIterator { + iterator: AsyncIterator; + stop: Unsubscribe; +} + +export type Unsubscribe = () => void | Promise; + +export type PubSubTopic = string; +export type ContentTopic = string; diff --git a/packages/interfaces/src/receiver.ts b/packages/interfaces/src/receiver.ts index 055854ca98..f528ca4310 100644 --- a/packages/interfaces/src/receiver.ts +++ b/packages/interfaces/src/receiver.ts @@ -1,13 +1,19 @@ import type { IDecodedMessage, IDecoder } from "./message.js"; +import type { + ContentTopic, + IAsyncIterator, + PubSubTopic, + Unsubscribe, +} from "./misc.js"; import type { Callback, ProtocolOptions } from "./protocols.js"; -type Unsubscribe = () => void | Promise; -type PubSubTopic = string; -type ContentTopic = string; - export type ActiveSubscriptions = Map; export interface IReceiver { + toSubscriptionIterator: ( + decoders: IDecoder | IDecoder[], + opts?: ProtocolOptions + ) => Promise>; subscribe: ( decoders: IDecoder | IDecoder[], callback: Callback, diff --git a/packages/tests/tests/utils.spec.ts b/packages/tests/tests/utils.spec.ts new file mode 100644 index 0000000000..0be264571e --- /dev/null +++ b/packages/tests/tests/utils.spec.ts @@ -0,0 +1,98 @@ +import { + createDecoder, + createEncoder, + DefaultPubSubTopic, + waitForRemotePeer, +} from "@waku/core"; +import { createLightNode } from "@waku/create"; +import type { LightNode } from "@waku/interfaces"; +import { Protocols } from "@waku/interfaces"; +import { toAsyncIterator } from "@waku/utils"; +import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes"; +import { expect } from "chai"; + +import { makeLogFileName, NOISE_KEY_1, Nwaku } from "../src/index.js"; + +const TestContentTopic = "/test/1/waku-filter"; +const TestEncoder = createEncoder({ contentTopic: TestContentTopic }); +const TestDecoder = createDecoder(TestContentTopic); + +describe("Util: toAsyncIterator", () => { + let waku: LightNode; + let nwaku: Nwaku; + + beforeEach(async function () { + this.timeout(15000); + nwaku = new Nwaku(makeLogFileName(this)); + await nwaku.start({ filter: true, lightpush: true, relay: true }); + waku = await createLightNode({ + staticNoiseKey: NOISE_KEY_1, + libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }, + }); + await waku.start(); + await waku.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]); + }); + + afterEach(async () => { + try { + await nwaku.stop(); + await waku.stop(); + } catch (err) { + console.log("Failed to stop", err); + } + }); + + it("creates an iterator", async function () { + const messageText = "hey, what's up?"; + const sent = { payload: utf8ToBytes(messageText) }; + + const { iterator } = await toAsyncIterator(waku.filter, TestDecoder); + + await waku.lightPush.send(TestEncoder, sent); + const { value } = await iterator.next(); + + expect(value.contentTopic).to.eq(TestContentTopic); + expect(value.pubSubTopic).to.eq(DefaultPubSubTopic); + expect(bytesToUtf8(value.payload)).to.eq(messageText); + }); + + it("handles multiple messages", async function () { + const { iterator } = await toAsyncIterator(waku.filter, TestDecoder); + + await waku.lightPush.send(TestEncoder, { + payload: utf8ToBytes("Filtering works!"), + }); + await waku.lightPush.send(TestEncoder, { + payload: utf8ToBytes("Filtering still works!"), + }); + + let result = await iterator.next(); + expect(bytesToUtf8(result.value.payload)).to.eq("Filtering works!"); + + result = await iterator.next(); + expect(bytesToUtf8(result.value.payload)).to.eq("Filtering still works!"); + }); + + it("unsubscribes", async function () { + const { iterator, stop } = await toAsyncIterator(waku.filter, TestDecoder); + + await waku.lightPush.send(TestEncoder, { + payload: utf8ToBytes("This should be received"), + }); + + await stop(); + + await waku.lightPush.send(TestEncoder, { + payload: utf8ToBytes("This should not be received"), + }); + + let result = await iterator.next(); + expect(result.done).to.eq(true); + expect(bytesToUtf8(result.value.payload)).to.eq("This should be received"); + + result = await iterator.next(); + expect(result.value).to.eq(undefined); + expect(result.done).to.eq(true); + }); +}); diff --git a/packages/utils/package.json b/packages/utils/package.json index 03625f46c1..51564f923d 100644 --- a/packages/utils/package.json +++ b/packages/utils/package.json @@ -78,6 +78,7 @@ "@typescript-eslint/eslint-plugin": "^5.57.0", "@typescript-eslint/parser": "^5.51.0", "@waku/build-utils": "*", + "@waku/interfaces": "*", "cspell": "^6.31.1", "eslint": "^8.35.0", "eslint-config-prettier": "^8.6.0", diff --git a/packages/utils/src/common/index.ts b/packages/utils/src/common/index.ts index 96e75ca55d..c4cbcb4e38 100644 --- a/packages/utils/src/common/index.ts +++ b/packages/utils/src/common/index.ts @@ -1,2 +1,3 @@ export * from "./is_defined.js"; export * from "./random_subset.js"; +export * from "./to_async_iterator.js"; diff --git a/packages/utils/src/common/to_async_iterator.ts b/packages/utils/src/common/to_async_iterator.ts new file mode 100644 index 0000000000..74d0218911 --- /dev/null +++ b/packages/utils/src/common/to_async_iterator.ts @@ -0,0 +1,51 @@ +import type { + IAsyncIterator, + IDecodedMessage, + IDecoder, + IReceiver, + ProtocolOptions, + Unsubscribe, +} from "@waku/interfaces"; + +export async function toAsyncIterator( + receiver: IReceiver, + decoder: IDecoder | IDecoder[], + options?: ProtocolOptions +): Promise> { + const messages: T[] = []; + + let unsubscribe: undefined | Unsubscribe; + unsubscribe = await receiver.subscribe( + decoder, + (message: T) => { + messages.push(message); + }, + options + ); + + async function* iterator(): AsyncIterator { + while (true) { + const message = messages.shift() as T; + + if (!unsubscribe && messages.length === 0) { + return message; + } + + if (!message && unsubscribe) { + continue; + } + + yield message; + } + } + + return { + iterator: iterator(), + async stop() { + if (unsubscribe) { + await unsubscribe(); + unsubscribe = undefined; + } + }, + }; +}