From 3de906a78a1a1789a355304cd9e4694858838e06 Mon Sep 17 00:00:00 2001 From: Sasha Date: Thu, 25 Sep 2025 00:53:29 +0200 Subject: [PATCH] implement basic entites and structure, decouple into separate files --- packages/sdk/src/messaging/fitler_ack.ts | 44 +++++ packages/sdk/src/messaging/message_store.ts | 101 +++++++++++ packages/sdk/src/messaging/messaging.spec.ts | 170 +++++++++++++++++++ packages/sdk/src/messaging/messaging.ts | 131 +++----------- packages/sdk/src/messaging/store_ack.ts | 58 +++++++ packages/sdk/src/messaging/utils.ts | 10 ++ packages/sdk/src/waku/waku.ts | 4 +- 7 files changed, 413 insertions(+), 105 deletions(-) create mode 100644 packages/sdk/src/messaging/fitler_ack.ts create mode 100644 packages/sdk/src/messaging/message_store.ts create mode 100644 packages/sdk/src/messaging/messaging.spec.ts create mode 100644 packages/sdk/src/messaging/store_ack.ts create mode 100644 packages/sdk/src/messaging/utils.ts diff --git a/packages/sdk/src/messaging/fitler_ack.ts b/packages/sdk/src/messaging/fitler_ack.ts new file mode 100644 index 0000000000..b5ed7562c3 --- /dev/null +++ b/packages/sdk/src/messaging/fitler_ack.ts @@ -0,0 +1,44 @@ +import { IDecodedMessage, IFilter } from "@waku/interfaces"; + +import { MessageStore } from "./message_store.js"; +import { IAckManager, ICodec } from "./utils.js"; + +export class FilterAckManager implements IAckManager { + private codecs: Set = new Set(); + + public constructor( + private messageStore: MessageStore, + private filter: IFilter + ) {} + + public start(): void { + return; + } + + public async stop(): Promise { + const promises = Array.from(this.codecs.entries()).map((codec) => + this.filter.unsubscribe(codec) + ); + await Promise.all(promises); + this.codecs.clear(); + } + + public async subscribe(codec: ICodec): Promise { + const success = await this.filter.subscribe( + codec, + this.onMessage.bind(this) + ); + if (success) { + this.codecs.add(codec); + } + return success; + } + + private async onMessage(message: IDecodedMessage): Promise { + if (!this.messageStore.has(message.hashStr)) { + this.messageStore.add(message); + } + + this.messageStore.markFilterAck(message.hashStr); + } +} diff --git a/packages/sdk/src/messaging/message_store.ts b/packages/sdk/src/messaging/message_store.ts new file mode 100644 index 0000000000..910f5cee60 --- /dev/null +++ b/packages/sdk/src/messaging/message_store.ts @@ -0,0 +1,101 @@ +import { messageHashStr } from "@waku/core"; +import { IDecodedMessage, IEncoder, IMessage } from "@waku/interfaces"; + +type QueuedMessage = { + encoder?: IEncoder; + message?: IMessage; + filterAck: boolean; + storeAck: boolean; + lastSentAt?: number; + createdAt: number; +}; + +type MessageStoreOptions = { + resendIntervalMs?: number; +}; + +export class MessageStore { + private readonly messages: Map = new Map(); + private readonly resendIntervalMs: number; + + public constructor(options: MessageStoreOptions = {}) { + this.resendIntervalMs = options.resendIntervalMs ?? 2000; + } + + public has(hashStr: string): boolean { + return this.messages.has(hashStr); + } + + public add(message: IDecodedMessage): void { + if (!this.messages.has(message.hashStr)) { + this.messages.set(message.hashStr, { + filterAck: false, + storeAck: false, + createdAt: Date.now() + }); + } + } + + public markFilterAck(hashStr: string): void { + const entry = this.messages.get(hashStr); + if (!entry) return; + entry.filterAck = true; + } + + public markStoreAck(hashStr: string): void { + const entry = this.messages.get(hashStr); + if (!entry) return; + entry.storeAck = true; + } + + public markSent(hashStr: string): void { + const entry = this.messages.get(hashStr); + if (!entry) return; + entry.lastSentAt = Date.now(); + } + + public async queue( + encoder: IEncoder, + message: IMessage + ): Promise { + const proto = await encoder.toProtoObj(message); + if (!proto) return undefined; + const hashStr = messageHashStr(encoder.pubsubTopic, proto); + const existing = this.messages.get(hashStr); + if (!existing) { + this.messages.set(hashStr, { + encoder, + message, + filterAck: false, + storeAck: false, + createdAt: Date.now() + }); + } + return hashStr; + } + + public getMessagesToSend(): Array<{ + hashStr: string; + encoder: IEncoder; + message: IMessage; + }> { + const now = Date.now(); + const res: Array<{ + hashStr: string; + encoder: IEncoder; + message: IMessage; + }> = []; + for (const [hashStr, entry] of this.messages.entries()) { + if (!entry.encoder || !entry.message) continue; + const isAcknowledged = entry.filterAck || entry.storeAck; + if (isAcknowledged) continue; + if ( + !entry.lastSentAt || + now - entry.lastSentAt >= this.resendIntervalMs + ) { + res.push({ hashStr, encoder: entry.encoder, message: entry.message }); + } + } + return res; + } +} diff --git a/packages/sdk/src/messaging/messaging.spec.ts b/packages/sdk/src/messaging/messaging.spec.ts new file mode 100644 index 0000000000..209336634c --- /dev/null +++ b/packages/sdk/src/messaging/messaging.spec.ts @@ -0,0 +1,170 @@ +import { createDecoder, createEncoder } from "@waku/core"; +import type { + IDecodedMessage, + IDecoder, + IEncoder, + IFilter, + ILightPush, + IMessage, + IStore +} from "@waku/interfaces"; +import { createRoutingInfo } from "@waku/utils"; +import { utf8ToBytes } from "@waku/utils/bytes"; +import { expect } from "chai"; +import sinon from "sinon"; + +import { + FilterAckManager, + MessageStore, + Messaging, + StoreAckManager +} from "./messaging.js"; + +const testContentTopic = "/test/1/waku-messaging/utf8"; +const testNetworkconfig = { + clusterId: 0, + numShardsInCluster: 9 +}; +const testRoutingInfo = createRoutingInfo(testNetworkconfig, { + contentTopic: testContentTopic +}); + +describe("MessageStore", () => { + it("queues, marks sent and acks", async () => { + const encoder = createEncoder({ + contentTopic: testContentTopic, + routingInfo: testRoutingInfo + }); + const store = new MessageStore({ resendIntervalMs: 1 }); + const msg: IMessage = { payload: utf8ToBytes("hello") }; + + const hash = await store.queue(encoder as IEncoder, msg); + expect(hash).to.be.a("string"); + if (!hash) return; + expect(store.has(hash)).to.be.true; + store.markSent(hash); + store.markFilterAck(hash); + store.markStoreAck(hash); + + const toSend = store.getMessagesToSend(); + expect(toSend.length).to.eq(0); + }); +}); +describe("FilterAckManager", () => { + it("subscribes and marks filter ack on messages", async () => { + const store = new MessageStore(); + const filter: IFilter = { + multicodec: "filter", + start: sinon.stub().resolves(), + stop: sinon.stub().resolves(), + subscribe: sinon.stub().callsFake(async (_dec, cb: any) => { + const decoder = createDecoder(testContentTopic, testRoutingInfo); + const proto = await decoder.fromProtoObj(decoder.pubsubTopic, { + payload: utf8ToBytes("x"), + contentTopic: testContentTopic, + version: 0, + timestamp: BigInt(Date.now()), + meta: undefined, + rateLimitProof: undefined, + ephemeral: false + } as any); + if (proto) { + await cb({ ...proto, hashStr: "hash" } as IDecodedMessage); + } + return true; + }), + unsubscribe: sinon.stub().resolves(true), + unsubscribeAll: sinon.stub() + } as unknown as IFilter; + + const mgr = new FilterAckManager(store, filter); + const encoder = createEncoder({ + contentTopic: testContentTopic, + routingInfo: testRoutingInfo + }); + + const subscribed = await mgr.subscribe({ + ...encoder, + fromWireToProtoObj: (b: Uint8Array) => + createDecoder(testContentTopic, testRoutingInfo).fromWireToProtoObj(b), + fromProtoObj: (pubsub: string, p: any) => + createDecoder(testContentTopic, testRoutingInfo).fromProtoObj(pubsub, p) + } as unknown as IDecoder & IEncoder); + expect(subscribed).to.be.true; + }); +}); + +describe("StoreAckManager", () => { + it("queries and marks store ack", async () => { + const store = new MessageStore(); + const decoder = createDecoder(testContentTopic, testRoutingInfo); + const d = decoder as IDecoder & IEncoder; + + const mockStore: IStore = { + multicodec: "store", + createCursor: sinon.stub() as any, + queryGenerator: sinon.stub() as any, + queryWithOrderedCallback: sinon + .stub() + .callsFake(async (_decs: any, cb: any) => { + const proto = await decoder.fromProtoObj(decoder.pubsubTopic, { + payload: utf8ToBytes("x"), + contentTopic: testContentTopic, + version: 0, + timestamp: BigInt(Date.now()), + meta: undefined, + rateLimitProof: undefined, + ephemeral: false + } as any); + if (proto) { + await cb({ ...proto, hashStr: "hash2" }); + } + }), + queryWithPromiseCallback: sinon.stub() as any + } as unknown as IStore; + + const mgr = new StoreAckManager(store, mockStore); + await mgr.subscribe(d); + mgr.start(); + await new Promise((r) => setTimeout(r, 5)); + mgr.stop(); + }); +}); + +describe("Messaging", () => { + it("queues and sends via light push, marks sent", async () => { + const encoder = createEncoder({ + contentTopic: testContentTopic, + routingInfo: testRoutingInfo + }); + + const lightPush: ILightPush = { + multicodec: "lightpush", + start: () => {}, + stop: () => {}, + send: sinon.stub().resolves({ successes: [], failures: [] }) as any + } as unknown as ILightPush; + + const filter: IFilter = { + multicodec: "filter", + start: sinon.stub().resolves(), + stop: sinon.stub().resolves(), + subscribe: sinon.stub().resolves(true), + unsubscribe: sinon.stub().resolves(true), + unsubscribeAll: sinon.stub() + } as unknown as IFilter; + + const store: IStore = { + multicodec: "store", + createCursor: sinon.stub() as any, + queryGenerator: sinon.stub() as any, + queryWithOrderedCallback: sinon.stub().resolves(), + queryWithPromiseCallback: sinon.stub().resolves() + } as unknown as IStore; + + const messaging = new Messaging({ lightPush, filter, store }); + + await messaging.send(encoder, { payload: utf8ToBytes("hello") }); + expect((lightPush.send as any).calledOnce).to.be.true; + }); +}); diff --git a/packages/sdk/src/messaging/messaging.ts b/packages/sdk/src/messaging/messaging.ts index f68708311b..a3e058ed07 100644 --- a/packages/sdk/src/messaging/messaging.ts +++ b/packages/sdk/src/messaging/messaging.ts @@ -1,6 +1,4 @@ import { - IDecodedMessage, - IDecoder, IEncoder, IFilter, ILightPush, @@ -8,6 +6,10 @@ import { IStore } from "@waku/interfaces"; +import { FilterAckManager } from "./fitler_ack.js"; +import { MessageStore } from "./message_store.js"; +import { StoreAckManager } from "./store_ack.js"; + interface IMessaging { send(encoder: IEncoder, message: IMessage): Promise; } @@ -19,115 +21,38 @@ type MessagingConstructorParams = { }; export class Messaging implements IMessaging { - public constructor(params: MessagingConstructorParams) {} + private readonly lightPush: ILightPush; + private readonly messageStore: MessageStore; + private readonly filterAckManager: FilterAckManager; + private readonly storeAckManager: StoreAckManager; - public send(encoder: IEncoder, message: IMessage): Promise { - return Promise.resolve(); + public constructor(params: MessagingConstructorParams) { + this.lightPush = params.lightPush; + this.messageStore = new MessageStore(); + this.filterAckManager = new FilterAckManager( + this.messageStore, + params.filter + ); + this.storeAckManager = new StoreAckManager(this.messageStore, params.store); } -} - -class MessageStore { - // const hash: { encoder, message, filterAck, storeAck } - // filterAck(hash) - // storeAck(hash) - // markSent(hash) - // queue(encoder, message) - // getMessagesToSend() - // -> not sent yet (first) - // -> sent more than 2s ago but not acked yet (store or filter) -} - -type ICodec = IEncoder & IDecoder; - -interface IAckManager { - start(): void; - stop(): void; - subscribe(codec: ICodec): void; -} - -class FilterAckManager implements IAckManager { - private codecs: Set = new Set(); - - public constructor( - private messageStore: MessageStore, - private filter: IFilter - ) {} public start(): void { - // noop + this.filterAckManager.start(); + this.storeAckManager.start(); } public async stop(): Promise { - const promises = Array.from(this.codecs.entries()).map((codec) => { - return this.filter.unsubscribe(codec); - }); - - await Promise.all(promises); - this.codecs.clear(); + await this.filterAckManager.stop(); + this.storeAckManager.stop(); } - public async subscribe(codec: ICodec): Promise { - return this.filter.subscribe(codec, this.onMessage.bind(this)); - } - - private async onMessage(message: IDecodedMessage): Promise { - if (!this.messageStore.has(message.hashStr)) { - this.messageStore.add(message); - } - - this.messageStore.markFilterAck(message.hashStr); - } -} - -class StoreAckManager implements IAckManager { - private interval: ReturnType | null = null; - - private codecs: Set = new Set(); - - public constructor( - private messageStore: MessageStore, - private store: IStore - ) {} - - public start(): void { - if (this.interval) { - return; - } - - this.interval = setInterval(() => { - void this.query(); - }, 1000); - } - - public stop(): void { - if (!this.interval) { - return; - } - - clearInterval(this.interval); - this.interval = null; - } - - public subscribe(codec: ICodec): void { - this.codecs.add(codec); - } - - private async query(): Promise { - for (const codec of this.codecs) { - await this.store.queryWithOrderedCallback( - [codec], - (message) => { - if (!this.messageStore.has(message.hashStr)) { - this.messageStore.add(message); - } - - this.messageStore.markStoreAck(message.hashStr); - }, - { - timeStart: new Date(Date.now() - 60 * 60 * 1000), - timeEnd: new Date() - } - ); - } + public send(encoder: IEncoder, message: IMessage): Promise { + return (async () => { + const hash = await this.messageStore.queue(encoder, message); + await this.lightPush.send(encoder, message); + if (hash) { + this.messageStore.markSent(hash); + } + })(); } } diff --git a/packages/sdk/src/messaging/store_ack.ts b/packages/sdk/src/messaging/store_ack.ts new file mode 100644 index 0000000000..94511d1c92 --- /dev/null +++ b/packages/sdk/src/messaging/store_ack.ts @@ -0,0 +1,58 @@ +import { IStore } from "@waku/interfaces"; + +import { MessageStore } from "./message_store.js"; +import { IAckManager, ICodec } from "./utils.js"; + +export class StoreAckManager implements IAckManager { + private interval: ReturnType | null = null; + + private codecs: Set = new Set(); + + public constructor( + private messageStore: MessageStore, + private store: IStore + ) {} + + public start(): void { + if (this.interval) { + return; + } + + this.interval = setInterval(() => { + void this.query(); + }, 1000); + } + + public stop(): void { + if (!this.interval) { + return; + } + + clearInterval(this.interval); + this.interval = null; + } + + public async subscribe(codec: ICodec): Promise { + this.codecs.add(codec); + return true; + } + + private async query(): Promise { + for (const codec of this.codecs) { + await this.store.queryWithOrderedCallback( + [codec], + (message) => { + if (!this.messageStore.has(message.hashStr)) { + this.messageStore.add(message); + } + + this.messageStore.markStoreAck(message.hashStr); + }, + { + timeStart: new Date(Date.now() - 60 * 60 * 1000), + timeEnd: new Date() + } + ); + } + } +} diff --git a/packages/sdk/src/messaging/utils.ts b/packages/sdk/src/messaging/utils.ts new file mode 100644 index 0000000000..db4b5ba758 --- /dev/null +++ b/packages/sdk/src/messaging/utils.ts @@ -0,0 +1,10 @@ +import { IDecodedMessage, IDecoder, IEncoder } from "@waku/interfaces"; + +// TODO: create a local entity for that that will literally extend existing encoder and decoder from package/core +export type ICodec = IEncoder & IDecoder; + +export interface IAckManager { + start(): void; + stop(): void; + subscribe(codec: ICodec): Promise; +} diff --git a/packages/sdk/src/waku/waku.ts b/packages/sdk/src/waku/waku.ts index 6fc689d1b1..31098782b5 100644 --- a/packages/sdk/src/waku/waku.ts +++ b/packages/sdk/src/waku/waku.ts @@ -232,7 +232,7 @@ export class WakuNode implements IWaku { this.peerManager.start(); this.healthIndicator.start(); this.lightPush?.start(); - this.sender?.start(); + this.messaging?.start(); this._nodeStateLock = false; this._nodeStarted = true; @@ -243,7 +243,7 @@ export class WakuNode implements IWaku { this._nodeStateLock = true; - this.sender?.stop(); + await this.messaging?.stop(); this.lightPush?.stop(); await this.filter?.stop(); this.healthIndicator.stop();