implement main ack manager, improve message store, implement Sender entity

This commit is contained in:
Sasha 2025-09-25 01:32:12 +02:00
parent 3de906a78a
commit 4fe8bfdd88
No known key found for this signature in database
6 changed files with 242 additions and 151 deletions

View File

@ -0,0 +1,138 @@
import { IDecodedMessage, IFilter, IStore } from "@waku/interfaces";
import { MessageStore } from "./message_store.js";
import { IAckManager, ICodec } from "./utils.js";
type AckManagerConstructorParams = {
messageStore: MessageStore;
filter: IFilter;
store: IStore;
};
export class AckManager implements IAckManager {
private readonly messageStore: MessageStore;
private readonly filterAckManager: FilterAckManager;
private readonly storeAckManager: StoreAckManager;
public constructor(params: AckManagerConstructorParams) {
this.messageStore = params.messageStore;
this.filterAckManager = new FilterAckManager(
this.messageStore,
params.filter
);
this.storeAckManager = new StoreAckManager(this.messageStore, params.store);
}
public start(): void {
this.filterAckManager.start();
this.storeAckManager.start();
}
public async stop(): Promise<void> {
await this.filterAckManager.stop();
this.storeAckManager.stop();
}
public async subscribe(codec: ICodec): Promise<boolean> {
return (
(await this.filterAckManager.subscribe(codec)) ||
(await this.storeAckManager.subscribe(codec))
);
}
}
class FilterAckManager implements IAckManager {
private codecs: Set<ICodec> = new Set();
public constructor(
private messageStore: MessageStore,
private filter: IFilter
) {}
public start(): void {
return;
}
public async stop(): Promise<void> {
const promises = Array.from(this.codecs.entries()).map((codec) =>
this.filter.unsubscribe(codec)
);
await Promise.all(promises);
this.codecs.clear();
}
public async subscribe(codec: ICodec): Promise<boolean> {
const success = await this.filter.subscribe(
codec,
this.onMessage.bind(this)
);
if (success) {
this.codecs.add(codec);
}
return success;
}
private async onMessage(message: IDecodedMessage): Promise<void> {
if (!this.messageStore.has(message.hashStr)) {
this.messageStore.add(message);
}
this.messageStore.markFilterAck(message.hashStr);
}
}
class StoreAckManager implements IAckManager {
private interval: ReturnType<typeof setInterval> | null = null;
private codecs: Set<ICodec> = new Set();
public constructor(
private messageStore: MessageStore,
private store: IStore
) {}
public start(): void {
if (this.interval) {
return;
}
this.interval = setInterval(() => {
void this.query();
}, 1000);
}
public stop(): void {
if (!this.interval) {
return;
}
clearInterval(this.interval);
this.interval = null;
}
public async subscribe(codec: ICodec): Promise<boolean> {
this.codecs.add(codec);
return true;
}
private async query(): Promise<void> {
for (const codec of this.codecs) {
await this.store.queryWithOrderedCallback(
[codec],
(message) => {
if (!this.messageStore.has(message.hashStr)) {
this.messageStore.add(message);
}
this.messageStore.markStoreAck(message.hashStr);
},
{
timeStart: new Date(Date.now() - 60 * 60 * 1000),
timeEnd: new Date()
}
);
}
}
}

View File

@ -1,44 +0,0 @@
import { IDecodedMessage, IFilter } from "@waku/interfaces";
import { MessageStore } from "./message_store.js";
import { IAckManager, ICodec } from "./utils.js";
export class FilterAckManager implements IAckManager {
private codecs: Set<ICodec> = new Set();
public constructor(
private messageStore: MessageStore,
private filter: IFilter
) {}
public start(): void {
return;
}
public async stop(): Promise<void> {
const promises = Array.from(this.codecs.entries()).map((codec) =>
this.filter.unsubscribe(codec)
);
await Promise.all(promises);
this.codecs.clear();
}
public async subscribe(codec: ICodec): Promise<boolean> {
const success = await this.filter.subscribe(
codec,
this.onMessage.bind(this)
);
if (success) {
this.codecs.add(codec);
}
return success;
}
private async onMessage(message: IDecodedMessage): Promise<void> {
if (!this.messageStore.has(message.hashStr)) {
this.messageStore.add(message);
}
this.messageStore.markFilterAck(message.hashStr);
}
}

