2025-01-31 00:16:00 +01:00
|
|
|
import type { PeerId } from "@libp2p/interface";
|
|
|
|
|
import { IncomingStreamData } from "@libp2p/interface";
|
|
|
|
|
import {
|
2025-07-11 12:55:02 +10:00
|
|
|
type ClusterId,
|
2025-01-31 00:16:00 +01:00
|
|
|
type IMetadata,
|
|
|
|
|
type Libp2pComponents,
|
|
|
|
|
type MetadataQueryResult,
|
|
|
|
|
type PeerIdStr,
|
|
|
|
|
ProtocolError,
|
|
|
|
|
type ShardInfo
|
|
|
|
|
} from "@waku/interfaces";
|
|
|
|
|
import { proto_metadata } from "@waku/proto";
|
2025-07-11 12:55:02 +10:00
|
|
|
import { encodeRelayShard, Logger } from "@waku/utils";
|
2025-01-31 00:16:00 +01:00
|
|
|
import all from "it-all";
|
|
|
|
|
import * as lp from "it-length-prefixed";
|
|
|
|
|
import { pipe } from "it-pipe";
|
|
|
|
|
import { Uint8ArrayList } from "uint8arraylist";
|
|
|
|
|
|
2025-06-20 12:53:42 +02:00
|
|
|
import { StreamManager } from "../stream_manager/index.js";
|
2025-01-31 00:16:00 +01:00
|
|
|
|
|
|
|
|
// Module-level logger, created under the "metadata" name; used below to report
// stream acquisition, request handling and decoding failures.
const log = new Logger("metadata");
|
|
|
|
|
|
|
|
|
|
/**
 * Protocol identifier of the metadata protocol. It is the codec passed to the
 * `StreamManager` and the protocol the incoming-request handler is registered for.
 */
export const MetadataCodec = "/vac/waku/metadata/1.0.0";
|
|
|
|
|
|
2025-06-20 12:53:42 +02:00
|
|
|
/**
 * Metadata protocol component.
 *
 * On construction it registers a handler for {@link MetadataCodec} so remote
 * peers can query this node, and it exposes {@link Metadata.query} /
 * {@link Metadata.confirmOrAttemptHandshake} to query remote peers. Shard
 * information learned from a peer is written to the libp2p peer store and
 * cached in memory in `handshakesConfirmed`.
 */
class Metadata implements IMetadata {
  private readonly streamManager: StreamManager;
  private readonly libp2pComponents: Libp2pComponents;

  /** Shard info of peers whose metadata exchange completed, keyed by peer id string. */
  protected handshakesConfirmed: Map<PeerIdStr, ShardInfo> = new Map();

  public readonly multicodec = MetadataCodec;

  /**
   * @param clusterId - Cluster id advertised in outgoing requests and responses.
   * @param libp2p - libp2p components; the registrar and peer store are used here.
   */
  public constructor(
    public clusterId: ClusterId,
    libp2p: Libp2pComponents
  ) {
    this.streamManager = new StreamManager(MetadataCodec, libp2p);
    this.libp2pComponents = libp2p;

    // Fire-and-forget: onRequest catches and logs its own errors.
    void libp2p.registrar.handle(MetadataCodec, (streamData) => {
      void this.onRequest(streamData);
    });
  }

  /**
   * Make a metadata query to a peer.
   *
   * Sends this node's cluster id (with an empty shard list), reads the peer's
   * reply, stores the decoded shard info for that peer and returns it.
   *
   * @param peerId - Peer to query.
   * @returns The peer's shard info with `error: null`, or `shardInfo: null`
   * together with a `ProtocolError` when the peer is unknown, no stream could
   * be obtained, or the reply could not be decoded.
   *
   * Errors raised by the peer store or by the stream exchange itself are not
   * caught here and propagate to the caller.
   */
  public async query(peerId: PeerId): Promise<MetadataQueryResult> {
    const request = proto_metadata.WakuMetadataRequest.encode({
      clusterId: this.clusterId,
      shards: [] // Only service nodes need to provide shards
    });

    const peer = await this.libp2pComponents.peerStore.get(peerId);
    if (!peer) {
      return {
        shardInfo: null,
        error: ProtocolError.NO_PEER_AVAILABLE
      };
    }

    const stream = await this.streamManager.getStream(peerId);

    if (!stream) {
      log.error(`Failed to get a stream for remote peer:${peerId.toString()}`);
      return {
        shardInfo: null,
        error: ProtocolError.NO_STREAM_AVAILABLE
      };
    }

    // Write the length-prefixed request, then collect every length-prefixed
    // frame the peer sends back on the same stream.
    const encodedResponse = await pipe(
      [request],
      lp.encode,
      stream,
      lp.decode,
      async (source) => await all(source)
    );

    const { error, shardInfo } = this.decodeMetadataResponse(encodedResponse);

    if (error) {
      return {
        shardInfo: null,
        error
      };
    }

    await this.savePeerShardInfo(peerId, shardInfo);

    return {
      shardInfo,
      error: null
    };
  }

