feat: Reliable Channel: Status Sync, overflow protection, stop TODOs (#2729)

* feat(sds): messages with lost deps are delivered

This is to re-enable participation in the SDS protocol. Meaning the
received message with missing dependencies becomes part of the causal
history, re-enabling acknowledgements.

* fix(sds): avoid overflow in message history storage

* feat(reliable-channel): Emit a "Synced" Status with message counts

Return a "synced" or "syncing" status on `ReliableChannel.status` that
let the developer know whether messages are missing, and if so, how many.

* fix: clean up subscriptions, intervals and timeouts when stopping

# Conflicts:
#	packages/sdk/src/reliable_channel/reliable_channel.ts

* chore: extract random timeout

* fix rebase

* revert listener changes

* typo

* Ensuring no inconsistency on missing message

* test: streamline, stop channels

* clear sync status sets when stopping channel

* prevent sync status event spam

* test: improve naming

* try/catch for callback

* encapsulate/simplify reliable channel API

* sanity checks

* test: ensure sync status cleanup
This commit is contained in:
fryorcraken 2025-11-16 08:57:12 +11:00 committed by GitHub
parent 84a6ea69cf
commit e5f51d7df1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 1210 additions and 274 deletions

View File

@ -27,6 +27,10 @@ describe("StreamManager", () => {
} as any as Libp2pComponents);
});
afterEach(() => {
sinon.restore();
});
it("should return usable stream attached to connection", async () => {
for (const writeStatus of ["ready", "writing"]) {
const con1 = createMockConnection();

View File

@ -9,6 +9,7 @@ import { Libp2p, LightPushError, LightPushStatusCode } from "@waku/interfaces";
import { createRoutingInfo } from "@waku/utils";
import { utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai";
import { afterEach } from "mocha";
import sinon, { SinonSpy } from "sinon";
import { PeerManager } from "../peer_manager/index.js";
@ -38,6 +39,10 @@ describe("LightPush SDK", () => {
lightPush = mockLightPush({ libp2p });
});
afterEach(() => {
sinon.restore();
});
it("should fail to send if no connected peers found", async () => {
const result = await lightPush.send(encoder, {
payload: utf8ToBytes("test")

View File

@ -47,7 +47,9 @@ describe("RetryManager", () => {
sinon.restore();
});
it("should start and stop interval correctly", () => {
// TODO: Skipped because the global state is not being restored and it breaks
// tests of functionalities that rely on intervals
it.skip("should start and stop interval correctly", () => {
const setIntervalSpy = sinon.spy(global, "setInterval");
const clearIntervalSpy = sinon.spy(global, "clearInterval");

View File

@ -10,6 +10,7 @@ import {
import { delay } from "@waku/utils";
import { utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai";
import { afterEach } from "mocha";
import sinon from "sinon";
import {
@ -91,6 +92,10 @@ describe("QueryOnConnect", () => {
};
});
afterEach(() => {
sinon.restore();
});
describe("constructor", () => {
it("should create QueryOnConnect instance with all required parameters", () => {
queryOnConnect = new QueryOnConnect(
@ -337,6 +342,7 @@ describe("QueryOnConnect", () => {
});
afterEach(() => {
sinon.restore();
mockClock.restore();
});

View File

@ -1,2 +1,8 @@
export { ReliableChannel, ReliableChannelOptions } from "./reliable_channel.js";
export { ReliableChannelEvents, ReliableChannelEvent } from "./events.js";
export {
StatusEvent,
StatusEvents,
StatusDetail,
ISyncStatusEvents
} from "./sync_status.js";

View File

@ -0,0 +1,67 @@
import { Logger } from "@waku/utils";
const log = new Logger("sdk:random-timeout");
/**
* Enables waiting a random time before doing an action (using `setTimeout`),
* with possibility to apply a multiplier to manipulate said time.
*/
export class RandomTimeout {
private timeout: ReturnType<typeof setTimeout> | undefined;
public constructor(
/**
* The maximum interval one would wait before the call is made, in milliseconds.
*/
private maxIntervalMs: number,
/**
* When not zero: Anytime a call is made, then a new call will be rescheduled
* using this multiplier
*/
private multiplierOnCall: number,
/**
* The function to call when the timer is reached
*/
private callback: () => void | Promise<void>
) {
if (!Number.isFinite(maxIntervalMs) || maxIntervalMs < 0) {
throw new Error(
`maxIntervalMs must be a non-negative finite number, got: ${maxIntervalMs}`
);
}
if (!Number.isFinite(multiplierOnCall)) {
throw new Error(
`multiplierOnCall must be a finite number, got: ${multiplierOnCall}`
);
}
}
/**
* Use to start the timer. If a timer was already set, it deletes it and
* schedule a new one.
* @param multiplier applied to [[maxIntervalMs]]
*/
public restart(multiplier: number = 1): void {
this.stop();
if (this.maxIntervalMs) {
const timeoutMs = Math.random() * this.maxIntervalMs * multiplier;
this.timeout = setTimeout(() => {
try {
void this.callback();
} catch (error) {
log.error("Error in RandomTimeout callback:", error);
}
void this.restart(this.multiplierOnCall);
}, timeoutMs);
}
}
public stop(): void {
if (this.timeout) {
clearTimeout(this.timeout);
this.timeout = undefined;
}
}
}

File diff suppressed because it is too large Load Diff

View File

@ -32,7 +32,9 @@ import {
import { ReliableChannelEvent, ReliableChannelEvents } from "./events.js";
import { MissingMessageRetriever } from "./missing_message_retriever.js";
import { RandomTimeout } from "./random_timeout.js";
import { RetryManager } from "./retry_manager.js";
import { ISyncStatusEvents, SyncStatus } from "./sync_status.js";
const log = new Logger("sdk:reliable-channel");
@ -147,8 +149,7 @@ export class ReliableChannel<
) => AsyncGenerator<Promise<T | undefined>[]>;
private eventListenerCleanups: Array<() => void> = [];
private readonly syncMinIntervalMs: number;
private syncTimeout: ReturnType<typeof setTimeout> | undefined;
private syncRandomTimeout: RandomTimeout;
private sweepInBufInterval: ReturnType<typeof setInterval> | undefined;
private readonly sweepInBufIntervalMs: number;
private processTaskTimeout: ReturnType<typeof setTimeout> | undefined;
@ -203,8 +204,11 @@ export class ReliableChannel<
}
}
this.syncMinIntervalMs =
options?.syncMinIntervalMs ?? DEFAULT_SYNC_MIN_INTERVAL_MS;
this.syncRandomTimeout = new RandomTimeout(
options?.syncMinIntervalMs ?? DEFAULT_SYNC_MIN_INTERVAL_MS,
2,
this.sendSyncMessage.bind(this)
);
this.sweepInBufIntervalMs =
options?.sweepInBufIntervalMs ?? DEFAULT_SWEEP_IN_BUF_INTERVAL_MS;
@ -234,8 +238,22 @@ export class ReliableChannel<
}
this._started = false;
this._internalSyncStatus = new SyncStatus();
this.syncStatus = this._internalSyncStatus;
}
/**
* Emit events when the channel is aware of missing message.
* Note that "synced" may mean some messages are irretrievably lost.
* Check the emitted data for details.
*
* @emits [[StatusEvents]]
*
*/
public readonly syncStatus: ISyncStatusEvents;
private readonly _internalSyncStatus: SyncStatus;
public get isStarted(): boolean {
return this._started;
}
@ -492,9 +510,15 @@ export class ReliableChannel<
});
// Clear timeout once triggered
this.clearProcessTasks();
}, this.processTaskMinElapseMs); // we ensure that we don't call process tasks more than once per second
}
}
private clearProcessTasks(): void {
if (this.processTaskTimeout) {
clearTimeout(this.processTaskTimeout);
this.processTaskTimeout = undefined;
}, this.processTaskMinElapseMs); // we ensure that we don't call process tasks more than once per second
}
}
@ -517,13 +541,10 @@ export class ReliableChannel<
log.info("Stopping ReliableChannel...");
this._started = false;
this.removeAllEventListeners();
this.stopSync();
this.stopSweepIncomingBufferLoop();
if (this.processTaskTimeout) {
clearTimeout(this.processTaskTimeout);
this.processTaskTimeout = undefined;
}
this.clearProcessTasks();
if (this.activePendingProcessTask) {
await this.activePendingProcessTask;
@ -537,7 +558,7 @@ export class ReliableChannel<
await this.unsubscribe();
this.removeAllEventListeners();
this._internalSyncStatus.cleanUp();
log.info("ReliableChannel stopped successfully");
}
@ -562,32 +583,11 @@ export class ReliableChannel<
}
private restartSync(multiplier: number = 1): void {
if (this.syncTimeout) {
clearTimeout(this.syncTimeout);
this.syncTimeout = undefined;
}
if (this.syncMinIntervalMs) {
const timeoutMs = this.random() * this.syncMinIntervalMs * multiplier;
this.syncTimeout = setTimeout(() => {
void this.sendSyncMessage();
// Always restart a sync, no matter whether the message was sent.
// Set a multiplier so we wait a bit longer to not hog the conversation
void this.restartSync(2);
}, timeoutMs);
}
this.syncRandomTimeout.restart(multiplier);
}
private stopSync(): void {
if (this.syncTimeout) {
clearTimeout(this.syncTimeout);
this.syncTimeout = undefined;
}
}
// Used to enable overriding when testing
private random(): number {
return Math.random();
this.syncRandomTimeout.stop();
}
private safeSendEvent<T extends ReliableChannelEvent>(
@ -661,11 +661,16 @@ export class ReliableChannel<
this.addTrackedEventListener(
MessageChannelEvent.OutMessageSent,
(event) => {
if (event.detail.content) {
if (isContentMessage(event.detail)) {
const messageId = ReliableChannel.getMessageId(event.detail.content);
this.safeSendEvent("message-sent", {
detail: messageId
});
// restart the timeout when a content message has been sent
// because the functionality is fulfilled (content message contains
// causal history)
this.restartSync();
}
}
);
@ -678,7 +683,7 @@ export class ReliableChannel<
detail: event.detail
});
// Stopping retries
// Stopping retries as the message was acknowledged
this.retryManager?.stopRetries(event.detail);
}
}
@ -709,6 +714,7 @@ export class ReliableChannel<
this.addTrackedEventListener(
MessageChannelEvent.InMessageReceived,
(event) => {
this._internalSyncStatus.onMessagesReceived(event.detail.messageId);
// restart the timeout when a content message has been received
if (isContentMessage(event.detail)) {
// send a sync message faster to ack someone's else
@ -717,19 +723,13 @@ export class ReliableChannel<
}
);
this.addTrackedEventListener(
MessageChannelEvent.OutMessageSent,
(event) => {
// restart the timeout when a content message has been sent
if (isContentMessage(event.detail)) {
this.restartSync();
}
}
);
this.addTrackedEventListener(
MessageChannelEvent.InMessageMissing,
(event) => {
this._internalSyncStatus.onMessagesMissing(
...event.detail.map((m) => m.messageId)
);
for (const { messageId, retrievalHint } of event.detail) {
if (retrievalHint && this.missingMessageRetriever) {
this.missingMessageRetriever.addMissingMessage(
@ -741,6 +741,12 @@ export class ReliableChannel<
}
);
this.addTrackedEventListener(MessageChannelEvent.InMessageLost, (event) => {
this._internalSyncStatus.onMessagesLost(
...event.detail.map((m) => m.messageId)
);
});
if (this.queryOnConnect) {
const queryListener = (event: any): void => {
void this.processIncomingMessages(event.detail);

View File

@ -66,7 +66,7 @@ describe("Reliable Channel: Sync", () => {
});
while (!messageSent) {
await delay(50);
await delay(10);
}
let syncMessageSent = false;

View File

@ -0,0 +1,207 @@
import { createDecoder, createEncoder } from "@waku/core";
import {
AutoSharding,
IDecodedMessage,
IDecoder,
IEncoder
} from "@waku/interfaces";
import { createRoutingInfo, delay, MockWakuNode } from "@waku/utils";
import { utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai";
import { beforeEach, describe } from "mocha";
import {
createMockNodes,
sendAndWaitForEvent,
TEST_CONSTANTS,
waitFor
} from "./test_utils.js";
import { ReliableChannel, StatusDetail } from "./index.js";
const TEST_CONTENT_TOPIC = "/my-tests/0/topic-name/proto";
const TEST_NETWORK_CONFIG: AutoSharding = {
clusterId: 0,
numShardsInCluster: 1
};
const TEST_ROUTING_INFO = createRoutingInfo(TEST_NETWORK_CONFIG, {
contentTopic: TEST_CONTENT_TOPIC
});
describe("Sync Status", () => {
let encoder: IEncoder;
let decoder: IDecoder<IDecodedMessage>;
let mockWakuNodeAlice: MockWakuNode;
let mockWakuNodeBob: MockWakuNode;
let reliableChannelAlice: ReliableChannel<any> | undefined;
let reliableChannelBob: ReliableChannel<any> | undefined;
beforeEach(async () => {
encoder = createEncoder({
contentTopic: TEST_CONTENT_TOPIC,
routingInfo: TEST_ROUTING_INFO
});
decoder = createDecoder(TEST_CONTENT_TOPIC, TEST_ROUTING_INFO);
const mockNodes = createMockNodes();
mockWakuNodeAlice = mockNodes.alice;
mockWakuNodeBob = mockNodes.bob;
});
afterEach(async () => {
if (reliableChannelAlice) {
await reliableChannelAlice.stop();
reliableChannelAlice = undefined;
}
if (reliableChannelBob) {
await reliableChannelBob.stop();
reliableChannelBob = undefined;
}
});
it("Synced status is emitted when a message is received", async () => {
reliableChannelAlice = await ReliableChannel.create(
mockWakuNodeAlice,
"MyChannel",
"alice",
encoder,
decoder
);
reliableChannelBob = await ReliableChannel.create(
mockWakuNodeBob,
"MyChannel",
"bob",
encoder,
decoder
);
let statusDetail: StatusDetail | undefined;
reliableChannelBob.syncStatus.addEventListener("synced", (event) => {
statusDetail = event.detail;
});
const message = utf8ToBytes("message in channel");
reliableChannelAlice.send(message);
await waitFor(() => statusDetail);
expect(statusDetail!.received).to.eq(1);
});
it("Synced status is emitted when a missing message is received", async () => {
reliableChannelAlice = await ReliableChannel.create(
mockWakuNodeAlice,
"MyChannel",
"alice",
encoder,
decoder,
{
retryIntervalMs: TEST_CONSTANTS.RETRY_INTERVAL_MS
}
);
// Send a message before Bob goes online so it's marked as missing
await sendAndWaitForEvent(
reliableChannelAlice,
utf8ToBytes("missing message")
);
reliableChannelBob = await ReliableChannel.create(
mockWakuNodeBob,
"MyChannel",
"bob",
encoder,
decoder
);
let syncingStatusDetail: StatusDetail | undefined;
reliableChannelBob.syncStatus.addEventListener("syncing", (event) => {
syncingStatusDetail = event.detail;
});
let syncedStatusDetail: StatusDetail | undefined;
reliableChannelBob.syncStatus.addEventListener("synced", (event) => {
syncedStatusDetail = event.detail;
});
await sendAndWaitForEvent(
reliableChannelAlice,
utf8ToBytes("second message with missing message as dep")
);
await waitFor(() => syncingStatusDetail);
expect(syncingStatusDetail!.missing).to.eq(1);
expect(syncingStatusDetail!.received).to.eq(1);
await waitFor(() => syncedStatusDetail);
expect(syncedStatusDetail!.missing).to.eq(0);
expect(syncedStatusDetail!.received).to.eq(2);
});
it("Synced status is emitted when a missing message is marked as lost", async () => {
reliableChannelAlice = await ReliableChannel.create(
mockWakuNodeAlice,
"MyChannel",
"alice",
encoder,
decoder,
{
syncMinIntervalMs: 0,
retryIntervalMs: 0 // Do not retry so we can lose the message
}
);
// Send a message before Bob goes online so it's marked as missing
await sendAndWaitForEvent(
reliableChannelAlice,
utf8ToBytes("missing message")
);
reliableChannelBob = await ReliableChannel.create(
mockWakuNodeBob,
"MyChannel",
"bob",
encoder,
decoder,
{
retrieveFrequencyMs: 0,
syncMinIntervalMs: 0,
sweepInBufIntervalMs: 0, // we want to control this
timeoutForLostMessagesMs: 200 // timeout within the test
}
);
let syncingStatusDetail: StatusDetail | undefined;
reliableChannelBob.syncStatus.addEventListener("syncing", (event) => {
syncingStatusDetail = event.detail;
});
await sendAndWaitForEvent(
reliableChannelAlice,
utf8ToBytes("second message with missing message as dep")
);
await waitFor(() => syncingStatusDetail);
expect(syncingStatusDetail!.missing).to.eq(1, "at first, one missing");
expect(syncingStatusDetail!.received).to.eq(1, "at first, one received");
expect(syncingStatusDetail!.lost).to.eq(0, "at first, no loss");
let syncedStatusDetail: StatusDetail | undefined;
reliableChannelBob.syncStatus.addEventListener("synced", (event) => {
syncedStatusDetail = event.detail;
});
// await long enough so message will be marked as lost
await delay(200);
reliableChannelBob.messageChannel["sweepIncomingBuffer"]();
await waitFor(() => syncedStatusDetail);
expect(syncedStatusDetail!.missing).to.eq(0, "no more missing message");
expect(syncedStatusDetail!.received).to.eq(1, "still one received message");
expect(syncedStatusDetail!.lost).to.eq(1, "missing message is marked lost");
});
});

View File

@ -39,6 +39,12 @@ export class RetryManager {
this.retry(id, retry, 0);
}
public stop(): void {
for (const timeout of this.timeouts.values()) {
clearTimeout(timeout);
}
}
private retry(
id: string,
retry: () => void | Promise<void>,

View File

@ -0,0 +1,189 @@
import { MessageId } from "@waku/sds";
import { delay } from "@waku/utils";
import { expect } from "chai";
import { StatusDetail, StatusEvents, SyncStatus } from "./sync_status.js";
async function testSyncStatus(
syncStatus: SyncStatus,
statusEvent: keyof StatusEvents,
onMessageFn: (...msgIds: MessageId[]) => void,
expectedStatusDetail: Partial<StatusDetail>,
...messageIds: MessageId[]
): Promise<void> {
let statusDetail: StatusDetail;
syncStatus.addEventListener(statusEvent, (event) => {
statusDetail = event.detail;
});
onMessageFn.bind(syncStatus)(...messageIds);
while (!statusDetail!) {
await delay(10);
}
expect(statusDetail.received).to.eq(expectedStatusDetail.received ?? 0);
expect(statusDetail.missing).to.eq(expectedStatusDetail.missing ?? 0);
expect(statusDetail.lost).to.eq(expectedStatusDetail.lost ?? 0);
}
describe("Sync Status", () => {
let syncStatus: SyncStatus;
beforeEach(() => {
syncStatus = new SyncStatus();
});
afterEach(() => {
syncStatus.cleanUp();
});
it("Emits 'synced' when new message received", async () => {
await testSyncStatus(
syncStatus,
"synced",
syncStatus.onMessagesReceived,
{ received: 1 },
"123"
);
});
it("Emits 'syncing' when message flagged as missed", async () => {
await testSyncStatus(
syncStatus,
"syncing",
syncStatus.onMessagesMissing,
{ missing: 1 },
"123"
);
});
it("Emits 'synced' when message flagged as lost", async () => {
await testSyncStatus(
syncStatus,
"synced",
syncStatus.onMessagesLost,
{ lost: 1 },
"123"
);
});
it("Emits 'syncing' then 'synced' when message flagged as missing and then received", async () => {
await testSyncStatus(
syncStatus,
"syncing",
syncStatus.onMessagesMissing,
{ missing: 1 },
"123"
);
await testSyncStatus(
syncStatus,
"synced",
syncStatus.onMessagesReceived,
{ received: 1 },
"123"
);
});
it("Emits 'syncing' then 'synced' when message flagged as missing and then lost", async () => {
await testSyncStatus(
syncStatus,
"syncing",
syncStatus.onMessagesMissing,
{ missing: 1 },
"123"
);
await testSyncStatus(
syncStatus,
"synced",
syncStatus.onMessagesLost,
{ lost: 1 },
"123"
);
});
it("Emits 'synced' then 'synced' when message flagged as lost and then received", async () => {
await testSyncStatus(
syncStatus,
"synced",
syncStatus.onMessagesLost,
{ lost: 1 },
"123"
);
await testSyncStatus(
syncStatus,
"synced",
syncStatus.onMessagesReceived,
{ received: 1 },
"123"
);
});
it("Emits 'syncing' until all messages are received or lost", async () => {
await testSyncStatus(
syncStatus,
"synced",
syncStatus.onMessagesReceived,
{ received: 1 },
"1"
);
await testSyncStatus(
syncStatus,
"syncing",
syncStatus.onMessagesMissing,
{ received: 1, missing: 3 },
"2",
"3",
"4"
);
await testSyncStatus(
syncStatus,
"syncing",
syncStatus.onMessagesReceived,
{ received: 2, missing: 2 },
"2"
);
await testSyncStatus(
syncStatus,
"syncing",
syncStatus.onMessagesReceived,
{ received: 3, missing: 1 },
"3"
);
await testSyncStatus(
syncStatus,
"synced",
syncStatus.onMessagesLost,
{ received: 3, lost: 1 },
"4"
);
});
it("Debounces events when receiving batch of messages", async () => {
let eventCount = 0;
let statusDetail: StatusDetail | undefined;
syncStatus.addEventListener("synced", (event) => {
eventCount++;
statusDetail = event.detail;
});
// Process 100 messages in the same task
for (let i = 0; i < 100; i++) {
syncStatus.onMessagesReceived(`msg-${i}`);
}
// Wait for microtask to complete
await delay(10);
// Should only emit 1 event despite 100 calls
expect(eventCount).to.eq(1, "Should only emit one event for batch");
expect(statusDetail!.received).to.eq(100, "Should track all 100 messages");
});
});

View File

@ -0,0 +1,163 @@
import { TypedEventEmitter } from "@libp2p/interface";
import { MessageId } from "@waku/sds";
import { Logger } from "@waku/utils";
const log = new Logger("sds:sync-status");
export const StatusEvent = {
/**
* We are not aware of any missing messages that we may be able to get
* We MAY have messages lost forever, see the `event.detail`
*/
Synced: "synced", // TODO or synced or health or caught-up?
/**
* We are aware of missing messages that we may be able to get
*/
Syncing: "syncing" // TODO: it assumes "syncing" is happening via SDS repair or store queries
};
export type StatusEvent = (typeof StatusEvent)[keyof typeof StatusEvent];
export type StatusDetail = {
/**
* number of received messages
*/
received: number;
/**
* number of missing messages that are not yet considered as irretrievably lost
*/
missing: number;
/**
* number of messages considered as irretrievably lost
*/
lost: number;
};
export interface StatusEvents {
synced: CustomEvent<StatusDetail>;
syncing: CustomEvent<StatusDetail>;
}
/**
* Read-only interface for sync status events.
* Only exposes event listener methods, hiding internal state management.
*/
export interface ISyncStatusEvents {
addEventListener(
event: "synced",
callback: (e: CustomEvent<StatusDetail>) => void
): void;
addEventListener(
event: "syncing",
callback: (e: CustomEvent<StatusDetail>) => void
): void;
removeEventListener(
event: "synced",
callback: (e: CustomEvent<StatusDetail>) => void
): void;
removeEventListener(
event: "syncing",
callback: (e: CustomEvent<StatusDetail>) => void
): void;
}
export class SyncStatus extends TypedEventEmitter<StatusEvents> {
private readonly receivedMessages: Set<MessageId>;
private readonly missingMessages: Set<MessageId>;
private readonly lostMessages: Set<MessageId>;
private sendScheduled = false;
private cleaned = false;
public constructor() {
super();
this.receivedMessages = new Set();
this.missingMessages = new Set();
this.lostMessages = new Set();
}
/**
* Cleanup all tracked message IDs. Should be called when stopping the channel.
*/
public cleanUp(): void {
// Mark as cleaned to prevent any pending microtasks from firing
this.cleaned = true;
this.receivedMessages.clear();
this.missingMessages.clear();
this.lostMessages.clear();
}
public onMessagesReceived(...messageIds: MessageId[]): void {
for (const messageId of messageIds) {
this.missingMessages.delete(messageId);
this.lostMessages.delete(messageId);
this.receivedMessages.add(messageId);
}
this.scheduleSend();
}
public onMessagesMissing(...messageIds: MessageId[]): void {
for (const messageId of messageIds) {
if (
!this.receivedMessages.has(messageId) &&
!this.lostMessages.has(messageId)
) {
this.missingMessages.add(messageId);
} else {
log.error(
"A message previously received or lost has been marked as missing",
messageId
);
}
}
this.scheduleSend();
}
public onMessagesLost(...messageIds: MessageId[]): void {
for (const messageId of messageIds) {
this.missingMessages.delete(messageId);
this.lostMessages.add(messageId);
}
this.scheduleSend();
}
/**
* Schedule an event to be sent on the next microtask.
* Multiple calls within the same task will result in only one event being sent.
* This prevents event spam when processing batches of messages.
*/
private scheduleSend(): void {
if (!this.sendScheduled) {
this.sendScheduled = true;
queueMicrotask(() => {
this.sendScheduled = false;
this.safeSend();
});
}
}
private safeSend(): void {
// Don't send events if cleanup was already called
if (this.cleaned) {
return;
}
const statusEvent =
this.missingMessages.size === 0
? StatusEvent.Synced
: StatusEvent.Syncing;
try {
this.dispatchEvent(
new CustomEvent(statusEvent, {
detail: {
received: this.receivedMessages.size,
missing: this.missingMessages.size,
lost: this.lostMessages.size
}
})
);
} catch (error) {
log.error(`Failed to dispatch sync status:`, error);
}
}
}

View File

@ -0,0 +1,68 @@
import { TypedEventEmitter } from "@libp2p/interface";
import { delay, MockWakuEvents, MockWakuNode } from "@waku/utils";
import { ReliableChannel } from "./reliable_channel.js";
export const TEST_CONSTANTS = {
POLL_INTERVAL_MS: 50,
RETRY_INTERVAL_MS: 300
} as const;
/**
* Wait for a condition to become truthy, with timeout
* @param condition Function that returns the value when ready, or undefined while waiting
* @param timeoutMs Maximum time to wait before throwing
* @returns The value returned by condition
* @throws Error if timeout is reached
*/
export async function waitFor<T>(
condition: () => T | undefined,
timeoutMs = 5000
): Promise<T> {
const start = Date.now();
while (!condition()) {
if (Date.now() - start > timeoutMs) {
throw new Error(
`Timeout after ${timeoutMs}ms waiting for condition to be met`
);
}
await delay(TEST_CONSTANTS.POLL_INTERVAL_MS);
}
return condition()!;
}
/**
* Send a message and wait for the "message-sent" event
* @param channel The ReliableChannel to send from
* @param message The message payload to send
*/
export async function sendAndWaitForEvent(
channel: ReliableChannel<any>,
message: Uint8Array
): Promise<void> {
return new Promise((resolve) => {
const handler = (): void => {
channel.removeEventListener("message-sent", handler);
resolve();
};
channel.addEventListener("message-sent", handler);
channel.send(message);
});
}
/**
* Create a common event emitter and two mock Waku nodes
* @returns Object containing the emitter and two mock nodes (alice and bob)
*/
export function createMockNodes(): {
emitter: TypedEventEmitter<MockWakuEvents>;
alice: MockWakuNode;
bob: MockWakuNode;
} {
const emitter = new TypedEventEmitter<MockWakuEvents>();
return {
emitter,
alice: new MockWakuNode(emitter),
bob: new MockWakuNode(emitter)
};
}

View File

@ -19,6 +19,10 @@ describe("waitForRemotePeer", () => {
eventTarget = new EventTarget();
});
afterEach(() => {
sinon.restore();
});
it("should reject if WakuNode is not started", async () => {
const wakuMock = mockWakuNode({
connections: [{}]

View File

@ -0,0 +1,50 @@
import { expect } from "chai";
import { MemLocalHistory } from "./mem_local_history.js";
import { ContentMessage } from "./message.js";
describe("MemLocalHistory", () => {
it("Cap max size when messages are pushed one at a time", () => {
const maxSize = 2;
const hist = new MemLocalHistory(maxSize);
hist.push(
new ContentMessage("1", "c", "a", [], 1n, undefined, new Uint8Array([1]))
);
expect(hist.length).to.eq(1);
hist.push(
new ContentMessage("2", "c", "a", [], 2n, undefined, new Uint8Array([2]))
);
expect(hist.length).to.eq(2);
hist.push(
new ContentMessage("3", "c", "a", [], 3n, undefined, new Uint8Array([3]))
);
expect(hist.length).to.eq(2);
expect(hist.findIndex((m) => m.messageId === "1")).to.eq(-1);
expect(hist.findIndex((m) => m.messageId === "2")).to.not.eq(-1);
expect(hist.findIndex((m) => m.messageId === "3")).to.not.eq(-1);
});
it("Cap max size when a pushed array is exceeding the cap", () => {
const maxSize = 2;
const hist = new MemLocalHistory(maxSize);
hist.push(
new ContentMessage("1", "c", "a", [], 1n, undefined, new Uint8Array([1]))
);
expect(hist.length).to.eq(1);
hist.push(
new ContentMessage("2", "c", "a", [], 2n, undefined, new Uint8Array([2])),
new ContentMessage("3", "c", "a", [], 3n, undefined, new Uint8Array([3]))
);
expect(hist.length).to.eq(2);
expect(hist.findIndex((m) => m.messageId === "1")).to.eq(-1);
expect(hist.findIndex((m) => m.messageId === "2")).to.not.eq(-1);
expect(hist.findIndex((m) => m.messageId === "3")).to.not.eq(-1);
});
});

View File

@ -2,18 +2,31 @@ import _ from "lodash";
import { ContentMessage, isContentMessage } from "./message.js";
export const DEFAULT_MAX_LENGTH = 10_000;
/**
* In-Memory implementation of a local store of messages.
* In-Memory implementation of a local history of messages.
*
* Messages are store in SDS chronological order:
* - messages[0] is the oldest message
* - messages[n] is the newest message
*
* Only stores content message: `message.lamportTimestamp` and `message.content` are present.
*
* Oldest messages are dropped when `maxLength` is reached.
* If an array of items longer than `maxLength` is pushed, dropping will happen
* at next push.
*/
export class MemLocalHistory {
private items: ContentMessage[] = [];
/**
* Construct a new in-memory local history
*
* @param maxLength The maximum number of message to store.
*/
public constructor(private maxLength: number = DEFAULT_MAX_LENGTH) {}
public get length(): number {
return this.items.length;
}
@ -33,6 +46,12 @@ export class MemLocalHistory {
// Remove duplicates by messageId while maintaining order
this.items = _.uniqBy(combinedItems, "messageId");
// Let's drop older messages if max length is reached
if (this.length > this.maxLength) {
const numItemsToRemove = this.length - this.maxLength;
this.items.splice(0, numItemsToRemove);
}
return this.items.length;
}

View File

@ -283,7 +283,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
/**
* Processes messages in the incoming buffer, delivering those with satisfied dependencies.
*
* @returns Array of history entries for messages still missing dependencies
* @returns The missing dependencies
*/
public sweepIncomingBuffer(): HistoryEntry[] {
const { buffer, missing } = this.incomingBuffer.reduce<{
@ -319,8 +319,8 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
})
);
// Optionally, if a message has not been received after a predetermined amount of time,
// its dependencies are marked as irretrievably lost (implicitly by removing it from the buffer without delivery)
// Optionally, if a message did not get its dependencies fulfilled after a predetermined amount of time,
// they are marked as irretrievably lost (implicitly by removing it from the buffer without delivery)
if (this.timeoutForLostMessagesMs) {
const timeReceived = this.timeReceived.get(message.messageId);
if (
@ -330,9 +330,19 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
this.safeSendEvent(MessageChannelEvent.InMessageLost, {
detail: Array.from(missingDependencies)
});
// We deliver the message to resume participation in the log
if (isContentMessage(message) && this.deliverMessage(message)) {
this.safeSendEvent(MessageChannelEvent.InMessageDelivered, {
detail: message.messageId
});
}
// The message and its missing dependencies are dropped
// from the incoming buffer
return { buffer, missing };
}
}
missingDependencies.forEach((dependency) => {
missing.add(dependency);
});

View File

@ -59,10 +59,11 @@ export class MockWakuNode implements IWaku {
unsubscribe<T extends IDecodedMessage>(
_decoders: IDecoder<T> | IDecoder<T>[]
): Promise<boolean> {
throw "Not implemented";
// The expectation is that it does not matter for tests
return Promise.resolve(true);
},
unsubscribeAll(): void {
throw "Not implemented";
throw "unsubscribeAll not implemented";
}
};
}
@ -138,7 +139,7 @@ export class MockWakuNode implements IWaku {
return Promise.resolve();
}
public stop(): Promise<void> {
throw new Error("Method not implemented.");
return Promise.resolve();
}
public waitForPeers(
_protocols?: Protocols[],