feat: add getActiveSubscriptions method (#1249)

This commit is contained in:
Sasha 2023-03-21 02:44:35 +01:00 committed by GitHub
parent 0f6a594644
commit 45284db963
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 14 additions and 30 deletions

View File

@ -6,9 +6,10 @@ import {
} from "@chainsafe/libp2p-gossipsub";
import type { PeerIdStr, TopicStr } from "@chainsafe/libp2p-gossipsub/types";
import { SignaturePolicy } from "@chainsafe/libp2p-gossipsub/types";
import { CustomEvent } from "@libp2p/interfaces/events";
import type {
ActiveSubscriptions,
Callback,
IDecodedMessage,
IDecoder,
IEncoder,
IMessage,
@ -16,7 +17,6 @@ import type {
ProtocolCreateOptions,
SendResult,
} from "@waku/interfaces";
import { IDecodedMessage } from "@waku/interfaces";
import debug from "debug";
import { DefaultPubSubTopic } from "../constants.js";
@ -36,10 +36,6 @@ export type Observer<T extends IDecodedMessage> = {
export type RelayCreateOptions = ProtocolCreateOptions & GossipsubOpts;
export type ContentTopic = string;
type BasicEventPayload = {
contentTopic: string;
};
/**
* Implements the [Waku v2 Relay protocol](https://rfc.vac.dev/spec/11/).
* Must be passed as a `pubsub` module to a `Libp2p` instance.
@ -120,30 +116,20 @@ class Relay extends GossipSub implements IRelay {
pushOrInitMapSet(this.observers, contentTopic, observer);
this.dispatchEvent(
new CustomEvent<BasicEventPayload>("observer:added", {
detail: {
contentTopic,
},
})
);
return () => {
const observers = this.observers.get(contentTopic);
if (observers) {
observers.delete(observer);
this.dispatchEvent(
new CustomEvent<BasicEventPayload>("observer:removed", {
detail: {
contentTopic,
},
})
);
}
};
}
public getActiveSubscriptions(): ActiveSubscriptions {
const map = new Map();
map.set(this.pubSubTopic, this.observers.keys());
return map;
}
private async processIncomingMessage<T extends IDecodedMessage>(
pubSubTopic: string,
bytes: Uint8Array

View File

@ -1,16 +1,13 @@
import type { GossipSub, GossipsubEvents } from "@chainsafe/libp2p-gossipsub";
import type { EventEmitter } from "@libp2p/interfaces/events";
import type { GossipSub } from "@chainsafe/libp2p-gossipsub";
import type { IDecodedMessage, IDecoder } from "./message.js";
import type { Callback } from "./protocols.js";
import type { ISender } from "./sender.js";
export interface RelayEvents {
"observer:added": CustomEvent;
"observer:removed": CustomEvent;
}
type PubSubTopic = string;
type ContentTopic = string;
type IRelayEmitter = EventEmitter<RelayEvents & GossipsubEvents>;
export type ActiveSubscriptions = Map<PubSubTopic, ContentTopic[]>;
interface IRelayAPI {
addObserver: <T extends IDecodedMessage>(
@ -18,6 +15,7 @@ interface IRelayAPI {
callback: Callback<T>
) => () => void;
getMeshPeers: () => string[];
getActiveSubscriptions: () => ActiveSubscriptions | undefined;
}
export type IRelay = ISender & GossipSub & IRelayAPI & IRelayEmitter;
export type IRelay = IRelayAPI & GossipSub & ISender;