diff --git a/packages/discovery/src/peer-exchange/waku_peer_exchange.ts b/packages/discovery/src/peer-exchange/waku_peer_exchange.ts index 0aed7b141e..43d337728d 100644 --- a/packages/discovery/src/peer-exchange/waku_peer_exchange.ts +++ b/packages/discovery/src/peer-exchange/waku_peer_exchange.ts @@ -41,12 +41,13 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange { public async query( params: PeerExchangeQueryParams ): Promise { - const { numPeers } = params; + const { numPeers, peerId } = params; + const rpcQuery = PeerExchangeRPC.createRequest({ numPeers: BigInt(numPeers) }); - const peer = await this.peerStore.get(params.peerId); + const peer = await this.peerStore.get(peerId); if (!peer) { return { peerInfos: null, diff --git a/packages/discovery/src/peer-exchange/waku_peer_exchange_discovery.ts b/packages/discovery/src/peer-exchange/waku_peer_exchange_discovery.ts index 516115ee5b..f58f298be1 100644 --- a/packages/discovery/src/peer-exchange/waku_peer_exchange_discovery.ts +++ b/packages/discovery/src/peer-exchange/waku_peer_exchange_discovery.ts @@ -11,9 +11,10 @@ import { type Libp2pComponents, type PeerExchangeQueryResult, PubsubTopic, + ShardInfo, Tags } from "@waku/interfaces"; -import { encodeRelayShard, Logger } from "@waku/utils"; +import { decodeRelayShard, encodeRelayShard, Logger } from "@waku/utils"; import { PeerExchangeCodec, WakuPeerExchange } from "./waku_peer_exchange.js"; @@ -198,7 +199,48 @@ export class PeerExchangeDiscovery const hasPeer = await this.components.peerStore.has(peerId); if (hasPeer) { - continue; + const { hasMultiaddrDiff, hasShardDiff } = await this.checkPeerInfoDiff( + peerInfo, + shardInfo + ); + + if (hasMultiaddrDiff || hasShardDiff) { + log.info( + `Peer ${peerId.toString()} has updated multiaddrs or shardInfo, updating` + ); + + if (hasMultiaddrDiff) { + log.info( + `Peer ${peerId.toString()} has updated multiaddrs, updating` + ); + + await this.components.peerStore.patch(peerId, { + multiaddrs: peerInfo.multiaddrs + }); + } + + if (hasShardDiff && shardInfo) { + log.info( + `Peer ${peerId.toString()} has updated shardInfo, updating` + ); + await this.components.peerStore.merge(peerId, { + metadata: { + shardInfo: encodeRelayShard(shardInfo) + } + }); + + this.dispatchEvent( + new CustomEvent("peer", { + detail: { + id: peerId, + multiaddrs: peerInfo.multiaddrs + } + }) + ); + } + + continue; + } } // update the tags for the peer @@ -213,6 +255,9 @@ export class PeerExchangeDiscovery metadata: { shardInfo: encodeRelayShard(shardInfo) } + }), + ...(peerInfo.multiaddrs && { + multiaddrs: peerInfo.multiaddrs }) }); @@ -236,6 +281,37 @@ export class PeerExchangeDiscovery this.queryingPeers.delete(peerIdStr); this.queryAttempts.delete(peerIdStr); } + + private async checkPeerInfoDiff( + peerInfo: PeerInfo, + shardInfo?: ShardInfo + ): Promise<{ hasMultiaddrDiff: boolean; hasShardDiff: boolean }> { + const { id: peerId } = peerInfo; + const peer = await this.components.peerStore.get(peerId); + + const existingMultiaddrs = peer.addresses.map((a) => + a.multiaddr.toString() + ); + const newMultiaddrs = peerInfo.multiaddrs.map((ma) => ma.toString()); + const hasMultiaddrDiff = existingMultiaddrs.some( + (ma) => !newMultiaddrs.includes(ma) + ); + + let hasShardDiff: boolean = false; + const existingShardInfoBytes = peer.metadata.get("shardInfo"); + if (existingShardInfoBytes) { + const existingShardInfo = decodeRelayShard(existingShardInfoBytes); + if (existingShardInfo || shardInfo) { + hasShardDiff = + existingShardInfo.clusterId !== shardInfo?.clusterId || + existingShardInfo.shards.some( + (shard) => !shardInfo?.shards.includes(shard) + ); + } + } + + return { hasMultiaddrDiff, hasShardDiff }; + } } export function wakuPeerExchangeDiscovery( diff --git a/packages/tests/tests/peer-exchange/continuous_discovery.spec.ts b/packages/tests/tests/peer-exchange/continuous_discovery.spec.ts new file mode 100644 index 0000000000..fe5c18ba73 --- /dev/null +++ b/packages/tests/tests/peer-exchange/continuous_discovery.spec.ts @@ -0,0 +1,127 @@ +import { type PeerId } from "@libp2p/interface"; +import { createSecp256k1PeerId } from "@libp2p/peer-id-factory"; +import { multiaddr } from "@multiformats/multiaddr"; +import { PeerExchangeDiscovery } from "@waku/discovery"; +import { IEnr, LightNode } from "@waku/interfaces"; +import { createLightNode, ShardInfo } from "@waku/sdk"; +import { decodeRelayShard, shardInfoToPubsubTopics } from "@waku/utils"; +import { expect } from "chai"; +import Sinon from "sinon"; + +describe("Peer Exchange Continuous Discovery", () => { + let peerExchangeDiscovery: PeerExchangeDiscovery; + let queryStub: Sinon.SinonStub; + let peerId: PeerId; + let randomPeerId: PeerId; + let waku: LightNode; + const shardInfo: ShardInfo = { + clusterId: 1, + shards: [1, 2] + }; + const multiaddrs = [multiaddr("/ip4/127.0.0.1/udp/1234")]; + + beforeEach(async () => { + waku = await createLightNode(); + + peerExchangeDiscovery = new PeerExchangeDiscovery( + waku.libp2p.components, + shardInfoToPubsubTopics(shardInfo) + ); + queryStub = Sinon.stub( + (peerExchangeDiscovery as any).peerExchange, + "query" as any + ); + + await discoverPeerOnce(); + }); + + it("Should update multiaddrs", async () => { + const newMultiaddrs = [multiaddr("/ip4/192.168.1.1/udp/1234")]; + const newPeerInfo = { + ENR: { + peerId, + shardInfo, + peerInfo: { + multiaddrs: newMultiaddrs, + id: peerId + } + } as IEnr + }; + queryStub.resolves({ error: null, peerInfos: [newPeerInfo] }); + + const newResult = await (peerExchangeDiscovery as any).query(randomPeerId); + expect(newResult.error).to.be.null; + const newPeers = await waku.libp2p.peerStore.all(); + expect(newPeers.length).to.equal(1); + const newPeer = newPeers[0]; + expect(newPeer.addresses.length).to.equal(1); + expect(newPeer.addresses[0].multiaddr.toString()).to.equal( + newMultiaddrs[0].toString() + ); + }); + + it("Should update shard info", async () => { + const newShardInfo: ShardInfo = { + clusterId: 2, + shards: [1, 2, 3] + }; + const newPeerInfo = { + ENR: { + peerId, + shardInfo: newShardInfo, + peerInfo: { + multiaddrs: multiaddrs, + id: peerId + } + } as IEnr + }; + queryStub.resolves({ error: null, peerInfos: [newPeerInfo] }); + + const newResult = await (peerExchangeDiscovery as any).query(randomPeerId); + expect(newResult.error).to.be.null; + const newPeers = await waku.libp2p.peerStore.all(); + expect(newPeers.length).to.equal(1); + const newPeer = newPeers[0]; + expect(newPeer.addresses.length).to.equal(1); + expect(newPeer.addresses[0].multiaddr.toString()).to.equal( + multiaddrs[0].toString() + ); + + const _shardInfo = decodeRelayShard(newPeer.metadata.get("shardInfo")!); + expect(_shardInfo).to.deep.equal(newShardInfo); + }); + + async function discoverPeerOnce(): Promise { + peerId = await createSecp256k1PeerId(); + + const enr: IEnr = { + peerId, + shardInfo, + peerInfo: { + multiaddrs: multiaddrs, + id: peerId + } + } as IEnr; + + const peerInfo = { + ENR: enr + }; + + queryStub.resolves({ error: null, peerInfos: [peerInfo] }); + + randomPeerId = await createSecp256k1PeerId(); + + const result = await (peerExchangeDiscovery as any).query(randomPeerId); + expect(result.error).to.be.null; + + const peers = await waku.libp2p.peerStore.all(); + expect(peers.length).to.equal(1); + const peer = peers[0]; + expect(peer.addresses.length).to.equal(1); + expect(peer.addresses[0].multiaddr.toString()).to.equal( + multiaddrs[0].toString() + ); + const _shardInfo = decodeRelayShard(peer.metadata.get("shardInfo")!); + expect(_shardInfo).to.deep.equal(shardInfo); + } +});