mirror of
https://github.com/logos-messaging/js-waku.git
synced 2026-01-02 05:43:08 +00:00
feat!: local peer discovery improvements (#2557)
* update local peer discovery, make it configurable for cache * move to separate file * up tests, remove local storage from tests * pass local peer cache options * add e2e tests * add aditional e2e tests for local cache * rename local-peer-cache into peer-cache * update tests, ci * prevent filterign ws addresses
This commit is contained in:
parent
95da57a870
commit
eab8ce81b4
@ -33,9 +33,9 @@ module.exports = [
|
||||
import: "{ wakuPeerExchangeDiscovery }",
|
||||
},
|
||||
{
|
||||
name: "Local Peer Cache Discovery",
|
||||
name: "Peer Cache Discovery",
|
||||
path: "packages/discovery/bundle/index.js",
|
||||
import: "{ wakuLocalPeerCacheDiscovery }",
|
||||
import: "{ wakuPeerCacheDiscovery }",
|
||||
},
|
||||
{
|
||||
name: "Privacy preserving protocols",
|
||||
|
||||
@ -508,7 +508,7 @@ describe("ConnectionLimiter", () => {
|
||||
pxPeer.addresses = [
|
||||
{ multiaddr: multiaddr("/dns4/px/tcp/443/wss"), isCertified: false }
|
||||
];
|
||||
const localPeer = createMockPeer("l", [Tags.LOCAL]);
|
||||
const localPeer = createMockPeer("l", [Tags.PEER_CACHE]);
|
||||
localPeer.addresses = [
|
||||
{ multiaddr: multiaddr("/dns4/l/tcp/443/wss"), isCertified: false }
|
||||
];
|
||||
|
||||
@ -231,7 +231,7 @@ export class ConnectionLimiter implements IConnectionLimiter {
|
||||
* Returns a list of peers ordered by priority:
|
||||
* - bootstrap peers
|
||||
* - peers from peer exchange
|
||||
* - peers from local store (last because we are not sure that locally stored information is up to date)
|
||||
* - peers from peer cache (last because we are not sure that locally stored information is up to date)
|
||||
*/
|
||||
private async getPrioritizedPeers(): Promise<Peer[]> {
|
||||
const allPeers = await this.libp2p.peerStore.all();
|
||||
@ -260,7 +260,7 @@ export class ConnectionLimiter implements IConnectionLimiter {
|
||||
);
|
||||
|
||||
const localStorePeers = notConnectedPeers.filter((p) =>
|
||||
p.tags.has(Tags.LOCAL)
|
||||
p.tags.has(Tags.PEER_CACHE)
|
||||
);
|
||||
|
||||
return [...bootstrapPeers, ...peerExchangePeers, ...localStorePeers];
|
||||
|
||||
@ -9,6 +9,6 @@ export {
|
||||
} from "./peer-exchange/index.js";
|
||||
|
||||
export {
|
||||
LocalPeerCacheDiscovery,
|
||||
wakuLocalPeerCacheDiscovery
|
||||
} from "./local-peer-cache/index.js";
|
||||
PeerCacheDiscovery,
|
||||
wakuPeerCacheDiscovery
|
||||
} from "./peer-cache/index.js";
|
||||
|
||||
@ -1,163 +0,0 @@
|
||||
import { TypedEventEmitter } from "@libp2p/interface";
|
||||
import {
|
||||
IdentifyResult,
|
||||
PeerDiscovery,
|
||||
PeerDiscoveryEvents,
|
||||
PeerInfo,
|
||||
Startable
|
||||
} from "@libp2p/interface";
|
||||
import { peerIdFromString } from "@libp2p/peer-id";
|
||||
import { multiaddr } from "@multiformats/multiaddr";
|
||||
import {
|
||||
type Libp2pComponents,
|
||||
type LocalStoragePeerInfo,
|
||||
Tags
|
||||
} from "@waku/interfaces";
|
||||
import { getWsMultiaddrFromMultiaddrs, Logger } from "@waku/utils";
|
||||
|
||||
const log = new Logger("local-cache-discovery");
|
||||
|
||||
type LocalPeerCacheDiscoveryOptions = {
|
||||
tagName?: string;
|
||||
tagValue?: number;
|
||||
tagTTL?: number;
|
||||
};
|
||||
|
||||
const DEFAULT_LOCAL_TAG_NAME = Tags.LOCAL;
|
||||
const DEFAULT_LOCAL_TAG_VALUE = 50;
|
||||
const DEFAULT_LOCAL_TAG_TTL = 100_000_000;
|
||||
|
||||
export class LocalPeerCacheDiscovery
|
||||
extends TypedEventEmitter<PeerDiscoveryEvents>
|
||||
implements PeerDiscovery, Startable
|
||||
{
|
||||
private isStarted: boolean;
|
||||
private peers: LocalStoragePeerInfo[] = [];
|
||||
|
||||
public constructor(
|
||||
private readonly components: Libp2pComponents,
|
||||
private readonly options?: LocalPeerCacheDiscoveryOptions
|
||||
) {
|
||||
super();
|
||||
this.isStarted = false;
|
||||
this.peers = this.getPeersFromLocalStorage();
|
||||
}
|
||||
|
||||
public get [Symbol.toStringTag](): string {
|
||||
return "@waku/local-peer-cache-discovery";
|
||||
}
|
||||
|
||||
public async start(): Promise<void> {
|
||||
if (this.isStarted) return;
|
||||
|
||||
log.info("Starting Local Storage Discovery");
|
||||
this.components.events.addEventListener(
|
||||
"peer:identify",
|
||||
this.handleNewPeers
|
||||
);
|
||||
|
||||
for (const { id: idStr, address } of this.peers) {
|
||||
const peerId = peerIdFromString(idStr);
|
||||
if (await this.components.peerStore.has(peerId)) continue;
|
||||
|
||||
await this.components.peerStore.save(peerId, {
|
||||
multiaddrs: [multiaddr(address)],
|
||||
tags: {
|
||||
[this.options?.tagName ?? DEFAULT_LOCAL_TAG_NAME]: {
|
||||
value: this.options?.tagValue ?? DEFAULT_LOCAL_TAG_VALUE,
|
||||
ttl: this.options?.tagTTL ?? DEFAULT_LOCAL_TAG_TTL
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
this.dispatchEvent(
|
||||
new CustomEvent<PeerInfo>("peer", {
|
||||
detail: {
|
||||
id: peerId,
|
||||
multiaddrs: [multiaddr(address)]
|
||||
}
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
log.info(`Discovered ${this.peers.length} peers`);
|
||||
|
||||
this.isStarted = true;
|
||||
}
|
||||
|
||||
public stop(): void | Promise<void> {
|
||||
if (!this.isStarted) return;
|
||||
log.info("Stopping Local Storage Discovery");
|
||||
this.components.events.removeEventListener(
|
||||
"peer:identify",
|
||||
this.handleNewPeers
|
||||
);
|
||||
this.isStarted = false;
|
||||
|
||||
this.savePeersToLocalStorage();
|
||||
}
|
||||
|
||||
public handleNewPeers = (event: CustomEvent<IdentifyResult>): void => {
|
||||
const { peerId, listenAddrs } = event.detail;
|
||||
|
||||
const websocketMultiaddr = getWsMultiaddrFromMultiaddrs(listenAddrs);
|
||||
|
||||
const localStoragePeers = this.getPeersFromLocalStorage();
|
||||
|
||||
const existingPeerIndex = localStoragePeers.findIndex(
|
||||
(_peer) => _peer.id === peerId.toString()
|
||||
);
|
||||
|
||||
if (existingPeerIndex >= 0) {
|
||||
localStoragePeers[existingPeerIndex].address =
|
||||
websocketMultiaddr.toString();
|
||||
} else {
|
||||
localStoragePeers.push({
|
||||
id: peerId.toString(),
|
||||
address: websocketMultiaddr.toString()
|
||||
});
|
||||
}
|
||||
|
||||
this.peers = localStoragePeers;
|
||||
this.savePeersToLocalStorage();
|
||||
};
|
||||
|
||||
private getPeersFromLocalStorage(): LocalStoragePeerInfo[] {
|
||||
try {
|
||||
const storedPeersData = localStorage.getItem("waku:peers");
|
||||
if (!storedPeersData) return [];
|
||||
const peers = JSON.parse(storedPeersData);
|
||||
return peers.filter(isValidStoredPeer);
|
||||
} catch (error) {
|
||||
log.error("Error parsing peers from local storage:", error);
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
private savePeersToLocalStorage(): void {
|
||||
try {
|
||||
localStorage.setItem("waku:peers", JSON.stringify(this.peers));
|
||||
} catch (error) {
|
||||
log.error("Error saving peers to local storage:", error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function isValidStoredPeer(peer: any): peer is LocalStoragePeerInfo {
|
||||
return (
|
||||
peer &&
|
||||
typeof peer === "object" &&
|
||||
typeof peer.id === "string" &&
|
||||
typeof peer.address === "string"
|
||||
);
|
||||
}
|
||||
|
||||
export function wakuLocalPeerCacheDiscovery(): (
|
||||
components: Libp2pComponents,
|
||||
options?: LocalPeerCacheDiscoveryOptions
|
||||
) => LocalPeerCacheDiscovery {
|
||||
return (
|
||||
components: Libp2pComponents,
|
||||
options?: LocalPeerCacheDiscoveryOptions
|
||||
) => new LocalPeerCacheDiscovery(components, options);
|
||||
}
|
||||
4
packages/discovery/src/peer-cache/constants.ts
Normal file
4
packages/discovery/src/peer-cache/constants.ts
Normal file
@ -0,0 +1,4 @@
|
||||
import { Tags } from "@waku/interfaces";
|
||||
|
||||
export const DEFAULT_PEER_CACHE_TAG_NAME = Tags.PEER_CACHE;
|
||||
export const DEFAULT_PEER_CACHE_TAG_VALUE = 50;
|
||||
1
packages/discovery/src/peer-cache/index.ts
Normal file
1
packages/discovery/src/peer-cache/index.ts
Normal file
@ -0,0 +1 @@
|
||||
export { wakuPeerCacheDiscovery, PeerCacheDiscovery } from "./peer_cache.js";
|
||||
@ -6,70 +6,68 @@ import { prefixLogger } from "@libp2p/logger";
|
||||
import { peerIdFromPrivateKey, peerIdFromString } from "@libp2p/peer-id";
|
||||
import { persistentPeerStore } from "@libp2p/peer-store";
|
||||
import { multiaddr } from "@multiformats/multiaddr";
|
||||
import { Libp2pComponents } from "@waku/interfaces";
|
||||
import { LocalStoragePeerInfo } from "@waku/interfaces";
|
||||
import { Libp2pComponents, PartialPeerInfo, PeerCache } from "@waku/interfaces";
|
||||
import chai, { expect } from "chai";
|
||||
import chaiAsPromised from "chai-as-promised";
|
||||
import { MemoryDatastore } from "datastore-core/memory";
|
||||
import sinon from "sinon";
|
||||
|
||||
import { LocalPeerCacheDiscovery } from "./index.js";
|
||||
import { PeerCacheDiscovery } from "./index.js";
|
||||
|
||||
chai.use(chaiAsPromised);
|
||||
|
||||
if (typeof window === "undefined") {
|
||||
try {
|
||||
global.localStorage = {
|
||||
store: {} as Record<string, string>,
|
||||
getItem(key: string) {
|
||||
return this.store[key] || null;
|
||||
},
|
||||
setItem(key: string, value: string) {
|
||||
this.store[key] = value;
|
||||
},
|
||||
removeItem(key: string) {
|
||||
delete this.store[key];
|
||||
},
|
||||
clear() {
|
||||
this.store = {};
|
||||
}
|
||||
} as any;
|
||||
} catch (error) {
|
||||
console.error("Failed to load localStorage polyfill:", error);
|
||||
}
|
||||
}
|
||||
|
||||
const mockPeers = [
|
||||
const mockPeers: PartialPeerInfo[] = [
|
||||
{
|
||||
id: "16Uiu2HAm4v86W3bmT1BiH6oSPzcsSr24iDQpSN5Qa992BCjjwgrD",
|
||||
address:
|
||||
"/ip4/127.0.0.1/tcp/8000/ws/p2p/16Uiu2HAm4v86W3bmT1BiH6oSPzcsSr24iDQpSN5Qa992BCjjwgrD"
|
||||
multiaddrs: [
|
||||
"/ip4/127.0.0.1/tcp/8000/wss/p2p/16Uiu2HAm4v86W3bmT1BiH6oSPzcsSr24iDQpSN5Qa992BCjjwgrD"
|
||||
]
|
||||
},
|
||||
{
|
||||
id: "16Uiu2HAm4v86W3bmT1BiH6oSPzcsSr24iDQpSN5Qa992BCjjwgrE",
|
||||
address:
|
||||
"/ip4/127.0.0.1/tcp/8001/ws/p2p/16Uiu2HAm4v86W3bmT1BiH6oSPzcsSr24iDQpSN5Qa992BCjjwgrE"
|
||||
multiaddrs: [
|
||||
"/ip4/127.0.0.1/tcp/8001/wss/p2p/16Uiu2HAm4v86W3bmT1BiH6oSPzcsSr24iDQpSN5Qa992BCjjwgrE"
|
||||
]
|
||||
}
|
||||
];
|
||||
|
||||
async function setPeersInLocalStorage(
|
||||
peers: LocalStoragePeerInfo[]
|
||||
): Promise<void> {
|
||||
localStorage.setItem("waku:peers", JSON.stringify(peers));
|
||||
class MockPeerCache implements PeerCache {
|
||||
public data: PartialPeerInfo[] = [];
|
||||
public throwOnGet = false;
|
||||
public get(): PartialPeerInfo[] {
|
||||
if (this.throwOnGet) {
|
||||
throw new Error("cache get error");
|
||||
}
|
||||
return this.data;
|
||||
}
|
||||
public set(value: PartialPeerInfo[]): void {
|
||||
this.data = value;
|
||||
}
|
||||
public remove(): void {
|
||||
this.data = [];
|
||||
}
|
||||
}
|
||||
|
||||
describe("Local Storage Discovery", function () {
|
||||
async function setPeersInCache(
|
||||
cache: MockPeerCache,
|
||||
peers: PartialPeerInfo[]
|
||||
): Promise<void> {
|
||||
cache.set(peers);
|
||||
}
|
||||
|
||||
describe("Peer Cache Discovery", function () {
|
||||
this.timeout(25_000);
|
||||
let components: Libp2pComponents;
|
||||
let mockCache: MockPeerCache;
|
||||
|
||||
beforeEach(async function () {
|
||||
localStorage.clear();
|
||||
mockCache = new MockPeerCache();
|
||||
components = {
|
||||
peerStore: persistentPeerStore({
|
||||
events: new TypedEventEmitter(),
|
||||
peerId: await generateKeyPair("secp256k1").then(peerIdFromPrivateKey),
|
||||
datastore: new MemoryDatastore(),
|
||||
logger: prefixLogger("local_discovery.spec.ts")
|
||||
logger: prefixLogger("peer_cache_discovery.spec.ts")
|
||||
}),
|
||||
events: new TypedEventEmitter()
|
||||
} as unknown as Libp2pComponents;
|
||||
@ -77,23 +75,24 @@ describe("Local Storage Discovery", function () {
|
||||
|
||||
describe("Compliance Tests", function () {
|
||||
beforeEach(async function () {
|
||||
await setPeersInLocalStorage([mockPeers[0]]);
|
||||
mockCache = new MockPeerCache();
|
||||
await setPeersInCache(mockCache, [mockPeers[0]]);
|
||||
});
|
||||
|
||||
tests({
|
||||
async setup() {
|
||||
return new LocalPeerCacheDiscovery(components);
|
||||
return new PeerCacheDiscovery(components, { cache: mockCache });
|
||||
},
|
||||
async teardown() {}
|
||||
});
|
||||
});
|
||||
|
||||
describe("Unit Tests", function () {
|
||||
let discovery: LocalPeerCacheDiscovery;
|
||||
let discovery: PeerCacheDiscovery;
|
||||
|
||||
beforeEach(async function () {
|
||||
discovery = new LocalPeerCacheDiscovery(components);
|
||||
await setPeersInLocalStorage(mockPeers);
|
||||
discovery = new PeerCacheDiscovery(components, { cache: mockCache });
|
||||
await setPeersInCache(mockCache, mockPeers);
|
||||
});
|
||||
|
||||
it("should load peers from local storage and dispatch events", async () => {
|
||||
@ -103,43 +102,46 @@ describe("Local Storage Discovery", function () {
|
||||
|
||||
expect(dispatchEventSpy.calledWith(sinon.match.has("type", "peer"))).to.be
|
||||
.true;
|
||||
|
||||
const dispatchedIds = dispatchEventSpy
|
||||
.getCalls()
|
||||
.map((c) => (c.args[0] as CustomEvent<any>).detail?.id?.toString?.())
|
||||
.filter(Boolean);
|
||||
|
||||
mockPeers.forEach((mockPeer) => {
|
||||
expect(
|
||||
dispatchEventSpy.calledWith(
|
||||
sinon.match.hasNested("detail.id", mockPeer.id)
|
||||
)
|
||||
).to.be.true;
|
||||
expect(dispatchedIds).to.include(mockPeer.id);
|
||||
});
|
||||
});
|
||||
|
||||
it("should update peers in local storage on 'peer:identify' event", async () => {
|
||||
const newPeerIdentifyEvent = {
|
||||
detail: {
|
||||
peerId: peerIdFromString(mockPeers[1].id.toString()),
|
||||
listenAddrs: [multiaddr(mockPeers[1].address)]
|
||||
it("should update peers in cache on 'peer:identify' event", async () => {
|
||||
await discovery.start();
|
||||
|
||||
const newPeerIdentifyEvent = new CustomEvent<IdentifyResult>(
|
||||
"peer:identify",
|
||||
{
|
||||
detail: {
|
||||
peerId: peerIdFromString(mockPeers[1].id.toString()),
|
||||
listenAddrs: [multiaddr(mockPeers[1].multiaddrs[0])]
|
||||
} as IdentifyResult
|
||||
}
|
||||
} as CustomEvent<IdentifyResult>;
|
||||
|
||||
// Directly invoke handleNewPeers to simulate receiving an 'identify' event
|
||||
discovery.handleNewPeers(newPeerIdentifyEvent);
|
||||
|
||||
const updatedPeers = JSON.parse(
|
||||
localStorage.getItem("waku:peers") || "[]"
|
||||
);
|
||||
expect(updatedPeers).to.deep.include({
|
||||
id: newPeerIdentifyEvent.detail.peerId.toString(),
|
||||
address: newPeerIdentifyEvent.detail.listenAddrs[0].toString()
|
||||
|
||||
components.events.dispatchEvent(newPeerIdentifyEvent);
|
||||
|
||||
expect(mockCache.get()).to.deep.include({
|
||||
id: mockPeers[1].id,
|
||||
multiaddrs: [mockPeers[1].multiaddrs[0]]
|
||||
});
|
||||
});
|
||||
|
||||
it("should handle corrupted local storage data gracefully", async () => {
|
||||
localStorage.setItem("waku:peers", "not-a-valid-json");
|
||||
it("should handle cache.get errors gracefully", async () => {
|
||||
mockCache.throwOnGet = true;
|
||||
|
||||
try {
|
||||
await discovery.start();
|
||||
} catch (error) {
|
||||
expect.fail(
|
||||
"start() should not have thrown an error for corrupted local storage data"
|
||||
"start() should not have thrown an error when cache.get throws"
|
||||
);
|
||||
}
|
||||
});
|
||||
152
packages/discovery/src/peer-cache/peer_cache.ts
Normal file
152
packages/discovery/src/peer-cache/peer_cache.ts
Normal file
@ -0,0 +1,152 @@
|
||||
import { TypedEventEmitter } from "@libp2p/interface";
|
||||
import {
|
||||
IdentifyResult,
|
||||
PeerDiscovery,
|
||||
PeerDiscoveryEvents,
|
||||
PeerInfo,
|
||||
Startable
|
||||
} from "@libp2p/interface";
|
||||
import { peerIdFromString } from "@libp2p/peer-id";
|
||||
import { multiaddr } from "@multiformats/multiaddr";
|
||||
import type {
|
||||
Libp2pComponents,
|
||||
PartialPeerInfo,
|
||||
PeerCache,
|
||||
PeerCacheDiscoveryOptions
|
||||
} from "@waku/interfaces";
|
||||
import { Logger } from "@waku/utils";
|
||||
|
||||
import {
|
||||
DEFAULT_PEER_CACHE_TAG_NAME,
|
||||
DEFAULT_PEER_CACHE_TAG_VALUE
|
||||
} from "./constants.js";
|
||||
import { defaultCache } from "./utils.js";
|
||||
|
||||
const log = new Logger("peer-cache");
|
||||
|
||||
export class PeerCacheDiscovery
|
||||
extends TypedEventEmitter<PeerDiscoveryEvents>
|
||||
implements PeerDiscovery, Startable
|
||||
{
|
||||
private isStarted: boolean = false;
|
||||
private readonly cache: PeerCache;
|
||||
|
||||
public constructor(
|
||||
private readonly components: Libp2pComponents,
|
||||
options?: Partial<PeerCacheDiscoveryOptions>
|
||||
) {
|
||||
super();
|
||||
this.cache = options?.cache ?? defaultCache();
|
||||
}
|
||||
|
||||
public get [Symbol.toStringTag](): string {
|
||||
return `@waku/${DEFAULT_PEER_CACHE_TAG_NAME}`;
|
||||
}
|
||||
|
||||
public async start(): Promise<void> {
|
||||
if (this.isStarted) {
|
||||
return;
|
||||
}
|
||||
|
||||
log.info("Starting Peer Cache Discovery");
|
||||
|
||||
this.components.events.addEventListener(
|
||||
"peer:identify",
|
||||
this.handleDiscoveredPeer
|
||||
);
|
||||
|
||||
await this.discoverPeers();
|
||||
|
||||
this.isStarted = true;
|
||||
}
|
||||
|
||||
public stop(): void | Promise<void> {
|
||||
if (!this.isStarted) {
|
||||
return;
|
||||
}
|
||||
|
||||
log.info("Stopping Peer Cache Discovery");
|
||||
|
||||
this.components.events.removeEventListener(
|
||||
"peer:identify",
|
||||
this.handleDiscoveredPeer
|
||||
);
|
||||
|
||||
this.isStarted = false;
|
||||
}
|
||||
|
||||
private handleDiscoveredPeer = (event: CustomEvent<IdentifyResult>): void => {
|
||||
const { peerId, listenAddrs } = event.detail;
|
||||
const multiaddrs = listenAddrs.map((addr) => addr.toString());
|
||||
|
||||
const peerIdStr = peerId.toString();
|
||||
const knownPeers = this.readPeerInfoFromCache();
|
||||
const peerIndex = knownPeers.findIndex((p) => p.id === peerIdStr);
|
||||
|
||||
if (peerIndex !== -1) {
|
||||
knownPeers[peerIndex].multiaddrs = multiaddrs;
|
||||
} else {
|
||||
knownPeers.push({
|
||||
id: peerIdStr,
|
||||
multiaddrs
|
||||
});
|
||||
}
|
||||
|
||||
this.writePeerInfoToCache(knownPeers);
|
||||
};
|
||||
|
||||
private async discoverPeers(): Promise<void> {
|
||||
const knownPeers = this.readPeerInfoFromCache();
|
||||
|
||||
for (const peer of knownPeers) {
|
||||
const peerId = peerIdFromString(peer.id);
|
||||
const multiaddrs = peer.multiaddrs.map((addr) => multiaddr(addr));
|
||||
|
||||
if (await this.components.peerStore.has(peerId)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
await this.components.peerStore.save(peerId, {
|
||||
multiaddrs,
|
||||
tags: {
|
||||
[DEFAULT_PEER_CACHE_TAG_NAME]: {
|
||||
value: DEFAULT_PEER_CACHE_TAG_VALUE
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
this.dispatchEvent(
|
||||
new CustomEvent<PeerInfo>("peer", {
|
||||
detail: {
|
||||
id: peerId,
|
||||
multiaddrs
|
||||
}
|
||||
})
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
private readPeerInfoFromCache(): PartialPeerInfo[] {
|
||||
try {
|
||||
return this.cache.get();
|
||||
} catch (error) {
|
||||
log.error("Error parsing peers from cache:", error);
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
private writePeerInfoToCache(peers: PartialPeerInfo[]): void {
|
||||
try {
|
||||
this.cache.set(peers);
|
||||
} catch (error) {
|
||||
log.error("Error saving peers to cache:", error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export function wakuPeerCacheDiscovery(
|
||||
options: Partial<PeerCacheDiscoveryOptions> = {}
|
||||
): (components: Libp2pComponents) => PeerCacheDiscovery {
|
||||
return (components: Libp2pComponents) =>
|
||||
new PeerCacheDiscovery(components, options);
|
||||
}
|
||||
73
packages/discovery/src/peer-cache/utils.ts
Normal file
73
packages/discovery/src/peer-cache/utils.ts
Normal file
@ -0,0 +1,73 @@
|
||||
import type { PartialPeerInfo, PeerCache } from "@waku/interfaces";
|
||||
|
||||
const isValidStoredPeer = (peer: unknown): boolean => {
|
||||
return (
|
||||
!!peer &&
|
||||
typeof peer === "object" &&
|
||||
"id" in peer &&
|
||||
typeof peer.id === "string" &&
|
||||
"multiaddrs" in peer &&
|
||||
Array.isArray(peer.multiaddrs)
|
||||
);
|
||||
};
|
||||
|
||||
/**
|
||||
* A noop cache that will be used in environments where localStorage is not available.
|
||||
*/
|
||||
class NoopCache implements PeerCache {
|
||||
public get(): PartialPeerInfo[] {
|
||||
return [];
|
||||
}
|
||||
|
||||
public set(_value: PartialPeerInfo[]): void {
|
||||
return;
|
||||
}
|
||||
|
||||
public remove(): void {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A cache that uses localStorage to store peer information.
|
||||
*/
|
||||
class LocalStorageCache implements PeerCache {
|
||||
public get(): PartialPeerInfo[] {
|
||||
try {
|
||||
const cachedPeers = localStorage.getItem("waku:peers");
|
||||
const peers = cachedPeers ? JSON.parse(cachedPeers) : [];
|
||||
|
||||
return peers.filter(isValidStoredPeer);
|
||||
} catch (e) {
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
public set(_value: PartialPeerInfo[]): void {
|
||||
try {
|
||||
localStorage.setItem("waku:peers", JSON.stringify(_value));
|
||||
} catch (e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
|
||||
public remove(): void {
|
||||
try {
|
||||
localStorage.removeItem("waku:peers");
|
||||
} catch (e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export const defaultCache = (): PeerCache => {
|
||||
try {
|
||||
if (typeof localStorage !== "undefined") {
|
||||
return new LocalStorageCache();
|
||||
}
|
||||
} catch (_e) {
|
||||
// ignore
|
||||
}
|
||||
|
||||
return new NoopCache();
|
||||
};
|
||||
@ -7,7 +7,7 @@ import { ShardId } from "./sharding.js";
|
||||
export enum Tags {
|
||||
BOOTSTRAP = "bootstrap",
|
||||
PEER_EXCHANGE = "peer-exchange",
|
||||
LOCAL = "local-peer-cache"
|
||||
PEER_CACHE = "peer-cache"
|
||||
}
|
||||
|
||||
// Connection tag
|
||||
|
||||
52
packages/interfaces/src/discovery.ts
Normal file
52
packages/interfaces/src/discovery.ts
Normal file
@ -0,0 +1,52 @@
|
||||
/**
|
||||
* Options for the discovery.
|
||||
*/
|
||||
export type DiscoveryOptions = {
|
||||
peerExchange: boolean;
|
||||
dns: boolean;
|
||||
peerCache: boolean;
|
||||
};
|
||||
|
||||
/**
|
||||
* Partial peer information used to store in the cache.
|
||||
*/
|
||||
export type PartialPeerInfo = {
|
||||
id: string;
|
||||
multiaddrs: string[];
|
||||
};
|
||||
|
||||
/**
|
||||
* A cache interface for persisting peer information.
|
||||
*/
|
||||
export type PeerCache = {
|
||||
/**
|
||||
* Get the peer information from the cache.
|
||||
*
|
||||
* @returns The peer information from the cache or empty array if no peer information is found.
|
||||
*/
|
||||
get: () => PartialPeerInfo[];
|
||||
|
||||
/**
|
||||
* Set the peer information in the cache.
|
||||
*
|
||||
* @param value The peer information to set in the cache.
|
||||
*/
|
||||
set: (value: PartialPeerInfo[]) => void;
|
||||
|
||||
/**
|
||||
* Remove the peer information from the cache.
|
||||
*/
|
||||
remove: () => void;
|
||||
};
|
||||
|
||||
/**
|
||||
* Options for the peer cache discovery.
|
||||
*/
|
||||
export type PeerCacheDiscoveryOptions = {
|
||||
/**
|
||||
* The cache to use for getting and storing cached peer information.
|
||||
*
|
||||
* @default LocalStorage
|
||||
*/
|
||||
cache: PeerCache;
|
||||
};
|
||||
@ -15,6 +15,6 @@ export * from "./libp2p.js";
|
||||
export * from "./dns_discovery.js";
|
||||
export * from "./metadata.js";
|
||||
export * from "./constants.js";
|
||||
export * from "./local_storage.js";
|
||||
export * from "./sharding.js";
|
||||
export * from "./health_status.js";
|
||||
export * from "./discovery.js";
|
||||
|
||||
@ -1,4 +0,0 @@
|
||||
export type LocalStoragePeerInfo = {
|
||||
id: string;
|
||||
address: string;
|
||||
};
|
||||
@ -1,6 +1,7 @@
|
||||
import type { PeerId } from "@libp2p/interface";
|
||||
|
||||
import type { ConnectionManagerOptions } from "./connection_manager.js";
|
||||
import type { DiscoveryOptions, PeerCache } from "./discovery.js";
|
||||
import type { FilterProtocolOptions } from "./filter.js";
|
||||
import type { CreateLibp2pOptions } from "./libp2p.js";
|
||||
import type { LightPushProtocolOptions } from "./light_push.js";
|
||||
@ -83,13 +84,17 @@ export type CreateNodeOptions = {
|
||||
/**
|
||||
* Enable or disable specific discovery methods.
|
||||
*
|
||||
* @default { peerExchange: true, dns: true, localPeerCache: true }
|
||||
* @default { peerExchange: true, dns: true, peerCache: true }
|
||||
*/
|
||||
discovery?: {
|
||||
peerExchange: boolean;
|
||||
dns: boolean;
|
||||
localPeerCache: boolean;
|
||||
};
|
||||
discovery?: Partial<DiscoveryOptions>;
|
||||
|
||||
/**
|
||||
* Peer cache to use for storing and retrieving peer information.
|
||||
* If present, enables peer cache discovery.
|
||||
*
|
||||
* @default browser's localStorage
|
||||
*/
|
||||
peerCache?: PeerCache;
|
||||
|
||||
/**
|
||||
* List of peers to use to bootstrap the node. Ignored if defaultBootstrap is set to true.
|
||||
|
||||
@ -12,16 +12,16 @@ describe("Default Peer Discoveries", () => {
|
||||
const discoveries = getPeerDiscoveries({
|
||||
dns: true,
|
||||
peerExchange: true,
|
||||
localPeerCache: true
|
||||
peerCache: true
|
||||
});
|
||||
expect(discoveries.length).to.equal(3);
|
||||
});
|
||||
|
||||
it("should enable only peerExchange and localPeerCache when dns is disabled", () => {
|
||||
it("should enable only peerExchange and peerCache when dns is disabled", () => {
|
||||
const discoveries = getPeerDiscoveries({
|
||||
dns: false,
|
||||
peerExchange: true,
|
||||
localPeerCache: true
|
||||
peerCache: true
|
||||
});
|
||||
expect(discoveries.length).to.equal(2);
|
||||
});
|
||||
@ -30,25 +30,25 @@ describe("Default Peer Discoveries", () => {
|
||||
const discoveries = getPeerDiscoveries({
|
||||
dns: true,
|
||||
peerExchange: false,
|
||||
localPeerCache: true
|
||||
peerCache: true
|
||||
});
|
||||
expect(discoveries.length).to.equal(2);
|
||||
});
|
||||
|
||||
it("should enable only dns and peerExchange when localPeerCache is disabled", () => {
|
||||
it("should enable only dns and peerExchange when peerCache is disabled", () => {
|
||||
const discoveries = getPeerDiscoveries({
|
||||
dns: true,
|
||||
peerExchange: true,
|
||||
localPeerCache: false
|
||||
peerCache: false
|
||||
});
|
||||
expect(discoveries.length).to.equal(2);
|
||||
});
|
||||
|
||||
it("should enable only localPeerCache when dns and peerExchange are disabled", () => {
|
||||
it("should enable only peerCache when dns and peerExchange are disabled", () => {
|
||||
const discoveries = getPeerDiscoveries({
|
||||
dns: false,
|
||||
peerExchange: false,
|
||||
localPeerCache: true
|
||||
peerCache: true
|
||||
});
|
||||
expect(discoveries.length).to.equal(1);
|
||||
});
|
||||
|
||||
@ -2,13 +2,14 @@ import type { PeerDiscovery } from "@libp2p/interface";
|
||||
import {
|
||||
enrTree,
|
||||
wakuDnsDiscovery,
|
||||
wakuLocalPeerCacheDiscovery,
|
||||
wakuPeerCacheDiscovery,
|
||||
wakuPeerExchangeDiscovery
|
||||
} from "@waku/discovery";
|
||||
import { CreateNodeOptions, type Libp2pComponents } from "@waku/interfaces";
|
||||
|
||||
export function getPeerDiscoveries(
|
||||
enabled?: CreateNodeOptions["discovery"]
|
||||
enabled?: CreateNodeOptions["discovery"],
|
||||
peerCache?: CreateNodeOptions["peerCache"]
|
||||
): ((components: Libp2pComponents) => PeerDiscovery)[] {
|
||||
const dnsEnrTrees = [enrTree["SANDBOX"], enrTree["TEST"]];
|
||||
|
||||
@ -18,8 +19,8 @@ export function getPeerDiscoveries(
|
||||
discoveries.push(wakuDnsDiscovery(dnsEnrTrees));
|
||||
}
|
||||
|
||||
if (enabled?.localPeerCache) {
|
||||
discoveries.push(wakuLocalPeerCacheDiscovery());
|
||||
if (enabled?.peerCache || peerCache) {
|
||||
discoveries.push(wakuPeerCacheDiscovery({ cache: peerCache }));
|
||||
}
|
||||
|
||||
if (enabled?.peerExchange) {
|
||||
|
||||
@ -68,12 +68,6 @@ export async function defaultLibp2p(
|
||||
}) as any as Libp2p; // TODO: make libp2p include it;
|
||||
}
|
||||
|
||||
const DEFAULT_DISCOVERIES_ENABLED = {
|
||||
dns: true,
|
||||
peerExchange: true,
|
||||
localPeerCache: true
|
||||
};
|
||||
|
||||
export async function createLibp2pAndUpdateOptions(
|
||||
options: CreateNodeOptions
|
||||
): Promise<Libp2p> {
|
||||
@ -87,13 +81,20 @@ export async function createLibp2pAndUpdateOptions(
|
||||
|
||||
if (options?.defaultBootstrap) {
|
||||
peerDiscovery.push(
|
||||
...getPeerDiscoveries({
|
||||
...DEFAULT_DISCOVERIES_ENABLED,
|
||||
...options.discovery
|
||||
})
|
||||
...getPeerDiscoveries(
|
||||
{
|
||||
dns: true,
|
||||
peerExchange: true,
|
||||
peerCache: true,
|
||||
...options.discovery
|
||||
},
|
||||
options.peerCache
|
||||
)
|
||||
);
|
||||
} else {
|
||||
peerDiscovery.push(...getPeerDiscoveries(options.discovery));
|
||||
peerDiscovery.push(
|
||||
...getPeerDiscoveries(options.discovery, options.peerCache)
|
||||
);
|
||||
}
|
||||
|
||||
const bootstrapPeers = [
|
||||
|
||||
144
packages/tests/tests/peer-cache/peer_cache.spec.ts
Normal file
144
packages/tests/tests/peer-cache/peer_cache.spec.ts
Normal file
@ -0,0 +1,144 @@
|
||||
import type { LightNode, PartialPeerInfo, PeerCache } from "@waku/interfaces";
|
||||
import { createLightNode } from "@waku/sdk";
|
||||
import { expect } from "chai";
|
||||
import Sinon, { SinonSpy } from "sinon";
|
||||
|
||||
import {
|
||||
afterEachCustom,
|
||||
beforeEachCustom,
|
||||
DefaultTestClusterId,
|
||||
DefaultTestNetworkConfig,
|
||||
DefaultTestShardInfo,
|
||||
makeLogFileName,
|
||||
ServiceNode,
|
||||
tearDownNodes
|
||||
} from "../../src/index.js";
|
||||
|
||||
class MockPeerCache implements PeerCache {
|
||||
public data: PartialPeerInfo[] = [];
|
||||
|
||||
public get(): PartialPeerInfo[] {
|
||||
return this.data;
|
||||
}
|
||||
|
||||
public set(value: PartialPeerInfo[]): void {
|
||||
this.data = value;
|
||||
}
|
||||
|
||||
public remove(): void {
|
||||
this.data = [];
|
||||
}
|
||||
}
|
||||
|
||||
describe("Peer Cache Discovery", function () {
|
||||
this.timeout(150_000);
|
||||
let ctx: Mocha.Context;
|
||||
let waku: LightNode;
|
||||
|
||||
let nwaku1: ServiceNode;
|
||||
let nwaku2: ServiceNode;
|
||||
|
||||
let dialPeerSpy: SinonSpy;
|
||||
|
||||
beforeEachCustom(this, async () => {
|
||||
ctx = this.ctx;
|
||||
|
||||
nwaku1 = new ServiceNode(makeLogFileName(ctx) + "1");
|
||||
nwaku2 = new ServiceNode(makeLogFileName(ctx) + "2");
|
||||
|
||||
await nwaku1.start({
|
||||
clusterId: DefaultTestClusterId,
|
||||
shard: DefaultTestShardInfo.shards,
|
||||
discv5Discovery: true,
|
||||
peerExchange: true,
|
||||
relay: true
|
||||
});
|
||||
|
||||
await nwaku2.start({
|
||||
clusterId: DefaultTestClusterId,
|
||||
shard: DefaultTestShardInfo.shards,
|
||||
discv5Discovery: true,
|
||||
peerExchange: true,
|
||||
discv5BootstrapNode: (await nwaku1.info()).enrUri,
|
||||
relay: true
|
||||
});
|
||||
});
|
||||
|
||||
afterEachCustom(this, async () => {
|
||||
await tearDownNodes([nwaku1, nwaku2], waku);
|
||||
});
|
||||
|
||||
it("should discover peers from provided peer cache", async function () {
|
||||
const mockCache = new MockPeerCache();
|
||||
|
||||
mockCache.set([
|
||||
{
|
||||
id: (await nwaku1.getPeerId()).toString(),
|
||||
multiaddrs: [(await nwaku1.getMultiaddrWithId()).toString()]
|
||||
},
|
||||
{
|
||||
id: (await nwaku2.getPeerId()).toString(),
|
||||
multiaddrs: [(await nwaku2.getMultiaddrWithId()).toString()]
|
||||
}
|
||||
]);
|
||||
|
||||
waku = await createLightNode({
|
||||
networkConfig: DefaultTestNetworkConfig,
|
||||
discovery: {
|
||||
peerExchange: true,
|
||||
peerCache: true
|
||||
},
|
||||
peerCache: mockCache
|
||||
});
|
||||
|
||||
dialPeerSpy = Sinon.spy((waku as any).libp2p, "dial");
|
||||
|
||||
const discoveredPeers = new Set<string>();
|
||||
await new Promise<void>((resolve) => {
|
||||
waku.libp2p.addEventListener("peer:identify", (evt) => {
|
||||
const peerId = evt.detail.peerId;
|
||||
discoveredPeers.add(peerId.toString());
|
||||
|
||||
if (discoveredPeers.size === 2) {
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
expect(dialPeerSpy.callCount).to.equal(2);
|
||||
expect(discoveredPeers.size).to.equal(2);
|
||||
});
|
||||
|
||||
it("should monitor connected peers and store them into cache", async function () {
|
||||
const mockCache = new MockPeerCache();
|
||||
|
||||
waku = await createLightNode({
|
||||
networkConfig: DefaultTestNetworkConfig,
|
||||
bootstrapPeers: [(await nwaku2.getMultiaddrWithId()).toString()],
|
||||
discovery: {
|
||||
peerExchange: true,
|
||||
peerCache: true
|
||||
},
|
||||
peerCache: mockCache
|
||||
});
|
||||
|
||||
const discoveredPeers = new Set<string>();
|
||||
|
||||
await new Promise<void>((resolve) => {
|
||||
waku.libp2p.addEventListener("peer:identify", (evt) => {
|
||||
const peerId = evt.detail.peerId;
|
||||
discoveredPeers.add(peerId.toString());
|
||||
|
||||
if (discoveredPeers.size === 1) {
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
expect(discoveredPeers.size).to.equal(1);
|
||||
|
||||
const cachedPeers = mockCache.get();
|
||||
expect(cachedPeers.length).to.equal(1);
|
||||
expect(discoveredPeers.has(cachedPeers[0].id)).to.be.true;
|
||||
});
|
||||
});
|
||||
@ -1,4 +1,3 @@
|
||||
import type { Multiaddr } from "@multiformats/multiaddr";
|
||||
export * from "./is_defined.js";
|
||||
export * from "./random_subset.js";
|
||||
export * from "./group_by.js";
|
||||
@ -8,23 +7,3 @@ export * from "./sharding/index.js";
|
||||
export * from "./push_or_init_map.js";
|
||||
export * from "./relay_shard_codec.js";
|
||||
export * from "./delay.js";
|
||||
|
||||
export function removeItemFromArray(arr: unknown[], value: unknown): unknown[] {
|
||||
const index = arr.indexOf(value);
|
||||
if (index > -1) {
|
||||
arr.splice(index, 1);
|
||||
}
|
||||
return arr;
|
||||
}
|
||||
|
||||
export function getWsMultiaddrFromMultiaddrs(
|
||||
addresses: Multiaddr[]
|
||||
): Multiaddr {
|
||||
const wsMultiaddr = addresses.find(
|
||||
(addr) => addr.toString().includes("ws") || addr.toString().includes("wss")
|
||||
);
|
||||
if (!wsMultiaddr) {
|
||||
throw new Error("No ws multiaddr found in the given addresses");
|
||||
}
|
||||
return wsMultiaddr;
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user