diff --git a/libp2p/discovery/discoverymngr.nim b/libp2p/discovery/discoverymngr.nim new file mode 100644 index 000000000..6c1854361 --- /dev/null +++ b/libp2p/discovery/discoverymngr.nim @@ -0,0 +1,163 @@ +# Nim-LibP2P +# Copyright (c) 2022 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +# * MIT license ([LICENSE-MIT](LICENSE-MIT)) +# at your option. +# This file may not be copied, modified, or distributed except according to +# those terms. + +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import std/sequtils +import chronos, chronicles, stew/results +import ../errors + +type + BaseAttr = ref object of RootObj + comparator: proc(f, c: BaseAttr): bool {.gcsafe, raises: [Defect].} + + Attribute[T] = ref object of BaseAttr + value: T + + PeerAttributes* = object + attributes: seq[BaseAttr] + + DiscoveryService* = distinct string + +proc `==`*(a, b: DiscoveryService): bool {.borrow.} + +proc ofType*[T](f: BaseAttr, _: type[T]): bool = + return f of Attribute[T] + +proc to*[T](f: BaseAttr, _: type[T]): T = + Attribute[T](f).value + +proc add*[T](pa: var PeerAttributes, + value: T) = + pa.attributes.add(Attribute[T]( + value: value, + comparator: proc(f: BaseAttr, c: BaseAttr): bool = + f.ofType(T) and c.ofType(T) and f.to(T) == c.to(T) + ) + ) + +iterator items*(pa: PeerAttributes): BaseAttr = + for f in pa.attributes: + yield f + +proc getAll*[T](pa: PeerAttributes, t: typedesc[T]): seq[T] = + for f in pa.attributes: + if f.ofType(T): + result.add(f.to(T)) + +proc `{}`*[T](pa: PeerAttributes, t: typedesc[T]): Opt[T] = + for f in pa.attributes: + if f.ofType(T): + return Opt.some(f.to(T)) + Opt.none(T) + +proc `[]`*[T](pa: PeerAttributes, t: typedesc[T]): T {.raises: [Defect, KeyError].} = + pa{T}.valueOr: raise newException(KeyError, "Attritute not found") + +proc match*(pa, candidate: PeerAttributes): bool = + for f in pa.attributes: + block oneAttribute: + for field in candidate.attributes: + if field.comparator(field, f): + break oneAttribute + return false + return true + +type + PeerFoundCallback* = proc(pa: PeerAttributes) {.raises: [Defect], gcsafe.} + + DiscoveryInterface* = ref object of RootObj + onPeerFound*: PeerFoundCallback + toAdvertise*: PeerAttributes + advertisementUpdated*: AsyncEvent + advertiseLoop*: Future[void] + +method request*(self: DiscoveryInterface, pa: PeerAttributes) {.async, base.} = + doAssert(false, "Not implemented!") + +method advertise*(self: DiscoveryInterface) {.async, base.} = + doAssert(false, "Not implemented!") + +type + DiscoveryError* = object of LPError + + DiscoveryQuery* = ref object + attr: PeerAttributes + peers: AsyncQueue[PeerAttributes] + futs: seq[Future[void]] + + DiscoveryManager* = ref object + interfaces: seq[DiscoveryInterface] + queries: seq[DiscoveryQuery] + +proc add*(dm: DiscoveryManager, di: DiscoveryInterface) = + dm.interfaces &= di + + di.onPeerFound = proc (pa: PeerAttributes) = + for query in dm.queries: + if query.attr.match(pa): + try: + query.peers.putNoWait(pa) + except AsyncQueueFullError as exc: + debug "Cannot push discovered peer to queue" + +proc request*(dm: DiscoveryManager, pa: PeerAttributes): DiscoveryQuery = + var query = DiscoveryQuery(attr: pa, peers: newAsyncQueue[PeerAttributes]()) + for i in dm.interfaces: + query.futs.add(i.request(pa)) + dm.queries.add(query) + dm.queries.keepItIf(it.futs.anyIt(not it.finished())) + return query + +proc request*[T](dm: DiscoveryManager, value: T): DiscoveryQuery = + var pa: PeerAttributes + pa.add(value) + return dm.request(pa) + +proc advertise*(dm: DiscoveryManager, pa: PeerAttributes) = + for i in dm.interfaces: + i.toAdvertise = pa + if i.advertiseLoop.isNil: + i.advertisementUpdated = newAsyncEvent() + i.advertiseLoop = i.advertise() + else: + i.advertisementUpdated.fire() + +proc advertise*[T](dm: DiscoveryManager, value: T) = + var pa: PeerAttributes + pa.add(value) + dm.advertise(pa) + +proc stop*(query: DiscoveryQuery) = + for r in query.futs: + if not r.finished(): r.cancel() + +proc stop*(dm: DiscoveryManager) = + for q in dm.queries: + q.stop() + for i in dm.interfaces: + if isNil(i.advertiseLoop): continue + i.advertiseLoop.cancel() + +proc getPeer*(query: DiscoveryQuery): Future[PeerAttributes] {.async.} = + let getter = query.peers.popFirst() + + try: + await getter or allFinished(query.futs) + except CancelledError as exc: + getter.cancel() + raise exc + + if not finished(getter): + # discovery loops only finish when they don't handle the query + raise newException(DiscoveryError, "Unable to find any peer matching this request") + return await getter diff --git a/libp2p/discovery/rendezvousinterface.nim b/libp2p/discovery/rendezvousinterface.nim new file mode 100644 index 000000000..d3196e87b --- /dev/null +++ b/libp2p/discovery/rendezvousinterface.nim @@ -0,0 +1,77 @@ +# Nim-LibP2P +# Copyright (c) 2022 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +# * MIT license ([LICENSE-MIT](LICENSE-MIT)) +# at your option. +# This file may not be copied, modified, or distributed except according to +# those terms. + +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import sequtils +import chronos +import ./discoverymngr, + ../protocols/rendezvous, + ../peerid + +type + RendezVousInterface* = ref object of DiscoveryInterface + rdv*: RendezVous + timeToRequest: Duration + timeToAdvertise: Duration + + RdvNamespace* = distinct string + +proc `==`*(a, b: RdvNamespace): bool {.borrow.} + +method request*(self: RendezVousInterface, pa: PeerAttributes) {.async.} = + var namespace = "" + for attr in pa: + if attr.ofType(RdvNamespace): + namespace = string attr.to(RdvNamespace) + elif attr.ofType(DiscoveryService): + namespace = string attr.to(DiscoveryService) + elif attr.ofType(PeerId): + namespace = $attr.to(PeerId) + else: + # unhandled type + return + while true: + for pr in await self.rdv.request(namespace): + var peer: PeerAttributes + peer.add(pr.peerId) + for address in pr.addresses: + peer.add(address.address) + + peer.add(DiscoveryService(namespace)) + peer.add(RdvNamespace(namespace)) + self.onPeerFound(peer) + + await sleepAsync(self.timeToRequest) + +method advertise*(self: RendezVousInterface) {.async.} = + while true: + var toAdvertise: seq[string] + for attr in self.toAdvertise: + if attr.ofType(RdvNamespace): + toAdvertise.add string attr.to(RdvNamespace) + elif attr.ofType(DiscoveryService): + toAdvertise.add string attr.to(DiscoveryService) + elif attr.ofType(PeerId): + toAdvertise.add $attr.to(PeerId) + + self.advertisementUpdated.clear() + for toAdv in toAdvertise: + await self.rdv.advertise(toAdv, self.timeToAdvertise) + + await sleepAsync(self.timeToAdvertise) or self.advertisementUpdated.wait() + +proc new*(T: typedesc[RendezVousInterface], + rdv: RendezVous, + ttr: Duration = 1.minutes, + tta: Duration = MinimumDuration): RendezVousInterface = + T(rdv: rdv, timeToRequest: ttr, timeToAdvertise: tta) diff --git a/libp2p/protocols/rendezvous.nim b/libp2p/protocols/rendezvous.nim index f7d58c95b..5460cdc21 100644 --- a/libp2p/protocols/rendezvous.nim +++ b/libp2p/protocols/rendezvous.nim @@ -32,7 +32,7 @@ logScope: const RendezVousCodec* = "/rendezvous/1.0.0" - MinimumDuration = 2.hours + MinimumDuration* = 2.hours MaximumDuration = 72.hours MinimumTTL = MinimumDuration.seconds.uint64 MaximumTTL = MaximumDuration.seconds.uint64 @@ -284,10 +284,6 @@ type peerId: PeerId data: Register - RegisteredSeq = object - s: seq[RegisteredData] - offset: uint64 - RendezVous* = ref object of LPProtocol # Registered needs to be an offsetted sequence # because we need stable index for the cookies. diff --git a/tests/testdiscovery.nim b/tests/testdiscovery.nim new file mode 100644 index 000000000..58ebf9310 --- /dev/null +++ b/tests/testdiscovery.nim @@ -0,0 +1,51 @@ +{.used.} + +import options, chronos, sets +import stew/byteutils +import ../libp2p/[protocols/rendezvous, + switch, + builders, + discovery/discoverymngr, + discovery/rendezvousinterface,] +import ./helpers + +proc createSwitch(rdv: RendezVous = RendezVous.new()): Switch = + SwitchBuilder.new() + .withRng(newRng()) + .withAddresses(@[ MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet() ]) + .withTcpTransport() + .withMplex() + .withNoise() + .withRendezVous(rdv) + .build() + +suite "Discovery": + teardown: + checkTrackers() + asyncTest "RendezVous test": + let + rdvA = RendezVous.new() + rdvB = RendezVous.new() + clientA = createSwitch(rdvA) + clientB = createSwitch(rdvB) + remoteNode = createSwitch() + dmA = DiscoveryManager() + dmB = DiscoveryManager() + dmA.add(RendezVousInterface.new(rdvA, ttr = 500.milliseconds)) + dmB.add(RendezVousInterface.new(rdvB)) + await allFutures(clientA.start(), clientB.start(), remoteNode.start()) + + await clientB.connect(remoteNode.peerInfo.peerId, remoteNode.peerInfo.addrs) + await clientA.connect(remoteNode.peerInfo.peerId, remoteNode.peerInfo.addrs) + + dmB.advertise(RdvNamespace("foo")) + + let + query = dmA.request(RdvNamespace("foo")) + res = await query.getPeer() + check: + res{PeerId}.get() == clientB.peerInfo.peerId + res[PeerId] == clientB.peerInfo.peerId + res.getAll(PeerId) == @[clientB.peerInfo.peerId] + toHashSet(res.getAll(MultiAddress)) == toHashSet(clientB.peerInfo.addrs) + await allFutures(clientA.stop(), clientB.stop(), remoteNode.stop()) diff --git a/tests/testnative.nim b/tests/testnative.nim index 3493c0f9f..f35971b4b 100644 --- a/tests/testnative.nim +++ b/tests/testnative.nim @@ -38,5 +38,6 @@ import testtcptransport, testrelayv1, testrelayv2, testrendezvous, + testdiscovery, testyamux, testautonat