feat!: add and implement IReceiver (#1219)

- remove extend Relay by GossipSub and use it as public property;
- detach GossipSub initialisation;
This commit is contained in:
Sasha 2023-03-31 03:17:41 +02:00 committed by GitHub
parent e8f750fa2b
commit e11e5b4870
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 224 additions and 121 deletions

View File

@ -19,7 +19,11 @@ export * as waku_light_push from "./lib/light_push/index.js";
export { wakuLightPush, LightPushCodec } from "./lib/light_push/index.js";
export * as waku_relay from "./lib/relay/index.js";
export { wakuRelay, RelayCreateOptions } from "./lib/relay/index.js";
export {
wakuRelay,
RelayCreateOptions,
wakuGossipSub,
} from "./lib/relay/index.js";
export * as waku_store from "./lib/store/index.js";
export {

View File

@ -2,6 +2,7 @@ import type { Libp2p } from "@libp2p/interface-libp2p";
import type { Peer } from "@libp2p/interface-peer-store";
import type { IncomingStreamData } from "@libp2p/interface-registrar";
import type {
ActiveSubscriptions,
Callback,
IDecodedMessage,
IDecoder,
@ -58,19 +59,20 @@ class Filter extends BaseProtocol implements IFilter {
}
/**
* @param decoders Array of Decoders to use to decode messages, it also specifies the content topics.
* @param decoders Decoder or array of Decoders to use to decode messages, it also specifies the content topics.
* @param callback A function that will be called on each message returned by the filter.
* @param opts The FilterSubscriptionOpts used to narrow which messages are returned, and which peer to connect to.
* @returns Unsubscribe function that can be used to end the subscription.
*/
async subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T>[],
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>,
opts?: ProtocolOptions
): Promise<UnsubscribeFunction> {
const decodersArray = Array.isArray(decoders) ? decoders : [decoders];
const { pubSubTopic = DefaultPubSubTopic } = this.options;
const contentTopics = Array.from(groupByContentTopic(decoders).keys());
const contentTopics = Array.from(groupByContentTopic(decodersArray).keys());
const contentFilters = contentTopics.map((contentTopic) => ({
contentTopic,
@ -109,7 +111,11 @@ class Filter extends BaseProtocol implements IFilter {
throw e;
}
const subscription: Subscription<T> = { callback, decoders, pubSubTopic };
const subscription: Subscription<T> = {
callback,
decoders: decodersArray,
pubSubTopic,
};
this.subscriptions.set(requestId, subscription);
return async () => {
@ -118,6 +124,22 @@ class Filter extends BaseProtocol implements IFilter {
};
}
public getActiveSubscriptions(): ActiveSubscriptions {
const map: ActiveSubscriptions = new Map();
const subscriptions = this.subscriptions as Map<
RequestID,
Subscription<IDecodedMessage>
>;
for (const item of subscriptions.values()) {
const values = map.get(item.pubSubTopic) || [];
const nextValues = item.decoders.map((decoder) => decoder.contentTopic);
map.set(item.pubSubTopic, [...values, ...nextValues]);
}
return map;
}
private onRequest(streamData: IncomingStreamData): void {
log("Receiving message push");
try {

View File

@ -6,6 +6,8 @@ import {
} from "@chainsafe/libp2p-gossipsub";
import type { PeerIdStr, TopicStr } from "@chainsafe/libp2p-gossipsub/types";
import { SignaturePolicy } from "@chainsafe/libp2p-gossipsub/types";
import type { Libp2p } from "@libp2p/interface-libp2p";
import type { PubSub } from "@libp2p/interface-pubsub";
import type {
ActiveSubscriptions,
Callback,
@ -20,8 +22,8 @@ import type {
import debug from "debug";
import { DefaultPubSubTopic } from "../constants.js";
import { groupByContentTopic } from "../group_by.js";
import { TopicOnlyDecoder } from "../message/topic_only_message.js";
import { pushOrInitMapSet } from "../push_or_init_map.js";
import * as constants from "./constants.js";
import { messageValidator } from "./message_validator.js";
@ -38,14 +40,14 @@ export type ContentTopic = string;
/**
* Implements the [Waku v2 Relay protocol](https://rfc.vac.dev/spec/11/).
* Must be passed as a `pubsub` module to a `Libp2p` instance.
*
* @implements {require('libp2p-interfaces/src/pubsub')}
* Throws if libp2p.pubsub does not support Waku Relay
*/
class Relay extends GossipSub implements IRelay {
class Relay implements IRelay {
private readonly pubSubTopic: string;
defaultDecoder: IDecoder<IDecodedMessage>;
private defaultDecoder: IDecoder<IDecodedMessage>;
public static multicodec: string = constants.RelayCodecs[0];
public readonly gossipSub: GossipSub;
/**
* observers called when receiving new message.
@ -53,21 +55,20 @@ class Relay extends GossipSub implements IRelay {
*/
private observers: Map<ContentTopic, Set<unknown>>;
constructor(
components: GossipSubComponents,
options?: Partial<RelayCreateOptions>
) {
options = Object.assign(options ?? {}, {
// Ensure that no signature is included nor expected in the messages.
globalSignaturePolicy: SignaturePolicy.StrictNoSign,
fallbackToFloodsub: false,
});
super(components, options);
this.multicodecs = constants.RelayCodecs;
constructor(libp2p: Libp2p, options?: Partial<RelayCreateOptions>) {
if (!this.isRelayPubSub(libp2p.pubsub)) {
throw Error(
`Failed to initialize Relay. libp2p.pubsub does not support ${Relay.multicodec}`
);
}
this.gossipSub = libp2p.pubsub as GossipSub;
this.pubSubTopic = options?.pubSubTopic ?? DefaultPubSubTopic;
if (this.gossipSub.isStarted()) {
this.gossipSubSubscribe(this.pubSubTopic);
}
this.observers = new Map();
// TODO: User might want to decide what decoder should be used (e.g. for RLN)
@ -82,8 +83,12 @@ class Relay extends GossipSub implements IRelay {
* @returns {void}
*/
public async start(): Promise<void> {
await super.start();
this.subscribe(this.pubSubTopic);
if (this.gossipSub.isStarted()) {
throw Error("GossipSub already started.");
}
await this.gossipSub.start();
this.gossipSubSubscribe(this.pubSubTopic);
}
/**
@ -96,7 +101,7 @@ class Relay extends GossipSub implements IRelay {
return { recipients: [] };
}
return this.publish(this.pubSubTopic, msg);
return this.gossipSub.publish(this.pubSubTopic, msg);
}
/**
@ -104,22 +109,38 @@ class Relay extends GossipSub implements IRelay {
*
* @returns Function to delete the observer
*/
addObserver<T extends IDecodedMessage>(
decoder: IDecoder<T>,
public subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>
): () => void {
const observer = {
decoder,
callback,
};
const contentTopic = decoder.contentTopic;
const contentTopicToObservers = Array.isArray(decoders)
? toObservers(decoders, callback)
: toObservers([decoders], callback);
pushOrInitMapSet(this.observers, contentTopic, observer);
for (const contentTopic of contentTopicToObservers.keys()) {
const currObservers = this.observers.get(contentTopic) || new Set();
const newObservers =
contentTopicToObservers.get(contentTopic) || new Set();
this.observers.set(contentTopic, union(currObservers, newObservers));
}
return () => {
const observers = this.observers.get(contentTopic);
if (observers) {
observers.delete(observer);
for (const contentTopic of contentTopicToObservers.keys()) {
const currentObservers = this.observers.get(contentTopic) || new Set();
const observersToRemove =
contentTopicToObservers.get(contentTopic) || new Set();
const nextObservers = leftMinusJoin(
currentObservers,
observersToRemove
);
if (nextObservers.size) {
this.observers.set(contentTopic, nextObservers);
} else {
this.observers.delete(contentTopic);
}
}
};
}
@ -130,6 +151,10 @@ class Relay extends GossipSub implements IRelay {
return map;
}
public getMeshPeers(topic?: TopicStr): PeerIdStr[] {
return this.gossipSub.getMeshPeers(topic ?? this.pubSubTopic);
}
private async processIncomingMessage<T extends IDecodedMessage>(
pubSubTopic: string,
bytes: Uint8Array
@ -168,8 +193,8 @@ class Relay extends GossipSub implements IRelay {
*
* @override
*/
subscribe(pubSubTopic: string): void {
this.addEventListener(
private gossipSubSubscribe(pubSubTopic: string): void {
this.gossipSub.addEventListener(
"gossipsub:message",
async (event: CustomEvent<GossipsubMessage>) => {
if (event.detail.msg.topic !== pubSubTopic) return;
@ -182,24 +207,76 @@ class Relay extends GossipSub implements IRelay {
}
);
this.topicValidators.set(pubSubTopic, messageValidator);
super.subscribe(pubSubTopic);
this.gossipSub.topicValidators.set(pubSubTopic, messageValidator);
this.gossipSub.subscribe(pubSubTopic);
}
unsubscribe(pubSubTopic: TopicStr): void {
super.unsubscribe(pubSubTopic);
this.topicValidators.delete(pubSubTopic);
}
getMeshPeers(topic?: TopicStr): PeerIdStr[] {
return super.getMeshPeers(topic ?? this.pubSubTopic);
private isRelayPubSub(pubsub: PubSub): boolean {
return pubsub?.multicodecs?.includes(Relay.multicodec) || false;
}
}
Relay.multicodec = constants.RelayCodecs[constants.RelayCodecs.length - 1];
export function wakuRelay(
init: Partial<RelayCreateOptions> = {}
): (components: GossipSubComponents) => IRelay {
return (components: GossipSubComponents) => new Relay(components, init);
init: Partial<ProtocolCreateOptions> = {}
): (libp2p: Libp2p) => IRelay {
return (libp2p: Libp2p) => new Relay(libp2p, init);
}
export function wakuGossipSub(
init: Partial<RelayCreateOptions> = {}
): (components: GossipSubComponents) => GossipSub {
return (components: GossipSubComponents) => {
init = {
...init,
// Ensure that no signature is included nor expected in the messages.
globalSignaturePolicy: SignaturePolicy.StrictNoSign,
fallbackToFloodsub: false,
};
const pubsub = new GossipSub(components, init);
pubsub.multicodecs = constants.RelayCodecs;
return pubsub;
};
}
function toObservers<T extends IDecodedMessage>(
decoders: IDecoder<T>[],
callback: Callback<T>
): Map<ContentTopic, Set<Observer<T>>> {
const contentTopicToDecoders = Array.from(
groupByContentTopic(decoders).entries()
);
const contentTopicToObserversEntries = contentTopicToDecoders.map(
([contentTopic, decoders]) =>
[
contentTopic,
new Set(
decoders.map(
(decoder) =>
({
decoder,
callback,
} as Observer<T>)
)
),
] as [ContentTopic, Set<Observer<T>>]
);
return new Map(contentTopicToObserversEntries);
}
function union(left: Set<unknown>, right: Set<unknown>): Set<unknown> {
for (const val of right.values()) {
left.add(val);
}
return left;
}
function leftMinusJoin(left: Set<unknown>, right: Set<unknown>): Set<unknown> {
for (const val of right.values()) {
if (left.has(val)) {
left.delete(val);
}
}
return left;
}

View File

@ -105,7 +105,7 @@ async function waitForGossipSubPeerInMesh(waku: IRelay): Promise<void> {
let peers = waku.getMeshPeers();
while (peers.length == 0) {
await pEvent(waku, "gossipsub:heartbeat");
await pEvent(waku.gossipSub, "gossipsub:heartbeat");
peers = waku.getMeshPeers();
}
}

View File

@ -1,7 +1,6 @@
import type { Stream } from "@libp2p/interface-connection";
import type { Libp2p } from "@libp2p/interface-libp2p";
import type { PeerId } from "@libp2p/interface-peer-id";
import type { PubSub } from "@libp2p/interface-pubsub";
import type { Multiaddr } from "@multiformats/multiaddr";
import type {
IFilter,
@ -14,7 +13,6 @@ import { Protocols } from "@waku/interfaces";
import debug from "debug";
import { ConnectionManager } from "./connection_manager.js";
import * as relayConstants from "./relay/constants.js";
export const DefaultPingKeepAliveValueSecs = 0;
export const DefaultRelayKeepAliveValueSecs = 5 * 60;
@ -57,7 +55,8 @@ export class WakuNode implements Waku {
libp2p: Libp2p,
store?: (libp2p: Libp2p) => IStore,
lightPush?: (libp2p: Libp2p) => ILightPush,
filter?: (libp2p: Libp2p) => IFilter
filter?: (libp2p: Libp2p) => IFilter,
relay?: (libp2p: Libp2p) => IRelay
) {
this.libp2p = libp2p;
@ -71,8 +70,8 @@ export class WakuNode implements Waku {
this.lightPush = lightPush(libp2p);
}
if (isRelay(libp2p.pubsub)) {
this.relay = libp2p.pubsub;
if (relay) {
this.relay = relay(libp2p);
}
const pingKeepAlive =
@ -120,7 +119,9 @@ export class WakuNode implements Waku {
const codecs: string[] = [];
if (_protocols.includes(Protocols.Relay)) {
if (this.relay) {
this.relay.multicodecs.forEach((codec) => codecs.push(codec));
this.relay.gossipSub.multicodecs.forEach((codec: string) =>
codecs.push(codec)
);
} else {
log(
"Relay codec not included in dial codec: protocol not mounted locally"
@ -188,16 +189,3 @@ export class WakuNode implements Waku {
return localMultiaddr + "/p2p/" + this.libp2p.peerId.toString();
}
}
function isRelay(pubsub: PubSub): pubsub is IRelay {
if (pubsub) {
try {
return pubsub.multicodecs.includes(
relayConstants.RelayCodecs[relayConstants.RelayCodecs.length - 1]
);
// Exception is expected if `libp2p` was not instantiated with pubsub
// eslint-disable-next-line no-empty
} catch (e) {}
}
return false;
}

View File

@ -1,3 +1,4 @@
import type { GossipSub } from "@chainsafe/libp2p-gossipsub";
import { noise } from "@chainsafe/libp2p-noise";
import type { Libp2p } from "@libp2p/interface-libp2p";
import type { PeerDiscovery } from "@libp2p/interface-peer-discovery";
@ -8,6 +9,7 @@ import {
DefaultUserAgent,
RelayCreateOptions,
wakuFilter,
wakuGossipSub,
wakuLightPush,
WakuNode,
WakuOptions,
@ -17,7 +19,6 @@ import {
import { enrTree, wakuDnsDiscovery } from "@waku/dns-discovery";
import type {
FullNode,
IRelay,
LightNode,
ProtocolCreateOptions,
RelayNode,
@ -85,12 +86,21 @@ export async function createRelayNode(
}
const libp2p = await defaultLibp2p(
wakuRelay(options),
wakuGossipSub(options),
libp2pOptions,
options?.userAgent
);
return new WakuNode(options ?? {}, libp2p) as RelayNode;
const relay = wakuRelay(options);
return new WakuNode(
options ?? {},
libp2p,
undefined,
undefined,
undefined,
relay
) as RelayNode;
}
/**
@ -117,7 +127,7 @@ export async function createFullNode(
}
const libp2p = await defaultLibp2p(
wakuRelay(options),
wakuGossipSub(options),
libp2pOptions,
options?.userAgent
);
@ -125,13 +135,15 @@ export async function createFullNode(
const store = wakuStore(options);
const lightPush = wakuLightPush(options);
const filter = wakuFilter(options);
const relay = wakuRelay(options);
return new WakuNode(
options ?? {},
libp2p,
store,
lightPush,
filter
filter,
relay
) as FullNode;
}
@ -142,7 +154,7 @@ export function defaultPeerDiscovery(): (
}
export async function defaultLibp2p(
wakuRelay?: (components: Libp2pComponents) => IRelay,
wakuGossipSub?: (components: Libp2pComponents) => GossipSub,
options?: Partial<Libp2pOptions>,
userAgent?: string
): Promise<Libp2p> {
@ -157,7 +169,7 @@ export async function defaultLibp2p(
},
},
} as Libp2pOptions,
wakuRelay ? { pubsub: wakuRelay } : {},
wakuGossipSub ? { pubsub: wakuGossipSub } : {},
options ?? {}
);

View File

@ -1,14 +1,4 @@
import type { IDecodedMessage, IDecoder } from "./message.js";
import type {
Callback,
PointToPointProtocol,
ProtocolOptions,
} from "./protocols.js";
import type { PointToPointProtocol } from "./protocols.js";
import type { IReceiver } from "./receiver.js";
export interface IFilter extends PointToPointProtocol {
subscribe: <T extends IDecodedMessage>(
decoders: IDecoder<T>[],
callback: Callback<T>,
opts?: ProtocolOptions
) => Promise<() => Promise<void>>;
}
export type IFilter = IReceiver & PointToPointProtocol;

View File

@ -9,3 +9,4 @@ export * from "./store.js";
export * from "./waku.js";
export * from "./connection_manager.js";
export * from "./sender.js";
export * from "./receiver.js";

View File

@ -0,0 +1,17 @@
import type { IDecodedMessage, IDecoder } from "./message.js";
import type { Callback, ProtocolOptions } from "./protocols.js";
type Unsubscribe = () => void | Promise<void>;
type PubSubTopic = string;
type ContentTopic = string;
export type ActiveSubscriptions = Map<PubSubTopic, ContentTopic[]>;
export interface IReceiver {
subscribe: <T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>,
opts?: ProtocolOptions
) => Unsubscribe | Promise<Unsubscribe>;
getActiveSubscriptions: () => ActiveSubscriptions;
}

View File

@ -1,21 +1,13 @@
import type { GossipSub } from "@chainsafe/libp2p-gossipsub";
import type { PeerIdStr, TopicStr } from "@chainsafe/libp2p-gossipsub/types";
import type { IDecodedMessage, IDecoder } from "./message.js";
import type { Callback } from "./protocols.js";
import { IReceiver } from "./receiver.js";
import type { ISender } from "./sender.js";
type PubSubTopic = string;
type ContentTopic = string;
export type ActiveSubscriptions = Map<PubSubTopic, ContentTopic[]>;
interface IRelayAPI {
addObserver: <T extends IDecodedMessage>(
decoder: IDecoder<T>,
callback: Callback<T>
) => () => void;
getMeshPeers: () => string[];
getActiveSubscriptions: () => ActiveSubscriptions | undefined;
readonly gossipSub: GossipSub;
start: () => Promise<void>;
getMeshPeers: (topic?: TopicStr) => PeerIdStr[];
}
export type IRelay = IRelayAPI & GossipSub & ISender;
export type IRelay = IRelayAPI & ISender & IReceiver;

View File

@ -78,7 +78,7 @@ describe("Waku Filter", () => {
messageCount++;
expect(msg.contentTopic).to.eq(TestContentTopic);
};
await waku.filter.subscribe([TestDecoder], callback);
await waku.filter.subscribe(TestDecoder, callback);
await delay(200);
await waku.lightPush.send(TestEncoder, {

View File

@ -121,7 +121,7 @@ describe("Waku Relay [node only]", () => {
const receivedMsgPromise: Promise<DecodedMessage> = new Promise(
(resolve) => {
waku2.relay.addObserver(TestDecoder, resolve);
waku2.relay.subscribe([TestDecoder], resolve);
}
);
@ -152,12 +152,12 @@ describe("Waku Relay [node only]", () => {
const barDecoder = createDecoder(barContentTopic);
const fooMessages: DecodedMessage[] = [];
waku2.relay.addObserver(fooDecoder, (msg) => {
waku2.relay.subscribe([fooDecoder], (msg) => {
fooMessages.push(msg);
});
const barMessages: DecodedMessage[] = [];
waku2.relay.addObserver(barDecoder, (msg) => {
waku2.relay.subscribe([barDecoder], (msg) => {
barMessages.push(msg);
});
@ -207,10 +207,10 @@ describe("Waku Relay [node only]", () => {
const symDecoder = createSymDecoder(symTopic, symKey);
const msgs: DecodedMessage[] = [];
waku2.relay.addObserver(eciesDecoder, (wakuMsg) => {
waku2.relay.subscribe([eciesDecoder], (wakuMsg) => {
msgs.push(wakuMsg);
});
waku2.relay.addObserver(symDecoder, (wakuMsg) => {
waku2.relay.subscribe([symDecoder], (wakuMsg) => {
msgs.push(wakuMsg);
});
@ -239,10 +239,10 @@ describe("Waku Relay [node only]", () => {
// The promise **fails** if we receive a message on this observer.
const receivedMsgPromise: Promise<DecodedMessage> = new Promise(
(resolve, reject) => {
const deleteObserver = waku2.relay.addObserver(
createDecoder(contentTopic),
const deleteObserver = waku2.relay.subscribe(
[createDecoder(contentTopic)],
reject
);
) as () => void;
deleteObserver();
setTimeout(resolve, 500);
}
@ -313,7 +313,7 @@ describe("Waku Relay [node only]", () => {
const waku2ReceivedMsgPromise: Promise<DecodedMessage> = new Promise(
(resolve) => {
waku2.relay.addObserver(TestDecoder, resolve);
waku2.relay.subscribe([TestDecoder], resolve);
}
);
@ -321,7 +321,7 @@ describe("Waku Relay [node only]", () => {
// pubsub topic.
const waku3NoMsgPromise: Promise<DecodedMessage> = new Promise(
(resolve, reject) => {
waku3.relay.addObserver(TestDecoder, reject);
waku3.relay.subscribe([TestDecoder], reject);
setTimeout(resolve, 1000);
}
);
@ -401,7 +401,7 @@ describe("Waku Relay [node only]", () => {
const receivedMsgPromise: Promise<DecodedMessage> = new Promise(
(resolve) => {
waku.relay.addObserver<DecodedMessage>(TestDecoder, (msg) =>
waku.relay.subscribe<DecodedMessage>(TestDecoder, (msg) =>
resolve(msg)
);
}
@ -472,7 +472,7 @@ describe("Waku Relay [node only]", () => {
const waku2ReceivedMsgPromise: Promise<DecodedMessage> = new Promise(
(resolve) => {
waku2.relay.addObserver(TestDecoder, resolve);
waku2.relay.subscribe(TestDecoder, resolve);
}
);

View File

@ -178,7 +178,7 @@ describe("Decryption Keys", () => {
const receivedMsgPromise: Promise<DecodedMessage> = new Promise(
(resolve) => {
waku2.relay.addObserver(decoder, resolve);
waku2.relay.subscribe([decoder], resolve);
}
);