implement basic entites and structure, decouple into separate files

This commit is contained in:
Sasha 2025-09-25 00:53:29 +02:00
parent e54645aeba
commit 3de906a78a
No known key found for this signature in database
7 changed files with 413 additions and 105 deletions

View File

@ -0,0 +1,44 @@
import { IDecodedMessage, IFilter } from "@waku/interfaces";
import { MessageStore } from "./message_store.js";
import { IAckManager, ICodec } from "./utils.js";
export class FilterAckManager implements IAckManager {
private codecs: Set<ICodec> = new Set();
public constructor(
private messageStore: MessageStore,
private filter: IFilter
) {}
public start(): void {
return;
}
public async stop(): Promise<void> {
const promises = Array.from(this.codecs.entries()).map((codec) =>
this.filter.unsubscribe(codec)
);
await Promise.all(promises);
this.codecs.clear();
}
public async subscribe(codec: ICodec): Promise<boolean> {
const success = await this.filter.subscribe(
codec,
this.onMessage.bind(this)
);
if (success) {
this.codecs.add(codec);
}
return success;
}
private async onMessage(message: IDecodedMessage): Promise<void> {
if (!this.messageStore.has(message.hashStr)) {
this.messageStore.add(message);
}
this.messageStore.markFilterAck(message.hashStr);
}
}

View File

@ -0,0 +1,101 @@
import { messageHashStr } from "@waku/core";
import { IDecodedMessage, IEncoder, IMessage } from "@waku/interfaces";
type QueuedMessage = {
encoder?: IEncoder;
message?: IMessage;
filterAck: boolean;
storeAck: boolean;
lastSentAt?: number;
createdAt: number;
};
type MessageStoreOptions = {
resendIntervalMs?: number;
};
export class MessageStore {
private readonly messages: Map<string, QueuedMessage> = new Map();
private readonly resendIntervalMs: number;
public constructor(options: MessageStoreOptions = {}) {
this.resendIntervalMs = options.resendIntervalMs ?? 2000;
}
public has(hashStr: string): boolean {
return this.messages.has(hashStr);
}
public add(message: IDecodedMessage): void {
if (!this.messages.has(message.hashStr)) {
this.messages.set(message.hashStr, {
filterAck: false,
storeAck: false,
createdAt: Date.now()
});
}
}
public markFilterAck(hashStr: string): void {
const entry = this.messages.get(hashStr);
if (!entry) return;
entry.filterAck = true;
}
public markStoreAck(hashStr: string): void {
const entry = this.messages.get(hashStr);
if (!entry) return;
entry.storeAck = true;
}
public markSent(hashStr: string): void {
const entry = this.messages.get(hashStr);
if (!entry) return;
entry.lastSentAt = Date.now();
}
public async queue(
encoder: IEncoder,
message: IMessage
): Promise<string | undefined> {
const proto = await encoder.toProtoObj(message);
if (!proto) return undefined;
const hashStr = messageHashStr(encoder.pubsubTopic, proto);
const existing = this.messages.get(hashStr);
if (!existing) {
this.messages.set(hashStr, {
encoder,
message,
filterAck: false,
storeAck: false,
createdAt: Date.now()
});
}
return hashStr;
}
public getMessagesToSend(): Array<{
hashStr: string;
encoder: IEncoder;
message: IMessage;
}> {
const now = Date.now();
const res: Array<{
hashStr: string;
encoder: IEncoder;
message: IMessage;
}> = [];
for (const [hashStr, entry] of this.messages.entries()) {
if (!entry.encoder || !entry.message) continue;
const isAcknowledged = entry.filterAck || entry.storeAck;
if (isAcknowledged) continue;
if (
!entry.lastSentAt ||
now - entry.lastSentAt >= this.resendIntervalMs
) {
res.push({ hashStr, encoder: entry.encoder, message: entry.message });
}
}
return res;
}
}

View File

