functionality works! test wip

This commit is contained in:
danisharora099 2022-11-15 05:17:24 +05:30
parent b8a44a6060
commit 0169a0ccb1
No known key found for this signature in database
GPG Key ID: FBD2BF500037F135
8 changed files with 221 additions and 103 deletions

12
package-lock.json generated
View File

@ -8628,6 +8628,11 @@
"integrity": "sha512-DCXu6Ifhqcks7TZKY3Hxp3y6qphY5SJZmrWMDrKcERSOXWQdMhU9Ig/PYrzyw/ul9jOIyh0N4M0tbC5hodg8dw==", "integrity": "sha512-DCXu6Ifhqcks7TZKY3Hxp3y6qphY5SJZmrWMDrKcERSOXWQdMhU9Ig/PYrzyw/ul9jOIyh0N4M0tbC5hodg8dw==",
"dev": true "dev": true
}, },
"node_modules/fast-sha256": {
"version": "1.3.0",
"resolved": "https://registry.npmjs.org/fast-sha256/-/fast-sha256-1.3.0.tgz",
"integrity": "sha512-n11RGP/lrWEFI/bWdygLxhI+pVeo1ZYIVwvvPkW7azl/rOy+F3HYRZ2K5zeE9mmkhQppyv9sQFx0JM9UabnpPQ=="
},
"node_modules/fastq": { "node_modules/fastq": {
"version": "1.13.0", "version": "1.13.0",
"resolved": "https://registry.npmjs.org/fastq/-/fastq-1.13.0.tgz", "resolved": "https://registry.npmjs.org/fastq/-/fastq-1.13.0.tgz",
@ -21910,6 +21915,7 @@
"@waku/byte-utils": "*", "@waku/byte-utils": "*",
"@waku/interfaces": "*", "@waku/interfaces": "*",
"debug": "^4.3.4", "debug": "^4.3.4",
"fast-sha256": "^1.3.0",
"it-all": "^1.0.6", "it-all": "^1.0.6",
"it-length-prefixed": "^8.0.2", "it-length-prefixed": "^8.0.2",
"it-pipe": "^2.0.4", "it-pipe": "^2.0.4",
@ -26287,6 +26293,7 @@
"eslint-plugin-import": "^2.25.3", "eslint-plugin-import": "^2.25.3",
"eslint-plugin-prettier": "^4.0.0", "eslint-plugin-prettier": "^4.0.0",
"fast-check": "^2.14.0", "fast-check": "^2.14.0",
"fast-sha256": "*",
"gh-pages": "^3.2.3", "gh-pages": "^3.2.3",
"ignore-loader": "^0.1.2", "ignore-loader": "^0.1.2",
"isomorphic-fetch": "^3.0.0", "isomorphic-fetch": "^3.0.0",
@ -29381,6 +29388,11 @@
"integrity": "sha512-DCXu6Ifhqcks7TZKY3Hxp3y6qphY5SJZmrWMDrKcERSOXWQdMhU9Ig/PYrzyw/ul9jOIyh0N4M0tbC5hodg8dw==", "integrity": "sha512-DCXu6Ifhqcks7TZKY3Hxp3y6qphY5SJZmrWMDrKcERSOXWQdMhU9Ig/PYrzyw/ul9jOIyh0N4M0tbC5hodg8dw==",
"dev": true "dev": true
}, },
"fast-sha256": {
"version": "1.3.0",
"resolved": "https://registry.npmjs.org/fast-sha256/-/fast-sha256-1.3.0.tgz",
"integrity": "sha512-n11RGP/lrWEFI/bWdygLxhI+pVeo1ZYIVwvvPkW7azl/rOy+F3HYRZ2K5zeE9mmkhQppyv9sQFx0JM9UabnpPQ=="
},
"fastq": { "fastq": {
"version": "1.13.0", "version": "1.13.0",
"resolved": "https://registry.npmjs.org/fastq/-/fastq-1.13.0.tgz", "resolved": "https://registry.npmjs.org/fastq/-/fastq-1.13.0.tgz",

View File

@ -86,7 +86,6 @@
"node": ">=16" "node": ">=16"
}, },
"dependencies": { "dependencies": {
"@waku/byte-utils": "*",
"@chainsafe/libp2p-gossipsub": "^4.1.1", "@chainsafe/libp2p-gossipsub": "^4.1.1",
"@libp2p/interface-connection": "^3.0.3", "@libp2p/interface-connection": "^3.0.3",
"@libp2p/interface-peer-discovery": "^1.0.0", "@libp2p/interface-peer-discovery": "^1.0.0",
@ -97,8 +96,10 @@
"@libp2p/interfaces": "^3.0.2", "@libp2p/interfaces": "^3.0.2",
"@libp2p/peer-id": "^1.1.10", "@libp2p/peer-id": "^1.1.10",
"@multiformats/multiaddr": "^11.0.6", "@multiformats/multiaddr": "^11.0.6",
"@waku/byte-utils": "*",
"@waku/interfaces": "*", "@waku/interfaces": "*",
"debug": "^4.3.4", "debug": "^4.3.4",
"fast-sha256": "^1.3.0",
"it-all": "^1.0.6", "it-all": "^1.0.6",
"it-length-prefixed": "^8.0.2", "it-length-prefixed": "^8.0.2",
"it-pipe": "^2.0.4", "it-pipe": "^2.0.4",

View File

@ -20,4 +20,9 @@ export * as waku_relay from "./lib/waku_relay";
export { WakuRelay } from "./lib/waku_relay"; export { WakuRelay } from "./lib/waku_relay";
export * as waku_store from "./lib/waku_store"; export * as waku_store from "./lib/waku_store";
export { PageDirection, WakuStore, StoreCodec } from "./lib/waku_store"; export {
PageDirection,
WakuStore,
StoreCodec,
createCursor,
} from "./lib/waku_store";

View File

@ -1,8 +1,10 @@
import type { Connection } from "@libp2p/interface-connection"; import type { Connection } from "@libp2p/interface-connection";
import type { PeerId } from "@libp2p/interface-peer-id"; import type { PeerId } from "@libp2p/interface-peer-id";
import { Peer } from "@libp2p/interface-peer-store"; import { Peer } from "@libp2p/interface-peer-store";
import { utf8ToBytes } from "@waku/byte-utils";
import { DecodedMessage, Decoder } from "@waku/interfaces"; import { DecodedMessage, Decoder } from "@waku/interfaces";
import debug from "debug"; import debug from "debug";
import sha256 from "fast-sha256";
import all from "it-all"; import all from "it-all";
import * as lp from "it-length-prefixed"; import * as lp from "it-length-prefixed";
import { pipe } from "it-pipe"; import { pipe } from "it-pipe";
@ -75,6 +77,10 @@ export interface QueryOptions {
* Retrieve messages with a timestamp within the provided values. * Retrieve messages with a timestamp within the provided values.
*/ */
timeFilter?: TimeFilter; timeFilter?: TimeFilter;
/**
* Cursor as an index to start a query from.
*/
cursor?: proto.Index;
} }
/** /**
@ -251,7 +257,8 @@ export class WakuStore {
connection, connection,
protocol, protocol,
queryOpts, queryOpts,
decodersAsMap decodersAsMap,
options?.cursor
)) { )) {
yield messages; yield messages;
} }
@ -270,7 +277,8 @@ async function* paginate<T extends DecodedMessage>(
connection: Connection, connection: Connection,
protocol: string, protocol: string,
queryOpts: Params, queryOpts: Params,
decoders: Map<string, Decoder<T>> decoders: Map<string, Decoder<T>>,
cursor?: proto.Index
): AsyncGenerator<Promise<T | undefined>[]> { ): AsyncGenerator<Promise<T | undefined>[]> {
if ( if (
queryOpts.contentTopics.toString() !== queryOpts.contentTopics.toString() !==
@ -281,7 +289,6 @@ async function* paginate<T extends DecodedMessage>(
); );
} }
let cursor = undefined;
while (true) { while (true) {
queryOpts = Object.assign(queryOpts, { cursor }); queryOpts = Object.assign(queryOpts, { cursor });
@ -370,3 +377,20 @@ async function* paginate<T extends DecodedMessage>(
export function isDefined<T>(msg: T | undefined): msg is T { export function isDefined<T>(msg: T | undefined): msg is T {
return !!msg; return !!msg;
} }
export async function createCursor(
message: string,
messageTimestamp: bigint,
contentTopic: string,
pubsubTopic: string = DefaultPubSubTopic
): Promise<proto.Index> {
const contentTopicBytes = utf8ToBytes(contentTopic);
const messageBytes = utf8ToBytes(message);
const digest = sha256(Buffer.concat([contentTopicBytes, messageBytes]));
return {
digest,
pubsubTopic,
senderTime: messageTimestamp,
};
}

View File

@ -16,6 +16,12 @@ export interface PointToPointProtocol {
libp2p: Libp2p; libp2p: Libp2p;
peers: () => Promise<Peer[]>; peers: () => Promise<Peer[]>;
} }
export interface Index {
digest?: Uint8Array;
receivedTime?: bigint;
senderTime?: bigint;
pubsubTopic?: string;
}
export type ProtocolOptions = { export type ProtocolOptions = {
pubSubTopic?: string; pubSubTopic?: string;
@ -73,6 +79,10 @@ export type StoreQueryOptions = {
* Retrieve messages with a timestamp within the provided values. * Retrieve messages with a timestamp within the provided values.
*/ */
timeFilter?: TimeFilter; timeFilter?: TimeFilter;
/**
* Cursor as an index to start a query from.
*/
cursor?: Index;
} & ProtocolOptions; } & ProtocolOptions;
export interface Store extends PointToPointProtocol { export interface Store extends PointToPointProtocol {

View File

@ -62,6 +62,8 @@ export class MessageV1 extends MessageV0 implements DecodedMessage {
} }
} }
export { sha256 } from "./crypto";
export class AsymEncoder implements Encoder { export class AsymEncoder implements Encoder {
constructor( constructor(
public contentTopic: string, public contentTopic: string,

View File

@ -1,5 +1,5 @@
import { bytesToUtf8, utf8ToBytes } from "@waku/byte-utils"; import { bytesToUtf8, utf8ToBytes } from "@waku/byte-utils";
import { PageDirection } from "@waku/core"; import { createCursor, PageDirection } from "@waku/core";
import { waitForRemotePeer } from "@waku/core/lib/wait_for_remote_peer"; import { waitForRemotePeer } from "@waku/core/lib/wait_for_remote_peer";
import { DecoderV0, EncoderV0 } from "@waku/core/lib/waku_message/version_0"; import { DecoderV0, EncoderV0 } from "@waku/core/lib/waku_message/version_0";
import { createFullNode } from "@waku/create"; import { createFullNode } from "@waku/create";
@ -40,8 +40,78 @@ describe("Waku Store", () => {
!!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e)); !!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e));
}); });
it("Generator", async function () { // it("Generator", async function () {
this.timeout(15_000); // this.timeout(1000_000);
// const totalMsgs = 20;
// for (let i = 0; i < totalMsgs; i++) {
// expect(
// await nwaku.sendMessage(
// Nwaku.toMessageRpcQuery({
// payload: utf8ToBytes(`Message ${i}`),
// contentTopic: TestContentTopic,
// })
// )
// ).to.be.true;
// }
// waku = await createFullNode({
// staticNoiseKey: NOISE_KEY_1,
// });
// await waku.start();
// await waku.dial(await nwaku.getMultiaddrWithId());
// await waitForRemotePeer(waku, [Protocols.Store]);
// const messages: Message[] = [];
// let promises: Promise<void>[] = [];
// for await (const msgPromises of waku.store.queryGenerator([TestDecoder])) {
// const _promises = msgPromises.map(async (promise) => {
// const msg = await promise;
// if (msg) {
// messages.push(msg);
// }
// });
// promises = promises.concat(_promises);
// }
// await Promise.all(promises);
// expect(messages?.length).eq(totalMsgs);
// const result = messages?.findIndex((msg) => {
// return bytesToUtf8(msg.payload!) === "Message 0";
// });
// expect(result).to.not.eq(-1);
// });
// it("Generator, no message returned", async function () {
// this.timeout(15_000);
// waku = await createFullNode({
// staticNoiseKey: NOISE_KEY_1,
// });
// await waku.start();
// await waku.dial(await nwaku.getMultiaddrWithId());
// await waitForRemotePeer(waku, [Protocols.Store]);
// const messages: Message[] = [];
// let promises: Promise<void>[] = [];
// for await (const msgPromises of waku.store.queryGenerator([TestDecoder])) {
// const _promises = msgPromises.map(async (promise) => {
// const msg = await promise;
// if (msg) {
// messages.push(msg);
// }
// });
// promises = promises.concat(_promises);
// }
// await Promise.all(promises);
// expect(messages?.length).eq(0);
// });
it("Passing a cursor", async function () {
this.timeout(4_000);
const totalMsgs = 20; const totalMsgs = 20;
for (let i = 0; i < totalMsgs; i++) { for (let i = 0; i < totalMsgs; i++) {
@ -63,51 +133,45 @@ describe("Waku Store", () => {
await waitForRemotePeer(waku, [Protocols.Store]); await waitForRemotePeer(waku, [Protocols.Store]);
const messages: Message[] = []; const messages: Message[] = [];
let promises: Promise<void>[] = [];
for await (const msgPromises of waku.store.queryGenerator([TestDecoder])) { const query = waku.store.queryGenerator([TestDecoder]);
const _promises = msgPromises.map(async (promise) => {
const msg = await promise; for await (const page of query) {
if (msg) { for await (const msg of page) {
messages.push(msg); messages.push(msg as Message);
} }
});
promises = promises.concat(_promises);
} }
await Promise.all(promises);
expect(messages?.length).eq(totalMsgs); const cursorIndex = 2;
const result = messages?.findIndex((msg) => { const cursorMessage = messages[cursorIndex];
return bytesToUtf8(msg.payload!) === "Message 0";
});
expect(result).to.not.eq(-1);
});
it("Generator, no message returned", async function () { const cursor = await createCursor(
this.timeout(15_000); bytesToUtf8(cursorMessage.payload!),
BigInt(cursorMessage.timestamp!.getTime()) * BigInt(1000000),
TestContentTopic
);
waku = await createFullNode({ const val = await waku.store
staticNoiseKey: NOISE_KEY_1, .queryGenerator([TestDecoder], { cursor })
}); .next();
await waku.start(); //realIndexOfTest = (cursor-pageSize+test+len)%len
await waku.dial(await nwaku.getMultiaddrWithId()); // the last message received on this page
await waitForRemotePeer(waku, [Protocols.Store]); const testMessage = await val.value[10 - 1];
const messages: Message[] = []; // for (const msg of val.value) {
let promises: Promise<void>[] = []; // const _msg = await msg;
for await (const msgPromises of waku.store.queryGenerator([TestDecoder])) { // console.log({
const _promises = msgPromises.map(async (promise) => { // msg: bytesToUtf8(_msg.payload!),
const msg = await promise; // });
if (msg) { // }
messages.push(msg); // console.log({
} // cursorMessage: bytesToUtf8(cursorMessage.payload!),
}); // testMessage: bytesToUtf8(testMessage.payload!),
// });
promises = promises.concat(_promises); expect(messages?.length).be.eq(totalMsgs);
}
await Promise.all(promises);
expect(messages?.length).eq(0); expect(testMessage).to.be.eq(messages[cursorIndex + 1]);
}); });
it("Callback on promise", async function () { it("Callback on promise", async function () {
@ -496,68 +560,68 @@ describe("Waku Store", () => {
}); });
}); });
describe("Waku Store, custom pubsub topic", () => { // describe("Waku Store, custom pubsub topic", () => {
const customPubSubTopic = "/waku/2/custom-dapp/proto"; // const customPubSubTopic = "/waku/2/custom-dapp/proto";
let waku: WakuFull; // let waku: WakuFull;
let nwaku: Nwaku; // let nwaku: Nwaku;
beforeEach(async function () { // beforeEach(async function () {
this.timeout(15_000); // this.timeout(15_000);
nwaku = new Nwaku(makeLogFileName(this)); // nwaku = new Nwaku(makeLogFileName(this));
await nwaku.start({ // await nwaku.start({
persistMessages: true, // persistMessages: true,
store: true, // store: true,
topics: customPubSubTopic, // topics: customPubSubTopic,
}); // });
}); // });
afterEach(async function () { // afterEach(async function () {
!!nwaku && nwaku.stop(); // !!nwaku && nwaku.stop();
!!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e)); // !!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e));
}); // });
it("Generator, custom pubsub topic", async function () { // it("Generator, custom pubsub topic", async function () {
this.timeout(15_000); // this.timeout(15_000);
const totalMsgs = 20; // const totalMsgs = 20;
for (let i = 0; i < totalMsgs; i++) { // for (let i = 0; i < totalMsgs; i++) {
expect( // expect(
await nwaku.sendMessage( // await nwaku.sendMessage(
Nwaku.toMessageRpcQuery({ // Nwaku.toMessageRpcQuery({
payload: utf8ToBytes(`Message ${i}`), // payload: utf8ToBytes(`Message ${i}`),
contentTopic: TestContentTopic, // contentTopic: TestContentTopic,
}), // }),
customPubSubTopic // customPubSubTopic
) // )
).to.be.true; // ).to.be.true;
} // }
waku = await createFullNode({ // waku = await createFullNode({
staticNoiseKey: NOISE_KEY_1, // staticNoiseKey: NOISE_KEY_1,
pubSubTopic: customPubSubTopic, // pubSubTopic: customPubSubTopic,
}); // });
await waku.start(); // await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId()); // await waku.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Store]); // await waitForRemotePeer(waku, [Protocols.Store]);
const messages: Message[] = []; // const messages: Message[] = [];
let promises: Promise<void>[] = []; // let promises: Promise<void>[] = [];
for await (const msgPromises of waku.store.queryGenerator([TestDecoder])) { // for await (const msgPromises of waku.store.queryGenerator([TestDecoder])) {
const _promises = msgPromises.map(async (promise) => { // const _promises = msgPromises.map(async (promise) => {
const msg = await promise; // const msg = await promise;
if (msg) { // if (msg) {
messages.push(msg); // messages.push(msg);
} // }
}); // });
promises = promises.concat(_promises); // promises = promises.concat(_promises);
} // }
await Promise.all(promises); // await Promise.all(promises);
expect(messages?.length).eq(totalMsgs); // expect(messages?.length).eq(totalMsgs);
const result = messages?.findIndex((msg) => { // const result = messages?.findIndex((msg) => {
return bytesToUtf8(msg.payload!) === "Message 0"; // return bytesToUtf8(msg.payload!) === "Message 0";
}); // });
expect(result).to.not.eq(-1); // expect(result).to.not.eq(-1);
}); // });
}); // });

View File

@ -1,7 +1,7 @@
{ {
"extends": "./tsconfig", "extends": "./tsconfig",
"compilerOptions": { "compilerOptions": {
"module": "esnext", "module": "es2020",
"noEmit": true "noEmit": true
}, },
"include": ["src", "tests"] "include": ["src", "tests"]