diff --git a/packages/core/src/lib/store/store.ts b/packages/core/src/lib/store/store.ts index 9597ecf1d2..5a25cfb4de 100644 --- a/packages/core/src/lib/store/store.ts +++ b/packages/core/src/lib/store/store.ts @@ -72,6 +72,11 @@ export class StoreCore { let currentCursor = queryOpts.paginationCursor; while (true) { + if (queryOpts.abortSignal?.aborted) { + log.info("Store query aborted by signal"); + break; + } + const storeQueryRequest = StoreQueryRequest.create({ ...queryOpts, paginationCursor: currentCursor @@ -93,13 +98,22 @@ export class StoreCore { break; } - const res = await pipe( - [storeQueryRequest.encode()], - lp.encode, - stream, - lp.decode, - async (source) => await all(source) - ); + let res; + try { + res = await pipe( + [storeQueryRequest.encode()], + lp.encode, + stream, + lp.decode, + async (source) => await all(source) + ); + } catch (error) { + if (error instanceof Error && error.name === "AbortError") { + log.info(`Store query aborted for peer ${peerId.toString()}`); + break; + } + throw error; + } const bytes = new Uint8ArrayList(); res.forEach((chunk) => { @@ -126,6 +140,11 @@ export class StoreCore { `${storeQueryResponse.messages.length} messages retrieved from store` ); + if (queryOpts.abortSignal?.aborted) { + log.info("Store query aborted by signal before processing messages"); + break; + } + const decodedMessages = storeQueryResponse.messages.map((protoMsg) => { if (!protoMsg.message) { return Promise.resolve(undefined); diff --git a/packages/interfaces/src/store.ts b/packages/interfaces/src/store.ts index 8a1e91a451..0bb5128237 100644 --- a/packages/interfaces/src/store.ts +++ b/packages/interfaces/src/store.ts @@ -88,6 +88,12 @@ export type QueryRequestParams = { * Only use if you know what you are doing. */ peerId?: PeerId; + + /** + * An optional AbortSignal to cancel the query. + * When the signal is aborted, the query will stop processing and return early. + */ + abortSignal?: AbortSignal; }; export type IStore = {