fix(sds): reexport proto codec, use set for missing messages

This commit is contained in:
Arseniy Klempner 2025-03-26 21:57:52 -07:00
parent dd7663f378
commit c55c694298
No known key found for this signature in database
GPG Key ID: 51653F18863BD24B
5 changed files with 22 additions and 8 deletions

View File

@ -1,4 +1,4 @@
import { hashN } from "../nim_hashn/nim_hashn.mjs.js";
import { hashN } from "../nim_hashn/nim_hashn.mjs";
import { getMOverNBitsForK } from "../probabilities.js";
export interface BloomFilterOptions {

View File

@ -1,5 +1,5 @@
import { BloomFilter } from "./bloom_filter/bloom.js";
export * from "./message_channel/message_channel.js";
export * from "./message_channel/index.js";
export { BloomFilter };

View File

@ -15,6 +15,14 @@ export type Message = proto_sds_message.SdsMessage;
export type HistoryEntry = proto_sds_message.HistoryEntry;
export type ChannelId = string;
export function encodeMessage(message: Message): Uint8Array {
return proto_sds_message.SdsMessage.encode(message);
}
export function decodeMessage(data: Uint8Array): Message {
return proto_sds_message.SdsMessage.decode(data);
}
export type MessageChannelEvents = {
[MessageChannelEvent.MessageSent]: CustomEvent<Message>;
[MessageChannelEvent.MessageDelivered]: CustomEvent<{

View File

@ -0,0 +1,3 @@
export * from "./command_queue.js";
export * from "./events.js";
export * from "./message_channel.js";

View File

@ -298,7 +298,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
public sweepIncomingBuffer(): HistoryEntry[] {
const { buffer, missing } = this.incomingBuffer.reduce<{
buffer: Message[];
missing: HistoryEntry[];
missing: Set<HistoryEntry>;
}>(
({ buffer, missing }, message) => {
// Check each message for missing dependencies
@ -335,21 +335,24 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
}
// Any message with missing dependencies stays in the buffer
// and the missing message IDs are returned for processing.
missingDependencies.forEach((dependency) => {
missing.add(dependency);
});
return {
buffer: buffer.concat(message),
missing: missing.concat(missingDependencies)
missing
};
},
{ buffer: new Array<Message>(), missing: new Array<HistoryEntry>() }
{ buffer: new Array<Message>(), missing: new Set<HistoryEntry>() }
);
// Update the incoming buffer to only include messages with no missing dependencies
this.incomingBuffer = buffer;
if (missing.length > 0) {
if (missing.size > 0) {
this.safeDispatchEvent(MessageChannelEvent.MissedMessages, {
detail: missing
detail: Array.from(missing)
});
}
return missing;
return Array.from(missing);
}
// https://rfc.vac.dev/vac/raw/sds/#periodic-outgoing-buffer-sweep