mirror of
https://github.com/logos-messaging/js-waku.git
synced 2026-01-07 16:23:09 +00:00
feat: introduce reliable channels (#2526)
* SDS: pushOutgoingMessage is actually sync * SDS: ensure that `ContentMessage` class is stored in local history with `valueOf` method * feat: introduce reliable channels Easy to use Scalable Data Sync (SDS, e2e reliability) wrapper, that includes: - store queries upon connection to store nodes - store queries to retrieve missing messages * remove `channel` prefix * attempt to improve performance when processing a lot of incoming messages * test: split test file * use index.ts for re-export only. * improve if condition * use getter for isStarted * waku node already auto-start * rename send * fix lightPush.send type post rebase * test: remove extra console.log * SDS: emit messages as missing as soon as they are received * make configurable elapse time for task process * typo * use string instead of enum for event types * ReliableChannel.send returns the message id
This commit is contained in:
parent
8f09f5fa5a
commit
4d5c152f5b
102
package-lock.json
generated
102
package-lock.json
generated
@ -13,10 +13,10 @@
|
|||||||
"packages/core",
|
"packages/core",
|
||||||
"packages/discovery",
|
"packages/discovery",
|
||||||
"packages/message-encryption",
|
"packages/message-encryption",
|
||||||
"packages/sdk",
|
|
||||||
"packages/relay",
|
|
||||||
"packages/sds",
|
"packages/sds",
|
||||||
"packages/rln",
|
"packages/rln",
|
||||||
|
"packages/sdk",
|
||||||
|
"packages/relay",
|
||||||
"packages/tests",
|
"packages/tests",
|
||||||
"packages/reliability-tests",
|
"packages/reliability-tests",
|
||||||
"packages/headless-tests",
|
"packages/headless-tests",
|
||||||
@ -37621,6 +37621,7 @@
|
|||||||
"@waku/discovery": "0.0.11",
|
"@waku/discovery": "0.0.11",
|
||||||
"@waku/interfaces": "0.0.33",
|
"@waku/interfaces": "0.0.33",
|
||||||
"@waku/proto": "^0.0.13",
|
"@waku/proto": "^0.0.13",
|
||||||
|
"@waku/sds": "^0.0.6",
|
||||||
"@waku/utils": "0.0.26",
|
"@waku/utils": "0.0.26",
|
||||||
"libp2p": "2.8.11",
|
"libp2p": "2.8.11",
|
||||||
"lodash.debounce": "^4.0.8"
|
"lodash.debounce": "^4.0.8"
|
||||||
@ -37634,6 +37635,7 @@
|
|||||||
"@types/chai": "^4.3.11",
|
"@types/chai": "^4.3.11",
|
||||||
"@types/mocha": "^10.0.9",
|
"@types/mocha": "^10.0.9",
|
||||||
"@waku/build-utils": "*",
|
"@waku/build-utils": "*",
|
||||||
|
"@waku/message-encryption": "^0.0.36",
|
||||||
"chai": "^5.1.1",
|
"chai": "^5.1.1",
|
||||||
"cspell": "^8.6.1",
|
"cspell": "^8.6.1",
|
||||||
"interface-datastore": "8.3.2",
|
"interface-datastore": "8.3.2",
|
||||||
@ -37654,6 +37656,102 @@
|
|||||||
"@sinonjs/commons": "^3.0.1"
|
"@sinonjs/commons": "^3.0.1"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
"packages/sdk/node_modules/@waku/sds/node_modules/@waku/interfaces": {
|
||||||
|
"version": "0.0.32",
|
||||||
|
"resolved": "https://registry.npmjs.org/@waku/interfaces/-/interfaces-0.0.32.tgz",
|
||||||
|
"integrity": "sha512-4MNfc7ZzQCyQZR1GQQKPgHaWTuPTIvE2wo/b7iokjdeOT+ZSKyJFSetcV07cqnBwyzUv1gc53bJdzyHwVIa5Vw==",
|
||||||
|
"extraneous": true,
|
||||||
|
"license": "MIT OR Apache-2.0",
|
||||||
|
"engines": {
|
||||||
|
"node": ">=22"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"packages/sdk/node_modules/@waku/sds/node_modules/@waku/proto": {
|
||||||
|
"version": "0.0.12",
|
||||||
|
"resolved": "https://registry.npmjs.org/@waku/proto/-/proto-0.0.12.tgz",
|
||||||
|
"integrity": "sha512-JR7wiy3Di628Ywo9qKIi7rhfdC2K7ABoaWa9WX4ZQKieYDs+YwOK+syE53VNwXrtponNeLDI0JIOFzRDalUm1A==",
|
||||||
|
"extraneous": true,
|
||||||
|
"license": "MIT OR Apache-2.0",
|
||||||
|
"dependencies": {
|
||||||
|
"protons-runtime": "^5.4.0"
|
||||||
|
},
|
||||||
|
"engines": {
|
||||||
|
"node": ">=22"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"packages/sdk/node_modules/@waku/sds/node_modules/@waku/utils": {
|
||||||
|
"version": "0.0.25",
|
||||||
|
"resolved": "https://registry.npmjs.org/@waku/utils/-/utils-0.0.25.tgz",
|
||||||
|
"integrity": "sha512-yCbfQ3uqByGNUvCNTj6oHi8fJ6BdVvg+Rj0y2YKrZDSNn73uTMF856lCJdsE86eqDZNCDaRaawTs3ZNEXyWaXw==",
|
||||||
|
"extraneous": true,
|
||||||
|
"license": "MIT OR Apache-2.0",
|
||||||
|
"dependencies": {
|
||||||
|
"@noble/hashes": "^1.3.2",
|
||||||
|
"@waku/interfaces": "0.0.32",
|
||||||
|
"chai": "^4.3.10",
|
||||||
|
"debug": "^4.3.4",
|
||||||
|
"uint8arrays": "^5.0.1"
|
||||||
|
},
|
||||||
|
"engines": {
|
||||||
|
"node": ">=22"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"packages/sdk/node_modules/@waku/sds/node_modules/assertion-error": {
|
||||||
|
"version": "1.1.0",
|
||||||
|
"resolved": "https://registry.npmjs.org/assertion-error/-/assertion-error-1.1.0.tgz",
|
||||||
|
"integrity": "sha512-jgsaNduz+ndvGyFt3uSuWqvy4lCnIJiovtouQN5JZHOKCS2QuhEdbcQHFhVksz2N2U9hXJo8odG7ETyWlEeuDw==",
|
||||||
|
"extraneous": true,
|
||||||
|
"license": "MIT",
|
||||||
|
"engines": {
|
||||||
|
"node": "*"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"packages/sdk/node_modules/@waku/sds/node_modules/check-error": {
|
||||||
|
"version": "1.0.3",
|
||||||
|
"resolved": "https://registry.npmjs.org/check-error/-/check-error-1.0.3.tgz",
|
||||||
|
"integrity": "sha512-iKEoDYaRmd1mxM90a2OEfWhjsjPpYPuQ+lMYsoxB126+t8fw7ySEO48nmDg5COTjxDI65/Y2OWpeEHk3ZOe8zg==",
|
||||||
|
"extraneous": true,
|
||||||
|
"license": "MIT",
|
||||||
|
"dependencies": {
|
||||||
|
"get-func-name": "^2.0.2"
|
||||||
|
},
|
||||||
|
"engines": {
|
||||||
|
"node": "*"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"packages/sdk/node_modules/@waku/sds/node_modules/deep-eql": {
|
||||||
|
"version": "4.1.4",
|
||||||
|
"resolved": "https://registry.npmjs.org/deep-eql/-/deep-eql-4.1.4.tgz",
|
||||||
|
"integrity": "sha512-SUwdGfqdKOwxCPeVYjwSyRpJ7Z+fhpwIAtmCUdZIWZ/YP5R9WAsyuSgpLVDi9bjWoN2LXHNss/dk3urXtdQxGg==",
|
||||||
|
"extraneous": true,
|
||||||
|
"license": "MIT",
|
||||||
|
"dependencies": {
|
||||||
|
"type-detect": "^4.0.0"
|
||||||
|
},
|
||||||
|
"engines": {
|
||||||
|
"node": ">=6"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"packages/sdk/node_modules/@waku/sds/node_modules/loupe": {
|
||||||
|
"version": "2.3.7",
|
||||||
|
"resolved": "https://registry.npmjs.org/loupe/-/loupe-2.3.7.tgz",
|
||||||
|
"integrity": "sha512-zSMINGVYkdpYSOBmLi0D1Uo7JU9nVdQKrHxC8eYlV+9YKK9WePqAlL7lSlorG/U2Fw1w0hTBmaa/jrQ3UbPHtA==",
|
||||||
|
"extraneous": true,
|
||||||
|
"license": "MIT",
|
||||||
|
"dependencies": {
|
||||||
|
"get-func-name": "^2.0.1"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"packages/sdk/node_modules/@waku/sds/node_modules/pathval": {
|
||||||
|
"version": "1.1.1",
|
||||||
|
"resolved": "https://registry.npmjs.org/pathval/-/pathval-1.1.1.tgz",
|
||||||
|
"integrity": "sha512-Dp6zGqpTdETdR63lehJYPeIOqpiNBNtc7BpWSLrOje7UaIsE5aY92r/AunQA7rsXvet3lrJ3JnZX29UPTKXyKQ==",
|
||||||
|
"extraneous": true,
|
||||||
|
"license": "MIT",
|
||||||
|
"engines": {
|
||||||
|
"node": "*"
|
||||||
|
}
|
||||||
|
},
|
||||||
"packages/sdk/node_modules/assertion-error": {
|
"packages/sdk/node_modules/assertion-error": {
|
||||||
"version": "2.0.1",
|
"version": "2.0.1",
|
||||||
"dev": true,
|
"dev": true,
|
||||||
|
|||||||
@ -10,10 +10,10 @@
|
|||||||
"packages/core",
|
"packages/core",
|
||||||
"packages/discovery",
|
"packages/discovery",
|
||||||
"packages/message-encryption",
|
"packages/message-encryption",
|
||||||
"packages/sdk",
|
|
||||||
"packages/relay",
|
|
||||||
"packages/sds",
|
"packages/sds",
|
||||||
"packages/rln",
|
"packages/rln",
|
||||||
|
"packages/sdk",
|
||||||
|
"packages/relay",
|
||||||
"packages/tests",
|
"packages/tests",
|
||||||
"packages/reliability-tests",
|
"packages/reliability-tests",
|
||||||
"packages/headless-tests",
|
"packages/headless-tests",
|
||||||
|
|||||||
@ -72,6 +72,7 @@
|
|||||||
"@waku/discovery": "0.0.11",
|
"@waku/discovery": "0.0.11",
|
||||||
"@waku/interfaces": "0.0.33",
|
"@waku/interfaces": "0.0.33",
|
||||||
"@waku/proto": "^0.0.13",
|
"@waku/proto": "^0.0.13",
|
||||||
|
"@waku/sds": "^0.0.6",
|
||||||
"@waku/utils": "0.0.26",
|
"@waku/utils": "0.0.26",
|
||||||
"libp2p": "2.8.11",
|
"libp2p": "2.8.11",
|
||||||
"lodash.debounce": "^4.0.8"
|
"lodash.debounce": "^4.0.8"
|
||||||
@ -85,6 +86,7 @@
|
|||||||
"@types/chai": "^4.3.11",
|
"@types/chai": "^4.3.11",
|
||||||
"@types/mocha": "^10.0.9",
|
"@types/mocha": "^10.0.9",
|
||||||
"@waku/build-utils": "*",
|
"@waku/build-utils": "*",
|
||||||
|
"@waku/message-encryption": "^0.0.36",
|
||||||
"chai": "^5.1.1",
|
"chai": "^5.1.1",
|
||||||
"cspell": "^8.6.1",
|
"cspell": "^8.6.1",
|
||||||
"interface-datastore": "8.3.2",
|
"interface-datastore": "8.3.2",
|
||||||
|
|||||||
@ -17,6 +17,7 @@ export {
|
|||||||
export { LightPush } from "./light_push/index.js";
|
export { LightPush } from "./light_push/index.js";
|
||||||
export { Filter } from "./filter/index.js";
|
export { Filter } from "./filter/index.js";
|
||||||
export { Store } from "./store/index.js";
|
export { Store } from "./store/index.js";
|
||||||
|
export * from "./reliable_channel/index.js";
|
||||||
|
|
||||||
export * as waku from "@waku/core";
|
export * as waku from "@waku/core";
|
||||||
export * as utils from "@waku/utils";
|
export * as utils from "@waku/utils";
|
||||||
|
|||||||
66
packages/sdk/src/reliable_channel/events.ts
Normal file
66
packages/sdk/src/reliable_channel/events.ts
Normal file
@ -0,0 +1,66 @@
|
|||||||
|
import { IDecodedMessage, ProtocolError } from "@waku/interfaces";
|
||||||
|
import type { HistoryEntry, MessageId } from "@waku/sds";
|
||||||
|
|
||||||
|
export const ReliableChannelEvent = {
|
||||||
|
/**
|
||||||
|
* The message is being sent over the wire.
|
||||||
|
*
|
||||||
|
* This event may be emitted several times if the retry mechanism kicks in.
|
||||||
|
*/
|
||||||
|
SendingMessage: "sending-message",
|
||||||
|
/**
|
||||||
|
* The message has been sent over the wire but has not been acknowledged by
|
||||||
|
* any other party yet.
|
||||||
|
*
|
||||||
|
* We are now waiting for acknowledgements.
|
||||||
|
*
|
||||||
|
* This event may be emitted several times if the
|
||||||
|
* several times if the retry mechanisms kicks in.
|
||||||
|
*/
|
||||||
|
MessageSent: "message-sent",
|
||||||
|
/**
|
||||||
|
* A received bloom filter seems to indicate that the messages was received
|
||||||
|
* by another party.
|
||||||
|
*
|
||||||
|
* However, this is probabilistic. The retry mechanism will wait a bit longer
|
||||||
|
* before trying to send the message again.
|
||||||
|
*/
|
||||||
|
MessagePossiblyAcknowledged: "message-possibly-acknowledged",
|
||||||
|
/**
|
||||||
|
* The message was fully acknowledged by other members of the channel
|
||||||
|
*/
|
||||||
|
MessageAcknowledged: "message-acknowledged",
|
||||||
|
/**
|
||||||
|
* It was not possible to send the messages due to a non-recoverable error,
|
||||||
|
* most likely an internal error for a developer to resolve.
|
||||||
|
*/
|
||||||
|
SendingMessageIrrecoverableError: "sending-message-irrecoverable-error",
|
||||||
|
/**
|
||||||
|
* A new message has been received.
|
||||||
|
*/
|
||||||
|
MessageReceived: "message-received",
|
||||||
|
/**
|
||||||
|
* We are aware of a missing message but failed to retrieve it successfully.
|
||||||
|
*/
|
||||||
|
IrretrievableMessage: "irretrievable-message"
|
||||||
|
};
|
||||||
|
|
||||||
|
export type ReliableChannelEvent =
|
||||||
|
(typeof ReliableChannelEvent)[keyof typeof ReliableChannelEvent];
|
||||||
|
|
||||||
|
export interface ReliableChannelEvents {
|
||||||
|
"sending-message": CustomEvent<MessageId>;
|
||||||
|
"message-sent": CustomEvent<MessageId>;
|
||||||
|
"message-possibly-acknowledged": CustomEvent<{
|
||||||
|
messageId: MessageId;
|
||||||
|
possibleAckCount: number;
|
||||||
|
}>;
|
||||||
|
"message-acknowledged": CustomEvent<MessageId>;
|
||||||
|
// TODO probably T extends IDecodedMessage?
|
||||||
|
"message-received": CustomEvent<IDecodedMessage>;
|
||||||
|
"irretrievable-message": CustomEvent<HistoryEntry>;
|
||||||
|
"sending-message-irrecoverable-error": CustomEvent<{
|
||||||
|
messageId: MessageId;
|
||||||
|
error: ProtocolError;
|
||||||
|
}>;
|
||||||
|
}
|
||||||
2
packages/sdk/src/reliable_channel/index.ts
Normal file
2
packages/sdk/src/reliable_channel/index.ts
Normal file
@ -0,0 +1,2 @@
|
|||||||
|
export { ReliableChannel, ReliableChannelOptions } from "./reliable_channel.js";
|
||||||
|
export { ReliableChannelEvents, ReliableChannelEvent } from "./events.js";
|
||||||
@ -0,0 +1,78 @@
|
|||||||
|
import type {
|
||||||
|
IDecodedMessage,
|
||||||
|
IDecoder,
|
||||||
|
QueryRequestParams
|
||||||
|
} from "@waku/interfaces";
|
||||||
|
import type { MessageId } from "@waku/sds";
|
||||||
|
import { Logger } from "@waku/utils";
|
||||||
|
|
||||||
|
const log = new Logger("sdk:missing-message-retriever");
|
||||||
|
|
||||||
|
const DEFAULT_RETRIEVE_FREQUENCY_MS = 10 * 1000; // 10 seconds
|
||||||
|
|
||||||
|
export class MissingMessageRetriever<T extends IDecodedMessage> {
|
||||||
|
private retrieveInterval: ReturnType<typeof setInterval> | undefined;
|
||||||
|
private missingMessages: Map<MessageId, Uint8Array<ArrayBufferLike>>; // Waku Message Ids
|
||||||
|
|
||||||
|
public constructor(
|
||||||
|
private readonly decoder: IDecoder<T>,
|
||||||
|
private readonly retrieveFrequencyMs: number = DEFAULT_RETRIEVE_FREQUENCY_MS,
|
||||||
|
private readonly _retrieve: <T extends IDecodedMessage>(
|
||||||
|
decoders: IDecoder<T>[],
|
||||||
|
options?: Partial<QueryRequestParams>
|
||||||
|
) => AsyncGenerator<Promise<T | undefined>[]>,
|
||||||
|
private readonly onMessageRetrieved?: (message: T) => Promise<void>
|
||||||
|
) {
|
||||||
|
this.missingMessages = new Map();
|
||||||
|
}
|
||||||
|
|
||||||
|
public start(): void {
|
||||||
|
if (this.retrieveInterval) {
|
||||||
|
clearInterval(this.retrieveInterval);
|
||||||
|
}
|
||||||
|
if (this.retrieveFrequencyMs !== 0) {
|
||||||
|
log.info(`start retrieve loop every ${this.retrieveFrequencyMs}ms`);
|
||||||
|
this.retrieveInterval = setInterval(() => {
|
||||||
|
void this.retrieveMissingMessage();
|
||||||
|
}, this.retrieveFrequencyMs);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public stop(): void {
|
||||||
|
if (this.retrieveInterval) {
|
||||||
|
clearInterval(this.retrieveInterval);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public addMissingMessage(
|
||||||
|
messageId: MessageId,
|
||||||
|
retrievalHint: Uint8Array
|
||||||
|
): void {
|
||||||
|
if (!this.missingMessages.has(messageId)) {
|
||||||
|
log.info("missing message notice", messageId, retrievalHint);
|
||||||
|
this.missingMessages.set(messageId, retrievalHint);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public removeMissingMessage(messageId: MessageId): void {
|
||||||
|
if (this.missingMessages.has(messageId)) {
|
||||||
|
this.missingMessages.delete(messageId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async retrieveMissingMessage(): Promise<void> {
|
||||||
|
if (this.missingMessages.size) {
|
||||||
|
const messageHashes = Array.from(this.missingMessages.values());
|
||||||
|
log.info("attempting to retrieve missing message", messageHashes.length);
|
||||||
|
for await (const page of this._retrieve([this.decoder], {
|
||||||
|
messageHashes
|
||||||
|
})) {
|
||||||
|
for await (const msg of page) {
|
||||||
|
if (msg && this.onMessageRetrieved) {
|
||||||
|
await this.onMessageRetrieved(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
672
packages/sdk/src/reliable_channel/reliable_channel.spec.ts
Normal file
672
packages/sdk/src/reliable_channel/reliable_channel.spec.ts
Normal file
@ -0,0 +1,672 @@
|
|||||||
|
import { PeerId, TypedEventEmitter } from "@libp2p/interface";
|
||||||
|
import { createDecoder, createEncoder } from "@waku/core";
|
||||||
|
import {
|
||||||
|
AutoSharding,
|
||||||
|
HealthStatus,
|
||||||
|
IDecodedMessage,
|
||||||
|
IDecoder,
|
||||||
|
IEncoder,
|
||||||
|
type IMessage,
|
||||||
|
ISendOptions,
|
||||||
|
IWaku,
|
||||||
|
LightPushError,
|
||||||
|
LightPushSDKResult,
|
||||||
|
QueryRequestParams
|
||||||
|
} from "@waku/interfaces";
|
||||||
|
import { ContentMessage } from "@waku/sds";
|
||||||
|
import {
|
||||||
|
createRoutingInfo,
|
||||||
|
delay,
|
||||||
|
MockWakuEvents,
|
||||||
|
MockWakuNode
|
||||||
|
} from "@waku/utils";
|
||||||
|
import { bytesToUtf8, hexToBytes, utf8ToBytes } from "@waku/utils/bytes";
|
||||||
|
import { expect } from "chai";
|
||||||
|
import { beforeEach, describe } from "mocha";
|
||||||
|
import sinon from "sinon";
|
||||||
|
|
||||||
|
import { ReliableChannel } from "./index.js";
|
||||||
|
|
||||||
|
const TEST_CONTENT_TOPIC = "/my-tests/0/topic-name/proto";
|
||||||
|
const TEST_NETWORK_CONFIG: AutoSharding = {
|
||||||
|
clusterId: 0,
|
||||||
|
numShardsInCluster: 1
|
||||||
|
};
|
||||||
|
const TEST_ROUTING_INFO = createRoutingInfo(TEST_NETWORK_CONFIG, {
|
||||||
|
contentTopic: TEST_CONTENT_TOPIC
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("Reliable Channel", () => {
|
||||||
|
let mockWakuNode: IWaku;
|
||||||
|
let encoder: IEncoder;
|
||||||
|
let decoder: IDecoder<IDecodedMessage>;
|
||||||
|
|
||||||
|
beforeEach(async () => {
|
||||||
|
mockWakuNode = new MockWakuNode();
|
||||||
|
encoder = createEncoder({
|
||||||
|
contentTopic: TEST_CONTENT_TOPIC,
|
||||||
|
routingInfo: TEST_ROUTING_INFO
|
||||||
|
});
|
||||||
|
decoder = createDecoder(TEST_CONTENT_TOPIC, TEST_ROUTING_INFO);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Outgoing message is emitted as sending", async () => {
|
||||||
|
const reliableChannel = await ReliableChannel.create(
|
||||||
|
mockWakuNode,
|
||||||
|
"MyChannel",
|
||||||
|
"alice",
|
||||||
|
encoder,
|
||||||
|
decoder
|
||||||
|
);
|
||||||
|
|
||||||
|
const message = utf8ToBytes("message in channel");
|
||||||
|
|
||||||
|
// Setting up message tracking
|
||||||
|
const messageId = reliableChannel.send(message);
|
||||||
|
let messageSending = false;
|
||||||
|
reliableChannel.addEventListener("sending-message", (event) => {
|
||||||
|
if (event.detail === messageId) {
|
||||||
|
messageSending = true;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
while (!messageSending) {
|
||||||
|
await delay(50);
|
||||||
|
}
|
||||||
|
|
||||||
|
expect(messageSending).to.be.true;
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Outgoing message is emitted as sent", async () => {
|
||||||
|
const reliableChannel = await ReliableChannel.create(
|
||||||
|
mockWakuNode,
|
||||||
|
"MyChannel",
|
||||||
|
"alice",
|
||||||
|
encoder,
|
||||||
|
decoder
|
||||||
|
);
|
||||||
|
|
||||||
|
const message = utf8ToBytes("message in channel");
|
||||||
|
|
||||||
|
const messageId = reliableChannel.send(message);
|
||||||
|
|
||||||
|
// Setting up message tracking
|
||||||
|
let messageSent = false;
|
||||||
|
reliableChannel.addEventListener("message-sent", (event) => {
|
||||||
|
if (event.detail === messageId) {
|
||||||
|
messageSent = true;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
while (!messageSent) {
|
||||||
|
await delay(50);
|
||||||
|
}
|
||||||
|
|
||||||
|
expect(messageSent).to.be.true;
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Encoder error raises irrecoverable error", async () => {
|
||||||
|
mockWakuNode.lightPush!.send = (
|
||||||
|
_encoder: IEncoder,
|
||||||
|
_message: IMessage,
|
||||||
|
_sendOptions?: ISendOptions
|
||||||
|
): Promise<LightPushSDKResult> => {
|
||||||
|
return Promise.resolve({
|
||||||
|
failures: [{ error: LightPushError.EMPTY_PAYLOAD }],
|
||||||
|
successes: []
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
|
const reliableChannel = await ReliableChannel.create(
|
||||||
|
mockWakuNode,
|
||||||
|
"MyChannel",
|
||||||
|
"alice",
|
||||||
|
encoder,
|
||||||
|
decoder
|
||||||
|
);
|
||||||
|
|
||||||
|
const message = utf8ToBytes("payload doesnt matter");
|
||||||
|
|
||||||
|
encoder.contentTopic = "...";
|
||||||
|
const messageId = reliableChannel.send(message);
|
||||||
|
|
||||||
|
// Setting up message tracking
|
||||||
|
let irrecoverableError = false;
|
||||||
|
reliableChannel.addEventListener(
|
||||||
|
"sending-message-irrecoverable-error",
|
||||||
|
(event) => {
|
||||||
|
if (event.detail.messageId === messageId) {
|
||||||
|
irrecoverableError = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
while (!irrecoverableError) {
|
||||||
|
await delay(50);
|
||||||
|
}
|
||||||
|
|
||||||
|
expect(irrecoverableError).to.be.true;
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Outgoing message is not emitted as acknowledged from own outgoing messages", async () => {
|
||||||
|
const reliableChannel = await ReliableChannel.create(
|
||||||
|
mockWakuNode,
|
||||||
|
"MyChannel",
|
||||||
|
"alice",
|
||||||
|
encoder,
|
||||||
|
decoder
|
||||||
|
);
|
||||||
|
|
||||||
|
const message = utf8ToBytes("first message in channel");
|
||||||
|
|
||||||
|
// Setting up message tracking
|
||||||
|
const messageId = ReliableChannel.getMessageId(message);
|
||||||
|
let messageAcknowledged = false;
|
||||||
|
reliableChannel.addEventListener("message-acknowledged", (event) => {
|
||||||
|
if (event.detail === messageId) {
|
||||||
|
messageAcknowledged = true;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
reliableChannel.send(message);
|
||||||
|
|
||||||
|
// Sending a second message from the same node should not acknowledge the first one
|
||||||
|
reliableChannel.send(utf8ToBytes("second message in channel"));
|
||||||
|
|
||||||
|
expect(messageAcknowledged).to.be.false;
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Outgoing message is possibly acknowledged", async () => {
|
||||||
|
const commonEventEmitter = new TypedEventEmitter<MockWakuEvents>();
|
||||||
|
const mockWakuNodeAlice = new MockWakuNode(commonEventEmitter);
|
||||||
|
const mockWakuNodeBob = new MockWakuNode(commonEventEmitter);
|
||||||
|
|
||||||
|
const reliableChannelAlice = await ReliableChannel.create(
|
||||||
|
mockWakuNodeAlice,
|
||||||
|
"MyChannel",
|
||||||
|
"alice",
|
||||||
|
encoder,
|
||||||
|
decoder
|
||||||
|
);
|
||||||
|
const reliableChannelBob = await ReliableChannel.create(
|
||||||
|
mockWakuNodeBob,
|
||||||
|
"MyChannel",
|
||||||
|
"bob",
|
||||||
|
encoder,
|
||||||
|
decoder,
|
||||||
|
// Bob only includes one message in causal history
|
||||||
|
{ causalHistorySize: 1 }
|
||||||
|
);
|
||||||
|
|
||||||
|
const messages = ["first", "second", "third"].map((m) => {
|
||||||
|
return utf8ToBytes(m);
|
||||||
|
});
|
||||||
|
|
||||||
|
// Alice sets up message tracking for first message
|
||||||
|
const firstMessageId = ReliableChannel.getMessageId(messages[0]);
|
||||||
|
let firstMessagePossiblyAcknowledged = false;
|
||||||
|
reliableChannelAlice.addEventListener(
|
||||||
|
"message-possibly-acknowledged",
|
||||||
|
(event) => {
|
||||||
|
if (event.detail.messageId === firstMessageId) {
|
||||||
|
firstMessagePossiblyAcknowledged = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
let messageReceived = false;
|
||||||
|
reliableChannelBob.addEventListener("message-received", (event) => {
|
||||||
|
if (bytesToUtf8(event.detail.payload) === "third") {
|
||||||
|
messageReceived = true;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
for (const m of messages) {
|
||||||
|
reliableChannelAlice.send(m);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for Bob to receive last message to ensure it is all included in filter
|
||||||
|
while (!messageReceived) {
|
||||||
|
await delay(50);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Bobs sends a message now, it should include first one in bloom filter
|
||||||
|
reliableChannelBob.send(utf8ToBytes("message back"));
|
||||||
|
while (!firstMessagePossiblyAcknowledged) {
|
||||||
|
await delay(50);
|
||||||
|
}
|
||||||
|
|
||||||
|
expect(firstMessagePossiblyAcknowledged).to.be.true;
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Outgoing message is acknowledged", async () => {
|
||||||
|
const commonEventEmitter = new TypedEventEmitter<MockWakuEvents>();
|
||||||
|
const mockWakuNodeAlice = new MockWakuNode(commonEventEmitter);
|
||||||
|
const mockWakuNodeBob = new MockWakuNode(commonEventEmitter);
|
||||||
|
|
||||||
|
const reliableChannelAlice = await ReliableChannel.create(
|
||||||
|
mockWakuNodeAlice,
|
||||||
|
"MyChannel",
|
||||||
|
"alice",
|
||||||
|
encoder,
|
||||||
|
decoder
|
||||||
|
);
|
||||||
|
const reliableChannelBob = await ReliableChannel.create(
|
||||||
|
mockWakuNodeBob,
|
||||||
|
"MyChannel",
|
||||||
|
"bob",
|
||||||
|
encoder,
|
||||||
|
decoder
|
||||||
|
);
|
||||||
|
|
||||||
|
const message = utf8ToBytes("first message in channel");
|
||||||
|
|
||||||
|
const messageId = reliableChannelAlice.send(message);
|
||||||
|
|
||||||
|
// Alice sets up message tracking
|
||||||
|
let messageAcknowledged = false;
|
||||||
|
reliableChannelAlice.addEventListener("message-acknowledged", (event) => {
|
||||||
|
if (event.detail === messageId) {
|
||||||
|
messageAcknowledged = true;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let bobReceivedMessage = false;
|
||||||
|
reliableChannelBob.addEventListener("message-received", () => {
|
||||||
|
bobReceivedMessage = true;
|
||||||
|
});
|
||||||
|
|
||||||
|
// Wait for bob to receive the message to ensure it's included in causal history
|
||||||
|
while (!bobReceivedMessage) {
|
||||||
|
await delay(50);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Bobs sends a message now, it should include first one in causal history
|
||||||
|
reliableChannelBob.send(utf8ToBytes("second message in channel"));
|
||||||
|
while (!messageAcknowledged) {
|
||||||
|
await delay(50);
|
||||||
|
}
|
||||||
|
|
||||||
|
expect(messageAcknowledged).to.be.true;
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Incoming message is emitted as received", async () => {
|
||||||
|
const reliableChannel = await ReliableChannel.create(
|
||||||
|
mockWakuNode,
|
||||||
|
"MyChannel",
|
||||||
|
"alice",
|
||||||
|
encoder,
|
||||||
|
decoder
|
||||||
|
);
|
||||||
|
|
||||||
|
let receivedMessage: IDecodedMessage;
|
||||||
|
reliableChannel.addEventListener("message-received", (event) => {
|
||||||
|
receivedMessage = event.detail;
|
||||||
|
});
|
||||||
|
|
||||||
|
const message = utf8ToBytes("message in channel");
|
||||||
|
|
||||||
|
reliableChannel.send(message);
|
||||||
|
while (!receivedMessage!) {
|
||||||
|
await delay(50);
|
||||||
|
}
|
||||||
|
|
||||||
|
expect(bytesToUtf8(receivedMessage!.payload)).to.eq(bytesToUtf8(message));
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("Retries", () => {
|
||||||
|
it("Outgoing message is retried until acknowledged", async () => {
|
||||||
|
const commonEventEmitter = new TypedEventEmitter<MockWakuEvents>();
|
||||||
|
const mockWakuNodeAlice = new MockWakuNode(commonEventEmitter);
|
||||||
|
const mockWakuNodeBob = new MockWakuNode(commonEventEmitter);
|
||||||
|
|
||||||
|
const reliableChannelAlice = await ReliableChannel.create(
|
||||||
|
mockWakuNodeAlice,
|
||||||
|
"MyChannel",
|
||||||
|
"alice",
|
||||||
|
encoder,
|
||||||
|
decoder,
|
||||||
|
{
|
||||||
|
retryIntervalMs: 200, // faster for a quick test,
|
||||||
|
processTaskMinElapseMs: 10 // faster so it process message as soon as they arrive
|
||||||
|
}
|
||||||
|
);
|
||||||
|
const reliableChannelBob = await ReliableChannel.create(
|
||||||
|
mockWakuNodeBob,
|
||||||
|
"MyChannel",
|
||||||
|
"bob",
|
||||||
|
encoder,
|
||||||
|
decoder,
|
||||||
|
{
|
||||||
|
syncMinIntervalMs: 0, // do not send sync messages automatically
|
||||||
|
maxRetryAttempts: 0 // This one does not perform retries
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
const msgTxt = "first message in channel";
|
||||||
|
const message = utf8ToBytes(msgTxt);
|
||||||
|
|
||||||
|
// Let's count how many times Bob receives Alice's message
|
||||||
|
let messageCount = 0;
|
||||||
|
reliableChannelBob.addEventListener("message-received", (event) => {
|
||||||
|
if (bytesToUtf8(event.detail.payload) === msgTxt) {
|
||||||
|
messageCount++;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
reliableChannelAlice.send(message);
|
||||||
|
|
||||||
|
while (messageCount < 1) {
|
||||||
|
await delay(10);
|
||||||
|
}
|
||||||
|
expect(messageCount).to.equal(1, "Bob received Alice's message once");
|
||||||
|
|
||||||
|
// No response from Bob should trigger a retry from Alice
|
||||||
|
while (messageCount < 2) {
|
||||||
|
await delay(10);
|
||||||
|
}
|
||||||
|
expect(messageCount).to.equal(2, "retried once");
|
||||||
|
|
||||||
|
// Bobs sends a message now, it should include first one in causal history
|
||||||
|
reliableChannelBob.send(utf8ToBytes("second message in channel"));
|
||||||
|
|
||||||
|
// Wait long enough to confirm no retry is executed
|
||||||
|
await delay(300);
|
||||||
|
|
||||||
|
// Alice should have stopped sending
|
||||||
|
expect(messageCount).to.equal(2, "hasn't retried since it's acked");
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("Missing Message Retrieval", () => {
|
||||||
|
it("Automatically retrieves missing message", async () => {
|
||||||
|
const commonEventEmitter = new TypedEventEmitter<MockWakuEvents>();
|
||||||
|
const mockWakuNodeAlice = new MockWakuNode(commonEventEmitter);
|
||||||
|
|
||||||
|
// Setup, Alice first
|
||||||
|
const reliableChannelAlice = await ReliableChannel.create(
|
||||||
|
mockWakuNodeAlice,
|
||||||
|
"MyChannel",
|
||||||
|
"alice",
|
||||||
|
encoder,
|
||||||
|
decoder,
|
||||||
|
{
|
||||||
|
// disable any automation to better control the test
|
||||||
|
retryIntervalMs: 0,
|
||||||
|
syncMinIntervalMs: 0,
|
||||||
|
retrieveFrequencyMs: 0,
|
||||||
|
processTaskMinElapseMs: 10
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
// Bob is offline, Alice sends a message, this is the message we want
|
||||||
|
// Bob to receive in this test.
|
||||||
|
const message = utf8ToBytes("missing message");
|
||||||
|
reliableChannelAlice.send(message);
|
||||||
|
// Wait to be sent
|
||||||
|
await new Promise((resolve) => {
|
||||||
|
reliableChannelAlice.addEventListener("message-sent", resolve, {
|
||||||
|
once: true
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
const sdsMessage = new ContentMessage(
|
||||||
|
ReliableChannel.getMessageId(message),
|
||||||
|
"MyChannel",
|
||||||
|
"alice",
|
||||||
|
[],
|
||||||
|
1,
|
||||||
|
undefined,
|
||||||
|
message
|
||||||
|
);
|
||||||
|
|
||||||
|
// Now Bob goes online
|
||||||
|
const mockWakuNodeBob = new MockWakuNode(commonEventEmitter);
|
||||||
|
|
||||||
|
// Stub store.queryGenerator to return a message
|
||||||
|
const mockMessage = {
|
||||||
|
payload: sdsMessage.encode()
|
||||||
|
};
|
||||||
|
const queryGeneratorStub = sinon.stub().callsFake(async function* (
|
||||||
|
_decoders: IDecoder<IDecodedMessage>[],
|
||||||
|
_options?: Partial<QueryRequestParams>
|
||||||
|
) {
|
||||||
|
yield [Promise.resolve(mockMessage as IDecodedMessage)];
|
||||||
|
});
|
||||||
|
|
||||||
|
(mockWakuNodeBob.store as any) = {
|
||||||
|
queryGenerator: queryGeneratorStub
|
||||||
|
};
|
||||||
|
|
||||||
|
const reliableChannelBob = await ReliableChannel.create(
|
||||||
|
mockWakuNodeBob,
|
||||||
|
"MyChannel",
|
||||||
|
"bob",
|
||||||
|
encoder,
|
||||||
|
decoder,
|
||||||
|
{
|
||||||
|
retryIntervalMs: 0, // disable any automation to better control the test
|
||||||
|
syncMinIntervalMs: 0,
|
||||||
|
processTaskMinElapseMs: 10,
|
||||||
|
retrieveFrequencyMs: 100 // quick loop so the test go fast
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
let messageRetrieved = false;
|
||||||
|
reliableChannelBob.addEventListener("message-received", (event) => {
|
||||||
|
if (bytesToUtf8(event.detail.payload) === "missing message") {
|
||||||
|
messageRetrieved = true;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Alice sends a sync message, Bob should learn about missing message
|
||||||
|
// and retrieve it
|
||||||
|
await reliableChannelAlice["sendSyncMessage"]();
|
||||||
|
|
||||||
|
await delay(200);
|
||||||
|
|
||||||
|
expect(messageRetrieved).to.be.true;
|
||||||
|
|
||||||
|
// Verify the stub was called once with the right messageHash info
|
||||||
|
expect(queryGeneratorStub.calledOnce).to.be.true;
|
||||||
|
const callArgs = queryGeneratorStub.getCall(0).args;
|
||||||
|
expect(callArgs[1]).to.have.property("messageHashes");
|
||||||
|
expect(callArgs[1].messageHashes).to.be.an("array");
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("Query On Connect Integration E2E Tests", () => {
|
||||||
|
let mockWakuNode: MockWakuNode;
|
||||||
|
let reliableChannel: ReliableChannel<IDecodedMessage>;
|
||||||
|
let encoder: IEncoder;
|
||||||
|
let decoder: IDecoder<IDecodedMessage>;
|
||||||
|
let mockPeerManagerEvents: TypedEventEmitter<any>;
|
||||||
|
let queryGeneratorStub: sinon.SinonStub;
|
||||||
|
let mockPeerId: PeerId;
|
||||||
|
|
||||||
|
beforeEach(async () => {
|
||||||
|
// Setup mock waku node with store capability
|
||||||
|
mockWakuNode = new MockWakuNode();
|
||||||
|
|
||||||
|
// Setup mock peer manager events for QueryOnConnect
|
||||||
|
mockPeerManagerEvents = new TypedEventEmitter();
|
||||||
|
(mockWakuNode as any).peerManager = {
|
||||||
|
events: mockPeerManagerEvents
|
||||||
|
};
|
||||||
|
|
||||||
|
// Setup encoder and decoder
|
||||||
|
encoder = createEncoder({
|
||||||
|
contentTopic: TEST_CONTENT_TOPIC,
|
||||||
|
routingInfo: TEST_ROUTING_INFO
|
||||||
|
});
|
||||||
|
|
||||||
|
decoder = createDecoder(TEST_CONTENT_TOPIC, TEST_ROUTING_INFO);
|
||||||
|
|
||||||
|
// Setup store with queryGenerator for QueryOnConnect
|
||||||
|
queryGeneratorStub = sinon.stub();
|
||||||
|
mockWakuNode.store = {
|
||||||
|
queryGenerator: queryGeneratorStub
|
||||||
|
} as any;
|
||||||
|
|
||||||
|
mockPeerId = {
|
||||||
|
toString: () => "QmTestPeerId"
|
||||||
|
} as unknown as PeerId;
|
||||||
|
});
|
||||||
|
|
||||||
|
it("should trigger QueryOnConnect when going offline and store peer reconnects", async () => {
|
||||||
|
// Create a message that will be auto-retrieved
|
||||||
|
const messageText = "Auto-retrieved message";
|
||||||
|
const messagePayload = utf8ToBytes(messageText);
|
||||||
|
|
||||||
|
const sdsMessage = new ContentMessage(
|
||||||
|
ReliableChannel.getMessageId(messagePayload),
|
||||||
|
"testChannel",
|
||||||
|
"testSender",
|
||||||
|
[],
|
||||||
|
1,
|
||||||
|
undefined,
|
||||||
|
messagePayload
|
||||||
|
);
|
||||||
|
|
||||||
|
const autoRetrievedMessage: IDecodedMessage = {
|
||||||
|
hash: hexToBytes("1234"),
|
||||||
|
hashStr: "1234",
|
||||||
|
version: 1,
|
||||||
|
timestamp: new Date(),
|
||||||
|
contentTopic: TEST_CONTENT_TOPIC,
|
||||||
|
pubsubTopic: decoder.pubsubTopic,
|
||||||
|
payload: sdsMessage.encode(),
|
||||||
|
rateLimitProof: undefined,
|
||||||
|
ephemeral: false,
|
||||||
|
meta: undefined
|
||||||
|
};
|
||||||
|
|
||||||
|
// Setup queryGenerator to return the auto-retrieved message
|
||||||
|
queryGeneratorStub.callsFake(async function* () {
|
||||||
|
yield [Promise.resolve(autoRetrievedMessage)];
|
||||||
|
});
|
||||||
|
|
||||||
|
// Create ReliableChannel with queryOnConnect enabled
|
||||||
|
reliableChannel = await ReliableChannel.create(
|
||||||
|
mockWakuNode,
|
||||||
|
"testChannel",
|
||||||
|
"testSender",
|
||||||
|
encoder,
|
||||||
|
decoder
|
||||||
|
);
|
||||||
|
|
||||||
|
// Wait for initial setup
|
||||||
|
await delay(50);
|
||||||
|
|
||||||
|
// Setup complete - focus on testing QueryOnConnect trigger
|
||||||
|
|
||||||
|
// Simulate going offline (change health status)
|
||||||
|
mockWakuNode.events.dispatchEvent(
|
||||||
|
new CustomEvent("health", { detail: HealthStatus.Unhealthy })
|
||||||
|
);
|
||||||
|
|
||||||
|
await delay(10);
|
||||||
|
|
||||||
|
// Simulate store peer reconnection which should trigger QueryOnConnect
|
||||||
|
mockPeerManagerEvents.dispatchEvent(
|
||||||
|
new CustomEvent("store:connect", { detail: mockPeerId })
|
||||||
|
);
|
||||||
|
|
||||||
|
// Wait for store query to be triggered
|
||||||
|
await delay(200);
|
||||||
|
|
||||||
|
// Verify that QueryOnConnect was triggered by the conditions
|
||||||
|
expect(queryGeneratorStub.called).to.be.true;
|
||||||
|
});
|
||||||
|
|
||||||
|
it("should trigger QueryOnConnect when time threshold is exceeded", async () => {
|
||||||
|
// Create multiple messages that will be auto-retrieved
|
||||||
|
const message1Text = "First auto-retrieved message";
|
||||||
|
const message2Text = "Second auto-retrieved message";
|
||||||
|
const message1Payload = utf8ToBytes(message1Text);
|
||||||
|
const message2Payload = utf8ToBytes(message2Text);
|
||||||
|
|
||||||
|
const sdsMessage1 = new ContentMessage(
|
||||||
|
ReliableChannel.getMessageId(message1Payload),
|
||||||
|
"testChannel",
|
||||||
|
"testSender",
|
||||||
|
[],
|
||||||
|
1,
|
||||||
|
undefined,
|
||||||
|
message1Payload
|
||||||
|
);
|
||||||
|
|
||||||
|
const sdsMessage2 = new ContentMessage(
|
||||||
|
ReliableChannel.getMessageId(message2Payload),
|
||||||
|
"testChannel",
|
||||||
|
"testSender",
|
||||||
|
[],
|
||||||
|
2,
|
||||||
|
undefined,
|
||||||
|
message2Payload
|
||||||
|
);
|
||||||
|
|
||||||
|
const autoRetrievedMessage1: IDecodedMessage = {
|
||||||
|
hash: hexToBytes("5678"),
|
||||||
|
hashStr: "5678",
|
||||||
|
version: 1,
|
||||||
|
timestamp: new Date(Date.now() - 1000),
|
||||||
|
contentTopic: TEST_CONTENT_TOPIC,
|
||||||
|
pubsubTopic: decoder.pubsubTopic,
|
||||||
|
payload: sdsMessage1.encode(),
|
||||||
|
rateLimitProof: undefined,
|
||||||
|
ephemeral: false,
|
||||||
|
meta: undefined
|
||||||
|
};
|
||||||
|
|
||||||
|
const autoRetrievedMessage2: IDecodedMessage = {
|
||||||
|
hash: hexToBytes("9abc"),
|
||||||
|
hashStr: "9abc",
|
||||||
|
version: 1,
|
||||||
|
timestamp: new Date(),
|
||||||
|
contentTopic: TEST_CONTENT_TOPIC,
|
||||||
|
pubsubTopic: decoder.pubsubTopic,
|
||||||
|
payload: sdsMessage2.encode(),
|
||||||
|
rateLimitProof: undefined,
|
||||||
|
ephemeral: false,
|
||||||
|
meta: undefined
|
||||||
|
};
|
||||||
|
|
||||||
|
// Setup queryGenerator to return multiple messages
|
||||||
|
queryGeneratorStub.callsFake(async function* () {
|
||||||
|
yield [Promise.resolve(autoRetrievedMessage1)];
|
||||||
|
yield [Promise.resolve(autoRetrievedMessage2)];
|
||||||
|
});
|
||||||
|
|
||||||
|
// Create ReliableChannel with queryOnConnect enabled
|
||||||
|
reliableChannel = await ReliableChannel.create(
|
||||||
|
mockWakuNode,
|
||||||
|
"testChannel",
|
||||||
|
"testSender",
|
||||||
|
encoder,
|
||||||
|
decoder,
|
||||||
|
{ queryOnConnect: true }
|
||||||
|
);
|
||||||
|
|
||||||
|
await delay(50);
|
||||||
|
|
||||||
|
// Simulate old last successful query by accessing QueryOnConnect internals
|
||||||
|
// The default threshold is 5 minutes, so we'll set it to an old time
|
||||||
|
if ((reliableChannel as any).queryOnConnect) {
|
||||||
|
((reliableChannel as any).queryOnConnect as any).lastSuccessfulQuery =
|
||||||
|
Date.now() - 6 * 60 * 1000; // 6 minutes ago
|
||||||
|
}
|
||||||
|
|
||||||
|
// Simulate store peer connection which should trigger retrieval due to time threshold
|
||||||
|
mockPeerManagerEvents.dispatchEvent(
|
||||||
|
new CustomEvent("store:connect", { detail: mockPeerId })
|
||||||
|
);
|
||||||
|
|
||||||
|
// Wait for store query to be triggered
|
||||||
|
await delay(200);
|
||||||
|
|
||||||
|
// Verify that QueryOnConnect was triggered due to time threshold
|
||||||
|
expect(queryGeneratorStub.called).to.be.true;
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
676
packages/sdk/src/reliable_channel/reliable_channel.ts
Normal file
676
packages/sdk/src/reliable_channel/reliable_channel.ts
Normal file
@ -0,0 +1,676 @@
|
|||||||
|
import { TypedEventEmitter } from "@libp2p/interface";
|
||||||
|
import { messageHash } from "@waku/core";
|
||||||
|
import {
|
||||||
|
type Callback,
|
||||||
|
type IDecodedMessage,
|
||||||
|
type IDecoder,
|
||||||
|
type IEncoder,
|
||||||
|
type IMessage,
|
||||||
|
ISendOptions,
|
||||||
|
type IWaku,
|
||||||
|
LightPushError,
|
||||||
|
LightPushSDKResult,
|
||||||
|
QueryRequestParams
|
||||||
|
} from "@waku/interfaces";
|
||||||
|
import {
|
||||||
|
type ChannelId,
|
||||||
|
isContentMessage,
|
||||||
|
MessageChannel,
|
||||||
|
MessageChannelEvent,
|
||||||
|
type MessageChannelOptions,
|
||||||
|
Message as SdsMessage,
|
||||||
|
type SenderId,
|
||||||
|
SyncMessage
|
||||||
|
} from "@waku/sds";
|
||||||
|
import { Logger } from "@waku/utils";
|
||||||
|
|
||||||
|
import {
|
||||||
|
QueryOnConnect,
|
||||||
|
QueryOnConnectEvent
|
||||||
|
} from "../query_on_connect/index.js";
|
||||||
|
|
||||||
|
import { ReliableChannelEvent, ReliableChannelEvents } from "./events.js";
|
||||||
|
import { MissingMessageRetriever } from "./missing_message_retriever.js";
|
||||||
|
import { RetryManager } from "./retry_manager.js";
|
||||||
|
|
||||||
|
const log = new Logger("sdk:reliable-channel");
|
||||||
|
|
||||||
|
const DEFAULT_SYNC_MIN_INTERVAL_MS = 30 * 1000; // 30 seconds
|
||||||
|
const DEFAULT_RETRY_INTERVAL_MS = 30 * 1000; // 30 seconds
|
||||||
|
const DEFAULT_MAX_RETRY_ATTEMPTS = 10;
|
||||||
|
const DEFAULT_SWEEP_IN_BUF_INTERVAL_MS = 5 * 1000;
|
||||||
|
const DEFAULT_PROCESS_TASK_MIN_ELAPSE_MS = 1000;
|
||||||
|
|
||||||
|
const IRRECOVERABLE_SENDING_ERRORS: LightPushError[] = [
|
||||||
|
LightPushError.ENCODE_FAILED,
|
||||||
|
LightPushError.EMPTY_PAYLOAD,
|
||||||
|
LightPushError.SIZE_TOO_BIG,
|
||||||
|
LightPushError.RLN_PROOF_GENERATION
|
||||||
|
];
|
||||||
|
|
||||||
|
export type ReliableChannelOptions = MessageChannelOptions & {
|
||||||
|
/**
|
||||||
|
* The minimum interval between 2 sync messages in the channel.
|
||||||
|
*
|
||||||
|
* Meaning, how frequently we want messages in the channel, noting that the
|
||||||
|
* responsibility of sending a sync messages is shared between participants
|
||||||
|
* of the channel.
|
||||||
|
*
|
||||||
|
* `0` means no sync messages will be sent.
|
||||||
|
*
|
||||||
|
* @default 30,000 (30 seconds) [[DEFAULT_SYNC_MIN_INTERVAL_MS]]
|
||||||
|
*/
|
||||||
|
syncMinIntervalMs?: number;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* How long to wait before re-sending a message that as not acknowledged.
|
||||||
|
*
|
||||||
|
* @default 60,000 (60 seconds) [[DEFAULT_RETRY_INTERVAL_MS]]
|
||||||
|
*/
|
||||||
|
retryIntervalMs?: number;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* How many times do we attempt resending messages that were not acknowledged.
|
||||||
|
*
|
||||||
|
* @default 10 [[DEFAULT_MAX_RETRY_ATTEMPTS]]
|
||||||
|
*/
|
||||||
|
maxRetryAttempts?: number;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* How often store queries are done to retrieve missing messages.
|
||||||
|
*
|
||||||
|
* @default 10,000 (10 seconds)
|
||||||
|
*/
|
||||||
|
retrieveFrequencyMs?: number;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* How often SDS message channel incoming buffer is swept.
|
||||||
|
*
|
||||||
|
* @default 5000 (every 5 seconds)
|
||||||
|
*/
|
||||||
|
sweepInBufIntervalMs?: number;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Whether to automatically do a store query after connection to store nodes.
|
||||||
|
*
|
||||||
|
* @default true
|
||||||
|
*/
|
||||||
|
queryOnConnect?: boolean;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Whether to auto start the message channel
|
||||||
|
*
|
||||||
|
* @default true
|
||||||
|
*/
|
||||||
|
autoStart?: boolean;
|
||||||
|
|
||||||
|
/** The minimum elapse time between calling the underlying channel process
|
||||||
|
* task for incoming messages. This is to avoid overload when processing
|
||||||
|
* a lot of messages.
|
||||||
|
*
|
||||||
|
* @default 1000 (1 second)
|
||||||
|
*/
|
||||||
|
processTaskMinElapseMs?: number;
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An easy-to-use reliable channel that ensures all participants to the channel have eventual message consistency.
|
||||||
|
*
|
||||||
|
* Use events to track:
|
||||||
|
* - if your outgoing messages are sent, acknowledged or error out
|
||||||
|
* - for new incoming messages
|
||||||
|
* @emits [[ReliableChannelEvents]]
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
export class ReliableChannel<
|
||||||
|
T extends IDecodedMessage
|
||||||
|
> extends TypedEventEmitter<ReliableChannelEvents> {
|
||||||
|
private readonly _send: (
|
||||||
|
encoder: IEncoder,
|
||||||
|
message: IMessage,
|
||||||
|
sendOptions?: ISendOptions
|
||||||
|
) => Promise<LightPushSDKResult>;
|
||||||
|
|
||||||
|
private readonly _subscribe: (
|
||||||
|
decoders: IDecoder<T> | IDecoder<T>[],
|
||||||
|
callback: Callback<T>
|
||||||
|
) => Promise<boolean>;
|
||||||
|
|
||||||
|
private readonly _retrieve?: <T extends IDecodedMessage>(
|
||||||
|
decoders: IDecoder<T>[],
|
||||||
|
options?: Partial<QueryRequestParams>
|
||||||
|
) => AsyncGenerator<Promise<T | undefined>[]>;
|
||||||
|
|
||||||
|
private readonly syncMinIntervalMs: number;
|
||||||
|
private syncTimeout: ReturnType<typeof setTimeout> | undefined;
|
||||||
|
private sweepInBufInterval: ReturnType<typeof setInterval> | undefined;
|
||||||
|
private readonly sweepInBufIntervalMs: number;
|
||||||
|
private processTaskTimeout: ReturnType<typeof setTimeout> | undefined;
|
||||||
|
private readonly retryManager: RetryManager | undefined;
|
||||||
|
private readonly missingMessageRetriever?: MissingMessageRetriever<T>;
|
||||||
|
private readonly queryOnConnect?: QueryOnConnect<T>;
|
||||||
|
private readonly processTaskMinElapseMs: number;
|
||||||
|
private _started: boolean;
|
||||||
|
|
||||||
|
private constructor(
|
||||||
|
public node: IWaku,
|
||||||
|
public messageChannel: MessageChannel,
|
||||||
|
private encoder: IEncoder,
|
||||||
|
private decoder: IDecoder<T>,
|
||||||
|
options?: ReliableChannelOptions
|
||||||
|
) {
|
||||||
|
super();
|
||||||
|
if (node.lightPush) {
|
||||||
|
this._send = node.lightPush.send.bind(node.lightPush);
|
||||||
|
} else if (node.relay) {
|
||||||
|
this._send = node.relay.send.bind(node.relay);
|
||||||
|
} else {
|
||||||
|
throw "No protocol available to send messages";
|
||||||
|
}
|
||||||
|
|
||||||
|
if (node.filter) {
|
||||||
|
this._subscribe = node.filter.subscribe.bind(node.filter);
|
||||||
|
} else if (node.relay) {
|
||||||
|
// TODO: Why do relay and filter have different interfaces?
|
||||||
|
// this._subscribe = node.relay.subscribeWithUnsubscribe;
|
||||||
|
throw "Not implemented";
|
||||||
|
} else {
|
||||||
|
throw "No protocol available to receive messages";
|
||||||
|
}
|
||||||
|
|
||||||
|
if (node.store) {
|
||||||
|
this._retrieve = node.store.queryGenerator.bind(node.store);
|
||||||
|
const peerManagerEvents = (node as any)?.peerManager?.events;
|
||||||
|
if (
|
||||||
|
peerManagerEvents !== undefined &&
|
||||||
|
(options?.queryOnConnect ?? true)
|
||||||
|
) {
|
||||||
|
log.info("auto-query enabled");
|
||||||
|
this.queryOnConnect = new QueryOnConnect(
|
||||||
|
[this.decoder],
|
||||||
|
peerManagerEvents,
|
||||||
|
node.events,
|
||||||
|
this._retrieve.bind(this)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
this.syncMinIntervalMs =
|
||||||
|
options?.syncMinIntervalMs ?? DEFAULT_SYNC_MIN_INTERVAL_MS;
|
||||||
|
|
||||||
|
this.sweepInBufIntervalMs =
|
||||||
|
options?.sweepInBufIntervalMs ?? DEFAULT_SWEEP_IN_BUF_INTERVAL_MS;
|
||||||
|
|
||||||
|
const retryIntervalMs =
|
||||||
|
options?.retryIntervalMs ?? DEFAULT_RETRY_INTERVAL_MS;
|
||||||
|
const maxRetryAttempts =
|
||||||
|
options?.maxRetryAttempts ?? DEFAULT_MAX_RETRY_ATTEMPTS;
|
||||||
|
|
||||||
|
if (retryIntervalMs && maxRetryAttempts) {
|
||||||
|
// TODO: there is a lot to improve. e.g. not point retry to send if node is offline.
|
||||||
|
this.retryManager = new RetryManager(retryIntervalMs, maxRetryAttempts);
|
||||||
|
}
|
||||||
|
|
||||||
|
this.processTaskMinElapseMs =
|
||||||
|
options?.processTaskMinElapseMs ?? DEFAULT_PROCESS_TASK_MIN_ELAPSE_MS;
|
||||||
|
|
||||||
|
if (this._retrieve) {
|
||||||
|
this.missingMessageRetriever = new MissingMessageRetriever(
|
||||||
|
this.decoder,
|
||||||
|
options?.retrieveFrequencyMs,
|
||||||
|
this._retrieve,
|
||||||
|
async (msg: T) => {
|
||||||
|
await this.processIncomingMessage(msg);
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
this._started = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
public get isStarted(): boolean {
|
||||||
|
return this._started;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Used to identify messages, pass the payload of a message you are
|
||||||
|
* about to send to track the events for this message.
|
||||||
|
* This is pre-sds wrapping
|
||||||
|
* @param messagePayload
|
||||||
|
*/
|
||||||
|
public static getMessageId(messagePayload: Uint8Array): string {
|
||||||
|
return MessageChannel.getMessageId(messagePayload);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new message channels. Message channels enables end-to-end
|
||||||
|
* reliability by ensuring that all messages in the channel are received
|
||||||
|
* by other users, and retrieved by this local node.
|
||||||
|
*
|
||||||
|
* emits events about outgoing messages, see [[`ReliableChannel`]] docs.
|
||||||
|
*
|
||||||
|
* Note that all participants in a message channels need to get the messages
|
||||||
|
* from the channel. Meaning:
|
||||||
|
* - all participants must be able to decrypt the messages
|
||||||
|
* - all participants must be subscribing to content topic(s) where the messages are sent
|
||||||
|
*
|
||||||
|
* @param node The waku node to use to send and receive messages
|
||||||
|
* @param channelId An id for the channel, all participants of the channel should use the same id
|
||||||
|
* @param senderId An id for the sender, to ensure acknowledgements are only valid if originating from someone else; best if persisted between sessions
|
||||||
|
* @param encoder A channel operates within a singular encryption layer, hence the same encoder is needed for all messages
|
||||||
|
* @param decoder A channel operates within a singular encryption layer, hence the same decoder is needed for all messages
|
||||||
|
* @param options
|
||||||
|
*/
|
||||||
|
public static async create<T extends IDecodedMessage>(
|
||||||
|
node: IWaku,
|
||||||
|
channelId: ChannelId,
|
||||||
|
senderId: SenderId,
|
||||||
|
encoder: IEncoder,
|
||||||
|
decoder: IDecoder<T>,
|
||||||
|
options?: ReliableChannelOptions
|
||||||
|
): Promise<ReliableChannel<T>> {
|
||||||
|
const sdsMessageChannel = new MessageChannel(channelId, senderId, options);
|
||||||
|
const messageChannel = new ReliableChannel(
|
||||||
|
node,
|
||||||
|
sdsMessageChannel,
|
||||||
|
encoder,
|
||||||
|
decoder,
|
||||||
|
options
|
||||||
|
);
|
||||||
|
|
||||||
|
const autoStart = options?.autoStart ?? true;
|
||||||
|
if (autoStart) {
|
||||||
|
await messageChannel.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
return messageChannel;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sends a message in the channel, will attempt to re-send if not acknowledged
|
||||||
|
* by other participants.
|
||||||
|
*
|
||||||
|
* @param messagePayload
|
||||||
|
* @returns the message id
|
||||||
|
*/
|
||||||
|
public send(messagePayload: Uint8Array): string {
|
||||||
|
const messageId = ReliableChannel.getMessageId(messagePayload);
|
||||||
|
if (!this._started) {
|
||||||
|
this.safeSendEvent("sending-message-irrecoverable-error", {
|
||||||
|
detail: { messageId: messageId, error: "channel is not started" }
|
||||||
|
});
|
||||||
|
}
|
||||||
|
const wrapAndSendBind = this._wrapAndSend.bind(this, messagePayload);
|
||||||
|
this.retryManager?.startRetries(messageId, wrapAndSendBind);
|
||||||
|
wrapAndSendBind();
|
||||||
|
return messageId;
|
||||||
|
}
|
||||||
|
|
||||||
|
private _wrapAndSend(messagePayload: Uint8Array): void {
|
||||||
|
this.messageChannel.pushOutgoingMessage(
|
||||||
|
messagePayload,
|
||||||
|
async (
|
||||||
|
sdsMessage: SdsMessage
|
||||||
|
): Promise<{ success: boolean; retrievalHint?: Uint8Array }> => {
|
||||||
|
// Callback is called once message has added to the SDS outgoing queue
|
||||||
|
// We start by trying to send the message now.
|
||||||
|
|
||||||
|
// `payload` wrapped in SDS
|
||||||
|
const sdsPayload = sdsMessage.encode();
|
||||||
|
|
||||||
|
const wakuMessage = {
|
||||||
|
payload: sdsPayload
|
||||||
|
};
|
||||||
|
|
||||||
|
const messageId = ReliableChannel.getMessageId(messagePayload);
|
||||||
|
|
||||||
|
// TODO: should the encoder give me the message hash?
|
||||||
|
// Encoding now to fail early, used later to get message hash
|
||||||
|
const protoMessage = await this.encoder.toProtoObj(wakuMessage);
|
||||||
|
if (!protoMessage) {
|
||||||
|
this.safeSendEvent("sending-message-irrecoverable-error", {
|
||||||
|
detail: {
|
||||||
|
messageId: messageId,
|
||||||
|
error: "could not encode message"
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return { success: false };
|
||||||
|
}
|
||||||
|
const retrievalHint = messageHash(
|
||||||
|
this.encoder.pubsubTopic,
|
||||||
|
protoMessage
|
||||||
|
);
|
||||||
|
|
||||||
|
this.safeSendEvent("sending-message", {
|
||||||
|
detail: messageId
|
||||||
|
});
|
||||||
|
|
||||||
|
const sendRes = await this._send(this.encoder, wakuMessage);
|
||||||
|
|
||||||
|
// If it's a recoverable failure, we will try again to send later
|
||||||
|
// If not, then we should error to the user now
|
||||||
|
for (const { error } of sendRes.failures) {
|
||||||
|
if (IRRECOVERABLE_SENDING_ERRORS.includes(error)) {
|
||||||
|
// Not recoverable, best to return it
|
||||||
|
log.error("Irrecoverable error, cannot send message: ", error);
|
||||||
|
this.safeSendEvent("sending-message-irrecoverable-error", {
|
||||||
|
detail: {
|
||||||
|
messageId,
|
||||||
|
error
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return { success: false, retrievalHint };
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return {
|
||||||
|
success: true,
|
||||||
|
retrievalHint
|
||||||
|
};
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
// Process outgoing messages straight away
|
||||||
|
this.messageChannel
|
||||||
|
.processTasks()
|
||||||
|
.then(() => {
|
||||||
|
this.messageChannel.sweepOutgoingBuffer();
|
||||||
|
})
|
||||||
|
.catch((err) => {
|
||||||
|
log.error("error encountered when processing sds tasks", err);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private async subscribe(): Promise<boolean> {
|
||||||
|
this.assertStarted();
|
||||||
|
return this._subscribe(this.decoder, async (message: T) => {
|
||||||
|
await this.processIncomingMessage(message);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Don't forget to call `this.messageChannel.sweepIncomingBuffer();` once done.
|
||||||
|
* @param msg
|
||||||
|
* @private
|
||||||
|
*/
|
||||||
|
private async processIncomingMessage<T extends IDecodedMessage>(
|
||||||
|
msg: T
|
||||||
|
): Promise<void> {
|
||||||
|
// New message arrives, we need to unwrap it first
|
||||||
|
const sdsMessage = SdsMessage.decode(msg.payload);
|
||||||
|
|
||||||
|
if (!sdsMessage) {
|
||||||
|
log.error("could not SDS decode message", msg);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (sdsMessage.channelId !== this.messageChannel.channelId) {
|
||||||
|
log.warn(
|
||||||
|
"ignoring message with different channel id",
|
||||||
|
sdsMessage.channelId
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const retrievalHint = msg.hash;
|
||||||
|
log.info(`processing message ${sdsMessage.messageId}:${msg.hashStr}`);
|
||||||
|
// SDS Message decoded, let's pass it to the channel so we can learn about
|
||||||
|
// missing messages or the status of previous outgoing messages
|
||||||
|
this.messageChannel.pushIncomingMessage(sdsMessage, retrievalHint);
|
||||||
|
|
||||||
|
this.missingMessageRetriever?.removeMissingMessage(sdsMessage.messageId);
|
||||||
|
|
||||||
|
if (sdsMessage.content && sdsMessage.content.length > 0) {
|
||||||
|
// Now, process the message with callback
|
||||||
|
|
||||||
|
// Overrides msg.payload with unwrapped payload
|
||||||
|
// TODO: can we do better?
|
||||||
|
const { payload: _p, ...allButPayload } = msg;
|
||||||
|
const unwrappedMessage = Object.assign(allButPayload, {
|
||||||
|
payload: sdsMessage.content,
|
||||||
|
hash: msg.hash,
|
||||||
|
hashStr: msg.hashStr,
|
||||||
|
version: msg.version,
|
||||||
|
contentTopic: msg.contentTopic,
|
||||||
|
pubsubTopic: msg.pubsubTopic,
|
||||||
|
timestamp: msg.timestamp,
|
||||||
|
rateLimitProof: msg.rateLimitProof,
|
||||||
|
ephemeral: msg.ephemeral,
|
||||||
|
meta: msg.meta
|
||||||
|
});
|
||||||
|
|
||||||
|
this.safeSendEvent("message-received", {
|
||||||
|
detail: unwrappedMessage as unknown as T
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
this.queueProcessTasks();
|
||||||
|
}
|
||||||
|
|
||||||
|
private async processIncomingMessages<T extends IDecodedMessage>(
|
||||||
|
messages: T[]
|
||||||
|
): Promise<void> {
|
||||||
|
for (const message of messages) {
|
||||||
|
await this.processIncomingMessage(message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: For now we only queue process tasks for incoming messages
|
||||||
|
// As this is where there is most volume
|
||||||
|
private queueProcessTasks(): void {
|
||||||
|
// If one is already queued, then we can ignore it
|
||||||
|
if (this.processTaskTimeout === undefined) {
|
||||||
|
this.processTaskTimeout = setTimeout(() => {
|
||||||
|
void this.messageChannel.processTasks().catch((err) => {
|
||||||
|
log.error("error encountered when processing sds tasks", err);
|
||||||
|
});
|
||||||
|
|
||||||
|
// Clear timeout once triggered
|
||||||
|
clearTimeout(this.processTaskTimeout);
|
||||||
|
this.processTaskTimeout = undefined;
|
||||||
|
}, this.processTaskMinElapseMs); // we ensure that we don't call process tasks more than once per second
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public async start(): Promise<boolean> {
|
||||||
|
if (this._started) return true;
|
||||||
|
this._started = true;
|
||||||
|
this.setupEventListeners();
|
||||||
|
this.restartSync();
|
||||||
|
this.startSweepIncomingBufferLoop();
|
||||||
|
if (this._retrieve) {
|
||||||
|
this.missingMessageRetriever?.start();
|
||||||
|
this.queryOnConnect?.start();
|
||||||
|
}
|
||||||
|
return this.subscribe();
|
||||||
|
}
|
||||||
|
|
||||||
|
public stop(): void {
|
||||||
|
if (!this._started) return;
|
||||||
|
this._started = false;
|
||||||
|
this.stopSync();
|
||||||
|
this.stopSweepIncomingBufferLoop();
|
||||||
|
this.missingMessageRetriever?.stop();
|
||||||
|
this.queryOnConnect?.stop();
|
||||||
|
// TODO unsubscribe
|
||||||
|
// TODO unsetMessageListeners
|
||||||
|
}
|
||||||
|
|
||||||
|
private assertStarted(): void {
|
||||||
|
if (!this._started) throw Error("Message Channel must be started");
|
||||||
|
}
|
||||||
|
|
||||||
|
private startSweepIncomingBufferLoop(): void {
|
||||||
|
this.stopSweepIncomingBufferLoop();
|
||||||
|
this.sweepInBufInterval = setInterval(() => {
|
||||||
|
log.info("sweep incoming buffer");
|
||||||
|
this.messageChannel.sweepIncomingBuffer();
|
||||||
|
}, this.sweepInBufIntervalMs);
|
||||||
|
}
|
||||||
|
|
||||||
|
private stopSweepIncomingBufferLoop(): void {
|
||||||
|
if (this.sweepInBufInterval) clearInterval(this.sweepInBufInterval);
|
||||||
|
}
|
||||||
|
|
||||||
|
private restartSync(multiplier: number = 1): void {
|
||||||
|
if (this.syncTimeout) {
|
||||||
|
clearTimeout(this.syncTimeout);
|
||||||
|
}
|
||||||
|
if (this.syncMinIntervalMs) {
|
||||||
|
const timeoutMs = this.random() * this.syncMinIntervalMs * multiplier;
|
||||||
|
|
||||||
|
this.syncTimeout = setTimeout(() => {
|
||||||
|
void this.sendSyncMessage();
|
||||||
|
// Always restart a sync, no matter whether the message was sent.
|
||||||
|
// Set a multiplier so we wait a bit longer to not hog the conversation
|
||||||
|
void this.restartSync(2);
|
||||||
|
}, timeoutMs);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private stopSync(): void {
|
||||||
|
if (this.syncTimeout) {
|
||||||
|
clearTimeout(this.syncTimeout);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Used to enable overriding when testing
|
||||||
|
private random(): number {
|
||||||
|
return Math.random();
|
||||||
|
}
|
||||||
|
|
||||||
|
private safeSendEvent<T extends ReliableChannelEvent>(
|
||||||
|
event: T,
|
||||||
|
eventInit?: CustomEventInit
|
||||||
|
): void {
|
||||||
|
try {
|
||||||
|
this.dispatchEvent(new CustomEvent(event, eventInit));
|
||||||
|
} catch (error) {
|
||||||
|
log.error(`Failed to dispatch event ${event}:`, error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async sendSyncMessage(): Promise<void> {
|
||||||
|
this.assertStarted();
|
||||||
|
await this.messageChannel.pushOutgoingSyncMessage(
|
||||||
|
async (syncMessage: SyncMessage): Promise<boolean> => {
|
||||||
|
// Callback is called once message has added to the SDS outgoing queue
|
||||||
|
// We start by trying to send the message now.
|
||||||
|
|
||||||
|
// `payload` wrapped in SDS
|
||||||
|
const sdsPayload = syncMessage.encode();
|
||||||
|
|
||||||
|
const wakuMessage = {
|
||||||
|
payload: sdsPayload
|
||||||
|
};
|
||||||
|
|
||||||
|
const sendRes = await this._send(this.encoder, wakuMessage);
|
||||||
|
if (sendRes.failures.length > 0) {
|
||||||
|
log.error("Error sending sync message: ", sendRes);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
// Process outgoing messages straight away
|
||||||
|
// TODO: review and optimize
|
||||||
|
await this.messageChannel.processTasks();
|
||||||
|
this.messageChannel.sweepOutgoingBuffer();
|
||||||
|
}
|
||||||
|
|
||||||
|
private setupEventListeners(): void {
|
||||||
|
this.messageChannel.addEventListener(
|
||||||
|
MessageChannelEvent.OutMessageSent,
|
||||||
|
(event) => {
|
||||||
|
if (event.detail.content) {
|
||||||
|
const messageId = ReliableChannel.getMessageId(event.detail.content);
|
||||||
|
this.safeSendEvent("message-sent", {
|
||||||
|
detail: messageId
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
this.messageChannel.addEventListener(
|
||||||
|
MessageChannelEvent.OutMessageAcknowledged,
|
||||||
|
(event) => {
|
||||||
|
if (event.detail) {
|
||||||
|
this.safeSendEvent("message-acknowledged", {
|
||||||
|
detail: event.detail
|
||||||
|
});
|
||||||
|
|
||||||
|
// Stopping retries
|
||||||
|
this.retryManager?.stopRetries(event.detail);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
this.messageChannel.addEventListener(
|
||||||
|
MessageChannelEvent.OutMessagePossiblyAcknowledged,
|
||||||
|
(event) => {
|
||||||
|
if (event.detail) {
|
||||||
|
this.safeSendEvent("message-possibly-acknowledged", {
|
||||||
|
detail: {
|
||||||
|
messageId: event.detail.messageId,
|
||||||
|
possibleAckCount: event.detail.count
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
this.messageChannel.addEventListener(
|
||||||
|
MessageChannelEvent.InSyncReceived,
|
||||||
|
(_event) => {
|
||||||
|
// restart the timeout when a sync message has been received
|
||||||
|
this.restartSync();
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
this.messageChannel.addEventListener(
|
||||||
|
MessageChannelEvent.InMessageReceived,
|
||||||
|
(event) => {
|
||||||
|
// restart the timeout when a content message has been received
|
||||||
|
if (isContentMessage(event.detail)) {
|
||||||
|
// send a sync message faster to ack someone's else
|
||||||
|
this.restartSync(0.5);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
this.messageChannel.addEventListener(
|
||||||
|
MessageChannelEvent.OutMessageSent,
|
||||||
|
(event) => {
|
||||||
|
// restart the timeout when a content message has been sent
|
||||||
|
if (isContentMessage(event.detail)) {
|
||||||
|
this.restartSync();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
this.messageChannel.addEventListener(
|
||||||
|
MessageChannelEvent.InMessageMissing,
|
||||||
|
(event) => {
|
||||||
|
for (const { messageId, retrievalHint } of event.detail) {
|
||||||
|
if (retrievalHint && this.missingMessageRetriever) {
|
||||||
|
this.missingMessageRetriever.addMissingMessage(
|
||||||
|
messageId,
|
||||||
|
retrievalHint
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
if (this.queryOnConnect) {
|
||||||
|
this.queryOnConnect.addEventListener(
|
||||||
|
QueryOnConnectEvent.MessagesRetrieved,
|
||||||
|
(event) => {
|
||||||
|
void this.processIncomingMessages(event.detail);
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
187
packages/sdk/src/reliable_channel/reliable_channel_acks.spec.ts
Normal file
187
packages/sdk/src/reliable_channel/reliable_channel_acks.spec.ts
Normal file
@ -0,0 +1,187 @@
|
|||||||
|
import { TypedEventEmitter } from "@libp2p/interface";
|
||||||
|
import { createDecoder, createEncoder } from "@waku/core";
|
||||||
|
import {
|
||||||
|
AutoSharding,
|
||||||
|
IDecodedMessage,
|
||||||
|
IDecoder,
|
||||||
|
IEncoder
|
||||||
|
} from "@waku/interfaces";
|
||||||
|
import {
|
||||||
|
createRoutingInfo,
|
||||||
|
delay,
|
||||||
|
MockWakuEvents,
|
||||||
|
MockWakuNode
|
||||||
|
} from "@waku/utils";
|
||||||
|
import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes";
|
||||||
|
import { expect } from "chai";
|
||||||
|
import { beforeEach, describe } from "mocha";
|
||||||
|
|
||||||
|
import { ReliableChannel } from "./index.js";
|
||||||
|
|
||||||
|
const TEST_CONTENT_TOPIC = "/my-tests/0/topic-name/proto";
|
||||||
|
const TEST_NETWORK_CONFIG: AutoSharding = {
|
||||||
|
clusterId: 0,
|
||||||
|
numShardsInCluster: 1
|
||||||
|
};
|
||||||
|
const TEST_ROUTING_INFO = createRoutingInfo(TEST_NETWORK_CONFIG, {
|
||||||
|
contentTopic: TEST_CONTENT_TOPIC
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("Reliable Channel: Acks", () => {
|
||||||
|
let encoder: IEncoder;
|
||||||
|
let decoder: IDecoder<IDecodedMessage>;
|
||||||
|
|
||||||
|
beforeEach(async () => {
|
||||||
|
encoder = createEncoder({
|
||||||
|
contentTopic: TEST_CONTENT_TOPIC,
|
||||||
|
routingInfo: TEST_ROUTING_INFO
|
||||||
|
});
|
||||||
|
decoder = createDecoder(TEST_CONTENT_TOPIC, TEST_ROUTING_INFO);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Outgoing message is acknowledged", async () => {
|
||||||
|
const commonEventEmitter = new TypedEventEmitter<MockWakuEvents>();
|
||||||
|
const mockWakuNodeAlice = new MockWakuNode(commonEventEmitter);
|
||||||
|
const mockWakuNodeBob = new MockWakuNode(commonEventEmitter);
|
||||||
|
|
||||||
|
const reliableChannelAlice = await ReliableChannel.create(
|
||||||
|
mockWakuNodeAlice,
|
||||||
|
"MyChannel",
|
||||||
|
"alice",
|
||||||
|
encoder,
|
||||||
|
decoder
|
||||||
|
);
|
||||||
|
const reliableChannelBob = await ReliableChannel.create(
|
||||||
|
mockWakuNodeBob,
|
||||||
|
"MyChannel",
|
||||||
|
"bob",
|
||||||
|
encoder,
|
||||||
|
decoder
|
||||||
|
);
|
||||||
|
|
||||||
|
const message = utf8ToBytes("first message in channel");
|
||||||
|
|
||||||
|
// Alice sets up message tracking
|
||||||
|
const messageId = ReliableChannel.getMessageId(message);
|
||||||
|
|
||||||
|
let messageReceived = false;
|
||||||
|
reliableChannelBob.addEventListener("message-received", (event) => {
|
||||||
|
if (bytesToUtf8(event.detail.payload) === "first message in channel") {
|
||||||
|
messageReceived = true;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let messageAcknowledged = false;
|
||||||
|
reliableChannelAlice.addEventListener("message-acknowledged", (event) => {
|
||||||
|
if (event.detail === messageId) {
|
||||||
|
messageAcknowledged = true;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
reliableChannelAlice.send(message);
|
||||||
|
|
||||||
|
// Wait for Bob to receive the message to ensure it uses it in causal history
|
||||||
|
while (!messageReceived) {
|
||||||
|
await delay(50);
|
||||||
|
}
|
||||||
|
// Bobs sends a message now, it should include first one in causal history
|
||||||
|
reliableChannelBob.send(utf8ToBytes("second message in channel"));
|
||||||
|
while (!messageAcknowledged) {
|
||||||
|
await delay(50);
|
||||||
|
}
|
||||||
|
|
||||||
|
expect(messageAcknowledged).to.be.true;
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Re-sent message is acknowledged once other parties join.", async () => {
|
||||||
|
const commonEventEmitter = new TypedEventEmitter<MockWakuEvents>();
|
||||||
|
const mockWakuNodeAlice = new MockWakuNode(commonEventEmitter);
|
||||||
|
|
||||||
|
// Setup, Alice first
|
||||||
|
const reliableChannelAlice = await ReliableChannel.create(
|
||||||
|
mockWakuNodeAlice,
|
||||||
|
"MyChannel",
|
||||||
|
"alice",
|
||||||
|
encoder,
|
||||||
|
decoder,
|
||||||
|
{
|
||||||
|
retryIntervalMs: 0, // disable any automation to better control the test
|
||||||
|
syncMinIntervalMs: 0,
|
||||||
|
processTaskMinElapseMs: 10
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
// Bob is offline, Alice sends a message, this is the message we want
|
||||||
|
// acknowledged in this test.
|
||||||
|
const message = utf8ToBytes("message to be acknowledged");
|
||||||
|
const messageId = ReliableChannel.getMessageId(message);
|
||||||
|
let messageAcknowledged = false;
|
||||||
|
reliableChannelAlice.addEventListener("message-acknowledged", (event) => {
|
||||||
|
if (event.detail === messageId) {
|
||||||
|
messageAcknowledged = true;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
reliableChannelAlice.send(message);
|
||||||
|
|
||||||
|
// Wait a bit to ensure Bob does not receive the message
|
||||||
|
await delay(100);
|
||||||
|
|
||||||
|
// Now Bob goes online
|
||||||
|
const mockWakuNodeBob = new MockWakuNode(commonEventEmitter);
|
||||||
|
const reliableChannelBob = await ReliableChannel.create(
|
||||||
|
mockWakuNodeBob,
|
||||||
|
"MyChannel",
|
||||||
|
"bob",
|
||||||
|
encoder,
|
||||||
|
decoder,
|
||||||
|
{
|
||||||
|
retryIntervalMs: 0, // disable any automation to better control the test
|
||||||
|
syncMinIntervalMs: 0,
|
||||||
|
processTaskMinElapseMs: 10
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
// Track when Bob receives the message
|
||||||
|
let bobReceivedMessage = false;
|
||||||
|
reliableChannelBob.addEventListener("message-received", (event) => {
|
||||||
|
if (bytesToUtf8(event.detail.payload!) === "message to be acknowledged") {
|
||||||
|
bobReceivedMessage = true;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Some sync messages are exchanged
|
||||||
|
await reliableChannelAlice["sendSyncMessage"]();
|
||||||
|
await reliableChannelBob["sendSyncMessage"]();
|
||||||
|
|
||||||
|
// wait a bit to ensure messages are processed
|
||||||
|
await delay(100);
|
||||||
|
|
||||||
|
// Some content messages are exchanged too
|
||||||
|
reliableChannelAlice.send(utf8ToBytes("some message"));
|
||||||
|
reliableChannelBob.send(utf8ToBytes("some other message"));
|
||||||
|
|
||||||
|
// wait a bit to ensure messages are processed
|
||||||
|
await delay(100);
|
||||||
|
|
||||||
|
// At this point, the message shouldn't be acknowledged yet as Bob
|
||||||
|
// does not have a complete log
|
||||||
|
expect(messageAcknowledged).to.be.false;
|
||||||
|
|
||||||
|
// Now Alice resends the message
|
||||||
|
reliableChannelAlice.send(message);
|
||||||
|
|
||||||
|
// Wait for Bob to receive the message
|
||||||
|
while (!bobReceivedMessage) {
|
||||||
|
await delay(50);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Bob receives it, and should include it in its sync
|
||||||
|
await reliableChannelBob["sendSyncMessage"]();
|
||||||
|
while (!messageAcknowledged) {
|
||||||
|
await delay(50);
|
||||||
|
}
|
||||||
|
|
||||||
|
// The sync should acknowledge the message
|
||||||
|
expect(messageAcknowledged).to.be.true;
|
||||||
|
});
|
||||||
|
});
|
||||||
@ -0,0 +1,326 @@
|
|||||||
|
import { TypedEventEmitter } from "@libp2p/interface";
|
||||||
|
import {
|
||||||
|
AutoSharding,
|
||||||
|
IDecodedMessage,
|
||||||
|
IDecoder,
|
||||||
|
IEncoder,
|
||||||
|
type IMessage,
|
||||||
|
ISendOptions,
|
||||||
|
IWaku,
|
||||||
|
LightPushError,
|
||||||
|
LightPushSDKResult
|
||||||
|
} from "@waku/interfaces";
|
||||||
|
import { generatePrivateKey, getPublicKey } from "@waku/message-encryption";
|
||||||
|
import {
|
||||||
|
createDecoder as createEciesDecoder,
|
||||||
|
createEncoder as createEciesEncoder
|
||||||
|
} from "@waku/message-encryption/ecies";
|
||||||
|
import {
|
||||||
|
createRoutingInfo,
|
||||||
|
delay,
|
||||||
|
MockWakuEvents,
|
||||||
|
MockWakuNode
|
||||||
|
} from "@waku/utils";
|
||||||
|
import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes";
|
||||||
|
import { expect } from "chai";
|
||||||
|
import { beforeEach, describe } from "mocha";
|
||||||
|
|
||||||
|
import { ReliableChannel } from "./index.js";
|
||||||
|
|
||||||
|
const TEST_CONTENT_TOPIC = "/my-tests/0/topic-name/proto";
|
||||||
|
const TEST_NETWORK_CONFIG: AutoSharding = {
|
||||||
|
clusterId: 0,
|
||||||
|
numShardsInCluster: 1
|
||||||
|
};
|
||||||
|
const TEST_ROUTING_INFO = createRoutingInfo(TEST_NETWORK_CONFIG, {
|
||||||
|
contentTopic: TEST_CONTENT_TOPIC
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("Reliable Channel: Encryption", () => {
|
||||||
|
let mockWakuNode: IWaku;
|
||||||
|
let encoder: IEncoder;
|
||||||
|
let decoder: IDecoder<IDecodedMessage>;
|
||||||
|
|
||||||
|
beforeEach(async () => {
|
||||||
|
mockWakuNode = new MockWakuNode();
|
||||||
|
const privateKey = generatePrivateKey();
|
||||||
|
const publicKey = getPublicKey(privateKey);
|
||||||
|
encoder = createEciesEncoder({
|
||||||
|
contentTopic: TEST_CONTENT_TOPIC,
|
||||||
|
routingInfo: TEST_ROUTING_INFO,
|
||||||
|
publicKey
|
||||||
|
});
|
||||||
|
decoder = createEciesDecoder(
|
||||||
|
TEST_CONTENT_TOPIC,
|
||||||
|
TEST_ROUTING_INFO,
|
||||||
|
privateKey
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Outgoing message is emitted as sending", async () => {
|
||||||
|
const reliableChannel = await ReliableChannel.create(
|
||||||
|
mockWakuNode,
|
||||||
|
"MyChannel",
|
||||||
|
"alice",
|
||||||
|
encoder,
|
||||||
|
decoder
|
||||||
|
);
|
||||||
|
|
||||||
|
const message = utf8ToBytes("message in channel");
|
||||||
|
|
||||||
|
// Setting up message tracking
|
||||||
|
const messageId = ReliableChannel.getMessageId(message);
|
||||||
|
let messageSending = false;
|
||||||
|
reliableChannel.addEventListener("sending-message", (event) => {
|
||||||
|
if (event.detail === messageId) {
|
||||||
|
messageSending = true;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
reliableChannel.send(message);
|
||||||
|
while (!messageSending) {
|
||||||
|
await delay(50);
|
||||||
|
}
|
||||||
|
|
||||||
|
expect(messageSending).to.be.true;
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Outgoing message is emitted as sent", async () => {
|
||||||
|
const reliableChannel = await ReliableChannel.create(
|
||||||
|
mockWakuNode,
|
||||||
|
"MyChannel",
|
||||||
|
"alice",
|
||||||
|
encoder,
|
||||||
|
decoder
|
||||||
|
);
|
||||||
|
|
||||||
|
const message = utf8ToBytes("message in channel");
|
||||||
|
|
||||||
|
// Setting up message tracking
|
||||||
|
const messageId = ReliableChannel.getMessageId(message);
|
||||||
|
let messageSent = false;
|
||||||
|
reliableChannel.addEventListener("message-sent", (event) => {
|
||||||
|
if (event.detail === messageId) {
|
||||||
|
messageSent = true;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
reliableChannel.send(message);
|
||||||
|
while (!messageSent) {
|
||||||
|
await delay(50);
|
||||||
|
}
|
||||||
|
|
||||||
|
expect(messageSent).to.be.true;
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Encoder error raises irrecoverable error", async () => {
|
||||||
|
mockWakuNode.lightPush!.send = (
|
||||||
|
_encoder: IEncoder,
|
||||||
|
_message: IMessage,
|
||||||
|
_sendOptions?: ISendOptions
|
||||||
|
): Promise<LightPushSDKResult> => {
|
||||||
|
return Promise.resolve({
|
||||||
|
failures: [{ error: LightPushError.EMPTY_PAYLOAD }],
|
||||||
|
successes: []
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
|
const reliableChannel = await ReliableChannel.create(
|
||||||
|
mockWakuNode,
|
||||||
|
"MyChannel",
|
||||||
|
"alice",
|
||||||
|
encoder,
|
||||||
|
decoder
|
||||||
|
);
|
||||||
|
|
||||||
|
const message = utf8ToBytes("payload doesnt matter");
|
||||||
|
|
||||||
|
// Setting up message tracking
|
||||||
|
const messageId = ReliableChannel.getMessageId(message);
|
||||||
|
let irrecoverableError = false;
|
||||||
|
reliableChannel.addEventListener(
|
||||||
|
"sending-message-irrecoverable-error",
|
||||||
|
(event) => {
|
||||||
|
if (event.detail.messageId === messageId) {
|
||||||
|
irrecoverableError = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
encoder.contentTopic = "...";
|
||||||
|
reliableChannel.send(message);
|
||||||
|
while (!irrecoverableError) {
|
||||||
|
await delay(50);
|
||||||
|
}
|
||||||
|
|
||||||
|
expect(irrecoverableError).to.be.true;
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Outgoing message is not emitted as acknowledged from own outgoing messages", async () => {
|
||||||
|
const reliableChannel = await ReliableChannel.create(
|
||||||
|
mockWakuNode,
|
||||||
|
"MyChannel",
|
||||||
|
"alice",
|
||||||
|
encoder,
|
||||||
|
decoder
|
||||||
|
);
|
||||||
|
|
||||||
|
const message = utf8ToBytes("first message in channel");
|
||||||
|
|
||||||
|
// Setting up message tracking
|
||||||
|
const messageId = ReliableChannel.getMessageId(message);
|
||||||
|
let messageAcknowledged = false;
|
||||||
|
reliableChannel.addEventListener("message-acknowledged", (event) => {
|
||||||
|
if (event.detail === messageId) {
|
||||||
|
messageAcknowledged = true;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
reliableChannel.send(message);
|
||||||
|
|
||||||
|
// Sending a second message from the same node should not acknowledge the first one
|
||||||
|
reliableChannel.send(utf8ToBytes("second message in channel"));
|
||||||
|
|
||||||
|
// Wait a bit to be sure no event is emitted
|
||||||
|
await delay(200);
|
||||||
|
|
||||||
|
expect(messageAcknowledged).to.be.false;
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Outgoing message is possibly acknowledged", async () => {
|
||||||
|
const commonEventEmitter = new TypedEventEmitter<MockWakuEvents>();
|
||||||
|
const mockWakuNodeAlice = new MockWakuNode(commonEventEmitter);
|
||||||
|
const mockWakuNodeBob = new MockWakuNode(commonEventEmitter);
|
||||||
|
|
||||||
|
const reliableChannelAlice = await ReliableChannel.create(
|
||||||
|
mockWakuNodeAlice,
|
||||||
|
"MyChannel",
|
||||||
|
"alice",
|
||||||
|
encoder,
|
||||||
|
decoder
|
||||||
|
);
|
||||||
|
const reliableChannelBob = await ReliableChannel.create(
|
||||||
|
mockWakuNodeBob,
|
||||||
|
"MyChannel",
|
||||||
|
"bob",
|
||||||
|
encoder,
|
||||||
|
decoder,
|
||||||
|
// Bob only includes one message in causal history
|
||||||
|
{ causalHistorySize: 1 }
|
||||||
|
);
|
||||||
|
|
||||||
|
const messages = ["first", "second", "third"].map((m) => {
|
||||||
|
return utf8ToBytes(m);
|
||||||
|
});
|
||||||
|
|
||||||
|
// Alice sets up message tracking for first message
|
||||||
|
const firstMessageId = ReliableChannel.getMessageId(messages[0]);
|
||||||
|
let firstMessagePossiblyAcknowledged = false;
|
||||||
|
reliableChannelAlice.addEventListener(
|
||||||
|
"message-possibly-acknowledged",
|
||||||
|
(event) => {
|
||||||
|
if (event.detail.messageId === firstMessageId) {
|
||||||
|
firstMessagePossiblyAcknowledged = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
let bobMessageReceived = 0;
|
||||||
|
reliableChannelAlice.addEventListener("message-received", () => {
|
||||||
|
bobMessageReceived++;
|
||||||
|
});
|
||||||
|
|
||||||
|
for (const m of messages) {
|
||||||
|
reliableChannelAlice.send(m);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for Bob to receive all messages to ensure filter is updated
|
||||||
|
while (bobMessageReceived < 3) {
|
||||||
|
await delay(50);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Bobs sends a message now, it should include first one in bloom filter
|
||||||
|
reliableChannelBob.send(utf8ToBytes("message back"));
|
||||||
|
while (!firstMessagePossiblyAcknowledged) {
|
||||||
|
await delay(50);
|
||||||
|
}
|
||||||
|
|
||||||
|
expect(firstMessagePossiblyAcknowledged).to.be.true;
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Outgoing message is acknowledged", async () => {
|
||||||
|
const commonEventEmitter = new TypedEventEmitter<MockWakuEvents>();
|
||||||
|
const mockWakuNodeAlice = new MockWakuNode(commonEventEmitter);
|
||||||
|
const mockWakuNodeBob = new MockWakuNode(commonEventEmitter);
|
||||||
|
|
||||||
|
const reliableChannelAlice = await ReliableChannel.create(
|
||||||
|
mockWakuNodeAlice,
|
||||||
|
"MyChannel",
|
||||||
|
"alice",
|
||||||
|
encoder,
|
||||||
|
decoder
|
||||||
|
);
|
||||||
|
const reliableChannelBob = await ReliableChannel.create(
|
||||||
|
mockWakuNodeBob,
|
||||||
|
"MyChannel",
|
||||||
|
"bob",
|
||||||
|
encoder,
|
||||||
|
decoder
|
||||||
|
);
|
||||||
|
|
||||||
|
const message = utf8ToBytes("first message in channel");
|
||||||
|
|
||||||
|
// Alice sets up message tracking
|
||||||
|
const messageId = ReliableChannel.getMessageId(message);
|
||||||
|
let messageAcknowledged = false;
|
||||||
|
reliableChannelAlice.addEventListener("message-acknowledged", (event) => {
|
||||||
|
if (event.detail === messageId) {
|
||||||
|
messageAcknowledged = true;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let bobReceivedMessage = false;
|
||||||
|
reliableChannelBob.addEventListener("message-received", () => {
|
||||||
|
bobReceivedMessage = true;
|
||||||
|
});
|
||||||
|
|
||||||
|
reliableChannelAlice.send(message);
|
||||||
|
|
||||||
|
// Wait for Bob to receive the message
|
||||||
|
while (!bobReceivedMessage) {
|
||||||
|
await delay(50);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Bobs sends a message now, it should include first one in causal history
|
||||||
|
reliableChannelBob.send(utf8ToBytes("second message in channel"));
|
||||||
|
while (!messageAcknowledged) {
|
||||||
|
await delay(50);
|
||||||
|
}
|
||||||
|
|
||||||
|
expect(messageAcknowledged).to.be.true;
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Incoming message is emitted as received", async () => {
|
||||||
|
const reliableChannel = await ReliableChannel.create(
|
||||||
|
mockWakuNode,
|
||||||
|
"MyChannel",
|
||||||
|
"alice",
|
||||||
|
encoder,
|
||||||
|
decoder
|
||||||
|
);
|
||||||
|
|
||||||
|
let receivedMessage: IDecodedMessage;
|
||||||
|
reliableChannel.addEventListener("message-received", (event) => {
|
||||||
|
receivedMessage = event.detail;
|
||||||
|
});
|
||||||
|
|
||||||
|
const message = utf8ToBytes("message in channel");
|
||||||
|
|
||||||
|
reliableChannel.send(message);
|
||||||
|
while (!receivedMessage!) {
|
||||||
|
await delay(50);
|
||||||
|
}
|
||||||
|
|
||||||
|
expect(bytesToUtf8(receivedMessage!.payload)).to.eq(bytesToUtf8(message));
|
||||||
|
});
|
||||||
|
});
|
||||||
332
packages/sdk/src/reliable_channel/reliable_channel_sync.spec.ts
Normal file
332
packages/sdk/src/reliable_channel/reliable_channel_sync.spec.ts
Normal file
@ -0,0 +1,332 @@
|
|||||||
|
import { TypedEventEmitter } from "@libp2p/interface";
|
||||||
|
import { createDecoder, createEncoder } from "@waku/core";
|
||||||
|
import {
|
||||||
|
AutoSharding,
|
||||||
|
IDecodedMessage,
|
||||||
|
IDecoder,
|
||||||
|
IEncoder,
|
||||||
|
IWaku
|
||||||
|
} from "@waku/interfaces";
|
||||||
|
import { MessageChannelEvent } from "@waku/sds";
|
||||||
|
import {
|
||||||
|
createRoutingInfo,
|
||||||
|
delay,
|
||||||
|
MockWakuEvents,
|
||||||
|
MockWakuNode
|
||||||
|
} from "@waku/utils";
|
||||||
|
import { utf8ToBytes } from "@waku/utils/bytes";
|
||||||
|
import { expect } from "chai";
|
||||||
|
import { beforeEach, describe } from "mocha";
|
||||||
|
|
||||||
|
import { ReliableChannel } from "./index.js";
|
||||||
|
|
||||||
|
const TEST_CONTENT_TOPIC = "/my-tests/0/topic-name/proto";
|
||||||
|
const TEST_NETWORK_CONFIG: AutoSharding = {
|
||||||
|
clusterId: 0,
|
||||||
|
numShardsInCluster: 1
|
||||||
|
};
|
||||||
|
const TEST_ROUTING_INFO = createRoutingInfo(TEST_NETWORK_CONFIG, {
|
||||||
|
contentTopic: TEST_CONTENT_TOPIC
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("Reliable Channel: Sync", () => {
|
||||||
|
let mockWakuNode: IWaku;
|
||||||
|
let encoder: IEncoder;
|
||||||
|
let decoder: IDecoder<IDecodedMessage>;
|
||||||
|
|
||||||
|
beforeEach(async () => {
|
||||||
|
mockWakuNode = new MockWakuNode();
|
||||||
|
encoder = createEncoder({
|
||||||
|
contentTopic: TEST_CONTENT_TOPIC,
|
||||||
|
routingInfo: TEST_ROUTING_INFO
|
||||||
|
});
|
||||||
|
decoder = createDecoder(TEST_CONTENT_TOPIC, TEST_ROUTING_INFO);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Sync message is sent within sync frequency", async () => {
|
||||||
|
const syncMinIntervalMs = 100;
|
||||||
|
const reliableChannel = await ReliableChannel.create(
|
||||||
|
mockWakuNode,
|
||||||
|
"MyChannel",
|
||||||
|
"alice",
|
||||||
|
encoder,
|
||||||
|
decoder,
|
||||||
|
{
|
||||||
|
syncMinIntervalMs
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
let syncMessageSent = false;
|
||||||
|
reliableChannel.messageChannel.addEventListener(
|
||||||
|
MessageChannelEvent.OutSyncSent,
|
||||||
|
(_event) => {
|
||||||
|
syncMessageSent = true;
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
await delay(syncMinIntervalMs);
|
||||||
|
|
||||||
|
expect(syncMessageSent).to.be.true;
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Sync message are not sent excessively within sync frequency", async () => {
|
||||||
|
const syncMinIntervalMs = 100;
|
||||||
|
const reliableChannel = await ReliableChannel.create(
|
||||||
|
mockWakuNode,
|
||||||
|
"MyChannel",
|
||||||
|
"alice",
|
||||||
|
encoder,
|
||||||
|
decoder,
|
||||||
|
{
|
||||||
|
syncMinIntervalMs
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
let syncMessageSentCount = 0;
|
||||||
|
reliableChannel.messageChannel.addEventListener(
|
||||||
|
MessageChannelEvent.OutSyncSent,
|
||||||
|
(_event) => {
|
||||||
|
syncMessageSentCount++;
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
await delay(syncMinIntervalMs);
|
||||||
|
|
||||||
|
// There is randomness to this, but it should not be excessive
|
||||||
|
expect(syncMessageSentCount).to.be.lessThan(3);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Sync message is not sent if another sync message was just received", async function () {
|
||||||
|
this.timeout(5000);
|
||||||
|
|
||||||
|
const commonEventEmitter = new TypedEventEmitter<MockWakuEvents>();
|
||||||
|
const mockWakuNodeAlice = new MockWakuNode(commonEventEmitter);
|
||||||
|
const mockWakuNodeBob = new MockWakuNode(commonEventEmitter);
|
||||||
|
|
||||||
|
const syncMinIntervalMs = 1000;
|
||||||
|
|
||||||
|
const reliableChannelAlice = await ReliableChannel.create(
|
||||||
|
mockWakuNodeAlice,
|
||||||
|
"MyChannel",
|
||||||
|
"alice",
|
||||||
|
encoder,
|
||||||
|
decoder,
|
||||||
|
{
|
||||||
|
syncMinIntervalMs: 0, // does not send sync messages automatically
|
||||||
|
processTaskMinElapseMs: 10
|
||||||
|
}
|
||||||
|
);
|
||||||
|
const reliableChannelBob = await ReliableChannel.create(
|
||||||
|
mockWakuNodeBob,
|
||||||
|
"MyChannel",
|
||||||
|
"bob",
|
||||||
|
encoder,
|
||||||
|
decoder,
|
||||||
|
{
|
||||||
|
syncMinIntervalMs,
|
||||||
|
processTaskMinElapseMs: 10
|
||||||
|
}
|
||||||
|
);
|
||||||
|
(reliableChannelBob as any).random = () => {
|
||||||
|
return 1;
|
||||||
|
}; // will wait a full second
|
||||||
|
|
||||||
|
let syncMessageSent = false;
|
||||||
|
reliableChannelBob.messageChannel.addEventListener(
|
||||||
|
MessageChannelEvent.OutSyncSent,
|
||||||
|
(_event) => {
|
||||||
|
syncMessageSent = true;
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
while (!syncMessageSent) {
|
||||||
|
// Bob will send a sync message as soon as it started, we are waiting for this one
|
||||||
|
await delay(100);
|
||||||
|
}
|
||||||
|
// Let's reset the tracker
|
||||||
|
syncMessageSent = false;
|
||||||
|
// We should be faster than Bob as Bob will "randomly" wait a full second
|
||||||
|
await reliableChannelAlice["sendSyncMessage"]();
|
||||||
|
|
||||||
|
// Bob should be waiting a full second before sending a message after Alice
|
||||||
|
await delay(900);
|
||||||
|
|
||||||
|
// Now, let's wait Bob to send the sync message
|
||||||
|
await delay(200);
|
||||||
|
expect(syncMessageSent).to.be.true;
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Sync message is not sent if another non-ephemeral message was just received", async function () {
|
||||||
|
this.timeout(5000);
|
||||||
|
|
||||||
|
const commonEventEmitter = new TypedEventEmitter<MockWakuEvents>();
|
||||||
|
const mockWakuNodeAlice = new MockWakuNode(commonEventEmitter);
|
||||||
|
const mockWakuNodeBob = new MockWakuNode(commonEventEmitter);
|
||||||
|
|
||||||
|
const syncMinIntervalMs = 1000;
|
||||||
|
|
||||||
|
const reliableChannelAlice = await ReliableChannel.create(
|
||||||
|
mockWakuNodeAlice,
|
||||||
|
"MyChannel",
|
||||||
|
"alice",
|
||||||
|
encoder,
|
||||||
|
decoder,
|
||||||
|
{
|
||||||
|
syncMinIntervalMs: 0, // does not send sync messages automatically
|
||||||
|
processTaskMinElapseMs: 10
|
||||||
|
}
|
||||||
|
);
|
||||||
|
const reliableChannelBob = await ReliableChannel.create(
|
||||||
|
mockWakuNodeBob,
|
||||||
|
"MyChannel",
|
||||||
|
"bob",
|
||||||
|
encoder,
|
||||||
|
decoder,
|
||||||
|
{
|
||||||
|
syncMinIntervalMs,
|
||||||
|
processTaskMinElapseMs: 10
|
||||||
|
}
|
||||||
|
);
|
||||||
|
(reliableChannelBob as any).random = () => {
|
||||||
|
return 1;
|
||||||
|
}; // will wait a full second
|
||||||
|
|
||||||
|
let syncMessageSent = false;
|
||||||
|
reliableChannelBob.messageChannel.addEventListener(
|
||||||
|
MessageChannelEvent.OutSyncSent,
|
||||||
|
(_event) => {
|
||||||
|
syncMessageSent = true;
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
while (!syncMessageSent) {
|
||||||
|
// Bob will send a sync message as soon as it started, we are waiting for this one
|
||||||
|
await delay(100);
|
||||||
|
}
|
||||||
|
// Let's reset the tracker
|
||||||
|
syncMessageSent = false;
|
||||||
|
// We should be faster than Bob as Bob will "randomly" wait a full second
|
||||||
|
reliableChannelAlice.send(utf8ToBytes("some message"));
|
||||||
|
|
||||||
|
// Bob should be waiting a full second before sending a message after Alice
|
||||||
|
await delay(900);
|
||||||
|
|
||||||
|
// Now, let's wait Bob to send the sync message
|
||||||
|
await delay(200);
|
||||||
|
expect(syncMessageSent).to.be.true;
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Sync message is not sent if another sync message was just sent", async function () {
|
||||||
|
this.timeout(5000);
|
||||||
|
const syncMinIntervalMs = 1000;
|
||||||
|
|
||||||
|
const reliableChannel = await ReliableChannel.create(
|
||||||
|
mockWakuNode,
|
||||||
|
"MyChannel",
|
||||||
|
"alice",
|
||||||
|
encoder,
|
||||||
|
decoder,
|
||||||
|
{ syncMinIntervalMs }
|
||||||
|
);
|
||||||
|
(reliableChannel as any).random = () => {
|
||||||
|
return 1;
|
||||||
|
}; // will wait a full second
|
||||||
|
|
||||||
|
let syncMessageSent = false;
|
||||||
|
reliableChannel.messageChannel.addEventListener(
|
||||||
|
MessageChannelEvent.OutSyncSent,
|
||||||
|
(_event) => {
|
||||||
|
syncMessageSent = true;
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
while (!syncMessageSent) {
|
||||||
|
// Will send a sync message as soon as it started, we are waiting for this one
|
||||||
|
await delay(100);
|
||||||
|
}
|
||||||
|
// Let's reset the tracker
|
||||||
|
syncMessageSent = false;
|
||||||
|
// We should be faster than automated sync as it will "randomly" wait a full second
|
||||||
|
await reliableChannel["sendSyncMessage"]();
|
||||||
|
|
||||||
|
// should be waiting a full second before sending a message after Alice
|
||||||
|
await delay(900);
|
||||||
|
|
||||||
|
// Now, let's wait to send the automated sync message
|
||||||
|
await delay(200);
|
||||||
|
expect(syncMessageSent).to.be.true;
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Sync message is not sent if another non-ephemeral message was just sent", async function () {
|
||||||
|
this.timeout(5000);
|
||||||
|
const syncMinIntervalMs = 1000;
|
||||||
|
|
||||||
|
const reliableChannel = await ReliableChannel.create(
|
||||||
|
mockWakuNode,
|
||||||
|
"MyChannel",
|
||||||
|
"alice",
|
||||||
|
encoder,
|
||||||
|
decoder,
|
||||||
|
{ syncMinIntervalMs }
|
||||||
|
);
|
||||||
|
(reliableChannel as any).random = () => {
|
||||||
|
return 1;
|
||||||
|
}; // will wait a full second
|
||||||
|
|
||||||
|
let syncMessageSent = false;
|
||||||
|
reliableChannel.messageChannel.addEventListener(
|
||||||
|
MessageChannelEvent.OutSyncSent,
|
||||||
|
(_event) => {
|
||||||
|
syncMessageSent = true;
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
while (!syncMessageSent) {
|
||||||
|
// Will send a sync message as soon as it started, we are waiting for this one
|
||||||
|
await delay(100);
|
||||||
|
}
|
||||||
|
// Let's reset the tracker
|
||||||
|
syncMessageSent = false;
|
||||||
|
// We should be faster than automated sync as it will "randomly" wait a full second
|
||||||
|
reliableChannel.send(utf8ToBytes("non-ephemeral message"));
|
||||||
|
|
||||||
|
// should be waiting a full second before sending a message after Alice
|
||||||
|
await delay(900);
|
||||||
|
|
||||||
|
// Now, let's wait to send the automated sync message
|
||||||
|
await delay(200);
|
||||||
|
expect(syncMessageSent).to.be.true;
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Own sync message does not acknowledge own messages", async () => {
|
||||||
|
const syncMinIntervalMs = 100;
|
||||||
|
const reliableChannel = await ReliableChannel.create(
|
||||||
|
mockWakuNode,
|
||||||
|
"MyChannel",
|
||||||
|
"alice",
|
||||||
|
encoder,
|
||||||
|
decoder,
|
||||||
|
{
|
||||||
|
syncMinIntervalMs
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
const msg = utf8ToBytes("some message");
|
||||||
|
const msgId = ReliableChannel.getMessageId(msg);
|
||||||
|
|
||||||
|
let messageAcknowledged = false;
|
||||||
|
reliableChannel.messageChannel.addEventListener(
|
||||||
|
MessageChannelEvent.OutMessageAcknowledged,
|
||||||
|
(event) => {
|
||||||
|
if (event.detail === msgId) messageAcknowledged = true;
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
reliableChannel.send(msg);
|
||||||
|
|
||||||
|
await delay(syncMinIntervalMs * 2);
|
||||||
|
|
||||||
|
// There is randomness to this, but it should not be excessive
|
||||||
|
expect(messageAcknowledged).to.be.false;
|
||||||
|
});
|
||||||
|
});
|
||||||
48
packages/sdk/src/reliable_channel/retry_manager.spec.ts
Normal file
48
packages/sdk/src/reliable_channel/retry_manager.spec.ts
Normal file
@ -0,0 +1,48 @@
|
|||||||
|
import { delay } from "@waku/utils";
|
||||||
|
import { expect } from "chai";
|
||||||
|
|
||||||
|
import { RetryManager } from "./retry_manager.js";
|
||||||
|
|
||||||
|
describe("Retry Manager", () => {
|
||||||
|
it("Retries within given interval", async function () {
|
||||||
|
const retryManager = new RetryManager(100, 1);
|
||||||
|
|
||||||
|
let retryCount = 0;
|
||||||
|
retryManager.startRetries("1", () => {
|
||||||
|
retryCount++;
|
||||||
|
});
|
||||||
|
|
||||||
|
await delay(110);
|
||||||
|
|
||||||
|
expect(retryCount).to.equal(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Retries within maximum given attempts", async function () {
|
||||||
|
const maxAttempts = 5;
|
||||||
|
const retryManager = new RetryManager(10, maxAttempts);
|
||||||
|
|
||||||
|
let retryCount = 0;
|
||||||
|
retryManager.startRetries("1", () => {
|
||||||
|
retryCount++;
|
||||||
|
});
|
||||||
|
|
||||||
|
await delay(200);
|
||||||
|
|
||||||
|
expect(retryCount).to.equal(maxAttempts);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Wait given interval before re-trying", async function () {
|
||||||
|
const retryManager = new RetryManager(100, 1);
|
||||||
|
|
||||||
|
let retryCount = 0;
|
||||||
|
retryManager.startRetries("1", () => {
|
||||||
|
retryCount++;
|
||||||
|
});
|
||||||
|
|
||||||
|
await delay(90);
|
||||||
|
expect(retryCount).to.equal(0);
|
||||||
|
|
||||||
|
await delay(110);
|
||||||
|
expect(retryCount).to.equal(1);
|
||||||
|
});
|
||||||
|
});
|
||||||
51
packages/sdk/src/reliable_channel/retry_manager.ts
Normal file
51
packages/sdk/src/reliable_channel/retry_manager.ts
Normal file
@ -0,0 +1,51 @@
|
|||||||
|
export class RetryManager {
|
||||||
|
private timeouts: Map<string, ReturnType<typeof setTimeout>>;
|
||||||
|
|
||||||
|
public constructor(
|
||||||
|
// TODO: back-off strategy
|
||||||
|
private retryIntervalMs: number,
|
||||||
|
private maxRetryNumber: number
|
||||||
|
) {
|
||||||
|
this.timeouts = new Map();
|
||||||
|
|
||||||
|
if (
|
||||||
|
!retryIntervalMs ||
|
||||||
|
retryIntervalMs <= 0 ||
|
||||||
|
!maxRetryNumber ||
|
||||||
|
maxRetryNumber <= 0
|
||||||
|
) {
|
||||||
|
throw Error(
|
||||||
|
`Invalid retryIntervalMs ${retryIntervalMs} or maxRetryNumber ${maxRetryNumber} values`
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public stopRetries(id: string): void {
|
||||||
|
const timeout = this.timeouts.get(id);
|
||||||
|
if (timeout) {
|
||||||
|
clearTimeout(timeout);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public startRetries(id: string, retry: () => void | Promise<void>): void {
|
||||||
|
this.retry(id, retry, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
private retry(
|
||||||
|
id: string,
|
||||||
|
retry: () => void | Promise<void>,
|
||||||
|
attemptNumber: number
|
||||||
|
): void {
|
||||||
|
clearTimeout(this.timeouts.get(id));
|
||||||
|
if (attemptNumber < this.maxRetryNumber) {
|
||||||
|
const interval = setTimeout(() => {
|
||||||
|
void retry();
|
||||||
|
|
||||||
|
// Register for next retry until we are told to stop;
|
||||||
|
this.retry(id, retry, ++attemptNumber);
|
||||||
|
}, this.retryIntervalMs);
|
||||||
|
|
||||||
|
this.timeouts.set(id, interval);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -50,7 +50,7 @@ describe("Message serialization", () => {
|
|||||||
const bytes = message.encode();
|
const bytes = message.encode();
|
||||||
const decMessage = Message.decode(bytes);
|
const decMessage = Message.decode(bytes);
|
||||||
|
|
||||||
expect(decMessage.causalHistory).to.deep.equal([
|
expect(decMessage!.causalHistory).to.deep.equal([
|
||||||
{ messageId: depMessageId, retrievalHint: depRetrievalHint }
|
{ messageId: depMessageId, retrievalHint: depRetrievalHint }
|
||||||
]);
|
]);
|
||||||
});
|
});
|
||||||
|
|||||||
@ -1,10 +1,13 @@
|
|||||||
import { proto_sds_message } from "@waku/proto";
|
import { proto_sds_message } from "@waku/proto";
|
||||||
|
import { Logger } from "@waku/utils";
|
||||||
|
|
||||||
export type MessageId = string;
|
export type MessageId = string;
|
||||||
export type HistoryEntry = proto_sds_message.HistoryEntry;
|
export type HistoryEntry = proto_sds_message.HistoryEntry;
|
||||||
export type ChannelId = string;
|
export type ChannelId = string;
|
||||||
export type SenderId = string;
|
export type SenderId = string;
|
||||||
|
|
||||||
|
const log = new Logger("sds:message");
|
||||||
|
|
||||||
export class Message implements proto_sds_message.SdsMessage {
|
export class Message implements proto_sds_message.SdsMessage {
|
||||||
public constructor(
|
public constructor(
|
||||||
public messageId: string,
|
public messageId: string,
|
||||||
@ -24,7 +27,9 @@ export class Message implements proto_sds_message.SdsMessage {
|
|||||||
return proto_sds_message.SdsMessage.encode(this);
|
return proto_sds_message.SdsMessage.encode(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static decode(data: Uint8Array): Message {
|
public static decode(
|
||||||
|
data: Uint8Array
|
||||||
|
): undefined | ContentMessage | SyncMessage | EphemeralMessage {
|
||||||
const {
|
const {
|
||||||
messageId,
|
messageId,
|
||||||
channelId,
|
channelId,
|
||||||
@ -34,15 +39,48 @@ export class Message implements proto_sds_message.SdsMessage {
|
|||||||
bloomFilter,
|
bloomFilter,
|
||||||
content
|
content
|
||||||
} = proto_sds_message.SdsMessage.decode(data);
|
} = proto_sds_message.SdsMessage.decode(data);
|
||||||
return new Message(
|
|
||||||
messageId,
|
if (testContentMessage({ lamportTimestamp, content })) {
|
||||||
channelId,
|
return new ContentMessage(
|
||||||
senderId,
|
messageId,
|
||||||
causalHistory,
|
channelId,
|
||||||
|
senderId,
|
||||||
|
causalHistory,
|
||||||
|
lamportTimestamp!,
|
||||||
|
bloomFilter,
|
||||||
|
content!
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (testEphemeralMessage({ lamportTimestamp, content })) {
|
||||||
|
return new EphemeralMessage(
|
||||||
|
messageId,
|
||||||
|
channelId,
|
||||||
|
senderId,
|
||||||
|
causalHistory,
|
||||||
|
undefined,
|
||||||
|
bloomFilter,
|
||||||
|
content!
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (testSyncMessage({ lamportTimestamp, content })) {
|
||||||
|
return new SyncMessage(
|
||||||
|
messageId,
|
||||||
|
channelId,
|
||||||
|
senderId,
|
||||||
|
causalHistory,
|
||||||
|
lamportTimestamp!,
|
||||||
|
bloomFilter,
|
||||||
|
undefined
|
||||||
|
);
|
||||||
|
}
|
||||||
|
log.error(
|
||||||
|
"message received was of unknown type",
|
||||||
lamportTimestamp,
|
lamportTimestamp,
|
||||||
bloomFilter,
|
|
||||||
content
|
content
|
||||||
);
|
);
|
||||||
|
return undefined;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -73,9 +111,10 @@ export class SyncMessage extends Message {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export function isSyncMessage(
|
function testSyncMessage(message: {
|
||||||
message: Message | ContentMessage | SyncMessage | EphemeralMessage
|
lamportTimestamp?: number;
|
||||||
): message is SyncMessage {
|
content?: Uint8Array;
|
||||||
|
}): boolean {
|
||||||
return Boolean(
|
return Boolean(
|
||||||
"lamportTimestamp" in message &&
|
"lamportTimestamp" in message &&
|
||||||
typeof message.lamportTimestamp === "number" &&
|
typeof message.lamportTimestamp === "number" &&
|
||||||
@ -83,6 +122,12 @@ export function isSyncMessage(
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export function isSyncMessage(
|
||||||
|
message: Message | ContentMessage | SyncMessage | EphemeralMessage
|
||||||
|
): message is SyncMessage {
|
||||||
|
return testSyncMessage(message);
|
||||||
|
}
|
||||||
|
|
||||||
export class EphemeralMessage extends Message {
|
export class EphemeralMessage extends Message {
|
||||||
public constructor(
|
public constructor(
|
||||||
public messageId: string,
|
public messageId: string,
|
||||||
@ -116,6 +161,13 @@ export class EphemeralMessage extends Message {
|
|||||||
export function isEphemeralMessage(
|
export function isEphemeralMessage(
|
||||||
message: Message | ContentMessage | SyncMessage | EphemeralMessage
|
message: Message | ContentMessage | SyncMessage | EphemeralMessage
|
||||||
): message is EphemeralMessage {
|
): message is EphemeralMessage {
|
||||||
|
return testEphemeralMessage(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
function testEphemeralMessage(message: {
|
||||||
|
lamportTimestamp?: number;
|
||||||
|
content?: Uint8Array;
|
||||||
|
}): boolean {
|
||||||
return Boolean(
|
return Boolean(
|
||||||
message.lamportTimestamp === undefined &&
|
message.lamportTimestamp === undefined &&
|
||||||
"content" in message &&
|
"content" in message &&
|
||||||
@ -166,6 +218,13 @@ export class ContentMessage extends Message {
|
|||||||
export function isContentMessage(
|
export function isContentMessage(
|
||||||
message: Message | ContentMessage
|
message: Message | ContentMessage
|
||||||
): message is ContentMessage {
|
): message is ContentMessage {
|
||||||
|
return testContentMessage(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
function testContentMessage(message: {
|
||||||
|
lamportTimestamp?: number;
|
||||||
|
content?: Uint8Array;
|
||||||
|
}): message is { lamportTimestamp: number; content: Uint8Array } {
|
||||||
return Boolean(
|
return Boolean(
|
||||||
"lamportTimestamp" in message &&
|
"lamportTimestamp" in message &&
|
||||||
typeof message.lamportTimestamp === "number" &&
|
typeof message.lamportTimestamp === "number" &&
|
||||||
|
|||||||
@ -40,7 +40,7 @@ const sendMessage = async (
|
|||||||
payload: Uint8Array,
|
payload: Uint8Array,
|
||||||
callback: (message: ContentMessage) => Promise<{ success: boolean }>
|
callback: (message: ContentMessage) => Promise<{ success: boolean }>
|
||||||
): Promise<void> => {
|
): Promise<void> => {
|
||||||
await channel.pushOutgoingMessage(payload, callback);
|
channel.pushOutgoingMessage(payload, callback);
|
||||||
await channel.processTasks();
|
await channel.processTasks();
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -292,14 +292,12 @@ describe("MessageChannel", function () {
|
|||||||
);
|
);
|
||||||
|
|
||||||
const localHistory = channelA["localHistory"] as ILocalHistory;
|
const localHistory = channelA["localHistory"] as ILocalHistory;
|
||||||
console.log("localHistory", localHistory);
|
|
||||||
expect(localHistory.length).to.equal(1);
|
expect(localHistory.length).to.equal(1);
|
||||||
|
|
||||||
// Find the message in local history
|
// Find the message in local history
|
||||||
const historyEntry = localHistory.find(
|
const historyEntry = localHistory.find(
|
||||||
(entry) => entry.messageId === messageId
|
(entry) => entry.messageId === messageId
|
||||||
);
|
);
|
||||||
console.log("history entry", historyEntry);
|
|
||||||
expect(historyEntry).to.exist;
|
expect(historyEntry).to.exist;
|
||||||
expect(historyEntry!.retrievalHint).to.deep.equal(testRetrievalHint);
|
expect(historyEntry!.retrievalHint).to.deep.equal(testRetrievalHint);
|
||||||
});
|
});
|
||||||
@ -596,7 +594,6 @@ describe("MessageChannel", function () {
|
|||||||
it("First message is missed, then re-sent, should be ack'd", async () => {
|
it("First message is missed, then re-sent, should be ack'd", async () => {
|
||||||
const firstMessage = utf8ToBytes("first message");
|
const firstMessage = utf8ToBytes("first message");
|
||||||
const firstMessageId = MessageChannel.getMessageId(firstMessage);
|
const firstMessageId = MessageChannel.getMessageId(firstMessage);
|
||||||
console.log("firstMessage", firstMessageId);
|
|
||||||
let messageAcked = false;
|
let messageAcked = false;
|
||||||
channelA.addEventListener(
|
channelA.addEventListener(
|
||||||
MessageChannelEvent.OutMessageAcknowledged,
|
MessageChannelEvent.OutMessageAcknowledged,
|
||||||
|
|||||||
@ -174,13 +174,13 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
|
|||||||
*
|
*
|
||||||
* @throws Error if the payload is empty
|
* @throws Error if the payload is empty
|
||||||
*/
|
*/
|
||||||
public async pushOutgoingMessage(
|
public pushOutgoingMessage(
|
||||||
payload: Uint8Array,
|
payload: Uint8Array,
|
||||||
callback?: (processedMessage: ContentMessage) => Promise<{
|
callback?: (processedMessage: ContentMessage) => Promise<{
|
||||||
success: boolean;
|
success: boolean;
|
||||||
retrievalHint?: Uint8Array;
|
retrievalHint?: Uint8Array;
|
||||||
}>
|
}>
|
||||||
): Promise<void> {
|
): void {
|
||||||
if (!payload || !payload.length) {
|
if (!payload || !payload.length) {
|
||||||
throw Error("Only messages with valid payloads are allowed");
|
throw Error("Only messages with valid payloads are allowed");
|
||||||
}
|
}
|
||||||
@ -285,6 +285,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
|
|||||||
}
|
}
|
||||||
log.info(
|
log.info(
|
||||||
this.senderId,
|
this.senderId,
|
||||||
|
"message from incoming buffer",
|
||||||
message.messageId,
|
message.messageId,
|
||||||
"is missing dependencies",
|
"is missing dependencies",
|
||||||
missingDependencies.map(({ messageId, retrievalHint }) => {
|
missingDependencies.map(({ messageId, retrievalHint }) => {
|
||||||
@ -470,10 +471,15 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
|
|||||||
this.timeReceived.set(message.messageId, Date.now());
|
this.timeReceived.set(message.messageId, Date.now());
|
||||||
log.info(
|
log.info(
|
||||||
this.senderId,
|
this.senderId,
|
||||||
|
"new incoming message",
|
||||||
message.messageId,
|
message.messageId,
|
||||||
"is missing dependencies",
|
"is missing dependencies",
|
||||||
missingDependencies.map((ch) => ch.messageId)
|
missingDependencies.map((ch) => ch.messageId)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
this.safeSendEvent(MessageChannelEvent.InMessageMissing, {
|
||||||
|
detail: Array.from(missingDependencies)
|
||||||
|
});
|
||||||
} else {
|
} else {
|
||||||
if (isContentMessage(message) && this.deliverMessage(message)) {
|
if (isContentMessage(message) && this.deliverMessage(message)) {
|
||||||
this.safeSendEvent(MessageChannelEvent.InMessageDelivered, {
|
this.safeSendEvent(MessageChannelEvent.InMessageDelivered, {
|
||||||
|
|||||||
@ -7,3 +7,4 @@ export * from "./sharding/index.js";
|
|||||||
export * from "./push_or_init_map.js";
|
export * from "./push_or_init_map.js";
|
||||||
export * from "./relay_shard_codec.js";
|
export * from "./relay_shard_codec.js";
|
||||||
export * from "./delay.js";
|
export * from "./delay.js";
|
||||||
|
export * from "./mock_node.js";
|
||||||
|
|||||||
166
packages/utils/src/common/mock_node.ts
Normal file
166
packages/utils/src/common/mock_node.ts
Normal file
@ -0,0 +1,166 @@
|
|||||||
|
import { Peer, PeerId, Stream, TypedEventEmitter } from "@libp2p/interface";
|
||||||
|
import { MultiaddrInput } from "@multiformats/multiaddr";
|
||||||
|
import {
|
||||||
|
Callback,
|
||||||
|
CreateDecoderParams,
|
||||||
|
CreateEncoderParams,
|
||||||
|
HealthStatus,
|
||||||
|
IDecodedMessage,
|
||||||
|
IDecoder,
|
||||||
|
IEncoder,
|
||||||
|
IFilter,
|
||||||
|
ILightPush,
|
||||||
|
type IMessage,
|
||||||
|
IRelay,
|
||||||
|
ISendOptions,
|
||||||
|
IStore,
|
||||||
|
IWaku,
|
||||||
|
IWakuEventEmitter,
|
||||||
|
Libp2p,
|
||||||
|
LightPushSDKResult,
|
||||||
|
Protocols
|
||||||
|
} from "@waku/interfaces";
|
||||||
|
|
||||||
|
export type MockWakuEvents = {
|
||||||
|
["new-message"]: CustomEvent<IDecodedMessage>;
|
||||||
|
};
|
||||||
|
|
||||||
|
export class MockWakuNode implements IWaku {
|
||||||
|
public relay?: IRelay;
|
||||||
|
public store?: IStore;
|
||||||
|
public filter?: IFilter;
|
||||||
|
public lightPush?: ILightPush;
|
||||||
|
public protocols: string[];
|
||||||
|
|
||||||
|
private readonly subscriptions: {
|
||||||
|
decoders: IDecoder<any>[];
|
||||||
|
callback: Callback<any>;
|
||||||
|
}[];
|
||||||
|
|
||||||
|
public constructor(
|
||||||
|
private mockMessageEmitter?: TypedEventEmitter<MockWakuEvents>
|
||||||
|
) {
|
||||||
|
this.protocols = [];
|
||||||
|
this.events = new TypedEventEmitter();
|
||||||
|
this.subscriptions = [];
|
||||||
|
|
||||||
|
this.lightPush = {
|
||||||
|
multicodec: [],
|
||||||
|
send: this._send.bind(this),
|
||||||
|
start(): void {},
|
||||||
|
stop(): void {}
|
||||||
|
};
|
||||||
|
|
||||||
|
this.filter = {
|
||||||
|
start: async () => {},
|
||||||
|
stop: async () => {},
|
||||||
|
multicodec: "filter",
|
||||||
|
subscribe: this._subscribe.bind(this),
|
||||||
|
unsubscribe<T extends IDecodedMessage>(
|
||||||
|
_decoders: IDecoder<T> | IDecoder<T>[]
|
||||||
|
): Promise<boolean> {
|
||||||
|
throw "Not implemented";
|
||||||
|
},
|
||||||
|
unsubscribeAll(): void {
|
||||||
|
throw "Not implemented";
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
public get libp2p(): Libp2p {
|
||||||
|
throw "No libp2p on MockWakuNode";
|
||||||
|
}
|
||||||
|
|
||||||
|
private async _send(
|
||||||
|
encoder: IEncoder,
|
||||||
|
message: IMessage,
|
||||||
|
_sendOptions?: ISendOptions
|
||||||
|
): Promise<LightPushSDKResult> {
|
||||||
|
for (const { decoders, callback } of this.subscriptions) {
|
||||||
|
const protoMessage = await encoder.toProtoObj(message);
|
||||||
|
if (!protoMessage) throw "Issue in mock encoding message";
|
||||||
|
for (const decoder of decoders) {
|
||||||
|
const decodedMessage = await decoder.fromProtoObj(
|
||||||
|
decoder.pubsubTopic,
|
||||||
|
protoMessage
|
||||||
|
);
|
||||||
|
if (!decodedMessage) throw "Issue in mock decoding message";
|
||||||
|
await callback(decodedMessage);
|
||||||
|
if (this.mockMessageEmitter) {
|
||||||
|
this.mockMessageEmitter.dispatchEvent(
|
||||||
|
new CustomEvent<IDecodedMessage>("new-message", {
|
||||||
|
detail: decodedMessage
|
||||||
|
})
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return {
|
||||||
|
failures: [],
|
||||||
|
successes: []
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private async _subscribe<T extends IDecodedMessage>(
|
||||||
|
decoders: IDecoder<T> | IDecoder<T>[],
|
||||||
|
callback: Callback<T>
|
||||||
|
): Promise<boolean> {
|
||||||
|
this.subscriptions.push({
|
||||||
|
decoders: Array.isArray(decoders) ? decoders : [decoders],
|
||||||
|
callback
|
||||||
|
});
|
||||||
|
if (this.mockMessageEmitter) {
|
||||||
|
this.mockMessageEmitter.addEventListener("new-message", (event) => {
|
||||||
|
void callback(event.detail as unknown as T);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
return Promise.resolve(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
public events: IWakuEventEmitter;
|
||||||
|
|
||||||
|
public get peerId(): PeerId {
|
||||||
|
throw "no peerId on MockWakuNode";
|
||||||
|
}
|
||||||
|
public get health(): HealthStatus {
|
||||||
|
throw "no health on MockWakuNode";
|
||||||
|
}
|
||||||
|
public dial(
|
||||||
|
_peer: PeerId | MultiaddrInput,
|
||||||
|
_protocols?: Protocols[]
|
||||||
|
): Promise<Stream> {
|
||||||
|
throw new Error("Method not implemented.");
|
||||||
|
}
|
||||||
|
public hangUp(_peer: PeerId | MultiaddrInput): Promise<boolean> {
|
||||||
|
throw new Error("Method not implemented.");
|
||||||
|
}
|
||||||
|
public start(): Promise<void> {
|
||||||
|
return Promise.resolve();
|
||||||
|
}
|
||||||
|
public stop(): Promise<void> {
|
||||||
|
throw new Error("Method not implemented.");
|
||||||
|
}
|
||||||
|
public waitForPeers(
|
||||||
|
_protocols?: Protocols[],
|
||||||
|
_timeoutMs?: number
|
||||||
|
): Promise<void> {
|
||||||
|
throw new Error("Method not implemented.");
|
||||||
|
}
|
||||||
|
public createDecoder(
|
||||||
|
_params: CreateDecoderParams
|
||||||
|
): IDecoder<IDecodedMessage> {
|
||||||
|
throw new Error("Method not implemented.");
|
||||||
|
}
|
||||||
|
public createEncoder(_params: CreateEncoderParams): IEncoder {
|
||||||
|
throw new Error("Method not implemented.");
|
||||||
|
}
|
||||||
|
public isStarted(): boolean {
|
||||||
|
throw new Error("Method not implemented.");
|
||||||
|
}
|
||||||
|
public isConnected(): boolean {
|
||||||
|
throw new Error("Method not implemented.");
|
||||||
|
}
|
||||||
|
public getConnectedPeers(): Promise<Peer[]> {
|
||||||
|
throw new Error("Method not implemented.");
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user