mirror of
https://github.com/logos-messaging/js-waku.git
synced 2026-01-02 13:53:12 +00:00
Merge pull request #2293 from waku-org/feat/sds-sync
feat(sds): send and receive sync messages
This commit is contained in:
commit
8db7690233
@ -9,8 +9,8 @@ import {
|
||||
} from "./sds.js";
|
||||
|
||||
const channelId = "test-channel";
|
||||
const callback = (_message: Message): boolean => {
|
||||
return true;
|
||||
const callback = (_message: Message): Promise<boolean> => {
|
||||
return Promise.resolve(true);
|
||||
};
|
||||
|
||||
const getBloomFilter = (channel: MessageChannel): DefaultBloomFilter => {
|
||||
@ -36,31 +36,31 @@ describe("MessageChannel", function () {
|
||||
channelA = new MessageChannel(channelId);
|
||||
});
|
||||
|
||||
it("should increase lamport timestamp", () => {
|
||||
it("should increase lamport timestamp", async () => {
|
||||
const timestampBefore = (channelA as any).lamportTimestamp;
|
||||
channelA.sendMessage(new Uint8Array(), callback);
|
||||
await channelA.sendMessage(new Uint8Array(), callback);
|
||||
const timestampAfter = (channelA as any).lamportTimestamp;
|
||||
expect(timestampAfter).to.equal(timestampBefore + 1);
|
||||
});
|
||||
|
||||
it("should push the message to the outgoing buffer", () => {
|
||||
it("should push the message to the outgoing buffer", async () => {
|
||||
const bufferLengthBefore = (channelA as any).outgoingBuffer.length;
|
||||
channelA.sendMessage(new Uint8Array(), callback);
|
||||
await channelA.sendMessage(new Uint8Array(), callback);
|
||||
const bufferLengthAfter = (channelA as any).outgoingBuffer.length;
|
||||
expect(bufferLengthAfter).to.equal(bufferLengthBefore + 1);
|
||||
});
|
||||
|
||||
it("should insert message into bloom filter", () => {
|
||||
it("should insert message into bloom filter", async () => {
|
||||
const messageId = MessageChannel.getMessageId(new Uint8Array());
|
||||
channelA.sendMessage(new Uint8Array(), callback);
|
||||
await channelA.sendMessage(new Uint8Array(), callback);
|
||||
const bloomFilter = getBloomFilter(channelA);
|
||||
expect(bloomFilter.lookup(messageId)).to.equal(true);
|
||||
});
|
||||
|
||||
it("should insert message id into causal history", () => {
|
||||
it("should insert message id into causal history", async () => {
|
||||
const expectedTimestamp = (channelA as any).lamportTimestamp + 1;
|
||||
const messageId = MessageChannel.getMessageId(new Uint8Array());
|
||||
channelA.sendMessage(new Uint8Array(), callback);
|
||||
await channelA.sendMessage(new Uint8Array(), callback);
|
||||
const messageIdLog = (channelA as any).messageIdLog as {
|
||||
timestamp: number;
|
||||
messageId: string;
|
||||
@ -74,7 +74,7 @@ describe("MessageChannel", function () {
|
||||
).to.equal(true);
|
||||
});
|
||||
|
||||
it("should attach causal history and bloom filter to each message", () => {
|
||||
it("should attach causal history and bloom filter to each message", async () => {
|
||||
const bloomFilter = new DefaultBloomFilter(DEFAULT_BLOOM_FILTER_OPTIONS);
|
||||
const causalHistorySize = (channelA as any).causalHistorySize;
|
||||
const filterBytes = new Array<Uint8Array>();
|
||||
@ -82,11 +82,11 @@ describe("MessageChannel", function () {
|
||||
.fill("message")
|
||||
.map((message, index) => `${message}-${index}`);
|
||||
|
||||
messages.forEach((message) => {
|
||||
for (const message of messages) {
|
||||
filterBytes.push(bloomFilter.toBytes());
|
||||
channelA.sendMessage(utf8ToBytes(message), callback);
|
||||
await channelA.sendMessage(utf8ToBytes(message), callback);
|
||||
bloomFilter.insert(MessageChannel.getMessageId(utf8ToBytes(message)));
|
||||
});
|
||||
}
|
||||
|
||||
const outgoingBuffer = (channelA as any).outgoingBuffer as Message[];
|
||||
expect(outgoingBuffer.length).to.equal(messages.length);
|
||||
@ -115,49 +115,49 @@ describe("MessageChannel", function () {
|
||||
channelB = new MessageChannel(channelId);
|
||||
});
|
||||
|
||||
it("should increase lamport timestamp", () => {
|
||||
it("should increase lamport timestamp", async () => {
|
||||
const timestampBefore = (channelA as any).lamportTimestamp;
|
||||
channelB.sendMessage(new Uint8Array(), (message) => {
|
||||
await channelB.sendMessage(new Uint8Array(), (message) => {
|
||||
channelA.receiveMessage(message);
|
||||
return true;
|
||||
return Promise.resolve(true);
|
||||
});
|
||||
const timestampAfter = (channelA as any).lamportTimestamp;
|
||||
expect(timestampAfter).to.equal(timestampBefore + 1);
|
||||
});
|
||||
|
||||
it("should update lamport timestamp if greater than current timestamp and dependencies are met", () => {
|
||||
messagesA.forEach((m) => {
|
||||
channelA.sendMessage(utf8ToBytes(m), callback);
|
||||
});
|
||||
messagesB.forEach((m) => {
|
||||
channelB.sendMessage(utf8ToBytes(m), (message) => {
|
||||
it("should update lamport timestamp if greater than current timestamp and dependencies are met", async () => {
|
||||
for (const m of messagesA) {
|
||||
await channelA.sendMessage(utf8ToBytes(m), callback);
|
||||
}
|
||||
for (const m of messagesB) {
|
||||
await channelB.sendMessage(utf8ToBytes(m), (message) => {
|
||||
channelA.receiveMessage(message);
|
||||
return true;
|
||||
return Promise.resolve(true);
|
||||
});
|
||||
});
|
||||
}
|
||||
const timestampAfter = (channelA as any).lamportTimestamp;
|
||||
expect(timestampAfter).to.equal(messagesB.length);
|
||||
});
|
||||
|
||||
it("should maintain proper timestamps if all messages received", () => {
|
||||
it("should maintain proper timestamps if all messages received", async () => {
|
||||
let timestamp = 0;
|
||||
messagesA.forEach((m) => {
|
||||
channelA.sendMessage(utf8ToBytes(m), (message) => {
|
||||
for (const m of messagesA) {
|
||||
await channelA.sendMessage(utf8ToBytes(m), (message) => {
|
||||
timestamp++;
|
||||
channelB.receiveMessage(message);
|
||||
expect((channelB as any).lamportTimestamp).to.equal(timestamp);
|
||||
return true;
|
||||
return Promise.resolve(true);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
messagesB.forEach((m) => {
|
||||
channelB.sendMessage(utf8ToBytes(m), (message) => {
|
||||
for (const m of messagesB) {
|
||||
await channelB.sendMessage(utf8ToBytes(m), (message) => {
|
||||
timestamp++;
|
||||
channelA.receiveMessage(message);
|
||||
expect((channelA as any).lamportTimestamp).to.equal(timestamp);
|
||||
return true;
|
||||
return Promise.resolve(true);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
const expectedLength = messagesA.length + messagesB.length;
|
||||
expect((channelA as any).lamportTimestamp).to.equal(expectedLength);
|
||||
@ -166,29 +166,29 @@ describe("MessageChannel", function () {
|
||||
);
|
||||
});
|
||||
|
||||
it("should add received messages to bloom filter", () => {
|
||||
messagesA.forEach((m) => {
|
||||
channelA.sendMessage(utf8ToBytes(m), (message) => {
|
||||
it("should add received messages to bloom filter", async () => {
|
||||
for (const m of messagesA) {
|
||||
await channelA.sendMessage(utf8ToBytes(m), (message) => {
|
||||
channelB.receiveMessage(message);
|
||||
const bloomFilter = getBloomFilter(channelB);
|
||||
expect(bloomFilter.lookup(message.messageId)).to.equal(true);
|
||||
return true;
|
||||
return Promise.resolve(true);
|
||||
});
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
it("should add to incoming buffer if dependencies are not met", () => {
|
||||
messagesA.forEach((m) => {
|
||||
channelA.sendMessage(utf8ToBytes(m), callback);
|
||||
});
|
||||
it("should add to incoming buffer if dependencies are not met", async () => {
|
||||
for (const m of messagesA) {
|
||||
await channelA.sendMessage(utf8ToBytes(m), callback);
|
||||
}
|
||||
|
||||
let receivedMessage: Message | null = null;
|
||||
const timestampBefore = (channelB as any).lamportTimestamp;
|
||||
|
||||
channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => {
|
||||
await channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => {
|
||||
receivedMessage = message;
|
||||
channelB.receiveMessage(message);
|
||||
return true;
|
||||
return Promise.resolve(true);
|
||||
});
|
||||
|
||||
const incomingBuffer = (channelB as any).incomingBuffer as Message[];
|
||||
@ -216,27 +216,27 @@ describe("MessageChannel", function () {
|
||||
channelB = new MessageChannel(channelId);
|
||||
});
|
||||
|
||||
it("should mark all messages in causal history as acknowledged", () => {
|
||||
messagesA.forEach((m) => {
|
||||
channelA.sendMessage(utf8ToBytes(m), (message) => {
|
||||
it("should mark all messages in causal history as acknowledged", async () => {
|
||||
for (const m of messagesA) {
|
||||
await channelA.sendMessage(utf8ToBytes(m), (message) => {
|
||||
channelB.receiveMessage(message);
|
||||
return true;
|
||||
return Promise.resolve(true);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
let notInHistory: Message | null = null;
|
||||
channelA.sendMessage(utf8ToBytes("not-in-history"), (message) => {
|
||||
await channelA.sendMessage(utf8ToBytes("not-in-history"), (message) => {
|
||||
notInHistory = message;
|
||||
return true;
|
||||
return Promise.resolve(true);
|
||||
});
|
||||
|
||||
expect((channelA as any).outgoingBuffer.length).to.equal(
|
||||
messagesA.length + 1
|
||||
);
|
||||
|
||||
channelB.sendMessage(utf8ToBytes(messagesB[0]), (message) => {
|
||||
await channelB.sendMessage(utf8ToBytes(messagesB[0]), (message) => {
|
||||
channelA.receiveMessage(message);
|
||||
return true;
|
||||
return Promise.resolve(true);
|
||||
});
|
||||
|
||||
// Since messagesA are in causal history of channel B's message
|
||||
@ -247,7 +247,7 @@ describe("MessageChannel", function () {
|
||||
expect(outgoingBuffer[0].messageId).to.equal(notInHistory!.messageId);
|
||||
});
|
||||
|
||||
it("should track probabilistic acknowledgements of messages received in bloom filter", () => {
|
||||
it("should track probabilistic acknowledgements of messages received in bloom filter", async () => {
|
||||
const acknowledgementCount = (channelA as any).acknowledgementCount;
|
||||
|
||||
const causalHistorySize = (channelA as any).causalHistorySize;
|
||||
@ -258,24 +258,24 @@ describe("MessageChannel", function () {
|
||||
];
|
||||
const messages = [...messagesA, ...messagesB.slice(0, -1)];
|
||||
// Send messages to be received by channel B
|
||||
messages.forEach((m) => {
|
||||
channelA.sendMessage(utf8ToBytes(m), (message) => {
|
||||
for (const m of messages) {
|
||||
await channelA.sendMessage(utf8ToBytes(m), (message) => {
|
||||
channelB.receiveMessage(message);
|
||||
return true;
|
||||
return Promise.resolve(true);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
// Send messages not received by channel B
|
||||
unacknowledgedMessages.forEach((m) => {
|
||||
channelA.sendMessage(utf8ToBytes(m), callback);
|
||||
});
|
||||
for (const m of unacknowledgedMessages) {
|
||||
await channelA.sendMessage(utf8ToBytes(m), callback);
|
||||
}
|
||||
|
||||
// Channel B sends a message to channel A
|
||||
channelB.sendMessage(
|
||||
await channelB.sendMessage(
|
||||
utf8ToBytes(messagesB[messagesB.length - 1]),
|
||||
(message) => {
|
||||
channelA.receiveMessage(message);
|
||||
return true;
|
||||
return Promise.resolve(true);
|
||||
}
|
||||
);
|
||||
|
||||
@ -307,9 +307,9 @@ describe("MessageChannel", function () {
|
||||
// in the bloom filter as before, which should mark them as fully acknowledged in channel A
|
||||
for (let i = 1; i < acknowledgementCount; i++) {
|
||||
// Send messages until acknowledgement count is reached
|
||||
channelB.sendMessage(utf8ToBytes(`x-${i}`), (message) => {
|
||||
await channelB.sendMessage(utf8ToBytes(`x-${i}`), (message) => {
|
||||
channelA.receiveMessage(message);
|
||||
return true;
|
||||
return Promise.resolve(true);
|
||||
});
|
||||
}
|
||||
|
||||
@ -337,15 +337,15 @@ describe("MessageChannel", function () {
|
||||
channelB = new MessageChannel(channelId);
|
||||
});
|
||||
|
||||
it("should detect messages with missing dependencies", () => {
|
||||
it("should detect messages with missing dependencies", async () => {
|
||||
const causalHistorySize = (channelA as any).causalHistorySize;
|
||||
messagesA.forEach((m) => {
|
||||
channelA.sendMessage(utf8ToBytes(m), callback);
|
||||
});
|
||||
for (const m of messagesA) {
|
||||
await channelA.sendMessage(utf8ToBytes(m), callback);
|
||||
}
|
||||
|
||||
channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => {
|
||||
await channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => {
|
||||
channelB.receiveMessage(message);
|
||||
return true;
|
||||
return Promise.resolve(true);
|
||||
});
|
||||
|
||||
const incomingBuffer = (channelB as any).incomingBuffer as Message[];
|
||||
@ -361,19 +361,19 @@ describe("MessageChannel", function () {
|
||||
);
|
||||
});
|
||||
|
||||
it("should deliver messages after dependencies are met", () => {
|
||||
it("should deliver messages after dependencies are met", async () => {
|
||||
const causalHistorySize = (channelA as any).causalHistorySize;
|
||||
const sentMessages = new Array<Message>();
|
||||
messagesA.forEach((m) => {
|
||||
channelA.sendMessage(utf8ToBytes(m), (message) => {
|
||||
for (const m of messagesA) {
|
||||
await channelA.sendMessage(utf8ToBytes(m), (message) => {
|
||||
sentMessages.push(message);
|
||||
return true;
|
||||
return Promise.resolve(true);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => {
|
||||
await channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => {
|
||||
channelB.receiveMessage(message);
|
||||
return true;
|
||||
return Promise.resolve(true);
|
||||
});
|
||||
|
||||
const missingMessages = channelB.sweepIncomingBuffer();
|
||||
@ -406,13 +406,13 @@ describe("MessageChannel", function () {
|
||||
10
|
||||
);
|
||||
|
||||
messagesA.forEach((m) => {
|
||||
channelA.sendMessage(utf8ToBytes(m), callback);
|
||||
});
|
||||
for (const m of messagesA) {
|
||||
await channelA.sendMessage(utf8ToBytes(m), callback);
|
||||
}
|
||||
|
||||
channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => {
|
||||
await channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => {
|
||||
channelC.receiveMessage(message);
|
||||
return true;
|
||||
return Promise.resolve(true);
|
||||
});
|
||||
|
||||
const missingMessages = channelC.sweepIncomingBuffer();
|
||||
@ -434,15 +434,15 @@ describe("MessageChannel", function () {
|
||||
channelB = new MessageChannel(channelId);
|
||||
});
|
||||
|
||||
it("should partition messages based on acknowledgement status", () => {
|
||||
it("should partition messages based on acknowledgement status", async () => {
|
||||
const unacknowledgedMessages: Message[] = [];
|
||||
messagesA.forEach((m) => {
|
||||
channelA.sendMessage(utf8ToBytes(m), (message) => {
|
||||
for (const m of messagesA) {
|
||||
await channelA.sendMessage(utf8ToBytes(m), (message) => {
|
||||
unacknowledgedMessages.push(message);
|
||||
channelB.receiveMessage(message);
|
||||
return true;
|
||||
return Promise.resolve(true);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
let { unacknowledged, possiblyAcknowledged } =
|
||||
channelA.sweepOutgoingBuffer();
|
||||
@ -451,15 +451,15 @@ describe("MessageChannel", function () {
|
||||
|
||||
// Make sure messages sent by channel A are not in causal history
|
||||
const causalHistorySize = (channelA as any).causalHistorySize;
|
||||
messagesB.slice(0, causalHistorySize).forEach((m) => {
|
||||
channelB.sendMessage(utf8ToBytes(m), callback);
|
||||
});
|
||||
for (const m of messagesB.slice(0, causalHistorySize)) {
|
||||
await channelB.sendMessage(utf8ToBytes(m), callback);
|
||||
}
|
||||
|
||||
channelB.sendMessage(
|
||||
await channelB.sendMessage(
|
||||
utf8ToBytes(messagesB[causalHistorySize]),
|
||||
(message) => {
|
||||
channelA.receiveMessage(message);
|
||||
return true;
|
||||
return Promise.resolve(true);
|
||||
}
|
||||
);
|
||||
|
||||
@ -471,4 +471,80 @@ describe("MessageChannel", function () {
|
||||
expect(possiblyAcknowledged.length).to.equal(messagesA.length);
|
||||
});
|
||||
});
|
||||
|
||||
describe("Sync messages", () => {
|
||||
beforeEach(() => {
|
||||
channelA = new MessageChannel(channelId);
|
||||
channelB = new MessageChannel(channelId);
|
||||
});
|
||||
|
||||
it("should be sent with empty content", async () => {
|
||||
await channelA.sendSyncMessage((message) => {
|
||||
expect(message.content?.length).to.equal(0);
|
||||
return Promise.resolve(true);
|
||||
});
|
||||
});
|
||||
|
||||
it("should not be added to outgoing buffer, bloom filter, or local log", async () => {
|
||||
await channelA.sendSyncMessage();
|
||||
|
||||
const outgoingBuffer = (channelA as any).outgoingBuffer as Message[];
|
||||
expect(outgoingBuffer.length).to.equal(0);
|
||||
|
||||
const bloomFilter = getBloomFilter(channelA);
|
||||
expect(
|
||||
bloomFilter.lookup(MessageChannel.getMessageId(new Uint8Array()))
|
||||
).to.equal(false);
|
||||
|
||||
const localLog = (channelA as any).messageIdLog as {
|
||||
timestamp: number;
|
||||
messageId: string;
|
||||
}[];
|
||||
expect(localLog.length).to.equal(0);
|
||||
});
|
||||
|
||||
it("should be delivered but not added to local log or bloom filter", async () => {
|
||||
const timestampBefore = (channelB as any).lamportTimestamp;
|
||||
let expectedTimestamp: number | undefined;
|
||||
await channelA.sendSyncMessage((message) => {
|
||||
expectedTimestamp = message.lamportTimestamp;
|
||||
channelB.receiveMessage(message);
|
||||
return Promise.resolve(true);
|
||||
});
|
||||
const timestampAfter = (channelB as any).lamportTimestamp;
|
||||
expect(timestampAfter).to.equal(expectedTimestamp);
|
||||
expect(timestampAfter).to.be.greaterThan(timestampBefore);
|
||||
|
||||
const localLog = (channelB as any).messageIdLog as {
|
||||
timestamp: number;
|
||||
messageId: string;
|
||||
}[];
|
||||
expect(localLog.length).to.equal(0);
|
||||
|
||||
const bloomFilter = getBloomFilter(channelB);
|
||||
expect(
|
||||
bloomFilter.lookup(MessageChannel.getMessageId(new Uint8Array()))
|
||||
).to.equal(false);
|
||||
});
|
||||
|
||||
it("should update ack status of messages in outgoing buffer", async () => {
|
||||
for (const m of messagesA) {
|
||||
await channelA.sendMessage(utf8ToBytes(m), (message) => {
|
||||
channelB.receiveMessage(message);
|
||||
return Promise.resolve(true);
|
||||
});
|
||||
}
|
||||
|
||||
await channelB.sendSyncMessage((message) => {
|
||||
channelA.receiveMessage(message);
|
||||
return Promise.resolve(true);
|
||||
});
|
||||
|
||||
const causalHistorySize = (channelA as any).causalHistorySize;
|
||||
const outgoingBuffer = (channelA as any).outgoingBuffer as Message[];
|
||||
expect(outgoingBuffer.length).to.equal(
|
||||
messagesA.length - causalHistorySize
|
||||
);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@ -65,10 +65,10 @@ export class MessageChannel {
|
||||
* @param payload - The payload to send.
|
||||
* @param callback - A callback function that returns a boolean indicating whether the message was sent successfully.
|
||||
*/
|
||||
public sendMessage(
|
||||
public async sendMessage(
|
||||
payload: Uint8Array,
|
||||
callback?: (message: Message) => boolean
|
||||
): void {
|
||||
callback?: (message: Message) => Promise<boolean>
|
||||
): Promise<void> {
|
||||
this.lamportTimestamp++;
|
||||
|
||||
const messageId = MessageChannel.getMessageId(payload);
|
||||
@ -87,7 +87,7 @@ export class MessageChannel {
|
||||
this.outgoingBuffer.push(message);
|
||||
|
||||
if (callback) {
|
||||
const success = callback(message);
|
||||
const success = await callback(message);
|
||||
if (success) {
|
||||
this.filter.insert(messageId);
|
||||
this.messageIdLog.push({ timestamp: this.lamportTimestamp, messageId });
|
||||
@ -112,8 +112,10 @@ export class MessageChannel {
|
||||
public receiveMessage(message: Message): void {
|
||||
// review ack status
|
||||
this.reviewAckStatus(message);
|
||||
// add to bloom filter
|
||||
this.filter.insert(message.messageId);
|
||||
// add to bloom filter (skip for messages with empty content)
|
||||
if (message.content?.length && message.content.length > 0) {
|
||||
this.filter.insert(message.messageId);
|
||||
}
|
||||
// verify causal history
|
||||
const dependenciesMet = message.causalHistory.every((messageId) =>
|
||||
this.messageIdLog.some(
|
||||
@ -203,6 +205,40 @@ export class MessageChannel {
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a sync message to the SDS channel.
|
||||
*
|
||||
* Increments the lamport timestamp, constructs a `Message` object
|
||||
* with an empty load. Skips outgoing buffer, filter, and local log.
|
||||
*
|
||||
* See https://rfc.vac.dev/vac/raw/sds/#send-sync-message
|
||||
*
|
||||
* @param callback - A callback function that returns a boolean indicating whether the message was sent successfully.
|
||||
*/
|
||||
public sendSyncMessage(
|
||||
callback?: (message: Message) => Promise<boolean>
|
||||
): Promise<boolean> {
|
||||
this.lamportTimestamp++;
|
||||
|
||||
const emptyMessage = new Uint8Array();
|
||||
|
||||
const message: Message = {
|
||||
messageId: MessageChannel.getMessageId(emptyMessage),
|
||||
channelId: this.channelId,
|
||||
lamportTimestamp: this.lamportTimestamp,
|
||||
causalHistory: this.messageIdLog
|
||||
.slice(-this.causalHistorySize)
|
||||
.map(({ messageId }) => messageId),
|
||||
bloomFilter: this.filter.toBytes(),
|
||||
content: emptyMessage
|
||||
};
|
||||
|
||||
if (callback) {
|
||||
return callback(message);
|
||||
}
|
||||
return Promise.resolve(false);
|
||||
}
|
||||
|
||||
// See https://rfc.vac.dev/vac/raw/sds/#deliver-message
|
||||
private deliverMessage(message: Message): void {
|
||||
const messageLamportTimestamp = message.lamportTimestamp ?? 0;
|
||||
@ -210,6 +246,12 @@ export class MessageChannel {
|
||||
this.lamportTimestamp = messageLamportTimestamp;
|
||||
}
|
||||
|
||||
if (message.content?.length === 0) {
|
||||
// Messages with empty content are sync messages.
|
||||
// They are not added to the local log or bloom filter.
|
||||
return;
|
||||
}
|
||||
|
||||
// The participant MUST insert the message ID into its local log,
|
||||
// based on Lamport timestamp.
|
||||
// If one or more message IDs with the same Lamport timestamp already exists,
|
||||
@ -227,6 +269,8 @@ export class MessageChannel {
|
||||
});
|
||||
}
|
||||
|
||||
// For each received message (including sync messages), inspect the causal history and bloom filter
|
||||
// to determine the acknowledgement status of messages in the outgoing buffer.
|
||||
// See https://rfc.vac.dev/vac/raw/sds/#review-ack-status
|
||||
private reviewAckStatus(receivedMessage: Message): void {
|
||||
// the participant MUST mark all messages in the received causal_history as acknowledged.
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user