mirror of
https://github.com/logos-messaging/js-waku.git
synced 2026-02-09 16:33:09 +00:00
For an edge node, there is no such thing as a "pubsub topic configuration". An edge node should be able to operate for any possible shard, and it is a per-protocol matter (eg send message with light push). A relay node do subscribe to shards, but in this case, even metadata protocol does not need to advertise them, this is already handled by gossipsub. Only service node should advertise their shards via metadata protocol, which is out of scope for js-waku. # Conflicts: # packages/interfaces/src/connection_manager.ts
134 lines
3.3 KiB
TypeScript
134 lines
3.3 KiB
TypeScript
import type { PeerId } from "@libp2p/interface";
|
|
import { LightPushCore } from "@waku/core";
|
|
import {
|
|
type CoreProtocolResult,
|
|
Failure,
|
|
type IEncoder,
|
|
ILightPush,
|
|
type IMessage,
|
|
type ISendOptions,
|
|
type Libp2p,
|
|
type LightPushProtocolOptions,
|
|
ProtocolError,
|
|
Protocols,
|
|
SDKProtocolResult
|
|
} from "@waku/interfaces";
|
|
import { Logger } from "@waku/utils";
|
|
|
|
import { PeerManager } from "../peer_manager/index.js";
|
|
|
|
import { RetryManager } from "./retry_manager.js";
|
|
|
|
const log = new Logger("sdk:light-push");
|
|
|
|
const DEFAULT_MAX_ATTEMPTS = 3;
|
|
const DEFAULT_SEND_OPTIONS: LightPushProtocolOptions = {
|
|
autoRetry: true,
|
|
retryIntervalMs: 1000,
|
|
maxAttempts: DEFAULT_MAX_ATTEMPTS,
|
|
numPeersToUse: 1
|
|
};
|
|
|
|
type LightPushConstructorParams = {
|
|
peerManager: PeerManager;
|
|
libp2p: Libp2p;
|
|
options?: Partial<LightPushProtocolOptions>;
|
|
};
|
|
|
|
export class LightPush implements ILightPush {
|
|
private readonly config: LightPushProtocolOptions;
|
|
private readonly retryManager: RetryManager;
|
|
private readonly peerManager: PeerManager;
|
|
private readonly protocol: LightPushCore;
|
|
|
|
public constructor(params: LightPushConstructorParams) {
|
|
this.config = {
|
|
...DEFAULT_SEND_OPTIONS,
|
|
...(params.options || {})
|
|
} as LightPushProtocolOptions;
|
|
|
|
this.peerManager = params.peerManager;
|
|
this.protocol = new LightPushCore(params.libp2p);
|
|
this.retryManager = new RetryManager({
|
|
peerManager: params.peerManager,
|
|
retryIntervalMs: this.config.retryIntervalMs
|
|
});
|
|
}
|
|
|
|
public get multicodec(): string {
|
|
return this.protocol.multicodec;
|
|
}
|
|
|
|
public start(): void {
|
|
this.retryManager.start();
|
|
}
|
|
|
|
public stop(): void {
|
|
this.retryManager.stop();
|
|
}
|
|
|
|
public async send(
|
|
encoder: IEncoder,
|
|
message: IMessage,
|
|
options: ISendOptions = {}
|
|
): Promise<SDKProtocolResult> {
|
|
options = {
|
|
...this.config,
|
|
...options
|
|
};
|
|
|
|
const { pubsubTopic } = encoder;
|
|
|
|
log.info("send: attempting to send a message to pubsubTopic:", pubsubTopic);
|
|
|
|
const peerIds = await this.peerManager.getPeers({
|
|
protocol: Protocols.LightPush,
|
|
pubsubTopic: encoder.pubsubTopic
|
|
});
|
|
|
|
const coreResults: CoreProtocolResult[] =
|
|
peerIds?.length > 0
|
|
? await Promise.all(
|
|
peerIds.map((peerId) =>
|
|
this.protocol.send(encoder, message, peerId).catch((_e) => ({
|
|
success: null,
|
|
failure: {
|
|
error: ProtocolError.GENERIC_FAIL
|
|
}
|
|
}))
|
|
)
|
|
)
|
|
: [];
|
|
|
|
const results: SDKProtocolResult = coreResults.length
|
|
? {
|
|
successes: coreResults
|
|
.filter((v) => v.success)
|
|
.map((v) => v.success) as PeerId[],
|
|
failures: coreResults
|
|
.filter((v) => v.failure)
|
|
.map((v) => v.failure) as Failure[]
|
|
}
|
|
: {
|
|
successes: [],
|
|
failures: [
|
|
{
|
|
error: ProtocolError.NO_PEER_AVAILABLE
|
|
}
|
|
]
|
|
};
|
|
|
|
if (options.autoRetry && results.successes.length === 0) {
|
|
const sendCallback = (peerId: PeerId): Promise<CoreProtocolResult> =>
|
|
this.protocol.send(encoder, message, peerId);
|
|
this.retryManager.push(
|
|
sendCallback.bind(this),
|
|
options.maxAttempts || DEFAULT_MAX_ATTEMPTS,
|
|
encoder.pubsubTopic
|
|
);
|
|
}
|
|
|
|
return results;
|
|
}
|
|
}
|