fryorcraken 3842d84b55
feat!: Introduce routing info concept
Concepts are being mixed up between the global network config (static vs auto sharding), that needs to be the same of all nodes in the network, individual node configuration (eg relay node subscribing to a given shard), and the routing characteristic of a specific message (eg pubsub topic, shard).

This stops proper configuration of nwaku post 0.36.0 because we know need to be deliberate on whether nwaku nodes are running with auto or static sharding.

It also included various back and forth conversions between shards, pubsub topics, etc.

With this change, we tidy up the network configuration, and make it explicit whether it is static or auto sharded.
We also introduce the concept of routing info, which is specific to a message, and tied to the overall network configuration.

Routing info abstract pubsub topic, shard, and autosharding needs. Which should lead to easier tidy up of the pubsub concept at a later stage.

# Conflicts:
#	packages/core/src/lib/connection_manager/connection_manager.ts
#	packages/core/src/lib/metadata/metadata.ts
#	packages/interfaces/src/metadata.ts
#	packages/interfaces/src/sharding.ts
#	packages/relay/src/create.ts
#	packages/sdk/src/filter/filter.ts
#	packages/sdk/src/filter/types.ts
#	packages/sdk/src/light_push/light_push.spec.ts
#	packages/tests/tests/sharding/auto_sharding.spec.ts
#	packages/tests/tests/sharding/static_sharding.spec.ts

# Conflicts:
#	packages/sdk/src/store/store.ts
2025-07-19 13:22:45 +10:00

187 lines
4.6 KiB
TypeScript

import type { PeerId } from "@libp2p/interface";
import { IncomingStreamData } from "@libp2p/interface";
import {
type ClusterId,
type IMetadata,
type Libp2pComponents,
type MetadataQueryResult,
type PeerIdStr,
ProtocolError,
type RelayShards
} from "@waku/interfaces";
import { proto_metadata } from "@waku/proto";
import { encodeRelayShard, Logger } from "@waku/utils";
import all from "it-all";
import * as lp from "it-length-prefixed";
import { pipe } from "it-pipe";
import { Uint8ArrayList } from "uint8arraylist";
import { StreamManager } from "../stream_manager/index.js";
const log = new Logger("metadata");
export const MetadataCodec = "/vac/waku/metadata/1.0.0";
class Metadata implements IMetadata {
private readonly streamManager: StreamManager;
private readonly libp2pComponents: Libp2pComponents;
protected handshakesConfirmed: Map<PeerIdStr, RelayShards> = new Map();
public readonly multicodec = MetadataCodec;
public constructor(
public clusterId: ClusterId,
libp2p: Libp2pComponents
) {
this.streamManager = new StreamManager(MetadataCodec, libp2p);
this.libp2pComponents = libp2p;
void libp2p.registrar.handle(MetadataCodec, (streamData) => {
void this.onRequest(streamData);
});
}
/**
* Make a metadata query to a peer
*/
public async query(peerId: PeerId): Promise<MetadataQueryResult> {
const request = proto_metadata.WakuMetadataRequest.encode({
clusterId: this.clusterId,
shards: [] // Only services node need to provide shards
});
const peer = await this.libp2pComponents.peerStore.get(peerId);
if (!peer) {
return {
shardInfo: null,
error: ProtocolError.NO_PEER_AVAILABLE
};
}
let stream;
try {
stream = await this.streamManager.getStream(peerId);
} catch (error) {
log.error("Failed to get stream", error);
return {
shardInfo: null,
error: ProtocolError.NO_STREAM_AVAILABLE
};
}
const encodedResponse = await pipe(
[request],
lp.encode,
stream,
lp.decode,
async (source) => await all(source)
);
const { error, shardInfo } = this.decodeMetadataResponse(encodedResponse);
if (error) {
return {
shardInfo: null,
error
};
}
await this.savePeerShardInfo(peerId, shardInfo);
return {
shardInfo,
error: null
};
}
public async confirmOrAttemptHandshake(
peerId: PeerId
): Promise<MetadataQueryResult> {
const shardInfo = this.handshakesConfirmed.get(peerId.toString());
if (shardInfo) {
return {
shardInfo,
error: null
};
}
return await this.query(peerId);
}
/**
* Handle an incoming metadata request
*/
private async onRequest(streamData: IncomingStreamData): Promise<void> {
try {
const { stream, connection } = streamData;
const encodedShardInfo = proto_metadata.WakuMetadataResponse.encode({
clusterId: this.clusterId,
shards: [] // Only service nodes need to provide shards
});
const encodedResponse = await pipe(
[encodedShardInfo],
lp.encode,
stream,
lp.decode,
async (source) => await all(source)
);
const { error, shardInfo } = this.decodeMetadataResponse(encodedResponse);
if (error) {
return;
}
await this.savePeerShardInfo(connection.remotePeer, shardInfo);
} catch (error) {
log.error("Error handling metadata request", error);
}
}
private decodeMetadataResponse(
encodedResponse: Uint8ArrayList[]
): MetadataQueryResult {
const bytes = new Uint8ArrayList();
encodedResponse.forEach((chunk) => {
bytes.append(chunk);
});
const response = proto_metadata.WakuMetadataResponse.decode(
bytes
) as RelayShards;
if (!response) {
log.error("Error decoding metadata response");
return {
shardInfo: null,
error: ProtocolError.DECODE_FAILED
};
}
return {
shardInfo: response,
error: null
};
}
private async savePeerShardInfo(
peerId: PeerId,
relayShards: RelayShards
): Promise<void> {
// add or update the relayShards to peer store
await this.libp2pComponents.peerStore.merge(peerId, {
metadata: {
shardInfo: encodeRelayShard(relayShards)
}
});
this.handshakesConfirmed.set(peerId.toString(), relayShards);
}
}
export function wakuMetadata(
clusterId: ClusterId
): (components: Libp2pComponents) => IMetadata {
return (components: Libp2pComponents) => new Metadata(clusterId, components);
}