From 593bc452259c48503e099946bb316c764f89cc1d Mon Sep 17 00:00:00 2001 From: fryorcraken <110212804+fryorcraken@users.noreply.github.com> Date: Wed, 1 Oct 2025 21:35:52 +1000 Subject: [PATCH] feat: reliable channels search up to 30 days to find message (#2657) * feat: query on connect stops on predicate * test: query on connect stops at predicate * feat: reliable channels search up to 30 days to find message Queries stop once a valid sync or content message is found in the channel. * fix: protect against decoding exceptions * stop range queries on messages with a causal history --- .../query_on_connect/query_on_connect.spec.ts | 255 ++++++++++ .../src/query_on_connect/query_on_connect.ts | 8 +- .../reliable_channel/reliable_channel.spec.ts | 454 +++++++++++++++++- .../src/reliable_channel/reliable_channel.ts | 17 +- packages/sds/src/message_channel/message.ts | 86 ++-- 5 files changed, 776 insertions(+), 44 deletions(-) diff --git a/packages/sdk/src/query_on_connect/query_on_connect.spec.ts b/packages/sdk/src/query_on_connect/query_on_connect.spec.ts index 9006239d33..b87caa5ce7 100644 --- a/packages/sdk/src/query_on_connect/query_on_connect.spec.ts +++ b/packages/sdk/src/query_on_connect/query_on_connect.spec.ts @@ -95,6 +95,7 @@ describe("QueryOnConnect", () => { it("should create QueryOnConnect instance with all required parameters", () => { queryOnConnect = new QueryOnConnect( mockDecoders, + () => false, mockPeerManagerEventEmitter, mockWakuEventEmitter, mockQueryGenerator, @@ -108,6 +109,7 @@ describe("QueryOnConnect", () => { it("should create QueryOnConnect instance without options", () => { queryOnConnect = new QueryOnConnect( mockDecoders, + () => false, mockPeerManagerEventEmitter, mockWakuEventEmitter, mockQueryGenerator @@ -120,6 +122,7 @@ describe("QueryOnConnect", () => { it("should accept empty decoders array", () => { queryOnConnect = new QueryOnConnect( [], + () => false, mockPeerManagerEventEmitter, mockWakuEventEmitter, mockQueryGenerator, @@ -134,6 +137,7 @@ describe("QueryOnConnect", () => { beforeEach(() => { queryOnConnect = new QueryOnConnect( mockDecoders, + () => false, mockPeerManagerEventEmitter, mockWakuEventEmitter, mockQueryGenerator, @@ -173,6 +177,7 @@ describe("QueryOnConnect", () => { beforeEach(() => { queryOnConnect = new QueryOnConnect( mockDecoders, + () => false, mockPeerManagerEventEmitter, mockWakuEventEmitter, mockQueryGenerator, @@ -224,6 +229,7 @@ describe("QueryOnConnect", () => { queryOnConnect = new QueryOnConnect( mockDecoders, + () => false, mockPeerManagerEventEmitter, mockWakuEventEmitter, mockQueryGenerator, @@ -276,6 +282,7 @@ describe("QueryOnConnect", () => { queryOnConnect = new QueryOnConnect( mockDecoders, + () => false, mockPeerManagerEventEmitter, mockWakuEventEmitter, mockQueryGenerator, @@ -298,6 +305,7 @@ describe("QueryOnConnect", () => { queryOnConnect = new QueryOnConnect( mockDecoders, + () => false, mockPeerManagerEventEmitter, mockWakuEventEmitter, mockQueryGenerator, @@ -320,6 +328,7 @@ describe("QueryOnConnect", () => { queryOnConnect = new QueryOnConnect( mockDecoders, + () => false, mockPeerManagerEventEmitter, mockWakuEventEmitter, mockQueryGenerator, @@ -391,6 +400,7 @@ describe("QueryOnConnect", () => { const queryOnConnect = new QueryOnConnect( mockDecoders, + () => false, mockPeerManagerEventEmitter, mockWakuEventEmitter, mockQueryGenerator, @@ -418,6 +428,7 @@ describe("QueryOnConnect", () => { queryOnConnect = new QueryOnConnect( mockDecoders, + () => false, mockPeerManagerEventEmitter, mockWakuEventEmitter, mockQueryGenerator, @@ -473,6 +484,7 @@ describe("QueryOnConnect", () => { queryOnConnect = new QueryOnConnect( mockDecoders, + () => false, mockPeerManagerEventEmitter, mockWakuEventEmitter, mockQueryGenerator, @@ -605,6 +617,7 @@ describe("QueryOnConnect", () => { queryOnConnect = new QueryOnConnect( mockDecoders, + () => false, mockPeerManagerEventEmitter, mockWakuEventEmitter, mockQueryGenerator, @@ -750,6 +763,248 @@ describe("QueryOnConnect", () => { expect(mockQueryGenerator.calledTwice).to.be.true; }); }); + + describe("stopIfTrue predicate", () => { + beforeEach(() => { + mockPeerManagerEventEmitter.addEventListener = sinon.stub(); + mockWakuEventEmitter.addEventListener = sinon.stub(); + }); + + it("should stop query iteration when stopIfTrue returns true", async () => { + const messages = [ + { + hash: new Uint8Array(), + hashStr: "msg1", + version: 1, + timestamp: new Date(), + contentTopic: "/test/1/content", + pubsubTopic: "/waku/2/default-waku/proto", + payload: new Uint8Array([1]), + rateLimitProof: undefined, + ephemeral: false, + meta: undefined + }, + { + hash: new Uint8Array(), + hashStr: "stop-hash", + version: 1, + timestamp: new Date(), + contentTopic: "/test/1/content", + pubsubTopic: "/waku/2/default-waku/proto", + payload: new Uint8Array([2]), + rateLimitProof: undefined, + ephemeral: false, + meta: undefined + }, + { + hash: new Uint8Array(), + hashStr: "msg3", + version: 1, + timestamp: new Date(), + contentTopic: "/test/1/content", + pubsubTopic: "/waku/2/default-waku/proto", + payload: new Uint8Array([3]), + rateLimitProof: undefined, + ephemeral: false, + meta: undefined + } + ]; + + // Setup generator to yield 3 pages, stop should occur on page 2 + const mockAsyncGenerator = async function* (): AsyncGenerator< + Promise[] + > { + yield [Promise.resolve(messages[0])]; + yield [Promise.resolve(messages[1])]; + yield [Promise.resolve(messages[2])]; + }; + mockQueryGenerator.returns(mockAsyncGenerator()); + + const stopPredicate = (msg: IDecodedMessage): boolean => + msg.hashStr === "stop-hash"; + + queryOnConnect = new QueryOnConnect( + mockDecoders, + stopPredicate, + mockPeerManagerEventEmitter, + mockWakuEventEmitter, + mockQueryGenerator, + options + ); + + const receivedMessages: IDecodedMessage[] = []; + queryOnConnect.addEventListener( + QueryOnConnectEvent.MessagesRetrieved, + (event: CustomEvent) => { + receivedMessages.push(...event.detail); + } + ); + + queryOnConnect.start(); + await queryOnConnect["maybeQuery"](mockPeerId); + + // Should have received messages from first 2 pages only + expect(receivedMessages).to.have.length(2); + expect(receivedMessages[0].hashStr).to.equal("msg1"); + expect(receivedMessages[1].hashStr).to.equal("stop-hash"); + }); + + it("should process all pages when stopIfTrue never returns true", async () => { + const messages = [ + { + hash: new Uint8Array(), + hashStr: "msg1", + version: 1, + timestamp: new Date(), + contentTopic: "/test/1/content", + pubsubTopic: "/waku/2/default-waku/proto", + payload: new Uint8Array([1]), + rateLimitProof: undefined, + ephemeral: false, + meta: undefined + }, + { + hash: new Uint8Array(), + hashStr: "msg2", + version: 1, + timestamp: new Date(), + contentTopic: "/test/1/content", + pubsubTopic: "/waku/2/default-waku/proto", + payload: new Uint8Array([2]), + rateLimitProof: undefined, + ephemeral: false, + meta: undefined + }, + { + hash: new Uint8Array(), + hashStr: "msg3", + version: 1, + timestamp: new Date(), + contentTopic: "/test/1/content", + pubsubTopic: "/waku/2/default-waku/proto", + payload: new Uint8Array([3]), + rateLimitProof: undefined, + ephemeral: false, + meta: undefined + } + ]; + + const mockAsyncGenerator = async function* (): AsyncGenerator< + Promise[] + > { + yield [Promise.resolve(messages[0])]; + yield [Promise.resolve(messages[1])]; + yield [Promise.resolve(messages[2])]; + }; + mockQueryGenerator.returns(mockAsyncGenerator()); + + const stopPredicate = (): boolean => false; + + queryOnConnect = new QueryOnConnect( + mockDecoders, + stopPredicate, + mockPeerManagerEventEmitter, + mockWakuEventEmitter, + mockQueryGenerator, + options + ); + + const receivedMessages: IDecodedMessage[] = []; + queryOnConnect.addEventListener( + QueryOnConnectEvent.MessagesRetrieved, + (event: CustomEvent) => { + receivedMessages.push(...event.detail); + } + ); + + queryOnConnect.start(); + await queryOnConnect["maybeQuery"](mockPeerId); + + // Should have received all 3 messages + expect(receivedMessages).to.have.length(3); + }); + + it("should stop on first message of a page if stopIfTrue matches", async () => { + const messages = [ + { + hash: new Uint8Array(), + hashStr: "stop-hash", + version: 1, + timestamp: new Date(), + contentTopic: "/test/1/content", + pubsubTopic: "/waku/2/default-waku/proto", + payload: new Uint8Array([1]), + rateLimitProof: undefined, + ephemeral: false, + meta: undefined + }, + { + hash: new Uint8Array(), + hashStr: "msg2", + version: 1, + timestamp: new Date(), + contentTopic: "/test/1/content", + pubsubTopic: "/waku/2/default-waku/proto", + payload: new Uint8Array([2]), + rateLimitProof: undefined, + ephemeral: false, + meta: undefined + }, + { + hash: new Uint8Array(), + hashStr: "msg3", + version: 1, + timestamp: new Date(), + contentTopic: "/test/1/content", + pubsubTopic: "/waku/2/default-waku/proto", + payload: new Uint8Array([3]), + rateLimitProof: undefined, + ephemeral: false, + meta: undefined + } + ]; + + const mockAsyncGenerator = async function* (): AsyncGenerator< + Promise[] + > { + yield [ + Promise.resolve(messages[0]), + Promise.resolve(messages[1]), + Promise.resolve(messages[2]) + ]; + }; + mockQueryGenerator.returns(mockAsyncGenerator()); + + const stopPredicate = (msg: IDecodedMessage): boolean => + msg.hashStr === "stop-hash"; + + queryOnConnect = new QueryOnConnect( + mockDecoders, + stopPredicate, + mockPeerManagerEventEmitter, + mockWakuEventEmitter, + mockQueryGenerator, + options + ); + + const receivedMessages: IDecodedMessage[] = []; + queryOnConnect.addEventListener( + QueryOnConnectEvent.MessagesRetrieved, + (event: CustomEvent) => { + receivedMessages.push(...event.detail); + } + ); + + queryOnConnect.start(); + await queryOnConnect["maybeQuery"](mockPeerId); + + // Should have received all 3 messages from the page, even though first matched + expect(receivedMessages).to.have.length(3); + expect(receivedMessages[0].hashStr).to.equal("stop-hash"); + expect(receivedMessages[1].hashStr).to.equal("msg2"); + expect(receivedMessages[2].hashStr).to.equal("msg3"); + }); + }); }); describe("calculateTimeRange", () => { diff --git a/packages/sdk/src/query_on_connect/query_on_connect.ts b/packages/sdk/src/query_on_connect/query_on_connect.ts index f42c2ada91..da9e78a763 100644 --- a/packages/sdk/src/query_on_connect/query_on_connect.ts +++ b/packages/sdk/src/query_on_connect/query_on_connect.ts @@ -17,7 +17,7 @@ import { const log = new Logger("sdk:query-on-connect"); export const DEFAULT_FORCE_QUERY_THRESHOLD_MS = 5 * 60 * 1000; // 5 minutes -export const MAX_TIME_RANGE_QUERY_MS = 24 * 60 * 60 * 1000; // 24 hours +export const MAX_TIME_RANGE_QUERY_MS = 30 * 24 * 60 * 60 * 1000; // 30 days (queries are split) export interface QueryOnConnectOptions { /** @@ -54,6 +54,7 @@ export class QueryOnConnect< public constructor( public decoders: IDecoder[], + public stopIfTrue: (msg: T) => boolean, private readonly peerManagerEventEmitter: TypedEventEmitter, private readonly wakuEventEmitter: IWakuEventEmitter, private readonly _queryGenerator: ( @@ -125,8 +126,13 @@ export class QueryOnConnect< const messages = (await Promise.all(page)).filter( (m) => m !== undefined ); + const stop = messages.some((msg: T) => this.stopIfTrue(msg)); // Bundle the messages to help batch process by sds this.dispatchMessages(messages); + + if (stop) { + break; + } } // Didn't throw, so it didn't fail diff --git a/packages/sdk/src/reliable_channel/reliable_channel.spec.ts b/packages/sdk/src/reliable_channel/reliable_channel.spec.ts index 28563feebf..7b29a6b55d 100644 --- a/packages/sdk/src/reliable_channel/reliable_channel.spec.ts +++ b/packages/sdk/src/reliable_channel/reliable_channel.spec.ts @@ -13,7 +13,7 @@ import { LightPushSDKResult, QueryRequestParams } from "@waku/interfaces"; -import { ContentMessage } from "@waku/sds"; +import { ContentMessage, SyncMessage } from "@waku/sds"; import { createRoutingInfo, delay, @@ -678,4 +678,456 @@ describe("Reliable Channel", () => { expect(queryGeneratorStub.called).to.be.true; }); }); + + describe("stopIfTrue Integration with QueryOnConnect", () => { + let mockWakuNode: MockWakuNode; + let encoder: IEncoder; + let decoder: IDecoder; + let mockPeerManagerEvents: TypedEventEmitter; + let queryGeneratorStub: sinon.SinonStub; + let mockPeerId: PeerId; + + beforeEach(async () => { + mockWakuNode = new MockWakuNode(); + mockPeerManagerEvents = new TypedEventEmitter(); + (mockWakuNode as any).peerManager = { + events: mockPeerManagerEvents + }; + + encoder = createEncoder({ + contentTopic: TEST_CONTENT_TOPIC, + routingInfo: TEST_ROUTING_INFO + }); + + decoder = createDecoder(TEST_CONTENT_TOPIC, TEST_ROUTING_INFO); + + queryGeneratorStub = sinon.stub(); + mockWakuNode.store = { + queryGenerator: queryGeneratorStub + } as any; + + mockPeerId = { + toString: () => "QmTestPeerId" + } as unknown as PeerId; + }); + + it("should stop query when sync message from same channel is found", async () => { + const channelId = "testChannel"; + const senderId = "testSender"; + + // Create messages: one from different channel, one sync from same channel, one more + const sdsMessageDifferentChannel = new ContentMessage( + "msg1", + "differentChannel", + senderId, + [], + 1, + undefined, + utf8ToBytes("different channel") + ); + + const sdsSyncMessage = new SyncMessage( + "sync-msg-id", + channelId, + senderId, + [], + 2, + undefined, + undefined + ); + + const sdsMessageAfterSync = new ContentMessage( + "msg3", + channelId, + senderId, + [], + 3, + undefined, + utf8ToBytes("after sync") + ); + + const messages: IDecodedMessage[] = [ + { + hash: hexToBytes("1111"), + hashStr: "1111", + version: 1, + timestamp: new Date(), + contentTopic: TEST_CONTENT_TOPIC, + pubsubTopic: decoder.pubsubTopic, + payload: sdsMessageDifferentChannel.encode(), + rateLimitProof: undefined, + ephemeral: false, + meta: undefined + }, + { + hash: hexToBytes("2222"), + hashStr: "2222", + version: 1, + timestamp: new Date(), + contentTopic: TEST_CONTENT_TOPIC, + pubsubTopic: decoder.pubsubTopic, + payload: sdsSyncMessage.encode(), + rateLimitProof: undefined, + ephemeral: false, + meta: undefined + }, + { + hash: hexToBytes("3333"), + hashStr: "3333", + version: 1, + timestamp: new Date(), + contentTopic: TEST_CONTENT_TOPIC, + pubsubTopic: decoder.pubsubTopic, + payload: sdsMessageAfterSync.encode(), + rateLimitProof: undefined, + ephemeral: false, + meta: undefined + } + ]; + + // Setup generator to yield 3 messages, but should stop after 2nd + queryGeneratorStub.callsFake(async function* () { + yield [Promise.resolve(messages[0])]; + yield [Promise.resolve(messages[1])]; + yield [Promise.resolve(messages[2])]; + }); + + const reliableChannel = await ReliableChannel.create( + mockWakuNode, + channelId, + senderId, + encoder, + decoder + ); + + await delay(50); + + // Trigger query on connect + mockPeerManagerEvents.dispatchEvent( + new CustomEvent("store:connect", { detail: mockPeerId }) + ); + + await delay(200); + + // queryGenerator should have been called + expect(queryGeneratorStub.called).to.be.true; + // The query should have stopped after finding sync message from same channel + expect(reliableChannel).to.not.be.undefined; + }); + + it("should stop query on content message from same channel", async () => { + const channelId = "testChannel"; + const senderId = "testSender"; + + const sdsContentMessage = new ContentMessage( + "msg1", + channelId, + senderId, + [{ messageId: "previous-msg-id" }], + 1, + undefined, + utf8ToBytes("content message") + ); + + const sdsMessageAfter = new ContentMessage( + "msg2", + channelId, + senderId, + [], + 2, + undefined, + utf8ToBytes("after content") + ); + + const messages: IDecodedMessage[] = [ + { + hash: hexToBytes("1111"), + hashStr: "1111", + version: 1, + timestamp: new Date(), + contentTopic: TEST_CONTENT_TOPIC, + pubsubTopic: decoder.pubsubTopic, + payload: sdsContentMessage.encode(), + rateLimitProof: undefined, + ephemeral: false, + meta: undefined + }, + { + hash: hexToBytes("2222"), + hashStr: "2222", + version: 1, + timestamp: new Date(), + contentTopic: TEST_CONTENT_TOPIC, + pubsubTopic: decoder.pubsubTopic, + payload: sdsMessageAfter.encode(), + rateLimitProof: undefined, + ephemeral: false, + meta: undefined + } + ]; + + let pagesYielded = 0; + queryGeneratorStub.callsFake(async function* () { + pagesYielded++; + yield [Promise.resolve(messages[0])]; + pagesYielded++; + yield [Promise.resolve(messages[1])]; + }); + + const reliableChannel = await ReliableChannel.create( + mockWakuNode, + channelId, + senderId, + encoder, + decoder + ); + + await delay(50); + + mockPeerManagerEvents.dispatchEvent( + new CustomEvent("store:connect", { detail: mockPeerId }) + ); + + await delay(200); + + expect(queryGeneratorStub.called).to.be.true; + expect(reliableChannel).to.not.be.undefined; + // Should have stopped after first page with content message + expect(pagesYielded).to.equal(1); + }); + + it("should continue query when messages are from different channels", async () => { + const channelId = "testChannel"; + const senderId = "testSender"; + + const sdsMessageDifferent1 = new ContentMessage( + "msg1", + "differentChannel1", + senderId, + [], + 1, + undefined, + utf8ToBytes("different 1") + ); + + const sdsMessageDifferent2 = new ContentMessage( + "msg2", + "differentChannel2", + senderId, + [], + 2, + undefined, + utf8ToBytes("different 2") + ); + + const sdsMessageDifferent3 = new ContentMessage( + "msg3", + "differentChannel3", + senderId, + [], + 3, + undefined, + utf8ToBytes("different 3") + ); + + const messages: IDecodedMessage[] = [ + { + hash: hexToBytes("1111"), + hashStr: "1111", + version: 1, + timestamp: new Date(), + contentTopic: TEST_CONTENT_TOPIC, + pubsubTopic: decoder.pubsubTopic, + payload: sdsMessageDifferent1.encode(), + rateLimitProof: undefined, + ephemeral: false, + meta: undefined + }, + { + hash: hexToBytes("2222"), + hashStr: "2222", + version: 1, + timestamp: new Date(), + contentTopic: TEST_CONTENT_TOPIC, + pubsubTopic: decoder.pubsubTopic, + payload: sdsMessageDifferent2.encode(), + rateLimitProof: undefined, + ephemeral: false, + meta: undefined + }, + { + hash: hexToBytes("3333"), + hashStr: "3333", + version: 1, + timestamp: new Date(), + contentTopic: TEST_CONTENT_TOPIC, + pubsubTopic: decoder.pubsubTopic, + payload: sdsMessageDifferent3.encode(), + rateLimitProof: undefined, + ephemeral: false, + meta: undefined + } + ]; + + let pagesYielded = 0; + queryGeneratorStub.callsFake(async function* () { + pagesYielded++; + yield [Promise.resolve(messages[0])]; + pagesYielded++; + yield [Promise.resolve(messages[1])]; + pagesYielded++; + yield [Promise.resolve(messages[2])]; + }); + + const reliableChannel = await ReliableChannel.create( + mockWakuNode, + channelId, + senderId, + encoder, + decoder + ); + + await delay(50); + + mockPeerManagerEvents.dispatchEvent( + new CustomEvent("store:connect", { detail: mockPeerId }) + ); + + await delay(200); + + expect(queryGeneratorStub.called).to.be.true; + expect(reliableChannel).to.not.be.undefined; + // Should have processed all pages since no matching channel + expect(pagesYielded).to.equal(3); + }); + }); + + describe("isChannelMessageWithCausalHistory predicate", () => { + let mockWakuNode: MockWakuNode; + let reliableChannel: ReliableChannel; + let encoder: IEncoder; + let decoder: IDecoder; + + beforeEach(async () => { + mockWakuNode = new MockWakuNode(); + encoder = createEncoder({ + contentTopic: TEST_CONTENT_TOPIC, + routingInfo: TEST_ROUTING_INFO + }); + decoder = createDecoder(TEST_CONTENT_TOPIC, TEST_ROUTING_INFO); + + reliableChannel = await ReliableChannel.create( + mockWakuNode, + "testChannel", + "testSender", + encoder, + decoder, + { queryOnConnect: false } + ); + }); + + it("should return false for malformed SDS messages", () => { + const msg = { + payload: new Uint8Array([1, 2, 3]) + } as IDecodedMessage; + + const result = reliableChannel["isChannelMessageWithCausalHistory"](msg); + expect(result).to.be.false; + }); + + it("should return false for different channelId", () => { + const sdsMsg = new ContentMessage( + "msg1", + "differentChannel", + "sender", + [], + 1, + undefined, + utf8ToBytes("content") + ); + + const msg = { + payload: sdsMsg.encode() + } as IDecodedMessage; + + const result = reliableChannel["isChannelMessageWithCausalHistory"](msg); + expect(result).to.be.false; + }); + + it("should return false for sync message without causal history", () => { + const syncMsg = new SyncMessage( + "sync-msg-id", + "testChannel", + "sender", + [], + 1, + undefined, + undefined + ); + + const msg = { + payload: syncMsg.encode() + } as IDecodedMessage; + + const result = reliableChannel["isChannelMessageWithCausalHistory"](msg); + expect(result).to.be.false; + }); + + it("should return false for content message without causal history", () => { + const contentMsg = new ContentMessage( + "msg1", + "testChannel", + "sender", + [], + 1, + undefined, + utf8ToBytes("content") + ); + + const msg = { + payload: contentMsg.encode() + } as IDecodedMessage; + + const result = reliableChannel["isChannelMessageWithCausalHistory"](msg); + expect(result).to.be.false; + }); + + it("should return true for message with causal history", () => { + const contentMsg = new ContentMessage( + "msg1", + "testChannel", + "sender", + [{ messageId: "previous-msg-id" }], + 1, + undefined, + utf8ToBytes("content") + ); + + const msg = { + payload: contentMsg.encode() + } as IDecodedMessage; + + const result = reliableChannel["isChannelMessageWithCausalHistory"](msg); + expect(result).to.be.true; + }); + + it("should return true for sync message with causal history", () => { + const syncMsg = new SyncMessage( + "sync-msg-id", + "testChannel", + "sender", + [{ messageId: "previous-msg-id" }], + 1, + undefined, + undefined + ); + + const msg = { + payload: syncMsg.encode() + } as IDecodedMessage; + + const result = reliableChannel["isChannelMessageWithCausalHistory"](msg); + expect(result).to.be.true; + }); + }); }); diff --git a/packages/sdk/src/reliable_channel/reliable_channel.ts b/packages/sdk/src/reliable_channel/reliable_channel.ts index 713309b90f..49b55aa495 100644 --- a/packages/sdk/src/reliable_channel/reliable_channel.ts +++ b/packages/sdk/src/reliable_channel/reliable_channel.ts @@ -185,9 +185,9 @@ export class ReliableChannel< peerManagerEvents !== undefined && (options?.queryOnConnect ?? true) ) { - log.info("auto-query enabled"); this.queryOnConnect = new QueryOnConnect( [this.decoder], + this.isChannelMessageWithCausalHistory.bind(this), peerManagerEvents, node.events, this._retrieve.bind(this) @@ -580,6 +580,21 @@ export class ReliableChannel< this.messageChannel.sweepOutgoingBuffer(); } + private isChannelMessageWithCausalHistory(msg: T): boolean { + // TODO: we do end-up decoding messages twice as this is used to stop store queries. + const sdsMessage = SdsMessage.decode(msg.payload); + + if (!sdsMessage) { + return false; + } + + if (sdsMessage.channelId !== this.messageChannel.channelId) { + return false; + } + + return sdsMessage.causalHistory && sdsMessage.causalHistory.length > 0; + } + private setupEventListeners(): void { this.messageChannel.addEventListener( MessageChannelEvent.OutMessageSent, diff --git a/packages/sds/src/message_channel/message.ts b/packages/sds/src/message_channel/message.ts index e186124750..638f6c71a0 100644 --- a/packages/sds/src/message_channel/message.ts +++ b/packages/sds/src/message_channel/message.ts @@ -30,56 +30,60 @@ export class Message implements proto_sds_message.SdsMessage { public static decode( data: Uint8Array ): undefined | ContentMessage | SyncMessage | EphemeralMessage { - const { - messageId, - channelId, - senderId, - causalHistory, - lamportTimestamp, - bloomFilter, - content - } = proto_sds_message.SdsMessage.decode(data); - - if (testContentMessage({ lamportTimestamp, content })) { - return new ContentMessage( + try { + const { messageId, channelId, senderId, causalHistory, - lamportTimestamp!, + lamportTimestamp, bloomFilter, - content! - ); - } + content + } = proto_sds_message.SdsMessage.decode(data); - if (testEphemeralMessage({ lamportTimestamp, content })) { - return new EphemeralMessage( - messageId, - channelId, - senderId, - causalHistory, - undefined, - bloomFilter, - content! - ); - } + if (testContentMessage({ lamportTimestamp, content })) { + return new ContentMessage( + messageId, + channelId, + senderId, + causalHistory, + lamportTimestamp!, + bloomFilter, + content! + ); + } - if (testSyncMessage({ lamportTimestamp, content })) { - return new SyncMessage( - messageId, - channelId, - senderId, - causalHistory, - lamportTimestamp!, - bloomFilter, - undefined + if (testEphemeralMessage({ lamportTimestamp, content })) { + return new EphemeralMessage( + messageId, + channelId, + senderId, + causalHistory, + undefined, + bloomFilter, + content! + ); + } + + if (testSyncMessage({ lamportTimestamp, content })) { + return new SyncMessage( + messageId, + channelId, + senderId, + causalHistory, + lamportTimestamp!, + bloomFilter, + undefined + ); + } + log.error( + "message received was of unknown type", + lamportTimestamp, + content ); + } catch (err) { + log.error("failed to decode sds message", err); } - log.error( - "message received was of unknown type", - lamportTimestamp, - content - ); return undefined; } }