feat!: store v3 (#2036)

* feat: write proto

* chore: move store v2 to a subdir

* chore: update v3 proto

* feat: create custom RPC

* feat: implement storev3 core

* chore: set store v3 as default

* chore: move v2 related code

* chore: update v2 imports

* feat: add store-v3 sdk implementation

* fix: rebase

* chore: add ts-doc for store query request params

* chore: update tests for new API

* fix: use nanoseconds instead of millisecond for timerange

* chore: improve store

* chore: remove store v2

* chore: update tests

* chore: fix legacy imports & proto

* tests: remove manual reversal as its part of the API, update incorrect cursor error msg

* chore: update default page size

* chore: account for MAX_PAGE_SIZE from nwaku

* fix: test

* fix: sorting tests
This commit is contained in:
Danish Arora 2024-08-06 12:06:37 +05:30 committed by GitHub
parent fdd9dc44a4
commit 86f730f958
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 732 additions and 1009 deletions

View File

@ -15,8 +15,6 @@ export { LightPushCodec, LightPushCore } from "./lib/light_push/index.js";
export * as waku_store from "./lib/store/index.js";
export { StoreCore } from "./lib/store/index.js";
export { PageDirection } from "./lib/store/index.js";
export { waitForRemotePeer } from "./lib/wait_for_remote_peer.js";
export { ConnectionManager } from "./lib/connection_manager.js";

View File

@ -1,93 +0,0 @@
import { proto_store as proto } from "@waku/proto";
import type { Uint8ArrayList } from "uint8arraylist";
import { v4 as uuid } from "uuid";
const OneMillion = BigInt(1_000_000);
export enum PageDirection {
BACKWARD = "backward",
FORWARD = "forward"
}
export interface Params {
contentTopics: string[];
pubsubTopic: string;
pageDirection: PageDirection;
pageSize: number;
startTime?: Date;
endTime?: Date;
cursor?: proto.Index;
}
export class HistoryRpc {
private constructor(public readonly proto: proto.HistoryRpc) {}
public get query(): proto.HistoryQuery | undefined {
return this.proto.query;
}
public get response(): proto.HistoryResponse | undefined {
return this.proto.response;
}
/**
* Create History Query.
*/
public static createQuery(params: Params): HistoryRpc {
const contentFilters = params.contentTopics.map((contentTopic) => {
return { contentTopic };
});
const direction = directionToProto(params.pageDirection);
const pagingInfo = {
pageSize: BigInt(params.pageSize),
cursor: params.cursor,
direction
} as proto.PagingInfo;
let startTime, endTime;
if (params.startTime) {
// milliseconds 10^-3 to nanoseconds 10^-9
startTime = BigInt(params.startTime.valueOf()) * OneMillion;
}
if (params.endTime) {
// milliseconds 10^-3 to nanoseconds 10^-9
endTime = BigInt(params.endTime.valueOf()) * OneMillion;
}
return new HistoryRpc({
requestId: uuid(),
query: {
pubsubTopic: params.pubsubTopic,
contentFilters,
pagingInfo,
startTime,
endTime
},
response: undefined
});
}
public decode(bytes: Uint8ArrayList): HistoryRpc {
const res = proto.HistoryRpc.decode(bytes);
return new HistoryRpc(res);
}
public encode(): Uint8Array {
return proto.HistoryRpc.encode(this.proto);
}
}
function directionToProto(
pageDirection: PageDirection
): proto.PagingInfo.Direction {
switch (pageDirection) {
case PageDirection.BACKWARD:
return proto.PagingInfo.Direction.BACKWARD;
case PageDirection.FORWARD:
return proto.PagingInfo.Direction.FORWARD;
default:
return proto.PagingInfo.Direction.BACKWARD;
}
}

View File

@ -1,13 +1,12 @@
import type { Peer } from "@libp2p/interface";
import {
Cursor,
IDecodedMessage,
IDecoder,
IStoreCore,
Libp2p,
ProtocolCreateOptions
ProtocolCreateOptions,
QueryRequestParams
} from "@waku/interfaces";
import { proto_store as proto } from "@waku/proto";
import { Logger } from "@waku/utils";
import all from "it-all";
import * as lp from "it-length-prefixed";
@ -17,63 +16,30 @@ import { Uint8ArrayList } from "uint8arraylist";
import { BaseProtocol } from "../base_protocol.js";
import { toProtoMessage } from "../to_proto_message.js";
import { HistoryRpc, PageDirection, Params } from "./history_rpc.js";
import HistoryError = proto.HistoryResponse.HistoryError;
import {
DEFAULT_PAGE_SIZE,
MAX_PAGE_SIZE,
StoreQueryRequest,
StoreQueryResponse
} from "./rpc.js";
const log = new Logger("store");
export const StoreCodec = "/vac/waku/store/2.0.0-beta4";
export const StoreCodec = "/vac/waku/store-query/3.0.0";
export { PageDirection, Params };
export interface TimeFilter {
startTime: Date;
endTime: Date;
}
export interface QueryOptions {
/**
* The direction in which pages are retrieved:
* - { @link PageDirection.BACKWARD }: Most recent page first.
* - { @link PageDirection.FORWARD }: Oldest page first.
*
* Note: This does not affect the ordering of messages with the page
* (the oldest message is always first).
*
* @default { @link PageDirection.BACKWARD }
*/
pageDirection?: PageDirection;
/**
* The number of message per page.
*
* @default { @link DefaultPageSize }
*/
pageSize?: number;
/**
* Retrieve messages with a timestamp within the provided values.
*/
timeFilter?: TimeFilter;
/**
* Cursor as an index to start a query from.
* The cursor index will be exclusive (i.e. the message at the cursor index will not be included in the result).
* If undefined, the query will start from the beginning or end of the history, depending on the page direction.
*/
cursor?: Cursor;
}
/**
* Implements the [Waku v2 Store protocol](https://rfc.vac.dev/spec/13/).
*
* The Waku Store protocol can be used to retrieved historical messages.
*/
export class StoreCore extends BaseProtocol implements IStoreCore {
public constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(StoreCodec, libp2p.components, log, options!.pubsubTopics!, options);
super(
StoreCodec,
libp2p.components,
log,
options?.pubsubTopics || [],
options
);
}
public async *queryPerPage<T extends IDecodedMessage>(
queryOpts: Params,
queryOpts: QueryRequestParams,
decoders: Map<string, IDecoder<T>>,
peer: Peer
): AsyncGenerator<Promise<T | undefined>[]> {
@ -86,11 +52,12 @@ export class StoreCore extends BaseProtocol implements IStoreCore {
);
}
let currentCursor = queryOpts.cursor;
let currentCursor = queryOpts.paginationCursor;
while (true) {
queryOpts.cursor = currentCursor;
const historyRpcQuery = HistoryRpc.createQuery(queryOpts);
const storeQueryRequest = StoreQueryRequest.create({
...queryOpts,
paginationCursor: currentCursor
});
let stream;
try {
@ -101,7 +68,7 @@ export class StoreCore extends BaseProtocol implements IStoreCore {
}
const res = await pipe(
[historyRpcQuery.encode()],
[storeQueryRequest.encode()],
lp.encode,
stream,
lp.decode,
@ -113,61 +80,57 @@ export class StoreCore extends BaseProtocol implements IStoreCore {
bytes.append(chunk);
});
const reply = historyRpcQuery.decode(bytes);
const storeQueryResponse = StoreQueryResponse.decode(bytes);
if (!reply.response) {
log.warn("Stopping pagination due to store `response` field missing");
if (
!storeQueryResponse.statusCode ||
storeQueryResponse.statusCode >= 300
) {
const errorMessage = `Store query failed with status code: ${storeQueryResponse.statusCode}, description: ${storeQueryResponse.statusDesc}`;
log.error(errorMessage);
throw new Error(errorMessage);
}
if (!storeQueryResponse.messages || !storeQueryResponse.messages.length) {
log.warn("Stopping pagination due to empty messages in response");
break;
}
const response = reply.response as proto.HistoryResponse;
log.info(
`${storeQueryResponse.messages.length} messages retrieved from store`
);
if (response.error && response.error !== HistoryError.NONE) {
throw "History response contains an Error: " + response.error;
}
if (!response.messages || !response.messages.length) {
log.warn(
"Stopping pagination due to store `response.messages` field missing or empty"
);
break;
}
log.error(`${response.messages.length} messages retrieved from store`);
yield response.messages.map((protoMsg) => {
const contentTopic = protoMsg.contentTopic;
if (typeof contentTopic !== "undefined") {
const decodedMessages = storeQueryResponse.messages.map((protoMsg) => {
if (!protoMsg.message) {
return Promise.resolve(undefined);
}
const contentTopic = protoMsg.message.contentTopic;
if (contentTopic) {
const decoder = decoders.get(contentTopic);
if (decoder) {
return decoder.fromProtoObj(
queryOpts.pubsubTopic,
toProtoMessage(protoMsg)
protoMsg.pubsubTopic || "",
toProtoMessage(protoMsg.message)
);
}
}
return Promise.resolve(undefined);
});
const nextCursor = response.pagingInfo?.cursor;
if (typeof nextCursor === "undefined") {
// If the server does not return cursor then there is an issue,
// Need to abort, or we end up in an infinite loop
log.warn(
"Stopping pagination due to `response.pagingInfo.cursor` missing from store response"
);
break;
yield decodedMessages;
if (queryOpts.paginationForward) {
currentCursor =
storeQueryResponse.messages[storeQueryResponse.messages.length - 1]
.messageHash;
} else {
currentCursor = storeQueryResponse.messages[0].messageHash;
}
currentCursor = nextCursor;
const responsePageSize = response.pagingInfo?.pageSize;
const queryPageSize = historyRpcQuery.query?.pagingInfo?.pageSize;
if (
// Response page size smaller than query, meaning this is the last page
responsePageSize &&
queryPageSize &&
responsePageSize < queryPageSize
storeQueryResponse.messages.length > MAX_PAGE_SIZE &&
storeQueryResponse.messages.length <
(queryOpts.paginationLimit || DEFAULT_PAGE_SIZE)
) {
break;
}

View File

@ -0,0 +1,92 @@
import { QueryRequestParams } from "@waku/interfaces";
import { proto_store as proto } from "@waku/proto";
import type { Uint8ArrayList } from "uint8arraylist";
import { v4 as uuid } from "uuid";
// https://github.com/waku-org/nwaku/blob/7205f95cff9f49ca0bb762e8fd0bf56a6a7f3b3b/waku/waku_store/common.nim#L12
export const DEFAULT_PAGE_SIZE = 20;
export const MAX_PAGE_SIZE = 100;
const ONE_MILLION = 1_000000;
export class StoreQueryRequest {
public constructor(public proto: proto.StoreQueryRequest) {}
public static create(params: QueryRequestParams): StoreQueryRequest {
const request = new StoreQueryRequest({
...params,
requestId: uuid(),
timeStart: params.timeStart
? BigInt(params.timeStart.getTime() * ONE_MILLION)
: undefined,
timeEnd: params.timeEnd
? BigInt(params.timeEnd.getTime() * ONE_MILLION)
: undefined,
messageHashes: params.messageHashes || [],
paginationLimit: params.paginationLimit
? BigInt(params.paginationLimit)
: undefined
});
// Validate request parameters based on RFC
if (
(params.pubsubTopic && !params.contentTopics) ||
(!params.pubsubTopic && params.contentTopics)
) {
throw new Error(
"Both pubsubTopic and contentTopics must be set or unset"
);
}
if (
params.messageHashes &&
(params.pubsubTopic ||
params.contentTopics ||
params.timeStart ||
params.timeEnd)
) {
throw new Error(
"Message hash lookup queries cannot include content filter criteria"
);
}
return request;
}
public static decode(bytes: Uint8ArrayList): StoreQueryRequest {
const res = proto.StoreQueryRequest.decode(bytes);
return new StoreQueryRequest(res);
}
public encode(): Uint8Array {
return proto.StoreQueryRequest.encode(this.proto);
}
}
export class StoreQueryResponse {
public constructor(public proto: proto.StoreQueryResponse) {}
public static decode(bytes: Uint8ArrayList): StoreQueryResponse {
const res = proto.StoreQueryResponse.decode(bytes);
return new StoreQueryResponse(res);
}
public encode(): Uint8Array {
return proto.StoreQueryResponse.encode(this.proto);
}
public get statusCode(): number | undefined {
return this.proto.statusCode;
}
public get statusDesc(): string | undefined {
return this.proto.statusDesc;
}
public get messages(): proto.WakuMessageKeyValue[] {
return this.proto.messages;
}
public get paginationCursor(): Uint8Array | undefined {
return this.proto.paginationCursor;
}
}

View File

@ -1,72 +1,101 @@
import { proto_store as proto } from "@waku/proto";
import type { IDecodedMessage, IDecoder } from "./message.js";
import type { IBaseProtocolCore, IBaseProtocolSDK } from "./protocols.js";
export enum PageDirection {
BACKWARD = "backward",
FORWARD = "forward"
}
export type StoreCursor = Uint8Array;
export interface TimeFilter {
startTime: Date;
endTime: Date;
}
/**
* Parameters for a store query request, as specified in the Waku Store v3 RFC.
*/
export type QueryRequestParams = {
/**
* Whether to include the full message data in the response.
* - `true`: The response will include the message content and associated pubsub topic for each matching message.
* - `false`: The response will only include the message hashes for each matching message.
* @default true
*/
includeData: boolean;
export interface Cursor {
digest: Uint8Array;
receiverTime: bigint;
senderTime: bigint;
/**
* The pubsub topic to query. This field is mandatory.
* The query will only return messages that were published on this specific pubsub topic.
*/
pubsubTopic: string;
}
export type StoreQueryOptions = {
/**
* The direction in which pages are retrieved:
* - { @link PageDirection.BACKWARD }: Most recent page first.
* - { @link PageDirection.FORWARD }: Oldest page first.
*
* Note: This does not affect the ordering of messages with the page
* (the oldest message is always first).
*
* @default { @link PageDirection.BACKWARD }
* The content topics to filter the messages.
* The query will only return messages that have a content topic included in this array.
* This field MUST be populated together with the `pubsubTopic` field for content topic filtering to be applied.
* If either `contentTopics` or `pubsubTopic` is not provided or empty, no content topic filtering will be applied.
*/
pageDirection?: PageDirection;
contentTopics: string[];
/**
* The number of message per page.
* The start time for the time range filter.
* The query will only return messages with a timestamp greater than or equal to `timeStart`.
* If not provided, no start time filtering will be applied.
*/
pageSize?: number;
timeStart?: Date;
/**
* Retrieve messages with a timestamp within the provided values.
* The end time for the time range filter.
* The query will only return messages with a timestamp strictly less than `timeEnd`.
* If not provided, no end time filtering will be applied.
*/
timeFilter?: TimeFilter;
timeEnd?: Date;
/**
* Cursor as an index to start a query from. Must be generated from a Waku
* Message.
* The message hashes to lookup.
* If provided, the query will be a message hash lookup query and will only return messages that match the specified hashes.
* If not provided or empty, the query will be a content filtered query based on the other filter parameters.
* @default undefined
*/
cursor?: proto.Index;
messageHashes?: Uint8Array[];
/**
* The cursor to start the query from.
* The cursor represents the message hash of the last message returned in the previous query.
* The query will start from the message immediately following the cursor, excluding the message at the cursor itself.
* If not provided, the query will start from the beginning or end of the store, depending on the `paginationForward` option.
* @default undefined
*/
paginationCursor?: Uint8Array;
/**
* The direction of pagination.
* - `true`: Forward pagination, starting from the oldest message and moving towards the newest.
* - `false`: Backward pagination, starting from the newest message and moving towards the oldest.
* @default false
*/
paginationForward: boolean;
/**
* The maximum number of messages to retrieve per page.
* If not provided, the store's default pagination limit will be used.
* @default undefined
*/
paginationLimit?: number;
};
export type IStoreCore = IBaseProtocolCore;
export type IStoreSDK = IBaseProtocolSDK & {
protocol: IBaseProtocolCore;
createCursor(message: IDecodedMessage): Cursor;
createCursor(message: IDecodedMessage): StoreCursor;
queryGenerator: <T extends IDecodedMessage>(
decoders: IDecoder<T>[],
options?: StoreQueryOptions
options?: Partial<QueryRequestParams>
) => AsyncGenerator<Promise<T | undefined>[]>;
queryWithOrderedCallback: <T extends IDecodedMessage>(
decoders: IDecoder<T>[],
callback: (message: T) => Promise<void | boolean> | boolean | void,
options?: StoreQueryOptions
options?: Partial<QueryRequestParams>
) => Promise<void>;
queryWithPromiseCallback: <T extends IDecodedMessage>(
decoders: IDecoder<T>[],
callback: (
message: Promise<T | undefined>
) => Promise<void | boolean> | boolean | void,
options?: StoreQueryOptions
options?: Partial<QueryRequestParams>
) => Promise<void>;
};

View File

@ -4,148 +4,39 @@
/* eslint-disable @typescript-eslint/no-unnecessary-boolean-literal-compare */
/* eslint-disable @typescript-eslint/no-empty-interface */
import { type Codec, CodeError, decodeMessage, type DecodeOptions, encodeMessage, enumeration, message } from 'protons-runtime'
import { type Codec, CodeError, decodeMessage, type DecodeOptions, encodeMessage, message } from 'protons-runtime'
import { alloc as uint8ArrayAlloc } from 'uint8arrays/alloc'
import type { Uint8ArrayList } from 'uint8arraylist'
export interface Index {
digest: Uint8Array
receiverTime: bigint
senderTime: bigint
pubsubTopic: string
export interface WakuMessageKeyValue {
messageHash?: Uint8Array
message?: WakuMessage
pubsubTopic?: string
}
export namespace Index {
let _codec: Codec<Index>
export namespace WakuMessageKeyValue {
let _codec: Codec<WakuMessageKeyValue>
export const codec = (): Codec<Index> => {
export const codec = (): Codec<WakuMessageKeyValue> => {
if (_codec == null) {
_codec = message<Index>((obj, w, opts = {}) => {
_codec = message<WakuMessageKeyValue>((obj, w, opts = {}) => {
if (opts.lengthDelimited !== false) {
w.fork()
}
if ((obj.digest != null && obj.digest.byteLength > 0)) {
if (obj.messageHash != null) {
w.uint32(10)
w.bytes(obj.digest)
w.bytes(obj.messageHash)
}
if ((obj.receiverTime != null && obj.receiverTime !== 0n)) {
w.uint32(16)
w.sint64(obj.receiverTime)
}
if ((obj.senderTime != null && obj.senderTime !== 0n)) {
w.uint32(24)
w.sint64(obj.senderTime)
}
if ((obj.pubsubTopic != null && obj.pubsubTopic !== '')) {
w.uint32(34)
w.string(obj.pubsubTopic)
}
if (opts.lengthDelimited !== false) {
w.ldelim()
}
}, (reader, length, opts = {}) => {
const obj: any = {
digest: uint8ArrayAlloc(0),
receiverTime: 0n,
senderTime: 0n,
pubsubTopic: ''
}
const end = length == null ? reader.len : reader.pos + length
while (reader.pos < end) {
const tag = reader.uint32()
switch (tag >>> 3) {
case 1: {
obj.digest = reader.bytes()
break
}
case 2: {
obj.receiverTime = reader.sint64()
break
}
case 3: {
obj.senderTime = reader.sint64()
break
}
case 4: {
obj.pubsubTopic = reader.string()
break
}
default: {
reader.skipType(tag & 7)
break
}
}
}
return obj
})
}
return _codec
}
export const encode = (obj: Partial<Index>): Uint8Array => {
return encodeMessage(obj, Index.codec())
}
export const decode = (buf: Uint8Array | Uint8ArrayList, opts?: DecodeOptions<Index>): Index => {
return decodeMessage(buf, Index.codec(), opts)
}
}
export interface PagingInfo {
pageSize?: bigint
cursor?: Index
direction?: PagingInfo.Direction
}
export namespace PagingInfo {
export enum Direction {
BACKWARD = 'BACKWARD',
FORWARD = 'FORWARD'
}
enum __DirectionValues {
BACKWARD = 0,
FORWARD = 1
}
export namespace Direction {
export const codec = (): Codec<Direction> => {
return enumeration<Direction>(__DirectionValues)
}
}
let _codec: Codec<PagingInfo>
export const codec = (): Codec<PagingInfo> => {
if (_codec == null) {
_codec = message<PagingInfo>((obj, w, opts = {}) => {
if (opts.lengthDelimited !== false) {
w.fork()
}
if (obj.pageSize != null) {
w.uint32(8)
w.uint64(obj.pageSize)
}
if (obj.cursor != null) {
if (obj.message != null) {
w.uint32(18)
Index.codec().encode(obj.cursor, w)
WakuMessage.codec().encode(obj.message, w)
}
if (obj.direction != null) {
w.uint32(24)
PagingInfo.Direction.codec().encode(obj.direction, w)
if (obj.pubsubTopic != null) {
w.uint32(26)
w.string(obj.pubsubTopic)
}
if (opts.lengthDelimited !== false) {
@ -161,189 +52,19 @@ export namespace PagingInfo {
switch (tag >>> 3) {
case 1: {
obj.pageSize = reader.uint64()
obj.messageHash = reader.bytes()
break
}
case 2: {
obj.cursor = Index.codec().decode(reader, reader.uint32(), {
limits: opts.limits?.cursor
obj.message = WakuMessage.codec().decode(reader, reader.uint32(), {
limits: opts.limits?.message
})
break
}
case 3: {
obj.direction = PagingInfo.Direction.codec().decode(reader)
break
}
default: {
reader.skipType(tag & 7)
break
}
}
}
return obj
})
}
return _codec
}
export const encode = (obj: Partial<PagingInfo>): Uint8Array => {
return encodeMessage(obj, PagingInfo.codec())
}
export const decode = (buf: Uint8Array | Uint8ArrayList, opts?: DecodeOptions<PagingInfo>): PagingInfo => {
return decodeMessage(buf, PagingInfo.codec(), opts)
}
}
export interface ContentFilter {
contentTopic: string
}
export namespace ContentFilter {
let _codec: Codec<ContentFilter>
export const codec = (): Codec<ContentFilter> => {
if (_codec == null) {
_codec = message<ContentFilter>((obj, w, opts = {}) => {
if (opts.lengthDelimited !== false) {
w.fork()
}
if ((obj.contentTopic != null && obj.contentTopic !== '')) {
w.uint32(10)
w.string(obj.contentTopic)
}
if (opts.lengthDelimited !== false) {
w.ldelim()
}
}, (reader, length, opts = {}) => {
const obj: any = {
contentTopic: ''
}
const end = length == null ? reader.len : reader.pos + length
while (reader.pos < end) {
const tag = reader.uint32()
switch (tag >>> 3) {
case 1: {
obj.contentTopic = reader.string()
break
}
default: {
reader.skipType(tag & 7)
break
}
}
}
return obj
})
}
return _codec
}
export const encode = (obj: Partial<ContentFilter>): Uint8Array => {
return encodeMessage(obj, ContentFilter.codec())
}
export const decode = (buf: Uint8Array | Uint8ArrayList, opts?: DecodeOptions<ContentFilter>): ContentFilter => {
return decodeMessage(buf, ContentFilter.codec(), opts)
}
}
export interface HistoryQuery {
pubsubTopic?: string
contentFilters: ContentFilter[]
pagingInfo?: PagingInfo
startTime?: bigint
endTime?: bigint
}
export namespace HistoryQuery {
let _codec: Codec<HistoryQuery>
export const codec = (): Codec<HistoryQuery> => {
if (_codec == null) {
_codec = message<HistoryQuery>((obj, w, opts = {}) => {
if (opts.lengthDelimited !== false) {
w.fork()
}
if (obj.pubsubTopic != null) {
w.uint32(18)
w.string(obj.pubsubTopic)
}
if (obj.contentFilters != null) {
for (const value of obj.contentFilters) {
w.uint32(26)
ContentFilter.codec().encode(value, w)
}
}
if (obj.pagingInfo != null) {
w.uint32(34)
PagingInfo.codec().encode(obj.pagingInfo, w)
}
if (obj.startTime != null) {
w.uint32(40)
w.sint64(obj.startTime)
}
if (obj.endTime != null) {
w.uint32(48)
w.sint64(obj.endTime)
}
if (opts.lengthDelimited !== false) {
w.ldelim()
}
}, (reader, length, opts = {}) => {
const obj: any = {
contentFilters: []
}
const end = length == null ? reader.len : reader.pos + length
while (reader.pos < end) {
const tag = reader.uint32()
switch (tag >>> 3) {
case 2: {
obj.pubsubTopic = reader.string()
break
}
case 3: {
if (opts.limits?.contentFilters != null && obj.contentFilters.length === opts.limits.contentFilters) {
throw new CodeError('decode error - map field "contentFilters" had too many elements', 'ERR_MAX_LENGTH')
}
obj.contentFilters.push(ContentFilter.codec().decode(reader, reader.uint32(), {
limits: opts.limits?.contentFilters$
}))
break
}
case 4: {
obj.pagingInfo = PagingInfo.codec().decode(reader, reader.uint32(), {
limits: opts.limits?.pagingInfo
})
break
}
case 5: {
obj.startTime = reader.sint64()
break
}
case 6: {
obj.endTime = reader.sint64()
break
}
default: {
reader.skipType(tag & 7)
break
@ -358,138 +79,34 @@ export namespace HistoryQuery {
return _codec
}
export const encode = (obj: Partial<HistoryQuery>): Uint8Array => {
return encodeMessage(obj, HistoryQuery.codec())
export const encode = (obj: Partial<WakuMessageKeyValue>): Uint8Array => {
return encodeMessage(obj, WakuMessageKeyValue.codec())
}
export const decode = (buf: Uint8Array | Uint8ArrayList, opts?: DecodeOptions<HistoryQuery>): HistoryQuery => {
return decodeMessage(buf, HistoryQuery.codec(), opts)
export const decode = (buf: Uint8Array | Uint8ArrayList, opts?: DecodeOptions<WakuMessageKeyValue>): WakuMessageKeyValue => {
return decodeMessage(buf, WakuMessageKeyValue.codec(), opts)
}
}
export interface HistoryResponse {
messages: WakuMessage[]
pagingInfo?: PagingInfo
error: HistoryResponse.HistoryError
}
export namespace HistoryResponse {
export enum HistoryError {
NONE = 'NONE',
INVALID_CURSOR = 'INVALID_CURSOR',
TOO_MANY_REQUESTS = 'TOO_MANY_REQUESTS',
SERVICE_UNAVAILABLE = 'SERVICE_UNAVAILABLE'
}
enum __HistoryErrorValues {
NONE = 0,
INVALID_CURSOR = 1,
TOO_MANY_REQUESTS = 429,
SERVICE_UNAVAILABLE = 503
}
export namespace HistoryError {
export const codec = (): Codec<HistoryError> => {
return enumeration<HistoryError>(__HistoryErrorValues)
}
}
let _codec: Codec<HistoryResponse>
export const codec = (): Codec<HistoryResponse> => {
if (_codec == null) {
_codec = message<HistoryResponse>((obj, w, opts = {}) => {
if (opts.lengthDelimited !== false) {
w.fork()
}
if (obj.messages != null) {
for (const value of obj.messages) {
w.uint32(18)
WakuMessage.codec().encode(value, w)
}
}
if (obj.pagingInfo != null) {
w.uint32(26)
PagingInfo.codec().encode(obj.pagingInfo, w)
}
if (obj.error != null && __HistoryErrorValues[obj.error] !== 0) {
w.uint32(32)
HistoryResponse.HistoryError.codec().encode(obj.error, w)
}
if (opts.lengthDelimited !== false) {
w.ldelim()
}
}, (reader, length, opts = {}) => {
const obj: any = {
messages: [],
error: HistoryError.NONE
}
const end = length == null ? reader.len : reader.pos + length
while (reader.pos < end) {
const tag = reader.uint32()
switch (tag >>> 3) {
case 2: {
if (opts.limits?.messages != null && obj.messages.length === opts.limits.messages) {
throw new CodeError('decode error - map field "messages" had too many elements', 'ERR_MAX_LENGTH')
}
obj.messages.push(WakuMessage.codec().decode(reader, reader.uint32(), {
limits: opts.limits?.messages$
}))
break
}
case 3: {
obj.pagingInfo = PagingInfo.codec().decode(reader, reader.uint32(), {
limits: opts.limits?.pagingInfo
})
break
}
case 4: {
obj.error = HistoryResponse.HistoryError.codec().decode(reader)
break
}
default: {
reader.skipType(tag & 7)
break
}
}
}
return obj
})
}
return _codec
}
export const encode = (obj: Partial<HistoryResponse>): Uint8Array => {
return encodeMessage(obj, HistoryResponse.codec())
}
export const decode = (buf: Uint8Array | Uint8ArrayList, opts?: DecodeOptions<HistoryResponse>): HistoryResponse => {
return decodeMessage(buf, HistoryResponse.codec(), opts)
}
}
export interface HistoryRpc {
export interface StoreQueryRequest {
requestId: string
query?: HistoryQuery
response?: HistoryResponse
includeData: boolean
pubsubTopic?: string
contentTopics: string[]
timeStart?: bigint
timeEnd?: bigint
messageHashes: Uint8Array[]
paginationCursor?: Uint8Array
paginationForward: boolean
paginationLimit?: bigint
}
export namespace HistoryRpc {
let _codec: Codec<HistoryRpc>
export namespace StoreQueryRequest {
let _codec: Codec<StoreQueryRequest>
export const codec = (): Codec<HistoryRpc> => {
export const codec = (): Codec<StoreQueryRequest> => {
if (_codec == null) {
_codec = message<HistoryRpc>((obj, w, opts = {}) => {
_codec = message<StoreQueryRequest>((obj, w, opts = {}) => {
if (opts.lengthDelimited !== false) {
w.fork()
}
@ -499,14 +116,53 @@ export namespace HistoryRpc {
w.string(obj.requestId)
}
if (obj.query != null) {
w.uint32(18)
HistoryQuery.codec().encode(obj.query, w)
if ((obj.includeData != null && obj.includeData !== false)) {
w.uint32(16)
w.bool(obj.includeData)
}
if (obj.response != null) {
w.uint32(26)
HistoryResponse.codec().encode(obj.response, w)
if (obj.pubsubTopic != null) {
w.uint32(82)
w.string(obj.pubsubTopic)
}
if (obj.contentTopics != null) {
for (const value of obj.contentTopics) {
w.uint32(90)
w.string(value)
}
}
if (obj.timeStart != null) {
w.uint32(96)
w.sint64(obj.timeStart)
}
if (obj.timeEnd != null) {
w.uint32(104)
w.sint64(obj.timeEnd)
}
if (obj.messageHashes != null) {
for (const value of obj.messageHashes) {
w.uint32(162)
w.bytes(value)
}
}
if (obj.paginationCursor != null) {
w.uint32(410)
w.bytes(obj.paginationCursor)
}
if ((obj.paginationForward != null && obj.paginationForward !== false)) {
w.uint32(416)
w.bool(obj.paginationForward)
}
if (obj.paginationLimit != null) {
w.uint32(424)
w.uint64(obj.paginationLimit)
}
if (opts.lengthDelimited !== false) {
@ -514,7 +170,11 @@ export namespace HistoryRpc {
}
}, (reader, length, opts = {}) => {
const obj: any = {
requestId: ''
requestId: '',
includeData: false,
contentTopics: [],
messageHashes: [],
paginationForward: false
}
const end = length == null ? reader.len : reader.pos + length
@ -528,15 +188,47 @@ export namespace HistoryRpc {
break
}
case 2: {
obj.query = HistoryQuery.codec().decode(reader, reader.uint32(), {
limits: opts.limits?.query
})
obj.includeData = reader.bool()
break
}
case 3: {
obj.response = HistoryResponse.codec().decode(reader, reader.uint32(), {
limits: opts.limits?.response
})
case 10: {
obj.pubsubTopic = reader.string()
break
}
case 11: {
if (opts.limits?.contentTopics != null && obj.contentTopics.length === opts.limits.contentTopics) {
throw new CodeError('decode error - map field "contentTopics" had too many elements', 'ERR_MAX_LENGTH')
}
obj.contentTopics.push(reader.string())
break
}
case 12: {
obj.timeStart = reader.sint64()
break
}
case 13: {
obj.timeEnd = reader.sint64()
break
}
case 20: {
if (opts.limits?.messageHashes != null && obj.messageHashes.length === opts.limits.messageHashes) {
throw new CodeError('decode error - map field "messageHashes" had too many elements', 'ERR_MAX_LENGTH')
}
obj.messageHashes.push(reader.bytes())
break
}
case 51: {
obj.paginationCursor = reader.bytes()
break
}
case 52: {
obj.paginationForward = reader.bool()
break
}
case 53: {
obj.paginationLimit = reader.uint64()
break
}
default: {
@ -553,12 +245,121 @@ export namespace HistoryRpc {
return _codec
}
export const encode = (obj: Partial<HistoryRpc>): Uint8Array => {
return encodeMessage(obj, HistoryRpc.codec())
export const encode = (obj: Partial<StoreQueryRequest>): Uint8Array => {
return encodeMessage(obj, StoreQueryRequest.codec())
}
export const decode = (buf: Uint8Array | Uint8ArrayList, opts?: DecodeOptions<HistoryRpc>): HistoryRpc => {
return decodeMessage(buf, HistoryRpc.codec(), opts)
export const decode = (buf: Uint8Array | Uint8ArrayList, opts?: DecodeOptions<StoreQueryRequest>): StoreQueryRequest => {
return decodeMessage(buf, StoreQueryRequest.codec(), opts)
}
}
export interface StoreQueryResponse {
requestId: string
statusCode?: number
statusDesc?: string
messages: WakuMessageKeyValue[]
paginationCursor?: Uint8Array
}
export namespace StoreQueryResponse {
let _codec: Codec<StoreQueryResponse>
export const codec = (): Codec<StoreQueryResponse> => {
if (_codec == null) {
_codec = message<StoreQueryResponse>((obj, w, opts = {}) => {
if (opts.lengthDelimited !== false) {
w.fork()
}
if ((obj.requestId != null && obj.requestId !== '')) {
w.uint32(10)
w.string(obj.requestId)
}
if (obj.statusCode != null) {
w.uint32(80)
w.uint32(obj.statusCode)
}
if (obj.statusDesc != null) {
w.uint32(90)
w.string(obj.statusDesc)
}
if (obj.messages != null) {
for (const value of obj.messages) {
w.uint32(162)
WakuMessageKeyValue.codec().encode(value, w)
}
}
if (obj.paginationCursor != null) {
w.uint32(410)
w.bytes(obj.paginationCursor)
}
if (opts.lengthDelimited !== false) {
w.ldelim()
}
}, (reader, length, opts = {}) => {
const obj: any = {
requestId: '',
messages: []
}
const end = length == null ? reader.len : reader.pos + length
while (reader.pos < end) {
const tag = reader.uint32()
switch (tag >>> 3) {
case 1: {
obj.requestId = reader.string()
break
}
case 10: {
obj.statusCode = reader.uint32()
break
}
case 11: {
obj.statusDesc = reader.string()
break
}
case 20: {
if (opts.limits?.messages != null && obj.messages.length === opts.limits.messages) {
throw new CodeError('decode error - map field "messages" had too many elements', 'ERR_MAX_LENGTH')
}
obj.messages.push(WakuMessageKeyValue.codec().decode(reader, reader.uint32(), {
limits: opts.limits?.messages$
}))
break
}
case 51: {
obj.paginationCursor = reader.bytes()
break
}
default: {
reader.skipType(tag & 7)
break
}
}
}
return obj
})
}
return _codec
}
export const encode = (obj: Partial<StoreQueryResponse>): Uint8Array => {
return encodeMessage(obj, StoreQueryResponse.codec())
}
export const decode = (buf: Uint8Array | Uint8ArrayList, opts?: DecodeOptions<StoreQueryResponse>): StoreQueryResponse => {
return decodeMessage(buf, StoreQueryResponse.codec(), opts)
}
}

View File

@ -10,7 +10,7 @@ export * as proto_filter_v2 from "./generated/filter_v2.js";
export * as proto_lightpush from "./generated/light_push.js";
export { PushResponse } from "./generated/light_push.js";
export * as proto_store from "./generated/store.js";
export * as proto_store from './generated/store_v3.js'
export * as proto_peer_exchange from "./generated/peer_exchange.js";

View File

@ -1,55 +0,0 @@
// 13/WAKU2-STORE rfc: https://rfc.vac.dev/spec/13/
// Protocol identifier: /vac/waku/store/2.0.0-beta4
syntax = "proto3";
import "message.proto";
message Index {
bytes digest = 1;
sint64 receiver_time = 2;
sint64 sender_time = 3;
string pubsub_topic = 4;
}
message PagingInfo {
optional uint64 page_size = 1;
optional Index cursor = 2;
enum Direction {
BACKWARD = 0;
FORWARD = 1;
}
optional Direction direction = 3;
}
message ContentFilter {
string content_topic = 1;
}
message HistoryQuery {
// The first field is reserved for future use
optional string pubsub_topic = 2;
repeated ContentFilter content_filters = 3;
optional PagingInfo paging_info = 4;
optional sint64 start_time = 5;
optional sint64 end_time = 6;
}
message HistoryResponse {
// The first field is reserved for future use
repeated WakuMessage messages = 2;
optional PagingInfo paging_info = 3;
enum HistoryError {
NONE = 0;
INVALID_CURSOR = 1;
TOO_MANY_REQUESTS = 429;
SERVICE_UNAVAILABLE = 503;
}
HistoryError error = 4;
}
message HistoryRpc {
string request_id = 1;
optional HistoryQuery query = 2;
optional HistoryResponse response = 3;
}

View File

@ -0,0 +1,42 @@
// Protocol identifier: /vac/waku/store-query/3.0.0
syntax = "proto3";
import "message.proto";
message WakuMessageKeyValue {
optional bytes message_hash = 1; // Globally unique key for a Waku Message
// Full message content and associated pubsub_topic as value
optional WakuMessage message = 2;
optional string pubsub_topic = 3;
}
message StoreQueryRequest {
string request_id = 1;
bool include_data = 2; // Response should include full message content
// Filter criteria for content-filtered queries
optional string pubsub_topic = 10;
repeated string content_topics = 11;
optional sint64 time_start = 12;
optional sint64 time_end = 13;
// List of key criteria for lookup queries
repeated bytes message_hashes = 20; // Message hashes (keys) to lookup
// Pagination info. 50 Reserved
optional bytes pagination_cursor = 51; // Message hash (key) from where to start query (exclusive)
bool pagination_forward = 52;
optional uint64 pagination_limit = 53;
}
message StoreQueryResponse {
string request_id = 1;
optional uint32 status_code = 10;
optional string status_desc = 11;
repeated WakuMessageKeyValue messages = 20;
optional bytes pagination_cursor = 51;
}

View File

@ -1,27 +1,26 @@
import { sha256 } from "@noble/hashes/sha256";
import { ConnectionManager, StoreCore, waku_store } from "@waku/core";
import { ConnectionManager, StoreCore } from "@waku/core";
import {
Cursor,
IDecodedMessage,
IDecoder,
IStoreSDK,
type Libp2p,
PageDirection,
type ProtocolCreateOptions
Libp2p,
ProtocolCreateOptions,
QueryRequestParams,
StoreCursor
} from "@waku/interfaces";
import { messageHash } from "@waku/message-hash";
import { ensurePubsubTopicIsConfigured, isDefined, Logger } from "@waku/utils";
import { concat } from "@waku/utils/bytes";
import { utf8ToBytes } from "../index.js";
import { BaseProtocolSDK } from "./base_protocol.js";
export const DefaultPageSize = 10;
import { BaseProtocolSDK } from "./base_protocol";
const DEFAULT_NUM_PEERS = 1;
const log = new Logger("waku:store:protocol");
const log = new Logger("waku:store:sdk");
/**
* StoreSDK is an implementation of the IStoreSDK interface.
* It provides methods to interact with the Waku Store protocol.
*/
export class StoreSDK extends BaseProtocolSDK implements IStoreSDK {
public readonly protocol: StoreCore;
@ -30,46 +29,35 @@ export class StoreSDK extends BaseProtocolSDK implements IStoreSDK {
libp2p: Libp2p,
options?: ProtocolCreateOptions
) {
// TODO: options.numPeersToUse is disregarded: https://github.com/waku-org/js-waku/issues/1685
super(new StoreCore(libp2p, options), connectionManager, {
numPeersToUse: DEFAULT_NUM_PEERS
});
this.protocol = this.core as StoreCore;
}
/**
* Do a query to a Waku Store to retrieve historical/missed messages.
* Queries the Waku Store for historical messages using the provided decoders and options.
* Returns an asynchronous generator that yields promises of decoded messages.
*
* This is a generator, useful if you want most control on how messages
* are processed.
*
* The order of the messages returned by the remote Waku node SHOULD BE
* as follows:
* - within a page, messages SHOULD be ordered from oldest to most recent
* - pages direction depends on { @link QueryOptions.pageDirection }
* @throws If not able to reach a Waku Store peer to query,
* or if an error is encountered when processing the reply,
* or if two decoders with the same content topic are passed.
*
* This API only supports querying a single pubsub topic at a time.
* If multiple decoders are provided, they must all have the same pubsub topic.
* @throws If multiple decoders with different pubsub topics are provided.
* @throws If no decoders are provided.
* @throws If no decoders are found for the provided pubsub topic.
* @param decoders - An array of message decoders.
* @param options - Optional query parameters.
* @returns An asynchronous generator of promises of decoded messages.
* @throws If no peers are available to query or if an error occurs during the query.
*/
public async *queryGenerator<T extends IDecodedMessage>(
decoders: IDecoder<T>[],
options?: waku_store.QueryOptions
options?: Partial<QueryRequestParams>
): AsyncGenerator<Promise<T | undefined>[]> {
const { pubsubTopic, contentTopics, decodersAsMap } =
this.validateDecodersAndPubsubTopic(decoders, options);
this.validateDecodersAndPubsubTopic(decoders);
const queryOpts = this.constructOptions(
const queryOpts = {
pubsubTopic,
contentTopics,
options
);
includeData: true,
paginationForward: true,
...options
};
const peer = (
await this.protocol.getPeers({
@ -77,9 +65,12 @@ export class StoreSDK extends BaseProtocolSDK implements IStoreSDK {
maxBootstrapPeers: 1
})
)[0];
if (!peer) {
log.error("No peers available to query");
throw new Error("No peers available to query");
}
if (!peer) throw new Error("No peers available to query");
log.info(`Querying store with options: ${JSON.stringify(options)}`);
const responseGenerator = this.protocol.queryPerPage(
queryOpts,
decodersAsMap,
@ -92,56 +83,40 @@ export class StoreSDK extends BaseProtocolSDK implements IStoreSDK {
}
/**
* Do a query to a Waku Store to retrieve historical/missed messages.
* Queries the Waku Store for historical messages and processes them with the provided callback in order.
*
* The callback function takes a `WakuMessage` in input,
* messages are processed in order:
* - oldest to latest if `options.pageDirection` == { @link PageDirection.FORWARD }
* - latest to oldest if `options.pageDirection` == { @link PageDirection.BACKWARD }
*
* The ordering may affect performance.
* The ordering depends on the behavior of the remote store node.
* If strong ordering is needed, you may need to handle this at application level
* and set your own timestamps too (the WakuMessage timestamps are not certified).
*
* @throws If not able to reach a Waku Store peer to query,
* or if an error is encountered when processing the reply,
* or if two decoders with the same content topic are passed.
* @param decoders - An array of message decoders.
* @param callback - A callback function to process each decoded message.
* @param options - Optional query parameters.
* @returns A promise that resolves when the query and message processing are completed.
*/
public async queryWithOrderedCallback<T extends IDecodedMessage>(
decoders: IDecoder<T>[],
callback: (message: T) => Promise<void | boolean> | boolean | void,
options?: waku_store.QueryOptions
options?: Partial<QueryRequestParams>
): Promise<void> {
log.info("Querying store with ordered callback");
for await (const promises of this.queryGenerator(decoders, options)) {
if (await this.processMessages(promises, callback, options)) break;
if (await this.processMessages(promises, callback)) break;
}
}
/**
* Do a query to a Waku Store to retrieve historical/missed messages.
* The callback function takes a `Promise<WakuMessage>` in input,
* useful if messages need to be decrypted and performance matters.
* Queries the Waku Store for historical messages and processes them with the provided callback using promises.
*
* The order of the messages passed to the callback is as follows:
* - within a page, messages are expected to be ordered from oldest to most recent
* - pages direction depends on { @link QueryOptions.pageDirection }
*
* Do note that the resolution of the `Promise<WakuMessage | undefined` may
* break the order as it may rely on the browser decryption API, which in turn,
* may have a different speed depending on the type of decryption.
*
* @throws If not able to reach a Waku Store peer to query,
* or if an error is encountered when processing the reply,
* or if two decoders with the same content topic are passed.
* @param decoders - An array of message decoders.
* @param callback - A callback function to process each promise of a decoded message.
* @param options - Optional query parameters.
* @returns A promise that resolves when the query and message processing are completed.
*/
public async queryWithPromiseCallback<T extends IDecodedMessage>(
decoders: IDecoder<T>[],
callback: (
message: Promise<T | undefined>
) => Promise<void | boolean> | boolean | void,
options?: waku_store.QueryOptions
options?: Partial<QueryRequestParams>
): Promise<void> {
log.info("Querying store with promise callback");
let abort = false;
for await (const page of this.queryGenerator(decoders, options)) {
const _promises = page.map(async (msgPromise) => {
@ -154,144 +129,21 @@ export class StoreSDK extends BaseProtocolSDK implements IStoreSDK {
}
}
public createCursor(message: IDecodedMessage): Cursor {
if (
!message ||
!message.timestamp ||
!message.payload ||
!message.contentTopic
) {
throw new Error("Message is missing required fields");
}
const contentTopicBytes = utf8ToBytes(message.contentTopic);
const digest = sha256(concat([contentTopicBytes, message.payload]));
const messageTime = BigInt(message.timestamp.getTime()) * BigInt(1000000);
return {
digest,
pubsubTopic: message.pubsubTopic,
senderTime: messageTime,
receiverTime: messageTime
};
}
private validateDecodersAndPubsubTopic<T extends IDecodedMessage>(
decoders: IDecoder<T>[],
options?: waku_store.QueryOptions
): {
pubsubTopic: string;
contentTopics: string[];
decodersAsMap: Map<string, IDecoder<T>>;
} {
if (decoders.length === 0) {
throw new Error("No decoders provided");
}
// convert array to set to remove duplicates
const uniquePubsubTopicsInQuery = Array.from(
new Set(decoders.map((decoder) => decoder.pubsubTopic))
);
// If multiple pubsub topics are provided, throw an error
if (uniquePubsubTopicsInQuery.length > 1) {
throw new Error(
"API does not support querying multiple pubsub topics at once"
);
}
// we can be certain that there is only one pubsub topic in the query
const pubsubTopicForQuery = uniquePubsubTopicsInQuery[0];
ensurePubsubTopicIsConfigured(
pubsubTopicForQuery,
this.protocol.pubsubTopics
);
// check that the pubsubTopic from the Cursor and Decoder match
if (
options?.cursor?.pubsubTopic &&
options.cursor.pubsubTopic !== pubsubTopicForQuery
) {
throw new Error(
`Cursor pubsub topic (${options?.cursor?.pubsubTopic}) does not match decoder pubsub topic (${pubsubTopicForQuery})`
);
}
const decodersAsMap = new Map();
decoders.forEach((dec) => {
if (decodersAsMap.has(dec.contentTopic)) {
throw new Error(
"API does not support different decoder per content topic"
);
}
decodersAsMap.set(dec.contentTopic, dec);
});
const contentTopics = decoders
.filter((decoder) => decoder.pubsubTopic === pubsubTopicForQuery)
.map((dec) => dec.contentTopic);
if (contentTopics.length === 0) {
throw new Error("No decoders found for topic " + pubsubTopicForQuery);
}
return {
pubsubTopic: pubsubTopicForQuery,
contentTopics,
decodersAsMap
};
}
private constructOptions(
pubsubTopic: string,
contentTopics: string[],
options: waku_store.QueryOptions = {}
): waku_store.Params {
let startTime, endTime;
if (options?.timeFilter) {
startTime = options.timeFilter.startTime;
endTime = options.timeFilter.endTime;
}
if (!startTime) {
log.warn("No start time provided");
}
if (!endTime) {
log.warn("No end time provided");
}
const queryOpts = Object.assign(
{
pubsubTopic: pubsubTopic,
pageDirection: PageDirection.BACKWARD,
pageSize: DefaultPageSize
},
options,
{ contentTopics, startTime, endTime }
);
return queryOpts;
}
/**
* Processes messages based on the provided callback and options.
*
* @param messages - An array of promises of decoded messages.
* @param callback - A callback function to process each decoded message.
* @returns A promise that resolves to a boolean indicating whether the processing should abort.
* @private
*/
private async processMessages<T extends IDecodedMessage>(
messages: Promise<T | undefined>[],
callback: (message: T) => Promise<void | boolean> | boolean | void,
options?: waku_store.QueryOptions
callback: (message: T) => Promise<void | boolean> | boolean | void
): Promise<boolean> {
let abort = false;
const messagesOrUndef: Array<T | undefined> = await Promise.all(messages);
let processedMessages: Array<T> = messagesOrUndef.filter(isDefined);
if (this.shouldReverseOrder(options)) {
processedMessages = processedMessages.reverse();
}
const processedMessages: Array<T> = messagesOrUndef.filter(isDefined);
await Promise.all(
processedMessages.map(async (msg) => {
@ -305,24 +157,91 @@ export class StoreSDK extends BaseProtocolSDK implements IStoreSDK {
}
/**
* Determines whether to reverse the order of messages based on the provided options.
* Creates a cursor based on the provided decoded message.
*
* Messages in pages are ordered from oldest (first) to most recent (last).
* https://github.com/vacp2p/rfc/issues/533
* @param message - The decoded message.
* @returns A StoreCursor representing the message.
*/
public createCursor(message: IDecodedMessage): StoreCursor {
return messageHash(message.pubsubTopic, message);
}
/**
* Validates the provided decoders and pubsub topic.
*
* @param decoders - An array of message decoders.
* @returns An object containing the pubsub topic, content topics, and a map of decoders.
* @throws If no decoders are provided, if multiple pubsub topics are provided, or if no decoders are found for the pubsub topic.
* @private
*/
private shouldReverseOrder(options?: waku_store.QueryOptions): boolean {
return (
typeof options?.pageDirection === "undefined" ||
options?.pageDirection === PageDirection.BACKWARD
private validateDecodersAndPubsubTopic<T extends IDecodedMessage>(
decoders: IDecoder<T>[]
): {
pubsubTopic: string;
contentTopics: string[];
decodersAsMap: Map<string, IDecoder<T>>;
} {
if (decoders.length === 0) {
log.error("No decoders provided");
throw new Error("No decoders provided");
}
const uniquePubsubTopicsInQuery = Array.from(
new Set(decoders.map((decoder) => decoder.pubsubTopic))
);
if (uniquePubsubTopicsInQuery.length > 1) {
log.error("API does not support querying multiple pubsub topics at once");
throw new Error(
"API does not support querying multiple pubsub topics at once"
);
}
const pubsubTopicForQuery = uniquePubsubTopicsInQuery[0];
ensurePubsubTopicIsConfigured(
pubsubTopicForQuery,
this.protocol.pubsubTopics
);
const decodersAsMap = new Map();
decoders.forEach((dec) => {
if (decodersAsMap.has(dec.contentTopic)) {
log.error("API does not support different decoder per content topic");
throw new Error(
"API does not support different decoder per content topic"
);
}
decodersAsMap.set(dec.contentTopic, dec);
});
const contentTopics = decoders
.filter((decoder) => decoder.pubsubTopic === pubsubTopicForQuery)
.map((dec) => dec.contentTopic);
if (contentTopics.length === 0) {
log.error(`No decoders found for topic ${pubsubTopicForQuery}`);
throw new Error("No decoders found for topic " + pubsubTopicForQuery);
}
return {
pubsubTopic: pubsubTopicForQuery,
contentTopics,
decodersAsMap
};
}
}
/**
* Factory function to create an instance of the StoreSDK.
*
* @param init - Partial options for protocol creation.
* @returns A function that takes a Libp2p instance and returns a StoreSDK instance.
*/
export function wakuStore(
connectionManager: ConnectionManager,
init: Partial<ProtocolCreateOptions> = {}
): (libp2p: Libp2p) => IStoreSDK {
return (libp2p: Libp2p) => new StoreSDK(connectionManager, libp2p, init);
return (libp2p: Libp2p) => {
return new StoreSDK(connectionManager, libp2p, init);
};
}

View File

@ -53,7 +53,7 @@ describe("Waku Store, cursor", function () {
// messages in reversed order (first message at last index)
const messages: DecodedMessage[] = [];
for await (const page of waku.store.queryGenerator([TestDecoder])) {
for await (const msg of page.reverse()) {
for await (const msg of page) {
messages.push(msg as DecodedMessage);
}
}
@ -63,9 +63,9 @@ describe("Waku Store, cursor", function () {
const messagesAfterCursor: DecodedMessage[] = [];
for await (const page of waku.store.queryGenerator([TestDecoder], {
cursor
paginationCursor: cursor
})) {
for await (const msg of page.reverse()) {
for await (const msg of page) {
if (msg) {
messagesAfterCursor.push(msg as DecodedMessage);
}
@ -102,7 +102,7 @@ describe("Waku Store, cursor", function () {
// messages in reversed order (first message at last index)
const messages: DecodedMessage[] = [];
for await (const page of waku.store.queryGenerator([TestDecoder])) {
for await (const msg of page.reverse()) {
for await (const msg of page) {
messages.push(msg as DecodedMessage);
}
}
@ -113,9 +113,9 @@ describe("Waku Store, cursor", function () {
// query node2 with the cursor from node1
const messagesAfterCursor: DecodedMessage[] = [];
for await (const page of waku2.store.queryGenerator([TestDecoder], {
cursor
paginationCursor: cursor
})) {
for await (const msg of page.reverse()) {
for await (const msg of page) {
if (msg) {
messagesAfterCursor.push(msg as DecodedMessage);
}
@ -132,7 +132,7 @@ describe("Waku Store, cursor", function () {
).to.be.eq(bytesToUtf8(messages[messages.length - 1].payload));
});
it("Passing cursor with wrong message digest", async function () {
it("Passing invalid cursor", async function () {
await sendMessages(
nwaku,
totalMsgs,
@ -142,39 +142,35 @@ describe("Waku Store, cursor", function () {
const messages: DecodedMessage[] = [];
for await (const page of waku.store.queryGenerator([TestDecoder])) {
for await (const msg of page.reverse()) {
for await (const msg of page) {
messages.push(msg as DecodedMessage);
}
}
const cursor = waku.store.createCursor(messages[5]);
// setting a wrong digest
cursor.digest = new Uint8Array([]);
// setting an invalid cursor
const cursor = new Uint8Array([2, 3]);
const messagesAfterCursor: DecodedMessage[] = [];
try {
for await (const page of waku.store.queryGenerator([TestDecoder], {
cursor
paginationCursor: cursor
})) {
for await (const msg of page.reverse()) {
for await (const msg of page) {
if (msg) {
messagesAfterCursor.push(msg as DecodedMessage);
}
}
}
// Should return same as go-waku. Raised bug: https://github.com/waku-org/nwaku/issues/2117
expect(messagesAfterCursor.length).to.eql(0);
} catch (error) {
} catch (err) {
if (
nwaku.type === "go-waku" &&
typeof error === "string" &&
error.includes("History response contains an Error: INVALID_CURSOR")
!(err instanceof Error) ||
!err.message.includes(
`Store query failed with status code: 300, description: BAD_RESPONSE: archive error: DIRVER_ERROR: cursor not found`
)
) {
return;
throw err;
}
throw error instanceof Error
? new Error(`Unexpected error: ${error.message}`)
: error;
}
});
@ -188,7 +184,7 @@ describe("Waku Store, cursor", function () {
const messages: DecodedMessage[] = [];
for await (const page of waku.store.queryGenerator([TestDecoder])) {
for await (const msg of page.reverse()) {
for await (const msg of page) {
messages.push(msg as DecodedMessage);
}
}
@ -197,7 +193,7 @@ describe("Waku Store, cursor", function () {
try {
for await (const page of waku.store.queryGenerator([TestDecoder], {
cursor
paginationCursor: cursor
})) {
void page;
}
@ -206,7 +202,7 @@ describe("Waku Store, cursor", function () {
if (
!(err instanceof Error) ||
!err.message.includes(
`Cursor pubsub topic (${TestDecoder2.pubsubTopic}) does not match decoder pubsub topic (${TestDecoder.pubsubTopic})`
`Store query failed with status code: 300, description: BAD_RESPONSE: archive error: DIRVER_ERROR: cursor not found`
)
) {
throw err;

View File

@ -215,7 +215,7 @@ describe("Waku Store, general", function () {
}
return messages.length >= desiredMsgs;
},
{ pageSize: 7 }
{ paginationLimit: 7 }
);
expect(messages?.length).eq(desiredMsgs);
@ -334,7 +334,7 @@ describe("Waku Store, general", function () {
messages.push(msg);
return messages.length >= desiredMsgs;
},
{ pageSize: 7 }
{ paginationLimit: 7 }
);
expect(messages?.length).eq(desiredMsgs);

View File

@ -1,4 +1,4 @@
import { DecodedMessage, PageDirection } from "@waku/core";
import { DecodedMessage } from "@waku/core";
import type { IMessage, LightNode } from "@waku/interfaces";
import { expect } from "chai";
@ -10,7 +10,6 @@ import {
} from "../../src/index.js";
import {
chunkAndReverseArray,
runStoreNodes,
sendMessages,
TestDecoder,
@ -31,7 +30,7 @@ describe("Waku Store, order", function () {
await tearDownNodes(nwaku, waku);
});
[PageDirection.FORWARD, PageDirection.BACKWARD].forEach((pageDirection) => {
[true, false].forEach((pageDirection) => {
it(`Query Generator - ${pageDirection}`, async function () {
await sendMessages(
nwaku,
@ -42,7 +41,7 @@ describe("Waku Store, order", function () {
const messages: IMessage[] = [];
for await (const query of waku.store.queryGenerator([TestDecoder], {
pageDirection: pageDirection
paginationForward: pageDirection
})) {
for await (const msg of query) {
if (msg) {
@ -51,10 +50,7 @@ describe("Waku Store, order", function () {
}
}
let expectedPayloads = Array.from(Array(totalMsgs).keys());
if (pageDirection === PageDirection.BACKWARD) {
expectedPayloads = chunkAndReverseArray(expectedPayloads, 10);
}
const expectedPayloads = Array.from(Array(totalMsgs).keys());
expect(messages?.length).eq(totalMsgs);
const payloads = messages.map((msg) => msg.payload[0]!);
@ -62,7 +58,7 @@ describe("Waku Store, order", function () {
});
});
[PageDirection.FORWARD, PageDirection.BACKWARD].forEach((pageDirection) => {
[true, false].forEach((pageDirection) => {
it(`Promise Callback - ${pageDirection}`, async function () {
await sendMessages(
nwaku,
@ -81,14 +77,11 @@ describe("Waku Store, order", function () {
}
},
{
pageDirection: pageDirection
paginationForward: pageDirection
}
);
let expectedPayloads = Array.from(Array(totalMsgs).keys());
if (pageDirection === PageDirection.BACKWARD) {
expectedPayloads = chunkAndReverseArray(expectedPayloads, 10);
}
const expectedPayloads = Array.from(Array(totalMsgs).keys());
expect(messages?.length).eq(totalMsgs);
const payloads = messages.map((msg) => msg.payload[0]!);
@ -96,7 +89,7 @@ describe("Waku Store, order", function () {
});
});
[PageDirection.FORWARD, PageDirection.BACKWARD].forEach((pageDirection) => {
[true, false].forEach((pageDirection) => {
it(`Ordered Callback - ${pageDirection}`, async function () {
await sendMessages(
nwaku,
@ -112,13 +105,10 @@ describe("Waku Store, order", function () {
messages.push(msg);
},
{
pageDirection: pageDirection
paginationForward: pageDirection
}
);
if (pageDirection === PageDirection.BACKWARD) {
messages.reverse();
}
expect(messages?.length).eq(totalMsgs);
const payloads = messages.map((msg) => msg.payload[0]!);
expect(payloads).to.deep.eq(Array.from(Array(totalMsgs).keys()));

View File

@ -50,16 +50,12 @@ describe("Waku Store, page size", function () {
if (pageSize === 0) {
effectivePageSize = 20;
} else if (pageSize > 100) {
if (nwaku.type == "go-waku") {
effectivePageSize = 20;
} else {
effectivePageSize = 100;
}
effectivePageSize = 100;
}
let messagesRetrieved = 0;
for await (const query of waku.store.queryGenerator([TestDecoder], {
pageSize: pageSize
paginationLimit: pageSize
})) {
// Calculate expected page size
const expectedPageSize = Math.min(
@ -90,7 +86,7 @@ describe("Waku Store, page size", function () {
let messagesRetrieved = 0;
for await (const query of waku.store.queryGenerator([TestDecoder])) {
expect(query.length).eq(10);
expect(query.length).eq(20);
for await (const msg of query) {
if (msg) {
messagesRetrieved++;

View File

@ -1,4 +1,4 @@
import { DecodedMessage, PageDirection } from "@waku/core";
import { DecodedMessage } from "@waku/core";
import type { IMessage, LightNode } from "@waku/interfaces";
import {
@ -29,7 +29,7 @@ describe("Waku Store, sorting", function () {
await tearDownNodes(nwaku, waku);
});
[PageDirection.FORWARD, PageDirection.BACKWARD].forEach((pageDirection) => {
[true, false].forEach((pageDirection) => {
it(`Query Generator sorting by timestamp while page direction is ${pageDirection}`, async function () {
await sendMessages(
nwaku,
@ -38,8 +38,10 @@ describe("Waku Store, sorting", function () {
TestDecoder.pubsubTopic
);
const pages: IMessage[][] = [];
for await (const query of waku.store.queryGenerator([TestDecoder], {
pageDirection: PageDirection.FORWARD
paginationForward: pageDirection
})) {
const page: IMessage[] = [];
for await (const msg of query) {
@ -47,23 +49,48 @@ describe("Waku Store, sorting", function () {
page.push(msg as DecodedMessage);
}
}
// Extract timestamps
pages.push(page);
// Check order within the current page
const timestamps = page.map(
(msg) => msg.timestamp as unknown as bigint
);
// Check if timestamps are sorted
for (let i = 1; i < timestamps.length; i++) {
if (timestamps[i] < timestamps[i - 1]) {
throw new Error(
`Messages are not sorted by timestamp. Found out of order at index ${i}`
`Messages within page ${pages.length - 1} are not in sequential order. Found out of order at index ${i}`
);
}
}
}
// Check order between pages
for (let i = 1; i < pages.length; i++) {
const prevPageLastTimestamp = pages[i - 1][pages[i - 1].length - 1]
.timestamp as unknown as bigint;
const currentPageFirstTimestamp = pages[i][0]
.timestamp as unknown as bigint;
if (
pageDirection === true &&
prevPageLastTimestamp < currentPageFirstTimestamp
) {
throw new Error(
`Pages are not in reversed order for FORWARD direction. Issue found between page ${i - 1} and ${i}`
);
} else if (
pageDirection === false &&
prevPageLastTimestamp > currentPageFirstTimestamp
) {
throw new Error(
`Pages are not in reversed order for BACKWARD direction. Issue found between page ${i - 1} and ${i}`
);
}
}
});
});
[PageDirection.FORWARD, PageDirection.BACKWARD].forEach((pageDirection) => {
[true, false].forEach((pageDirection) => {
it(`Ordered Callback sorting by timestamp while page direction is ${pageDirection}`, async function () {
await sendMessages(
nwaku,
@ -73,34 +100,56 @@ describe("Waku Store, sorting", function () {
);
const messages: IMessage[] = [];
const pageSize = 5;
// receive 4 pages, 5 messages each (20/4)
await waku.store.queryWithOrderedCallback(
[TestDecoder],
async (msg) => {
messages.push(msg);
},
{
pageDirection: pageDirection
{ paginationLimit: pageSize, paginationForward: pageDirection }
);
// Split messages into pages
const pages: IMessage[][] = [];
for (let i = 0; i < messages.length; i += pageSize) {
pages.push(messages.slice(i, i + pageSize));
}
// Check order within each page
pages.forEach((page, pageIndex) => {
const pageTimestamps = page.map(
(msg) => msg.timestamp as unknown as bigint
);
for (let i = 1; i < pageTimestamps.length; i++) {
if (pageTimestamps[i] < pageTimestamps[i - 1]) {
throw new Error(
`Messages within page ${pageIndex} are not in sequential order. Found out of order at index ${i}`
);
}
}
);
// Extract timestamps
const timestamps = messages.map(
(msg) => msg.timestamp as unknown as bigint
);
// Check if timestamps are sorted
for (let i = 1; i < timestamps.length; i++) {
});
// Check order between pages
for (let i = 1; i < pages.length; i++) {
const prevPageLastTimestamp = pages[i - 1][pages[i - 1].length - 1]
.timestamp as unknown as bigint;
const currentPageFirstTimestamp = pages[i][0]
.timestamp as unknown as bigint;
if (
pageDirection === PageDirection.FORWARD &&
timestamps[i] < timestamps[i - 1]
pageDirection === true &&
prevPageLastTimestamp > currentPageFirstTimestamp
) {
throw new Error(
`Messages are not sorted by timestamp in FORWARD direction. Found out of order at index ${i}`
`Pages are not in reversed order for FORWARD direction. Issue found between page ${i - 1} and ${i}`
);
} else if (
pageDirection === PageDirection.BACKWARD &&
timestamps[i] > timestamps[i - 1]
pageDirection === false &&
prevPageLastTimestamp < currentPageFirstTimestamp
) {
throw new Error(
`Messages are not sorted by timestamp in BACKWARD direction. Found out of order at index ${i}`
`Pages are not in reversed order for BACKWARD direction. Issue found between page ${i - 1} and ${i}`
);
}
}

View File

@ -60,10 +60,8 @@ describe("Waku Store, time filter", function () {
}
},
{
timeFilter: {
startTime: adjustDate(msgTimestamp, startTime),
endTime: adjustDate(msgTimestamp, endTime)
}
timeStart: adjustDate(msgTimestamp, startTime),
timeEnd: adjustDate(msgTimestamp, endTime)
}
);
@ -103,10 +101,8 @@ describe("Waku Store, time filter", function () {
}
},
{
timeFilter: {
startTime: adjustDate(msgTimestamp, -1000),
endTime: adjustDate(msgTimestamp, 1000)
}
timeStart: adjustDate(msgTimestamp, -1000),
timeEnd: adjustDate(msgTimestamp, 1000)
}
);