  /**
   * Return the cached shard info for a peer if a metadata exchange with it
   * already completed; otherwise perform a fresh {@link Metadata.query}.
   *
   * @param peerId - Peer whose shard info is wanted.
   * @returns Cached or freshly queried result, in the same shape as `query`.
   */
  public async confirmOrAttemptHandshake(
    peerId: PeerId
  ): Promise<MetadataQueryResult> {
    const shardInfo = this.handshakesConfirmed.get(peerId.toString());
    if (shardInfo) {
      return {
        shardInfo,
        error: null
      };
    }

    return await this.query(peerId);
  }

  /**
   * Handle an incoming metadata request.
   *
   * Replies with this node's cluster id (and an empty shard list), reads what
   * the remote peer sent on the same stream and, when it decodes, stores the
   * remote peer's shard info. Any failure is logged and swallowed so a bad
   * peer cannot produce an unhandled rejection.
   *
   * NOTE(review): the incoming payload is decoded with the *response* codec;
   * presumably request and response share the same wire layout — confirm.
   */
  private async onRequest(streamData: IncomingStreamData): Promise<void> {
    try {
      const { stream, connection } = streamData;
      const encodedShardInfo = proto_metadata.WakuMetadataResponse.encode({
        clusterId: this.clusterId,
        shards: [] // Only service nodes need to provide shards
      });

      const encodedResponse = await pipe(
        [encodedShardInfo],
        lp.encode,
        stream,
        lp.decode,
        async (source) => await all(source)
      );

      const { error, shardInfo } = this.decodeMetadataResponse(encodedResponse);

      if (error) {
        // Already logged by decodeMetadataResponse; nothing to store.
        return;
      }

      await this.savePeerShardInfo(connection.remotePeer, shardInfo);
    } catch (error) {
      log.error("Error handling metadata request", error);
    }
  }

  /**
   * Concatenate the received frames and decode them as a metadata response.
   *
   * @param encodedResponse - Length-prefix-decoded frames read from the stream.
   * @returns The decoded shard info, or `ProtocolError.DECODE_FAILED` when the
   * bytes are not a valid message. Decoding failures are reported through the
   * result instead of being thrown, so callers can rely on the `error` field.
   */
  private decodeMetadataResponse(
    encodedResponse: Uint8ArrayList[]
  ): MetadataQueryResult {
    const bytes = new Uint8ArrayList();

    for (const chunk of encodedResponse) {
      bytes.append(chunk);
    }

    try {
      // The cast mirrors the original code: the decoded message is treated as ShardInfo.
      // NOTE(review): the decoded message may lack a cluster id — confirm callers tolerate that.
      const response = proto_metadata.WakuMetadataResponse.decode(
        bytes
      ) as ShardInfo;

      return {
        shardInfo: response,
        error: null
      };
    } catch (error) {
      // Bytes come from a remote peer and may be malformed; decode throws in that case.
      log.error("Error decoding metadata response", error);
      return {
        shardInfo: null,
        error: ProtocolError.DECODE_FAILED
      };
    }
  }

  /**
   * Persist a peer's shard info in the peer store metadata (under the
   * `shardInfo` key, encoded with `encodeRelayShard`) and remember the peer
   * as having completed the metadata exchange.
   *
   * @param peerId - Peer the shard info belongs to.
   * @param shardInfo - Shard info received from that peer.
   */
  private async savePeerShardInfo(
    peerId: PeerId,
    shardInfo: ShardInfo
  ): Promise<void> {
    // add or update the shardInfo to peer store
    await this.libp2pComponents.peerStore.merge(peerId, {
      metadata: {
        shardInfo: encodeRelayShard(shardInfo)
      }
    });

    // Cache only after the peer store write succeeded.
    this.handshakesConfirmed.set(peerId.toString(), shardInfo);
  }
}
|
|
|
|
|
|
|
|
|
|
/**
 * Build a factory for the metadata protocol component bound to a cluster id.
 *
 * @param clusterId - Cluster id passed to every `Metadata` instance the factory creates.
 * @returns A function that takes the libp2p components and returns a new
 * `Metadata` instance constructed with `clusterId` and those components.
 */
export function wakuMetadata(
  clusterId: ClusterId
): (components: Libp2pComponents) => IMetadata {
  const createMetadata = (components: Libp2pComponents): IMetadata => {
    return new Metadata(clusterId, components);
  };

  return createMetadata;
}
|