mirror of
https://github.com/logos-messaging/js-waku.git
synced 2026-01-09 09:13:10 +00:00
* feat: implement LightPush v3 protocol support Add comprehensive LightPush v3 protocol implementation with: Core Features: - LightPush v3 protocol codec and multicodec detection - Status code-based error handling and validation - Protocol version inference and compatibility layers - Enhanced error types with detailed failure information Protocol Support: - Automatic v3/v2 protocol negotiation and fallback - Status code mapping to LightPush error types - Protocol version tracking in SDK results - Mixed protocol environment support Testing Infrastructure: - Comprehensive v3 error code handling tests - Mock functions for v3/v2 response scenarios - Protocol version detection and validation tests - Backward compatibility verification Implementation Details: - Clean separation between v2 and v3 response handling - Type-safe status code validation with isSuccess helper - Enhanced failure reporting with protocol version context - Proper error propagation through SDK layers This implementation maintains full backward compatibility with v2 while providing enhanced functionality for v3 protocol features. * feat: handle both light push protocols * fix: unsubscribe test * feat: consolidate lpv2/v3 types * feat(tests): bump nwaku to 0.36.0 * fix: remove extraneous exports * fix: add delay to tests * fix: remove protocol result types * feat: consolidate light push codec branching * fix: revert nwaku image * fix: remove multicodec * fix: remove protocolversion * feat: simplify v2/v3 branching logic to use two stream managers * fix: remove unused utils * fix: remove comments * fix: revert store test * fix: cleanup lightpush sdk * fix: remove unused util * fix: remove unused exports * fix: rename file from public to protocol_handler * fix: use proper type for sdk result * fix: update return types in filter * fix: rebase against latest master * fix: use both lightpush codecs when waiting for peer * fix: handle both lp codecs * fix: remove unused code * feat: use array for multicodec fields * fix: add timestamp if missing in v3 rpc * fix: resolve on either lp codec when waiting for peer * fix: remove unused util * fix: remove unnecessary abstraction * feat: accept nwaku docker image as arg, test lp backwards compat * fix: revert filter error * feat: add legacy flag to enable lightpushv2 only * Revert "feat: accept nwaku docker image as arg, test lp backwards compat" This reverts commit 857e12cbc73305e5c51abd057665bd34708b2737. * fix: remove unused test * feat: improve lp3 (#2597) * improve light push core * move back to singualar multicodec property, enable array prop only for light push * implement v2/v3 interop e2e test, re-add useLegacy flag, ensure e2e runs for v2 and v3 * fix v2 v3 condition * generate message package earlier * add log, fix condition --------- Co-authored-by: Sasha <118575614+weboko@users.noreply.github.com> Co-authored-by: Sasha <oleksandr@status.im>
339 lines
9.3 KiB
TypeScript
339 lines
9.3 KiB
TypeScript
import {
|
|
GossipSub,
|
|
GossipSubComponents,
|
|
GossipsubMessage,
|
|
GossipsubOpts
|
|
} from "@chainsafe/libp2p-gossipsub";
|
|
import type { PeerIdStr, TopicStr } from "@chainsafe/libp2p-gossipsub/types";
|
|
import { SignaturePolicy } from "@chainsafe/libp2p-gossipsub/types";
|
|
import type { PubSub as Libp2pPubsub } from "@libp2p/interface";
|
|
import { sha256 } from "@noble/hashes/sha256";
|
|
import {
|
|
Callback,
|
|
CreateNodeOptions,
|
|
IAsyncIterator,
|
|
IDecodedMessage,
|
|
IDecoder,
|
|
IEncoder,
|
|
IMessage,
|
|
IRelay,
|
|
type IRoutingInfo,
|
|
Libp2p,
|
|
LightPushError,
|
|
LightPushSDKResult,
|
|
PubsubTopic
|
|
} from "@waku/interfaces";
|
|
import { isWireSizeUnderCap, toAsyncIterator } from "@waku/utils";
|
|
import { pushOrInitMapSet } from "@waku/utils";
|
|
import { Logger } from "@waku/utils";
|
|
import { pEvent } from "p-event";
|
|
|
|
import { RelayCodecs } from "./constants.js";
|
|
import { messageValidator } from "./message_validator.js";
|
|
import { ContentTopicOnlyDecoder } from "./topic_only_message.js";
|
|
|
|
const log = new Logger("relay");
|
|
|
|
export type Observer<T extends IDecodedMessage> = {
|
|
decoder: IDecoder<T>;
|
|
callback: Callback<T>;
|
|
};
|
|
|
|
export type RelayCreateOptions = CreateNodeOptions & {
|
|
routingInfos: IRoutingInfo[];
|
|
} & Partial<GossipsubOpts>;
|
|
export type ContentTopic = string;
|
|
|
|
type ActiveSubscriptions = Map<PubsubTopic, ContentTopic[]>;
|
|
|
|
type RelayConstructorParams = {
|
|
libp2p: Libp2p;
|
|
pubsubTopics: PubsubTopic[];
|
|
};
|
|
|
|
/**
|
|
* Implements the [Waku v2 Relay protocol](https://rfc.vac.dev/spec/11/).
|
|
* Throws if libp2p.pubsub does not support Waku Relay
|
|
*/
|
|
export class Relay implements IRelay {
|
|
public pubsubTopics: Set<PubsubTopic>;
|
|
private defaultDecoder: IDecoder<IDecodedMessage>;
|
|
|
|
public static multicodec: string = RelayCodecs[0];
|
|
public readonly gossipSub: GossipSub;
|
|
|
|
/**
|
|
* observers called when receiving new message.
|
|
* Observers under key `""` are always called.
|
|
*/
|
|
private observers: Map<PubsubTopic, Map<ContentTopic, Set<unknown>>>;
|
|
|
|
public constructor(params: RelayConstructorParams) {
|
|
if (!this.isRelayPubsub(params.libp2p.services.pubsub)) {
|
|
throw Error(
|
|
`Failed to initialize Relay. libp2p.pubsub does not support ${Relay.multicodec}`
|
|
);
|
|
}
|
|
|
|
this.gossipSub = params.libp2p.services.pubsub as GossipSub;
|
|
|
|
this.pubsubTopics = new Set(params.pubsubTopics);
|
|
|
|
if (this.gossipSub.isStarted()) {
|
|
this.subscribeToAllTopics();
|
|
}
|
|
|
|
this.observers = new Map();
|
|
|
|
// TODO: User might want to decide what decoder should be used (e.g. for RLN)
|
|
this.defaultDecoder = new ContentTopicOnlyDecoder();
|
|
}
|
|
|
|
/**
|
|
* Mounts the gossipsub protocol onto the libp2p node
|
|
* and subscribes to all the topics.
|
|
*
|
|
* @override
|
|
* @returns {void}
|
|
*/
|
|
public async start(): Promise<void> {
|
|
if (this.gossipSub.isStarted()) {
|
|
throw Error("GossipSub already started.");
|
|
}
|
|
|
|
await this.gossipSub.start();
|
|
this.subscribeToAllTopics();
|
|
}
|
|
|
|
/**
|
|
* Wait for at least one peer with the given protocol to be connected and in the gossipsub
|
|
* mesh for all pubsubTopics.
|
|
*/
|
|
public async waitForPeers(): Promise<void> {
|
|
let peers = this.getMeshPeers();
|
|
const pubsubTopics = this.pubsubTopics;
|
|
|
|
for (const topic of pubsubTopics) {
|
|
while (peers.length == 0) {
|
|
await pEvent(this.gossipSub, "gossipsub:heartbeat");
|
|
peers = this.getMeshPeers(topic);
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Send Waku message.
|
|
*/
|
|
public async send(
|
|
encoder: IEncoder,
|
|
message: IMessage
|
|
): Promise<LightPushSDKResult> {
|
|
const { pubsubTopic } = encoder;
|
|
if (!this.pubsubTopics.has(pubsubTopic)) {
|
|
log.error("Failed to send waku relay: topic not configured");
|
|
return {
|
|
successes: [],
|
|
failures: [
|
|
{
|
|
error: LightPushError.TOPIC_NOT_CONFIGURED
|
|
}
|
|
]
|
|
};
|
|
}
|
|
|
|
const msg = await encoder.toWire(message);
|
|
if (!msg) {
|
|
log.error("Failed to encode message, aborting publish");
|
|
return {
|
|
successes: [],
|
|
failures: [
|
|
{
|
|
error: LightPushError.ENCODE_FAILED
|
|
}
|
|
]
|
|
};
|
|
}
|
|
|
|
if (!isWireSizeUnderCap(msg)) {
|
|
log.error("Failed to send waku relay: message is bigger that 1MB");
|
|
return {
|
|
successes: [],
|
|
failures: [
|
|
{
|
|
error: LightPushError.SIZE_TOO_BIG
|
|
}
|
|
]
|
|
};
|
|
}
|
|
|
|
const { recipients } = await this.gossipSub.publish(pubsubTopic, msg);
|
|
return {
|
|
successes: recipients,
|
|
failures: []
|
|
};
|
|
}
|
|
|
|
public subscribeWithUnsubscribe<T extends IDecodedMessage>(
|
|
decoders: IDecoder<T> | IDecoder<T>[],
|
|
callback: Callback<T>
|
|
): () => void {
|
|
const observers: Array<[PubsubTopic, Observer<T>]> = [];
|
|
|
|
for (const decoder of Array.isArray(decoders) ? decoders : [decoders]) {
|
|
const { pubsubTopic } = decoder;
|
|
const ctObs: Map<ContentTopic, Set<Observer<T>>> = this.observers.get(
|
|
pubsubTopic
|
|
) ?? new Map();
|
|
const observer = { pubsubTopic, decoder, callback };
|
|
pushOrInitMapSet(ctObs, decoder.contentTopic, observer);
|
|
|
|
this.observers.set(pubsubTopic, ctObs);
|
|
observers.push([pubsubTopic, observer]);
|
|
}
|
|
|
|
return () => {
|
|
this.removeObservers(observers);
|
|
};
|
|
}
|
|
|
|
public subscribe = this.subscribeWithUnsubscribe;
|
|
|
|
private removeObservers<T extends IDecodedMessage>(
|
|
observers: Array<[PubsubTopic, Observer<T>]>
|
|
): void {
|
|
for (const [pubsubTopic, observer] of observers) {
|
|
const ctObs = this.observers.get(pubsubTopic);
|
|
if (!ctObs) continue;
|
|
|
|
const contentTopic = observer.decoder.contentTopic;
|
|
const _obs = ctObs.get(contentTopic);
|
|
if (!_obs) continue;
|
|
|
|
_obs.delete(observer);
|
|
ctObs.set(contentTopic, _obs);
|
|
this.observers.set(pubsubTopic, ctObs);
|
|
}
|
|
}
|
|
|
|
public toSubscriptionIterator<T extends IDecodedMessage>(
|
|
decoders: IDecoder<T> | IDecoder<T>[]
|
|
): Promise<IAsyncIterator<T>> {
|
|
return toAsyncIterator(this, decoders);
|
|
}
|
|
|
|
public getActiveSubscriptions(): ActiveSubscriptions {
|
|
const map = new Map();
|
|
for (const pubsubTopic of this.pubsubTopics) {
|
|
map.set(pubsubTopic, Array.from(this.observers.keys()));
|
|
}
|
|
return map;
|
|
}
|
|
|
|
public getMeshPeers(topic?: TopicStr): PeerIdStr[] {
|
|
// if no TopicStr is provided - returns empty array
|
|
return this.gossipSub.getMeshPeers(topic || "");
|
|
}
|
|
|
|
private subscribeToAllTopics(): void {
|
|
for (const pubsubTopic of this.pubsubTopics) {
|
|
this.gossipSubSubscribe(pubsubTopic);
|
|
}
|
|
}
|
|
|
|
private async processIncomingMessage<T extends IDecodedMessage>(
|
|
pubsubTopic: string,
|
|
bytes: Uint8Array
|
|
): Promise<void> {
|
|
const contentTopicOnlyMsg =
|
|
await this.defaultDecoder.fromWireToProtoObj(bytes);
|
|
if (!contentTopicOnlyMsg || !contentTopicOnlyMsg.contentTopic) {
|
|
log.warn("Message does not have a content topic, skipping");
|
|
return;
|
|
}
|
|
|
|
// Retrieve the map of content topics for the given pubsubTopic
|
|
const contentTopicMap = this.observers.get(pubsubTopic);
|
|
if (!contentTopicMap) {
|
|
return;
|
|
}
|
|
|
|
// Retrieve the set of observers for the given contentTopic
|
|
const observers = contentTopicMap.get(
|
|
contentTopicOnlyMsg.contentTopic
|
|
) as Set<Observer<T>>;
|
|
if (!observers) {
|
|
return;
|
|
}
|
|
|
|
await Promise.all(
|
|
Array.from(observers).map(({ decoder, callback }) => {
|
|
return (async () => {
|
|
try {
|
|
const protoMsg = await decoder.fromWireToProtoObj(bytes);
|
|
if (!protoMsg) {
|
|
log.error(
|
|
"Internal error: message previously decoded failed on 2nd pass."
|
|
);
|
|
return;
|
|
}
|
|
const msg = await decoder.fromProtoObj(pubsubTopic, protoMsg);
|
|
if (msg) {
|
|
await callback(msg);
|
|
} else {
|
|
log.error(
|
|
"Failed to decode messages on",
|
|
contentTopicOnlyMsg.contentTopic
|
|
);
|
|
}
|
|
} catch (error) {
|
|
log.error("Error while decoding message:", error);
|
|
}
|
|
})();
|
|
})
|
|
);
|
|
}
|
|
|
|
/**
|
|
* Subscribe to a pubsub topic and start emitting Waku messages to observers.
|
|
*
|
|
* @override
|
|
*/
|
|
private gossipSubSubscribe(pubsubTopic: string): void {
|
|
this.gossipSub.addEventListener(
|
|
"gossipsub:message",
|
|
(event: CustomEvent<GossipsubMessage>) => {
|
|
if (event.detail.msg.topic !== pubsubTopic) return;
|
|
|
|
this.processIncomingMessage(
|
|
event.detail.msg.topic,
|
|
event.detail.msg.data
|
|
).catch((e) => log.error("Failed to process incoming message", e));
|
|
}
|
|
);
|
|
|
|
this.gossipSub.topicValidators.set(pubsubTopic, messageValidator);
|
|
this.gossipSub.subscribe(pubsubTopic);
|
|
}
|
|
|
|
private isRelayPubsub(pubsub: Libp2pPubsub | undefined): boolean {
|
|
return pubsub?.multicodecs?.includes(Relay.multicodec) ?? false;
|
|
}
|
|
}
|
|
|
|
export function wakuGossipSub(
|
|
init: Partial<RelayCreateOptions> = {}
|
|
): (components: GossipSubComponents) => GossipSub {
|
|
return (components: GossipSubComponents) => {
|
|
init = {
|
|
...init,
|
|
msgIdFn: ({ data }) => sha256(data),
|
|
// Ensure that no signature is included nor expected in the messages.
|
|
globalSignaturePolicy: SignaturePolicy.StrictNoSign,
|
|
fallbackToFloodsub: false
|
|
};
|
|
const pubsub = new GossipSub(components, init);
|
|
pubsub.multicodecs = RelayCodecs;
|
|
return pubsub;
|
|
};
|
|
}
|