Only do discovery queries to refresh the table

This commit is contained in:
kdeme 2021-01-05 15:03:38 +01:00
parent 8de6bd4f48
commit ac58a1f35c
1 changed files with 37 additions and 16 deletions

View File

@ -98,9 +98,8 @@ const
findNodeResultLimit = 16 ## Maximum amount of ENRs in the total Nodes messages findNodeResultLimit = 16 ## Maximum amount of ENRs in the total Nodes messages
## that will be processed ## that will be processed
maxNodesPerMessage = 3 ## Maximum amount of ENRs per individual Nodes message maxNodesPerMessage = 3 ## Maximum amount of ENRs per individual Nodes message
lookupInterval = 60.seconds ## Interval of launching a random lookup to refreshInterval = 5.minutes ## Interval of launching a random query to
## populate the routing table. go-ethereum seems to do 3 runs every 30 ## refresh the routing table.
## minutes. Trinity starts one every minute.
revalidateMax = 10000 ## Revalidation of a peer is done between 0 and this revalidateMax = 10000 ## Revalidation of a peer is done between 0 and this
## value in milliseconds ## value in milliseconds
handshakeTimeout* = 2.seconds ## timeout for the reply on the handshakeTimeout* = 2.seconds ## timeout for the reply on the
@ -118,8 +117,9 @@ type
routingTable: RoutingTable routingTable: RoutingTable
codec*: Codec codec*: Codec
awaitedMessages: Table[(NodeId, RequestId), Future[Option[Message]]] awaitedMessages: Table[(NodeId, RequestId), Future[Option[Message]]]
queryLoop: Future[void] refreshLoop: Future[void]
revalidateLoop: Future[void] revalidateLoop: Future[void]
lastLookup: chronos.Moment
bootstrapRecords*: seq[Record] bootstrapRecords*: seq[Record]
rng*: ref BrHmacDrbgContext rng*: ref BrHmacDrbgContext
@ -717,6 +717,7 @@ proc lookup*(d: Protocol, target: NodeId): Future[seq[Node]]
if closestNodes.len > BUCKET_SIZE: if closestNodes.len > BUCKET_SIZE:
closestNodes.del(closestNodes.high()) closestNodes.del(closestNodes.high())
d.lastLookup = now(chronos.Moment)
return closestNodes return closestNodes
proc query*(d: Protocol, target: NodeId, k = BUCKET_SIZE): Future[seq[Node]] proc query*(d: Protocol, target: NodeId, k = BUCKET_SIZE): Future[seq[Node]]
@ -765,6 +766,7 @@ proc query*(d: Protocol, target: NodeId, k = BUCKET_SIZE): Future[seq[Node]]
if not seen.containsOrIncl(n.id): if not seen.containsOrIncl(n.id):
queryBuffer.add(n) queryBuffer.add(n)
d.lastLookup = now(chronos.Moment)
return queryBuffer return queryBuffer
proc queryRandom*(d: Protocol): Future[seq[Node]] proc queryRandom*(d: Protocol): Future[seq[Node]]
@ -777,6 +779,18 @@ proc queryRandom*(d: Protocol): Future[seq[Node]]
return await d.query(id) return await d.query(id)
proc queryRandom*(d: Protocol, enrField: (string, seq[byte])):
Future[seq[Node]] {.async, raises:[Exception, Defect].} =
## Perform a query for a random target, return all nodes discovered which
## contain enrField.
let nodes = await d.queryRandom()
var filtered: seq[Node]
for n in nodes:
if n.record.contains(enrField):
filtered.add(n)
return filtered
proc resolve*(d: Protocol, id: NodeId): Future[Option[Node]] proc resolve*(d: Protocol, id: NodeId): Future[Option[Node]]
{.async, raises: [Exception, Defect].} = {.async, raises: [Exception, Defect].} =
## Resolve a `Node` based on provided `NodeId`. ## Resolve a `Node` based on provided `NodeId`.
@ -816,6 +830,8 @@ proc revalidateNode*(d: Protocol, n: Node)
discard d.addNode(nodes[][0]) discard d.addNode(nodes[][0])
proc revalidateLoop(d: Protocol) {.async, raises: [Exception, Defect].} = proc revalidateLoop(d: Protocol) {.async, raises: [Exception, Defect].} =
## Loop which revalidates the nodes in the routing table by sending the ping
## message.
# TODO: General Exception raised. # TODO: General Exception raised.
try: try:
while true: while true:
@ -826,19 +842,24 @@ proc revalidateLoop(d: Protocol) {.async, raises: [Exception, Defect].} =
except CancelledError: except CancelledError:
trace "revalidateLoop canceled" trace "revalidateLoop canceled"
proc queryLoop(d: Protocol) {.async, raises: [Exception, Defect].} = proc refreshLoop(d: Protocol) {.async, raises: [Exception, Defect].} =
## Loop that refreshes the routing table by starting a random query in case
## no queries were done since `refreshInterval` or more.
# TODO: General Exception raised. # TODO: General Exception raised.
try: try:
# query target self (neighbour nodes) # start with a query target self (neighbour nodes)
let selfQuery = await d.query(d.localNode.id) let selfQuery = await d.query(d.localNode.id)
trace "Discovered nodes in self target query", nodes = selfQuery.len trace "Discovered nodes in self target query", nodes = selfQuery.len
while true: while true:
let randomQuery = await d.queryRandom() let currentTime = now(chronos.Moment)
trace "Discovered nodes in random target query", nodes = randomQuery.len if currentTime > (d.lastLookup + refreshInterval):
debug "Total nodes in discv5 routing table", total = d.routingTable.len() let randomQuery = await d.queryRandom()
await sleepAsync(lookupInterval) trace "Discovered nodes in random target query", nodes = randomQuery.len
debug "Total nodes in discv5 routing table", total = d.routingTable.len()
await sleepAsync(refreshInterval)
except CancelledError: except CancelledError:
trace "queryLoop canceled" trace "refreshLoop canceled"
proc newProtocol*(privKey: PrivateKey, proc newProtocol*(privKey: PrivateKey,
externalIp: Option[ValidIpAddress], tcpPort, udpPort: Port, externalIp: Option[ValidIpAddress], tcpPort, udpPort: Port,
@ -901,7 +922,7 @@ proc open*(d: Protocol) {.raises: [Exception, Defect].} =
debug "Bootstrap node could not be added", uri = toURI(record) debug "Bootstrap node could not be added", uri = toURI(record)
proc start*(d: Protocol) {.raises: [Exception, Defect].} = proc start*(d: Protocol) {.raises: [Exception, Defect].} =
d.queryLoop = queryLoop(d) d.refreshLoop = refreshLoop(d)
d.revalidateLoop = revalidateLoop(d) d.revalidateLoop = revalidateLoop(d)
proc close*(d: Protocol) {.raises: [Exception, Defect].} = proc close*(d: Protocol) {.raises: [Exception, Defect].} =
@ -910,8 +931,8 @@ proc close*(d: Protocol) {.raises: [Exception, Defect].} =
debug "Closing discovery node", node = d.localNode debug "Closing discovery node", node = d.localNode
if not d.revalidateLoop.isNil: if not d.revalidateLoop.isNil:
d.revalidateLoop.cancel() d.revalidateLoop.cancel()
if not d.queryLoop.isNil: if not d.refreshLoop.isNil:
d.queryLoop.cancel() d.refreshLoop.cancel()
d.transp.close() d.transp.close()
@ -921,7 +942,7 @@ proc closeWait*(d: Protocol) {.async, raises: [Exception, Defect].} =
debug "Closing discovery node", node = d.localNode debug "Closing discovery node", node = d.localNode
if not d.revalidateLoop.isNil: if not d.revalidateLoop.isNil:
await d.revalidateLoop.cancelAndWait() await d.revalidateLoop.cancelAndWait()
if not d.queryLoop.isNil: if not d.refreshLoop.isNil:
await d.queryLoop.cancelAndWait() await d.refreshLoop.cancelAndWait()
await d.transp.closeWait() await d.transp.closeWait()