fix: should not self-acknowledge messages (#2528)

* introduce `MessageId` type

* fix deprecated import

* test: own messages are not used for acks

* fix: own messages are not used for ack

* fix: own messages are not used for ack

* test: do not self-possible-ack

* doc: long term solution is SDS protocol change
This commit is contained in:
fryorcraken 2025-08-08 10:18:01 +10:00 committed by GitHub
parent 52e5c34520
commit 459fe96fe6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 61 additions and 16 deletions

View File

@ -11,6 +11,7 @@ export enum MessageChannelEvent {
SyncReceived = "syncReceived"
}
export type MessageId = string;
export type Message = proto_sds_message.SdsMessage;
export type HistoryEntry = proto_sds_message.HistoryEntry;
export type ChannelId = string;
@ -26,13 +27,13 @@ export function decodeMessage(data: Uint8Array): Message {
export type MessageChannelEvents = {
[MessageChannelEvent.MessageSent]: CustomEvent<Message>;
[MessageChannelEvent.MessageDelivered]: CustomEvent<{
messageId: string;
messageId: MessageId;
sentOrReceived: "sent" | "received";
}>;
[MessageChannelEvent.MessageReceived]: CustomEvent<Message>;
[MessageChannelEvent.MessageAcknowledged]: CustomEvent<string>;
[MessageChannelEvent.MessageAcknowledged]: CustomEvent<MessageId>;
[MessageChannelEvent.PartialAcknowledgement]: CustomEvent<{
messageId: string;
messageId: MessageId;
count: number;
}>;
[MessageChannelEvent.MissedMessages]: CustomEvent<HistoryEntry[]>;

View File

@ -3,7 +3,7 @@ import { expect } from "chai";
import { DefaultBloomFilter } from "../bloom_filter/bloom.js";
import { HistoryEntry, Message } from "./events.js";
import { HistoryEntry, Message, MessageId } from "./events.js";
import {
DEFAULT_BLOOM_FILTER_OPTIONS,
MessageChannel
@ -293,6 +293,21 @@ describe("MessageChannel", function () {
);
});
it("should not mark messages in causal history as acknowledged if it's our own message", async () => {
for (const m of messagesA) {
await sendMessage(channelA, utf8ToBytes(m), async (message) => {
await receiveMessage(channelA, message); // same channel used on purpose
return { success: true };
});
}
await channelA.processTasks();
// All messages remain in the buffer
expect((channelA as any).outgoingBuffer.length).to.equal(
messagesA.length
);
});
it("should track probabilistic acknowledgements of messages received in bloom filter", async () => {
const acknowledgementCount = (channelA as any).acknowledgementCount;
@ -326,7 +341,7 @@ describe("MessageChannel", function () {
}
);
const acknowledgements: ReadonlyMap<string, number> = (channelA as any)
const acknowledgements: ReadonlyMap<MessageId, number> = (channelA as any)
.acknowledgements;
// Other than the message IDs which were included in causal history,
// the remaining messages sent by channel A should be considered possibly acknowledged
@ -376,6 +391,20 @@ describe("MessageChannel", function () {
).to.equal(true);
});
});
it("should not track probabilistic acknowledgements of messages received in bloom filter of own messages", async () => {
for (const m of messagesA) {
await sendMessage(channelA, utf8ToBytes(m), async (message) => {
await receiveMessage(channelA, message);
return { success: true };
});
}
const acknowledgements: ReadonlyMap<MessageId, number> = (channelA as any)
.acknowledgements;
expect(acknowledgements.size).to.equal(0);
});
});
describe("Sweeping incoming buffer", () => {
@ -566,7 +595,7 @@ describe("MessageChannel", function () {
const localLog = (channelA as any).localHistory as {
timestamp: number;
messageId: string;
messageId: MessageId;
}[];
expect(localLog.length).to.equal(0);
});
@ -585,7 +614,7 @@ describe("MessageChannel", function () {
const localLog = (channelB as any).localHistory as {
timestamp: number;
messageId: string;
messageId: MessageId;
}[];
expect(localLog.length).to.equal(0);

View File

@ -1,5 +1,5 @@
import { TypedEventEmitter } from "@libp2p/interface";
import { sha256 } from "@noble/hashes/sha256";
import { sha256 } from "@noble/hashes/sha2";
import { bytesToHex } from "@noble/hashes/utils";
import { Logger } from "@waku/utils";
@ -11,7 +11,8 @@ import {
HistoryEntry,
Message,
MessageChannelEvent,
MessageChannelEvents
MessageChannelEvents,
type MessageId
} from "./events.js";
export const DEFAULT_BLOOM_FILTER_OPTIONS = {
@ -35,14 +36,16 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
private lamportTimestamp: number;
private filter: DefaultBloomFilter;
private outgoingBuffer: Message[];
private acknowledgements: Map<string, number>;
private acknowledgements: Map<MessageId, number>;
private incomingBuffer: Message[];
private localHistory: { timestamp: number; historyEntry: HistoryEntry }[];
private causalHistorySize: number;
private acknowledgementCount: number;
private timeReceived: Map<string, number>;
private receivedMessageTimeoutEnabled: boolean;
private receivedMessageTimeout: number;
private timeReceived: Map<MessageId, number>;
// TODO: To be removed once sender id is added to SDS protocol
private outgoingMessages: Set<MessageId>;
private readonly causalHistorySize: number;
private readonly acknowledgementCount: number;
private readonly receivedMessageTimeoutEnabled: boolean;
private readonly receivedMessageTimeout: number;
private tasks: Task[] = [];
private handlers: Handlers = {
@ -83,9 +86,10 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
options.receivedMessageTimeoutEnabled ?? false;
this.receivedMessageTimeout =
options.receivedMessageTimeout ?? DEFAULT_RECEIVED_MESSAGE_TIMEOUT;
this.outgoingMessages = new Set();
}
public static getMessageId(payload: Uint8Array): string {
public static getMessageId(payload: Uint8Array): MessageId {
return bytesToHex(sha256(payload));
}
@ -354,6 +358,15 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
return;
}
const isOwnOutgoingMessage =
message.content &&
message.content.length > 0 &&
this.outgoingMessages.has(MessageChannel.getMessageId(message.content));
if (isOwnOutgoingMessage) {
return;
}
if (!message.lamportTimestamp) {
this.deliverMessage(message);
return;
@ -427,6 +440,8 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
const messageId = MessageChannel.getMessageId(payload);
this.outgoingMessages.add(messageId);
const message: Message = {
messageId,
channelId: this.channelId,