Comments and cleanup (#276)

* Fix dcli + add more comments

* Fix pong enr sequence number + varia cleanup

* Send randomData in case no handshake was done yet. Fix #277
This commit is contained in:
Kim De Mey 2020-07-12 17:25:18 +02:00 committed by GitHub
parent 28a8d52308
commit 0888667ac0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 129 additions and 56 deletions

View File

@ -1,8 +1,8 @@
import
sequtils, options, strutils, chronos, chronicles, chronicles/topics_registry,
stew/byteutils, confutils, confutils/std/net, metrics,
eth/keys, eth/trie/db, eth/net/nat,
eth/p2p/discoveryv5/[protocol, discovery_db, enr, node]
std/[options, strutils],
chronos, chronicles, chronicles/topics_registry, confutils, metrics,
stew/byteutils, confutils/std/net,
eth/keys, eth/trie/db, eth/net/nat, protocol, discovery_db, enr, node
type
DiscoveryCmd* = enum
@ -32,7 +32,7 @@ type
nodeKey* {.
desc: "P2P node private key as hex.",
defaultValue: PrivateKey.random().expect("Properly intialized private key")
defaultValue: PrivateKey.random(keys.newRng()[])
name: "nodekey" .}: PrivateKey
metricsEnabled* {.

View File

@ -1,6 +1,7 @@
import
std/[tables, options], nimcrypto, stint, chronicles, stew/results,
types, node, enr, hkdf, eth/[rlp, keys], bearssl
std/[tables, options],
nimcrypto, stint, chronicles, stew/results, bearssl,
eth/[rlp, keys], types, node, enr, hkdf
export keys
@ -60,7 +61,8 @@ proc idNonceHash(nonce, ephkey: openarray[byte]): MDigest[256] =
ctx.update(idNoncePrefix)
ctx.update(nonce)
ctx.update(ephkey)
ctx.finish()
result = ctx.finish()
ctx.clear()
proc signIDNonce*(privKey: PrivateKey, idNonce, ephKey: openarray[byte]):
SignatureNR =
@ -95,6 +97,9 @@ proc encodeAuthHeader*(rng: var BrHmacDrbgContext,
nonce: array[gcmNonceSize, byte],
challenge: Whoareyou):
(seq[byte], HandshakeSecrets) =
## Encodes the auth-header, which is required for the packet in response to a
## WHOAREYOU packet. Requires the id-nonce and the enr-seq that were in the
## WHOAREYOU packet, and the public key of the node sending it.
var resp = AuthResponse(version: 5)
let ln = c.localNode
@ -139,34 +144,45 @@ proc encodePacket*(
message: openarray[byte],
challenge: Whoareyou):
(seq[byte], array[gcmNonceSize, byte]) =
## Encode a packet. This can be a regular packet or a packet in response to a
## WHOAREYOU packet. The latter is the case when the `challenge` parameter is
## provided.
var nonce: array[gcmNonceSize, byte]
brHmacDrbgGenerate(rng, nonce)
var headEnc: seq[byte]
var writeKey: AesKey
let tag = packetTag(toId, c.localNode.id)
var packet: seq[byte]
packet.add(tag)
if challenge.isNil:
headEnc = rlp.encode(nonce)
var readKey: AesKey
# Message packet or random packet
let headEnc = rlp.encode(nonce)
packet.add(headEnc)
# TODO: Should we change API to get just the key we need?
var writeKey, readKey: AesKey
# We might not have the node's keys if the handshake hasn't been performed
# yet. That's fine, we will be responded with whoareyou.
discard c.db.loadKeys(toId, toAddr, readKey, writeKey)
if c.db.loadKeys(toId, toAddr, readKey, writeKey):
packet.add(encryptGCM(writeKey, nonce, message, tag))
else:
# We might not have the node's keys if the handshake hasn't been performed
# yet. That's fine, we send a random-packet and we will be responded with
# a WHOAREYOU packet.
var randomData: array[44, byte]
brHmacDrbgGenerate(rng, randomData)
packet.add(randomData)
else:
var secrets: HandshakeSecrets
(headEnc, secrets) = encodeAuthHeader(rng, c, toId, nonce, challenge)
# Handshake
let (headEnc, secrets) = encodeAuthHeader(rng, c, toId, nonce, challenge)
packet.add(headEnc)
writeKey = secrets.writeKey
# TODO: is it safe to ignore the error here?
discard c.db.storeKeys(toId, toAddr, secrets.readKey, secrets.writeKey)
if not c.db.storeKeys(toId, toAddr, secrets.readKey, secrets.writeKey):
warn "Storing of keys for session failed, will have to redo a handshake"
let tag = packetTag(toId, c.localNode.id)
packet.add(encryptGCM(secrets.writeKey, nonce, message, tag))
var packet = newSeqOfCap[byte](tag.len + headEnc.len)
packet.add(tag)
packet.add(headEnc)
packet.add(encryptGCM(writeKey, nonce, message, tag))
(packet, nonce)
proc decryptGCM*(key: AesKey, nonce, ct, authData: openarray[byte]):
@ -188,14 +204,16 @@ proc decryptGCM*(key: AesKey, nonce, ct, authData: openarray[byte]):
return some(res)
proc decodeMessage(body: openarray[byte]):
DecodeResult[Message] {.raises:[Defect].} =
proc decodeMessage(body: openarray[byte]): DecodeResult[Message] =
## Decodes to the specific `Message` type.
if body.len < 1:
return err(PacketError)
if body[0] < MessageKind.low.byte or body[0] > MessageKind.high.byte:
return err(PacketError)
# This cast is covered by the above check (else we could get enum with invalid
# data!). However, can't we do this in a cleaner way?
let kind = cast[MessageKind](body[0])
var message = Message(kind: kind)
var rlp = rlpFromBytes(body.toOpenArray(1, body.high))
@ -228,8 +246,9 @@ proc decodeMessage(body: openarray[byte]):
err(PacketError)
proc decodeAuthResp*(c: Codec, fromId: NodeId, head: AuthHeader,
challenge: Whoareyou, newNode: var Node):
DecodeResult[HandshakeSecrets] {.raises:[Defect].} =
challenge: Whoareyou, newNode: var Node): DecodeResult[HandshakeSecrets] =
## Decrypts and decodes the auth-response, which is part of the auth-header.
## Requiers the id-nonce from the WHOAREYOU packet that was send.
if head.scheme != authSchemeName:
warn "Unknown auth scheme"
return err(HandshakeError)
@ -273,6 +292,8 @@ proc decodePacket*(c: var Codec,
input: openArray[byte],
authTag: var AuthTag,
newNode: var Node): DecodeResult[Message] =
## Decode a packet. This can be a regular packet or a packet in response to a
## WHOAREYOU packet. In case of the latter a `newNode` might be provided.
var r = rlpFromBytes(input.toOpenArray(tagSize, input.high))
var auth: AuthHeader
@ -307,8 +328,8 @@ proc decodePacket*(c: var Codec,
# Swap keys to match remote
swap(sec.readKey, sec.writeKey)
# TODO: is it safe to ignore the error here?
discard c.db.storeKeys(fromId, fromAddr, sec.readKey, sec.writeKey)
if not c.db.storeKeys(fromId, fromAddr, sec.readKey, sec.writeKey):
warn "Storing of keys for session failed, will have to redo a handshake"
readKey = sec.readKey
else:
# Message packet or random packet - rlp bytes (size 12) indicates auth-tag
@ -317,6 +338,7 @@ proc decodePacket*(c: var Codec,
except RlpError:
return err(PacketError)
auth.auth = authTag
# TODO: Should we change API to get just the key we need?
var writeKey: AesKey
if not c.db.loadKeys(fromId, fromAddr, readKey, writeKey):
trace "Decoding failed (no keys)"

View File

@ -2,8 +2,8 @@
# https://github.com/ethereum/EIPs/blob/master/EIPS/eip-778.md
import
strutils, macros, algorithm, options,
stew/shims/net, nimcrypto, stew/base64,
std/[strutils, macros, algorithm, options],
stew/shims/net, stew/base64, nimcrypto,
eth/[rlp, keys]
export options
@ -188,6 +188,9 @@ proc requireKind(f: Field, kind: FieldKind) {.raises: [ValueError].} =
raise newException(ValueError, "Wrong field kind")
proc get*(r: Record, key: string, T: type): T {.raises: [ValueError, Defect].} =
## Get the value from the provided key.
## Throw `KeyError` if key does not exist.
## Throw `ValueError` if the value is invalid according to type `T`.
var f: Field
if r.getField(key, f):
when T is SomeInteger:
@ -219,6 +222,8 @@ proc get*(r: Record, key: string, T: type): T {.raises: [ValueError, Defect].} =
raise newException(KeyError, "Key not found in ENR: " & key)
proc get*(r: Record, T: type PublicKey): Option[T] =
## Get the `PublicKey` from provided `Record`. Return `none` when there is
## no `PublicKey` in the record.
var pubkeyField: Field
if r.getField("secp256k1", pubkeyField) and pubkeyField.kind == kBytes:
let pk = PublicKey.fromRaw(pubkeyField.bytes)
@ -295,6 +300,9 @@ proc update*(r: var Record, pk: PrivateKey,
r.update(pk, fields)
proc tryGet*(r: Record, key: string, T: type): Option[T] =
## Get the value from the provided key.
## Return `none` if the key does not exist or if the value is invalid
## according to type `T`.
try:
return some get(r, key, T)
except ValueError:
@ -400,7 +408,7 @@ proc fromBytesAux(r: var Record): bool {.raises: [RlpError, Defect].} =
verifySignature(r)
proc fromBytes*(r: var Record, s: openarray[byte]): bool =
## Loads ENR from rlp-encoded bytes, and validated the signature.
## Loads ENR from rlp-encoded bytes, and validates the signature.
r.raw = @s
try:
result = fromBytesAux(r)
@ -408,7 +416,7 @@ proc fromBytes*(r: var Record, s: openarray[byte]): bool =
discard
proc fromBase64*(r: var Record, s: string): bool =
## Loads ENR from base64-encoded rlp-encoded bytes, and validated the
## Loads ENR from base64-encoded rlp-encoded bytes, and validates the
## signature.
try:
r.raw = Base64Url.decode(s)
@ -418,7 +426,7 @@ proc fromBase64*(r: var Record, s: string): bool =
proc fromURI*(r: var Record, s: string): bool =
## Loads ENR from its text encoding: base64-encoded rlp-encoded bytes,
## prefixed with "enr:".
## prefixed with "enr:". Validates the signature.
const prefix = "enr:"
if s.startsWith(prefix):
result = r.fromBase64(s[prefix.len .. ^1])

View File

@ -1,5 +1,6 @@
import
std/hashes, nimcrypto, stint, chronos, stew/shims/net,
std/hashes,
nimcrypto, stint, chronos, stew/shims/net,
eth/keys, enr
{.push raises: [Defect].}
@ -20,17 +21,21 @@ type
## request-response with this node.
proc toNodeId*(pk: PublicKey): NodeId =
## Convert public key to a node identifier.
readUintBE[256](keccak256.digest(pk.toRaw()).data)
proc newNode*(r: Record): Result[Node, cstring] =
## Create a new `Node` from a `Record`.
# TODO: Handle IPv6
let pk = r.get(PublicKey)
# This check is redundant as the deserialisation of `Record` will already fail
# at `verifySignature` if there is no public key
# This check is redundant for a properly created record as the deserialization
# of a record will fail at `verifySignature` if there is no public key.
if pk.isNone():
return err("Could not recover public key from ENR")
# Also this can not fail for a properly created record as id is checked upon
# deserialization.
let tr = ? r.toTypedRecord()
if tr.ip.isSome() and tr.udp.isSome():
let a = Address(ip: ipv4(tr.ip.get()), port: Port(tr.udp.get()))

View File

@ -73,9 +73,9 @@
## This might be a concern for mobile devices.
import
std/[tables, sets, options, math, random, sequtils], bearssl,
std/[tables, sets, options, math, random, sequtils],
stew/shims/net as stewNet, json_serialization/std/net,
stew/[byteutils, endians2], chronicles, chronos, stint,
stew/[byteutils, endians2], chronicles, chronos, stint, bearssl,
eth/[rlp, keys, async_utils], types, encoding, node, routing_table, enr
import nimcrypto except toHex
@ -129,23 +129,35 @@ type
DiscResult*[T] = Result[T, cstring]
proc addNode*(d: Protocol, node: Node): bool =
## Add `Node` to discovery routing table.
##
## Returns false only if `Node` is not eligable for adding (no Address).
if node.address.isSome():
# Only add nodes with an address to the routing table
discard d.routingTable.addNode(node)
return true
proc addNode*(d: Protocol, r: Record): bool =
## Add `Node` from a `Record` to discovery routing table.
##
## Returns false only if no valid `Node` can be created from the `Record` or
## on the conditions of `addNode` from a `Node`.
let node = newNode(r)
if node.isOk():
return d.addNode(node[])
proc addNode*(d: Protocol, enr: EnrUri): bool =
## Add `Node` from a ENR URI to discovery routing table.
##
## Returns false if no valid ENR URI, or on the conditions of `addNode` from
## an `Record`.
var r: Record
let res = r.fromUri(enr)
if res:
return d.addNode(r)
proc getNode*(d: Protocol, id: NodeId): Option[Node] =
## Get the node with id from the routing table.
d.routingTable.getNode(id)
proc randomNodes*(d: Protocol, maxAmount: int): seq[Node] =
@ -165,6 +177,7 @@ proc randomNodes*(d: Protocol, maxAmount: int,
d.randomNodes(maxAmount, proc(x: Node): bool = x.record.contains(enrField))
proc neighbours*(d: Protocol, id: NodeId, k: int = BUCKET_SIZE): seq[Node] =
## Return up to k neighbours (closest node ids) of the given node id.
d.routingTable.neighbours(id, k)
proc nodesDiscovered*(d: Protocol): int {.inline.} = d.routingTable.len
@ -173,10 +186,12 @@ func privKey*(d: Protocol): lent PrivateKey =
d.privateKey
func getRecord*(d: Protocol): Record =
## Get the ENR of the local node.
d.localNode.record
proc updateRecord*(
d: Protocol, enrFields: openarray[(string, seq[byte])]): DiscResult[void] =
## Update the ENR of the local node with provided `enrFields` k:v pairs.
let fields = mapIt(enrFields, toFieldPair(it[0], it[1]))
d.localNode.record.update(d.privateKey, fields)
# TODO: Would it make sense to actively ping ("broadcast") to all the peers
@ -200,7 +215,8 @@ proc send(d: Protocol, a: Address, data: seq[byte]) =
# because of ping failures due to own network connection failure.
debug "Discovery send failed", msg = f.readError.msg
except Exception as e:
# TODO: General exception still being raised from Chronos.
# TODO: General exception still being raised from Chronos, but in practice
# all CatchableErrors should be grabbed by the above `f.failed`.
if e of Defect:
raise (ref Defect)(e)
else: doAssert(false)
@ -245,7 +261,7 @@ proc sendWhoareyou(d: Protocol, address: Address, toNode: NodeId,
# the handshake of another node.
let key = HandShakeKey(nodeId: toNode, address: $address)
if not d.codec.handshakes.hasKeyOrPut(key, challenge):
# TODO: raises: [Exception]
# TODO: raises: [Exception], but it shouldn't.
sleepAsync(handshakeTimeout).addCallback() do(data: pointer):
# TODO: should we still provide cancellation in case handshake completes
# correctly?
@ -291,7 +307,7 @@ proc handlePing(d: Protocol, fromId: NodeId, fromAddr: Address,
ping: PingMessage, reqId: RequestId) =
let a = fromAddr
var pong: PongMessage
pong.enrSeq = ping.enrSeq
pong.enrSeq = d.localNode.record.seqNum
pong.ip = case a.ip.family
of IpAddressFamily.IPv4: @(a.ip.address_v4)
of IpAddressFamily.IPv6: @(a.ip.address_v6)
@ -315,7 +331,8 @@ proc receive*(d: Protocol, a: Address, packet: openArray[byte]) {.gcsafe,
raises: [
Defect,
# This just comes now from a future.complete() and `sendWhoareyou` which
# has it because of `sleepAsync` with `addCallback`
# has it because of `sleepAsync` with `addCallback`, but practically, no
# CatchableError should be raised here, we just can't enforce it for now.
Exception
].} =
if packet.len < tagSize: # or magicSize, can be either
@ -400,7 +417,9 @@ proc receive*(d: Protocol, a: Address, packet: openArray[byte]) {.gcsafe,
# as async procs always require `Exception` in the raises pragma, see also:
# https://github.com/status-im/nim-chronos/issues/98
# So I don't bother for now and just add them in the raises pragma until this
# gets fixed.
# gets fixed. It does not mean that we expect these calls to be raising
# CatchableErrors, in fact, we really don't, but hey, they might, considering we
# can't enforce it.
proc processClient(transp: DatagramTransport, raddr: TransportAddress):
Future[void] {.async, gcsafe, raises: [Exception, Defect].} =
let proto = getUserData[Protocol](transp)
@ -463,7 +482,7 @@ proc replaceNode(d: Protocol, n: Node) =
# For now we never remove bootstrap nodes. It might make sense to actually
# do so and to retry them only in case we drop to a really low amount of
# peers in the routing table.
debug "Revalidation of bootstrap node failed", enr = toURI(n.record)
debug "Message request to bootstrap node failed", enr = toURI(n.record)
# TODO: This could be improved to do the clean-up immediatily in case a non
# whoareyou response does arrive, but we would need to store the AuthTag
@ -527,6 +546,9 @@ proc sendMessage*[T: SomeMessage](d: Protocol, toNode: Node, m: T):
proc ping*(d: Protocol, toNode: Node):
Future[DiscResult[PongMessage]] {.async, raises: [Exception, Defect].} =
## Send a discovery ping message.
##
## Returns the received pong message or an error.
let reqId = d.sendMessage(toNode,
PingMessage(enrSeq: d.localNode.record.seqNum))
let resp = await d.waitMessage(toNode, reqId)
@ -540,6 +562,10 @@ proc ping*(d: Protocol, toNode: Node):
proc findNode*(d: Protocol, toNode: Node, distance: uint32):
Future[DiscResult[seq[Node]]] {.async, raises: [Exception, Defect].} =
## Send a discovery findNode message.
##
## Returns the received nodes or an error.
## Received ENRs are already validated and converted to `Node`.
let reqId = d.sendMessage(toNode, FindNodeMessage(distance: distance))
let nodes = await d.waitNodes(toNode, reqId)

View File

@ -1,7 +1,7 @@
import
std/[algorithm, times, sequtils, bitops, random, sets, options],
stint, chronicles, metrics,
node
node, enr
export options
@ -38,8 +38,9 @@ type
lastUpdated: float ## epochTime of last update to `nodes` in the KBucket.
const
BUCKET_SIZE* = 16
REPLACEMENT_CACHE_SIZE* = 8
BUCKET_SIZE* = 16 ## Maximum amount of nodes per bucket
REPLACEMENT_CACHE_SIZE* = 8 ## Maximum amount of nodes per replacement cache
## of a bucket
ID_SIZE = 256
proc distanceTo(n: Node, id: NodeId): UInt256 =
@ -104,7 +105,7 @@ proc add(k: KBucket, n: Node): Node =
## However, in discovery v5 it can be that a node is added after a incoming
## request, and considering a handshake that needs to be done, it is likely
## that this node is reachable. An additional `addSeen` proc could be created
## for this,
## for this.
k.lastUpdated = epochTime()
let nodeIdx = k.nodes.find(n)
if nodeIdx != -1:
@ -139,7 +140,7 @@ proc removeNode(k: KBucket, n: Node) =
routing_table_nodes.dec()
proc split(k: KBucket): tuple[lower, upper: KBucket] =
## Split at the median id
## Split the kbucket `k` at the median id.
let splitid = k.midpoint
result.lower = newKBucket(k.istart, splitid)
result.upper = newKBucket(splitid + 1.u256, k.iend)
@ -186,10 +187,12 @@ proc computeSharedPrefixBits(nodes: openarray[NodeId]): int =
for n in nodes:
echo n.toHex()
# Reaching this would mean that all node ids are equal
# Reaching this would mean that all node ids are equal.
doAssert(false, "Unable to calculate number of shared prefix bits")
proc init*(r: var RoutingTable, thisNode: Node, bitsPerHop = 8) {.inline.} =
proc init*(r: var RoutingTable, thisNode: Node, bitsPerHop = 5) {.inline.} =
## Initialize the routing table for provided `Node` and bitsPerHop value.
## `bitsPerHop` is default set to 5 as recommended by original Kademlia paper.
r.thisNode = thisNode
r.buckets = @[newKBucket(0.u256, high(Uint256))]
r.bitsPerHop = bitsPerHop
@ -260,24 +263,29 @@ proc replaceNode*(r: var RoutingTable, n: Node) =
b.replacementCache.delete(high(b.replacementCache))
proc getNode*(r: RoutingTable, id: NodeId): Option[Node] =
## Get the `Node` with `id` as `NodeId` from the routing table.
## If no node with provided node id can be found,`none` is returned .
let b = r.bucketForNode(id)
for n in b.nodes:
if n.id == id:
return some(n)
proc contains*(r: RoutingTable, n: Node): bool = n in r.bucketForNode(n.id)
# Check if the routing table contains node `n`.
proc bucketsByDistanceTo(r: RoutingTable, id: NodeId): seq[KBucket] =
sortedByIt(r.buckets, it.distanceTo(id))
proc neighbours*(r: RoutingTable, id: NodeId, k: int = BUCKET_SIZE,
seenOnly = false): seq[Node] =
## Return up to k neighbours of the given node.
## Return up to k neighbours of the given node id.
## When seenOnly is set to true, only nodes that have been contacted
## previously successfully will be selected.
result = newSeqOfCap[Node](k * 2)
block addNodes:
for bucket in r.bucketsByDistanceTo(id):
for n in bucket.nodesByDistanceTo(id):
# Only provide actively seen nodes when `seenOnly` set
# Only provide actively seen nodes when `seenOnly` set.
if not seenOnly or n.seen:
result.add(n)
if result.len == k * 2:
@ -299,6 +307,7 @@ proc idAtDistance*(id: NodeId, dist: uint32): NodeId =
proc neighboursAtDistance*(r: RoutingTable, distance: uint32,
k: int = BUCKET_SIZE, seenOnly = false): seq[Node] =
## Return up to k neighbours at given logarithmic distance.
result = r.neighbours(idAtDistance(r.thisNode.id, distance), k, seenOnly)
# This is a bit silly, first getting closest nodes then to only keep the ones
# that are exactly the requested distance.
@ -343,6 +352,8 @@ proc nodeToRevalidate*(r: RoutingTable): Node =
proc randomNodes*(r: RoutingTable, maxAmount: int,
pred: proc(x: Node): bool {.gcsafe, noSideEffect.} = nil): seq[Node] =
## Get a `maxAmount` of random nodes from the routing table with the `pred`
## predicate function applied as filter on the nodes selected.
var maxAmount = maxAmount
let sz = r.len
if maxAmount > sz:

View File

@ -1,5 +1,6 @@
import
hashes, stint, chronos,
std/hashes,
stint, chronos,
eth/[keys, rlp], enr, node
{.push raises: [Defect].}