fix: reorder methods by visibility

This commit is contained in:
Arseniy Klempner 2025-06-11 17:34:51 -07:00
parent a2c3b2e6aa
commit 8444bc940f
No known key found for this signature in database
GPG Key ID: 51653F18863BD24B

View File

@ -31,13 +31,13 @@ interface MessageChannelOptions {
}
export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
public readonly channelId: ChannelId;
private lamportTimestamp: number;
private filter: DefaultBloomFilter;
private outgoingBuffer: Message[];
private acknowledgements: Map<string, number>;
private incomingBuffer: Message[];
private localHistory: { timestamp: number; historyEntry: HistoryEntry }[];
public readonly channelId: ChannelId;
private causalHistorySize: number;
private acknowledgementCount: number;
private timeReceived: Map<string, number>;
@ -85,6 +85,10 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
options.receivedMessageTimeout ?? DEFAULT_RECEIVED_MESSAGE_TIMEOUT;
}
public static getMessageId(payload: Uint8Array): string {
return bytesToHex(sha256(payload));
}
/**
* Processes all queued tasks sequentially to ensure proper message ordering.
*
@ -116,24 +120,6 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
}
}
private async executeTask<A extends Command>(item: Task<A>): Promise<void> {
try {
const handler = this.handlers[item.command];
await handler(item.params as ParamsByAction[A]);
} catch (error) {
log.error(`Task execution failed for command ${item.command}:`, error);
this.dispatchEvent(
new CustomEvent("taskError", {
detail: { command: item.command, error, params: item.params }
})
);
}
}
public static getMessageId(payload: Uint8Array): string {
return bytesToHex(sha256(payload));
}
/**
* Queues a message to be sent on this channel.
*
@ -174,54 +160,6 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
});
}
private async _sendMessage(
payload: Uint8Array,
callback?: (message: Message) => Promise<{
success: boolean;
retrievalHint?: Uint8Array;
}>
): Promise<void> {
this.lamportTimestamp++;
const messageId = MessageChannel.getMessageId(payload);
const message: Message = {
messageId,
channelId: this.channelId,
lamportTimestamp: this.lamportTimestamp,
causalHistory: this.localHistory
.slice(-this.causalHistorySize)
.map(({ historyEntry }) => historyEntry),
bloomFilter: this.filter.toBytes(),
content: payload
};
this.outgoingBuffer.push(message);
if (callback) {
try {
const { success, retrievalHint } = await callback(message);
if (success) {
this.filter.insert(messageId);
this.localHistory.push({
timestamp: this.lamportTimestamp,
historyEntry: {
messageId,
retrievalHint
}
});
this.timeReceived.set(messageId, Date.now());
this.safeDispatchEvent(MessageChannelEvent.MessageSent, {
detail: message
});
}
} catch (error) {
log.error("Callback execution failed in _sendMessage:", error);
throw error;
}
}
}
/**
* Sends a short-lived message without synchronization or reliability requirements.
*
@ -248,29 +186,6 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
});
}
private async _sendEphemeralMessage(
payload: Uint8Array,
callback?: (message: Message) => Promise<boolean>
): Promise<void> {
const message: Message = {
messageId: MessageChannel.getMessageId(payload),
channelId: this.channelId,
content: payload,
lamportTimestamp: undefined,
causalHistory: [],
bloomFilter: undefined
};
if (callback) {
try {
await callback(message);
} catch (error) {
log.error("Callback execution failed in _sendEphemeralMessage:", error);
throw error;
}
}
}
/**
* Queues a received message for processing.
*
@ -299,55 +214,11 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
});
}
public _receiveMessage(message: Message): void {
if (
message.content &&
message.content.length > 0 &&
this.timeReceived.has(message.messageId)
) {
return;
}
if (!message.lamportTimestamp) {
this.deliverMessage(message);
return;
}
if (message.content?.length === 0) {
this.safeDispatchEvent(MessageChannelEvent.SyncReceived, {
detail: message
});
} else {
this.safeDispatchEvent(MessageChannelEvent.MessageReceived, {
detail: message
});
}
this.reviewAckStatus(message);
if (message.content?.length && message.content.length > 0) {
this.filter.insert(message.messageId);
}
const dependenciesMet = message.causalHistory.every((historyEntry) =>
this.localHistory.some(
({ historyEntry: { messageId } }) =>
messageId === historyEntry.messageId
)
);
if (!dependenciesMet) {
this.incomingBuffer.push(message);
this.timeReceived.set(message.messageId, Date.now());
} else {
this.deliverMessage(message);
this.safeDispatchEvent(MessageChannelEvent.MessageDelivered, {
detail: {
messageId: message.messageId,
sentOrReceived: "received"
}
});
}
}
// https://rfc.vac.dev/vac/raw/sds/#periodic-incoming-buffer-sweep
// Note that even though this function has side effects, it is not async
// and does not need to be called through the queue.
/**
* Processes messages in the incoming buffer, delivering those with satisfied dependencies.
*
* @returns Array of history entries for messages still missing dependencies
*/
public sweepIncomingBuffer(): HistoryEntry[] {
const { buffer, missing } = this.incomingBuffer.reduce<{
buffer: Message[];
@ -363,7 +234,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
);
if (missingDependencies.length === 0) {
this.deliverMessage(message);
this.safeDispatchEvent(MessageChannelEvent.MessageDelivered, {
this.safeSendEvent(MessageChannelEvent.MessageDelivered, {
detail: {
messageId: message.messageId,
sentOrReceived: "received"
@ -395,7 +266,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
);
this.incomingBuffer = buffer;
this.safeDispatchEvent(MessageChannelEvent.MissedMessages, {
this.safeSendEvent(MessageChannelEvent.MissedMessages, {
detail: Array.from(missing)
});
@ -461,7 +332,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
if (callback) {
try {
await callback(message);
this.safeDispatchEvent(MessageChannelEvent.SyncSent, {
this.safeSendEvent(MessageChannelEvent.SyncSent, {
detail: message
});
return true;
@ -473,6 +344,149 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
return false;
}
private _receiveMessage(message: Message): void {
const isDuplicate =
message.content &&
message.content.length > 0 &&
this.timeReceived.has(message.messageId);
if (isDuplicate) {
return;
}
if (!message.lamportTimestamp) {
this.deliverMessage(message);
return;
}
if (message.content?.length === 0) {
this.safeSendEvent(MessageChannelEvent.SyncReceived, {
detail: message
});
} else {
this.safeSendEvent(MessageChannelEvent.MessageReceived, {
detail: message
});
}
this.reviewAckStatus(message);
if (message.content?.length && message.content.length > 0) {
this.filter.insert(message.messageId);
}
const dependenciesMet = message.causalHistory.every((historyEntry) =>
this.localHistory.some(
({ historyEntry: { messageId } }) =>
messageId === historyEntry.messageId
)
);
if (!dependenciesMet) {
this.incomingBuffer.push(message);
this.timeReceived.set(message.messageId, Date.now());
} else {
this.deliverMessage(message);
this.safeSendEvent(MessageChannelEvent.MessageDelivered, {
detail: {
messageId: message.messageId,
sentOrReceived: "received"
}
});
}
}
private async executeTask<A extends Command>(item: Task<A>): Promise<void> {
try {
const handler = this.handlers[item.command];
await handler(item.params as ParamsByAction[A]);
} catch (error) {
log.error(`Task execution failed for command ${item.command}:`, error);
this.dispatchEvent(
new CustomEvent("taskError", {
detail: { command: item.command, error, params: item.params }
})
);
}
}
private safeSendEvent<T extends MessageChannelEvent>(
event: T,
eventInit?: CustomEventInit
): void {
try {
this.dispatchEvent(new CustomEvent(event, eventInit));
} catch (error) {
log.error(`Failed to dispatch event ${event}:`, error);
}
}
private async _sendMessage(
payload: Uint8Array,
callback?: (message: Message) => Promise<{
success: boolean;
retrievalHint?: Uint8Array;
}>
): Promise<void> {
this.lamportTimestamp++;
const messageId = MessageChannel.getMessageId(payload);
const message: Message = {
messageId,
channelId: this.channelId,
lamportTimestamp: this.lamportTimestamp,
causalHistory: this.localHistory
.slice(-this.causalHistorySize)
.map(({ historyEntry }) => historyEntry),
bloomFilter: this.filter.toBytes(),
content: payload
};
this.outgoingBuffer.push(message);
if (callback) {
try {
const { success, retrievalHint } = await callback(message);
if (success) {
this.filter.insert(messageId);
this.localHistory.push({
timestamp: this.lamportTimestamp,
historyEntry: {
messageId,
retrievalHint
}
});
this.timeReceived.set(messageId, Date.now());
this.safeSendEvent(MessageChannelEvent.MessageSent, {
detail: message
});
}
} catch (error) {
log.error("Callback execution failed in _sendMessage:", error);
throw error;
}
}
}
private async _sendEphemeralMessage(
payload: Uint8Array,
callback?: (message: Message) => Promise<boolean>
): Promise<void> {
const message: Message = {
messageId: MessageChannel.getMessageId(payload),
channelId: this.channelId,
content: payload,
lamportTimestamp: undefined,
causalHistory: [],
bloomFilter: undefined
};
if (callback) {
try {
await callback(message);
} catch (error) {
log.error("Callback execution failed in _sendEphemeralMessage:", error);
throw error;
}
}
}
// See https://rfc.vac.dev/vac/raw/sds/#deliver-message
private deliverMessage(message: Message, retrievalHint?: Uint8Array): void {
const messageLamportTimestamp = message.lamportTimestamp ?? 0;
@ -520,7 +534,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
if (outgoingMessageId !== messageId) {
return true;
}
this.safeDispatchEvent(MessageChannelEvent.MessageAcknowledged, {
this.safeSendEvent(MessageChannelEvent.MessageAcknowledged, {
detail: messageId
});
return false;
@ -548,7 +562,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
const count = (this.acknowledgements.get(message.messageId) ?? 0) + 1;
if (count < this.acknowledgementCount) {
this.acknowledgements.set(message.messageId, count);
this.safeDispatchEvent(MessageChannelEvent.PartialAcknowledgement, {
this.safeSendEvent(MessageChannelEvent.PartialAcknowledgement, {
detail: {
messageId: message.messageId,
count