introduce & implement new interface IFilterV2

This commit is contained in:
danisharora099 2023-04-19 14:40:04 +05:30
parent 11c52f4275
commit 42d9fd694e
No known key found for this signature in database
GPG Key ID: FBD2BF500037F135
8 changed files with 97 additions and 21 deletions

View File

@ -6,7 +6,7 @@ import type {
Callback, Callback,
IDecodedMessage, IDecodedMessage,
IDecoder, IDecoder,
IFilter, IFilterV1,
ProtocolCreateOptions, ProtocolCreateOptions,
ProtocolOptions, ProtocolOptions,
} from "@waku/interfaces"; } from "@waku/interfaces";
@ -45,7 +45,7 @@ type Subscription<T extends IDecodedMessage> = {
* - https://github.com/status-im/go-waku/issues/245 * - https://github.com/status-im/go-waku/issues/245
* - https://github.com/status-im/nwaku/issues/948 * - https://github.com/status-im/nwaku/issues/948
*/ */
class Filter extends BaseProtocol implements IFilter { class Filter extends BaseProtocol implements IFilterV1 {
options: ProtocolCreateOptions; options: ProtocolCreateOptions;
private subscriptions: Map<RequestID, unknown>; private subscriptions: Map<RequestID, unknown>;
@ -235,6 +235,6 @@ class Filter extends BaseProtocol implements IFilter {
export function wakuFilter( export function wakuFilter(
init: Partial<ProtocolCreateOptions> = {} init: Partial<ProtocolCreateOptions> = {}
): (libp2p: Libp2p) => IFilter { ): (libp2p: Libp2p) => IFilterV1 {
return (libp2p: Libp2p) => new Filter(libp2p, init); return (libp2p: Libp2p) => new Filter(libp2p, init);
} }

View File

@ -6,7 +6,7 @@ import type {
Callback, Callback,
IDecodedMessage, IDecodedMessage,
IDecoder, IDecoder,
IFilter, IFilterV2,
IProtoMessage, IProtoMessage,
ProtocolCreateOptions, ProtocolCreateOptions,
ProtocolOptions, ProtocolOptions,
@ -50,7 +50,7 @@ const FilterV2Codecs = {
* - https://github.com/status-im/go-waku/issues/245 * - https://github.com/status-im/go-waku/issues/245
* - https://github.com/status-im/nwaku/issues/948 * - https://github.com/status-im/nwaku/issues/948
*/ */
class FilterV2 extends BaseProtocol implements IFilter { class FilterV2 extends BaseProtocol implements IFilterV2 {
options: ProtocolCreateOptions; options: ProtocolCreateOptions;
private subscriptions: Map<RequestID, unknown>; private subscriptions: Map<RequestID, unknown>;
@ -143,6 +143,74 @@ class FilterV2 extends BaseProtocol implements IFilter {
}; };
} }
public async unsubscribeAll(): Promise<void> {
const { pubSubTopic = DefaultPubSubTopic } = this.options;
const request = FilterSubscribeRpc.createUnsubscribeAllRequest(pubSubTopic);
const peer = await this.getPeer();
const stream = await this.newStream(peer);
try {
const res = await pipe(
[request.encode()],
lp.encode(),
stream,
lp.decode(),
async (source) => await all(source)
);
const { statusCode, requestId } = FilterSubscribeResponse.decode(
res[0].slice()
);
if (statusCode < 200 || statusCode >= 300) {
throw new Error(
`Filter unsubscribe all request ${requestId} failed with status code ${statusCode}`
);
}
log("Unsubscribed from all content topics");
} catch (error) {
log("Error unsubscribing from all content topics: ", error);
throw error;
}
}
public async ping(): Promise<void> {
const { pubSubTopic = DefaultPubSubTopic } = this.options;
const request = FilterSubscribeRpc.createSubscriberPingRequest(pubSubTopic);
const peer = await this.getPeer();
const stream = await this.newStream(peer);
try {
const res = await pipe(
[request.encode()],
lp.encode(),
stream,
lp.decode(),
async (source) => await all(source)
);
const { statusCode, requestId } = FilterSubscribeResponse.decode(
res[0].slice()
);
if (statusCode < 200 || statusCode >= 300) {
throw new Error(
`Filter ping request ${requestId} failed with status code ${statusCode}`
);
}
log("Ping successful");
} catch (error) {
log("Error pinging: ", error);
throw error;
}
}
public getActiveSubscriptions(): ActiveSubscriptions { public getActiveSubscriptions(): ActiveSubscriptions {
const map: ActiveSubscriptions = new Map(); const map: ActiveSubscriptions = new Map();
const subscriptions = this.subscriptions as Map< const subscriptions = this.subscriptions as Map<
@ -261,6 +329,6 @@ class FilterV2 extends BaseProtocol implements IFilter {
export function wakuFilterV2( export function wakuFilterV2(
init: Partial<ProtocolCreateOptions> = {} init: Partial<ProtocolCreateOptions> = {}
): (libp2p: Libp2p) => IFilter { ): (libp2p: Libp2p) => IFilterV2 {
return (libp2p: Libp2p) => new FilterV2(libp2p, init); return (libp2p: Libp2p) => new FilterV2(libp2p, init);
} }

View File

@ -3,7 +3,8 @@ import type { Libp2p } from "@libp2p/interface-libp2p";
import type { PeerId } from "@libp2p/interface-peer-id"; import type { PeerId } from "@libp2p/interface-peer-id";
import type { Multiaddr } from "@multiformats/multiaddr"; import type { Multiaddr } from "@multiformats/multiaddr";
import type { import type {
IFilter, IFilterV1,
IFilterV2,
ILightPush, ILightPush,
IRelay, IRelay,
IStore, IStore,
@ -46,7 +47,7 @@ export class WakuNode implements Waku {
public libp2p: Libp2p; public libp2p: Libp2p;
public relay?: IRelay; public relay?: IRelay;
public store?: IStore; public store?: IStore;
public filter?: IFilter; public filter?: IFilterV1 | IFilterV2;
public lightPush?: ILightPush; public lightPush?: ILightPush;
public connectionManager: ConnectionManager; public connectionManager: ConnectionManager;
@ -55,7 +56,7 @@ export class WakuNode implements Waku {
libp2p: Libp2p, libp2p: Libp2p,
store?: (libp2p: Libp2p) => IStore, store?: (libp2p: Libp2p) => IStore,
lightPush?: (libp2p: Libp2p) => ILightPush, lightPush?: (libp2p: Libp2p) => ILightPush,
filter?: (libp2p: Libp2p) => IFilter, filter?: (libp2p: Libp2p) => IFilterV1 | IFilterV2,
relay?: (libp2p: Libp2p) => IRelay relay?: (libp2p: Libp2p) => IRelay
) { ) {
this.libp2p = libp2p; this.libp2p = libp2p;

View File

@ -20,7 +20,8 @@ import {
import { enrTree, wakuDnsDiscovery } from "@waku/dns-discovery"; import { enrTree, wakuDnsDiscovery } from "@waku/dns-discovery";
import type { import type {
FullNode, FullNode,
IFilter, IFilterV1,
IFilterV2,
LightNode, LightNode,
ProtocolCreateOptions, ProtocolCreateOptions,
RelayNode, RelayNode,
@ -63,7 +64,7 @@ export async function createLightNode(
const store = wakuStore(options); const store = wakuStore(options);
const lightPush = wakuLightPush(options); const lightPush = wakuLightPush(options);
let filter: (libp2p: Libp2p) => IFilter; let filter: (libp2p: Libp2p) => IFilterV1 | IFilterV2;
if (!options?.useFilterV2) { if (!options?.useFilterV2) {
filter = wakuFilter(options); filter = wakuFilter(options);
} else { } else {
@ -143,7 +144,7 @@ export async function createFullNode(
const store = wakuStore(options); const store = wakuStore(options);
const lightPush = wakuLightPush(options); const lightPush = wakuLightPush(options);
let filter: (libp2p: Libp2p) => IFilter; let filter: (libp2p: Libp2p) => IFilterV1 | IFilterV2;
if (!options?.useFilterV2) { if (!options?.useFilterV2) {
filter = wakuFilter(options); filter = wakuFilter(options);
} else { } else {

View File

@ -1,4 +1,5 @@
import type { PointToPointProtocol } from "./protocols.js"; import type { PointToPointProtocol } from "./protocols.js";
import type { IReceiver } from "./receiver.js"; import type { IReceiverV1, IReceiverV2 } from "./receiver.js";
export type IFilter = IReceiver & PointToPointProtocol; export type IFilterV1 = IReceiverV1 & PointToPointProtocol;
export type IFilterV2 = IReceiverV2 & PointToPointProtocol;

View File

@ -7,7 +7,7 @@ type ContentTopic = string;
export type ActiveSubscriptions = Map<PubSubTopic, ContentTopic[]>; export type ActiveSubscriptions = Map<PubSubTopic, ContentTopic[]>;
export interface IReceiver { export interface IReceiverV1 {
subscribe: <T extends IDecodedMessage>( subscribe: <T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[], decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>, callback: Callback<T>,
@ -15,3 +15,8 @@ export interface IReceiver {
) => Unsubscribe | Promise<Unsubscribe>; ) => Unsubscribe | Promise<Unsubscribe>;
getActiveSubscriptions: () => ActiveSubscriptions; getActiveSubscriptions: () => ActiveSubscriptions;
} }
export interface IReceiverV2 extends IReceiverV1 {
ping: () => Promise<void>;
unsubscribeAll: () => Promise<void>;
}

View File

@ -1,7 +1,7 @@
import type { GossipSub } from "@chainsafe/libp2p-gossipsub"; import type { GossipSub } from "@chainsafe/libp2p-gossipsub";
import type { PeerIdStr, TopicStr } from "@chainsafe/libp2p-gossipsub/types"; import type { PeerIdStr, TopicStr } from "@chainsafe/libp2p-gossipsub/types";
import { IReceiver } from "./receiver.js"; import { IReceiverV1 } from "./receiver.js";
import type { ISender } from "./sender.js"; import type { ISender } from "./sender.js";
interface IRelayAPI { interface IRelayAPI {
@ -10,4 +10,4 @@ interface IRelayAPI {
getMeshPeers: (topic?: TopicStr) => PeerIdStr[]; getMeshPeers: (topic?: TopicStr) => PeerIdStr[];
} }
export type IRelay = IRelayAPI & ISender & IReceiver; export type IRelay = IRelayAPI & ISender & IReceiverV1;

View File

@ -3,7 +3,7 @@ import type { Libp2p } from "@libp2p/interface-libp2p";
import type { PeerId } from "@libp2p/interface-peer-id"; import type { PeerId } from "@libp2p/interface-peer-id";
import type { Multiaddr } from "@multiformats/multiaddr"; import type { Multiaddr } from "@multiformats/multiaddr";
import type { IFilter } from "./filter.js"; import type { IFilterV1, IFilterV2 } from "./filter.js";
import type { ILightPush } from "./light_push.js"; import type { ILightPush } from "./light_push.js";
import { Protocols } from "./protocols.js"; import { Protocols } from "./protocols.js";
import type { IRelay } from "./relay.js"; import type { IRelay } from "./relay.js";
@ -13,7 +13,7 @@ export interface Waku {
libp2p: Libp2p; libp2p: Libp2p;
relay?: IRelay; relay?: IRelay;
store?: IStore; store?: IStore;
filter?: IFilter; filter?: IFilterV1 | IFilterV2;
lightPush?: ILightPush; lightPush?: ILightPush;
dial(peer: PeerId | Multiaddr, protocols?: Protocols[]): Promise<Stream>; dial(peer: PeerId | Multiaddr, protocols?: Protocols[]): Promise<Stream>;
@ -28,7 +28,7 @@ export interface Waku {
export interface LightNode extends Waku { export interface LightNode extends Waku {
relay: undefined; relay: undefined;
store: IStore; store: IStore;
filter: IFilter; filter: IFilterV1 | IFilterV2;
lightPush: ILightPush; lightPush: ILightPush;
} }
@ -42,6 +42,6 @@ export interface RelayNode extends Waku {
export interface FullNode extends Waku { export interface FullNode extends Waku {
relay: IRelay; relay: IRelay;
store: IStore; store: IStore;
filter: IFilter; filter: IFilterV1 | IFilterV2;
lightPush: ILightPush; lightPush: ILightPush;
} }