diff --git a/packages/tests/tests/store.node.spec.ts b/packages/tests/tests/store.node.spec.ts index e01b686a9a..9689245c51 100644 --- a/packages/tests/tests/store.node.spec.ts +++ b/packages/tests/tests/store.node.spec.ts @@ -40,75 +40,75 @@ describe("Waku Store", () => { !!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e)); }); - // it("Generator", async function () { - // this.timeout(1000_000); - // const totalMsgs = 20; + it("Generator", async function () { + this.timeout(1000_000); + const totalMsgs = 20; - // for (let i = 0; i < totalMsgs; i++) { - // expect( - // await nwaku.sendMessage( - // Nwaku.toMessageRpcQuery({ - // payload: utf8ToBytes(`Message ${i}`), - // contentTopic: TestContentTopic, - // }) - // ) - // ).to.be.true; - // } + for (let i = 0; i < totalMsgs; i++) { + expect( + await nwaku.sendMessage( + Nwaku.toMessageRpcQuery({ + payload: utf8ToBytes(`Message ${i}`), + contentTopic: TestContentTopic, + }) + ) + ).to.be.true; + } - // waku = await createFullNode({ - // staticNoiseKey: NOISE_KEY_1, - // }); - // await waku.start(); - // await waku.dial(await nwaku.getMultiaddrWithId()); - // await waitForRemotePeer(waku, [Protocols.Store]); + waku = await createFullNode({ + staticNoiseKey: NOISE_KEY_1, + }); + await waku.start(); + await waku.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.Store]); - // const messages: Message[] = []; - // let promises: Promise[] = []; - // for await (const msgPromises of waku.store.queryGenerator([TestDecoder])) { - // const _promises = msgPromises.map(async (promise) => { - // const msg = await promise; - // if (msg) { - // messages.push(msg); - // } - // }); + const messages: Message[] = []; + let promises: Promise[] = []; + for await (const msgPromises of waku.store.queryGenerator([TestDecoder])) { + const _promises = msgPromises.map(async (promise) => { + const msg = await promise; + if (msg) { + messages.push(msg); + } + }); - // promises = promises.concat(_promises); - // } - // await Promise.all(promises); + promises = promises.concat(_promises); + } + await Promise.all(promises); - // expect(messages?.length).eq(totalMsgs); - // const result = messages?.findIndex((msg) => { - // return bytesToUtf8(msg.payload!) === "Message 0"; - // }); - // expect(result).to.not.eq(-1); - // }); + expect(messages?.length).eq(totalMsgs); + const result = messages?.findIndex((msg) => { + return bytesToUtf8(msg.payload!) === "Message 0"; + }); + expect(result).to.not.eq(-1); + }); - // it("Generator, no message returned", async function () { - // this.timeout(15_000); + it("Generator, no message returned", async function () { + this.timeout(15_000); - // waku = await createFullNode({ - // staticNoiseKey: NOISE_KEY_1, - // }); - // await waku.start(); - // await waku.dial(await nwaku.getMultiaddrWithId()); - // await waitForRemotePeer(waku, [Protocols.Store]); + waku = await createFullNode({ + staticNoiseKey: NOISE_KEY_1, + }); + await waku.start(); + await waku.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.Store]); - // const messages: Message[] = []; - // let promises: Promise[] = []; - // for await (const msgPromises of waku.store.queryGenerator([TestDecoder])) { - // const _promises = msgPromises.map(async (promise) => { - // const msg = await promise; - // if (msg) { - // messages.push(msg); - // } - // }); + const messages: Message[] = []; + let promises: Promise[] = []; + for await (const msgPromises of waku.store.queryGenerator([TestDecoder])) { + const _promises = msgPromises.map(async (promise) => { + const msg = await promise; + if (msg) { + messages.push(msg); + } + }); - // promises = promises.concat(_promises); - // } - // await Promise.all(promises); + promises = promises.concat(_promises); + } + await Promise.all(promises); - // expect(messages?.length).eq(0); - // }); + expect(messages?.length).eq(0); + }); it("Passing a cursor", async function () { this.timeout(4_000); @@ -132,46 +132,43 @@ describe("Waku Store", () => { await waku.dial(await nwaku.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.Store]); - const messages: Message[] = []; - const query = waku.store.queryGenerator([TestDecoder]); + // messages in reversed order (first message at last index) + const messages: Message[] = []; for await (const page of query) { - for await (const msg of page) { + for await (const msg of page.reverse()) { messages.push(msg as Message); } } + // index 2 would mean the third last message sent const cursorIndex = 2; const cursorMessage = messages[cursorIndex]; + // create cursor to extract messages after the 3rd index const cursor = await createCursor( bytesToUtf8(cursorMessage.payload!), BigInt(cursorMessage.timestamp!.getTime()) * BigInt(1000000), TestContentTopic ); - const val = await waku.store - .queryGenerator([TestDecoder], { cursor }) - .next(); - //realIndexOfTest = (cursor-pageSize+test+len)%len - // the last message received on this page - const testMessage = await val.value[10 - 1]; + const messagesAfterCursor: Message[] = []; + for await (const page of waku.store.queryGenerator([TestDecoder], { + cursor, + })) { + for await (const msg of page.reverse()) { + messagesAfterCursor.push(msg as Message); + } + } - // for (const msg of val.value) { - // const _msg = await msg; - // console.log({ - // msg: bytesToUtf8(_msg.payload!), - // }); - // } - // console.log({ - // cursorMessage: bytesToUtf8(cursorMessage.payload!), - // testMessage: bytesToUtf8(testMessage.payload!), - // }); + const testMessage = messagesAfterCursor[0]; - expect(messages?.length).be.eq(totalMsgs); + expect(messages.length).be.eq(totalMsgs); - expect(testMessage).to.be.eq(messages[cursorIndex + 1]); + expect(bytesToUtf8(testMessage.payload!)).to.be.eq( + bytesToUtf8(messages[cursorIndex + 1].payload!) + ); }); it("Callback on promise", async function () { @@ -560,68 +557,68 @@ describe("Waku Store", () => { }); }); -// describe("Waku Store, custom pubsub topic", () => { -// const customPubSubTopic = "/waku/2/custom-dapp/proto"; -// let waku: WakuFull; -// let nwaku: Nwaku; +describe("Waku Store, custom pubsub topic", () => { + const customPubSubTopic = "/waku/2/custom-dapp/proto"; + let waku: WakuFull; + let nwaku: Nwaku; -// beforeEach(async function () { -// this.timeout(15_000); -// nwaku = new Nwaku(makeLogFileName(this)); -// await nwaku.start({ -// persistMessages: true, -// store: true, -// topics: customPubSubTopic, -// }); -// }); + beforeEach(async function () { + this.timeout(15_000); + nwaku = new Nwaku(makeLogFileName(this)); + await nwaku.start({ + persistMessages: true, + store: true, + topics: customPubSubTopic, + }); + }); -// afterEach(async function () { -// !!nwaku && nwaku.stop(); -// !!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e)); -// }); + afterEach(async function () { + !!nwaku && nwaku.stop(); + !!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e)); + }); -// it("Generator, custom pubsub topic", async function () { -// this.timeout(15_000); + it("Generator, custom pubsub topic", async function () { + this.timeout(15_000); -// const totalMsgs = 20; -// for (let i = 0; i < totalMsgs; i++) { -// expect( -// await nwaku.sendMessage( -// Nwaku.toMessageRpcQuery({ -// payload: utf8ToBytes(`Message ${i}`), -// contentTopic: TestContentTopic, -// }), -// customPubSubTopic -// ) -// ).to.be.true; -// } + const totalMsgs = 20; + for (let i = 0; i < totalMsgs; i++) { + expect( + await nwaku.sendMessage( + Nwaku.toMessageRpcQuery({ + payload: utf8ToBytes(`Message ${i}`), + contentTopic: TestContentTopic, + }), + customPubSubTopic + ) + ).to.be.true; + } -// waku = await createFullNode({ -// staticNoiseKey: NOISE_KEY_1, -// pubSubTopic: customPubSubTopic, -// }); -// await waku.start(); -// await waku.dial(await nwaku.getMultiaddrWithId()); -// await waitForRemotePeer(waku, [Protocols.Store]); + waku = await createFullNode({ + staticNoiseKey: NOISE_KEY_1, + pubSubTopic: customPubSubTopic, + }); + await waku.start(); + await waku.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.Store]); -// const messages: Message[] = []; -// let promises: Promise[] = []; -// for await (const msgPromises of waku.store.queryGenerator([TestDecoder])) { -// const _promises = msgPromises.map(async (promise) => { -// const msg = await promise; -// if (msg) { -// messages.push(msg); -// } -// }); + const messages: Message[] = []; + let promises: Promise[] = []; + for await (const msgPromises of waku.store.queryGenerator([TestDecoder])) { + const _promises = msgPromises.map(async (promise) => { + const msg = await promise; + if (msg) { + messages.push(msg); + } + }); -// promises = promises.concat(_promises); -// } -// await Promise.all(promises); + promises = promises.concat(_promises); + } + await Promise.all(promises); -// expect(messages?.length).eq(totalMsgs); -// const result = messages?.findIndex((msg) => { -// return bytesToUtf8(msg.payload!) === "Message 0"; -// }); -// expect(result).to.not.eq(-1); -// }); -// }); + expect(messages?.length).eq(totalMsgs); + const result = messages?.findIndex((msg) => { + return bytesToUtf8(msg.payload!) === "Message 0"; + }); + expect(result).to.not.eq(-1); + }); +});