From 4d5c152f5b1b1c241bbe7bb96d13d927a6f7550e Mon Sep 17 00:00:00 2001 From: fryorcraken <110212804+fryorcraken@users.noreply.github.com> Date: Tue, 9 Sep 2025 12:43:48 +1000 Subject: [PATCH] feat: introduce reliable channels (#2526) * SDS: pushOutgoingMessage is actually sync * SDS: ensure that `ContentMessage` class is stored in local history with `valueOf` method * feat: introduce reliable channels Easy to use Scalable Data Sync (SDS, e2e reliability) wrapper, that includes: - store queries upon connection to store nodes - store queries to retrieve missing messages * remove `channel` prefix * attempt to improve performance when processing a lot of incoming messages * test: split test file * use index.ts for re-export only. * improve if condition * use getter for isStarted * waku node already auto-start * rename send * fix lightPush.send type post rebase * test: remove extra console.log * SDS: emit messages as missing as soon as they are received * make configurable elapse time for task process * typo * use string instead of enum for event types * ReliableChannel.send returns the message id --- package-lock.json | 102 ++- package.json | 4 +- packages/sdk/package.json | 2 + packages/sdk/src/index.ts | 1 + packages/sdk/src/reliable_channel/events.ts | 66 ++ packages/sdk/src/reliable_channel/index.ts | 2 + .../missing_message_retriever.ts | 78 ++ .../reliable_channel/reliable_channel.spec.ts | 672 +++++++++++++++++ .../src/reliable_channel/reliable_channel.ts | 676 ++++++++++++++++++ .../reliable_channel_acks.spec.ts | 187 +++++ .../reliable_channel_encryption.spec.ts | 326 +++++++++ .../reliable_channel_sync.spec.ts | 332 +++++++++ .../reliable_channel/retry_manager.spec.ts | 48 ++ .../sdk/src/reliable_channel/retry_manager.ts | 51 ++ .../sds/src/message_channel/message.spec.ts | 2 +- packages/sds/src/message_channel/message.ts | 79 +- .../message_channel/message_channel.spec.ts | 5 +- .../src/message_channel/message_channel.ts | 10 +- packages/utils/src/common/index.ts | 1 + packages/utils/src/common/mock_node.ts | 166 +++++ 20 files changed, 2789 insertions(+), 21 deletions(-) create mode 100644 packages/sdk/src/reliable_channel/events.ts create mode 100644 packages/sdk/src/reliable_channel/index.ts create mode 100644 packages/sdk/src/reliable_channel/missing_message_retriever.ts create mode 100644 packages/sdk/src/reliable_channel/reliable_channel.spec.ts create mode 100644 packages/sdk/src/reliable_channel/reliable_channel.ts create mode 100644 packages/sdk/src/reliable_channel/reliable_channel_acks.spec.ts create mode 100644 packages/sdk/src/reliable_channel/reliable_channel_encryption.spec.ts create mode 100644 packages/sdk/src/reliable_channel/reliable_channel_sync.spec.ts create mode 100644 packages/sdk/src/reliable_channel/retry_manager.spec.ts create mode 100644 packages/sdk/src/reliable_channel/retry_manager.ts create mode 100644 packages/utils/src/common/mock_node.ts diff --git a/package-lock.json b/package-lock.json index d17408b2ec..7c3c140727 100644 --- a/package-lock.json +++ b/package-lock.json @@ -13,10 +13,10 @@ "packages/core", "packages/discovery", "packages/message-encryption", - "packages/sdk", - "packages/relay", "packages/sds", "packages/rln", + "packages/sdk", + "packages/relay", "packages/tests", "packages/reliability-tests", "packages/headless-tests", @@ -37621,6 +37621,7 @@ "@waku/discovery": "0.0.11", "@waku/interfaces": "0.0.33", "@waku/proto": "^0.0.13", + "@waku/sds": "^0.0.6", "@waku/utils": "0.0.26", "libp2p": "2.8.11", "lodash.debounce": "^4.0.8" @@ -37634,6 +37635,7 @@ "@types/chai": "^4.3.11", "@types/mocha": "^10.0.9", "@waku/build-utils": "*", + "@waku/message-encryption": "^0.0.36", "chai": "^5.1.1", "cspell": "^8.6.1", "interface-datastore": "8.3.2", @@ -37654,6 +37656,102 @@ "@sinonjs/commons": "^3.0.1" } }, + "packages/sdk/node_modules/@waku/sds/node_modules/@waku/interfaces": { + "version": "0.0.32", + "resolved": "https://registry.npmjs.org/@waku/interfaces/-/interfaces-0.0.32.tgz", + "integrity": "sha512-4MNfc7ZzQCyQZR1GQQKPgHaWTuPTIvE2wo/b7iokjdeOT+ZSKyJFSetcV07cqnBwyzUv1gc53bJdzyHwVIa5Vw==", + "extraneous": true, + "license": "MIT OR Apache-2.0", + "engines": { + "node": ">=22" + } + }, + "packages/sdk/node_modules/@waku/sds/node_modules/@waku/proto": { + "version": "0.0.12", + "resolved": "https://registry.npmjs.org/@waku/proto/-/proto-0.0.12.tgz", + "integrity": "sha512-JR7wiy3Di628Ywo9qKIi7rhfdC2K7ABoaWa9WX4ZQKieYDs+YwOK+syE53VNwXrtponNeLDI0JIOFzRDalUm1A==", + "extraneous": true, + "license": "MIT OR Apache-2.0", + "dependencies": { + "protons-runtime": "^5.4.0" + }, + "engines": { + "node": ">=22" + } + }, + "packages/sdk/node_modules/@waku/sds/node_modules/@waku/utils": { + "version": "0.0.25", + "resolved": "https://registry.npmjs.org/@waku/utils/-/utils-0.0.25.tgz", + "integrity": "sha512-yCbfQ3uqByGNUvCNTj6oHi8fJ6BdVvg+Rj0y2YKrZDSNn73uTMF856lCJdsE86eqDZNCDaRaawTs3ZNEXyWaXw==", + "extraneous": true, + "license": "MIT OR Apache-2.0", + "dependencies": { + "@noble/hashes": "^1.3.2", + "@waku/interfaces": "0.0.32", + "chai": "^4.3.10", + "debug": "^4.3.4", + "uint8arrays": "^5.0.1" + }, + "engines": { + "node": ">=22" + } + }, + "packages/sdk/node_modules/@waku/sds/node_modules/assertion-error": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/assertion-error/-/assertion-error-1.1.0.tgz", + "integrity": "sha512-jgsaNduz+ndvGyFt3uSuWqvy4lCnIJiovtouQN5JZHOKCS2QuhEdbcQHFhVksz2N2U9hXJo8odG7ETyWlEeuDw==", + "extraneous": true, + "license": "MIT", + "engines": { + "node": "*" + } + }, + "packages/sdk/node_modules/@waku/sds/node_modules/check-error": { + "version": "1.0.3", + "resolved": "https://registry.npmjs.org/check-error/-/check-error-1.0.3.tgz", + "integrity": "sha512-iKEoDYaRmd1mxM90a2OEfWhjsjPpYPuQ+lMYsoxB126+t8fw7ySEO48nmDg5COTjxDI65/Y2OWpeEHk3ZOe8zg==", + "extraneous": true, + "license": "MIT", + "dependencies": { + "get-func-name": "^2.0.2" + }, + "engines": { + "node": "*" + } + }, + "packages/sdk/node_modules/@waku/sds/node_modules/deep-eql": { + "version": "4.1.4", + "resolved": "https://registry.npmjs.org/deep-eql/-/deep-eql-4.1.4.tgz", + "integrity": "sha512-SUwdGfqdKOwxCPeVYjwSyRpJ7Z+fhpwIAtmCUdZIWZ/YP5R9WAsyuSgpLVDi9bjWoN2LXHNss/dk3urXtdQxGg==", + "extraneous": true, + "license": "MIT", + "dependencies": { + "type-detect": "^4.0.0" + }, + "engines": { + "node": ">=6" + } + }, + "packages/sdk/node_modules/@waku/sds/node_modules/loupe": { + "version": "2.3.7", + "resolved": "https://registry.npmjs.org/loupe/-/loupe-2.3.7.tgz", + "integrity": "sha512-zSMINGVYkdpYSOBmLi0D1Uo7JU9nVdQKrHxC8eYlV+9YKK9WePqAlL7lSlorG/U2Fw1w0hTBmaa/jrQ3UbPHtA==", + "extraneous": true, + "license": "MIT", + "dependencies": { + "get-func-name": "^2.0.1" + } + }, + "packages/sdk/node_modules/@waku/sds/node_modules/pathval": { + "version": "1.1.1", + "resolved": "https://registry.npmjs.org/pathval/-/pathval-1.1.1.tgz", + "integrity": "sha512-Dp6zGqpTdETdR63lehJYPeIOqpiNBNtc7BpWSLrOje7UaIsE5aY92r/AunQA7rsXvet3lrJ3JnZX29UPTKXyKQ==", + "extraneous": true, + "license": "MIT", + "engines": { + "node": "*" + } + }, "packages/sdk/node_modules/assertion-error": { "version": "2.0.1", "dev": true, diff --git a/package.json b/package.json index 53baf7ee6b..b4551f0b68 100644 --- a/package.json +++ b/package.json @@ -10,10 +10,10 @@ "packages/core", "packages/discovery", "packages/message-encryption", - "packages/sdk", - "packages/relay", "packages/sds", "packages/rln", + "packages/sdk", + "packages/relay", "packages/tests", "packages/reliability-tests", "packages/headless-tests", diff --git a/packages/sdk/package.json b/packages/sdk/package.json index dd88c6f350..0d32c14376 100644 --- a/packages/sdk/package.json +++ b/packages/sdk/package.json @@ -72,6 +72,7 @@ "@waku/discovery": "0.0.11", "@waku/interfaces": "0.0.33", "@waku/proto": "^0.0.13", + "@waku/sds": "^0.0.6", "@waku/utils": "0.0.26", "libp2p": "2.8.11", "lodash.debounce": "^4.0.8" @@ -85,6 +86,7 @@ "@types/chai": "^4.3.11", "@types/mocha": "^10.0.9", "@waku/build-utils": "*", + "@waku/message-encryption": "^0.0.36", "chai": "^5.1.1", "cspell": "^8.6.1", "interface-datastore": "8.3.2", diff --git a/packages/sdk/src/index.ts b/packages/sdk/src/index.ts index 441cba30ea..8c0546b16c 100644 --- a/packages/sdk/src/index.ts +++ b/packages/sdk/src/index.ts @@ -17,6 +17,7 @@ export { export { LightPush } from "./light_push/index.js"; export { Filter } from "./filter/index.js"; export { Store } from "./store/index.js"; +export * from "./reliable_channel/index.js"; export * as waku from "@waku/core"; export * as utils from "@waku/utils"; diff --git a/packages/sdk/src/reliable_channel/events.ts b/packages/sdk/src/reliable_channel/events.ts new file mode 100644 index 0000000000..c79c2c0c0f --- /dev/null +++ b/packages/sdk/src/reliable_channel/events.ts @@ -0,0 +1,66 @@ +import { IDecodedMessage, ProtocolError } from "@waku/interfaces"; +import type { HistoryEntry, MessageId } from "@waku/sds"; + +export const ReliableChannelEvent = { + /** + * The message is being sent over the wire. + * + * This event may be emitted several times if the retry mechanism kicks in. + */ + SendingMessage: "sending-message", + /** + * The message has been sent over the wire but has not been acknowledged by + * any other party yet. + * + * We are now waiting for acknowledgements. + * + * This event may be emitted several times if the + * several times if the retry mechanisms kicks in. + */ + MessageSent: "message-sent", + /** + * A received bloom filter seems to indicate that the messages was received + * by another party. + * + * However, this is probabilistic. The retry mechanism will wait a bit longer + * before trying to send the message again. + */ + MessagePossiblyAcknowledged: "message-possibly-acknowledged", + /** + * The message was fully acknowledged by other members of the channel + */ + MessageAcknowledged: "message-acknowledged", + /** + * It was not possible to send the messages due to a non-recoverable error, + * most likely an internal error for a developer to resolve. + */ + SendingMessageIrrecoverableError: "sending-message-irrecoverable-error", + /** + * A new message has been received. + */ + MessageReceived: "message-received", + /** + * We are aware of a missing message but failed to retrieve it successfully. + */ + IrretrievableMessage: "irretrievable-message" +}; + +export type ReliableChannelEvent = + (typeof ReliableChannelEvent)[keyof typeof ReliableChannelEvent]; + +export interface ReliableChannelEvents { + "sending-message": CustomEvent; + "message-sent": CustomEvent; + "message-possibly-acknowledged": CustomEvent<{ + messageId: MessageId; + possibleAckCount: number; + }>; + "message-acknowledged": CustomEvent; + // TODO probably T extends IDecodedMessage? + "message-received": CustomEvent; + "irretrievable-message": CustomEvent; + "sending-message-irrecoverable-error": CustomEvent<{ + messageId: MessageId; + error: ProtocolError; + }>; +} diff --git a/packages/sdk/src/reliable_channel/index.ts b/packages/sdk/src/reliable_channel/index.ts new file mode 100644 index 0000000000..60622414bf --- /dev/null +++ b/packages/sdk/src/reliable_channel/index.ts @@ -0,0 +1,2 @@ +export { ReliableChannel, ReliableChannelOptions } from "./reliable_channel.js"; +export { ReliableChannelEvents, ReliableChannelEvent } from "./events.js"; diff --git a/packages/sdk/src/reliable_channel/missing_message_retriever.ts b/packages/sdk/src/reliable_channel/missing_message_retriever.ts new file mode 100644 index 0000000000..f5f1cb3503 --- /dev/null +++ b/packages/sdk/src/reliable_channel/missing_message_retriever.ts @@ -0,0 +1,78 @@ +import type { + IDecodedMessage, + IDecoder, + QueryRequestParams +} from "@waku/interfaces"; +import type { MessageId } from "@waku/sds"; +import { Logger } from "@waku/utils"; + +const log = new Logger("sdk:missing-message-retriever"); + +const DEFAULT_RETRIEVE_FREQUENCY_MS = 10 * 1000; // 10 seconds + +export class MissingMessageRetriever { + private retrieveInterval: ReturnType | undefined; + private missingMessages: Map>; // Waku Message Ids + + public constructor( + private readonly decoder: IDecoder, + private readonly retrieveFrequencyMs: number = DEFAULT_RETRIEVE_FREQUENCY_MS, + private readonly _retrieve: ( + decoders: IDecoder[], + options?: Partial + ) => AsyncGenerator[]>, + private readonly onMessageRetrieved?: (message: T) => Promise + ) { + this.missingMessages = new Map(); + } + + public start(): void { + if (this.retrieveInterval) { + clearInterval(this.retrieveInterval); + } + if (this.retrieveFrequencyMs !== 0) { + log.info(`start retrieve loop every ${this.retrieveFrequencyMs}ms`); + this.retrieveInterval = setInterval(() => { + void this.retrieveMissingMessage(); + }, this.retrieveFrequencyMs); + } + } + + public stop(): void { + if (this.retrieveInterval) { + clearInterval(this.retrieveInterval); + } + } + + public addMissingMessage( + messageId: MessageId, + retrievalHint: Uint8Array + ): void { + if (!this.missingMessages.has(messageId)) { + log.info("missing message notice", messageId, retrievalHint); + this.missingMessages.set(messageId, retrievalHint); + } + } + + public removeMissingMessage(messageId: MessageId): void { + if (this.missingMessages.has(messageId)) { + this.missingMessages.delete(messageId); + } + } + + private async retrieveMissingMessage(): Promise { + if (this.missingMessages.size) { + const messageHashes = Array.from(this.missingMessages.values()); + log.info("attempting to retrieve missing message", messageHashes.length); + for await (const page of this._retrieve([this.decoder], { + messageHashes + })) { + for await (const msg of page) { + if (msg && this.onMessageRetrieved) { + await this.onMessageRetrieved(msg); + } + } + } + } + } +} diff --git a/packages/sdk/src/reliable_channel/reliable_channel.spec.ts b/packages/sdk/src/reliable_channel/reliable_channel.spec.ts new file mode 100644 index 0000000000..c54d34b806 --- /dev/null +++ b/packages/sdk/src/reliable_channel/reliable_channel.spec.ts @@ -0,0 +1,672 @@ +import { PeerId, TypedEventEmitter } from "@libp2p/interface"; +import { createDecoder, createEncoder } from "@waku/core"; +import { + AutoSharding, + HealthStatus, + IDecodedMessage, + IDecoder, + IEncoder, + type IMessage, + ISendOptions, + IWaku, + LightPushError, + LightPushSDKResult, + QueryRequestParams +} from "@waku/interfaces"; +import { ContentMessage } from "@waku/sds"; +import { + createRoutingInfo, + delay, + MockWakuEvents, + MockWakuNode +} from "@waku/utils"; +import { bytesToUtf8, hexToBytes, utf8ToBytes } from "@waku/utils/bytes"; +import { expect } from "chai"; +import { beforeEach, describe } from "mocha"; +import sinon from "sinon"; + +import { ReliableChannel } from "./index.js"; + +const TEST_CONTENT_TOPIC = "/my-tests/0/topic-name/proto"; +const TEST_NETWORK_CONFIG: AutoSharding = { + clusterId: 0, + numShardsInCluster: 1 +}; +const TEST_ROUTING_INFO = createRoutingInfo(TEST_NETWORK_CONFIG, { + contentTopic: TEST_CONTENT_TOPIC +}); + +describe("Reliable Channel", () => { + let mockWakuNode: IWaku; + let encoder: IEncoder; + let decoder: IDecoder; + + beforeEach(async () => { + mockWakuNode = new MockWakuNode(); + encoder = createEncoder({ + contentTopic: TEST_CONTENT_TOPIC, + routingInfo: TEST_ROUTING_INFO + }); + decoder = createDecoder(TEST_CONTENT_TOPIC, TEST_ROUTING_INFO); + }); + + it("Outgoing message is emitted as sending", async () => { + const reliableChannel = await ReliableChannel.create( + mockWakuNode, + "MyChannel", + "alice", + encoder, + decoder + ); + + const message = utf8ToBytes("message in channel"); + + // Setting up message tracking + const messageId = reliableChannel.send(message); + let messageSending = false; + reliableChannel.addEventListener("sending-message", (event) => { + if (event.detail === messageId) { + messageSending = true; + } + }); + + while (!messageSending) { + await delay(50); + } + + expect(messageSending).to.be.true; + }); + + it("Outgoing message is emitted as sent", async () => { + const reliableChannel = await ReliableChannel.create( + mockWakuNode, + "MyChannel", + "alice", + encoder, + decoder + ); + + const message = utf8ToBytes("message in channel"); + + const messageId = reliableChannel.send(message); + + // Setting up message tracking + let messageSent = false; + reliableChannel.addEventListener("message-sent", (event) => { + if (event.detail === messageId) { + messageSent = true; + } + }); + + while (!messageSent) { + await delay(50); + } + + expect(messageSent).to.be.true; + }); + + it("Encoder error raises irrecoverable error", async () => { + mockWakuNode.lightPush!.send = ( + _encoder: IEncoder, + _message: IMessage, + _sendOptions?: ISendOptions + ): Promise => { + return Promise.resolve({ + failures: [{ error: LightPushError.EMPTY_PAYLOAD }], + successes: [] + }); + }; + + const reliableChannel = await ReliableChannel.create( + mockWakuNode, + "MyChannel", + "alice", + encoder, + decoder + ); + + const message = utf8ToBytes("payload doesnt matter"); + + encoder.contentTopic = "..."; + const messageId = reliableChannel.send(message); + + // Setting up message tracking + let irrecoverableError = false; + reliableChannel.addEventListener( + "sending-message-irrecoverable-error", + (event) => { + if (event.detail.messageId === messageId) { + irrecoverableError = true; + } + } + ); + + while (!irrecoverableError) { + await delay(50); + } + + expect(irrecoverableError).to.be.true; + }); + + it("Outgoing message is not emitted as acknowledged from own outgoing messages", async () => { + const reliableChannel = await ReliableChannel.create( + mockWakuNode, + "MyChannel", + "alice", + encoder, + decoder + ); + + const message = utf8ToBytes("first message in channel"); + + // Setting up message tracking + const messageId = ReliableChannel.getMessageId(message); + let messageAcknowledged = false; + reliableChannel.addEventListener("message-acknowledged", (event) => { + if (event.detail === messageId) { + messageAcknowledged = true; + } + }); + + reliableChannel.send(message); + + // Sending a second message from the same node should not acknowledge the first one + reliableChannel.send(utf8ToBytes("second message in channel")); + + expect(messageAcknowledged).to.be.false; + }); + + it("Outgoing message is possibly acknowledged", async () => { + const commonEventEmitter = new TypedEventEmitter(); + const mockWakuNodeAlice = new MockWakuNode(commonEventEmitter); + const mockWakuNodeBob = new MockWakuNode(commonEventEmitter); + + const reliableChannelAlice = await ReliableChannel.create( + mockWakuNodeAlice, + "MyChannel", + "alice", + encoder, + decoder + ); + const reliableChannelBob = await ReliableChannel.create( + mockWakuNodeBob, + "MyChannel", + "bob", + encoder, + decoder, + // Bob only includes one message in causal history + { causalHistorySize: 1 } + ); + + const messages = ["first", "second", "third"].map((m) => { + return utf8ToBytes(m); + }); + + // Alice sets up message tracking for first message + const firstMessageId = ReliableChannel.getMessageId(messages[0]); + let firstMessagePossiblyAcknowledged = false; + reliableChannelAlice.addEventListener( + "message-possibly-acknowledged", + (event) => { + if (event.detail.messageId === firstMessageId) { + firstMessagePossiblyAcknowledged = true; + } + } + ); + + let messageReceived = false; + reliableChannelBob.addEventListener("message-received", (event) => { + if (bytesToUtf8(event.detail.payload) === "third") { + messageReceived = true; + } + }); + + for (const m of messages) { + reliableChannelAlice.send(m); + } + + // Wait for Bob to receive last message to ensure it is all included in filter + while (!messageReceived) { + await delay(50); + } + + // Bobs sends a message now, it should include first one in bloom filter + reliableChannelBob.send(utf8ToBytes("message back")); + while (!firstMessagePossiblyAcknowledged) { + await delay(50); + } + + expect(firstMessagePossiblyAcknowledged).to.be.true; + }); + + it("Outgoing message is acknowledged", async () => { + const commonEventEmitter = new TypedEventEmitter(); + const mockWakuNodeAlice = new MockWakuNode(commonEventEmitter); + const mockWakuNodeBob = new MockWakuNode(commonEventEmitter); + + const reliableChannelAlice = await ReliableChannel.create( + mockWakuNodeAlice, + "MyChannel", + "alice", + encoder, + decoder + ); + const reliableChannelBob = await ReliableChannel.create( + mockWakuNodeBob, + "MyChannel", + "bob", + encoder, + decoder + ); + + const message = utf8ToBytes("first message in channel"); + + const messageId = reliableChannelAlice.send(message); + + // Alice sets up message tracking + let messageAcknowledged = false; + reliableChannelAlice.addEventListener("message-acknowledged", (event) => { + if (event.detail === messageId) { + messageAcknowledged = true; + } + }); + + let bobReceivedMessage = false; + reliableChannelBob.addEventListener("message-received", () => { + bobReceivedMessage = true; + }); + + // Wait for bob to receive the message to ensure it's included in causal history + while (!bobReceivedMessage) { + await delay(50); + } + + // Bobs sends a message now, it should include first one in causal history + reliableChannelBob.send(utf8ToBytes("second message in channel")); + while (!messageAcknowledged) { + await delay(50); + } + + expect(messageAcknowledged).to.be.true; + }); + + it("Incoming message is emitted as received", async () => { + const reliableChannel = await ReliableChannel.create( + mockWakuNode, + "MyChannel", + "alice", + encoder, + decoder + ); + + let receivedMessage: IDecodedMessage; + reliableChannel.addEventListener("message-received", (event) => { + receivedMessage = event.detail; + }); + + const message = utf8ToBytes("message in channel"); + + reliableChannel.send(message); + while (!receivedMessage!) { + await delay(50); + } + + expect(bytesToUtf8(receivedMessage!.payload)).to.eq(bytesToUtf8(message)); + }); + + describe("Retries", () => { + it("Outgoing message is retried until acknowledged", async () => { + const commonEventEmitter = new TypedEventEmitter(); + const mockWakuNodeAlice = new MockWakuNode(commonEventEmitter); + const mockWakuNodeBob = new MockWakuNode(commonEventEmitter); + + const reliableChannelAlice = await ReliableChannel.create( + mockWakuNodeAlice, + "MyChannel", + "alice", + encoder, + decoder, + { + retryIntervalMs: 200, // faster for a quick test, + processTaskMinElapseMs: 10 // faster so it process message as soon as they arrive + } + ); + const reliableChannelBob = await ReliableChannel.create( + mockWakuNodeBob, + "MyChannel", + "bob", + encoder, + decoder, + { + syncMinIntervalMs: 0, // do not send sync messages automatically + maxRetryAttempts: 0 // This one does not perform retries + } + ); + + const msgTxt = "first message in channel"; + const message = utf8ToBytes(msgTxt); + + // Let's count how many times Bob receives Alice's message + let messageCount = 0; + reliableChannelBob.addEventListener("message-received", (event) => { + if (bytesToUtf8(event.detail.payload) === msgTxt) { + messageCount++; + } + }); + + reliableChannelAlice.send(message); + + while (messageCount < 1) { + await delay(10); + } + expect(messageCount).to.equal(1, "Bob received Alice's message once"); + + // No response from Bob should trigger a retry from Alice + while (messageCount < 2) { + await delay(10); + } + expect(messageCount).to.equal(2, "retried once"); + + // Bobs sends a message now, it should include first one in causal history + reliableChannelBob.send(utf8ToBytes("second message in channel")); + + // Wait long enough to confirm no retry is executed + await delay(300); + + // Alice should have stopped sending + expect(messageCount).to.equal(2, "hasn't retried since it's acked"); + }); + }); + + describe("Missing Message Retrieval", () => { + it("Automatically retrieves missing message", async () => { + const commonEventEmitter = new TypedEventEmitter(); + const mockWakuNodeAlice = new MockWakuNode(commonEventEmitter); + + // Setup, Alice first + const reliableChannelAlice = await ReliableChannel.create( + mockWakuNodeAlice, + "MyChannel", + "alice", + encoder, + decoder, + { + // disable any automation to better control the test + retryIntervalMs: 0, + syncMinIntervalMs: 0, + retrieveFrequencyMs: 0, + processTaskMinElapseMs: 10 + } + ); + + // Bob is offline, Alice sends a message, this is the message we want + // Bob to receive in this test. + const message = utf8ToBytes("missing message"); + reliableChannelAlice.send(message); + // Wait to be sent + await new Promise((resolve) => { + reliableChannelAlice.addEventListener("message-sent", resolve, { + once: true + }); + }); + + const sdsMessage = new ContentMessage( + ReliableChannel.getMessageId(message), + "MyChannel", + "alice", + [], + 1, + undefined, + message + ); + + // Now Bob goes online + const mockWakuNodeBob = new MockWakuNode(commonEventEmitter); + + // Stub store.queryGenerator to return a message + const mockMessage = { + payload: sdsMessage.encode() + }; + const queryGeneratorStub = sinon.stub().callsFake(async function* ( + _decoders: IDecoder[], + _options?: Partial + ) { + yield [Promise.resolve(mockMessage as IDecodedMessage)]; + }); + + (mockWakuNodeBob.store as any) = { + queryGenerator: queryGeneratorStub + }; + + const reliableChannelBob = await ReliableChannel.create( + mockWakuNodeBob, + "MyChannel", + "bob", + encoder, + decoder, + { + retryIntervalMs: 0, // disable any automation to better control the test + syncMinIntervalMs: 0, + processTaskMinElapseMs: 10, + retrieveFrequencyMs: 100 // quick loop so the test go fast + } + ); + + let messageRetrieved = false; + reliableChannelBob.addEventListener("message-received", (event) => { + if (bytesToUtf8(event.detail.payload) === "missing message") { + messageRetrieved = true; + } + }); + + // Alice sends a sync message, Bob should learn about missing message + // and retrieve it + await reliableChannelAlice["sendSyncMessage"](); + + await delay(200); + + expect(messageRetrieved).to.be.true; + + // Verify the stub was called once with the right messageHash info + expect(queryGeneratorStub.calledOnce).to.be.true; + const callArgs = queryGeneratorStub.getCall(0).args; + expect(callArgs[1]).to.have.property("messageHashes"); + expect(callArgs[1].messageHashes).to.be.an("array"); + }); + }); + + describe("Query On Connect Integration E2E Tests", () => { + let mockWakuNode: MockWakuNode; + let reliableChannel: ReliableChannel; + let encoder: IEncoder; + let decoder: IDecoder; + let mockPeerManagerEvents: TypedEventEmitter; + let queryGeneratorStub: sinon.SinonStub; + let mockPeerId: PeerId; + + beforeEach(async () => { + // Setup mock waku node with store capability + mockWakuNode = new MockWakuNode(); + + // Setup mock peer manager events for QueryOnConnect + mockPeerManagerEvents = new TypedEventEmitter(); + (mockWakuNode as any).peerManager = { + events: mockPeerManagerEvents + }; + + // Setup encoder and decoder + encoder = createEncoder({ + contentTopic: TEST_CONTENT_TOPIC, + routingInfo: TEST_ROUTING_INFO + }); + + decoder = createDecoder(TEST_CONTENT_TOPIC, TEST_ROUTING_INFO); + + // Setup store with queryGenerator for QueryOnConnect + queryGeneratorStub = sinon.stub(); + mockWakuNode.store = { + queryGenerator: queryGeneratorStub + } as any; + + mockPeerId = { + toString: () => "QmTestPeerId" + } as unknown as PeerId; + }); + + it("should trigger QueryOnConnect when going offline and store peer reconnects", async () => { + // Create a message that will be auto-retrieved + const messageText = "Auto-retrieved message"; + const messagePayload = utf8ToBytes(messageText); + + const sdsMessage = new ContentMessage( + ReliableChannel.getMessageId(messagePayload), + "testChannel", + "testSender", + [], + 1, + undefined, + messagePayload + ); + + const autoRetrievedMessage: IDecodedMessage = { + hash: hexToBytes("1234"), + hashStr: "1234", + version: 1, + timestamp: new Date(), + contentTopic: TEST_CONTENT_TOPIC, + pubsubTopic: decoder.pubsubTopic, + payload: sdsMessage.encode(), + rateLimitProof: undefined, + ephemeral: false, + meta: undefined + }; + + // Setup queryGenerator to return the auto-retrieved message + queryGeneratorStub.callsFake(async function* () { + yield [Promise.resolve(autoRetrievedMessage)]; + }); + + // Create ReliableChannel with queryOnConnect enabled + reliableChannel = await ReliableChannel.create( + mockWakuNode, + "testChannel", + "testSender", + encoder, + decoder + ); + + // Wait for initial setup + await delay(50); + + // Setup complete - focus on testing QueryOnConnect trigger + + // Simulate going offline (change health status) + mockWakuNode.events.dispatchEvent( + new CustomEvent("health", { detail: HealthStatus.Unhealthy }) + ); + + await delay(10); + + // Simulate store peer reconnection which should trigger QueryOnConnect + mockPeerManagerEvents.dispatchEvent( + new CustomEvent("store:connect", { detail: mockPeerId }) + ); + + // Wait for store query to be triggered + await delay(200); + + // Verify that QueryOnConnect was triggered by the conditions + expect(queryGeneratorStub.called).to.be.true; + }); + + it("should trigger QueryOnConnect when time threshold is exceeded", async () => { + // Create multiple messages that will be auto-retrieved + const message1Text = "First auto-retrieved message"; + const message2Text = "Second auto-retrieved message"; + const message1Payload = utf8ToBytes(message1Text); + const message2Payload = utf8ToBytes(message2Text); + + const sdsMessage1 = new ContentMessage( + ReliableChannel.getMessageId(message1Payload), + "testChannel", + "testSender", + [], + 1, + undefined, + message1Payload + ); + + const sdsMessage2 = new ContentMessage( + ReliableChannel.getMessageId(message2Payload), + "testChannel", + "testSender", + [], + 2, + undefined, + message2Payload + ); + + const autoRetrievedMessage1: IDecodedMessage = { + hash: hexToBytes("5678"), + hashStr: "5678", + version: 1, + timestamp: new Date(Date.now() - 1000), + contentTopic: TEST_CONTENT_TOPIC, + pubsubTopic: decoder.pubsubTopic, + payload: sdsMessage1.encode(), + rateLimitProof: undefined, + ephemeral: false, + meta: undefined + }; + + const autoRetrievedMessage2: IDecodedMessage = { + hash: hexToBytes("9abc"), + hashStr: "9abc", + version: 1, + timestamp: new Date(), + contentTopic: TEST_CONTENT_TOPIC, + pubsubTopic: decoder.pubsubTopic, + payload: sdsMessage2.encode(), + rateLimitProof: undefined, + ephemeral: false, + meta: undefined + }; + + // Setup queryGenerator to return multiple messages + queryGeneratorStub.callsFake(async function* () { + yield [Promise.resolve(autoRetrievedMessage1)]; + yield [Promise.resolve(autoRetrievedMessage2)]; + }); + + // Create ReliableChannel with queryOnConnect enabled + reliableChannel = await ReliableChannel.create( + mockWakuNode, + "testChannel", + "testSender", + encoder, + decoder, + { queryOnConnect: true } + ); + + await delay(50); + + // Simulate old last successful query by accessing QueryOnConnect internals + // The default threshold is 5 minutes, so we'll set it to an old time + if ((reliableChannel as any).queryOnConnect) { + ((reliableChannel as any).queryOnConnect as any).lastSuccessfulQuery = + Date.now() - 6 * 60 * 1000; // 6 minutes ago + } + + // Simulate store peer connection which should trigger retrieval due to time threshold + mockPeerManagerEvents.dispatchEvent( + new CustomEvent("store:connect", { detail: mockPeerId }) + ); + + // Wait for store query to be triggered + await delay(200); + + // Verify that QueryOnConnect was triggered due to time threshold + expect(queryGeneratorStub.called).to.be.true; + }); + }); +}); diff --git a/packages/sdk/src/reliable_channel/reliable_channel.ts b/packages/sdk/src/reliable_channel/reliable_channel.ts new file mode 100644 index 0000000000..713309b90f --- /dev/null +++ b/packages/sdk/src/reliable_channel/reliable_channel.ts @@ -0,0 +1,676 @@ +import { TypedEventEmitter } from "@libp2p/interface"; +import { messageHash } from "@waku/core"; +import { + type Callback, + type IDecodedMessage, + type IDecoder, + type IEncoder, + type IMessage, + ISendOptions, + type IWaku, + LightPushError, + LightPushSDKResult, + QueryRequestParams +} from "@waku/interfaces"; +import { + type ChannelId, + isContentMessage, + MessageChannel, + MessageChannelEvent, + type MessageChannelOptions, + Message as SdsMessage, + type SenderId, + SyncMessage +} from "@waku/sds"; +import { Logger } from "@waku/utils"; + +import { + QueryOnConnect, + QueryOnConnectEvent +} from "../query_on_connect/index.js"; + +import { ReliableChannelEvent, ReliableChannelEvents } from "./events.js"; +import { MissingMessageRetriever } from "./missing_message_retriever.js"; +import { RetryManager } from "./retry_manager.js"; + +const log = new Logger("sdk:reliable-channel"); + +const DEFAULT_SYNC_MIN_INTERVAL_MS = 30 * 1000; // 30 seconds +const DEFAULT_RETRY_INTERVAL_MS = 30 * 1000; // 30 seconds +const DEFAULT_MAX_RETRY_ATTEMPTS = 10; +const DEFAULT_SWEEP_IN_BUF_INTERVAL_MS = 5 * 1000; +const DEFAULT_PROCESS_TASK_MIN_ELAPSE_MS = 1000; + +const IRRECOVERABLE_SENDING_ERRORS: LightPushError[] = [ + LightPushError.ENCODE_FAILED, + LightPushError.EMPTY_PAYLOAD, + LightPushError.SIZE_TOO_BIG, + LightPushError.RLN_PROOF_GENERATION +]; + +export type ReliableChannelOptions = MessageChannelOptions & { + /** + * The minimum interval between 2 sync messages in the channel. + * + * Meaning, how frequently we want messages in the channel, noting that the + * responsibility of sending a sync messages is shared between participants + * of the channel. + * + * `0` means no sync messages will be sent. + * + * @default 30,000 (30 seconds) [[DEFAULT_SYNC_MIN_INTERVAL_MS]] + */ + syncMinIntervalMs?: number; + + /** + * How long to wait before re-sending a message that as not acknowledged. + * + * @default 60,000 (60 seconds) [[DEFAULT_RETRY_INTERVAL_MS]] + */ + retryIntervalMs?: number; + + /** + * How many times do we attempt resending messages that were not acknowledged. + * + * @default 10 [[DEFAULT_MAX_RETRY_ATTEMPTS]] + */ + maxRetryAttempts?: number; + + /** + * How often store queries are done to retrieve missing messages. + * + * @default 10,000 (10 seconds) + */ + retrieveFrequencyMs?: number; + + /** + * How often SDS message channel incoming buffer is swept. + * + * @default 5000 (every 5 seconds) + */ + sweepInBufIntervalMs?: number; + + /** + * Whether to automatically do a store query after connection to store nodes. + * + * @default true + */ + queryOnConnect?: boolean; + + /** + * Whether to auto start the message channel + * + * @default true + */ + autoStart?: boolean; + + /** The minimum elapse time between calling the underlying channel process + * task for incoming messages. This is to avoid overload when processing + * a lot of messages. + * + * @default 1000 (1 second) + */ + processTaskMinElapseMs?: number; +}; + +/** + * An easy-to-use reliable channel that ensures all participants to the channel have eventual message consistency. + * + * Use events to track: + * - if your outgoing messages are sent, acknowledged or error out + * - for new incoming messages + * @emits [[ReliableChannelEvents]] + * + */ +export class ReliableChannel< + T extends IDecodedMessage +> extends TypedEventEmitter { + private readonly _send: ( + encoder: IEncoder, + message: IMessage, + sendOptions?: ISendOptions + ) => Promise; + + private readonly _subscribe: ( + decoders: IDecoder | IDecoder[], + callback: Callback + ) => Promise; + + private readonly _retrieve?: ( + decoders: IDecoder[], + options?: Partial + ) => AsyncGenerator[]>; + + private readonly syncMinIntervalMs: number; + private syncTimeout: ReturnType | undefined; + private sweepInBufInterval: ReturnType | undefined; + private readonly sweepInBufIntervalMs: number; + private processTaskTimeout: ReturnType | undefined; + private readonly retryManager: RetryManager | undefined; + private readonly missingMessageRetriever?: MissingMessageRetriever; + private readonly queryOnConnect?: QueryOnConnect; + private readonly processTaskMinElapseMs: number; + private _started: boolean; + + private constructor( + public node: IWaku, + public messageChannel: MessageChannel, + private encoder: IEncoder, + private decoder: IDecoder, + options?: ReliableChannelOptions + ) { + super(); + if (node.lightPush) { + this._send = node.lightPush.send.bind(node.lightPush); + } else if (node.relay) { + this._send = node.relay.send.bind(node.relay); + } else { + throw "No protocol available to send messages"; + } + + if (node.filter) { + this._subscribe = node.filter.subscribe.bind(node.filter); + } else if (node.relay) { + // TODO: Why do relay and filter have different interfaces? + // this._subscribe = node.relay.subscribeWithUnsubscribe; + throw "Not implemented"; + } else { + throw "No protocol available to receive messages"; + } + + if (node.store) { + this._retrieve = node.store.queryGenerator.bind(node.store); + const peerManagerEvents = (node as any)?.peerManager?.events; + if ( + peerManagerEvents !== undefined && + (options?.queryOnConnect ?? true) + ) { + log.info("auto-query enabled"); + this.queryOnConnect = new QueryOnConnect( + [this.decoder], + peerManagerEvents, + node.events, + this._retrieve.bind(this) + ); + } + } + + this.syncMinIntervalMs = + options?.syncMinIntervalMs ?? DEFAULT_SYNC_MIN_INTERVAL_MS; + + this.sweepInBufIntervalMs = + options?.sweepInBufIntervalMs ?? DEFAULT_SWEEP_IN_BUF_INTERVAL_MS; + + const retryIntervalMs = + options?.retryIntervalMs ?? DEFAULT_RETRY_INTERVAL_MS; + const maxRetryAttempts = + options?.maxRetryAttempts ?? DEFAULT_MAX_RETRY_ATTEMPTS; + + if (retryIntervalMs && maxRetryAttempts) { + // TODO: there is a lot to improve. e.g. not point retry to send if node is offline. + this.retryManager = new RetryManager(retryIntervalMs, maxRetryAttempts); + } + + this.processTaskMinElapseMs = + options?.processTaskMinElapseMs ?? DEFAULT_PROCESS_TASK_MIN_ELAPSE_MS; + + if (this._retrieve) { + this.missingMessageRetriever = new MissingMessageRetriever( + this.decoder, + options?.retrieveFrequencyMs, + this._retrieve, + async (msg: T) => { + await this.processIncomingMessage(msg); + } + ); + } + + this._started = false; + } + + public get isStarted(): boolean { + return this._started; + } + + /** + * Used to identify messages, pass the payload of a message you are + * about to send to track the events for this message. + * This is pre-sds wrapping + * @param messagePayload + */ + public static getMessageId(messagePayload: Uint8Array): string { + return MessageChannel.getMessageId(messagePayload); + } + + /** + * Create a new message channels. Message channels enables end-to-end + * reliability by ensuring that all messages in the channel are received + * by other users, and retrieved by this local node. + * + * emits events about outgoing messages, see [[`ReliableChannel`]] docs. + * + * Note that all participants in a message channels need to get the messages + * from the channel. Meaning: + * - all participants must be able to decrypt the messages + * - all participants must be subscribing to content topic(s) where the messages are sent + * + * @param node The waku node to use to send and receive messages + * @param channelId An id for the channel, all participants of the channel should use the same id + * @param senderId An id for the sender, to ensure acknowledgements are only valid if originating from someone else; best if persisted between sessions + * @param encoder A channel operates within a singular encryption layer, hence the same encoder is needed for all messages + * @param decoder A channel operates within a singular encryption layer, hence the same decoder is needed for all messages + * @param options + */ + public static async create( + node: IWaku, + channelId: ChannelId, + senderId: SenderId, + encoder: IEncoder, + decoder: IDecoder, + options?: ReliableChannelOptions + ): Promise> { + const sdsMessageChannel = new MessageChannel(channelId, senderId, options); + const messageChannel = new ReliableChannel( + node, + sdsMessageChannel, + encoder, + decoder, + options + ); + + const autoStart = options?.autoStart ?? true; + if (autoStart) { + await messageChannel.start(); + } + + return messageChannel; + } + + /** + * Sends a message in the channel, will attempt to re-send if not acknowledged + * by other participants. + * + * @param messagePayload + * @returns the message id + */ + public send(messagePayload: Uint8Array): string { + const messageId = ReliableChannel.getMessageId(messagePayload); + if (!this._started) { + this.safeSendEvent("sending-message-irrecoverable-error", { + detail: { messageId: messageId, error: "channel is not started" } + }); + } + const wrapAndSendBind = this._wrapAndSend.bind(this, messagePayload); + this.retryManager?.startRetries(messageId, wrapAndSendBind); + wrapAndSendBind(); + return messageId; + } + + private _wrapAndSend(messagePayload: Uint8Array): void { + this.messageChannel.pushOutgoingMessage( + messagePayload, + async ( + sdsMessage: SdsMessage + ): Promise<{ success: boolean; retrievalHint?: Uint8Array }> => { + // Callback is called once message has added to the SDS outgoing queue + // We start by trying to send the message now. + + // `payload` wrapped in SDS + const sdsPayload = sdsMessage.encode(); + + const wakuMessage = { + payload: sdsPayload + }; + + const messageId = ReliableChannel.getMessageId(messagePayload); + + // TODO: should the encoder give me the message hash? + // Encoding now to fail early, used later to get message hash + const protoMessage = await this.encoder.toProtoObj(wakuMessage); + if (!protoMessage) { + this.safeSendEvent("sending-message-irrecoverable-error", { + detail: { + messageId: messageId, + error: "could not encode message" + } + }); + return { success: false }; + } + const retrievalHint = messageHash( + this.encoder.pubsubTopic, + protoMessage + ); + + this.safeSendEvent("sending-message", { + detail: messageId + }); + + const sendRes = await this._send(this.encoder, wakuMessage); + + // If it's a recoverable failure, we will try again to send later + // If not, then we should error to the user now + for (const { error } of sendRes.failures) { + if (IRRECOVERABLE_SENDING_ERRORS.includes(error)) { + // Not recoverable, best to return it + log.error("Irrecoverable error, cannot send message: ", error); + this.safeSendEvent("sending-message-irrecoverable-error", { + detail: { + messageId, + error + } + }); + return { success: false, retrievalHint }; + } + } + + return { + success: true, + retrievalHint + }; + } + ); + + // Process outgoing messages straight away + this.messageChannel + .processTasks() + .then(() => { + this.messageChannel.sweepOutgoingBuffer(); + }) + .catch((err) => { + log.error("error encountered when processing sds tasks", err); + }); + } + + private async subscribe(): Promise { + this.assertStarted(); + return this._subscribe(this.decoder, async (message: T) => { + await this.processIncomingMessage(message); + }); + } + + /** + * Don't forget to call `this.messageChannel.sweepIncomingBuffer();` once done. + * @param msg + * @private + */ + private async processIncomingMessage( + msg: T + ): Promise { + // New message arrives, we need to unwrap it first + const sdsMessage = SdsMessage.decode(msg.payload); + + if (!sdsMessage) { + log.error("could not SDS decode message", msg); + return; + } + + if (sdsMessage.channelId !== this.messageChannel.channelId) { + log.warn( + "ignoring message with different channel id", + sdsMessage.channelId + ); + return; + } + + const retrievalHint = msg.hash; + log.info(`processing message ${sdsMessage.messageId}:${msg.hashStr}`); + // SDS Message decoded, let's pass it to the channel so we can learn about + // missing messages or the status of previous outgoing messages + this.messageChannel.pushIncomingMessage(sdsMessage, retrievalHint); + + this.missingMessageRetriever?.removeMissingMessage(sdsMessage.messageId); + + if (sdsMessage.content && sdsMessage.content.length > 0) { + // Now, process the message with callback + + // Overrides msg.payload with unwrapped payload + // TODO: can we do better? + const { payload: _p, ...allButPayload } = msg; + const unwrappedMessage = Object.assign(allButPayload, { + payload: sdsMessage.content, + hash: msg.hash, + hashStr: msg.hashStr, + version: msg.version, + contentTopic: msg.contentTopic, + pubsubTopic: msg.pubsubTopic, + timestamp: msg.timestamp, + rateLimitProof: msg.rateLimitProof, + ephemeral: msg.ephemeral, + meta: msg.meta + }); + + this.safeSendEvent("message-received", { + detail: unwrappedMessage as unknown as T + }); + } + + this.queueProcessTasks(); + } + + private async processIncomingMessages( + messages: T[] + ): Promise { + for (const message of messages) { + await this.processIncomingMessage(message); + } + } + + // TODO: For now we only queue process tasks for incoming messages + // As this is where there is most volume + private queueProcessTasks(): void { + // If one is already queued, then we can ignore it + if (this.processTaskTimeout === undefined) { + this.processTaskTimeout = setTimeout(() => { + void this.messageChannel.processTasks().catch((err) => { + log.error("error encountered when processing sds tasks", err); + }); + + // Clear timeout once triggered + clearTimeout(this.processTaskTimeout); + this.processTaskTimeout = undefined; + }, this.processTaskMinElapseMs); // we ensure that we don't call process tasks more than once per second + } + } + + public async start(): Promise { + if (this._started) return true; + this._started = true; + this.setupEventListeners(); + this.restartSync(); + this.startSweepIncomingBufferLoop(); + if (this._retrieve) { + this.missingMessageRetriever?.start(); + this.queryOnConnect?.start(); + } + return this.subscribe(); + } + + public stop(): void { + if (!this._started) return; + this._started = false; + this.stopSync(); + this.stopSweepIncomingBufferLoop(); + this.missingMessageRetriever?.stop(); + this.queryOnConnect?.stop(); + // TODO unsubscribe + // TODO unsetMessageListeners + } + + private assertStarted(): void { + if (!this._started) throw Error("Message Channel must be started"); + } + + private startSweepIncomingBufferLoop(): void { + this.stopSweepIncomingBufferLoop(); + this.sweepInBufInterval = setInterval(() => { + log.info("sweep incoming buffer"); + this.messageChannel.sweepIncomingBuffer(); + }, this.sweepInBufIntervalMs); + } + + private stopSweepIncomingBufferLoop(): void { + if (this.sweepInBufInterval) clearInterval(this.sweepInBufInterval); + } + + private restartSync(multiplier: number = 1): void { + if (this.syncTimeout) { + clearTimeout(this.syncTimeout); + } + if (this.syncMinIntervalMs) { + const timeoutMs = this.random() * this.syncMinIntervalMs * multiplier; + + this.syncTimeout = setTimeout(() => { + void this.sendSyncMessage(); + // Always restart a sync, no matter whether the message was sent. + // Set a multiplier so we wait a bit longer to not hog the conversation + void this.restartSync(2); + }, timeoutMs); + } + } + + private stopSync(): void { + if (this.syncTimeout) { + clearTimeout(this.syncTimeout); + } + } + + // Used to enable overriding when testing + private random(): number { + return Math.random(); + } + + private safeSendEvent( + event: T, + eventInit?: CustomEventInit + ): void { + try { + this.dispatchEvent(new CustomEvent(event, eventInit)); + } catch (error) { + log.error(`Failed to dispatch event ${event}:`, error); + } + } + + private async sendSyncMessage(): Promise { + this.assertStarted(); + await this.messageChannel.pushOutgoingSyncMessage( + async (syncMessage: SyncMessage): Promise => { + // Callback is called once message has added to the SDS outgoing queue + // We start by trying to send the message now. + + // `payload` wrapped in SDS + const sdsPayload = syncMessage.encode(); + + const wakuMessage = { + payload: sdsPayload + }; + + const sendRes = await this._send(this.encoder, wakuMessage); + if (sendRes.failures.length > 0) { + log.error("Error sending sync message: ", sendRes); + return false; + } + + return true; + } + ); + + // Process outgoing messages straight away + // TODO: review and optimize + await this.messageChannel.processTasks(); + this.messageChannel.sweepOutgoingBuffer(); + } + + private setupEventListeners(): void { + this.messageChannel.addEventListener( + MessageChannelEvent.OutMessageSent, + (event) => { + if (event.detail.content) { + const messageId = ReliableChannel.getMessageId(event.detail.content); + this.safeSendEvent("message-sent", { + detail: messageId + }); + } + } + ); + + this.messageChannel.addEventListener( + MessageChannelEvent.OutMessageAcknowledged, + (event) => { + if (event.detail) { + this.safeSendEvent("message-acknowledged", { + detail: event.detail + }); + + // Stopping retries + this.retryManager?.stopRetries(event.detail); + } + } + ); + + this.messageChannel.addEventListener( + MessageChannelEvent.OutMessagePossiblyAcknowledged, + (event) => { + if (event.detail) { + this.safeSendEvent("message-possibly-acknowledged", { + detail: { + messageId: event.detail.messageId, + possibleAckCount: event.detail.count + } + }); + } + } + ); + + this.messageChannel.addEventListener( + MessageChannelEvent.InSyncReceived, + (_event) => { + // restart the timeout when a sync message has been received + this.restartSync(); + } + ); + + this.messageChannel.addEventListener( + MessageChannelEvent.InMessageReceived, + (event) => { + // restart the timeout when a content message has been received + if (isContentMessage(event.detail)) { + // send a sync message faster to ack someone's else + this.restartSync(0.5); + } + } + ); + + this.messageChannel.addEventListener( + MessageChannelEvent.OutMessageSent, + (event) => { + // restart the timeout when a content message has been sent + if (isContentMessage(event.detail)) { + this.restartSync(); + } + } + ); + + this.messageChannel.addEventListener( + MessageChannelEvent.InMessageMissing, + (event) => { + for (const { messageId, retrievalHint } of event.detail) { + if (retrievalHint && this.missingMessageRetriever) { + this.missingMessageRetriever.addMissingMessage( + messageId, + retrievalHint + ); + } + } + } + ); + + if (this.queryOnConnect) { + this.queryOnConnect.addEventListener( + QueryOnConnectEvent.MessagesRetrieved, + (event) => { + void this.processIncomingMessages(event.detail); + } + ); + } + } +} diff --git a/packages/sdk/src/reliable_channel/reliable_channel_acks.spec.ts b/packages/sdk/src/reliable_channel/reliable_channel_acks.spec.ts new file mode 100644 index 0000000000..9cce421c59 --- /dev/null +++ b/packages/sdk/src/reliable_channel/reliable_channel_acks.spec.ts @@ -0,0 +1,187 @@ +import { TypedEventEmitter } from "@libp2p/interface"; +import { createDecoder, createEncoder } from "@waku/core"; +import { + AutoSharding, + IDecodedMessage, + IDecoder, + IEncoder +} from "@waku/interfaces"; +import { + createRoutingInfo, + delay, + MockWakuEvents, + MockWakuNode +} from "@waku/utils"; +import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes"; +import { expect } from "chai"; +import { beforeEach, describe } from "mocha"; + +import { ReliableChannel } from "./index.js"; + +const TEST_CONTENT_TOPIC = "/my-tests/0/topic-name/proto"; +const TEST_NETWORK_CONFIG: AutoSharding = { + clusterId: 0, + numShardsInCluster: 1 +}; +const TEST_ROUTING_INFO = createRoutingInfo(TEST_NETWORK_CONFIG, { + contentTopic: TEST_CONTENT_TOPIC +}); + +describe("Reliable Channel: Acks", () => { + let encoder: IEncoder; + let decoder: IDecoder; + + beforeEach(async () => { + encoder = createEncoder({ + contentTopic: TEST_CONTENT_TOPIC, + routingInfo: TEST_ROUTING_INFO + }); + decoder = createDecoder(TEST_CONTENT_TOPIC, TEST_ROUTING_INFO); + }); + + it("Outgoing message is acknowledged", async () => { + const commonEventEmitter = new TypedEventEmitter(); + const mockWakuNodeAlice = new MockWakuNode(commonEventEmitter); + const mockWakuNodeBob = new MockWakuNode(commonEventEmitter); + + const reliableChannelAlice = await ReliableChannel.create( + mockWakuNodeAlice, + "MyChannel", + "alice", + encoder, + decoder + ); + const reliableChannelBob = await ReliableChannel.create( + mockWakuNodeBob, + "MyChannel", + "bob", + encoder, + decoder + ); + + const message = utf8ToBytes("first message in channel"); + + // Alice sets up message tracking + const messageId = ReliableChannel.getMessageId(message); + + let messageReceived = false; + reliableChannelBob.addEventListener("message-received", (event) => { + if (bytesToUtf8(event.detail.payload) === "first message in channel") { + messageReceived = true; + } + }); + + let messageAcknowledged = false; + reliableChannelAlice.addEventListener("message-acknowledged", (event) => { + if (event.detail === messageId) { + messageAcknowledged = true; + } + }); + + reliableChannelAlice.send(message); + + // Wait for Bob to receive the message to ensure it uses it in causal history + while (!messageReceived) { + await delay(50); + } + // Bobs sends a message now, it should include first one in causal history + reliableChannelBob.send(utf8ToBytes("second message in channel")); + while (!messageAcknowledged) { + await delay(50); + } + + expect(messageAcknowledged).to.be.true; + }); + + it("Re-sent message is acknowledged once other parties join.", async () => { + const commonEventEmitter = new TypedEventEmitter(); + const mockWakuNodeAlice = new MockWakuNode(commonEventEmitter); + + // Setup, Alice first + const reliableChannelAlice = await ReliableChannel.create( + mockWakuNodeAlice, + "MyChannel", + "alice", + encoder, + decoder, + { + retryIntervalMs: 0, // disable any automation to better control the test + syncMinIntervalMs: 0, + processTaskMinElapseMs: 10 + } + ); + + // Bob is offline, Alice sends a message, this is the message we want + // acknowledged in this test. + const message = utf8ToBytes("message to be acknowledged"); + const messageId = ReliableChannel.getMessageId(message); + let messageAcknowledged = false; + reliableChannelAlice.addEventListener("message-acknowledged", (event) => { + if (event.detail === messageId) { + messageAcknowledged = true; + } + }); + reliableChannelAlice.send(message); + + // Wait a bit to ensure Bob does not receive the message + await delay(100); + + // Now Bob goes online + const mockWakuNodeBob = new MockWakuNode(commonEventEmitter); + const reliableChannelBob = await ReliableChannel.create( + mockWakuNodeBob, + "MyChannel", + "bob", + encoder, + decoder, + { + retryIntervalMs: 0, // disable any automation to better control the test + syncMinIntervalMs: 0, + processTaskMinElapseMs: 10 + } + ); + + // Track when Bob receives the message + let bobReceivedMessage = false; + reliableChannelBob.addEventListener("message-received", (event) => { + if (bytesToUtf8(event.detail.payload!) === "message to be acknowledged") { + bobReceivedMessage = true; + } + }); + + // Some sync messages are exchanged + await reliableChannelAlice["sendSyncMessage"](); + await reliableChannelBob["sendSyncMessage"](); + + // wait a bit to ensure messages are processed + await delay(100); + + // Some content messages are exchanged too + reliableChannelAlice.send(utf8ToBytes("some message")); + reliableChannelBob.send(utf8ToBytes("some other message")); + + // wait a bit to ensure messages are processed + await delay(100); + + // At this point, the message shouldn't be acknowledged yet as Bob + // does not have a complete log + expect(messageAcknowledged).to.be.false; + + // Now Alice resends the message + reliableChannelAlice.send(message); + + // Wait for Bob to receive the message + while (!bobReceivedMessage) { + await delay(50); + } + + // Bob receives it, and should include it in its sync + await reliableChannelBob["sendSyncMessage"](); + while (!messageAcknowledged) { + await delay(50); + } + + // The sync should acknowledge the message + expect(messageAcknowledged).to.be.true; + }); +}); diff --git a/packages/sdk/src/reliable_channel/reliable_channel_encryption.spec.ts b/packages/sdk/src/reliable_channel/reliable_channel_encryption.spec.ts new file mode 100644 index 0000000000..978d357ec6 --- /dev/null +++ b/packages/sdk/src/reliable_channel/reliable_channel_encryption.spec.ts @@ -0,0 +1,326 @@ +import { TypedEventEmitter } from "@libp2p/interface"; +import { + AutoSharding, + IDecodedMessage, + IDecoder, + IEncoder, + type IMessage, + ISendOptions, + IWaku, + LightPushError, + LightPushSDKResult +} from "@waku/interfaces"; +import { generatePrivateKey, getPublicKey } from "@waku/message-encryption"; +import { + createDecoder as createEciesDecoder, + createEncoder as createEciesEncoder +} from "@waku/message-encryption/ecies"; +import { + createRoutingInfo, + delay, + MockWakuEvents, + MockWakuNode +} from "@waku/utils"; +import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes"; +import { expect } from "chai"; +import { beforeEach, describe } from "mocha"; + +import { ReliableChannel } from "./index.js"; + +const TEST_CONTENT_TOPIC = "/my-tests/0/topic-name/proto"; +const TEST_NETWORK_CONFIG: AutoSharding = { + clusterId: 0, + numShardsInCluster: 1 +}; +const TEST_ROUTING_INFO = createRoutingInfo(TEST_NETWORK_CONFIG, { + contentTopic: TEST_CONTENT_TOPIC +}); + +describe("Reliable Channel: Encryption", () => { + let mockWakuNode: IWaku; + let encoder: IEncoder; + let decoder: IDecoder; + + beforeEach(async () => { + mockWakuNode = new MockWakuNode(); + const privateKey = generatePrivateKey(); + const publicKey = getPublicKey(privateKey); + encoder = createEciesEncoder({ + contentTopic: TEST_CONTENT_TOPIC, + routingInfo: TEST_ROUTING_INFO, + publicKey + }); + decoder = createEciesDecoder( + TEST_CONTENT_TOPIC, + TEST_ROUTING_INFO, + privateKey + ); + }); + + it("Outgoing message is emitted as sending", async () => { + const reliableChannel = await ReliableChannel.create( + mockWakuNode, + "MyChannel", + "alice", + encoder, + decoder + ); + + const message = utf8ToBytes("message in channel"); + + // Setting up message tracking + const messageId = ReliableChannel.getMessageId(message); + let messageSending = false; + reliableChannel.addEventListener("sending-message", (event) => { + if (event.detail === messageId) { + messageSending = true; + } + }); + + reliableChannel.send(message); + while (!messageSending) { + await delay(50); + } + + expect(messageSending).to.be.true; + }); + + it("Outgoing message is emitted as sent", async () => { + const reliableChannel = await ReliableChannel.create( + mockWakuNode, + "MyChannel", + "alice", + encoder, + decoder + ); + + const message = utf8ToBytes("message in channel"); + + // Setting up message tracking + const messageId = ReliableChannel.getMessageId(message); + let messageSent = false; + reliableChannel.addEventListener("message-sent", (event) => { + if (event.detail === messageId) { + messageSent = true; + } + }); + + reliableChannel.send(message); + while (!messageSent) { + await delay(50); + } + + expect(messageSent).to.be.true; + }); + + it("Encoder error raises irrecoverable error", async () => { + mockWakuNode.lightPush!.send = ( + _encoder: IEncoder, + _message: IMessage, + _sendOptions?: ISendOptions + ): Promise => { + return Promise.resolve({ + failures: [{ error: LightPushError.EMPTY_PAYLOAD }], + successes: [] + }); + }; + + const reliableChannel = await ReliableChannel.create( + mockWakuNode, + "MyChannel", + "alice", + encoder, + decoder + ); + + const message = utf8ToBytes("payload doesnt matter"); + + // Setting up message tracking + const messageId = ReliableChannel.getMessageId(message); + let irrecoverableError = false; + reliableChannel.addEventListener( + "sending-message-irrecoverable-error", + (event) => { + if (event.detail.messageId === messageId) { + irrecoverableError = true; + } + } + ); + + encoder.contentTopic = "..."; + reliableChannel.send(message); + while (!irrecoverableError) { + await delay(50); + } + + expect(irrecoverableError).to.be.true; + }); + + it("Outgoing message is not emitted as acknowledged from own outgoing messages", async () => { + const reliableChannel = await ReliableChannel.create( + mockWakuNode, + "MyChannel", + "alice", + encoder, + decoder + ); + + const message = utf8ToBytes("first message in channel"); + + // Setting up message tracking + const messageId = ReliableChannel.getMessageId(message); + let messageAcknowledged = false; + reliableChannel.addEventListener("message-acknowledged", (event) => { + if (event.detail === messageId) { + messageAcknowledged = true; + } + }); + + reliableChannel.send(message); + + // Sending a second message from the same node should not acknowledge the first one + reliableChannel.send(utf8ToBytes("second message in channel")); + + // Wait a bit to be sure no event is emitted + await delay(200); + + expect(messageAcknowledged).to.be.false; + }); + + it("Outgoing message is possibly acknowledged", async () => { + const commonEventEmitter = new TypedEventEmitter(); + const mockWakuNodeAlice = new MockWakuNode(commonEventEmitter); + const mockWakuNodeBob = new MockWakuNode(commonEventEmitter); + + const reliableChannelAlice = await ReliableChannel.create( + mockWakuNodeAlice, + "MyChannel", + "alice", + encoder, + decoder + ); + const reliableChannelBob = await ReliableChannel.create( + mockWakuNodeBob, + "MyChannel", + "bob", + encoder, + decoder, + // Bob only includes one message in causal history + { causalHistorySize: 1 } + ); + + const messages = ["first", "second", "third"].map((m) => { + return utf8ToBytes(m); + }); + + // Alice sets up message tracking for first message + const firstMessageId = ReliableChannel.getMessageId(messages[0]); + let firstMessagePossiblyAcknowledged = false; + reliableChannelAlice.addEventListener( + "message-possibly-acknowledged", + (event) => { + if (event.detail.messageId === firstMessageId) { + firstMessagePossiblyAcknowledged = true; + } + } + ); + + let bobMessageReceived = 0; + reliableChannelAlice.addEventListener("message-received", () => { + bobMessageReceived++; + }); + + for (const m of messages) { + reliableChannelAlice.send(m); + } + + // Wait for Bob to receive all messages to ensure filter is updated + while (bobMessageReceived < 3) { + await delay(50); + } + + // Bobs sends a message now, it should include first one in bloom filter + reliableChannelBob.send(utf8ToBytes("message back")); + while (!firstMessagePossiblyAcknowledged) { + await delay(50); + } + + expect(firstMessagePossiblyAcknowledged).to.be.true; + }); + + it("Outgoing message is acknowledged", async () => { + const commonEventEmitter = new TypedEventEmitter(); + const mockWakuNodeAlice = new MockWakuNode(commonEventEmitter); + const mockWakuNodeBob = new MockWakuNode(commonEventEmitter); + + const reliableChannelAlice = await ReliableChannel.create( + mockWakuNodeAlice, + "MyChannel", + "alice", + encoder, + decoder + ); + const reliableChannelBob = await ReliableChannel.create( + mockWakuNodeBob, + "MyChannel", + "bob", + encoder, + decoder + ); + + const message = utf8ToBytes("first message in channel"); + + // Alice sets up message tracking + const messageId = ReliableChannel.getMessageId(message); + let messageAcknowledged = false; + reliableChannelAlice.addEventListener("message-acknowledged", (event) => { + if (event.detail === messageId) { + messageAcknowledged = true; + } + }); + + let bobReceivedMessage = false; + reliableChannelBob.addEventListener("message-received", () => { + bobReceivedMessage = true; + }); + + reliableChannelAlice.send(message); + + // Wait for Bob to receive the message + while (!bobReceivedMessage) { + await delay(50); + } + + // Bobs sends a message now, it should include first one in causal history + reliableChannelBob.send(utf8ToBytes("second message in channel")); + while (!messageAcknowledged) { + await delay(50); + } + + expect(messageAcknowledged).to.be.true; + }); + + it("Incoming message is emitted as received", async () => { + const reliableChannel = await ReliableChannel.create( + mockWakuNode, + "MyChannel", + "alice", + encoder, + decoder + ); + + let receivedMessage: IDecodedMessage; + reliableChannel.addEventListener("message-received", (event) => { + receivedMessage = event.detail; + }); + + const message = utf8ToBytes("message in channel"); + + reliableChannel.send(message); + while (!receivedMessage!) { + await delay(50); + } + + expect(bytesToUtf8(receivedMessage!.payload)).to.eq(bytesToUtf8(message)); + }); +}); diff --git a/packages/sdk/src/reliable_channel/reliable_channel_sync.spec.ts b/packages/sdk/src/reliable_channel/reliable_channel_sync.spec.ts new file mode 100644 index 0000000000..75dbb2eda0 --- /dev/null +++ b/packages/sdk/src/reliable_channel/reliable_channel_sync.spec.ts @@ -0,0 +1,332 @@ +import { TypedEventEmitter } from "@libp2p/interface"; +import { createDecoder, createEncoder } from "@waku/core"; +import { + AutoSharding, + IDecodedMessage, + IDecoder, + IEncoder, + IWaku +} from "@waku/interfaces"; +import { MessageChannelEvent } from "@waku/sds"; +import { + createRoutingInfo, + delay, + MockWakuEvents, + MockWakuNode +} from "@waku/utils"; +import { utf8ToBytes } from "@waku/utils/bytes"; +import { expect } from "chai"; +import { beforeEach, describe } from "mocha"; + +import { ReliableChannel } from "./index.js"; + +const TEST_CONTENT_TOPIC = "/my-tests/0/topic-name/proto"; +const TEST_NETWORK_CONFIG: AutoSharding = { + clusterId: 0, + numShardsInCluster: 1 +}; +const TEST_ROUTING_INFO = createRoutingInfo(TEST_NETWORK_CONFIG, { + contentTopic: TEST_CONTENT_TOPIC +}); + +describe("Reliable Channel: Sync", () => { + let mockWakuNode: IWaku; + let encoder: IEncoder; + let decoder: IDecoder; + + beforeEach(async () => { + mockWakuNode = new MockWakuNode(); + encoder = createEncoder({ + contentTopic: TEST_CONTENT_TOPIC, + routingInfo: TEST_ROUTING_INFO + }); + decoder = createDecoder(TEST_CONTENT_TOPIC, TEST_ROUTING_INFO); + }); + + it("Sync message is sent within sync frequency", async () => { + const syncMinIntervalMs = 100; + const reliableChannel = await ReliableChannel.create( + mockWakuNode, + "MyChannel", + "alice", + encoder, + decoder, + { + syncMinIntervalMs + } + ); + + let syncMessageSent = false; + reliableChannel.messageChannel.addEventListener( + MessageChannelEvent.OutSyncSent, + (_event) => { + syncMessageSent = true; + } + ); + + await delay(syncMinIntervalMs); + + expect(syncMessageSent).to.be.true; + }); + + it("Sync message are not sent excessively within sync frequency", async () => { + const syncMinIntervalMs = 100; + const reliableChannel = await ReliableChannel.create( + mockWakuNode, + "MyChannel", + "alice", + encoder, + decoder, + { + syncMinIntervalMs + } + ); + + let syncMessageSentCount = 0; + reliableChannel.messageChannel.addEventListener( + MessageChannelEvent.OutSyncSent, + (_event) => { + syncMessageSentCount++; + } + ); + + await delay(syncMinIntervalMs); + + // There is randomness to this, but it should not be excessive + expect(syncMessageSentCount).to.be.lessThan(3); + }); + + it("Sync message is not sent if another sync message was just received", async function () { + this.timeout(5000); + + const commonEventEmitter = new TypedEventEmitter(); + const mockWakuNodeAlice = new MockWakuNode(commonEventEmitter); + const mockWakuNodeBob = new MockWakuNode(commonEventEmitter); + + const syncMinIntervalMs = 1000; + + const reliableChannelAlice = await ReliableChannel.create( + mockWakuNodeAlice, + "MyChannel", + "alice", + encoder, + decoder, + { + syncMinIntervalMs: 0, // does not send sync messages automatically + processTaskMinElapseMs: 10 + } + ); + const reliableChannelBob = await ReliableChannel.create( + mockWakuNodeBob, + "MyChannel", + "bob", + encoder, + decoder, + { + syncMinIntervalMs, + processTaskMinElapseMs: 10 + } + ); + (reliableChannelBob as any).random = () => { + return 1; + }; // will wait a full second + + let syncMessageSent = false; + reliableChannelBob.messageChannel.addEventListener( + MessageChannelEvent.OutSyncSent, + (_event) => { + syncMessageSent = true; + } + ); + + while (!syncMessageSent) { + // Bob will send a sync message as soon as it started, we are waiting for this one + await delay(100); + } + // Let's reset the tracker + syncMessageSent = false; + // We should be faster than Bob as Bob will "randomly" wait a full second + await reliableChannelAlice["sendSyncMessage"](); + + // Bob should be waiting a full second before sending a message after Alice + await delay(900); + + // Now, let's wait Bob to send the sync message + await delay(200); + expect(syncMessageSent).to.be.true; + }); + + it("Sync message is not sent if another non-ephemeral message was just received", async function () { + this.timeout(5000); + + const commonEventEmitter = new TypedEventEmitter(); + const mockWakuNodeAlice = new MockWakuNode(commonEventEmitter); + const mockWakuNodeBob = new MockWakuNode(commonEventEmitter); + + const syncMinIntervalMs = 1000; + + const reliableChannelAlice = await ReliableChannel.create( + mockWakuNodeAlice, + "MyChannel", + "alice", + encoder, + decoder, + { + syncMinIntervalMs: 0, // does not send sync messages automatically + processTaskMinElapseMs: 10 + } + ); + const reliableChannelBob = await ReliableChannel.create( + mockWakuNodeBob, + "MyChannel", + "bob", + encoder, + decoder, + { + syncMinIntervalMs, + processTaskMinElapseMs: 10 + } + ); + (reliableChannelBob as any).random = () => { + return 1; + }; // will wait a full second + + let syncMessageSent = false; + reliableChannelBob.messageChannel.addEventListener( + MessageChannelEvent.OutSyncSent, + (_event) => { + syncMessageSent = true; + } + ); + + while (!syncMessageSent) { + // Bob will send a sync message as soon as it started, we are waiting for this one + await delay(100); + } + // Let's reset the tracker + syncMessageSent = false; + // We should be faster than Bob as Bob will "randomly" wait a full second + reliableChannelAlice.send(utf8ToBytes("some message")); + + // Bob should be waiting a full second before sending a message after Alice + await delay(900); + + // Now, let's wait Bob to send the sync message + await delay(200); + expect(syncMessageSent).to.be.true; + }); + + it("Sync message is not sent if another sync message was just sent", async function () { + this.timeout(5000); + const syncMinIntervalMs = 1000; + + const reliableChannel = await ReliableChannel.create( + mockWakuNode, + "MyChannel", + "alice", + encoder, + decoder, + { syncMinIntervalMs } + ); + (reliableChannel as any).random = () => { + return 1; + }; // will wait a full second + + let syncMessageSent = false; + reliableChannel.messageChannel.addEventListener( + MessageChannelEvent.OutSyncSent, + (_event) => { + syncMessageSent = true; + } + ); + + while (!syncMessageSent) { + // Will send a sync message as soon as it started, we are waiting for this one + await delay(100); + } + // Let's reset the tracker + syncMessageSent = false; + // We should be faster than automated sync as it will "randomly" wait a full second + await reliableChannel["sendSyncMessage"](); + + // should be waiting a full second before sending a message after Alice + await delay(900); + + // Now, let's wait to send the automated sync message + await delay(200); + expect(syncMessageSent).to.be.true; + }); + + it("Sync message is not sent if another non-ephemeral message was just sent", async function () { + this.timeout(5000); + const syncMinIntervalMs = 1000; + + const reliableChannel = await ReliableChannel.create( + mockWakuNode, + "MyChannel", + "alice", + encoder, + decoder, + { syncMinIntervalMs } + ); + (reliableChannel as any).random = () => { + return 1; + }; // will wait a full second + + let syncMessageSent = false; + reliableChannel.messageChannel.addEventListener( + MessageChannelEvent.OutSyncSent, + (_event) => { + syncMessageSent = true; + } + ); + + while (!syncMessageSent) { + // Will send a sync message as soon as it started, we are waiting for this one + await delay(100); + } + // Let's reset the tracker + syncMessageSent = false; + // We should be faster than automated sync as it will "randomly" wait a full second + reliableChannel.send(utf8ToBytes("non-ephemeral message")); + + // should be waiting a full second before sending a message after Alice + await delay(900); + + // Now, let's wait to send the automated sync message + await delay(200); + expect(syncMessageSent).to.be.true; + }); + + it("Own sync message does not acknowledge own messages", async () => { + const syncMinIntervalMs = 100; + const reliableChannel = await ReliableChannel.create( + mockWakuNode, + "MyChannel", + "alice", + encoder, + decoder, + { + syncMinIntervalMs + } + ); + + const msg = utf8ToBytes("some message"); + const msgId = ReliableChannel.getMessageId(msg); + + let messageAcknowledged = false; + reliableChannel.messageChannel.addEventListener( + MessageChannelEvent.OutMessageAcknowledged, + (event) => { + if (event.detail === msgId) messageAcknowledged = true; + } + ); + + reliableChannel.send(msg); + + await delay(syncMinIntervalMs * 2); + + // There is randomness to this, but it should not be excessive + expect(messageAcknowledged).to.be.false; + }); +}); diff --git a/packages/sdk/src/reliable_channel/retry_manager.spec.ts b/packages/sdk/src/reliable_channel/retry_manager.spec.ts new file mode 100644 index 0000000000..1a80a5cea1 --- /dev/null +++ b/packages/sdk/src/reliable_channel/retry_manager.spec.ts @@ -0,0 +1,48 @@ +import { delay } from "@waku/utils"; +import { expect } from "chai"; + +import { RetryManager } from "./retry_manager.js"; + +describe("Retry Manager", () => { + it("Retries within given interval", async function () { + const retryManager = new RetryManager(100, 1); + + let retryCount = 0; + retryManager.startRetries("1", () => { + retryCount++; + }); + + await delay(110); + + expect(retryCount).to.equal(1); + }); + + it("Retries within maximum given attempts", async function () { + const maxAttempts = 5; + const retryManager = new RetryManager(10, maxAttempts); + + let retryCount = 0; + retryManager.startRetries("1", () => { + retryCount++; + }); + + await delay(200); + + expect(retryCount).to.equal(maxAttempts); + }); + + it("Wait given interval before re-trying", async function () { + const retryManager = new RetryManager(100, 1); + + let retryCount = 0; + retryManager.startRetries("1", () => { + retryCount++; + }); + + await delay(90); + expect(retryCount).to.equal(0); + + await delay(110); + expect(retryCount).to.equal(1); + }); +}); diff --git a/packages/sdk/src/reliable_channel/retry_manager.ts b/packages/sdk/src/reliable_channel/retry_manager.ts new file mode 100644 index 0000000000..00426fd854 --- /dev/null +++ b/packages/sdk/src/reliable_channel/retry_manager.ts @@ -0,0 +1,51 @@ +export class RetryManager { + private timeouts: Map>; + + public constructor( + // TODO: back-off strategy + private retryIntervalMs: number, + private maxRetryNumber: number + ) { + this.timeouts = new Map(); + + if ( + !retryIntervalMs || + retryIntervalMs <= 0 || + !maxRetryNumber || + maxRetryNumber <= 0 + ) { + throw Error( + `Invalid retryIntervalMs ${retryIntervalMs} or maxRetryNumber ${maxRetryNumber} values` + ); + } + } + + public stopRetries(id: string): void { + const timeout = this.timeouts.get(id); + if (timeout) { + clearTimeout(timeout); + } + } + + public startRetries(id: string, retry: () => void | Promise): void { + this.retry(id, retry, 0); + } + + private retry( + id: string, + retry: () => void | Promise, + attemptNumber: number + ): void { + clearTimeout(this.timeouts.get(id)); + if (attemptNumber < this.maxRetryNumber) { + const interval = setTimeout(() => { + void retry(); + + // Register for next retry until we are told to stop; + this.retry(id, retry, ++attemptNumber); + }, this.retryIntervalMs); + + this.timeouts.set(id, interval); + } + } +} diff --git a/packages/sds/src/message_channel/message.spec.ts b/packages/sds/src/message_channel/message.spec.ts index 37dfd5db28..11bb9b3735 100644 --- a/packages/sds/src/message_channel/message.spec.ts +++ b/packages/sds/src/message_channel/message.spec.ts @@ -50,7 +50,7 @@ describe("Message serialization", () => { const bytes = message.encode(); const decMessage = Message.decode(bytes); - expect(decMessage.causalHistory).to.deep.equal([ + expect(decMessage!.causalHistory).to.deep.equal([ { messageId: depMessageId, retrievalHint: depRetrievalHint } ]); }); diff --git a/packages/sds/src/message_channel/message.ts b/packages/sds/src/message_channel/message.ts index eeb5c732d2..e186124750 100644 --- a/packages/sds/src/message_channel/message.ts +++ b/packages/sds/src/message_channel/message.ts @@ -1,10 +1,13 @@ import { proto_sds_message } from "@waku/proto"; +import { Logger } from "@waku/utils"; export type MessageId = string; export type HistoryEntry = proto_sds_message.HistoryEntry; export type ChannelId = string; export type SenderId = string; +const log = new Logger("sds:message"); + export class Message implements proto_sds_message.SdsMessage { public constructor( public messageId: string, @@ -24,7 +27,9 @@ export class Message implements proto_sds_message.SdsMessage { return proto_sds_message.SdsMessage.encode(this); } - public static decode(data: Uint8Array): Message { + public static decode( + data: Uint8Array + ): undefined | ContentMessage | SyncMessage | EphemeralMessage { const { messageId, channelId, @@ -34,15 +39,48 @@ export class Message implements proto_sds_message.SdsMessage { bloomFilter, content } = proto_sds_message.SdsMessage.decode(data); - return new Message( - messageId, - channelId, - senderId, - causalHistory, + + if (testContentMessage({ lamportTimestamp, content })) { + return new ContentMessage( + messageId, + channelId, + senderId, + causalHistory, + lamportTimestamp!, + bloomFilter, + content! + ); + } + + if (testEphemeralMessage({ lamportTimestamp, content })) { + return new EphemeralMessage( + messageId, + channelId, + senderId, + causalHistory, + undefined, + bloomFilter, + content! + ); + } + + if (testSyncMessage({ lamportTimestamp, content })) { + return new SyncMessage( + messageId, + channelId, + senderId, + causalHistory, + lamportTimestamp!, + bloomFilter, + undefined + ); + } + log.error( + "message received was of unknown type", lamportTimestamp, - bloomFilter, content ); + return undefined; } } @@ -73,9 +111,10 @@ export class SyncMessage extends Message { } } -export function isSyncMessage( - message: Message | ContentMessage | SyncMessage | EphemeralMessage -): message is SyncMessage { +function testSyncMessage(message: { + lamportTimestamp?: number; + content?: Uint8Array; +}): boolean { return Boolean( "lamportTimestamp" in message && typeof message.lamportTimestamp === "number" && @@ -83,6 +122,12 @@ export function isSyncMessage( ); } +export function isSyncMessage( + message: Message | ContentMessage | SyncMessage | EphemeralMessage +): message is SyncMessage { + return testSyncMessage(message); +} + export class EphemeralMessage extends Message { public constructor( public messageId: string, @@ -116,6 +161,13 @@ export class EphemeralMessage extends Message { export function isEphemeralMessage( message: Message | ContentMessage | SyncMessage | EphemeralMessage ): message is EphemeralMessage { + return testEphemeralMessage(message); +} + +function testEphemeralMessage(message: { + lamportTimestamp?: number; + content?: Uint8Array; +}): boolean { return Boolean( message.lamportTimestamp === undefined && "content" in message && @@ -166,6 +218,13 @@ export class ContentMessage extends Message { export function isContentMessage( message: Message | ContentMessage ): message is ContentMessage { + return testContentMessage(message); +} + +function testContentMessage(message: { + lamportTimestamp?: number; + content?: Uint8Array; +}): message is { lamportTimestamp: number; content: Uint8Array } { return Boolean( "lamportTimestamp" in message && typeof message.lamportTimestamp === "number" && diff --git a/packages/sds/src/message_channel/message_channel.spec.ts b/packages/sds/src/message_channel/message_channel.spec.ts index 6132f469d9..1a4e357b6e 100644 --- a/packages/sds/src/message_channel/message_channel.spec.ts +++ b/packages/sds/src/message_channel/message_channel.spec.ts @@ -40,7 +40,7 @@ const sendMessage = async ( payload: Uint8Array, callback: (message: ContentMessage) => Promise<{ success: boolean }> ): Promise => { - await channel.pushOutgoingMessage(payload, callback); + channel.pushOutgoingMessage(payload, callback); await channel.processTasks(); }; @@ -292,14 +292,12 @@ describe("MessageChannel", function () { ); const localHistory = channelA["localHistory"] as ILocalHistory; - console.log("localHistory", localHistory); expect(localHistory.length).to.equal(1); // Find the message in local history const historyEntry = localHistory.find( (entry) => entry.messageId === messageId ); - console.log("history entry", historyEntry); expect(historyEntry).to.exist; expect(historyEntry!.retrievalHint).to.deep.equal(testRetrievalHint); }); @@ -596,7 +594,6 @@ describe("MessageChannel", function () { it("First message is missed, then re-sent, should be ack'd", async () => { const firstMessage = utf8ToBytes("first message"); const firstMessageId = MessageChannel.getMessageId(firstMessage); - console.log("firstMessage", firstMessageId); let messageAcked = false; channelA.addEventListener( MessageChannelEvent.OutMessageAcknowledged, diff --git a/packages/sds/src/message_channel/message_channel.ts b/packages/sds/src/message_channel/message_channel.ts index a9cd980a71..a4492f8708 100644 --- a/packages/sds/src/message_channel/message_channel.ts +++ b/packages/sds/src/message_channel/message_channel.ts @@ -174,13 +174,13 @@ export class MessageChannel extends TypedEventEmitter { * * @throws Error if the payload is empty */ - public async pushOutgoingMessage( + public pushOutgoingMessage( payload: Uint8Array, callback?: (processedMessage: ContentMessage) => Promise<{ success: boolean; retrievalHint?: Uint8Array; }> - ): Promise { + ): void { if (!payload || !payload.length) { throw Error("Only messages with valid payloads are allowed"); } @@ -285,6 +285,7 @@ export class MessageChannel extends TypedEventEmitter { } log.info( this.senderId, + "message from incoming buffer", message.messageId, "is missing dependencies", missingDependencies.map(({ messageId, retrievalHint }) => { @@ -470,10 +471,15 @@ export class MessageChannel extends TypedEventEmitter { this.timeReceived.set(message.messageId, Date.now()); log.info( this.senderId, + "new incoming message", message.messageId, "is missing dependencies", missingDependencies.map((ch) => ch.messageId) ); + + this.safeSendEvent(MessageChannelEvent.InMessageMissing, { + detail: Array.from(missingDependencies) + }); } else { if (isContentMessage(message) && this.deliverMessage(message)) { this.safeSendEvent(MessageChannelEvent.InMessageDelivered, { diff --git a/packages/utils/src/common/index.ts b/packages/utils/src/common/index.ts index 26573fe6ff..3197d5199b 100644 --- a/packages/utils/src/common/index.ts +++ b/packages/utils/src/common/index.ts @@ -7,3 +7,4 @@ export * from "./sharding/index.js"; export * from "./push_or_init_map.js"; export * from "./relay_shard_codec.js"; export * from "./delay.js"; +export * from "./mock_node.js"; diff --git a/packages/utils/src/common/mock_node.ts b/packages/utils/src/common/mock_node.ts new file mode 100644 index 0000000000..40472fea17 --- /dev/null +++ b/packages/utils/src/common/mock_node.ts @@ -0,0 +1,166 @@ +import { Peer, PeerId, Stream, TypedEventEmitter } from "@libp2p/interface"; +import { MultiaddrInput } from "@multiformats/multiaddr"; +import { + Callback, + CreateDecoderParams, + CreateEncoderParams, + HealthStatus, + IDecodedMessage, + IDecoder, + IEncoder, + IFilter, + ILightPush, + type IMessage, + IRelay, + ISendOptions, + IStore, + IWaku, + IWakuEventEmitter, + Libp2p, + LightPushSDKResult, + Protocols +} from "@waku/interfaces"; + +export type MockWakuEvents = { + ["new-message"]: CustomEvent; +}; + +export class MockWakuNode implements IWaku { + public relay?: IRelay; + public store?: IStore; + public filter?: IFilter; + public lightPush?: ILightPush; + public protocols: string[]; + + private readonly subscriptions: { + decoders: IDecoder[]; + callback: Callback; + }[]; + + public constructor( + private mockMessageEmitter?: TypedEventEmitter + ) { + this.protocols = []; + this.events = new TypedEventEmitter(); + this.subscriptions = []; + + this.lightPush = { + multicodec: [], + send: this._send.bind(this), + start(): void {}, + stop(): void {} + }; + + this.filter = { + start: async () => {}, + stop: async () => {}, + multicodec: "filter", + subscribe: this._subscribe.bind(this), + unsubscribe( + _decoders: IDecoder | IDecoder[] + ): Promise { + throw "Not implemented"; + }, + unsubscribeAll(): void { + throw "Not implemented"; + } + }; + } + + public get libp2p(): Libp2p { + throw "No libp2p on MockWakuNode"; + } + + private async _send( + encoder: IEncoder, + message: IMessage, + _sendOptions?: ISendOptions + ): Promise { + for (const { decoders, callback } of this.subscriptions) { + const protoMessage = await encoder.toProtoObj(message); + if (!protoMessage) throw "Issue in mock encoding message"; + for (const decoder of decoders) { + const decodedMessage = await decoder.fromProtoObj( + decoder.pubsubTopic, + protoMessage + ); + if (!decodedMessage) throw "Issue in mock decoding message"; + await callback(decodedMessage); + if (this.mockMessageEmitter) { + this.mockMessageEmitter.dispatchEvent( + new CustomEvent("new-message", { + detail: decodedMessage + }) + ); + } + } + } + return { + failures: [], + successes: [] + }; + } + + private async _subscribe( + decoders: IDecoder | IDecoder[], + callback: Callback + ): Promise { + this.subscriptions.push({ + decoders: Array.isArray(decoders) ? decoders : [decoders], + callback + }); + if (this.mockMessageEmitter) { + this.mockMessageEmitter.addEventListener("new-message", (event) => { + void callback(event.detail as unknown as T); + }); + } + return Promise.resolve(true); + } + + public events: IWakuEventEmitter; + + public get peerId(): PeerId { + throw "no peerId on MockWakuNode"; + } + public get health(): HealthStatus { + throw "no health on MockWakuNode"; + } + public dial( + _peer: PeerId | MultiaddrInput, + _protocols?: Protocols[] + ): Promise { + throw new Error("Method not implemented."); + } + public hangUp(_peer: PeerId | MultiaddrInput): Promise { + throw new Error("Method not implemented."); + } + public start(): Promise { + return Promise.resolve(); + } + public stop(): Promise { + throw new Error("Method not implemented."); + } + public waitForPeers( + _protocols?: Protocols[], + _timeoutMs?: number + ): Promise { + throw new Error("Method not implemented."); + } + public createDecoder( + _params: CreateDecoderParams + ): IDecoder { + throw new Error("Method not implemented."); + } + public createEncoder(_params: CreateEncoderParams): IEncoder { + throw new Error("Method not implemented."); + } + public isStarted(): boolean { + throw new Error("Method not implemented."); + } + public isConnected(): boolean { + throw new Error("Method not implemented."); + } + public getConnectedPeers(): Promise { + throw new Error("Method not implemented."); + } +}