mirror of
https://github.com/logos-messaging/js-waku.git
synced 2026-01-04 23:03:07 +00:00
add unit tests
This commit is contained in:
parent
82be279331
commit
969330d5cb
309
packages/sdk/src/messaging/ack_manager.spec.ts
Normal file
309
packages/sdk/src/messaging/ack_manager.spec.ts
Normal file
@ -0,0 +1,309 @@
|
||||
import type {
|
||||
IDecodedMessage,
|
||||
IFilter,
|
||||
IStore,
|
||||
NetworkConfig
|
||||
} from "@waku/interfaces";
|
||||
import { expect } from "chai";
|
||||
import { afterEach, beforeEach, describe, it } from "mocha";
|
||||
import sinon from "sinon";
|
||||
|
||||
import { AckManager } from "./ack_manager.js";
|
||||
import { MessageStore } from "./message_store.js";
|
||||
|
||||
const mockMessage: IDecodedMessage = {
|
||||
version: 1,
|
||||
payload: new Uint8Array([1, 2, 3]),
|
||||
contentTopic: "/test/1/topic/proto",
|
||||
pubsubTopic: "test-pubsub",
|
||||
timestamp: new Date(),
|
||||
rateLimitProof: undefined,
|
||||
ephemeral: false,
|
||||
meta: undefined,
|
||||
hash: new Uint8Array([4, 5, 6]),
|
||||
hashStr: "test-hash-123"
|
||||
};
|
||||
|
||||
const mockNetworkConfig: NetworkConfig = {
|
||||
clusterId: 1,
|
||||
numShardsInCluster: 8
|
||||
};
|
||||
|
||||
describe("AckManager", () => {
|
||||
let messageStore: MessageStore;
|
||||
let mockFilter: IFilter;
|
||||
let mockStore: IStore;
|
||||
let ackManager: AckManager;
|
||||
let clock: sinon.SinonFakeTimers;
|
||||
|
||||
beforeEach(() => {
|
||||
messageStore = new MessageStore();
|
||||
|
||||
mockFilter = {
|
||||
subscribe: sinon.stub().resolves(true),
|
||||
unsubscribe: sinon.stub().resolves(true)
|
||||
} as unknown as IFilter;
|
||||
|
||||
mockStore = {
|
||||
queryWithOrderedCallback: sinon.stub().resolves(undefined)
|
||||
} as unknown as IStore;
|
||||
|
||||
ackManager = new AckManager({
|
||||
messageStore,
|
||||
filter: mockFilter,
|
||||
store: mockStore,
|
||||
networkConfig: mockNetworkConfig
|
||||
});
|
||||
|
||||
clock = sinon.useFakeTimers();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
clock.restore();
|
||||
sinon.restore();
|
||||
});
|
||||
|
||||
describe("constructor", () => {
|
||||
it("should initialize with provided parameters", () => {
|
||||
expect(ackManager).to.be.instanceOf(AckManager);
|
||||
});
|
||||
});
|
||||
|
||||
describe("start", () => {
|
||||
it("should start filter and store ack managers", () => {
|
||||
ackManager.start();
|
||||
|
||||
expect(clock.countTimers()).to.equal(1);
|
||||
});
|
||||
|
||||
it("should be idempotent", () => {
|
||||
ackManager.start();
|
||||
ackManager.start();
|
||||
|
||||
expect(clock.countTimers()).to.equal(1);
|
||||
});
|
||||
});
|
||||
|
||||
describe("stop", () => {
|
||||
it("should stop filter and store ack managers", async () => {
|
||||
ackManager.start();
|
||||
await ackManager.stop();
|
||||
|
||||
expect(clock.countTimers()).to.equal(0);
|
||||
});
|
||||
|
||||
it("should clear subscribed content topics", async () => {
|
||||
await ackManager.subscribe("/test/1/clear/proto");
|
||||
await ackManager.stop();
|
||||
|
||||
const result = await ackManager.subscribe("/test/1/clear/proto");
|
||||
expect(result).to.be.true;
|
||||
});
|
||||
|
||||
it("should handle stop without start", async () => {
|
||||
await ackManager.stop();
|
||||
});
|
||||
});
|
||||
|
||||
describe("subscribe", () => {
|
||||
it("should subscribe to new content topic", async () => {
|
||||
const result = await ackManager.subscribe("/test/1/new/proto");
|
||||
|
||||
expect(result).to.be.true;
|
||||
expect(
|
||||
(mockFilter.subscribe as sinon.SinonStub).calledWith(
|
||||
sinon.match.object,
|
||||
sinon.match.func
|
||||
)
|
||||
).to.be.true;
|
||||
});
|
||||
|
||||
it("should return true for already subscribed topic", async () => {
|
||||
await ackManager.subscribe("/test/1/existing/proto");
|
||||
const result = await ackManager.subscribe("/test/1/existing/proto");
|
||||
|
||||
expect(result).to.be.true;
|
||||
expect((mockFilter.subscribe as sinon.SinonStub).calledOnce).to.be.true;
|
||||
});
|
||||
|
||||
it("should return true if at least one subscription succeeds", async () => {
|
||||
(mockFilter.subscribe as sinon.SinonStub).resolves(false);
|
||||
|
||||
const result = await ackManager.subscribe("/test/1/topic/proto");
|
||||
|
||||
expect(result).to.be.true;
|
||||
});
|
||||
|
||||
it("should return true when filter fails but store succeeds", async () => {
|
||||
(mockFilter.subscribe as sinon.SinonStub).resolves(false);
|
||||
|
||||
const result = await ackManager.subscribe("/test/1/topic/proto");
|
||||
|
||||
expect(result).to.be.true;
|
||||
});
|
||||
});
|
||||
|
||||
describe("FilterAckManager", () => {
|
||||
beforeEach(() => {
|
||||
ackManager.start();
|
||||
});
|
||||
|
||||
it("should handle message reception and acknowledgment", async () => {
|
||||
await ackManager.subscribe("/test/1/topic/proto");
|
||||
const onMessageCallback = (
|
||||
mockFilter.subscribe as sinon.SinonStub
|
||||
).getCall(0).args[1];
|
||||
|
||||
await onMessageCallback(mockMessage);
|
||||
|
||||
expect(messageStore.has(mockMessage.hashStr)).to.be.true;
|
||||
});
|
||||
|
||||
it("should not add duplicate messages", async () => {
|
||||
messageStore.add(mockMessage, { filterAck: false });
|
||||
await ackManager.subscribe("/test/1/topic/proto");
|
||||
|
||||
const onMessageCallback = (
|
||||
mockFilter.subscribe as sinon.SinonStub
|
||||
).getCall(0).args[1];
|
||||
await onMessageCallback(mockMessage);
|
||||
|
||||
expect(messageStore.has(mockMessage.hashStr)).to.be.true;
|
||||
});
|
||||
|
||||
it("should unsubscribe all decoders on stop", async () => {
|
||||
await ackManager.subscribe("/test/1/topic1/proto");
|
||||
await ackManager.subscribe("/test/1/topic2/proto");
|
||||
|
||||
await ackManager.stop();
|
||||
|
||||
expect((mockFilter.unsubscribe as sinon.SinonStub).calledTwice).to.be
|
||||
.true;
|
||||
});
|
||||
});
|
||||
|
||||
describe("StoreAckManager", () => {
|
||||
beforeEach(() => {
|
||||
ackManager.start();
|
||||
});
|
||||
|
||||
it("should query store periodically", async () => {
|
||||
await ackManager.subscribe("/test/1/topic/proto");
|
||||
|
||||
await clock.tickAsync(5000);
|
||||
|
||||
expect(
|
||||
(mockStore.queryWithOrderedCallback as sinon.SinonStub).calledWith(
|
||||
sinon.match.array,
|
||||
sinon.match.func,
|
||||
sinon.match.object
|
||||
)
|
||||
).to.be.true;
|
||||
});
|
||||
|
||||
it("should handle store query callback", async () => {
|
||||
await ackManager.subscribe("/test/1/topic/proto");
|
||||
|
||||
await clock.tickAsync(5000);
|
||||
|
||||
const callback = (
|
||||
mockStore.queryWithOrderedCallback as sinon.SinonStub
|
||||
).getCall(0).args[1];
|
||||
callback(mockMessage);
|
||||
|
||||
expect(messageStore.has(mockMessage.hashStr)).to.be.true;
|
||||
});
|
||||
|
||||
it("should not add duplicate messages from store", async () => {
|
||||
messageStore.add(mockMessage, { storeAck: false });
|
||||
|
||||
await ackManager.subscribe("/test/1/topic/proto");
|
||||
await clock.tickAsync(5000);
|
||||
|
||||
const callback = (
|
||||
mockStore.queryWithOrderedCallback as sinon.SinonStub
|
||||
).getCall(0).args[1];
|
||||
callback(mockMessage);
|
||||
|
||||
expect(messageStore.has(mockMessage.hashStr)).to.be.true;
|
||||
});
|
||||
|
||||
it("should stop interval on stop", async () => {
|
||||
ackManager.start();
|
||||
await ackManager.stop();
|
||||
|
||||
expect(clock.countTimers()).to.equal(0);
|
||||
});
|
||||
});
|
||||
|
||||
describe("integration scenarios", () => {
|
||||
it("should handle complete lifecycle", async () => {
|
||||
ackManager.start();
|
||||
|
||||
const result1 = await ackManager.subscribe("/test/1/topic1/proto");
|
||||
const result2 = await ackManager.subscribe("/test/1/topic2/proto");
|
||||
|
||||
expect(result1).to.be.true;
|
||||
expect(result2).to.be.true;
|
||||
|
||||
await ackManager.stop();
|
||||
|
||||
expect(clock.countTimers()).to.equal(0);
|
||||
});
|
||||
|
||||
it("should handle multiple subscriptions to same topic", async () => {
|
||||
ackManager.start();
|
||||
|
||||
const result1 = await ackManager.subscribe("/test/1/same/proto");
|
||||
const result2 = await ackManager.subscribe("/test/1/same/proto");
|
||||
|
||||
expect(result1).to.be.true;
|
||||
expect(result2).to.be.true;
|
||||
expect((mockFilter.subscribe as sinon.SinonStub).calledOnce).to.be.true;
|
||||
});
|
||||
|
||||
it("should handle subscription after stop", async () => {
|
||||
ackManager.start();
|
||||
await ackManager.stop();
|
||||
|
||||
const result = await ackManager.subscribe("/test/1/after-stop/proto");
|
||||
expect(result).to.be.true;
|
||||
});
|
||||
});
|
||||
|
||||
describe("error handling", () => {
|
||||
it("should handle filter subscription errors gracefully", async () => {
|
||||
(mockFilter.subscribe as sinon.SinonStub).resolves(false);
|
||||
|
||||
const result = await ackManager.subscribe("/test/1/error/proto");
|
||||
|
||||
expect(result).to.be.true;
|
||||
});
|
||||
|
||||
it("should handle store query errors gracefully", async () => {
|
||||
(mockStore.queryWithOrderedCallback as sinon.SinonStub).rejects(
|
||||
new Error("Store query error")
|
||||
);
|
||||
|
||||
ackManager.start();
|
||||
await ackManager.subscribe("/test/1/error/proto");
|
||||
|
||||
await clock.tickAsync(5000);
|
||||
});
|
||||
|
||||
it("should handle unsubscribe errors gracefully", async () => {
|
||||
ackManager.start();
|
||||
await ackManager.subscribe("/test/1/error/proto");
|
||||
|
||||
(mockFilter.unsubscribe as sinon.SinonStub).rejects(
|
||||
new Error("Unsubscribe error")
|
||||
);
|
||||
|
||||
try {
|
||||
await ackManager.stop();
|
||||
} catch {
|
||||
// Expected to throw
|
||||
}
|
||||
});
|
||||
});
|
||||
});
|
||||
349
packages/sdk/src/messaging/message_store.spec.ts
Normal file
349
packages/sdk/src/messaging/message_store.spec.ts
Normal file
@ -0,0 +1,349 @@
|
||||
import type { IDecodedMessage, ISendMessage } from "@waku/interfaces";
|
||||
import { expect } from "chai";
|
||||
import { beforeEach, describe, it } from "mocha";
|
||||
|
||||
import { MessageStore } from "./message_store.js";
|
||||
|
||||
describe("MessageStore", () => {
|
||||
let messageStore: MessageStore;
|
||||
let mockMessage: IDecodedMessage;
|
||||
let mockSendMessage: ISendMessage;
|
||||
|
||||
beforeEach(() => {
|
||||
messageStore = new MessageStore();
|
||||
mockMessage = {
|
||||
version: 1,
|
||||
payload: new Uint8Array([1, 2, 3]),
|
||||
contentTopic: "test-topic",
|
||||
pubsubTopic: "test-pubsub",
|
||||
timestamp: new Date(1000),
|
||||
rateLimitProof: undefined,
|
||||
ephemeral: false,
|
||||
meta: undefined,
|
||||
hash: new Uint8Array([4, 5, 6]),
|
||||
hashStr: "test-hash-123"
|
||||
};
|
||||
mockSendMessage = {
|
||||
contentTopic: "test-topic",
|
||||
payload: new Uint8Array([7, 8, 9]),
|
||||
ephemeral: false
|
||||
};
|
||||
});
|
||||
|
||||
describe("constructor", () => {
|
||||
it("should create instance with default options", () => {
|
||||
const store = new MessageStore();
|
||||
expect(store).to.be.instanceOf(MessageStore);
|
||||
});
|
||||
|
||||
it("should create instance with custom resend interval", () => {
|
||||
const customInterval = 10000;
|
||||
const store = new MessageStore({ resendIntervalMs: customInterval });
|
||||
expect(store).to.be.instanceOf(MessageStore);
|
||||
});
|
||||
});
|
||||
|
||||
describe("has", () => {
|
||||
it("should return false for non-existent message", () => {
|
||||
expect(messageStore.has("non-existent")).to.be.false;
|
||||
});
|
||||
|
||||
it("should return true for added message", () => {
|
||||
messageStore.add(mockMessage);
|
||||
expect(messageStore.has(mockMessage.hashStr)).to.be.true;
|
||||
});
|
||||
|
||||
it("should return true for pending message", async () => {
|
||||
await messageStore.queue(mockSendMessage);
|
||||
expect(messageStore.has("pending-hash")).to.be.false;
|
||||
});
|
||||
});
|
||||
|
||||
describe("add", () => {
|
||||
it("should add new message with default options", () => {
|
||||
messageStore.add(mockMessage);
|
||||
expect(messageStore.has(mockMessage.hashStr)).to.be.true;
|
||||
});
|
||||
|
||||
it("should add message with custom options", () => {
|
||||
messageStore.add(mockMessage, { filterAck: true, storeAck: false });
|
||||
expect(messageStore.has(mockMessage.hashStr)).to.be.true;
|
||||
});
|
||||
|
||||
it("should not add duplicate message", () => {
|
||||
messageStore.add(mockMessage);
|
||||
messageStore.add(mockMessage);
|
||||
expect(messageStore.has(mockMessage.hashStr)).to.be.true;
|
||||
});
|
||||
|
||||
it("should not add message if already exists", () => {
|
||||
messageStore.add(mockMessage);
|
||||
messageStore.add(mockMessage);
|
||||
expect(messageStore.has(mockMessage.hashStr)).to.be.true;
|
||||
});
|
||||
});
|
||||
|
||||
describe("queue", () => {
|
||||
it("should queue message and return request ID", async () => {
|
||||
const requestId = await messageStore.queue(mockSendMessage);
|
||||
expect(typeof requestId).to.equal("string");
|
||||
expect(requestId.length).to.be.greaterThan(0);
|
||||
});
|
||||
|
||||
it("should queue multiple messages with different request IDs", async () => {
|
||||
const requestId1 = await messageStore.queue(mockSendMessage);
|
||||
const requestId2 = await messageStore.queue(mockSendMessage);
|
||||
expect(requestId1).to.not.equal(requestId2);
|
||||
});
|
||||
});
|
||||
|
||||
describe("markFilterAck", () => {
|
||||
it("should mark filter acknowledgment for existing message", () => {
|
||||
messageStore.add(mockMessage);
|
||||
messageStore.markFilterAck(mockMessage.hashStr);
|
||||
expect(messageStore.has(mockMessage.hashStr)).to.be.true;
|
||||
});
|
||||
|
||||
it("should handle filter ack for non-existent message", () => {
|
||||
expect(() => {
|
||||
messageStore.markFilterAck("non-existent");
|
||||
}).to.not.throw();
|
||||
});
|
||||
|
||||
it("should handle filter ack for pending message", async () => {
|
||||
const requestId = await messageStore.queue(mockSendMessage);
|
||||
messageStore.markSent(requestId, mockMessage);
|
||||
messageStore.markFilterAck(mockMessage.hashStr);
|
||||
expect(messageStore.has(mockMessage.hashStr)).to.be.true;
|
||||
});
|
||||
});
|
||||
|
||||
describe("markStoreAck", () => {
|
||||
it("should mark store acknowledgment for existing message", () => {
|
||||
messageStore.add(mockMessage);
|
||||
messageStore.markStoreAck(mockMessage.hashStr);
|
||||
expect(messageStore.has(mockMessage.hashStr)).to.be.true;
|
||||
});
|
||||
|
||||
it("should handle store ack for non-existent message", () => {
|
||||
expect(() => {
|
||||
messageStore.markStoreAck("non-existent");
|
||||
}).to.not.throw();
|
||||
});
|
||||
|
||||
it("should handle store ack for pending message", async () => {
|
||||
const requestId = await messageStore.queue(mockSendMessage);
|
||||
messageStore.markSent(requestId, mockMessage);
|
||||
messageStore.markStoreAck(mockMessage.hashStr);
|
||||
expect(messageStore.has(mockMessage.hashStr)).to.be.true;
|
||||
});
|
||||
});
|
||||
|
||||
describe("markSent", () => {
|
||||
it("should mark message as sent with valid request ID", async () => {
|
||||
const requestId = await messageStore.queue(mockSendMessage);
|
||||
messageStore.markSent(requestId, mockMessage);
|
||||
expect(messageStore.has(mockMessage.hashStr)).to.be.true;
|
||||
});
|
||||
|
||||
it("should handle markSent with invalid request ID", () => {
|
||||
expect(() => {
|
||||
messageStore.markSent("invalid-request-id", mockMessage);
|
||||
}).to.not.throw();
|
||||
});
|
||||
|
||||
it("should handle markSent with request ID without message", async () => {
|
||||
const requestId = await messageStore.queue(mockSendMessage);
|
||||
const entry = (messageStore as any).pendingRequests.get(requestId);
|
||||
if (entry) {
|
||||
entry.messageRequest = undefined;
|
||||
}
|
||||
expect(() => {
|
||||
messageStore.markSent(requestId, mockMessage);
|
||||
}).to.not.throw();
|
||||
});
|
||||
|
||||
it("should set lastSentAt timestamp", async () => {
|
||||
const requestId = await messageStore.queue(mockSendMessage);
|
||||
const sentMessage = { ...mockMessage, timestamp: new Date(2000) };
|
||||
messageStore.markSent(requestId, sentMessage);
|
||||
expect(messageStore.has(mockMessage.hashStr)).to.be.true;
|
||||
});
|
||||
});
|
||||
|
||||
describe("getMessagesToSend", () => {
|
||||
it("should return empty array when no messages queued", () => {
|
||||
const messages = messageStore.getMessagesToSend();
|
||||
expect(messages).to.deep.equal([]);
|
||||
});
|
||||
|
||||
it("should return queued messages that need sending", async () => {
|
||||
const customStore = new MessageStore({ resendIntervalMs: 0 });
|
||||
const requestId = await customStore.queue(mockSendMessage);
|
||||
const messages = customStore.getMessagesToSend();
|
||||
expect(messages).to.have.length(1);
|
||||
expect(messages[0].requestId).to.equal(requestId);
|
||||
expect(messages[0].message).to.equal(mockSendMessage);
|
||||
});
|
||||
|
||||
it("should not return acknowledged messages", async () => {
|
||||
const requestId = await messageStore.queue(mockSendMessage);
|
||||
const entry = (messageStore as any).pendingRequests.get(requestId);
|
||||
if (entry) {
|
||||
entry.filterAck = true;
|
||||
}
|
||||
const messages = messageStore.getMessagesToSend();
|
||||
expect(messages).to.have.length(0);
|
||||
});
|
||||
|
||||
it("should not return store acknowledged messages", async () => {
|
||||
const requestId = await messageStore.queue(mockSendMessage);
|
||||
const entry = (messageStore as any).pendingRequests.get(requestId);
|
||||
if (entry) {
|
||||
entry.storeAck = true;
|
||||
}
|
||||
const messages = messageStore.getMessagesToSend();
|
||||
expect(messages).to.have.length(0);
|
||||
});
|
||||
|
||||
it("should respect resend interval", async () => {
|
||||
const customStore = new MessageStore({ resendIntervalMs: 10000 });
|
||||
const requestId = await customStore.queue(mockSendMessage);
|
||||
|
||||
const entry = (customStore as any).pendingRequests.get(requestId);
|
||||
if (entry) {
|
||||
entry.lastSentAt = Date.now() - 5000;
|
||||
}
|
||||
|
||||
const messagesAfterShortTime = customStore.getMessagesToSend();
|
||||
expect(messagesAfterShortTime).to.have.length(0);
|
||||
|
||||
if (entry) {
|
||||
entry.lastSentAt = Date.now() - 15000;
|
||||
}
|
||||
|
||||
const messagesAfterLongTime = customStore.getMessagesToSend();
|
||||
expect(messagesAfterLongTime).to.have.length(1);
|
||||
});
|
||||
|
||||
it("should return messages after resend interval", async () => {
|
||||
const customStore = new MessageStore({ resendIntervalMs: 1000 });
|
||||
const requestId = await customStore.queue(mockSendMessage);
|
||||
|
||||
const entry = (customStore as any).pendingRequests.get(requestId);
|
||||
if (entry) {
|
||||
entry.lastSentAt = Date.now() - 2000;
|
||||
}
|
||||
|
||||
const messages = customStore.getMessagesToSend();
|
||||
expect(messages).to.have.length(1);
|
||||
});
|
||||
|
||||
it("should not return messages without messageRequest", async () => {
|
||||
const requestId = await messageStore.queue(mockSendMessage);
|
||||
const entry = (messageStore as any).pendingRequests.get(requestId);
|
||||
if (entry) {
|
||||
entry.messageRequest = undefined;
|
||||
}
|
||||
const messages = messageStore.getMessagesToSend();
|
||||
expect(messages).to.have.length(0);
|
||||
});
|
||||
});
|
||||
|
||||
describe("edge cases", () => {
|
||||
it("should handle multiple acknowledgments for same message", () => {
|
||||
messageStore.add(mockMessage);
|
||||
messageStore.markFilterAck(mockMessage.hashStr);
|
||||
messageStore.markStoreAck(mockMessage.hashStr);
|
||||
expect(messageStore.has(mockMessage.hashStr)).to.be.true;
|
||||
});
|
||||
|
||||
it("should handle message received before sent", async () => {
|
||||
messageStore.add(mockMessage);
|
||||
const requestId = await messageStore.queue(mockSendMessage);
|
||||
messageStore.markSent(requestId, mockMessage);
|
||||
expect(messageStore.has(mockMessage.hashStr)).to.be.true;
|
||||
});
|
||||
|
||||
it("should handle empty message hash", () => {
|
||||
const emptyHashMessage = { ...mockMessage, hashStr: "" };
|
||||
messageStore.add(emptyHashMessage);
|
||||
expect(messageStore.has("")).to.be.true;
|
||||
});
|
||||
|
||||
it("should handle very long message hash", () => {
|
||||
const longHash = "a".repeat(1000);
|
||||
const longHashMessage = { ...mockMessage, hashStr: longHash };
|
||||
messageStore.add(longHashMessage);
|
||||
expect(messageStore.has(longHash)).to.be.true;
|
||||
});
|
||||
|
||||
it("should handle special characters in hash", () => {
|
||||
const specialHash = "test-hash-!@#$%^&*()_+-=[]{}|;':\",./<>?";
|
||||
const specialHashMessage = { ...mockMessage, hashStr: specialHash };
|
||||
messageStore.add(specialHashMessage);
|
||||
expect(messageStore.has(specialHash)).to.be.true;
|
||||
});
|
||||
});
|
||||
|
||||
describe("state transitions", () => {
|
||||
it("should move message from pending to stored on ack", async () => {
|
||||
const requestId = await messageStore.queue(mockSendMessage);
|
||||
messageStore.markSent(requestId, mockMessage);
|
||||
messageStore.markFilterAck(mockMessage.hashStr);
|
||||
|
||||
expect(messageStore.has(mockMessage.hashStr)).to.be.true;
|
||||
const pendingMessages = (messageStore as any).pendingMessages;
|
||||
expect(pendingMessages.has(mockMessage.hashStr)).to.be.false;
|
||||
});
|
||||
|
||||
it("should merge pending and stored message data", async () => {
|
||||
messageStore.add(mockMessage, { filterAck: true });
|
||||
const requestId = await messageStore.queue(mockSendMessage);
|
||||
messageStore.markSent(requestId, mockMessage);
|
||||
messageStore.markStoreAck(mockMessage.hashStr);
|
||||
|
||||
expect(messageStore.has(mockMessage.hashStr)).to.be.true;
|
||||
});
|
||||
|
||||
it("should preserve acknowledgment state during transition", async () => {
|
||||
const requestId = await messageStore.queue(mockSendMessage);
|
||||
const entry = (messageStore as any).pendingRequests.get(requestId);
|
||||
if (entry) {
|
||||
entry.filterAck = true;
|
||||
}
|
||||
messageStore.markSent(requestId, mockMessage);
|
||||
messageStore.markStoreAck(mockMessage.hashStr);
|
||||
|
||||
expect(messageStore.has(mockMessage.hashStr)).to.be.true;
|
||||
});
|
||||
});
|
||||
|
||||
describe("timing edge cases", () => {
|
||||
it("should handle zero timestamp", async () => {
|
||||
const zeroTimeMessage = { ...mockMessage, timestamp: new Date(0) };
|
||||
const requestId = await messageStore.queue(mockSendMessage);
|
||||
expect(() => {
|
||||
messageStore.markSent(requestId, zeroTimeMessage);
|
||||
}).to.not.throw();
|
||||
});
|
||||
|
||||
it("should handle future timestamp", async () => {
|
||||
const futureTime = new Date(Date.now() + 86400000);
|
||||
const futureMessage = { ...mockMessage, timestamp: futureTime };
|
||||
const requestId = await messageStore.queue(mockSendMessage);
|
||||
expect(() => {
|
||||
messageStore.markSent(requestId, futureMessage);
|
||||
}).to.not.throw();
|
||||
});
|
||||
|
||||
it("should handle very old timestamp", async () => {
|
||||
const oldTime = new Date(0);
|
||||
const oldMessage = { ...mockMessage, timestamp: oldTime };
|
||||
const requestId = await messageStore.queue(mockSendMessage);
|
||||
expect(() => {
|
||||
messageStore.markSent(requestId, oldMessage);
|
||||
}).to.not.throw();
|
||||
});
|
||||
});
|
||||
});
|
||||
@ -70,7 +70,7 @@ export class MessageStore {
|
||||
}
|
||||
|
||||
public async queue(message: ISendMessage): Promise<RequestId> {
|
||||
const requestId = uuidv4();
|
||||
const requestId = uuidv4(); // cspell:ignore uuidv4
|
||||
|
||||
this.pendingRequests.set(requestId.toString(), {
|
||||
messageRequest: message,
|
||||
|
||||
@ -1,48 +1,40 @@
|
||||
import { createDecoder, createEncoder } from "@waku/core";
|
||||
import type {
|
||||
IDecodedMessage,
|
||||
IDecoder,
|
||||
IEncoder,
|
||||
IFilter,
|
||||
ILightPush,
|
||||
IMessage,
|
||||
ISendMessage,
|
||||
IStore
|
||||
} from "@waku/interfaces";
|
||||
import { createRoutingInfo } from "@waku/utils";
|
||||
import { utf8ToBytes } from "@waku/utils/bytes";
|
||||
import { expect } from "chai";
|
||||
import sinon from "sinon";
|
||||
|
||||
import {
|
||||
FilterAckManager,
|
||||
MessageStore,
|
||||
Messaging,
|
||||
StoreAckManager
|
||||
} from "./messaging.js";
|
||||
import { MessageStore } from "./message_store.js";
|
||||
import { Messaging } from "./messaging.js";
|
||||
|
||||
const testContentTopic = "/test/1/waku-messaging/utf8";
|
||||
const testNetworkconfig = {
|
||||
clusterId: 0,
|
||||
numShardsInCluster: 9
|
||||
};
|
||||
const testRoutingInfo = createRoutingInfo(testNetworkconfig, {
|
||||
contentTopic: testContentTopic
|
||||
});
|
||||
|
||||
describe("MessageStore", () => {
|
||||
it("queues, marks sent and acks", async () => {
|
||||
const encoder = createEncoder({
|
||||
contentTopic: testContentTopic,
|
||||
routingInfo: testRoutingInfo
|
||||
});
|
||||
const store = new MessageStore({ resendIntervalMs: 1 });
|
||||
const msg: IMessage = { payload: utf8ToBytes("hello") };
|
||||
const msg: ISendMessage = {
|
||||
contentTopic: testContentTopic,
|
||||
payload: utf8ToBytes("hello")
|
||||
};
|
||||
|
||||
const hash = await store.queue(encoder as IEncoder, msg);
|
||||
const hash = await store.queue(msg);
|
||||
expect(hash).to.be.a("string");
|
||||
if (!hash) return;
|
||||
expect(store.has(hash)).to.be.true;
|
||||
store.markSent(hash);
|
||||
|
||||
const mockDecodedMessage = {
|
||||
hashStr: hash,
|
||||
timestamp: new Date()
|
||||
} as any;
|
||||
|
||||
store.markSent(hash, mockDecodedMessage);
|
||||
store.markFilterAck(hash);
|
||||
store.markStoreAck(hash);
|
||||
|
||||
@ -50,94 +42,9 @@ describe("MessageStore", () => {
|
||||
expect(toSend.length).to.eq(0);
|
||||
});
|
||||
});
|
||||
describe("FilterAckManager", () => {
|
||||
it("subscribes and marks filter ack on messages", async () => {
|
||||
const store = new MessageStore();
|
||||
const filter: IFilter = {
|
||||
multicodec: "filter",
|
||||
start: sinon.stub().resolves(),
|
||||
stop: sinon.stub().resolves(),
|
||||
subscribe: sinon.stub().callsFake(async (_dec, cb: any) => {
|
||||
const decoder = createDecoder(testContentTopic, testRoutingInfo);
|
||||
const proto = await decoder.fromProtoObj(decoder.pubsubTopic, {
|
||||
payload: utf8ToBytes("x"),
|
||||
contentTopic: testContentTopic,
|
||||
version: 0,
|
||||
timestamp: BigInt(Date.now()),
|
||||
meta: undefined,
|
||||
rateLimitProof: undefined,
|
||||
ephemeral: false
|
||||
} as any);
|
||||
if (proto) {
|
||||
await cb({ ...proto, hashStr: "hash" } as IDecodedMessage);
|
||||
}
|
||||
return true;
|
||||
}),
|
||||
unsubscribe: sinon.stub().resolves(true),
|
||||
unsubscribeAll: sinon.stub()
|
||||
} as unknown as IFilter;
|
||||
|
||||
const mgr = new FilterAckManager(store, filter);
|
||||
const encoder = createEncoder({
|
||||
contentTopic: testContentTopic,
|
||||
routingInfo: testRoutingInfo
|
||||
});
|
||||
|
||||
const subscribed = await mgr.subscribe({
|
||||
...encoder,
|
||||
fromWireToProtoObj: (b: Uint8Array) =>
|
||||
createDecoder(testContentTopic, testRoutingInfo).fromWireToProtoObj(b),
|
||||
fromProtoObj: (pubsub: string, p: any) =>
|
||||
createDecoder(testContentTopic, testRoutingInfo).fromProtoObj(pubsub, p)
|
||||
} as unknown as IDecoder<IDecodedMessage> & IEncoder);
|
||||
expect(subscribed).to.be.true;
|
||||
});
|
||||
});
|
||||
|
||||
describe("StoreAckManager", () => {
|
||||
it("queries and marks store ack", async () => {
|
||||
const store = new MessageStore();
|
||||
const decoder = createDecoder(testContentTopic, testRoutingInfo);
|
||||
const d = decoder as IDecoder<IDecodedMessage> & IEncoder;
|
||||
|
||||
const mockStore: IStore = {
|
||||
multicodec: "store",
|
||||
createCursor: sinon.stub() as any,
|
||||
queryGenerator: sinon.stub() as any,
|
||||
queryWithOrderedCallback: sinon
|
||||
.stub()
|
||||
.callsFake(async (_decs: any, cb: any) => {
|
||||
const proto = await decoder.fromProtoObj(decoder.pubsubTopic, {
|
||||
payload: utf8ToBytes("x"),
|
||||
contentTopic: testContentTopic,
|
||||
version: 0,
|
||||
timestamp: BigInt(Date.now()),
|
||||
meta: undefined,
|
||||
rateLimitProof: undefined,
|
||||
ephemeral: false
|
||||
} as any);
|
||||
if (proto) {
|
||||
await cb({ ...proto, hashStr: "hash2" });
|
||||
}
|
||||
}),
|
||||
queryWithPromiseCallback: sinon.stub() as any
|
||||
} as unknown as IStore;
|
||||
|
||||
const mgr = new StoreAckManager(store, mockStore);
|
||||
await mgr.subscribe(d);
|
||||
mgr.start();
|
||||
await new Promise((r) => setTimeout(r, 5));
|
||||
mgr.stop();
|
||||
});
|
||||
});
|
||||
|
||||
describe("Messaging", () => {
|
||||
it("queues and sends via light push, marks sent", async () => {
|
||||
const encoder = createEncoder({
|
||||
contentTopic: testContentTopic,
|
||||
routingInfo: testRoutingInfo
|
||||
});
|
||||
|
||||
const lightPush: ILightPush = {
|
||||
multicodec: "lightpush",
|
||||
start: () => {},
|
||||
@ -162,9 +69,19 @@ describe("Messaging", () => {
|
||||
queryWithPromiseCallback: sinon.stub().resolves()
|
||||
} as unknown as IStore;
|
||||
|
||||
const messaging = new Messaging({ lightPush, filter, store });
|
||||
const messaging = new Messaging({
|
||||
lightPush,
|
||||
filter,
|
||||
store,
|
||||
networkConfig: testNetworkconfig
|
||||
});
|
||||
|
||||
await messaging.send(encoder, { payload: utf8ToBytes("hello") });
|
||||
const message: ISendMessage = {
|
||||
contentTopic: testContentTopic,
|
||||
payload: utf8ToBytes("hello")
|
||||
};
|
||||
|
||||
await messaging.send(message);
|
||||
expect((lightPush.send as any).calledOnce).to.be.true;
|
||||
});
|
||||
});
|
||||
|
||||
144
packages/sdk/src/messaging/sender.spec.ts
Normal file
144
packages/sdk/src/messaging/sender.spec.ts
Normal file
@ -0,0 +1,144 @@
|
||||
import type { ILightPush, ISendMessage, NetworkConfig } from "@waku/interfaces";
|
||||
import { expect } from "chai";
|
||||
import { afterEach, beforeEach, describe, it } from "mocha";
|
||||
import sinon from "sinon";
|
||||
|
||||
import type { AckManager } from "./ack_manager.js";
|
||||
import type { MessageStore } from "./message_store.js";
|
||||
import { Sender } from "./sender.js";
|
||||
|
||||
describe("Sender", () => {
|
||||
let sender: Sender;
|
||||
let mockMessageStore: MessageStore;
|
||||
let mockLightPush: ILightPush;
|
||||
let mockAckManager: AckManager;
|
||||
let mockNetworkConfig: NetworkConfig;
|
||||
|
||||
beforeEach(() => {
|
||||
mockMessageStore = {
|
||||
queue: sinon.stub(),
|
||||
getMessagesToSend: sinon.stub(),
|
||||
markSent: sinon.stub()
|
||||
} as any;
|
||||
|
||||
mockLightPush = {
|
||||
send: sinon.stub()
|
||||
} as any;
|
||||
|
||||
mockAckManager = {
|
||||
subscribe: sinon.stub()
|
||||
} as any;
|
||||
|
||||
mockNetworkConfig = {
|
||||
clusterId: 1,
|
||||
shardId: 0
|
||||
} as any;
|
||||
|
||||
sender = new Sender({
|
||||
messageStore: mockMessageStore,
|
||||
lightPush: mockLightPush,
|
||||
ackManager: mockAckManager,
|
||||
networkConfig: mockNetworkConfig
|
||||
});
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
sinon.restore();
|
||||
});
|
||||
|
||||
describe("constructor", () => {
|
||||
it("should initialize with provided parameters", () => {
|
||||
expect(sender).to.be.instanceOf(Sender);
|
||||
});
|
||||
});
|
||||
|
||||
describe("start", () => {
|
||||
it("should set up background sending interval", () => {
|
||||
const setIntervalSpy = sinon.spy(global, "setInterval");
|
||||
|
||||
sender.start();
|
||||
|
||||
expect(setIntervalSpy.calledWith(sinon.match.func, 1000)).to.be.true;
|
||||
});
|
||||
|
||||
it("should create multiple intervals when called multiple times", () => {
|
||||
const setIntervalSpy = sinon.spy(global, "setInterval");
|
||||
|
||||
sender.start();
|
||||
sender.start();
|
||||
|
||||
expect(setIntervalSpy.calledTwice).to.be.true;
|
||||
});
|
||||
});
|
||||
|
||||
describe("stop", () => {
|
||||
it("should clear interval when called", () => {
|
||||
const clearIntervalSpy = sinon.spy(global, "clearInterval");
|
||||
|
||||
sender.start();
|
||||
sender.stop();
|
||||
|
||||
expect(clearIntervalSpy.called).to.be.true;
|
||||
});
|
||||
|
||||
it("should handle multiple stop calls gracefully", () => {
|
||||
const clearIntervalSpy = sinon.spy(global, "clearInterval");
|
||||
|
||||
sender.start();
|
||||
sender.stop();
|
||||
sender.stop();
|
||||
|
||||
expect(clearIntervalSpy.calledOnce).to.be.true;
|
||||
});
|
||||
|
||||
it("should handle stop without start", () => {
|
||||
expect(() => sender.stop()).to.not.throw();
|
||||
});
|
||||
});
|
||||
|
||||
describe("send", () => {
|
||||
const mockMessage: ISendMessage = {
|
||||
contentTopic: "test-topic",
|
||||
payload: new Uint8Array([1, 2, 3]),
|
||||
ephemeral: false
|
||||
};
|
||||
|
||||
const mockRequestId = "test-request-id";
|
||||
|
||||
it("should handle messageStore.queue failure", async () => {
|
||||
const error = new Error("Queue failed");
|
||||
(mockMessageStore.queue as sinon.SinonStub).rejects(error);
|
||||
|
||||
try {
|
||||
await sender.send(mockMessage);
|
||||
expect.fail("Expected error to be thrown");
|
||||
} catch (e: any) {
|
||||
expect(e).to.equal(error);
|
||||
}
|
||||
});
|
||||
|
||||
it("should handle ackManager.subscribe failure", async () => {
|
||||
const error = new Error("Subscribe failed");
|
||||
(mockAckManager.subscribe as sinon.SinonStub).rejects(error);
|
||||
(mockMessageStore.queue as sinon.SinonStub).resolves(mockRequestId);
|
||||
|
||||
try {
|
||||
await sender.send(mockMessage);
|
||||
expect.fail("Expected error to be thrown");
|
||||
} catch (e: any) {
|
||||
expect(e).to.equal(error);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
describe("backgroundSend", () => {
|
||||
it("should handle empty pending messages", async () => {
|
||||
(mockMessageStore.getMessagesToSend as sinon.SinonStub).returns([]);
|
||||
|
||||
await sender["backgroundSend"]();
|
||||
|
||||
expect((mockMessageStore.getMessagesToSend as sinon.SinonStub).called).to
|
||||
.be.true;
|
||||
});
|
||||
});
|
||||
});
|
||||
Loading…
x
Reference in New Issue
Block a user