mirror of
https://github.com/status-im/nim-libp2p.git
synced 2025-01-11 05:26:02 +00:00
parent
1c99aca054
commit
32233d36c8
163
libp2p/discovery/discoverymngr.nim
Normal file
163
libp2p/discovery/discoverymngr.nim
Normal file
@ -0,0 +1,163 @@
|
||||
# Nim-LibP2P
|
||||
# Copyright (c) 2022 Status Research & Development GmbH
|
||||
# Licensed under either of
|
||||
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||
# at your option.
|
||||
# This file may not be copied, modified, or distributed except according to
|
||||
# those terms.
|
||||
|
||||
when (NimMajor, NimMinor) < (1, 4):
|
||||
{.push raises: [Defect].}
|
||||
else:
|
||||
{.push raises: [].}
|
||||
|
||||
import std/sequtils
|
||||
import chronos, chronicles, stew/results
|
||||
import ../errors
|
||||
|
||||
type
|
||||
BaseAttr = ref object of RootObj
|
||||
comparator: proc(f, c: BaseAttr): bool {.gcsafe, raises: [Defect].}
|
||||
|
||||
Attribute[T] = ref object of BaseAttr
|
||||
value: T
|
||||
|
||||
PeerAttributes* = object
|
||||
attributes: seq[BaseAttr]
|
||||
|
||||
DiscoveryService* = distinct string
|
||||
|
||||
proc `==`*(a, b: DiscoveryService): bool {.borrow.}
|
||||
|
||||
proc ofType*[T](f: BaseAttr, _: type[T]): bool =
|
||||
return f of Attribute[T]
|
||||
|
||||
proc to*[T](f: BaseAttr, _: type[T]): T =
|
||||
Attribute[T](f).value
|
||||
|
||||
proc add*[T](pa: var PeerAttributes,
|
||||
value: T) =
|
||||
pa.attributes.add(Attribute[T](
|
||||
value: value,
|
||||
comparator: proc(f: BaseAttr, c: BaseAttr): bool =
|
||||
f.ofType(T) and c.ofType(T) and f.to(T) == c.to(T)
|
||||
)
|
||||
)
|
||||
|
||||
iterator items*(pa: PeerAttributes): BaseAttr =
|
||||
for f in pa.attributes:
|
||||
yield f
|
||||
|
||||
proc getAll*[T](pa: PeerAttributes, t: typedesc[T]): seq[T] =
|
||||
for f in pa.attributes:
|
||||
if f.ofType(T):
|
||||
result.add(f.to(T))
|
||||
|
||||
proc `{}`*[T](pa: PeerAttributes, t: typedesc[T]): Opt[T] =
|
||||
for f in pa.attributes:
|
||||
if f.ofType(T):
|
||||
return Opt.some(f.to(T))
|
||||
Opt.none(T)
|
||||
|
||||
proc `[]`*[T](pa: PeerAttributes, t: typedesc[T]): T {.raises: [Defect, KeyError].} =
|
||||
pa{T}.valueOr: raise newException(KeyError, "Attritute not found")
|
||||
|
||||
proc match*(pa, candidate: PeerAttributes): bool =
|
||||
for f in pa.attributes:
|
||||
block oneAttribute:
|
||||
for field in candidate.attributes:
|
||||
if field.comparator(field, f):
|
||||
break oneAttribute
|
||||
return false
|
||||
return true
|
||||
|
||||
type
|
||||
PeerFoundCallback* = proc(pa: PeerAttributes) {.raises: [Defect], gcsafe.}
|
||||
|
||||
DiscoveryInterface* = ref object of RootObj
|
||||
onPeerFound*: PeerFoundCallback
|
||||
toAdvertise*: PeerAttributes
|
||||
advertisementUpdated*: AsyncEvent
|
||||
advertiseLoop*: Future[void]
|
||||
|
||||
method request*(self: DiscoveryInterface, pa: PeerAttributes) {.async, base.} =
|
||||
doAssert(false, "Not implemented!")
|
||||
|
||||
method advertise*(self: DiscoveryInterface) {.async, base.} =
|
||||
doAssert(false, "Not implemented!")
|
||||
|
||||
type
|
||||
DiscoveryError* = object of LPError
|
||||
|
||||
DiscoveryQuery* = ref object
|
||||
attr: PeerAttributes
|
||||
peers: AsyncQueue[PeerAttributes]
|
||||
futs: seq[Future[void]]
|
||||
|
||||
DiscoveryManager* = ref object
|
||||
interfaces: seq[DiscoveryInterface]
|
||||
queries: seq[DiscoveryQuery]
|
||||
|
||||
proc add*(dm: DiscoveryManager, di: DiscoveryInterface) =
|
||||
dm.interfaces &= di
|
||||
|
||||
di.onPeerFound = proc (pa: PeerAttributes) =
|
||||
for query in dm.queries:
|
||||
if query.attr.match(pa):
|
||||
try:
|
||||
query.peers.putNoWait(pa)
|
||||
except AsyncQueueFullError as exc:
|
||||
debug "Cannot push discovered peer to queue"
|
||||
|
||||
proc request*(dm: DiscoveryManager, pa: PeerAttributes): DiscoveryQuery =
|
||||
var query = DiscoveryQuery(attr: pa, peers: newAsyncQueue[PeerAttributes]())
|
||||
for i in dm.interfaces:
|
||||
query.futs.add(i.request(pa))
|
||||
dm.queries.add(query)
|
||||
dm.queries.keepItIf(it.futs.anyIt(not it.finished()))
|
||||
return query
|
||||
|
||||
proc request*[T](dm: DiscoveryManager, value: T): DiscoveryQuery =
|
||||
var pa: PeerAttributes
|
||||
pa.add(value)
|
||||
return dm.request(pa)
|
||||
|
||||
proc advertise*(dm: DiscoveryManager, pa: PeerAttributes) =
|
||||
for i in dm.interfaces:
|
||||
i.toAdvertise = pa
|
||||
if i.advertiseLoop.isNil:
|
||||
i.advertisementUpdated = newAsyncEvent()
|
||||
i.advertiseLoop = i.advertise()
|
||||
else:
|
||||
i.advertisementUpdated.fire()
|
||||
|
||||
proc advertise*[T](dm: DiscoveryManager, value: T) =
|
||||
var pa: PeerAttributes
|
||||
pa.add(value)
|
||||
dm.advertise(pa)
|
||||
|
||||
proc stop*(query: DiscoveryQuery) =
|
||||
for r in query.futs:
|
||||
if not r.finished(): r.cancel()
|
||||
|
||||
proc stop*(dm: DiscoveryManager) =
|
||||
for q in dm.queries:
|
||||
q.stop()
|
||||
for i in dm.interfaces:
|
||||
if isNil(i.advertiseLoop): continue
|
||||
i.advertiseLoop.cancel()
|
||||
|
||||
proc getPeer*(query: DiscoveryQuery): Future[PeerAttributes] {.async.} =
|
||||
let getter = query.peers.popFirst()
|
||||
|
||||
try:
|
||||
await getter or allFinished(query.futs)
|
||||
except CancelledError as exc:
|
||||
getter.cancel()
|
||||
raise exc
|
||||
|
||||
if not finished(getter):
|
||||
# discovery loops only finish when they don't handle the query
|
||||
raise newException(DiscoveryError, "Unable to find any peer matching this request")
|
||||
return await getter
|
77
libp2p/discovery/rendezvousinterface.nim
Normal file
77
libp2p/discovery/rendezvousinterface.nim
Normal file
@ -0,0 +1,77 @@
|
||||
# Nim-LibP2P
|
||||
# Copyright (c) 2022 Status Research & Development GmbH
|
||||
# Licensed under either of
|
||||
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||
# at your option.
|
||||
# This file may not be copied, modified, or distributed except according to
|
||||
# those terms.
|
||||
|
||||
when (NimMajor, NimMinor) < (1, 4):
|
||||
{.push raises: [Defect].}
|
||||
else:
|
||||
{.push raises: [].}
|
||||
|
||||
import sequtils
|
||||
import chronos
|
||||
import ./discoverymngr,
|
||||
../protocols/rendezvous,
|
||||
../peerid
|
||||
|
||||
type
|
||||
RendezVousInterface* = ref object of DiscoveryInterface
|
||||
rdv*: RendezVous
|
||||
timeToRequest: Duration
|
||||
timeToAdvertise: Duration
|
||||
|
||||
RdvNamespace* = distinct string
|
||||
|
||||
proc `==`*(a, b: RdvNamespace): bool {.borrow.}
|
||||
|
||||
method request*(self: RendezVousInterface, pa: PeerAttributes) {.async.} =
|
||||
var namespace = ""
|
||||
for attr in pa:
|
||||
if attr.ofType(RdvNamespace):
|
||||
namespace = string attr.to(RdvNamespace)
|
||||
elif attr.ofType(DiscoveryService):
|
||||
namespace = string attr.to(DiscoveryService)
|
||||
elif attr.ofType(PeerId):
|
||||
namespace = $attr.to(PeerId)
|
||||
else:
|
||||
# unhandled type
|
||||
return
|
||||
while true:
|
||||
for pr in await self.rdv.request(namespace):
|
||||
var peer: PeerAttributes
|
||||
peer.add(pr.peerId)
|
||||
for address in pr.addresses:
|
||||
peer.add(address.address)
|
||||
|
||||
peer.add(DiscoveryService(namespace))
|
||||
peer.add(RdvNamespace(namespace))
|
||||
self.onPeerFound(peer)
|
||||
|
||||
await sleepAsync(self.timeToRequest)
|
||||
|
||||
method advertise*(self: RendezVousInterface) {.async.} =
|
||||
while true:
|
||||
var toAdvertise: seq[string]
|
||||
for attr in self.toAdvertise:
|
||||
if attr.ofType(RdvNamespace):
|
||||
toAdvertise.add string attr.to(RdvNamespace)
|
||||
elif attr.ofType(DiscoveryService):
|
||||
toAdvertise.add string attr.to(DiscoveryService)
|
||||
elif attr.ofType(PeerId):
|
||||
toAdvertise.add $attr.to(PeerId)
|
||||
|
||||
self.advertisementUpdated.clear()
|
||||
for toAdv in toAdvertise:
|
||||
await self.rdv.advertise(toAdv, self.timeToAdvertise)
|
||||
|
||||
await sleepAsync(self.timeToAdvertise) or self.advertisementUpdated.wait()
|
||||
|
||||
proc new*(T: typedesc[RendezVousInterface],
|
||||
rdv: RendezVous,
|
||||
ttr: Duration = 1.minutes,
|
||||
tta: Duration = MinimumDuration): RendezVousInterface =
|
||||
T(rdv: rdv, timeToRequest: ttr, timeToAdvertise: tta)
|
@ -32,7 +32,7 @@ logScope:
|
||||
|
||||
const
|
||||
RendezVousCodec* = "/rendezvous/1.0.0"
|
||||
MinimumDuration = 2.hours
|
||||
MinimumDuration* = 2.hours
|
||||
MaximumDuration = 72.hours
|
||||
MinimumTTL = MinimumDuration.seconds.uint64
|
||||
MaximumTTL = MaximumDuration.seconds.uint64
|
||||
@ -284,10 +284,6 @@ type
|
||||
peerId: PeerId
|
||||
data: Register
|
||||
|
||||
RegisteredSeq = object
|
||||
s: seq[RegisteredData]
|
||||
offset: uint64
|
||||
|
||||
RendezVous* = ref object of LPProtocol
|
||||
# Registered needs to be an offsetted sequence
|
||||
# because we need stable index for the cookies.
|
||||
|
51
tests/testdiscovery.nim
Normal file
51
tests/testdiscovery.nim
Normal file
@ -0,0 +1,51 @@
|
||||
{.used.}
|
||||
|
||||
import options, chronos, sets
|
||||
import stew/byteutils
|
||||
import ../libp2p/[protocols/rendezvous,
|
||||
switch,
|
||||
builders,
|
||||
discovery/discoverymngr,
|
||||
discovery/rendezvousinterface,]
|
||||
import ./helpers
|
||||
|
||||
proc createSwitch(rdv: RendezVous = RendezVous.new()): Switch =
|
||||
SwitchBuilder.new()
|
||||
.withRng(newRng())
|
||||
.withAddresses(@[ MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet() ])
|
||||
.withTcpTransport()
|
||||
.withMplex()
|
||||
.withNoise()
|
||||
.withRendezVous(rdv)
|
||||
.build()
|
||||
|
||||
suite "Discovery":
|
||||
teardown:
|
||||
checkTrackers()
|
||||
asyncTest "RendezVous test":
|
||||
let
|
||||
rdvA = RendezVous.new()
|
||||
rdvB = RendezVous.new()
|
||||
clientA = createSwitch(rdvA)
|
||||
clientB = createSwitch(rdvB)
|
||||
remoteNode = createSwitch()
|
||||
dmA = DiscoveryManager()
|
||||
dmB = DiscoveryManager()
|
||||
dmA.add(RendezVousInterface.new(rdvA, ttr = 500.milliseconds))
|
||||
dmB.add(RendezVousInterface.new(rdvB))
|
||||
await allFutures(clientA.start(), clientB.start(), remoteNode.start())
|
||||
|
||||
await clientB.connect(remoteNode.peerInfo.peerId, remoteNode.peerInfo.addrs)
|
||||
await clientA.connect(remoteNode.peerInfo.peerId, remoteNode.peerInfo.addrs)
|
||||
|
||||
dmB.advertise(RdvNamespace("foo"))
|
||||
|
||||
let
|
||||
query = dmA.request(RdvNamespace("foo"))
|
||||
res = await query.getPeer()
|
||||
check:
|
||||
res{PeerId}.get() == clientB.peerInfo.peerId
|
||||
res[PeerId] == clientB.peerInfo.peerId
|
||||
res.getAll(PeerId) == @[clientB.peerInfo.peerId]
|
||||
toHashSet(res.getAll(MultiAddress)) == toHashSet(clientB.peerInfo.addrs)
|
||||
await allFutures(clientA.stop(), clientB.stop(), remoteNode.stop())
|
@ -38,5 +38,6 @@ import testtcptransport,
|
||||
testrelayv1,
|
||||
testrelayv2,
|
||||
testrendezvous,
|
||||
testdiscovery,
|
||||
testyamux,
|
||||
testautonat
|
||||
|
Loading…
x
Reference in New Issue
Block a user