js-waku/packages/peer-exchange/src/waku_peer_exchange.ts
dependabot[bot] df8e68ff36
chore(deps)(deps): bump it-length-prefixed from 8.0.4 to 9.0.1
Bumps [it-length-prefixed](https://github.com/alanshaw/it-length-prefixed) from 8.0.4 to 9.0.1.
- [Release notes](https://github.com/alanshaw/it-length-prefixed/releases)
- [Changelog](https://github.com/alanshaw/it-length-prefixed/blob/master/CHANGELOG.md)
- [Commits](https://github.com/alanshaw/it-length-prefixed/compare/v8.0.4...v9.0.1)

---
updated-dependencies:
- dependency-name: it-length-prefixed
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-05-06 14:07:05 +10:00

110 lines
2.7 KiB
TypeScript

import type { ConnectionManager } from "@libp2p/interface-connection-manager";
import type { PeerStore } from "@libp2p/interface-peer-store";
import { BaseProtocol } from "@waku/core/lib/base_protocol";
import { EnrDecoder } from "@waku/enr";
import type {
IPeerExchange,
PeerExchangeQueryParams,
PeerInfo,
} from "@waku/interfaces";
import { isDefined } from "@waku/utils";
import debug from "debug";
import all from "it-all";
import * as lp from "it-length-prefixed";
import { pipe } from "it-pipe";
import { Uint8ArrayList } from "uint8arraylist";
import { PeerExchangeRPC } from "./rpc.js";
export const PeerExchangeCodec = "/vac/waku/peer-exchange/2.0.0-alpha1";
const log = debug("waku:peer-exchange");
export interface PeerExchangeComponents {
peerStore: PeerStore;
connectionManager: ConnectionManager;
}
/**
* Implementation of the Peer Exchange protocol (https://rfc.vac.dev/spec/34/)
*/
export class WakuPeerExchange extends BaseProtocol implements IPeerExchange {
multicodec: string;
/**
* @param components - libp2p components
*/
constructor(public components: PeerExchangeComponents) {
super(
PeerExchangeCodec,
components.peerStore,
components.connectionManager.getConnections.bind(
components.connectionManager
)
);
this.multicodec = PeerExchangeCodec;
}
/**
* Make a peer exchange query to a peer
*/
async query(
params: PeerExchangeQueryParams
): Promise<PeerInfo[] | undefined> {
const { numPeers } = params;
const rpcQuery = PeerExchangeRPC.createRequest({
numPeers: BigInt(numPeers),
});
const peer = await this.getPeer(params.peerId);
const stream = await this.newStream(peer);
const res = await pipe(
[rpcQuery.encode()],
lp.encode,
stream,
lp.decode,
async (source) => await all(source)
);
try {
const bytes = new Uint8ArrayList();
res.forEach((chunk) => {
bytes.append(chunk);
});
const { response } = PeerExchangeRPC.decode(bytes);
if (!response) {
log("PeerExchangeRPC message did not contains a `response` field");
return;
}
return Promise.all(
response.peerInfos
.map((peerInfo) => peerInfo.enr)
.filter(isDefined)
.map(async (enr) => {
return { ENR: await EnrDecoder.fromRLP(enr) };
})
);
} catch (err) {
log("Failed to decode push reply", err);
return;
}
}
}
/**
*
* @returns A function that creates a new peer exchange protocol
*/
export function wakuPeerExchange(): (
components: PeerExchangeComponents
) => WakuPeerExchange {
return (components: PeerExchangeComponents) =>
new WakuPeerExchange(components);
}