diff --git a/examples/sds-demo/src/lib/sds.svelte.ts b/examples/sds-demo/src/lib/sds.svelte.ts index 1649ff2..f6d0c73 100644 --- a/examples/sds-demo/src/lib/sds.svelte.ts +++ b/examples/sds-demo/src/lib/sds.svelte.ts @@ -26,19 +26,49 @@ export function sweepOut() { channel.sweepOutgoingBuffer(); } -export function sweepIn() { - channel.sweepIncomingBuffer(); +export async function sweepIn() { + const missedMessages = channel.sweepIncomingBuffer(); + console.log('missedMessages', missedMessages); + const messageHashes = missedMessages + .filter((message) => message.retrievalHint !== undefined) + .map((message) => message.retrievalHint!); + console.log('messageHashes', messageHashes); + if (messageHashes.length === 0) { + return; + } + const query = wakuNode.queryStore(messageHashes); + if (!query) { + console.error('no query'); + return; + } + console.log('query', query); + + // Process all batches of promises from the AsyncGenerator + for await (const promises of query) { + // Resolve all promises in the batch + const messages = await Promise.all(promises); + console.log('messages', messages); + + // Process each message + for (const msg of messages) { + if (msg?.payload) { + const sdsMessage = decodeMessage(msg.payload) as unknown as Message; + channel.receiveMessage(sdsMessage); + } + } + } } async function send(payload: Uint8Array): Promise { await channel.sendMessage(payload, async (message: Message) => { const encodedMessage = encodeMessage(message); + const timestamp = new Date(); const protoMessage = await encoder.toProtoObj({ payload: encodedMessage, - timestamp: new Date() + timestamp }); const hash = messageHash(encoder.pubsubTopic, protoMessage); - const result = await wakuNode.sendWithLightPush(encodedMessage); + const result = await wakuNode.sendWithLightPush(encodedMessage, timestamp); if (result.failures.length > 0) { console.error('error sending message', result.failures); } diff --git a/examples/sds-demo/src/lib/waku/waku.svelte.ts b/examples/sds-demo/src/lib/waku/waku.svelte.ts index 422ef72..145f445 100644 --- a/examples/sds-demo/src/lib/waku/waku.svelte.ts +++ b/examples/sds-demo/src/lib/waku/waku.svelte.ts @@ -59,13 +59,20 @@ export class WakuNode { await this.subscription?.unsubscribe([decoder.contentTopic]); } - public async sendWithLightPush(payload: Uint8Array): Promise { + public async sendWithLightPush(payload: Uint8Array, timestamp: Date): Promise { if (!node) { throw new Error('Waku node not started'); } return await node.lightPush.send(encoder, { payload: payload, - timestamp: new Date() + timestamp: timestamp + }); + } + + public queryStore(messageHashes: Uint8Array[]) { + return node?.store.queryGenerator([decoder], { + includeData: true, + messageHashes, }); } } @@ -114,8 +121,9 @@ export async function startWaku(): Promise { // Connect to peers await node.dial( + "/ip4/127.0.0.1/tcp/8000/ws/p2p/16Uiu2HAm6LgMnvadFttVeFsW5WHuoefsviCRbfo4AvnjySp4rnNt" // "/dns4/node-01.do-ams3.waku.sandbox.status.im/tcp/8095/wss/p2p/16Uiu2HAmNaeL4p3WEYzC9mgXBmBWSgWjPHRvatZTXnp8Jgv3iKsb" - '/dns4/waku-test.bloxy.one/tcp/8095/wss/p2p/16Uiu2HAmSZbDB7CusdRhgkD81VssRjQV5ZH13FbzCGcdnbbh6VwZ' + // '/dns4/waku-test.bloxy.one/tcp/8095/wss/p2p/16Uiu2HAmSZbDB7CusdRhgkD81VssRjQV5ZH13FbzCGcdnbbh6VwZ' ); // eslint-disable-next-line @typescript-eslint/no-explicit-any (window as any).waku = node; @@ -126,7 +134,7 @@ export async function startWaku(): Promise { // Wait for peer connections try { - await node.waitForPeers([Protocols.LightPush, Protocols.Filter]); + await node.waitForPeers([Protocols.LightPush, Protocols.Filter, Protocols.Store]); connectionState.update((state) => ({ ...state, status: 'setting_up_subscriptions'