diff --git a/packages/sds/src/message_channel/events.ts b/packages/sds/src/message_channel/events.ts index e3ca17936e..546f6aafad 100644 --- a/packages/sds/src/message_channel/events.ts +++ b/packages/sds/src/message_channel/events.ts @@ -11,6 +11,7 @@ export enum MessageChannelEvent { SyncReceived = "syncReceived" } +export type MessageId = string; export type Message = proto_sds_message.SdsMessage; export type HistoryEntry = proto_sds_message.HistoryEntry; export type ChannelId = string; @@ -26,13 +27,13 @@ export function decodeMessage(data: Uint8Array): Message { export type MessageChannelEvents = { [MessageChannelEvent.MessageSent]: CustomEvent; [MessageChannelEvent.MessageDelivered]: CustomEvent<{ - messageId: string; + messageId: MessageId; sentOrReceived: "sent" | "received"; }>; [MessageChannelEvent.MessageReceived]: CustomEvent; - [MessageChannelEvent.MessageAcknowledged]: CustomEvent; + [MessageChannelEvent.MessageAcknowledged]: CustomEvent; [MessageChannelEvent.PartialAcknowledgement]: CustomEvent<{ - messageId: string; + messageId: MessageId; count: number; }>; [MessageChannelEvent.MissedMessages]: CustomEvent; diff --git a/packages/sds/src/message_channel/message_channel.spec.ts b/packages/sds/src/message_channel/message_channel.spec.ts index c9bbcc455e..e541da8d34 100644 --- a/packages/sds/src/message_channel/message_channel.spec.ts +++ b/packages/sds/src/message_channel/message_channel.spec.ts @@ -3,7 +3,7 @@ import { expect } from "chai"; import { DefaultBloomFilter } from "../bloom_filter/bloom.js"; -import { HistoryEntry, Message } from "./events.js"; +import { HistoryEntry, Message, MessageId } from "./events.js"; import { DEFAULT_BLOOM_FILTER_OPTIONS, MessageChannel @@ -293,6 +293,21 @@ describe("MessageChannel", function () { ); }); + it("should not mark messages in causal history as acknowledged if it's our own message", async () => { + for (const m of messagesA) { + await sendMessage(channelA, utf8ToBytes(m), async (message) => { + await receiveMessage(channelA, message); // same channel used on purpose + return { success: true }; + }); + } + await channelA.processTasks(); + + // All messages remain in the buffer + expect((channelA as any).outgoingBuffer.length).to.equal( + messagesA.length + ); + }); + it("should track probabilistic acknowledgements of messages received in bloom filter", async () => { const acknowledgementCount = (channelA as any).acknowledgementCount; @@ -326,7 +341,7 @@ describe("MessageChannel", function () { } ); - const acknowledgements: ReadonlyMap = (channelA as any) + const acknowledgements: ReadonlyMap = (channelA as any) .acknowledgements; // Other than the message IDs which were included in causal history, // the remaining messages sent by channel A should be considered possibly acknowledged @@ -376,6 +391,20 @@ describe("MessageChannel", function () { ).to.equal(true); }); }); + + it("should not track probabilistic acknowledgements of messages received in bloom filter of own messages", async () => { + for (const m of messagesA) { + await sendMessage(channelA, utf8ToBytes(m), async (message) => { + await receiveMessage(channelA, message); + return { success: true }; + }); + } + + const acknowledgements: ReadonlyMap = (channelA as any) + .acknowledgements; + + expect(acknowledgements.size).to.equal(0); + }); }); describe("Sweeping incoming buffer", () => { @@ -566,7 +595,7 @@ describe("MessageChannel", function () { const localLog = (channelA as any).localHistory as { timestamp: number; - messageId: string; + messageId: MessageId; }[]; expect(localLog.length).to.equal(0); }); @@ -585,7 +614,7 @@ describe("MessageChannel", function () { const localLog = (channelB as any).localHistory as { timestamp: number; - messageId: string; + messageId: MessageId; }[]; expect(localLog.length).to.equal(0); diff --git a/packages/sds/src/message_channel/message_channel.ts b/packages/sds/src/message_channel/message_channel.ts index 12ae632283..b53e6caa4f 100644 --- a/packages/sds/src/message_channel/message_channel.ts +++ b/packages/sds/src/message_channel/message_channel.ts @@ -1,5 +1,5 @@ import { TypedEventEmitter } from "@libp2p/interface"; -import { sha256 } from "@noble/hashes/sha256"; +import { sha256 } from "@noble/hashes/sha2"; import { bytesToHex } from "@noble/hashes/utils"; import { Logger } from "@waku/utils"; @@ -11,7 +11,8 @@ import { HistoryEntry, Message, MessageChannelEvent, - MessageChannelEvents + MessageChannelEvents, + type MessageId } from "./events.js"; export const DEFAULT_BLOOM_FILTER_OPTIONS = { @@ -35,14 +36,16 @@ export class MessageChannel extends TypedEventEmitter { private lamportTimestamp: number; private filter: DefaultBloomFilter; private outgoingBuffer: Message[]; - private acknowledgements: Map; + private acknowledgements: Map; private incomingBuffer: Message[]; private localHistory: { timestamp: number; historyEntry: HistoryEntry }[]; - private causalHistorySize: number; - private acknowledgementCount: number; - private timeReceived: Map; - private receivedMessageTimeoutEnabled: boolean; - private receivedMessageTimeout: number; + private timeReceived: Map; + // TODO: To be removed once sender id is added to SDS protocol + private outgoingMessages: Set; + private readonly causalHistorySize: number; + private readonly acknowledgementCount: number; + private readonly receivedMessageTimeoutEnabled: boolean; + private readonly receivedMessageTimeout: number; private tasks: Task[] = []; private handlers: Handlers = { @@ -83,9 +86,10 @@ export class MessageChannel extends TypedEventEmitter { options.receivedMessageTimeoutEnabled ?? false; this.receivedMessageTimeout = options.receivedMessageTimeout ?? DEFAULT_RECEIVED_MESSAGE_TIMEOUT; + this.outgoingMessages = new Set(); } - public static getMessageId(payload: Uint8Array): string { + public static getMessageId(payload: Uint8Array): MessageId { return bytesToHex(sha256(payload)); } @@ -354,6 +358,15 @@ export class MessageChannel extends TypedEventEmitter { return; } + const isOwnOutgoingMessage = + message.content && + message.content.length > 0 && + this.outgoingMessages.has(MessageChannel.getMessageId(message.content)); + + if (isOwnOutgoingMessage) { + return; + } + if (!message.lamportTimestamp) { this.deliverMessage(message); return; @@ -427,6 +440,8 @@ export class MessageChannel extends TypedEventEmitter { const messageId = MessageChannel.getMessageId(payload); + this.outgoingMessages.add(messageId); + const message: Message = { messageId, channelId: this.channelId,