Upgrade API with encryption/decryption

This commit is contained in:
fryorcraken 2025-10-17 15:49:39 +11:00
parent 4e6983f6df
commit 2b07864a25
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4
5 changed files with 141 additions and 145 deletions

View File

@ -25,4 +25,8 @@ export { StreamManager } from "./lib/stream_manager/index.js";
export { MetadataCodec, wakuMetadata } from "./lib/metadata/index.js"; export { MetadataCodec, wakuMetadata } from "./lib/metadata/index.js";
export { messageHash, messageHashStr } from "./lib/message_hash/index.js"; export {
messageHash,
messageHashStr,
deterministicMessageHashing
} from "./lib/message_hash/index.js";

View File

@ -1 +1,5 @@
export { messageHash, messageHashStr } from "./message_hash.js"; export {
messageHash,
messageHashStr,
deterministicMessageHashing
} from "./message_hash.js";

View File

@ -14,7 +14,7 @@ import { IDecodedMessage, IDecoder, IEncoder } from "./message.js";
import { ContentTopic } from "./misc.js"; import { ContentTopic } from "./misc.js";
import type { Protocols } from "./protocols.js"; import type { Protocols } from "./protocols.js";
import type { IRelay } from "./relay.js"; import type { IRelay } from "./relay.js";
import type { ShardId } from "./sharding.js"; import type { IRoutingInfo, ShardId } from "./sharding.js";
import type { IStore } from "./store.js"; import type { IStore } from "./store.js";
export type CreateDecoderParams = { export type CreateDecoderParams = {
@ -68,7 +68,10 @@ export interface IWakuEvents {
} }
export interface IMessageEmitterEvents { export interface IMessageEmitterEvents {
[contentTopic: string]: CustomEvent<Uint8Array>; [contentTopic: string]: CustomEvent<{
payload: Uint8Array;
messageHash: Uint8Array;
}>;
} }
export type IWakuEventEmitter = TypedEventEmitter<IWakuEvents>; export type IWakuEventEmitter = TypedEventEmitter<IWakuEvents>;
@ -253,6 +256,8 @@ export interface IWaku {
*/ */
createDecoder(params: CreateDecoderParams): IDecoder<IDecodedMessage>; createDecoder(params: CreateDecoderParams): IDecoder<IDecodedMessage>;
createRoutingInfo(contentTopic?: string, shardId?: number): IRoutingInfo;
/** /**
* Creates an encoder for Waku messages on a specific content topic. * Creates an encoder for Waku messages on a specific content topic.
* *

View File

@ -1,15 +1,10 @@
import { TypedEventEmitter } from "@libp2p/interface"; import { TypedEventEmitter } from "@libp2p/interface";
import { messageHash } from "@waku/core";
import { import {
type ContentTopic, type ContentTopic,
type IDecodedMessage, type IDecodedMessage,
type IDecoder, type IDecoder,
type IEncoder,
type IMessage,
ISendOptions,
type IWaku, type IWaku,
LightPushError, LightPushError,
LightPushSDKResult,
QueryRequestParams QueryRequestParams
} from "@waku/interfaces"; } from "@waku/interfaces";
import { import {
@ -25,13 +20,7 @@ import {
import { Logger } from "@waku/utils"; import { Logger } from "@waku/utils";
import { bytesToHex } from "@waku/utils/bytes"; import { bytesToHex } from "@waku/utils/bytes";
import {
QueryOnConnect,
QueryOnConnectEvent
} from "../query_on_connect/index.js";
import { ReliableChannelEvent, ReliableChannelEvents } from "./events.js"; import { ReliableChannelEvent, ReliableChannelEvents } from "./events.js";
import { MissingMessageRetriever } from "./missing_message_retriever.js";
import { RetryManager } from "./retry_manager.js"; import { RetryManager } from "./retry_manager.js";
const log = new Logger("sdk:reliable-channel"); const log = new Logger("sdk:reliable-channel");
@ -114,6 +103,16 @@ export type ReliableChannelOptions = MessageChannelOptions & {
processTaskMinElapseMs?: number; processTaskMinElapseMs?: number;
}; };
/**
* It is best for SDS (e2e reliability) to happen within the encryption layer.
* Hence, the consumer need to pass encryption and decryption methods for
* outgoing and incoming messages.
*/
export interface IEncryption {
encrypt: (clearPayload: Uint8Array) => Uint8Array | Promise<Uint8Array>;
decrypt: (encryptedPayload: Uint8Array) => Uint8Array | Promise<Uint8Array>;
}
/** /**
* An easy-to-use reliable channel that ensures all participants to the channel have eventual message consistency. * An easy-to-use reliable channel that ensures all participants to the channel have eventual message consistency.
* *
@ -123,14 +122,14 @@ export type ReliableChannelOptions = MessageChannelOptions & {
* @emits [[ReliableChannelEvents]] * @emits [[ReliableChannelEvents]]
* *
*/ */
export class ReliableChannel< export class ReliableChannel extends TypedEventEmitter<ReliableChannelEvents> {
T extends IDecodedMessage // TODO: this is PoC, we assume that message id is returned, and `undefined` means some error.
> extends TypedEventEmitter<ReliableChannelEvents> { // Borrowed from https://github.com/waku-org/js-waku/pull/2583/ for now
private readonly _send: ( private readonly _send: (
encoder: IEncoder, contentTopic: string,
message: IMessage, payload: Uint8Array,
sendOptions?: ISendOptions ephemeral?: boolean
) => Promise<LightPushSDKResult>; ) => Promise<Uint8Array | undefined>;
private readonly _subscribe: (contentTopics: ContentTopic[]) => void; private readonly _subscribe: (contentTopics: ContentTopic[]) => void;
@ -145,30 +144,37 @@ export class ReliableChannel<
private readonly sweepInBufIntervalMs: number; private readonly sweepInBufIntervalMs: number;
private processTaskTimeout: ReturnType<typeof setTimeout> | undefined; private processTaskTimeout: ReturnType<typeof setTimeout> | undefined;
private readonly retryManager: RetryManager | undefined; private readonly retryManager: RetryManager | undefined;
private readonly missingMessageRetriever?: MissingMessageRetriever<T>; // private readonly missingMessageRetriever?: MissingMessageRetriever;
private readonly queryOnConnect?: QueryOnConnect<T>; // private readonly queryOnConnect?: QueryOnConnect<T>;
private readonly processTaskMinElapseMs: number; private readonly processTaskMinElapseMs: number;
private _started: boolean; private _started: boolean;
private encryption: IEncryption;
private constructor( private constructor(
public node: IWaku, public node: IWaku,
public messageChannel: MessageChannel, public messageChannel: MessageChannel,
private encoder: IEncoder, private contentTopic: ContentTopic,
// TODO: remove once _retrieve is aligned encryption?: IEncryption,
private decoder: IDecoder<T>,
options?: ReliableChannelOptions options?: ReliableChannelOptions
) { ) {
super(); super();
if (node.lightPush) { if (node.lightPush) {
this._send = node.lightPush.send.bind(node.lightPush); // TODO: this is just a PoC
// this._send = node.lightPush.send.bind(node.lightPush);
} else if (node.relay) { } else if (node.relay) {
this._send = node.relay.send.bind(node.relay); // this._send = node.relay.send.bind(node.relay);
} else { } else {
throw "No protocol available to send messages"; throw "No protocol available to send messages";
} }
this._subscribe = node.subscribe.bind(node); this._subscribe = node.subscribe.bind(node);
// If no encryption, just set a pass through without changing the payload to keep the code simpler
this.encryption = encryption ?? {
encrypt: (p: Uint8Array) => p,
decrypt: (p: Uint8Array) => p
};
if (node.store) { if (node.store) {
this._retrieve = node.store.queryGenerator.bind(node.store); this._retrieve = node.store.queryGenerator.bind(node.store);
const peerManagerEvents = (node as any)?.peerManager?.events; const peerManagerEvents = (node as any)?.peerManager?.events;
@ -176,13 +182,14 @@ export class ReliableChannel<
peerManagerEvents !== undefined && peerManagerEvents !== undefined &&
(options?.queryOnConnect ?? true) (options?.queryOnConnect ?? true)
) { ) {
this.queryOnConnect = new QueryOnConnect( // this.queryOnConnect = new QueryOnConnect(
[this.decoder], // [this.decoder],
this.isChannelMessageWithCausalHistory.bind(this), // this.isChannelMessageWithCausalHistory.bind(this),
peerManagerEvents, // peerManagerEvents,
node.events, // node.events,
this._retrieve.bind(this) // this._retrieve.bind(this)
); // );
// TODO: stop using decoder for store
} }
} }
@ -206,14 +213,15 @@ export class ReliableChannel<
options?.processTaskMinElapseMs ?? DEFAULT_PROCESS_TASK_MIN_ELAPSE_MS; options?.processTaskMinElapseMs ?? DEFAULT_PROCESS_TASK_MIN_ELAPSE_MS;
if (this._retrieve) { if (this._retrieve) {
this.missingMessageRetriever = new MissingMessageRetriever( // this.missingMessageRetriever = new MissingMessageRetriever(
this.decoder, // this.decoder,
options?.retrieveFrequencyMs, // options?.retrieveFrequencyMs,
this._retrieve, // this._retrieve,
async (msg: T) => { // async (msg: T) => {
await this.processIncomingMessage(msg.payload); // await this.processIncomingMessage(msg.payload);
} // }
); // );
// TODO: stop using decoder for store
} }
this._started = false; this._started = false;
@ -252,20 +260,20 @@ export class ReliableChannel<
* @param decoder A channel operates within a singular encryption layer, hence the same decoder is needed for all messages * @param decoder A channel operates within a singular encryption layer, hence the same decoder is needed for all messages
* @param options * @param options
*/ */
public static async create<T extends IDecodedMessage>( public static async create(
node: IWaku, node: IWaku,
channelId: ChannelId, channelId: ChannelId,
senderId: SenderId, senderId: SenderId,
encoder: IEncoder, contentTopic: ContentTopic,
decoder: IDecoder<T>, encryption?: IEncryption,
options?: ReliableChannelOptions options?: ReliableChannelOptions
): Promise<ReliableChannel<T>> { ): Promise<ReliableChannel> {
const sdsMessageChannel = new MessageChannel(channelId, senderId, options); const sdsMessageChannel = new MessageChannel(channelId, senderId, options);
const messageChannel = new ReliableChannel( const messageChannel = new ReliableChannel(
node, node,
sdsMessageChannel, sdsMessageChannel,
encoder, contentTopic,
decoder, encryption,
options options
); );
@ -308,48 +316,32 @@ export class ReliableChannel<
// `payload` wrapped in SDS // `payload` wrapped in SDS
const sdsPayload = sdsMessage.encode(); const sdsPayload = sdsMessage.encode();
const encPayload = await this.encryption.encrypt(sdsPayload);
const wakuMessage = {
payload: sdsPayload
};
const messageId = ReliableChannel.getMessageId(messagePayload); const messageId = ReliableChannel.getMessageId(messagePayload);
const retrievalHint = await computeRetrievalHint(
messagePayload,
this.encoder
);
if (!retrievalHint) {
this.safeSendEvent("sending-message-irrecoverable-error", {
detail: {
messageId: messageId,
error: "could not encode message"
}
});
return { success: false };
}
this.safeSendEvent("sending-message", { this.safeSendEvent("sending-message", {
detail: messageId detail: messageId
}); });
const sendRes = await this._send(this.encoder, wakuMessage); const retrievalHint = await this._send(this.contentTopic, encPayload);
// If it's a recoverable failure, we will try again to send later // If it's a recoverable failure, we will try again to send later
// If not, then we should error to the user now // If not, then we should error to the user now
for (const { error } of sendRes.failures) { // for (const { error } of sendRes.failures) {
if (IRRECOVERABLE_SENDING_ERRORS.includes(error)) { // if (IRRECOVERABLE_SENDING_ERRORS.includes(error)) {
// Not recoverable, best to return it // // Not recoverable, best to return it
log.error("Irrecoverable error, cannot send message: ", error); // log.error("Irrecoverable error, cannot send message: ", error);
this.safeSendEvent("sending-message-irrecoverable-error", { // this.safeSendEvent("sending-message-irrecoverable-error", {
detail: { // detail: {
messageId, // messageId,
error // error
} // }
}); // });
return { success: false, retrievalHint }; // return { success: false, retrievalHint };
} // }
} // }
// TODO: if failure, process it
return { return {
success: true, success: true,
@ -371,22 +363,30 @@ export class ReliableChannel<
private subscribe(): void { private subscribe(): void {
this.assertStarted(); this.assertStarted();
this.node.messageEmitter.addEventListener( this.node.messageEmitter.addEventListener(this.contentTopic, (event) => {
this.decoder.contentTopic, const { payload, messageHash } = event.detail;
(event) => void this.processIncomingMessage(event.detail) // messageHash is the retrievalHint
); void this.processIncomingMessage(payload, messageHash);
});
this._subscribe([this.decoder.contentTopic]); this._subscribe([this.contentTopic]);
} }
/** /**
* Don't forget to call `this.messageChannel.sweepIncomingBuffer();` once done. * Don't forget to call `this.messageChannel.sweepIncomingBuffer();` once done.
* @param msg
* @private * @private
* @param payload
*/ */
private async processIncomingMessage(payload: Uint8Array): Promise<void> { private async processIncomingMessage(
// New message arrives, we need to unwrap it first payload: Uint8Array,
const sdsMessage = SdsMessage.decode(payload); retrievalHint: Uint8Array
): Promise<void> {
// Decrypt first
// TODO: skip on failure
const decPayload = await this.encryption.decrypt(payload);
// Unwrap SDS layer
const sdsMessage = SdsMessage.decode(decPayload);
if (!sdsMessage) { if (!sdsMessage) {
log.error("could not SDS decode message"); log.error("could not SDS decode message");
@ -401,11 +401,6 @@ export class ReliableChannel<
return; return;
} }
const retrievalHint = await computeRetrievalHint(payload, this.encoder);
if (!retrievalHint) {
log.error("could not compute retrieval hint");
return;
}
log.info( log.info(
`processing message ${sdsMessage.messageId}:${bytesToHex(retrievalHint)}` `processing message ${sdsMessage.messageId}:${bytesToHex(retrievalHint)}`
); );
@ -413,7 +408,8 @@ export class ReliableChannel<
// missing messages or the status of previous outgoing messages // missing messages or the status of previous outgoing messages
this.messageChannel.pushIncomingMessage(sdsMessage, retrievalHint); this.messageChannel.pushIncomingMessage(sdsMessage, retrievalHint);
this.missingMessageRetriever?.removeMissingMessage(sdsMessage.messageId); // TODO
// this.missingMessageRetriever?.removeMissingMessage(sdsMessage.messageId);
if (sdsMessage.content && sdsMessage.content.length > 0) { if (sdsMessage.content && sdsMessage.content.length > 0) {
// Now, process the message with callback // Now, process the message with callback
@ -425,13 +421,13 @@ export class ReliableChannel<
this.queueProcessTasks(); this.queueProcessTasks();
} }
private async processIncomingMessages<T extends IDecodedMessage>( // private async processIncomingMessages<T extends IDecodedMessage>(
messages: T[] // messages: T[]
): Promise<void> { // ): Promise<void> {
for (const message of messages) { // for (const message of messages) {
await this.processIncomingMessage(message.payload); // await this.processIncomingMessage(message.payload);
} // }
} // }
// TODO: For now we only queue process tasks for incoming messages // TODO: For now we only queue process tasks for incoming messages
// As this is where there is most volume // As this is where there is most volume
@ -456,10 +452,10 @@ export class ReliableChannel<
this.setupEventListeners(); this.setupEventListeners();
this.restartSync(); this.restartSync();
this.startSweepIncomingBufferLoop(); this.startSweepIncomingBufferLoop();
if (this._retrieve) { // if (this._retrieve) {
this.missingMessageRetriever?.start(); // this.missingMessageRetriever?.start();
this.queryOnConnect?.start(); // this.queryOnConnect?.start();
} // }
this.subscribe(); this.subscribe();
} }
@ -468,8 +464,8 @@ export class ReliableChannel<
this._started = false; this._started = false;
this.stopSync(); this.stopSync();
this.stopSweepIncomingBufferLoop(); this.stopSweepIncomingBufferLoop();
this.missingMessageRetriever?.stop(); // this.missingMessageRetriever?.stop();
this.queryOnConnect?.stop(); // this.queryOnConnect?.stop();
// TODO unsubscribe // TODO unsubscribe
// TODO unsetMessageListeners // TODO unsetMessageListeners
} }
@ -643,40 +639,27 @@ export class ReliableChannel<
} }
); );
this.messageChannel.addEventListener( // this.messageChannel.addEventListener(
MessageChannelEvent.InMessageMissing, // MessageChannelEvent.InMessageMissing,
(event) => { // (event) => {
for (const { messageId, retrievalHint } of event.detail) { // for (const { messageId, retrievalHint } of event.detail) {
if (retrievalHint && this.missingMessageRetriever) { // if (retrievalHint && this.missingMessageRetriever) {
this.missingMessageRetriever.addMissingMessage( // this.missingMessageRetriever.addMissingMessage(
messageId, // messageId,
retrievalHint // retrievalHint
); // );
} // }
} // }
} // }
); // );
if (this.queryOnConnect) { // if (this.queryOnConnect) {
this.queryOnConnect.addEventListener( // this.queryOnConnect.addEventListener(
QueryOnConnectEvent.MessagesRetrieved, // QueryOnConnectEvent.MessagesRetrieved,
(event) => { // (event) => {
void this.processIncomingMessages(event.detail); // void this.processIncomingMessages(event.detail);
} // }
); // );
// }
} }
} }
}
async function computeRetrievalHint(
payload: Uint8Array,
encoder: IEncoder
): Promise<Uint8Array | undefined> {
// TODO: should the encoder give me the message hash?
// Encoding now to fail early, used later to get message hash
const protoMessage = await encoder.toProtoObj({ payload });
if (!protoMessage) {
return undefined;
}
return messageHash(encoder.pubsubTopic, protoMessage);
}

View File

@ -326,7 +326,7 @@ export class WakuNode implements IWaku {
}); });
} }
private createRoutingInfo( public createRoutingInfo(
contentTopic?: string, contentTopic?: string,
shardId?: number shardId?: number
): IRoutingInfo { ): IRoutingInfo {