diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index c782757909..04e37bd19a 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -27,3 +27,5 @@ export { ConnectionManager } from "./lib/connection_manager.js"; export { KeepAliveManager } from "./lib/keep_alive_manager.js"; export { StreamManager } from "./lib/stream_manager.js"; + +export { MetadataCodec, wakuMetadata } from "./lib/metadata/index.js"; diff --git a/packages/core/src/lib/metadata/index.ts b/packages/core/src/lib/metadata/index.ts new file mode 100644 index 0000000000..b3c6892ab0 --- /dev/null +++ b/packages/core/src/lib/metadata/index.ts @@ -0,0 +1,105 @@ +import type { PeerId } from "@libp2p/interface/peer-id"; +import { IncomingStreamData } from "@libp2p/interface/stream-handler"; +import { encodeRelayShard } from "@waku/enr"; +import type { IMetadata, Libp2pComponents, ShardInfo } from "@waku/interfaces"; +import { proto_metadata } from "@waku/proto"; +import { Logger } from "@waku/utils"; +import all from "it-all"; +import * as lp from "it-length-prefixed"; +import { pipe } from "it-pipe"; +import { Uint8ArrayList } from "uint8arraylist"; + +import { BaseProtocol } from "../base_protocol.js"; + +const log = new Logger("metadata"); + +export const MetadataCodec = "/vac/waku/metadata/1.0.0"; + +class Metadata extends BaseProtocol { + private readonly shardInfo: ShardInfo; + private libp2pComponents: Libp2pComponents; + constructor(shardInfo: ShardInfo, libp2p: Libp2pComponents) { + super(MetadataCodec, libp2p.components); + this.libp2pComponents = libp2p; + this.shardInfo = shardInfo; + void libp2p.registrar.handle(MetadataCodec, (streamData) => { + void this.onRequest(streamData); + }); + } + + /** + * Handle an incoming metadata request + */ + private async onRequest(streamData: IncomingStreamData): Promise { + try { + const { stream, connection } = streamData; + const encodedShardInfo = proto_metadata.WakuMetadataResponse.encode( + this.shardInfo + ); + + const encodedResponse = await pipe( + [encodedShardInfo], + lp.encode, + stream, + lp.decode, + async (source) => await all(source) + ); + + const remoteShardInfoResponse = + this.decodeMetadataResponse(encodedResponse); + + // add or update the shardInfo to peer store + await this.libp2pComponents.peerStore.merge(connection.remotePeer, { + metadata: { + shardInfo: encodeRelayShard(remoteShardInfoResponse) + } + }); + } catch (error) { + log.error("Error handling metadata request", error); + } + } + + /** + * Make a metadata query to a peer + */ + async query(peerId: PeerId): Promise { + const request = proto_metadata.WakuMetadataRequest.encode(this.shardInfo); + + const peer = await this.getPeer(peerId); + + const stream = await this.getStream(peer); + + const encodedResponse = await pipe( + [request], + lp.encode, + stream, + lp.decode, + async (source) => await all(source) + ); + + const decodedResponse = this.decodeMetadataResponse(encodedResponse); + + return decodedResponse; + } + + private decodeMetadataResponse(encodedResponse: Uint8ArrayList[]): ShardInfo { + const bytes = new Uint8ArrayList(); + + encodedResponse.forEach((chunk) => { + bytes.append(chunk); + }); + const response = proto_metadata.WakuMetadataResponse.decode( + bytes + ) as ShardInfo; + + if (!response) log.error("Error decoding metadata response"); + + return response; + } +} + +export function wakuMetadata( + shardInfo: ShardInfo +): (components: Libp2pComponents) => IMetadata { + return (components: Libp2pComponents) => new Metadata(shardInfo, components); +} diff --git a/packages/interfaces/src/index.ts b/packages/interfaces/src/index.ts index 80b653ddbf..6ce8738a0d 100644 --- a/packages/interfaces/src/index.ts +++ b/packages/interfaces/src/index.ts @@ -14,3 +14,4 @@ export * from "./misc.js"; export * from "./libp2p.js"; export * from "./keep_alive_manager.js"; export * from "./dns_discovery.js"; +export * from "./metadata.js"; diff --git a/packages/interfaces/src/libp2p.ts b/packages/interfaces/src/libp2p.ts index e657000e3a..daef10606b 100644 --- a/packages/interfaces/src/libp2p.ts +++ b/packages/interfaces/src/libp2p.ts @@ -4,8 +4,11 @@ import type { Libp2pInit, Libp2pOptions } from "libp2p"; import type { identifyService } from "libp2p/identify"; import type { PingService } from "libp2p/ping"; +import { IMetadata } from "./metadata"; + export type Libp2pServices = { ping: PingService; + metadata?: IMetadata; pubsub?: GossipSub; identify: ReturnType>; }; diff --git a/packages/interfaces/src/metadata.ts b/packages/interfaces/src/metadata.ts new file mode 100644 index 0000000000..f37bcba0f3 --- /dev/null +++ b/packages/interfaces/src/metadata.ts @@ -0,0 +1,8 @@ +import type { PeerId } from "@libp2p/interface/peer-id"; + +import type { ShardInfo } from "./enr.js"; +import type { IBaseProtocol } from "./protocols.js"; + +export interface IMetadata extends IBaseProtocol { + query(peerId: PeerId): Promise; +} diff --git a/packages/proto/src/index.ts b/packages/proto/src/index.ts index 9608cafa94..87153e250c 100644 --- a/packages/proto/src/index.ts +++ b/packages/proto/src/index.ts @@ -13,3 +13,5 @@ export { PushResponse } from "./lib/light_push.js"; export * as proto_store from "./lib/store.js"; export * as proto_peer_exchange from "./lib/peer_exchange.js"; + +export * as proto_metadata from './lib/metadata.js' diff --git a/packages/proto/src/lib/metadata.proto b/packages/proto/src/lib/metadata.proto new file mode 100644 index 0000000000..445e898ec3 --- /dev/null +++ b/packages/proto/src/lib/metadata.proto @@ -0,0 +1,12 @@ +syntax = "proto3"; + + +message WakuMetadataRequest { + optional uint32 cluster_id = 1; + repeated uint32 shards = 2; +} + +message WakuMetadataResponse { + optional uint32 cluster_id = 1; + repeated uint32 shards = 2; +} \ No newline at end of file diff --git a/packages/proto/src/lib/metadata.ts b/packages/proto/src/lib/metadata.ts new file mode 100644 index 0000000000..edb8994baa --- /dev/null +++ b/packages/proto/src/lib/metadata.ts @@ -0,0 +1,147 @@ +/* eslint-disable import/export */ +/* eslint-disable complexity */ +/* eslint-disable @typescript-eslint/no-namespace */ +/* eslint-disable @typescript-eslint/no-unnecessary-boolean-literal-compare */ +/* eslint-disable @typescript-eslint/no-empty-interface */ + +import { encodeMessage, decodeMessage, message } from 'protons-runtime' +import type { Codec } from 'protons-runtime' +import type { Uint8ArrayList } from 'uint8arraylist' + +export interface WakuMetadataRequest { + clusterId?: number + shards: number[] +} + +export namespace WakuMetadataRequest { + let _codec: Codec + + export const codec = (): Codec => { + if (_codec == null) { + _codec = message((obj, w, opts = {}) => { + if (opts.lengthDelimited !== false) { + w.fork() + } + + if (obj.clusterId != null) { + w.uint32(8) + w.uint32(obj.clusterId) + } + + if (obj.shards != null) { + for (const value of obj.shards) { + w.uint32(16) + w.uint32(value) + } + } + + if (opts.lengthDelimited !== false) { + w.ldelim() + } + }, (reader, length) => { + const obj: any = { + shards: [] + } + + const end = length == null ? reader.len : reader.pos + length + + while (reader.pos < end) { + const tag = reader.uint32() + + switch (tag >>> 3) { + case 1: + obj.clusterId = reader.uint32() + break + case 2: + obj.shards.push(reader.uint32()) + break + default: + reader.skipType(tag & 7) + break + } + } + + return obj + }) + } + + return _codec + } + + export const encode = (obj: Partial): Uint8Array => { + return encodeMessage(obj, WakuMetadataRequest.codec()) + } + + export const decode = (buf: Uint8Array | Uint8ArrayList): WakuMetadataRequest => { + return decodeMessage(buf, WakuMetadataRequest.codec()) + } +} + +export interface WakuMetadataResponse { + clusterId?: number + shards: number[] +} + +export namespace WakuMetadataResponse { + let _codec: Codec + + export const codec = (): Codec => { + if (_codec == null) { + _codec = message((obj, w, opts = {}) => { + if (opts.lengthDelimited !== false) { + w.fork() + } + + if (obj.clusterId != null) { + w.uint32(8) + w.uint32(obj.clusterId) + } + + if (obj.shards != null) { + for (const value of obj.shards) { + w.uint32(16) + w.uint32(value) + } + } + + if (opts.lengthDelimited !== false) { + w.ldelim() + } + }, (reader, length) => { + const obj: any = { + shards: [] + } + + const end = length == null ? reader.len : reader.pos + length + + while (reader.pos < end) { + const tag = reader.uint32() + + switch (tag >>> 3) { + case 1: + obj.clusterId = reader.uint32() + break + case 2: + obj.shards.push(reader.uint32()) + break + default: + reader.skipType(tag & 7) + break + } + } + + return obj + }) + } + + return _codec + } + + export const encode = (obj: Partial): Uint8Array => { + return encodeMessage(obj, WakuMetadataResponse.codec()) + } + + export const decode = (buf: Uint8Array | Uint8ArrayList): WakuMetadataResponse => { + return decodeMessage(buf, WakuMetadataResponse.codec()) + } +} diff --git a/packages/sdk/src/create.ts b/packages/sdk/src/create.ts index c3ed95ead6..11668689ee 100644 --- a/packages/sdk/src/create.ts +++ b/packages/sdk/src/create.ts @@ -8,6 +8,7 @@ import { DefaultUserAgent, wakuFilter, wakuLightPush, + wakuMetadata, WakuNode, WakuOptions, wakuStore @@ -16,11 +17,13 @@ import { enrTree, wakuDnsDiscovery } from "@waku/dns-discovery"; import type { CreateLibp2pOptions, FullNode, + IMetadata, Libp2p, Libp2pComponents, LightNode, ProtocolCreateOptions, - RelayNode + RelayNode, + ShardInfo } from "@waku/interfaces"; import { wakuPeerExchangeDiscovery } from "@waku/peer-exchange"; import { RelayCreateOptions, wakuGossipSub, wakuRelay } from "@waku/relay"; @@ -54,6 +57,7 @@ export async function createLightNode( } const libp2p = await defaultLibp2p( + options.shardInfo, undefined, libp2pOptions, options?.userAgent @@ -90,6 +94,7 @@ export async function createRelayNode( } const libp2p = await defaultLibp2p( + options.shardInfo, wakuGossipSub(options), libp2pOptions, options?.userAgent @@ -134,6 +139,7 @@ export async function createFullNode( } const libp2p = await defaultLibp2p( + options.shardInfo, wakuGossipSub(options), libp2pOptions, options?.userAgent @@ -169,7 +175,12 @@ type PubsubService = { pubsub?: (components: Libp2pComponents) => GossipSub; }; +type MetadataService = { + metadata?: (components: Libp2pComponents) => IMetadata; +}; + export async function defaultLibp2p( + shardInfo?: ShardInfo, wakuGossipSub?: PubsubService["pubsub"], options?: Partial, userAgent?: string @@ -191,6 +202,10 @@ export async function defaultLibp2p( ? { pubsub: wakuGossipSub } : {}; + const metadataService: MetadataService = shardInfo + ? { metadata: wakuMetadata(shardInfo) } + : {}; + return createLibp2p({ connectionManager: { minConnections: 1 @@ -204,6 +219,7 @@ export async function defaultLibp2p( agentVersion: userAgent ?? DefaultUserAgent }), ping: pingService(), + ...metadataService, ...pubsubService, ...options?.services } diff --git a/packages/tests/tests/metadata.spec.ts b/packages/tests/tests/metadata.spec.ts new file mode 100644 index 0000000000..b8bd961436 --- /dev/null +++ b/packages/tests/tests/metadata.spec.ts @@ -0,0 +1,204 @@ +import { MetadataCodec } from "@waku/core"; +import { decodeRelayShard } from "@waku/enr"; +import type { LightNode, ShardInfo } from "@waku/interfaces"; +import { createLightNode } from "@waku/sdk"; +import { shardInfoToPubsubTopics } from "@waku/utils"; +import chai, { expect } from "chai"; +import chaiAsPromised from "chai-as-promised"; + +import { delay, tearDownNodes } from "../src/index.js"; +import { makeLogFileName } from "../src/log_file.js"; +import { NimGoNode } from "../src/node/node.js"; + +chai.use(chaiAsPromised); + +describe("Metadata Protocol", () => { + let waku: LightNode; + let nwaku1: NimGoNode; + + beforeEach(function () { + nwaku1 = new NimGoNode(makeLogFileName(this) + "1"); + }); + + afterEach(async function () { + this.timeout(15000); + await tearDownNodes([nwaku1], waku); + }); + + describe("connections", function () { + it("same cluster, same shard: nodes connect", async function () { + this.timeout(55_000); + + const shardInfo: ShardInfo = { + clusterId: 1, + shards: [1] + }; + + await nwaku1.start({ + relay: true, + discv5Discovery: true, + peerExchange: true, + clusterId: shardInfo.clusterId, + pubsubTopic: shardInfoToPubsubTopics(shardInfo) + }); + + const nwaku1Ma = await nwaku1.getMultiaddrWithId(); + const nwaku1PeerId = await nwaku1.getPeerId(); + + waku = await createLightNode({ shardInfo }); + await waku.start(); + await waku.libp2p.dialProtocol(nwaku1Ma, MetadataCodec); + + const shardInfoRes = + await waku.libp2p.services.metadata?.query(nwaku1PeerId); + expect(shardInfoRes).to.not.be.undefined; + expect(shardInfoRes?.clusterId).to.equal(shardInfo.clusterId); + expect(shardInfoRes?.shards).to.deep.equal(shardInfo.shards); + + const activeConnections = waku.libp2p.getConnections(); + expect(activeConnections.length).to.equal(1); + }); + + it("same cluster, different shard: nodes connect", async function () { + this.timeout(55_000); + + const shardInfo1: ShardInfo = { + clusterId: 1, + shards: [1] + }; + + const shardInfo2: ShardInfo = { + clusterId: 1, + shards: [2] + }; + + await nwaku1.start({ + relay: true, + discv5Discovery: true, + peerExchange: true, + clusterId: shardInfo1.clusterId, + pubsubTopic: shardInfoToPubsubTopics(shardInfo1) + }); + + const nwaku1Ma = await nwaku1.getMultiaddrWithId(); + const nwaku1PeerId = await nwaku1.getPeerId(); + + waku = await createLightNode({ shardInfo: shardInfo2 }); + await waku.start(); + await waku.libp2p.dialProtocol(nwaku1Ma, MetadataCodec); + + const shardInfoRes = + await waku.libp2p.services.metadata?.query(nwaku1PeerId); + expect(shardInfoRes).to.not.be.undefined; + expect(shardInfoRes?.clusterId).to.equal(shardInfo1.clusterId); + expect(shardInfoRes?.shards).to.deep.equal(shardInfo1.shards); + + const activeConnections = waku.libp2p.getConnections(); + expect(activeConnections.length).to.equal(1); + }); + + it("different cluster, same shard: nodes don't connect", async function () { + this.timeout(55_000); + + const shardInfo1: ShardInfo = { + clusterId: 1, + shards: [1] + }; + + const shardInfo2: ShardInfo = { + clusterId: 2, + shards: [1] + }; + + await nwaku1.start({ + relay: true, + discv5Discovery: true, + peerExchange: true, + clusterId: shardInfo1.clusterId, + pubsubTopic: shardInfoToPubsubTopics(shardInfo1) + }); + + const nwaku1Ma = await nwaku1.getMultiaddrWithId(); + + waku = await createLightNode({ shardInfo: shardInfo2 }); + await waku.start(); + await waku.libp2p.dialProtocol(nwaku1Ma, MetadataCodec); + + // add a delay to make sure the connection is closed from the other side + await delay(100); + + const activeConnections = waku.libp2p.getConnections(); + expect(activeConnections.length).to.equal(0); + }); + + it("different cluster, different shard: nodes don't connect", async function () { + this.timeout(55_000); + + const shardInfo1: ShardInfo = { + clusterId: 1, + shards: [1] + }; + + const shardInfo2: ShardInfo = { + clusterId: 2, + shards: [2] + }; + + await nwaku1.start({ + relay: true, + discv5Discovery: true, + peerExchange: true, + clusterId: shardInfo1.clusterId, + pubsubTopic: shardInfoToPubsubTopics(shardInfo1) + }); + + const nwaku1Ma = await nwaku1.getMultiaddrWithId(); + + waku = await createLightNode({ shardInfo: shardInfo2 }); + await waku.start(); + await waku.libp2p.dialProtocol(nwaku1Ma, MetadataCodec); + + // add a delay to make sure the connection is closed from the other side + await delay(100); + + const activeConnections = waku.libp2p.getConnections(); + expect(activeConnections.length).to.equal(0); + }); + }); + + it("PeerStore has remote peer's shard info after successful connection", async function () { + const shardInfo: ShardInfo = { + clusterId: 1, + shards: [1] + }; + + await nwaku1.start({ + relay: true, + discv5Discovery: true, + peerExchange: true, + clusterId: shardInfo.clusterId, + pubsubTopic: shardInfoToPubsubTopics(shardInfo) + }); + + const nwaku1Ma = await nwaku1.getMultiaddrWithId(); + const nwaku1PeerId = await nwaku1.getPeerId(); + + waku = await createLightNode({ shardInfo }); + await waku.start(); + await waku.libp2p.dialProtocol(nwaku1Ma, MetadataCodec); + + // delay to ensure the connection is estabilished and shardInfo is updated + await delay(500); + + const encodedShardInfo = ( + await waku.libp2p.peerStore.get(nwaku1PeerId) + ).metadata.get("shardInfo"); + expect(encodedShardInfo).to.not.be.undefined; + + const metadataShardInfo = decodeRelayShard(encodedShardInfo!); + expect(metadataShardInfo).not.be.undefined; + + expect(metadataShardInfo!.clusterId).to.eq(shardInfo.clusterId); + expect(metadataShardInfo.shards).to.deep.eq(shardInfo.shards); + }); +});