feat(sds): send and receive sync messages

This commit is contained in:
Arseniy Klempner 2025-03-03 22:02:53 -08:00
parent 5b3a256b4c
commit 13ae5d4f73
No known key found for this signature in database
GPG Key ID: 51653F18863BD24B
2 changed files with 220 additions and 100 deletions

View File

@ -9,8 +9,8 @@ import {
} from "./sds.js"; } from "./sds.js";
const channelId = "test-channel"; const channelId = "test-channel";
const callback = (_message: Message): boolean => { const callback = (_message: Message): Promise<boolean> => {
return true; return Promise.resolve(true);
}; };
const getBloomFilter = (channel: MessageChannel): DefaultBloomFilter => { const getBloomFilter = (channel: MessageChannel): DefaultBloomFilter => {
@ -36,31 +36,31 @@ describe("MessageChannel", function () {
channelA = new MessageChannel(channelId); channelA = new MessageChannel(channelId);
}); });
it("should increase lamport timestamp", () => { it("should increase lamport timestamp", async () => {
const timestampBefore = (channelA as any).lamportTimestamp; const timestampBefore = (channelA as any).lamportTimestamp;
channelA.sendMessage(new Uint8Array(), callback); await channelA.sendMessage(new Uint8Array(), callback);
const timestampAfter = (channelA as any).lamportTimestamp; const timestampAfter = (channelA as any).lamportTimestamp;
expect(timestampAfter).to.equal(timestampBefore + 1); expect(timestampAfter).to.equal(timestampBefore + 1);
}); });
it("should push the message to the outgoing buffer", () => { it("should push the message to the outgoing buffer", async () => {
const bufferLengthBefore = (channelA as any).outgoingBuffer.length; const bufferLengthBefore = (channelA as any).outgoingBuffer.length;
channelA.sendMessage(new Uint8Array(), callback); await channelA.sendMessage(new Uint8Array(), callback);
const bufferLengthAfter = (channelA as any).outgoingBuffer.length; const bufferLengthAfter = (channelA as any).outgoingBuffer.length;
expect(bufferLengthAfter).to.equal(bufferLengthBefore + 1); expect(bufferLengthAfter).to.equal(bufferLengthBefore + 1);
}); });
it("should insert message into bloom filter", () => { it("should insert message into bloom filter", async () => {
const messageId = MessageChannel.getMessageId(new Uint8Array()); const messageId = MessageChannel.getMessageId(new Uint8Array());
channelA.sendMessage(new Uint8Array(), callback); await channelA.sendMessage(new Uint8Array(), callback);
const bloomFilter = getBloomFilter(channelA); const bloomFilter = getBloomFilter(channelA);
expect(bloomFilter.lookup(messageId)).to.equal(true); expect(bloomFilter.lookup(messageId)).to.equal(true);
}); });
it("should insert message id into causal history", () => { it("should insert message id into causal history", async () => {
const expectedTimestamp = (channelA as any).lamportTimestamp + 1; const expectedTimestamp = (channelA as any).lamportTimestamp + 1;
const messageId = MessageChannel.getMessageId(new Uint8Array()); const messageId = MessageChannel.getMessageId(new Uint8Array());
channelA.sendMessage(new Uint8Array(), callback); await channelA.sendMessage(new Uint8Array(), callback);
const messageIdLog = (channelA as any).messageIdLog as { const messageIdLog = (channelA as any).messageIdLog as {
timestamp: number; timestamp: number;
messageId: string; messageId: string;
@ -74,7 +74,7 @@ describe("MessageChannel", function () {
).to.equal(true); ).to.equal(true);
}); });
it("should attach causal history and bloom filter to each message", () => { it("should attach causal history and bloom filter to each message", async () => {
const bloomFilter = new DefaultBloomFilter(DEFAULT_BLOOM_FILTER_OPTIONS); const bloomFilter = new DefaultBloomFilter(DEFAULT_BLOOM_FILTER_OPTIONS);
const causalHistorySize = (channelA as any).causalHistorySize; const causalHistorySize = (channelA as any).causalHistorySize;
const filterBytes = new Array<Uint8Array>(); const filterBytes = new Array<Uint8Array>();
@ -82,11 +82,11 @@ describe("MessageChannel", function () {
.fill("message") .fill("message")
.map((message, index) => `${message}-${index}`); .map((message, index) => `${message}-${index}`);
messages.forEach((message) => { for (const message of messages) {
filterBytes.push(bloomFilter.toBytes()); filterBytes.push(bloomFilter.toBytes());
channelA.sendMessage(utf8ToBytes(message), callback); await channelA.sendMessage(utf8ToBytes(message), callback);
bloomFilter.insert(MessageChannel.getMessageId(utf8ToBytes(message))); bloomFilter.insert(MessageChannel.getMessageId(utf8ToBytes(message)));
}); }
const outgoingBuffer = (channelA as any).outgoingBuffer as Message[]; const outgoingBuffer = (channelA as any).outgoingBuffer as Message[];
expect(outgoingBuffer.length).to.equal(messages.length); expect(outgoingBuffer.length).to.equal(messages.length);
@ -115,49 +115,49 @@ describe("MessageChannel", function () {
channelB = new MessageChannel(channelId); channelB = new MessageChannel(channelId);
}); });
it("should increase lamport timestamp", () => { it("should increase lamport timestamp", async () => {
const timestampBefore = (channelA as any).lamportTimestamp; const timestampBefore = (channelA as any).lamportTimestamp;
channelB.sendMessage(new Uint8Array(), (message) => { await channelB.sendMessage(new Uint8Array(), (message) => {
channelA.receiveMessage(message); channelA.receiveMessage(message);
return true; return Promise.resolve(true);
}); });
const timestampAfter = (channelA as any).lamportTimestamp; const timestampAfter = (channelA as any).lamportTimestamp;
expect(timestampAfter).to.equal(timestampBefore + 1); expect(timestampAfter).to.equal(timestampBefore + 1);
}); });
it("should update lamport timestamp if greater than current timestamp and dependencies are met", () => { it("should update lamport timestamp if greater than current timestamp and dependencies are met", async () => {
messagesA.forEach((m) => { for (const m of messagesA) {
channelA.sendMessage(utf8ToBytes(m), callback); await channelA.sendMessage(utf8ToBytes(m), callback);
}); }
messagesB.forEach((m) => { for (const m of messagesB) {
channelB.sendMessage(utf8ToBytes(m), (message) => { await channelB.sendMessage(utf8ToBytes(m), (message) => {
channelA.receiveMessage(message); channelA.receiveMessage(message);
return true; return Promise.resolve(true);
}); });
}); }
const timestampAfter = (channelA as any).lamportTimestamp; const timestampAfter = (channelA as any).lamportTimestamp;
expect(timestampAfter).to.equal(messagesB.length); expect(timestampAfter).to.equal(messagesB.length);
}); });
it("should maintain proper timestamps if all messages received", () => { it("should maintain proper timestamps if all messages received", async () => {
let timestamp = 0; let timestamp = 0;
messagesA.forEach((m) => { for (const m of messagesA) {
channelA.sendMessage(utf8ToBytes(m), (message) => { await channelA.sendMessage(utf8ToBytes(m), (message) => {
timestamp++; timestamp++;
channelB.receiveMessage(message); channelB.receiveMessage(message);
expect((channelB as any).lamportTimestamp).to.equal(timestamp); expect((channelB as any).lamportTimestamp).to.equal(timestamp);
return true; return Promise.resolve(true);
}); });
}); }
messagesB.forEach((m) => { for (const m of messagesB) {
channelB.sendMessage(utf8ToBytes(m), (message) => { await channelB.sendMessage(utf8ToBytes(m), (message) => {
timestamp++; timestamp++;
channelA.receiveMessage(message); channelA.receiveMessage(message);
expect((channelA as any).lamportTimestamp).to.equal(timestamp); expect((channelA as any).lamportTimestamp).to.equal(timestamp);
return true; return Promise.resolve(true);
}); });
}); }
const expectedLength = messagesA.length + messagesB.length; const expectedLength = messagesA.length + messagesB.length;
expect((channelA as any).lamportTimestamp).to.equal(expectedLength); expect((channelA as any).lamportTimestamp).to.equal(expectedLength);
@ -166,29 +166,29 @@ describe("MessageChannel", function () {
); );
}); });
it("should add received messages to bloom filter", () => { it("should add received messages to bloom filter", async () => {
messagesA.forEach((m) => { for (const m of messagesA) {
channelA.sendMessage(utf8ToBytes(m), (message) => { await channelA.sendMessage(utf8ToBytes(m), (message) => {
channelB.receiveMessage(message); channelB.receiveMessage(message);
const bloomFilter = getBloomFilter(channelB); const bloomFilter = getBloomFilter(channelB);
expect(bloomFilter.lookup(message.messageId)).to.equal(true); expect(bloomFilter.lookup(message.messageId)).to.equal(true);
return true; return Promise.resolve(true);
}); });
}); }
}); });
it("should add to incoming buffer if dependencies are not met", () => { it("should add to incoming buffer if dependencies are not met", async () => {
messagesA.forEach((m) => { for (const m of messagesA) {
channelA.sendMessage(utf8ToBytes(m), callback); await channelA.sendMessage(utf8ToBytes(m), callback);
}); }
let receivedMessage: Message | null = null; let receivedMessage: Message | null = null;
const timestampBefore = (channelB as any).lamportTimestamp; const timestampBefore = (channelB as any).lamportTimestamp;
channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => { await channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => {
receivedMessage = message; receivedMessage = message;
channelB.receiveMessage(message); channelB.receiveMessage(message);
return true; return Promise.resolve(true);
}); });
const incomingBuffer = (channelB as any).incomingBuffer as Message[]; const incomingBuffer = (channelB as any).incomingBuffer as Message[];
@ -216,27 +216,27 @@ describe("MessageChannel", function () {
channelB = new MessageChannel(channelId); channelB = new MessageChannel(channelId);
}); });
it("should mark all messages in causal history as acknowledged", () => { it("should mark all messages in causal history as acknowledged", async () => {
messagesA.forEach((m) => { for (const m of messagesA) {
channelA.sendMessage(utf8ToBytes(m), (message) => { await channelA.sendMessage(utf8ToBytes(m), (message) => {
channelB.receiveMessage(message); channelB.receiveMessage(message);
return true; return Promise.resolve(true);
}); });
}); }
let notInHistory: Message | null = null; let notInHistory: Message | null = null;
channelA.sendMessage(utf8ToBytes("not-in-history"), (message) => { await channelA.sendMessage(utf8ToBytes("not-in-history"), (message) => {
notInHistory = message; notInHistory = message;
return true; return Promise.resolve(true);
}); });
expect((channelA as any).outgoingBuffer.length).to.equal( expect((channelA as any).outgoingBuffer.length).to.equal(
messagesA.length + 1 messagesA.length + 1
); );
channelB.sendMessage(utf8ToBytes(messagesB[0]), (message) => { await channelB.sendMessage(utf8ToBytes(messagesB[0]), (message) => {
channelA.receiveMessage(message); channelA.receiveMessage(message);
return true; return Promise.resolve(true);
}); });
// Since messagesA are in causal history of channel B's message // Since messagesA are in causal history of channel B's message
@ -247,7 +247,7 @@ describe("MessageChannel", function () {
expect(outgoingBuffer[0].messageId).to.equal(notInHistory!.messageId); expect(outgoingBuffer[0].messageId).to.equal(notInHistory!.messageId);
}); });
it("should track probabilistic acknowledgements of messages received in bloom filter", () => { it("should track probabilistic acknowledgements of messages received in bloom filter", async () => {
const acknowledgementCount = (channelA as any).acknowledgementCount; const acknowledgementCount = (channelA as any).acknowledgementCount;
const causalHistorySize = (channelA as any).causalHistorySize; const causalHistorySize = (channelA as any).causalHistorySize;
@ -258,24 +258,24 @@ describe("MessageChannel", function () {
]; ];
const messages = [...messagesA, ...messagesB.slice(0, -1)]; const messages = [...messagesA, ...messagesB.slice(0, -1)];
// Send messages to be received by channel B // Send messages to be received by channel B
messages.forEach((m) => { for (const m of messages) {
channelA.sendMessage(utf8ToBytes(m), (message) => { await channelA.sendMessage(utf8ToBytes(m), (message) => {
channelB.receiveMessage(message); channelB.receiveMessage(message);
return true; return Promise.resolve(true);
}); });
}); }
// Send messages not received by channel B // Send messages not received by channel B
unacknowledgedMessages.forEach((m) => { for (const m of unacknowledgedMessages) {
channelA.sendMessage(utf8ToBytes(m), callback); await channelA.sendMessage(utf8ToBytes(m), callback);
}); }
// Channel B sends a message to channel A // Channel B sends a message to channel A
channelB.sendMessage( await channelB.sendMessage(
utf8ToBytes(messagesB[messagesB.length - 1]), utf8ToBytes(messagesB[messagesB.length - 1]),
(message) => { (message) => {
channelA.receiveMessage(message); channelA.receiveMessage(message);
return true; return Promise.resolve(true);
} }
); );
@ -307,9 +307,9 @@ describe("MessageChannel", function () {
// in the bloom filter as before, which should mark them as fully acknowledged in channel A // in the bloom filter as before, which should mark them as fully acknowledged in channel A
for (let i = 1; i < acknowledgementCount; i++) { for (let i = 1; i < acknowledgementCount; i++) {
// Send messages until acknowledgement count is reached // Send messages until acknowledgement count is reached
channelB.sendMessage(utf8ToBytes(`x-${i}`), (message) => { await channelB.sendMessage(utf8ToBytes(`x-${i}`), (message) => {
channelA.receiveMessage(message); channelA.receiveMessage(message);
return true; return Promise.resolve(true);
}); });
} }
@ -337,15 +337,15 @@ describe("MessageChannel", function () {
channelB = new MessageChannel(channelId); channelB = new MessageChannel(channelId);
}); });
it("should detect messages with missing dependencies", () => { it("should detect messages with missing dependencies", async () => {
const causalHistorySize = (channelA as any).causalHistorySize; const causalHistorySize = (channelA as any).causalHistorySize;
messagesA.forEach((m) => { for (const m of messagesA) {
channelA.sendMessage(utf8ToBytes(m), callback); await channelA.sendMessage(utf8ToBytes(m), callback);
}); }
channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => { await channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => {
channelB.receiveMessage(message); channelB.receiveMessage(message);
return true; return Promise.resolve(true);
}); });
const incomingBuffer = (channelB as any).incomingBuffer as Message[]; const incomingBuffer = (channelB as any).incomingBuffer as Message[];
@ -361,19 +361,19 @@ describe("MessageChannel", function () {
); );
}); });
it("should deliver messages after dependencies are met", () => { it("should deliver messages after dependencies are met", async () => {
const causalHistorySize = (channelA as any).causalHistorySize; const causalHistorySize = (channelA as any).causalHistorySize;
const sentMessages = new Array<Message>(); const sentMessages = new Array<Message>();
messagesA.forEach((m) => { for (const m of messagesA) {
channelA.sendMessage(utf8ToBytes(m), (message) => { await channelA.sendMessage(utf8ToBytes(m), (message) => {
sentMessages.push(message); sentMessages.push(message);
return true; return Promise.resolve(true);
}); });
}); }
channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => { await channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => {
channelB.receiveMessage(message); channelB.receiveMessage(message);
return true; return Promise.resolve(true);
}); });
const missingMessages = channelB.sweepIncomingBuffer(); const missingMessages = channelB.sweepIncomingBuffer();
@ -406,13 +406,13 @@ describe("MessageChannel", function () {
10 10
); );
messagesA.forEach((m) => { for (const m of messagesA) {
channelA.sendMessage(utf8ToBytes(m), callback); await channelA.sendMessage(utf8ToBytes(m), callback);
}); }
channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => { await channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => {
channelC.receiveMessage(message); channelC.receiveMessage(message);
return true; return Promise.resolve(true);
}); });
const missingMessages = channelC.sweepIncomingBuffer(); const missingMessages = channelC.sweepIncomingBuffer();
@ -434,15 +434,15 @@ describe("MessageChannel", function () {
channelB = new MessageChannel(channelId); channelB = new MessageChannel(channelId);
}); });
it("should partition messages based on acknowledgement status", () => { it("should partition messages based on acknowledgement status", async () => {
const unacknowledgedMessages: Message[] = []; const unacknowledgedMessages: Message[] = [];
messagesA.forEach((m) => { for (const m of messagesA) {
channelA.sendMessage(utf8ToBytes(m), (message) => { await channelA.sendMessage(utf8ToBytes(m), (message) => {
unacknowledgedMessages.push(message); unacknowledgedMessages.push(message);
channelB.receiveMessage(message); channelB.receiveMessage(message);
return true; return Promise.resolve(true);
}); });
}); }
let { unacknowledged, possiblyAcknowledged } = let { unacknowledged, possiblyAcknowledged } =
channelA.sweepOutgoingBuffer(); channelA.sweepOutgoingBuffer();
@ -451,15 +451,15 @@ describe("MessageChannel", function () {
// Make sure messages sent by channel A are not in causal history // Make sure messages sent by channel A are not in causal history
const causalHistorySize = (channelA as any).causalHistorySize; const causalHistorySize = (channelA as any).causalHistorySize;
messagesB.slice(0, causalHistorySize).forEach((m) => { for (const m of messagesB.slice(0, causalHistorySize)) {
channelB.sendMessage(utf8ToBytes(m), callback); await channelB.sendMessage(utf8ToBytes(m), callback);
}); }
channelB.sendMessage( await channelB.sendMessage(
utf8ToBytes(messagesB[causalHistorySize]), utf8ToBytes(messagesB[causalHistorySize]),
(message) => { (message) => {
channelA.receiveMessage(message); channelA.receiveMessage(message);
return true; return Promise.resolve(true);
} }
); );
@ -471,4 +471,80 @@ describe("MessageChannel", function () {
expect(possiblyAcknowledged.length).to.equal(messagesA.length); expect(possiblyAcknowledged.length).to.equal(messagesA.length);
}); });
}); });
describe("Sync messages", () => {
beforeEach(() => {
channelA = new MessageChannel(channelId);
channelB = new MessageChannel(channelId);
});
it("should be sent with empty content", async () => {
await channelA.sendSyncMessage((message) => {
expect(message.content?.length).to.equal(0);
return Promise.resolve(true);
});
});
it("should not be added to outgoing buffer, bloom filter, or local log", async () => {
await channelA.sendSyncMessage();
const outgoingBuffer = (channelA as any).outgoingBuffer as Message[];
expect(outgoingBuffer.length).to.equal(0);
const bloomFilter = getBloomFilter(channelA);
expect(
bloomFilter.lookup(MessageChannel.getMessageId(new Uint8Array()))
).to.equal(false);
const localLog = (channelA as any).messageIdLog as {
timestamp: number;
messageId: string;
}[];
expect(localLog.length).to.equal(0);
});
it("should be delivered but not added to local log or bloom filter", async () => {
const timestampBefore = (channelB as any).lamportTimestamp;
let expectedTimestamp: number | undefined;
await channelA.sendSyncMessage((message) => {
expectedTimestamp = message.lamportTimestamp;
channelB.receiveMessage(message);
return Promise.resolve(true);
});
const timestampAfter = (channelB as any).lamportTimestamp;
expect(timestampAfter).to.equal(expectedTimestamp);
expect(timestampAfter).to.be.greaterThan(timestampBefore);
const localLog = (channelB as any).messageIdLog as {
timestamp: number;
messageId: string;
}[];
expect(localLog.length).to.equal(0);
const bloomFilter = getBloomFilter(channelB);
expect(
bloomFilter.lookup(MessageChannel.getMessageId(new Uint8Array()))
).to.equal(false);
});
it("should update ack status of messages in outgoing buffer", async () => {
for (const m of messagesA) {
await channelA.sendMessage(utf8ToBytes(m), (message) => {
channelB.receiveMessage(message);
return Promise.resolve(true);
});
}
await channelB.sendSyncMessage((message) => {
channelA.receiveMessage(message);
return Promise.resolve(true);
});
const causalHistorySize = (channelA as any).causalHistorySize;
const outgoingBuffer = (channelA as any).outgoingBuffer as Message[];
expect(outgoingBuffer.length).to.equal(
messagesA.length - causalHistorySize
);
});
});
}); });

View File

@ -65,10 +65,10 @@ export class MessageChannel {
* @param payload - The payload to send. * @param payload - The payload to send.
* @param callback - A callback function that returns a boolean indicating whether the message was sent successfully. * @param callback - A callback function that returns a boolean indicating whether the message was sent successfully.
*/ */
public sendMessage( public async sendMessage(
payload: Uint8Array, payload: Uint8Array,
callback?: (message: Message) => boolean callback?: (message: Message) => Promise<boolean>
): void { ): Promise<void> {
this.lamportTimestamp++; this.lamportTimestamp++;
const messageId = MessageChannel.getMessageId(payload); const messageId = MessageChannel.getMessageId(payload);
@ -87,7 +87,7 @@ export class MessageChannel {
this.outgoingBuffer.push(message); this.outgoingBuffer.push(message);
if (callback) { if (callback) {
const success = callback(message); const success = await callback(message);
if (success) { if (success) {
this.filter.insert(messageId); this.filter.insert(messageId);
this.messageIdLog.push({ timestamp: this.lamportTimestamp, messageId }); this.messageIdLog.push({ timestamp: this.lamportTimestamp, messageId });
@ -112,8 +112,10 @@ export class MessageChannel {
public receiveMessage(message: Message): void { public receiveMessage(message: Message): void {
// review ack status // review ack status
this.reviewAckStatus(message); this.reviewAckStatus(message);
// add to bloom filter // add to bloom filter (skip for messages with empty content)
this.filter.insert(message.messageId); if (message.content?.length && message.content.length > 0) {
this.filter.insert(message.messageId);
}
// verify causal history // verify causal history
const dependenciesMet = message.causalHistory.every((messageId) => const dependenciesMet = message.causalHistory.every((messageId) =>
this.messageIdLog.some( this.messageIdLog.some(
@ -203,6 +205,40 @@ export class MessageChannel {
); );
} }
/**
* Send a sync message to the SDS channel.
*
* Increments the lamport timestamp, constructs a `Message` object
* with an empty load. Skips outgoing buffer, filter, and local log.
*
* See https://rfc.vac.dev/vac/raw/sds/#send-sync-message
*
* @param callback - A callback function that returns a boolean indicating whether the message was sent successfully.
*/
public sendSyncMessage(
callback?: (message: Message) => Promise<boolean>
): Promise<boolean> {
this.lamportTimestamp++;
const emptyMessage = new Uint8Array();
const message: Message = {
messageId: MessageChannel.getMessageId(emptyMessage),
channelId: this.channelId,
lamportTimestamp: this.lamportTimestamp,
causalHistory: this.messageIdLog
.slice(-this.causalHistorySize)
.map(({ messageId }) => messageId),
bloomFilter: this.filter.toBytes(),
content: emptyMessage
};
if (callback) {
return callback(message);
}
return Promise.resolve(false);
}
// See https://rfc.vac.dev/vac/raw/sds/#deliver-message // See https://rfc.vac.dev/vac/raw/sds/#deliver-message
private deliverMessage(message: Message): void { private deliverMessage(message: Message): void {
const messageLamportTimestamp = message.lamportTimestamp ?? 0; const messageLamportTimestamp = message.lamportTimestamp ?? 0;
@ -210,6 +246,12 @@ export class MessageChannel {
this.lamportTimestamp = messageLamportTimestamp; this.lamportTimestamp = messageLamportTimestamp;
} }
if (message.content?.length === 0) {
// Messages with empty content are sync messages.
// They are not added to the local log or bloom filter.
return;
}
// The participant MUST insert the message ID into its local log, // The participant MUST insert the message ID into its local log,
// based on Lamport timestamp. // based on Lamport timestamp.
// If one or more message IDs with the same Lamport timestamp already exists, // If one or more message IDs with the same Lamport timestamp already exists,
@ -227,6 +269,8 @@ export class MessageChannel {
}); });
} }
// For each received message (including sync messages), inspect the causal history and bloom filter
// to determine the acknowledgement status of messages in the outgoing buffer.
// See https://rfc.vac.dev/vac/raw/sds/#review-ack-status // See https://rfc.vac.dev/vac/raw/sds/#review-ack-status
private reviewAckStatus(receivedMessage: Message): void { private reviewAckStatus(receivedMessage: Message): void {
// the participant MUST mark all messages in the received causal_history as acknowledged. // the participant MUST mark all messages in the received causal_history as acknowledged.