183 lines
5.1 KiB
Nim
183 lines
5.1 KiB
Nim
# Nim-LibP2P
|
|
# Copyright (c) 2022 Status Research & Development GmbH
|
|
# Licensed under either of
|
|
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
|
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
|
# at your option.
|
|
# This file may not be copied, modified, or distributed except according to
|
|
# those terms.
|
|
|
|
when (NimMajor, NimMinor) < (1, 4):
|
|
{.push raises: [Defect].}
|
|
else:
|
|
{.push raises: [].}
|
|
|
|
import std/sequtils
|
|
import chronos, chronicles, stew/results
|
|
import ../errors
|
|
|
|
type
|
|
BaseAttr = ref object of RootObj
|
|
comparator: proc(f, c: BaseAttr): bool {.gcsafe, raises: [Defect].}
|
|
|
|
Attribute[T] = ref object of BaseAttr
|
|
value: T
|
|
|
|
PeerAttributes* = object
|
|
attributes: seq[BaseAttr]
|
|
|
|
DiscoveryService* = distinct string
|
|
|
|
proc `==`*(a, b: DiscoveryService): bool {.borrow.}
|
|
|
|
proc ofType*[T](f: BaseAttr, _: type[T]): bool =
|
|
return f of Attribute[T]
|
|
|
|
proc to*[T](f: BaseAttr, _: type[T]): T =
|
|
Attribute[T](f).value
|
|
|
|
proc add*[T](pa: var PeerAttributes,
|
|
value: T) =
|
|
pa.attributes.add(Attribute[T](
|
|
value: value,
|
|
comparator: proc(f: BaseAttr, c: BaseAttr): bool =
|
|
f.ofType(T) and c.ofType(T) and f.to(T) == c.to(T)
|
|
)
|
|
)
|
|
|
|
iterator items*(pa: PeerAttributes): BaseAttr =
|
|
for f in pa.attributes:
|
|
yield f
|
|
|
|
proc getAll*[T](pa: PeerAttributes, t: typedesc[T]): seq[T] =
|
|
for f in pa.attributes:
|
|
if f.ofType(T):
|
|
result.add(f.to(T))
|
|
|
|
proc `{}`*[T](pa: PeerAttributes, t: typedesc[T]): Opt[T] =
|
|
for f in pa.attributes:
|
|
if f.ofType(T):
|
|
return Opt.some(f.to(T))
|
|
Opt.none(T)
|
|
|
|
proc `[]`*[T](pa: PeerAttributes, t: typedesc[T]): T {.raises: [Defect, KeyError].} =
|
|
pa{T}.valueOr: raise newException(KeyError, "Attritute not found")
|
|
|
|
proc match*(pa, candidate: PeerAttributes): bool =
|
|
for f in pa.attributes:
|
|
block oneAttribute:
|
|
for field in candidate.attributes:
|
|
if field.comparator(field, f):
|
|
break oneAttribute
|
|
return false
|
|
return true
|
|
|
|
type
|
|
PeerFoundCallback* = proc(pa: PeerAttributes) {.raises: [Defect], gcsafe.}
|
|
|
|
DiscoveryInterface* = ref object of RootObj
|
|
onPeerFound*: PeerFoundCallback
|
|
toAdvertise*: PeerAttributes
|
|
advertisementUpdated*: AsyncEvent
|
|
advertiseLoop*: Future[void]
|
|
|
|
method request*(self: DiscoveryInterface, pa: PeerAttributes) {.async, base.} =
|
|
doAssert(false, "Not implemented!")
|
|
|
|
method advertise*(self: DiscoveryInterface) {.async, base.} =
|
|
doAssert(false, "Not implemented!")
|
|
|
|
type
|
|
DiscoveryError* = object of LPError
|
|
DiscoveryFinished* = object of LPError
|
|
|
|
DiscoveryQuery* = ref object
|
|
attr: PeerAttributes
|
|
peers: AsyncQueue[PeerAttributes]
|
|
finished: bool
|
|
futs: seq[Future[void]]
|
|
|
|
DiscoveryManager* = ref object
|
|
interfaces: seq[DiscoveryInterface]
|
|
queries: seq[DiscoveryQuery]
|
|
|
|
proc add*(dm: DiscoveryManager, di: DiscoveryInterface) =
|
|
dm.interfaces &= di
|
|
|
|
di.onPeerFound = proc (pa: PeerAttributes) =
|
|
for query in dm.queries:
|
|
if query.attr.match(pa):
|
|
try:
|
|
query.peers.putNoWait(pa)
|
|
except AsyncQueueFullError as exc:
|
|
debug "Cannot push discovered peer to queue"
|
|
|
|
proc request*(dm: DiscoveryManager, pa: PeerAttributes): DiscoveryQuery =
|
|
var query = DiscoveryQuery(attr: pa, peers: newAsyncQueue[PeerAttributes]())
|
|
for i in dm.interfaces:
|
|
query.futs.add(i.request(pa))
|
|
dm.queries.add(query)
|
|
dm.queries.keepItIf(it.futs.anyIt(not it.finished()))
|
|
return query
|
|
|
|
proc request*[T](dm: DiscoveryManager, value: T): DiscoveryQuery =
|
|
var pa: PeerAttributes
|
|
pa.add(value)
|
|
return dm.request(pa)
|
|
|
|
proc advertise*(dm: DiscoveryManager, pa: PeerAttributes) =
|
|
for i in dm.interfaces:
|
|
i.toAdvertise = pa
|
|
if i.advertiseLoop.isNil:
|
|
i.advertisementUpdated = newAsyncEvent()
|
|
i.advertiseLoop = i.advertise()
|
|
else:
|
|
i.advertisementUpdated.fire()
|
|
|
|
proc advertise*[T](dm: DiscoveryManager, value: T) =
|
|
var pa: PeerAttributes
|
|
pa.add(value)
|
|
dm.advertise(pa)
|
|
|
|
template forEach*(query: DiscoveryQuery, code: untyped) =
|
|
## Will execute `code` for each discovered peer. The
|
|
## peer attritubtes are available through the variable
|
|
## `peer`
|
|
|
|
proc forEachInternal(q: DiscoveryQuery) {.async.} =
|
|
while true:
|
|
let peer {.inject.} =
|
|
try: await q.getPeer()
|
|
except DiscoveryFinished: return
|
|
code
|
|
|
|
asyncSpawn forEachInternal(query)
|
|
|
|
proc stop*(query: DiscoveryQuery) =
|
|
query.finished = true
|
|
for r in query.futs:
|
|
if not r.finished(): r.cancel()
|
|
|
|
proc stop*(dm: DiscoveryManager) =
|
|
for q in dm.queries:
|
|
q.stop()
|
|
for i in dm.interfaces:
|
|
if isNil(i.advertiseLoop): continue
|
|
i.advertiseLoop.cancel()
|
|
|
|
proc getPeer*(query: DiscoveryQuery): Future[PeerAttributes] {.async.} =
|
|
let getter = query.peers.popFirst()
|
|
|
|
try:
|
|
await getter or allFinished(query.futs)
|
|
except CancelledError as exc:
|
|
getter.cancel()
|
|
raise exc
|
|
|
|
if not finished(getter):
|
|
if query.finished:
|
|
raise newException(DiscoveryFinished, "Discovery query stopped")
|
|
# discovery loops only finish when they don't handle the query
|
|
raise newException(DiscoveryError, "Unable to find any peer matching this request")
|
|
return await getter
|