diff --git a/.size-limit.cjs b/.size-limit.cjs index 6116b79fc7..9c7cd40900 100644 --- a/.size-limit.cjs +++ b/.size-limit.cjs @@ -6,19 +6,24 @@ module.exports = [ }, { name: "Waku default setup", - path: "packages/create/bundle/index.js", - import: - "{ createLightNode, waitForRemotePeer, createEncoder, createDecoder }", + path: ["packages/create/bundle/index.js", "packages/core/bundle/index.js"], + import: { + "packages/create/bundle/index.js": "{ createLightNode }", + "packages/core/bundle/index.js": + "{ waitForRemotePeer, createEncoder, createDecoder }", + }, }, { name: "ECIES encryption", path: "packages/message-encryption/bundle/ecies.js", - import: "{ generatePrivateKey, createEncoder, createDecoder, DecodedMessage }", + import: + "{ generatePrivateKey, createEncoder, createDecoder, DecodedMessage }", }, { name: "Symmetric encryption", path: "packages/message-encryption/bundle/symmetric.js", - import: "{ generateSymmetricKey, createEncoder, createDecoder, DecodedMessage }", + import: + "{ generateSymmetricKey, createEncoder, createDecoder, DecodedMessage }", }, { name: "DNS discovery", @@ -28,16 +33,16 @@ module.exports = [ { name: "Privacy preserving protocols", path: "packages/core/bundle/index.js", - import: "{ WakuRelay }", + import: "{ wakuRelay }", }, { name: "Light protocols", path: "packages/core/bundle/index.js", - import: "{ WakuLightPush, WakuFilter }", + import: "{ wakuLightPush, wakuFilter }", }, { name: "History retrieval protocols", path: "packages/core/bundle/index.js", - import: "{ WakuStore }", + import: "{ wakuStore }", }, ]; diff --git a/package.json b/package.json index aebf347a73..0ae010f263 100644 --- a/package.json +++ b/package.json @@ -133,7 +133,7 @@ "*.ts": [ "eslint --fix" ], - "*.{ts,json,js,md}": [ + "*.{ts,json,js,md,cjs}": [ "prettier --write" ] } diff --git a/packages/core/src/lib/to_proto_message.ts b/packages/core/src/lib/to_proto_message.ts index e9bc99aeaf..e4c4f8c49d 100644 --- a/packages/core/src/lib/to_proto_message.ts +++ b/packages/core/src/lib/to_proto_message.ts @@ -1,7 +1,7 @@ -import { ProtoMessage } from "@waku/interfaces"; +import { IProtoMessage } from "@waku/interfaces"; import { WakuMessage as WakuMessageProto } from "@waku/proto"; -const EmptyMessage: ProtoMessage = { +const EmptyMessage: IProtoMessage = { payload: undefined, contentTopic: undefined, version: undefined, @@ -10,6 +10,6 @@ const EmptyMessage: ProtoMessage = { ephemeral: undefined, }; -export function toProtoMessage(wire: WakuMessageProto): ProtoMessage { +export function toProtoMessage(wire: WakuMessageProto): IProtoMessage { return { ...EmptyMessage, ...wire }; } diff --git a/packages/core/src/lib/waku_filter/index.ts b/packages/core/src/lib/waku_filter/index.ts index 7d7714a659..c8b2ab2b51 100644 --- a/packages/core/src/lib/waku_filter/index.ts +++ b/packages/core/src/lib/waku_filter/index.ts @@ -7,10 +7,10 @@ import type { IncomingStreamData } from "@libp2p/interface-registrar"; import type { Registrar } from "@libp2p/interface-registrar"; import type { Callback, - DecodedMessage, - Decoder, Filter, - Message, + IDecodedMessage, + IDecoder, + IMessage, ProtocolOptions, } from "@waku/interfaces"; import { @@ -69,7 +69,7 @@ class WakuFilter implements Filter { private subscriptions: Map>; private decoders: Map< string, // content topic - Set> + Set> >; constructor(public components: FilterComponents, options?: CreateOptions) { @@ -87,8 +87,8 @@ class WakuFilter implements Filter { * @param opts The FilterSubscriptionOpts used to narrow which messages are returned, and which peer to connect to. * @returns Unsubscribe function that can be used to end the subscription. */ - async subscribe( - decoders: Decoder[], + async subscribe( + decoders: IDecoder[], callback: Callback, opts?: ProtocolOptions ): Promise { @@ -198,7 +198,7 @@ class WakuFilter implements Filter { return; } - let msg: Message | undefined; + let msg: IMessage | undefined; // We don't want to wait for decoding failure, just attempt to decode // all messages and do the call back on the one that works // noinspection ES6MissingAwait @@ -225,8 +225,8 @@ class WakuFilter implements Filter { this.subscriptions.delete(requestId); } - private addDecoders( - decoders: Map>> + private addDecoders( + decoders: Map>> ): void { decoders.forEach((decoders, contentTopic) => { const currDecs = this.decoders.get(contentTopic); @@ -238,8 +238,8 @@ class WakuFilter implements Filter { }); } - private deleteDecoders( - decoders: Map>> + private deleteDecoders( + decoders: Map>> ): void { decoders.forEach((decoders, contentTopic) => { const currDecs = this.decoders.get(contentTopic); diff --git a/packages/core/src/lib/waku_light_push/index.ts b/packages/core/src/lib/waku_light_push/index.ts index 88c166c652..54c8db3afb 100644 --- a/packages/core/src/lib/waku_light_push/index.ts +++ b/packages/core/src/lib/waku_light_push/index.ts @@ -3,9 +3,9 @@ import type { PeerId } from "@libp2p/interface-peer-id"; import type { Peer } from "@libp2p/interface-peer-store"; import type { PeerStore } from "@libp2p/interface-peer-store"; import type { - Encoder, + IEncoder, + IMessage, LightPush, - Message, ProtocolOptions, SendResult, } from "@waku/interfaces"; @@ -59,8 +59,8 @@ class WakuLightPush implements LightPush { } async push( - encoder: Encoder, - message: Message, + encoder: IEncoder, + message: IMessage, opts?: ProtocolOptions ): Promise { const pubSubTopic = opts?.pubSubTopic ? opts.pubSubTopic : this.pubSubTopic; diff --git a/packages/core/src/lib/waku_message/topic_only_message.ts b/packages/core/src/lib/waku_message/topic_only_message.ts index 42c89cc6b5..fadf956700 100644 --- a/packages/core/src/lib/waku_message/topic_only_message.ts +++ b/packages/core/src/lib/waku_message/topic_only_message.ts @@ -1,10 +1,14 @@ -import type { DecodedMessage, Decoder, ProtoMessage } from "@waku/interfaces"; +import type { + IDecodedMessage, + IDecoder, + IProtoMessage, +} from "@waku/interfaces"; import { proto_topic_only_message as proto } from "@waku/proto"; import debug from "debug"; const log = debug("waku:message:topic-only"); -export class TopicOnlyMessage implements DecodedMessage { +export class TopicOnlyMessage implements IDecodedMessage { public payload: undefined; public rateLimitProof: undefined; public timestamp: undefined; @@ -17,10 +21,10 @@ export class TopicOnlyMessage implements DecodedMessage { } } -export class TopicOnlyDecoder implements Decoder { +export class TopicOnlyDecoder implements IDecoder { public contentTopic = ""; - fromWireToProtoObj(bytes: Uint8Array): Promise { + fromWireToProtoObj(bytes: Uint8Array): Promise { const protoMessage = proto.TopicOnlyMessage.decode(bytes); log("Message decoded", protoMessage); return Promise.resolve({ @@ -34,7 +38,7 @@ export class TopicOnlyDecoder implements Decoder { } async fromProtoObj( - proto: ProtoMessage + proto: IProtoMessage ): Promise { return new TopicOnlyMessage(proto); } diff --git a/packages/core/src/lib/waku_message/version_0.ts b/packages/core/src/lib/waku_message/version_0.ts index 469fe0be20..48ea0cb05f 100644 --- a/packages/core/src/lib/waku_message/version_0.ts +++ b/packages/core/src/lib/waku_message/version_0.ts @@ -1,10 +1,10 @@ import type { - DecodedMessage as IDecodedMessage, - Decoder as IDecoder, - Encoder as IEncoder, - Message, - ProtoMessage, - RateLimitProof, + IDecodedMessage, + IDecoder, + IEncoder, + IMessage, + IProtoMessage, + IRateLimitProof, } from "@waku/interfaces"; import { proto_message as proto } from "@waku/proto"; import debug from "debug"; @@ -65,7 +65,7 @@ export class DecodedMessage implements IDecodedMessage { return this.proto.version ?? 0; } - get rateLimitProof(): RateLimitProof | undefined { + get rateLimitProof(): IRateLimitProof | undefined { return this.proto.rateLimitProof; } } @@ -73,11 +73,11 @@ export class DecodedMessage implements IDecodedMessage { export class Encoder implements IEncoder { constructor(public contentTopic: string, public ephemeral: boolean = false) {} - async toWire(message: Message): Promise { + async toWire(message: IMessage): Promise { return proto.WakuMessage.encode(await this.toProtoObj(message)); } - async toProtoObj(message: Message): Promise { + async toProtoObj(message: IMessage): Promise { const timestamp = message.timestamp ?? new Date(); return { @@ -113,7 +113,7 @@ export function createEncoder( export class Decoder implements IDecoder { constructor(public contentTopic: string) {} - fromWireToProtoObj(bytes: Uint8Array): Promise { + fromWireToProtoObj(bytes: Uint8Array): Promise { const protoMessage = proto.WakuMessage.decode(bytes); log("Message decoded", protoMessage); return Promise.resolve({ @@ -126,7 +126,9 @@ export class Decoder implements IDecoder { }); } - async fromProtoObj(proto: ProtoMessage): Promise { + async fromProtoObj( + proto: IProtoMessage + ): Promise { // https://github.com/status-im/js-waku/issues/921 if (proto.version === undefined) { proto.version = 0; diff --git a/packages/core/src/lib/waku_relay/index.ts b/packages/core/src/lib/waku_relay/index.ts index 3e8459e6c4..d3a4861ffe 100644 --- a/packages/core/src/lib/waku_relay/index.ts +++ b/packages/core/src/lib/waku_relay/index.ts @@ -8,13 +8,13 @@ import type { PeerIdStr, TopicStr } from "@chainsafe/libp2p-gossipsub/types"; import { SignaturePolicy } from "@chainsafe/libp2p-gossipsub/types"; import type { Callback, - Decoder, - Encoder, - Message, + IDecoder, + IEncoder, + IMessage, Relay, SendResult, } from "@waku/interfaces"; -import { DecodedMessage } from "@waku/interfaces"; +import { IDecodedMessage } from "@waku/interfaces"; import debug from "debug"; import { DefaultPubSubTopic } from "../constants.js"; @@ -25,8 +25,8 @@ import * as constants from "./constants.js"; const log = debug("waku:relay"); -export type Observer = { - decoder: Decoder; +export type Observer = { + decoder: IDecoder; callback: Callback; }; @@ -55,7 +55,7 @@ export type CreateOptions = { */ class WakuRelay extends GossipSub implements Relay { pubSubTopic: string; - defaultDecoder: Decoder; + defaultDecoder: IDecoder; public static multicodec: string = constants.RelayCodecs[0]; /** @@ -99,7 +99,7 @@ class WakuRelay extends GossipSub implements Relay { /** * Send Waku message. */ - public async send(encoder: Encoder, message: Message): Promise { + public async send(encoder: IEncoder, message: IMessage): Promise { const msg = await encoder.toWire(message); if (!msg) { log("Failed to encode message, aborting publish"); @@ -113,8 +113,8 @@ class WakuRelay extends GossipSub implements Relay { * * @returns Function to delete the observer */ - addObserver( - decoder: Decoder, + addObserver( + decoder: IDecoder, callback: Callback ): () => void { const observer = { diff --git a/packages/core/src/lib/waku_store/index.ts b/packages/core/src/lib/waku_store/index.ts index 6d9e7545c1..78c890164d 100644 --- a/packages/core/src/lib/waku_store/index.ts +++ b/packages/core/src/lib/waku_store/index.ts @@ -6,8 +6,8 @@ import { sha256 } from "@noble/hashes/sha256"; import { concat, utf8ToBytes } from "@waku/byte-utils"; import { Cursor, - DecodedMessage, - Decoder, + IDecodedMessage, + IDecoder, Index, Store, } from "@waku/interfaces"; @@ -128,8 +128,8 @@ class WakuStore implements Store { * or if an error is encountered when processing the reply, * or if two decoders with the same content topic are passed. */ - async queryOrderedCallback( - decoders: Decoder[], + async queryOrderedCallback( + decoders: IDecoder[], callback: (message: T) => Promise | boolean | void, options?: QueryOptions ): Promise { @@ -177,8 +177,8 @@ class WakuStore implements Store { * or if an error is encountered when processing the reply, * or if two decoders with the same content topic are passed. */ - async queryCallbackOnPromise( - decoders: Decoder[], + async queryCallbackOnPromise( + decoders: IDecoder[], callback: ( message: Promise ) => Promise | boolean | void, @@ -215,8 +215,8 @@ class WakuStore implements Store { * or if an error is encountered when processing the reply, * or if two decoders with the same content topic are passed. */ - async *queryGenerator( - decoders: Decoder[], + async *queryGenerator( + decoders: IDecoder[], options?: QueryOptions ): AsyncGenerator[]> { let startTime, endTime; @@ -295,11 +295,11 @@ class WakuStore implements Store { } } -async function* paginate( +async function* paginate( connection: Connection, protocol: string, queryOpts: Params, - decoders: Map>, + decoders: Map>, cursor?: Cursor ): AsyncGenerator[]> { if ( @@ -404,7 +404,7 @@ export function isDefined(msg: T | undefined): msg is T { } export async function createCursor( - message: DecodedMessage, + message: IDecodedMessage, pubsubTopic: string = DefaultPubSubTopic ): Promise { if ( diff --git a/packages/interfaces/src/filter.ts b/packages/interfaces/src/filter.ts new file mode 100644 index 0000000000..74c8e9c4fd --- /dev/null +++ b/packages/interfaces/src/filter.ts @@ -0,0 +1,14 @@ +import type { IDecodedMessage, IDecoder } from "./message.js"; +import type { + Callback, + PointToPointProtocol, + ProtocolOptions, +} from "./protocols.js"; + +export interface Filter extends PointToPointProtocol { + subscribe: ( + decoders: IDecoder[], + callback: Callback, + opts?: ProtocolOptions + ) => Promise<() => Promise>; +} diff --git a/packages/interfaces/src/index.ts b/packages/interfaces/src/index.ts index b9598d6f6f..3e9c3f3948 100644 --- a/packages/interfaces/src/index.ts +++ b/packages/interfaces/src/index.ts @@ -1,247 +1,8 @@ -import type { GossipSub } from "@chainsafe/libp2p-gossipsub"; -import type { Stream } from "@libp2p/interface-connection"; -import type { ConnectionManager } from "@libp2p/interface-connection-manager"; -import type { PeerId } from "@libp2p/interface-peer-id"; -import type { Peer } from "@libp2p/interface-peer-store"; -import type { PeerStore } from "@libp2p/interface-peer-store"; -import type { Registrar } from "@libp2p/interface-registrar"; -import type { Multiaddr } from "@multiformats/multiaddr"; -import { ENR } from "@waku/enr"; -import type { Libp2p } from "libp2p"; - -export enum Protocols { - Relay = "relay", - Store = "store", - LightPush = "lightpush", - Filter = "filter", - PeerExchange = "peer-exchange", -} - -export interface PointToPointProtocol { - peerStore: PeerStore; - peers: () => Promise; -} -export interface Index { - digest?: Uint8Array; - receivedTime?: bigint; - senderTime?: bigint; - pubsubTopic?: string; -} - -export type ProtocolOptions = { - pubSubTopic?: string; - /** - * Optionally specify an PeerId for the protocol request. If not included, will use a random peer. - */ - peerId?: PeerId; -}; - -export type Callback = (msg: T) => void | Promise; - -export interface Filter extends PointToPointProtocol { - subscribe: ( - decoders: Decoder[], - callback: Callback, - opts?: ProtocolOptions - ) => Promise<() => Promise>; -} - -export interface LightPush extends PointToPointProtocol { - push: ( - encoder: Encoder, - message: Message, - opts?: ProtocolOptions - ) => Promise; -} - -export interface PeerExchange extends PointToPointProtocol { - query( - params: PeerExchangeQueryParams, - callback: (response: PeerExchangeResponse) => Promise | void - ): Promise; -} - -export interface PeerExchangeQueryParams { - numPeers: number; -} - -export interface PeerExchangeResponse { - peerInfos: PeerInfo[]; -} - -export interface PeerInfo { - ENR?: ENR; -} - -export enum PageDirection { - BACKWARD = "backward", - FORWARD = "forward", -} - -export interface TimeFilter { - startTime: Date; - endTime: Date; -} - -export interface PeerExchangeComponents { - connectionManager: ConnectionManager; - peerStore: PeerStore; - registrar: Registrar; -} -export type Cursor = { - digest?: Uint8Array; - senderTime?: bigint; - pubsubTopic?: string; -}; - -export type StoreQueryOptions = { - /** - * The direction in which pages are retrieved: - * - { @link PageDirection.BACKWARD }: Most recent page first. - * - { @link PageDirection.FORWARD }: Oldest page first. - * - * Note: This does not affect the ordering of messages with the page - * (the oldest message is always first). - * - * @default { @link PageDirection.BACKWARD } - */ - pageDirection?: PageDirection; - /** - * The number of message per page. - */ - pageSize?: number; - /** - * Retrieve messages with a timestamp within the provided values. - */ - timeFilter?: TimeFilter; - /** - * Cursor as an index to start a query from. - */ - cursor?: Cursor; -} & ProtocolOptions; - -export interface Store extends PointToPointProtocol { - queryOrderedCallback: ( - decoders: Decoder[], - callback: (message: T) => Promise | boolean | void, - options?: StoreQueryOptions - ) => Promise; - queryCallbackOnPromise: ( - decoders: Decoder[], - callback: ( - message: Promise - ) => Promise | boolean | void, - options?: StoreQueryOptions - ) => Promise; - queryGenerator: ( - decoders: Decoder[], - options?: StoreQueryOptions - ) => AsyncGenerator[]>; -} - -export interface Relay extends GossipSub { - send: (encoder: Encoder, message: Message) => Promise; - addObserver: ( - decoder: Decoder, - callback: Callback - ) => () => void; - getMeshPeers: () => string[]; -} - -export interface Waku { - libp2p: Libp2p; - relay?: Relay; - store?: Store; - filter?: Filter; - lightPush?: LightPush; - peerExchange?: PeerExchange; - - dial(peer: PeerId | Multiaddr, protocols?: Protocols[]): Promise; - - start(): Promise; - - stop(): Promise; - - isStarted(): boolean; -} - -export interface WakuLight extends Waku { - relay: undefined; - store: Store; - filter: Filter; - lightPush: LightPush; - peerExchange: PeerExchange; -} - -export interface WakuPrivacy extends Waku { - relay: Relay; - store: undefined; - filter: undefined; - lightPush: undefined; - peerExchange: undefined; -} - -export interface WakuFull extends Waku { - relay: Relay; - store: Store; - filter: Filter; - lightPush: LightPush; - peerExchange: PeerExchange; -} - -export interface RateLimitProof { - proof: Uint8Array; - merkleRoot: Uint8Array; - epoch: Uint8Array; - shareX: Uint8Array; - shareY: Uint8Array; - nullifier: Uint8Array; - rlnIdentifier: Uint8Array; -} - -/** - * Interface matching the protobuf library. - * Field types matches the protobuf type over the wire - */ -export interface ProtoMessage { - payload: Uint8Array | undefined; - contentTopic: string | undefined; - version: number | undefined; - timestamp: bigint | undefined; - rateLimitProof: RateLimitProof | undefined; - ephemeral: boolean | undefined; -} - -/** - * Interface for messages to encode and send. - */ -export interface Message { - payload?: Uint8Array; - timestamp?: Date; - rateLimitProof?: RateLimitProof; -} - -export interface Encoder { - contentTopic: string; - ephemeral: boolean; - toWire: (message: Message) => Promise; - toProtoObj: (message: Message) => Promise; -} - -export interface DecodedMessage { - payload: Uint8Array | undefined; - contentTopic: string | undefined; - timestamp: Date | undefined; - rateLimitProof: RateLimitProof | undefined; - ephemeral: boolean | undefined; -} - -export interface Decoder { - contentTopic: string; - fromWireToProtoObj: (bytes: Uint8Array) => Promise; - fromProtoObj: (proto: ProtoMessage) => Promise; -} - -export interface SendResult { - recipients: PeerId[]; -} +export * from "./filter.js"; +export * from "./light_push.js"; +export * from "./message.js"; +export * from "./peer_exchange.js"; +export * from "./protocols.js"; +export * from "./relay.js"; +export * from "./store.js"; +export * from "./waku.js"; diff --git a/packages/interfaces/src/light_push.ts b/packages/interfaces/src/light_push.ts new file mode 100644 index 0000000000..2afb674ae9 --- /dev/null +++ b/packages/interfaces/src/light_push.ts @@ -0,0 +1,14 @@ +import type { IEncoder, IMessage } from "./message.js"; +import type { + PointToPointProtocol, + ProtocolOptions, + SendResult, +} from "./protocols.js"; + +export interface LightPush extends PointToPointProtocol { + push: ( + encoder: IEncoder, + message: IMessage, + opts?: ProtocolOptions + ) => Promise; +} diff --git a/packages/interfaces/src/message.ts b/packages/interfaces/src/message.ts new file mode 100644 index 0000000000..ca2770d399 --- /dev/null +++ b/packages/interfaces/src/message.ts @@ -0,0 +1,52 @@ +export interface IRateLimitProof { + proof: Uint8Array; + merkleRoot: Uint8Array; + epoch: Uint8Array; + shareX: Uint8Array; + shareY: Uint8Array; + nullifier: Uint8Array; + rlnIdentifier: Uint8Array; +} + +/** + * Interface matching the protobuf library. + * Field types matches the protobuf type over the wire + */ +export interface IProtoMessage { + payload: Uint8Array | undefined; + contentTopic: string | undefined; + version: number | undefined; + timestamp: bigint | undefined; + rateLimitProof: IRateLimitProof | undefined; + ephemeral: boolean | undefined; +} + +/** + * Interface for messages to encode and send. + */ +export interface IMessage { + payload?: Uint8Array; + timestamp?: Date; + rateLimitProof?: IRateLimitProof; +} + +export interface IEncoder { + contentTopic: string; + ephemeral: boolean; + toWire: (message: IMessage) => Promise; + toProtoObj: (message: IMessage) => Promise; +} + +export interface IDecodedMessage { + payload: Uint8Array | undefined; + contentTopic: string | undefined; + timestamp: Date | undefined; + rateLimitProof: IRateLimitProof | undefined; + ephemeral: boolean | undefined; +} + +export interface IDecoder { + contentTopic: string; + fromWireToProtoObj: (bytes: Uint8Array) => Promise; + fromProtoObj: (proto: IProtoMessage) => Promise; +} diff --git a/packages/interfaces/src/peer_exchange.ts b/packages/interfaces/src/peer_exchange.ts new file mode 100644 index 0000000000..05359e25da --- /dev/null +++ b/packages/interfaces/src/peer_exchange.ts @@ -0,0 +1,31 @@ +import type { ConnectionManager } from "@libp2p/interface-connection-manager"; +import type { PeerStore } from "@libp2p/interface-peer-store"; +import type { Registrar } from "@libp2p/interface-registrar"; +import { ENR } from "@waku/enr"; + +import { PointToPointProtocol } from "./protocols.js"; + +export interface PeerExchange extends PointToPointProtocol { + query( + params: PeerExchangeQueryParams, + callback: (response: PeerExchangeResponse) => Promise | void + ): Promise; +} + +export interface PeerExchangeQueryParams { + numPeers: number; +} + +export interface PeerExchangeResponse { + peerInfos: PeerInfo[]; +} + +export interface PeerInfo { + ENR?: ENR; +} + +export interface PeerExchangeComponents { + connectionManager: ConnectionManager; + peerStore: PeerStore; + registrar: Registrar; +} diff --git a/packages/interfaces/src/protocols.ts b/packages/interfaces/src/protocols.ts new file mode 100644 index 0000000000..685c6de1a7 --- /dev/null +++ b/packages/interfaces/src/protocols.ts @@ -0,0 +1,31 @@ +import type { PeerId } from "@libp2p/interface-peer-id"; +import type { Peer, PeerStore } from "@libp2p/interface-peer-store"; + +import type { IMessage } from "./message.js"; + +export enum Protocols { + Relay = "relay", + Store = "store", + LightPush = "lightpush", + Filter = "filter", + PeerExchange = "peer-exchange", +} + +export interface PointToPointProtocol { + peerStore: PeerStore; + peers: () => Promise; +} + +export type ProtocolOptions = { + pubSubTopic?: string; + /** + * Optionally specify an PeerId for the protocol request. If not included, will use a random peer. + */ + peerId?: PeerId; +}; + +export type Callback = (msg: T) => void | Promise; + +export interface SendResult { + recipients: PeerId[]; +} diff --git a/packages/interfaces/src/relay.ts b/packages/interfaces/src/relay.ts new file mode 100644 index 0000000000..76d1a76958 --- /dev/null +++ b/packages/interfaces/src/relay.ts @@ -0,0 +1,18 @@ +import type { GossipSub } from "@chainsafe/libp2p-gossipsub"; + +import type { + IDecodedMessage, + IDecoder, + IEncoder, + IMessage, +} from "./message.js"; +import type { Callback, SendResult } from "./protocols.js"; + +export interface Relay extends GossipSub { + send: (encoder: IEncoder, message: IMessage) => Promise; + addObserver: ( + decoder: IDecoder, + callback: Callback + ) => () => void; + getMeshPeers: () => string[]; +} diff --git a/packages/interfaces/src/store.ts b/packages/interfaces/src/store.ts new file mode 100644 index 0000000000..b7d7b355ce --- /dev/null +++ b/packages/interfaces/src/store.ts @@ -0,0 +1,70 @@ +import type { IDecodedMessage, IDecoder } from "./message.js"; +import type { PointToPointProtocol, ProtocolOptions } from "./protocols.js"; + +export enum PageDirection { + BACKWARD = "backward", + FORWARD = "forward", +} + +export interface TimeFilter { + startTime: Date; + endTime: Date; +} + +export interface Index { + digest?: Uint8Array; + receivedTime?: bigint; + senderTime?: bigint; + pubsubTopic?: string; +} + +export type Cursor = { + digest?: Uint8Array; + senderTime?: bigint; + pubsubTopic?: string; +}; + +export type StoreQueryOptions = { + /** + * The direction in which pages are retrieved: + * - { @link PageDirection.BACKWARD }: Most recent page first. + * - { @link PageDirection.FORWARD }: Oldest page first. + * + * Note: This does not affect the ordering of messages with the page + * (the oldest message is always first). + * + * @default { @link PageDirection.BACKWARD } + */ + pageDirection?: PageDirection; + /** + * The number of message per page. + */ + pageSize?: number; + /** + * Retrieve messages with a timestamp within the provided values. + */ + timeFilter?: TimeFilter; + /** + * Cursor as an index to start a query from. + */ + cursor?: Cursor; +} & ProtocolOptions; + +export interface Store extends PointToPointProtocol { + queryOrderedCallback: ( + decoders: IDecoder[], + callback: (message: T) => Promise | boolean | void, + options?: StoreQueryOptions + ) => Promise; + queryCallbackOnPromise: ( + decoders: IDecoder[], + callback: ( + message: Promise + ) => Promise | boolean | void, + options?: StoreQueryOptions + ) => Promise; + queryGenerator: ( + decoders: IDecoder[], + options?: StoreQueryOptions + ) => AsyncGenerator[]>; +} diff --git a/packages/interfaces/src/waku.ts b/packages/interfaces/src/waku.ts new file mode 100644 index 0000000000..19e0e429ff --- /dev/null +++ b/packages/interfaces/src/waku.ts @@ -0,0 +1,52 @@ +import type { Stream } from "@libp2p/interface-connection"; +import type { PeerId } from "@libp2p/interface-peer-id"; +import type { Multiaddr } from "@multiformats/multiaddr"; +import type { Libp2p } from "libp2p"; + +import type { Filter } from "./filter.js"; +import type { LightPush } from "./light_push.js"; +import type { PeerExchange } from "./peer_exchange.js"; +import { Protocols } from "./protocols.js"; +import type { Relay } from "./relay.js"; +import type { Store } from "./store.js"; + +export interface Waku { + libp2p: Libp2p; + relay?: Relay; + store?: Store; + filter?: Filter; + lightPush?: LightPush; + peerExchange?: PeerExchange; + + dial(peer: PeerId | Multiaddr, protocols?: Protocols[]): Promise; + + start(): Promise; + + stop(): Promise; + + isStarted(): boolean; +} + +export interface WakuLight extends Waku { + relay: undefined; + store: Store; + filter: Filter; + lightPush: LightPush; + peerExchange: PeerExchange; +} + +export interface WakuPrivacy extends Waku { + relay: Relay; + store: undefined; + filter: undefined; + lightPush: undefined; + peerExchange: undefined; +} + +export interface WakuFull extends Waku { + relay: Relay; + store: Store; + filter: Filter; + lightPush: LightPush; + peerExchange: PeerExchange; +} diff --git a/packages/message-encryption/src/ecies.ts b/packages/message-encryption/src/ecies.ts index 6b73c5f38c..f55b7ca0f0 100644 --- a/packages/message-encryption/src/ecies.ts +++ b/packages/message-encryption/src/ecies.ts @@ -3,10 +3,10 @@ import { proto, } from "@waku/core/lib/waku_message/version_0"; import type { - Decoder as IDecoder, - Encoder as IEncoder, - Message, - ProtoMessage, + IDecoder, + IEncoder, + IMessage, + IProtoMessage, } from "@waku/interfaces"; import debug from "debug"; @@ -37,14 +37,14 @@ class Encoder implements IEncoder { public ephemeral: boolean = false ) {} - async toWire(message: Message): Promise { + async toWire(message: IMessage): Promise { const protoMessage = await this.toProtoObj(message); if (!protoMessage) return; return proto.WakuMessage.encode(protoMessage); } - async toProtoObj(message: Message): Promise { + async toProtoObj(message: IMessage): Promise { const timestamp = message.timestamp ?? new Date(); if (!message.payload) { log("No payload to encrypt, skipping: ", message); @@ -98,7 +98,7 @@ class Decoder extends DecoderV0 implements IDecoder { } async fromProtoObj( - protoMessage: ProtoMessage + protoMessage: IProtoMessage ): Promise { const cipherPayload = protoMessage.payload; diff --git a/packages/message-encryption/src/index.ts b/packages/message-encryption/src/index.ts index 7705df57c3..118e0ade9f 100644 --- a/packages/message-encryption/src/index.ts +++ b/packages/message-encryption/src/index.ts @@ -2,7 +2,7 @@ import { DecodedMessage as DecodedMessageV0, proto, } from "@waku/core/lib/waku_message/version_0"; -import type { DecodedMessage as IDecodedMessage } from "@waku/interfaces"; +import type { IDecodedMessage } from "@waku/interfaces"; import { generatePrivateKey, diff --git a/packages/message-encryption/src/symmetric.ts b/packages/message-encryption/src/symmetric.ts index 794cf47f4f..b177813a5e 100644 --- a/packages/message-encryption/src/symmetric.ts +++ b/packages/message-encryption/src/symmetric.ts @@ -3,10 +3,10 @@ import { proto, } from "@waku/core/lib/waku_message/version_0"; import type { - Decoder as IDecoder, - Encoder as IEncoder, - Message, - ProtoMessage, + IDecoder, + IEncoder, + IMessage, + IProtoMessage, } from "@waku/interfaces"; import debug from "debug"; @@ -36,14 +36,14 @@ class Encoder implements IEncoder { public ephemeral: boolean = false ) {} - async toWire(message: Message): Promise { + async toWire(message: IMessage): Promise { const protoMessage = await this.toProtoObj(message); if (!protoMessage) return; return proto.WakuMessage.encode(protoMessage); } - async toProtoObj(message: Message): Promise { + async toProtoObj(message: IMessage): Promise { const timestamp = message.timestamp ?? new Date(); if (!message.payload) { log("No payload to encrypt, skipping: ", message); @@ -96,7 +96,7 @@ class Decoder extends DecoderV0 implements IDecoder { } async fromProtoObj( - protoMessage: ProtoMessage + protoMessage: IProtoMessage ): Promise { const cipherPayload = protoMessage.payload; diff --git a/packages/tests/tests/filter.node.spec.ts b/packages/tests/tests/filter.node.spec.ts index df2b9eb597..543a122bd4 100644 --- a/packages/tests/tests/filter.node.spec.ts +++ b/packages/tests/tests/filter.node.spec.ts @@ -1,7 +1,12 @@ import { bytesToUtf8, utf8ToBytes } from "@waku/byte-utils"; -import { createDecoder, createEncoder, waitForRemotePeer } from "@waku/core"; +import { + createDecoder, + createEncoder, + DecodedMessage, + waitForRemotePeer, +} from "@waku/core"; import { createLightNode } from "@waku/create"; -import type { DecodedMessage, WakuLight } from "@waku/interfaces"; +import type { WakuLight } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; import { expect } from "chai"; import debug from "debug"; diff --git a/packages/tests/tests/store.node.spec.ts b/packages/tests/tests/store.node.spec.ts index c7645de550..1f49ec3633 100644 --- a/packages/tests/tests/store.node.spec.ts +++ b/packages/tests/tests/store.node.spec.ts @@ -7,7 +7,7 @@ import { waitForRemotePeer, } from "@waku/core"; import { createLightNode } from "@waku/create"; -import type { DecodedMessage, Message, WakuLight } from "@waku/interfaces"; +import type { IDecodedMessage, IMessage, WakuLight } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; import { createDecoder as createEciesDecoder, @@ -74,7 +74,7 @@ describe("Waku Store", () => { await waku.dial(await nwaku.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.Store]); - const messages: Message[] = []; + const messages: IMessage[] = []; let promises: Promise[] = []; for await (const msgPromises of waku.store.queryGenerator([TestDecoder])) { const _promises = msgPromises.map(async (promise) => { @@ -105,7 +105,7 @@ describe("Waku Store", () => { await waku.dial(await nwaku.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.Store]); - const messages: Message[] = []; + const messages: IMessage[] = []; let promises: Promise[] = []; for await (const msgPromises of waku.store.queryGenerator([TestDecoder])) { const _promises = msgPromises.map(async (promise) => { @@ -147,10 +147,10 @@ describe("Waku Store", () => { const query = waku.store.queryGenerator([TestDecoder]); // messages in reversed order (first message at last index) - const messages: DecodedMessage[] = []; + const messages: IDecodedMessage[] = []; for await (const page of query) { for await (const msg of page.reverse()) { - messages.push(msg as DecodedMessage); + messages.push(msg as IDecodedMessage); } } @@ -160,12 +160,12 @@ describe("Waku Store", () => { // create cursor to extract messages after the 3rd index const cursor = await createCursor(messages[cursorIndex]); - const messagesAfterCursor: DecodedMessage[] = []; + const messagesAfterCursor: IDecodedMessage[] = []; for await (const page of waku.store.queryGenerator([TestDecoder], { cursor, })) { for await (const msg of page.reverse()) { - messagesAfterCursor.push(msg as DecodedMessage); + messagesAfterCursor.push(msg as IDecodedMessage); } } @@ -201,7 +201,7 @@ describe("Waku Store", () => { await waku.dial(await nwaku.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.Store]); - const messages: Message[] = []; + const messages: IMessage[] = []; await waku.store.queryCallbackOnPromise( [TestDecoder], async (msgPromise) => { @@ -243,7 +243,7 @@ describe("Waku Store", () => { await waitForRemotePeer(waku, [Protocols.Store]); const desiredMsgs = 14; - const messages: Message[] = []; + const messages: IMessage[] = []; await waku.store.queryCallbackOnPromise( [TestDecoder], async (msgPromise) => { @@ -282,7 +282,7 @@ describe("Waku Store", () => { await waku.dial(await nwaku.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.Store]); - const messages: Message[] = []; + const messages: IMessage[] = []; await waku.store.queryOrderedCallback( [TestDecoder], async (msg) => { @@ -321,7 +321,7 @@ describe("Waku Store", () => { await waku.dial(await nwaku.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.Store]); - let messages: Message[] = []; + let messages: IMessage[] = []; await waku.store.queryOrderedCallback( [TestDecoder], async (msg) => { @@ -413,7 +413,7 @@ describe("Waku Store", () => { await waitForRemotePeer(waku2, [Protocols.Store]); - const messages: DecodedMessage[] = []; + const messages: IDecodedMessage[] = []; log("Retrieve messages from store"); for await (const msgPromises of waku2.store.queryGenerator([ @@ -482,7 +482,7 @@ describe("Waku Store", () => { const nwakuPeerId = await nwaku.getPeerId(); - const firstMessages: Message[] = []; + const firstMessages: IMessage[] = []; await waku.store.queryOrderedCallback( [TestDecoder], (msg) => { @@ -496,7 +496,7 @@ describe("Waku Store", () => { } ); - const bothMessages: Message[] = []; + const bothMessages: IMessage[] = []; await waku.store.queryOrderedCallback( [TestDecoder], async (msg) => { @@ -543,7 +543,7 @@ describe("Waku Store", () => { await waitForRemotePeer(waku, [Protocols.Store]); const desiredMsgs = 14; - const messages: Message[] = []; + const messages: IMessage[] = []; await waku.store.queryOrderedCallback( [TestDecoder], async (msg) => { @@ -601,7 +601,7 @@ describe("Waku Store, custom pubsub topic", () => { await waku.dial(await nwaku.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.Store]); - const messages: Message[] = []; + const messages: IMessage[] = []; let promises: Promise[] = []; for await (const msgPromises of waku.store.queryGenerator([TestDecoder])) { const _promises = msgPromises.map(async (promise) => { diff --git a/packages/tests/tests/waku.node.spec.ts b/packages/tests/tests/waku.node.spec.ts index 10f372ae2f..e873107f35 100644 --- a/packages/tests/tests/waku.node.spec.ts +++ b/packages/tests/tests/waku.node.spec.ts @@ -1,14 +1,13 @@ import { bootstrap } from "@libp2p/bootstrap"; import type { PeerId } from "@libp2p/interface-peer-id"; import { bytesToUtf8, utf8ToBytes } from "@waku/byte-utils"; -import { DefaultUserAgent, waitForRemotePeer } from "@waku/core"; -import { createLightNode, createPrivacyNode } from "@waku/create"; -import type { +import { DecodedMessage, - Waku, - WakuLight, - WakuPrivacy, -} from "@waku/interfaces"; + DefaultUserAgent, + waitForRemotePeer, +} from "@waku/core"; +import { createLightNode, createPrivacyNode } from "@waku/create"; +import type { Waku, WakuLight, WakuPrivacy } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; import { createDecoder,