add store queries for missed messages

This commit is contained in:
Arseniy Klempner 2025-03-28 19:04:51 -07:00
parent 054760e752
commit dc18ae685b
No known key found for this signature in database
GPG Key ID: 51653F18863BD24B
2 changed files with 46 additions and 8 deletions

View File

@ -26,19 +26,49 @@ export function sweepOut() {
channel.sweepOutgoingBuffer();
}
export function sweepIn() {
channel.sweepIncomingBuffer();
export async function sweepIn() {
const missedMessages = channel.sweepIncomingBuffer();
console.log('missedMessages', missedMessages);
const messageHashes = missedMessages
.filter((message) => message.retrievalHint !== undefined)
.map((message) => message.retrievalHint!);
console.log('messageHashes', messageHashes);
if (messageHashes.length === 0) {
return;
}
const query = wakuNode.queryStore(messageHashes);
if (!query) {
console.error('no query');
return;
}
console.log('query', query);
// Process all batches of promises from the AsyncGenerator
for await (const promises of query) {
// Resolve all promises in the batch
const messages = await Promise.all(promises);
console.log('messages', messages);
// Process each message
for (const msg of messages) {
if (msg?.payload) {
const sdsMessage = decodeMessage(msg.payload) as unknown as Message;
channel.receiveMessage(sdsMessage);
}
}
}
}
async function send(payload: Uint8Array): Promise<void> {
await channel.sendMessage(payload, async (message: Message) => {
const encodedMessage = encodeMessage(message);
const timestamp = new Date();
const protoMessage = await encoder.toProtoObj({
payload: encodedMessage,
timestamp: new Date()
timestamp
});
const hash = messageHash(encoder.pubsubTopic, protoMessage);
const result = await wakuNode.sendWithLightPush(encodedMessage);
const result = await wakuNode.sendWithLightPush(encodedMessage, timestamp);
if (result.failures.length > 0) {
console.error('error sending message', result.failures);
}

View File

@ -59,13 +59,20 @@ export class WakuNode {
await this.subscription?.unsubscribe([decoder.contentTopic]);
}
public async sendWithLightPush(payload: Uint8Array): Promise<SDKProtocolResult> {
public async sendWithLightPush(payload: Uint8Array, timestamp: Date): Promise<SDKProtocolResult> {
if (!node) {
throw new Error('Waku node not started');
}
return await node.lightPush.send(encoder, {
payload: payload,
timestamp: new Date()
timestamp: timestamp
});
}
public queryStore(messageHashes: Uint8Array[]) {
return node?.store.queryGenerator([decoder], {
includeData: true,
messageHashes,
});
}
}
@ -114,8 +121,9 @@ export async function startWaku(): Promise<void> {
// Connect to peers
await node.dial(
"/ip4/127.0.0.1/tcp/8000/ws/p2p/16Uiu2HAm6LgMnvadFttVeFsW5WHuoefsviCRbfo4AvnjySp4rnNt"
// "/dns4/node-01.do-ams3.waku.sandbox.status.im/tcp/8095/wss/p2p/16Uiu2HAmNaeL4p3WEYzC9mgXBmBWSgWjPHRvatZTXnp8Jgv3iKsb"
'/dns4/waku-test.bloxy.one/tcp/8095/wss/p2p/16Uiu2HAmSZbDB7CusdRhgkD81VssRjQV5ZH13FbzCGcdnbbh6VwZ'
// '/dns4/waku-test.bloxy.one/tcp/8095/wss/p2p/16Uiu2HAmSZbDB7CusdRhgkD81VssRjQV5ZH13FbzCGcdnbbh6VwZ'
);
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(window as any).waku = node;
@ -126,7 +134,7 @@ export async function startWaku(): Promise<void> {
// Wait for peer connections
try {
await node.waitForPeers([Protocols.LightPush, Protocols.Filter]);
await node.waitForPeers([Protocols.LightPush, Protocols.Filter, Protocols.Store]);
connectionState.update((state) => ({
...state,
status: 'setting_up_subscriptions'