mirror of
https://github.com/logos-messaging/js-waku.git
synced 2026-02-07 07:23:09 +00:00
feat(sds): add command queue, dispatch events
This commit is contained in:
parent
23a0dc1008
commit
dd7663f378
@ -1,5 +1,5 @@
|
||||
import { hashN } from "./nim_hashn/nim_hashn.mjs";
|
||||
import { getMOverNBitsForK } from "./probabilities.js";
|
||||
import { hashN } from "../nim_hashn/nim_hashn.mjs.js";
|
||||
import { getMOverNBitsForK } from "../probabilities.js";
|
||||
|
||||
export interface BloomFilterOptions {
|
||||
// The expected maximum number of elements for which this BloomFilter is sized.
|
||||
@ -1,3 +1,5 @@
|
||||
import { BloomFilter } from "./bloom.js";
|
||||
import { BloomFilter } from "./bloom_filter/bloom.js";
|
||||
|
||||
export * from "./message_channel/message_channel.js";
|
||||
|
||||
export { BloomFilter };
|
||||
|
||||
34
packages/sds/src/message_channel/command_queue.ts
Normal file
34
packages/sds/src/message_channel/command_queue.ts
Normal file
@ -0,0 +1,34 @@
|
||||
import { Message } from "./events.js";
|
||||
|
||||
export enum Command {
|
||||
Send = "send",
|
||||
Receive = "receive",
|
||||
SendEphemeral = "sendEphemeral"
|
||||
}
|
||||
|
||||
export interface ParamsByAction {
|
||||
[Command.Send]: {
|
||||
payload: Uint8Array;
|
||||
callback?: (message: Message) => Promise<{
|
||||
success: boolean;
|
||||
retrievalHint?: Uint8Array;
|
||||
}>;
|
||||
};
|
||||
[Command.Receive]: {
|
||||
message: Message;
|
||||
};
|
||||
[Command.SendEphemeral]: {
|
||||
payload: Uint8Array;
|
||||
callback?: (message: Message) => Promise<boolean>;
|
||||
};
|
||||
}
|
||||
|
||||
export type Task<A extends Command = Command> = {
|
||||
command: A;
|
||||
params: ParamsByAction[A];
|
||||
};
|
||||
|
||||
// Define a mapping for handlers based on action type
|
||||
export type Handlers = {
|
||||
[A in Command]: (params: ParamsByAction[A]) => Promise<void>;
|
||||
};
|
||||
33
packages/sds/src/message_channel/events.ts
Normal file
33
packages/sds/src/message_channel/events.ts
Normal file
@ -0,0 +1,33 @@
|
||||
import { proto_sds_message } from "@waku/proto";
|
||||
|
||||
export enum MessageChannelEvent {
|
||||
MessageSent = "messageSent",
|
||||
MessageDelivered = "messageDelivered",
|
||||
MessageReceived = "messageReceived",
|
||||
MessageAcknowledged = "messageAcknowledged",
|
||||
PartialAcknowledgement = "partialAcknowledgement",
|
||||
MissedMessages = "missedMessages",
|
||||
SyncSent = "syncSent",
|
||||
SyncReceived = "syncReceived"
|
||||
}
|
||||
|
||||
export type Message = proto_sds_message.SdsMessage;
|
||||
export type HistoryEntry = proto_sds_message.HistoryEntry;
|
||||
export type ChannelId = string;
|
||||
|
||||
export type MessageChannelEvents = {
|
||||
[MessageChannelEvent.MessageSent]: CustomEvent<Message>;
|
||||
[MessageChannelEvent.MessageDelivered]: CustomEvent<{
|
||||
messageId: string;
|
||||
sentOrReceived: "sent" | "received";
|
||||
}>;
|
||||
[MessageChannelEvent.MessageReceived]: CustomEvent<Message>;
|
||||
[MessageChannelEvent.MessageAcknowledged]: CustomEvent<string>;
|
||||
[MessageChannelEvent.PartialAcknowledgement]: CustomEvent<{
|
||||
messageId: string;
|
||||
count: number;
|
||||
}>;
|
||||
[MessageChannelEvent.MissedMessages]: CustomEvent<HistoryEntry[]>;
|
||||
[MessageChannelEvent.SyncSent]: CustomEvent<Message>;
|
||||
[MessageChannelEvent.SyncReceived]: CustomEvent<Message>;
|
||||
};
|
||||
@ -1,14 +1,13 @@
|
||||
import { utf8ToBytes } from "@waku/utils/bytes";
|
||||
import { expect } from "chai";
|
||||
|
||||
import { DefaultBloomFilter } from "./bloom.js";
|
||||
import { DefaultBloomFilter } from "../bloom_filter/bloom.js";
|
||||
|
||||
import { HistoryEntry, Message, MessageChannelEvent } from "./events.js";
|
||||
import {
|
||||
DEFAULT_BLOOM_FILTER_OPTIONS,
|
||||
HistoryEntry,
|
||||
Message,
|
||||
MessageChannel,
|
||||
MessageChannelEvent
|
||||
} from "./sds.js";
|
||||
MessageChannel
|
||||
} from "./message_channel.js";
|
||||
|
||||
const channelId = "test-channel";
|
||||
const callback = (_message: Message): Promise<{ success: boolean }> => {
|
||||
@ -28,6 +27,23 @@ const messagesB = [
|
||||
"message-7"
|
||||
];
|
||||
|
||||
const sendMessage = async (
|
||||
channel: MessageChannel,
|
||||
payload: Uint8Array,
|
||||
callback: (message: Message) => Promise<{ success: boolean }>
|
||||
): Promise<void> => {
|
||||
await channel.sendMessage(payload, callback);
|
||||
await channel.processTasks();
|
||||
};
|
||||
|
||||
const receiveMessage = async (
|
||||
channel: MessageChannel,
|
||||
message: Message
|
||||
): Promise<void> => {
|
||||
channel.receiveMessage(message);
|
||||
await channel.processTasks();
|
||||
};
|
||||
|
||||
describe("MessageChannel", function () {
|
||||
this.timeout(5000);
|
||||
let channelA: MessageChannel;
|
||||
@ -40,21 +56,21 @@ describe("MessageChannel", function () {
|
||||
|
||||
it("should increase lamport timestamp", async () => {
|
||||
const timestampBefore = (channelA as any).lamportTimestamp;
|
||||
await channelA.sendMessage(new Uint8Array(), callback);
|
||||
await sendMessage(channelA, new Uint8Array(), callback);
|
||||
const timestampAfter = (channelA as any).lamportTimestamp;
|
||||
expect(timestampAfter).to.equal(timestampBefore + 1);
|
||||
});
|
||||
|
||||
it("should push the message to the outgoing buffer", async () => {
|
||||
const bufferLengthBefore = (channelA as any).outgoingBuffer.length;
|
||||
await channelA.sendMessage(new Uint8Array(), callback);
|
||||
await sendMessage(channelA, new Uint8Array(), callback);
|
||||
const bufferLengthAfter = (channelA as any).outgoingBuffer.length;
|
||||
expect(bufferLengthAfter).to.equal(bufferLengthBefore + 1);
|
||||
});
|
||||
|
||||
it("should insert message into bloom filter", async () => {
|
||||
const messageId = MessageChannel.getMessageId(new Uint8Array());
|
||||
await channelA.sendMessage(new Uint8Array(), callback);
|
||||
await sendMessage(channelA, new Uint8Array(), callback);
|
||||
const bloomFilter = getBloomFilter(channelA);
|
||||
expect(bloomFilter.lookup(messageId)).to.equal(true);
|
||||
});
|
||||
@ -62,7 +78,7 @@ describe("MessageChannel", function () {
|
||||
it("should insert message id into causal history", async () => {
|
||||
const expectedTimestamp = (channelA as any).lamportTimestamp + 1;
|
||||
const messageId = MessageChannel.getMessageId(new Uint8Array());
|
||||
await channelA.sendMessage(new Uint8Array(), callback);
|
||||
await sendMessage(channelA, new Uint8Array(), callback);
|
||||
const messageIdLog = (channelA as any).localHistory as {
|
||||
timestamp: number;
|
||||
historyEntry: HistoryEntry;
|
||||
@ -87,7 +103,7 @@ describe("MessageChannel", function () {
|
||||
|
||||
for (const message of messages) {
|
||||
filterBytes.push(bloomFilter.toBytes());
|
||||
await channelA.sendMessage(utf8ToBytes(message), callback);
|
||||
await sendMessage(channelA, utf8ToBytes(message), callback);
|
||||
bloomFilter.insert(MessageChannel.getMessageId(utf8ToBytes(message)));
|
||||
}
|
||||
|
||||
@ -123,9 +139,9 @@ describe("MessageChannel", function () {
|
||||
|
||||
it("should increase lamport timestamp", async () => {
|
||||
const timestampBefore = (channelA as any).lamportTimestamp;
|
||||
await channelB.sendMessage(new Uint8Array(), (message) => {
|
||||
channelA.receiveMessage(message);
|
||||
return Promise.resolve({ success: true });
|
||||
await sendMessage(channelB, new Uint8Array(), async (message) => {
|
||||
await receiveMessage(channelA, message);
|
||||
return { success: true };
|
||||
});
|
||||
const timestampAfter = (channelA as any).lamportTimestamp;
|
||||
expect(timestampAfter).to.equal(timestampBefore + 1);
|
||||
@ -133,12 +149,12 @@ describe("MessageChannel", function () {
|
||||
|
||||
it("should update lamport timestamp if greater than current timestamp and dependencies are met", async () => {
|
||||
for (const m of messagesA) {
|
||||
await channelA.sendMessage(utf8ToBytes(m), callback);
|
||||
await sendMessage(channelA, utf8ToBytes(m), callback);
|
||||
}
|
||||
for (const m of messagesB) {
|
||||
await channelB.sendMessage(utf8ToBytes(m), (message) => {
|
||||
channelA.receiveMessage(message);
|
||||
return Promise.resolve({ success: true });
|
||||
await sendMessage(channelB, utf8ToBytes(m), async (message) => {
|
||||
await receiveMessage(channelA, message);
|
||||
return { success: true };
|
||||
});
|
||||
}
|
||||
const timestampAfter = (channelA as any).lamportTimestamp;
|
||||
@ -148,20 +164,20 @@ describe("MessageChannel", function () {
|
||||
it("should maintain proper timestamps if all messages received", async () => {
|
||||
let timestamp = 0;
|
||||
for (const m of messagesA) {
|
||||
await channelA.sendMessage(utf8ToBytes(m), (message) => {
|
||||
await sendMessage(channelA, utf8ToBytes(m), async (message) => {
|
||||
timestamp++;
|
||||
channelB.receiveMessage(message);
|
||||
await receiveMessage(channelB, message);
|
||||
expect((channelB as any).lamportTimestamp).to.equal(timestamp);
|
||||
return Promise.resolve({ success: true });
|
||||
return { success: true };
|
||||
});
|
||||
}
|
||||
|
||||
for (const m of messagesB) {
|
||||
await channelB.sendMessage(utf8ToBytes(m), (message) => {
|
||||
await sendMessage(channelB, utf8ToBytes(m), async (message) => {
|
||||
timestamp++;
|
||||
channelA.receiveMessage(message);
|
||||
await receiveMessage(channelA, message);
|
||||
expect((channelA as any).lamportTimestamp).to.equal(timestamp);
|
||||
return Promise.resolve({ success: true });
|
||||
return { success: true };
|
||||
});
|
||||
}
|
||||
|
||||
@ -174,28 +190,32 @@ describe("MessageChannel", function () {
|
||||
|
||||
it("should add received messages to bloom filter", async () => {
|
||||
for (const m of messagesA) {
|
||||
await channelA.sendMessage(utf8ToBytes(m), (message) => {
|
||||
channelB.receiveMessage(message);
|
||||
await sendMessage(channelA, utf8ToBytes(m), async (message) => {
|
||||
await receiveMessage(channelB, message);
|
||||
const bloomFilter = getBloomFilter(channelB);
|
||||
expect(bloomFilter.lookup(message.messageId)).to.equal(true);
|
||||
return Promise.resolve({ success: true });
|
||||
return { success: true };
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
it("should add to incoming buffer if dependencies are not met", async () => {
|
||||
for (const m of messagesA) {
|
||||
await channelA.sendMessage(utf8ToBytes(m), callback);
|
||||
await sendMessage(channelA, utf8ToBytes(m), callback);
|
||||
}
|
||||
|
||||
let receivedMessage: Message | null = null;
|
||||
const timestampBefore = (channelB as any).lamportTimestamp;
|
||||
|
||||
await channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => {
|
||||
receivedMessage = message;
|
||||
channelB.receiveMessage(message);
|
||||
return Promise.resolve({ success: true });
|
||||
});
|
||||
await sendMessage(
|
||||
channelA,
|
||||
utf8ToBytes(messagesB[0]),
|
||||
async (message) => {
|
||||
receivedMessage = message;
|
||||
await receiveMessage(channelB, message);
|
||||
return { success: true };
|
||||
}
|
||||
);
|
||||
|
||||
const incomingBuffer = (channelB as any).incomingBuffer as Message[];
|
||||
expect(incomingBuffer.length).to.equal(1);
|
||||
@ -227,26 +247,35 @@ describe("MessageChannel", function () {
|
||||
|
||||
it("should mark all messages in causal history as acknowledged", async () => {
|
||||
for (const m of messagesA) {
|
||||
await channelA.sendMessage(utf8ToBytes(m), (message) => {
|
||||
channelB.receiveMessage(message);
|
||||
return Promise.resolve({ success: true });
|
||||
await sendMessage(channelA, utf8ToBytes(m), async (message) => {
|
||||
await receiveMessage(channelB, message);
|
||||
return { success: true };
|
||||
});
|
||||
}
|
||||
|
||||
let notInHistory: Message | null = null;
|
||||
await channelA.sendMessage(utf8ToBytes("not-in-history"), (message) => {
|
||||
notInHistory = message;
|
||||
return Promise.resolve({ success: true });
|
||||
});
|
||||
await sendMessage(
|
||||
channelA,
|
||||
utf8ToBytes("not-in-history"),
|
||||
async (message) => {
|
||||
notInHistory = message;
|
||||
await receiveMessage(channelB, message);
|
||||
return { success: true };
|
||||
}
|
||||
);
|
||||
|
||||
expect((channelA as any).outgoingBuffer.length).to.equal(
|
||||
messagesA.length + 1
|
||||
);
|
||||
|
||||
await channelB.sendMessage(utf8ToBytes(messagesB[0]), (message) => {
|
||||
channelA.receiveMessage(message);
|
||||
return Promise.resolve({ success: true });
|
||||
});
|
||||
await sendMessage(
|
||||
channelB,
|
||||
utf8ToBytes(messagesB[0]),
|
||||
async (message) => {
|
||||
await receiveMessage(channelA, message);
|
||||
return { success: true };
|
||||
}
|
||||
);
|
||||
|
||||
// Since messagesA are in causal history of channel B's message
|
||||
// they should be gone from channel A's outgoing buffer
|
||||
@ -268,23 +297,24 @@ describe("MessageChannel", function () {
|
||||
const messages = [...messagesA, ...messagesB.slice(0, -1)];
|
||||
// Send messages to be received by channel B
|
||||
for (const m of messages) {
|
||||
await channelA.sendMessage(utf8ToBytes(m), (message) => {
|
||||
channelB.receiveMessage(message);
|
||||
return Promise.resolve({ success: true });
|
||||
await sendMessage(channelA, utf8ToBytes(m), async (message) => {
|
||||
await receiveMessage(channelB, message);
|
||||
return { success: true };
|
||||
});
|
||||
}
|
||||
|
||||
// Send messages not received by channel B
|
||||
for (const m of unacknowledgedMessages) {
|
||||
await channelA.sendMessage(utf8ToBytes(m), callback);
|
||||
await sendMessage(channelA, utf8ToBytes(m), callback);
|
||||
}
|
||||
|
||||
// Channel B sends a message to channel A
|
||||
await channelB.sendMessage(
|
||||
await sendMessage(
|
||||
channelB,
|
||||
utf8ToBytes(messagesB[messagesB.length - 1]),
|
||||
(message) => {
|
||||
channelA.receiveMessage(message);
|
||||
return Promise.resolve({ success: true });
|
||||
async (message) => {
|
||||
await receiveMessage(channelA, message);
|
||||
return { success: true };
|
||||
}
|
||||
);
|
||||
|
||||
@ -316,9 +346,9 @@ describe("MessageChannel", function () {
|
||||
// in the bloom filter as before, which should mark them as fully acknowledged in channel A
|
||||
for (let i = 1; i < acknowledgementCount; i++) {
|
||||
// Send messages until acknowledgement count is reached
|
||||
await channelB.sendMessage(utf8ToBytes(`x-${i}`), (message) => {
|
||||
channelA.receiveMessage(message);
|
||||
return Promise.resolve({ success: true });
|
||||
await sendMessage(channelB, utf8ToBytes(`x-${i}`), async (message) => {
|
||||
await receiveMessage(channelA, message);
|
||||
return { success: true };
|
||||
});
|
||||
}
|
||||
|
||||
@ -349,13 +379,17 @@ describe("MessageChannel", function () {
|
||||
it("should detect messages with missing dependencies", async () => {
|
||||
const causalHistorySize = (channelA as any).causalHistorySize;
|
||||
for (const m of messagesA) {
|
||||
await channelA.sendMessage(utf8ToBytes(m), callback);
|
||||
await sendMessage(channelA, utf8ToBytes(m), callback);
|
||||
}
|
||||
|
||||
await channelA.sendMessage(utf8ToBytes(messagesB[0]), async (message) => {
|
||||
channelB.receiveMessage(message);
|
||||
return Promise.resolve({ success: true });
|
||||
});
|
||||
await sendMessage(
|
||||
channelA,
|
||||
utf8ToBytes(messagesB[0]),
|
||||
async (message) => {
|
||||
await receiveMessage(channelB, message);
|
||||
return { success: true };
|
||||
}
|
||||
);
|
||||
|
||||
const incomingBuffer = (channelB as any).incomingBuffer as Message[];
|
||||
expect(incomingBuffer.length).to.equal(1);
|
||||
@ -374,16 +408,21 @@ describe("MessageChannel", function () {
|
||||
const causalHistorySize = (channelA as any).causalHistorySize;
|
||||
const sentMessages = new Array<Message>();
|
||||
for (const m of messagesA) {
|
||||
await channelA.sendMessage(utf8ToBytes(m), (message) => {
|
||||
await sendMessage(channelA, utf8ToBytes(m), async (message) => {
|
||||
sentMessages.push(message);
|
||||
return Promise.resolve({ success: true });
|
||||
await receiveMessage(channelB, message);
|
||||
return { success: true };
|
||||
});
|
||||
}
|
||||
|
||||
await channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => {
|
||||
channelB.receiveMessage(message);
|
||||
return Promise.resolve({ success: true });
|
||||
});
|
||||
await sendMessage(
|
||||
channelA,
|
||||
utf8ToBytes(messagesB[0]),
|
||||
async (message) => {
|
||||
await receiveMessage(channelB, message);
|
||||
return { success: true };
|
||||
}
|
||||
);
|
||||
|
||||
const missingMessages = channelB.sweepIncomingBuffer();
|
||||
expect(missingMessages.length).to.equal(causalHistorySize);
|
||||
@ -394,9 +433,9 @@ describe("MessageChannel", function () {
|
||||
let incomingBuffer = (channelB as any).incomingBuffer as Message[];
|
||||
expect(incomingBuffer.length).to.equal(1);
|
||||
|
||||
sentMessages.forEach((m) => {
|
||||
channelB.receiveMessage(m);
|
||||
});
|
||||
for (const m of sentMessages) {
|
||||
await receiveMessage(channelB, m);
|
||||
}
|
||||
|
||||
const missingMessages2 = channelB.sweepIncomingBuffer();
|
||||
expect(missingMessages2.length).to.equal(0);
|
||||
@ -414,13 +453,17 @@ describe("MessageChannel", function () {
|
||||
});
|
||||
|
||||
for (const m of messagesA) {
|
||||
await channelA.sendMessage(utf8ToBytes(m), callback);
|
||||
await sendMessage(channelA, utf8ToBytes(m), callback);
|
||||
}
|
||||
|
||||
await channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => {
|
||||
channelC.receiveMessage(message);
|
||||
return Promise.resolve({ success: true });
|
||||
});
|
||||
await sendMessage(
|
||||
channelA,
|
||||
utf8ToBytes(messagesB[0]),
|
||||
async (message) => {
|
||||
await receiveMessage(channelC, message);
|
||||
return { success: true };
|
||||
}
|
||||
);
|
||||
|
||||
const missingMessages = channelC.sweepIncomingBuffer();
|
||||
expect(missingMessages.length).to.equal(causalHistorySize);
|
||||
@ -444,10 +487,10 @@ describe("MessageChannel", function () {
|
||||
it("should partition messages based on acknowledgement status", async () => {
|
||||
const unacknowledgedMessages: Message[] = [];
|
||||
for (const m of messagesA) {
|
||||
await channelA.sendMessage(utf8ToBytes(m), (message) => {
|
||||
await sendMessage(channelA, utf8ToBytes(m), async (message) => {
|
||||
unacknowledgedMessages.push(message);
|
||||
channelB.receiveMessage(message);
|
||||
return Promise.resolve({ success: true });
|
||||
await receiveMessage(channelB, message);
|
||||
return { success: true };
|
||||
});
|
||||
}
|
||||
|
||||
@ -459,14 +502,15 @@ describe("MessageChannel", function () {
|
||||
// Make sure messages sent by channel A are not in causal history
|
||||
const causalHistorySize = (channelA as any).causalHistorySize;
|
||||
for (const m of messagesB.slice(0, causalHistorySize)) {
|
||||
await channelB.sendMessage(utf8ToBytes(m), callback);
|
||||
await sendMessage(channelB, utf8ToBytes(m), callback);
|
||||
}
|
||||
|
||||
await channelB.sendMessage(
|
||||
await sendMessage(
|
||||
channelB,
|
||||
utf8ToBytes(messagesB[causalHistorySize]),
|
||||
(message) => {
|
||||
channelA.receiveMessage(message);
|
||||
return Promise.resolve({ success: true });
|
||||
async (message) => {
|
||||
await receiveMessage(channelA, message);
|
||||
return { success: true };
|
||||
}
|
||||
);
|
||||
|
||||
@ -486,9 +530,9 @@ describe("MessageChannel", function () {
|
||||
});
|
||||
|
||||
it("should be sent with empty content", async () => {
|
||||
await channelA.sendSyncMessage((message) => {
|
||||
await channelA.sendSyncMessage(async (message) => {
|
||||
expect(message.content?.length).to.equal(0);
|
||||
return Promise.resolve(true);
|
||||
return true;
|
||||
});
|
||||
});
|
||||
|
||||
@ -513,10 +557,10 @@ describe("MessageChannel", function () {
|
||||
it("should be delivered but not added to local log or bloom filter", async () => {
|
||||
const timestampBefore = (channelB as any).lamportTimestamp;
|
||||
let expectedTimestamp: number | undefined;
|
||||
await channelA.sendSyncMessage((message) => {
|
||||
await channelA.sendSyncMessage(async (message) => {
|
||||
expectedTimestamp = message.lamportTimestamp;
|
||||
channelB.receiveMessage(message);
|
||||
return Promise.resolve(true);
|
||||
await receiveMessage(channelB, message);
|
||||
return true;
|
||||
});
|
||||
const timestampAfter = (channelB as any).lamportTimestamp;
|
||||
expect(timestampAfter).to.equal(expectedTimestamp);
|
||||
@ -536,15 +580,15 @@ describe("MessageChannel", function () {
|
||||
|
||||
it("should update ack status of messages in outgoing buffer", async () => {
|
||||
for (const m of messagesA) {
|
||||
await channelA.sendMessage(utf8ToBytes(m), (message) => {
|
||||
channelB.receiveMessage(message);
|
||||
return Promise.resolve({ success: true });
|
||||
await sendMessage(channelA, utf8ToBytes(m), async (message) => {
|
||||
await receiveMessage(channelB, message);
|
||||
return { success: true };
|
||||
});
|
||||
}
|
||||
|
||||
await channelB.sendSyncMessage((message) => {
|
||||
channelA.receiveMessage(message);
|
||||
return Promise.resolve(true);
|
||||
await sendMessage(channelB, new Uint8Array(), async (message) => {
|
||||
await receiveMessage(channelA, message);
|
||||
return { success: true };
|
||||
});
|
||||
|
||||
const causalHistorySize = (channelA as any).causalHistorySize;
|
||||
@ -560,9 +604,9 @@ describe("MessageChannel", function () {
|
||||
channelA = new MessageChannel(channelId);
|
||||
});
|
||||
|
||||
it("should be sent without a timestamp, causal history, or bloom filter", () => {
|
||||
it("should be sent without a timestamp, causal history, or bloom filter", async () => {
|
||||
const timestampBefore = (channelA as any).lamportTimestamp;
|
||||
channelA.sendEphemeralMessage(new Uint8Array(), (message) => {
|
||||
await channelA.sendEphemeralMessage(new Uint8Array(), async (message) => {
|
||||
expect(message.lamportTimestamp).to.equal(undefined);
|
||||
expect(message.causalHistory).to.deep.equal([]);
|
||||
expect(message.bloomFilter).to.equal(undefined);
|
||||
@ -577,33 +621,37 @@ describe("MessageChannel", function () {
|
||||
});
|
||||
|
||||
it("should be delivered immediately if received", async () => {
|
||||
let deliveredMessageId: string | undefined;
|
||||
let sentMessage: Message | undefined;
|
||||
|
||||
const channelB = new MessageChannel(channelId, {
|
||||
deliveredMessageCallback: (messageId) => {
|
||||
deliveredMessageId = messageId;
|
||||
}
|
||||
});
|
||||
const channelB = new MessageChannel(channelId);
|
||||
|
||||
const waitForMessageDelivered = new Promise<string>((resolve) => {
|
||||
const waitForMessageDelivered = new Promise<{
|
||||
messageId: string;
|
||||
sentOrReceived: "sent" | "received";
|
||||
}>((resolve) => {
|
||||
channelB.addEventListener(
|
||||
MessageChannelEvent.MessageDelivered,
|
||||
(event) => {
|
||||
resolve(event.detail);
|
||||
resolve({
|
||||
messageId: event.detail.messageId,
|
||||
sentOrReceived: event.detail.sentOrReceived
|
||||
});
|
||||
}
|
||||
);
|
||||
|
||||
channelA.sendEphemeralMessage(utf8ToBytes(messagesA[0]), (message) => {
|
||||
sentMessage = message;
|
||||
channelB.receiveMessage(message);
|
||||
return true;
|
||||
});
|
||||
// channelA.sendEphemeralMessage(
|
||||
// utf8ToBytes(messagesA[0]),
|
||||
// async (message) => {
|
||||
// sentMessage = message;
|
||||
// await receiveMessage(channelB, message);
|
||||
// return true;
|
||||
// }
|
||||
// );
|
||||
});
|
||||
|
||||
const eventMessageId = await waitForMessageDelivered;
|
||||
expect(deliveredMessageId).to.equal(sentMessage?.messageId);
|
||||
expect(eventMessageId).to.equal(sentMessage?.messageId);
|
||||
const { messageId, sentOrReceived } = await waitForMessageDelivered;
|
||||
expect(messageId).to.equal(sentMessage?.messageId);
|
||||
expect(sentOrReceived).to.equal("sent");
|
||||
});
|
||||
});
|
||||
});
|
||||
@ -1,20 +1,17 @@
|
||||
import { TypedEventEmitter } from "@libp2p/interface";
|
||||
import { sha256 } from "@noble/hashes/sha256";
|
||||
import { bytesToHex } from "@noble/hashes/utils";
|
||||
import { proto_sds_message } from "@waku/proto";
|
||||
|
||||
import { DefaultBloomFilter } from "./bloom.js";
|
||||
import { DefaultBloomFilter } from "../bloom_filter/bloom.js";
|
||||
|
||||
export enum MessageChannelEvent {
|
||||
MessageDelivered = "messageDelivered"
|
||||
}
|
||||
type MessageChannelEvents = {
|
||||
[MessageChannelEvent.MessageDelivered]: CustomEvent<string>;
|
||||
};
|
||||
|
||||
export type Message = proto_sds_message.SdsMessage;
|
||||
export type HistoryEntry = proto_sds_message.HistoryEntry;
|
||||
export type ChannelId = string;
|
||||
import { Command, Handlers, ParamsByAction, Task } from "./command_queue.js";
|
||||
import {
|
||||
ChannelId,
|
||||
HistoryEntry,
|
||||
Message,
|
||||
MessageChannelEvent,
|
||||
MessageChannelEvents
|
||||
} from "./events.js";
|
||||
|
||||
export const DEFAULT_BLOOM_FILTER_OPTIONS = {
|
||||
capacity: 10000,
|
||||
@ -28,7 +25,6 @@ interface MessageChannelOptions {
|
||||
causalHistorySize?: number;
|
||||
receivedMessageTimeoutEnabled?: boolean;
|
||||
receivedMessageTimeout?: number;
|
||||
deliveredMessageCallback?: (messageId: string) => void;
|
||||
}
|
||||
|
||||
export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
|
||||
@ -38,13 +34,31 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
|
||||
private acknowledgements: Map<string, number>;
|
||||
private incomingBuffer: Message[];
|
||||
private localHistory: { timestamp: number; historyEntry: HistoryEntry }[];
|
||||
private channelId: ChannelId;
|
||||
public channelId: ChannelId;
|
||||
private causalHistorySize: number;
|
||||
private acknowledgementCount: number;
|
||||
private timeReceived: Map<string, number>;
|
||||
private receivedMessageTimeoutEnabled: boolean;
|
||||
private receivedMessageTimeout: number;
|
||||
private deliveredMessageCallback?: (messageId: string) => void;
|
||||
|
||||
private tasks: Task[] = [];
|
||||
private handlers: Handlers = {
|
||||
[Command.Send]: async (
|
||||
params: ParamsByAction[Command.Send]
|
||||
): Promise<void> => {
|
||||
await this._sendMessage(params.payload, params.callback);
|
||||
},
|
||||
[Command.Receive]: async (
|
||||
params: ParamsByAction[Command.Receive]
|
||||
): Promise<void> => {
|
||||
this._receiveMessage(params.message);
|
||||
},
|
||||
[Command.SendEphemeral]: async (
|
||||
params: ParamsByAction[Command.SendEphemeral]
|
||||
): Promise<void> => {
|
||||
await this._sendEphemeralMessage(params.payload, params.callback);
|
||||
}
|
||||
};
|
||||
|
||||
public constructor(
|
||||
channelId: ChannelId,
|
||||
@ -66,7 +80,25 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
|
||||
options.receivedMessageTimeoutEnabled ?? false;
|
||||
this.receivedMessageTimeout =
|
||||
options.receivedMessageTimeout ?? DEFAULT_RECEIVED_MESSAGE_TIMEOUT;
|
||||
this.deliveredMessageCallback = options.deliveredMessageCallback;
|
||||
}
|
||||
|
||||
// Periodically called by the library consumer to process async operations
|
||||
// in a sequential manner.
|
||||
public async processTasks(): Promise<void> {
|
||||
while (this.tasks.length > 0) {
|
||||
const item = this.tasks.shift();
|
||||
if (!item) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Use a generic helper function to ensure type safety
|
||||
await this.executeTask(item);
|
||||
}
|
||||
}
|
||||
|
||||
private async executeTask<A extends Command>(item: Task<A>): Promise<void> {
|
||||
const handler = this.handlers[item.command];
|
||||
await handler(item.params as ParamsByAction[A]);
|
||||
}
|
||||
|
||||
public static getMessageId(payload: Uint8Array): string {
|
||||
@ -95,6 +127,22 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
|
||||
success: boolean;
|
||||
retrievalHint?: Uint8Array;
|
||||
}>
|
||||
): Promise<void> {
|
||||
this.tasks.push({
|
||||
command: Command.Send,
|
||||
params: {
|
||||
payload,
|
||||
callback
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public async _sendMessage(
|
||||
payload: Uint8Array,
|
||||
callback?: (message: Message) => Promise<{
|
||||
success: boolean;
|
||||
retrievalHint?: Uint8Array;
|
||||
}>
|
||||
): Promise<void> {
|
||||
this.lamportTimestamp++;
|
||||
|
||||
@ -124,6 +172,10 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
|
||||
retrievalHint
|
||||
}
|
||||
});
|
||||
this.timeReceived.set(messageId, Date.now());
|
||||
this.safeDispatchEvent(MessageChannelEvent.MessageSent, {
|
||||
detail: message
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -141,10 +193,23 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
|
||||
* @param payload - The payload to send.
|
||||
* @param callback - A callback function that returns a boolean indicating whether the message was sent successfully.
|
||||
*/
|
||||
public sendEphemeralMessage(
|
||||
public async sendEphemeralMessage(
|
||||
payload: Uint8Array,
|
||||
callback?: (message: Message) => boolean
|
||||
): void {
|
||||
callback?: (message: Message) => Promise<boolean>
|
||||
): Promise<void> {
|
||||
this.tasks.push({
|
||||
command: Command.SendEphemeral,
|
||||
params: {
|
||||
payload,
|
||||
callback
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public async _sendEphemeralMessage(
|
||||
payload: Uint8Array,
|
||||
callback?: (message: Message) => Promise<boolean>
|
||||
): Promise<void> {
|
||||
const message: Message = {
|
||||
messageId: MessageChannel.getMessageId(payload),
|
||||
channelId: this.channelId,
|
||||
@ -155,9 +220,10 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
|
||||
};
|
||||
|
||||
if (callback) {
|
||||
callback(message);
|
||||
await callback(message);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Process a received SDS message for this channel.
|
||||
*
|
||||
@ -172,12 +238,33 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
|
||||
*
|
||||
* @param message - The received SDS message.
|
||||
*/
|
||||
|
||||
public receiveMessage(message: Message): void {
|
||||
this.tasks.push({
|
||||
command: Command.Receive,
|
||||
params: {
|
||||
message
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public _receiveMessage(message: Message): void {
|
||||
if (this.timeReceived.has(message.messageId)) {
|
||||
// Received a duplicate message
|
||||
return;
|
||||
}
|
||||
|
||||
if (!message.lamportTimestamp) {
|
||||
// Messages with no timestamp are ephemeral messages and should be delivered immediately
|
||||
this.safeDispatchEvent(MessageChannelEvent.SyncReceived, {
|
||||
detail: message
|
||||
});
|
||||
this.deliverMessage(message);
|
||||
return;
|
||||
}
|
||||
this.safeDispatchEvent(MessageChannelEvent.MessageReceived, {
|
||||
detail: message
|
||||
});
|
||||
// review ack status
|
||||
this.reviewAckStatus(message);
|
||||
// add to bloom filter (skip for messages with empty content)
|
||||
@ -196,10 +283,18 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
|
||||
this.timeReceived.set(message.messageId, Date.now());
|
||||
} else {
|
||||
this.deliverMessage(message);
|
||||
this.safeDispatchEvent(MessageChannelEvent.MessageDelivered, {
|
||||
detail: {
|
||||
messageId: message.messageId,
|
||||
sentOrReceived: "received"
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// https://rfc.vac.dev/vac/raw/sds/#periodic-incoming-buffer-sweep
|
||||
// Note that even though this function has side effects, it is not async
|
||||
// and does not need to be called through the queue.
|
||||
public sweepIncomingBuffer(): HistoryEntry[] {
|
||||
const { buffer, missing } = this.incomingBuffer.reduce<{
|
||||
buffer: Message[];
|
||||
@ -218,6 +313,12 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
|
||||
// Any message with no missing dependencies is delivered
|
||||
// and removed from the buffer (implicitly by not adding it to the new incoming buffer)
|
||||
this.deliverMessage(message);
|
||||
this.safeDispatchEvent(MessageChannelEvent.MessageDelivered, {
|
||||
detail: {
|
||||
messageId: message.messageId,
|
||||
sentOrReceived: "received"
|
||||
}
|
||||
});
|
||||
return { buffer, missing };
|
||||
}
|
||||
|
||||
@ -243,6 +344,11 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
|
||||
);
|
||||
// Update the incoming buffer to only include messages with no missing dependencies
|
||||
this.incomingBuffer = buffer;
|
||||
if (missing.length > 0) {
|
||||
this.safeDispatchEvent(MessageChannelEvent.MissedMessages, {
|
||||
detail: missing
|
||||
});
|
||||
}
|
||||
return missing;
|
||||
}
|
||||
|
||||
@ -285,7 +391,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
|
||||
*
|
||||
* @param callback - A callback function that returns a boolean indicating whether the message was sent successfully.
|
||||
*/
|
||||
public sendSyncMessage(
|
||||
public async sendSyncMessage(
|
||||
callback?: (message: Message) => Promise<boolean>
|
||||
): Promise<boolean> {
|
||||
this.lamportTimestamp++;
|
||||
@ -304,15 +410,17 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
|
||||
};
|
||||
|
||||
if (callback) {
|
||||
return callback(message);
|
||||
await callback(message);
|
||||
this.safeDispatchEvent(MessageChannelEvent.SyncSent, {
|
||||
detail: message
|
||||
});
|
||||
return true;
|
||||
}
|
||||
return Promise.resolve(false);
|
||||
return false;
|
||||
}
|
||||
|
||||
// See https://rfc.vac.dev/vac/raw/sds/#deliver-message
|
||||
private deliverMessage(message: Message, retrievalHint?: Uint8Array): void {
|
||||
this.notifyDeliveredMessage(message.messageId);
|
||||
|
||||
const messageLamportTimestamp = message.lamportTimestamp ?? 0;
|
||||
if (messageLamportTimestamp > this.lamportTimestamp) {
|
||||
this.lamportTimestamp = messageLamportTimestamp;
|
||||
@ -355,7 +463,15 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
|
||||
// the participant MUST mark all messages in the received causal_history as acknowledged.
|
||||
receivedMessage.causalHistory.forEach(({ messageId }) => {
|
||||
this.outgoingBuffer = this.outgoingBuffer.filter(
|
||||
({ messageId: outgoingMessageId }) => outgoingMessageId !== messageId
|
||||
({ messageId: outgoingMessageId }) => {
|
||||
if (outgoingMessageId !== messageId) {
|
||||
return true;
|
||||
}
|
||||
this.safeDispatchEvent(MessageChannelEvent.MessageAcknowledged, {
|
||||
detail: messageId
|
||||
});
|
||||
return false;
|
||||
}
|
||||
);
|
||||
this.acknowledgements.delete(messageId);
|
||||
if (!this.filter.lookup(messageId)) {
|
||||
@ -380,6 +496,12 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
|
||||
const count = (this.acknowledgements.get(message.messageId) ?? 0) + 1;
|
||||
if (count < this.acknowledgementCount) {
|
||||
this.acknowledgements.set(message.messageId, count);
|
||||
this.safeDispatchEvent(MessageChannelEvent.PartialAcknowledgement, {
|
||||
detail: {
|
||||
messageId: message.messageId,
|
||||
count
|
||||
}
|
||||
});
|
||||
return true;
|
||||
}
|
||||
this.acknowledgements.delete(message.messageId);
|
||||
@ -391,15 +513,4 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
|
||||
private getAcknowledgementCount(): number {
|
||||
return 2;
|
||||
}
|
||||
|
||||
private notifyDeliveredMessage(messageId: string): void {
|
||||
if (this.deliveredMessageCallback) {
|
||||
this.deliveredMessageCallback(messageId);
|
||||
}
|
||||
this.dispatchEvent(
|
||||
new CustomEvent(MessageChannelEvent.MessageDelivered, {
|
||||
detail: messageId
|
||||
})
|
||||
);
|
||||
}
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user