diff --git a/src/lib/interfaces.ts b/src/lib/interfaces.ts index 4df89f1b47..1b210be585 100644 --- a/src/lib/interfaces.ts +++ b/src/lib/interfaces.ts @@ -70,8 +70,8 @@ export interface Encoder { encodeProto: (message: Message) => Promise; } -export interface Decoder { +export interface Decoder { contentTopic: string; decodeProto: (bytes: Uint8Array) => Promise; - decode: (proto: ProtoMessage) => Promise; + decode: (proto: ProtoMessage) => Promise; } diff --git a/src/lib/waku_filter/index.ts b/src/lib/waku_filter/index.ts index 094c7d7e02..003642a480 100644 --- a/src/lib/waku_filter/index.ts +++ b/src/lib/waku_filter/index.ts @@ -63,9 +63,9 @@ export type UnsubscribeFunction = () => Promise; export class WakuFilter { pubSubTopic: string; private subscriptions: Map; - public decoders: Map< + private decoders: Map< string, // content topic - Set + Set> >; constructor(public libp2p: Libp2p, options?: CreateOptions) { @@ -83,8 +83,8 @@ export class WakuFilter { * @param opts The FilterSubscriptionOpts used to narrow which messages are returned, and which peer to connect to. * @returns Unsubscribe function that can be used to end the subscription. */ - async subscribe( - decoders: Decoder[], + async subscribe( + decoders: Decoder[], callback: FilterCallback, opts?: FilterSubscriptionOpts ): Promise { @@ -217,7 +217,9 @@ export class WakuFilter { this.subscriptions.delete(requestId); } - private addDecoders(decoders: Map>): void { + private addDecoders( + decoders: Map>> + ): void { decoders.forEach((decoders, contentTopic) => { const currDecs = this.decoders.get(contentTopic); if (!currDecs) { @@ -228,7 +230,9 @@ export class WakuFilter { }); } - private deleteDecoders(decoders: Map>): void { + private deleteDecoders( + decoders: Map>> + ): void { decoders.forEach((decoders, contentTopic) => { const currDecs = this.decoders.get(contentTopic); if (currDecs) { diff --git a/src/lib/waku_message/version_0.ts b/src/lib/waku_message/version_0.ts index 8ba75a0713..d316c3ceb1 100644 --- a/src/lib/waku_message/version_0.ts +++ b/src/lib/waku_message/version_0.ts @@ -75,7 +75,7 @@ export class EncoderV0 implements Encoder { } } -export class DecoderV0 implements Decoder { +export class DecoderV0 implements Decoder { constructor(public contentTopic: string) {} decodeProto(bytes: Uint8Array): Promise { @@ -84,7 +84,7 @@ export class DecoderV0 implements Decoder { return Promise.resolve(protoMessage); } - async decode(proto: ProtoMessage): Promise { + async decode(proto: ProtoMessage): Promise { // https://github.com/status-im/js-waku/issues/921 if (proto.version === undefined) { proto.version = 0; diff --git a/src/lib/waku_message/version_1.ts b/src/lib/waku_message/version_1.ts index 99b8e2e1cb..f0af9a7223 100644 --- a/src/lib/waku_message/version_1.ts +++ b/src/lib/waku_message/version_1.ts @@ -110,7 +110,7 @@ export class SymEncoder implements Encoder { } } -export class AsymDecoder extends DecoderV0 implements Decoder { +export class AsymDecoder extends DecoderV0 implements Decoder { constructor(contentTopic: string, private privateKey: Uint8Array) { super(contentTopic); } @@ -166,7 +166,7 @@ export class AsymDecoder extends DecoderV0 implements Decoder { } } -export class SymDecoder extends DecoderV0 implements Decoder { +export class SymDecoder extends DecoderV0 implements Decoder { constructor(contentTopic: string, private symKey: Uint8Array) { super(contentTopic); } diff --git a/src/lib/waku_relay/index.node.spec.ts b/src/lib/waku_relay/index.node.spec.ts index ea3045e58c..7ad8df7591 100644 --- a/src/lib/waku_relay/index.node.spec.ts +++ b/src/lib/waku_relay/index.node.spec.ts @@ -381,9 +381,7 @@ describe("Waku Relay [node only]", () => { const messageText = "Here is another message."; const receivedMsgPromise: Promise = new Promise((resolve) => { - waku.relay.addObserver(TestDecoder, (msg) => - resolve(msg as unknown as MessageV0) - ); + waku.relay.addObserver(TestDecoder, (msg) => resolve(msg)); }); await nwaku.sendMessage( diff --git a/src/lib/waku_relay/index.ts b/src/lib/waku_relay/index.ts index d132e7e17b..913672f879 100644 --- a/src/lib/waku_relay/index.ts +++ b/src/lib/waku_relay/index.ts @@ -20,7 +20,12 @@ import * as constants from "./constants"; const log = debug("waku:relay"); -export type Callback = (msg: Message) => void; +export type Callback = (msg: T) => void; + +export type Observer = { + decoder: Decoder; + callback: Callback; +}; export type CreateOptions = { /** @@ -53,7 +58,7 @@ export class WakuRelay extends GossipSub { * observers called when receiving new message. * Observers under key `""` are always called. */ - public observers: Map>; + public observers: Map>>; constructor(options?: Partial) { options = Object.assign(options ?? {}, { @@ -101,7 +106,10 @@ export class WakuRelay extends GossipSub { * * @returns Function to delete the observer */ - addObserver(decoder: Decoder, callback: Callback): () => void { + addObserver( + decoder: Decoder, + callback: Callback + ): () => void { const observer = { decoder, callback, diff --git a/src/lib/waku_store/index.ts b/src/lib/waku_store/index.ts index 158601fdf3..8874942c45 100644 --- a/src/lib/waku_store/index.ts +++ b/src/lib/waku_store/index.ts @@ -103,8 +103,8 @@ export class WakuStore { * @throws If not able to reach a Waku Store peer to query * or if an error is encountered when processing the reply. */ - async queryOrderedCallback( - decoder: Decoder, + async queryOrderedCallback( + decoder: Decoder, callback: (message: Message) => Promise | boolean | void, options?: QueryOptions ): Promise { @@ -151,8 +151,8 @@ export class WakuStore { * @throws If not able to reach a Waku Store peer to query * or if an error is encountered when processing the reply. */ - async queryCallbackOnPromise( - decoder: Decoder, + async queryCallbackOnPromise( + decoder: Decoder, callback: ( message: Promise ) => Promise | boolean | void, @@ -188,8 +188,8 @@ export class WakuStore { * @throws If not able to reach a Waku Store peer to query * or if an error is encountered when processing the reply. */ - async *queryGenerator( - decoder: Decoder, + async *queryGenerator( + decoder: Decoder, options?: QueryOptions ): AsyncGenerator[]> { let startTime, endTime; @@ -256,11 +256,11 @@ export class WakuStore { } } -async function* paginate( +async function* paginate( connection: Connection, protocol: string, queryOpts: Params, - decoder: Decoder + decoder: Decoder ): AsyncGenerator[]> { let cursor = undefined; while (true) {