diff --git a/packages/interfaces/src/waku.ts b/packages/interfaces/src/waku.ts index 5c99f716e6..394e87a0b5 100644 --- a/packages/interfaces/src/waku.ts +++ b/packages/interfaces/src/waku.ts @@ -53,9 +53,22 @@ export type IWakuEventEmitter = TypedEventEmitter; export interface IWaku { libp2p: Libp2p; + + /** + * @deprecated should not be accessed directly, use {@link IWaku.send} and {@link IWaku.subscribe} instead + */ relay?: IRelay; + store?: IStore; + + /** + * @deprecated should not be accessed directly, use {@link IWaku.subscribe} instead + */ filter?: IFilter; + + /** + * @deprecated should not be accessed directly, use {@link IWaku.send} instead + */ lightPush?: ILightPush; /** diff --git a/packages/sdk/src/messaging/index.ts b/packages/sdk/src/messaging/index.ts new file mode 100644 index 0000000000..0035e4eb2f --- /dev/null +++ b/packages/sdk/src/messaging/index.ts @@ -0,0 +1 @@ +export { Messaging } from "./messaging.js"; diff --git a/packages/sdk/src/messaging/messaging.ts b/packages/sdk/src/messaging/messaging.ts new file mode 100644 index 0000000000..1324961d36 --- /dev/null +++ b/packages/sdk/src/messaging/messaging.ts @@ -0,0 +1,87 @@ +import { messageHashStr } from "@waku/core"; +import { + IDecodedMessage, + IEncoder, + IFilter, + ILightPush, + IMessage, + IStore +} from "@waku/interfaces"; + +interface IMessaging { + send(encoder: IEncoder, message: IMessage): Promise; +} + +type MessagingConstructorParams = { + lightPush: ILightPush; + filter: IFilter; + store: IStore; +}; + +export class Messaging implements IMessaging { + public constructor(params: MessagingConstructorParams) {} + + public send(encoder: IEncoder, message: IMessage): Promise { + return Promise.resolve(); + } +} + +class MessageStore { + // const hash: { encoder, message, filterAck, storeAck } + // filterAck(hash) + // storeAck(hash) + // markSent(hash) + // queue(encoder, message) + // getMessagesToSend() + // -> not sent yet (first) + // -> sent more than 2s ago but not acked yet (store or filter) +} + +type ICodec = null; + +interface IAckManager { + start(): void; + stop(): void; + subscribe(codec: ICodec): void; +} + +class FilterAckManager implements IAckManager { + private subscriptions: Set = new Set(); + + public constructor( + private messageStore: MessageStore, + private filter: IFilter + ) {} + + public start(): void {} + + public stop(): void {} + + public async subscribe(codec: ICodec): Promise { + return this.filter.subscribe(codec, this.onMessage.bind(this)); + } + + private async onMessage(message: IDecodedMessage): Promise { + const hash = messageHashStr(message.pubsubTopic, message); + + if (this.messageStore.has(message)) { + this.messageStore.markFilterAck(hash); + } else { + this.messageStore.put(message); + this.messageStore.markFilterAck(hash); + } + } +} + +class StoreAckManager implements IAckManager { + public constructor( + private messageStore: MessageStore, + private store: IStore + ) {} + + public start(): void {} + + public stop(): void {} + + public subscribe(codec: ICodec): void {} +} diff --git a/packages/sdk/src/waku/waku.ts b/packages/sdk/src/waku/waku.ts index 38dfec9ffa..a3c36ef384 100644 --- a/packages/sdk/src/waku/waku.ts +++ b/packages/sdk/src/waku/waku.ts @@ -15,6 +15,7 @@ import type { IEncoder, IFilter, ILightPush, + IMessage, IRelay, IRoutingInfo, IStore, @@ -33,6 +34,7 @@ import { createRoutingInfo, Logger } from "@waku/utils"; import { Filter } from "../filter/index.js"; import { HealthIndicator } from "../health_indicator/index.js"; import { LightPush } from "../light_push/index.js"; +import { Messaging } from "../messaging/index.js"; import { PeerManager } from "../peer_manager/index.js"; import { Store } from "../store/index.js"; @@ -64,6 +66,7 @@ export class WakuNode implements IWaku { private readonly connectionManager: ConnectionManager; private readonly peerManager: PeerManager; private readonly healthIndicator: HealthIndicator; + private readonly messaging: Messaging | null = null; public constructor( options: CreateNodeOptions, @@ -126,6 +129,14 @@ export class WakuNode implements IWaku { }); } + if (this.lightPush && this.filter && this.store) { + this.messaging = new Messaging({ + lightPush: this.lightPush, + filter: this.filter, + store: this.store + }); + } + log.info( "Waku node created", peerId, @@ -220,6 +231,7 @@ export class WakuNode implements IWaku { this.peerManager.start(); this.healthIndicator.start(); this.lightPush?.start(); + this.sender?.start(); this._nodeStateLock = false; this._nodeStarted = true; @@ -230,6 +242,7 @@ export class WakuNode implements IWaku { this._nodeStateLock = true; + this.sender?.stop(); this.lightPush?.stop(); this.healthIndicator.stop(); this.peerManager.stop(); @@ -280,6 +293,10 @@ export class WakuNode implements IWaku { }); } + public send(encoder: IEncoder, message: IMessage): Promise { + return this.messaging?.send(encoder, message) ?? Promise.resolve(); + } + private createRoutingInfo( contentTopic?: string, shardId?: number