mirror of
https://github.com/logos-messaging/logos-messaging-js.git
synced 2026-01-14 22:13:09 +00:00
* SDS: pushOutgoingMessage is actually sync * SDS: ensure that `ContentMessage` class is stored in local history with `valueOf` method * feat: introduce reliable channels Easy to use Scalable Data Sync (SDS, e2e reliability) wrapper, that includes: - store queries upon connection to store nodes - store queries to retrieve missing messages * remove `channel` prefix * attempt to improve performance when processing a lot of incoming messages * test: split test file * use index.ts for re-export only. * improve if condition * use getter for isStarted * waku node already auto-start * rename send * fix lightPush.send type post rebase * test: remove extra console.log * SDS: emit messages as missing as soon as they are received * make configurable elapse time for task process * typo * use string instead of enum for event types * ReliableChannel.send returns the message id
167 lines
4.2 KiB
TypeScript
167 lines
4.2 KiB
TypeScript
import { Peer, PeerId, Stream, TypedEventEmitter } from "@libp2p/interface";
|
|
import { MultiaddrInput } from "@multiformats/multiaddr";
|
|
import {
|
|
Callback,
|
|
CreateDecoderParams,
|
|
CreateEncoderParams,
|
|
HealthStatus,
|
|
IDecodedMessage,
|
|
IDecoder,
|
|
IEncoder,
|
|
IFilter,
|
|
ILightPush,
|
|
type IMessage,
|
|
IRelay,
|
|
ISendOptions,
|
|
IStore,
|
|
IWaku,
|
|
IWakuEventEmitter,
|
|
Libp2p,
|
|
LightPushSDKResult,
|
|
Protocols
|
|
} from "@waku/interfaces";
|
|
|
|
export type MockWakuEvents = {
|
|
["new-message"]: CustomEvent<IDecodedMessage>;
|
|
};
|
|
|
|
export class MockWakuNode implements IWaku {
|
|
public relay?: IRelay;
|
|
public store?: IStore;
|
|
public filter?: IFilter;
|
|
public lightPush?: ILightPush;
|
|
public protocols: string[];
|
|
|
|
private readonly subscriptions: {
|
|
decoders: IDecoder<any>[];
|
|
callback: Callback<any>;
|
|
}[];
|
|
|
|
public constructor(
|
|
private mockMessageEmitter?: TypedEventEmitter<MockWakuEvents>
|
|
) {
|
|
this.protocols = [];
|
|
this.events = new TypedEventEmitter();
|
|
this.subscriptions = [];
|
|
|
|
this.lightPush = {
|
|
multicodec: [],
|
|
send: this._send.bind(this),
|
|
start(): void {},
|
|
stop(): void {}
|
|
};
|
|
|
|
this.filter = {
|
|
start: async () => {},
|
|
stop: async () => {},
|
|
multicodec: "filter",
|
|
subscribe: this._subscribe.bind(this),
|
|
unsubscribe<T extends IDecodedMessage>(
|
|
_decoders: IDecoder<T> | IDecoder<T>[]
|
|
): Promise<boolean> {
|
|
throw "Not implemented";
|
|
},
|
|
unsubscribeAll(): void {
|
|
throw "Not implemented";
|
|
}
|
|
};
|
|
}
|
|
|
|
public get libp2p(): Libp2p {
|
|
throw "No libp2p on MockWakuNode";
|
|
}
|
|
|
|
private async _send(
|
|
encoder: IEncoder,
|
|
message: IMessage,
|
|
_sendOptions?: ISendOptions
|
|
): Promise<LightPushSDKResult> {
|
|
for (const { decoders, callback } of this.subscriptions) {
|
|
const protoMessage = await encoder.toProtoObj(message);
|
|
if (!protoMessage) throw "Issue in mock encoding message";
|
|
for (const decoder of decoders) {
|
|
const decodedMessage = await decoder.fromProtoObj(
|
|
decoder.pubsubTopic,
|
|
protoMessage
|
|
);
|
|
if (!decodedMessage) throw "Issue in mock decoding message";
|
|
await callback(decodedMessage);
|
|
if (this.mockMessageEmitter) {
|
|
this.mockMessageEmitter.dispatchEvent(
|
|
new CustomEvent<IDecodedMessage>("new-message", {
|
|
detail: decodedMessage
|
|
})
|
|
);
|
|
}
|
|
}
|
|
}
|
|
return {
|
|
failures: [],
|
|
successes: []
|
|
};
|
|
}
|
|
|
|
private async _subscribe<T extends IDecodedMessage>(
|
|
decoders: IDecoder<T> | IDecoder<T>[],
|
|
callback: Callback<T>
|
|
): Promise<boolean> {
|
|
this.subscriptions.push({
|
|
decoders: Array.isArray(decoders) ? decoders : [decoders],
|
|
callback
|
|
});
|
|
if (this.mockMessageEmitter) {
|
|
this.mockMessageEmitter.addEventListener("new-message", (event) => {
|
|
void callback(event.detail as unknown as T);
|
|
});
|
|
}
|
|
return Promise.resolve(true);
|
|
}
|
|
|
|
public events: IWakuEventEmitter;
|
|
|
|
public get peerId(): PeerId {
|
|
throw "no peerId on MockWakuNode";
|
|
}
|
|
public get health(): HealthStatus {
|
|
throw "no health on MockWakuNode";
|
|
}
|
|
public dial(
|
|
_peer: PeerId | MultiaddrInput,
|
|
_protocols?: Protocols[]
|
|
): Promise<Stream> {
|
|
throw new Error("Method not implemented.");
|
|
}
|
|
public hangUp(_peer: PeerId | MultiaddrInput): Promise<boolean> {
|
|
throw new Error("Method not implemented.");
|
|
}
|
|
public start(): Promise<void> {
|
|
return Promise.resolve();
|
|
}
|
|
public stop(): Promise<void> {
|
|
throw new Error("Method not implemented.");
|
|
}
|
|
public waitForPeers(
|
|
_protocols?: Protocols[],
|
|
_timeoutMs?: number
|
|
): Promise<void> {
|
|
throw new Error("Method not implemented.");
|
|
}
|
|
public createDecoder(
|
|
_params: CreateDecoderParams
|
|
): IDecoder<IDecodedMessage> {
|
|
throw new Error("Method not implemented.");
|
|
}
|
|
public createEncoder(_params: CreateEncoderParams): IEncoder {
|
|
throw new Error("Method not implemented.");
|
|
}
|
|
public isStarted(): boolean {
|
|
throw new Error("Method not implemented.");
|
|
}
|
|
public isConnected(): boolean {
|
|
throw new Error("Method not implemented.");
|
|
}
|
|
public getConnectedPeers(): Promise<Peer[]> {
|
|
throw new Error("Method not implemented.");
|
|
}
|
|
}
|