mirror of
https://github.com/logos-messaging/logos-messaging-js.git
synced 2026-05-05 20:29:26 +00:00
* set peer-exchange with default bootstrap * only initialise protocols with bootstrap peers * update package * update package-lock * refactor `getPeers` while setting up a protocol * move codecs to `@waku/interfaces` * lightpush: send messages to multiple peers * only use multiple peers for LP and Filter * fix: ts warnings * lightpush: tests pass * update breaking changes for new API * move codecs back into protocol files * refactor: `getPeers()` * rm: log as an arg * add tsdoc for getPeers * add import * add prettier rule to eslint * add: peer exchange to sdk as a dep * fix eslint error * add try catch * revert unecessary diff * revert unecessary diff * fix imports * convert relaycodecs to array * remove: peerId as an arg for protocol methods * keep peerId as an arg for peer-exchange * remove: peerId from getPeers() * lightpush: extract hardcoded numPeers as a constant * return all peers if numPeers is 0 and increase readability for random peers * refactor considering more than 1 bootstrap peers can exist * use `getPeers` * change arg for `getPeers` to object * address comments * refactor tests for new API * lightpush: make constant the class variable * use `maxBootstrapPeers` instead of `includeBootstrap` * refactor protocols for new API * add tests for `getPeers` * skip getPeers test * rm: only from test * move tests to `base_protocol.spec.ts` * break down `getPeers` into a `filter` method * return all bootstrap peers if arg is 0 * refactor test without stubbing * address comments * update test title * move `filterPeers` to a separate file * address comments & add more test * make test title more verbose * address comments * remove ProtocolOptions * chore: refactor tests for new API * add defaults for getPeers * address comments * rm unneeded comment * address comment: add diversity of node tags to test * address comments * fix: imports
94 lines
2.3 KiB
TypeScript
94 lines
2.3 KiB
TypeScript
import { BaseProtocol } from "@waku/core/lib/base_protocol";
|
|
import { EnrDecoder } from "@waku/enr";
|
|
import type {
|
|
IPeerExchange,
|
|
Libp2pComponents,
|
|
PeerExchangeQueryParams,
|
|
PeerInfo
|
|
} from "@waku/interfaces";
|
|
import { isDefined } from "@waku/utils";
|
|
import debug from "debug";
|
|
import all from "it-all";
|
|
import * as lp from "it-length-prefixed";
|
|
import { pipe } from "it-pipe";
|
|
import { Uint8ArrayList } from "uint8arraylist";
|
|
|
|
import { PeerExchangeRPC } from "./rpc.js";
|
|
|
|
export const PeerExchangeCodec = "/vac/waku/peer-exchange/2.0.0-alpha1";
|
|
|
|
const log = debug("waku:peer-exchange");
|
|
|
|
/**
|
|
* Implementation of the Peer Exchange protocol (https://rfc.vac.dev/spec/34/)
|
|
*/
|
|
export class WakuPeerExchange extends BaseProtocol implements IPeerExchange {
|
|
/**
|
|
* @param components - libp2p components
|
|
*/
|
|
constructor(components: Libp2pComponents) {
|
|
super(PeerExchangeCodec, components);
|
|
}
|
|
|
|
/**
|
|
* Make a peer exchange query to a peer
|
|
*/
|
|
async query(
|
|
params: PeerExchangeQueryParams
|
|
): Promise<PeerInfo[] | undefined> {
|
|
const { numPeers } = params;
|
|
|
|
const rpcQuery = PeerExchangeRPC.createRequest({
|
|
numPeers: BigInt(numPeers)
|
|
});
|
|
|
|
const peer = await this.getPeer(params.peerId);
|
|
|
|
const stream = await this.getStream(peer);
|
|
|
|
const res = await pipe(
|
|
[rpcQuery.encode()],
|
|
lp.encode,
|
|
stream,
|
|
lp.decode,
|
|
async (source) => await all(source)
|
|
);
|
|
|
|
try {
|
|
const bytes = new Uint8ArrayList();
|
|
res.forEach((chunk) => {
|
|
bytes.append(chunk);
|
|
});
|
|
|
|
const { response } = PeerExchangeRPC.decode(bytes);
|
|
|
|
if (!response) {
|
|
log("PeerExchangeRPC message did not contains a `response` field");
|
|
return;
|
|
}
|
|
|
|
return Promise.all(
|
|
response.peerInfos
|
|
.map((peerInfo) => peerInfo.enr)
|
|
.filter(isDefined)
|
|
.map(async (enr) => {
|
|
return { ENR: await EnrDecoder.fromRLP(enr) };
|
|
})
|
|
);
|
|
} catch (err) {
|
|
log("Failed to decode push reply", err);
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
*
|
|
* @returns A function that creates a new peer exchange protocol
|
|
*/
|
|
export function wakuPeerExchange(): (
|
|
components: Libp2pComponents
|
|
) => WakuPeerExchange {
|
|
return (components: Libp2pComponents) => new WakuPeerExchange(components);
|
|
}
|