feat!: filter v2 (#1332)

* implement proto

* implement filter v2

* add tests

* minor improvements
- make unsubscribe functions private in
filter
- enable all tests

* enable all tests

* readd multiaddrinput

* address comment removals

* unsubscribe based on contentFilters passed

* update unsubscribe function parameters in test

* reset interfaces & filter v1

* refactor filterv2 into 2 classes
- removes generics from types on filter which means
manual typecasting to filter version is required on
consumer side
- defaults to filterv2
- splits filterv2 into 2 classes:
  - one to create the subscription object with a
peer which returns the second class
  - the other to manage all subscription functions

* updates filter tests for the new API
- also fixes an interface import

* update `toAsyncIterator` test for Filter V1

* implement IReceiver on FilterV2

* remove return values from subscription functions

* update `to_async_iterator`

* address variable naming

* add tsdoc comments for hidden function

* address minor comments

* update docs to default to filter v2

* address comments

* rename `wakuFilter` to `wakuFilterV1`

* chore: Remove static variables (#1371)

* chore: Remove static variables

- Remove internal types from `@core/interfaces`
- Remove data being redundantly stored (pubsub topic)
- Remove usage of static variables
- Clean up callbacks and decoders when using `unsubscribe`
- Clean up callbacks and decoders when using `unsubscribeAll`

* fix setting activeSubscription

---------

Co-authored-by: danisharora099 <danisharora099@gmail.com>

* make activeSub getter and setter private

* update size-limit

---------

Co-authored-by: fryorcraken.eth <110212804+fryorcraken@users.noreply.github.com>
This commit is contained in:
Danish Arora 2023-05-23 16:06:46 +05:30 committed by GitHub
parent 6870c9e9d7
commit 8d0e647966
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 1615 additions and 106 deletions

View File

@ -39,6 +39,8 @@
"exponentiate",
"extip",
"fanout",
"Filterv1",
"Filterv2",
"floodsub",
"fontsource",
"globby",

View File

@ -36,7 +36,7 @@ module.exports = [
{
name: "Light protocols",
path: "packages/core/bundle/index.js",
import: "{ wakuLightPush, wakuFilter }",
import: "{ wakuLightPush, wakuFilterV1, wakuFilterV2 }",
},
{
name: "History retrieval protocols",

114
package-lock.json generated
View File

@ -29551,13 +29551,13 @@
},
"packages/core": {
"name": "@waku/core",
"version": "0.0.17",
"version": "0.0.18",
"license": "MIT OR Apache-2.0",
"dependencies": {
"@noble/hashes": "^1.3.0",
"@waku/interfaces": "*",
"@waku/interfaces": "0.0.13",
"@waku/proto": "*",
"@waku/utils": "*",
"@waku/utils": "0.0.6",
"debug": "^4.3.4",
"it-all": "^3.0.1",
"it-length-prefixed": "^9.0.1",
@ -29630,15 +29630,15 @@
},
"packages/create": {
"name": "@waku/create",
"version": "0.0.13",
"version": "0.0.14",
"license": "MIT OR Apache-2.0",
"dependencies": {
"@chainsafe/libp2p-noise": "^11.0.0",
"@libp2p/mplex": "^7.1.1",
"@libp2p/websockets": "^5.0.3",
"@waku/core": "*",
"@waku/dns-discovery": "*",
"@waku/relay": "*",
"@waku/core": "0.0.18",
"@waku/dns-discovery": "0.0.12",
"@waku/relay": "0.0.1",
"libp2p": "^0.42.2"
},
"devDependencies": {
@ -29662,7 +29662,7 @@
"@typescript-eslint/eslint-plugin": "^5.57.0",
"@typescript-eslint/parser": "^5.51.0",
"@waku/build-utils": "*",
"@waku/interfaces": "*",
"@waku/interfaces": "0.0.13",
"cspell": "^6.31.1",
"eslint": "^8.35.0",
"eslint-config-prettier": "^8.6.0",
@ -29698,13 +29698,13 @@
},
"packages/dns-discovery": {
"name": "@waku/dns-discovery",
"version": "0.0.11",
"version": "0.0.12",
"license": "MIT OR Apache-2.0",
"dependencies": {
"@libp2p/interface-peer-discovery": "^1.0.5",
"@libp2p/interfaces": "^3.3.1",
"@waku/enr": "*",
"@waku/utils": "*",
"@waku/enr": "0.0.12",
"@waku/utils": "0.0.6",
"debug": "^4.3.4",
"dns-query": "^0.11.2",
"hi-base32": "^0.5.1",
@ -29723,7 +29723,7 @@
"@typescript-eslint/eslint-plugin": "^5.57.0",
"@typescript-eslint/parser": "^5.51.0",
"@waku/build-utils": "*",
"@waku/interfaces": "*",
"@waku/interfaces": "0.0.13",
"chai": "^4.3.7",
"cspell": "^6.31.1",
"eslint": "^8.35.0",
@ -29749,7 +29749,7 @@
},
"packages/enr": {
"name": "@waku/enr",
"version": "0.0.11",
"version": "0.0.12",
"license": "MIT OR Apache-2.0",
"dependencies": {
"@ethersproject/rlp": "^5.7.0",
@ -29757,7 +29757,7 @@
"@libp2p/peer-id": "^2.0.3",
"@multiformats/multiaddr": "^12.0.0",
"@noble/secp256k1": "^1.7.1",
"@waku/utils": "*",
"@waku/utils": "0.0.6",
"debug": "^4.3.4",
"js-sha3": "^0.8.0"
},
@ -29773,7 +29773,7 @@
"@typescript-eslint/eslint-plugin": "^5.57.0",
"@typescript-eslint/parser": "^5.51.0",
"@waku/build-utils": "*",
"@waku/interfaces": "0.0.12",
"@waku/interfaces": "0.0.13",
"chai": "^4.3.7",
"cspell": "^6.31.1",
"eslint": "^8.35.0",
@ -29802,7 +29802,7 @@
},
"packages/interfaces": {
"name": "@waku/interfaces",
"version": "0.0.12",
"version": "0.0.13",
"license": "MIT OR Apache-2.0",
"devDependencies": {
"@chainsafe/libp2p-gossipsub": "^6.1.0",
@ -29833,14 +29833,14 @@
},
"packages/message-encryption": {
"name": "@waku/message-encryption",
"version": "0.0.15",
"version": "0.0.16",
"license": "MIT OR Apache-2.0",
"dependencies": {
"@noble/secp256k1": "^1.7.1",
"@waku/core": "*",
"@waku/interfaces": "*",
"@waku/core": "0.0.18",
"@waku/interfaces": "0.0.13",
"@waku/proto": "*",
"@waku/utils": "*",
"@waku/utils": "0.0.6",
"debug": "^4.3.4",
"js-sha3": "^0.8.0"
},
@ -29887,11 +29887,11 @@
},
"packages/message-hash": {
"name": "@waku/message-hash",
"version": "0.1.1",
"version": "0.1.2",
"license": "MIT OR Apache-2.0",
"dependencies": {
"@noble/hashes": "^1.2.0",
"@waku/utils": "*"
"@waku/utils": "0.0.6"
},
"devDependencies": {
"@rollup/plugin-commonjs": "^24.0.1",
@ -29903,7 +29903,7 @@
"@typescript-eslint/eslint-plugin": "^5.54.1",
"@typescript-eslint/parser": "^5.51.0",
"@waku/build-utils": "*",
"@waku/interfaces": "*",
"@waku/interfaces": "0.0.13",
"chai": "^4.3.7",
"cspell": "^6.28.0",
"eslint": "^8.35.0",
@ -29935,15 +29935,15 @@
},
"packages/peer-exchange": {
"name": "@waku/peer-exchange",
"version": "0.0.10",
"version": "0.0.11",
"license": "MIT OR Apache-2.0",
"dependencies": {
"@libp2p/interface-peer-discovery": "^1.0.5",
"@libp2p/interfaces": "^3.3.1",
"@waku/core": "*",
"@waku/enr": "*",
"@waku/core": "0.0.18",
"@waku/enr": "0.0.12",
"@waku/proto": "*",
"@waku/utils": "*",
"@waku/utils": "0.0.6",
"debug": "^4.3.4",
"it-all": "^3.0.1",
"it-length-prefixed": "^9.0.1",
@ -29961,7 +29961,7 @@
"@typescript-eslint/eslint-plugin": "^5.57.0",
"@typescript-eslint/parser": "^5.51.0",
"@waku/build-utils": "*",
"@waku/interfaces": "*",
"@waku/interfaces": "0.0.13",
"chai": "^4.3.7",
"cspell": "^6.31.1",
"eslint": "^8.35.0",
@ -30062,10 +30062,10 @@
"@chainsafe/libp2p-gossipsub": "^6.1.0",
"@noble/hashes": "^1.3.0",
"@waku/build-utils": "*",
"@waku/core": "*",
"@waku/interfaces": "*",
"@waku/core": "0.0.18",
"@waku/interfaces": "0.0.13",
"@waku/proto": "*",
"@waku/utils": "*",
"@waku/utils": "0.0.6",
"chai": "^4.3.7",
"debug": "^4.3.4",
"fast-check": "^3.8.1"
@ -30135,7 +30135,7 @@
},
"packages/utils": {
"name": "@waku/utils",
"version": "0.0.5",
"version": "0.0.6",
"license": "MIT OR Apache-2.0",
"dependencies": {
"debug": "^4.3.4",
@ -30151,7 +30151,7 @@
"@typescript-eslint/eslint-plugin": "^5.57.0",
"@typescript-eslint/parser": "^5.51.0",
"@waku/build-utils": "*",
"@waku/interfaces": "*",
"@waku/interfaces": "0.0.13",
"cspell": "^6.31.1",
"eslint": "^8.35.0",
"eslint-config-prettier": "^8.6.0",
@ -34974,9 +34974,9 @@
"@typescript-eslint/eslint-plugin": "^5.57.0",
"@typescript-eslint/parser": "^5.51.0",
"@waku/build-utils": "*",
"@waku/interfaces": "*",
"@waku/interfaces": "0.0.13",
"@waku/proto": "*",
"@waku/utils": "*",
"@waku/utils": "0.0.6",
"chai": "^4.3.7",
"cspell": "^6.31.1",
"debug": "^4.3.4",
@ -35041,10 +35041,10 @@
"@typescript-eslint/eslint-plugin": "^5.57.0",
"@typescript-eslint/parser": "^5.51.0",
"@waku/build-utils": "*",
"@waku/core": "*",
"@waku/dns-discovery": "*",
"@waku/interfaces": "*",
"@waku/relay": "*",
"@waku/core": "0.0.18",
"@waku/dns-discovery": "0.0.12",
"@waku/interfaces": "0.0.13",
"@waku/relay": "0.0.1",
"cspell": "^6.31.1",
"eslint": "^8.35.0",
"eslint-config-prettier": "^8.6.0",
@ -35090,9 +35090,9 @@
"@typescript-eslint/eslint-plugin": "^5.57.0",
"@typescript-eslint/parser": "^5.51.0",
"@waku/build-utils": "*",
"@waku/enr": "*",
"@waku/interfaces": "*",
"@waku/utils": "*",
"@waku/enr": "0.0.12",
"@waku/interfaces": "0.0.13",
"@waku/utils": "0.0.6",
"chai": "^4.3.7",
"cspell": "^6.31.1",
"debug": "^4.3.4",
@ -35136,8 +35136,8 @@
"@typescript-eslint/eslint-plugin": "^5.57.0",
"@typescript-eslint/parser": "^5.51.0",
"@waku/build-utils": "*",
"@waku/interfaces": "0.0.12",
"@waku/utils": "*",
"@waku/interfaces": "0.0.13",
"@waku/utils": "0.0.6",
"chai": "^4.3.7",
"cspell": "^6.31.1",
"debug": "^4.3.4",
@ -35207,10 +35207,10 @@
"@typescript-eslint/eslint-plugin": "^5.57.0",
"@typescript-eslint/parser": "^5.51.0",
"@waku/build-utils": "*",
"@waku/core": "*",
"@waku/interfaces": "*",
"@waku/core": "0.0.18",
"@waku/interfaces": "0.0.13",
"@waku/proto": "*",
"@waku/utils": "*",
"@waku/utils": "0.0.6",
"chai": "^4.3.7",
"cspell": "^6.31.1",
"debug": "^4.3.4",
@ -35249,8 +35249,8 @@
"@typescript-eslint/eslint-plugin": "^5.54.1",
"@typescript-eslint/parser": "^5.51.0",
"@waku/build-utils": "*",
"@waku/interfaces": "*",
"@waku/utils": "*",
"@waku/interfaces": "0.0.13",
"@waku/utils": "0.0.6",
"chai": "^4.3.7",
"cspell": "^6.28.0",
"eslint": "^8.35.0",
@ -35293,11 +35293,11 @@
"@typescript-eslint/eslint-plugin": "^5.57.0",
"@typescript-eslint/parser": "^5.51.0",
"@waku/build-utils": "*",
"@waku/core": "*",
"@waku/enr": "*",
"@waku/interfaces": "*",
"@waku/core": "0.0.18",
"@waku/enr": "0.0.12",
"@waku/interfaces": "0.0.13",
"@waku/proto": "*",
"@waku/utils": "*",
"@waku/utils": "0.0.6",
"chai": "^4.3.7",
"cspell": "^6.31.1",
"debug": "^4.3.4",
@ -35381,10 +35381,10 @@
"@rollup/plugin-json": "^6.0.0",
"@rollup/plugin-node-resolve": "^15.0.2",
"@waku/build-utils": "*",
"@waku/core": "*",
"@waku/interfaces": "*",
"@waku/core": "0.0.18",
"@waku/interfaces": "0.0.13",
"@waku/proto": "*",
"@waku/utils": "*",
"@waku/utils": "0.0.6",
"chai": "^4.3.7",
"debug": "^4.3.4",
"fast-check": "^3.8.1",
@ -35448,7 +35448,7 @@
"@typescript-eslint/eslint-plugin": "^5.57.0",
"@typescript-eslint/parser": "^5.51.0",
"@waku/build-utils": "*",
"@waku/interfaces": "*",
"@waku/interfaces": "0.0.13",
"cspell": "^6.31.1",
"debug": "^4.3.4",
"eslint": "^8.35.0",

View File

@ -11,8 +11,11 @@ export * as message from "./lib/message/index.js";
export * as waku from "./lib/waku.js";
export { WakuNode, WakuOptions } from "./lib/waku.js";
export * as waku_filter from "./lib/filter/index.js";
export { wakuFilter } from "./lib/filter/index.js";
export * as waku_filter_v1 from "./lib/filter/v1/index.js";
export { wakuFilter as wakuFilterV1 } from "./lib/filter/v1/index.js";
export * as waku_filter_v2 from "./lib/filter/v2/index.js";
export { wakuFilterV2 } from "./lib/filter/v2/index.js";
export * as waku_light_push from "./lib/light_push/index.js";
export { wakuLightPush, LightPushCodec } from "./lib/light_push/index.js";

View File

@ -1,10 +1,7 @@
import { ContentFilter } from "@waku/interfaces";
import { proto_filter as proto } from "@waku/proto";
import { v4 as uuid } from "uuid";
export type ContentFilter = {
contentTopic: string;
};
/**
* FilterRPC represents a message conforming to the Waku Filter protocol
*/

View File

@ -4,6 +4,7 @@ import type { IncomingStreamData } from "@libp2p/interface-registrar";
import type {
ActiveSubscriptions,
Callback,
ContentFilter,
IAsyncIterator,
IDecodedMessage,
IDecoder,
@ -19,13 +20,11 @@ import all from "it-all";
import * as lp from "it-length-prefixed";
import { pipe } from "it-pipe";
import { BaseProtocol } from "../base_protocol.js";
import { DefaultPubSubTopic } from "../constants.js";
import { toProtoMessage } from "../to_proto_message.js";
import { BaseProtocol } from "../../base_protocol.js";
import { DefaultPubSubTopic } from "../../constants.js";
import { toProtoMessage } from "../../to_proto_message.js";
import { ContentFilter, FilterRpc } from "./filter_rpc.js";
export { ContentFilter };
import { FilterRpc } from "./filter_rpc.js";
export const FilterCodec = "/vac/waku/filter/2.0.0-beta1";

View File

@ -0,0 +1,131 @@
import { proto_filter_v2 as proto, WakuMessage } from "@waku/proto";
import { v4 as uuid } from "uuid";
/**
* FilterPushRPC represents a message conforming to the Waku FilterPush protocol.
* Protocol documentation: https://rfc.vac.dev/spec/12/
*/
export class FilterPushRpc {
public constructor(public proto: proto.MessagePush) {}
static decode(bytes: Uint8Array): FilterPushRpc {
const res = proto.MessagePush.decode(bytes);
return new FilterPushRpc(res);
}
encode(): Uint8Array {
return proto.MessagePush.encode(this.proto);
}
get wakuMessage(): WakuMessage | undefined {
return this.proto.wakuMessage;
}
/**
* Get the pubsub topic from the FilterPushRpc object.
* @returns string
*/
get pubsubTopic(): string | undefined {
return this.proto.pubsubTopic;
}
}
export class FilterSubscribeRpc {
public constructor(public proto: proto.FilterSubscribeRequest) {}
static createSubscribeRequest(
pubsubTopic: string,
contentTopics: string[]
): FilterSubscribeRpc {
return new FilterSubscribeRpc({
requestId: uuid(),
filterSubscribeType:
proto.FilterSubscribeRequest.FilterSubscribeType.SUBSCRIBE,
pubsubTopic,
contentTopics,
});
}
static createUnsubscribeRequest(
pubsubTopic: string,
contentTopics: string[]
): FilterSubscribeRpc {
return new FilterSubscribeRpc({
requestId: uuid(),
filterSubscribeType:
proto.FilterSubscribeRequest.FilterSubscribeType.UNSUBSCRIBE,
pubsubTopic,
contentTopics,
});
}
static createUnsubscribeAllRequest(pubsubTopic: string): FilterSubscribeRpc {
return new FilterSubscribeRpc({
requestId: uuid(),
filterSubscribeType:
proto.FilterSubscribeRequest.FilterSubscribeType.UNSUBSCRIBE_ALL,
pubsubTopic,
contentTopics: [],
});
}
static createSubscriberPingRequest(): FilterSubscribeRpc {
return new FilterSubscribeRpc({
requestId: uuid(),
filterSubscribeType:
proto.FilterSubscribeRequest.FilterSubscribeType.SUBSCRIBER_PING,
pubsubTopic: "",
contentTopics: [],
});
}
static decode(bytes: Uint8Array): FilterSubscribeRpc {
const res = proto.FilterSubscribeRequest.decode(bytes);
return new FilterSubscribeRpc(res);
}
encode(): Uint8Array {
return proto.FilterSubscribeRequest.encode(this.proto);
}
get filterSubscribeType(): proto.FilterSubscribeRequest.FilterSubscribeType {
return this.proto.filterSubscribeType;
}
get requestId(): string {
return this.proto.requestId;
}
get pubsubTopic(): string | undefined {
return this.proto.pubsubTopic;
}
get contentTopics(): string[] {
return this.proto.contentTopics;
}
}
export class FilterSubscribeResponse {
public constructor(public proto: proto.FilterSubscribeResponse) {}
static decode(bytes: Uint8Array): FilterSubscribeResponse {
const res = proto.FilterSubscribeResponse.decode(bytes);
return new FilterSubscribeResponse(res);
}
encode(): Uint8Array {
return proto.FilterSubscribeResponse.encode(this.proto);
}
get statusCode(): number {
return this.proto.statusCode;
}
get statusDesc(): string | undefined {
return this.proto.statusDesc;
}
get requestId(): string {
return this.proto.requestId;
}
}

View File

@ -0,0 +1,413 @@
import { Stream } from "@libp2p/interface-connection";
import type { Libp2p } from "@libp2p/interface-libp2p";
import type { PeerId } from "@libp2p/interface-peer-id";
import type { Peer } from "@libp2p/interface-peer-store";
import type { IncomingStreamData } from "@libp2p/interface-registrar";
import type {
Callback,
ContentTopic,
IAsyncIterator,
IDecodedMessage,
IDecoder,
IFilterV2,
IProtoMessage,
IReceiver,
PeerIdStr,
ProtocolCreateOptions,
ProtocolOptions,
PubSubTopic,
Unsubscribe,
} from "@waku/interfaces";
import { WakuMessage } from "@waku/proto";
import { groupByContentTopic, toAsyncIterator } from "@waku/utils";
import debug from "debug";
import all from "it-all";
import * as lp from "it-length-prefixed";
import { pipe } from "it-pipe";
import { BaseProtocol } from "../../base_protocol.js";
import { DefaultPubSubTopic } from "../../constants.js";
import {
FilterPushRpc,
FilterSubscribeResponse,
FilterSubscribeRpc,
} from "./filter_rpc.js";
const log = debug("waku:filter:v2");
type SubscriptionCallback<T extends IDecodedMessage> = {
decoders: IDecoder<T>[];
callback: Callback<T>;
};
const FilterV2Codecs = {
SUBSCRIBE: "/vac/waku/filter-subscribe/2.0.0-beta1",
PUSH: "/vac/waku/filter-push/2.0.0-beta1",
};
class Subscription {
private readonly peer: Peer;
private readonly pubSubTopic: PubSubTopic;
private newStream: (peer: Peer) => Promise<Stream>;
private subscriptionCallbacks: Map<
ContentTopic,
SubscriptionCallback<IDecodedMessage>
>;
constructor(
pubSubTopic: PubSubTopic,
remotePeer: Peer,
newStream: (peer: Peer) => Promise<Stream>
) {
this.peer = remotePeer;
this.pubSubTopic = pubSubTopic;
this.newStream = newStream;
this.subscriptionCallbacks = new Map();
}
async subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>
): Promise<void> {
const decodersArray = Array.isArray(decoders) ? decoders : [decoders];
const decodersGroupedByCT = groupByContentTopic(decodersArray);
const contentTopics = Array.from(decodersGroupedByCT.keys());
const stream = await this.newStream(this.peer);
const request = FilterSubscribeRpc.createSubscribeRequest(
this.pubSubTopic,
contentTopics
);
try {
const res = await pipe(
[request.encode()],
lp.encode,
stream,
lp.decode,
async (source) => await all(source)
);
const { statusCode, requestId, statusDesc } =
FilterSubscribeResponse.decode(res[0].slice());
if (statusCode < 200 || statusCode >= 300) {
throw new Error(
`Filter subscribe request ${requestId} failed with status code ${statusCode}: ${statusDesc}`
);
}
log(
"Subscribed to peer ",
this.peer.id.toString(),
"for content topics",
contentTopics
);
} catch (e) {
throw new Error(
"Error subscribing to peer: " +
this.peer.id.toString() +
" for content topics: " +
contentTopics +
": " +
e
);
}
// Save the callback functions by content topics so they
// can easily be removed (reciprocally replaced) if `unsubscribe` (reciprocally `subscribe`)
// is called for those content topics
decodersGroupedByCT.forEach((decoders, contentTopic) => {
// Cast the type because a given `subscriptionCallbacks` map may hold
// Decoder that decode to different implementations of `IDecodedMessage`
const subscriptionCallback = {
decoders,
callback,
} as unknown as SubscriptionCallback<IDecodedMessage>;
// The callback and decoder may override previous values, this is on
// purpose as the user may call `subscribe` to refresh the subscription
this.subscriptionCallbacks.set(contentTopic, subscriptionCallback);
});
}
async unsubscribe(contentTopics: ContentTopic[]): Promise<void> {
const stream = await this.newStream(this.peer);
const unsubscribeRequest = FilterSubscribeRpc.createUnsubscribeRequest(
this.pubSubTopic,
contentTopics
);
try {
await pipe([unsubscribeRequest.encode()], lp.encode, stream.sink);
} catch (error) {
throw new Error("Error subscribing: " + error);
}
contentTopics.forEach((contentTopic: string) => {
this.subscriptionCallbacks.delete(contentTopic);
});
}
async ping(): Promise<void> {
const stream = await this.newStream(this.peer);
const request = FilterSubscribeRpc.createSubscriberPingRequest();
try {
const res = await pipe(
[request.encode()],
lp.encode,
stream,
lp.decode,
async (source) => await all(source)
);
const { statusCode, requestId, statusDesc } =
FilterSubscribeResponse.decode(res[0].slice());
if (statusCode < 200 || statusCode >= 300) {
throw new Error(
`Filter ping request ${requestId} failed with status code ${statusCode}: ${statusDesc}`
);
}
log("Ping successful");
} catch (error) {
log("Error pinging: ", error);
throw new Error("Error pinging: " + error);
}
}
async unsubscribeAll(): Promise<void> {
const stream = await this.newStream(this.peer);
const request = FilterSubscribeRpc.createUnsubscribeAllRequest(
this.pubSubTopic
);
try {
const res = await pipe(
[request.encode()],
lp.encode,
stream,
lp.decode,
async (source) => await all(source)
);
const { statusCode, requestId, statusDesc } =
FilterSubscribeResponse.decode(res[0].slice());
if (statusCode < 200 || statusCode >= 300) {
throw new Error(
`Filter unsubscribe all request ${requestId} failed with status code ${statusCode}: ${statusDesc}`
);
}
this.subscriptionCallbacks.clear();
log("Unsubscribed from all content topics");
} catch (error) {
throw new Error("Error unsubscribing from all content topics: " + error);
}
}
async processMessage(message: WakuMessage): Promise<void> {
const contentTopic = message.contentTopic;
const subscriptionCallback = this.subscriptionCallbacks.get(contentTopic);
if (!subscriptionCallback) {
log("No subscription callback available for ", contentTopic);
return;
}
await pushMessage(subscriptionCallback, this.pubSubTopic, message);
}
}
class FilterV2 extends BaseProtocol implements IReceiver {
private readonly options: ProtocolCreateOptions;
private activeSubscriptions = new Map<string, Subscription>();
private getActiveSubscription(
pubSubTopic: PubSubTopic,
peerIdStr: PeerIdStr
): Subscription | undefined {
return this.activeSubscriptions.get(`${pubSubTopic}_${peerIdStr}`);
}
private setActiveSubscription(
pubSubTopic: PubSubTopic,
peerIdStr: PeerIdStr,
subscription: Subscription
): Subscription {
this.activeSubscriptions.set(`${pubSubTopic}_${peerIdStr}`, subscription);
return subscription;
}
constructor(public libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(
FilterV2Codecs.SUBSCRIBE,
libp2p.peerStore,
libp2p.getConnections.bind(libp2p)
);
this.libp2p
.handle(FilterV2Codecs.PUSH, this.onRequest.bind(this))
.catch((e) => {
log("Failed to register ", FilterV2Codecs.PUSH, e);
});
this.activeSubscriptions = new Map();
this.options = options ?? {};
}
async createSubscription(
pubSubTopic?: string,
peerId?: PeerId
): Promise<Subscription> {
const _pubSubTopic =
pubSubTopic ?? this.options.pubSubTopic ?? DefaultPubSubTopic;
const peer = await this.getPeer(peerId);
const subscription =
this.getActiveSubscription(_pubSubTopic, peer.id.toString()) ??
this.setActiveSubscription(
_pubSubTopic,
peer.id.toString(),
new Subscription(_pubSubTopic, peer, this.newStream.bind(this, peer))
);
return subscription;
}
public toSubscriptionIterator<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
opts?: ProtocolOptions | undefined
): Promise<IAsyncIterator<T>> {
return toAsyncIterator(this, decoders, opts);
}
/**
* This method is used to satisfy the `IReceiver` interface.
*
* @hidden
*
* @param decoders The decoders to use for the subscription.
* @param callback The callback function to use for the subscription.
* @param opts Optional protocol options for the subscription.
*
* @returns A Promise that resolves to a function that unsubscribes from the subscription.
*
* @remarks
* This method should not be used directly.
* Instead, use `createSubscription` to create a new subscription.
*/
async subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>,
opts?: ProtocolOptions
): Promise<Unsubscribe> {
const subscription = await this.createSubscription(undefined, opts?.peerId);
subscription.subscribe(decoders, callback);
const contentTopics = Array.from(
groupByContentTopic(
Array.isArray(decoders) ? decoders : [decoders]
).keys()
);
return async () => {
await subscription.unsubscribe(contentTopics);
};
}
private onRequest(streamData: IncomingStreamData): void {
log("Receiving message push");
try {
pipe(streamData.stream, lp.decode, async (source) => {
for await (const bytes of source) {
const response = FilterPushRpc.decode(bytes.slice());
const { pubsubTopic, wakuMessage } = response;
if (!wakuMessage) {
log("Received empty message");
return;
}
if (!pubsubTopic) {
log("PubSub topic missing from push message");
return;
}
const peerIdStr = streamData.connection.remotePeer.toString();
const subscription = this.getActiveSubscription(
pubsubTopic,
peerIdStr
);
if (!subscription) {
log(`No subscription locally registered for topic ${pubsubTopic}`);
return;
}
await subscription.processMessage(wakuMessage);
}
}).then(
() => {
log("Receiving pipe closed.");
},
(e) => {
log("Error with receiving pipe", e);
}
);
} catch (e) {
log("Error decoding message", e);
}
}
}
export function wakuFilterV2(
init: Partial<ProtocolCreateOptions> = {}
): (libp2p: Libp2p) => IFilterV2 {
return (libp2p: Libp2p) => new FilterV2(libp2p, init);
}
async function pushMessage<T extends IDecodedMessage>(
subscriptionCallback: SubscriptionCallback<T>,
pubSubTopic: PubSubTopic,
message: WakuMessage
): Promise<void> {
const { decoders, callback } = subscriptionCallback;
const { contentTopic } = message;
if (!contentTopic) {
log("Message has no content topic, skipping");
return;
}
let didDecodeMsg = false;
// We don't want to wait for decoding failure, just attempt to decode
// all messages and do the call back on the one that works
// noinspection ES6MissingAwait
decoders.forEach(async (dec: IDecoder<T>) => {
if (didDecodeMsg) return;
const decoded = await dec.fromProtoObj(
pubSubTopic,
message as IProtoMessage
);
// const decoded = await dec.fromProtoObj(pubSubTopic, message);
if (!decoded) {
log("Not able to decode message");
return;
}
// This is just to prevent more decoding attempt
// TODO: Could be better if we were to abort promises
didDecodeMsg = Boolean(decoded);
await callback(decoded);
});
}

View File

@ -1,11 +1,10 @@
import type { Stream } from "@libp2p/interface-connection";
import type { Libp2p } from "@libp2p/interface-libp2p";
import type { PeerId } from "@libp2p/interface-peer-id";
import { isPeerId } from "@libp2p/interface-peer-id";
import type { Multiaddr, MultiaddrInput } from "@multiformats/multiaddr";
import { multiaddr } from "@multiformats/multiaddr";
import { isPeerId, PeerId } from "@libp2p/interface-peer-id";
import { multiaddr, Multiaddr, MultiaddrInput } from "@multiformats/multiaddr";
import type {
IFilter,
IFilterV2,
ILightPush,
IRelay,
IStore,
@ -48,7 +47,7 @@ export class WakuNode implements Waku {
public libp2p: Libp2p;
public relay?: IRelay;
public store?: IStore;
public filter?: IFilter;
public filter?: IFilter | IFilterV2;
public lightPush?: ILightPush;
public connectionManager: ConnectionManager;
@ -57,7 +56,7 @@ export class WakuNode implements Waku {
libp2p: Libp2p,
store?: (libp2p: Libp2p) => IStore,
lightPush?: (libp2p: Libp2p) => ILightPush,
filter?: (libp2p: Libp2p) => IFilter,
filter?: (libp2p: Libp2p) => IFilter | IFilterV2,
relay?: (libp2p: Libp2p) => IRelay
) {
this.libp2p = libp2p;
@ -192,7 +191,6 @@ export class WakuNode implements Waku {
return localMultiaddr + "/p2p/" + this.libp2p.peerId.toString();
}
}
function mapToPeerIdOrMultiaddr(
peerId: PeerId | MultiaddrInput
): PeerId | Multiaddr {

View File

@ -7,7 +7,8 @@ import { webSockets } from "@libp2p/websockets";
import { all as filterAll } from "@libp2p/websockets/filters";
import {
DefaultUserAgent,
wakuFilter,
wakuFilterV1,
wakuFilterV2,
wakuLightPush,
WakuNode,
WakuOptions,
@ -16,6 +17,8 @@ import {
import { enrTree, wakuDnsDiscovery } from "@waku/dns-discovery";
import type {
FullNode,
IFilter,
IFilterV2,
LightNode,
ProtocolCreateOptions,
RelayNode,
@ -36,6 +39,9 @@ export { Libp2pComponents };
/**
* Create a Waku node that uses Waku Light Push, Filter and Store to send and
* receive messages, enabling low resource consumption.
* If `useFilterV1` is set to true, the node will use Filter V1 protocol.
* If `useFilterV1` is set to false or undefined, the node will use Filter V2 protocol. (default behavior)
*
* **Note: This is NOT compatible with nwaku v0.11**
*
* @see https://github.com/status-im/nwaku/issues/1085
@ -58,7 +64,14 @@ export async function createLightNode(
const store = wakuStore(options);
const lightPush = wakuLightPush(options);
const filter = wakuFilter(options);
let filter: (libp2p: Libp2p) => IFilter | IFilterV2;
if (options?.useFilterV1) {
filter = wakuFilterV1(options) as (libp2p: Libp2p) => IFilter;
} else {
filter = wakuFilterV2() as (libp2p: Libp2p) => IFilterV2;
}
return new WakuNode(
options ?? {},
@ -103,6 +116,9 @@ export async function createRelayNode(
/**
* Create a Waku node that uses all Waku protocols.
* Implements generics to allow for conditional type checking for Filter V1 and V2 protocols.
* If `useFilterV1` is set to true, the node will use Filter V1 protocol.
* If `useFilterV1` is set to false or undefined, the node will use Filter V2 protocol. (default behavior)
*
* This helper is not recommended except if:
* - you are interfacing with nwaku v0.11 or below
@ -132,7 +148,14 @@ export async function createFullNode(
const store = wakuStore(options);
const lightPush = wakuLightPush(options);
const filter = wakuFilter(options);
let filter: (libp2p: Libp2p) => IFilter | IFilterV2;
if (!options?.useFilterV1) {
filter = wakuFilterV2();
} else {
filter = wakuFilterV1(options);
}
const relay = wakuRelay(options);
return new WakuNode(

View File

@ -1,4 +1,33 @@
import type { PointToPointProtocol } from "./protocols.js";
import type { PeerId } from "@libp2p/interface-peer-id";
import type { IDecodedMessage, IDecoder } from "./message.js";
import type { ContentTopic } from "./misc.js";
import type { Callback, PointToPointProtocol } from "./protocols.js";
import type { IReceiver } from "./receiver.js";
export type ContentFilter = {
contentTopic: string;
};
export type IFilter = IReceiver & PointToPointProtocol;
export interface IFilterV2Subscription {
subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>
): Promise<void>;
unsubscribe(contentTopics: ContentTopic[]): Promise<void>;
ping(): Promise<void>;
unsubscribeAll(): Promise<void>;
}
export type IFilterV2 = IReceiver &
PointToPointProtocol & {
createSubscription(
pubSubTopic?: string,
peerId?: PeerId
): Promise<IFilterV2Subscription>;
};

View File

@ -9,3 +9,5 @@ export type Unsubscribe = () => void | Promise<void>;
export type PubSubTopic = string;
export type ContentTopic = string;
export type PeerIdStr = string;

View File

@ -49,6 +49,12 @@ export type ProtocolCreateOptions = {
* Use recommended bootstrap method to discovery and connect to new nodes.
*/
defaultBootstrap?: boolean;
/**
* FilterV2 has been set to default
* Use this flag to enable the previous version of the filter protocol
* See [Improved Filter protocol specifications](https://github.com/vacp2p/rfc/pull/562)
*/
useFilterV1?: boolean;
};
export type ProtocolOptions = {

View File

@ -1,12 +1,9 @@
import type { IDecodedMessage, IDecoder } from "./message.js";
import type {
ContentTopic,
IAsyncIterator,
PubSubTopic,
Unsubscribe,
} from "./misc.js";
import type { IAsyncIterator, PubSubTopic, Unsubscribe } from "./misc.js";
import type { Callback, ProtocolOptions } from "./protocols.js";
type ContentTopic = string;
export type ActiveSubscriptions = Map<PubSubTopic, ContentTopic[]>;
export interface IReceiver {
@ -19,5 +16,4 @@ export interface IReceiver {
callback: Callback<T>,
opts?: ProtocolOptions
) => Unsubscribe | Promise<Unsubscribe>;
getActiveSubscriptions: () => ActiveSubscriptions;
}

View File

@ -3,7 +3,7 @@ import type { Libp2p } from "@libp2p/interface-libp2p";
import type { PeerId } from "@libp2p/interface-peer-id";
import type { Multiaddr } from "@multiformats/multiaddr";
import type { IFilter } from "./filter.js";
import type { IFilter, IFilterV2 } from "./filter.js";
import type { ILightPush } from "./light_push.js";
import { Protocols } from "./protocols.js";
import type { IRelay } from "./relay.js";
@ -13,7 +13,7 @@ export interface Waku {
libp2p: Libp2p;
relay?: IRelay;
store?: IStore;
filter?: IFilter;
filter?: IFilter | IFilterV2;
lightPush?: ILightPush;
dial(peer: PeerId | Multiaddr, protocols?: Protocols[]): Promise<Stream>;
@ -28,7 +28,7 @@ export interface Waku {
export interface LightNode extends Waku {
relay: undefined;
store: IStore;
filter: IFilter;
filter: IFilter | IFilterV2;
lightPush: ILightPush;
}
@ -42,6 +42,6 @@ export interface RelayNode extends Waku {
export interface FullNode extends Waku {
relay: IRelay;
store: IStore;
filter: IFilter;
filter: IFilter | IFilterV2;
lightPush: ILightPush;
}

View File

@ -5,6 +5,7 @@ export * as proto_topic_only_message from "./lib/topic_only_message.js";
export { TopicOnlyMessage } from "./lib/topic_only_message.js";
export * as proto_filter from "./lib/filter.js";
export * as proto_filter_v2 from "./lib/filter_v2.js";
export * as proto_lightpush from "./lib/light_push.js";
export { PushResponse } from "./lib/light_push.js";

View File

@ -0,0 +1,35 @@
syntax = "proto3";
// 12/WAKU2-FILTER rfc: https://rfc.vac.dev/spec/12/
import "message.proto";
// Protocol identifier: /vac/waku/filter-subscribe/2.0.0-beta1
message FilterSubscribeRequest {
enum FilterSubscribeType {
SUBSCRIBER_PING = 0;
SUBSCRIBE = 1;
UNSUBSCRIBE = 2;
UNSUBSCRIBE_ALL = 3;
}
string request_id = 1;
FilterSubscribeType filter_subscribe_type = 2;
// Filter criteria
optional string pubsub_topic = 10;
repeated string content_topics = 11;
}
message FilterSubscribeResponse {
string request_id = 1;
uint32 status_code = 10;
optional string status_desc = 11;
}
// Protocol identifier: /vac/waku/filter-push/2.0.0-beta1
message MessagePush {
WakuMessage waku_message = 1;
optional string pubsub_topic = 2;
}

View File

@ -0,0 +1,530 @@
/* eslint-disable import/export */
/* eslint-disable complexity */
/* eslint-disable @typescript-eslint/no-namespace */
/* eslint-disable @typescript-eslint/no-unnecessary-boolean-literal-compare */
/* eslint-disable @typescript-eslint/no-empty-interface */
import {
enumeration,
encodeMessage,
decodeMessage,
message,
} from "protons-runtime";
import type { Codec } from "protons-runtime";
import type { Uint8ArrayList } from "uint8arraylist";
export interface FilterSubscribeRequest {
requestId: string;
filterSubscribeType: FilterSubscribeRequest.FilterSubscribeType;
pubsubTopic?: string;
contentTopics: string[];
}
export namespace FilterSubscribeRequest {
export enum FilterSubscribeType {
SUBSCRIBER_PING = "SUBSCRIBER_PING",
SUBSCRIBE = "SUBSCRIBE",
UNSUBSCRIBE = "UNSUBSCRIBE",
UNSUBSCRIBE_ALL = "UNSUBSCRIBE_ALL",
}
enum __FilterSubscribeTypeValues {
SUBSCRIBER_PING = 0,
SUBSCRIBE = 1,
UNSUBSCRIBE = 2,
UNSUBSCRIBE_ALL = 3,
}
export namespace FilterSubscribeType {
export const codec = (): Codec<FilterSubscribeType> => {
return enumeration<FilterSubscribeType>(__FilterSubscribeTypeValues);
};
}
let _codec: Codec<FilterSubscribeRequest>;
export const codec = (): Codec<FilterSubscribeRequest> => {
if (_codec == null) {
_codec = message<FilterSubscribeRequest>(
(obj, w, opts = {}) => {
if (opts.lengthDelimited !== false) {
w.fork();
}
if (obj.requestId != null && obj.requestId !== "") {
w.uint32(10);
w.string(obj.requestId);
}
if (
obj.filterSubscribeType != null &&
__FilterSubscribeTypeValues[obj.filterSubscribeType] !== 0
) {
w.uint32(16);
FilterSubscribeRequest.FilterSubscribeType.codec().encode(
obj.filterSubscribeType,
w
);
}
if (obj.pubsubTopic != null) {
w.uint32(82);
w.string(obj.pubsubTopic);
}
if (obj.contentTopics != null) {
for (const value of obj.contentTopics) {
w.uint32(90);
w.string(value);
}
}
if (opts.lengthDelimited !== false) {
w.ldelim();
}
},
(reader, length) => {
const obj: any = {
requestId: "",
filterSubscribeType: FilterSubscribeType.SUBSCRIBER_PING,
contentTopics: [],
};
const end = length == null ? reader.len : reader.pos + length;
while (reader.pos < end) {
const tag = reader.uint32();
switch (tag >>> 3) {
case 1:
obj.requestId = reader.string();
break;
case 2:
obj.filterSubscribeType =
FilterSubscribeRequest.FilterSubscribeType.codec().decode(
reader
);
break;
case 10:
obj.pubsubTopic = reader.string();
break;
case 11:
obj.contentTopics.push(reader.string());
break;
default:
reader.skipType(tag & 7);
break;
}
}
return obj;
}
);
}
return _codec;
};
export const encode = (obj: Partial<FilterSubscribeRequest>): Uint8Array => {
return encodeMessage(obj, FilterSubscribeRequest.codec());
};
export const decode = (
buf: Uint8Array | Uint8ArrayList
): FilterSubscribeRequest => {
return decodeMessage(buf, FilterSubscribeRequest.codec());
};
}
export interface FilterSubscribeResponse {
requestId: string;
statusCode: number;
statusDesc?: string;
}
export namespace FilterSubscribeResponse {
let _codec: Codec<FilterSubscribeResponse>;
export const codec = (): Codec<FilterSubscribeResponse> => {
if (_codec == null) {
_codec = message<FilterSubscribeResponse>(
(obj, w, opts = {}) => {
if (opts.lengthDelimited !== false) {
w.fork();
}
if (obj.requestId != null && obj.requestId !== "") {
w.uint32(10);
w.string(obj.requestId);
}
if (obj.statusCode != null && obj.statusCode !== 0) {
w.uint32(80);
w.uint32(obj.statusCode);
}
if (obj.statusDesc != null) {
w.uint32(90);
w.string(obj.statusDesc);
}
if (opts.lengthDelimited !== false) {
w.ldelim();
}
},
(reader, length) => {
const obj: any = {
requestId: "",
statusCode: 0,
};
const end = length == null ? reader.len : reader.pos + length;
while (reader.pos < end) {
const tag = reader.uint32();
switch (tag >>> 3) {
case 1:
obj.requestId = reader.string();
break;
case 10:
obj.statusCode = reader.uint32();
break;
case 11:
obj.statusDesc = reader.string();
break;
default:
reader.skipType(tag & 7);
break;
}
}
return obj;
}
);
}
return _codec;
};
export const encode = (obj: Partial<FilterSubscribeResponse>): Uint8Array => {
return encodeMessage(obj, FilterSubscribeResponse.codec());
};
export const decode = (
buf: Uint8Array | Uint8ArrayList
): FilterSubscribeResponse => {
return decodeMessage(buf, FilterSubscribeResponse.codec());
};
}
export interface MessagePush {
wakuMessage?: WakuMessage;
pubsubTopic?: string;
}
export namespace MessagePush {
let _codec: Codec<MessagePush>;
export const codec = (): Codec<MessagePush> => {
if (_codec == null) {
_codec = message<MessagePush>(
(obj, w, opts = {}) => {
if (opts.lengthDelimited !== false) {
w.fork();
}
if (obj.wakuMessage != null) {
w.uint32(10);
WakuMessage.codec().encode(obj.wakuMessage, w);
}
if (obj.pubsubTopic != null) {
w.uint32(18);
w.string(obj.pubsubTopic);
}
if (opts.lengthDelimited !== false) {
w.ldelim();
}
},
(reader, length) => {
const obj: any = {};
const end = length == null ? reader.len : reader.pos + length;
while (reader.pos < end) {
const tag = reader.uint32();
switch (tag >>> 3) {
case 1:
obj.wakuMessage = WakuMessage.codec().decode(
reader,
reader.uint32()
);
break;
case 2:
obj.pubsubTopic = reader.string();
break;
default:
reader.skipType(tag & 7);
break;
}
}
return obj;
}
);
}
return _codec;
};
export const encode = (obj: Partial<MessagePush>): Uint8Array => {
return encodeMessage(obj, MessagePush.codec());
};
export const decode = (buf: Uint8Array | Uint8ArrayList): MessagePush => {
return decodeMessage(buf, MessagePush.codec());
};
}
export interface RateLimitProof {
proof: Uint8Array;
merkleRoot: Uint8Array;
epoch: Uint8Array;
shareX: Uint8Array;
shareY: Uint8Array;
nullifier: Uint8Array;
rlnIdentifier: Uint8Array;
}
export namespace RateLimitProof {
let _codec: Codec<RateLimitProof>;
export const codec = (): Codec<RateLimitProof> => {
if (_codec == null) {
_codec = message<RateLimitProof>(
(obj, w, opts = {}) => {
if (opts.lengthDelimited !== false) {
w.fork();
}
if (obj.proof != null && obj.proof.byteLength > 0) {
w.uint32(10);
w.bytes(obj.proof);
}
if (obj.merkleRoot != null && obj.merkleRoot.byteLength > 0) {
w.uint32(18);
w.bytes(obj.merkleRoot);
}
if (obj.epoch != null && obj.epoch.byteLength > 0) {
w.uint32(26);
w.bytes(obj.epoch);
}
if (obj.shareX != null && obj.shareX.byteLength > 0) {
w.uint32(34);
w.bytes(obj.shareX);
}
if (obj.shareY != null && obj.shareY.byteLength > 0) {
w.uint32(42);
w.bytes(obj.shareY);
}
if (obj.nullifier != null && obj.nullifier.byteLength > 0) {
w.uint32(50);
w.bytes(obj.nullifier);
}
if (obj.rlnIdentifier != null && obj.rlnIdentifier.byteLength > 0) {
w.uint32(58);
w.bytes(obj.rlnIdentifier);
}
if (opts.lengthDelimited !== false) {
w.ldelim();
}
},
(reader, length) => {
const obj: any = {
proof: new Uint8Array(0),
merkleRoot: new Uint8Array(0),
epoch: new Uint8Array(0),
shareX: new Uint8Array(0),
shareY: new Uint8Array(0),
nullifier: new Uint8Array(0),
rlnIdentifier: new Uint8Array(0),
};
const end = length == null ? reader.len : reader.pos + length;
while (reader.pos < end) {
const tag = reader.uint32();
switch (tag >>> 3) {
case 1:
obj.proof = reader.bytes();
break;
case 2:
obj.merkleRoot = reader.bytes();
break;
case 3:
obj.epoch = reader.bytes();
break;
case 4:
obj.shareX = reader.bytes();
break;
case 5:
obj.shareY = reader.bytes();
break;
case 6:
obj.nullifier = reader.bytes();
break;
case 7:
obj.rlnIdentifier = reader.bytes();
break;
default:
reader.skipType(tag & 7);
break;
}
}
return obj;
}
);
}
return _codec;
};
export const encode = (obj: Partial<RateLimitProof>): Uint8Array => {
return encodeMessage(obj, RateLimitProof.codec());
};
export const decode = (buf: Uint8Array | Uint8ArrayList): RateLimitProof => {
return decodeMessage(buf, RateLimitProof.codec());
};
}
export interface WakuMessage {
payload: Uint8Array;
contentTopic: string;
version?: number;
timestamp?: bigint;
meta?: Uint8Array;
rateLimitProof?: RateLimitProof;
ephemeral?: boolean;
}
export namespace WakuMessage {
let _codec: Codec<WakuMessage>;
export const codec = (): Codec<WakuMessage> => {
if (_codec == null) {
_codec = message<WakuMessage>(
(obj, w, opts = {}) => {
if (opts.lengthDelimited !== false) {
w.fork();
}
if (obj.payload != null && obj.payload.byteLength > 0) {
w.uint32(10);
w.bytes(obj.payload);
}
if (obj.contentTopic != null && obj.contentTopic !== "") {
w.uint32(18);
w.string(obj.contentTopic);
}
if (obj.version != null) {
w.uint32(24);
w.uint32(obj.version);
}
if (obj.timestamp != null) {
w.uint32(80);
w.sint64(obj.timestamp);
}
if (obj.meta != null) {
w.uint32(90);
w.bytes(obj.meta);
}
if (obj.rateLimitProof != null) {
w.uint32(170);
RateLimitProof.codec().encode(obj.rateLimitProof, w);
}
if (obj.ephemeral != null) {
w.uint32(248);
w.bool(obj.ephemeral);
}
if (opts.lengthDelimited !== false) {
w.ldelim();
}
},
(reader, length) => {
const obj: any = {
payload: new Uint8Array(0),
contentTopic: "",
};
const end = length == null ? reader.len : reader.pos + length;
while (reader.pos < end) {
const tag = reader.uint32();
switch (tag >>> 3) {
case 1:
obj.payload = reader.bytes();
break;
case 2:
obj.contentTopic = reader.string();
break;
case 3:
obj.version = reader.uint32();
break;
case 10:
obj.timestamp = reader.sint64();
break;
case 11:
obj.meta = reader.bytes();
break;
case 21:
obj.rateLimitProof = RateLimitProof.codec().decode(
reader,
reader.uint32()
);
break;
case 31:
obj.ephemeral = reader.bool();
break;
default:
reader.skipType(tag & 7);
break;
}
}
return obj;
}
);
}
return _codec;
};
export const encode = (obj: Partial<WakuMessage>): Uint8Array => {
return encodeMessage(obj, WakuMessage.codec());
};
export const decode = (buf: Uint8Array | Uint8ArrayList): WakuMessage => {
return decodeMessage(buf, WakuMessage.codec());
};
}

View File

@ -82,6 +82,10 @@ export class NimGoNode {
return isGoWaku ? "go-waku" : "nwaku";
}
get nodeType(): "go-waku" | "nwaku" {
return isGoWaku ? "go-waku" : "nwaku";
}
async start(args: Args = {}): Promise<void> {
this.docker = await Dockerode.createInstance(DOCKER_IMAGE_NAME);
try {
@ -126,6 +130,7 @@ export class NimGoNode {
tcpPort,
websocketPort,
...(args?.peerExchange && { discv5UdpPort }),
...(isGoWaku && { minRelayPeersToPublish: 0 }),
},
{ rpcAddress: "0.0.0.0" },
args

View File

@ -5,7 +5,7 @@ import {
waitForRemotePeer,
} from "@waku/core";
import { createLightNode } from "@waku/create";
import { Protocols } from "@waku/interfaces";
import { IFilterV2, IFilterV2Subscription, Protocols } from "@waku/interfaces";
import type { LightNode } from "@waku/interfaces";
import {
createDecoder as eciesDecoder,
@ -42,6 +42,8 @@ describe("Waku Message Ephemeral field", () => {
let waku: LightNode;
let nwaku: NimGoNode;
let subscription: IFilterV2Subscription;
afterEach(async function () {
!!nwaku &&
nwaku.stop().catch((e) => console.log("Nwaku failed to stop", e));
@ -69,6 +71,8 @@ describe("Waku Message Ephemeral field", () => {
Protocols.LightPush,
Protocols.Store,
]);
subscription = await (waku.filter as IFilterV2).createSubscription();
});
it("Ephemeral messages are not stored", async function () {
@ -177,7 +181,7 @@ describe("Waku Message Ephemeral field", () => {
const callback = (msg: DecodedMessage): void => {
messages.push(msg);
};
await waku.filter.subscribe([TestDecoder], callback);
await subscription.subscribe([TestDecoder], callback);
await delay(200);
const normalTxt = "Normal message";
@ -226,7 +230,7 @@ describe("Waku Message Ephemeral field", () => {
const callback = (msg: DecodedMessage): void => {
messages.push(msg);
};
await waku.filter.subscribe([decoder], callback);
await subscription.subscribe([decoder], callback);
await delay(200);
const normalTxt = "Normal message";
@ -276,7 +280,7 @@ describe("Waku Message Ephemeral field", () => {
const callback = (msg: DecodedMessage): void => {
messages.push(msg);
};
await waku.filter.subscribe([decoder], callback);
await subscription.subscribe([decoder], callback);
await delay(200);
const normalTxt = "Normal message";

View File

@ -6,7 +6,7 @@ import {
waitForRemotePeer,
} from "@waku/core";
import { createLightNode } from "@waku/create";
import type { LightNode } from "@waku/interfaces";
import type { IFilter, LightNode } from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai";
@ -21,10 +21,12 @@ const TestContentTopic = "/test/1/waku-filter";
const TestEncoder = createEncoder({ contentTopic: TestContentTopic });
const TestDecoder = createDecoder(TestContentTopic);
describe("Waku Filter", () => {
describe("Waku Filter: V1", () => {
let waku: LightNode;
let nwaku: NimGoNode;
let filter: IFilter;
afterEach(async function () {
!!nwaku &&
nwaku.stop().catch((e) => console.log("Nwaku failed to stop", e));
@ -36,12 +38,15 @@ describe("Waku Filter", () => {
nwaku = new NimGoNode(makeLogFileName(this));
await nwaku.start({ filter: true, lightpush: true, relay: true });
waku = await createLightNode({
useFilterV1: true,
staticNoiseKey: NOISE_KEY_1,
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } },
});
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]);
filter = waku.filter as IFilter;
});
it("creates a subscription", async function () {
@ -59,7 +64,7 @@ describe("Waku Filter", () => {
expect(bytesToUtf8(msg.payload)).to.eq(messageText);
};
await waku.filter.subscribe([TestDecoder], callback);
await filter.subscribe([TestDecoder], callback);
// As the filter protocol does not cater for an ack of subscription
// we cannot know whether the subscription happened. Something we want to
// correct in future versions of the protocol.
@ -80,7 +85,7 @@ describe("Waku Filter", () => {
messageCount++;
expect(msg.contentTopic).to.eq(TestContentTopic);
};
await waku.filter.subscribe(TestDecoder, callback);
await filter.subscribe(TestDecoder, callback);
await delay(200);
await waku.lightPush.send(TestEncoder, {
@ -100,7 +105,7 @@ describe("Waku Filter", () => {
const callback = (): void => {
messageCount++;
};
const unsubscribe = await waku.filter.subscribe([TestDecoder], callback);
const unsubscribe = await filter.subscribe([TestDecoder], callback);
await delay(200);
await waku.lightPush.send(TestEncoder, {

View File

@ -0,0 +1,217 @@
import {
createDecoder,
createEncoder,
DecodedMessage,
DefaultPubSubTopic,
waitForRemotePeer,
} from "@waku/core";
import { createLightNode } from "@waku/create";
import type {
IFilterV2,
IFilterV2Subscription,
LightNode,
} from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai";
import debug from "debug";
import {
delay,
makeLogFileName,
NimGoNode,
NOISE_KEY_1,
} from "../src/index.js";
const log = debug("waku:test");
const TestContentTopic = "/test/1/waku-filter";
const TestEncoder = createEncoder({ contentTopic: TestContentTopic });
const TestDecoder = createDecoder(TestContentTopic);
describe("Waku Filter: V2", () => {
let waku: LightNode;
let nwaku: NimGoNode;
let subscription: IFilterV2Subscription;
afterEach(async function () {
!!nwaku &&
nwaku.stop().catch((e) => console.log("Nwaku failed to stop", e));
!!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e));
});
beforeEach(async function () {
this.timeout(15000);
nwaku = new NimGoNode(makeLogFileName(this));
await nwaku.start({
filter: true,
lightpush: true,
relay: true,
});
waku = await createLightNode({
staticNoiseKey: NOISE_KEY_1,
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } },
});
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]);
subscription = await (waku.filter as IFilterV2).createSubscription();
});
it("creates a subscription", async function () {
this.timeout(10000);
let messageCount = 0;
const messageText = "Filtering works!";
const message = { payload: utf8ToBytes(messageText) };
const callback = (msg: DecodedMessage): void => {
log("Got a message");
messageCount++;
expect(msg.contentTopic).to.eq(TestContentTopic);
expect(msg.pubSubTopic).to.eq(DefaultPubSubTopic);
expect(bytesToUtf8(msg.payload)).to.eq(messageText);
};
await subscription.subscribe([TestDecoder], callback);
await waku.lightPush.send(TestEncoder, message);
while (messageCount === 0) {
await delay(250);
}
expect(messageCount).to.eq(1);
});
it("modifies subscription", async function () {
this.timeout(10000);
let messageCount = 0;
const messageText = "Filtering works!";
const message = { payload: utf8ToBytes(messageText) };
const callback = (msg: DecodedMessage): void => {
log("Got a message");
messageCount++;
expect(msg.contentTopic).to.eq(TestContentTopic);
expect(msg.pubSubTopic).to.eq(DefaultPubSubTopic);
expect(bytesToUtf8(msg.payload)).to.eq(messageText);
};
await subscription.subscribe([TestDecoder], callback);
await delay(200);
await waku.lightPush.send(TestEncoder, message);
while (messageCount === 0) {
await delay(250);
}
expect(messageCount).to.eq(1);
// Modify subscription
messageCount = 0;
const newMessageText = "Filtering still works!";
const newMessage = { payload: utf8ToBytes(newMessageText) };
const newContentTopic = "/test/2/waku-filter";
const newEncoder = createEncoder({ contentTopic: newContentTopic });
const newDecoder = createDecoder(newContentTopic);
const newCallback = (msg: DecodedMessage): void => {
log("Got a message");
messageCount++;
expect(msg.contentTopic).to.eq(newContentTopic);
expect(msg.pubSubTopic).to.eq(DefaultPubSubTopic);
expect(bytesToUtf8(msg.payload)).to.eq(newMessageText);
};
await subscription.subscribe([newDecoder], newCallback);
await waku.lightPush.send(newEncoder, newMessage);
while (messageCount === 0) {
await delay(250);
}
expect(messageCount).to.eq(1);
});
it("handles multiple messages", async function () {
this.timeout(10000);
let messageCount = 0;
const callback = (msg: DecodedMessage): void => {
messageCount++;
expect(msg.contentTopic).to.eq(TestContentTopic);
};
await subscription.subscribe(TestDecoder, callback);
await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes("Filtering works!"),
});
await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes("Filtering still works!"),
});
while (messageCount < 2) {
await delay(250);
}
expect(messageCount).to.eq(2);
});
it("unsubscribes", async function () {
let messageCount = 0;
const callback = (): void => {
messageCount++;
};
await subscription.subscribe([TestDecoder], callback);
await delay(200);
await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes("This should be received"),
});
await delay(100);
await subscription.unsubscribe([TestContentTopic]);
await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes("This should not be received"),
});
await delay(100);
expect(messageCount).to.eq(1);
});
it("ping", async function () {
let messageCount = 0;
const callback = (): void => {
messageCount++;
};
await subscription.subscribe([TestDecoder], callback);
await delay(200);
await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes("This should be received"),
});
await delay(100);
await subscription.ping();
await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes("This should also be received"),
});
await delay(100);
expect(messageCount).to.eq(2);
});
it("unsubscribes all", async function () {
let messageCount = 0;
const callback = (): void => {
messageCount++;
};
await subscription.subscribe([TestDecoder], callback);
await delay(200);
await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes("This should be received"),
});
await delay(100);
await subscription.unsubscribeAll();
await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes("This should not be received"),
});
await delay(100);
expect(messageCount).to.eq(1);
});
});

View File

@ -5,7 +5,7 @@ import {
waitForRemotePeer,
} from "@waku/core";
import { createLightNode } from "@waku/create";
import type { LightNode } from "@waku/interfaces";
import type { IFilter, IFilterV2, LightNode } from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import { toAsyncIterator } from "@waku/utils";
import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes";
@ -18,21 +18,25 @@ const TestContentTopic = "/test/1/waku-filter";
const TestEncoder = createEncoder({ contentTopic: TestContentTopic });
const TestDecoder = createDecoder(TestContentTopic);
describe("Util: toAsyncIterator", () => {
describe("Util: toAsyncIterator: FilterV1", () => {
let waku: LightNode;
let nwaku: NimGoNode;
let filter: IFilter;
beforeEach(async function () {
this.timeout(15000);
nwaku = new NimGoNode(makeLogFileName(this));
await nwaku.start({ filter: true, lightpush: true, relay: true });
waku = await createLightNode({
useFilterV1: true,
staticNoiseKey: NOISE_KEY_1,
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } },
});
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]);
filter = waku.filter as IFilter;
});
afterEach(async () => {
@ -50,7 +54,7 @@ describe("Util: toAsyncIterator", () => {
const sent = { payload: utf8ToBytes(messageText) };
const { iterator } = await toAsyncIterator(
waku.filter,
filter,
TestDecoder,
{},
{ timeoutMs: 1000 }
@ -67,7 +71,7 @@ describe("Util: toAsyncIterator", () => {
it("handles multiple messages", async function () {
this.timeout(10000);
const { iterator } = await toAsyncIterator(
waku.filter,
filter,
TestDecoder,
{},
{ timeoutMs: 1000 }
@ -90,7 +94,108 @@ describe("Util: toAsyncIterator", () => {
it("unsubscribes", async function () {
this.timeout(10000);
const { iterator, stop } = await toAsyncIterator(
waku.filter,
filter,
TestDecoder,
{},
{ timeoutMs: 1000 }
);
await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes("This should be received"),
});
await stop();
await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes("This should not be received"),
});
let result = await iterator.next();
expect(result.done).to.eq(true);
expect(bytesToUtf8(result.value.payload)).to.eq("This should be received");
result = await iterator.next();
expect(result.value).to.eq(undefined);
expect(result.done).to.eq(true);
});
});
describe("Util: toAsyncIterator: FilterV2", () => {
let waku: LightNode;
let nwaku: NimGoNode;
let filter: IFilterV2;
beforeEach(async function () {
this.timeout(15000);
nwaku = new NimGoNode(makeLogFileName(this));
await nwaku.start({ filter: true, lightpush: true, relay: true });
waku = await createLightNode({
staticNoiseKey: NOISE_KEY_1,
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } },
});
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]);
filter = waku.filter as IFilterV2;
});
afterEach(async () => {
try {
await nwaku.stop();
await waku.stop();
} catch (err) {
console.log("Failed to stop", err);
}
});
it("creates an iterator", async function () {
this.timeout(10000);
const messageText = "hey, what's up?";
const sent = { payload: utf8ToBytes(messageText) };
const { iterator } = await toAsyncIterator(
filter,
TestDecoder,
{},
{ timeoutMs: 1000 }
);
await waku.lightPush.send(TestEncoder, sent);
const { value } = await iterator.next();
expect(value.contentTopic).to.eq(TestContentTopic);
expect(value.pubSubTopic).to.eq(DefaultPubSubTopic);
expect(bytesToUtf8(value.payload)).to.eq(messageText);
});
it("handles multiple messages", async function () {
this.timeout(10000);
const { iterator } = await toAsyncIterator(
filter,
TestDecoder,
{},
{ timeoutMs: 1000 }
);
await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes("Filtering works!"),
});
await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes("Filtering still works!"),
});
let result = await iterator.next();
expect(bytesToUtf8(result.value.payload)).to.eq("Filtering works!");
result = await iterator.next();
expect(bytesToUtf8(result.value.payload)).to.eq("Filtering still works!");
});
it("unsubscribes", async function () {
this.timeout(10000);
const { iterator, stop } = await toAsyncIterator(
filter,
TestDecoder,
{},
{ timeoutMs: 1000 }

View File

@ -1,5 +1,13 @@
export * from "./is_defined.js";
export * from "./random_subset.js";
export function removeItemFromArray(arr: unknown[], value: unknown): unknown[] {
const index = arr.indexOf(value);
if (index > -1) {
arr.splice(index, 1);
}
return arr;
}
export * from "./group_by.js";
export * from "./to_async_iterator.js";
export * from "./is_size_valid.js";