Liveness checks

This commit is contained in:
Yuriy Glukhov 2019-12-23 19:21:11 +02:00 committed by zah
parent 5795054dbf
commit 129710d3e4
3 changed files with 94 additions and 15 deletions

View File

@ -1,4 +1,4 @@
import tables
import std/tables
import types, node, enr, hkdf, ../enode, eth/[rlp, keys], nimcrypto, stint
const
@ -204,7 +204,6 @@ proc decodeAuthResp(c: Codec, fromId: NodeId, head: AuthHeader, challenge: Whoar
proc decodeEncrypted*(c: var Codec, fromId: NodeID, fromAddr: Address, input: seq[byte], authTag: var array[12, byte], newNode: var Node, packet: var Packet): bool =
let input = input.toRange
var r = rlpFromBytes(input[32 .. ^1])
let authEndPos = r.currentElemEnd
var auth: AuthHeader
var readKey: array[16, byte]
if r.isList:

View File

@ -1,5 +1,5 @@
import
tables, sets, endians, options, math,
std/[tables, sets, endians, options, math, random],
stew/byteutils, eth/[rlp, keys], chronicles, chronos, stint,
../enode, types, encoding, node, routing_table, enr
@ -27,10 +27,10 @@ const
findnodeResultLimit = 15 # applies in FINDNODE handler
proc whoareyouMagic(toNode: NodeId): array[32, byte] =
let srcId = toNode.toByteArrayBE()
var data: seq[byte]
data.add(srcId)
for c in "WHOAREYOU": data.add(byte(c))
const prefix = "WHOAREYOU"
var data: array[prefix.len + sizeof(toNode), byte]
data[0 .. sizeof(toNode) - 1] = toNode.toByteArrayBE()
for i, c in prefix: data[sizeof(toNode) + i] = byte(c)
sha256.digest(data).data
proc newProtocol*(privKey: PrivateKey, db: Database, port: Port): Protocol =
@ -44,10 +44,9 @@ proc newProtocol*(privKey: PrivateKey, db: Database, port: Port): Protocol =
result.localNode = newNode(initENode(result.privateKey.getPublicKey(), a))
result.localNode.record = initRecord(12, result.privateKey, {"udp": int(a.udpPort), "ip": ipAddr})
let srcId = result.localNode.id.toByteArrayBE()
result.whoareyouMagic = whoareyouMagic(result.localNode.id)
result.idHash = sha256.digest(srcId).data
result.idHash = sha256.digest(result.localNode.id.toByteArrayBE).data
result.routingTable.init(result.localNode)
result.codec = Codec(localNode: result.localNode, privKey: result.privateKey, db: result.db)
@ -178,8 +177,8 @@ proc receive*(d: Protocol, a: Address, msg: Bytes) {.gcsafe.} =
echo "TODO: handle packet: ", packet.kind, " from ", node
else:
d.sendWhoareyou(a, sender, authTag)
echo "Could not decode, respond with whoareyou"
d.sendWhoareyou(a, sender, authTag)
except Exception as e:
echo "Exception: ", e.msg
@ -292,10 +291,38 @@ proc processClient(transp: DatagramTransport,
debug "Receive failed", err = getCurrentExceptionMsg()
raise
proc revalidateNode(p: Protocol, n: Node) {.async.} =
let reqId = newRequestId()
var ping: PingPacket
ping.enrSeq = p.localNode.record.sequenceNumber
let (data, nonce) = p.codec.encodeEncrypted(n, encodePacket(ping, reqId), challenge = nil)
p.pendingRequests[nonce] = PendingRequest(node: n, packet: data)
p.send(n, data)
let resp = await p.waitPacket(n, reqId)
if resp.isSome and resp.get.kind == pong:
let pong = resp.get.pong
if pong.enrSeq > n.record.sequenceNumber:
# TODO: Request new ENR
discard
p.routingTable.setJustSeen(n)
else:
if false: # TODO: if not bootnode:
p.routingTable.removeNode(n)
proc revalidateLoop(p: Protocol) {.async.} =
while true:
await sleepAsync(rand(10 * 1000).milliseconds)
let n = p.routingTable.nodeToRevalidate()
if not n.isNil:
await p.revalidateNode(n)
proc open*(d: Protocol) =
# TODO allow binding to specific IP / IPv6 / etc
let ta = initTAddress(IPv4_any(), d.localNode.node.address.udpPort)
d.transp = newDatagramTransport(processClient, udata = d, local = ta)
asyncCheck d.revalidateLoop() # TODO: This loop has to be terminated on close()
proc addNode*(d: Protocol, r: Record) =
discard d.routingTable.addNode(newNode(r))
@ -306,6 +333,11 @@ proc addNode*(d: Protocol, enr: EnrUri) =
doAssert(res)
discard d.routingTable.addNode(newNode(r))
proc randomNodes*(k: Protocol, count: int): seq[Node] =
k.routingTable.randomNodes(count)
proc nodesDiscovered*(k: Protocol): int {.inline.} = k.routingTable.len
when isMainModule:
import discovery_db
import eth/trie/db

View File

@ -1,4 +1,4 @@
import algorithm, times, sequtils, bitops
import std/[algorithm, times, sequtils, bitops, random, sets]
import types, node
import stint, chronicles
@ -140,7 +140,7 @@ proc splitBucket(r: var RoutingTable, index: int) =
proc bucketForNode(r: RoutingTable, id: NodeId): KBucket =
binaryGetBucketForNode(r.buckets, id)
proc removeNode(r: var RoutingTable, n: Node) =
proc removeNode*(r: var RoutingTable, n: Node) =
r.bucketForNode(n.id).removeNode(n)
proc addNode*(r: var RoutingTable, n: Node): Node =
@ -162,12 +162,12 @@ proc addNode*(r: var RoutingTable, n: Node): Node =
return evictionCandidate
proc getNode*(r: RoutingTable, id: NodeId): Node =
let b = binaryGetBucketForNode(r.buckets, id)
let b = r.bucketForNode(id)
for n in b.nodes:
if n.id == id:
return n
proc contains(r: RoutingTable, n: Node): bool = n in r.bucketForNode(n.id)
proc contains*(r: RoutingTable, n: Node): bool = n in r.bucketForNode(n.id)
proc bucketsByDistanceTo(r: RoutingTable, id: NodeId): seq[KBucket] =
sortedByIt(r.buckets, it.distanceTo(id))
@ -194,5 +194,53 @@ proc idAtDistance(id: NodeId, dist: uint32): NodeId =
proc neighboursAtDistance*(r: RoutingTable, distance: uint32, k: int = BUCKET_SIZE): seq[Node] =
r.neighbours(idAtDistance(r.thisNode.id, distance), k)
proc len(r: RoutingTable): int =
proc len*(r: RoutingTable): int =
for b in r.buckets: result += b.len
proc moveRight[T](arr: var openarray[T], a, b: int) {.inline.} =
## In `arr` move elements in range [a, b] right by 1.
var t: T
shallowCopy(t, arr[b + 1])
for i in countdown(b, a):
shallowCopy(arr[i + 1], arr[i])
shallowCopy(arr[a], t)
proc setJustSeen*(r: RoutingTable, n: Node) =
# Move `n` to front of its bucket
let b = r.bucketForNode(n.id)
let idx = b.nodes.find(n)
doAssert(idx >= 0)
if idx != 0:
b.nodes.moveRight(0, idx - 1)
b.nodes[0] = n
b.lastUpdated = epochTime()
proc nodeToRevalidate*(r: RoutingTable): Node =
var buckets = r.buckets
shuffle(buckets)
# TODO: Should we prioritize less-recently-updated buckets instead?
for b in buckets:
if b.len > 0:
return b.nodes[^1]
proc randomNodes*(r: RoutingTable, count: int): seq[Node] =
var count = count
let sz = r.len
if count > sz:
debug "Looking for peers", requested = count, present = sz
count = sz
result = newSeqOfCap[Node](count)
var seen = initHashSet[Node]()
# This is a rather inneficient way of randomizing nodes from all buckets, but even if we
# iterate over all nodes in the routing table, the time it takes would still be
# insignificant compared to the time it takes for the network roundtrips when connecting
# to nodes.
while len(seen) < count:
let bucket = sample(r.buckets)
if bucket.nodes.len != 0:
let node = sample(bucket.nodes)
if node notin seen:
result.add(node)
seen.incl(node)