@ -0,0 +1,170 @@
import { createDecoder, createEncoder } from "@waku/core";
import type {
IDecodedMessage,
IDecoder,
IEncoder,
IFilter,
ILightPush,
IMessage,
IStore
} from "@waku/interfaces";
import { createRoutingInfo } from "@waku/utils";
import { utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai";
import sinon from "sinon";
import {
FilterAckManager,
MessageStore,
Messaging,
StoreAckManager
} from "./messaging.js";
const testContentTopic = "/test/1/waku-messaging/utf8";
const testNetworkconfig = {
clusterId: 0,
numShardsInCluster: 9
};
const testRoutingInfo = createRoutingInfo(testNetworkconfig, {
contentTopic: testContentTopic
});
describe("MessageStore", () => {
it("queues, marks sent and acks", async () => {
const encoder = createEncoder({
contentTopic: testContentTopic,
routingInfo: testRoutingInfo
});
const store = new MessageStore({ resendIntervalMs: 1 });
const msg: IMessage = { payload: utf8ToBytes("hello") };
const hash = await store.queue(encoder as IEncoder, msg);
expect(hash).to.be.a("string");
if (!hash) return;
expect(store.has(hash)).to.be.true;
store.markSent(hash);
store.markFilterAck(hash);
store.markStoreAck(hash);
const toSend = store.getMessagesToSend();
expect(toSend.length).to.eq(0);
});
});
describe("FilterAckManager", () => {
it("subscribes and marks filter ack on messages", async () => {
const store = new MessageStore();
const filter: IFilter = {
multicodec: "filter",
start: sinon.stub().resolves(),
stop: sinon.stub().resolves(),
subscribe: sinon.stub().callsFake(async (_dec, cb: any) => {
const decoder = createDecoder(testContentTopic, testRoutingInfo);
const proto = await decoder.fromProtoObj(decoder.pubsubTopic, {
payload: utf8ToBytes("x"),
contentTopic: testContentTopic,
version: 0,
timestamp: BigInt(Date.now()),
meta: undefined,
rateLimitProof: undefined,
ephemeral: false
} as any);
if (proto) {
await cb({ ...proto, hashStr: "hash" } as IDecodedMessage);
}
return true;
}),
unsubscribe: sinon.stub().resolves(true),
unsubscribeAll: sinon.stub()
} as unknown as IFilter;
const mgr = new FilterAckManager(store, filter);
const encoder = createEncoder({
contentTopic: testContentTopic,
routingInfo: testRoutingInfo
});
const subscribed = await mgr.subscribe({
...encoder,
fromWireToProtoObj: (b: Uint8Array) =>
createDecoder(testContentTopic, testRoutingInfo).fromWireToProtoObj(b),
fromProtoObj: (pubsub: string, p: any) =>
createDecoder(testContentTopic, testRoutingInfo).fromProtoObj(pubsub, p)
} as unknown as IDecoder<IDecodedMessage> & IEncoder);
expect(subscribed).to.be.true;
});
});
describe("StoreAckManager", () => {
it("queries and marks store ack", async () => {
const store = new MessageStore();
const decoder = createDecoder(testContentTopic, testRoutingInfo);
const d = decoder as IDecoder<IDecodedMessage> & IEncoder;
const mockStore: IStore = {
multicodec: "store",
createCursor: sinon.stub() as any,
queryGenerator: sinon.stub() as any,
queryWithOrderedCallback: sinon
.stub()
.callsFake(async (_decs: any, cb: any) => {
const proto = await decoder.fromProtoObj(decoder.pubsubTopic, {
payload: utf8ToBytes("x"),
contentTopic: testContentTopic,
version: 0,
timestamp: BigInt(Date.now()),
meta: undefined,
rateLimitProof: undefined,
ephemeral: false
} as any);
if (proto) {
await cb({ ...proto, hashStr: "hash2" });
}
}),
queryWithPromiseCallback: sinon.stub() as any
} as unknown as IStore;
const mgr = new StoreAckManager(store, mockStore);
await mgr.subscribe(d);
mgr.start();
await new Promise((r) => setTimeout(r, 5));
mgr.stop();
});
});
describe("Messaging", () => {
it("queues and sends via light push, marks sent", async () => {
const encoder = createEncoder({
contentTopic: testContentTopic,
routingInfo: testRoutingInfo
});
const lightPush: ILightPush = {
multicodec: "lightpush",
start: () => {},
stop: () => {},
send: sinon.stub().resolves({ successes: [], failures: [] }) as any
} as unknown as ILightPush;
const filter: IFilter = {
multicodec: "filter",
start: sinon.stub().resolves(),
stop: sinon.stub().resolves(),
subscribe: sinon.stub().resolves(true),
unsubscribe: sinon.stub().resolves(true),
unsubscribeAll: sinon.stub()
} as unknown as IFilter;
const store: IStore = {
multicodec: "store",
createCursor: sinon.stub() as any,
queryGenerator: sinon.stub() as any,
queryWithOrderedCallback: sinon.stub().resolves(),
queryWithPromiseCallback: sinon.stub().resolves()
} as unknown as IStore;
const messaging = new Messaging({ lightPush, filter, store });
await messaging.send(encoder, { payload: utf8ToBytes("hello") });
expect((lightPush.send as any).calledOnce).to.be.true;
});
});

View File

@ -1,6 +1,4 @@
import {
IDecodedMessage,
IDecoder,
IEncoder,
IFilter,
ILightPush,
@ -8,6 +6,10 @@ import {
IStore
} from "@waku/interfaces";
import { FilterAckManager } from "./fitler_ack.js";
import { MessageStore } from "./message_store.js";
import { StoreAckManager } from "./store_ack.js";
interface IMessaging {
send(encoder: IEncoder, message: IMessage): Promise<void>;
}
@ -19,115 +21,38 @@ type MessagingConstructorParams = {
};
export class Messaging implements IMessaging {
public constructor(params: MessagingConstructorParams) {}
private readonly lightPush: ILightPush;
private readonly messageStore: MessageStore;
private readonly filterAckManager: FilterAckManager;
private readonly storeAckManager: StoreAckManager;
public send(encoder: IEncoder, message: IMessage): Promise<void> {
return Promise.resolve();
public constructor(params: MessagingConstructorParams) {
this.lightPush = params.lightPush;
this.messageStore = new MessageStore();
this.filterAckManager = new FilterAckManager(
this.messageStore,
params.filter
);
this.storeAckManager = new StoreAckManager(this.messageStore, params.store);
}
}
class MessageStore {
// const hash: { encoder, message, filterAck, storeAck }
// filterAck(hash)
// storeAck(hash)
// markSent(hash)
// queue(encoder, message)
// getMessagesToSend()
// -> not sent yet (first)
// -> sent more than 2s ago but not acked yet (store or filter)
}
type ICodec = IEncoder & IDecoder<IDecodedMessage>;
interface IAckManager {
start(): void;
stop(): void;
subscribe(codec: ICodec): void;
}
class FilterAckManager implements IAckManager {
private codecs: Set<ICodec> = new Set();
public constructor(
private messageStore: MessageStore,
private filter: IFilter
) {}
public start(): void {
// noop
this.filterAckManager.start();
this.storeAckManager.start();
}
public async stop(): Promise<void> {
const promises = Array.from(this.codecs.entries()).map((codec) => {
return this.filter.unsubscribe(codec);
});
await Promise.all(promises);
this.codecs.clear();
await this.filterAckManager.stop();
this.storeAckManager.stop();
}
public async subscribe(codec: ICodec): Promise<boolean> {
return this.filter.subscribe(codec, this.onMessage.bind(this));
}
private async onMessage(message: IDecodedMessage): Promise<void> {
if (!this.messageStore.has(message.hashStr)) {
this.messageStore.add(message);
}
this.messageStore.markFilterAck(message.hashStr);
}
}
class StoreAckManager implements IAckManager {
private interval: ReturnType<typeof setInterval> | null = null;
private codecs: Set<ICodec> = new Set();
public constructor(
private messageStore: MessageStore,
private store: IStore
) {}
public start(): void {
if (this.interval) {
return;
}
this.interval = setInterval(() => {
void this.query();
}, 1000);
}
public stop(): void {
if (!this.interval) {
return;
}
clearInterval(this.interval);
this.interval = null;
}
public subscribe(codec: ICodec): void {
this.codecs.add(codec);
}
private async query(): Promise<void> {
for (const codec of this.codecs) {
await this.store.queryWithOrderedCallback(
[codec],
(message) => {
if (!this.messageStore.has(message.hashStr)) {
this.messageStore.add(message);
}
this.messageStore.markStoreAck(message.hashStr);
},
{
timeStart: new Date(Date.now() - 60 * 60 * 1000),
timeEnd: new Date()
}
);
}
public send(encoder: IEncoder, message: IMessage): Promise<void> {
return (async () => {
const hash = await this.messageStore.queue(encoder, message);
await this.lightPush.send(encoder, message);
if (hash) {
this.messageStore.markSent(hash);
}
})();
}
}

View File

@ -0,0 +1,58 @@
import { IStore } from "@waku/interfaces";
import { MessageStore } from "./message_store.js";
import { IAckManager, ICodec } from "./utils.js";
export class StoreAckManager implements IAckManager {
private interval: ReturnType<typeof setInterval> | null = null;
private codecs: Set<ICodec> = new Set();
public constructor(
private messageStore: MessageStore,
private store: IStore
) {}
public start(): void {
if (this.interval) {
return;
}
this.interval = setInterval(() => {
void this.query();
}, 1000);
}
public stop(): void {
if (!this.interval) {
return;
}
clearInterval(this.interval);
this.interval = null;
}
public async subscribe(codec: ICodec): Promise<boolean> {
this.codecs.add(codec);
return true;
}
private async query(): Promise<void> {
for (const codec of this.codecs) {
await this.store.queryWithOrderedCallback(
[codec],
(message) => {
if (!this.messageStore.has(message.hashStr)) {
this.messageStore.add(message);
}
this.messageStore.markStoreAck(message.hashStr);
},
{
timeStart: new Date(Date.now() - 60 * 60 * 1000),
timeEnd: new Date()
}
);
}
}
}

View File

@ -0,0 +1,10 @@
import { IDecodedMessage, IDecoder, IEncoder } from "@waku/interfaces";
// TODO: create a local entity for that that will literally extend existing encoder and decoder from package/core
export type ICodec = IEncoder & IDecoder<IDecodedMessage>;
export interface IAckManager {
start(): void;
stop(): void;
subscribe(codec: ICodec): Promise<boolean>;
}

View File

@ -232,7 +232,7 @@ export class WakuNode implements IWaku {
this.peerManager.start();
this.healthIndicator.start();
this.lightPush?.start();
this.sender?.start();
this.messaging?.start();
this._nodeStateLock = false;
this._nodeStarted = true;
@ -243,7 +243,7 @@ export class WakuNode implements IWaku {
this._nodeStateLock = true;
this.sender?.stop();
await this.messaging?.stop();
this.lightPush?.stop();
await this.filter?.stop();
this.healthIndicator.stop();