mirror of
https://github.com/waku-org/js-waku.git
synced 2025-02-17 14:57:19 +00:00
feat: provide several API for store queries
This commit is contained in:
parent
65511a5888
commit
5a529c1cd7
@ -10,6 +10,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
|||||||
### Changed
|
### Changed
|
||||||
|
|
||||||
- Correct options type for `createFullNode` & `createPrivacy` to enable passing gossipsub options.
|
- Correct options type for `createFullNode` & `createPrivacy` to enable passing gossipsub options.
|
||||||
|
- `WakuStore` now provides several APIs: `queryGenerator`, `queryCallbackOnPromise`, `queryOrderedCallback`;
|
||||||
|
each provides different guarantees and performance.
|
||||||
|
|
||||||
## [0.27.0] - 2022-09-13
|
## [0.27.0] - 2022-09-13
|
||||||
|
|
||||||
|
@ -25,70 +25,23 @@ const log = debug("waku:test:store");
|
|||||||
|
|
||||||
const TestContentTopic = "/test/1/waku-store/utf8";
|
const TestContentTopic = "/test/1/waku-store/utf8";
|
||||||
|
|
||||||
const isWakuMessageDefined = (
|
|
||||||
msg: WakuMessage | undefined
|
|
||||||
): msg is WakuMessage => {
|
|
||||||
return !!msg;
|
|
||||||
};
|
|
||||||
|
|
||||||
describe("Waku Store", () => {
|
describe("Waku Store", () => {
|
||||||
let waku: WakuFull;
|
let waku: WakuFull;
|
||||||
let nwaku: Nwaku;
|
let nwaku: Nwaku;
|
||||||
|
|
||||||
|
beforeEach(async function () {
|
||||||
|
this.timeout(15_000);
|
||||||
|
nwaku = new Nwaku(makeLogFileName(this));
|
||||||
|
await nwaku.start({ persistMessages: true, store: true, lightpush: true });
|
||||||
|
});
|
||||||
|
|
||||||
afterEach(async function () {
|
afterEach(async function () {
|
||||||
!!nwaku && nwaku.stop();
|
!!nwaku && nwaku.stop();
|
||||||
!!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e));
|
!!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e));
|
||||||
});
|
});
|
||||||
|
|
||||||
it("Retrieves history", async function () {
|
it("Generator", async function () {
|
||||||
this.timeout(15_000);
|
this.timeout(15_000);
|
||||||
|
|
||||||
nwaku = new Nwaku(makeLogFileName(this));
|
|
||||||
await nwaku.start({ persistMessages: true, store: true });
|
|
||||||
|
|
||||||
for (let i = 0; i < 2; i++) {
|
|
||||||
expect(
|
|
||||||
await nwaku.sendMessage(
|
|
||||||
Nwaku.toMessageRpcQuery({
|
|
||||||
payload: utf8ToBytes(`Message ${i}`),
|
|
||||||
contentTopic: TestContentTopic,
|
|
||||||
})
|
|
||||||
)
|
|
||||||
).to.be.true;
|
|
||||||
}
|
|
||||||
|
|
||||||
waku = await createFullNode({
|
|
||||||
staticNoiseKey: NOISE_KEY_1,
|
|
||||||
});
|
|
||||||
await waku.start();
|
|
||||||
await waku.dial(await nwaku.getMultiaddrWithId());
|
|
||||||
await waitForRemotePeer(waku, [Protocols.Store]);
|
|
||||||
|
|
||||||
const messages: WakuMessage[] = [];
|
|
||||||
await waku.store.queryHistory([], async (msgPromises) => {
|
|
||||||
await Promise.all(
|
|
||||||
msgPromises.map(async (promise) => {
|
|
||||||
const msg = await promise;
|
|
||||||
if (msg) {
|
|
||||||
messages.push(msg);
|
|
||||||
}
|
|
||||||
})
|
|
||||||
);
|
|
||||||
});
|
|
||||||
|
|
||||||
expect(messages?.length).eq(2);
|
|
||||||
const result = messages?.findIndex((msg) => {
|
|
||||||
return msg.payloadAsUtf8 === "Message 0";
|
|
||||||
});
|
|
||||||
expect(result).to.not.eq(-1);
|
|
||||||
});
|
|
||||||
|
|
||||||
it("Retrieves history using callback", async function () {
|
|
||||||
this.timeout(15_000);
|
|
||||||
|
|
||||||
nwaku = new Nwaku(makeLogFileName(this));
|
|
||||||
await nwaku.start({ persistMessages: true, store: true });
|
|
||||||
|
|
||||||
const totalMsgs = 20;
|
const totalMsgs = 20;
|
||||||
|
|
||||||
for (let i = 0; i < totalMsgs; i++) {
|
for (let i = 0; i < totalMsgs; i++) {
|
||||||
@ -110,15 +63,82 @@ describe("Waku Store", () => {
|
|||||||
await waitForRemotePeer(waku, [Protocols.Store]);
|
await waitForRemotePeer(waku, [Protocols.Store]);
|
||||||
|
|
||||||
const messages: WakuMessage[] = [];
|
const messages: WakuMessage[] = [];
|
||||||
await waku.store.queryHistory([], async (msgPromises) => {
|
let promises: Promise<void>[] = [];
|
||||||
await Promise.all(
|
for await (const msgPromises of waku.store.queryGenerator([])) {
|
||||||
msgPromises.map(async (promise) => {
|
const _promises = msgPromises.map(async (promise) => {
|
||||||
const msg = await promise;
|
const msg = await promise;
|
||||||
if (msg) {
|
if (msg) {
|
||||||
messages.push(msg);
|
messages.push(msg);
|
||||||
}
|
}
|
||||||
})
|
});
|
||||||
);
|
|
||||||
|
promises = promises.concat(_promises);
|
||||||
|
}
|
||||||
|
await Promise.all(promises);
|
||||||
|
|
||||||
|
expect(messages?.length).eq(totalMsgs);
|
||||||
|
const result = messages?.findIndex((msg) => {
|
||||||
|
return msg.payloadAsUtf8 === "Message 0";
|
||||||
|
});
|
||||||
|
expect(result).to.not.eq(-1);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Generator, no message returned", async function () {
|
||||||
|
this.timeout(15_000);
|
||||||
|
|
||||||
|
waku = await createFullNode({
|
||||||
|
staticNoiseKey: NOISE_KEY_1,
|
||||||
|
});
|
||||||
|
await waku.start();
|
||||||
|
await waku.dial(await nwaku.getMultiaddrWithId());
|
||||||
|
await waitForRemotePeer(waku, [Protocols.Store]);
|
||||||
|
|
||||||
|
const messages: WakuMessage[] = [];
|
||||||
|
let promises: Promise<void>[] = [];
|
||||||
|
for await (const msgPromises of waku.store.queryGenerator([])) {
|
||||||
|
const _promises = msgPromises.map(async (promise) => {
|
||||||
|
const msg = await promise;
|
||||||
|
if (msg) {
|
||||||
|
messages.push(msg);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
promises = promises.concat(_promises);
|
||||||
|
}
|
||||||
|
await Promise.all(promises);
|
||||||
|
|
||||||
|
expect(messages?.length).eq(0);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Callback on promise", async function () {
|
||||||
|
this.timeout(15_000);
|
||||||
|
|
||||||
|
const totalMsgs = 15;
|
||||||
|
|
||||||
|
for (let i = 0; i < totalMsgs; i++) {
|
||||||
|
expect(
|
||||||
|
await nwaku.sendMessage(
|
||||||
|
Nwaku.toMessageRpcQuery({
|
||||||
|
payload: utf8ToBytes(`Message ${i}`),
|
||||||
|
contentTopic: TestContentTopic,
|
||||||
|
})
|
||||||
|
)
|
||||||
|
).to.be.true;
|
||||||
|
}
|
||||||
|
|
||||||
|
waku = await createFullNode({
|
||||||
|
staticNoiseKey: NOISE_KEY_1,
|
||||||
|
});
|
||||||
|
await waku.start();
|
||||||
|
await waku.dial(await nwaku.getMultiaddrWithId());
|
||||||
|
await waitForRemotePeer(waku, [Protocols.Store]);
|
||||||
|
|
||||||
|
const messages: WakuMessage[] = [];
|
||||||
|
await waku.store.queryCallbackOnPromise([], async (msgPromise) => {
|
||||||
|
const msg = await msgPromise;
|
||||||
|
if (msg) {
|
||||||
|
messages.push(msg);
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
expect(messages?.length).eq(totalMsgs);
|
expect(messages?.length).eq(totalMsgs);
|
||||||
@ -128,15 +148,12 @@ describe("Waku Store", () => {
|
|||||||
expect(result).to.not.eq(-1);
|
expect(result).to.not.eq(-1);
|
||||||
});
|
});
|
||||||
|
|
||||||
it("Retrieval aborts when callback returns true", async function () {
|
it("Callback on promise, aborts when callback returns true", async function () {
|
||||||
this.timeout(15_000);
|
this.timeout(15_000);
|
||||||
|
|
||||||
nwaku = new Nwaku(makeLogFileName(this));
|
const totalMsgs = 20;
|
||||||
await nwaku.start({ persistMessages: true, store: true });
|
|
||||||
|
|
||||||
const availMsgs = 20;
|
for (let i = 0; i < totalMsgs; i++) {
|
||||||
|
|
||||||
for (let i = 0; i < availMsgs; i++) {
|
|
||||||
expect(
|
expect(
|
||||||
await nwaku.sendMessage(
|
await nwaku.sendMessage(
|
||||||
Nwaku.toMessageRpcQuery({
|
Nwaku.toMessageRpcQuery({
|
||||||
@ -154,15 +171,15 @@ describe("Waku Store", () => {
|
|||||||
await waku.dial(await nwaku.getMultiaddrWithId());
|
await waku.dial(await nwaku.getMultiaddrWithId());
|
||||||
await waitForRemotePeer(waku, [Protocols.Store]);
|
await waitForRemotePeer(waku, [Protocols.Store]);
|
||||||
|
|
||||||
let messages: WakuMessage[] = [];
|
|
||||||
const desiredMsgs = 14;
|
const desiredMsgs = 14;
|
||||||
|
const messages: WakuMessage[] = [];
|
||||||
await waku.store.queryHistory(
|
await waku.store.queryCallbackOnPromise(
|
||||||
[],
|
[],
|
||||||
async (msgPromises) => {
|
async (msgPromise) => {
|
||||||
const msgsOrUndefined = await Promise.all(msgPromises);
|
const msg = await msgPromise;
|
||||||
const msgs = msgsOrUndefined.filter(isWakuMessageDefined);
|
if (msg) {
|
||||||
messages = messages.concat(msgs);
|
messages.push(msg);
|
||||||
|
}
|
||||||
return messages.length >= desiredMsgs;
|
return messages.length >= desiredMsgs;
|
||||||
},
|
},
|
||||||
{ pageSize: 7 }
|
{ pageSize: 7 }
|
||||||
@ -171,13 +188,11 @@ describe("Waku Store", () => {
|
|||||||
expect(messages?.length).eq(desiredMsgs);
|
expect(messages?.length).eq(desiredMsgs);
|
||||||
});
|
});
|
||||||
|
|
||||||
it("Retrieves all historical elements in chronological order through paging", async function () {
|
it("Ordered Callback - Forward", async function () {
|
||||||
this.timeout(15_000);
|
this.timeout(15_000);
|
||||||
|
|
||||||
nwaku = new Nwaku(makeLogFileName(this));
|
const totalMsgs = 18;
|
||||||
await nwaku.start({ persistMessages: true, store: true });
|
for (let i = 0; i < totalMsgs; i++) {
|
||||||
|
|
||||||
for (let i = 0; i < 15; i++) {
|
|
||||||
expect(
|
expect(
|
||||||
await nwaku.sendMessage(
|
await nwaku.sendMessage(
|
||||||
Nwaku.toMessageRpcQuery({
|
Nwaku.toMessageRpcQuery({
|
||||||
@ -195,24 +210,19 @@ describe("Waku Store", () => {
|
|||||||
await waku.dial(await nwaku.getMultiaddrWithId());
|
await waku.dial(await nwaku.getMultiaddrWithId());
|
||||||
await waitForRemotePeer(waku, [Protocols.Store]);
|
await waitForRemotePeer(waku, [Protocols.Store]);
|
||||||
|
|
||||||
let messages: WakuMessage[] = [];
|
const messages: WakuMessage[] = [];
|
||||||
await waku.store.queryHistory(
|
await waku.store.queryOrderedCallback(
|
||||||
[],
|
[],
|
||||||
async (msgPromises) => {
|
async (msg) => {
|
||||||
const msgsOrUndefined = await Promise.all(msgPromises);
|
messages.push(msg);
|
||||||
const msgs = msgsOrUndefined.filter(isWakuMessageDefined);
|
|
||||||
// Note: messages within a page are ordered from oldest to most recent
|
|
||||||
// so the `concat` can only preserve order when `PageDirection`
|
|
||||||
// is forward
|
|
||||||
messages = messages.concat(msgs);
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
pageDirection: PageDirection.FORWARD,
|
pageDirection: PageDirection.FORWARD,
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
expect(messages?.length).eq(15);
|
expect(messages?.length).eq(totalMsgs);
|
||||||
for (let index = 0; index < 2; index++) {
|
for (let index = 0; index < totalMsgs; index++) {
|
||||||
expect(
|
expect(
|
||||||
messages?.findIndex((msg) => {
|
messages?.findIndex((msg) => {
|
||||||
return msg.payloadAsUtf8 === `Message ${index}`;
|
return msg.payloadAsUtf8 === `Message ${index}`;
|
||||||
@ -221,176 +231,54 @@ describe("Waku Store", () => {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
it("Retrieves history using custom pubsub topic", async function () {
|
it("Ordered Callback - Backward", async function () {
|
||||||
this.timeout(15_000);
|
this.timeout(15_000);
|
||||||
|
|
||||||
const customPubSubTopic = "/waku/2/custom-dapp/proto";
|
const totalMsgs = 18;
|
||||||
nwaku = new Nwaku(makeLogFileName(this));
|
for (let i = 0; i < totalMsgs; i++) {
|
||||||
await nwaku.start({
|
|
||||||
persistMessages: true,
|
|
||||||
store: true,
|
|
||||||
topics: customPubSubTopic,
|
|
||||||
});
|
|
||||||
|
|
||||||
for (let i = 0; i < 2; i++) {
|
|
||||||
expect(
|
expect(
|
||||||
await nwaku.sendMessage(
|
await nwaku.sendMessage(
|
||||||
Nwaku.toMessageRpcQuery({
|
Nwaku.toMessageRpcQuery({
|
||||||
payload: utf8ToBytes(`Message ${i}`),
|
payload: utf8ToBytes(`Message ${i}`),
|
||||||
contentTopic: TestContentTopic,
|
contentTopic: TestContentTopic,
|
||||||
}),
|
})
|
||||||
customPubSubTopic
|
|
||||||
)
|
)
|
||||||
).to.be.true;
|
).to.be.true;
|
||||||
}
|
}
|
||||||
|
|
||||||
waku = await createFullNode({
|
waku = await createFullNode({
|
||||||
pubSubTopic: customPubSubTopic,
|
|
||||||
staticNoiseKey: NOISE_KEY_1,
|
staticNoiseKey: NOISE_KEY_1,
|
||||||
});
|
});
|
||||||
await waku.start();
|
await waku.start();
|
||||||
await waku.dial(await nwaku.getMultiaddrWithId());
|
await waku.dial(await nwaku.getMultiaddrWithId());
|
||||||
await waitForRemotePeer(waku, [Protocols.Store]);
|
await waitForRemotePeer(waku, [Protocols.Store]);
|
||||||
|
|
||||||
const nimPeerId = await nwaku.getPeerId();
|
|
||||||
|
|
||||||
const messages: WakuMessage[] = [];
|
|
||||||
await waku.store.queryHistory(
|
|
||||||
[],
|
|
||||||
async (msgPromises) => {
|
|
||||||
await Promise.all(
|
|
||||||
msgPromises.map(async (promise) => {
|
|
||||||
const msg = await promise;
|
|
||||||
if (msg) {
|
|
||||||
messages.push(msg);
|
|
||||||
}
|
|
||||||
})
|
|
||||||
);
|
|
||||||
},
|
|
||||||
{
|
|
||||||
peerId: nimPeerId,
|
|
||||||
}
|
|
||||||
);
|
|
||||||
|
|
||||||
expect(messages?.length).eq(2);
|
|
||||||
const result = messages?.findIndex((msg) => {
|
|
||||||
return msg.payloadAsUtf8 === "Message 0";
|
|
||||||
});
|
|
||||||
expect(result).to.not.eq(-1);
|
|
||||||
});
|
|
||||||
|
|
||||||
it("Retrieves history with asymmetric & symmetric encrypted messages", async function () {
|
|
||||||
this.timeout(15_000);
|
|
||||||
|
|
||||||
nwaku = new Nwaku(makeLogFileName(this));
|
|
||||||
await nwaku.start({ persistMessages: true, store: true, lightpush: true });
|
|
||||||
|
|
||||||
const encryptedAsymmetricMessageText = "asymmetric encryption";
|
|
||||||
const encryptedSymmetricMessageText = "symmetric encryption";
|
|
||||||
const clearMessageText =
|
|
||||||
"This is a clear text message for everyone to read";
|
|
||||||
const otherEncMessageText =
|
|
||||||
"This message is not for and I must not be able to read it";
|
|
||||||
|
|
||||||
const privateKey = generatePrivateKey();
|
|
||||||
const symKey = generateSymmetricKey();
|
|
||||||
const publicKey = getPublicKey(privateKey);
|
|
||||||
|
|
||||||
const timestamp = new Date();
|
|
||||||
const [
|
|
||||||
encryptedAsymmetricMessage,
|
|
||||||
encryptedSymmetricMessage,
|
|
||||||
clearMessage,
|
|
||||||
otherEncMessage,
|
|
||||||
] = await Promise.all([
|
|
||||||
WakuMessage.fromUtf8String(
|
|
||||||
encryptedAsymmetricMessageText,
|
|
||||||
TestContentTopic,
|
|
||||||
{
|
|
||||||
encPublicKey: publicKey,
|
|
||||||
timestamp,
|
|
||||||
}
|
|
||||||
),
|
|
||||||
WakuMessage.fromUtf8String(
|
|
||||||
encryptedSymmetricMessageText,
|
|
||||||
TestContentTopic,
|
|
||||||
{
|
|
||||||
symKey: symKey,
|
|
||||||
timestamp: new Date(timestamp.valueOf() + 1),
|
|
||||||
}
|
|
||||||
),
|
|
||||||
WakuMessage.fromUtf8String(clearMessageText, TestContentTopic, {
|
|
||||||
timestamp: new Date(timestamp.valueOf() + 2),
|
|
||||||
}),
|
|
||||||
WakuMessage.fromUtf8String(otherEncMessageText, TestContentTopic, {
|
|
||||||
encPublicKey: getPublicKey(generatePrivateKey()),
|
|
||||||
timestamp: new Date(timestamp.valueOf() + 3),
|
|
||||||
}),
|
|
||||||
]);
|
|
||||||
|
|
||||||
log("Messages have been encrypted");
|
|
||||||
const [waku1, waku2, nimWakuMultiaddr] = await Promise.all([
|
|
||||||
createFullNode({
|
|
||||||
staticNoiseKey: NOISE_KEY_1,
|
|
||||||
}).then((waku) => waku.start().then(() => waku)),
|
|
||||||
createFullNode({
|
|
||||||
staticNoiseKey: NOISE_KEY_2,
|
|
||||||
}).then((waku) => waku.start().then(() => waku)),
|
|
||||||
nwaku.getMultiaddrWithId(),
|
|
||||||
]);
|
|
||||||
|
|
||||||
log("Waku nodes created");
|
|
||||||
|
|
||||||
await Promise.all([
|
|
||||||
waku1.dial(nimWakuMultiaddr),
|
|
||||||
waku2.dial(nimWakuMultiaddr),
|
|
||||||
]);
|
|
||||||
|
|
||||||
log("Waku nodes connected to nwaku");
|
|
||||||
|
|
||||||
await waitForRemotePeer(waku1, [Protocols.LightPush]);
|
|
||||||
|
|
||||||
log("Sending messages using light push");
|
|
||||||
await Promise.all([
|
|
||||||
waku1.lightPush.push(encryptedAsymmetricMessage),
|
|
||||||
waku1.lightPush.push(encryptedSymmetricMessage),
|
|
||||||
waku1.lightPush.push(otherEncMessage),
|
|
||||||
waku1.lightPush.push(clearMessage),
|
|
||||||
]);
|
|
||||||
|
|
||||||
await waitForRemotePeer(waku2, [Protocols.Store]);
|
|
||||||
|
|
||||||
waku2.store.addDecryptionKey(symKey);
|
|
||||||
|
|
||||||
log("Retrieve messages from store");
|
|
||||||
let messages: WakuMessage[] = [];
|
let messages: WakuMessage[] = [];
|
||||||
await waku2.store.queryHistory(
|
await waku.store.queryOrderedCallback(
|
||||||
[],
|
[],
|
||||||
async (msgPromises) => {
|
async (msg) => {
|
||||||
const msgsOrUndefined = await Promise.all(msgPromises);
|
messages.push(msg);
|
||||||
const msgs = msgsOrUndefined.filter(isWakuMessageDefined);
|
|
||||||
messages = messages.concat(msgs);
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
decryptionParams: [{ key: privateKey }],
|
pageDirection: PageDirection.BACKWARD,
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
// Messages are ordered from oldest to latest within a page (1 page query)
|
messages = messages.reverse();
|
||||||
expect(messages[0]?.payloadAsUtf8).to.eq(encryptedAsymmetricMessageText);
|
|
||||||
expect(messages[1]?.payloadAsUtf8).to.eq(encryptedSymmetricMessageText);
|
|
||||||
expect(messages[2]?.payloadAsUtf8).to.eq(clearMessageText);
|
|
||||||
|
|
||||||
!!waku1 && waku1.stop().catch((e) => console.log("Waku failed to stop", e));
|
expect(messages?.length).eq(totalMsgs);
|
||||||
!!waku2 && waku2.stop().catch((e) => console.log("Waku failed to stop", e));
|
for (let index = 0; index < totalMsgs; index++) {
|
||||||
|
expect(
|
||||||
|
messages?.findIndex((msg) => {
|
||||||
|
return msg.payloadAsUtf8 === `Message ${index}`;
|
||||||
|
})
|
||||||
|
).to.eq(index);
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
it("Retrieves history with asymmetric & symmetric encrypted messages on different content topics", async function () {
|
it("Generator, with asymmetric & symmetric encrypted messages", async function () {
|
||||||
this.timeout(15_000);
|
this.timeout(15_000);
|
||||||
|
|
||||||
nwaku = new Nwaku(makeLogFileName(this));
|
|
||||||
await nwaku.start({ persistMessages: true, store: true, lightpush: true });
|
|
||||||
|
|
||||||
const encryptedAsymmetricMessageText =
|
const encryptedAsymmetricMessageText =
|
||||||
"This message is encrypted for me using asymmetric";
|
"This message is encrypted for me using asymmetric";
|
||||||
const encryptedAsymmetricContentTopic = "/test/1/asymmetric/proto";
|
const encryptedAsymmetricContentTopic = "/test/1/asymmetric/proto";
|
||||||
@ -482,19 +370,19 @@ describe("Waku Store", () => {
|
|||||||
method: DecryptionMethod.Symmetric,
|
method: DecryptionMethod.Symmetric,
|
||||||
});
|
});
|
||||||
|
|
||||||
let messages: WakuMessage[] = [];
|
const messages: WakuMessage[] = [];
|
||||||
log("Retrieve messages from store");
|
log("Retrieve messages from store");
|
||||||
await waku2.store.queryHistory(
|
|
||||||
[],
|
for await (const msgPromises of waku2.store.queryGenerator([], {
|
||||||
async (msgPromises) => {
|
decryptionParams: [{ key: privateKey }],
|
||||||
const msgsOrUndefined = await Promise.all(msgPromises);
|
})) {
|
||||||
const msgs = msgsOrUndefined.filter(isWakuMessageDefined);
|
for (const promise of msgPromises) {
|
||||||
messages = messages.concat(msgs);
|
const msg = await promise;
|
||||||
},
|
if (msg) {
|
||||||
{
|
messages.push(msg);
|
||||||
decryptionParams: [{ key: privateKey }],
|
}
|
||||||
}
|
}
|
||||||
);
|
}
|
||||||
|
|
||||||
expect(messages?.length).eq(3);
|
expect(messages?.length).eq(3);
|
||||||
if (!messages) throw "Length was tested";
|
if (!messages) throw "Length was tested";
|
||||||
@ -503,15 +391,24 @@ describe("Waku Store", () => {
|
|||||||
expect(messages[1].payloadAsUtf8).to.eq(encryptedSymmetricMessageText);
|
expect(messages[1].payloadAsUtf8).to.eq(encryptedSymmetricMessageText);
|
||||||
expect(messages[2].payloadAsUtf8).to.eq(clearMessageText);
|
expect(messages[2].payloadAsUtf8).to.eq(clearMessageText);
|
||||||
|
|
||||||
|
for (const text of [
|
||||||
|
encryptedAsymmetricMessageText,
|
||||||
|
encryptedSymmetricMessageText,
|
||||||
|
clearMessageText,
|
||||||
|
]) {
|
||||||
|
expect(
|
||||||
|
messages?.findIndex((msg) => {
|
||||||
|
return msg.payloadAsUtf8 === text;
|
||||||
|
})
|
||||||
|
).to.not.eq(-1);
|
||||||
|
}
|
||||||
|
|
||||||
!!waku1 && waku1.stop().catch((e) => console.log("Waku failed to stop", e));
|
!!waku1 && waku1.stop().catch((e) => console.log("Waku failed to stop", e));
|
||||||
!!waku2 && waku2.stop().catch((e) => console.log("Waku failed to stop", e));
|
!!waku2 && waku2.stop().catch((e) => console.log("Waku failed to stop", e));
|
||||||
});
|
});
|
||||||
|
|
||||||
it("Retrieves history using start and end time", async function () {
|
it("Ordered callback, using start and end time", async function () {
|
||||||
this.timeout(15_000);
|
this.timeout(20000);
|
||||||
|
|
||||||
nwaku = new Nwaku(makeLogFileName(this));
|
|
||||||
await nwaku.start({ persistMessages: true, store: true });
|
|
||||||
|
|
||||||
const now = new Date();
|
const now = new Date();
|
||||||
|
|
||||||
@ -554,17 +451,12 @@ describe("Waku Store", () => {
|
|||||||
const nwakuPeerId = await nwaku.getPeerId();
|
const nwakuPeerId = await nwaku.getPeerId();
|
||||||
|
|
||||||
const firstMessages: WakuMessage[] = [];
|
const firstMessages: WakuMessage[] = [];
|
||||||
await waku.store.queryHistory(
|
await waku.store.queryOrderedCallback(
|
||||||
[],
|
[],
|
||||||
async (msgPromises) => {
|
(msg) => {
|
||||||
await Promise.all(
|
if (msg) {
|
||||||
msgPromises.map(async (promise) => {
|
firstMessages.push(msg);
|
||||||
const msg = await promise;
|
}
|
||||||
if (msg) {
|
|
||||||
firstMessages.push(msg);
|
|
||||||
}
|
|
||||||
})
|
|
||||||
);
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
peerId: nwakuPeerId,
|
peerId: nwakuPeerId,
|
||||||
@ -573,17 +465,10 @@ describe("Waku Store", () => {
|
|||||||
);
|
);
|
||||||
|
|
||||||
const bothMessages: WakuMessage[] = [];
|
const bothMessages: WakuMessage[] = [];
|
||||||
await waku.store.queryHistory(
|
await waku.store.queryOrderedCallback(
|
||||||
[],
|
[],
|
||||||
async (msgPromises) => {
|
async (msg) => {
|
||||||
await Promise.all(
|
bothMessages.push(msg);
|
||||||
msgPromises.map(async (promise) => {
|
|
||||||
const msg = await promise;
|
|
||||||
if (msg) {
|
|
||||||
bothMessages.push(msg);
|
|
||||||
}
|
|
||||||
})
|
|
||||||
);
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
peerId: nwakuPeerId,
|
peerId: nwakuPeerId,
|
||||||
@ -601,3 +486,69 @@ describe("Waku Store", () => {
|
|||||||
expect(bothMessages?.length).eq(2);
|
expect(bothMessages?.length).eq(2);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe("Waku Store, custom pubsub topic", () => {
|
||||||
|
const customPubSubTopic = "/waku/2/custom-dapp/proto";
|
||||||
|
let waku: WakuFull;
|
||||||
|
let nwaku: Nwaku;
|
||||||
|
|
||||||
|
beforeEach(async function () {
|
||||||
|
this.timeout(15_000);
|
||||||
|
nwaku = new Nwaku(makeLogFileName(this));
|
||||||
|
await nwaku.start({
|
||||||
|
persistMessages: true,
|
||||||
|
store: true,
|
||||||
|
topics: customPubSubTopic,
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(async function () {
|
||||||
|
!!nwaku && nwaku.stop();
|
||||||
|
!!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e));
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Generator, custom pubsub topic", async function () {
|
||||||
|
this.timeout(15_000);
|
||||||
|
|
||||||
|
const totalMsgs = 20;
|
||||||
|
for (let i = 0; i < totalMsgs; i++) {
|
||||||
|
expect(
|
||||||
|
await nwaku.sendMessage(
|
||||||
|
Nwaku.toMessageRpcQuery({
|
||||||
|
payload: utf8ToBytes(`Message ${i}`),
|
||||||
|
contentTopic: TestContentTopic,
|
||||||
|
}),
|
||||||
|
customPubSubTopic
|
||||||
|
)
|
||||||
|
).to.be.true;
|
||||||
|
}
|
||||||
|
|
||||||
|
waku = await createFullNode({
|
||||||
|
staticNoiseKey: NOISE_KEY_1,
|
||||||
|
pubSubTopic: customPubSubTopic,
|
||||||
|
});
|
||||||
|
await waku.start();
|
||||||
|
await waku.dial(await nwaku.getMultiaddrWithId());
|
||||||
|
await waitForRemotePeer(waku, [Protocols.Store]);
|
||||||
|
|
||||||
|
const messages: WakuMessage[] = [];
|
||||||
|
let promises: Promise<void>[] = [];
|
||||||
|
for await (const msgPromises of waku.store.queryGenerator([])) {
|
||||||
|
const _promises = msgPromises.map(async (promise) => {
|
||||||
|
const msg = await promise;
|
||||||
|
if (msg) {
|
||||||
|
messages.push(msg);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
promises = promises.concat(_promises);
|
||||||
|
}
|
||||||
|
await Promise.all(promises);
|
||||||
|
|
||||||
|
expect(messages?.length).eq(totalMsgs);
|
||||||
|
const result = messages?.findIndex((msg) => {
|
||||||
|
return msg.payloadAsUtf8 === "Message 0";
|
||||||
|
});
|
||||||
|
expect(result).to.not.eq(-1);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
@ -108,22 +108,95 @@ export class WakuStore {
|
|||||||
/**
|
/**
|
||||||
* Do a query to a Waku Store to retrieve historical/missed messages.
|
* Do a query to a Waku Store to retrieve historical/missed messages.
|
||||||
*
|
*
|
||||||
* @param contentTopics The content topics to pass to the query, leave empty to
|
* The callback function takes a `WakuMessage` in input,
|
||||||
* retrieve all messages.
|
* messages are processed in order:
|
||||||
* @param callback called on a page of retrieved messages. If the callback returns `true`
|
* - oldest to latest if `options.pageDirection` == { @link PageDirection.FORWARD }
|
||||||
* then pagination is stopped.
|
* - latest to oldest if `options.pageDirection` == { @link PageDirection.BACKWARD }
|
||||||
* @param options Optional parameters.
|
*
|
||||||
|
* The ordering may affect performance.
|
||||||
*
|
*
|
||||||
* @throws If not able to reach a Waku Store peer to query
|
* @throws If not able to reach a Waku Store peer to query
|
||||||
* or if an error is encountered when processing the reply.
|
* or if an error is encountered when processing the reply.
|
||||||
*/
|
*/
|
||||||
async queryHistory(
|
async queryOrderedCallback(
|
||||||
contentTopics: string[],
|
contentTopics: string[],
|
||||||
callback: (
|
callback: (
|
||||||
messages: Array<Promise<WakuMessage | undefined>>
|
message: WakuMessage
|
||||||
) => Promise<void | boolean> | boolean | void,
|
) => Promise<void | boolean> | boolean | void,
|
||||||
options?: QueryOptions
|
options?: QueryOptions
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
|
const abort = false;
|
||||||
|
for await (const promises of this.queryGenerator(contentTopics, options)) {
|
||||||
|
if (abort) break;
|
||||||
|
let messages = await Promise.all(promises);
|
||||||
|
|
||||||
|
messages = messages.filter(isWakuMessageDefined);
|
||||||
|
|
||||||
|
// Messages in pages are ordered from oldest (first) to most recent (last).
|
||||||
|
// https://github.com/vacp2p/rfc/issues/533
|
||||||
|
if (
|
||||||
|
typeof options?.pageDirection === "undefined" ||
|
||||||
|
options?.pageDirection === PageDirection.BACKWARD
|
||||||
|
) {
|
||||||
|
messages = messages.reverse();
|
||||||
|
}
|
||||||
|
|
||||||
|
await Promise.all(
|
||||||
|
messages.map((msg) => {
|
||||||
|
if (!abort) {
|
||||||
|
if (msg) return callback(msg);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Do a query to a Waku Store to retrieve historical/missed messages.
|
||||||
|
*
|
||||||
|
* The callback function takes a `Promise<WakuMessage>` in input,
|
||||||
|
* useful if messages needs to be decrypted and performance matters.
|
||||||
|
* **Order of messages is not guaranteed**.
|
||||||
|
*
|
||||||
|
* @returns the promises of the callback call.
|
||||||
|
*
|
||||||
|
* @throws If not able to reach a Waku Store peer to query
|
||||||
|
* or if an error is encountered when processing the reply.
|
||||||
|
*/
|
||||||
|
async queryCallbackOnPromise(
|
||||||
|
contentTopics: string[],
|
||||||
|
callback: (
|
||||||
|
message: Promise<WakuMessage | undefined>
|
||||||
|
) => Promise<void | boolean> | boolean | void,
|
||||||
|
options?: QueryOptions
|
||||||
|
): Promise<Array<Promise<void>>> {
|
||||||
|
let abort = false;
|
||||||
|
let promises: Promise<void>[] = [];
|
||||||
|
for await (const page of this.queryGenerator(contentTopics, options)) {
|
||||||
|
const _promises = page.map(async (msg) => {
|
||||||
|
if (!abort) {
|
||||||
|
abort = Boolean(await callback(msg));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
promises = promises.concat(_promises);
|
||||||
|
}
|
||||||
|
return promises;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Do a query to a Waku Store to retrieve historical/missed messages.
|
||||||
|
*
|
||||||
|
* This is a generator, useful if you want most control on how messages
|
||||||
|
* are processed.
|
||||||
|
*
|
||||||
|
* @throws If not able to reach a Waku Store peer to query
|
||||||
|
* or if an error is encountered when processing the reply.
|
||||||
|
*/
|
||||||
|
async *queryGenerator(
|
||||||
|
contentTopics: string[],
|
||||||
|
options?: QueryOptions
|
||||||
|
): AsyncGenerator<Promise<WakuMessage | undefined>[]> {
|
||||||
let startTime, endTime;
|
let startTime, endTime;
|
||||||
|
|
||||||
if (options?.timeFilter) {
|
if (options?.timeFilter) {
|
||||||
@ -178,23 +251,19 @@ export class WakuStore {
|
|||||||
decryptionParams = decryptionParams.concat(options.decryptionParams);
|
decryptionParams = decryptionParams.concat(options.decryptionParams);
|
||||||
}
|
}
|
||||||
|
|
||||||
for await (const messagePromises of paginate(
|
for await (const messages of paginate(
|
||||||
connection,
|
connection,
|
||||||
protocol,
|
protocol,
|
||||||
queryOpts,
|
queryOpts,
|
||||||
decryptionParams
|
decryptionParams
|
||||||
)) {
|
)) {
|
||||||
const abort = Boolean(await callback(messagePromises));
|
yield messages;
|
||||||
if (abort) {
|
|
||||||
// TODO: Also abort underlying generator
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Register a decryption key to attempt decryption of messages received in any
|
* Register a decryption key to attempt decryption of messages received in any
|
||||||
* subsequent { @link queryHistory } call. This can either be a private key for
|
* subsequent query call. This can either be a private key for
|
||||||
* asymmetric encryption or a symmetric key. { @link WakuStore } will attempt to
|
* asymmetric encryption or a symmetric key. { @link WakuStore } will attempt to
|
||||||
* decrypt messages using both methods.
|
* decrypt messages using both methods.
|
||||||
*
|
*
|
||||||
@ -209,7 +278,7 @@ export class WakuStore {
|
|||||||
|
|
||||||
/**cursorV2Beta4
|
/**cursorV2Beta4
|
||||||
* Delete a decryption key that was used to attempt decryption of messages
|
* Delete a decryption key that was used to attempt decryption of messages
|
||||||
* received in subsequent { @link queryHistory } calls.
|
* received in subsequent query calls.
|
||||||
*
|
*
|
||||||
* Strings must be in hex format.
|
* Strings must be in hex format.
|
||||||
*/
|
*/
|
||||||
@ -280,7 +349,7 @@ async function* paginate(
|
|||||||
throw "History response contains an Error: " + response.error;
|
throw "History response contains an Error: " + response.error;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!response.messages) {
|
if (!response.messages || !response.messages.length) {
|
||||||
log(
|
log(
|
||||||
"Stopping pagination due to store `response.messages` field missing or empty"
|
"Stopping pagination due to store `response.messages` field missing or empty"
|
||||||
);
|
);
|
||||||
@ -315,3 +384,9 @@ async function* paginate(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export const isWakuMessageDefined = (
|
||||||
|
msg: WakuMessage | undefined
|
||||||
|
): msg is WakuMessage => {
|
||||||
|
return !!msg;
|
||||||
|
};
|
||||||
|
Loading…
x
Reference in New Issue
Block a user