mirror of
https://github.com/logos-messaging/logos-messaging-js.git
synced 2026-01-15 14:33:13 +00:00
Merge 974fb21f0b7ea45d74c87644947e61ac20344372 into 74ad13ba2460b29f5288ddcbb665c86c11a3fb02
This commit is contained in:
commit
d888635668
3
package-lock.json
generated
3
package-lock.json
generated
@ -35002,7 +35002,8 @@
|
||||
"@waku/sds": "^0.0.8",
|
||||
"@waku/utils": "0.0.27",
|
||||
"libp2p": "2.8.11",
|
||||
"lodash.debounce": "^4.0.8"
|
||||
"lodash.debounce": "^4.0.8",
|
||||
"uuid": "^10.0.0"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@libp2p/interface": "2.10.4",
|
||||
|
||||
@ -60,11 +60,11 @@ export class LightPushCore {
|
||||
};
|
||||
}
|
||||
|
||||
const { rpc, error: prepError } = await ProtocolHandler.preparePushMessage(
|
||||
encoder,
|
||||
message,
|
||||
protocol
|
||||
);
|
||||
const {
|
||||
rpc,
|
||||
error: prepError,
|
||||
message: protoMessage
|
||||
} = await ProtocolHandler.preparePushMessage(encoder, message, protocol);
|
||||
|
||||
if (prepError) {
|
||||
return {
|
||||
@ -122,7 +122,21 @@ export class LightPushCore {
|
||||
};
|
||||
}
|
||||
|
||||
return ProtocolHandler.handleResponse(bytes, protocol, peerId);
|
||||
const processedResponse = ProtocolHandler.handleResponse(
|
||||
bytes,
|
||||
protocol,
|
||||
peerId
|
||||
);
|
||||
|
||||
if (processedResponse.success) {
|
||||
return {
|
||||
success: processedResponse.success,
|
||||
failure: null,
|
||||
message: protoMessage
|
||||
};
|
||||
}
|
||||
|
||||
return processedResponse;
|
||||
}
|
||||
|
||||
private async getProtocol(
|
||||
|
||||
@ -1,5 +1,10 @@
|
||||
import type { PeerId } from "@libp2p/interface";
|
||||
import type { IEncoder, IMessage, LightPushCoreResult } from "@waku/interfaces";
|
||||
import type {
|
||||
IEncoder,
|
||||
IMessage,
|
||||
IProtoMessage,
|
||||
LightPushCoreResult
|
||||
} from "@waku/interfaces";
|
||||
import { LightPushError, LightPushStatusCode } from "@waku/interfaces";
|
||||
import { PushResponse, WakuMessage } from "@waku/proto";
|
||||
import { isMessageSizeUnderCap, Logger } from "@waku/utils";
|
||||
@ -15,8 +20,8 @@ type VersionedPushRpc =
|
||||
| ({ version: "v3" } & PushRpc);
|
||||
|
||||
type PreparePushMessageResult =
|
||||
| { rpc: VersionedPushRpc; error: null }
|
||||
| { rpc: null; error: LightPushError };
|
||||
| { rpc: VersionedPushRpc; error: null; message?: IProtoMessage }
|
||||
| { rpc: null; error: LightPushError; message?: IProtoMessage };
|
||||
|
||||
const log = new Logger("light-push:protocol-handler");
|
||||
|
||||
@ -47,13 +52,15 @@ export class ProtocolHandler {
|
||||
log.info("Creating v3 RPC message");
|
||||
return {
|
||||
rpc: ProtocolHandler.createV3Rpc(protoMessage, encoder.pubsubTopic),
|
||||
error: null
|
||||
error: null,
|
||||
message: protoMessage
|
||||
};
|
||||
}
|
||||
|
||||
log.info("Creating v2 RPC message");
|
||||
return {
|
||||
rpc: ProtocolHandler.createV2Rpc(protoMessage, encoder.pubsubTopic),
|
||||
message: protoMessage,
|
||||
error: null
|
||||
};
|
||||
} catch (err) {
|
||||
|
||||
2
packages/core/src/lib/message/constants.ts
Normal file
2
packages/core/src/lib/message/constants.ts
Normal file
@ -0,0 +1,2 @@
|
||||
export const OneMillion = BigInt(1_000_000);
|
||||
export const Version = 0;
|
||||
@ -1 +1,2 @@
|
||||
export * as version_0 from "./version_0.js";
|
||||
export { OneMillion, Version } from "./constants.js";
|
||||
|
||||
@ -16,10 +16,10 @@ import { bytesToHex } from "@waku/utils/bytes";
|
||||
|
||||
import { messageHash } from "../message_hash/index.js";
|
||||
|
||||
const log = new Logger("message:version-0");
|
||||
const OneMillion = BigInt(1_000_000);
|
||||
import { OneMillion, Version } from "./constants.js";
|
||||
|
||||
const log = new Logger("message:version-0");
|
||||
|
||||
export const Version = 0;
|
||||
export { proto };
|
||||
|
||||
export class DecodedMessage implements IDecodedMessage {
|
||||
|
||||
@ -69,6 +69,21 @@ export interface IMessage {
|
||||
rateLimitProof?: IRateLimitProof;
|
||||
}
|
||||
|
||||
/**
|
||||
* Send message data structure used in {@link IWaku.send}.
|
||||
*/
|
||||
export interface ISendMessage {
|
||||
contentTopic: string;
|
||||
payload: Uint8Array;
|
||||
ephemeral?: boolean;
|
||||
rateLimitProof?: boolean;
|
||||
}
|
||||
|
||||
/**
|
||||
* Request ID of attempt to send a message.
|
||||
*/
|
||||
export type RequestId = string;
|
||||
|
||||
export interface IMetaSetter {
|
||||
(message: IProtoMessage & { meta: undefined }): Uint8Array;
|
||||
}
|
||||
|
||||
@ -5,7 +5,7 @@ import type { DiscoveryOptions, PeerCache } from "./discovery.js";
|
||||
import type { FilterProtocolOptions } from "./filter.js";
|
||||
import type { CreateLibp2pOptions } from "./libp2p.js";
|
||||
import type { LightPushProtocolOptions } from "./light_push.js";
|
||||
import type { IDecodedMessage } from "./message.js";
|
||||
import type { IDecodedMessage, IProtoMessage } from "./message.js";
|
||||
import type { ThisAndThat, ThisOrThat } from "./misc.js";
|
||||
import { NetworkConfig } from "./sharding.js";
|
||||
import type { StoreProtocolOptions } from "./store.js";
|
||||
@ -195,7 +195,13 @@ export type LightPushCoreResult = ThisOrThat<
|
||||
PeerId,
|
||||
"failure",
|
||||
LightPushFailure
|
||||
>;
|
||||
> & {
|
||||
/**
|
||||
* The proto object of the message.
|
||||
* Present only if the message was successfully pushed to the network.
|
||||
*/
|
||||
message?: IProtoMessage;
|
||||
};
|
||||
|
||||
export type FilterCoreResult = ThisOrThat<
|
||||
"success",
|
||||
@ -209,7 +215,13 @@ export type LightPushSDKResult = ThisAndThat<
|
||||
PeerId[],
|
||||
"failures",
|
||||
LightPushFailure[]
|
||||
>;
|
||||
> & {
|
||||
/**
|
||||
* The proto objects of the messages.
|
||||
* Present only if the messages were successfully pushed to the network.
|
||||
*/
|
||||
messages?: IProtoMessage[];
|
||||
};
|
||||
|
||||
export type FilterSDKResult = ThisAndThat<
|
||||
"successes",
|
||||
|
||||
@ -20,6 +20,12 @@ export type ISendOptions = {
|
||||
* @default false
|
||||
*/
|
||||
useLegacy?: boolean;
|
||||
|
||||
/**
|
||||
* Amount of peers to send message to.
|
||||
* Overrides `numPeersToUse` in {@link @waku/interfaces!CreateNodeOptions}.
|
||||
*/
|
||||
numPeersToUse?: number;
|
||||
};
|
||||
|
||||
export interface ISender {
|
||||
|
||||
@ -10,7 +10,13 @@ import type { IFilter } from "./filter.js";
|
||||
import type { HealthStatus } from "./health_status.js";
|
||||
import type { Libp2p } from "./libp2p.js";
|
||||
import type { ILightPush } from "./light_push.js";
|
||||
import { IDecodedMessage, IDecoder, IEncoder } from "./message.js";
|
||||
import {
|
||||
IDecodedMessage,
|
||||
IDecoder,
|
||||
IEncoder,
|
||||
ISendMessage,
|
||||
RequestId
|
||||
} from "./message.js";
|
||||
import type { Protocols } from "./protocols.js";
|
||||
import type { IRelay } from "./relay.js";
|
||||
import type { ShardId } from "./sharding.js";
|
||||
@ -58,9 +64,22 @@ export type IWakuEventEmitter = TypedEventEmitter<IWakuEvents>;
|
||||
|
||||
export interface IWaku {
|
||||
libp2p: Libp2p;
|
||||
|
||||
/**
|
||||
* @deprecated should not be accessed directly, use {@link IWaku.send} and {@link IWaku.subscribe} instead
|
||||
*/
|
||||
relay?: IRelay;
|
||||
|
||||
store?: IStore;
|
||||
|
||||
/**
|
||||
* @deprecated should not be accessed directly, use {@link IWaku.subscribe} instead
|
||||
*/
|
||||
filter?: IFilter;
|
||||
|
||||
/**
|
||||
* @deprecated should not be accessed directly, use {@link IWaku.send} instead
|
||||
*/
|
||||
lightPush?: ILightPush;
|
||||
|
||||
/**
|
||||
@ -251,6 +270,14 @@ export interface IWaku {
|
||||
*/
|
||||
createEncoder(params: CreateEncoderParams): IEncoder;
|
||||
|
||||
/**
|
||||
* Sends a message to the Waku network.
|
||||
*
|
||||
* @param {ISendMessage} message - The message to send.
|
||||
* @returns {Promise<RequestId>} A promise that resolves to the request ID
|
||||
*/
|
||||
send(message: ISendMessage): Promise<RequestId>;
|
||||
|
||||
/**
|
||||
* @returns {boolean} `true` if the node was started and `false` otherwise
|
||||
*/
|
||||
|
||||
@ -76,7 +76,8 @@
|
||||
"@waku/sds": "^0.0.8",
|
||||
"@waku/utils": "0.0.27",
|
||||
"libp2p": "2.8.11",
|
||||
"lodash.debounce": "^4.0.8"
|
||||
"lodash.debounce": "^4.0.8",
|
||||
"uuid": "^10.0.0"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@libp2p/interface": "2.10.4",
|
||||
|
||||
@ -4,6 +4,7 @@ import {
|
||||
type IEncoder,
|
||||
ILightPush,
|
||||
type IMessage,
|
||||
IProtoMessage,
|
||||
type ISendOptions,
|
||||
type Libp2p,
|
||||
LightPushCoreResult,
|
||||
@ -83,10 +84,11 @@ export class LightPush implements ILightPush {
|
||||
|
||||
log.info("send: attempting to send a message to pubsubTopic:", pubsubTopic);
|
||||
|
||||
const peerIds = await this.peerManager.getPeers({
|
||||
let peerIds = await this.peerManager.getPeers({
|
||||
protocol: options.useLegacy ? "light-push-v2" : Protocols.LightPush,
|
||||
pubsubTopic: encoder.pubsubTopic
|
||||
});
|
||||
peerIds = peerIds?.slice(0, options.numPeersToUse);
|
||||
|
||||
const coreResults =
|
||||
peerIds?.length > 0
|
||||
@ -94,12 +96,15 @@ export class LightPush implements ILightPush {
|
||||
peerIds.map((peerId) =>
|
||||
this.protocol
|
||||
.send(encoder, message, peerId, options.useLegacy)
|
||||
.catch((_e) => ({
|
||||
success: null,
|
||||
failure: {
|
||||
error: LightPushError.GENERIC_FAIL
|
||||
}
|
||||
}))
|
||||
.catch(
|
||||
(_e) =>
|
||||
({
|
||||
success: null,
|
||||
failure: {
|
||||
error: LightPushError.GENERIC_FAIL
|
||||
}
|
||||
}) as LightPushCoreResult
|
||||
)
|
||||
)
|
||||
)
|
||||
: [];
|
||||
@ -111,7 +116,10 @@ export class LightPush implements ILightPush {
|
||||
.map((v) => v.success) as PeerId[],
|
||||
failures: coreResults
|
||||
.filter((v) => v.failure)
|
||||
.map((v) => v.failure) as LightPushFailure[]
|
||||
.map((v) => v.failure) as LightPushFailure[],
|
||||
messages: coreResults
|
||||
.filter((v) => v.message)
|
||||
.map((v) => v.message) as IProtoMessage[]
|
||||
}
|
||||
: {
|
||||
successes: [],
|
||||
|
||||
309
packages/sdk/src/messaging/ack_manager.spec.ts
Normal file
309
packages/sdk/src/messaging/ack_manager.spec.ts
Normal file
@ -0,0 +1,309 @@
|
||||
import type {
|
||||
IDecodedMessage,
|
||||
IFilter,
|
||||
IStore,
|
||||
NetworkConfig
|
||||
} from "@waku/interfaces";
|
||||
import { expect } from "chai";
|
||||
import { afterEach, beforeEach, describe, it } from "mocha";
|
||||
import sinon from "sinon";
|
||||
|
||||
import { AckManager } from "./ack_manager.js";
|
||||
import { MessageStore } from "./message_store.js";
|
||||
|
||||
const mockMessage: IDecodedMessage = {
|
||||
version: 1,
|
||||
payload: new Uint8Array([1, 2, 3]),
|
||||
contentTopic: "/test/1/topic/proto",
|
||||
pubsubTopic: "test-pubsub",
|
||||
timestamp: new Date(),
|
||||
rateLimitProof: undefined,
|
||||
ephemeral: false,
|
||||
meta: undefined,
|
||||
hash: new Uint8Array([4, 5, 6]),
|
||||
hashStr: "test-hash-123"
|
||||
};
|
||||
|
||||
const mockNetworkConfig: NetworkConfig = {
|
||||
clusterId: 1,
|
||||
numShardsInCluster: 8
|
||||
};
|
||||
|
||||
describe("AckManager", () => {
|
||||
let messageStore: MessageStore;
|
||||
let mockFilter: IFilter;
|
||||
let mockStore: IStore;
|
||||
let ackManager: AckManager;
|
||||
let clock: sinon.SinonFakeTimers;
|
||||
|
||||
beforeEach(() => {
|
||||
messageStore = new MessageStore();
|
||||
|
||||
mockFilter = {
|
||||
subscribe: sinon.stub().resolves(true),
|
||||
unsubscribe: sinon.stub().resolves(true)
|
||||
} as unknown as IFilter;
|
||||
|
||||
mockStore = {
|
||||
queryWithOrderedCallback: sinon.stub().resolves(undefined)
|
||||
} as unknown as IStore;
|
||||
|
||||
ackManager = new AckManager({
|
||||
messageStore,
|
||||
filter: mockFilter,
|
||||
store: mockStore,
|
||||
networkConfig: mockNetworkConfig
|
||||
});
|
||||
|
||||
clock = sinon.useFakeTimers();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
clock.restore();
|
||||
sinon.restore();
|
||||
});
|
||||
|
||||
describe("constructor", () => {
|
||||
it("should initialize with provided parameters", () => {
|
||||
expect(ackManager).to.be.instanceOf(AckManager);
|
||||
});
|
||||
});
|
||||
|
||||
describe("start", () => {
|
||||
it("should start filter and store ack managers", () => {
|
||||
ackManager.start();
|
||||
|
||||
expect(clock.countTimers()).to.equal(1);
|
||||
});
|
||||
|
||||
it("should be idempotent", () => {
|
||||
ackManager.start();
|
||||
ackManager.start();
|
||||
|
||||
expect(clock.countTimers()).to.equal(1);
|
||||
});
|
||||
});
|
||||
|
||||
describe("stop", () => {
|
||||
it("should stop filter and store ack managers", async () => {
|
||||
ackManager.start();
|
||||
await ackManager.stop();
|
||||
|
||||
expect(clock.countTimers()).to.equal(0);
|
||||
});
|
||||
|
||||
it("should clear subscribed content topics", async () => {
|
||||
await ackManager.subscribe("/test/1/clear/proto");
|
||||
await ackManager.stop();
|
||||
|
||||
const result = await ackManager.subscribe("/test/1/clear/proto");
|
||||
expect(result).to.be.true;
|
||||
});
|
||||
|
||||
it("should handle stop without start", async () => {
|
||||
await ackManager.stop();
|
||||
});
|
||||
});
|
||||
|
||||
describe("subscribe", () => {
|
||||
it("should subscribe to new content topic", async () => {
|
||||
const result = await ackManager.subscribe("/test/1/new/proto");
|
||||
|
||||
expect(result).to.be.true;
|
||||
expect(
|
||||
(mockFilter.subscribe as sinon.SinonStub).calledWith(
|
||||
sinon.match.object,
|
||||
sinon.match.func
|
||||
)
|
||||
).to.be.true;
|
||||
});
|
||||
|
||||
it("should return true for already subscribed topic", async () => {
|
||||
await ackManager.subscribe("/test/1/existing/proto");
|
||||
const result = await ackManager.subscribe("/test/1/existing/proto");
|
||||
|
||||
expect(result).to.be.true;
|
||||
expect((mockFilter.subscribe as sinon.SinonStub).calledOnce).to.be.true;
|
||||
});
|
||||
|
||||
it("should return true if at least one subscription succeeds", async () => {
|
||||
(mockFilter.subscribe as sinon.SinonStub).resolves(false);
|
||||
|
||||
const result = await ackManager.subscribe("/test/1/topic/proto");
|
||||
|
||||
expect(result).to.be.true;
|
||||
});
|
||||
|
||||
it("should return true when filter fails but store succeeds", async () => {
|
||||
(mockFilter.subscribe as sinon.SinonStub).resolves(false);
|
||||
|
||||
const result = await ackManager.subscribe("/test/1/topic/proto");
|
||||
|
||||
expect(result).to.be.true;
|
||||
});
|
||||
});
|
||||
|
||||
describe("FilterAckManager", () => {
|
||||
beforeEach(() => {
|
||||
ackManager.start();
|
||||
});
|
||||
|
||||
it("should handle message reception and acknowledgment", async () => {
|
||||
await ackManager.subscribe("/test/1/topic/proto");
|
||||
const onMessageCallback = (
|
||||
mockFilter.subscribe as sinon.SinonStub
|
||||
).getCall(0).args[1];
|
||||
|
||||
await onMessageCallback(mockMessage);
|
||||
|
||||
expect(messageStore.has(mockMessage.hashStr)).to.be.true;
|
||||
});
|
||||
|
||||
it("should not add duplicate messages", async () => {
|
||||
messageStore.add(mockMessage, { filterAck: false });
|
||||
await ackManager.subscribe("/test/1/topic/proto");
|
||||
|
||||
const onMessageCallback = (
|
||||
mockFilter.subscribe as sinon.SinonStub
|
||||
).getCall(0).args[1];
|
||||
await onMessageCallback(mockMessage);
|
||||
|
||||
expect(messageStore.has(mockMessage.hashStr)).to.be.true;
|
||||
});
|
||||
|
||||
it("should unsubscribe all decoders on stop", async () => {
|
||||
await ackManager.subscribe("/test/1/topic1/proto");
|
||||
await ackManager.subscribe("/test/1/topic2/proto");
|
||||
|
||||
await ackManager.stop();
|
||||
|
||||
expect((mockFilter.unsubscribe as sinon.SinonStub).calledTwice).to.be
|
||||
.true;
|
||||
});
|
||||
});
|
||||
|
||||
describe("StoreAckManager", () => {
|
||||
beforeEach(() => {
|
||||
ackManager.start();
|
||||
});
|
||||
|
||||
it("should query store periodically", async () => {
|
||||
await ackManager.subscribe("/test/1/topic/proto");
|
||||
|
||||
await clock.tickAsync(5000);
|
||||
|
||||
expect(
|
||||
(mockStore.queryWithOrderedCallback as sinon.SinonStub).calledWith(
|
||||
sinon.match.array,
|
||||
sinon.match.func,
|
||||
sinon.match.object
|
||||
)
|
||||
).to.be.true;
|
||||
});
|
||||
|
||||
it("should handle store query callback", async () => {
|
||||
await ackManager.subscribe("/test/1/topic/proto");
|
||||
|
||||
await clock.tickAsync(5000);
|
||||
|
||||
const callback = (
|
||||
mockStore.queryWithOrderedCallback as sinon.SinonStub
|
||||
).getCall(0).args[1];
|
||||
callback(mockMessage);
|
||||
|
||||
expect(messageStore.has(mockMessage.hashStr)).to.be.true;
|
||||
});
|
||||
|
||||
it("should not add duplicate messages from store", async () => {
|
||||
messageStore.add(mockMessage, { storeAck: false });
|
||||
|
||||
await ackManager.subscribe("/test/1/topic/proto");
|
||||
await clock.tickAsync(5000);
|
||||
|
||||
const callback = (
|
||||
mockStore.queryWithOrderedCallback as sinon.SinonStub
|
||||
).getCall(0).args[1];
|
||||
callback(mockMessage);
|
||||
|
||||
expect(messageStore.has(mockMessage.hashStr)).to.be.true;
|
||||
});
|
||||
|
||||
it("should stop interval on stop", async () => {
|
||||
ackManager.start();
|
||||
await ackManager.stop();
|
||||
|
||||
expect(clock.countTimers()).to.equal(0);
|
||||
});
|
||||
});
|
||||
|
||||
describe("integration scenarios", () => {
|
||||
it("should handle complete lifecycle", async () => {
|
||||
ackManager.start();
|
||||
|
||||
const result1 = await ackManager.subscribe("/test/1/topic1/proto");
|
||||
const result2 = await ackManager.subscribe("/test/1/topic2/proto");
|
||||
|
||||
expect(result1).to.be.true;
|
||||
expect(result2).to.be.true;
|
||||
|
||||
await ackManager.stop();
|
||||
|
||||
expect(clock.countTimers()).to.equal(0);
|
||||
});
|
||||
|
||||
it("should handle multiple subscriptions to same topic", async () => {
|
||||
ackManager.start();
|
||||
|
||||
const result1 = await ackManager.subscribe("/test/1/same/proto");
|
||||
const result2 = await ackManager.subscribe("/test/1/same/proto");
|
||||
|
||||
expect(result1).to.be.true;
|
||||
expect(result2).to.be.true;
|
||||
expect((mockFilter.subscribe as sinon.SinonStub).calledOnce).to.be.true;
|
||||
});
|
||||
|
||||
it("should handle subscription after stop", async () => {
|
||||
ackManager.start();
|
||||
await ackManager.stop();
|
||||
|
||||
const result = await ackManager.subscribe("/test/1/after-stop/proto");
|
||||
expect(result).to.be.true;
|
||||
});
|
||||
});
|
||||
|
||||
describe("error handling", () => {
|
||||
it("should handle filter subscription errors gracefully", async () => {
|
||||
(mockFilter.subscribe as sinon.SinonStub).resolves(false);
|
||||
|
||||
const result = await ackManager.subscribe("/test/1/error/proto");
|
||||
|
||||
expect(result).to.be.true;
|
||||
});
|
||||
|
||||
it("should handle store query errors gracefully", async () => {
|
||||
(mockStore.queryWithOrderedCallback as sinon.SinonStub).rejects(
|
||||
new Error("Store query error")
|
||||
);
|
||||
|
||||
ackManager.start();
|
||||
await ackManager.subscribe("/test/1/error/proto");
|
||||
|
||||
await clock.tickAsync(5000);
|
||||
});
|
||||
|
||||
it("should handle unsubscribe errors gracefully", async () => {
|
||||
ackManager.start();
|
||||
await ackManager.subscribe("/test/1/error/proto");
|
||||
|
||||
(mockFilter.unsubscribe as sinon.SinonStub).rejects(
|
||||
new Error("Unsubscribe error")
|
||||
);
|
||||
|
||||
try {
|
||||
await ackManager.stop();
|
||||
} catch {
|
||||
// Expected to throw
|
||||
}
|
||||
});
|
||||
});
|
||||
});
|
||||
176
packages/sdk/src/messaging/ack_manager.ts
Normal file
176
packages/sdk/src/messaging/ack_manager.ts
Normal file
@ -0,0 +1,176 @@
|
||||
import { createDecoder } from "@waku/core";
|
||||
import {
|
||||
IDecodedMessage,
|
||||
IDecoder,
|
||||
IFilter,
|
||||
IStore,
|
||||
NetworkConfig
|
||||
} from "@waku/interfaces";
|
||||
import { createRoutingInfo } from "@waku/utils";
|
||||
|
||||
import { MessageStore } from "./message_store.js";
|
||||
import { IAckManager } from "./utils.js";
|
||||
|
||||
type AckManagerConstructorParams = {
|
||||
messageStore: MessageStore;
|
||||
filter: IFilter;
|
||||
store: IStore;
|
||||
networkConfig: NetworkConfig;
|
||||
};
|
||||
|
||||
const DEFAULT_QUERY_INTERVAL = 5000;
|
||||
const QUERY_TIME_WINDOW_MS = 60 * 60 * 1000;
|
||||
|
||||
export class AckManager implements IAckManager {
|
||||
private readonly messageStore: MessageStore;
|
||||
private readonly filterAckManager: FilterAckManager;
|
||||
private readonly storeAckManager: StoreAckManager;
|
||||
private readonly networkConfig: NetworkConfig;
|
||||
|
||||
private readonly subscribedContentTopics: Set<string> = new Set();
|
||||
private readonly subscribingAttempts: Set<string> = new Set();
|
||||
|
||||
public constructor(params: AckManagerConstructorParams) {
|
||||
this.messageStore = params.messageStore;
|
||||
this.networkConfig = params.networkConfig;
|
||||
|
||||
this.filterAckManager = new FilterAckManager(
|
||||
this.messageStore,
|
||||
params.filter
|
||||
);
|
||||
|
||||
this.storeAckManager = new StoreAckManager(this.messageStore, params.store);
|
||||
}
|
||||
|
||||
public start(): void {
|
||||
this.filterAckManager.start();
|
||||
this.storeAckManager.start();
|
||||
}
|
||||
|
||||
public async stop(): Promise<void> {
|
||||
await this.filterAckManager.stop();
|
||||
this.storeAckManager.stop();
|
||||
this.subscribedContentTopics.clear();
|
||||
}
|
||||
|
||||
public async subscribe(contentTopic: string): Promise<boolean> {
|
||||
if (
|
||||
this.subscribedContentTopics.has(contentTopic) ||
|
||||
this.subscribingAttempts.has(contentTopic)
|
||||
) {
|
||||
return true;
|
||||
}
|
||||
|
||||
this.subscribingAttempts.add(contentTopic);
|
||||
|
||||
const decoder = createDecoder(
|
||||
contentTopic,
|
||||
createRoutingInfo(this.networkConfig, {
|
||||
contentTopic
|
||||
})
|
||||
);
|
||||
|
||||
const promises = await Promise.all([
|
||||
this.filterAckManager.subscribe(decoder),
|
||||
this.storeAckManager.subscribe(decoder)
|
||||
]);
|
||||
|
||||
this.subscribedContentTopics.add(contentTopic);
|
||||
this.subscribingAttempts.delete(contentTopic);
|
||||
return promises.some((success) => success);
|
||||
}
|
||||
}
|
||||
|
||||
class FilterAckManager {
|
||||
private decoders: Set<IDecoder<IDecodedMessage>> = new Set();
|
||||
|
||||
public constructor(
|
||||
private messageStore: MessageStore,
|
||||
private filter: IFilter
|
||||
) {}
|
||||
|
||||
public start(): void {
|
||||
return;
|
||||
}
|
||||
|
||||
public async stop(): Promise<void> {
|
||||
const promises = Array.from(this.decoders.entries()).map((decoder) =>
|
||||
this.filter.unsubscribe(decoder)
|
||||
);
|
||||
await Promise.all(promises);
|
||||
this.decoders.clear();
|
||||
}
|
||||
|
||||
public async subscribe(decoder: IDecoder<IDecodedMessage>): Promise<boolean> {
|
||||
const success = await this.filter.subscribe(
|
||||
decoder,
|
||||
this.onMessage.bind(this)
|
||||
);
|
||||
if (success) {
|
||||
this.decoders.add(decoder);
|
||||
}
|
||||
return success;
|
||||
}
|
||||
|
||||
private async onMessage(message: IDecodedMessage): Promise<void> {
|
||||
if (!this.messageStore.has(message.hashStr)) {
|
||||
this.messageStore.add(message, { filterAck: true });
|
||||
}
|
||||
|
||||
this.messageStore.markFilterAck(message.hashStr);
|
||||
}
|
||||
}
|
||||
|
||||
class StoreAckManager {
|
||||
private interval: ReturnType<typeof setInterval> | null = null;
|
||||
|
||||
private decoders: Set<IDecoder<IDecodedMessage>> = new Set();
|
||||
|
||||
public constructor(
|
||||
private messageStore: MessageStore,
|
||||
private store: IStore
|
||||
) {}
|
||||
|
||||
public start(): void {
|
||||
if (this.interval) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.interval = setInterval(() => {
|
||||
void this.query();
|
||||
}, DEFAULT_QUERY_INTERVAL);
|
||||
}
|
||||
|
||||
public stop(): void {
|
||||
if (!this.interval) {
|
||||
return;
|
||||
}
|
||||
|
||||
clearInterval(this.interval);
|
||||
this.interval = null;
|
||||
}
|
||||
|
||||
public async subscribe(decoder: IDecoder<IDecodedMessage>): Promise<boolean> {
|
||||
this.decoders.add(decoder);
|
||||
return true;
|
||||
}
|
||||
|
||||
private async query(): Promise<void> {
|
||||
for (const decoder of this.decoders) {
|
||||
await this.store.queryWithOrderedCallback(
|
||||
[decoder],
|
||||
(message) => {
|
||||
if (!this.messageStore.has(message.hashStr)) {
|
||||
this.messageStore.add(message, { storeAck: true });
|
||||
}
|
||||
|
||||
this.messageStore.markStoreAck(message.hashStr);
|
||||
},
|
||||
{
|
||||
timeStart: new Date(Date.now() - QUERY_TIME_WINDOW_MS),
|
||||
timeEnd: new Date()
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
1
packages/sdk/src/messaging/index.ts
Normal file
1
packages/sdk/src/messaging/index.ts
Normal file
@ -0,0 +1 @@
|
||||
export { Messaging } from "./messaging.js";
|
||||
349
packages/sdk/src/messaging/message_store.spec.ts
Normal file
349
packages/sdk/src/messaging/message_store.spec.ts
Normal file
@ -0,0 +1,349 @@
|
||||
import type { IDecodedMessage, ISendMessage } from "@waku/interfaces";
|
||||
import { expect } from "chai";
|
||||
import { beforeEach, describe, it } from "mocha";
|
||||
|
||||
import { MessageStore } from "./message_store.js";
|
||||
|
||||
describe("MessageStore", () => {
|
||||
let messageStore: MessageStore;
|
||||
let mockMessage: IDecodedMessage;
|
||||
let mockSendMessage: ISendMessage;
|
||||
|
||||
beforeEach(() => {
|
||||
messageStore = new MessageStore();
|
||||
mockMessage = {
|
||||
version: 1,
|
||||
payload: new Uint8Array([1, 2, 3]),
|
||||
contentTopic: "test-topic",
|
||||
pubsubTopic: "test-pubsub",
|
||||
timestamp: new Date(1000),
|
||||
rateLimitProof: undefined,
|
||||
ephemeral: false,
|
||||
meta: undefined,
|
||||
hash: new Uint8Array([4, 5, 6]),
|
||||
hashStr: "test-hash-123"
|
||||
};
|
||||
mockSendMessage = {
|
||||
contentTopic: "test-topic",
|
||||
payload: new Uint8Array([7, 8, 9]),
|
||||
ephemeral: false
|
||||
};
|
||||
});
|
||||
|
||||
describe("constructor", () => {
|
||||
it("should create instance with default options", () => {
|
||||
const store = new MessageStore();
|
||||
expect(store).to.be.instanceOf(MessageStore);
|
||||
});
|
||||
|
||||
it("should create instance with custom resend interval", () => {
|
||||
const customInterval = 10000;
|
||||
const store = new MessageStore({ resendIntervalMs: customInterval });
|
||||
expect(store).to.be.instanceOf(MessageStore);
|
||||
});
|
||||
});
|
||||
|
||||
describe("has", () => {
|
||||
it("should return false for non-existent message", () => {
|
||||
expect(messageStore.has("non-existent")).to.be.false;
|
||||
});
|
||||
|
||||
it("should return true for added message", () => {
|
||||
messageStore.add(mockMessage);
|
||||
expect(messageStore.has(mockMessage.hashStr)).to.be.true;
|
||||
});
|
||||
|
||||
it("should return true for pending message", async () => {
|
||||
await messageStore.queue(mockSendMessage);
|
||||
expect(messageStore.has("pending-hash")).to.be.false;
|
||||
});
|
||||
});
|
||||
|
||||
describe("add", () => {
|
||||
it("should add new message with default options", () => {
|
||||
messageStore.add(mockMessage);
|
||||
expect(messageStore.has(mockMessage.hashStr)).to.be.true;
|
||||
});
|
||||
|
||||
it("should add message with custom options", () => {
|
||||
messageStore.add(mockMessage, { filterAck: true, storeAck: false });
|
||||
expect(messageStore.has(mockMessage.hashStr)).to.be.true;
|
||||
});
|
||||
|
||||
it("should not add duplicate message", () => {
|
||||
messageStore.add(mockMessage);
|
||||
messageStore.add(mockMessage);
|
||||
expect(messageStore.has(mockMessage.hashStr)).to.be.true;
|
||||
});
|
||||
|
||||
it("should not add message if already exists", () => {
|
||||
messageStore.add(mockMessage);
|
||||
messageStore.add(mockMessage);
|
||||
expect(messageStore.has(mockMessage.hashStr)).to.be.true;
|
||||
});
|
||||
});
|
||||
|
||||
describe("queue", () => {
|
||||
it("should queue message and return request ID", async () => {
|
||||
const requestId = await messageStore.queue(mockSendMessage);
|
||||
expect(typeof requestId).to.equal("string");
|
||||
expect(requestId.length).to.be.greaterThan(0);
|
||||
});
|
||||
|
||||
it("should queue multiple messages with different request IDs", async () => {
|
||||
const requestId1 = await messageStore.queue(mockSendMessage);
|
||||
const requestId2 = await messageStore.queue(mockSendMessage);
|
||||
expect(requestId1).to.not.equal(requestId2);
|
||||
});
|
||||
});
|
||||
|
||||
describe("markFilterAck", () => {
|
||||
it("should mark filter acknowledgment for existing message", () => {
|
||||
messageStore.add(mockMessage);
|
||||
messageStore.markFilterAck(mockMessage.hashStr);
|
||||
expect(messageStore.has(mockMessage.hashStr)).to.be.true;
|
||||
});
|
||||
|
||||
it("should handle filter ack for non-existent message", () => {
|
||||
expect(() => {
|
||||
messageStore.markFilterAck("non-existent");
|
||||
}).to.not.throw();
|
||||
});
|
||||
|
||||
it("should handle filter ack for pending message", async () => {
|
||||
const requestId = await messageStore.queue(mockSendMessage);
|
||||
messageStore.markSent(requestId, mockMessage);
|
||||
messageStore.markFilterAck(mockMessage.hashStr);
|
||||
expect(messageStore.has(mockMessage.hashStr)).to.be.true;
|
||||
});
|
||||
});
|
||||
|
||||
describe("markStoreAck", () => {
|
||||
it("should mark store acknowledgment for existing message", () => {
|
||||
messageStore.add(mockMessage);
|
||||
messageStore.markStoreAck(mockMessage.hashStr);
|
||||
expect(messageStore.has(mockMessage.hashStr)).to.be.true;
|
||||
});
|
||||
|
||||
it("should handle store ack for non-existent message", () => {
|
||||
expect(() => {
|
||||
messageStore.markStoreAck("non-existent");
|
||||
}).to.not.throw();
|
||||
});
|
||||
|
||||
it("should handle store ack for pending message", async () => {
|
||||
const requestId = await messageStore.queue(mockSendMessage);
|
||||
messageStore.markSent(requestId, mockMessage);
|
||||
messageStore.markStoreAck(mockMessage.hashStr);
|
||||
expect(messageStore.has(mockMessage.hashStr)).to.be.true;
|
||||
});
|
||||
});
|
||||
|
||||
describe("markSent", () => {
|
||||
it("should mark message as sent with valid request ID", async () => {
|
||||
const requestId = await messageStore.queue(mockSendMessage);
|
||||
messageStore.markSent(requestId, mockMessage);
|
||||
expect(messageStore.has(mockMessage.hashStr)).to.be.true;
|
||||
});
|
||||
|
||||
it("should handle markSent with invalid request ID", () => {
|
||||
expect(() => {
|
||||
messageStore.markSent("invalid-request-id", mockMessage);
|
||||
}).to.not.throw();
|
||||
});
|
||||
|
||||
it("should handle markSent with request ID without message", async () => {
|
||||
const requestId = await messageStore.queue(mockSendMessage);
|
||||
const entry = (messageStore as any).pendingRequests.get(requestId);
|
||||
if (entry) {
|
||||
entry.messageRequest = undefined;
|
||||
}
|
||||
expect(() => {
|
||||
messageStore.markSent(requestId, mockMessage);
|
||||
}).to.not.throw();
|
||||
});
|
||||
|
||||
it("should set lastSentAt timestamp", async () => {
|
||||
const requestId = await messageStore.queue(mockSendMessage);
|
||||
const sentMessage = { ...mockMessage, timestamp: new Date(2000) };
|
||||
messageStore.markSent(requestId, sentMessage);
|
||||
expect(messageStore.has(mockMessage.hashStr)).to.be.true;
|
||||
});
|
||||
});
|
||||
|
||||
describe("getMessagesToSend", () => {
|
||||
it("should return empty array when no messages queued", () => {
|
||||
const messages = messageStore.getMessagesToSend();
|
||||
expect(messages).to.deep.equal([]);
|
||||
});
|
||||
|
||||
it("should return queued messages that need sending", async () => {
|
||||
const customStore = new MessageStore({ resendIntervalMs: 0 });
|
||||
const requestId = await customStore.queue(mockSendMessage);
|
||||
const messages = customStore.getMessagesToSend();
|
||||
expect(messages).to.have.length(1);
|
||||
expect(messages[0].requestId).to.equal(requestId);
|
||||
expect(messages[0].message).to.equal(mockSendMessage);
|
||||
});
|
||||
|
||||
it("should not return acknowledged messages", async () => {
|
||||
const requestId = await messageStore.queue(mockSendMessage);
|
||||
const entry = (messageStore as any).pendingRequests.get(requestId);
|
||||
if (entry) {
|
||||
entry.filterAck = true;
|
||||
}
|
||||
const messages = messageStore.getMessagesToSend();
|
||||
expect(messages).to.have.length(0);
|
||||
});
|
||||
|
||||
it("should not return store acknowledged messages", async () => {
|
||||
const requestId = await messageStore.queue(mockSendMessage);
|
||||
const entry = (messageStore as any).pendingRequests.get(requestId);
|
||||
if (entry) {
|
||||
entry.storeAck = true;
|
||||
}
|
||||
const messages = messageStore.getMessagesToSend();
|
||||
expect(messages).to.have.length(0);
|
||||
});
|
||||
|
||||
it("should respect resend interval", async () => {
|
||||
const customStore = new MessageStore({ resendIntervalMs: 10000 });
|
||||
const requestId = await customStore.queue(mockSendMessage);
|
||||
|
||||
const entry = (customStore as any).pendingRequests.get(requestId);
|
||||
if (entry) {
|
||||
entry.lastSentAt = Date.now() - 5000;
|
||||
}
|
||||
|
||||
const messagesAfterShortTime = customStore.getMessagesToSend();
|
||||
expect(messagesAfterShortTime).to.have.length(0);
|
||||
|
||||
if (entry) {
|
||||
entry.lastSentAt = Date.now() - 15000;
|
||||
}
|
||||
|
||||
const messagesAfterLongTime = customStore.getMessagesToSend();
|
||||
expect(messagesAfterLongTime).to.have.length(1);
|
||||
});
|
||||
|
||||
it("should return messages after resend interval", async () => {
|
||||
const customStore = new MessageStore({ resendIntervalMs: 1000 });
|
||||
const requestId = await customStore.queue(mockSendMessage);
|
||||
|
||||
const entry = (customStore as any).pendingRequests.get(requestId);
|
||||
if (entry) {
|
||||
entry.lastSentAt = Date.now() - 2000;
|
||||
}
|
||||
|
||||
const messages = customStore.getMessagesToSend();
|
||||
expect(messages).to.have.length(1);
|
||||
});
|
||||
|
||||
it("should not return messages without messageRequest", async () => {
|
||||
const requestId = await messageStore.queue(mockSendMessage);
|
||||
const entry = (messageStore as any).pendingRequests.get(requestId);
|
||||
if (entry) {
|
||||
entry.messageRequest = undefined;
|
||||
}
|
||||
const messages = messageStore.getMessagesToSend();
|
||||
expect(messages).to.have.length(0);
|
||||
});
|
||||
});
|
||||
|
||||
describe("edge cases", () => {
|
||||
it("should handle multiple acknowledgments for same message", () => {
|
||||
messageStore.add(mockMessage);
|
||||
messageStore.markFilterAck(mockMessage.hashStr);
|
||||
messageStore.markStoreAck(mockMessage.hashStr);
|
||||
expect(messageStore.has(mockMessage.hashStr)).to.be.true;
|
||||
});
|
||||
|
||||
it("should handle message received before sent", async () => {
|
||||
messageStore.add(mockMessage);
|
||||
const requestId = await messageStore.queue(mockSendMessage);
|
||||
messageStore.markSent(requestId, mockMessage);
|
||||
expect(messageStore.has(mockMessage.hashStr)).to.be.true;
|
||||
});
|
||||
|
||||
it("should handle empty message hash", () => {
|
||||
const emptyHashMessage = { ...mockMessage, hashStr: "" };
|
||||
messageStore.add(emptyHashMessage);
|
||||
expect(messageStore.has("")).to.be.true;
|
||||
});
|
||||
|
||||
it("should handle very long message hash", () => {
|
||||
const longHash = "a".repeat(1000);
|
||||
const longHashMessage = { ...mockMessage, hashStr: longHash };
|
||||
messageStore.add(longHashMessage);
|
||||
expect(messageStore.has(longHash)).to.be.true;
|
||||
});
|
||||
|
||||
it("should handle special characters in hash", () => {
|
||||
const specialHash = "test-hash-!@#$%^&*()_+-=[]{}|;':\",./<>?";
|
||||
const specialHashMessage = { ...mockMessage, hashStr: specialHash };
|
||||
messageStore.add(specialHashMessage);
|
||||
expect(messageStore.has(specialHash)).to.be.true;
|
||||
});
|
||||
});
|
||||
|
||||
describe("state transitions", () => {
|
||||
it("should move message from pending to stored on ack", async () => {
|
||||
const requestId = await messageStore.queue(mockSendMessage);
|
||||
messageStore.markSent(requestId, mockMessage);
|
||||
messageStore.markFilterAck(mockMessage.hashStr);
|
||||
|
||||
expect(messageStore.has(mockMessage.hashStr)).to.be.true;
|
||||
const pendingMessages = (messageStore as any).pendingMessages;
|
||||
expect(pendingMessages.has(mockMessage.hashStr)).to.be.false;
|
||||
});
|
||||
|
||||
it("should merge pending and stored message data", async () => {
|
||||
messageStore.add(mockMessage, { filterAck: true });
|
||||
const requestId = await messageStore.queue(mockSendMessage);
|
||||
messageStore.markSent(requestId, mockMessage);
|
||||
messageStore.markStoreAck(mockMessage.hashStr);
|
||||
|
||||
expect(messageStore.has(mockMessage.hashStr)).to.be.true;
|
||||
});
|
||||
|
||||
it("should preserve acknowledgment state during transition", async () => {
|
||||
const requestId = await messageStore.queue(mockSendMessage);
|
||||
const entry = (messageStore as any).pendingRequests.get(requestId);
|
||||
if (entry) {
|
||||
entry.filterAck = true;
|
||||
}
|
||||
messageStore.markSent(requestId, mockMessage);
|
||||
messageStore.markStoreAck(mockMessage.hashStr);
|
||||
|
||||
expect(messageStore.has(mockMessage.hashStr)).to.be.true;
|
||||
});
|
||||
});
|
||||
|
||||
describe("timing edge cases", () => {
|
||||
it("should handle zero timestamp", async () => {
|
||||
const zeroTimeMessage = { ...mockMessage, timestamp: new Date(0) };
|
||||
const requestId = await messageStore.queue(mockSendMessage);
|
||||
expect(() => {
|
||||
messageStore.markSent(requestId, zeroTimeMessage);
|
||||
}).to.not.throw();
|
||||
});
|
||||
|
||||
it("should handle future timestamp", async () => {
|
||||
const futureTime = new Date(Date.now() + 86400000);
|
||||
const futureMessage = { ...mockMessage, timestamp: futureTime };
|
||||
const requestId = await messageStore.queue(mockSendMessage);
|
||||
expect(() => {
|
||||
messageStore.markSent(requestId, futureMessage);
|
||||
}).to.not.throw();
|
||||
});
|
||||
|
||||
it("should handle very old timestamp", async () => {
|
||||
const oldTime = new Date(0);
|
||||
const oldMessage = { ...mockMessage, timestamp: oldTime };
|
||||
const requestId = await messageStore.queue(mockSendMessage);
|
||||
expect(() => {
|
||||
messageStore.markSent(requestId, oldMessage);
|
||||
}).to.not.throw();
|
||||
});
|
||||
});
|
||||
});
|
||||
172
packages/sdk/src/messaging/message_store.ts
Normal file
172
packages/sdk/src/messaging/message_store.ts
Normal file
@ -0,0 +1,172 @@
|
||||
import { IDecodedMessage, ISendMessage, RequestId } from "@waku/interfaces";
|
||||
import { v4 as uuidv4 } from "uuid";
|
||||
|
||||
type QueuedMessage = {
|
||||
messageRequest?: ISendMessage;
|
||||
filterAck: boolean;
|
||||
storeAck: boolean;
|
||||
lastSentAt?: number;
|
||||
createdAt: number;
|
||||
};
|
||||
|
||||
type AddMessageOptions = {
|
||||
filterAck?: boolean;
|
||||
storeAck?: boolean;
|
||||
};
|
||||
|
||||
type MessageStoreOptions = {
|
||||
resendIntervalMs?: number;
|
||||
};
|
||||
|
||||
type MessageHashStr = string;
|
||||
|
||||
export class MessageStore {
|
||||
private readonly messages: Map<MessageHashStr, QueuedMessage> = new Map();
|
||||
|
||||
private readonly pendingRequests: Map<RequestId, QueuedMessage> = new Map();
|
||||
private readonly pendingMessages: Map<MessageHashStr, RequestId> = new Map();
|
||||
|
||||
private readonly resendIntervalMs: number;
|
||||
|
||||
public constructor(options: MessageStoreOptions = {}) {
|
||||
this.resendIntervalMs = options.resendIntervalMs ?? 5000;
|
||||
}
|
||||
|
||||
public has(hashStr: string): boolean {
|
||||
return this.messages.has(hashStr) || this.pendingMessages.has(hashStr);
|
||||
}
|
||||
|
||||
public add(message: IDecodedMessage, options: AddMessageOptions = {}): void {
|
||||
if (!this.has(message.hashStr)) {
|
||||
this.messages.set(message.hashStr, {
|
||||
filterAck: options.filterAck ?? false,
|
||||
storeAck: options.storeAck ?? false,
|
||||
createdAt: Date.now()
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
public markFilterAck(hashStr: string): void {
|
||||
this.ackMessage(hashStr, { filterAck: true });
|
||||
this.replacePendingWithMessage(hashStr);
|
||||
}
|
||||
|
||||
public markStoreAck(hashStr: string): void {
|
||||
this.ackMessage(hashStr, { storeAck: true });
|
||||
this.replacePendingWithMessage(hashStr);
|
||||
}
|
||||
|
||||
public markSent(requestId: RequestId, sentMessage: IDecodedMessage): void {
|
||||
const entry = this.pendingRequests.get(requestId);
|
||||
|
||||
if (!entry || !entry.messageRequest) {
|
||||
return;
|
||||
}
|
||||
|
||||
entry.lastSentAt = Number(sentMessage.timestamp);
|
||||
this.pendingMessages.set(sentMessage.hashStr, requestId);
|
||||
|
||||
this.replacePendingWithMessage(sentMessage.hashStr);
|
||||
}
|
||||
|
||||
public async queue(message: ISendMessage): Promise<RequestId> {
|
||||
const requestId = uuidv4(); // cspell:ignore uuidv4
|
||||
|
||||
this.pendingRequests.set(requestId.toString(), {
|
||||
messageRequest: message,
|
||||
filterAck: false,
|
||||
storeAck: false,
|
||||
createdAt: Date.now()
|
||||
});
|
||||
|
||||
return requestId;
|
||||
}
|
||||
|
||||
public getMessagesToSend(): Array<{
|
||||
requestId: string;
|
||||
message: ISendMessage;
|
||||
}> {
|
||||
const res: Array<{
|
||||
requestId: string;
|
||||
message: ISendMessage;
|
||||
}> = [];
|
||||
|
||||
for (const [requestId, entry] of this.pendingRequests.entries()) {
|
||||
const isAcknowledged = entry.filterAck || entry.storeAck;
|
||||
|
||||
if (!entry.messageRequest || isAcknowledged) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const sentAt = entry.lastSentAt || entry.createdAt;
|
||||
const notTooRecent = Date.now() - sentAt >= this.resendIntervalMs;
|
||||
const notAcknowledged = !isAcknowledged;
|
||||
|
||||
if (notTooRecent && notAcknowledged) {
|
||||
res.push({
|
||||
requestId,
|
||||
message: entry.messageRequest
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
private ackMessage(
|
||||
hashStr: MessageHashStr,
|
||||
ackParams: AddMessageOptions = {}
|
||||
): void {
|
||||
let entry = this.messages.get(hashStr);
|
||||
|
||||
if (entry) {
|
||||
entry.filterAck = ackParams.filterAck ?? entry.filterAck;
|
||||
entry.storeAck = ackParams.storeAck ?? entry.storeAck;
|
||||
return;
|
||||
}
|
||||
|
||||
const requestId = this.pendingMessages.get(hashStr);
|
||||
|
||||
if (!requestId) {
|
||||
return;
|
||||
}
|
||||
|
||||
entry = this.pendingRequests.get(requestId);
|
||||
|
||||
if (!entry) {
|
||||
return;
|
||||
}
|
||||
|
||||
entry.filterAck = ackParams.filterAck ?? entry.filterAck;
|
||||
entry.storeAck = ackParams.storeAck ?? entry.storeAck;
|
||||
}
|
||||
|
||||
private replacePendingWithMessage(hashStr: MessageHashStr): void {
|
||||
const requestId = this.pendingMessages.get(hashStr);
|
||||
|
||||
if (!requestId) {
|
||||
return;
|
||||
}
|
||||
|
||||
let entry = this.pendingRequests.get(requestId);
|
||||
|
||||
if (!entry) {
|
||||
return;
|
||||
}
|
||||
|
||||
// merge with message entry if possible
|
||||
// this can happen if message we sent got received before we could add it to the message store
|
||||
const messageEntry = this.messages.get(hashStr);
|
||||
entry = {
|
||||
...entry,
|
||||
...messageEntry,
|
||||
filterAck: messageEntry?.filterAck ?? entry.filterAck,
|
||||
storeAck: messageEntry?.storeAck ?? entry.storeAck
|
||||
};
|
||||
|
||||
this.pendingRequests.delete(requestId);
|
||||
this.pendingMessages.delete(hashStr);
|
||||
|
||||
this.messages.set(hashStr, entry);
|
||||
}
|
||||
}
|
||||
87
packages/sdk/src/messaging/messaging.spec.ts
Normal file
87
packages/sdk/src/messaging/messaging.spec.ts
Normal file
@ -0,0 +1,87 @@
|
||||
import type {
|
||||
IFilter,
|
||||
ILightPush,
|
||||
ISendMessage,
|
||||
IStore
|
||||
} from "@waku/interfaces";
|
||||
import { utf8ToBytes } from "@waku/utils/bytes";
|
||||
import { expect } from "chai";
|
||||
import sinon from "sinon";
|
||||
|
||||
import { MessageStore } from "./message_store.js";
|
||||
import { Messaging } from "./messaging.js";
|
||||
|
||||
const testContentTopic = "/test/1/waku-messaging/utf8";
|
||||
const testNetworkconfig = {
|
||||
clusterId: 0,
|
||||
numShardsInCluster: 9
|
||||
};
|
||||
|
||||
describe("MessageStore", () => {
|
||||
it("queues, marks sent and acks", async () => {
|
||||
const store = new MessageStore({ resendIntervalMs: 1 });
|
||||
const msg: ISendMessage = {
|
||||
contentTopic: testContentTopic,
|
||||
payload: utf8ToBytes("hello")
|
||||
};
|
||||
|
||||
const hash = await store.queue(msg);
|
||||
expect(hash).to.be.a("string");
|
||||
if (!hash) return;
|
||||
|
||||
const mockDecodedMessage = {
|
||||
hashStr: hash,
|
||||
timestamp: new Date()
|
||||
} as any;
|
||||
|
||||
store.markSent(hash, mockDecodedMessage);
|
||||
store.markFilterAck(hash);
|
||||
store.markStoreAck(hash);
|
||||
|
||||
const toSend = store.getMessagesToSend();
|
||||
expect(toSend.length).to.eq(0);
|
||||
});
|
||||
});
|
||||
|
||||
describe("Messaging", () => {
|
||||
it("queues and sends via light push, marks sent", async () => {
|
||||
const lightPush: ILightPush = {
|
||||
multicodec: "lightpush",
|
||||
start: () => {},
|
||||
stop: () => {},
|
||||
send: sinon.stub().resolves({ successes: [], failures: [] }) as any
|
||||
} as unknown as ILightPush;
|
||||
|
||||
const filter: IFilter = {
|
||||
multicodec: "filter",
|
||||
start: sinon.stub().resolves(),
|
||||
stop: sinon.stub().resolves(),
|
||||
subscribe: sinon.stub().resolves(true),
|
||||
unsubscribe: sinon.stub().resolves(true),
|
||||
unsubscribeAll: sinon.stub()
|
||||
} as unknown as IFilter;
|
||||
|
||||
const store: IStore = {
|
||||
multicodec: "store",
|
||||
createCursor: sinon.stub() as any,
|
||||
queryGenerator: sinon.stub() as any,
|
||||
queryWithOrderedCallback: sinon.stub().resolves(),
|
||||
queryWithPromiseCallback: sinon.stub().resolves()
|
||||
} as unknown as IStore;
|
||||
|
||||
const messaging = new Messaging({
|
||||
lightPush,
|
||||
filter,
|
||||
store,
|
||||
networkConfig: testNetworkconfig
|
||||
});
|
||||
|
||||
const message: ISendMessage = {
|
||||
contentTopic: testContentTopic,
|
||||
payload: utf8ToBytes("hello")
|
||||
};
|
||||
|
||||
await messaging.send(message);
|
||||
expect((lightPush.send as any).calledOnce).to.be.true;
|
||||
});
|
||||
});
|
||||
61
packages/sdk/src/messaging/messaging.ts
Normal file
61
packages/sdk/src/messaging/messaging.ts
Normal file
@ -0,0 +1,61 @@
|
||||
import {
|
||||
IFilter,
|
||||
ILightPush,
|
||||
ISendMessage,
|
||||
IStore,
|
||||
NetworkConfig,
|
||||
RequestId
|
||||
} from "@waku/interfaces";
|
||||
|
||||
import { AckManager } from "./ack_manager.js";
|
||||
import { MessageStore } from "./message_store.js";
|
||||
import { Sender } from "./sender.js";
|
||||
|
||||
interface IMessaging {
|
||||
send(wakuLikeMessage: ISendMessage): Promise<RequestId>;
|
||||
}
|
||||
|
||||
type MessagingConstructorParams = {
|
||||
lightPush: ILightPush;
|
||||
filter: IFilter;
|
||||
store: IStore;
|
||||
networkConfig: NetworkConfig;
|
||||
};
|
||||
|
||||
export class Messaging implements IMessaging {
|
||||
private readonly messageStore: MessageStore;
|
||||
private readonly ackManager: AckManager;
|
||||
private readonly sender: Sender;
|
||||
|
||||
public constructor(params: MessagingConstructorParams) {
|
||||
this.messageStore = new MessageStore();
|
||||
|
||||
this.ackManager = new AckManager({
|
||||
messageStore: this.messageStore,
|
||||
filter: params.filter,
|
||||
store: params.store,
|
||||
networkConfig: params.networkConfig
|
||||
});
|
||||
|
||||
this.sender = new Sender({
|
||||
messageStore: this.messageStore,
|
||||
lightPush: params.lightPush,
|
||||
ackManager: this.ackManager,
|
||||
networkConfig: params.networkConfig
|
||||
});
|
||||
}
|
||||
|
||||
public start(): void {
|
||||
this.ackManager.start();
|
||||
this.sender.start();
|
||||
}
|
||||
|
||||
public async stop(): Promise<void> {
|
||||
await this.ackManager.stop();
|
||||
this.sender.stop();
|
||||
}
|
||||
|
||||
public send(wakuLikeMessage: ISendMessage): Promise<RequestId> {
|
||||
return this.sender.send(wakuLikeMessage);
|
||||
}
|
||||
}
|
||||
144
packages/sdk/src/messaging/sender.spec.ts
Normal file
144
packages/sdk/src/messaging/sender.spec.ts
Normal file
@ -0,0 +1,144 @@
|
||||
import type { ILightPush, ISendMessage, NetworkConfig } from "@waku/interfaces";
|
||||
import { expect } from "chai";
|
||||
import { afterEach, beforeEach, describe, it } from "mocha";
|
||||
import sinon from "sinon";
|
||||
|
||||
import type { AckManager } from "./ack_manager.js";
|
||||
import type { MessageStore } from "./message_store.js";
|
||||
import { Sender } from "./sender.js";
|
||||
|
||||
describe("Sender", () => {
|
||||
let sender: Sender;
|
||||
let mockMessageStore: MessageStore;
|
||||
let mockLightPush: ILightPush;
|
||||
let mockAckManager: AckManager;
|
||||
let mockNetworkConfig: NetworkConfig;
|
||||
|
||||
beforeEach(() => {
|
||||
mockMessageStore = {
|
||||
queue: sinon.stub(),
|
||||
getMessagesToSend: sinon.stub(),
|
||||
markSent: sinon.stub()
|
||||
} as any;
|
||||
|
||||
mockLightPush = {
|
||||
send: sinon.stub()
|
||||
} as any;
|
||||
|
||||
mockAckManager = {
|
||||
subscribe: sinon.stub()
|
||||
} as any;
|
||||
|
||||
mockNetworkConfig = {
|
||||
clusterId: 1,
|
||||
shardId: 0
|
||||
} as any;
|
||||
|
||||
sender = new Sender({
|
||||
messageStore: mockMessageStore,
|
||||
lightPush: mockLightPush,
|
||||
ackManager: mockAckManager,
|
||||
networkConfig: mockNetworkConfig
|
||||
});
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
sinon.restore();
|
||||
});
|
||||
|
||||
describe("constructor", () => {
|
||||
it("should initialize with provided parameters", () => {
|
||||
expect(sender).to.be.instanceOf(Sender);
|
||||
});
|
||||
});
|
||||
|
||||
describe("start", () => {
|
||||
it("should set up background sending interval", () => {
|
||||
const setIntervalSpy = sinon.spy(global, "setInterval");
|
||||
|
||||
sender.start();
|
||||
|
||||
expect(setIntervalSpy.calledWith(sinon.match.func, 1000)).to.be.true;
|
||||
});
|
||||
|
||||
it("should not create multiple intervals when called multiple times", () => {
|
||||
const setIntervalSpy = sinon.spy(global, "setInterval");
|
||||
|
||||
sender.start();
|
||||
sender.start();
|
||||
|
||||
expect(setIntervalSpy.calledOnce).to.be.true;
|
||||
});
|
||||
});
|
||||
|
||||
describe("stop", () => {
|
||||
it("should clear interval when called", () => {
|
||||
const clearIntervalSpy = sinon.spy(global, "clearInterval");
|
||||
|
||||
sender.start();
|
||||
sender.stop();
|
||||
|
||||
expect(clearIntervalSpy.called).to.be.true;
|
||||
});
|
||||
|
||||
it("should handle multiple stop calls gracefully", () => {
|
||||
const clearIntervalSpy = sinon.spy(global, "clearInterval");
|
||||
|
||||
sender.start();
|
||||
sender.stop();
|
||||
sender.stop();
|
||||
|
||||
expect(clearIntervalSpy.calledOnce).to.be.true;
|
||||
});
|
||||
|
||||
it("should handle stop without start", () => {
|
||||
expect(() => sender.stop()).to.not.throw();
|
||||
});
|
||||
});
|
||||
|
||||
describe("send", () => {
|
||||
const mockMessage: ISendMessage = {
|
||||
contentTopic: "test-topic",
|
||||
payload: new Uint8Array([1, 2, 3]),
|
||||
ephemeral: false
|
||||
};
|
||||
|
||||
const mockRequestId = "test-request-id";
|
||||
|
||||
it("should handle messageStore.queue failure", async () => {
|
||||
const error = new Error("Queue failed");
|
||||
(mockMessageStore.queue as sinon.SinonStub).rejects(error);
|
||||
|
||||
try {
|
||||
await sender.send(mockMessage);
|
||||
expect.fail("Expected error to be thrown");
|
||||
} catch (e: any) {
|
||||
expect(e).to.equal(error);
|
||||
}
|
||||
});
|
||||
|
||||
it("should handle ackManager.subscribe failure", async () => {
|
||||
const error = new Error("Subscribe failed");
|
||||
(mockAckManager.subscribe as sinon.SinonStub).rejects(error);
|
||||
(mockMessageStore.queue as sinon.SinonStub).resolves(mockRequestId);
|
||||
|
||||
try {
|
||||
await sender.send(mockMessage);
|
||||
expect.fail("Expected error to be thrown");
|
||||
} catch (e: any) {
|
||||
expect(e).to.equal(error);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
describe("backgroundSend", () => {
|
||||
it("should handle empty pending messages", async () => {
|
||||
(mockMessageStore.getMessagesToSend as sinon.SinonStub).returns([]);
|
||||
|
||||
await sender["backgroundSend"]();
|
||||
|
||||
expect((mockMessageStore.getMessagesToSend as sinon.SinonStub).called).to
|
||||
.be.true;
|
||||
});
|
||||
});
|
||||
});
|
||||
127
packages/sdk/src/messaging/sender.ts
Normal file
127
packages/sdk/src/messaging/sender.ts
Normal file
@ -0,0 +1,127 @@
|
||||
import { createDecoder, createEncoder } from "@waku/core";
|
||||
import {
|
||||
ILightPush,
|
||||
ISendMessage,
|
||||
NetworkConfig,
|
||||
RequestId
|
||||
} from "@waku/interfaces";
|
||||
import { createRoutingInfo } from "@waku/utils";
|
||||
|
||||
import { AckManager } from "./ack_manager.js";
|
||||
import type { MessageStore } from "./message_store.js";
|
||||
|
||||
type SenderConstructorParams = {
|
||||
messageStore: MessageStore;
|
||||
lightPush: ILightPush;
|
||||
ackManager: AckManager;
|
||||
networkConfig: NetworkConfig;
|
||||
};
|
||||
|
||||
const DEFAULT_SEND_INTERVAL = 1000;
|
||||
|
||||
export class Sender {
|
||||
private readonly messageStore: MessageStore;
|
||||
private readonly lightPush: ILightPush;
|
||||
private readonly ackManager: AckManager;
|
||||
private readonly networkConfig: NetworkConfig;
|
||||
|
||||
private readonly processingRequests: Set<RequestId> = new Set();
|
||||
|
||||
private sendInterval: ReturnType<typeof setInterval> | null = null;
|
||||
|
||||
public constructor(params: SenderConstructorParams) {
|
||||
this.messageStore = params.messageStore;
|
||||
this.lightPush = params.lightPush;
|
||||
this.ackManager = params.ackManager;
|
||||
this.networkConfig = params.networkConfig;
|
||||
}
|
||||
|
||||
public start(): void {
|
||||
if (this.sendInterval) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.sendInterval = setInterval(
|
||||
() => void this.backgroundSend(),
|
||||
DEFAULT_SEND_INTERVAL
|
||||
);
|
||||
}
|
||||
|
||||
public stop(): void {
|
||||
if (this.sendInterval) {
|
||||
clearInterval(this.sendInterval);
|
||||
this.sendInterval = null;
|
||||
}
|
||||
}
|
||||
|
||||
public async send(message: ISendMessage): Promise<RequestId> {
|
||||
const requestId = await this.messageStore.queue(message);
|
||||
|
||||
await this.ackManager.subscribe(message.contentTopic);
|
||||
await this.sendMessage(requestId, message);
|
||||
|
||||
return requestId;
|
||||
}
|
||||
|
||||
private async backgroundSend(): Promise<void> {
|
||||
const pendingRequests = this.messageStore.getMessagesToSend();
|
||||
|
||||
for (const { requestId, message } of pendingRequests) {
|
||||
await this.sendMessage(requestId, message);
|
||||
}
|
||||
}
|
||||
|
||||
private async sendMessage(
|
||||
requestId: RequestId,
|
||||
message: ISendMessage
|
||||
): Promise<void> {
|
||||
try {
|
||||
if (this.processingRequests.has(requestId)) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.processingRequests.add(requestId);
|
||||
|
||||
const encoder = createEncoder({
|
||||
contentTopic: message.contentTopic,
|
||||
routingInfo: createRoutingInfo(this.networkConfig, {
|
||||
contentTopic: message.contentTopic
|
||||
}),
|
||||
ephemeral: message.ephemeral
|
||||
});
|
||||
|
||||
const decoder = createDecoder(
|
||||
message.contentTopic,
|
||||
createRoutingInfo(this.networkConfig, {
|
||||
contentTopic: message.contentTopic
|
||||
})
|
||||
);
|
||||
|
||||
const response = await this.lightPush.send(
|
||||
encoder,
|
||||
{
|
||||
payload: message.payload
|
||||
},
|
||||
{
|
||||
// force no retry as we have retry implemented in the sender
|
||||
autoRetry: false,
|
||||
// send to only one peer as we will retry on failure and need to ensure only one message is in the network
|
||||
numPeersToUse: 1
|
||||
}
|
||||
);
|
||||
|
||||
if (response?.messages && response.messages.length > 0) {
|
||||
const decodedMessage = await decoder.fromProtoObj(
|
||||
decoder.pubsubTopic,
|
||||
response.messages[0]
|
||||
);
|
||||
|
||||
this.messageStore.markSent(requestId, decodedMessage!);
|
||||
} else {
|
||||
// do nothing on failure, will retry
|
||||
}
|
||||
} finally {
|
||||
this.processingRequests.delete(requestId);
|
||||
}
|
||||
}
|
||||
}
|
||||
5
packages/sdk/src/messaging/utils.ts
Normal file
5
packages/sdk/src/messaging/utils.ts
Normal file
@ -0,0 +1,5 @@
|
||||
export interface IAckManager {
|
||||
start(): void;
|
||||
stop(): void;
|
||||
subscribe(contentTopic: string): Promise<boolean>;
|
||||
}
|
||||
@ -26,13 +26,16 @@ import type {
|
||||
import {
|
||||
DefaultNetworkConfig,
|
||||
HealthStatus,
|
||||
Protocols
|
||||
ISendMessage,
|
||||
Protocols,
|
||||
RequestId
|
||||
} from "@waku/interfaces";
|
||||
import { createRoutingInfo, Logger } from "@waku/utils";
|
||||
|
||||
import { Filter } from "../filter/index.js";
|
||||
import { HealthIndicator } from "../health_indicator/index.js";
|
||||
import { LightPush } from "../light_push/index.js";
|
||||
import { Messaging } from "../messaging/index.js";
|
||||
import { PeerManager } from "../peer_manager/index.js";
|
||||
import { Store } from "../store/index.js";
|
||||
|
||||
@ -64,6 +67,7 @@ export class WakuNode implements IWaku {
|
||||
private readonly connectionManager: ConnectionManager;
|
||||
private readonly peerManager: PeerManager;
|
||||
private readonly healthIndicator: HealthIndicator;
|
||||
private messaging: Messaging | null = null;
|
||||
|
||||
public constructor(
|
||||
options: CreateNodeOptions,
|
||||
@ -126,6 +130,15 @@ export class WakuNode implements IWaku {
|
||||
});
|
||||
}
|
||||
|
||||
if (this.lightPush && this.filter && this.store) {
|
||||
this.messaging = new Messaging({
|
||||
lightPush: this.lightPush,
|
||||
filter: this.filter,
|
||||
store: this.store,
|
||||
networkConfig: this.networkConfig
|
||||
});
|
||||
}
|
||||
|
||||
log.info(
|
||||
"Waku node created",
|
||||
peerId,
|
||||
@ -221,6 +234,7 @@ export class WakuNode implements IWaku {
|
||||
this.peerManager.start();
|
||||
this.healthIndicator.start();
|
||||
this.lightPush?.start();
|
||||
this.messaging?.start();
|
||||
|
||||
this._nodeStateLock = false;
|
||||
this._nodeStarted = true;
|
||||
@ -231,6 +245,7 @@ export class WakuNode implements IWaku {
|
||||
|
||||
this._nodeStateLock = true;
|
||||
|
||||
await this.messaging?.stop();
|
||||
this.lightPush?.stop();
|
||||
this.store?.stop();
|
||||
await this.filter?.stop();
|
||||
@ -284,6 +299,14 @@ export class WakuNode implements IWaku {
|
||||
});
|
||||
}
|
||||
|
||||
public send(message: ISendMessage): Promise<RequestId> {
|
||||
if (!this.messaging) {
|
||||
throw new Error("Messaging not initialized");
|
||||
}
|
||||
|
||||
return this.messaging.send(message);
|
||||
}
|
||||
|
||||
private createRoutingInfo(
|
||||
contentTopic?: string,
|
||||
shardId?: number
|
||||
|
||||
@ -12,13 +12,15 @@ import {
|
||||
ILightPush,
|
||||
type IMessage,
|
||||
IRelay,
|
||||
ISendMessage,
|
||||
ISendOptions,
|
||||
IStore,
|
||||
IWaku,
|
||||
IWakuEventEmitter,
|
||||
Libp2p,
|
||||
LightPushSDKResult,
|
||||
Protocols
|
||||
Protocols,
|
||||
RequestId
|
||||
} from "@waku/interfaces";
|
||||
|
||||
export type MockWakuEvents = {
|
||||
@ -155,6 +157,9 @@ export class MockWakuNode implements IWaku {
|
||||
public createEncoder(_params: CreateEncoderParams): IEncoder {
|
||||
throw new Error("Method not implemented.");
|
||||
}
|
||||
public send(_message: ISendMessage): Promise<RequestId> {
|
||||
throw new Error("Method not implemented.");
|
||||
}
|
||||
public isStarted(): boolean {
|
||||
throw new Error("Method not implemented.");
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user