From e11e5b4870aede7813b3ee4b60f5e625f6eac5a2 Mon Sep 17 00:00:00 2001 From: Sasha <118575614+weboko@users.noreply.github.com> Date: Fri, 31 Mar 2023 03:17:41 +0200 Subject: [PATCH] feat!: add and implement IReceiver (#1219) - remove extend Relay by GossipSub and use it as public property; - detach GossipSub initialisation; --- packages/core/src/index.ts | 6 +- packages/core/src/lib/filter/index.ts | 30 ++- packages/core/src/lib/relay/index.ts | 173 +++++++++++++----- packages/core/src/lib/wait_for_remote_peer.ts | 2 +- packages/core/src/lib/waku.ts | 26 +-- packages/create/src/index.ts | 26 ++- packages/interfaces/src/filter.ts | 16 +- packages/interfaces/src/index.ts | 1 + packages/interfaces/src/receiver.ts | 17 ++ packages/interfaces/src/relay.ts | 20 +- packages/tests/tests/filter.node.spec.ts | 2 +- packages/tests/tests/relay.node.spec.ts | 24 +-- packages/tests/tests/waku.node.spec.ts | 2 +- 13 files changed, 224 insertions(+), 121 deletions(-) create mode 100644 packages/interfaces/src/receiver.ts diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 77742982bd..7e3ca1fec7 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -19,7 +19,11 @@ export * as waku_light_push from "./lib/light_push/index.js"; export { wakuLightPush, LightPushCodec } from "./lib/light_push/index.js"; export * as waku_relay from "./lib/relay/index.js"; -export { wakuRelay, RelayCreateOptions } from "./lib/relay/index.js"; +export { + wakuRelay, + RelayCreateOptions, + wakuGossipSub, +} from "./lib/relay/index.js"; export * as waku_store from "./lib/store/index.js"; export { diff --git a/packages/core/src/lib/filter/index.ts b/packages/core/src/lib/filter/index.ts index 2782eb82ec..2c36851449 100644 --- a/packages/core/src/lib/filter/index.ts +++ b/packages/core/src/lib/filter/index.ts @@ -2,6 +2,7 @@ import type { Libp2p } from "@libp2p/interface-libp2p"; import type { Peer } from "@libp2p/interface-peer-store"; import type { IncomingStreamData } from "@libp2p/interface-registrar"; import type { + ActiveSubscriptions, Callback, IDecodedMessage, IDecoder, @@ -58,19 +59,20 @@ class Filter extends BaseProtocol implements IFilter { } /** - * @param decoders Array of Decoders to use to decode messages, it also specifies the content topics. + * @param decoders Decoder or array of Decoders to use to decode messages, it also specifies the content topics. * @param callback A function that will be called on each message returned by the filter. * @param opts The FilterSubscriptionOpts used to narrow which messages are returned, and which peer to connect to. * @returns Unsubscribe function that can be used to end the subscription. */ async subscribe( - decoders: IDecoder[], + decoders: IDecoder | IDecoder[], callback: Callback, opts?: ProtocolOptions ): Promise { + const decodersArray = Array.isArray(decoders) ? decoders : [decoders]; const { pubSubTopic = DefaultPubSubTopic } = this.options; - const contentTopics = Array.from(groupByContentTopic(decoders).keys()); + const contentTopics = Array.from(groupByContentTopic(decodersArray).keys()); const contentFilters = contentTopics.map((contentTopic) => ({ contentTopic, @@ -109,7 +111,11 @@ class Filter extends BaseProtocol implements IFilter { throw e; } - const subscription: Subscription = { callback, decoders, pubSubTopic }; + const subscription: Subscription = { + callback, + decoders: decodersArray, + pubSubTopic, + }; this.subscriptions.set(requestId, subscription); return async () => { @@ -118,6 +124,22 @@ class Filter extends BaseProtocol implements IFilter { }; } + public getActiveSubscriptions(): ActiveSubscriptions { + const map: ActiveSubscriptions = new Map(); + const subscriptions = this.subscriptions as Map< + RequestID, + Subscription + >; + + for (const item of subscriptions.values()) { + const values = map.get(item.pubSubTopic) || []; + const nextValues = item.decoders.map((decoder) => decoder.contentTopic); + map.set(item.pubSubTopic, [...values, ...nextValues]); + } + + return map; + } + private onRequest(streamData: IncomingStreamData): void { log("Receiving message push"); try { diff --git a/packages/core/src/lib/relay/index.ts b/packages/core/src/lib/relay/index.ts index 40365e4f93..c1138ca3b3 100644 --- a/packages/core/src/lib/relay/index.ts +++ b/packages/core/src/lib/relay/index.ts @@ -6,6 +6,8 @@ import { } from "@chainsafe/libp2p-gossipsub"; import type { PeerIdStr, TopicStr } from "@chainsafe/libp2p-gossipsub/types"; import { SignaturePolicy } from "@chainsafe/libp2p-gossipsub/types"; +import type { Libp2p } from "@libp2p/interface-libp2p"; +import type { PubSub } from "@libp2p/interface-pubsub"; import type { ActiveSubscriptions, Callback, @@ -20,8 +22,8 @@ import type { import debug from "debug"; import { DefaultPubSubTopic } from "../constants.js"; +import { groupByContentTopic } from "../group_by.js"; import { TopicOnlyDecoder } from "../message/topic_only_message.js"; -import { pushOrInitMapSet } from "../push_or_init_map.js"; import * as constants from "./constants.js"; import { messageValidator } from "./message_validator.js"; @@ -38,14 +40,14 @@ export type ContentTopic = string; /** * Implements the [Waku v2 Relay protocol](https://rfc.vac.dev/spec/11/). - * Must be passed as a `pubsub` module to a `Libp2p` instance. - * - * @implements {require('libp2p-interfaces/src/pubsub')} + * Throws if libp2p.pubsub does not support Waku Relay */ -class Relay extends GossipSub implements IRelay { +class Relay implements IRelay { private readonly pubSubTopic: string; - defaultDecoder: IDecoder; + private defaultDecoder: IDecoder; + public static multicodec: string = constants.RelayCodecs[0]; + public readonly gossipSub: GossipSub; /** * observers called when receiving new message. @@ -53,21 +55,20 @@ class Relay extends GossipSub implements IRelay { */ private observers: Map>; - constructor( - components: GossipSubComponents, - options?: Partial - ) { - options = Object.assign(options ?? {}, { - // Ensure that no signature is included nor expected in the messages. - globalSignaturePolicy: SignaturePolicy.StrictNoSign, - fallbackToFloodsub: false, - }); - - super(components, options); - this.multicodecs = constants.RelayCodecs; + constructor(libp2p: Libp2p, options?: Partial) { + if (!this.isRelayPubSub(libp2p.pubsub)) { + throw Error( + `Failed to initialize Relay. libp2p.pubsub does not support ${Relay.multicodec}` + ); + } + this.gossipSub = libp2p.pubsub as GossipSub; this.pubSubTopic = options?.pubSubTopic ?? DefaultPubSubTopic; + if (this.gossipSub.isStarted()) { + this.gossipSubSubscribe(this.pubSubTopic); + } + this.observers = new Map(); // TODO: User might want to decide what decoder should be used (e.g. for RLN) @@ -82,8 +83,12 @@ class Relay extends GossipSub implements IRelay { * @returns {void} */ public async start(): Promise { - await super.start(); - this.subscribe(this.pubSubTopic); + if (this.gossipSub.isStarted()) { + throw Error("GossipSub already started."); + } + + await this.gossipSub.start(); + this.gossipSubSubscribe(this.pubSubTopic); } /** @@ -96,7 +101,7 @@ class Relay extends GossipSub implements IRelay { return { recipients: [] }; } - return this.publish(this.pubSubTopic, msg); + return this.gossipSub.publish(this.pubSubTopic, msg); } /** @@ -104,22 +109,38 @@ class Relay extends GossipSub implements IRelay { * * @returns Function to delete the observer */ - addObserver( - decoder: IDecoder, + public subscribe( + decoders: IDecoder | IDecoder[], callback: Callback ): () => void { - const observer = { - decoder, - callback, - }; - const contentTopic = decoder.contentTopic; + const contentTopicToObservers = Array.isArray(decoders) + ? toObservers(decoders, callback) + : toObservers([decoders], callback); - pushOrInitMapSet(this.observers, contentTopic, observer); + for (const contentTopic of contentTopicToObservers.keys()) { + const currObservers = this.observers.get(contentTopic) || new Set(); + const newObservers = + contentTopicToObservers.get(contentTopic) || new Set(); + + this.observers.set(contentTopic, union(currObservers, newObservers)); + } return () => { - const observers = this.observers.get(contentTopic); - if (observers) { - observers.delete(observer); + for (const contentTopic of contentTopicToObservers.keys()) { + const currentObservers = this.observers.get(contentTopic) || new Set(); + const observersToRemove = + contentTopicToObservers.get(contentTopic) || new Set(); + + const nextObservers = leftMinusJoin( + currentObservers, + observersToRemove + ); + + if (nextObservers.size) { + this.observers.set(contentTopic, nextObservers); + } else { + this.observers.delete(contentTopic); + } } }; } @@ -130,6 +151,10 @@ class Relay extends GossipSub implements IRelay { return map; } + public getMeshPeers(topic?: TopicStr): PeerIdStr[] { + return this.gossipSub.getMeshPeers(topic ?? this.pubSubTopic); + } + private async processIncomingMessage( pubSubTopic: string, bytes: Uint8Array @@ -168,8 +193,8 @@ class Relay extends GossipSub implements IRelay { * * @override */ - subscribe(pubSubTopic: string): void { - this.addEventListener( + private gossipSubSubscribe(pubSubTopic: string): void { + this.gossipSub.addEventListener( "gossipsub:message", async (event: CustomEvent) => { if (event.detail.msg.topic !== pubSubTopic) return; @@ -182,24 +207,76 @@ class Relay extends GossipSub implements IRelay { } ); - this.topicValidators.set(pubSubTopic, messageValidator); - super.subscribe(pubSubTopic); + this.gossipSub.topicValidators.set(pubSubTopic, messageValidator); + this.gossipSub.subscribe(pubSubTopic); } - unsubscribe(pubSubTopic: TopicStr): void { - super.unsubscribe(pubSubTopic); - this.topicValidators.delete(pubSubTopic); - } - - getMeshPeers(topic?: TopicStr): PeerIdStr[] { - return super.getMeshPeers(topic ?? this.pubSubTopic); + private isRelayPubSub(pubsub: PubSub): boolean { + return pubsub?.multicodecs?.includes(Relay.multicodec) || false; } } -Relay.multicodec = constants.RelayCodecs[constants.RelayCodecs.length - 1]; - export function wakuRelay( - init: Partial = {} -): (components: GossipSubComponents) => IRelay { - return (components: GossipSubComponents) => new Relay(components, init); + init: Partial = {} +): (libp2p: Libp2p) => IRelay { + return (libp2p: Libp2p) => new Relay(libp2p, init); +} + +export function wakuGossipSub( + init: Partial = {} +): (components: GossipSubComponents) => GossipSub { + return (components: GossipSubComponents) => { + init = { + ...init, + // Ensure that no signature is included nor expected in the messages. + globalSignaturePolicy: SignaturePolicy.StrictNoSign, + fallbackToFloodsub: false, + }; + const pubsub = new GossipSub(components, init); + pubsub.multicodecs = constants.RelayCodecs; + return pubsub; + }; +} + +function toObservers( + decoders: IDecoder[], + callback: Callback +): Map>> { + const contentTopicToDecoders = Array.from( + groupByContentTopic(decoders).entries() + ); + + const contentTopicToObserversEntries = contentTopicToDecoders.map( + ([contentTopic, decoders]) => + [ + contentTopic, + new Set( + decoders.map( + (decoder) => + ({ + decoder, + callback, + } as Observer) + ) + ), + ] as [ContentTopic, Set>] + ); + + return new Map(contentTopicToObserversEntries); +} + +function union(left: Set, right: Set): Set { + for (const val of right.values()) { + left.add(val); + } + return left; +} + +function leftMinusJoin(left: Set, right: Set): Set { + for (const val of right.values()) { + if (left.has(val)) { + left.delete(val); + } + } + return left; } diff --git a/packages/core/src/lib/wait_for_remote_peer.ts b/packages/core/src/lib/wait_for_remote_peer.ts index af79cd94ca..3e90694c9c 100644 --- a/packages/core/src/lib/wait_for_remote_peer.ts +++ b/packages/core/src/lib/wait_for_remote_peer.ts @@ -105,7 +105,7 @@ async function waitForGossipSubPeerInMesh(waku: IRelay): Promise { let peers = waku.getMeshPeers(); while (peers.length == 0) { - await pEvent(waku, "gossipsub:heartbeat"); + await pEvent(waku.gossipSub, "gossipsub:heartbeat"); peers = waku.getMeshPeers(); } } diff --git a/packages/core/src/lib/waku.ts b/packages/core/src/lib/waku.ts index e8b7abc71d..8728b3a958 100644 --- a/packages/core/src/lib/waku.ts +++ b/packages/core/src/lib/waku.ts @@ -1,7 +1,6 @@ import type { Stream } from "@libp2p/interface-connection"; import type { Libp2p } from "@libp2p/interface-libp2p"; import type { PeerId } from "@libp2p/interface-peer-id"; -import type { PubSub } from "@libp2p/interface-pubsub"; import type { Multiaddr } from "@multiformats/multiaddr"; import type { IFilter, @@ -14,7 +13,6 @@ import { Protocols } from "@waku/interfaces"; import debug from "debug"; import { ConnectionManager } from "./connection_manager.js"; -import * as relayConstants from "./relay/constants.js"; export const DefaultPingKeepAliveValueSecs = 0; export const DefaultRelayKeepAliveValueSecs = 5 * 60; @@ -57,7 +55,8 @@ export class WakuNode implements Waku { libp2p: Libp2p, store?: (libp2p: Libp2p) => IStore, lightPush?: (libp2p: Libp2p) => ILightPush, - filter?: (libp2p: Libp2p) => IFilter + filter?: (libp2p: Libp2p) => IFilter, + relay?: (libp2p: Libp2p) => IRelay ) { this.libp2p = libp2p; @@ -71,8 +70,8 @@ export class WakuNode implements Waku { this.lightPush = lightPush(libp2p); } - if (isRelay(libp2p.pubsub)) { - this.relay = libp2p.pubsub; + if (relay) { + this.relay = relay(libp2p); } const pingKeepAlive = @@ -120,7 +119,9 @@ export class WakuNode implements Waku { const codecs: string[] = []; if (_protocols.includes(Protocols.Relay)) { if (this.relay) { - this.relay.multicodecs.forEach((codec) => codecs.push(codec)); + this.relay.gossipSub.multicodecs.forEach((codec: string) => + codecs.push(codec) + ); } else { log( "Relay codec not included in dial codec: protocol not mounted locally" @@ -188,16 +189,3 @@ export class WakuNode implements Waku { return localMultiaddr + "/p2p/" + this.libp2p.peerId.toString(); } } - -function isRelay(pubsub: PubSub): pubsub is IRelay { - if (pubsub) { - try { - return pubsub.multicodecs.includes( - relayConstants.RelayCodecs[relayConstants.RelayCodecs.length - 1] - ); - // Exception is expected if `libp2p` was not instantiated with pubsub - // eslint-disable-next-line no-empty - } catch (e) {} - } - return false; -} diff --git a/packages/create/src/index.ts b/packages/create/src/index.ts index 51999d6516..e251b6da98 100644 --- a/packages/create/src/index.ts +++ b/packages/create/src/index.ts @@ -1,3 +1,4 @@ +import type { GossipSub } from "@chainsafe/libp2p-gossipsub"; import { noise } from "@chainsafe/libp2p-noise"; import type { Libp2p } from "@libp2p/interface-libp2p"; import type { PeerDiscovery } from "@libp2p/interface-peer-discovery"; @@ -8,6 +9,7 @@ import { DefaultUserAgent, RelayCreateOptions, wakuFilter, + wakuGossipSub, wakuLightPush, WakuNode, WakuOptions, @@ -17,7 +19,6 @@ import { import { enrTree, wakuDnsDiscovery } from "@waku/dns-discovery"; import type { FullNode, - IRelay, LightNode, ProtocolCreateOptions, RelayNode, @@ -85,12 +86,21 @@ export async function createRelayNode( } const libp2p = await defaultLibp2p( - wakuRelay(options), + wakuGossipSub(options), libp2pOptions, options?.userAgent ); - return new WakuNode(options ?? {}, libp2p) as RelayNode; + const relay = wakuRelay(options); + + return new WakuNode( + options ?? {}, + libp2p, + undefined, + undefined, + undefined, + relay + ) as RelayNode; } /** @@ -117,7 +127,7 @@ export async function createFullNode( } const libp2p = await defaultLibp2p( - wakuRelay(options), + wakuGossipSub(options), libp2pOptions, options?.userAgent ); @@ -125,13 +135,15 @@ export async function createFullNode( const store = wakuStore(options); const lightPush = wakuLightPush(options); const filter = wakuFilter(options); + const relay = wakuRelay(options); return new WakuNode( options ?? {}, libp2p, store, lightPush, - filter + filter, + relay ) as FullNode; } @@ -142,7 +154,7 @@ export function defaultPeerDiscovery(): ( } export async function defaultLibp2p( - wakuRelay?: (components: Libp2pComponents) => IRelay, + wakuGossipSub?: (components: Libp2pComponents) => GossipSub, options?: Partial, userAgent?: string ): Promise { @@ -157,7 +169,7 @@ export async function defaultLibp2p( }, }, } as Libp2pOptions, - wakuRelay ? { pubsub: wakuRelay } : {}, + wakuGossipSub ? { pubsub: wakuGossipSub } : {}, options ?? {} ); diff --git a/packages/interfaces/src/filter.ts b/packages/interfaces/src/filter.ts index 526b493421..c3bd251068 100644 --- a/packages/interfaces/src/filter.ts +++ b/packages/interfaces/src/filter.ts @@ -1,14 +1,4 @@ -import type { IDecodedMessage, IDecoder } from "./message.js"; -import type { - Callback, - PointToPointProtocol, - ProtocolOptions, -} from "./protocols.js"; +import type { PointToPointProtocol } from "./protocols.js"; +import type { IReceiver } from "./receiver.js"; -export interface IFilter extends PointToPointProtocol { - subscribe: ( - decoders: IDecoder[], - callback: Callback, - opts?: ProtocolOptions - ) => Promise<() => Promise>; -} +export type IFilter = IReceiver & PointToPointProtocol; diff --git a/packages/interfaces/src/index.ts b/packages/interfaces/src/index.ts index 2d8baf54b3..24a4d26cfb 100644 --- a/packages/interfaces/src/index.ts +++ b/packages/interfaces/src/index.ts @@ -9,3 +9,4 @@ export * from "./store.js"; export * from "./waku.js"; export * from "./connection_manager.js"; export * from "./sender.js"; +export * from "./receiver.js"; diff --git a/packages/interfaces/src/receiver.ts b/packages/interfaces/src/receiver.ts new file mode 100644 index 0000000000..055854ca98 --- /dev/null +++ b/packages/interfaces/src/receiver.ts @@ -0,0 +1,17 @@ +import type { IDecodedMessage, IDecoder } from "./message.js"; +import type { Callback, ProtocolOptions } from "./protocols.js"; + +type Unsubscribe = () => void | Promise; +type PubSubTopic = string; +type ContentTopic = string; + +export type ActiveSubscriptions = Map; + +export interface IReceiver { + subscribe: ( + decoders: IDecoder | IDecoder[], + callback: Callback, + opts?: ProtocolOptions + ) => Unsubscribe | Promise; + getActiveSubscriptions: () => ActiveSubscriptions; +} diff --git a/packages/interfaces/src/relay.ts b/packages/interfaces/src/relay.ts index c602322cd1..5683b13b44 100644 --- a/packages/interfaces/src/relay.ts +++ b/packages/interfaces/src/relay.ts @@ -1,21 +1,13 @@ import type { GossipSub } from "@chainsafe/libp2p-gossipsub"; +import type { PeerIdStr, TopicStr } from "@chainsafe/libp2p-gossipsub/types"; -import type { IDecodedMessage, IDecoder } from "./message.js"; -import type { Callback } from "./protocols.js"; +import { IReceiver } from "./receiver.js"; import type { ISender } from "./sender.js"; -type PubSubTopic = string; -type ContentTopic = string; - -export type ActiveSubscriptions = Map; - interface IRelayAPI { - addObserver: ( - decoder: IDecoder, - callback: Callback - ) => () => void; - getMeshPeers: () => string[]; - getActiveSubscriptions: () => ActiveSubscriptions | undefined; + readonly gossipSub: GossipSub; + start: () => Promise; + getMeshPeers: (topic?: TopicStr) => PeerIdStr[]; } -export type IRelay = IRelayAPI & GossipSub & ISender; +export type IRelay = IRelayAPI & ISender & IReceiver; diff --git a/packages/tests/tests/filter.node.spec.ts b/packages/tests/tests/filter.node.spec.ts index 6453e6b11e..068b00e090 100644 --- a/packages/tests/tests/filter.node.spec.ts +++ b/packages/tests/tests/filter.node.spec.ts @@ -78,7 +78,7 @@ describe("Waku Filter", () => { messageCount++; expect(msg.contentTopic).to.eq(TestContentTopic); }; - await waku.filter.subscribe([TestDecoder], callback); + await waku.filter.subscribe(TestDecoder, callback); await delay(200); await waku.lightPush.send(TestEncoder, { diff --git a/packages/tests/tests/relay.node.spec.ts b/packages/tests/tests/relay.node.spec.ts index 1438bd29cb..c3749d2498 100644 --- a/packages/tests/tests/relay.node.spec.ts +++ b/packages/tests/tests/relay.node.spec.ts @@ -121,7 +121,7 @@ describe("Waku Relay [node only]", () => { const receivedMsgPromise: Promise = new Promise( (resolve) => { - waku2.relay.addObserver(TestDecoder, resolve); + waku2.relay.subscribe([TestDecoder], resolve); } ); @@ -152,12 +152,12 @@ describe("Waku Relay [node only]", () => { const barDecoder = createDecoder(barContentTopic); const fooMessages: DecodedMessage[] = []; - waku2.relay.addObserver(fooDecoder, (msg) => { + waku2.relay.subscribe([fooDecoder], (msg) => { fooMessages.push(msg); }); const barMessages: DecodedMessage[] = []; - waku2.relay.addObserver(barDecoder, (msg) => { + waku2.relay.subscribe([barDecoder], (msg) => { barMessages.push(msg); }); @@ -207,10 +207,10 @@ describe("Waku Relay [node only]", () => { const symDecoder = createSymDecoder(symTopic, symKey); const msgs: DecodedMessage[] = []; - waku2.relay.addObserver(eciesDecoder, (wakuMsg) => { + waku2.relay.subscribe([eciesDecoder], (wakuMsg) => { msgs.push(wakuMsg); }); - waku2.relay.addObserver(symDecoder, (wakuMsg) => { + waku2.relay.subscribe([symDecoder], (wakuMsg) => { msgs.push(wakuMsg); }); @@ -239,10 +239,10 @@ describe("Waku Relay [node only]", () => { // The promise **fails** if we receive a message on this observer. const receivedMsgPromise: Promise = new Promise( (resolve, reject) => { - const deleteObserver = waku2.relay.addObserver( - createDecoder(contentTopic), + const deleteObserver = waku2.relay.subscribe( + [createDecoder(contentTopic)], reject - ); + ) as () => void; deleteObserver(); setTimeout(resolve, 500); } @@ -313,7 +313,7 @@ describe("Waku Relay [node only]", () => { const waku2ReceivedMsgPromise: Promise = new Promise( (resolve) => { - waku2.relay.addObserver(TestDecoder, resolve); + waku2.relay.subscribe([TestDecoder], resolve); } ); @@ -321,7 +321,7 @@ describe("Waku Relay [node only]", () => { // pubsub topic. const waku3NoMsgPromise: Promise = new Promise( (resolve, reject) => { - waku3.relay.addObserver(TestDecoder, reject); + waku3.relay.subscribe([TestDecoder], reject); setTimeout(resolve, 1000); } ); @@ -401,7 +401,7 @@ describe("Waku Relay [node only]", () => { const receivedMsgPromise: Promise = new Promise( (resolve) => { - waku.relay.addObserver(TestDecoder, (msg) => + waku.relay.subscribe(TestDecoder, (msg) => resolve(msg) ); } @@ -472,7 +472,7 @@ describe("Waku Relay [node only]", () => { const waku2ReceivedMsgPromise: Promise = new Promise( (resolve) => { - waku2.relay.addObserver(TestDecoder, resolve); + waku2.relay.subscribe(TestDecoder, resolve); } ); diff --git a/packages/tests/tests/waku.node.spec.ts b/packages/tests/tests/waku.node.spec.ts index d65077d077..f10c7a6f95 100644 --- a/packages/tests/tests/waku.node.spec.ts +++ b/packages/tests/tests/waku.node.spec.ts @@ -178,7 +178,7 @@ describe("Decryption Keys", () => { const receivedMsgPromise: Promise = new Promise( (resolve) => { - waku2.relay.addObserver(decoder, resolve); + waku2.relay.subscribe([decoder], resolve); } );