From 389ca4062eebda91eac6d8e212ca4d063e7ac103 Mon Sep 17 00:00:00 2001 From: Arseniy Klempner Date: Tue, 11 Feb 2025 13:24:43 -0800 Subject: [PATCH] feat(sds): add message channel with buffers and send/receive logic This commit creates the class for an SDS message channel, including buffers for outgoing and incoming messages. Adds logic for sending messages, receiving messages, delivering messages, and reviewing acknowledgement status of messages. Also adds byte serialization for bloom filters. --- .cspell.json | 1 + package-lock.json | 5 +- packages/proto/src/index.ts | 2 + packages/sds/.mocharc.cjs | 2 +- packages/sds/package.json | 4 + packages/sds/src/bloom.spec.ts | 51 +-- packages/sds/src/bloom.ts | 40 +++ .../{nim_hashn.d.ts => nim_hashn.mjs.d.ts} | 0 packages/sds/src/sds.spec.ts | 333 ++++++++++++++++++ packages/sds/src/sds.ts | 190 ++++++++++ packages/sds/tsconfig.json | 4 +- 11 files changed, 604 insertions(+), 28 deletions(-) rename packages/sds/src/nim_hashn/{nim_hashn.d.ts => nim_hashn.mjs.d.ts} (100%) create mode 100644 packages/sds/src/sds.spec.ts create mode 100644 packages/sds/src/sds.ts diff --git a/.cspell.json b/.cspell.json index da0e8d0f65..125ad6ab84 100644 --- a/.cspell.json +++ b/.cspell.json @@ -64,6 +64,7 @@ "kdfparams", "keccak", "keypair", + "lamport", "lastpub", "libauth", "libp", diff --git a/package-lock.json b/package-lock.json index 226f8833db..3af4c070b1 100644 --- a/package-lock.json +++ b/package-lock.json @@ -6937,7 +6937,6 @@ "version": "1.7.1", "resolved": "https://registry.npmjs.org/@noble/hashes/-/hashes-1.7.1.tgz", "integrity": "sha512-B8XBPsn4vT/KJAGqDzbwztd+6Yte3P4V7iafm24bxgDe/mlRuK6xmWPuCNrKt2vDafZ8MfJLlchDG/vYafQEjQ==", - "license": "MIT", "engines": { "node": "^14.21.3 || >=16" }, @@ -42781,6 +42780,10 @@ "version": "0.0.1", "license": "MIT OR Apache-2.0", "dependencies": { + "@noble/hashes": "^1.7.1", + "@waku/message-hash": "^0.1.17", + "@waku/proto": "^0.0.8", + "@waku/utils": "^0.0.21", "chai": "^5.1.2" }, "devDependencies": { diff --git a/packages/proto/src/index.ts b/packages/proto/src/index.ts index 4dc3f16ba9..598708e1f0 100644 --- a/packages/proto/src/index.ts +++ b/packages/proto/src/index.ts @@ -15,3 +15,5 @@ export * as proto_store from './generated/store_v3.js' export * as proto_peer_exchange from "./generated/peer_exchange.js"; export * as proto_metadata from './generated/metadata.js' + +export * as proto_sds_message from './generated/sds_message.js' \ No newline at end of file diff --git a/packages/sds/.mocharc.cjs b/packages/sds/.mocharc.cjs index b61a6aaee5..480b30d613 100644 --- a/packages/sds/.mocharc.cjs +++ b/packages/sds/.mocharc.cjs @@ -8,7 +8,7 @@ const config = { 'loader=ts-node/esm' ], exit: true, - retries: 4 + retries: 2 }; if (process.env.CI) { diff --git a/packages/sds/package.json b/packages/sds/package.json index a7fe4f303e..69b7743151 100644 --- a/packages/sds/package.json +++ b/packages/sds/package.json @@ -59,6 +59,10 @@ "node": ">=20" }, "dependencies": { + "@noble/hashes": "^1.7.1", + "@waku/message-hash": "^0.1.17", + "@waku/proto": "^0.0.8", + "@waku/utils": "^0.0.21", "chai": "^5.1.2" }, "devDependencies": { diff --git a/packages/sds/src/bloom.spec.ts b/packages/sds/src/bloom.spec.ts index c37ac4215e..22d6991302 100644 --- a/packages/sds/src/bloom.spec.ts +++ b/packages/sds/src/bloom.spec.ts @@ -1,7 +1,6 @@ import { expect } from "chai"; -import { BloomFilter } from "./bloom.js"; -import { hashN } from "./nim_hashn/nim_hashn.mjs"; +import { BloomFilter, DefaultBloomFilter } from "./bloom.js"; const n = 10000; const sampleChars = @@ -20,13 +19,10 @@ describe("BloomFilter", () => { let testElements: string[]; beforeEach(() => { - bloomFilter = new BloomFilter( - { - capacity: n, - errorRate: 0.001 - }, - hashN - ); + bloomFilter = new DefaultBloomFilter({ + capacity: n, + errorRate: 0.001 + }); testElements = new Array(n); @@ -55,15 +51,12 @@ describe("BloomFilter", () => { expect(bloomFilter.kHashes).to.equal(10); expect(bloomFilter.totalBits / n).to.equal(15); - const bloomFilter2 = new BloomFilter( - { - capacity: 10000, - errorRate: 0.001, - kHashes: 4, - forceNBitsPerElem: 20 - }, - hashN - ); + const bloomFilter2 = new DefaultBloomFilter({ + capacity: 10000, + errorRate: 0.001, + kHashes: 4, + forceNBitsPerElem: 20 + }); expect(bloomFilter2.kHashes).to.equal(4); expect(bloomFilter2.totalBits).to.equal(200000); }); @@ -107,6 +100,17 @@ describe("BloomFilter", () => { expect(bloomFilter.lookup(item)).to.equal(true); } }); + + it("should serialize and deserialize correctly", () => { + const serialized = bloomFilter.toBytes(); + const deserialized = DefaultBloomFilter.fromBytes( + serialized, + bloomFilter.options + ); + for (const item of testElements) { + expect(deserialized.lookup(item)).to.equal(true); + } + }); }); describe("BloomFilter with special patterns", () => { @@ -114,13 +118,10 @@ describe("BloomFilter with special patterns", () => { const inserted: string[] = []; beforeEach(() => { - bloomFilter = new BloomFilter( - { - capacity: n, - errorRate: 0.001 - }, - hashN - ); + bloomFilter = new DefaultBloomFilter({ + capacity: n, + errorRate: 0.001 + }); }); it("should handle special patterns correctly", () => { diff --git a/packages/sds/src/bloom.ts b/packages/sds/src/bloom.ts index 78eb8fcd79..97cbed4a91 100644 --- a/packages/sds/src/bloom.ts +++ b/packages/sds/src/bloom.ts @@ -1,3 +1,4 @@ +import { hashN } from "./nim_hashn/nim_hashn.mjs"; import { getMOverNBitsForK } from "./probabilities.js"; export interface BloomFilterOptions { @@ -30,11 +31,15 @@ export class BloomFilter { public kHashes: number; public errorRate: number; + public options: BloomFilterOptions; + private hashN: (item: string, n: number, maxValue: number) => number; public constructor( options: BloomFilterOptions, hashN: (item: string, n: number, maxValue: number) => number ) { + this.options = options; + let nBitsPerElem: number; let k = options.kHashes ?? 0; const forceNBitsPerElem = options.forceNBitsPerElem ?? 0; @@ -103,4 +108,39 @@ export class BloomFilter { } return true; } + + public toBytes(): Uint8Array { + const buffer = new ArrayBuffer(this.data.length * 8); + const view = new DataView(buffer); + for (let i = 0; i < this.data.length; i++) { + view.setBigInt64(i * 8, this.data[i]); + } + return new Uint8Array(buffer); + } + + public static fromBytes( + bytes: Uint8Array, + options: BloomFilterOptions, + hashN: (item: string, n: number, maxValue: number) => number + ): BloomFilter { + const bloomFilter = new BloomFilter(options, hashN); + const view = new DataView(bytes.buffer); + for (let i = 0; i < bloomFilter.data.length; i++) { + bloomFilter.data[i] = view.getBigUint64(i * 8, false); + } + return bloomFilter; + } +} + +export class DefaultBloomFilter extends BloomFilter { + public constructor(options: BloomFilterOptions) { + super(options, hashN); + } + + public static fromBytes( + bytes: Uint8Array, + options: BloomFilterOptions + ): DefaultBloomFilter { + return BloomFilter.fromBytes(bytes, options, hashN); + } } diff --git a/packages/sds/src/nim_hashn/nim_hashn.d.ts b/packages/sds/src/nim_hashn/nim_hashn.mjs.d.ts similarity index 100% rename from packages/sds/src/nim_hashn/nim_hashn.d.ts rename to packages/sds/src/nim_hashn/nim_hashn.mjs.d.ts diff --git a/packages/sds/src/sds.spec.ts b/packages/sds/src/sds.spec.ts new file mode 100644 index 0000000000..0a22563cd7 --- /dev/null +++ b/packages/sds/src/sds.spec.ts @@ -0,0 +1,333 @@ +import { utf8ToBytes } from "@waku/utils/bytes"; +import { expect } from "chai"; + +import { DefaultBloomFilter } from "./bloom.js"; +import { + DEFAULT_BLOOM_FILTER_OPTIONS, + Message, + MessageChannel +} from "./sds.js"; + +const channelId = "test-channel"; +const callback = (_message: Message): boolean => { + return true; +}; + +const getBloomFilter = (channel: MessageChannel): DefaultBloomFilter => { + return (channel as any).filter as DefaultBloomFilter; +}; + +const messagesA = ["message-1", "message-2"]; +const messagesB = [ + "message-3", + "message-4", + "message-5", + "message-6", + "message-7" +]; + +describe("MessageChannel", function () { + this.timeout(5000); + let channelA: MessageChannel; + let channelB: MessageChannel; + + describe("sending a message ", () => { + beforeEach(() => { + channelA = new MessageChannel(channelId); + }); + + it("should increase lamport timestamp", () => { + const timestampBefore = (channelA as any).lamportTimestamp; + channelA.sendMessage(new Uint8Array(), callback); + const timestampAfter = (channelA as any).lamportTimestamp; + expect(timestampAfter).to.equal(timestampBefore + 1); + }); + + it("should push the message to the outgoing buffer", () => { + const bufferLengthBefore = (channelA as any).outgoingBuffer.length; + channelA.sendMessage(new Uint8Array(), callback); + const bufferLengthAfter = (channelA as any).outgoingBuffer.length; + expect(bufferLengthAfter).to.equal(bufferLengthBefore + 1); + }); + + it("should insert message into bloom filter", () => { + const messageId = MessageChannel.getMessageId(new Uint8Array()); + channelA.sendMessage(new Uint8Array(), callback); + const bloomFilter = getBloomFilter(channelA); + expect(bloomFilter.lookup(messageId)).to.equal(true); + }); + + it("should insert message id into causal history", () => { + const expectedTimestamp = (channelA as any).lamportTimestamp + 1; + const messageId = MessageChannel.getMessageId(new Uint8Array()); + channelA.sendMessage(new Uint8Array(), callback); + const messageIdLog = (channelA as any).messageIdLog as { + timestamp: number; + messageId: string; + }[]; + expect(messageIdLog.length).to.equal(1); + expect( + messageIdLog.some( + (log) => + log.timestamp === expectedTimestamp && log.messageId === messageId + ) + ).to.equal(true); + }); + + it("should attach causal history and bloom filter to each message", () => { + const bloomFilter = new DefaultBloomFilter(DEFAULT_BLOOM_FILTER_OPTIONS); + const causalHistorySize = (channelA as any).causalHistorySize; + const filterBytes = new Array(); + const messages = new Array(causalHistorySize + 5) + .fill("message") + .map((message, index) => `${message}-${index}`); + + messages.forEach((message) => { + filterBytes.push(bloomFilter.toBytes()); + channelA.sendMessage(utf8ToBytes(message), callback); + bloomFilter.insert(MessageChannel.getMessageId(utf8ToBytes(message))); + }); + + const outgoingBuffer = (channelA as any).outgoingBuffer as Message[]; + expect(outgoingBuffer.length).to.equal(messages.length); + + outgoingBuffer.forEach((message, index) => { + expect(message.content).to.deep.equal(utf8ToBytes(messages[index])); + // Correct bloom filter should be attached to each message + expect(message.bloomFilter).to.deep.equal(filterBytes[index]); + }); + + // Causal history should only contain the last N messages as defined by causalHistorySize + const causalHistory = outgoingBuffer[outgoingBuffer.length - 1] + .causalHistory as string[]; + expect(causalHistory.length).to.equal(causalHistorySize); + + const expectedCausalHistory = messages + .slice(-causalHistorySize - 1, -1) + .map((message) => MessageChannel.getMessageId(utf8ToBytes(message))); + expect(causalHistory).to.deep.equal(expectedCausalHistory); + }); + }); + + describe("receiving a message", () => { + beforeEach(() => { + channelA = new MessageChannel(channelId); + channelB = new MessageChannel(channelId); + }); + + it("should increase lamport timestamp", () => { + const timestampBefore = (channelA as any).lamportTimestamp; + channelB.sendMessage(new Uint8Array(), (message) => { + channelA.receiveMessage(message); + return true; + }); + const timestampAfter = (channelA as any).lamportTimestamp; + expect(timestampAfter).to.equal(timestampBefore + 1); + }); + + it("should update lamport timestamp if greater than current timestamp and dependencies are met", () => { + messagesA.forEach((m) => { + channelA.sendMessage(utf8ToBytes(m), callback); + }); + messagesB.forEach((m) => { + channelB.sendMessage(utf8ToBytes(m), (message) => { + channelA.receiveMessage(message); + return true; + }); + }); + const timestampAfter = (channelA as any).lamportTimestamp; + expect(timestampAfter).to.equal(messagesB.length); + }); + + it("should maintain proper timestamps if all messages received", () => { + let timestamp = 0; + messagesA.forEach((m) => { + channelA.sendMessage(utf8ToBytes(m), (message) => { + timestamp++; + channelB.receiveMessage(message); + expect((channelB as any).lamportTimestamp).to.equal(timestamp); + return true; + }); + }); + + messagesB.forEach((m) => { + channelB.sendMessage(utf8ToBytes(m), (message) => { + timestamp++; + channelA.receiveMessage(message); + expect((channelA as any).lamportTimestamp).to.equal(timestamp); + return true; + }); + }); + + const expectedLength = messagesA.length + messagesB.length; + expect((channelA as any).lamportTimestamp).to.equal(expectedLength); + expect((channelA as any).lamportTimestamp).to.equal( + (channelB as any).lamportTimestamp + ); + }); + + it("should add received messages to bloom filter", () => { + messagesA.forEach((m) => { + channelA.sendMessage(utf8ToBytes(m), (message) => { + channelB.receiveMessage(message); + const bloomFilter = getBloomFilter(channelB); + expect(bloomFilter.lookup(message.messageId)).to.equal(true); + return true; + }); + }); + }); + + it("should add to incoming buffer if dependencies are not met", () => { + messagesA.forEach((m) => { + channelA.sendMessage(utf8ToBytes(m), callback); + }); + + let receivedMessage: Message | null = null; + const timestampBefore = (channelB as any).lamportTimestamp; + + channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => { + receivedMessage = message; + channelB.receiveMessage(message); + return true; + }); + + const incomingBuffer = (channelB as any).incomingBuffer as Message[]; + expect(incomingBuffer.length).to.equal(1); + expect(incomingBuffer[0].messageId).to.equal(receivedMessage!.messageId); + + // Since the dependency is not met, the lamport timestamp should not increase + const timestampAfter = (channelB as any).lamportTimestamp; + expect(timestampAfter).to.equal(timestampBefore); + + // Message should not be in local history + const localHistory = (channelB as any).messageIdLog as { + timestamp: number; + messageId: string; + }[]; + expect( + localHistory.some((m) => m.messageId === receivedMessage!.messageId) + ).to.equal(false); + }); + }); + + describe("reviewing ack status", () => { + beforeEach(() => { + channelA = new MessageChannel(channelId); + channelB = new MessageChannel(channelId); + }); + + it("should mark all messages in causal history as acknowledged", () => { + messagesA.forEach((m) => { + channelA.sendMessage(utf8ToBytes(m), (message) => { + channelB.receiveMessage(message); + return true; + }); + }); + + let notInHistory: Message | null = null; + channelA.sendMessage(utf8ToBytes("not-in-history"), (message) => { + notInHistory = message; + return true; + }); + + expect((channelA as any).outgoingBuffer.length).to.equal( + messagesA.length + 1 + ); + + channelB.sendMessage(utf8ToBytes(messagesB[0]), (message) => { + channelA.receiveMessage(message); + return true; + }); + + // Since messagesA are in causal history of channel B's message + // they should be gone from channel A's outgoing buffer + // and notInHistory should still be in the outgoing buffer + const outgoingBuffer = (channelA as any).outgoingBuffer as Message[]; + expect(outgoingBuffer.length).to.equal(1); + expect(outgoingBuffer[0].messageId).to.equal(notInHistory!.messageId); + }); + + it("should track probabilistic acknowledgements of messages received in bloom filter", () => { + const acknowledgementCount = (channelA as any).acknowledgementCount; + + const causalHistorySize = (channelA as any).causalHistorySize; + + const unacknowledgedMessages = [ + "unacknowledged-message-1", + "unacknowledged-message-2" + ]; + const messages = [...messagesA, ...messagesB.slice(0, -1)]; + // Send messages to be received by channel B + messages.forEach((m) => { + channelA.sendMessage(utf8ToBytes(m), (message) => { + channelB.receiveMessage(message); + return true; + }); + }); + + // Send messages not received by channel B + unacknowledgedMessages.forEach((m) => { + channelA.sendMessage(utf8ToBytes(m), callback); + }); + + // Channel B sends a message to channel A + channelB.sendMessage( + utf8ToBytes(messagesB[messagesB.length - 1]), + (message) => { + channelA.receiveMessage(message); + return true; + } + ); + + const acknowledgements: ReadonlyMap = (channelA as any) + .acknowledgements; + // Other than the message IDs which were included in causal history, + // the remaining messages sent by channel A should be considered possibly acknowledged + // for having been included in the bloom filter sent from channel B + const expectedAcknowledgementsSize = messages.length - causalHistorySize; + if (expectedAcknowledgementsSize <= 0) { + throw new Error("expectedAcknowledgementsSize must be greater than 0"); + } + expect(acknowledgements.size).to.equal(expectedAcknowledgementsSize); + // Channel B only included the last N messages in causal history + messages.slice(0, -causalHistorySize).forEach((m) => { + expect( + acknowledgements.get(MessageChannel.getMessageId(utf8ToBytes(m))) + ).to.equal(1); + }); + + // Messages that never reached channel B should not be acknowledged + unacknowledgedMessages.forEach((m) => { + expect( + acknowledgements.has(MessageChannel.getMessageId(utf8ToBytes(m))) + ).to.equal(false); + }); + + // When channel C sends more messages, it will include all the same messages + // in the bloom filter as before, which should mark them as fully acknowledged in channel A + for (let i = 1; i < acknowledgementCount; i++) { + // Send messages until acknowledgement count is reached + channelB.sendMessage(utf8ToBytes(`x-${i}`), (message) => { + channelA.receiveMessage(message); + return true; + }); + } + + // No more partial acknowledgements should be in channel A + expect(acknowledgements.size).to.equal(0); + + // Messages that were not acknowledged should still be in the outgoing buffer + expect((channelA as any).outgoingBuffer.length).to.equal( + unacknowledgedMessages.length + ); + unacknowledgedMessages.forEach((m) => { + expect( + ((channelA as any).outgoingBuffer as Message[]).some( + (message) => + message.messageId === MessageChannel.getMessageId(utf8ToBytes(m)) + ) + ).to.equal(true); + }); + }); + }); +}); diff --git a/packages/sds/src/sds.ts b/packages/sds/src/sds.ts new file mode 100644 index 0000000000..1643c272b5 --- /dev/null +++ b/packages/sds/src/sds.ts @@ -0,0 +1,190 @@ +import { sha256 } from "@noble/hashes/sha256"; +import { bytesToHex } from "@noble/hashes/utils"; +import { proto_sds_message } from "@waku/proto"; + +import { DefaultBloomFilter } from "./bloom.js"; + +export type Message = proto_sds_message.SdsMessage; +export type ChannelId = string; + +export const DEFAULT_BLOOM_FILTER_OPTIONS = { + capacity: 10000, + errorRate: 0.001 +}; + +const DEFAULT_CAUSAL_HISTORY_SIZE = 2; + +export class MessageChannel { + private lamportTimestamp: number; + private filter: DefaultBloomFilter; + private outgoingBuffer: Message[]; + private acknowledgements: Map; + private incomingBuffer: Message[]; + private messageIdLog: { timestamp: number; messageId: string }[]; + private channelId: ChannelId; + private causalHistorySize: number; + private acknowledgementCount: number; + + public constructor( + channelId: ChannelId, + causalHistorySize: number = DEFAULT_CAUSAL_HISTORY_SIZE + ) { + this.channelId = channelId; + this.lamportTimestamp = 0; + this.filter = new DefaultBloomFilter(DEFAULT_BLOOM_FILTER_OPTIONS); + this.outgoingBuffer = []; + this.acknowledgements = new Map(); + this.incomingBuffer = []; + this.messageIdLog = []; + this.causalHistorySize = causalHistorySize; + this.acknowledgementCount = this.getAcknowledgementCount(); + } + + public static getMessageId(payload: Uint8Array): string { + return bytesToHex(sha256(payload)); + } + + /** + * Send a message to the SDS channel. + * + * Increments the lamport timestamp, constructs a `Message` object + * with the given payload, and adds it to the outgoing buffer. + * + * If the callback is successful, the message is also added to + * the bloom filter and message history. In the context of + * Waku, this likely means the message was published via + * light push or relay. + * + * See https://rfc.vac.dev/vac/raw/sds/#send-message + * + * @param payload - The payload to send. + * @param callback - A callback function that returns a boolean indicating whether the message was sent successfully. + */ + public sendMessage( + payload: Uint8Array, + callback?: (message: Message) => boolean + ): void { + this.lamportTimestamp++; + + const messageId = MessageChannel.getMessageId(payload); + + const message: Message = { + messageId, + channelId: this.channelId, + lamportTimestamp: this.lamportTimestamp, + causalHistory: this.messageIdLog + .slice(-this.causalHistorySize) + .map(({ messageId }) => messageId), + bloomFilter: this.filter.toBytes(), + content: payload + }; + + this.outgoingBuffer.push(message); + + if (callback) { + const success = callback(message); + if (success) { + this.filter.insert(messageId); + this.messageIdLog.push({ timestamp: this.lamportTimestamp, messageId }); + } + } + } + + /** + * Process a received SDS message for this channel. + * + * Review the acknowledgement status of messages in the outgoing buffer + * by inspecting the received message's bloom filter and causal history. + * Add the received message to the bloom filter. + * If the local history contains every message in the received message's + * causal history, deliver the message. Otherwise, add the message to the + * incoming buffer. + * + * See https://rfc.vac.dev/vac/raw/sds/#receive-message + * + * @param message - The received SDS message. + */ + public receiveMessage(message: Message): void { + // review ack status + this.reviewAckStatus(message); + // add to bloom filter + this.filter.insert(message.messageId); + // verify causal history + const dependenciesMet = message.causalHistory.every((messageId) => + this.messageIdLog.some( + ({ messageId: logMessageId }) => logMessageId === messageId + ) + ); + if (!dependenciesMet) { + this.incomingBuffer.push(message); + } else { + this.deliverMessage(message); + } + } + + // See https://rfc.vac.dev/vac/raw/sds/#deliver-message + private deliverMessage(message: Message): void { + const messageLamportTimestamp = message.lamportTimestamp ?? 0; + if (messageLamportTimestamp > this.lamportTimestamp) { + this.lamportTimestamp = messageLamportTimestamp; + } + + // The participant MUST insert the message ID into its local log, + // based on Lamport timestamp. + // If one or more message IDs with the same Lamport timestamp already exists, + // the participant MUST follow the Resolve Conflicts procedure. + // https://rfc.vac.dev/vac/raw/sds/#resolve-conflicts + this.messageIdLog.push({ + timestamp: messageLamportTimestamp, + messageId: message.messageId + }); + this.messageIdLog.sort((a, b) => { + if (a.timestamp !== b.timestamp) { + return a.timestamp - b.timestamp; + } + return a.messageId.localeCompare(b.messageId); + }); + } + + // See https://rfc.vac.dev/vac/raw/sds/#review-ack-status + private reviewAckStatus(receivedMessage: Message): void { + // the participant MUST mark all messages in the received causal_history as acknowledged. + receivedMessage.causalHistory.forEach((messageId) => { + this.outgoingBuffer = this.outgoingBuffer.filter( + (msg) => msg.messageId !== messageId + ); + this.acknowledgements.delete(messageId); + if (!this.filter.lookup(messageId)) { + this.filter.insert(messageId); + } + }); + // the participant MUST mark all messages included in the bloom_filter as possibly acknowledged + if (!receivedMessage.bloomFilter) { + return; + } + const messageBloomFilter = DefaultBloomFilter.fromBytes( + receivedMessage.bloomFilter, + this.filter.options + ); + this.outgoingBuffer = this.outgoingBuffer.filter((message) => { + if (!messageBloomFilter.lookup(message.messageId)) { + return true; + } + // If a message appears as possibly acknowledged in multiple received bloom filters, + // the participant MAY mark it as acknowledged based on probabilistic grounds, + // taking into account the bloom filter size and hash number. + const count = (this.acknowledgements.get(message.messageId) ?? 0) + 1; + if (count < this.acknowledgementCount) { + this.acknowledgements.set(message.messageId, count); + return true; + } + this.acknowledgements.delete(message.messageId); + return false; + }); + } + + // TODO: this should be determined based on the bloom filter parameters and number of hashes + private getAcknowledgementCount(): number { + return 2; + } +} diff --git a/packages/sds/tsconfig.json b/packages/sds/tsconfig.json index eebbc51585..cb9dbbfe2b 100644 --- a/packages/sds/tsconfig.json +++ b/packages/sds/tsconfig.json @@ -3,7 +3,9 @@ "compilerOptions": { "outDir": "dist/", "rootDir": "src", - "tsBuildInfoFile": "dist/.tsbuildinfo" + "tsBuildInfoFile": "dist/.tsbuildinfo", + "allowJs": true, + "moduleResolution": "node" }, "include": ["src"], "exclude": ["src/**/*.spec.ts", "src/test_utils"]