mirror of
https://github.com/logos-messaging/js-waku.git
synced 2026-01-02 13:53:12 +00:00
add draft api
This commit is contained in:
parent
97f0fb279c
commit
01288ff5e2
@ -1,6 +1,6 @@
|
||||
import { messageHashStr } from "@waku/core";
|
||||
import {
|
||||
IDecodedMessage,
|
||||
IDecoder,
|
||||
IEncoder,
|
||||
IFilter,
|
||||
ILightPush,
|
||||
@ -37,7 +37,7 @@ class MessageStore {
|
||||
// -> sent more than 2s ago but not acked yet (store or filter)
|
||||
}
|
||||
|
||||
type ICodec = null;
|
||||
type ICodec = IEncoder & IDecoder<IDecodedMessage>;
|
||||
|
||||
interface IAckManager {
|
||||
start(): void;
|
||||
@ -46,42 +46,88 @@ interface IAckManager {
|
||||
}
|
||||
|
||||
class FilterAckManager implements IAckManager {
|
||||
private subscriptions: Set<ICodec> = new Set();
|
||||
private codecs: Set<ICodec> = new Set();
|
||||
|
||||
public constructor(
|
||||
private messageStore: MessageStore,
|
||||
private filter: IFilter
|
||||
) {}
|
||||
|
||||
public start(): void {}
|
||||
public start(): void {
|
||||
// noop
|
||||
}
|
||||
|
||||
public stop(): void {}
|
||||
public async stop(): Promise<void> {
|
||||
const promises = Array.from(this.codecs.entries()).map((codec) => {
|
||||
return this.filter.unsubscribe(codec);
|
||||
});
|
||||
|
||||
await Promise.all(promises);
|
||||
this.codecs.clear();
|
||||
}
|
||||
|
||||
public async subscribe(codec: ICodec): Promise<boolean> {
|
||||
return this.filter.subscribe(codec, this.onMessage.bind(this));
|
||||
}
|
||||
|
||||
private async onMessage(message: IDecodedMessage): Promise<void> {
|
||||
const hash = messageHashStr(message.pubsubTopic, message);
|
||||
|
||||
if (this.messageStore.has(message)) {
|
||||
this.messageStore.markFilterAck(hash);
|
||||
} else {
|
||||
this.messageStore.put(message);
|
||||
this.messageStore.markFilterAck(hash);
|
||||
if (!this.messageStore.has(message.hashStr)) {
|
||||
this.messageStore.add(message);
|
||||
}
|
||||
|
||||
this.messageStore.markFilterAck(message.hashStr);
|
||||
}
|
||||
}
|
||||
|
||||
class StoreAckManager implements IAckManager {
|
||||
private interval: ReturnType<typeof setInterval> | null = null;
|
||||
|
||||
private codecs: Set<ICodec> = new Set();
|
||||
|
||||
public constructor(
|
||||
private messageStore: MessageStore,
|
||||
private store: IStore
|
||||
) {}
|
||||
|
||||
public start(): void {}
|
||||
public start(): void {
|
||||
if (this.interval) {
|
||||
return;
|
||||
}
|
||||
|
||||
public stop(): void {}
|
||||
this.interval = setInterval(() => {
|
||||
void this.query();
|
||||
}, 1000);
|
||||
}
|
||||
|
||||
public subscribe(codec: ICodec): void {}
|
||||
public stop(): void {
|
||||
if (!this.interval) {
|
||||
return;
|
||||
}
|
||||
|
||||
clearInterval(this.interval);
|
||||
this.interval = null;
|
||||
}
|
||||
|
||||
public subscribe(codec: ICodec): void {
|
||||
this.codecs.add(codec);
|
||||
}
|
||||
|
||||
private async query(): Promise<void> {
|
||||
for (const codec of this.codecs) {
|
||||
await this.store.queryWithOrderedCallback(
|
||||
[codec],
|
||||
(message) => {
|
||||
if (!this.messageStore.has(message.hashStr)) {
|
||||
this.messageStore.add(message);
|
||||
}
|
||||
|
||||
this.messageStore.markStoreAck(message.hashStr);
|
||||
},
|
||||
{
|
||||
timeStart: new Date(Date.now() - 60 * 60 * 1000),
|
||||
timeEnd: new Date()
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user