Discv5 eh2 (#240)

* Discv5: More error handling improvements

- More results usage and raises pragma annotations
- Remove ENode related code and adjust Node object
- Misc.

* Add sendMessage and catch RlpError when decoding WhoAreYou

* Make the receive proc exception free

Except for `Exception` hah...

* Address review comments

* And another bunch of results and raises annotations

* Send Nodes Message also on 0 nodes and remove usage of broken require
This commit is contained in:
Kim De Mey 2020-05-28 10:19:36 +02:00 committed by GitHub
parent ff546d27c3
commit a110f091af
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 381 additions and 267 deletions

View File

@ -1,6 +1,8 @@
import
std/net,
eth/trie/db, types, ../enode
std/net, stint, stew/endians2,
eth/trie/db, types, node
{.push raises: [Defect].}
type
DiscoveryDB* = ref object of Database
@ -19,16 +21,19 @@ const keySize = 1 + # unique triedb prefix (kNodeToKeys)
proc makeKey(id: NodeId, address: Address): array[keySize, byte] =
result[0] = byte(kNodeToKeys)
copyMem(addr result[1], unsafeAddr id, sizeof(id))
var pos = 1
result[pos ..< pos+sizeof(id)] = toBytes(id)
pos.inc(sizeof(id))
case address.ip.family
of IpAddressFamily.IpV4:
copyMem(addr result[sizeof(id) + 1], unsafeAddr address.ip.address_v4, sizeof(address.ip.address_v4))
result[pos ..< pos+sizeof(address.ip.address_v4)] = address.ip.address_v4
of IpAddressFamily.IpV6:
copyMem(addr result[sizeof(id) + 1], unsafeAddr address.ip.address_v6, sizeof(address.ip.address_v6))
copyMem(addr result[sizeof(id) + 1 + sizeof(address.ip.address_v6)], unsafeAddr address.udpPort, sizeof(address.udpPort))
result[pos..< pos+sizeof(address.ip.address_v6)] = address.ip.address_v6
pos.inc(sizeof(address.ip.address_v6))
result[pos ..< pos+sizeof(address.port)] = toBytes(address.port.uint16)
method storeKeys*(db: DiscoveryDB, id: NodeId, address: Address, r, w: AesKey):
bool {.raises: [Defect].} =
bool =
try:
var value: array[sizeof(r) + sizeof(w), byte]
value[0 .. 15] = r
@ -38,8 +43,8 @@ method storeKeys*(db: DiscoveryDB, id: NodeId, address: Address, r, w: AesKey):
except CatchableError:
return false
method loadKeys*(db: DiscoveryDB, id: NodeId, address: Address, r, w: var AesKey):
bool {.raises: [Defect].} =
method loadKeys*(db: DiscoveryDB, id: NodeId, address: Address,
r, w: var AesKey): bool =
try:
let res = db.backend.get(makeKey(id, address))
if res.len != sizeof(r) + sizeof(w):
@ -50,8 +55,7 @@ method loadKeys*(db: DiscoveryDB, id: NodeId, address: Address, r, w: var AesKey
except CatchableError:
return false
method deleteKeys*(db: DiscoveryDB, id: NodeId, address: Address):
bool {.raises: [Defect].} =
method deleteKeys*(db: DiscoveryDB, id: NodeId, address: Address): bool =
try:
db.backend.del(makeKey(id, address))
return true

View File

@ -1,6 +1,6 @@
import
std/[tables, options], nimcrypto, stint, chronicles, stew/results,
types, node, enr, hkdf, ../enode, eth/[rlp, keys]
types, node, enr, hkdf, eth/[rlp, keys]
export keys
@ -43,8 +43,8 @@ type
DecodeError* = enum
HandshakeError = "discv5: handshake failed"
PacketError = "discv5: invalid packet",
DecryptError = "discv5: decryption failed",
PacketError = "discv5: invalid packet"
DecryptError = "discv5: decryption failed"
UnsupportedMessage = "discv5: unsupported message"
DecodeResult*[T] = Result[T, DecodeError]
@ -253,14 +253,9 @@ proc decodeAuthResp(c: Codec, fromId: NodeId, head: AuthHeader,
# 2. Should verify ENR and check for correct id in case an ENR is included
# 3. Should verify id nonce signature
# More TODO:
# This will also not work if ENR does not contain an IP address or if the
# IP address is out of date and doesn't match current UDP end point
try:
newNode = newNode(authResp.record)
ok()
except KeyError, ValueError:
err(HandshakeError)
# Node returned might not have an address or not a valid address
newNode = ? newNode(authResp.record).mapErrTo(HandshakeError)
ok()
proc decodePacket*(c: var Codec,
fromId: NodeID,
@ -299,11 +294,6 @@ proc decodePacket*(c: var Codec,
c.handshakes.del(key)
# For an incoming handshake, we are not sure the address in the ENR is there
# and if it is the real external IP, so we use the one we know from the
# UDP packet.
updateEndpoint(newNode, fromAddr)
# Swap keys to match remote
swap(sec.readKey, sec.writeKey)
# TODO: is it safe to ignore the error here?

View File

@ -4,7 +4,7 @@
import
net, strutils, macros, algorithm, options,
nimcrypto, stew/base64,
eth/[rlp, keys], ../enode
eth/[rlp, keys]
export options
@ -193,10 +193,10 @@ proc get*(r: Record, T: type PublicKey): Option[T] =
proc tryGet*(r: Record, key: string, T: type): Option[T] =
try:
return some get(r, key, T)
except CatchableError:
except ValueError:
discard
proc toTypedRecord*(r: Record): Option[TypedRecord] =
proc toTypedRecord*(r: Record): EnrResult[TypedRecord] =
let id = r.tryGet("id", string)
if id.isSome:
var tr: TypedRecord
@ -213,7 +213,9 @@ proc toTypedRecord*(r: Record): Option[TypedRecord] =
readField udp
readField udp6
return some(tr)
ok(tr)
else:
err("Record without id field")
proc verifySignatureV4(r: Record, sigData: openarray[byte], content: seq[byte]):
bool =
@ -290,7 +292,7 @@ proc fromBytes*(r: var Record, s: openarray[byte]): bool =
r.raw = @s
try:
result = fromBytesAux(r)
except CatchableError:
except RlpError:
discard
proc fromBase64*(r: var Record, s: string): bool =
@ -299,7 +301,7 @@ proc fromBase64*(r: var Record, s: string): bool =
try:
r.raw = Base64Url.decode(s)
result = fromBytesAux(r)
except CatchableError:
except RlpError, Base64Error:
discard
proc fromURI*(r: var Record, s: string): bool =
@ -344,6 +346,8 @@ proc `==`*(a, b: Record): bool = a.raw == b.raw
proc read*(rlp: var Rlp, T: typedesc[Record]):
T {.inline, raises:[RlpError, ValueError, Defect].} =
if not result.fromBytes(rlp.rawData):
# TODO: This could also just be an invalid signature, would be cleaner to
# split of RLP deserialisation errors from this.
raise newException(ValueError, "Could not deserialize")
rlp.skipElem()

View File

@ -1,64 +1,59 @@
import
std/[net, hashes], nimcrypto, stint, chronicles,
types, enr, eth/keys, ../enode
std/[net, hashes], nimcrypto, stint, chronos,
eth/keys, enr
{.push raises: [Defect].}
type
NodeId* = UInt256
Address* = object
ip*: IpAddress
port*: Port
Node* = ref object
node*: ENode
id*: NodeId
pubkey*: PublicKey
address*: Option[Address]
record*: Record
proc toNodeId*(pk: PublicKey): NodeId =
readUintBE[256](keccak256.digest(pk.toRaw()).data)
# TODO: Lets not allow to create a node where enode info is not in sync with the
# record
proc newNode*(enode: ENode, r: Record): Node =
Node(node: enode,
id: enode.pubkey.toNodeId(),
record: r)
proc newNode*(r: Record): Node =
proc newNode*(r: Record): Result[Node, cstring] =
# TODO: Handle IPv6
var a: Address
try:
let
ipBytes = r.get("ip", array[4, byte])
udpPort = r.get("udp", uint16)
a = Address(ip: IpAddress(family: IpAddressFamily.IPv4,
address_v4: ipBytes),
udpPort: Port udpPort)
except KeyError, ValueError:
# TODO: This will result in a 0.0.0.0 address. Might introduce more bugs.
# Maybe we shouldn't allow the creation of Node from Record without IP.
# Will need some refactor though.
discard
let pk = r.get(PublicKey)
# This check is redundant as the deserialisation of `Record` will already fail
# at `verifySignature` if there is no public key
if pk.isNone():
warn "Could not recover public key from ENR"
return
return err("Could not recover public key from ENR")
let enode = ENode(pubkey: pk.get(), address: a)
result = Node(node: enode,
id: enode.pubkey.toNodeId(),
record: r)
let tr = ? r.toTypedRecord()
if tr.ip.isSome() and tr.udp.isSome():
let
ip = IpAddress(family: IpAddressFamily.IPv4, address_v4: tr.ip.get())
a = Address(ip: ip, port: Port(tr.udp.get()))
proc hash*(n: Node): hashes.Hash = hash(n.node.pubkey.toRaw)
ok(Node(id: pk.get().toNodeId(), pubkey: pk.get() , record: r,
address: some(a)))
else:
ok(Node(id: pk.get().toNodeId(), pubkey: pk.get(), record: r,
address: none(Address)))
proc hash*(n: Node): hashes.Hash = hash(n.pubkey.toRaw)
proc `==`*(a, b: Node): bool =
(a.isNil and b.isNil) or
(not a.isNil and not b.isNil and a.node.pubkey == b.node.pubkey)
(not a.isNil and not b.isNil and a.pubkey == b.pubkey)
proc address*(n: Node): Address {.inline.} = n.node.address
proc updateEndpoint*(n: Node, a: Address) {.inline.} =
n.node.address = a
proc `$`*(a: Address): string =
result.add($a.ip)
result.add(":" & $a.port)
proc `$`*(n: Node): string =
if n == nil:
"Node[local]"
"Node[uninitialized]"
elif n.address.isNone():
"Node[unaddressable]"
else:
"Node[" & $n.node.address.ip & ":" & $n.node.address.udpPort & "]"
"Node[" & $n.address.get().ip & ":" & $n.address.get().port & "]"

View File

@ -76,12 +76,14 @@ import
std/[tables, sets, options, math, random],
json_serialization/std/net,
stew/[byteutils, endians2], chronicles, chronos, stint,
eth/[rlp, keys], ../enode, types, encoding, node, routing_table, enr
eth/[rlp, keys], types, encoding, node, routing_table, enr
import nimcrypto except toHex
export options
{.push raises: [Defect].}
logScope:
topics = "discv5"
@ -120,22 +122,24 @@ type
node: Node
message: seq[byte]
RandomSourceDepleted* = object of CatchableError
DiscResult*[T] = Result[T, cstring]
proc addNode*(d: Protocol, node: Node) =
discard d.routingTable.addNode(node)
proc addNode*(d: Protocol, node: Node): bool =
if node.address.isSome():
# Only add nodes with an address to the routing table
discard d.routingTable.addNode(node)
return true
template addNode*(d: Protocol, enode: ENode) =
addNode d, newNode(enode)
proc addNode*(d: Protocol, r: Record): bool =
let node = newNode(r)
if node.isOk():
return d.addNode(node[])
template addNode*(d: Protocol, r: Record) =
addNode d, newNode(r)
proc addNode*(d: Protocol, enr: EnrUri) =
proc addNode*(d: Protocol, enr: EnrUri): bool =
var r: Record
let res = r.fromUri(enr)
doAssert(res)
d.addNode newNode(r)
if res:
return d.addNode(r)
proc getNode*(d: Protocol, id: NodeId): Option[Node] =
d.routingTable.getNode(id)
@ -152,15 +156,31 @@ func privKey*(d: Protocol): lent PrivateKey =
d.privateKey
proc send(d: Protocol, a: Address, data: seq[byte]) =
# debug "Sending bytes", amount = data.len, to = a
let ta = initTAddress(a.ip, a.udpPort)
let f = d.transp.sendTo(ta, data)
f.callback = proc(data: pointer) {.gcsafe.} =
if f.failed:
debug "Discovery send failed", msg = f.readError.msg
let ta = initTAddress(a.ip, a.port)
try:
let f = d.transp.sendTo(ta, data)
f.callback = proc(data: pointer) {.gcsafe.} =
if f.failed:
# Could be `TransportUseClosedError` in case the transport is already
# closed, or could be `TransportOsError` in case of a socket error.
# In the latter case this would probably mostly occur if the network
# interface underneath gets disconnected or similar.
# TODO: Should this kind of error be propagated upwards? Probably, but
# it should not stop the process as that would reset the discovery
# progress in case there is even a small window of no connection.
# One case that needs this error available upwards is when revalidating
# nodes. Else the revalidation might end up clearing the routing tabl
# because of ping failures due to own network connection failure.
debug "Discovery send failed", msg = f.readError.msg
except Exception as e:
# TODO: General exception still being raised from Chronos.
if e of Defect:
raise (ref Defect)(e)
else: doAssert(false)
proc send(d: Protocol, n: Node, data: seq[byte]) =
d.send(n.node.address, data)
doAssert(n.address.isSome())
d.send(n.address.get(), data)
proc `xor`[N: static[int], T](a, b: array[N, T]): array[N, T] =
for i in 0 .. a.high:
@ -177,16 +197,19 @@ proc isWhoAreYou(d: Protocol, packet: openArray[byte]): bool =
if packet.len > d.whoareyouMagic.len:
result = d.whoareyouMagic == packet.toOpenArray(0, magicSize - 1)
proc decodeWhoAreYou(d: Protocol, packet: openArray[byte]): Whoareyou =
proc decodeWhoAreYou(d: Protocol, packet: openArray[byte]):
Whoareyou {.raises: [RlpError].} =
result = Whoareyou()
result[] = rlp.decode(packet.toOpenArray(magicSize, packet.high), WhoareyouObj)
proc sendWhoareyou(d: Protocol, address: Address, toNode: NodeId, authTag: AuthTag) =
proc sendWhoareyou(d: Protocol, address: Address, toNode: NodeId,
authTag: AuthTag): DiscResult[void] {.raises: [Exception, Defect].} =
trace "sending who are you", to = $toNode, toAddress = $address
let challenge = Whoareyou(authTag: authTag, recordSeq: 0)
if randomBytes(challenge.idNonce) != challenge.idNonce.len:
raise newException(RandomSourceDepleted, "Could not randomize bytes")
return err("Could not randomize bytes")
# If there is already a handshake going on for this nodeid then we drop this
# new one. Handshake will get cleaned up after `handshakeTimeout`.
# If instead overwriting the handshake would be allowed, the handshake timeout
@ -195,9 +218,9 @@ proc sendWhoareyou(d: Protocol, address: Address, toNode: NodeId, authTag: AuthT
# a loop.
# Use toNode + address to make it more difficult for an attacker to occupy
# the handshake of another node.
let key = HandShakeKey(nodeId: toNode, address: $address)
if not d.codec.handshakes.hasKeyOrPut(key, challenge):
# TODO: raises: [Exception]
sleepAsync(handshakeTimeout).addCallback() do(data: pointer):
# TODO: should we still provide cancellation in case handshake completes
# correctly?
@ -206,43 +229,61 @@ proc sendWhoareyou(d: Protocol, address: Address, toNode: NodeId, authTag: AuthT
var data = @(whoareyouMagic(toNode))
data.add(rlp.encode(challenge[]))
d.send(address, data)
ok()
else:
err("NodeId already has ongoing handshake")
proc sendNodes(d: Protocol, toId: NodeId, toAddr: Address, reqId: RequestId,
nodes: openarray[Node]) =
nodes: openarray[Node]): DiscResult[void] =
proc sendNodes(d: Protocol, toId: NodeId, toAddr: Address,
message: NodesMessage, reqId: RequestId) {.nimcall.} =
let (data, _) = d.codec.encodePacket(toId, toAddr,
encodeMessage(message, reqId), challenge = nil).tryGet()
message: NodesMessage, reqId: RequestId): DiscResult[void] {.nimcall.} =
let (data, _) = ? d.codec.encodePacket(toId, toAddr,
encodeMessage(message, reqId), challenge = nil)
d.send(toAddr, data)
ok()
if nodes.len == 0:
# In case of 0 nodes, a reply is still needed
return d.sendNodes(toId, toAddr, NodesMessage(total: 1, enrs: @[]), reqId)
var message: NodesMessage
# TODO: Do the total calculation based on the max UDP packet size we want to
# send and the ENR size of all (max 16) nodes.
# Which UDP packet size to take? 1280? 576?
message.total = ceil(nodes.len / maxNodesPerMessage).uint32
for i in 0 ..< nodes.len:
message.enrs.add(nodes[i].record)
if message.enrs.len == 3: # TODO: Uh, what is this?
d.sendNodes(toId, toAddr, message, reqId)
if message.enrs.len == maxNodesPerMessage:
let res = d.sendNodes(toId, toAddr, message, reqId)
if res.isErr: # TODO: is there something nicer for this?
return res
message.enrs.setLen(0)
if message.enrs.len != 0:
d.sendNodes(toId, toAddr, message, reqId)
let res = d.sendNodes(toId, toAddr, message, reqId)
if res.isErr: # TODO: is there something nicer for this?
return res
ok()
proc handlePing(d: Protocol, fromId: NodeId, fromAddr: Address,
ping: PingMessage, reqId: RequestId) =
ping: PingMessage, reqId: RequestId): DiscResult[void] =
let a = fromAddr
var pong: PongMessage
pong.enrSeq = ping.enrSeq
pong.ip = case a.ip.family
of IpAddressFamily.IPv4: @(a.ip.address_v4)
of IpAddressFamily.IPv6: @(a.ip.address_v6)
pong.port = a.udpPort.uint16
pong.port = a.port.uint16
let (data, _) = ? d.codec.encodePacket(fromId, fromAddr,
encodeMessage(pong, reqId), challenge = nil)
let (data, _) = d.codec.encodePacket(fromId, fromAddr,
encodeMessage(pong, reqId), challenge = nil).tryGet()
d.send(fromAddr, data)
ok()
proc handleFindNode(d: Protocol, fromId: NodeId, fromAddr: Address,
fn: FindNodeMessage, reqId: RequestId) =
fn: FindNodeMessage, reqId: RequestId): DiscResult[void] =
if fn.distance == 0:
d.sendNodes(fromId, fromAddr, reqId, [d.localNode])
else:
@ -253,12 +294,9 @@ proc handleFindNode(d: Protocol, fromId: NodeId, fromAddr: Address,
proc receive*(d: Protocol, a: Address, packet: openArray[byte]) {.gcsafe,
raises: [
Defect,
# TODO This is now coming from Chronos's callSoon
Exception,
# TODO All of these should probably be handled here
RlpError,
IOError,
TransportAddressError,
# This just comes now from a future.complete() and `sendWhoareyou` which
# has it because of `sleepAsync` with `addCallback`
Exception
].} =
if packet.len < tagSize: # or magicSize, can be either
return # Invalid packet
@ -267,18 +305,29 @@ proc receive*(d: Protocol, a: Address, packet: openArray[byte]) {.gcsafe,
if d.isWhoAreYou(packet):
trace "Received whoareyou", localNode = $d.localNode, address = a
let whoareyou = d.decodeWhoAreYou(packet)
var whoareyou: WhoAreYou
try:
whoareyou = d.decodeWhoAreYou(packet)
except RlpError:
debug "Invalid WhoAreYou packet, decoding failed"
return
var pr: PendingRequest
if d.pendingRequests.take(whoareyou.authTag, pr):
let toNode = pr.node
whoareyou.pubKey = toNode.node.pubkey # TODO: Yeah, rather ugly this.
try:
let (data, _) = d.codec.encodePacket(toNode.id, toNode.address,
pr.message, challenge = whoareyou).tryGet()
d.send(toNode, data)
except RandomSourceDepleted:
debug "Failed to respond to a who-you-are packet " &
"due to randomness source depletion."
whoareyou.pubKey = toNode.pubkey # TODO: Yeah, rather ugly this.
doAssert(toNode.address.isSome())
let encoded = d.codec.encodePacket(toNode.id, toNode.address.get(),
pr.message, challenge = whoareyou)
# TODO: Perhaps just expect here? Or raise Defect in `encodePacket`?
# if this occurs there is an issue with the system anyhow?
if encoded.isErr:
warn "Not enough randomness to encode packet"
return
let (data, _) = encoded[]
d.send(toNode, data)
else:
debug "Timed out or unrequested WhoAreYou packet"
else:
var tag: array[tagSize, byte]
@ -293,56 +342,87 @@ proc receive*(d: Protocol, a: Address, packet: openArray[byte]) {.gcsafe,
let message = decoded[]
if not node.isNil:
# Not filling table with nodes without correct IP in the ENR
if a.ip == node.address.ip:
# TODO: Should we care about this???
if node.address.isSome() and a == node.address.get():
debug "Adding new node to routing table", node = $node,
localNode = $d.localNode
discard d.routingTable.addNode(node)
discard d.addNode(node)
case message.kind
of ping:
d.handlePing(sender, a, message.ping, message.reqId)
if d.handlePing(sender, a, message.ping, message.reqId).isErr:
debug "Sending Pong message failed"
of findNode:
d.handleFindNode(sender, a, message.findNode, message.reqId)
if d.handleFindNode(sender, a, message.findNode, message.reqId).isErr:
debug "Sending Nodes message failed"
else:
var waiter: Future[Option[Message]]
if d.awaitedMessages.take((sender, message.reqId), waiter):
waiter.complete(some(message))
waiter.complete(some(message)) # TODO: raises: [Exception]
else:
trace "Timed out or unrequested message", message = message.kind,
origin = a
elif decoded.error == DecodeError.DecryptError:
debug "Could not decrypt packet, respond with whoareyou",
trace "Could not decrypt packet, respond with whoareyou",
localNode = $d.localNode, address = a
# only sendingWhoareyou in case it is a decryption failure
d.sendWhoareyou(a, sender, authTag)
let res = d.sendWhoareyou(a, sender, authTag)
if res.isErr():
trace "Sending WhoAreYou packet failed", err = res.error
elif decoded.error == DecodeError.UnsupportedMessage:
# Still adding the node in case failure is because of unsupported message.
if not node.isNil:
if a.ip == node.address.ip:
# Not filling table with nodes without correct IP in the ENR
# TODO: Should we care about this???s
if node.address.isSome() and a == node.address.get():
debug "Adding new node to routing table", node = $node,
localNode = $d.localNode
discard d.routingTable.addNode(node)
discard d.addNode(node)
# elif decoded.error == DecodeError.PacketError:
# Not adding this node as from our perspective it is sending rubbish.
proc processClient(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async, gcsafe.} =
var proto = getUserData[Protocol](transp)
try:
# TODO: Maybe here better to use `peekMessage()` to avoid allocation,
# but `Bytes` object is just a simple seq[byte], and `ByteRange` object
# do not support custom length.
var buf = transp.getMessage()
let a = Address(ip: raddr.address, udpPort: raddr.port, tcpPort: raddr.port)
proto.receive(a, buf)
except RlpError as e:
debug "Receive failed", exception = e.name, msg = e.msg
# TODO: what else can be raised? Figure this out and be more restrictive?
except CatchableError as e:
debug "Receive failed", exception = e.name, msg = e.msg,
stacktrace = e.getStackTrace()
# TODO: Not sure why but need to pop the raises here as it is apparently not
# enough to put it in the raises pragma of `processClient` and other async procs.
{.pop.}
# Next, below there is no more effort done in catching the general `Exception`
# as async procs always require `Exception` in the raises pragma, see also:
# https://github.com/status-im/nim-chronos/issues/98
# So I don't bother for now and just add them in the raises pragma until this
# gets fixed.
proc processClient(transp: DatagramTransport, raddr: TransportAddress):
Future[void] {.async, gcsafe, raises: [Exception, Defect].} =
let proto = getUserData[Protocol](transp)
var a: Address
var buf = newSeq[byte]()
proc validIp(sender, address: IpAddress): bool =
try:
a = Address(ip: raddr.address, port: raddr.port)
except ValueError:
# This should not be possible considering we bind to an IP address.
error "Not a valid IpAddress"
return
try:
# TODO: should we use `peekMessage()` to avoid allocation?
# TODO: This can still raise general `Exception` while it probably should
# only give TransportOsError.
buf = transp.getMessage()
except TransportOsError as e:
# This is likely to be local network connection issues.
error "Transport getMessage error", exception = e.name, msg = e.msg
except Exception as e:
if e of Defect:
raise (ref Defect)(e)
else: doAssert(false)
try:
proto.receive(a, buf)
except Exception as e:
if e of Defect:
raise (ref Defect)(e)
else: doAssert(false)
proc validIp(sender, address: IpAddress): bool {.raises: [Defect].} =
let
s = initTAddress(sender, Port(0))
a = initTAddress(address, Port(0))
@ -362,74 +442,95 @@ proc validIp(sender, address: IpAddress): bool =
# TODO: This could be improved to do the clean-up immediatily in case a non
# whoareyou response does arrive, but we would need to store the AuthTag
# somewhere
proc registerRequest(d: Protocol, n: Node, message: seq[byte], nonce: AuthTag) =
proc registerRequest(d: Protocol, n: Node, message: seq[byte], nonce: AuthTag)
{.raises: [Exception, Defect].} =
let request = PendingRequest(node: n, message: message)
if not d.pendingRequests.hasKeyOrPut(nonce, request):
# TODO: raises: [Exception]
sleepAsync(responseTimeout).addCallback() do(data: pointer):
d.pendingRequests.del(nonce)
proc waitMessage(d: Protocol, fromNode: Node, reqId: RequestId): Future[Option[Message]] =
proc waitMessage(d: Protocol, fromNode: Node, reqId: RequestId):
Future[Option[Message]] {.raises: [Exception, Defect].} =
result = newFuture[Option[Message]]("waitMessage")
let res = result
let key = (fromNode.id, reqId)
# TODO: raises: [Exception]
sleepAsync(responseTimeout).addCallback() do(data: pointer):
d.awaitedMessages.del(key)
if not res.finished:
res.complete(none(Message))
res.complete(none(Message)) # TODO: raises: [Exception]
d.awaitedMessages[key] = result
proc addNodesFromENRs(result: var seq[Node], enrs: openarray[Record]) =
for r in enrs: result.add(newNode(r))
proc addNodesFromENRs(result: var seq[Node], enrs: openarray[Record])
{.raises: [Defect].} =
for r in enrs:
let node = newNode(r)
if node.isOk():
result.add(node[])
proc waitNodes(d: Protocol, fromNode: Node, reqId: RequestId): Future[seq[Node]] {.async.} =
proc waitNodes(d: Protocol, fromNode: Node, reqId: RequestId):
Future[DiscResult[seq[Node]]] {.async, raises: [Exception, Defect].} =
var op = await d.waitMessage(fromNode, reqId)
if op.isSome and op.get.kind == nodes:
result.addNodesFromENRs(op.get.nodes.enrs)
var res = newSeq[Node]()
res.addNodesFromENRs(op.get.nodes.enrs)
let total = op.get.nodes.total
for i in 1 ..< total:
op = await d.waitMessage(fromNode, reqId)
if op.isSome and op.get.kind == nodes:
result.addNodesFromENRs(op.get.nodes.enrs)
res.addNodesFromENRs(op.get.nodes.enrs)
else:
break
return ok(res)
else:
return err("Nodes message not received in time")
proc sendPing(d: Protocol, toNode: Node): RequestId =
proc sendMessage*[T: SomeMessage](d: Protocol, toNode: Node, m: T):
DiscResult[RequestId] {.raises: [Exception, Defect].} =
doAssert(toNode.address.isSome())
let
reqId = newRequestId().tryGet()
ping = PingMessage(enrSeq: d.localNode.record.seqNum)
message = encodeMessage(ping, reqId)
(data, nonce) = d.codec.encodePacket(toNode.id, toNode.address, message,
challenge = nil).tryGet()
reqId = ? newRequestId()
message = encodeMessage(m, reqId)
(data, nonce) = ? d.codec.encodePacket(toNode.id, toNode.address.get(),
message, challenge = nil)
d.registerRequest(toNode, message, nonce)
d.send(toNode, data)
return reqId
return ok(reqId)
proc ping*(d: Protocol, toNode: Node): Future[Option[PongMessage]] {.async.} =
let reqId = d.sendPing(toNode)
let resp = await d.waitMessage(toNode, reqId)
proc ping*(d: Protocol, toNode: Node):
Future[DiscResult[PongMessage]] {.async, raises: [Exception, Defect].} =
let reqId = d.sendMessage(toNode,
PingMessage(enrSeq: d.localNode.record.seqNum))
if reqId.isErr:
return err(reqId.error)
let resp = await d.waitMessage(toNode, reqId[])
if resp.isSome() and resp.get().kind == pong:
return some(resp.get().pong)
return ok(resp.get().pong)
else:
return err("Pong message not received in time")
proc sendFindNode(d: Protocol, toNode: Node, distance: uint32): RequestId =
let reqId = newRequestId().tryGet()
let message = encodeMessage(FindNodeMessage(distance: distance), reqId)
let (data, nonce) = d.codec.encodePacket(toNode.id, toNode.address, message,
challenge = nil).tryGet()
d.registerRequest(toNode, message, nonce)
proc findNode*(d: Protocol, toNode: Node, distance: uint32):
Future[DiscResult[seq[Node]]] {.async, raises: [Exception, Defect].} =
let reqId = d.sendMessage(toNode, FindNodeMessage(distance: distance))
if reqId.isErr:
return err(reqId.error)
let nodes = await d.waitNodes(toNode, reqId[])
d.send(toNode, data)
return reqId
if nodes.isOk:
var res = newSeq[Node]()
for n in nodes[]:
if n.address.isSome() and
validIp(toNode.address.get().ip, n.address.get().ip):
res.add(n)
# TODO: Check ports
return ok(res)
else:
return err(nodes.error)
proc findNode*(d: Protocol, toNode: Node, distance: uint32): Future[seq[Node]] {.async.} =
let reqId = sendFindNode(d, toNode, distance)
let nodes = await d.waitNodes(toNode, reqId)
for n in nodes:
if validIp(toNode.address.ip, n.address.ip):
result.add(n)
proc lookupDistances(target, dest: NodeId): seq[uint32] =
proc lookupDistances(target, dest: NodeId): seq[uint32] {.raises: [Defect].} =
let td = logDist(target, dest)
result.add(td)
var i = 1'u32
@ -440,20 +541,23 @@ proc lookupDistances(target, dest: NodeId): seq[uint32] =
result.add(td - i)
inc i
proc lookupWorker(d: Protocol, destNode: Node, target: NodeId): Future[seq[Node]] {.async.} =
proc lookupWorker(d: Protocol, destNode: Node, target: NodeId):
Future[seq[Node]] {.async, raises: [Exception, Defect].} =
let dists = lookupDistances(target, destNode.id)
var i = 0
while i < lookupRequestLimit and result.len < findNodeResultLimit:
# TODO: Handle failures
let r = await d.findNode(destNode, dists[i])
# TODO: I guess it makes sense to limit here also to `findNodeResultLimit`?
result.add(r)
# TODO: Handle failures better. E.g. stop on different failures than timeout
if r.isOk:
# TODO: I guess it makes sense to limit here also to `findNodeResultLimit`?
result.add(r[])
inc i
for n in result:
discard d.routingTable.addNode(n)
proc lookup*(d: Protocol, target: NodeId): Future[seq[Node]] {.async.} =
proc lookup*(d: Protocol, target: NodeId): Future[seq[Node]]
{.async, raises: [Exception, Defect].} =
## Perform a lookup for the given target, return the closest n nodes to the
## target. Maximum value for n is `BUCKET_SIZE`.
# TODO: Sort the returned nodes on distance
@ -489,14 +593,16 @@ proc lookup*(d: Protocol, target: NodeId): Future[seq[Node]] {.async.} =
if result.len < BUCKET_SIZE:
result.add(n)
proc lookupRandom*(d: Protocol): Future[seq[Node]]
{.raises:[RandomSourceDepleted, Defect, Exception].} =
proc lookupRandom*(d: Protocol): Future[DiscResult[seq[Node]]]
{.async, raises:[Exception, Defect].} =
var id: NodeId
if randomBytes(addr id, sizeof(id)) != sizeof(id):
raise newException(RandomSourceDepleted, "Could not randomize bytes")
d.lookup(id)
return err("Could not randomize bytes")
proc resolve*(d: Protocol, id: NodeId): Future[Option[Node]] {.async.} =
return ok(await d.lookup(id))
proc resolve*(d: Protocol, id: NodeId): Future[Option[Node]]
{.async, raises: [Exception, Defect].} =
## Resolve a `Node` based on provided `NodeId`.
##
## This will first look in the own DHT. If the node is known, it will try to
@ -508,8 +614,9 @@ proc resolve*(d: Protocol, id: NodeId): Future[Option[Node]] {.async.} =
if node.isSome():
let request = await d.findNode(node.get(), 0)
if request.len > 0:
return some(request[0])
# TODO: Handle failures better. E.g. stop on different failures than timeout
if request.isOk() and request[].len > 0:
return some(request[][0])
let discovered = await d.lookup(id)
for n in discovered:
@ -522,11 +629,11 @@ proc resolve*(d: Protocol, id: NodeId): Future[Option[Node]] {.async.} =
return some(n)
proc revalidateNode*(d: Protocol, n: Node)
{.async, raises:[Defect, Exception].} = # TODO: Exception
{.async, raises: [Exception, Defect].} = # TODO: Exception
trace "Ping to revalidate node", node = $n
let pong = await d.ping(n)
if pong.isSome():
if pong.isOK():
if pong.get().enrSeq > n.record.seqNum:
# TODO: Request new ENR
discard
@ -534,6 +641,8 @@ proc revalidateNode*(d: Protocol, n: Node)
d.routingTable.setJustSeen(n)
trace "Revalidated node", node = $n
else:
# TODO: Handle failures better. E.g. don't remove nodes on different
# failures than timeout
# For now we never remove bootstrap nodes. It might make sense to actually
# do so and to retry them only in case we drop to a really low amount of
# peers in the DHT
@ -544,15 +653,14 @@ proc revalidateNode*(d: Protocol, n: Node)
# This might be to direct, so we could keep these longer. But better
# would be to simply not remove the nodes immediatly but only after x
# amount of failures.
discard d.codec.db.deleteKeys(n.id, n.address)
doAssert(n.address.isSome())
discard d.codec.db.deleteKeys(n.id, n.address.get())
else:
debug "Revalidation of bootstrap node failed", enr = toURI(n.record)
proc revalidateLoop(d: Protocol) {.async.} =
proc revalidateLoop(d: Protocol) {.async, raises: [Exception, Defect].} =
# TODO: General Exception raised.
try:
# TODO: We need to handle actual errors still, which might just allow to
# continue the loop. However, currently `revalidateNode` raises a general
# `Exception` making this rather hard.
while true:
await sleepAsync(rand(10 * 1000).milliseconds)
let n = d.routingTable.nodeToRevalidate()
@ -563,16 +671,20 @@ proc revalidateLoop(d: Protocol) {.async.} =
except CancelledError:
trace "revalidateLoop canceled"
proc lookupLoop(d: Protocol) {.async.} =
## TODO: Same story as for `revalidateLoop`
proc lookupLoop(d: Protocol) {.async, raises: [Exception, Defect].} =
# TODO: General Exception raised.
try:
while true:
# lookup self (neighbour nodes)
var nodes = await d.lookup(d.localNode.id)
trace "Discovered nodes in self lookup", nodes = $nodes
let selfLookup = await d.lookup(d.localNode.id)
trace "Discovered nodes in self lookup", nodes = $selfLookup
nodes = await d.lookupRandom()
trace "Discovered nodes in random lookup", nodes = $nodes
let randomLookup = await d.lookupRandom()
if randomLookup.isOK:
trace "Discovered nodes in random lookup", nodes = $randomLookup[]
trace "Total nodes in routing table", total = d.routingTable.len()
else:
trace "random lookup failed", err = randomLookup.error
await sleepAsync(lookupInterval)
except CancelledError:
trace "lookupLoop canceled"
@ -580,14 +692,12 @@ proc lookupLoop(d: Protocol) {.async.} =
proc newProtocol*(privKey: PrivateKey, db: Database,
externalIp: Option[IpAddress], tcpPort, udpPort: Port,
localEnrFields: openarray[FieldPair] = [],
bootstrapRecords: openarray[Record] = []): Protocol =
bootstrapRecords: openarray[Record] = []):
Protocol {.raises: [Defect].} =
let
a = Address(ip: externalIp.get(IPv4_any()),
tcpPort: tcpPort, udpPort: udpPort)
enode = ENode(pubkey: privKey.toPublicKey().tryGet(), address: a)
enrRec = enr.Record.init(1, privKey, externalIp, tcpPort, udpPort,
localEnrFields).expect("Properly intialized private key")
node = newNode(enode, enrRec)
node = newNode(enrRec).expect("Properly initialized node")
result = Protocol(
privateKey: privKey,
@ -600,23 +710,24 @@ proc newProtocol*(privKey: PrivateKey, db: Database,
result.routingTable.init(node)
proc open*(d: Protocol) =
proc open*(d: Protocol) {.raises: [Exception, Defect].} =
info "Starting discovery node", node = $d.localNode,
uri = toURI(d.localNode.record)
# TODO allow binding to specific IP / IPv6 / etc
let ta = initTAddress(IPv4_any(), d.localNode.node.address.udpPort)
let ta = initTAddress(IPv4_any(), Port(d.localNode.address.get().port))
# TODO: raises `OSError` and `IOSelectorsException`, the latter which is
# object of Exception. In Nim devel this got changed to CatchableError.
d.transp = newDatagramTransport(processClient, udata = d, local = ta)
for record in d.bootstrapRecords:
debug "Adding bootstrap node", uri = toURI(record)
d.addNode(record)
discard d.addNode(record)
proc start*(d: Protocol) =
# Might want to move these to a separate proc if this turns out to be needed.
proc start*(d: Protocol) {.raises: [Exception, Defect].} =
d.lookupLoop = lookupLoop(d)
d.revalidateLoop = revalidateLoop(d)
proc close*(d: Protocol) =
proc close*(d: Protocol) {.raises: [Exception, Defect].} =
doAssert(not d.transp.closed)
debug "Closing discovery node", node = $d.localNode
@ -624,11 +735,10 @@ proc close*(d: Protocol) =
d.revalidateLoop.cancel()
if not d.lookupLoop.isNil:
d.lookupLoop.cancel()
# TODO: unsure if close can't create issues in the not awaited cancellations
# above
d.transp.close()
proc closeWait*(d: Protocol) {.async.} =
proc closeWait*(d: Protocol) {.async, raises: [Exception, Defect].} =
doAssert(not d.transp.closed)
debug "Closing discovery node", node = $d.localNode

View File

@ -1,7 +1,7 @@
import
std/[algorithm, times, sequtils, bitops, random, sets, options],
stint, chronicles,
types, node
node
{.push raises: [Defect].}

View File

@ -1,6 +1,8 @@
import
hashes, stint,
eth/[keys, rlp], ../enode, enr
hashes, stint, chronos,
eth/[keys, rlp], enr, node
{.push raises: [Defect].}
const
authTagSize* = 12
@ -8,7 +10,6 @@ const
aesKeySize* = 128 div 8
type
NodeId* = UInt256
AuthTag* = array[authTagSize, byte]
IdNonce* = array[idNonceSize, byte]
AesKey* = array[aesKeySize, byte]
@ -82,14 +83,14 @@ template messageKind*(T: typedesc[SomeMessage]): MessageKind =
elif T is FindNodeMessage: findNode
elif T is NodesMessage: nodes
method storeKeys*(db: Database, id: NodeId, address: Address, r, w: AesKey):
bool {.base, raises: [Defect].} = discard
method storeKeys*(db: Database, id: NodeId, address: Address,
r, w: AesKey): bool {.base.} = discard
method loadKeys*(db: Database, id: NodeId, address: Address, r, w: var AesKey):
bool {.base, raises: [Defect].} = discard
method loadKeys*(db: Database, id: NodeId, address: Address,
r, w: var AesKey): bool {.base.} = discard
method deleteKeys*(db: Database, id: NodeId, address: Address):
bool {.raises: [Defect].} = discard
bool {.base.} = discard
proc toBytes*(id: NodeId): array[32, byte] {.inline.} =
id.toByteArrayBE()

View File

@ -1,17 +1,20 @@
import
unittest, chronos, sequtils, chronicles, tables, stint, nimcrypto,
eth/[keys, rlp], eth/p2p/enode, eth/trie/db,
eth/[keys, rlp], eth/trie/db,
eth/p2p/discoveryv5/[discovery_db, enr, node, types, routing_table, encoding],
eth/p2p/discoveryv5/protocol as discv5_protocol,
./p2p_test_helper
proc localAddress*(port: int): Address =
Address(ip: parseIpAddress("127.0.0.1"), port: Port(port))
proc initDiscoveryNode*(privKey: PrivateKey, address: Address,
bootstrapRecords: openarray[Record] = []):
discv5_protocol.Protocol =
var db = DiscoveryDB.init(newMemoryDB())
result = newProtocol(privKey, db,
some(parseIpAddress("127.0.0.1")),
address.tcpPort, address.udpPort,
some(address.ip),
address.port, address.port,
bootstrapRecords = bootstrapRecords)
result.open()
@ -26,8 +29,8 @@ proc randomPacket(tag: PacketTag): seq[byte] =
authTag: AuthTag
msg: array[44, byte]
require randomBytes(authTag) == authTag.len
require randomBytes(msg) == msg.len
check randomBytes(authTag) == authTag.len
check randomBytes(msg) == msg.len
result.add(tag)
result.add(rlp.encode(authTag))
result.add(msg)
@ -36,7 +39,7 @@ proc generateNode(privKey = PrivateKey.random()[], port: int = 20302): Node =
let port = Port(port)
let enr = enr.Record.init(1, privKey, some(parseIpAddress("127.0.0.1")),
port, port).expect("Properly intialized private key")
result = newNode(enr)
result = newNode(enr).expect("Properly initialized node")
proc nodeAtDistance(n: Node, d: uint32): Node =
while true:
@ -55,13 +58,13 @@ suite "Discovery v5 Tests":
node = initDiscoveryNode(PrivateKey.random()[], localAddress(20302))
targetNode = generateNode()
node.addNode(targetNode)
check node.addNode(targetNode)
for i in 0..<1000:
node.addNode(generateNode())
discard node.addNode(generateNode())
let n = node.getNode(targetNode.id)
require n.isSome()
check n.isSome()
check n.get() == targetNode
await node.closeWait()
@ -76,7 +79,7 @@ suite "Discovery v5 Tests":
pong1 = await discv5_protocol.ping(node1, bootnode.localNode)
pong2 = await discv5_protocol.ping(node1, node2.localNode)
check pong1.isSome() and pong2.isSome()
check pong1.isOk() and pong2.isOk()
await bootnode.closeWait()
await node2.closeWait()
@ -85,9 +88,10 @@ suite "Discovery v5 Tests":
await node1.revalidateNode(node2.localNode)
let n = node1.getNode(bootnode.localNode.id)
require n.isSome()
check n.get() == bootnode.localNode
check node1.getNode(node2.localNode.id).isNone()
check:
n.isSome()
n.get() == bootnode.localNode
node1.getNode(node2.localNode.id).isNone()
await node1.closeWait()
@ -98,7 +102,7 @@ suite "Discovery v5 Tests":
let a = localAddress(20303)
for i in 0 ..< 5:
require randomBytes(tag) == tag.len
check randomBytes(tag) == tag.len
node.receive(a, randomPacket(tag))
# Checking different nodeIds but same address
@ -225,42 +229,47 @@ suite "Discovery v5 Tests":
let nodes = nodesAtDistance(mainNode.localNode, dist, 10)
for n in nodes:
mainNode.addNode(n)
discard mainNode.addNode(n)
# Get ENR of the node itself
var discovered =
await discv5_protocol.findNode(testNode, mainNode.localNode, 0)
check:
discovered.len == 1
discovered[0] == mainNode.localNode
discovered.isOk
discovered[].len == 1
discovered[][0] == mainNode.localNode
# Get ENRs of nodes added at provided logarithmic distance
discovered =
await discv5_protocol.findNode(testNode, mainNode.localNode, dist)
check discovered.len == 10
check discovered.isOk
check discovered[].len == 10
for n in nodes:
check discovered.contains(n)
check discovered[].contains(n)
# Too high logarithmic distance, caps at 256
discovered =
await discv5_protocol.findNode(testNode, mainNode.localNode, 4294967295'u32)
check:
discovered.len == 1
discovered[0] == testNode.localNode
discovered.isOk
discovered[].len == 1
discovered[][0] == testNode.localNode
# Empty bucket
discovered =
await discv5_protocol.findNode(testNode, mainNode.localNode, 254)
check discovered.len == 0
check discovered.isOk
check discovered[].len == 0
let moreNodes = nodesAtDistance(mainNode.localNode, dist, 10)
for n in moreNodes:
mainNode.addNode(n)
discard mainNode.addNode(n)
# Full bucket
discovered =
await discv5_protocol.findNode(testNode, mainNode.localNode, dist)
check discovered.len == 16
check discovered.isOk
check discovered[].len == 16
await mainNode.closeWait()
await testNode.closeWait()
@ -271,7 +280,7 @@ suite "Discovery v5 Tests":
# Generate 1000 random nodes and add to our main node's routing table
for i in 0..<1000:
mainNode.addNode(generateNode())
discard mainNode.addNode(generateNode())
let
neighbours = mainNode.neighbours(mainNode.localNode.id)
@ -286,7 +295,8 @@ suite "Discovery v5 Tests":
discovered = await discv5_protocol.findNode(testNode, mainNode.localNode,
closestDistance)
check closest in discovered
check discovered.isOk
check closest in discovered[]
await mainNode.closeWait()
await testNode.closeWait()
@ -330,11 +340,11 @@ suite "Discovery v5 Tests":
# if resolve works (only local lookup)
block:
let pong = await targetNode.ping(mainNode.localNode)
require pong.isSome()
check pong.isOk()
await targetNode.closeWait()
let n = await mainNode.resolve(targetId)
require n.isSome()
check:
n.isSome()
n.get().id == targetId
n.get().record.seqNum == targetSeqNum
@ -344,12 +354,12 @@ suite "Discovery v5 Tests":
# TODO: need to add some logic to update ENRs properly
targetSeqNum.inc()
let r = enr.Record.init(targetSeqNum, targetKey,
some(targetAddress.ip), targetAddress.tcpPort, targetAddress.udpPort)[]
some(targetAddress.ip), targetAddress.port, targetAddress.port)[]
targetNode.localNode.record = r
targetNode.open()
let n = await mainNode.resolve(targetId)
require n.isSome()
check:
n.isSome()
n.get().id == targetId
n.get().record.seqNum == targetSeqNum
@ -358,20 +368,20 @@ suite "Discovery v5 Tests":
block:
targetSeqNum.inc()
let r = enr.Record.init(3, targetKey, some(targetAddress.ip),
targetAddress.tcpPort, targetAddress.udpPort)[]
targetAddress.port, targetAddress.port)[]
targetNode.localNode.record = r
let pong = await targetNode.ping(lookupNode.localNode)
require pong.isSome()
check pong.isOk()
await targetNode.closeWait()
# TODO: This step should eventually not be needed and ENRs with new seqNum
# should just get updated in the lookup.
await mainNode.revalidateNode(targetNode.localNode)
mainNode.addNode(lookupNode.localNode.record)
check mainNode.addNode(lookupNode.localNode.record)
let n = await mainNode.resolve(targetId)
require n.isSome()
check:
n.isSome()
n.get().id == targetId
n.get().record.seqNum == targetSeqNum

View File

@ -169,7 +169,7 @@ suite "Discovery v5 Cryptographic Primitives":
privKey = PrivateKey.fromHex(localSecretKey)[]
signature = signIDNonce(privKey, hexToByteArray[idNonceSize](idNonce),
hexToByteArray[64](ephemeralKey))
require signature.isOK()
check signature.isOK()
check signature[].toRaw() == hexToByteArray[64](idNonceSig)
test "Encryption/Decryption":