mirror of
https://github.com/waku-org/js-waku.git
synced 2025-02-26 11:05:39 +00:00
Merge pull request #947 from waku-org/split-waku-store
This commit is contained in:
commit
8daa6d5c95
@ -10,6 +10,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
### Changed
|
||||
|
||||
- Correct options type for `createFullNode` & `createPrivacy` to enable passing gossipsub options.
|
||||
- `WakuStore` now provides several APIs: `queryGenerator`, `queryCallbackOnPromise`, `queryOrderedCallback`;
|
||||
each provides different guarantees and performance.
|
||||
|
||||
## [0.27.0] - 2022-09-13
|
||||
|
||||
|
@ -1,13 +1,14 @@
|
||||
import { Peer } from "@libp2p/interface-peer-store";
|
||||
import { Libp2p } from "libp2p";
|
||||
import type { PeerId } from "@libp2p/interface-peer-id";
|
||||
import type { Peer, PeerStore } from "@libp2p/interface-peer-store";
|
||||
import debug from "debug";
|
||||
|
||||
const log = debug("waku:select-peer");
|
||||
|
||||
/**
|
||||
* Returns a pseudo-random peer that supports the given protocol.
|
||||
* Useful for protocols such as store and light push
|
||||
*/
|
||||
export async function selectRandomPeer(
|
||||
peers: Peer[]
|
||||
): Promise<Peer | undefined> {
|
||||
export function selectRandomPeer(peers: Peer[]): Peer | undefined {
|
||||
if (peers.length === 0) return;
|
||||
|
||||
const index = Math.round(Math.random() * (peers.length - 1));
|
||||
@ -18,11 +19,11 @@ export async function selectRandomPeer(
|
||||
* Returns the list of peers that supports the given protocol.
|
||||
*/
|
||||
export async function getPeersForProtocol(
|
||||
libp2p: Libp2p,
|
||||
peerStore: PeerStore,
|
||||
protocols: string[]
|
||||
): Promise<Peer[]> {
|
||||
const peers: Peer[] = [];
|
||||
await libp2p.peerStore.forEach((peer) => {
|
||||
await peerStore.forEach((peer) => {
|
||||
for (let i = 0; i < protocols.length; i++) {
|
||||
if (peer.protocols.includes(protocols[i])) {
|
||||
peers.push(peer);
|
||||
@ -32,3 +33,45 @@ export async function getPeersForProtocol(
|
||||
});
|
||||
return peers;
|
||||
}
|
||||
|
||||
export async function selectPeerForProtocol(
|
||||
peerStore: PeerStore,
|
||||
protocols: string[],
|
||||
peerId?: PeerId
|
||||
): Promise<{ peer: Peer; protocol: string } | undefined> {
|
||||
let peer;
|
||||
if (peerId) {
|
||||
peer = await peerStore.get(peerId);
|
||||
if (!peer) {
|
||||
log(
|
||||
`Failed to retrieve connection details for provided peer in peer store: ${peerId.toString()}`
|
||||
);
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
const peers = await getPeersForProtocol(peerStore, protocols);
|
||||
peer = selectRandomPeer(peers);
|
||||
if (!peer) {
|
||||
log("Failed to find known peer that registers protocols", protocols);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
let protocol;
|
||||
for (const codec of protocols) {
|
||||
if (peer.protocols.includes(codec)) {
|
||||
protocol = codec;
|
||||
// Do not break as we want to keep the last value
|
||||
}
|
||||
}
|
||||
log(`Using codec ${protocol}`);
|
||||
if (!protocol) {
|
||||
log(
|
||||
`Peer does not register required protocols: ${peer.id.toString()}`,
|
||||
protocols
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
return { peer, protocol };
|
||||
}
|
||||
|
@ -11,7 +11,11 @@ import type { Libp2p } from "libp2p";
|
||||
import { WakuMessage as WakuMessageProto } from "../../proto/message";
|
||||
import { DefaultPubSubTopic } from "../constants";
|
||||
import { selectConnection } from "../select_connection";
|
||||
import { getPeersForProtocol, selectRandomPeer } from "../select_peer";
|
||||
import {
|
||||
getPeersForProtocol,
|
||||
selectPeerForProtocol,
|
||||
selectRandomPeer,
|
||||
} from "../select_peer";
|
||||
import { hexToBytes } from "../utils";
|
||||
import { DecryptionMethod, WakuMessage } from "../waku_message";
|
||||
|
||||
@ -228,23 +232,15 @@ export class WakuFilter {
|
||||
}
|
||||
|
||||
private async getPeer(peerId?: PeerId): Promise<Peer> {
|
||||
let peer;
|
||||
if (peerId) {
|
||||
peer = await this.libp2p.peerStore.get(peerId);
|
||||
if (!peer) {
|
||||
throw new Error(
|
||||
`Failed to retrieve connection details for provided peer in peer store: ${peerId.toString()}`
|
||||
const res = await selectPeerForProtocol(
|
||||
this.libp2p.peerStore,
|
||||
[FilterCodec],
|
||||
peerId
|
||||
);
|
||||
if (!res) {
|
||||
throw new Error(`Failed to select peer for ${FilterCodec}`);
|
||||
}
|
||||
} else {
|
||||
peer = await this.randomPeer();
|
||||
if (!peer) {
|
||||
throw new Error(
|
||||
"Failed to find known peer that registers waku filter protocol"
|
||||
);
|
||||
}
|
||||
}
|
||||
return peer;
|
||||
return res.peer;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -272,7 +268,7 @@ export class WakuFilter {
|
||||
}
|
||||
|
||||
async peers(): Promise<Peer[]> {
|
||||
return getPeersForProtocol(this.libp2p, [FilterCodec]);
|
||||
return getPeersForProtocol(this.libp2p.peerStore, [FilterCodec]);
|
||||
}
|
||||
|
||||
async randomPeer(): Promise<Peer | undefined> {
|
||||
|
@ -10,7 +10,11 @@ import { Uint8ArrayList } from "uint8arraylist";
|
||||
import { PushResponse } from "../../proto/light_push";
|
||||
import { DefaultPubSubTopic } from "../constants";
|
||||
import { selectConnection } from "../select_connection";
|
||||
import { getPeersForProtocol, selectRandomPeer } from "../select_peer";
|
||||
import {
|
||||
getPeersForProtocol,
|
||||
selectPeerForProtocol,
|
||||
selectRandomPeer,
|
||||
} from "../select_peer";
|
||||
import { WakuMessage } from "../waku_message";
|
||||
|
||||
import { PushRPC } from "./push_rpc";
|
||||
@ -51,16 +55,16 @@ export class WakuLightPush {
|
||||
message: WakuMessage,
|
||||
opts?: PushOptions
|
||||
): Promise<PushResponse | null> {
|
||||
let peer;
|
||||
if (opts?.peerId) {
|
||||
peer = await this.libp2p.peerStore.get(opts.peerId);
|
||||
if (!peer) throw "Peer is unknown";
|
||||
} else {
|
||||
peer = await this.randomPeer();
|
||||
const res = await selectPeerForProtocol(
|
||||
this.libp2p.peerStore,
|
||||
[LightPushCodec],
|
||||
opts?.peerId
|
||||
);
|
||||
|
||||
if (!res) {
|
||||
throw new Error("Failed to get a peer");
|
||||
}
|
||||
if (!peer) throw "No peer available";
|
||||
if (!peer.protocols.includes(LightPushCodec))
|
||||
throw "Peer does not register waku light push protocol";
|
||||
const { peer } = res;
|
||||
|
||||
const connections = this.libp2p.connectionManager.getConnections(peer.id);
|
||||
const connection = selectConnection(connections);
|
||||
@ -109,7 +113,7 @@ export class WakuLightPush {
|
||||
* peers.
|
||||
*/
|
||||
async peers(): Promise<Peer[]> {
|
||||
return getPeersForProtocol(this.libp2p, [LightPushCodec]);
|
||||
return getPeersForProtocol(this.libp2p.peerStore, [LightPushCodec]);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -29,18 +29,22 @@ describe("Waku Store", () => {
|
||||
let waku: WakuFull;
|
||||
let nwaku: Nwaku;
|
||||
|
||||
beforeEach(async function () {
|
||||
this.timeout(15_000);
|
||||
nwaku = new Nwaku(makeLogFileName(this));
|
||||
await nwaku.start({ persistMessages: true, store: true, lightpush: true });
|
||||
});
|
||||
|
||||
afterEach(async function () {
|
||||
!!nwaku && nwaku.stop();
|
||||
!!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e));
|
||||
});
|
||||
|
||||
it("Retrieves history", async function () {
|
||||
it("Generator", async function () {
|
||||
this.timeout(15_000);
|
||||
const totalMsgs = 20;
|
||||
|
||||
nwaku = new Nwaku(makeLogFileName(this));
|
||||
await nwaku.start({ persistMessages: true, store: true });
|
||||
|
||||
for (let i = 0; i < 2; i++) {
|
||||
for (let i = 0; i < totalMsgs; i++) {
|
||||
expect(
|
||||
await nwaku.sendMessage(
|
||||
Nwaku.toMessageRpcQuery({
|
||||
@ -57,20 +61,95 @@ describe("Waku Store", () => {
|
||||
await waku.start();
|
||||
await waku.dial(await nwaku.getMultiaddrWithId());
|
||||
await waitForRemotePeer(waku, [Protocols.Store]);
|
||||
const messages = await waku.store.queryHistory([]);
|
||||
|
||||
expect(messages?.length).eq(2);
|
||||
const messages: WakuMessage[] = [];
|
||||
let promises: Promise<void>[] = [];
|
||||
for await (const msgPromises of waku.store.queryGenerator([])) {
|
||||
const _promises = msgPromises.map(async (promise) => {
|
||||
const msg = await promise;
|
||||
if (msg) {
|
||||
messages.push(msg);
|
||||
}
|
||||
});
|
||||
|
||||
promises = promises.concat(_promises);
|
||||
}
|
||||
await Promise.all(promises);
|
||||
|
||||
expect(messages?.length).eq(totalMsgs);
|
||||
const result = messages?.findIndex((msg) => {
|
||||
return msg.payloadAsUtf8 === "Message 0";
|
||||
});
|
||||
expect(result).to.not.eq(-1);
|
||||
});
|
||||
|
||||
it("Retrieves history using callback", async function () {
|
||||
it("Generator, no message returned", async function () {
|
||||
this.timeout(15_000);
|
||||
|
||||
nwaku = new Nwaku(makeLogFileName(this));
|
||||
await nwaku.start({ persistMessages: true, store: true });
|
||||
waku = await createFullNode({
|
||||
staticNoiseKey: NOISE_KEY_1,
|
||||
});
|
||||
await waku.start();
|
||||
await waku.dial(await nwaku.getMultiaddrWithId());
|
||||
await waitForRemotePeer(waku, [Protocols.Store]);
|
||||
|
||||
const messages: WakuMessage[] = [];
|
||||
let promises: Promise<void>[] = [];
|
||||
for await (const msgPromises of waku.store.queryGenerator([])) {
|
||||
const _promises = msgPromises.map(async (promise) => {
|
||||
const msg = await promise;
|
||||
if (msg) {
|
||||
messages.push(msg);
|
||||
}
|
||||
});
|
||||
|
||||
promises = promises.concat(_promises);
|
||||
}
|
||||
await Promise.all(promises);
|
||||
|
||||
expect(messages?.length).eq(0);
|
||||
});
|
||||
|
||||
it("Callback on promise", async function () {
|
||||
this.timeout(15_000);
|
||||
|
||||
const totalMsgs = 15;
|
||||
|
||||
for (let i = 0; i < totalMsgs; i++) {
|
||||
expect(
|
||||
await nwaku.sendMessage(
|
||||
Nwaku.toMessageRpcQuery({
|
||||
payload: utf8ToBytes(`Message ${i}`),
|
||||
contentTopic: TestContentTopic,
|
||||
})
|
||||
)
|
||||
).to.be.true;
|
||||
}
|
||||
|
||||
waku = await createFullNode({
|
||||
staticNoiseKey: NOISE_KEY_1,
|
||||
});
|
||||
await waku.start();
|
||||
await waku.dial(await nwaku.getMultiaddrWithId());
|
||||
await waitForRemotePeer(waku, [Protocols.Store]);
|
||||
|
||||
const messages: WakuMessage[] = [];
|
||||
await waku.store.queryCallbackOnPromise([], async (msgPromise) => {
|
||||
const msg = await msgPromise;
|
||||
if (msg) {
|
||||
messages.push(msg);
|
||||
}
|
||||
});
|
||||
|
||||
expect(messages?.length).eq(totalMsgs);
|
||||
const result = messages?.findIndex((msg) => {
|
||||
return msg.payloadAsUtf8 === "Message 0";
|
||||
});
|
||||
expect(result).to.not.eq(-1);
|
||||
});
|
||||
|
||||
it("Callback on promise, aborts when callback returns true", async function () {
|
||||
this.timeout(15_000);
|
||||
|
||||
const totalMsgs = 20;
|
||||
|
||||
@ -92,68 +171,28 @@ describe("Waku Store", () => {
|
||||
await waku.dial(await nwaku.getMultiaddrWithId());
|
||||
await waitForRemotePeer(waku, [Protocols.Store]);
|
||||
|
||||
let messages: WakuMessage[] = [];
|
||||
|
||||
await waku.store.queryHistory([], {
|
||||
callback: (_msgs) => {
|
||||
messages = messages.concat(_msgs);
|
||||
},
|
||||
});
|
||||
|
||||
expect(messages?.length).eq(totalMsgs);
|
||||
const result = messages?.findIndex((msg) => {
|
||||
return msg.payloadAsUtf8 === "Message 0";
|
||||
});
|
||||
expect(result).to.not.eq(-1);
|
||||
});
|
||||
|
||||
it("Retrieval aborts when callback returns true", async function () {
|
||||
this.timeout(15_000);
|
||||
|
||||
nwaku = new Nwaku(makeLogFileName(this));
|
||||
await nwaku.start({ persistMessages: true, store: true });
|
||||
|
||||
const availMsgs = 20;
|
||||
|
||||
for (let i = 0; i < availMsgs; i++) {
|
||||
expect(
|
||||
await nwaku.sendMessage(
|
||||
Nwaku.toMessageRpcQuery({
|
||||
payload: utf8ToBytes(`Message ${i}`),
|
||||
contentTopic: TestContentTopic,
|
||||
})
|
||||
)
|
||||
).to.be.true;
|
||||
}
|
||||
|
||||
waku = await createFullNode({
|
||||
staticNoiseKey: NOISE_KEY_1,
|
||||
});
|
||||
await waku.start();
|
||||
await waku.dial(await nwaku.getMultiaddrWithId());
|
||||
await waitForRemotePeer(waku, [Protocols.Store]);
|
||||
|
||||
let messages: WakuMessage[] = [];
|
||||
const desiredMsgs = 14;
|
||||
|
||||
await waku.store.queryHistory([], {
|
||||
pageSize: 7,
|
||||
callback: (_msgs) => {
|
||||
messages = messages.concat(_msgs);
|
||||
const messages: WakuMessage[] = [];
|
||||
await waku.store.queryCallbackOnPromise(
|
||||
[],
|
||||
async (msgPromise) => {
|
||||
const msg = await msgPromise;
|
||||
if (msg) {
|
||||
messages.push(msg);
|
||||
}
|
||||
return messages.length >= desiredMsgs;
|
||||
},
|
||||
});
|
||||
{ pageSize: 7 }
|
||||
);
|
||||
|
||||
expect(messages?.length).eq(desiredMsgs);
|
||||
});
|
||||
|
||||
it("Retrieves all historical elements in chronological order through paging", async function () {
|
||||
it("Ordered Callback - Forward", async function () {
|
||||
this.timeout(15_000);
|
||||
|
||||
nwaku = new Nwaku(makeLogFileName(this));
|
||||
await nwaku.start({ persistMessages: true, store: true });
|
||||
|
||||
for (let i = 0; i < 15; i++) {
|
||||
const totalMsgs = 18;
|
||||
for (let i = 0; i < totalMsgs; i++) {
|
||||
expect(
|
||||
await nwaku.sendMessage(
|
||||
Nwaku.toMessageRpcQuery({
|
||||
@ -171,12 +210,19 @@ describe("Waku Store", () => {
|
||||
await waku.dial(await nwaku.getMultiaddrWithId());
|
||||
await waitForRemotePeer(waku, [Protocols.Store]);
|
||||
|
||||
const messages = await waku.store.queryHistory([], {
|
||||
const messages: WakuMessage[] = [];
|
||||
await waku.store.queryOrderedCallback(
|
||||
[],
|
||||
async (msg) => {
|
||||
messages.push(msg);
|
||||
},
|
||||
{
|
||||
pageDirection: PageDirection.FORWARD,
|
||||
});
|
||||
}
|
||||
);
|
||||
|
||||
expect(messages?.length).eq(15);
|
||||
for (let index = 0; index < 2; index++) {
|
||||
expect(messages?.length).eq(totalMsgs);
|
||||
for (let index = 0; index < totalMsgs; index++) {
|
||||
expect(
|
||||
messages?.findIndex((msg) => {
|
||||
return msg.payloadAsUtf8 === `Message ${index}`;
|
||||
@ -185,147 +231,54 @@ describe("Waku Store", () => {
|
||||
}
|
||||
});
|
||||
|
||||
it("Retrieves history using custom pubsub topic", async function () {
|
||||
it("Ordered Callback - Backward", async function () {
|
||||
this.timeout(15_000);
|
||||
|
||||
const customPubSubTopic = "/waku/2/custom-dapp/proto";
|
||||
nwaku = new Nwaku(makeLogFileName(this));
|
||||
await nwaku.start({
|
||||
persistMessages: true,
|
||||
store: true,
|
||||
topics: customPubSubTopic,
|
||||
});
|
||||
|
||||
for (let i = 0; i < 2; i++) {
|
||||
const totalMsgs = 18;
|
||||
for (let i = 0; i < totalMsgs; i++) {
|
||||
expect(
|
||||
await nwaku.sendMessage(
|
||||
Nwaku.toMessageRpcQuery({
|
||||
payload: utf8ToBytes(`Message ${i}`),
|
||||
contentTopic: TestContentTopic,
|
||||
}),
|
||||
customPubSubTopic
|
||||
})
|
||||
)
|
||||
).to.be.true;
|
||||
}
|
||||
|
||||
waku = await createFullNode({
|
||||
pubSubTopic: customPubSubTopic,
|
||||
staticNoiseKey: NOISE_KEY_1,
|
||||
});
|
||||
await waku.start();
|
||||
await waku.dial(await nwaku.getMultiaddrWithId());
|
||||
await waitForRemotePeer(waku, [Protocols.Store]);
|
||||
|
||||
const nimPeerId = await nwaku.getPeerId();
|
||||
|
||||
const messages = await waku.store.queryHistory([], {
|
||||
peerId: nimPeerId,
|
||||
});
|
||||
|
||||
expect(messages?.length).eq(2);
|
||||
const result = messages?.findIndex((msg) => {
|
||||
return msg.payloadAsUtf8 === "Message 0";
|
||||
});
|
||||
expect(result).to.not.eq(-1);
|
||||
});
|
||||
|
||||
it("Retrieves history with asymmetric & symmetric encrypted messages", async function () {
|
||||
this.timeout(15_000);
|
||||
|
||||
nwaku = new Nwaku(makeLogFileName(this));
|
||||
await nwaku.start({ persistMessages: true, store: true, lightpush: true });
|
||||
|
||||
const encryptedAsymmetricMessageText = "asymmetric encryption";
|
||||
const encryptedSymmetricMessageText = "symmetric encryption";
|
||||
const clearMessageText =
|
||||
"This is a clear text message for everyone to read";
|
||||
const otherEncMessageText =
|
||||
"This message is not for and I must not be able to read it";
|
||||
|
||||
const privateKey = generatePrivateKey();
|
||||
const symKey = generateSymmetricKey();
|
||||
const publicKey = getPublicKey(privateKey);
|
||||
|
||||
const [
|
||||
encryptedAsymmetricMessage,
|
||||
encryptedSymmetricMessage,
|
||||
clearMessage,
|
||||
otherEncMessage,
|
||||
] = await Promise.all([
|
||||
WakuMessage.fromUtf8String(
|
||||
encryptedAsymmetricMessageText,
|
||||
TestContentTopic,
|
||||
let messages: WakuMessage[] = [];
|
||||
await waku.store.queryOrderedCallback(
|
||||
[],
|
||||
async (msg) => {
|
||||
messages.push(msg);
|
||||
},
|
||||
{
|
||||
encPublicKey: publicKey,
|
||||
pageDirection: PageDirection.BACKWARD,
|
||||
}
|
||||
),
|
||||
WakuMessage.fromUtf8String(
|
||||
encryptedSymmetricMessageText,
|
||||
TestContentTopic,
|
||||
{
|
||||
symKey: symKey,
|
||||
);
|
||||
|
||||
messages = messages.reverse();
|
||||
|
||||
expect(messages?.length).eq(totalMsgs);
|
||||
for (let index = 0; index < totalMsgs; index++) {
|
||||
expect(
|
||||
messages?.findIndex((msg) => {
|
||||
return msg.payloadAsUtf8 === `Message ${index}`;
|
||||
})
|
||||
).to.eq(index);
|
||||
}
|
||||
),
|
||||
WakuMessage.fromUtf8String(clearMessageText, TestContentTopic),
|
||||
WakuMessage.fromUtf8String(otherEncMessageText, TestContentTopic, {
|
||||
encPublicKey: getPublicKey(generatePrivateKey()),
|
||||
}),
|
||||
]);
|
||||
|
||||
log("Messages have been encrypted");
|
||||
|
||||
const [waku1, waku2, nimWakuMultiaddr] = await Promise.all([
|
||||
createFullNode({
|
||||
staticNoiseKey: NOISE_KEY_1,
|
||||
}).then((waku) => waku.start().then(() => waku)),
|
||||
createFullNode({
|
||||
staticNoiseKey: NOISE_KEY_2,
|
||||
}).then((waku) => waku.start().then(() => waku)),
|
||||
nwaku.getMultiaddrWithId(),
|
||||
]);
|
||||
|
||||
log("Waku nodes created");
|
||||
|
||||
await Promise.all([
|
||||
waku1.dial(nimWakuMultiaddr),
|
||||
waku2.dial(nimWakuMultiaddr),
|
||||
]);
|
||||
|
||||
log("Waku nodes connected to nwaku");
|
||||
|
||||
await waitForRemotePeer(waku1, [Protocols.LightPush]);
|
||||
|
||||
log("Sending messages using light push");
|
||||
await Promise.all([
|
||||
waku1.lightPush.push(encryptedAsymmetricMessage),
|
||||
waku1.lightPush.push(encryptedSymmetricMessage),
|
||||
waku1.lightPush.push(otherEncMessage),
|
||||
waku1.lightPush.push(clearMessage),
|
||||
]);
|
||||
|
||||
await waitForRemotePeer(waku2, [Protocols.Store]);
|
||||
|
||||
waku2.store.addDecryptionKey(symKey);
|
||||
|
||||
log("Retrieve messages from store");
|
||||
const messages = await waku2.store.queryHistory([], {
|
||||
decryptionParams: [{ key: privateKey }],
|
||||
});
|
||||
|
||||
expect(messages[0]?.payloadAsUtf8).to.eq(clearMessageText);
|
||||
expect(messages[1]?.payloadAsUtf8).to.eq(encryptedSymmetricMessageText);
|
||||
expect(messages[2]?.payloadAsUtf8).to.eq(encryptedAsymmetricMessageText);
|
||||
|
||||
!!waku1 && waku1.stop().catch((e) => console.log("Waku failed to stop", e));
|
||||
!!waku2 && waku2.stop().catch((e) => console.log("Waku failed to stop", e));
|
||||
});
|
||||
|
||||
it("Retrieves history with asymmetric & symmetric encrypted messages on different content topics", async function () {
|
||||
it("Generator, with asymmetric & symmetric encrypted messages", async function () {
|
||||
this.timeout(15_000);
|
||||
|
||||
nwaku = new Nwaku(makeLogFileName(this));
|
||||
await nwaku.start({ persistMessages: true, store: true, lightpush: true });
|
||||
|
||||
const encryptedAsymmetricMessageText =
|
||||
"This message is encrypted for me using asymmetric";
|
||||
const encryptedAsymmetricContentTopic = "/test/1/asymmetric/proto";
|
||||
@ -341,6 +294,7 @@ describe("Waku Store", () => {
|
||||
const symKey = generateSymmetricKey();
|
||||
const publicKey = getPublicKey(privateKey);
|
||||
|
||||
const timestamp = new Date();
|
||||
const [
|
||||
encryptedAsymmetricMessage,
|
||||
encryptedSymmetricMessage,
|
||||
@ -352,6 +306,7 @@ describe("Waku Store", () => {
|
||||
encryptedAsymmetricContentTopic,
|
||||
{
|
||||
encPublicKey: publicKey,
|
||||
timestamp,
|
||||
}
|
||||
),
|
||||
WakuMessage.fromUtf8String(
|
||||
@ -359,17 +314,20 @@ describe("Waku Store", () => {
|
||||
encryptedSymmetricContentTopic,
|
||||
{
|
||||
symKey: symKey,
|
||||
timestamp: new Date(timestamp.valueOf() + 1),
|
||||
}
|
||||
),
|
||||
WakuMessage.fromUtf8String(
|
||||
clearMessageText,
|
||||
encryptedAsymmetricContentTopic
|
||||
encryptedAsymmetricContentTopic,
|
||||
{ timestamp: new Date(timestamp.valueOf() + 2) }
|
||||
),
|
||||
WakuMessage.fromUtf8String(
|
||||
otherEncMessageText,
|
||||
encryptedSymmetricContentTopic,
|
||||
{
|
||||
encPublicKey: getPublicKey(generatePrivateKey()),
|
||||
timestamp: new Date(timestamp.valueOf() + 3),
|
||||
}
|
||||
),
|
||||
]);
|
||||
@ -412,26 +370,45 @@ describe("Waku Store", () => {
|
||||
method: DecryptionMethod.Symmetric,
|
||||
});
|
||||
|
||||
const messages: WakuMessage[] = [];
|
||||
log("Retrieve messages from store");
|
||||
const messages = await waku2.store.queryHistory([], {
|
||||
|
||||
for await (const msgPromises of waku2.store.queryGenerator([], {
|
||||
decryptionParams: [{ key: privateKey }],
|
||||
});
|
||||
})) {
|
||||
for (const promise of msgPromises) {
|
||||
const msg = await promise;
|
||||
if (msg) {
|
||||
messages.push(msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
expect(messages?.length).eq(3);
|
||||
if (!messages) throw "Length was tested";
|
||||
expect(messages[0].payloadAsUtf8).to.eq(clearMessageText);
|
||||
// Messages are ordered from oldest to latest within a page (1 page query)
|
||||
expect(messages[0].payloadAsUtf8).to.eq(encryptedAsymmetricMessageText);
|
||||
expect(messages[1].payloadAsUtf8).to.eq(encryptedSymmetricMessageText);
|
||||
expect(messages[2].payloadAsUtf8).to.eq(encryptedAsymmetricMessageText);
|
||||
expect(messages[2].payloadAsUtf8).to.eq(clearMessageText);
|
||||
|
||||
for (const text of [
|
||||
encryptedAsymmetricMessageText,
|
||||
encryptedSymmetricMessageText,
|
||||
clearMessageText,
|
||||
]) {
|
||||
expect(
|
||||
messages?.findIndex((msg) => {
|
||||
return msg.payloadAsUtf8 === text;
|
||||
})
|
||||
).to.not.eq(-1);
|
||||
}
|
||||
|
||||
!!waku1 && waku1.stop().catch((e) => console.log("Waku failed to stop", e));
|
||||
!!waku2 && waku2.stop().catch((e) => console.log("Waku failed to stop", e));
|
||||
});
|
||||
|
||||
it("Retrieves history using start and end time", async function () {
|
||||
this.timeout(15_000);
|
||||
|
||||
nwaku = new Nwaku(makeLogFileName(this));
|
||||
await nwaku.start({ persistMessages: true, store: true });
|
||||
it("Ordered callback, using start and end time", async function () {
|
||||
this.timeout(20000);
|
||||
|
||||
const now = new Date();
|
||||
|
||||
@ -473,23 +450,105 @@ describe("Waku Store", () => {
|
||||
|
||||
const nwakuPeerId = await nwaku.getPeerId();
|
||||
|
||||
const firstMessage = await waku.store.queryHistory([], {
|
||||
const firstMessages: WakuMessage[] = [];
|
||||
await waku.store.queryOrderedCallback(
|
||||
[],
|
||||
(msg) => {
|
||||
if (msg) {
|
||||
firstMessages.push(msg);
|
||||
}
|
||||
},
|
||||
{
|
||||
peerId: nwakuPeerId,
|
||||
timeFilter: { startTime, endTime: message1Timestamp },
|
||||
});
|
||||
}
|
||||
);
|
||||
|
||||
const bothMessages = await waku.store.queryHistory([], {
|
||||
const bothMessages: WakuMessage[] = [];
|
||||
await waku.store.queryOrderedCallback(
|
||||
[],
|
||||
async (msg) => {
|
||||
bothMessages.push(msg);
|
||||
},
|
||||
{
|
||||
peerId: nwakuPeerId,
|
||||
timeFilter: {
|
||||
startTime,
|
||||
endTime,
|
||||
},
|
||||
});
|
||||
}
|
||||
);
|
||||
|
||||
expect(firstMessage?.length).eq(1);
|
||||
expect(firstMessages?.length).eq(1);
|
||||
|
||||
expect(firstMessage[0]?.payloadAsUtf8).eq("Message 0");
|
||||
expect(firstMessages[0]?.payloadAsUtf8).eq("Message 0");
|
||||
|
||||
expect(bothMessages?.length).eq(2);
|
||||
});
|
||||
});
|
||||
|
||||
describe("Waku Store, custom pubsub topic", () => {
|
||||
const customPubSubTopic = "/waku/2/custom-dapp/proto";
|
||||
let waku: WakuFull;
|
||||
let nwaku: Nwaku;
|
||||
|
||||
beforeEach(async function () {
|
||||
this.timeout(15_000);
|
||||
nwaku = new Nwaku(makeLogFileName(this));
|
||||
await nwaku.start({
|
||||
persistMessages: true,
|
||||
store: true,
|
||||
topics: customPubSubTopic,
|
||||
});
|
||||
});
|
||||
|
||||
afterEach(async function () {
|
||||
!!nwaku && nwaku.stop();
|
||||
!!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e));
|
||||
});
|
||||
|
||||
it("Generator, custom pubsub topic", async function () {
|
||||
this.timeout(15_000);
|
||||
|
||||
const totalMsgs = 20;
|
||||
for (let i = 0; i < totalMsgs; i++) {
|
||||
expect(
|
||||
await nwaku.sendMessage(
|
||||
Nwaku.toMessageRpcQuery({
|
||||
payload: utf8ToBytes(`Message ${i}`),
|
||||
contentTopic: TestContentTopic,
|
||||
}),
|
||||
customPubSubTopic
|
||||
)
|
||||
).to.be.true;
|
||||
}
|
||||
|
||||
waku = await createFullNode({
|
||||
staticNoiseKey: NOISE_KEY_1,
|
||||
pubSubTopic: customPubSubTopic,
|
||||
});
|
||||
await waku.start();
|
||||
await waku.dial(await nwaku.getMultiaddrWithId());
|
||||
await waitForRemotePeer(waku, [Protocols.Store]);
|
||||
|
||||
const messages: WakuMessage[] = [];
|
||||
let promises: Promise<void>[] = [];
|
||||
for await (const msgPromises of waku.store.queryGenerator([])) {
|
||||
const _promises = msgPromises.map(async (promise) => {
|
||||
const msg = await promise;
|
||||
if (msg) {
|
||||
messages.push(msg);
|
||||
}
|
||||
});
|
||||
|
||||
promises = promises.concat(_promises);
|
||||
}
|
||||
await Promise.all(promises);
|
||||
|
||||
expect(messages?.length).eq(totalMsgs);
|
||||
const result = messages?.findIndex((msg) => {
|
||||
return msg.payloadAsUtf8 === "Message 0";
|
||||
});
|
||||
expect(result).to.not.eq(-1);
|
||||
});
|
||||
});
|
||||
|
@ -1,3 +1,4 @@
|
||||
import type { Connection } from "@libp2p/interface-connection";
|
||||
import type { PeerId } from "@libp2p/interface-peer-id";
|
||||
import { Peer } from "@libp2p/interface-peer-store";
|
||||
import debug from "debug";
|
||||
@ -11,7 +12,7 @@ import * as protoV2Beta4 from "../../proto/store_v2beta4";
|
||||
import { HistoryResponse } from "../../proto/store_v2beta4";
|
||||
import { DefaultPubSubTopic, StoreCodecs } from "../constants";
|
||||
import { selectConnection } from "../select_connection";
|
||||
import { getPeersForProtocol, selectRandomPeer } from "../select_peer";
|
||||
import { getPeersForProtocol, selectPeerForProtocol } from "../select_peer";
|
||||
import { hexToBytes } from "../utils";
|
||||
import {
|
||||
DecryptionMethod,
|
||||
@ -19,9 +20,9 @@ import {
|
||||
WakuMessage,
|
||||
} from "../waku_message";
|
||||
|
||||
import { HistoryRPC, PageDirection } from "./history_rpc";
|
||||
import { HistoryRPC, PageDirection, Params } from "./history_rpc";
|
||||
|
||||
import Error = HistoryResponse.HistoryError;
|
||||
import HistoryError = HistoryResponse.HistoryError;
|
||||
|
||||
const log = debug("waku:store");
|
||||
|
||||
@ -77,18 +78,6 @@ export interface QueryOptions {
|
||||
* Retrieve messages with a timestamp within the provided values.
|
||||
*/
|
||||
timeFilter?: TimeFilter;
|
||||
/**
|
||||
* Callback called on pages of stored messages as they are retrieved.
|
||||
*
|
||||
* Allows for a faster access to the results as it is called as soon as a page
|
||||
* is received. Traversal of the pages is done automatically so this function
|
||||
* will invoked for each retrieved page.
|
||||
*
|
||||
* If the call on a page returns `true`, then traversal of the pages is aborted.
|
||||
* For example, this can be used for the caller to stop the query after a
|
||||
* specific message is found.
|
||||
*/
|
||||
callback?: (messages: WakuMessage[]) => void | boolean;
|
||||
/**
|
||||
* Keys that will be used to decrypt messages.
|
||||
*
|
||||
@ -119,17 +108,95 @@ export class WakuStore {
|
||||
/**
|
||||
* Do a query to a Waku Store to retrieve historical/missed messages.
|
||||
*
|
||||
* @param contentTopics The content topics to pass to the query, leave empty to
|
||||
* retrieve all messages.
|
||||
* @param options Optional parameters.
|
||||
* The callback function takes a `WakuMessage` in input,
|
||||
* messages are processed in order:
|
||||
* - oldest to latest if `options.pageDirection` == { @link PageDirection.FORWARD }
|
||||
* - latest to oldest if `options.pageDirection` == { @link PageDirection.BACKWARD }
|
||||
*
|
||||
* The ordering may affect performance.
|
||||
*
|
||||
* @throws If not able to reach a Waku Store peer to query
|
||||
* or if an error is encountered when processing the reply.
|
||||
*/
|
||||
async queryHistory(
|
||||
async queryOrderedCallback(
|
||||
contentTopics: string[],
|
||||
callback: (
|
||||
message: WakuMessage
|
||||
) => Promise<void | boolean> | boolean | void,
|
||||
options?: QueryOptions
|
||||
): Promise<void> {
|
||||
const abort = false;
|
||||
for await (const promises of this.queryGenerator(contentTopics, options)) {
|
||||
if (abort) break;
|
||||
let messages = await Promise.all(promises);
|
||||
|
||||
messages = messages.filter(isWakuMessageDefined);
|
||||
|
||||
// Messages in pages are ordered from oldest (first) to most recent (last).
|
||||
// https://github.com/vacp2p/rfc/issues/533
|
||||
if (
|
||||
typeof options?.pageDirection === "undefined" ||
|
||||
options?.pageDirection === PageDirection.BACKWARD
|
||||
) {
|
||||
messages = messages.reverse();
|
||||
}
|
||||
|
||||
await Promise.all(
|
||||
messages.map((msg) => {
|
||||
if (!abort) {
|
||||
if (msg) return callback(msg);
|
||||
}
|
||||
})
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Do a query to a Waku Store to retrieve historical/missed messages.
|
||||
*
|
||||
* The callback function takes a `Promise<WakuMessage>` in input,
|
||||
* useful if messages needs to be decrypted and performance matters.
|
||||
* **Order of messages is not guaranteed**.
|
||||
*
|
||||
* @returns the promises of the callback call.
|
||||
*
|
||||
* @throws If not able to reach a Waku Store peer to query
|
||||
* or if an error is encountered when processing the reply.
|
||||
*/
|
||||
async queryCallbackOnPromise(
|
||||
contentTopics: string[],
|
||||
callback: (
|
||||
message: Promise<WakuMessage | undefined>
|
||||
) => Promise<void | boolean> | boolean | void,
|
||||
options?: QueryOptions
|
||||
): Promise<Array<Promise<void>>> {
|
||||
let abort = false;
|
||||
let promises: Promise<void>[] = [];
|
||||
for await (const page of this.queryGenerator(contentTopics, options)) {
|
||||
const _promises = page.map(async (msg) => {
|
||||
if (!abort) {
|
||||
abort = Boolean(await callback(msg));
|
||||
}
|
||||
});
|
||||
|
||||
promises = promises.concat(_promises);
|
||||
}
|
||||
return promises;
|
||||
}
|
||||
|
||||
/**
|
||||
* Do a query to a Waku Store to retrieve historical/missed messages.
|
||||
*
|
||||
* This is a generator, useful if you want most control on how messages
|
||||
* are processed.
|
||||
*
|
||||
* @throws If not able to reach a Waku Store peer to query
|
||||
* or if an error is encountered when processing the reply.
|
||||
*/
|
||||
async *queryGenerator(
|
||||
contentTopics: string[],
|
||||
options?: QueryOptions
|
||||
): Promise<WakuMessage[]> {
|
||||
): AsyncGenerator<Promise<WakuMessage | undefined>[]> {
|
||||
let startTime, endTime;
|
||||
|
||||
if (options?.timeFilter) {
|
||||
@ -137,7 +204,7 @@ export class WakuStore {
|
||||
endTime = options.timeFilter.endTime;
|
||||
}
|
||||
|
||||
const opts = Object.assign(
|
||||
const queryOpts = Object.assign(
|
||||
{
|
||||
pubSubTopic: this.pubSubTopic,
|
||||
pageDirection: PageDirection.BACKWARD,
|
||||
@ -152,29 +219,17 @@ export class WakuStore {
|
||||
...options,
|
||||
});
|
||||
|
||||
let peer;
|
||||
if (opts.peerId) {
|
||||
peer = await this.libp2p.peerStore.get(opts.peerId);
|
||||
if (!peer)
|
||||
throw `Failed to retrieve connection details for provided peer in peer store: ${opts.peerId.toString()}`;
|
||||
} else {
|
||||
peer = await this.randomPeer();
|
||||
if (!peer)
|
||||
throw "Failed to find known peer that registers waku store protocol";
|
||||
}
|
||||
const res = await selectPeerForProtocol(
|
||||
this.libp2p.peerStore,
|
||||
Object.values(StoreCodecs),
|
||||
options?.peerId
|
||||
);
|
||||
|
||||
let storeCodec = "";
|
||||
for (const codec of Object.values(StoreCodecs)) {
|
||||
if (peer.protocols.includes(codec)) {
|
||||
storeCodec = codec;
|
||||
// Do not break as we want to keep the last value
|
||||
if (!res) {
|
||||
throw new Error("Failed to get a peer");
|
||||
}
|
||||
}
|
||||
log(`Use store codec ${storeCodec}`);
|
||||
if (!storeCodec)
|
||||
throw `Peer does not register waku store protocol: ${peer.id.toString()}`;
|
||||
const { peer, protocol } = res;
|
||||
|
||||
Object.assign(opts, { storeCodec });
|
||||
const connections = this.libp2p.connectionManager.getConnections(peer.id);
|
||||
const connection = selectConnection(connections);
|
||||
|
||||
@ -192,94 +247,23 @@ export class WakuStore {
|
||||
|
||||
// Add the decryption keys passed to this function against the
|
||||
// content topics also passed to this function.
|
||||
if (opts.decryptionParams) {
|
||||
decryptionParams = decryptionParams.concat(opts.decryptionParams);
|
||||
if (options?.decryptionParams) {
|
||||
decryptionParams = decryptionParams.concat(options.decryptionParams);
|
||||
}
|
||||
|
||||
const messages: WakuMessage[] = [];
|
||||
let cursor = undefined;
|
||||
while (true) {
|
||||
const stream = await connection.newStream(storeCodec);
|
||||
const queryOpts = Object.assign(opts, { cursor });
|
||||
const historyRpcQuery = HistoryRPC.createQuery(queryOpts);
|
||||
log("Querying store peer", connections[0].remoteAddr.toString());
|
||||
|
||||
const res = await pipe(
|
||||
[historyRpcQuery.encode()],
|
||||
lp.encode(),
|
||||
stream,
|
||||
lp.decode(),
|
||||
async (source) => await all(source)
|
||||
);
|
||||
const bytes = new Uint8ArrayList();
|
||||
res.forEach((chunk) => {
|
||||
bytes.append(chunk);
|
||||
});
|
||||
|
||||
const reply = historyRpcQuery.decode(bytes);
|
||||
|
||||
if (!reply.response) {
|
||||
log("No message returned from store: `response` field missing");
|
||||
return messages;
|
||||
}
|
||||
|
||||
const response = reply.response as protoV2Beta4.HistoryResponse;
|
||||
|
||||
if (response.error && response.error !== Error.ERROR_NONE_UNSPECIFIED) {
|
||||
throw "History response contains an Error: " + response.error;
|
||||
}
|
||||
|
||||
if (!response.messages || !response.messages.length) {
|
||||
// No messages left (or stored)
|
||||
log("No message returned from store: `messages` array empty");
|
||||
return messages;
|
||||
}
|
||||
|
||||
log(
|
||||
`${response.messages.length} messages retrieved for (${opts.pubSubTopic})`,
|
||||
contentTopics
|
||||
);
|
||||
|
||||
const pageMessages: WakuMessage[] = [];
|
||||
await Promise.all(
|
||||
response.messages.map(async (protoMsg) => {
|
||||
const msg = await WakuMessage.decodeProto(protoMsg, decryptionParams);
|
||||
|
||||
if (msg) {
|
||||
messages.push(msg);
|
||||
pageMessages.push(msg);
|
||||
}
|
||||
})
|
||||
);
|
||||
|
||||
let abort = false;
|
||||
if (opts.callback) {
|
||||
abort = Boolean(opts.callback(pageMessages));
|
||||
}
|
||||
|
||||
const responsePageSize = response.pagingInfo?.pageSize;
|
||||
const queryPageSize = historyRpcQuery.query?.pagingInfo?.pageSize;
|
||||
if (
|
||||
abort ||
|
||||
// Response page size smaller than query, meaning this is the last page
|
||||
(responsePageSize && queryPageSize && responsePageSize < queryPageSize)
|
||||
) {
|
||||
return messages;
|
||||
}
|
||||
|
||||
cursor = response.pagingInfo?.cursor;
|
||||
if (cursor === undefined) {
|
||||
// If the server does not return cursor then there is an issue,
|
||||
// Need to abort, or we end up in an infinite loop
|
||||
log("Store response does not contain a cursor, stopping pagination");
|
||||
return messages;
|
||||
}
|
||||
for await (const messages of paginate(
|
||||
connection,
|
||||
protocol,
|
||||
queryOpts,
|
||||
decryptionParams
|
||||
)) {
|
||||
yield messages;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Register a decryption key to attempt decryption of messages received in any
|
||||
* subsequent { @link queryHistory } call. This can either be a private key for
|
||||
* subsequent query call. This can either be a private key for
|
||||
* asymmetric encryption or a symmetric key. { @link WakuStore } will attempt to
|
||||
* decrypt messages using both methods.
|
||||
*
|
||||
@ -294,7 +278,7 @@ export class WakuStore {
|
||||
|
||||
/**cursorV2Beta4
|
||||
* Delete a decryption key that was used to attempt decryption of messages
|
||||
* received in subsequent { @link queryHistory } calls.
|
||||
* received in subsequent query calls.
|
||||
*
|
||||
* Strings must be in hex format.
|
||||
*/
|
||||
@ -312,15 +296,97 @@ export class WakuStore {
|
||||
codecs.push(codec);
|
||||
}
|
||||
|
||||
return getPeersForProtocol(this.libp2p, codecs);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a random peer that supports store protocol from the address
|
||||
* book (`libp2p.peerStore`). Waku may or may not be currently connected to
|
||||
* this peer.
|
||||
*/
|
||||
async randomPeer(): Promise<Peer | undefined> {
|
||||
return selectRandomPeer(await this.peers());
|
||||
return getPeersForProtocol(this.libp2p.peerStore, codecs);
|
||||
}
|
||||
}
|
||||
|
||||
async function* paginate(
|
||||
connection: Connection,
|
||||
protocol: string,
|
||||
queryOpts: Params,
|
||||
decryptionParams: DecryptionParams[]
|
||||
): AsyncGenerator<Promise<WakuMessage | undefined>[]> {
|
||||
let cursor = undefined;
|
||||
while (true) {
|
||||
queryOpts = Object.assign(queryOpts, { cursor });
|
||||
|
||||
const stream = await connection.newStream(protocol);
|
||||
const historyRpcQuery = HistoryRPC.createQuery(queryOpts);
|
||||
|
||||
log(
|
||||
"Querying store peer",
|
||||
connection.remoteAddr.toString(),
|
||||
`for (${queryOpts.pubSubTopic})`,
|
||||
queryOpts.contentTopics
|
||||
);
|
||||
|
||||
const res = await pipe(
|
||||
[historyRpcQuery.encode()],
|
||||
lp.encode(),
|
||||
stream,
|
||||
lp.decode(),
|
||||
async (source) => await all(source)
|
||||
);
|
||||
|
||||
const bytes = new Uint8ArrayList();
|
||||
res.forEach((chunk) => {
|
||||
bytes.append(chunk);
|
||||
});
|
||||
|
||||
const reply = historyRpcQuery.decode(bytes);
|
||||
|
||||
if (!reply.response) {
|
||||
log("Stopping pagination due to store `response` field missing");
|
||||
break;
|
||||
}
|
||||
|
||||
const response = reply.response as protoV2Beta4.HistoryResponse;
|
||||
|
||||
if (
|
||||
response.error &&
|
||||
response.error !== HistoryError.ERROR_NONE_UNSPECIFIED
|
||||
) {
|
||||
throw "History response contains an Error: " + response.error;
|
||||
}
|
||||
|
||||
if (!response.messages || !response.messages.length) {
|
||||
log(
|
||||
"Stopping pagination due to store `response.messages` field missing or empty"
|
||||
);
|
||||
break;
|
||||
}
|
||||
|
||||
log(`${response.messages.length} messages retrieved from store`);
|
||||
|
||||
yield response.messages.map((protoMsg) =>
|
||||
WakuMessage.decodeProto(protoMsg, decryptionParams)
|
||||
);
|
||||
|
||||
cursor = response.pagingInfo?.cursor;
|
||||
if (typeof cursor === "undefined") {
|
||||
// If the server does not return cursor then there is an issue,
|
||||
// Need to abort, or we end up in an infinite loop
|
||||
log(
|
||||
"Stopping pagination due to `response.pagingInfo.cursor` missing from store response"
|
||||
);
|
||||
break;
|
||||
}
|
||||
|
||||
const responsePageSize = response.pagingInfo?.pageSize;
|
||||
const queryPageSize = historyRpcQuery.query?.pagingInfo?.pageSize;
|
||||
if (
|
||||
// Response page size smaller than query, meaning this is the last page
|
||||
responsePageSize &&
|
||||
queryPageSize &&
|
||||
responsePageSize < queryPageSize
|
||||
) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export const isWakuMessageDefined = (
|
||||
msg: WakuMessage | undefined
|
||||
): msg is WakuMessage => {
|
||||
return !!msg;
|
||||
};
|
||||
|
Loading…
x
Reference in New Issue
Block a user