mirror of https://github.com/status-im/nim-eth.git
Use asyncSpawn instead of asyncCheck so chronos strict makes sense
And additional cleanup: - Push raises Defect at top - remove inlines - remove unneeded gcsafe - remove usage of deprecated calls
This commit is contained in:
parent
9fed10de88
commit
a1da5d5e59
|
@ -289,7 +289,7 @@ proc lookupRandom*(d: DiscoveryProtocol): Future[seq[Node]] {.inline.} =
|
||||||
proc run(d: DiscoveryProtocol) {.async.} =
|
proc run(d: DiscoveryProtocol) {.async.} =
|
||||||
while true:
|
while true:
|
||||||
discard await d.lookupRandom()
|
discard await d.lookupRandom()
|
||||||
await sleepAsync(3000)
|
await sleepAsync(chronos.seconds(3))
|
||||||
trace "Discovered nodes", nodes = d.kademlia.nodesDiscovered
|
trace "Discovered nodes", nodes = d.kademlia.nodesDiscovered
|
||||||
|
|
||||||
proc bootstrap*(d: DiscoveryProtocol) {.async.} =
|
proc bootstrap*(d: DiscoveryProtocol) {.async.} =
|
||||||
|
|
|
@ -16,6 +16,8 @@ import
|
||||||
|
|
||||||
export sets # TODO: This should not be needed, but compilation fails otherwise
|
export sets # TODO: This should not be needed, but compilation fails otherwise
|
||||||
|
|
||||||
|
{.push raises: [Defect].}
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
topics = "kademlia"
|
topics = "kademlia"
|
||||||
|
|
||||||
|
@ -79,7 +81,8 @@ proc `$`*(n: Node): string =
|
||||||
"Node[" & $n.node.address.ip & ":" & $n.node.address.udpPort & "]"
|
"Node[" & $n.node.address.ip & ":" & $n.node.address.udpPort & "]"
|
||||||
|
|
||||||
proc hash*(n: Node): hashes.Hash = hash(n.node.pubkey.toRaw)
|
proc hash*(n: Node): hashes.Hash = hash(n.node.pubkey.toRaw)
|
||||||
proc `==`*(a, b: Node): bool = (a.isNil and b.isNil) or (not a.isNil and not b.isNil and a.node.pubkey == b.node.pubkey)
|
proc `==`*(a, b: Node): bool = (a.isNil and b.isNil) or
|
||||||
|
(not a.isNil and not b.isNil and a.node.pubkey == b.node.pubkey)
|
||||||
|
|
||||||
proc newKBucket(istart, iend: NodeId): KBucket =
|
proc newKBucket(istart, iend: NodeId): KBucket =
|
||||||
result.new()
|
result.new()
|
||||||
|
@ -95,8 +98,8 @@ proc distanceTo(k: KBucket, id: NodeId): UInt256 = k.midpoint xor id
|
||||||
proc nodesByDistanceTo(k: KBucket, id: NodeId): seq[Node] =
|
proc nodesByDistanceTo(k: KBucket, id: NodeId): seq[Node] =
|
||||||
sortedByIt(k.nodes, it.distanceTo(id))
|
sortedByIt(k.nodes, it.distanceTo(id))
|
||||||
|
|
||||||
proc len(k: KBucket): int {.inline.} = k.nodes.len
|
proc len(k: KBucket): int = k.nodes.len
|
||||||
proc head(k: KBucket): Node {.inline.} = k.nodes[0]
|
proc head(k: KBucket): Node = k.nodes[0]
|
||||||
|
|
||||||
proc add(k: KBucket, n: Node): Node =
|
proc add(k: KBucket, n: Node): Node =
|
||||||
## Try to add the given node to this bucket.
|
## Try to add the given node to this bucket.
|
||||||
|
@ -137,15 +140,15 @@ proc split(k: KBucket): tuple[lower, upper: KBucket] =
|
||||||
let bucket = if node.id <= splitid: result.lower else: result.upper
|
let bucket = if node.id <= splitid: result.lower else: result.upper
|
||||||
bucket.replacementCache.add(node)
|
bucket.replacementCache.add(node)
|
||||||
|
|
||||||
proc inRange(k: KBucket, n: Node): bool {.inline.} =
|
proc inRange(k: KBucket, n: Node): bool =
|
||||||
k.istart <= n.id and n.id <= k.iend
|
k.istart <= n.id and n.id <= k.iend
|
||||||
|
|
||||||
proc isFull(k: KBucket): bool = k.len == BUCKET_SIZE
|
proc isFull(k: KBucket): bool = k.len == BUCKET_SIZE
|
||||||
|
|
||||||
proc contains(k: KBucket, n: Node): bool = n in k.nodes
|
proc contains(k: KBucket, n: Node): bool = n in k.nodes
|
||||||
|
|
||||||
proc binaryGetBucketForNode(buckets: openarray[KBucket],
|
proc binaryGetBucketForNode(buckets: openarray[KBucket], n: Node):
|
||||||
n: Node): KBucket {.inline.} =
|
KBucket {.raises: [ValueError, Defect].} =
|
||||||
## Given a list of ordered buckets, returns the bucket for a given node.
|
## Given a list of ordered buckets, returns the bucket for a given node.
|
||||||
let bucketPos = lowerBound(buckets, n.id) do(a: KBucket, b: NodeId) -> int:
|
let bucketPos = lowerBound(buckets, n.id) do(a: KBucket, b: NodeId) -> int:
|
||||||
cmp(a.iend, b)
|
cmp(a.iend, b)
|
||||||
|
@ -174,7 +177,7 @@ proc computeSharedPrefixBits(nodes: openarray[Node]): int =
|
||||||
|
|
||||||
doAssert(false, "Unable to calculate number of shared prefix bits")
|
doAssert(false, "Unable to calculate number of shared prefix bits")
|
||||||
|
|
||||||
proc init(r: var RoutingTable, thisNode: Node) {.inline.} =
|
proc init(r: var RoutingTable, thisNode: Node) =
|
||||||
r.thisNode = thisNode
|
r.thisNode = thisNode
|
||||||
r.buckets = @[newKBucket(0.u256, high(Uint256))]
|
r.buckets = @[newKBucket(0.u256, high(Uint256))]
|
||||||
randomize() # for later `randomNodes` selection
|
randomize() # for later `randomNodes` selection
|
||||||
|
@ -185,13 +188,15 @@ proc splitBucket(r: var RoutingTable, index: int) =
|
||||||
r.buckets[index] = a
|
r.buckets[index] = a
|
||||||
r.buckets.insert(b, index + 1)
|
r.buckets.insert(b, index + 1)
|
||||||
|
|
||||||
proc bucketForNode(r: RoutingTable, n: Node): KBucket =
|
proc bucketForNode(r: RoutingTable, n: Node): KBucket
|
||||||
|
{.raises: [ValueError, Defect].} =
|
||||||
binaryGetBucketForNode(r.buckets, n)
|
binaryGetBucketForNode(r.buckets, n)
|
||||||
|
|
||||||
proc removeNode(r: var RoutingTable, n: Node) =
|
proc removeNode(r: var RoutingTable, n: Node) {.raises: [ValueError, Defect].} =
|
||||||
r.bucketForNode(n).removeNode(n)
|
r.bucketForNode(n).removeNode(n)
|
||||||
|
|
||||||
proc addNode(r: var RoutingTable, n: Node): Node =
|
proc addNode(r: var RoutingTable, n: Node): Node
|
||||||
|
{.raises: [ValueError, Defect].} =
|
||||||
if n == r.thisNode:
|
if n == r.thisNode:
|
||||||
warn "Trying to add ourselves to the routing table", node = n
|
warn "Trying to add ourselves to the routing table", node = n
|
||||||
return
|
return
|
||||||
|
@ -209,7 +214,8 @@ proc addNode(r: var RoutingTable, n: Node): Node =
|
||||||
# Nothing added, ping evictionCandidate
|
# Nothing added, ping evictionCandidate
|
||||||
return evictionCandidate
|
return evictionCandidate
|
||||||
|
|
||||||
proc contains(r: RoutingTable, n: Node): bool = n in r.bucketForNode(n)
|
proc contains(r: RoutingTable, n: Node): bool {.raises: [ValueError, Defect].} =
|
||||||
|
n in r.bucketForNode(n)
|
||||||
|
|
||||||
proc bucketsByDistanceTo(r: RoutingTable, id: NodeId): seq[KBucket] =
|
proc bucketsByDistanceTo(r: RoutingTable, id: NodeId): seq[KBucket] =
|
||||||
sortedByIt(r.buckets, it.distanceTo(id))
|
sortedByIt(r.buckets, it.distanceTo(id))
|
||||||
|
@ -243,9 +249,11 @@ proc newKademliaProtocol*[Wire](
|
||||||
result.routing.init(thisNode)
|
result.routing.init(thisNode)
|
||||||
result.rng = rng
|
result.rng = rng
|
||||||
|
|
||||||
proc bond(k: KademliaProtocol, n: Node): Future[bool] {.async, gcsafe.}
|
proc bond(k: KademliaProtocol, n: Node): Future[bool] {.async.}
|
||||||
|
proc bondDiscard(k: KademliaProtocol, n: Node) {.async.}
|
||||||
|
|
||||||
proc updateRoutingTable(k: KademliaProtocol, n: Node) {.gcsafe.} =
|
proc updateRoutingTable(k: KademliaProtocol, n: Node)
|
||||||
|
{.raises: [ValueError, Defect].} =
|
||||||
## Update the routing table entry for the given node.
|
## Update the routing table entry for the given node.
|
||||||
let evictionCandidate = k.routing.addNode(n)
|
let evictionCandidate = k.routing.addNode(n)
|
||||||
if not evictionCandidate.isNil:
|
if not evictionCandidate.isNil:
|
||||||
|
@ -253,17 +261,17 @@ proc updateRoutingTable(k: KademliaProtocol, n: Node) {.gcsafe.} =
|
||||||
# with the least recently seen node on that bucket. If the bonding fails the node will
|
# with the least recently seen node on that bucket. If the bonding fails the node will
|
||||||
# be removed from the bucket and a new one will be picked from the bucket's
|
# be removed from the bucket and a new one will be picked from the bucket's
|
||||||
# replacement cache.
|
# replacement cache.
|
||||||
asyncCheck k.bond(evictionCandidate)
|
asyncSpawn k.bondDiscard(evictionCandidate)
|
||||||
|
|
||||||
proc doSleep(p: proc() {.gcsafe, raises: [Defect].}) {.async, gcsafe.} =
|
proc doSleep(p: proc() {.gcsafe, raises: [Defect].}) {.async.} =
|
||||||
await sleepAsync(REQUEST_TIMEOUT)
|
await sleepAsync(REQUEST_TIMEOUT)
|
||||||
p()
|
p()
|
||||||
|
|
||||||
template onTimeout(b: untyped) =
|
template onTimeout(b: untyped) =
|
||||||
asyncCheck doSleep() do():
|
asyncSpawn doSleep() do():
|
||||||
b
|
b
|
||||||
|
|
||||||
proc pingId(n: Node, token: seq[byte]): seq[byte] {.inline.} =
|
proc pingId(n: Node, token: seq[byte]): seq[byte] =
|
||||||
result = token & @(n.node.pubkey.toRaw)
|
result = token & @(n.node.pubkey.toRaw)
|
||||||
|
|
||||||
proc waitPong(k: KademliaProtocol, n: Node, pingid: seq[byte]): Future[bool] =
|
proc waitPong(k: KademliaProtocol, n: Node, pingid: seq[byte]): Future[bool] =
|
||||||
|
@ -276,11 +284,11 @@ proc waitPong(k: KademliaProtocol, n: Node, pingid: seq[byte]): Future[bool] =
|
||||||
k.pongFutures.del(pingid)
|
k.pongFutures.del(pingid)
|
||||||
fut.complete(false)
|
fut.complete(false)
|
||||||
|
|
||||||
proc ping(k: KademliaProtocol, n: Node): seq[byte] {.raises: [Defect].} =
|
proc ping(k: KademliaProtocol, n: Node): seq[byte] =
|
||||||
doAssert(n != k.thisNode)
|
doAssert(n != k.thisNode)
|
||||||
result = k.wire.sendPing(n)
|
result = k.wire.sendPing(n)
|
||||||
|
|
||||||
proc waitPing(k: KademliaProtocol, n: Node): Future[bool] {.gcsafe.} =
|
proc waitPing(k: KademliaProtocol, n: Node): Future[bool] =
|
||||||
result = newFuture[bool]("waitPing")
|
result = newFuture[bool]("waitPing")
|
||||||
doAssert(n notin k.pingFutures)
|
doAssert(n notin k.pingFutures)
|
||||||
k.pingFutures[n] = result
|
k.pingFutures[n] = result
|
||||||
|
@ -337,9 +345,21 @@ proc findNode*(k: KademliaProtocol, nodesSeen: ref HashSet[Node],
|
||||||
# 3. Deduplicates candidates
|
# 3. Deduplicates candidates
|
||||||
candidates.keepItIf(not nodesSeen[].containsOrIncl(it))
|
candidates.keepItIf(not nodesSeen[].containsOrIncl(it))
|
||||||
trace "Got new candidates", count = candidates.len
|
trace "Got new candidates", count = candidates.len
|
||||||
let bonded = await all(candidates.mapIt(k.bond(it)))
|
|
||||||
for i in 0 ..< bonded.len:
|
var bondedNodes: seq[Future[bool]] = @[]
|
||||||
if not bonded[i]: candidates[i] = nil
|
for node in candidates:
|
||||||
|
bondedNodes.add(k.bond(node))
|
||||||
|
|
||||||
|
await allFutures(bondedNodes)
|
||||||
|
|
||||||
|
for i in 0..<bondedNodes.len:
|
||||||
|
let b = bondedNodes[i]
|
||||||
|
# `bond` will not raise so there should be no failures,
|
||||||
|
# and for cancellation this should be fine to raise for now.
|
||||||
|
doAssert(b.finished() and not(b.failed()))
|
||||||
|
let bonded = b.read()
|
||||||
|
if not bonded: candidates[i] = nil
|
||||||
|
|
||||||
candidates.keepItIf(not it.isNil)
|
candidates.keepItIf(not it.isNil)
|
||||||
trace "Bonded with candidates", count = candidates.len
|
trace "Bonded with candidates", count = candidates.len
|
||||||
result = candidates
|
result = candidates
|
||||||
|
@ -351,9 +371,9 @@ proc populateNotFullBuckets(k: KademliaProtocol) =
|
||||||
## When the bonding succeeds the node is automatically added to the bucket.
|
## When the bonding succeeds the node is automatically added to the bucket.
|
||||||
for bucket in k.routing.notFullBuckets:
|
for bucket in k.routing.notFullBuckets:
|
||||||
for node in bucket.replacementCache:
|
for node in bucket.replacementCache:
|
||||||
asyncCheck k.bond(node)
|
asyncSpawn k.bondDiscard(node)
|
||||||
|
|
||||||
proc bond(k: KademliaProtocol, n: Node): Future[bool] {.async, gcsafe.} =
|
proc bond(k: KademliaProtocol, n: Node): Future[bool] {.async.} =
|
||||||
## Bond with the given node.
|
## Bond with the given node.
|
||||||
##
|
##
|
||||||
## Bonding consists of pinging the node, waiting for a pong and maybe a ping as well.
|
## Bonding consists of pinging the node, waiting for a pong and maybe a ping as well.
|
||||||
|
@ -389,6 +409,9 @@ proc bond(k: KademliaProtocol, n: Node): Future[bool] {.async, gcsafe.} =
|
||||||
k.updateRoutingTable(n)
|
k.updateRoutingTable(n)
|
||||||
return true
|
return true
|
||||||
|
|
||||||
|
proc bondDiscard(k: KademliaProtocol, n: Node) {.async.} =
|
||||||
|
discard (await k.bond(n))
|
||||||
|
|
||||||
proc sortByDistance(nodes: var seq[Node], nodeId: NodeId, maxResults = 0) =
|
proc sortByDistance(nodes: var seq[Node], nodeId: NodeId, maxResults = 0) =
|
||||||
nodes = nodes.sortedByIt(it.distanceTo(nodeId))
|
nodes = nodes.sortedByIt(it.distanceTo(nodeId))
|
||||||
if maxResults != 0 and nodes.len > maxResults:
|
if maxResults != 0 and nodes.len > maxResults:
|
||||||
|
@ -412,9 +435,19 @@ proc lookup*(k: KademliaProtocol, nodeId: NodeId): Future[seq[Node]] {.async.} =
|
||||||
while nodesToAsk.len != 0:
|
while nodesToAsk.len != 0:
|
||||||
trace "Node lookup; querying ", nodesToAsk
|
trace "Node lookup; querying ", nodesToAsk
|
||||||
nodesAsked.incl(nodesToAsk.toHashSet())
|
nodesAsked.incl(nodesToAsk.toHashSet())
|
||||||
let results = await all(nodesToAsk.mapIt(k.findNode(nodesSeen, nodeId, it)))
|
|
||||||
for candidates in results:
|
var findNodeRequests: seq[Future[seq[Node]]] = @[]
|
||||||
closest.add(candidates)
|
for node in nodesToAsk:
|
||||||
|
findNodeRequests.add(k.findNode(nodesSeen, nodeId, node))
|
||||||
|
|
||||||
|
await allFutures(findNodeRequests)
|
||||||
|
|
||||||
|
for candidates in findNodeRequests:
|
||||||
|
# `findNode` will not raise so there should be no failures,
|
||||||
|
# and for cancellation this should be fine to raise for now.
|
||||||
|
doAssert(candidates.finished() and not(candidates.failed()))
|
||||||
|
closest.add(candidates.read())
|
||||||
|
|
||||||
sortByDistance(closest, nodeId, BUCKET_SIZE)
|
sortByDistance(closest, nodeId, BUCKET_SIZE)
|
||||||
nodesToAsk = excludeIfAsked(closest)
|
nodesToAsk = excludeIfAsked(closest)
|
||||||
|
|
||||||
|
@ -441,7 +474,15 @@ proc bootstrap*(k: KademliaProtocol, bootstrapNodes: seq[Node], retries = 0) {.a
|
||||||
var numTries = 0
|
var numTries = 0
|
||||||
if bootstrapNodes.len != 0:
|
if bootstrapNodes.len != 0:
|
||||||
while true:
|
while true:
|
||||||
let bonded = await all(bootstrapNodes.mapIt(k.bond(it)))
|
var bondedNodes: seq[Future[bool]] = @[]
|
||||||
|
for node in bootstrapNodes:
|
||||||
|
bondedNodes.add(k.bond(node))
|
||||||
|
await allFutures(bondedNodes)
|
||||||
|
|
||||||
|
# `bond` will not raise so there should be no failures,
|
||||||
|
# and for cancellation this should be fine to raise for now.
|
||||||
|
let bonded = bondedNodes.mapIt(it.read())
|
||||||
|
|
||||||
if true notin bonded:
|
if true notin bonded:
|
||||||
inc numTries
|
inc numTries
|
||||||
if retries == 0 or numTries < retries:
|
if retries == 0 or numTries < retries:
|
||||||
|
@ -464,7 +505,8 @@ proc recvPong*(k: KademliaProtocol, n: Node, token: seq[byte]) =
|
||||||
if k.pongFutures.take(pingid, future):
|
if k.pongFutures.take(pingid, future):
|
||||||
future.complete(true)
|
future.complete(true)
|
||||||
|
|
||||||
proc recvPing*(k: KademliaProtocol, n: Node, msgHash: any) =
|
proc recvPing*(k: KademliaProtocol, n: Node, msgHash: any)
|
||||||
|
{.raises: [ValueError, Defect].} =
|
||||||
trace "<<< ping from ", n
|
trace "<<< ping from ", n
|
||||||
k.updateRoutingTable(n)
|
k.updateRoutingTable(n)
|
||||||
k.wire.sendPong(n, msgHash)
|
k.wire.sendPong(n, msgHash)
|
||||||
|
@ -487,7 +529,8 @@ proc recvNeighbours*(k: KademliaProtocol, remote: Node, neighbours: seq[Node]) =
|
||||||
else:
|
else:
|
||||||
trace "Unexpected neighbours, probably came too late", remote
|
trace "Unexpected neighbours, probably came too late", remote
|
||||||
|
|
||||||
proc recvFindNode*(k: KademliaProtocol, remote: Node, nodeId: NodeId) {.gcsafe.} =
|
proc recvFindNode*(k: KademliaProtocol, remote: Node, nodeId: NodeId)
|
||||||
|
{.raises: [ValueError, Defect].} =
|
||||||
if remote notin k.routing:
|
if remote notin k.routing:
|
||||||
# FIXME: This is not correct; a node we've bonded before may have become unavailable
|
# FIXME: This is not correct; a node we've bonded before may have become unavailable
|
||||||
# and thus removed from self.routing, but once it's back online we should accept
|
# and thus removed from self.routing, but once it's back online we should accept
|
||||||
|
@ -521,7 +564,7 @@ proc randomNodes*(k: KademliaProtocol, count: int): seq[Node] =
|
||||||
result.add(node)
|
result.add(node)
|
||||||
seen.incl(node)
|
seen.incl(node)
|
||||||
|
|
||||||
proc nodesDiscovered*(k: KademliaProtocol): int {.inline.} = k.routing.len
|
proc nodesDiscovered*(k: KademliaProtocol): int = k.routing.len
|
||||||
|
|
||||||
when isMainModule:
|
when isMainModule:
|
||||||
proc randomNode(): Node =
|
proc randomNode(): Node =
|
||||||
|
|
Loading…
Reference in New Issue