diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 8021ac0a91..c6936f5650 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -25,4 +25,8 @@ export { StreamManager } from "./lib/stream_manager/index.js"; export { MetadataCodec, wakuMetadata } from "./lib/metadata/index.js"; -export { messageHash, messageHashStr } from "./lib/message_hash/index.js"; +export { + messageHash, + messageHashStr, + deterministicMessageHashing +} from "./lib/message_hash/index.js"; diff --git a/packages/core/src/lib/message_hash/index.ts b/packages/core/src/lib/message_hash/index.ts index e641c301f3..ab889ebe6f 100644 --- a/packages/core/src/lib/message_hash/index.ts +++ b/packages/core/src/lib/message_hash/index.ts @@ -1 +1,5 @@ -export { messageHash, messageHashStr } from "./message_hash.js"; +export { + messageHash, + messageHashStr, + deterministicMessageHashing +} from "./message_hash.js"; diff --git a/packages/interfaces/src/waku.ts b/packages/interfaces/src/waku.ts index 363c6ccdad..16e3f55f01 100644 --- a/packages/interfaces/src/waku.ts +++ b/packages/interfaces/src/waku.ts @@ -14,7 +14,7 @@ import { IDecodedMessage, IDecoder, IEncoder } from "./message.js"; import { ContentTopic } from "./misc.js"; import type { Protocols } from "./protocols.js"; import type { IRelay } from "./relay.js"; -import type { ShardId } from "./sharding.js"; +import type { IRoutingInfo, ShardId } from "./sharding.js"; import type { IStore } from "./store.js"; export type CreateDecoderParams = { @@ -68,7 +68,10 @@ export interface IWakuEvents { } export interface IMessageEmitterEvents { - [contentTopic: string]: CustomEvent; + [contentTopic: string]: CustomEvent<{ + payload: Uint8Array; + messageHash: Uint8Array; + }>; } export type IWakuEventEmitter = TypedEventEmitter; @@ -253,6 +256,8 @@ export interface IWaku { */ createDecoder(params: CreateDecoderParams): IDecoder; + createRoutingInfo(contentTopic?: string, shardId?: number): IRoutingInfo; + /** * Creates an encoder for Waku messages on a specific content topic. * diff --git a/packages/sdk/src/reliable_channel/reliable_channel.ts b/packages/sdk/src/reliable_channel/reliable_channel.ts index 78ac0279d5..783220f17e 100644 --- a/packages/sdk/src/reliable_channel/reliable_channel.ts +++ b/packages/sdk/src/reliable_channel/reliable_channel.ts @@ -1,15 +1,10 @@ import { TypedEventEmitter } from "@libp2p/interface"; -import { messageHash } from "@waku/core"; import { type ContentTopic, type IDecodedMessage, type IDecoder, - type IEncoder, - type IMessage, - ISendOptions, type IWaku, LightPushError, - LightPushSDKResult, QueryRequestParams } from "@waku/interfaces"; import { @@ -25,13 +20,7 @@ import { import { Logger } from "@waku/utils"; import { bytesToHex } from "@waku/utils/bytes"; -import { - QueryOnConnect, - QueryOnConnectEvent -} from "../query_on_connect/index.js"; - import { ReliableChannelEvent, ReliableChannelEvents } from "./events.js"; -import { MissingMessageRetriever } from "./missing_message_retriever.js"; import { RetryManager } from "./retry_manager.js"; const log = new Logger("sdk:reliable-channel"); @@ -114,6 +103,16 @@ export type ReliableChannelOptions = MessageChannelOptions & { processTaskMinElapseMs?: number; }; +/** + * It is best for SDS (e2e reliability) to happen within the encryption layer. + * Hence, the consumer need to pass encryption and decryption methods for + * outgoing and incoming messages. + */ +export interface IEncryption { + encrypt: (clearPayload: Uint8Array) => Uint8Array | Promise; + decrypt: (encryptedPayload: Uint8Array) => Uint8Array | Promise; +} + /** * An easy-to-use reliable channel that ensures all participants to the channel have eventual message consistency. * @@ -123,14 +122,14 @@ export type ReliableChannelOptions = MessageChannelOptions & { * @emits [[ReliableChannelEvents]] * */ -export class ReliableChannel< - T extends IDecodedMessage -> extends TypedEventEmitter { +export class ReliableChannel extends TypedEventEmitter { + // TODO: this is PoC, we assume that message id is returned, and `undefined` means some error. + // Borrowed from https://github.com/waku-org/js-waku/pull/2583/ for now private readonly _send: ( - encoder: IEncoder, - message: IMessage, - sendOptions?: ISendOptions - ) => Promise; + contentTopic: string, + payload: Uint8Array, + ephemeral?: boolean + ) => Promise; private readonly _subscribe: (contentTopics: ContentTopic[]) => void; @@ -145,30 +144,37 @@ export class ReliableChannel< private readonly sweepInBufIntervalMs: number; private processTaskTimeout: ReturnType | undefined; private readonly retryManager: RetryManager | undefined; - private readonly missingMessageRetriever?: MissingMessageRetriever; - private readonly queryOnConnect?: QueryOnConnect; + // private readonly missingMessageRetriever?: MissingMessageRetriever; + // private readonly queryOnConnect?: QueryOnConnect; private readonly processTaskMinElapseMs: number; private _started: boolean; + private encryption: IEncryption; private constructor( public node: IWaku, public messageChannel: MessageChannel, - private encoder: IEncoder, - // TODO: remove once _retrieve is aligned - private decoder: IDecoder, + private contentTopic: ContentTopic, + encryption?: IEncryption, options?: ReliableChannelOptions ) { super(); if (node.lightPush) { - this._send = node.lightPush.send.bind(node.lightPush); + // TODO: this is just a PoC + // this._send = node.lightPush.send.bind(node.lightPush); } else if (node.relay) { - this._send = node.relay.send.bind(node.relay); + // this._send = node.relay.send.bind(node.relay); } else { throw "No protocol available to send messages"; } this._subscribe = node.subscribe.bind(node); + // If no encryption, just set a pass through without changing the payload to keep the code simpler + this.encryption = encryption ?? { + encrypt: (p: Uint8Array) => p, + decrypt: (p: Uint8Array) => p + }; + if (node.store) { this._retrieve = node.store.queryGenerator.bind(node.store); const peerManagerEvents = (node as any)?.peerManager?.events; @@ -176,13 +182,14 @@ export class ReliableChannel< peerManagerEvents !== undefined && (options?.queryOnConnect ?? true) ) { - this.queryOnConnect = new QueryOnConnect( - [this.decoder], - this.isChannelMessageWithCausalHistory.bind(this), - peerManagerEvents, - node.events, - this._retrieve.bind(this) - ); + // this.queryOnConnect = new QueryOnConnect( + // [this.decoder], + // this.isChannelMessageWithCausalHistory.bind(this), + // peerManagerEvents, + // node.events, + // this._retrieve.bind(this) + // ); + // TODO: stop using decoder for store } } @@ -206,14 +213,15 @@ export class ReliableChannel< options?.processTaskMinElapseMs ?? DEFAULT_PROCESS_TASK_MIN_ELAPSE_MS; if (this._retrieve) { - this.missingMessageRetriever = new MissingMessageRetriever( - this.decoder, - options?.retrieveFrequencyMs, - this._retrieve, - async (msg: T) => { - await this.processIncomingMessage(msg.payload); - } - ); + // this.missingMessageRetriever = new MissingMessageRetriever( + // this.decoder, + // options?.retrieveFrequencyMs, + // this._retrieve, + // async (msg: T) => { + // await this.processIncomingMessage(msg.payload); + // } + // ); + // TODO: stop using decoder for store } this._started = false; @@ -252,20 +260,20 @@ export class ReliableChannel< * @param decoder A channel operates within a singular encryption layer, hence the same decoder is needed for all messages * @param options */ - public static async create( + public static async create( node: IWaku, channelId: ChannelId, senderId: SenderId, - encoder: IEncoder, - decoder: IDecoder, + contentTopic: ContentTopic, + encryption?: IEncryption, options?: ReliableChannelOptions - ): Promise> { + ): Promise { const sdsMessageChannel = new MessageChannel(channelId, senderId, options); const messageChannel = new ReliableChannel( node, sdsMessageChannel, - encoder, - decoder, + contentTopic, + encryption, options ); @@ -308,48 +316,32 @@ export class ReliableChannel< // `payload` wrapped in SDS const sdsPayload = sdsMessage.encode(); - - const wakuMessage = { - payload: sdsPayload - }; + const encPayload = await this.encryption.encrypt(sdsPayload); const messageId = ReliableChannel.getMessageId(messagePayload); - const retrievalHint = await computeRetrievalHint( - messagePayload, - this.encoder - ); - if (!retrievalHint) { - this.safeSendEvent("sending-message-irrecoverable-error", { - detail: { - messageId: messageId, - error: "could not encode message" - } - }); - return { success: false }; - } - this.safeSendEvent("sending-message", { detail: messageId }); - const sendRes = await this._send(this.encoder, wakuMessage); + const retrievalHint = await this._send(this.contentTopic, encPayload); // If it's a recoverable failure, we will try again to send later // If not, then we should error to the user now - for (const { error } of sendRes.failures) { - if (IRRECOVERABLE_SENDING_ERRORS.includes(error)) { - // Not recoverable, best to return it - log.error("Irrecoverable error, cannot send message: ", error); - this.safeSendEvent("sending-message-irrecoverable-error", { - detail: { - messageId, - error - } - }); - return { success: false, retrievalHint }; - } - } + // for (const { error } of sendRes.failures) { + // if (IRRECOVERABLE_SENDING_ERRORS.includes(error)) { + // // Not recoverable, best to return it + // log.error("Irrecoverable error, cannot send message: ", error); + // this.safeSendEvent("sending-message-irrecoverable-error", { + // detail: { + // messageId, + // error + // } + // }); + // return { success: false, retrievalHint }; + // } + // } + // TODO: if failure, process it return { success: true, @@ -371,22 +363,30 @@ export class ReliableChannel< private subscribe(): void { this.assertStarted(); - this.node.messageEmitter.addEventListener( - this.decoder.contentTopic, - (event) => void this.processIncomingMessage(event.detail) - ); + this.node.messageEmitter.addEventListener(this.contentTopic, (event) => { + const { payload, messageHash } = event.detail; + // messageHash is the retrievalHint + void this.processIncomingMessage(payload, messageHash); + }); - this._subscribe([this.decoder.contentTopic]); + this._subscribe([this.contentTopic]); } /** * Don't forget to call `this.messageChannel.sweepIncomingBuffer();` once done. - * @param msg * @private + * @param payload */ - private async processIncomingMessage(payload: Uint8Array): Promise { - // New message arrives, we need to unwrap it first - const sdsMessage = SdsMessage.decode(payload); + private async processIncomingMessage( + payload: Uint8Array, + retrievalHint: Uint8Array + ): Promise { + // Decrypt first + // TODO: skip on failure + const decPayload = await this.encryption.decrypt(payload); + + // Unwrap SDS layer + const sdsMessage = SdsMessage.decode(decPayload); if (!sdsMessage) { log.error("could not SDS decode message"); @@ -401,11 +401,6 @@ export class ReliableChannel< return; } - const retrievalHint = await computeRetrievalHint(payload, this.encoder); - if (!retrievalHint) { - log.error("could not compute retrieval hint"); - return; - } log.info( `processing message ${sdsMessage.messageId}:${bytesToHex(retrievalHint)}` ); @@ -413,7 +408,8 @@ export class ReliableChannel< // missing messages or the status of previous outgoing messages this.messageChannel.pushIncomingMessage(sdsMessage, retrievalHint); - this.missingMessageRetriever?.removeMissingMessage(sdsMessage.messageId); + // TODO + // this.missingMessageRetriever?.removeMissingMessage(sdsMessage.messageId); if (sdsMessage.content && sdsMessage.content.length > 0) { // Now, process the message with callback @@ -425,13 +421,13 @@ export class ReliableChannel< this.queueProcessTasks(); } - private async processIncomingMessages( - messages: T[] - ): Promise { - for (const message of messages) { - await this.processIncomingMessage(message.payload); - } - } + // private async processIncomingMessages( + // messages: T[] + // ): Promise { + // for (const message of messages) { + // await this.processIncomingMessage(message.payload); + // } + // } // TODO: For now we only queue process tasks for incoming messages // As this is where there is most volume @@ -456,10 +452,10 @@ export class ReliableChannel< this.setupEventListeners(); this.restartSync(); this.startSweepIncomingBufferLoop(); - if (this._retrieve) { - this.missingMessageRetriever?.start(); - this.queryOnConnect?.start(); - } + // if (this._retrieve) { + // this.missingMessageRetriever?.start(); + // this.queryOnConnect?.start(); + // } this.subscribe(); } @@ -468,8 +464,8 @@ export class ReliableChannel< this._started = false; this.stopSync(); this.stopSweepIncomingBufferLoop(); - this.missingMessageRetriever?.stop(); - this.queryOnConnect?.stop(); + // this.missingMessageRetriever?.stop(); + // this.queryOnConnect?.stop(); // TODO unsubscribe // TODO unsetMessageListeners } @@ -643,40 +639,27 @@ export class ReliableChannel< } ); - this.messageChannel.addEventListener( - MessageChannelEvent.InMessageMissing, - (event) => { - for (const { messageId, retrievalHint } of event.detail) { - if (retrievalHint && this.missingMessageRetriever) { - this.missingMessageRetriever.addMissingMessage( - messageId, - retrievalHint - ); - } - } - } - ); + // this.messageChannel.addEventListener( + // MessageChannelEvent.InMessageMissing, + // (event) => { + // for (const { messageId, retrievalHint } of event.detail) { + // if (retrievalHint && this.missingMessageRetriever) { + // this.missingMessageRetriever.addMissingMessage( + // messageId, + // retrievalHint + // ); + // } + // } + // } + // ); - if (this.queryOnConnect) { - this.queryOnConnect.addEventListener( - QueryOnConnectEvent.MessagesRetrieved, - (event) => { - void this.processIncomingMessages(event.detail); - } - ); - } + // if (this.queryOnConnect) { + // this.queryOnConnect.addEventListener( + // QueryOnConnectEvent.MessagesRetrieved, + // (event) => { + // void this.processIncomingMessages(event.detail); + // } + // ); + // } } } - -async function computeRetrievalHint( - payload: Uint8Array, - encoder: IEncoder -): Promise { - // TODO: should the encoder give me the message hash? - // Encoding now to fail early, used later to get message hash - const protoMessage = await encoder.toProtoObj({ payload }); - if (!protoMessage) { - return undefined; - } - return messageHash(encoder.pubsubTopic, protoMessage); -} diff --git a/packages/sdk/src/waku/waku.ts b/packages/sdk/src/waku/waku.ts index a757a0f02c..412cfa30a5 100644 --- a/packages/sdk/src/waku/waku.ts +++ b/packages/sdk/src/waku/waku.ts @@ -326,7 +326,7 @@ export class WakuNode implements IWaku { }); } - private createRoutingInfo( + public createRoutingInfo( contentTopic?: string, shardId?: number ): IRoutingInfo {