feat: query on connect stops on predicate

This commit is contained in:
fryorcraken 2025-09-30 15:50:18 +10:00
parent bbcfc94879
commit 88a3bf641b
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4
2 changed files with 23 additions and 1 deletions

View File

@ -54,6 +54,7 @@ export class QueryOnConnect<
public constructor(
public decoders: IDecoder<T>[],
public stopIfTrue: (msg: T) => boolean,
private readonly peerManagerEventEmitter: TypedEventEmitter<IPeerManagerEvents>,
private readonly wakuEventEmitter: IWakuEventEmitter,
private readonly _queryGenerator: <T extends IDecodedMessage>(
@ -125,8 +126,13 @@ export class QueryOnConnect<
const messages = (await Promise.all(page)).filter(
(m) => m !== undefined
);
const stop = messages.some((msg: T) => this.stopIfTrue(msg));
// Bundle the messages to help batch process by sds
this.dispatchMessages(messages);
if (stop) {
break;
}
}
// Didn't throw, so it didn't fail

View File

@ -15,6 +15,7 @@ import {
import {
type ChannelId,
isContentMessage,
isSyncMessage,
MessageChannel,
MessageChannelEvent,
type MessageChannelOptions,
@ -185,9 +186,9 @@ export class ReliableChannel<
peerManagerEvents !== undefined &&
(options?.queryOnConnect ?? true)
) {
log.info("auto-query enabled");
this.queryOnConnect = new QueryOnConnect(
[this.decoder],
this.isSyncOrContentMessage.bind(this),
peerManagerEvents,
node.events,
this._retrieve.bind(this)
@ -580,6 +581,21 @@ export class ReliableChannel<
this.messageChannel.sweepOutgoingBuffer();
}
private isSyncOrContentMessage(msg: T): boolean {
// TODO: we do end-up decoding messages twice as this is used to stop store queries.
const sdsMessage = SdsMessage.decode(msg.payload);
if (!sdsMessage) {
return false;
}
if (sdsMessage.channelId !== this.messageChannel.channelId) {
return false;
}
return isSyncMessage(sdsMessage) || isContentMessage(sdsMessage);
}
private setupEventListeners(): void {
this.messageChannel.addEventListener(
MessageChannelEvent.OutMessageSent,