View File

@ -1,4 +1,4 @@
import { messageHashStr } from "@waku/core"; import { message, messageHashStr } from "@waku/core";
import { IDecodedMessage, IEncoder, IMessage } from "@waku/interfaces"; import { IDecodedMessage, IEncoder, IMessage } from "@waku/interfaces";
type QueuedMessage = { type QueuedMessage = {
@ -14,8 +14,12 @@ type MessageStoreOptions = {
resendIntervalMs?: number; resendIntervalMs?: number;
}; };
type RequestId = string;
export class MessageStore { export class MessageStore {
private readonly messages: Map<string, QueuedMessage> = new Map(); private readonly messages: Map<string, QueuedMessage> = new Map();
private readonly pendingRequests: Map<RequestId, QueuedMessage> = new Map();
private readonly resendIntervalMs: number; private readonly resendIntervalMs: number;
public constructor(options: MessageStoreOptions = {}) { public constructor(options: MessageStoreOptions = {}) {
@ -40,62 +44,91 @@ export class MessageStore {
const entry = this.messages.get(hashStr); const entry = this.messages.get(hashStr);
if (!entry) return; if (!entry) return;
entry.filterAck = true; entry.filterAck = true;
// TODO: implement events
} }
public markStoreAck(hashStr: string): void { public markStoreAck(hashStr: string): void {
const entry = this.messages.get(hashStr); const entry = this.messages.get(hashStr);
if (!entry) return; if (!entry) return;
entry.storeAck = true; entry.storeAck = true;
// TODO: implement events
} }
public markSent(hashStr: string): void { public async markSent(requestId: RequestId): Promise<void> {
const entry = this.messages.get(hashStr); const entry = this.pendingRequests.get(requestId);
if (!entry) return;
entry.lastSentAt = Date.now(); if (!entry || !entry.encoder || !entry.message) {
return;
}
try {
entry.lastSentAt = Date.now();
this.pendingRequests.delete(requestId);
const proto = await entry.encoder.toProtoObj(entry.message);
if (!proto) {
return;
}
const hashStr = messageHashStr(entry.encoder.pubsubTopic, proto);
this.messages.set(hashStr, entry);
} catch (error) {
// TODO: better recovery
this.pendingRequests.set(requestId, entry);
}
} }
public async queue( public async queue(
encoder: IEncoder, encoder: IEncoder,
message: IMessage message: IMessage
): Promise<string | undefined> { ): Promise<RequestId | undefined> {
const proto = await encoder.toProtoObj(message); const requestId = crypto.randomUUID();
if (!proto) return undefined;
const hashStr = messageHashStr(encoder.pubsubTopic, proto); this.pendingRequests.set(requestId, {
const existing = this.messages.get(hashStr); encoder,
if (!existing) { message,
this.messages.set(hashStr, { filterAck: false,
encoder, storeAck: false,
message, createdAt: Date.now()
filterAck: false, });
storeAck: false,
createdAt: Date.now() return requestId;
});
}
return hashStr;
} }
public getMessagesToSend(): Array<{ public getMessagesToSend(): Array<{
hashStr: string; requestId: string;
encoder: IEncoder; encoder: IEncoder;
message: IMessage; message: IMessage;
}> { }> {
const now = Date.now(); const now = Date.now();
const res: Array<{ const res: Array<{
hashStr: string; requestId: string;
encoder: IEncoder; encoder: IEncoder;
message: IMessage; message: IMessage;
}> = []; }> = [];
for (const [hashStr, entry] of this.messages.entries()) {
if (!entry.encoder || !entry.message) continue; for (const [requestId, entry] of this.pendingRequests.entries()) {
const isAcknowledged = entry.filterAck || entry.storeAck; if (!entry.encoder || !entry.message) {
if (isAcknowledged) continue; continue;
}
const isAcknowledged = entry.filterAck || entry.storeAck; // TODO: make sure it works with message and pending requests and returns messages to re-sent that are not ack yet
if (isAcknowledged) {
continue;
}
if ( if (
!entry.lastSentAt || !entry.lastSentAt ||
now - entry.lastSentAt >= this.resendIntervalMs now - entry.lastSentAt >= this.resendIntervalMs
) { ) {
res.push({ hashStr, encoder: entry.encoder, message: entry.message }); res.push({ requestId, encoder: entry.encoder, message: entry.message });
} }
} }
return res; return res;
} }
} }

View File

@ -6,9 +6,9 @@ import {
IStore IStore
} from "@waku/interfaces"; } from "@waku/interfaces";
import { FilterAckManager } from "./fitler_ack.js"; import { AckManager } from "./ack_manager.js";
import { MessageStore } from "./message_store.js"; import { MessageStore } from "./message_store.js";
import { StoreAckManager } from "./store_ack.js"; import { Sender } from "./sender.js";
interface IMessaging { interface IMessaging {
send(encoder: IEncoder, message: IMessage): Promise<void>; send(encoder: IEncoder, message: IMessage): Promise<void>;
@ -21,38 +21,34 @@ type MessagingConstructorParams = {
}; };
export class Messaging implements IMessaging { export class Messaging implements IMessaging {
private readonly lightPush: ILightPush;
private readonly messageStore: MessageStore; private readonly messageStore: MessageStore;
private readonly filterAckManager: FilterAckManager; private readonly ackManager: AckManager;
private readonly storeAckManager: StoreAckManager; private readonly sender: Sender;
public constructor(params: MessagingConstructorParams) { public constructor(params: MessagingConstructorParams) {
this.lightPush = params.lightPush;
this.messageStore = new MessageStore(); this.messageStore = new MessageStore();
this.filterAckManager = new FilterAckManager(
this.messageStore, this.ackManager = new AckManager({
params.filter messageStore: this.messageStore,
); filter: params.filter,
this.storeAckManager = new StoreAckManager(this.messageStore, params.store); store: params.store
});
this.sender = new Sender({
messageStore: this.messageStore,
lightPush: params.lightPush
});
} }
public start(): void { public start(): void {
this.filterAckManager.start(); this.ackManager.start();
this.storeAckManager.start();
} }
public async stop(): Promise<void> { public async stop(): Promise<void> {
await this.filterAckManager.stop(); await this.ackManager.stop();
this.storeAckManager.stop();
} }
public send(encoder: IEncoder, message: IMessage): Promise<void> { public send(encoder: IEncoder, message: IMessage): Promise<void> {
return (async () => { return this.sender.send(encoder, message);
const hash = await this.messageStore.queue(encoder, message);
await this.lightPush.send(encoder, message);
if (hash) {
this.messageStore.markSent(hash);
}
})();
} }
} }

View File

@ -0,0 +1,26 @@
import { IEncoder, ILightPush, IMessage } from "@waku/interfaces";
import type { MessageStore } from "./message_store.js";
type SenderConstructorParams = {
messageStore: MessageStore;
lightPush: ILightPush;
};
export class Sender {
private readonly messageStore: MessageStore;
private readonly lightPush: ILightPush;
public constructor(params: SenderConstructorParams) {
this.messageStore = params.messageStore;
this.lightPush = params.lightPush;
}
public async send(encoder: IEncoder, message: IMessage): Promise<void> {
const requestId = await this.messageStore.queue(encoder, message);
await this.lightPush.send(encoder, message);
if (requestId) {
await this.messageStore.markSent(requestId);
}
}
}

View File

@ -1,58 +0,0 @@
import { IStore } from "@waku/interfaces";
import { MessageStore } from "./message_store.js";
import { IAckManager, ICodec } from "./utils.js";
export class StoreAckManager implements IAckManager {
private interval: ReturnType<typeof setInterval> | null = null;
private codecs: Set<ICodec> = new Set();
public constructor(
private messageStore: MessageStore,
private store: IStore
) {}
public start(): void {
if (this.interval) {
return;
}
this.interval = setInterval(() => {
void this.query();
}, 1000);
}
public stop(): void {
if (!this.interval) {
return;
}
clearInterval(this.interval);
this.interval = null;
}
public async subscribe(codec: ICodec): Promise<boolean> {
this.codecs.add(codec);
return true;
}
private async query(): Promise<void> {
for (const codec of this.codecs) {
await this.store.queryWithOrderedCallback(
[codec],
(message) => {
if (!this.messageStore.has(message.hashStr)) {
this.messageStore.add(message);
}
this.messageStore.markStoreAck(message.hashStr);
},
{
timeStart: new Date(Date.now() - 60 * 60 * 1000),
timeEnd: new Date()
}
);
}
}
}