mirror of
https://github.com/logos-messaging/logos-messaging-js.git
synced 2026-03-21 22:33:10 +00:00
chore: metadata protocol uses error codes
This commit is contained in:
parent
5952298e7f
commit
dd7354462a
@ -1,10 +1,12 @@
|
||||
import type { PeerId } from "@libp2p/interface";
|
||||
import { IncomingStreamData } from "@libp2p/interface";
|
||||
import type {
|
||||
IMetadata,
|
||||
Libp2pComponents,
|
||||
PeerIdStr,
|
||||
ShardInfo
|
||||
import {
|
||||
type IMetadata,
|
||||
type Libp2pComponents,
|
||||
type PeerIdStr,
|
||||
ProtocolError,
|
||||
QueryResult,
|
||||
type ShardInfo
|
||||
} from "@waku/interfaces";
|
||||
import { proto_metadata } from "@waku/proto";
|
||||
import { encodeRelayShard, Logger, shardInfoToPubsubTopics } from "@waku/utils";
|
||||
@ -21,7 +23,7 @@ export const MetadataCodec = "/vac/waku/metadata/1.0.0";
|
||||
|
||||
class Metadata extends BaseProtocol implements IMetadata {
|
||||
private libp2pComponents: Libp2pComponents;
|
||||
handshakesConfirmed: Set<PeerIdStr> = new Set();
|
||||
handshakesConfirmed: Map<PeerIdStr, ShardInfo> = new Map();
|
||||
|
||||
constructor(
|
||||
public shardInfo: ShardInfo,
|
||||
@ -57,13 +59,14 @@ class Metadata extends BaseProtocol implements IMetadata {
|
||||
async (source) => await all(source)
|
||||
);
|
||||
|
||||
const remoteShardInfoResponse =
|
||||
this.decodeMetadataResponse(encodedResponse);
|
||||
const { error, shardInfo } = this.decodeMetadataResponse(encodedResponse);
|
||||
|
||||
await this.savePeerShardInfo(
|
||||
connection.remotePeer,
|
||||
remoteShardInfoResponse
|
||||
);
|
||||
if (error) {
|
||||
log.error("Error decoding metadata response from peer", error);
|
||||
return;
|
||||
}
|
||||
|
||||
await this.savePeerShardInfo(connection.remotePeer, shardInfo);
|
||||
} catch (error) {
|
||||
log.error("Error handling metadata request", error);
|
||||
}
|
||||
@ -72,12 +75,15 @@ class Metadata extends BaseProtocol implements IMetadata {
|
||||
/**
|
||||
* Make a metadata query to a peer
|
||||
*/
|
||||
async query(peerId: PeerId): Promise<ShardInfo> {
|
||||
async query(peerId: PeerId): Promise<QueryResult> {
|
||||
const request = proto_metadata.WakuMetadataRequest.encode(this.shardInfo);
|
||||
|
||||
const peer = await this.peerStore.get(peerId);
|
||||
if (!peer) {
|
||||
throw new Error(`Peer ${peerId.toString()} not found`);
|
||||
return {
|
||||
shardInfo: null,
|
||||
error: ProtocolError.NO_PEER_AVAILABLE
|
||||
};
|
||||
}
|
||||
|
||||
const stream = await this.getStream(peer);
|
||||
@ -90,22 +96,39 @@ class Metadata extends BaseProtocol implements IMetadata {
|
||||
async (source) => await all(source)
|
||||
);
|
||||
|
||||
const decodedResponse = this.decodeMetadataResponse(encodedResponse);
|
||||
const { error, shardInfo } = this.decodeMetadataResponse(encodedResponse);
|
||||
|
||||
await this.savePeerShardInfo(peerId, decodedResponse);
|
||||
if (error) {
|
||||
log.error("Error decoding metadata response from peer", error);
|
||||
return {
|
||||
shardInfo: null,
|
||||
error
|
||||
};
|
||||
}
|
||||
|
||||
return decodedResponse;
|
||||
await this.savePeerShardInfo(peerId, shardInfo);
|
||||
|
||||
return {
|
||||
shardInfo,
|
||||
error: null
|
||||
};
|
||||
}
|
||||
|
||||
public async confirmOrAttemptHandshake(peerId: PeerId): Promise<void> {
|
||||
if (this.handshakesConfirmed.has(peerId.toString())) return;
|
||||
public async confirmOrAttemptHandshake(peerId: PeerId): Promise<QueryResult> {
|
||||
const shardInfo = this.handshakesConfirmed.get(peerId.toString());
|
||||
if (shardInfo) {
|
||||
return {
|
||||
shardInfo,
|
||||
error: null
|
||||
};
|
||||
}
|
||||
|
||||
await this.query(peerId);
|
||||
|
||||
return;
|
||||
return await this.query(peerId);
|
||||
}
|
||||
|
||||
private decodeMetadataResponse(encodedResponse: Uint8ArrayList[]): ShardInfo {
|
||||
private decodeMetadataResponse(
|
||||
encodedResponse: Uint8ArrayList[]
|
||||
): QueryResult {
|
||||
const bytes = new Uint8ArrayList();
|
||||
|
||||
encodedResponse.forEach((chunk) => {
|
||||
@ -115,9 +138,18 @@ class Metadata extends BaseProtocol implements IMetadata {
|
||||
bytes
|
||||
) as ShardInfo;
|
||||
|
||||
if (!response) log.error("Error decoding metadata response");
|
||||
if (!response) {
|
||||
log.error("Error decoding metadata response");
|
||||
return {
|
||||
shardInfo: null,
|
||||
error: ProtocolError.DECODE_FAILED
|
||||
};
|
||||
}
|
||||
|
||||
return response;
|
||||
return {
|
||||
shardInfo: response,
|
||||
error: null
|
||||
};
|
||||
}
|
||||
|
||||
private async savePeerShardInfo(
|
||||
@ -131,7 +163,7 @@ class Metadata extends BaseProtocol implements IMetadata {
|
||||
}
|
||||
});
|
||||
|
||||
this.handshakesConfirmed.add(peerId.toString());
|
||||
this.handshakesConfirmed.set(peerId.toString(), shardInfo);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -1,11 +1,25 @@
|
||||
import type { PeerId } from "@libp2p/interface";
|
||||
|
||||
import type { ShardInfo } from "./enr.js";
|
||||
import type { IBaseProtocol, ShardingParams } from "./protocols.js";
|
||||
import type {
|
||||
IBaseProtocol,
|
||||
ProtocolError,
|
||||
ShardingParams
|
||||
} from "./protocols.js";
|
||||
|
||||
export type QueryResult =
|
||||
| {
|
||||
shardInfo: ShardInfo;
|
||||
error: null;
|
||||
}
|
||||
| {
|
||||
shardInfo: null;
|
||||
error: ProtocolError;
|
||||
};
|
||||
|
||||
// IMetadata always has shardInfo defined while it is optionally undefined in IBaseProtocol
|
||||
export interface IMetadata extends Omit<IBaseProtocol, "shardInfo"> {
|
||||
shardInfo: ShardingParams;
|
||||
confirmOrAttemptHandshake(peerId: PeerId): Promise<void>;
|
||||
query(peerId: PeerId): Promise<ShardInfo | undefined>;
|
||||
confirmOrAttemptHandshake(peerId: PeerId): Promise<QueryResult>;
|
||||
query(peerId: PeerId): Promise<QueryResult>;
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user