mirror of
https://github.com/logos-messaging/js-waku.git
synced 2026-01-02 13:53:12 +00:00
feat: reliable channels search up to 30 days to find message (#2657)
* feat: query on connect stops on predicate * test: query on connect stops at predicate * feat: reliable channels search up to 30 days to find message Queries stop once a valid sync or content message is found in the channel. * fix: protect against decoding exceptions * stop range queries on messages with a causal history
This commit is contained in:
parent
bbcfc94879
commit
593bc45225
@ -95,6 +95,7 @@ describe("QueryOnConnect", () => {
|
||||
it("should create QueryOnConnect instance with all required parameters", () => {
|
||||
queryOnConnect = new QueryOnConnect(
|
||||
mockDecoders,
|
||||
() => false,
|
||||
mockPeerManagerEventEmitter,
|
||||
mockWakuEventEmitter,
|
||||
mockQueryGenerator,
|
||||
@ -108,6 +109,7 @@ describe("QueryOnConnect", () => {
|
||||
it("should create QueryOnConnect instance without options", () => {
|
||||
queryOnConnect = new QueryOnConnect(
|
||||
mockDecoders,
|
||||
() => false,
|
||||
mockPeerManagerEventEmitter,
|
||||
mockWakuEventEmitter,
|
||||
mockQueryGenerator
|
||||
@ -120,6 +122,7 @@ describe("QueryOnConnect", () => {
|
||||
it("should accept empty decoders array", () => {
|
||||
queryOnConnect = new QueryOnConnect(
|
||||
[],
|
||||
() => false,
|
||||
mockPeerManagerEventEmitter,
|
||||
mockWakuEventEmitter,
|
||||
mockQueryGenerator,
|
||||
@ -134,6 +137,7 @@ describe("QueryOnConnect", () => {
|
||||
beforeEach(() => {
|
||||
queryOnConnect = new QueryOnConnect(
|
||||
mockDecoders,
|
||||
() => false,
|
||||
mockPeerManagerEventEmitter,
|
||||
mockWakuEventEmitter,
|
||||
mockQueryGenerator,
|
||||
@ -173,6 +177,7 @@ describe("QueryOnConnect", () => {
|
||||
beforeEach(() => {
|
||||
queryOnConnect = new QueryOnConnect(
|
||||
mockDecoders,
|
||||
() => false,
|
||||
mockPeerManagerEventEmitter,
|
||||
mockWakuEventEmitter,
|
||||
mockQueryGenerator,
|
||||
@ -224,6 +229,7 @@ describe("QueryOnConnect", () => {
|
||||
|
||||
queryOnConnect = new QueryOnConnect(
|
||||
mockDecoders,
|
||||
() => false,
|
||||
mockPeerManagerEventEmitter,
|
||||
mockWakuEventEmitter,
|
||||
mockQueryGenerator,
|
||||
@ -276,6 +282,7 @@ describe("QueryOnConnect", () => {
|
||||
|
||||
queryOnConnect = new QueryOnConnect(
|
||||
mockDecoders,
|
||||
() => false,
|
||||
mockPeerManagerEventEmitter,
|
||||
mockWakuEventEmitter,
|
||||
mockQueryGenerator,
|
||||
@ -298,6 +305,7 @@ describe("QueryOnConnect", () => {
|
||||
|
||||
queryOnConnect = new QueryOnConnect(
|
||||
mockDecoders,
|
||||
() => false,
|
||||
mockPeerManagerEventEmitter,
|
||||
mockWakuEventEmitter,
|
||||
mockQueryGenerator,
|
||||
@ -320,6 +328,7 @@ describe("QueryOnConnect", () => {
|
||||
|
||||
queryOnConnect = new QueryOnConnect(
|
||||
mockDecoders,
|
||||
() => false,
|
||||
mockPeerManagerEventEmitter,
|
||||
mockWakuEventEmitter,
|
||||
mockQueryGenerator,
|
||||
@ -391,6 +400,7 @@ describe("QueryOnConnect", () => {
|
||||
|
||||
const queryOnConnect = new QueryOnConnect(
|
||||
mockDecoders,
|
||||
() => false,
|
||||
mockPeerManagerEventEmitter,
|
||||
mockWakuEventEmitter,
|
||||
mockQueryGenerator,
|
||||
@ -418,6 +428,7 @@ describe("QueryOnConnect", () => {
|
||||
|
||||
queryOnConnect = new QueryOnConnect(
|
||||
mockDecoders,
|
||||
() => false,
|
||||
mockPeerManagerEventEmitter,
|
||||
mockWakuEventEmitter,
|
||||
mockQueryGenerator,
|
||||
@ -473,6 +484,7 @@ describe("QueryOnConnect", () => {
|
||||
|
||||
queryOnConnect = new QueryOnConnect(
|
||||
mockDecoders,
|
||||
() => false,
|
||||
mockPeerManagerEventEmitter,
|
||||
mockWakuEventEmitter,
|
||||
mockQueryGenerator,
|
||||
@ -605,6 +617,7 @@ describe("QueryOnConnect", () => {
|
||||
|
||||
queryOnConnect = new QueryOnConnect(
|
||||
mockDecoders,
|
||||
() => false,
|
||||
mockPeerManagerEventEmitter,
|
||||
mockWakuEventEmitter,
|
||||
mockQueryGenerator,
|
||||
@ -750,6 +763,248 @@ describe("QueryOnConnect", () => {
|
||||
expect(mockQueryGenerator.calledTwice).to.be.true;
|
||||
});
|
||||
});
|
||||
|
||||
describe("stopIfTrue predicate", () => {
|
||||
beforeEach(() => {
|
||||
mockPeerManagerEventEmitter.addEventListener = sinon.stub();
|
||||
mockWakuEventEmitter.addEventListener = sinon.stub();
|
||||
});
|
||||
|
||||
it("should stop query iteration when stopIfTrue returns true", async () => {
|
||||
const messages = [
|
||||
{
|
||||
hash: new Uint8Array(),
|
||||
hashStr: "msg1",
|
||||
version: 1,
|
||||
timestamp: new Date(),
|
||||
contentTopic: "/test/1/content",
|
||||
pubsubTopic: "/waku/2/default-waku/proto",
|
||||
payload: new Uint8Array([1]),
|
||||
rateLimitProof: undefined,
|
||||
ephemeral: false,
|
||||
meta: undefined
|
||||
},
|
||||
{
|
||||
hash: new Uint8Array(),
|
||||
hashStr: "stop-hash",
|
||||
version: 1,
|
||||
timestamp: new Date(),
|
||||
contentTopic: "/test/1/content",
|
||||
pubsubTopic: "/waku/2/default-waku/proto",
|
||||
payload: new Uint8Array([2]),
|
||||
rateLimitProof: undefined,
|
||||
ephemeral: false,
|
||||
meta: undefined
|
||||
},
|
||||
{
|
||||
hash: new Uint8Array(),
|
||||
hashStr: "msg3",
|
||||
version: 1,
|
||||
timestamp: new Date(),
|
||||
contentTopic: "/test/1/content",
|
||||
pubsubTopic: "/waku/2/default-waku/proto",
|
||||
payload: new Uint8Array([3]),
|
||||
rateLimitProof: undefined,
|
||||
ephemeral: false,
|
||||
meta: undefined
|
||||
}
|
||||
];
|
||||
|
||||
// Setup generator to yield 3 pages, stop should occur on page 2
|
||||
const mockAsyncGenerator = async function* (): AsyncGenerator<
|
||||
Promise<IDecodedMessage | undefined>[]
|
||||
> {
|
||||
yield [Promise.resolve(messages[0])];
|
||||
yield [Promise.resolve(messages[1])];
|
||||
yield [Promise.resolve(messages[2])];
|
||||
};
|
||||
mockQueryGenerator.returns(mockAsyncGenerator());
|
||||
|
||||
const stopPredicate = (msg: IDecodedMessage): boolean =>
|
||||
msg.hashStr === "stop-hash";
|
||||
|
||||
queryOnConnect = new QueryOnConnect(
|
||||
mockDecoders,
|
||||
stopPredicate,
|
||||
mockPeerManagerEventEmitter,
|
||||
mockWakuEventEmitter,
|
||||
mockQueryGenerator,
|
||||
options
|
||||
);
|
||||
|
||||
const receivedMessages: IDecodedMessage[] = [];
|
||||
queryOnConnect.addEventListener(
|
||||
QueryOnConnectEvent.MessagesRetrieved,
|
||||
(event: CustomEvent<IDecodedMessage[]>) => {
|
||||
receivedMessages.push(...event.detail);
|
||||
}
|
||||
);
|
||||
|
||||
queryOnConnect.start();
|
||||
await queryOnConnect["maybeQuery"](mockPeerId);
|
||||
|
||||
// Should have received messages from first 2 pages only
|
||||
expect(receivedMessages).to.have.length(2);
|
||||
expect(receivedMessages[0].hashStr).to.equal("msg1");
|
||||
expect(receivedMessages[1].hashStr).to.equal("stop-hash");
|
||||
});
|
||||
|
||||
it("should process all pages when stopIfTrue never returns true", async () => {
|
||||
const messages = [
|
||||
{
|
||||
hash: new Uint8Array(),
|
||||
hashStr: "msg1",
|
||||
version: 1,
|
||||
timestamp: new Date(),
|
||||
contentTopic: "/test/1/content",
|
||||
pubsubTopic: "/waku/2/default-waku/proto",
|
||||
payload: new Uint8Array([1]),
|
||||
rateLimitProof: undefined,
|
||||
ephemeral: false,
|
||||
meta: undefined
|
||||
},
|
||||
{
|
||||
hash: new Uint8Array(),
|
||||
hashStr: "msg2",
|
||||
version: 1,
|
||||
timestamp: new Date(),
|
||||
contentTopic: "/test/1/content",
|
||||
pubsubTopic: "/waku/2/default-waku/proto",
|
||||
payload: new Uint8Array([2]),
|
||||
rateLimitProof: undefined,
|
||||
ephemeral: false,
|
||||
meta: undefined
|
||||
},
|
||||
{
|
||||
hash: new Uint8Array(),
|
||||
hashStr: "msg3",
|
||||
version: 1,
|
||||
timestamp: new Date(),
|
||||
contentTopic: "/test/1/content",
|
||||
pubsubTopic: "/waku/2/default-waku/proto",
|
||||
payload: new Uint8Array([3]),
|
||||
rateLimitProof: undefined,
|
||||
ephemeral: false,
|
||||
meta: undefined
|
||||
}
|
||||
];
|
||||
|
||||
const mockAsyncGenerator = async function* (): AsyncGenerator<
|
||||
Promise<IDecodedMessage | undefined>[]
|
||||
> {
|
||||
yield [Promise.resolve(messages[0])];
|
||||
yield [Promise.resolve(messages[1])];
|
||||
yield [Promise.resolve(messages[2])];
|
||||
};
|
||||
mockQueryGenerator.returns(mockAsyncGenerator());
|
||||
|
||||
const stopPredicate = (): boolean => false;
|
||||
|
||||
queryOnConnect = new QueryOnConnect(
|
||||
mockDecoders,
|
||||
stopPredicate,
|
||||
mockPeerManagerEventEmitter,
|
||||
mockWakuEventEmitter,
|
||||
mockQueryGenerator,
|
||||
options
|
||||
);
|
||||
|
||||
const receivedMessages: IDecodedMessage[] = [];
|
||||
queryOnConnect.addEventListener(
|
||||
QueryOnConnectEvent.MessagesRetrieved,
|
||||
(event: CustomEvent<IDecodedMessage[]>) => {
|
||||
receivedMessages.push(...event.detail);
|
||||
}
|
||||
);
|
||||
|
||||
queryOnConnect.start();
|
||||
await queryOnConnect["maybeQuery"](mockPeerId);
|
||||
|
||||
// Should have received all 3 messages
|
||||
expect(receivedMessages).to.have.length(3);
|
||||
});
|
||||
|
||||
it("should stop on first message of a page if stopIfTrue matches", async () => {
|
||||
const messages = [
|
||||
{
|
||||
hash: new Uint8Array(),
|
||||
hashStr: "stop-hash",
|
||||
version: 1,
|
||||
timestamp: new Date(),
|
||||
contentTopic: "/test/1/content",
|
||||
pubsubTopic: "/waku/2/default-waku/proto",
|
||||
payload: new Uint8Array([1]),
|
||||
rateLimitProof: undefined,
|
||||
ephemeral: false,
|
||||
meta: undefined
|
||||
},
|
||||
{
|
||||
hash: new Uint8Array(),
|
||||
hashStr: "msg2",
|
||||
version: 1,
|
||||
timestamp: new Date(),
|
||||
contentTopic: "/test/1/content",
|
||||
pubsubTopic: "/waku/2/default-waku/proto",
|
||||
payload: new Uint8Array([2]),
|
||||
rateLimitProof: undefined,
|
||||
ephemeral: false,
|
||||
meta: undefined
|
||||
},
|
||||
{
|
||||
hash: new Uint8Array(),
|
||||
hashStr: "msg3",
|
||||
version: 1,
|
||||
timestamp: new Date(),
|
||||
contentTopic: "/test/1/content",
|
||||
pubsubTopic: "/waku/2/default-waku/proto",
|
||||
payload: new Uint8Array([3]),
|
||||
rateLimitProof: undefined,
|
||||
ephemeral: false,
|
||||
meta: undefined
|
||||
}
|
||||
];
|
||||
|
||||
const mockAsyncGenerator = async function* (): AsyncGenerator<
|
||||
Promise<IDecodedMessage | undefined>[]
|
||||
> {
|
||||
yield [
|
||||
Promise.resolve(messages[0]),
|
||||
Promise.resolve(messages[1]),
|
||||
Promise.resolve(messages[2])
|
||||
];
|
||||
};
|
||||
mockQueryGenerator.returns(mockAsyncGenerator());
|
||||
|
||||
const stopPredicate = (msg: IDecodedMessage): boolean =>
|
||||
msg.hashStr === "stop-hash";
|
||||
|
||||
queryOnConnect = new QueryOnConnect(
|
||||
mockDecoders,
|
||||
stopPredicate,
|
||||
mockPeerManagerEventEmitter,
|
||||
mockWakuEventEmitter,
|
||||
mockQueryGenerator,
|
||||
options
|
||||
);
|
||||
|
||||
const receivedMessages: IDecodedMessage[] = [];
|
||||
queryOnConnect.addEventListener(
|
||||
QueryOnConnectEvent.MessagesRetrieved,
|
||||
(event: CustomEvent<IDecodedMessage[]>) => {
|
||||
receivedMessages.push(...event.detail);
|
||||
}
|
||||
);
|
||||
|
||||
queryOnConnect.start();
|
||||
await queryOnConnect["maybeQuery"](mockPeerId);
|
||||
|
||||
// Should have received all 3 messages from the page, even though first matched
|
||||
expect(receivedMessages).to.have.length(3);
|
||||
expect(receivedMessages[0].hashStr).to.equal("stop-hash");
|
||||
expect(receivedMessages[1].hashStr).to.equal("msg2");
|
||||
expect(receivedMessages[2].hashStr).to.equal("msg3");
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe("calculateTimeRange", () => {
|
||||
|
||||
@ -17,7 +17,7 @@ import {
|
||||
const log = new Logger("sdk:query-on-connect");
|
||||
|
||||
export const DEFAULT_FORCE_QUERY_THRESHOLD_MS = 5 * 60 * 1000; // 5 minutes
|
||||
export const MAX_TIME_RANGE_QUERY_MS = 24 * 60 * 60 * 1000; // 24 hours
|
||||
export const MAX_TIME_RANGE_QUERY_MS = 30 * 24 * 60 * 60 * 1000; // 30 days (queries are split)
|
||||
|
||||
export interface QueryOnConnectOptions {
|
||||
/**
|
||||
@ -54,6 +54,7 @@ export class QueryOnConnect<
|
||||
|
||||
public constructor(
|
||||
public decoders: IDecoder<T>[],
|
||||
public stopIfTrue: (msg: T) => boolean,
|
||||
private readonly peerManagerEventEmitter: TypedEventEmitter<IPeerManagerEvents>,
|
||||
private readonly wakuEventEmitter: IWakuEventEmitter,
|
||||
private readonly _queryGenerator: <T extends IDecodedMessage>(
|
||||
@ -125,8 +126,13 @@ export class QueryOnConnect<
|
||||
const messages = (await Promise.all(page)).filter(
|
||||
(m) => m !== undefined
|
||||
);
|
||||
const stop = messages.some((msg: T) => this.stopIfTrue(msg));
|
||||
// Bundle the messages to help batch process by sds
|
||||
this.dispatchMessages(messages);
|
||||
|
||||
if (stop) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Didn't throw, so it didn't fail
|
||||
|
||||
@ -13,7 +13,7 @@ import {
|
||||
LightPushSDKResult,
|
||||
QueryRequestParams
|
||||
} from "@waku/interfaces";
|
||||
import { ContentMessage } from "@waku/sds";
|
||||
import { ContentMessage, SyncMessage } from "@waku/sds";
|
||||
import {
|
||||
createRoutingInfo,
|
||||
delay,
|
||||
@ -678,4 +678,456 @@ describe("Reliable Channel", () => {
|
||||
expect(queryGeneratorStub.called).to.be.true;
|
||||
});
|
||||
});
|
||||
|
||||
describe("stopIfTrue Integration with QueryOnConnect", () => {
|
||||
let mockWakuNode: MockWakuNode;
|
||||
let encoder: IEncoder;
|
||||
let decoder: IDecoder<IDecodedMessage>;
|
||||
let mockPeerManagerEvents: TypedEventEmitter<any>;
|
||||
let queryGeneratorStub: sinon.SinonStub;
|
||||
let mockPeerId: PeerId;
|
||||
|
||||
beforeEach(async () => {
|
||||
mockWakuNode = new MockWakuNode();
|
||||
mockPeerManagerEvents = new TypedEventEmitter();
|
||||
(mockWakuNode as any).peerManager = {
|
||||
events: mockPeerManagerEvents
|
||||
};
|
||||
|
||||
encoder = createEncoder({
|
||||
contentTopic: TEST_CONTENT_TOPIC,
|
||||
routingInfo: TEST_ROUTING_INFO
|
||||
});
|
||||
|
||||
decoder = createDecoder(TEST_CONTENT_TOPIC, TEST_ROUTING_INFO);
|
||||
|
||||
queryGeneratorStub = sinon.stub();
|
||||
mockWakuNode.store = {
|
||||
queryGenerator: queryGeneratorStub
|
||||
} as any;
|
||||
|
||||
mockPeerId = {
|
||||
toString: () => "QmTestPeerId"
|
||||
} as unknown as PeerId;
|
||||
});
|
||||
|
||||
it("should stop query when sync message from same channel is found", async () => {
|
||||
const channelId = "testChannel";
|
||||
const senderId = "testSender";
|
||||
|
||||
// Create messages: one from different channel, one sync from same channel, one more
|
||||
const sdsMessageDifferentChannel = new ContentMessage(
|
||||
"msg1",
|
||||
"differentChannel",
|
||||
senderId,
|
||||
[],
|
||||
1,
|
||||
undefined,
|
||||
utf8ToBytes("different channel")
|
||||
);
|
||||
|
||||
const sdsSyncMessage = new SyncMessage(
|
||||
"sync-msg-id",
|
||||
channelId,
|
||||
senderId,
|
||||
[],
|
||||
2,
|
||||
undefined,
|
||||
undefined
|
||||
);
|
||||
|
||||
const sdsMessageAfterSync = new ContentMessage(
|
||||
"msg3",
|
||||
channelId,
|
||||
senderId,
|
||||
[],
|
||||
3,
|
||||
undefined,
|
||||
utf8ToBytes("after sync")
|
||||
);
|
||||
|
||||
const messages: IDecodedMessage[] = [
|
||||
{
|
||||
hash: hexToBytes("1111"),
|
||||
hashStr: "1111",
|
||||
version: 1,
|
||||
timestamp: new Date(),
|
||||
contentTopic: TEST_CONTENT_TOPIC,
|
||||
pubsubTopic: decoder.pubsubTopic,
|
||||
payload: sdsMessageDifferentChannel.encode(),
|
||||
rateLimitProof: undefined,
|
||||
ephemeral: false,
|
||||
meta: undefined
|
||||
},
|
||||
{
|
||||
hash: hexToBytes("2222"),
|
||||
hashStr: "2222",
|
||||
version: 1,
|
||||
timestamp: new Date(),
|
||||
contentTopic: TEST_CONTENT_TOPIC,
|
||||
pubsubTopic: decoder.pubsubTopic,
|
||||
payload: sdsSyncMessage.encode(),
|
||||
rateLimitProof: undefined,
|
||||
ephemeral: false,
|
||||
meta: undefined
|
||||
},
|
||||
{
|
||||
hash: hexToBytes("3333"),
|
||||
hashStr: "3333",
|
||||
version: 1,
|
||||
timestamp: new Date(),
|
||||
contentTopic: TEST_CONTENT_TOPIC,
|
||||
pubsubTopic: decoder.pubsubTopic,
|
||||
payload: sdsMessageAfterSync.encode(),
|
||||
rateLimitProof: undefined,
|
||||
ephemeral: false,
|
||||
meta: undefined
|
||||
}
|
||||
];
|
||||
|
||||
// Setup generator to yield 3 messages, but should stop after 2nd
|
||||
queryGeneratorStub.callsFake(async function* () {
|
||||
yield [Promise.resolve(messages[0])];
|
||||
yield [Promise.resolve(messages[1])];
|
||||
yield [Promise.resolve(messages[2])];
|
||||
});
|
||||
|
||||
const reliableChannel = await ReliableChannel.create(
|
||||
mockWakuNode,
|
||||
channelId,
|
||||
senderId,
|
||||
encoder,
|
||||
decoder
|
||||
);
|
||||
|
||||
await delay(50);
|
||||
|
||||
// Trigger query on connect
|
||||
mockPeerManagerEvents.dispatchEvent(
|
||||
new CustomEvent("store:connect", { detail: mockPeerId })
|
||||
);
|
||||
|
||||
await delay(200);
|
||||
|
||||
// queryGenerator should have been called
|
||||
expect(queryGeneratorStub.called).to.be.true;
|
||||
// The query should have stopped after finding sync message from same channel
|
||||
expect(reliableChannel).to.not.be.undefined;
|
||||
});
|
||||
|
||||
it("should stop query on content message from same channel", async () => {
|
||||
const channelId = "testChannel";
|
||||
const senderId = "testSender";
|
||||
|
||||
const sdsContentMessage = new ContentMessage(
|
||||
"msg1",
|
||||
channelId,
|
||||
senderId,
|
||||
[{ messageId: "previous-msg-id" }],
|
||||
1,
|
||||
undefined,
|
||||
utf8ToBytes("content message")
|
||||
);
|
||||
|
||||
const sdsMessageAfter = new ContentMessage(
|
||||
"msg2",
|
||||
channelId,
|
||||
senderId,
|
||||
[],
|
||||
2,
|
||||
undefined,
|
||||
utf8ToBytes("after content")
|
||||
);
|
||||
|
||||
const messages: IDecodedMessage[] = [
|
||||
{
|
||||
hash: hexToBytes("1111"),
|
||||
hashStr: "1111",
|
||||
version: 1,
|
||||
timestamp: new Date(),
|
||||
contentTopic: TEST_CONTENT_TOPIC,
|
||||
pubsubTopic: decoder.pubsubTopic,
|
||||
payload: sdsContentMessage.encode(),
|
||||
rateLimitProof: undefined,
|
||||
ephemeral: false,
|
||||
meta: undefined
|
||||
},
|
||||
{
|
||||
hash: hexToBytes("2222"),
|
||||
hashStr: "2222",
|
||||
version: 1,
|
||||
timestamp: new Date(),
|
||||
contentTopic: TEST_CONTENT_TOPIC,
|
||||
pubsubTopic: decoder.pubsubTopic,
|
||||
payload: sdsMessageAfter.encode(),
|
||||
rateLimitProof: undefined,
|
||||
ephemeral: false,
|
||||
meta: undefined
|
||||
}
|
||||
];
|
||||
|
||||
let pagesYielded = 0;
|
||||
queryGeneratorStub.callsFake(async function* () {
|
||||
pagesYielded++;
|
||||
yield [Promise.resolve(messages[0])];
|
||||
pagesYielded++;
|
||||
yield [Promise.resolve(messages[1])];
|
||||
});
|
||||
|
||||
const reliableChannel = await ReliableChannel.create(
|
||||
mockWakuNode,
|
||||
channelId,
|
||||
senderId,
|
||||
encoder,
|
||||
decoder
|
||||
);
|
||||
|
||||
await delay(50);
|
||||
|
||||
mockPeerManagerEvents.dispatchEvent(
|
||||
new CustomEvent("store:connect", { detail: mockPeerId })
|
||||
);
|
||||
|
||||
await delay(200);
|
||||
|
||||
expect(queryGeneratorStub.called).to.be.true;
|
||||
expect(reliableChannel).to.not.be.undefined;
|
||||
// Should have stopped after first page with content message
|
||||
expect(pagesYielded).to.equal(1);
|
||||
});
|
||||
|
||||
it("should continue query when messages are from different channels", async () => {
|
||||
const channelId = "testChannel";
|
||||
const senderId = "testSender";
|
||||
|
||||
const sdsMessageDifferent1 = new ContentMessage(
|
||||
"msg1",
|
||||
"differentChannel1",
|
||||
senderId,
|
||||
[],
|
||||
1,
|
||||
undefined,
|
||||
utf8ToBytes("different 1")
|
||||
);
|
||||
|
||||
const sdsMessageDifferent2 = new ContentMessage(
|
||||
"msg2",
|
||||
"differentChannel2",
|
||||
senderId,
|
||||
[],
|
||||
2,
|
||||
undefined,
|
||||
utf8ToBytes("different 2")
|
||||
);
|
||||
|
||||
const sdsMessageDifferent3 = new ContentMessage(
|
||||
"msg3",
|
||||
"differentChannel3",
|
||||
senderId,
|
||||
[],
|
||||
3,
|
||||
undefined,
|
||||
utf8ToBytes("different 3")
|
||||
);
|
||||
|
||||
const messages: IDecodedMessage[] = [
|
||||
{
|
||||
hash: hexToBytes("1111"),
|
||||
hashStr: "1111",
|
||||
version: 1,
|
||||
timestamp: new Date(),
|
||||
contentTopic: TEST_CONTENT_TOPIC,
|
||||
pubsubTopic: decoder.pubsubTopic,
|
||||
payload: sdsMessageDifferent1.encode(),
|
||||
rateLimitProof: undefined,
|
||||
ephemeral: false,
|
||||
meta: undefined
|
||||
},
|
||||
{
|
||||
hash: hexToBytes("2222"),
|
||||
hashStr: "2222",
|
||||
version: 1,
|
||||
timestamp: new Date(),
|
||||
contentTopic: TEST_CONTENT_TOPIC,
|
||||
pubsubTopic: decoder.pubsubTopic,
|
||||
payload: sdsMessageDifferent2.encode(),
|
||||
rateLimitProof: undefined,
|
||||
ephemeral: false,
|
||||
meta: undefined
|
||||
},
|
||||
{
|
||||
hash: hexToBytes("3333"),
|
||||
hashStr: "3333",
|
||||
version: 1,
|
||||
timestamp: new Date(),
|
||||
contentTopic: TEST_CONTENT_TOPIC,
|
||||
pubsubTopic: decoder.pubsubTopic,
|
||||
payload: sdsMessageDifferent3.encode(),
|
||||
rateLimitProof: undefined,
|
||||
ephemeral: false,
|
||||
meta: undefined
|
||||
}
|
||||
];
|
||||
|
||||
let pagesYielded = 0;
|
||||
queryGeneratorStub.callsFake(async function* () {
|
||||
pagesYielded++;
|
||||
yield [Promise.resolve(messages[0])];
|
||||
pagesYielded++;
|
||||
yield [Promise.resolve(messages[1])];
|
||||
pagesYielded++;
|
||||
yield [Promise.resolve(messages[2])];
|
||||
});
|
||||
|
||||
const reliableChannel = await ReliableChannel.create(
|
||||
mockWakuNode,
|
||||
channelId,
|
||||
senderId,
|
||||
encoder,
|
||||
decoder
|
||||
);
|
||||
|
||||
await delay(50);
|
||||
|
||||
mockPeerManagerEvents.dispatchEvent(
|
||||
new CustomEvent("store:connect", { detail: mockPeerId })
|
||||
);
|
||||
|
||||
await delay(200);
|
||||
|
||||
expect(queryGeneratorStub.called).to.be.true;
|
||||
expect(reliableChannel).to.not.be.undefined;
|
||||
// Should have processed all pages since no matching channel
|
||||
expect(pagesYielded).to.equal(3);
|
||||
});
|
||||
});
|
||||
|
||||
describe("isChannelMessageWithCausalHistory predicate", () => {
|
||||
let mockWakuNode: MockWakuNode;
|
||||
let reliableChannel: ReliableChannel<IDecodedMessage>;
|
||||
let encoder: IEncoder;
|
||||
let decoder: IDecoder<IDecodedMessage>;
|
||||
|
||||
beforeEach(async () => {
|
||||
mockWakuNode = new MockWakuNode();
|
||||
encoder = createEncoder({
|
||||
contentTopic: TEST_CONTENT_TOPIC,
|
||||
routingInfo: TEST_ROUTING_INFO
|
||||
});
|
||||
decoder = createDecoder(TEST_CONTENT_TOPIC, TEST_ROUTING_INFO);
|
||||
|
||||
reliableChannel = await ReliableChannel.create(
|
||||
mockWakuNode,
|
||||
"testChannel",
|
||||
"testSender",
|
||||
encoder,
|
||||
decoder,
|
||||
{ queryOnConnect: false }
|
||||
);
|
||||
});
|
||||
|
||||
it("should return false for malformed SDS messages", () => {
|
||||
const msg = {
|
||||
payload: new Uint8Array([1, 2, 3])
|
||||
} as IDecodedMessage;
|
||||
|
||||
const result = reliableChannel["isChannelMessageWithCausalHistory"](msg);
|
||||
expect(result).to.be.false;
|
||||
});
|
||||
|
||||
it("should return false for different channelId", () => {
|
||||
const sdsMsg = new ContentMessage(
|
||||
"msg1",
|
||||
"differentChannel",
|
||||
"sender",
|
||||
[],
|
||||
1,
|
||||
undefined,
|
||||
utf8ToBytes("content")
|
||||
);
|
||||
|
||||
const msg = {
|
||||
payload: sdsMsg.encode()
|
||||
} as IDecodedMessage;
|
||||
|
||||
const result = reliableChannel["isChannelMessageWithCausalHistory"](msg);
|
||||
expect(result).to.be.false;
|
||||
});
|
||||
|
||||
it("should return false for sync message without causal history", () => {
|
||||
const syncMsg = new SyncMessage(
|
||||
"sync-msg-id",
|
||||
"testChannel",
|
||||
"sender",
|
||||
[],
|
||||
1,
|
||||
undefined,
|
||||
undefined
|
||||
);
|
||||
|
||||
const msg = {
|
||||
payload: syncMsg.encode()
|
||||
} as IDecodedMessage;
|
||||
|
||||
const result = reliableChannel["isChannelMessageWithCausalHistory"](msg);
|
||||
expect(result).to.be.false;
|
||||
});
|
||||
|
||||
it("should return false for content message without causal history", () => {
|
||||
const contentMsg = new ContentMessage(
|
||||
"msg1",
|
||||
"testChannel",
|
||||
"sender",
|
||||
[],
|
||||
1,
|
||||
undefined,
|
||||
utf8ToBytes("content")
|
||||
);
|
||||
|
||||
const msg = {
|
||||
payload: contentMsg.encode()
|
||||
} as IDecodedMessage;
|
||||
|
||||
const result = reliableChannel["isChannelMessageWithCausalHistory"](msg);
|
||||
expect(result).to.be.false;
|
||||
});
|
||||
|
||||
it("should return true for message with causal history", () => {
|
||||
const contentMsg = new ContentMessage(
|
||||
"msg1",
|
||||
"testChannel",
|
||||
"sender",
|
||||
[{ messageId: "previous-msg-id" }],
|
||||
1,
|
||||
undefined,
|
||||
utf8ToBytes("content")
|
||||
);
|
||||
|
||||
const msg = {
|
||||
payload: contentMsg.encode()
|
||||
} as IDecodedMessage;
|
||||
|
||||
const result = reliableChannel["isChannelMessageWithCausalHistory"](msg);
|
||||
expect(result).to.be.true;
|
||||
});
|
||||
|
||||
it("should return true for sync message with causal history", () => {
|
||||
const syncMsg = new SyncMessage(
|
||||
"sync-msg-id",
|
||||
"testChannel",
|
||||
"sender",
|
||||
[{ messageId: "previous-msg-id" }],
|
||||
1,
|
||||
undefined,
|
||||
undefined
|
||||
);
|
||||
|
||||
const msg = {
|
||||
payload: syncMsg.encode()
|
||||
} as IDecodedMessage;
|
||||
|
||||
const result = reliableChannel["isChannelMessageWithCausalHistory"](msg);
|
||||
expect(result).to.be.true;
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@ -185,9 +185,9 @@ export class ReliableChannel<
|
||||
peerManagerEvents !== undefined &&
|
||||
(options?.queryOnConnect ?? true)
|
||||
) {
|
||||
log.info("auto-query enabled");
|
||||
this.queryOnConnect = new QueryOnConnect(
|
||||
[this.decoder],
|
||||
this.isChannelMessageWithCausalHistory.bind(this),
|
||||
peerManagerEvents,
|
||||
node.events,
|
||||
this._retrieve.bind(this)
|
||||
@ -580,6 +580,21 @@ export class ReliableChannel<
|
||||
this.messageChannel.sweepOutgoingBuffer();
|
||||
}
|
||||
|
||||
private isChannelMessageWithCausalHistory(msg: T): boolean {
|
||||
// TODO: we do end-up decoding messages twice as this is used to stop store queries.
|
||||
const sdsMessage = SdsMessage.decode(msg.payload);
|
||||
|
||||
if (!sdsMessage) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (sdsMessage.channelId !== this.messageChannel.channelId) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return sdsMessage.causalHistory && sdsMessage.causalHistory.length > 0;
|
||||
}
|
||||
|
||||
private setupEventListeners(): void {
|
||||
this.messageChannel.addEventListener(
|
||||
MessageChannelEvent.OutMessageSent,
|
||||
|
||||
@ -30,56 +30,60 @@ export class Message implements proto_sds_message.SdsMessage {
|
||||
public static decode(
|
||||
data: Uint8Array
|
||||
): undefined | ContentMessage | SyncMessage | EphemeralMessage {
|
||||
const {
|
||||
messageId,
|
||||
channelId,
|
||||
senderId,
|
||||
causalHistory,
|
||||
lamportTimestamp,
|
||||
bloomFilter,
|
||||
content
|
||||
} = proto_sds_message.SdsMessage.decode(data);
|
||||
|
||||
if (testContentMessage({ lamportTimestamp, content })) {
|
||||
return new ContentMessage(
|
||||
try {
|
||||
const {
|
||||
messageId,
|
||||
channelId,
|
||||
senderId,
|
||||
causalHistory,
|
||||
lamportTimestamp!,
|
||||
lamportTimestamp,
|
||||
bloomFilter,
|
||||
content!
|
||||
);
|
||||
}
|
||||
content
|
||||
} = proto_sds_message.SdsMessage.decode(data);
|
||||
|
||||
if (testEphemeralMessage({ lamportTimestamp, content })) {
|
||||
return new EphemeralMessage(
|
||||
messageId,
|
||||
channelId,
|
||||
senderId,
|
||||
causalHistory,
|
||||
undefined,
|
||||
bloomFilter,
|
||||
content!
|
||||
);
|
||||
}
|
||||
if (testContentMessage({ lamportTimestamp, content })) {
|
||||
return new ContentMessage(
|
||||
messageId,
|
||||
channelId,
|
||||
senderId,
|
||||
causalHistory,
|
||||
lamportTimestamp!,
|
||||
bloomFilter,
|
||||
content!
|
||||
);
|
||||
}
|
||||
|
||||
if (testSyncMessage({ lamportTimestamp, content })) {
|
||||
return new SyncMessage(
|
||||
messageId,
|
||||
channelId,
|
||||
senderId,
|
||||
causalHistory,
|
||||
lamportTimestamp!,
|
||||
bloomFilter,
|
||||
undefined
|
||||
if (testEphemeralMessage({ lamportTimestamp, content })) {
|
||||
return new EphemeralMessage(
|
||||
messageId,
|
||||
channelId,
|
||||
senderId,
|
||||
causalHistory,
|
||||
undefined,
|
||||
bloomFilter,
|
||||
content!
|
||||
);
|
||||
}
|
||||
|
||||
if (testSyncMessage({ lamportTimestamp, content })) {
|
||||
return new SyncMessage(
|
||||
messageId,
|
||||
channelId,
|
||||
senderId,
|
||||
causalHistory,
|
||||
lamportTimestamp!,
|
||||
bloomFilter,
|
||||
undefined
|
||||
);
|
||||
}
|
||||
log.error(
|
||||
"message received was of unknown type",
|
||||
lamportTimestamp,
|
||||
content
|
||||
);
|
||||
} catch (err) {
|
||||
log.error("failed to decode sds message", err);
|
||||
}
|
||||
log.error(
|
||||
"message received was of unknown type",
|
||||
lamportTimestamp,
|
||||
content
|
||||
);
|
||||
return undefined;
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user