Merge pull request #322 from status-im/refresh-only

Only do discovery queries to refresh the table
This commit is contained in:
Kim De Mey 2021-01-13 22:40:35 +01:00 committed by GitHub
commit e25500c9f0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 95 additions and 31 deletions

View File

@ -22,6 +22,11 @@ type
desc: "UDP listening port."
name: "udp-port" .}: uint16
listenAddress* {.
defaultValue: defaultListenAddress(config)
desc: "Listening address for the Discovery v5 traffic"
name: "listen-address" }: ValidIpAddress
bootnodes* {.
desc: "ENR URI of node to bootstrap discovery with. Argument may be repeated."
name: "bootnode" .}: seq[enr.Record]
@ -42,7 +47,7 @@ type
name: "metrics" .}: bool
metricsAddress* {.
defaultValue: ValidIpAddress.init("127.0.0.1")
defaultValue: defaultAdminListenAddress(config)
desc: "Listening address of the metrics server."
name: "metrics-address" .}: ValidIpAddress
@ -78,6 +83,12 @@ type
desc: "ENR URI of the node to send a talkreq message"
name: "node" .}: Node
func defaultListenAddress*(conf: DiscoveryConf): ValidIpAddress =
(static ValidIpAddress.init("0.0.0.0"))
func defaultAdminListenAddress*(conf: DiscoveryConf): ValidIpAddress =
(static ValidIpAddress.init("127.0.0.1"))
proc parseCmdArg*(T: type enr.Record, p: TaintedString): T =
if not fromURI(result, p):
raise newException(ConfigurationError, "Invalid ENR")
@ -152,7 +163,7 @@ proc run(config: DiscoveryConf) =
let
(ip, tcpPort, udpPort) = setupNat(config)
d = newProtocol(config.nodeKey, ip, tcpPort, udpPort,
bootstrapRecords = config.bootnodes)
bootstrapRecords = config.bootnodes, bindIp = config.listenAddress)
d.open()

View File

