diff --git a/packages/core/src/lib/waku_filter/index.ts b/packages/core/src/lib/waku_filter/index.ts index a2f9c3c7d6..e09b9249ea 100644 --- a/packages/core/src/lib/waku_filter/index.ts +++ b/packages/core/src/lib/waku_filter/index.ts @@ -4,6 +4,7 @@ import type { Peer } from "@libp2p/interface-peer-store"; import type { IncomingStreamData } from "@libp2p/interface-registrar"; import type { Callback, + DecodedMessage, Decoder, Filter, Message, @@ -77,7 +78,7 @@ export class WakuFilter implements Filter { * @param opts The FilterSubscriptionOpts used to narrow which messages are returned, and which peer to connect to. * @returns Unsubscribe function that can be used to end the subscription. */ - async subscribe( + async subscribe( decoders: Decoder[], callback: Callback, opts?: ProtocolOptions @@ -211,7 +212,7 @@ export class WakuFilter implements Filter { this.subscriptions.delete(requestId); } - private addDecoders( + private addDecoders( decoders: Map>> ): void { decoders.forEach((decoders, contentTopic) => { @@ -224,7 +225,7 @@ export class WakuFilter implements Filter { }); } - private deleteDecoders( + private deleteDecoders( decoders: Map>> ): void { decoders.forEach((decoders, contentTopic) => { diff --git a/packages/core/src/lib/waku_message/version_0.ts b/packages/core/src/lib/waku_message/version_0.ts index ac3dfd379a..ea2c1a3cbe 100644 --- a/packages/core/src/lib/waku_message/version_0.ts +++ b/packages/core/src/lib/waku_message/version_0.ts @@ -1,4 +1,5 @@ import type { + DecodedMessage, Decoder, Encoder, Message, @@ -15,7 +16,7 @@ const OneMillion = BigInt(1_000_000); export const Version = 0; export { proto }; -export class MessageV0 implements Message { +export class MessageV0 implements DecodedMessage { constructor(protected proto: proto.WakuMessage) {} get _rawPayload(): Uint8Array | undefined { @@ -79,7 +80,7 @@ export class EncoderV0 implements Encoder { return { payload: message.payload, version: Version, - contentTopic: message.contentTopic ?? this.contentTopic, + contentTopic: this.contentTopic, timestamp: BigInt(timestamp.valueOf()) * OneMillion, rateLimitProof: message.rateLimitProof, }; diff --git a/packages/core/src/lib/waku_relay/index.ts b/packages/core/src/lib/waku_relay/index.ts index 420bf10cbe..eb83314c2e 100644 --- a/packages/core/src/lib/waku_relay/index.ts +++ b/packages/core/src/lib/waku_relay/index.ts @@ -16,6 +16,7 @@ import type { Relay, SendResult, } from "@waku/interfaces"; +import { DecodedMessage } from "@waku/interfaces"; import debug from "debug"; import { DefaultPubSubTopic } from "../constants"; @@ -26,7 +27,7 @@ import * as constants from "./constants"; const log = debug("waku:relay"); -export type Observer = { +export type Observer = { decoder: Decoder; callback: Callback; }; @@ -56,7 +57,7 @@ export type CreateOptions = { */ export class WakuRelay extends GossipSub implements Relay { pubSubTopic: string; - defaultDecoder: Decoder; + defaultDecoder: Decoder; public static multicodec: string = constants.RelayCodecs[0]; /** @@ -114,7 +115,7 @@ export class WakuRelay extends GossipSub implements Relay { * * @returns Function to delete the observer */ - addObserver( + addObserver( decoder: Decoder, callback: Callback ): () => void { diff --git a/packages/core/src/lib/waku_store/index.ts b/packages/core/src/lib/waku_store/index.ts index 9f02bec8dd..6bef492059 100644 --- a/packages/core/src/lib/waku_store/index.ts +++ b/packages/core/src/lib/waku_store/index.ts @@ -1,7 +1,7 @@ import type { Connection } from "@libp2p/interface-connection"; import type { PeerId } from "@libp2p/interface-peer-id"; import { Peer } from "@libp2p/interface-peer-store"; -import { Decoder, Message } from "@waku/interfaces"; +import { DecodedMessage, Decoder } from "@waku/interfaces"; import debug from "debug"; import all from "it-all"; import * as lp from "it-length-prefixed"; @@ -106,7 +106,7 @@ export class WakuStore { * or if an error is encountered when processing the reply, * or if two decoders with the same content topic are passed. */ - async queryOrderedCallback( + async queryOrderedCallback( decoders: Decoder[], callback: (message: T) => Promise | boolean | void, options?: QueryOptions @@ -155,7 +155,7 @@ export class WakuStore { * or if an error is encountered when processing the reply, * or if two decoders with the same content topic are passed. */ - async queryCallbackOnPromise( + async queryCallbackOnPromise( decoders: Decoder[], callback: ( message: Promise @@ -193,7 +193,7 @@ export class WakuStore { * or if an error is encountered when processing the reply, * or if two decoders with the same content topic are passed. */ - async *queryGenerator( + async *queryGenerator( decoders: Decoder[], options?: QueryOptions ): AsyncGenerator[]> { @@ -266,7 +266,7 @@ export class WakuStore { } } -async function* paginate( +async function* paginate( connection: Connection, protocol: string, queryOpts: Params, diff --git a/packages/interfaces/src/index.ts b/packages/interfaces/src/index.ts index f95555a6aa..5551483137 100644 --- a/packages/interfaces/src/index.ts +++ b/packages/interfaces/src/index.ts @@ -28,7 +28,7 @@ export type ProtocolOptions = { export type Callback = (msg: T) => void | Promise; export interface Filter extends PointToPointProtocol { - subscribe: ( + subscribe: ( decoders: Decoder[], callback: Callback, opts?: ProtocolOptions @@ -38,7 +38,7 @@ export interface Filter extends PointToPointProtocol { export interface LightPush extends PointToPointProtocol { push: ( encoder: Encoder, - message: Partial, + message: Message, opts?: ProtocolOptions ) => Promise; } @@ -76,27 +76,27 @@ export type StoreQueryOptions = { } & ProtocolOptions; export interface Store extends PointToPointProtocol { - queryOrderedCallback: ( + queryOrderedCallback: ( decoders: Decoder[], callback: (message: T) => Promise | boolean | void, options?: StoreQueryOptions ) => Promise; - queryCallbackOnPromise: ( + queryCallbackOnPromise: ( decoders: Decoder[], callback: ( message: Promise ) => Promise | boolean | void, options?: StoreQueryOptions ) => Promise; - queryGenerator: ( + queryGenerator: ( decoders: Decoder[], options?: StoreQueryOptions ) => AsyncGenerator[]>; } export interface Relay extends GossipSub { - send: (encoder: Encoder, message: Partial) => Promise; - addObserver: ( + send: (encoder: Encoder, message: Message) => Promise; + addObserver: ( decoder: Decoder, callback: Callback ) => () => void; @@ -155,6 +155,10 @@ export interface RateLimitProof { rlnIdentifier: Uint8Array; } +/** + * Interface matching the protobuf library. + * Field types matches the protobuf type over the wire + */ export interface ProtoMessage { payload: Uint8Array | undefined; contentTopic: string | undefined; @@ -163,20 +167,29 @@ export interface ProtoMessage { rateLimitProof: RateLimitProof | undefined; } +/** + * Interface for messages to encode and send. + */ export interface Message { + payload?: Uint8Array; + timestamp?: Date; + rateLimitProof?: RateLimitProof; +} + +export interface Encoder { + contentTopic: string; + toWire: (message: Message) => Promise; + toProtoObj: (message: Message) => Promise; +} + +export interface DecodedMessage { payload: Uint8Array | undefined; contentTopic: string | undefined; timestamp: Date | undefined; rateLimitProof: RateLimitProof | undefined; } -export interface Encoder { - contentTopic: string; - toWire: (message: Partial) => Promise; - toProtoObj: (message: Partial) => Promise; -} - -export interface Decoder { +export interface Decoder { contentTopic: string; fromWireToProtoObj: (bytes: Uint8Array) => Promise; fromProtoObj: (proto: ProtoMessage) => Promise; diff --git a/packages/message-encryption/src/index.ts b/packages/message-encryption/src/index.ts index feb42d6115..ec84603f72 100644 --- a/packages/message-encryption/src/index.ts +++ b/packages/message-encryption/src/index.ts @@ -5,7 +5,13 @@ import { MessageV0, proto, } from "@waku/core/lib/waku_message/version_0"; -import type { Decoder, Encoder, Message, ProtoMessage } from "@waku/interfaces"; +import type { + DecodedMessage, + Decoder, + Encoder, + Message, + ProtoMessage, +} from "@waku/interfaces"; import debug from "debug"; import { Symmetric } from "./constants.js"; @@ -38,7 +44,7 @@ export type Signature = { publicKey: Uint8Array | undefined; }; -export class MessageV1 extends MessageV0 implements Message { +export class MessageV1 extends MessageV0 implements DecodedMessage { private readonly _decodedPayload: Uint8Array; constructor( diff --git a/packages/tests/tests/filter.node.spec.ts b/packages/tests/tests/filter.node.spec.ts index b5298d59de..01e29b6ce6 100644 --- a/packages/tests/tests/filter.node.spec.ts +++ b/packages/tests/tests/filter.node.spec.ts @@ -2,7 +2,7 @@ import { bytesToUtf8, utf8ToBytes } from "@waku/byte-utils"; import { waitForRemotePeer } from "@waku/core/lib/wait_for_remote_peer"; import { DecoderV0, EncoderV0 } from "@waku/core/lib/waku_message/version_0"; import { createFullNode } from "@waku/create"; -import type { Message, WakuFull } from "@waku/interfaces"; +import type { DecodedMessage, WakuFull } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; import { expect } from "chai"; import debug from "debug"; @@ -47,7 +47,7 @@ describe("Waku Filter", () => { const messageText = "Filtering works!"; const message = { payload: utf8ToBytes(messageText) }; - const callback = (msg: Message): void => { + const callback = (msg: DecodedMessage): void => { log("Got a message"); messageCount++; expect(msg.contentTopic).to.eq(TestContentTopic); @@ -71,7 +71,7 @@ describe("Waku Filter", () => { this.timeout(10000); let messageCount = 0; - const callback = (msg: Message): void => { + const callback = (msg: DecodedMessage): void => { messageCount++; expect(msg.contentTopic).to.eq(TestContentTopic); }; diff --git a/packages/tests/tests/relay.node.spec.ts b/packages/tests/tests/relay.node.spec.ts index 2ed42b7e13..babd43d204 100644 --- a/packages/tests/tests/relay.node.spec.ts +++ b/packages/tests/tests/relay.node.spec.ts @@ -8,7 +8,7 @@ import { MessageV0, } from "@waku/core/lib/waku_message/version_0"; import { createPrivacyNode } from "@waku/create"; -import type { Message, WakuPrivacy } from "@waku/interfaces"; +import type { DecodedMessage, WakuPrivacy } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; import { AsymDecoder, @@ -118,9 +118,11 @@ describe("Waku Relay [node only]", () => { timestamp: messageTimestamp, }; - const receivedMsgPromise: Promise = new Promise((resolve) => { - waku2.relay.addObserver(TestDecoder, resolve); - }); + const receivedMsgPromise: Promise = new Promise( + (resolve) => { + waku2.relay.addObserver(TestDecoder, resolve); + } + ); await waku1.relay.send(TestEncoder, message); @@ -148,12 +150,12 @@ describe("Waku Relay [node only]", () => { const fooDecoder = new DecoderV0(fooContentTopic); const barDecoder = new DecoderV0(barContentTopic); - const fooMessages: Message[] = []; + const fooMessages: DecodedMessage[] = []; waku2.relay.addObserver(fooDecoder, (msg) => { fooMessages.push(msg); }); - const barMessages: Message[] = []; + const barMessages: DecodedMessage[] = []; waku2.relay.addObserver(barDecoder, (msg) => { barMessages.push(msg); }); @@ -197,7 +199,7 @@ describe("Waku Relay [node only]", () => { const asymDecoder = new AsymDecoder(asymTopic, privateKey); const symDecoder = new SymDecoder(symTopic, symKey); - const msgs: Message[] = []; + const msgs: DecodedMessage[] = []; waku2.relay.addObserver(asymDecoder, (wakuMsg) => { msgs.push(wakuMsg); }); @@ -228,7 +230,7 @@ describe("Waku Relay [node only]", () => { const contentTopic = "added-then-deleted-observer"; // The promise **fails** if we receive a message on this observer. - const receivedMsgPromise: Promise = new Promise( + const receivedMsgPromise: Promise = new Promise( (resolve, reject) => { const deleteObserver = waku2.relay.addObserver( new DecoderV0(contentTopic), @@ -304,7 +306,7 @@ describe("Waku Relay [node only]", () => { const messageText = "Communicating using a custom pubsub topic"; - const waku2ReceivedMsgPromise: Promise = new Promise( + const waku2ReceivedMsgPromise: Promise = new Promise( (resolve) => { waku2.relay.addObserver(TestDecoder, resolve); } @@ -312,7 +314,7 @@ describe("Waku Relay [node only]", () => { // The promise **fails** if we receive a message on the default // pubsub topic. - const waku3NoMsgPromise: Promise = new Promise( + const waku3NoMsgPromise: Promise = new Promise( (resolve, reject) => { waku3.relay.addObserver(TestDecoder, reject); setTimeout(resolve, 1000); @@ -466,7 +468,7 @@ describe("Waku Relay [node only]", () => { const msgStr = "Hello there!"; const message = { payload: utf8ToBytes(msgStr) }; - const waku2ReceivedMsgPromise: Promise = new Promise( + const waku2ReceivedMsgPromise: Promise = new Promise( (resolve) => { waku2.relay.addObserver(TestDecoder, resolve); } diff --git a/packages/tests/tests/waku.node.spec.ts b/packages/tests/tests/waku.node.spec.ts index 9c3965c6fb..3fb2ea024e 100644 --- a/packages/tests/tests/waku.node.spec.ts +++ b/packages/tests/tests/waku.node.spec.ts @@ -3,7 +3,12 @@ import { bytesToUtf8, utf8ToBytes } from "@waku/byte-utils"; import { PeerDiscoveryStaticPeers } from "@waku/core/lib/peer_discovery_static_list"; import { waitForRemotePeer } from "@waku/core/lib/wait_for_remote_peer"; import { createLightNode, createPrivacyNode } from "@waku/create"; -import type { Message, Waku, WakuLight, WakuPrivacy } from "@waku/interfaces"; +import type { + DecodedMessage, + Waku, + WakuLight, + WakuPrivacy, +} from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; import { generateSymmetricKey, @@ -175,9 +180,11 @@ describe("Decryption Keys", () => { timestamp: messageTimestamp, }; - const receivedMsgPromise: Promise = new Promise((resolve) => { - waku2.relay.addObserver(decoder, resolve); - }); + const receivedMsgPromise: Promise = new Promise( + (resolve) => { + waku2.relay.addObserver(decoder, resolve); + } + ); await waku1.relay.send(encoder, message);