mirror of
https://github.com/logos-messaging/js-waku.git
synced 2026-01-02 13:53:12 +00:00
feat(sds): add message channel with buffers and send/receive logic
This commit creates the class for an SDS message channel, including buffers for outgoing and incoming messages. Adds logic for sending messages, receiving messages, delivering messages, and reviewing acknowledgement status of messages. Also adds byte serialization for bloom filters.
This commit is contained in:
parent
a3fb1d7a5b
commit
389ca4062e
@ -64,6 +64,7 @@
|
||||
"kdfparams",
|
||||
"keccak",
|
||||
"keypair",
|
||||
"lamport",
|
||||
"lastpub",
|
||||
"libauth",
|
||||
"libp",
|
||||
|
||||
5
package-lock.json
generated
5
package-lock.json
generated
@ -6937,7 +6937,6 @@
|
||||
"version": "1.7.1",
|
||||
"resolved": "https://registry.npmjs.org/@noble/hashes/-/hashes-1.7.1.tgz",
|
||||
"integrity": "sha512-B8XBPsn4vT/KJAGqDzbwztd+6Yte3P4V7iafm24bxgDe/mlRuK6xmWPuCNrKt2vDafZ8MfJLlchDG/vYafQEjQ==",
|
||||
"license": "MIT",
|
||||
"engines": {
|
||||
"node": "^14.21.3 || >=16"
|
||||
},
|
||||
@ -42781,6 +42780,10 @@
|
||||
"version": "0.0.1",
|
||||
"license": "MIT OR Apache-2.0",
|
||||
"dependencies": {
|
||||
"@noble/hashes": "^1.7.1",
|
||||
"@waku/message-hash": "^0.1.17",
|
||||
"@waku/proto": "^0.0.8",
|
||||
"@waku/utils": "^0.0.21",
|
||||
"chai": "^5.1.2"
|
||||
},
|
||||
"devDependencies": {
|
||||
|
||||
@ -15,3 +15,5 @@ export * as proto_store from './generated/store_v3.js'
|
||||
export * as proto_peer_exchange from "./generated/peer_exchange.js";
|
||||
|
||||
export * as proto_metadata from './generated/metadata.js'
|
||||
|
||||
export * as proto_sds_message from './generated/sds_message.js'
|
||||
@ -8,7 +8,7 @@ const config = {
|
||||
'loader=ts-node/esm'
|
||||
],
|
||||
exit: true,
|
||||
retries: 4
|
||||
retries: 2
|
||||
};
|
||||
|
||||
if (process.env.CI) {
|
||||
|
||||
@ -59,6 +59,10 @@
|
||||
"node": ">=20"
|
||||
},
|
||||
"dependencies": {
|
||||
"@noble/hashes": "^1.7.1",
|
||||
"@waku/message-hash": "^0.1.17",
|
||||
"@waku/proto": "^0.0.8",
|
||||
"@waku/utils": "^0.0.21",
|
||||
"chai": "^5.1.2"
|
||||
},
|
||||
"devDependencies": {
|
||||
|
||||
@ -1,7 +1,6 @@
|
||||
import { expect } from "chai";
|
||||
|
||||
import { BloomFilter } from "./bloom.js";
|
||||
import { hashN } from "./nim_hashn/nim_hashn.mjs";
|
||||
import { BloomFilter, DefaultBloomFilter } from "./bloom.js";
|
||||
|
||||
const n = 10000;
|
||||
const sampleChars =
|
||||
@ -20,13 +19,10 @@ describe("BloomFilter", () => {
|
||||
let testElements: string[];
|
||||
|
||||
beforeEach(() => {
|
||||
bloomFilter = new BloomFilter(
|
||||
{
|
||||
capacity: n,
|
||||
errorRate: 0.001
|
||||
},
|
||||
hashN
|
||||
);
|
||||
bloomFilter = new DefaultBloomFilter({
|
||||
capacity: n,
|
||||
errorRate: 0.001
|
||||
});
|
||||
|
||||
testElements = new Array<string>(n);
|
||||
|
||||
@ -55,15 +51,12 @@ describe("BloomFilter", () => {
|
||||
expect(bloomFilter.kHashes).to.equal(10);
|
||||
expect(bloomFilter.totalBits / n).to.equal(15);
|
||||
|
||||
const bloomFilter2 = new BloomFilter(
|
||||
{
|
||||
capacity: 10000,
|
||||
errorRate: 0.001,
|
||||
kHashes: 4,
|
||||
forceNBitsPerElem: 20
|
||||
},
|
||||
hashN
|
||||
);
|
||||
const bloomFilter2 = new DefaultBloomFilter({
|
||||
capacity: 10000,
|
||||
errorRate: 0.001,
|
||||
kHashes: 4,
|
||||
forceNBitsPerElem: 20
|
||||
});
|
||||
expect(bloomFilter2.kHashes).to.equal(4);
|
||||
expect(bloomFilter2.totalBits).to.equal(200000);
|
||||
});
|
||||
@ -107,6 +100,17 @@ describe("BloomFilter", () => {
|
||||
expect(bloomFilter.lookup(item)).to.equal(true);
|
||||
}
|
||||
});
|
||||
|
||||
it("should serialize and deserialize correctly", () => {
|
||||
const serialized = bloomFilter.toBytes();
|
||||
const deserialized = DefaultBloomFilter.fromBytes(
|
||||
serialized,
|
||||
bloomFilter.options
|
||||
);
|
||||
for (const item of testElements) {
|
||||
expect(deserialized.lookup(item)).to.equal(true);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
describe("BloomFilter with special patterns", () => {
|
||||
@ -114,13 +118,10 @@ describe("BloomFilter with special patterns", () => {
|
||||
const inserted: string[] = [];
|
||||
|
||||
beforeEach(() => {
|
||||
bloomFilter = new BloomFilter(
|
||||
{
|
||||
capacity: n,
|
||||
errorRate: 0.001
|
||||
},
|
||||
hashN
|
||||
);
|
||||
bloomFilter = new DefaultBloomFilter({
|
||||
capacity: n,
|
||||
errorRate: 0.001
|
||||
});
|
||||
});
|
||||
|
||||
it("should handle special patterns correctly", () => {
|
||||
|
||||
@ -1,3 +1,4 @@
|
||||
import { hashN } from "./nim_hashn/nim_hashn.mjs";
|
||||
import { getMOverNBitsForK } from "./probabilities.js";
|
||||
|
||||
export interface BloomFilterOptions {
|
||||
@ -30,11 +31,15 @@ export class BloomFilter {
|
||||
public kHashes: number;
|
||||
public errorRate: number;
|
||||
|
||||
public options: BloomFilterOptions;
|
||||
|
||||
private hashN: (item: string, n: number, maxValue: number) => number;
|
||||
public constructor(
|
||||
options: BloomFilterOptions,
|
||||
hashN: (item: string, n: number, maxValue: number) => number
|
||||
) {
|
||||
this.options = options;
|
||||
|
||||
let nBitsPerElem: number;
|
||||
let k = options.kHashes ?? 0;
|
||||
const forceNBitsPerElem = options.forceNBitsPerElem ?? 0;
|
||||
@ -103,4 +108,39 @@ export class BloomFilter {
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
public toBytes(): Uint8Array {
|
||||
const buffer = new ArrayBuffer(this.data.length * 8);
|
||||
const view = new DataView(buffer);
|
||||
for (let i = 0; i < this.data.length; i++) {
|
||||
view.setBigInt64(i * 8, this.data[i]);
|
||||
}
|
||||
return new Uint8Array(buffer);
|
||||
}
|
||||
|
||||
public static fromBytes(
|
||||
bytes: Uint8Array,
|
||||
options: BloomFilterOptions,
|
||||
hashN: (item: string, n: number, maxValue: number) => number
|
||||
): BloomFilter {
|
||||
const bloomFilter = new BloomFilter(options, hashN);
|
||||
const view = new DataView(bytes.buffer);
|
||||
for (let i = 0; i < bloomFilter.data.length; i++) {
|
||||
bloomFilter.data[i] = view.getBigUint64(i * 8, false);
|
||||
}
|
||||
return bloomFilter;
|
||||
}
|
||||
}
|
||||
|
||||
export class DefaultBloomFilter extends BloomFilter {
|
||||
public constructor(options: BloomFilterOptions) {
|
||||
super(options, hashN);
|
||||
}
|
||||
|
||||
public static fromBytes(
|
||||
bytes: Uint8Array,
|
||||
options: BloomFilterOptions
|
||||
): DefaultBloomFilter {
|
||||
return BloomFilter.fromBytes(bytes, options, hashN);
|
||||
}
|
||||
}
|
||||
|
||||
333
packages/sds/src/sds.spec.ts
Normal file
333
packages/sds/src/sds.spec.ts
Normal file
@ -0,0 +1,333 @@
|
||||
import { utf8ToBytes } from "@waku/utils/bytes";
|
||||
import { expect } from "chai";
|
||||
|
||||
import { DefaultBloomFilter } from "./bloom.js";
|
||||
import {
|
||||
DEFAULT_BLOOM_FILTER_OPTIONS,
|
||||
Message,
|
||||
MessageChannel
|
||||
} from "./sds.js";
|
||||
|
||||
const channelId = "test-channel";
|
||||
const callback = (_message: Message): boolean => {
|
||||
return true;
|
||||
};
|
||||
|
||||
const getBloomFilter = (channel: MessageChannel): DefaultBloomFilter => {
|
||||
return (channel as any).filter as DefaultBloomFilter;
|
||||
};
|
||||
|
||||
const messagesA = ["message-1", "message-2"];
|
||||
const messagesB = [
|
||||
"message-3",
|
||||
"message-4",
|
||||
"message-5",
|
||||
"message-6",
|
||||
"message-7"
|
||||
];
|
||||
|
||||
describe("MessageChannel", function () {
|
||||
this.timeout(5000);
|
||||
let channelA: MessageChannel;
|
||||
let channelB: MessageChannel;
|
||||
|
||||
describe("sending a message ", () => {
|
||||
beforeEach(() => {
|
||||
channelA = new MessageChannel(channelId);
|
||||
});
|
||||
|
||||
it("should increase lamport timestamp", () => {
|
||||
const timestampBefore = (channelA as any).lamportTimestamp;
|
||||
channelA.sendMessage(new Uint8Array(), callback);
|
||||
const timestampAfter = (channelA as any).lamportTimestamp;
|
||||
expect(timestampAfter).to.equal(timestampBefore + 1);
|
||||
});
|
||||
|
||||
it("should push the message to the outgoing buffer", () => {
|
||||
const bufferLengthBefore = (channelA as any).outgoingBuffer.length;
|
||||
channelA.sendMessage(new Uint8Array(), callback);
|
||||
const bufferLengthAfter = (channelA as any).outgoingBuffer.length;
|
||||
expect(bufferLengthAfter).to.equal(bufferLengthBefore + 1);
|
||||
});
|
||||
|
||||
it("should insert message into bloom filter", () => {
|
||||
const messageId = MessageChannel.getMessageId(new Uint8Array());
|
||||
channelA.sendMessage(new Uint8Array(), callback);
|
||||
const bloomFilter = getBloomFilter(channelA);
|
||||
expect(bloomFilter.lookup(messageId)).to.equal(true);
|
||||
});
|
||||
|
||||
it("should insert message id into causal history", () => {
|
||||
const expectedTimestamp = (channelA as any).lamportTimestamp + 1;
|
||||
const messageId = MessageChannel.getMessageId(new Uint8Array());
|
||||
channelA.sendMessage(new Uint8Array(), callback);
|
||||
const messageIdLog = (channelA as any).messageIdLog as {
|
||||
timestamp: number;
|
||||
messageId: string;
|
||||
}[];
|
||||
expect(messageIdLog.length).to.equal(1);
|
||||
expect(
|
||||
messageIdLog.some(
|
||||
(log) =>
|
||||
log.timestamp === expectedTimestamp && log.messageId === messageId
|
||||
)
|
||||
).to.equal(true);
|
||||
});
|
||||
|
||||
it("should attach causal history and bloom filter to each message", () => {
|
||||
const bloomFilter = new DefaultBloomFilter(DEFAULT_BLOOM_FILTER_OPTIONS);
|
||||
const causalHistorySize = (channelA as any).causalHistorySize;
|
||||
const filterBytes = new Array<Uint8Array>();
|
||||
const messages = new Array<string>(causalHistorySize + 5)
|
||||
.fill("message")
|
||||
.map((message, index) => `${message}-${index}`);
|
||||
|
||||
messages.forEach((message) => {
|
||||
filterBytes.push(bloomFilter.toBytes());
|
||||
channelA.sendMessage(utf8ToBytes(message), callback);
|
||||
bloomFilter.insert(MessageChannel.getMessageId(utf8ToBytes(message)));
|
||||
});
|
||||
|
||||
const outgoingBuffer = (channelA as any).outgoingBuffer as Message[];
|
||||
expect(outgoingBuffer.length).to.equal(messages.length);
|
||||
|
||||
outgoingBuffer.forEach((message, index) => {
|
||||
expect(message.content).to.deep.equal(utf8ToBytes(messages[index]));
|
||||
// Correct bloom filter should be attached to each message
|
||||
expect(message.bloomFilter).to.deep.equal(filterBytes[index]);
|
||||
});
|
||||
|
||||
// Causal history should only contain the last N messages as defined by causalHistorySize
|
||||
const causalHistory = outgoingBuffer[outgoingBuffer.length - 1]
|
||||
.causalHistory as string[];
|
||||
expect(causalHistory.length).to.equal(causalHistorySize);
|
||||
|
||||
const expectedCausalHistory = messages
|
||||
.slice(-causalHistorySize - 1, -1)
|
||||
.map((message) => MessageChannel.getMessageId(utf8ToBytes(message)));
|
||||
expect(causalHistory).to.deep.equal(expectedCausalHistory);
|
||||
});
|
||||
});
|
||||
|
||||
describe("receiving a message", () => {
|
||||
beforeEach(() => {
|
||||
channelA = new MessageChannel(channelId);
|
||||
channelB = new MessageChannel(channelId);
|
||||
});
|
||||
|
||||
it("should increase lamport timestamp", () => {
|
||||
const timestampBefore = (channelA as any).lamportTimestamp;
|
||||
channelB.sendMessage(new Uint8Array(), (message) => {
|
||||
channelA.receiveMessage(message);
|
||||
return true;
|
||||
});
|
||||
const timestampAfter = (channelA as any).lamportTimestamp;
|
||||
expect(timestampAfter).to.equal(timestampBefore + 1);
|
||||
});
|
||||
|
||||
it("should update lamport timestamp if greater than current timestamp and dependencies are met", () => {
|
||||
messagesA.forEach((m) => {
|
||||
channelA.sendMessage(utf8ToBytes(m), callback);
|
||||
});
|
||||
messagesB.forEach((m) => {
|
||||
channelB.sendMessage(utf8ToBytes(m), (message) => {
|
||||
channelA.receiveMessage(message);
|
||||
return true;
|
||||
});
|
||||
});
|
||||
const timestampAfter = (channelA as any).lamportTimestamp;
|
||||
expect(timestampAfter).to.equal(messagesB.length);
|
||||
});
|
||||
|
||||
it("should maintain proper timestamps if all messages received", () => {
|
||||
let timestamp = 0;
|
||||
messagesA.forEach((m) => {
|
||||
channelA.sendMessage(utf8ToBytes(m), (message) => {
|
||||
timestamp++;
|
||||
channelB.receiveMessage(message);
|
||||
expect((channelB as any).lamportTimestamp).to.equal(timestamp);
|
||||
return true;
|
||||
});
|
||||
});
|
||||
|
||||
messagesB.forEach((m) => {
|
||||
channelB.sendMessage(utf8ToBytes(m), (message) => {
|
||||
timestamp++;
|
||||
channelA.receiveMessage(message);
|
||||
expect((channelA as any).lamportTimestamp).to.equal(timestamp);
|
||||
return true;
|
||||
});
|
||||
});
|
||||
|
||||
const expectedLength = messagesA.length + messagesB.length;
|
||||
expect((channelA as any).lamportTimestamp).to.equal(expectedLength);
|
||||
expect((channelA as any).lamportTimestamp).to.equal(
|
||||
(channelB as any).lamportTimestamp
|
||||
);
|
||||
});
|
||||
|
||||
it("should add received messages to bloom filter", () => {
|
||||
messagesA.forEach((m) => {
|
||||
channelA.sendMessage(utf8ToBytes(m), (message) => {
|
||||
channelB.receiveMessage(message);
|
||||
const bloomFilter = getBloomFilter(channelB);
|
||||
expect(bloomFilter.lookup(message.messageId)).to.equal(true);
|
||||
return true;
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
it("should add to incoming buffer if dependencies are not met", () => {
|
||||
messagesA.forEach((m) => {
|
||||
channelA.sendMessage(utf8ToBytes(m), callback);
|
||||
});
|
||||
|
||||
let receivedMessage: Message | null = null;
|
||||
const timestampBefore = (channelB as any).lamportTimestamp;
|
||||
|
||||
channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => {
|
||||
receivedMessage = message;
|
||||
channelB.receiveMessage(message);
|
||||
return true;
|
||||
});
|
||||
|
||||
const incomingBuffer = (channelB as any).incomingBuffer as Message[];
|
||||
expect(incomingBuffer.length).to.equal(1);
|
||||
expect(incomingBuffer[0].messageId).to.equal(receivedMessage!.messageId);
|
||||
|
||||
// Since the dependency is not met, the lamport timestamp should not increase
|
||||
const timestampAfter = (channelB as any).lamportTimestamp;
|
||||
expect(timestampAfter).to.equal(timestampBefore);
|
||||
|
||||
// Message should not be in local history
|
||||
const localHistory = (channelB as any).messageIdLog as {
|
||||
timestamp: number;
|
||||
messageId: string;
|
||||
}[];
|
||||
expect(
|
||||
localHistory.some((m) => m.messageId === receivedMessage!.messageId)
|
||||
).to.equal(false);
|
||||
});
|
||||
});
|
||||
|
||||
describe("reviewing ack status", () => {
|
||||
beforeEach(() => {
|
||||
channelA = new MessageChannel(channelId);
|
||||
channelB = new MessageChannel(channelId);
|
||||
});
|
||||
|
||||
it("should mark all messages in causal history as acknowledged", () => {
|
||||
messagesA.forEach((m) => {
|
||||
channelA.sendMessage(utf8ToBytes(m), (message) => {
|
||||
channelB.receiveMessage(message);
|
||||
return true;
|
||||
});
|
||||
});
|
||||
|
||||
let notInHistory: Message | null = null;
|
||||
channelA.sendMessage(utf8ToBytes("not-in-history"), (message) => {
|
||||
notInHistory = message;
|
||||
return true;
|
||||
});
|
||||
|
||||
expect((channelA as any).outgoingBuffer.length).to.equal(
|
||||
messagesA.length + 1
|
||||
);
|
||||
|
||||
channelB.sendMessage(utf8ToBytes(messagesB[0]), (message) => {
|
||||
channelA.receiveMessage(message);
|
||||
return true;
|
||||
});
|
||||
|
||||
// Since messagesA are in causal history of channel B's message
|
||||
// they should be gone from channel A's outgoing buffer
|
||||
// and notInHistory should still be in the outgoing buffer
|
||||
const outgoingBuffer = (channelA as any).outgoingBuffer as Message[];
|
||||
expect(outgoingBuffer.length).to.equal(1);
|
||||
expect(outgoingBuffer[0].messageId).to.equal(notInHistory!.messageId);
|
||||
});
|
||||
|
||||
it("should track probabilistic acknowledgements of messages received in bloom filter", () => {
|
||||
const acknowledgementCount = (channelA as any).acknowledgementCount;
|
||||
|
||||
const causalHistorySize = (channelA as any).causalHistorySize;
|
||||
|
||||
const unacknowledgedMessages = [
|
||||
"unacknowledged-message-1",
|
||||
"unacknowledged-message-2"
|
||||
];
|
||||
const messages = [...messagesA, ...messagesB.slice(0, -1)];
|
||||
// Send messages to be received by channel B
|
||||
messages.forEach((m) => {
|
||||
channelA.sendMessage(utf8ToBytes(m), (message) => {
|
||||
channelB.receiveMessage(message);
|
||||
return true;
|
||||
});
|
||||
});
|
||||
|
||||
// Send messages not received by channel B
|
||||
unacknowledgedMessages.forEach((m) => {
|
||||
channelA.sendMessage(utf8ToBytes(m), callback);
|
||||
});
|
||||
|
||||
// Channel B sends a message to channel A
|
||||
channelB.sendMessage(
|
||||
utf8ToBytes(messagesB[messagesB.length - 1]),
|
||||
(message) => {
|
||||
channelA.receiveMessage(message);
|
||||
return true;
|
||||
}
|
||||
);
|
||||
|
||||
const acknowledgements: ReadonlyMap<string, number> = (channelA as any)
|
||||
.acknowledgements;
|
||||
// Other than the message IDs which were included in causal history,
|
||||
// the remaining messages sent by channel A should be considered possibly acknowledged
|
||||
// for having been included in the bloom filter sent from channel B
|
||||
const expectedAcknowledgementsSize = messages.length - causalHistorySize;
|
||||
if (expectedAcknowledgementsSize <= 0) {
|
||||
throw new Error("expectedAcknowledgementsSize must be greater than 0");
|
||||
}
|
||||
expect(acknowledgements.size).to.equal(expectedAcknowledgementsSize);
|
||||
// Channel B only included the last N messages in causal history
|
||||
messages.slice(0, -causalHistorySize).forEach((m) => {
|
||||
expect(
|
||||
acknowledgements.get(MessageChannel.getMessageId(utf8ToBytes(m)))
|
||||
).to.equal(1);
|
||||
});
|
||||
|
||||
// Messages that never reached channel B should not be acknowledged
|
||||
unacknowledgedMessages.forEach((m) => {
|
||||
expect(
|
||||
acknowledgements.has(MessageChannel.getMessageId(utf8ToBytes(m)))
|
||||
).to.equal(false);
|
||||
});
|
||||
|
||||
// When channel C sends more messages, it will include all the same messages
|
||||
// in the bloom filter as before, which should mark them as fully acknowledged in channel A
|
||||
for (let i = 1; i < acknowledgementCount; i++) {
|
||||
// Send messages until acknowledgement count is reached
|
||||
channelB.sendMessage(utf8ToBytes(`x-${i}`), (message) => {
|
||||
channelA.receiveMessage(message);
|
||||
return true;
|
||||
});
|
||||
}
|
||||
|
||||
// No more partial acknowledgements should be in channel A
|
||||
expect(acknowledgements.size).to.equal(0);
|
||||
|
||||
// Messages that were not acknowledged should still be in the outgoing buffer
|
||||
expect((channelA as any).outgoingBuffer.length).to.equal(
|
||||
unacknowledgedMessages.length
|
||||
);
|
||||
unacknowledgedMessages.forEach((m) => {
|
||||
expect(
|
||||
((channelA as any).outgoingBuffer as Message[]).some(
|
||||
(message) =>
|
||||
message.messageId === MessageChannel.getMessageId(utf8ToBytes(m))
|
||||
)
|
||||
).to.equal(true);
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
190
packages/sds/src/sds.ts
Normal file
190
packages/sds/src/sds.ts
Normal file
@ -0,0 +1,190 @@
|
||||
import { sha256 } from "@noble/hashes/sha256";
|
||||
import { bytesToHex } from "@noble/hashes/utils";
|
||||
import { proto_sds_message } from "@waku/proto";
|
||||
|
||||
import { DefaultBloomFilter } from "./bloom.js";
|
||||
|
||||
export type Message = proto_sds_message.SdsMessage;
|
||||
export type ChannelId = string;
|
||||
|
||||
export const DEFAULT_BLOOM_FILTER_OPTIONS = {
|
||||
capacity: 10000,
|
||||
errorRate: 0.001
|
||||
};
|
||||
|
||||
const DEFAULT_CAUSAL_HISTORY_SIZE = 2;
|
||||
|
||||
export class MessageChannel {
|
||||
private lamportTimestamp: number;
|
||||
private filter: DefaultBloomFilter;
|
||||
private outgoingBuffer: Message[];
|
||||
private acknowledgements: Map<string, number>;
|
||||
private incomingBuffer: Message[];
|
||||
private messageIdLog: { timestamp: number; messageId: string }[];
|
||||
private channelId: ChannelId;
|
||||
private causalHistorySize: number;
|
||||
private acknowledgementCount: number;
|
||||
|
||||
public constructor(
|
||||
channelId: ChannelId,
|
||||
causalHistorySize: number = DEFAULT_CAUSAL_HISTORY_SIZE
|
||||
) {
|
||||
this.channelId = channelId;
|
||||
this.lamportTimestamp = 0;
|
||||
this.filter = new DefaultBloomFilter(DEFAULT_BLOOM_FILTER_OPTIONS);
|
||||
this.outgoingBuffer = [];
|
||||
this.acknowledgements = new Map();
|
||||
this.incomingBuffer = [];
|
||||
this.messageIdLog = [];
|
||||
this.causalHistorySize = causalHistorySize;
|
||||
this.acknowledgementCount = this.getAcknowledgementCount();
|
||||
}
|
||||
|
||||
public static getMessageId(payload: Uint8Array): string {
|
||||
return bytesToHex(sha256(payload));
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a message to the SDS channel.
|
||||
*
|
||||
* Increments the lamport timestamp, constructs a `Message` object
|
||||
* with the given payload, and adds it to the outgoing buffer.
|
||||
*
|
||||
* If the callback is successful, the message is also added to
|
||||
* the bloom filter and message history. In the context of
|
||||
* Waku, this likely means the message was published via
|
||||
* light push or relay.
|
||||
*
|
||||
* See https://rfc.vac.dev/vac/raw/sds/#send-message
|
||||
*
|
||||
* @param payload - The payload to send.
|
||||
* @param callback - A callback function that returns a boolean indicating whether the message was sent successfully.
|
||||
*/
|
||||
public sendMessage(
|
||||
payload: Uint8Array,
|
||||
callback?: (message: Message) => boolean
|
||||
): void {
|
||||
this.lamportTimestamp++;
|
||||
|
||||
const messageId = MessageChannel.getMessageId(payload);
|
||||
|
||||
const message: Message = {
|
||||
messageId,
|
||||
channelId: this.channelId,
|
||||
lamportTimestamp: this.lamportTimestamp,
|
||||
causalHistory: this.messageIdLog
|
||||
.slice(-this.causalHistorySize)
|
||||
.map(({ messageId }) => messageId),
|
||||
bloomFilter: this.filter.toBytes(),
|
||||
content: payload
|
||||
};
|
||||
|
||||
this.outgoingBuffer.push(message);
|
||||
|
||||
if (callback) {
|
||||
const success = callback(message);
|
||||
if (success) {
|
||||
this.filter.insert(messageId);
|
||||
this.messageIdLog.push({ timestamp: this.lamportTimestamp, messageId });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Process a received SDS message for this channel.
|
||||
*
|
||||
* Review the acknowledgement status of messages in the outgoing buffer
|
||||
* by inspecting the received message's bloom filter and causal history.
|
||||
* Add the received message to the bloom filter.
|
||||
* If the local history contains every message in the received message's
|
||||
* causal history, deliver the message. Otherwise, add the message to the
|
||||
* incoming buffer.
|
||||
*
|
||||
* See https://rfc.vac.dev/vac/raw/sds/#receive-message
|
||||
*
|
||||
* @param message - The received SDS message.
|
||||
*/
|
||||
public receiveMessage(message: Message): void {
|
||||
// review ack status
|
||||
this.reviewAckStatus(message);
|
||||
// add to bloom filter
|
||||
this.filter.insert(message.messageId);
|
||||
// verify causal history
|
||||
const dependenciesMet = message.causalHistory.every((messageId) =>
|
||||
this.messageIdLog.some(
|
||||
({ messageId: logMessageId }) => logMessageId === messageId
|
||||
)
|
||||
);
|
||||
if (!dependenciesMet) {
|
||||
this.incomingBuffer.push(message);
|
||||
} else {
|
||||
this.deliverMessage(message);
|
||||
}
|
||||
}
|
||||
|
||||
// See https://rfc.vac.dev/vac/raw/sds/#deliver-message
|
||||
private deliverMessage(message: Message): void {
|
||||
const messageLamportTimestamp = message.lamportTimestamp ?? 0;
|
||||
if (messageLamportTimestamp > this.lamportTimestamp) {
|
||||
this.lamportTimestamp = messageLamportTimestamp;
|
||||
}
|
||||
|
||||
// The participant MUST insert the message ID into its local log,
|
||||
// based on Lamport timestamp.
|
||||
// If one or more message IDs with the same Lamport timestamp already exists,
|
||||
// the participant MUST follow the Resolve Conflicts procedure.
|
||||
// https://rfc.vac.dev/vac/raw/sds/#resolve-conflicts
|
||||
this.messageIdLog.push({
|
||||
timestamp: messageLamportTimestamp,
|
||||
messageId: message.messageId
|
||||
});
|
||||
this.messageIdLog.sort((a, b) => {
|
||||
if (a.timestamp !== b.timestamp) {
|
||||
return a.timestamp - b.timestamp;
|
||||
}
|
||||
return a.messageId.localeCompare(b.messageId);
|
||||
});
|
||||
}
|
||||
|
||||
// See https://rfc.vac.dev/vac/raw/sds/#review-ack-status
|
||||
private reviewAckStatus(receivedMessage: Message): void {
|
||||
// the participant MUST mark all messages in the received causal_history as acknowledged.
|
||||
receivedMessage.causalHistory.forEach((messageId) => {
|
||||
this.outgoingBuffer = this.outgoingBuffer.filter(
|
||||
(msg) => msg.messageId !== messageId
|
||||
);
|
||||
this.acknowledgements.delete(messageId);
|
||||
if (!this.filter.lookup(messageId)) {
|
||||
this.filter.insert(messageId);
|
||||
}
|
||||
});
|
||||
// the participant MUST mark all messages included in the bloom_filter as possibly acknowledged
|
||||
if (!receivedMessage.bloomFilter) {
|
||||
return;
|
||||
}
|
||||
const messageBloomFilter = DefaultBloomFilter.fromBytes(
|
||||
receivedMessage.bloomFilter,
|
||||
this.filter.options
|
||||
);
|
||||
this.outgoingBuffer = this.outgoingBuffer.filter((message) => {
|
||||
if (!messageBloomFilter.lookup(message.messageId)) {
|
||||
return true;
|
||||
}
|
||||
// If a message appears as possibly acknowledged in multiple received bloom filters,
|
||||
// the participant MAY mark it as acknowledged based on probabilistic grounds,
|
||||
// taking into account the bloom filter size and hash number.
|
||||
const count = (this.acknowledgements.get(message.messageId) ?? 0) + 1;
|
||||
if (count < this.acknowledgementCount) {
|
||||
this.acknowledgements.set(message.messageId, count);
|
||||
return true;
|
||||
}
|
||||
this.acknowledgements.delete(message.messageId);
|
||||
return false;
|
||||
});
|
||||
}
|
||||
|
||||
// TODO: this should be determined based on the bloom filter parameters and number of hashes
|
||||
private getAcknowledgementCount(): number {
|
||||
return 2;
|
||||
}
|
||||
}
|
||||
@ -3,7 +3,9 @@
|
||||
"compilerOptions": {
|
||||
"outDir": "dist/",
|
||||
"rootDir": "src",
|
||||
"tsBuildInfoFile": "dist/.tsbuildinfo"
|
||||
"tsBuildInfoFile": "dist/.tsbuildinfo",
|
||||
"allowJs": true,
|
||||
"moduleResolution": "node"
|
||||
},
|
||||
"include": ["src"],
|
||||
"exclude": ["src/**/*.spec.ts", "src/test_utils"]
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user