Add routing table metrics + tweaks + fixes (#261)

- routing table metrics + option in dcli
- only forward "seen" nodes on a findNode request (see the sketch after this list)
- setJustSeen & replace on ping AND findnode
- self lookup only at start
- revalidate 10x more
- use bitsPerHop (b) of 5
- small fix in resolve
- small fix in bucket split
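
A minimal sketch of the idea behind the two "seen" bullets, with simplified,
hypothetical names (forwardable is not in the diff; the real logic lives in
the protocol and routing table changes below): a node only becomes eligible
for forwarding after at least one successful request-response round trip.

type Node = ref object
  seen: bool  # set after the first successful request-response

proc setJustSeen(n: Node) =
  # Called when a pong or nodes reply arrives from `n`.
  n.seen = true

proc forwardable(n: Node, seenOnly: bool): bool =
  # findNode responses pass seenOnly = true, so unverified entries
  # are never handed out to other peers.
  not seenOnly or n.seen

var n = Node()
assert not n.forwardable(seenOnly = true)
n.setJustSeen()
assert n.forwardable(seenOnly = true)
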
Kim De Mey 2020-06-30 13:35:15 +02:00 committed by GitHub
parent 0d591c6423
commit 9a467225fb
6 changed files with 142 additions and 57 deletions

View File

@@ -1,6 +1,6 @@
import
sequtils, options, strutils, chronos, chronicles, chronicles/topics_registry,
stew/byteutils, stew/shims/net, confutils,
stew/byteutils, confutils, confutils/std/net, metrics,
eth/keys, eth/trie/db, eth/net/nat,
eth/p2p/discoveryv5/[protocol, discovery_db, enr, node]
@@ -30,6 +30,26 @@ type
"Must be one of: any, none, upnp, pmp, extip:<IP>."
defaultValue: "any" .}: string
nodeKey* {.
desc: "P2P node private key as hex.",
defaultValue: PrivateKey.random().expect("Properly initialized private key")
name: "nodekey" .}: PrivateKey
metricsEnabled* {.
defaultValue: false
desc: "Enable the metrics server."
name: "metrics" .}: bool
metricsAddress* {.
defaultValue: ValidIpAddress.init("127.0.0.1")
desc: "Listening address of the metrics server."
name: "metrics-address" .}: ValidIpAddress
metricsPort* {.
defaultValue: 8008
desc: "Listening HTTP port of the metrics server."
name: "metrics-port" .}: Port
case cmd* {.
command
defaultValue: noCommand }: DiscoveryCmd
@@ -76,6 +96,15 @@ proc parseCmdArg*(T: type Node, p: TaintedString): T =
proc completeCmdArg*(T: type Node, val: TaintedString): seq[string] =
return @[]
proc parseCmdArg*(T: type PrivateKey, p: TaintedString): T =
try:
result = PrivateKey.fromHex(string(p)).tryGet()
except CatchableError as e:
raise newException(ConfigurationError, "Invalid private key")
proc completeCmdArg*(T: type PrivateKey, val: TaintedString): seq[string] =
return @[]
proc setupNat(conf: DiscoveryConf): tuple[ip: Option[ValidIpAddress],
tcpPort: Port,
udpPort: Port] {.gcsafe.} =
@@ -116,14 +145,21 @@ proc setupNat(conf: DiscoveryConf): tuple[ip: Option[ValidIpAddress],
proc run(config: DiscoveryConf) =
let
(ip, tcpPort, udpPort) = setupNat(config)
privKey = PrivateKey.random().expect("Properly intialized private key")
ddb = DiscoveryDB.init(newMemoryDB())
# TODO: newProtocol should allow for no tcpPort
d = newProtocol(privKey, ddb, ip, tcpPort, udpPort,
d = newProtocol(config.nodeKey, ddb, ip, tcpPort, udpPort,
bootstrapRecords = config.bootnodes)
d.open()
when defined(insecure):
if config.metricsEnabled:
let
address = config.metricsAddress
port = config.metricsPort
info "Starting metrics HTTP server", address, port
metrics.startHttpServer($address, port)
case config.cmd
of ping:
let pong = waitFor d.ping(config.pingTarget)
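
With the options above, a dcli build that compiles in the `when
defined(insecure)` guard (see the run proc above) and is started with
--metrics exposes the routing table gauges over HTTP. A minimal scrape
sketch, assuming the default --metrics-address/--metrics-port values and
that nim-metrics serves the Prometheus text format on the conventional
/metrics path:

import std/httpclient

let client = newHttpClient()
try:
  # The routing_table_nodes gauge (total and state="seen"), declared in
  # the routing table changes below, should appear in this dump.
  echo client.getContent("http://127.0.0.1:8008/metrics")
finally:
  client.close()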

View File

@@ -16,6 +16,8 @@ type
pubkey*: PublicKey
address*: Option[Address]
record*: Record
seen*: bool ## Indicates if there was at least one successful
## request-response with this node.
proc toNodeId*(pk: PublicKey): NodeId =
readUintBE[256](keccak256.digest(pk.toRaw()).data)

View File

@@ -76,7 +76,7 @@ import
std/[tables, sets, options, math, random],
stew/shims/net as stewNet, json_serialization/std/net,
stew/[byteutils, endians2], chronicles, chronos, stint,
eth/[rlp, keys], types, encoding, node, routing_table, enr
eth/[rlp, keys, async_utils], types, encoding, node, routing_table, enr
import nimcrypto except toHex
@@ -95,9 +95,11 @@ const
lookupInterval = 60.seconds ## Interval of launching a random lookup to
## populate the routing table. go-ethereum seems to do 3 runs every 30
## minutes. Trinity starts one every minute.
revalidateMax = 1000 ## Revalidation of a peer is done between 0 and this
## value in milliseconds
handshakeTimeout* = 2.seconds ## timeout for the reply on the
## whoareyou message
responseTimeout* = 2.seconds ## timeout for the response of a request-response
responseTimeout* = 4.seconds ## timeout for the response of a request-response
## call
magicSize = 32 ## size of the magic which is the start of the whoareyou
## message
@@ -303,7 +305,7 @@ proc handleFindNode(d: Protocol, fromId: NodeId, fromAddr: Address,
else:
let distance = min(fn.distance, 256)
d.sendNodes(fromId, fromAddr, reqId,
d.routingTable.neighboursAtDistance(distance))
d.routingTable.neighboursAtDistance(distance, seenOnly = true))
proc receive*(d: Protocol, a: Address, packet: openArray[byte]) {.gcsafe,
raises: [
@@ -455,6 +457,22 @@ proc validIp(sender, address: IpAddress): bool {.raises: [Defect].} =
# https://www.iana.org/assignments/iana-ipv6-special-registry/iana-ipv6-special-registry.xhtml
return true
proc replaceNode(d: Protocol, n: Node) =
if n.record notin d.bootstrapRecords:
d.routingTable.replaceNode(n)
# Remove shared secrets when removing the node from routing table.
# TODO: This might be too direct, so we could keep these longer. But better
# would be to simply not remove the nodes immediately but use an LRU cache.
# Also, some shared secrets will be with nodes not eligible for
# the routing table, and these don't get deleted now, see issue:
# https://github.com/status-im/nim-eth/issues/242
discard d.codec.db.deleteKeys(n.id, n.address.get())
else:
# For now we never remove bootstrap nodes. It might make sense to actually
# do so and to retry them only in case we drop to a really low amount of
# peers in the routing table.
debug "Revalidation of bootstrap node failed", enr = toURI(n.record)
# TODO: This could be improved to do the clean-up immediately in case a
# non-whoareyou response does arrive, but we would need to store the AuthTag
# somewhere
@@ -497,12 +515,12 @@ proc waitNodes(d: Protocol, fromNode: Node, reqId: RequestId):
if op.isSome and op.get.kind == nodes:
res.addNodesFromENRs(op.get.nodes.enrs)
else:
# No error on this as we received some nodes.
break
return ok(res)
else:
return err("Nodes message not received in time")
proc sendMessage*[T: SomeMessage](d: Protocol, toNode: Node, m: T):
DiscResult[RequestId] {.raises: [Exception, Defect].} =
doAssert(toNode.address.isSome())
@@ -524,8 +542,10 @@ proc ping*(d: Protocol, toNode: Node):
let resp = await d.waitMessage(toNode, reqId[])
if resp.isSome() and resp.get().kind == pong:
d.routingTable.setJustSeen(toNode)
return ok(resp.get().pong)
else:
d.replaceNode(toNode)
return err("Pong message not received in time")
proc findNode*(d: Protocol, toNode: Node, distance: uint32):
@@ -538,12 +558,18 @@ proc findNode*(d: Protocol, toNode: Node, distance: uint32):
if nodes.isOk:
var res = newSeq[Node]()
for n in nodes[]:
# Check if the node has an address and if the address is public or from
# the same local network or lo network as the sender. The latter allows
# for local testing.
# Any port is allowed, also the so called "well-known" ports.
if n.address.isSome() and
validIp(toNode.address.get().ip, n.address.get().ip):
res.add(n)
# TODO: Check ports
d.routingTable.setJustSeen(toNode)
return ok(res)
else:
d.replaceNode(toNode)
return err(nodes.error)
proc lookupDistances(target, dest: NodeId): seq[uint32] {.raises: [Defect].} =
@@ -577,7 +603,8 @@ proc lookup*(d: Protocol, target: NodeId): Future[seq[Node]]
## Perform a lookup for the given target, return the closest n nodes to the
## target. Maximum value for n is `BUCKET_SIZE`.
# TODO: Sort the returned nodes on distance
result = d.routingTable.neighbours(target, BUCKET_SIZE)
# Also use unseen nodes as a form of validation.
result = d.routingTable.neighbours(target, BUCKET_SIZE, seenOnly = false)
var asked = initHashSet[NodeId]()
asked.incl(d.localNode.id)
var seen = asked
@@ -646,9 +673,10 @@ proc resolve*(d: Protocol, id: NodeId): Future[Option[Node]]
else:
return some(n)
return node
proc revalidateNode*(d: Protocol, n: Node)
{.async, raises: [Exception, Defect].} = # TODO: Exception
trace "Ping to revalidate node", node = $n
let pong = await d.ping(n)
if pong.isOK():
@@ -656,47 +684,25 @@ proc revalidateNode*(d: Protocol, n: Node)
# TODO: Request new ENR
discard
d.routingTable.setJustSeen(n)
trace "Revalidated node", node = $n
else:
# TODO: Handle failures better. E.g. don't remove nodes on different
# failures than timeout
# For now we never remove bootstrap nodes. It might make sense to actually
# do so and to retry them only in case we drop to a really low amount of
# peers in the DHT
if n.record notin d.bootstrapRecords:
trace "Revalidation of node failed, removing node", record = n.record
d.routingTable.replaceNode(n)
# Remove shared secrets when removing the node from routing table.
# This might be to direct, so we could keep these longer. But better
# would be to simply not remove the nodes immediatly but only after x
# amount of failures.
doAssert(n.address.isSome())
discard d.codec.db.deleteKeys(n.id, n.address.get())
else:
debug "Revalidation of bootstrap node failed", enr = toURI(n.record)
proc revalidateLoop(d: Protocol) {.async, raises: [Exception, Defect].} =
# TODO: General Exception raised.
try:
randomize()
while true:
await sleepAsync(rand(10 * 1000).milliseconds)
await sleepAsync(rand(revalidateMax).milliseconds)
let n = d.routingTable.nodeToRevalidate()
if not n.isNil:
# TODO: Should we do these in parallel and/or async to be certain of how
# often nodes are revalidated?
await d.revalidateNode(n)
traceAsyncErrors d.revalidateNode(n)
except CancelledError:
trace "revalidateLoop canceled"
proc lookupLoop(d: Protocol) {.async, raises: [Exception, Defect].} =
# TODO: General Exception raised.
try:
while true:
# lookup self (neighbour nodes)
let selfLookup = await d.lookup(d.localNode.id)
trace "Discovered nodes in self lookup", nodes = $selfLookup
while true:
let randomLookup = await d.lookupRandom()
if randomLookup.isOK:
trace "Discovered nodes in random lookup", nodes = $randomLookup[]
@@ -733,7 +739,7 @@ proc newProtocol*(privKey: PrivateKey, db: Database,
codec: Codec(localNode: node, privKey: privKey, db: db),
bootstrapRecords: @bootstrapRecords)
result.routingTable.init(node)
result.routingTable.init(node, 5)
proc open*(d: Protocol) {.raises: [Exception, Defect].} =
info "Starting discovery node", node = $d.localNode,

View File

@@ -1,12 +1,15 @@
import
std/[algorithm, times, sequtils, bitops, random, sets, options],
stint, chronicles,
stint, chronicles, metrics,
node
export options
{.push raises: [Defect].}
declarePublicGauge routing_table_nodes,
"Discovery routing table nodes", labels = ["state"]
type
RoutingTable* = object
thisNode: Node
@@ -108,6 +111,7 @@ proc add(k: KBucket, n: Node): Node =
return nil
elif k.len < BUCKET_SIZE:
k.nodes.add(n)
routing_table_nodes.inc()
return nil
else:
return k.tail
@@ -130,7 +134,9 @@ proc addReplacement(k: KBucket, n: Node) =
proc removeNode(k: KBucket, n: Node) =
let i = k.nodes.find(n)
if i != -1: k.nodes.delete(i)
if i != -1:
k.nodes.delete(i)
routing_table_nodes.dec()
proc split(k: KBucket): tuple[lower, upper: KBucket] =
## Split at the median id
@@ -139,7 +145,7 @@ proc split(k: KBucket): tuple[lower, upper: KBucket] =
result.upper = newKBucket(splitid + 1.u256, k.iend)
for node in k.nodes:
let bucket = if node.id <= splitid: result.lower else: result.upper
discard bucket.add(node)
bucket.nodes.add(node)
for node in k.replacementCache:
let bucket = if node.id <= splitid: result.lower else: result.upper
bucket.replacementCache.add(node)
@@ -243,9 +249,14 @@ proc replaceNode*(r: var RoutingTable, n: Node) =
let b = r.bucketForNode(n.id)
let idx = b.nodes.find(n)
if idx != -1:
routing_table_nodes.dec()
if b.nodes[idx].seen:
routing_table_nodes.dec(labelValues = ["seen"])
b.nodes.delete(idx)
if b.replacementCache.len > 0:
b.nodes.add(b.replacementCache[high(b.replacementCache)])
routing_table_nodes.inc()
b.replacementCache.delete(high(b.replacementCache))
proc getNode*(r: RoutingTable, id: NodeId): Option[Node] =
@@ -259,12 +270,15 @@ proc contains*(r: RoutingTable, n: Node): bool = n in r.bucketForNode(n.id)
proc bucketsByDistanceTo(r: RoutingTable, id: NodeId): seq[KBucket] =
sortedByIt(r.buckets, it.distanceTo(id))
proc neighbours*(r: RoutingTable, id: NodeId, k: int = BUCKET_SIZE): seq[Node] =
proc neighbours*(r: RoutingTable, id: NodeId, k: int = BUCKET_SIZE,
seenOnly = false): seq[Node] =
## Return up to k neighbours of the given node.
result = newSeqOfCap[Node](k * 2)
block addNodes:
for bucket in r.bucketsByDistanceTo(id):
for n in bucket.nodesByDistanceTo(id):
# Only provide actively seen nodes when `seenOnly` is set
if not seenOnly or n.seen:
result.add(n)
if result.len == k * 2:
break addNodes
@@ -284,8 +298,8 @@ proc idAtDistance*(id: NodeId, dist: uint32): NodeId =
id xor (1.stuint(256) shl (dist.int - 1))
proc neighboursAtDistance*(r: RoutingTable, distance: uint32,
k: int = BUCKET_SIZE): seq[Node] =
result = r.neighbours(idAtDistance(r.thisNode.id, distance), k)
k: int = BUCKET_SIZE, seenOnly = false): seq[Node] =
result = r.neighbours(idAtDistance(r.thisNode.id, distance), k, seenOnly)
# This is a bit silly, first getting closest nodes then to only keep the ones
# that are exactly the requested distance.
keepIf(result, proc(n: Node): bool = logDist(n.id, r.thisNode.id) == distance)
@@ -311,6 +325,10 @@ proc setJustSeen*(r: RoutingTable, n: Node) =
b.nodes.moveRight(0, idx - 1)
b.lastUpdated = epochTime()
if not n.seen:
b.nodes[0].seen = true
routing_table_nodes.inc(labelValues = ["seen"])
proc nodeToRevalidate*(r: RoutingTable): Node =
## Return a node to revalidate. The least recently seen node from a random
## bucket is selected.
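
On the bitsPerHop bullet: `routingTable.init(node, 5)` in the protocol
changes above passes b = 5, and in Kademlia terms a lookup resolves roughly
b bits of the target ID per hop, so the expected hop count scales as
log2(n)/b, at the cost of maintaining more buckets. An idealized
illustration, with a hypothetical network size and illustrative b values:

import std/math

let n = 10_000.0  # hypothetical network size
for b in [1.0, 5.0]:
  # expected hops ~ log2(n) / b when b bits are resolved per hop
  echo "b = ", b, ": ~", ceil(log2(n) / b), " hops"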

View File

@@ -53,3 +53,8 @@ proc nodeAtDistance*(n: Node, d: uint32): Node =
proc nodesAtDistance*(n: Node, d: uint32, amount: int): seq[Node] =
for i in 0..<amount:
result.add(nodeAtDistance(n, d))
proc addSeenNode*(d: discv5_protocol.Protocol, n: Node): bool =
# Add it as a seen node, warning: for testing convenience only!
n.seen = true
d.addNode(n)

View File

@@ -183,7 +183,11 @@ procSuite "Discovery v5 Tests":
let nodes = nodesAtDistance(mainNode.localNode, dist, 10)
for n in nodes:
discard mainNode.addNode(n)
discard mainNode.addSeenNode(n) # for testing only!
# ping in one direction to add, ping in the other to update seen.
check (await testNode.ping(mainNode.localNode)).isOk()
check (await mainNode.ping(testNode.localNode)).isOk()
# Get ENR of the node itself
var discovered =
@@ -192,7 +196,6 @@ procSuite "Discovery v5 Tests":
discovered.isOk
discovered[].len == 1
discovered[][0] == mainNode.localNode
# Get ENRs of nodes added at provided logarithmic distance
discovered =
await discv5_protocol.findNode(testNode, mainNode.localNode, dist)
@@ -217,7 +220,7 @@ procSuite "Discovery v5 Tests":
let moreNodes = nodesAtDistance(mainNode.localNode, dist, 10)
for n in moreNodes:
discard mainNode.addNode(n)
discard mainNode.addSeenNode(n) # for testing only!
# Full bucket
discovered =
@@ -234,7 +237,7 @@ procSuite "Discovery v5 Tests":
# Generate 1000 random nodes and add to our main node's routing table
for i in 0..<1000:
discard mainNode.addNode(generateNode())
discard mainNode.addSeenNode(generateNode()) # for testing only!
let
neighbours = mainNode.neighbours(mainNode.localNode.id)
@@ -267,6 +270,14 @@ procSuite "Discovery v5 Tests":
for i in 1 ..< nodeCount:
nodes.add(initDiscoveryNode(PrivateKey.random()[], localAddress(20301 + i),
@[bootNode.localNode.record]))
# Make sure all nodes have "seen" each other by forcing pings
for n in nodes:
for t in nodes:
if n != t:
check (await n.ping(t.localNode)).isOk()
for i in 1 ..< nodeCount:
nodes[i].start()
for i in 0..<nodeCount-1:
@@ -305,12 +316,16 @@ procSuite "Discovery v5 Tests":
# Bring target back online, update seqNum in ENR, check if we get the
# updated ENR.
block:
targetNode.open()
# Ping the node again to re-add it, as it was removed after the failed
# findNode in resolve in the previous test block
let pong = await targetNode.ping(mainNode.localNode)
check pong.isOk()
# TODO: need to add some logic to update ENRs properly
targetSeqNum.inc()
let r = enr.Record.init(targetSeqNum, targetKey,
some(targetAddress.ip), targetAddress.port, targetAddress.port)[]
targetNode.localNode.record = r
targetNode.open()
let n = await mainNode.resolve(targetId)
check:
n.isSome()
@@ -321,18 +336,21 @@ procSuite "Discovery v5 Tests":
# close targetNode, resolve should lookup, check if we get updated ENR.
block:
targetSeqNum.inc()
let r = enr.Record.init(3, targetKey, some(targetAddress.ip),
let r = enr.Record.init(targetSeqNum, targetKey, some(targetAddress.ip),
targetAddress.port, targetAddress.port)[]
targetNode.localNode.record = r
let pong = await targetNode.ping(lookupNode.localNode)
check pong.isOk()
# ping node so that its ENR gets added
check (await targetNode.ping(lookupNode.localNode)).isOk()
# ping node so that it becomes "seen" and thus will be forwarded on a
# findNode request
check (await lookupNode.ping(targetNode.localNode)).isOk()
await targetNode.closeWait()
# TODO: This step should eventually not be needed and ENRs with new seqNum
# should just get updated in the lookup.
await mainNode.revalidateNode(targetNode.localNode)
check mainNode.addNode(lookupNode.localNode.record)
check mainNode.addNode(lookupNode.localNode)
let n = await mainNode.resolve(targetId)
check:
n.isSome()