diff --git a/packages/core/src/lib/filter/index.ts b/packages/core/src/lib/filter/index.ts index 502128dfce..d5ca907afa 100644 --- a/packages/core/src/lib/filter/index.ts +++ b/packages/core/src/lib/filter/index.ts @@ -6,7 +6,6 @@ import type { IDecodedMessage, IDecoder, IFilter, - IMessage, ProtocolCreateOptions, ProtocolOptions, } from "@waku/interfaces"; @@ -30,6 +29,12 @@ export const FilterCodec = "/vac/waku/filter/2.0.0-beta1"; const log = debug("waku:filter"); export type UnsubscribeFunction = () => Promise; +export type RequestID = string; + +type Subscription = { + decoders: IDecoder[]; + callback: Callback; +}; /** * Implements client side of the [Waku v2 Filter protocol](https://rfc.vac.dev/spec/12/). @@ -40,17 +45,12 @@ export type UnsubscribeFunction = () => Promise; */ class Filter extends BaseProtocol implements IFilter { options: ProtocolCreateOptions; - private subscriptions: Map>; - private decoders: Map< - string, // content topic - Set> - >; + private subscriptions: Map; constructor(public libp2p: Libp2p, options?: ProtocolCreateOptions) { super(FilterCodec, libp2p); this.options = options ?? {}; this.subscriptions = new Map(); - this.decoders = new Map(); this.libp2p .handle(this.multicodec, this.onRequest.bind(this)) .catch((e) => log("Failed to register filter protocol", e)); @@ -69,8 +69,7 @@ class Filter extends BaseProtocol implements IFilter { ): Promise { const { pubSubTopic = DefaultPubSubTopic } = this.options; - const groupedDecoders = groupByContentTopic(decoders); - const contentTopics = Array.from(groupedDecoders.keys()); + const contentTopics = Array.from(groupByContentTopic(decoders).keys()); const contentFilters = contentTopics.map((contentTopic) => ({ contentTopic, @@ -109,13 +108,11 @@ class Filter extends BaseProtocol implements IFilter { throw e; } - this.addDecoders(groupedDecoders); - this.addCallback(requestId, callback); + this.subscriptions.set(requestId, { callback, decoders }); return async () => { await this.unsubscribe(pubSubTopic, contentFilters, requestId, peer); - this.deleteDecoders(groupedDecoders); - this.deleteCallback(requestId); + this.subscriptions.delete(requestId); }; } @@ -142,13 +139,21 @@ class Filter extends BaseProtocol implements IFilter { } } - private async pushMessages( + private async pushMessages( requestId: string, messages: WakuMessageProto[] ): Promise { - const callback = this.subscriptions.get(requestId); - if (!callback) { - log(`No callback registered for request ID ${requestId}`); + const subscription = this.subscriptions.get(requestId) as + | Subscription + | undefined; + if (!subscription) { + log(`No subscription locally registered for request ID ${requestId}`); + return; + } + const { decoders, callback } = subscription; + + if (!decoders || !decoders.length) { + log(`No decoder registered for request ID ${requestId}`); return; } @@ -159,18 +164,12 @@ class Filter extends BaseProtocol implements IFilter { return; } - const decoders = this.decoders.get(contentTopic); - if (!decoders) { - log("No decoder for", contentTopic); - return; - } - - let msg: IMessage | undefined; + let didDecodeMsg = false; // We don't want to wait for decoding failure, just attempt to decode // all messages and do the call back on the one that works // noinspection ES6MissingAwait - decoders.forEach(async (dec) => { - if (msg) return; + decoders.forEach(async (dec: IDecoder) => { + if (didDecodeMsg) return; const decoded = await dec.fromProtoObj(toProtoMessage(protoMessage)); if (!decoded) { log("Not able to decode message"); @@ -178,46 +177,12 @@ class Filter extends BaseProtocol implements IFilter { } // This is just to prevent more decoding attempt // TODO: Could be better if we were to abort promises - msg = decoded; + didDecodeMsg = Boolean(decoded); await callback(decoded); }); } } - private addCallback(requestId: string, callback: Callback): void { - this.subscriptions.set(requestId, callback); - } - - private deleteCallback(requestId: string): void { - this.subscriptions.delete(requestId); - } - - private addDecoders( - decoders: Map>> - ): void { - decoders.forEach((decoders, contentTopic) => { - const currDecs = this.decoders.get(contentTopic); - if (!currDecs) { - this.decoders.set(contentTopic, new Set(decoders)); - } else { - this.decoders.set(contentTopic, new Set([...currDecs, ...decoders])); - } - }); - } - - private deleteDecoders( - decoders: Map>> - ): void { - decoders.forEach((decoders, contentTopic) => { - const currDecs = this.decoders.get(contentTopic); - if (currDecs) { - decoders.forEach((dec) => { - currDecs.delete(dec); - }); - } - }); - } - private async unsubscribe( topic: string, contentFilters: ContentFilter[], diff --git a/packages/core/src/lib/group_by.ts b/packages/core/src/lib/group_by.ts index e270ae6b71..6a21d2b278 100644 --- a/packages/core/src/lib/group_by.ts +++ b/packages/core/src/lib/group_by.ts @@ -1,5 +1,5 @@ export function groupByContentTopic( - values: T[] + values: readonly T[] ): Map> { const groupedDecoders = new Map(); values.forEach((value) => { diff --git a/packages/core/src/lib/relay/index.ts b/packages/core/src/lib/relay/index.ts index 8768c0209b..6f2355b326 100644 --- a/packages/core/src/lib/relay/index.ts +++ b/packages/core/src/lib/relay/index.ts @@ -32,6 +32,7 @@ export type Observer = { }; export type RelayCreateOptions = ProtocolCreateOptions & GossipsubOpts; +export type ContentTopic = string; /** * Implements the [Waku v2 Relay protocol](https://rfc.vac.dev/spec/11/). @@ -40,7 +41,7 @@ export type RelayCreateOptions = ProtocolCreateOptions & GossipsubOpts; * @implements {require('libp2p-interfaces/src/pubsub')} */ class Relay extends GossipSub implements IRelay { - private pubSubTopic: string; + private readonly pubSubTopic: string; defaultDecoder: IDecoder; public static multicodec: string = constants.RelayCodecs[0]; @@ -48,7 +49,7 @@ class Relay extends GossipSub implements IRelay { * observers called when receiving new message. * Observers under key `""` are always called. */ - public observers: Map>>; + public observers: Map>; constructor( components: GossipSubComponents, @@ -119,6 +120,38 @@ class Relay extends GossipSub implements IRelay { }; } + private async processIncomingMessage( + bytes: Uint8Array + ): Promise { + const topicOnlyMsg = await this.defaultDecoder.fromWireToProtoObj(bytes); + if (!topicOnlyMsg || !topicOnlyMsg.contentTopic) { + log("Message does not have a content topic, skipping"); + return; + } + + const observers = this.observers.get(topicOnlyMsg.contentTopic) as Set< + Observer + >; + if (!observers) { + return; + } + await Promise.all( + Array.from(observers).map(async ({ decoder, callback }) => { + const protoMsg = await decoder.fromWireToProtoObj(bytes); + if (!protoMsg) { + log("Internal error: message previously decoded failed on 2nd pass."); + return; + } + const msg = await decoder.fromProtoObj(protoMsg); + if (msg) { + callback(msg); + } else { + log("Failed to decode messages on", topicOnlyMsg.contentTopic); + } + }) + ); + } + /** * Subscribe to a pubsub topic and start emitting Waku messages to observers. * @@ -131,36 +164,8 @@ class Relay extends GossipSub implements IRelay { if (event.detail.msg.topic !== pubSubTopic) return; log(`Message received on ${pubSubTopic}`); - const topicOnlyMsg = await this.defaultDecoder.fromWireToProtoObj( - event.detail.msg.data - ); - if (!topicOnlyMsg || !topicOnlyMsg.contentTopic) { - log("Message does not have a content topic, skipping"); - return; - } - - const observers = this.observers.get(topicOnlyMsg.contentTopic); - if (!observers) { - return; - } - await Promise.all( - Array.from(observers).map(async ({ decoder, callback }) => { - const protoMsg = await decoder.fromWireToProtoObj( - event.detail.msg.data - ); - if (!protoMsg) { - log( - "Internal error: message previously decoded failed on 2nd pass." - ); - return; - } - const msg = await decoder.fromProtoObj(protoMsg); - if (msg) { - callback(msg); - } else { - log("Failed to decode messages on", topicOnlyMsg.contentTopic); - } - }) + this.processIncomingMessage(event.detail.msg.data).catch((e) => + log("Failed to process incoming message", e) ); } );