From 8444bc940fd26b52a0b1662f6923b49a22f9325d Mon Sep 17 00:00:00 2001 From: Arseniy Klempner Date: Wed, 11 Jun 2025 17:34:51 -0700 Subject: [PATCH] fix: reorder methods by visibility --- .../src/message_channel/message_channel.ts | 302 +++++++++--------- 1 file changed, 158 insertions(+), 144 deletions(-) diff --git a/packages/sds/src/message_channel/message_channel.ts b/packages/sds/src/message_channel/message_channel.ts index 37d5995391..12ae632283 100644 --- a/packages/sds/src/message_channel/message_channel.ts +++ b/packages/sds/src/message_channel/message_channel.ts @@ -31,13 +31,13 @@ interface MessageChannelOptions { } export class MessageChannel extends TypedEventEmitter { + public readonly channelId: ChannelId; private lamportTimestamp: number; private filter: DefaultBloomFilter; private outgoingBuffer: Message[]; private acknowledgements: Map; private incomingBuffer: Message[]; private localHistory: { timestamp: number; historyEntry: HistoryEntry }[]; - public readonly channelId: ChannelId; private causalHistorySize: number; private acknowledgementCount: number; private timeReceived: Map; @@ -85,6 +85,10 @@ export class MessageChannel extends TypedEventEmitter { options.receivedMessageTimeout ?? DEFAULT_RECEIVED_MESSAGE_TIMEOUT; } + public static getMessageId(payload: Uint8Array): string { + return bytesToHex(sha256(payload)); + } + /** * Processes all queued tasks sequentially to ensure proper message ordering. * @@ -116,24 +120,6 @@ export class MessageChannel extends TypedEventEmitter { } } - private async executeTask(item: Task): Promise { - try { - const handler = this.handlers[item.command]; - await handler(item.params as ParamsByAction[A]); - } catch (error) { - log.error(`Task execution failed for command ${item.command}:`, error); - this.dispatchEvent( - new CustomEvent("taskError", { - detail: { command: item.command, error, params: item.params } - }) - ); - } - } - - public static getMessageId(payload: Uint8Array): string { - return bytesToHex(sha256(payload)); - } - /** * Queues a message to be sent on this channel. * @@ -174,54 +160,6 @@ export class MessageChannel extends TypedEventEmitter { }); } - private async _sendMessage( - payload: Uint8Array, - callback?: (message: Message) => Promise<{ - success: boolean; - retrievalHint?: Uint8Array; - }> - ): Promise { - this.lamportTimestamp++; - - const messageId = MessageChannel.getMessageId(payload); - - const message: Message = { - messageId, - channelId: this.channelId, - lamportTimestamp: this.lamportTimestamp, - causalHistory: this.localHistory - .slice(-this.causalHistorySize) - .map(({ historyEntry }) => historyEntry), - bloomFilter: this.filter.toBytes(), - content: payload - }; - - this.outgoingBuffer.push(message); - - if (callback) { - try { - const { success, retrievalHint } = await callback(message); - if (success) { - this.filter.insert(messageId); - this.localHistory.push({ - timestamp: this.lamportTimestamp, - historyEntry: { - messageId, - retrievalHint - } - }); - this.timeReceived.set(messageId, Date.now()); - this.safeDispatchEvent(MessageChannelEvent.MessageSent, { - detail: message - }); - } - } catch (error) { - log.error("Callback execution failed in _sendMessage:", error); - throw error; - } - } - } - /** * Sends a short-lived message without synchronization or reliability requirements. * @@ -248,29 +186,6 @@ export class MessageChannel extends TypedEventEmitter { }); } - private async _sendEphemeralMessage( - payload: Uint8Array, - callback?: (message: Message) => Promise - ): Promise { - const message: Message = { - messageId: MessageChannel.getMessageId(payload), - channelId: this.channelId, - content: payload, - lamportTimestamp: undefined, - causalHistory: [], - bloomFilter: undefined - }; - - if (callback) { - try { - await callback(message); - } catch (error) { - log.error("Callback execution failed in _sendEphemeralMessage:", error); - throw error; - } - } - } - /** * Queues a received message for processing. * @@ -299,55 +214,11 @@ export class MessageChannel extends TypedEventEmitter { }); } - public _receiveMessage(message: Message): void { - if ( - message.content && - message.content.length > 0 && - this.timeReceived.has(message.messageId) - ) { - return; - } - - if (!message.lamportTimestamp) { - this.deliverMessage(message); - return; - } - if (message.content?.length === 0) { - this.safeDispatchEvent(MessageChannelEvent.SyncReceived, { - detail: message - }); - } else { - this.safeDispatchEvent(MessageChannelEvent.MessageReceived, { - detail: message - }); - } - this.reviewAckStatus(message); - if (message.content?.length && message.content.length > 0) { - this.filter.insert(message.messageId); - } - const dependenciesMet = message.causalHistory.every((historyEntry) => - this.localHistory.some( - ({ historyEntry: { messageId } }) => - messageId === historyEntry.messageId - ) - ); - if (!dependenciesMet) { - this.incomingBuffer.push(message); - this.timeReceived.set(message.messageId, Date.now()); - } else { - this.deliverMessage(message); - this.safeDispatchEvent(MessageChannelEvent.MessageDelivered, { - detail: { - messageId: message.messageId, - sentOrReceived: "received" - } - }); - } - } - - // https://rfc.vac.dev/vac/raw/sds/#periodic-incoming-buffer-sweep - // Note that even though this function has side effects, it is not async - // and does not need to be called through the queue. + /** + * Processes messages in the incoming buffer, delivering those with satisfied dependencies. + * + * @returns Array of history entries for messages still missing dependencies + */ public sweepIncomingBuffer(): HistoryEntry[] { const { buffer, missing } = this.incomingBuffer.reduce<{ buffer: Message[]; @@ -363,7 +234,7 @@ export class MessageChannel extends TypedEventEmitter { ); if (missingDependencies.length === 0) { this.deliverMessage(message); - this.safeDispatchEvent(MessageChannelEvent.MessageDelivered, { + this.safeSendEvent(MessageChannelEvent.MessageDelivered, { detail: { messageId: message.messageId, sentOrReceived: "received" @@ -395,7 +266,7 @@ export class MessageChannel extends TypedEventEmitter { ); this.incomingBuffer = buffer; - this.safeDispatchEvent(MessageChannelEvent.MissedMessages, { + this.safeSendEvent(MessageChannelEvent.MissedMessages, { detail: Array.from(missing) }); @@ -461,7 +332,7 @@ export class MessageChannel extends TypedEventEmitter { if (callback) { try { await callback(message); - this.safeDispatchEvent(MessageChannelEvent.SyncSent, { + this.safeSendEvent(MessageChannelEvent.SyncSent, { detail: message }); return true; @@ -473,6 +344,149 @@ export class MessageChannel extends TypedEventEmitter { return false; } + private _receiveMessage(message: Message): void { + const isDuplicate = + message.content && + message.content.length > 0 && + this.timeReceived.has(message.messageId); + + if (isDuplicate) { + return; + } + + if (!message.lamportTimestamp) { + this.deliverMessage(message); + return; + } + if (message.content?.length === 0) { + this.safeSendEvent(MessageChannelEvent.SyncReceived, { + detail: message + }); + } else { + this.safeSendEvent(MessageChannelEvent.MessageReceived, { + detail: message + }); + } + this.reviewAckStatus(message); + if (message.content?.length && message.content.length > 0) { + this.filter.insert(message.messageId); + } + const dependenciesMet = message.causalHistory.every((historyEntry) => + this.localHistory.some( + ({ historyEntry: { messageId } }) => + messageId === historyEntry.messageId + ) + ); + if (!dependenciesMet) { + this.incomingBuffer.push(message); + this.timeReceived.set(message.messageId, Date.now()); + } else { + this.deliverMessage(message); + this.safeSendEvent(MessageChannelEvent.MessageDelivered, { + detail: { + messageId: message.messageId, + sentOrReceived: "received" + } + }); + } + } + + private async executeTask(item: Task): Promise { + try { + const handler = this.handlers[item.command]; + await handler(item.params as ParamsByAction[A]); + } catch (error) { + log.error(`Task execution failed for command ${item.command}:`, error); + this.dispatchEvent( + new CustomEvent("taskError", { + detail: { command: item.command, error, params: item.params } + }) + ); + } + } + + private safeSendEvent( + event: T, + eventInit?: CustomEventInit + ): void { + try { + this.dispatchEvent(new CustomEvent(event, eventInit)); + } catch (error) { + log.error(`Failed to dispatch event ${event}:`, error); + } + } + + private async _sendMessage( + payload: Uint8Array, + callback?: (message: Message) => Promise<{ + success: boolean; + retrievalHint?: Uint8Array; + }> + ): Promise { + this.lamportTimestamp++; + + const messageId = MessageChannel.getMessageId(payload); + + const message: Message = { + messageId, + channelId: this.channelId, + lamportTimestamp: this.lamportTimestamp, + causalHistory: this.localHistory + .slice(-this.causalHistorySize) + .map(({ historyEntry }) => historyEntry), + bloomFilter: this.filter.toBytes(), + content: payload + }; + + this.outgoingBuffer.push(message); + + if (callback) { + try { + const { success, retrievalHint } = await callback(message); + if (success) { + this.filter.insert(messageId); + this.localHistory.push({ + timestamp: this.lamportTimestamp, + historyEntry: { + messageId, + retrievalHint + } + }); + this.timeReceived.set(messageId, Date.now()); + this.safeSendEvent(MessageChannelEvent.MessageSent, { + detail: message + }); + } + } catch (error) { + log.error("Callback execution failed in _sendMessage:", error); + throw error; + } + } + } + + private async _sendEphemeralMessage( + payload: Uint8Array, + callback?: (message: Message) => Promise + ): Promise { + const message: Message = { + messageId: MessageChannel.getMessageId(payload), + channelId: this.channelId, + content: payload, + lamportTimestamp: undefined, + causalHistory: [], + bloomFilter: undefined + }; + + if (callback) { + try { + await callback(message); + } catch (error) { + log.error("Callback execution failed in _sendEphemeralMessage:", error); + throw error; + } + } + } + // See https://rfc.vac.dev/vac/raw/sds/#deliver-message private deliverMessage(message: Message, retrievalHint?: Uint8Array): void { const messageLamportTimestamp = message.lamportTimestamp ?? 0; @@ -520,7 +534,7 @@ export class MessageChannel extends TypedEventEmitter { if (outgoingMessageId !== messageId) { return true; } - this.safeDispatchEvent(MessageChannelEvent.MessageAcknowledged, { + this.safeSendEvent(MessageChannelEvent.MessageAcknowledged, { detail: messageId }); return false; @@ -548,7 +562,7 @@ export class MessageChannel extends TypedEventEmitter { const count = (this.acknowledgements.get(message.messageId) ?? 0) + 1; if (count < this.acknowledgementCount) { this.acknowledgements.set(message.messageId, count); - this.safeDispatchEvent(MessageChannelEvent.PartialAcknowledgement, { + this.safeSendEvent(MessageChannelEvent.PartialAcknowledgement, { detail: { messageId: message.messageId, count