Add lookupLoop and other fixes

- add lookupLoop
- protocol close / closeWait
- randomize randomNodes
- Use lookupRequestLimit
- Remove invalid check in neighbours proc
- Add lookup test
This commit is contained in:
kdeme 2020-02-24 15:45:30 +01:00
parent 8aad1231f5
commit f81a87f31b
4 changed files with 109 additions and 36 deletions

View File

@ -20,15 +20,19 @@ type
routingTable: RoutingTable routingTable: RoutingTable
codec: Codec codec: Codec
awaitedPackets: Table[(Node, RequestId), Future[Option[Packet]]] awaitedPackets: Table[(Node, RequestId), Future[Option[Packet]]]
lookupLoop: Future[void]
revalidateLoop: Future[void]
PendingRequest = object PendingRequest = object
node: Node node: Node
packet: seq[byte] packet: seq[byte]
const const
lookupRequestLimit = 15 lookupRequestLimit = 3
findNodeResultLimit = 15 # applies in FINDNODE handler findNodeResultLimit = 15 # applies in FINDNODE handler
findNodeAttempts = 3 lookupInterval = 60.seconds ## Interval of launching a random lookup to
## populate the routing table. go-ethereum seems to do 3 runs every 30
## minutes. Trinity starts one every minute.
proc whoareyouMagic(toNode: NodeId): array[32, byte] = proc whoareyouMagic(toNode: NodeId): array[32, byte] =
const prefix = "WHOAREYOU" const prefix = "WHOAREYOU"
@ -55,9 +59,6 @@ proc newProtocol*(privKey: PrivateKey, db: Database,
result.routingTable.init(node) result.routingTable.init(node)
proc start*(p: Protocol) =
discard
proc send(d: Protocol, a: Address, data: seq[byte]) = proc send(d: Protocol, a: Address, data: seq[byte]) =
# debug "Sending bytes", amount = data.len, to = a # debug "Sending bytes", amount = data.len, to = a
let ta = initTAddress(a.ip, a.udpPort) let ta = initTAddress(a.ip, a.udpPort)
@ -243,16 +244,17 @@ proc lookupDistances(target, dest: NodeId): seq[uint32] =
proc lookupWorker(p: Protocol, destNode: Node, target: NodeId): Future[seq[Node]] {.async.} = proc lookupWorker(p: Protocol, destNode: Node, target: NodeId): Future[seq[Node]] {.async.} =
let dists = lookupDistances(target, destNode.id) let dists = lookupDistances(target, destNode.id)
var i = 0 var i = 0
while i < findNodeAttempts and result.len < findNodeResultLimit: while i < lookupRequestLimit and result.len < findNodeResultLimit:
let r = await p.findNode(destNode, dists[i])
# TODO: Handle failures # TODO: Handle failures
let r = await p.findNode(destNode, dists[i])
# TODO: I guess it makes sense to limit here also to `findNodeResultLimit`?
result.add(r) result.add(r)
inc i inc i
for n in result: for n in result:
discard p.routingTable.addNode(n) discard p.routingTable.addNode(n)
proc lookup(p: Protocol, target: NodeId): Future[seq[Node]] {.async.} = proc lookup*(p: Protocol, target: NodeId): Future[seq[Node]] {.async.} =
## Perform a lookup for the given target, return the closest n nodes to the ## Perform a lookup for the given target, return the closest n nodes to the
## target. Maximum value for n is `BUCKET_SIZE`. ## target. Maximum value for n is `BUCKET_SIZE`.
# TODO: Sort the returned nodes on distance # TODO: Sort the returned nodes on distance
@ -290,7 +292,7 @@ proc lookup(p: Protocol, target: NodeId): Future[seq[Node]] {.async.} =
if result.len < BUCKET_SIZE: if result.len < BUCKET_SIZE:
result.add(n) result.add(n)
proc lookupRandom*(p: Protocol): Future[seq[Node]] = proc lookupRandom*(p: Protocol): Future[seq[Node]] {.raises:[Defect, Exception].} =
var id: NodeId var id: NodeId
discard randomBytes(addr id, sizeof(id)) discard randomBytes(addr id, sizeof(id))
p.lookup(id) p.lookup(id)
@ -312,7 +314,8 @@ proc processClient(transp: DatagramTransport,
debug "Receive failed", exception = e.name, msg = e.msg, debug "Receive failed", exception = e.name, msg = e.msg,
stacktrace = e.getStackTrace() stacktrace = e.getStackTrace()
proc revalidateNode(p: Protocol, n: Node) {.async.} = proc revalidateNode(p: Protocol, n: Node)
{.async, raises:[Defect, Exception].} = # TODO: Exception
let reqId = newRequestId() let reqId = newRequestId()
var ping: PingPacket var ping: PingPacket
ping.enrSeq = p.localNode.record.seqNum ping.enrSeq = p.localNode.record.seqNum
@ -333,17 +336,56 @@ proc revalidateNode(p: Protocol, n: Node) {.async.} =
p.routingTable.removeNode(n) p.routingTable.removeNode(n)
proc revalidateLoop(p: Protocol) {.async.} = proc revalidateLoop(p: Protocol) {.async.} =
while true: try:
await sleepAsync(rand(10 * 1000).milliseconds) # TODO: We need to handle actual errors still, which might just allow to
let n = p.routingTable.nodeToRevalidate() # continue the loop. However, currently `revalidateNode` raises a general
if not n.isNil: # `Exception` making this rather hard.
await p.revalidateNode(n) while true:
await sleepAsync(rand(10 * 1000).milliseconds)
let n = p.routingTable.nodeToRevalidate()
if not n.isNil:
await p.revalidateNode(n)
except CancelledError:
trace "revalidateLoop canceled"
proc lookupLoop(d: Protocol) {.async.} =
## TODO: Same story as for `revalidateLoop`
try:
while true:
let nodes = await d.lookupRandom()
trace "Discovered nodes", nodes
await sleepAsync(lookupInterval)
except CancelledError:
trace "lookupLoop canceled"
proc open*(d: Protocol) = proc open*(d: Protocol) =
debug "Starting discovery node", n = d.localNode
# TODO allow binding to specific IP / IPv6 / etc # TODO allow binding to specific IP / IPv6 / etc
let ta = initTAddress(IPv4_any(), d.localNode.node.address.udpPort) let ta = initTAddress(IPv4_any(), d.localNode.node.address.udpPort)
d.transp = newDatagramTransport(processClient, udata = d, local = ta) d.transp = newDatagramTransport(processClient, udata = d, local = ta)
asyncCheck d.revalidateLoop() # TODO: This loop has to be terminated on close() # Might want to move these to a separate proc if this turns out to be needed.
d.lookupLoop = lookupLoop(d)
d.revalidateLoop = revalidateLoop(d)
proc close*(d: Protocol) =
doAssert(not d.lookupLoop.isNil() or not d.revalidateLoop.isNil())
doAssert(not d.transp.closed)
debug "Closing discovery node", n = d.localNode
d.revalidateLoop.cancel()
d.lookupLoop.cancel()
# TODO: unsure if close can't create issues in the not awaited cancellations
# above
d.transp.close()
proc closeWait*(d: Protocol) {.async.} =
doAssert(not d.lookupLoop.isNil() or not d.revalidateLoop.isNil())
doAssert(not d.transp.closed)
debug "Closing discovery node", n = d.localNode
await allFutures([d.revalidateLoop.cancelAndWait(),
d.lookupLoop.cancelAndWait()])
await d.transp.closeWait()
proc addNode*(d: Protocol, node: Node) = proc addNode*(d: Protocol, node: Node) =
discard d.routingTable.addNode(node) discard d.routingTable.addNode(node)

View File

@ -62,13 +62,13 @@ proc add(k: KBucket, n: Node): Node =
k.lastUpdated = epochTime() k.lastUpdated = epochTime()
let nodeIdx = k.nodes.find(n) let nodeIdx = k.nodes.find(n)
if nodeIdx != -1: if nodeIdx != -1:
k.nodes.delete(nodeIdx) k.nodes.delete(nodeIdx)
k.nodes.add(n) k.nodes.add(n)
elif k.len < BUCKET_SIZE: elif k.len < BUCKET_SIZE:
k.nodes.add(n) k.nodes.add(n)
else: else:
k.replacementCache.add(n) k.replacementCache.add(n)
return k.head return k.head
return nil return nil
proc removeNode(k: KBucket, n: Node) = proc removeNode(k: KBucket, n: Node) =
@ -130,6 +130,7 @@ proc computeSharedPrefixBits(nodes: openarray[Node]): int =
proc init*(r: var RoutingTable, thisNode: Node) {.inline.} = proc init*(r: var RoutingTable, thisNode: Node) {.inline.} =
r.thisNode = thisNode r.thisNode = thisNode
r.buckets = @[newKBucket(0.u256, high(Uint256))] r.buckets = @[newKBucket(0.u256, high(Uint256))]
randomize() # for later `randomNodes` selection
proc splitBucket(r: var RoutingTable, index: int) = proc splitBucket(r: var RoutingTable, index: int) =
let bucket = r.buckets[index] let bucket = r.buckets[index]
@ -180,10 +181,10 @@ proc neighbours*(r: RoutingTable, id: NodeId, k: int = BUCKET_SIZE): seq[Node] =
result = newSeqOfCap[Node](k * 2) result = newSeqOfCap[Node](k * 2)
for bucket in r.bucketsByDistanceTo(id): for bucket in r.bucketsByDistanceTo(id):
for n in bucket.nodesByDistanceTo(id): for n in bucket.nodesByDistanceTo(id):
if n.id != id: result.add(n)
result.add(n) if result.len == k * 2:
if result.len == k * 2: break
break
result = sortedByIt(result, it.distanceTo(id)) result = sortedByIt(result, it.distanceTo(id))
if result.len > k: if result.len > k:
result.setLen(k) result.setLen(k)
@ -215,7 +216,7 @@ proc setJustSeen*(r: RoutingTable, n: Node) =
b.nodes[0] = n b.nodes[0] = n
b.lastUpdated = epochTime() b.lastUpdated = epochTime()
proc nodeToRevalidate*(r: RoutingTable): Node = proc nodeToRevalidate*(r: RoutingTable): Node {.raises:[].} =
var buckets = r.buckets var buckets = r.buckets
shuffle(buckets) shuffle(buckets)
# TODO: Should we prioritize less-recently-updated buckets instead? # TODO: Should we prioritize less-recently-updated buckets instead?

View File

@ -175,6 +175,7 @@ proc computeSharedPrefixBits(nodes: openarray[Node]): int =
proc init(r: var RoutingTable, thisNode: Node) {.inline.} = proc init(r: var RoutingTable, thisNode: Node) {.inline.} =
r.thisNode = thisNode r.thisNode = thisNode
r.buckets = @[newKBucket(0.u256, high(Uint256))] r.buckets = @[newKBucket(0.u256, high(Uint256))]
randomize() # for later `randomNodes` selection
proc splitBucket(r: var RoutingTable, index: int) = proc splitBucket(r: var RoutingTable, index: int) =
let bucket = r.buckets[index] let bucket = r.buckets[index]

View File

@ -1,11 +1,11 @@
import import
unittest, chronos, sequtils, chronicles, random, unittest, chronos, sequtils, chronicles,
eth/keys, eth/p2p/enode, eth/trie/db, eth/keys, eth/p2p/enode, eth/trie/db,
eth/p2p/discoveryv5/[discovery_db, enr, node, types], eth/p2p/discoveryv5/[discovery_db, enr, node, types, routing_table],
eth/p2p/discoveryv5/protocol as discv5_protocol, eth/p2p/discoveryv5/protocol as discv5_protocol,
./p2p_test_helper ./p2p_test_helper
proc startDiscoveryv5Node*(privKey: PrivateKey, address: Address, proc initDiscoveryNode*(privKey: PrivateKey, address: Address,
bootnodes: seq[Record]): discv5_protocol.Protocol = bootnodes: seq[Record]): discv5_protocol.Protocol =
var db = DiscoveryDB.init(newMemoryDB()) var db = DiscoveryDB.init(newMemoryDB())
result = newProtocol(privKey, db, result = newProtocol(privKey, db,
@ -16,20 +16,17 @@ proc startDiscoveryv5Node*(privKey: PrivateKey, address: Address,
result.addNode(node) result.addNode(node)
result.open() result.open()
result.start()
proc nodeIdInNodes(id: NodeId, nodes: openarray[Node]): bool = proc nodeIdInNodes(id: NodeId, nodes: openarray[Node]): bool =
for n in nodes: for n in nodes:
if id == n.id: return true if id == n.id: return true
suite "Discovery v5 Tests": suite "Discovery v5 Tests":
asyncTest "Discover nodes": asyncTest "Random nodes":
let let
bootNodeKey = initPrivateKey("a2b50376a79b1a8c8a3296485572bdfbf54708bb46d3c25d73d2723aaaf6a617") bootNodeKey = initPrivateKey("a2b50376a79b1a8c8a3296485572bdfbf54708bb46d3c25d73d2723aaaf6a617")
bootNodeAddr = localAddress(20301) bootNodeAddr = localAddress(20301)
bootNode = startDiscoveryv5Node(bootNodeKey, bootNodeAddr, @[]) bootNode = initDiscoveryNode(bootNodeKey, bootNodeAddr, @[])
bootNodeRecord = initRecord(1, bootNodeKey,
{"udp": bootNodeAddr.udpPort.uint16, "ip": [byte 127, 0, 0, 1]})
let nodeKeys = [ let nodeKeys = [
initPrivateKey("a2b50376a79b1a8c8a3296485572bdfbf54708bb46d3c25d73d2723aaaf6a618"), initPrivateKey("a2b50376a79b1a8c8a3296485572bdfbf54708bb46d3c25d73d2723aaaf6a618"),
@ -40,16 +37,48 @@ suite "Discovery v5 Tests":
for i in 0 ..< nodeKeys.len: nodeAddrs.add(localAddress(20302 + i)) for i in 0 ..< nodeKeys.len: nodeAddrs.add(localAddress(20302 + i))
var nodes = zip(nodeKeys, nodeAddrs).mapIt( var nodes = zip(nodeKeys, nodeAddrs).mapIt(
startDiscoveryv5Node(it.a, it.b, @[bootNodeRecord])) initDiscoveryNode(it.a, it.b, @[bootNode.localNode.record]))
nodes.add(bootNode) nodes.add(bootNode)
for node in nodes: for node in nodes:
let discovered = await node.lookupRandom() let discovered = await node.lookupRandom()
check discovered.len < nodes.len check discovered.len < nodes.len
debug "Lookup from random id", node=node.localNode, discovered debug "Lookup from random id", node = node.localNode, discovered
# Check for each node if the other nodes shows up in the routing table # Check for each node if the other nodes shows up in the routing table
for i in nodes: for i in nodes:
for j in nodes: for j in nodes:
if j != i: if j != i:
check(nodeIdInNodes(i.localNode.id, j.randomNodes(nodes.len - 1))) check(nodeIdInNodes(i.localNode.id, j.randomNodes(nodes.len - 1)))
for node in nodes:
await node.closeWait()
asyncTest "Lookup targets":
const
nodeCount = 5
let
bootNodeKey = newPrivateKey()
bootNodeAddr = localAddress(20301)
bootNode = initDiscoveryNode(bootNodeKey, bootNodeAddr, @[])
var nodes = newSeqOfCap[discv5_protocol.Protocol](nodeCount)
nodes.add(bootNode)
for i in 1 ..< nodeCount:
nodes.add(initDiscoveryNode(newPrivateKey(), localAddress(20301 + i),
@[bootNode.localNode.record]))
# Make sure all random lookups ran once (not guaranteed with the loops)
for node in nodes:
let discovered = await node.lookupRandom()
for i in 0..<nodeCount-1:
let target = nodes[i]
let discovered = await nodes[nodeCount-1].lookup(target.localNode.id)
debug "Lookup result", target = target.localNode, discovered
# if lookUp would return ordered on distance we could check discovered[0]
check discovered.contains(target.localNode)
for node in nodes:
await node.closeWait()