220 lines
6.0 KiB
TypeScript
Raw Normal View History

2021-04-22 14:47:43 +10:00
import {
2022-06-21 13:23:42 +10:00
GossipSub,
GossipsubMessage,
GossipsubOpts,
} from "@chainsafe/libp2p-gossipsub";
import {
PeerIdStr,
TopicStr,
} from "@chainsafe/libp2p-gossipsub/dist/src/types";
2022-06-21 13:23:42 +10:00
import { SignaturePolicy } from "@chainsafe/libp2p-gossipsub/types";
import debug from "debug";
2022-05-30 15:01:57 +10:00
import { DefaultPubSubTopic } from "../constants";
2022-02-14 10:50:02 +11:00
import { hexToBytes } from "../utils";
2022-05-30 15:01:57 +10:00
import { CreateOptions } from "../waku";
2022-02-04 14:12:00 +11:00
import { DecryptionMethod, WakuMessage } from "../waku_message";
2022-02-04 14:12:00 +11:00
import * as constants from "./constants";
2022-02-04 14:12:00 +11:00
const dbg = debug("waku:relay");
2021-05-10 11:41:13 +10:00
/**
* Implements the [Waku v2 Relay protocol]{@link https://rfc.vac.dev/spec/11/}.
* Must be passed as a `pubsub` module to a {Libp2p} instance.
*
2021-07-15 12:16:21 +10:00
* @implements {require('libp2p-interfaces/src/pubsub')}
* @noInheritDoc
2021-05-10 11:41:13 +10:00
*/
2022-06-21 13:23:42 +10:00
export class WakuRelay extends GossipSub {
pubSubTopic: string;
2022-06-21 13:23:42 +10:00
public static multicodec: string = constants.RelayCodecs[0];
public decryptionKeys: Map<
Uint8Array,
{ method?: DecryptionMethod; contentTopics?: string[] }
>;
/**
* observers called when receiving new message.
* Observers under key `""` are always called.
*/
public observers: {
[contentTopic: string]: Set<(message: WakuMessage) => void>;
};
2022-06-21 13:23:42 +10:00
constructor(options?: Partial<CreateOptions & GossipsubOpts>) {
2021-04-22 14:47:43 +10:00
super(
2022-06-21 13:23:42 +10:00
Object.assign(options, {
2021-04-22 14:47:43 +10:00
// Ensure that no signature is included nor expected in the messages.
globalSignaturePolicy: SignaturePolicy.StrictNoSign,
2022-06-21 13:23:42 +10:00
fallbackToFloodsub: false,
2021-04-22 14:47:43 +10:00
})
);
2022-06-21 13:23:42 +10:00
this.multicodecs = constants.RelayCodecs;
this.observers = {};
this.decryptionKeys = new Map();
this.pubSubTopic = options?.pubSubTopic || DefaultPubSubTopic;
options?.decryptionKeys?.forEach((key) => {
this.addDecryptionKey(key);
});
}
/**
* Mounts the gossipsub protocol onto the libp2p node
2021-05-10 11:41:13 +10:00
* and subscribes to the default topic.
*
* @override
* @returns {void}
*/
public async start(): Promise<void> {
await super.start();
this.subscribe(this.pubSubTopic);
}
/**
2021-05-10 11:41:13 +10:00
* Send Waku message.
*
* @param {WakuMessage} message
* @returns {Promise<void>}
*/
2021-05-10 11:41:13 +10:00
public async send(message: WakuMessage): Promise<void> {
2021-05-03 16:26:02 +10:00
const msg = message.encode();
2022-06-21 13:23:42 +10:00
await this.publish(this.pubSubTopic, msg);
}
/**
2021-09-01 12:35:55 +10:00
* Register a decryption key to attempt decryption of received messages.
* This can either be a private key for asymmetric encryption or a symmetric
* key. `WakuRelay` will attempt to decrypt messages using both methods.
*
* Strings must be in hex format.
*/
addDecryptionKey(
key: Uint8Array | string,
options?: { method?: DecryptionMethod; contentTopics?: string[] }
): void {
2022-02-14 10:50:02 +11:00
this.decryptionKeys.set(hexToBytes(key), options ?? {});
}
/**
2021-09-01 12:35:55 +10:00
* Delete a decryption key that was used to attempt decryption of received
* messages.
*
* Strings must be in hex format.
*/
deleteDecryptionKey(key: Uint8Array | string): void {
2022-02-14 10:50:02 +11:00
this.decryptionKeys.delete(hexToBytes(key));
}
/**
* Register an observer of new messages received via waku relay
*
* @param callback called when a new message is received via waku relay
* @param contentTopics Content Topics for which the callback with be called,
* all of them if undefined, [] or ["",..] is passed.
* @returns {void}
*/
addObserver(
callback: (message: WakuMessage) => void,
contentTopics: string[] = []
): void {
if (contentTopics.length === 0) {
2022-02-04 14:12:00 +11:00
if (!this.observers[""]) {
this.observers[""] = new Set();
}
2022-02-04 14:12:00 +11:00
this.observers[""].add(callback);
} else {
contentTopics.forEach((contentTopic) => {
if (!this.observers[contentTopic]) {
this.observers[contentTopic] = new Set();
}
this.observers[contentTopic].add(callback);
});
}
}
/**
* Remove an observer of new messages received via waku relay.
* Useful to ensure the same observer is not registered several time
* (e.g when loading React components)
*/
deleteObserver(
callback: (message: WakuMessage) => void,
contentTopics: string[] = []
): void {
if (contentTopics.length === 0) {
2022-02-04 14:12:00 +11:00
if (this.observers[""]) {
this.observers[""].delete(callback);
}
} else {
contentTopics.forEach((contentTopic) => {
if (this.observers[contentTopic]) {
this.observers[contentTopic].delete(callback);
}
});
}
}
/**
* Subscribe to a pubsub topic and start emitting Waku messages to observers.
*
* @override
*/
subscribe(pubSubTopic: string): void {
2022-06-21 13:23:42 +10:00
this.addEventListener(
"gossipsub:message",
(event: CustomEvent<GossipsubMessage>) => {
if (event.detail.msg.topic === pubSubTopic) {
const decryptionKeys = Array.from(this.decryptionKeys).map(
([key, { method, contentTopics }]) => {
return {
key,
method,
contentTopics,
};
}
2022-06-21 13:23:42 +10:00
);
dbg(`Message received on ${pubSubTopic}`);
WakuMessage.decode(event.detail.msg.data, decryptionKeys)
.then((wakuMsg) => {
if (!wakuMsg) {
dbg("Failed to decode Waku Message");
return;
}
if (this.observers[""]) {
this.observers[""].forEach((callbackFn) => {
callbackFn(wakuMsg);
});
}
if (wakuMsg.contentTopic) {
if (this.observers[wakuMsg.contentTopic]) {
this.observers[wakuMsg.contentTopic].forEach((callbackFn) => {
callbackFn(wakuMsg);
});
}
}
})
.catch((e) => {
dbg("Failed to decode Waku Message", e);
});
}
}
2022-06-21 13:23:42 +10:00
);
2022-06-21 13:23:42 +10:00
super.subscribe(pubSubTopic);
}
getMeshPeers(topic?: TopicStr): PeerIdStr[] {
return super.getMeshPeers(topic ?? DefaultPubSubTopic);
}
2022-06-21 13:23:42 +10:00
// TODO: Implement method that uses Relay codec
// public async heartbeat(): Promise<void> {
}