@ -85,8 +85,12 @@ export options
{.push raises: [Defect].}
declarePublicGauge discovery_message_requests,
"Discovery protocol message requests", labels = ["response"]
declarePublicGauge discovery_message_requests_outgoing,
"Discovery protocol outgoing message requests", labels = ["response"]
declarePublicGauge discovery_message_requests_incoming,
"Discovery protocol incoming message requests", labels = ["response"]
declarePublicGauge discovery_unsolicited_messages,
"Discovery protocol unsolicited or timed-out messages"
logScope:
topics = "discv5"
@ -98,11 +102,11 @@ const
findNodeResultLimit = 16 ## Maximum amount of ENRs in the total Nodes messages
## that will be processed
maxNodesPerMessage = 3 ## Maximum amount of ENRs per individual Nodes message
lookupInterval = 60.seconds ## Interval of launching a random lookup to
## populate the routing table. go-ethereum seems to do 3 runs every 30
## minutes. Trinity starts one every minute.
refreshInterval = 5.minutes ## Interval of launching a random query to
## refresh the routing table.
revalidateMax = 10000 ## Revalidation of a peer is done between 0 and this
## value in milliseconds
initialLookups = 1 ## Amount of lookups done when populating the routing table
handshakeTimeout* = 2.seconds ## timeout for the reply on the
## whoareyou message
responseTimeout* = 4.seconds ## timeout for the response of a request-response
@ -118,8 +122,9 @@ type
routingTable: RoutingTable
codec*: Codec
awaitedMessages: Table[(NodeId, RequestId), Future[Option[Message]]]
queryLoop: Future[void]
refreshLoop: Future[void]
revalidateLoop: Future[void]
lastLookup: chronos.Moment
bootstrapRecords*: seq[Record]
rng*: ref BrHmacDrbgContext
@ -310,12 +315,17 @@ proc handleMessage(d: Protocol, srcId: NodeId, fromAddr: Address,
message: Message) {.raises:[Exception].} =
case message.kind
of ping:
discovery_message_requests_incoming.inc()
d.handlePing(srcId, fromAddr, message.ping, message.reqId)
of findNode:
discovery_message_requests_incoming.inc()
d.handleFindNode(srcId, fromAddr, message.findNode, message.reqId)
of talkreq:
discovery_message_requests_incoming.inc()
d.handleTalkReq(srcId, fromAddr, message.talkreq, message.reqId)
of regtopic, topicquery:
discovery_message_requests_incoming.inc()
discovery_message_requests_incoming.inc(labelValues = ["no_response"])
trace "Received unimplemented message kind", kind = message.kind,
origin = fromAddr
else:
@ -323,6 +333,7 @@ proc handleMessage(d: Protocol, srcId: NodeId, fromAddr: Address,
if d.awaitedMessages.take((srcId, message.reqId), waiter):
waiter.complete(some(message)) # TODO: raises: [Exception]
else:
discovery_unsolicited_messages.inc()
trace "Timed out or unrequested message", kind = message.kind,
origin = fromAddr
@ -583,7 +594,7 @@ proc sendMessage*[T: SomeMessage](d: Protocol, toNode: Node, m: T):
d.registerRequest(toNode, message, nonce)
trace "Send message packet", dstId = toNode.id, address, kind = messageKind(T)
d.send(toNode, data)
discovery_message_requests.inc()
discovery_message_requests_outgoing.inc()
return reqId
proc ping*(d: Protocol, toNode: Node):
@ -600,7 +611,7 @@ proc ping*(d: Protocol, toNode: Node):
return ok(resp.get().pong)
else:
d.replaceNode(toNode)
discovery_message_requests.inc(labelValues = ["timed_out"])
discovery_message_requests_outgoing.inc(labelValues = ["no_response"])
return err("Pong message not received in time")
proc findNode*(d: Protocol, toNode: Node, distances: seq[uint32]):
@ -618,7 +629,7 @@ proc findNode*(d: Protocol, toNode: Node, distances: seq[uint32]):
return ok(res)
else:
d.replaceNode(toNode)
discovery_message_requests.inc(labelValues = ["timed_out"])
discovery_message_requests_outgoing.inc(labelValues = ["no_response"])
return err(nodes.error)
proc talkreq*(d: Protocol, toNode: Node, protocol, request: seq[byte]):
@ -635,7 +646,7 @@ proc talkreq*(d: Protocol, toNode: Node, protocol, request: seq[byte]):
return ok(resp.get().talkresp)
else:
d.replaceNode(toNode)
discovery_message_requests.inc(labelValues = ["timed_out"])
discovery_message_requests_outgoing.inc(labelValues = ["no_response"])
return err("Talk response message not received in time")
proc lookupDistances(target, dest: NodeId): seq[uint32] {.raises: [Defect].} =
@ -717,6 +728,7 @@ proc lookup*(d: Protocol, target: NodeId): Future[seq[Node]]
if closestNodes.len > BUCKET_SIZE:
closestNodes.del(closestNodes.high())
d.lastLookup = now(chronos.Moment)
return closestNodes
proc query*(d: Protocol, target: NodeId, k = BUCKET_SIZE): Future[seq[Node]]
@ -765,6 +777,7 @@ proc query*(d: Protocol, target: NodeId, k = BUCKET_SIZE): Future[seq[Node]]
if not seen.containsOrIncl(n.id):
queryBuffer.add(n)
d.lastLookup = now(chronos.Moment)
return queryBuffer
proc queryRandom*(d: Protocol): Future[seq[Node]]
@ -777,6 +790,18 @@ proc queryRandom*(d: Protocol): Future[seq[Node]]
return await d.query(id)
proc queryRandom*(d: Protocol, enrField: (string, seq[byte])):
Future[seq[Node]] {.async, raises:[Exception, Defect].} =
## Perform a query for a random target, return all nodes discovered which
## contain enrField.
let nodes = await d.queryRandom()
var filtered: seq[Node]
for n in nodes:
if n.record.contains(enrField):
filtered.add(n)
return filtered
proc resolve*(d: Protocol, id: NodeId): Future[Option[Node]]
{.async, raises: [Exception, Defect].} =
## Resolve a `Node` based on provided `NodeId`.
@ -804,6 +829,32 @@ proc resolve*(d: Protocol, id: NodeId): Future[Option[Node]]
return node
proc seedTable*(d: Protocol) =
## Seed the table with known nodes.
for record in d.bootstrapRecords:
if d.addNode(record):
debug "Added bootstrap node", uri = toURI(record)
else:
debug "Bootstrap node could not be added", uri = toURI(record)
# TODO:
# Persistent stored nodes could be added to seed from here
# See: https://github.com/status-im/nim-eth/issues/189
proc populateTable*(d: Protocol) {.async, raises: [Exception, Defect].} =
## Do a set of initial lookups to quickly populate the table.
# start with a self target query (neighbour nodes)
let selfQuery = await d.query(d.localNode.id)
trace "Discovered nodes in self target query", nodes = selfQuery.len
# `initialLookups` random queries
for i in 0..<initialLookups:
let randomQuery = await d.queryRandom()
trace "Discovered nodes in random target query", nodes = randomQuery.len
debug "Total nodes in routing table after populate",
total = d.routingTable.len()
proc revalidateNode*(d: Protocol, n: Node)
{.async, raises: [Exception, Defect].} = # TODO: Exception
let pong = await d.ping(n)
@ -816,6 +867,8 @@ proc revalidateNode*(d: Protocol, n: Node)
discard d.addNode(nodes[][0])
proc revalidateLoop(d: Protocol) {.async, raises: [Exception, Defect].} =
## Loop which revalidates the nodes in the routing table by sending the ping
## message.
# TODO: General Exception raised.
try:
while true:
@ -826,19 +879,23 @@ proc revalidateLoop(d: Protocol) {.async, raises: [Exception, Defect].} =
except CancelledError:
trace "revalidateLoop canceled"
proc queryLoop(d: Protocol) {.async, raises: [Exception, Defect].} =
proc refreshLoop(d: Protocol) {.async, raises: [Exception, Defect].} =
## Loop that refreshes the routing table by starting a random query in case
## no queries were done since `refreshInterval` or more.
# TODO: General Exception raised.
try:
# query target self (neighbour nodes)
let selfQuery = await d.query(d.localNode.id)
trace "Discovered nodes in self target query", nodes = selfQuery.len
await d.populateTable()
while true:
let randomQuery = await d.queryRandom()
trace "Discovered nodes in random target query", nodes = randomQuery.len
debug "Total nodes in discv5 routing table", total = d.routingTable.len()
await sleepAsync(lookupInterval)
let currentTime = now(chronos.Moment)
if currentTime > (d.lastLookup + refreshInterval):
let randomQuery = await d.queryRandom()
trace "Discovered nodes in random target query", nodes = randomQuery.len
debug "Total nodes in discv5 routing table", total = d.routingTable.len()
await sleepAsync(refreshInterval)
except CancelledError:
trace "queryLoop canceled"
trace "refreshLoop canceled"
proc newProtocol*(privKey: PrivateKey,
externalIp: Option[ValidIpAddress], tcpPort, udpPort: Port,
@ -894,14 +951,10 @@ proc open*(d: Protocol) {.raises: [Exception, Defect].} =
# object of Exception. In Nim devel this got changed to CatchableError.
d.transp = newDatagramTransport(processClient, udata = d, local = ta)
for record in d.bootstrapRecords:
if d.addNode(record):
debug "Added bootstrap node", uri = toURI(record)
else:
debug "Bootstrap node could not be added", uri = toURI(record)
d.seedTable()
proc start*(d: Protocol) {.raises: [Exception, Defect].} =
d.queryLoop = queryLoop(d)
d.refreshLoop = refreshLoop(d)
d.revalidateLoop = revalidateLoop(d)
proc close*(d: Protocol) {.raises: [Exception, Defect].} =
@ -910,8 +963,8 @@ proc close*(d: Protocol) {.raises: [Exception, Defect].} =
debug "Closing discovery node", node = d.localNode
if not d.revalidateLoop.isNil:
d.revalidateLoop.cancel()
if not d.queryLoop.isNil:
d.queryLoop.cancel()
if not d.refreshLoop.isNil:
d.refreshLoop.cancel()
d.transp.close()
@ -921,7 +974,7 @@ proc closeWait*(d: Protocol) {.async, raises: [Exception, Defect].} =
debug "Closing discovery node", node = d.localNode
if not d.revalidateLoop.isNil:
await d.revalidateLoop.cancelAndWait()
if not d.queryLoop.isNil:
await d.queryLoop.cancelAndWait()
if not d.refreshLoop.isNil:
await d.refreshLoop.cancelAndWait()
await d.transp.closeWait()