2025-02-05 11:21:22 +01:00
|
|
|
import type { PeerId } from "@libp2p/interface";
|
2025-06-03 11:08:02 +02:00
|
|
|
import { ConnectionManager, messageHash, StoreCore } from "@waku/core";
|
2024-04-01 16:47:47 +05:30
|
|
|
import {
|
|
|
|
|
IDecodedMessage,
|
|
|
|
|
IDecoder,
|
2024-10-04 13:53:54 +02:00
|
|
|
IStore,
|
2024-08-06 12:06:37 +05:30
|
|
|
Libp2p,
|
|
|
|
|
QueryRequestParams,
|
2025-01-28 17:57:49 +05:30
|
|
|
StoreCursor,
|
|
|
|
|
StoreProtocolOptions
|
2024-04-01 16:47:47 +05:30
|
|
|
} from "@waku/interfaces";
|
|
|
|
|
import { ensurePubsubTopicIsConfigured, isDefined, Logger } from "@waku/utils";
|
|
|
|
|
|
2025-01-31 00:16:00 +01:00
|
|
|
import { PeerManager } from "../peer_manager/index.js";
|
2024-04-01 16:47:47 +05:30
|
|
|
|
2024-08-06 12:06:37 +05:30
|
|
|
const log = new Logger("waku:store:sdk");
|
2024-04-01 16:47:47 +05:30
|
|
|
|
2025-02-25 18:41:32 +01:00
|
|
|
type StoreConstructorParams = {
|
|
|
|
|
connectionManager: ConnectionManager;
|
|
|
|
|
libp2p: Libp2p;
|
|
|
|
|
peerManager: PeerManager;
|
|
|
|
|
options?: Partial<StoreProtocolOptions>;
|
|
|
|
|
};
|
|
|
|
|
|
2024-08-06 12:06:37 +05:30
|
|
|
/**
|
|
|
|
|
* StoreSDK is an implementation of the IStoreSDK interface.
|
|
|
|
|
* It provides methods to interact with the Waku Store protocol.
|
|
|
|
|
*/
|
2025-01-31 00:16:00 +01:00
|
|
|
export class Store implements IStore {
|
2025-06-20 12:53:42 +02:00
|
|
|
private readonly options: Partial<StoreProtocolOptions>;
|
|
|
|
|
private readonly peerManager: PeerManager;
|
|
|
|
|
private readonly connectionManager: ConnectionManager;
|
|
|
|
|
private readonly protocol: StoreCore;
|
2024-04-01 16:47:47 +05:30
|
|
|
|
2025-02-25 18:41:32 +01:00
|
|
|
public constructor(params: StoreConstructorParams) {
|
|
|
|
|
this.options = params.options || {};
|
|
|
|
|
this.peerManager = params.peerManager;
|
|
|
|
|
this.connectionManager = params.connectionManager;
|
|
|
|
|
|
|
|
|
|
this.protocol = new StoreCore(
|
|
|
|
|
params.connectionManager.pubsubTopics,
|
|
|
|
|
params.libp2p
|
|
|
|
|
);
|
2024-04-01 16:47:47 +05:30
|
|
|
}
|
|
|
|
|
|
2025-06-20 12:53:42 +02:00
|
|
|
public get multicodec(): string {
|
|
|
|
|
return this.protocol.multicodec;
|
|
|
|
|
}
|
|
|
|
|
|
2024-04-01 16:47:47 +05:30
|
|
|
/**
|
2024-08-06 12:06:37 +05:30
|
|
|
* Queries the Waku Store for historical messages using the provided decoders and options.
|
|
|
|
|
* Returns an asynchronous generator that yields promises of decoded messages.
|
2024-04-01 16:47:47 +05:30
|
|
|
*
|
2024-08-06 12:06:37 +05:30
|
|
|
* @param decoders - An array of message decoders.
|
|
|
|
|
* @param options - Optional query parameters.
|
|
|
|
|
* @returns An asynchronous generator of promises of decoded messages.
|
|
|
|
|
* @throws If no peers are available to query or if an error occurs during the query.
|
2024-04-01 16:47:47 +05:30
|
|
|
*/
|
2024-07-19 15:58:17 +05:30
|
|
|
public async *queryGenerator<T extends IDecodedMessage>(
|
2024-04-01 16:47:47 +05:30
|
|
|
decoders: IDecoder<T>[],
|
2024-08-06 12:06:37 +05:30
|
|
|
options?: Partial<QueryRequestParams>
|
2024-04-01 16:47:47 +05:30
|
|
|
): AsyncGenerator<Promise<T | undefined>[]> {
|
2025-05-30 11:44:03 -07:00
|
|
|
// For message hash queries, don't validate decoders but still need decodersAsMap
|
|
|
|
|
const isHashQuery =
|
|
|
|
|
options?.messageHashes && options.messageHashes.length > 0;
|
2024-04-01 16:47:47 +05:30
|
|
|
|
2025-05-30 11:44:03 -07:00
|
|
|
let pubsubTopic: string;
|
|
|
|
|
let contentTopics: string[];
|
|
|
|
|
let decodersAsMap: Map<string, IDecoder<T>>;
|
|
|
|
|
|
|
|
|
|
if (isHashQuery) {
|
|
|
|
|
// For hash queries, we still need decoders to decode messages
|
|
|
|
|
// but we don't validate pubsubTopic consistency
|
|
|
|
|
// Use pubsubTopic from options if provided, otherwise from first decoder
|
|
|
|
|
pubsubTopic = options.pubsubTopic || decoders[0]?.pubsubTopic || "";
|
|
|
|
|
contentTopics = [];
|
|
|
|
|
decodersAsMap = new Map();
|
|
|
|
|
decoders.forEach((dec) => {
|
|
|
|
|
decodersAsMap.set(dec.contentTopic, dec);
|
|
|
|
|
});
|
|
|
|
|
} else {
|
|
|
|
|
const validated = this.validateDecodersAndPubsubTopic(decoders);
|
|
|
|
|
pubsubTopic = validated.pubsubTopic;
|
|
|
|
|
contentTopics = validated.contentTopics;
|
|
|
|
|
decodersAsMap = validated.decodersAsMap;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const queryOpts: QueryRequestParams = {
|
2024-04-01 16:47:47 +05:30
|
|
|
pubsubTopic,
|
|
|
|
|
contentTopics,
|
2024-08-06 12:06:37 +05:30
|
|
|
includeData: true,
|
|
|
|
|
paginationForward: true,
|
|
|
|
|
...options
|
|
|
|
|
};
|
2024-04-01 16:47:47 +05:30
|
|
|
|
2025-01-28 17:57:49 +05:30
|
|
|
const peer = await this.getPeerToUse();
|
|
|
|
|
|
2024-08-06 12:06:37 +05:30
|
|
|
if (!peer) {
|
|
|
|
|
log.error("No peers available to query");
|
|
|
|
|
throw new Error("No peers available to query");
|
|
|
|
|
}
|
2024-04-01 16:47:47 +05:30
|
|
|
|
2024-08-06 12:06:37 +05:30
|
|
|
log.info(`Querying store with options: ${JSON.stringify(options)}`);
|
2024-04-01 16:47:47 +05:30
|
|
|
const responseGenerator = this.protocol.queryPerPage(
|
|
|
|
|
queryOpts,
|
|
|
|
|
decodersAsMap,
|
|
|
|
|
peer
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
for await (const messages of responseGenerator) {
|
|
|
|
|
yield messages;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
2024-08-06 12:06:37 +05:30
|
|
|
* Queries the Waku Store for historical messages and processes them with the provided callback in order.
|
2024-04-01 16:47:47 +05:30
|
|
|
*
|
2024-08-06 12:06:37 +05:30
|
|
|
* @param decoders - An array of message decoders.
|
|
|
|
|
* @param callback - A callback function to process each decoded message.
|
|
|
|
|
* @param options - Optional query parameters.
|
|
|
|
|
* @returns A promise that resolves when the query and message processing are completed.
|
2024-04-01 16:47:47 +05:30
|
|
|
*/
|
2024-07-19 15:58:17 +05:30
|
|
|
public async queryWithOrderedCallback<T extends IDecodedMessage>(
|
2024-04-01 16:47:47 +05:30
|
|
|
decoders: IDecoder<T>[],
|
|
|
|
|
callback: (message: T) => Promise<void | boolean> | boolean | void,
|
2024-08-06 12:06:37 +05:30
|
|
|
options?: Partial<QueryRequestParams>
|
2024-04-01 16:47:47 +05:30
|
|
|
): Promise<void> {
|
2024-08-06 12:06:37 +05:30
|
|
|
log.info("Querying store with ordered callback");
|
2024-04-01 16:47:47 +05:30
|
|
|
for await (const promises of this.queryGenerator(decoders, options)) {
|
2024-08-06 12:06:37 +05:30
|
|
|
if (await this.processMessages(promises, callback)) break;
|
2024-04-01 16:47:47 +05:30
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
2024-08-06 12:06:37 +05:30
|
|
|
* Queries the Waku Store for historical messages and processes them with the provided callback using promises.
|
2024-04-01 16:47:47 +05:30
|
|
|
*
|
2024-08-06 12:06:37 +05:30
|
|
|
* @param decoders - An array of message decoders.
|
|
|
|
|
* @param callback - A callback function to process each promise of a decoded message.
|
|
|
|
|
* @param options - Optional query parameters.
|
|
|
|
|
* @returns A promise that resolves when the query and message processing are completed.
|
2024-04-01 16:47:47 +05:30
|
|
|
*/
|
2024-07-19 15:58:17 +05:30
|
|
|
public async queryWithPromiseCallback<T extends IDecodedMessage>(
|
2024-04-01 16:47:47 +05:30
|
|
|
decoders: IDecoder<T>[],
|
|
|
|
|
callback: (
|
|
|
|
|
message: Promise<T | undefined>
|
|
|
|
|
) => Promise<void | boolean> | boolean | void,
|
2024-08-06 12:06:37 +05:30
|
|
|
options?: Partial<QueryRequestParams>
|
2024-04-01 16:47:47 +05:30
|
|
|
): Promise<void> {
|
2024-08-06 12:06:37 +05:30
|
|
|
log.info("Querying store with promise callback");
|
2024-04-01 16:47:47 +05:30
|
|
|
let abort = false;
|
|
|
|
|
for await (const page of this.queryGenerator(decoders, options)) {
|
|
|
|
|
const _promises = page.map(async (msgPromise) => {
|
|
|
|
|
if (abort) return;
|
|
|
|
|
abort = Boolean(await callback(msgPromise));
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
await Promise.all(_promises);
|
|
|
|
|
if (abort) break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2024-08-06 12:06:37 +05:30
|
|
|
/**
|
|
|
|
|
* Processes messages based on the provided callback and options.
|
|
|
|
|
*
|
|
|
|
|
* @param messages - An array of promises of decoded messages.
|
|
|
|
|
* @param callback - A callback function to process each decoded message.
|
|
|
|
|
* @returns A promise that resolves to a boolean indicating whether the processing should abort.
|
|
|
|
|
* @private
|
|
|
|
|
*/
|
|
|
|
|
private async processMessages<T extends IDecodedMessage>(
|
|
|
|
|
messages: Promise<T | undefined>[],
|
|
|
|
|
callback: (message: T) => Promise<void | boolean> | boolean | void
|
|
|
|
|
): Promise<boolean> {
|
|
|
|
|
let abort = false;
|
|
|
|
|
const messagesOrUndef: Array<T | undefined> = await Promise.all(messages);
|
|
|
|
|
const processedMessages: Array<T> = messagesOrUndef.filter(isDefined);
|
2024-04-01 16:47:47 +05:30
|
|
|
|
2024-08-06 12:06:37 +05:30
|
|
|
await Promise.all(
|
|
|
|
|
processedMessages.map(async (msg) => {
|
|
|
|
|
if (msg && !abort) {
|
|
|
|
|
abort = Boolean(await callback(msg));
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
);
|
2024-04-01 16:47:47 +05:30
|
|
|
|
2024-08-06 12:06:37 +05:30
|
|
|
return abort;
|
|
|
|
|
}
|
2024-04-01 16:47:47 +05:30
|
|
|
|
2024-08-06 12:06:37 +05:30
|
|
|
/**
|
|
|
|
|
* Creates a cursor based on the provided decoded message.
|
|
|
|
|
*
|
|
|
|
|
* @param message - The decoded message.
|
|
|
|
|
* @returns A StoreCursor representing the message.
|
|
|
|
|
*/
|
|
|
|
|
public createCursor(message: IDecodedMessage): StoreCursor {
|
|
|
|
|
return messageHash(message.pubsubTopic, message);
|
2024-04-01 16:47:47 +05:30
|
|
|
}
|
|
|
|
|
|
2024-08-06 12:06:37 +05:30
|
|
|
/**
|
|
|
|
|
* Validates the provided decoders and pubsub topic.
|
|
|
|
|
*
|
|
|
|
|
* @param decoders - An array of message decoders.
|
|
|
|
|
* @returns An object containing the pubsub topic, content topics, and a map of decoders.
|
|
|
|
|
* @throws If no decoders are provided, if multiple pubsub topics are provided, or if no decoders are found for the pubsub topic.
|
|
|
|
|
* @private
|
|
|
|
|
*/
|
2024-04-01 16:47:47 +05:30
|
|
|
private validateDecodersAndPubsubTopic<T extends IDecodedMessage>(
|
2024-08-06 12:06:37 +05:30
|
|
|
decoders: IDecoder<T>[]
|
2024-04-01 16:47:47 +05:30
|
|
|
): {
|
|
|
|
|
pubsubTopic: string;
|
|
|
|
|
contentTopics: string[];
|
|
|
|
|
decodersAsMap: Map<string, IDecoder<T>>;
|
|
|
|
|
} {
|
|
|
|
|
if (decoders.length === 0) {
|
2024-08-06 12:06:37 +05:30
|
|
|
log.error("No decoders provided");
|
2024-04-01 16:47:47 +05:30
|
|
|
throw new Error("No decoders provided");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const uniquePubsubTopicsInQuery = Array.from(
|
|
|
|
|
new Set(decoders.map((decoder) => decoder.pubsubTopic))
|
|
|
|
|
);
|
|
|
|
|
if (uniquePubsubTopicsInQuery.length > 1) {
|
2024-08-06 12:06:37 +05:30
|
|
|
log.error("API does not support querying multiple pubsub topics at once");
|
2024-04-01 16:47:47 +05:30
|
|
|
throw new Error(
|
|
|
|
|
"API does not support querying multiple pubsub topics at once"
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const pubsubTopicForQuery = uniquePubsubTopicsInQuery[0];
|
|
|
|
|
|
|
|
|
|
ensurePubsubTopicIsConfigured(
|
|
|
|
|
pubsubTopicForQuery,
|
|
|
|
|
this.protocol.pubsubTopics
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
const decodersAsMap = new Map();
|
|
|
|
|
decoders.forEach((dec) => {
|
|
|
|
|
if (decodersAsMap.has(dec.contentTopic)) {
|
2024-08-06 12:06:37 +05:30
|
|
|
log.error("API does not support different decoder per content topic");
|
2024-04-01 16:47:47 +05:30
|
|
|
throw new Error(
|
|
|
|
|
"API does not support different decoder per content topic"
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
decodersAsMap.set(dec.contentTopic, dec);
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
const contentTopics = decoders
|
|
|
|
|
.filter((decoder) => decoder.pubsubTopic === pubsubTopicForQuery)
|
|
|
|
|
.map((dec) => dec.contentTopic);
|
|
|
|
|
|
|
|
|
|
if (contentTopics.length === 0) {
|
2024-08-06 12:06:37 +05:30
|
|
|
log.error(`No decoders found for topic ${pubsubTopicForQuery}`);
|
2024-04-01 16:47:47 +05:30
|
|
|
throw new Error("No decoders found for topic " + pubsubTopicForQuery);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return {
|
|
|
|
|
pubsubTopic: pubsubTopicForQuery,
|
|
|
|
|
contentTopics,
|
|
|
|
|
decodersAsMap
|
|
|
|
|
};
|
|
|
|
|
}
|
2025-01-28 17:57:49 +05:30
|
|
|
|
2025-02-05 11:21:22 +01:00
|
|
|
private async getPeerToUse(): Promise<PeerId | undefined> {
|
|
|
|
|
let peerId: PeerId | undefined;
|
2025-01-31 00:16:00 +01:00
|
|
|
|
|
|
|
|
if (this.options?.peer) {
|
|
|
|
|
const connectedPeers = await this.connectionManager.getConnectedPeers();
|
|
|
|
|
|
2025-02-05 11:21:22 +01:00
|
|
|
const peer = connectedPeers.find(
|
|
|
|
|
(p) => p.id.toString() === this.options?.peer
|
|
|
|
|
);
|
|
|
|
|
peerId = peer?.id;
|
2025-01-31 00:16:00 +01:00
|
|
|
|
2025-02-05 11:21:22 +01:00
|
|
|
if (!peerId) {
|
2025-01-31 00:16:00 +01:00
|
|
|
log.warn(
|
|
|
|
|
`Passed node to use for Store not found: ${this.options.peer}. Attempting to use random peers.`
|
|
|
|
|
);
|
|
|
|
|
}
|
2025-01-28 17:57:49 +05:30
|
|
|
}
|
|
|
|
|
|
2025-02-25 22:40:03 +01:00
|
|
|
const peerIds = this.peerManager.getPeers();
|
2025-01-28 17:57:49 +05:30
|
|
|
|
2025-02-05 11:21:22 +01:00
|
|
|
if (peerIds.length > 0) {
|
2025-01-31 00:16:00 +01:00
|
|
|
// TODO(weboko): implement smart way of getting a peer https://github.com/waku-org/js-waku/issues/2243
|
2025-02-05 11:21:22 +01:00
|
|
|
return peerIds[Math.floor(Math.random() * peerIds.length)];
|
2025-01-31 00:16:00 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
log.error("No peers available to use.");
|
|
|
|
|
return;
|
2025-01-28 17:57:49 +05:30
|
|
|
}
|
2024-04-01 16:47:47 +05:30
|
|
|
}
|