From 969330d5cbb349d11d19e81f0b8884afcc34da5f Mon Sep 17 00:00:00 2001 From: Sasha Date: Wed, 8 Oct 2025 00:29:59 +0200 Subject: [PATCH] add unit tests --- .../sdk/src/messaging/ack_manager.spec.ts | 309 ++++++++++++++++ .../sdk/src/messaging/message_store.spec.ts | 349 ++++++++++++++++++ packages/sdk/src/messaging/message_store.ts | 2 +- packages/sdk/src/messaging/messaging.spec.ts | 137 ++----- packages/sdk/src/messaging/sender.spec.ts | 144 ++++++++ 5 files changed, 830 insertions(+), 111 deletions(-) create mode 100644 packages/sdk/src/messaging/ack_manager.spec.ts create mode 100644 packages/sdk/src/messaging/message_store.spec.ts create mode 100644 packages/sdk/src/messaging/sender.spec.ts diff --git a/packages/sdk/src/messaging/ack_manager.spec.ts b/packages/sdk/src/messaging/ack_manager.spec.ts new file mode 100644 index 0000000000..37cfcaefa2 --- /dev/null +++ b/packages/sdk/src/messaging/ack_manager.spec.ts @@ -0,0 +1,309 @@ +import type { + IDecodedMessage, + IFilter, + IStore, + NetworkConfig +} from "@waku/interfaces"; +import { expect } from "chai"; +import { afterEach, beforeEach, describe, it } from "mocha"; +import sinon from "sinon"; + +import { AckManager } from "./ack_manager.js"; +import { MessageStore } from "./message_store.js"; + +const mockMessage: IDecodedMessage = { + version: 1, + payload: new Uint8Array([1, 2, 3]), + contentTopic: "/test/1/topic/proto", + pubsubTopic: "test-pubsub", + timestamp: new Date(), + rateLimitProof: undefined, + ephemeral: false, + meta: undefined, + hash: new Uint8Array([4, 5, 6]), + hashStr: "test-hash-123" +}; + +const mockNetworkConfig: NetworkConfig = { + clusterId: 1, + numShardsInCluster: 8 +}; + +describe("AckManager", () => { + let messageStore: MessageStore; + let mockFilter: IFilter; + let mockStore: IStore; + let ackManager: AckManager; + let clock: sinon.SinonFakeTimers; + + beforeEach(() => { + messageStore = new MessageStore(); + + mockFilter = { + subscribe: sinon.stub().resolves(true), + unsubscribe: sinon.stub().resolves(true) + } as unknown as IFilter; + + mockStore = { + queryWithOrderedCallback: sinon.stub().resolves(undefined) + } as unknown as IStore; + + ackManager = new AckManager({ + messageStore, + filter: mockFilter, + store: mockStore, + networkConfig: mockNetworkConfig + }); + + clock = sinon.useFakeTimers(); + }); + + afterEach(() => { + clock.restore(); + sinon.restore(); + }); + + describe("constructor", () => { + it("should initialize with provided parameters", () => { + expect(ackManager).to.be.instanceOf(AckManager); + }); + }); + + describe("start", () => { + it("should start filter and store ack managers", () => { + ackManager.start(); + + expect(clock.countTimers()).to.equal(1); + }); + + it("should be idempotent", () => { + ackManager.start(); + ackManager.start(); + + expect(clock.countTimers()).to.equal(1); + }); + }); + + describe("stop", () => { + it("should stop filter and store ack managers", async () => { + ackManager.start(); + await ackManager.stop(); + + expect(clock.countTimers()).to.equal(0); + }); + + it("should clear subscribed content topics", async () => { + await ackManager.subscribe("/test/1/clear/proto"); + await ackManager.stop(); + + const result = await ackManager.subscribe("/test/1/clear/proto"); + expect(result).to.be.true; + }); + + it("should handle stop without start", async () => { + await ackManager.stop(); + }); + }); + + describe("subscribe", () => { + it("should subscribe to new content topic", async () => { + const result = await ackManager.subscribe("/test/1/new/proto"); + + expect(result).to.be.true; + expect( + (mockFilter.subscribe as sinon.SinonStub).calledWith( + sinon.match.object, + sinon.match.func + ) + ).to.be.true; + }); + + it("should return true for already subscribed topic", async () => { + await ackManager.subscribe("/test/1/existing/proto"); + const result = await ackManager.subscribe("/test/1/existing/proto"); + + expect(result).to.be.true; + expect((mockFilter.subscribe as sinon.SinonStub).calledOnce).to.be.true; + }); + + it("should return true if at least one subscription succeeds", async () => { + (mockFilter.subscribe as sinon.SinonStub).resolves(false); + + const result = await ackManager.subscribe("/test/1/topic/proto"); + + expect(result).to.be.true; + }); + + it("should return true when filter fails but store succeeds", async () => { + (mockFilter.subscribe as sinon.SinonStub).resolves(false); + + const result = await ackManager.subscribe("/test/1/topic/proto"); + + expect(result).to.be.true; + }); + }); + + describe("FilterAckManager", () => { + beforeEach(() => { + ackManager.start(); + }); + + it("should handle message reception and acknowledgment", async () => { + await ackManager.subscribe("/test/1/topic/proto"); + const onMessageCallback = ( + mockFilter.subscribe as sinon.SinonStub + ).getCall(0).args[1]; + + await onMessageCallback(mockMessage); + + expect(messageStore.has(mockMessage.hashStr)).to.be.true; + }); + + it("should not add duplicate messages", async () => { + messageStore.add(mockMessage, { filterAck: false }); + await ackManager.subscribe("/test/1/topic/proto"); + + const onMessageCallback = ( + mockFilter.subscribe as sinon.SinonStub + ).getCall(0).args[1]; + await onMessageCallback(mockMessage); + + expect(messageStore.has(mockMessage.hashStr)).to.be.true; + }); + + it("should unsubscribe all decoders on stop", async () => { + await ackManager.subscribe("/test/1/topic1/proto"); + await ackManager.subscribe("/test/1/topic2/proto"); + + await ackManager.stop(); + + expect((mockFilter.unsubscribe as sinon.SinonStub).calledTwice).to.be + .true; + }); + }); + + describe("StoreAckManager", () => { + beforeEach(() => { + ackManager.start(); + }); + + it("should query store periodically", async () => { + await ackManager.subscribe("/test/1/topic/proto"); + + await clock.tickAsync(5000); + + expect( + (mockStore.queryWithOrderedCallback as sinon.SinonStub).calledWith( + sinon.match.array, + sinon.match.func, + sinon.match.object + ) + ).to.be.true; + }); + + it("should handle store query callback", async () => { + await ackManager.subscribe("/test/1/topic/proto"); + + await clock.tickAsync(5000); + + const callback = ( + mockStore.queryWithOrderedCallback as sinon.SinonStub + ).getCall(0).args[1]; + callback(mockMessage); + + expect(messageStore.has(mockMessage.hashStr)).to.be.true; + }); + + it("should not add duplicate messages from store", async () => { + messageStore.add(mockMessage, { storeAck: false }); + + await ackManager.subscribe("/test/1/topic/proto"); + await clock.tickAsync(5000); + + const callback = ( + mockStore.queryWithOrderedCallback as sinon.SinonStub + ).getCall(0).args[1]; + callback(mockMessage); + + expect(messageStore.has(mockMessage.hashStr)).to.be.true; + }); + + it("should stop interval on stop", async () => { + ackManager.start(); + await ackManager.stop(); + + expect(clock.countTimers()).to.equal(0); + }); + }); + + describe("integration scenarios", () => { + it("should handle complete lifecycle", async () => { + ackManager.start(); + + const result1 = await ackManager.subscribe("/test/1/topic1/proto"); + const result2 = await ackManager.subscribe("/test/1/topic2/proto"); + + expect(result1).to.be.true; + expect(result2).to.be.true; + + await ackManager.stop(); + + expect(clock.countTimers()).to.equal(0); + }); + + it("should handle multiple subscriptions to same topic", async () => { + ackManager.start(); + + const result1 = await ackManager.subscribe("/test/1/same/proto"); + const result2 = await ackManager.subscribe("/test/1/same/proto"); + + expect(result1).to.be.true; + expect(result2).to.be.true; + expect((mockFilter.subscribe as sinon.SinonStub).calledOnce).to.be.true; + }); + + it("should handle subscription after stop", async () => { + ackManager.start(); + await ackManager.stop(); + + const result = await ackManager.subscribe("/test/1/after-stop/proto"); + expect(result).to.be.true; + }); + }); + + describe("error handling", () => { + it("should handle filter subscription errors gracefully", async () => { + (mockFilter.subscribe as sinon.SinonStub).resolves(false); + + const result = await ackManager.subscribe("/test/1/error/proto"); + + expect(result).to.be.true; + }); + + it("should handle store query errors gracefully", async () => { + (mockStore.queryWithOrderedCallback as sinon.SinonStub).rejects( + new Error("Store query error") + ); + + ackManager.start(); + await ackManager.subscribe("/test/1/error/proto"); + + await clock.tickAsync(5000); + }); + + it("should handle unsubscribe errors gracefully", async () => { + ackManager.start(); + await ackManager.subscribe("/test/1/error/proto"); + + (mockFilter.unsubscribe as sinon.SinonStub).rejects( + new Error("Unsubscribe error") + ); + + try { + await ackManager.stop(); + } catch { + // Expected to throw + } + }); + }); +}); diff --git a/packages/sdk/src/messaging/message_store.spec.ts b/packages/sdk/src/messaging/message_store.spec.ts new file mode 100644 index 0000000000..ae5cf5f5c4 --- /dev/null +++ b/packages/sdk/src/messaging/message_store.spec.ts @@ -0,0 +1,349 @@ +import type { IDecodedMessage, ISendMessage } from "@waku/interfaces"; +import { expect } from "chai"; +import { beforeEach, describe, it } from "mocha"; + +import { MessageStore } from "./message_store.js"; + +describe("MessageStore", () => { + let messageStore: MessageStore; + let mockMessage: IDecodedMessage; + let mockSendMessage: ISendMessage; + + beforeEach(() => { + messageStore = new MessageStore(); + mockMessage = { + version: 1, + payload: new Uint8Array([1, 2, 3]), + contentTopic: "test-topic", + pubsubTopic: "test-pubsub", + timestamp: new Date(1000), + rateLimitProof: undefined, + ephemeral: false, + meta: undefined, + hash: new Uint8Array([4, 5, 6]), + hashStr: "test-hash-123" + }; + mockSendMessage = { + contentTopic: "test-topic", + payload: new Uint8Array([7, 8, 9]), + ephemeral: false + }; + }); + + describe("constructor", () => { + it("should create instance with default options", () => { + const store = new MessageStore(); + expect(store).to.be.instanceOf(MessageStore); + }); + + it("should create instance with custom resend interval", () => { + const customInterval = 10000; + const store = new MessageStore({ resendIntervalMs: customInterval }); + expect(store).to.be.instanceOf(MessageStore); + }); + }); + + describe("has", () => { + it("should return false for non-existent message", () => { + expect(messageStore.has("non-existent")).to.be.false; + }); + + it("should return true for added message", () => { + messageStore.add(mockMessage); + expect(messageStore.has(mockMessage.hashStr)).to.be.true; + }); + + it("should return true for pending message", async () => { + await messageStore.queue(mockSendMessage); + expect(messageStore.has("pending-hash")).to.be.false; + }); + }); + + describe("add", () => { + it("should add new message with default options", () => { + messageStore.add(mockMessage); + expect(messageStore.has(mockMessage.hashStr)).to.be.true; + }); + + it("should add message with custom options", () => { + messageStore.add(mockMessage, { filterAck: true, storeAck: false }); + expect(messageStore.has(mockMessage.hashStr)).to.be.true; + }); + + it("should not add duplicate message", () => { + messageStore.add(mockMessage); + messageStore.add(mockMessage); + expect(messageStore.has(mockMessage.hashStr)).to.be.true; + }); + + it("should not add message if already exists", () => { + messageStore.add(mockMessage); + messageStore.add(mockMessage); + expect(messageStore.has(mockMessage.hashStr)).to.be.true; + }); + }); + + describe("queue", () => { + it("should queue message and return request ID", async () => { + const requestId = await messageStore.queue(mockSendMessage); + expect(typeof requestId).to.equal("string"); + expect(requestId.length).to.be.greaterThan(0); + }); + + it("should queue multiple messages with different request IDs", async () => { + const requestId1 = await messageStore.queue(mockSendMessage); + const requestId2 = await messageStore.queue(mockSendMessage); + expect(requestId1).to.not.equal(requestId2); + }); + }); + + describe("markFilterAck", () => { + it("should mark filter acknowledgment for existing message", () => { + messageStore.add(mockMessage); + messageStore.markFilterAck(mockMessage.hashStr); + expect(messageStore.has(mockMessage.hashStr)).to.be.true; + }); + + it("should handle filter ack for non-existent message", () => { + expect(() => { + messageStore.markFilterAck("non-existent"); + }).to.not.throw(); + }); + + it("should handle filter ack for pending message", async () => { + const requestId = await messageStore.queue(mockSendMessage); + messageStore.markSent(requestId, mockMessage); + messageStore.markFilterAck(mockMessage.hashStr); + expect(messageStore.has(mockMessage.hashStr)).to.be.true; + }); + }); + + describe("markStoreAck", () => { + it("should mark store acknowledgment for existing message", () => { + messageStore.add(mockMessage); + messageStore.markStoreAck(mockMessage.hashStr); + expect(messageStore.has(mockMessage.hashStr)).to.be.true; + }); + + it("should handle store ack for non-existent message", () => { + expect(() => { + messageStore.markStoreAck("non-existent"); + }).to.not.throw(); + }); + + it("should handle store ack for pending message", async () => { + const requestId = await messageStore.queue(mockSendMessage); + messageStore.markSent(requestId, mockMessage); + messageStore.markStoreAck(mockMessage.hashStr); + expect(messageStore.has(mockMessage.hashStr)).to.be.true; + }); + }); + + describe("markSent", () => { + it("should mark message as sent with valid request ID", async () => { + const requestId = await messageStore.queue(mockSendMessage); + messageStore.markSent(requestId, mockMessage); + expect(messageStore.has(mockMessage.hashStr)).to.be.true; + }); + + it("should handle markSent with invalid request ID", () => { + expect(() => { + messageStore.markSent("invalid-request-id", mockMessage); + }).to.not.throw(); + }); + + it("should handle markSent with request ID without message", async () => { + const requestId = await messageStore.queue(mockSendMessage); + const entry = (messageStore as any).pendingRequests.get(requestId); + if (entry) { + entry.messageRequest = undefined; + } + expect(() => { + messageStore.markSent(requestId, mockMessage); + }).to.not.throw(); + }); + + it("should set lastSentAt timestamp", async () => { + const requestId = await messageStore.queue(mockSendMessage); + const sentMessage = { ...mockMessage, timestamp: new Date(2000) }; + messageStore.markSent(requestId, sentMessage); + expect(messageStore.has(mockMessage.hashStr)).to.be.true; + }); + }); + + describe("getMessagesToSend", () => { + it("should return empty array when no messages queued", () => { + const messages = messageStore.getMessagesToSend(); + expect(messages).to.deep.equal([]); + }); + + it("should return queued messages that need sending", async () => { + const customStore = new MessageStore({ resendIntervalMs: 0 }); + const requestId = await customStore.queue(mockSendMessage); + const messages = customStore.getMessagesToSend(); + expect(messages).to.have.length(1); + expect(messages[0].requestId).to.equal(requestId); + expect(messages[0].message).to.equal(mockSendMessage); + }); + + it("should not return acknowledged messages", async () => { + const requestId = await messageStore.queue(mockSendMessage); + const entry = (messageStore as any).pendingRequests.get(requestId); + if (entry) { + entry.filterAck = true; + } + const messages = messageStore.getMessagesToSend(); + expect(messages).to.have.length(0); + }); + + it("should not return store acknowledged messages", async () => { + const requestId = await messageStore.queue(mockSendMessage); + const entry = (messageStore as any).pendingRequests.get(requestId); + if (entry) { + entry.storeAck = true; + } + const messages = messageStore.getMessagesToSend(); + expect(messages).to.have.length(0); + }); + + it("should respect resend interval", async () => { + const customStore = new MessageStore({ resendIntervalMs: 10000 }); + const requestId = await customStore.queue(mockSendMessage); + + const entry = (customStore as any).pendingRequests.get(requestId); + if (entry) { + entry.lastSentAt = Date.now() - 5000; + } + + const messagesAfterShortTime = customStore.getMessagesToSend(); + expect(messagesAfterShortTime).to.have.length(0); + + if (entry) { + entry.lastSentAt = Date.now() - 15000; + } + + const messagesAfterLongTime = customStore.getMessagesToSend(); + expect(messagesAfterLongTime).to.have.length(1); + }); + + it("should return messages after resend interval", async () => { + const customStore = new MessageStore({ resendIntervalMs: 1000 }); + const requestId = await customStore.queue(mockSendMessage); + + const entry = (customStore as any).pendingRequests.get(requestId); + if (entry) { + entry.lastSentAt = Date.now() - 2000; + } + + const messages = customStore.getMessagesToSend(); + expect(messages).to.have.length(1); + }); + + it("should not return messages without messageRequest", async () => { + const requestId = await messageStore.queue(mockSendMessage); + const entry = (messageStore as any).pendingRequests.get(requestId); + if (entry) { + entry.messageRequest = undefined; + } + const messages = messageStore.getMessagesToSend(); + expect(messages).to.have.length(0); + }); + }); + + describe("edge cases", () => { + it("should handle multiple acknowledgments for same message", () => { + messageStore.add(mockMessage); + messageStore.markFilterAck(mockMessage.hashStr); + messageStore.markStoreAck(mockMessage.hashStr); + expect(messageStore.has(mockMessage.hashStr)).to.be.true; + }); + + it("should handle message received before sent", async () => { + messageStore.add(mockMessage); + const requestId = await messageStore.queue(mockSendMessage); + messageStore.markSent(requestId, mockMessage); + expect(messageStore.has(mockMessage.hashStr)).to.be.true; + }); + + it("should handle empty message hash", () => { + const emptyHashMessage = { ...mockMessage, hashStr: "" }; + messageStore.add(emptyHashMessage); + expect(messageStore.has("")).to.be.true; + }); + + it("should handle very long message hash", () => { + const longHash = "a".repeat(1000); + const longHashMessage = { ...mockMessage, hashStr: longHash }; + messageStore.add(longHashMessage); + expect(messageStore.has(longHash)).to.be.true; + }); + + it("should handle special characters in hash", () => { + const specialHash = "test-hash-!@#$%^&*()_+-=[]{}|;':\",./<>?"; + const specialHashMessage = { ...mockMessage, hashStr: specialHash }; + messageStore.add(specialHashMessage); + expect(messageStore.has(specialHash)).to.be.true; + }); + }); + + describe("state transitions", () => { + it("should move message from pending to stored on ack", async () => { + const requestId = await messageStore.queue(mockSendMessage); + messageStore.markSent(requestId, mockMessage); + messageStore.markFilterAck(mockMessage.hashStr); + + expect(messageStore.has(mockMessage.hashStr)).to.be.true; + const pendingMessages = (messageStore as any).pendingMessages; + expect(pendingMessages.has(mockMessage.hashStr)).to.be.false; + }); + + it("should merge pending and stored message data", async () => { + messageStore.add(mockMessage, { filterAck: true }); + const requestId = await messageStore.queue(mockSendMessage); + messageStore.markSent(requestId, mockMessage); + messageStore.markStoreAck(mockMessage.hashStr); + + expect(messageStore.has(mockMessage.hashStr)).to.be.true; + }); + + it("should preserve acknowledgment state during transition", async () => { + const requestId = await messageStore.queue(mockSendMessage); + const entry = (messageStore as any).pendingRequests.get(requestId); + if (entry) { + entry.filterAck = true; + } + messageStore.markSent(requestId, mockMessage); + messageStore.markStoreAck(mockMessage.hashStr); + + expect(messageStore.has(mockMessage.hashStr)).to.be.true; + }); + }); + + describe("timing edge cases", () => { + it("should handle zero timestamp", async () => { + const zeroTimeMessage = { ...mockMessage, timestamp: new Date(0) }; + const requestId = await messageStore.queue(mockSendMessage); + expect(() => { + messageStore.markSent(requestId, zeroTimeMessage); + }).to.not.throw(); + }); + + it("should handle future timestamp", async () => { + const futureTime = new Date(Date.now() + 86400000); + const futureMessage = { ...mockMessage, timestamp: futureTime }; + const requestId = await messageStore.queue(mockSendMessage); + expect(() => { + messageStore.markSent(requestId, futureMessage); + }).to.not.throw(); + }); + + it("should handle very old timestamp", async () => { + const oldTime = new Date(0); + const oldMessage = { ...mockMessage, timestamp: oldTime }; + const requestId = await messageStore.queue(mockSendMessage); + expect(() => { + messageStore.markSent(requestId, oldMessage); + }).to.not.throw(); + }); + }); +}); diff --git a/packages/sdk/src/messaging/message_store.ts b/packages/sdk/src/messaging/message_store.ts index f065b75651..3d57482252 100644 --- a/packages/sdk/src/messaging/message_store.ts +++ b/packages/sdk/src/messaging/message_store.ts @@ -70,7 +70,7 @@ export class MessageStore { } public async queue(message: ISendMessage): Promise { - const requestId = uuidv4(); + const requestId = uuidv4(); // cspell:ignore uuidv4 this.pendingRequests.set(requestId.toString(), { messageRequest: message, diff --git a/packages/sdk/src/messaging/messaging.spec.ts b/packages/sdk/src/messaging/messaging.spec.ts index 209336634c..667110a300 100644 --- a/packages/sdk/src/messaging/messaging.spec.ts +++ b/packages/sdk/src/messaging/messaging.spec.ts @@ -1,48 +1,40 @@ -import { createDecoder, createEncoder } from "@waku/core"; import type { - IDecodedMessage, - IDecoder, - IEncoder, IFilter, ILightPush, - IMessage, + ISendMessage, IStore } from "@waku/interfaces"; -import { createRoutingInfo } from "@waku/utils"; import { utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; import sinon from "sinon"; -import { - FilterAckManager, - MessageStore, - Messaging, - StoreAckManager -} from "./messaging.js"; +import { MessageStore } from "./message_store.js"; +import { Messaging } from "./messaging.js"; const testContentTopic = "/test/1/waku-messaging/utf8"; const testNetworkconfig = { clusterId: 0, numShardsInCluster: 9 }; -const testRoutingInfo = createRoutingInfo(testNetworkconfig, { - contentTopic: testContentTopic -}); describe("MessageStore", () => { it("queues, marks sent and acks", async () => { - const encoder = createEncoder({ - contentTopic: testContentTopic, - routingInfo: testRoutingInfo - }); const store = new MessageStore({ resendIntervalMs: 1 }); - const msg: IMessage = { payload: utf8ToBytes("hello") }; + const msg: ISendMessage = { + contentTopic: testContentTopic, + payload: utf8ToBytes("hello") + }; - const hash = await store.queue(encoder as IEncoder, msg); + const hash = await store.queue(msg); expect(hash).to.be.a("string"); if (!hash) return; - expect(store.has(hash)).to.be.true; - store.markSent(hash); + + const mockDecodedMessage = { + hashStr: hash, + timestamp: new Date() + } as any; + + store.markSent(hash, mockDecodedMessage); store.markFilterAck(hash); store.markStoreAck(hash); @@ -50,94 +42,9 @@ describe("MessageStore", () => { expect(toSend.length).to.eq(0); }); }); -describe("FilterAckManager", () => { - it("subscribes and marks filter ack on messages", async () => { - const store = new MessageStore(); - const filter: IFilter = { - multicodec: "filter", - start: sinon.stub().resolves(), - stop: sinon.stub().resolves(), - subscribe: sinon.stub().callsFake(async (_dec, cb: any) => { - const decoder = createDecoder(testContentTopic, testRoutingInfo); - const proto = await decoder.fromProtoObj(decoder.pubsubTopic, { - payload: utf8ToBytes("x"), - contentTopic: testContentTopic, - version: 0, - timestamp: BigInt(Date.now()), - meta: undefined, - rateLimitProof: undefined, - ephemeral: false - } as any); - if (proto) { - await cb({ ...proto, hashStr: "hash" } as IDecodedMessage); - } - return true; - }), - unsubscribe: sinon.stub().resolves(true), - unsubscribeAll: sinon.stub() - } as unknown as IFilter; - - const mgr = new FilterAckManager(store, filter); - const encoder = createEncoder({ - contentTopic: testContentTopic, - routingInfo: testRoutingInfo - }); - - const subscribed = await mgr.subscribe({ - ...encoder, - fromWireToProtoObj: (b: Uint8Array) => - createDecoder(testContentTopic, testRoutingInfo).fromWireToProtoObj(b), - fromProtoObj: (pubsub: string, p: any) => - createDecoder(testContentTopic, testRoutingInfo).fromProtoObj(pubsub, p) - } as unknown as IDecoder & IEncoder); - expect(subscribed).to.be.true; - }); -}); - -describe("StoreAckManager", () => { - it("queries and marks store ack", async () => { - const store = new MessageStore(); - const decoder = createDecoder(testContentTopic, testRoutingInfo); - const d = decoder as IDecoder & IEncoder; - - const mockStore: IStore = { - multicodec: "store", - createCursor: sinon.stub() as any, - queryGenerator: sinon.stub() as any, - queryWithOrderedCallback: sinon - .stub() - .callsFake(async (_decs: any, cb: any) => { - const proto = await decoder.fromProtoObj(decoder.pubsubTopic, { - payload: utf8ToBytes("x"), - contentTopic: testContentTopic, - version: 0, - timestamp: BigInt(Date.now()), - meta: undefined, - rateLimitProof: undefined, - ephemeral: false - } as any); - if (proto) { - await cb({ ...proto, hashStr: "hash2" }); - } - }), - queryWithPromiseCallback: sinon.stub() as any - } as unknown as IStore; - - const mgr = new StoreAckManager(store, mockStore); - await mgr.subscribe(d); - mgr.start(); - await new Promise((r) => setTimeout(r, 5)); - mgr.stop(); - }); -}); describe("Messaging", () => { it("queues and sends via light push, marks sent", async () => { - const encoder = createEncoder({ - contentTopic: testContentTopic, - routingInfo: testRoutingInfo - }); - const lightPush: ILightPush = { multicodec: "lightpush", start: () => {}, @@ -162,9 +69,19 @@ describe("Messaging", () => { queryWithPromiseCallback: sinon.stub().resolves() } as unknown as IStore; - const messaging = new Messaging({ lightPush, filter, store }); + const messaging = new Messaging({ + lightPush, + filter, + store, + networkConfig: testNetworkconfig + }); - await messaging.send(encoder, { payload: utf8ToBytes("hello") }); + const message: ISendMessage = { + contentTopic: testContentTopic, + payload: utf8ToBytes("hello") + }; + + await messaging.send(message); expect((lightPush.send as any).calledOnce).to.be.true; }); }); diff --git a/packages/sdk/src/messaging/sender.spec.ts b/packages/sdk/src/messaging/sender.spec.ts new file mode 100644 index 0000000000..4f9082d019 --- /dev/null +++ b/packages/sdk/src/messaging/sender.spec.ts @@ -0,0 +1,144 @@ +import type { ILightPush, ISendMessage, NetworkConfig } from "@waku/interfaces"; +import { expect } from "chai"; +import { afterEach, beforeEach, describe, it } from "mocha"; +import sinon from "sinon"; + +import type { AckManager } from "./ack_manager.js"; +import type { MessageStore } from "./message_store.js"; +import { Sender } from "./sender.js"; + +describe("Sender", () => { + let sender: Sender; + let mockMessageStore: MessageStore; + let mockLightPush: ILightPush; + let mockAckManager: AckManager; + let mockNetworkConfig: NetworkConfig; + + beforeEach(() => { + mockMessageStore = { + queue: sinon.stub(), + getMessagesToSend: sinon.stub(), + markSent: sinon.stub() + } as any; + + mockLightPush = { + send: sinon.stub() + } as any; + + mockAckManager = { + subscribe: sinon.stub() + } as any; + + mockNetworkConfig = { + clusterId: 1, + shardId: 0 + } as any; + + sender = new Sender({ + messageStore: mockMessageStore, + lightPush: mockLightPush, + ackManager: mockAckManager, + networkConfig: mockNetworkConfig + }); + }); + + afterEach(() => { + sinon.restore(); + }); + + describe("constructor", () => { + it("should initialize with provided parameters", () => { + expect(sender).to.be.instanceOf(Sender); + }); + }); + + describe("start", () => { + it("should set up background sending interval", () => { + const setIntervalSpy = sinon.spy(global, "setInterval"); + + sender.start(); + + expect(setIntervalSpy.calledWith(sinon.match.func, 1000)).to.be.true; + }); + + it("should create multiple intervals when called multiple times", () => { + const setIntervalSpy = sinon.spy(global, "setInterval"); + + sender.start(); + sender.start(); + + expect(setIntervalSpy.calledTwice).to.be.true; + }); + }); + + describe("stop", () => { + it("should clear interval when called", () => { + const clearIntervalSpy = sinon.spy(global, "clearInterval"); + + sender.start(); + sender.stop(); + + expect(clearIntervalSpy.called).to.be.true; + }); + + it("should handle multiple stop calls gracefully", () => { + const clearIntervalSpy = sinon.spy(global, "clearInterval"); + + sender.start(); + sender.stop(); + sender.stop(); + + expect(clearIntervalSpy.calledOnce).to.be.true; + }); + + it("should handle stop without start", () => { + expect(() => sender.stop()).to.not.throw(); + }); + }); + + describe("send", () => { + const mockMessage: ISendMessage = { + contentTopic: "test-topic", + payload: new Uint8Array([1, 2, 3]), + ephemeral: false + }; + + const mockRequestId = "test-request-id"; + + it("should handle messageStore.queue failure", async () => { + const error = new Error("Queue failed"); + (mockMessageStore.queue as sinon.SinonStub).rejects(error); + + try { + await sender.send(mockMessage); + expect.fail("Expected error to be thrown"); + } catch (e: any) { + expect(e).to.equal(error); + } + }); + + it("should handle ackManager.subscribe failure", async () => { + const error = new Error("Subscribe failed"); + (mockAckManager.subscribe as sinon.SinonStub).rejects(error); + (mockMessageStore.queue as sinon.SinonStub).resolves(mockRequestId); + + try { + await sender.send(mockMessage); + expect.fail("Expected error to be thrown"); + } catch (e: any) { + expect(e).to.equal(error); + } + }); + }); + + describe("backgroundSend", () => { + it("should handle empty pending messages", async () => { + (mockMessageStore.getMessagesToSend as sinon.SinonStub).returns([]); + + await sender["backgroundSend"](); + + expect((mockMessageStore.getMessagesToSend as sinon.SinonStub).called).to + .be.true; + }); + }); +});