diff --git a/eth/p2p/discoveryv5/discovery_db.nim b/eth/p2p/discoveryv5/discovery_db.nim index bb9681d..64b8293 100644 --- a/eth/p2p/discoveryv5/discovery_db.nim +++ b/eth/p2p/discoveryv5/discovery_db.nim @@ -27,7 +27,8 @@ proc makeKey(id: NodeId, address: Address): array[keySize, byte] = copyMem(addr result[sizeof(id) + 1], unsafeAddr address.ip.address_v6, sizeof(address.ip.address_v6)) copyMem(addr result[sizeof(id) + 1 + sizeof(address.ip.address_v6)], unsafeAddr address.udpPort, sizeof(address.udpPort)) -method storeKeys*(db: DiscoveryDB, id: NodeId, address: Address, r, w: array[16, byte]): bool {.raises: [Defect].} = +method storeKeys*(db: DiscoveryDB, id: NodeId, address: Address, r, w: AesKey): + bool {.raises: [Defect].} = try: var value: array[sizeof(r) + sizeof(w), byte] value[0 .. 15] = r @@ -37,7 +38,8 @@ method storeKeys*(db: DiscoveryDB, id: NodeId, address: Address, r, w: array[16, except CatchableError: return false -method loadKeys*(db: DiscoveryDB, id: NodeId, address: Address, r, w: var array[16, byte]): bool {.raises: [Defect].} = +method loadKeys*(db: DiscoveryDB, id: NodeId, address: Address, r, w: var AesKey): + bool {.raises: [Defect].} = try: let res = db.backend.get(makeKey(id, address)) if res.len != sizeof(r) + sizeof(w): diff --git a/eth/p2p/discoveryv5/encoding.nim b/eth/p2p/discoveryv5/encoding.nim index 8c2577f..23e9f3d 100644 --- a/eth/p2p/discoveryv5/encoding.nim +++ b/eth/p2p/discoveryv5/encoding.nim @@ -4,11 +4,17 @@ import const idNoncePrefix = "discovery-id-nonce" - gcmNonceSize* = 12 keyAgreementPrefix = "discovery v5 key agreement" authSchemeName* = "gcm" + gcmNonceSize* = 12 + gcmTagSize = 16 + tagSize* = 32 ## size of the tag where each message (except whoareyou) starts + ## with type + + PacketTag* = array[tagSize, byte] + AuthResponse = object version: int signature: array[64, byte] @@ -18,26 +24,28 @@ type localNode*: Node privKey*: PrivateKey db*: Database - handshakes*: Table[string, Whoareyou] # TODO: Implement hash for NodeID + handshakes*: Table[HandShakeKey, Whoareyou] HandshakeSecrets = object - writeKey: array[16, byte] - readKey: array[16, byte] - authRespKey: array[16, byte] + writeKey: AesKey + readKey: AesKey + authRespKey: AesKey AuthHeader* = object - auth*: array[12, byte] - idNonce*: array[32, byte] + auth*: AuthTag + idNonce*: IdNonce scheme*: string ephemeralKey*: array[64, byte] response*: seq[byte] RandomSourceDepleted* = object of CatchableError -const - gcmTagSize = 16 + DecodeStatus* = enum + Success, + HandshakeError, + PacketError -proc randomBytes(v: var openarray[byte]) = +proc randomBytes*(v: var openarray[byte]) = if nimcrypto.randomBytes(v) != v.len: raise newException(RandomSourceDepleted, "Could not randomize bytes") @@ -67,7 +75,7 @@ proc deriveKeys(n1, n2: NodeID, priv: PrivateKey, pub: PublicKey, # echo "EPH: ", eph.data.toHex, " idNonce: ", challenge.idNonce.toHex, "info: ", info.toHex - static: assert(sizeof(result) == 16 * 3) + static: assert(sizeof(result) == aesKeySize * 3) var res = cast[ptr UncheckedArray[byte]](addr result) hkdf(sha256, eph.data, idNonce, info, toOpenArray(res, 0, sizeof(result) - 1)) @@ -109,22 +117,26 @@ proc `xor`[N: static[int], T](a, b: array[N, T]): array[N, T] = for i in 0 .. a.high: result[i] = a[i] xor b[i] -proc packetTag(destNode, srcNode: NodeID): array[32, byte] = +proc packetTag(destNode, srcNode: NodeID): PacketTag = let destId = destNode.toByteArrayBE() let srcId = srcNode.toByteArrayBE() let destidHash = sha256.digest(destId) result = srcId xor destidHash.data -proc encodeEncrypted*(c: Codec, toNode: Node, packetData: seq[byte], challenge: Whoareyou): (seq[byte], array[gcmNonceSize, byte]) = +proc encodeEncrypted*(c: Codec, + toNode: Node, + packetData: seq[byte], + challenge: Whoareyou): + (seq[byte], array[gcmNonceSize, byte]) = var nonce: array[gcmNonceSize, byte] randomBytes(nonce) var headEnc: seq[byte] - var writeKey: array[16, byte] + var writeKey: AesKey if challenge.isNil: headEnc = rlp.encode(nonce) - var readKey: array[16, byte] + var readKey: AesKey # We might not have the node's keys if the handshake hasn't been performed # yet. That's fine, we will be responded with whoareyou. @@ -147,7 +159,7 @@ proc encodeEncrypted*(c: Codec, toNode: Node, packetData: seq[byte], challenge: headBuf.add(encryptGCM(writeKey, nonce, body, tag)) return (headBuf, nonce) -proc decryptGCM(key: array[16, byte], nonce, ct, authData: openarray[byte]): seq[byte] = +proc decryptGCM(key: AesKey, nonce, ct, authData: openarray[byte]): seq[byte] = var dctx: GCM[aes128] dctx.init(key, nonce, authData) result = newSeq[byte](ct.len - gcmTagSize) @@ -219,36 +231,36 @@ proc decodeEncrypted*(c: var Codec, fromId: NodeID, fromAddr: Address, input: seq[byte], - authTag: var array[12, byte], + authTag: var AuthTag, newNode: var Node, - packet: var Packet): bool = + packet: var Packet): DecodeStatus = let input = input.toRange - var r = rlpFromBytes(input[32 .. ^1]) + var r = rlpFromBytes(input[tagSize .. ^1]) var auth: AuthHeader - var readKey: array[16, byte] + + var readKey: AesKey logScope: sender = $fromAddr if r.isList: # Handshake - rlp list indicates auth-header - - # TODO: Auth failure will result in resending whoareyou. Do we really want this? auth = r.read(AuthHeader) authTag = auth.auth - let challenge = c.handshakes.getOrDefault($fromId) + let key = HandShakeKey(nodeId: fromId, address: $fromAddr) + let challenge = c.handshakes.getOrDefault(key) if challenge.isNil: trace "Decoding failed (no challenge)" - return false + return HandshakeError if auth.idNonce != challenge.idNonce: trace "Decoding failed (different nonce)" - return false + return HandshakeError var sec: HandshakeSecrets if not c.decodeAuthResp(fromId, auth, challenge, sec, newNode): trace "Decoding failed (bad auth)" - return false - c.handshakes.del($fromId) + return HandshakeError + c.handshakes.del(key) # Swap keys to match remote swap(sec.readKey, sec.writeKey) @@ -258,29 +270,32 @@ proc decodeEncrypted*(c: var Codec, else: # Message packet or random packet - rlp bytes (size 12) indicates auth-tag - authTag = r.read(array[12, byte]) + authTag = r.read(AuthTag) auth.auth = authTag - var writeKey: array[16, byte] + var writeKey: AesKey if not c.db.loadKeys(fromId, fromAddr, readKey, writeKey): trace "Decoding failed (no keys)" - return false + return PacketError # doAssert(false, "TODO: HANDLE ME!") - let headSize = 32 + r.position + let headSize = tagSize + r.position let bodyEnc = input[headSize .. ^1] - let body = decryptGCM(readKey, auth.auth, bodyEnc.toOpenArray, input[0 .. 31].toOpenArray) + let body = decryptGCM(readKey, auth.auth, bodyEnc.toOpenArray, + input[0 .. tagSize - 1].toOpenArray) if body.len > 1: let status = decodePacketBody(body[0], body.toOpenArray(1, body.high), packet) if status == decodingSuccessful: - return true + return Success else: debug "Failed to decode discovery packet", reason = status - return false + return PacketError + else: + return PacketError proc newRequestId*(): RequestId = if randomBytes(addr result, sizeof(result)) != sizeof(result): - raise newException(RandomSourceDepleted, "Could not randomize bytes") # TODO: + raise newException(RandomSourceDepleted, "Could not randomize bytes") proc numFields(T: typedesc): int = for k, v in fieldPairs(default(T)): inc result diff --git a/eth/p2p/discoveryv5/enr.nim b/eth/p2p/discoveryv5/enr.nim index 0f3c5a5..d52c2a5 100644 --- a/eth/p2p/discoveryv5/enr.nim +++ b/eth/p2p/discoveryv5/enr.nim @@ -229,7 +229,7 @@ proc fromBytesAux(r: var Record): bool = return false # We already know we are working with a list - discard rlp.enterList() + doAssert rlp.enterList() rlp.skipElem() # Skip signature r.seqNum = rlp.read(uint64) diff --git a/eth/p2p/discoveryv5/protocol.nim b/eth/p2p/discoveryv5/protocol.nim index f8bd37e..cffc2ad 100644 --- a/eth/p2p/discoveryv5/protocol.nim +++ b/eth/p2p/discoveryv5/protocol.nim @@ -1,36 +1,48 @@ import std/[tables, sets, endians, options, math, random], - stew/byteutils, eth/[rlp, keys], chronicles, chronos, stint, - ../enode, types, encoding, node, routing_table, enr + json_serialization/std/net, stew/byteutils, chronicles, chronos, stint, + eth/[rlp, keys], ../enode, types, encoding, node, routing_table, enr import nimcrypto except toHex logScope: topics = "discv5" +const + alpha = 3 ## Kademlia concurrency factor + lookupRequestLimit = 3 + findNodeResultLimit = 15 # applies in FINDNODE handler + maxNodesPerPacket = 3 + lookupInterval = 60.seconds ## Interval of launching a random lookup to + ## populate the routing table. go-ethereum seems to do 3 runs every 30 + ## minutes. Trinity starts one every minute. + handshakeTimeout* = 2.seconds ## timeout for the reply on the + ## whoareyou message + responseTimeout* = 2.seconds ## timeout for the response of a request-response + ## call + magicSize = 32 ## size of the magic which is the start of the whoareyou + ## message + type Protocol* = ref object transp: DatagramTransport localNode*: Node privateKey: PrivateKey - whoareyouMagic: array[32, byte] + whoareyouMagic: array[magicSize, byte] idHash: array[32, byte] - pendingRequests: Table[array[12, byte], PendingRequest] + pendingRequests: Table[AuthTag, PendingRequest] db: Database routingTable: RoutingTable - codec: Codec + codec*: Codec awaitedPackets: Table[(Node, RequestId), Future[Option[Packet]]] + lookupLoop: Future[void] + revalidateLoop: Future[void] PendingRequest = object node: Node packet: seq[byte] -const - lookupRequestLimit = 15 - findNodeResultLimit = 15 # applies in FINDNODE handler - findNodeAttempts = 3 - -proc whoareyouMagic(toNode: NodeId): array[32, byte] = +proc whoareyouMagic(toNode: NodeId): array[magicSize, byte] = const prefix = "WHOAREYOU" var data: array[prefix.len + sizeof(toNode), byte] data[0 .. sizeof(toNode) - 1] = toNode.toByteArrayBE() @@ -55,9 +67,6 @@ proc newProtocol*(privKey: PrivateKey, db: Database, result.routingTable.init(node) -proc start*(p: Protocol) = - discard - proc send(d: Protocol, a: Address, data: seq[byte]) = # debug "Sending bytes", amount = data.len, to = a let ta = initTAddress(a.ip, a.udpPort) @@ -69,38 +78,47 @@ proc send(d: Protocol, a: Address, data: seq[byte]) = proc send(d: Protocol, n: Node, data: seq[byte]) = d.send(n.node.address, data) -proc randomBytes(v: var openarray[byte]) = - if nimcrypto.randomBytes(v) != v.len: - raise newException(RandomSourceDepleted, "Could not randomize bytes") # TODO: - proc `xor`[N: static[int], T](a, b: array[N, T]): array[N, T] = for i in 0 .. a.high: result[i] = a[i] xor b[i] proc isWhoAreYou(d: Protocol, msg: Bytes): bool = if msg.len > d.whoareyouMagic.len: - result = d.whoareyouMagic == msg.toOpenArray(0, 31) + result = d.whoareyouMagic == msg.toOpenArray(0, magicSize - 1) proc decodeWhoAreYou(d: Protocol, msg: Bytes): Whoareyou = result = Whoareyou() - result[] = rlp.decode(msg.toRange[32 .. ^1], WhoareyouObj) + result[] = rlp.decode(msg.toRange[magicSize .. ^1], WhoareyouObj) -proc sendWhoareyou(d: Protocol, address: Address, toNode: NodeId, authTag: array[12, byte]) = +proc sendWhoareyou(d: Protocol, address: Address, toNode: NodeId, authTag: AuthTag) = trace "sending who are you", to = $toNode, toAddress = $address let challenge = Whoareyou(authTag: authTag, recordSeq: 1) - randomBytes(challenge.idNonce) - d.codec.handshakes[$toNode] = challenge - var data = @(whoareyouMagic(toNode)) - data.add(rlp.encode(challenge[])) - d.send(address, data) + encoding.randomBytes(challenge.idNonce) + # If there is already a handshake going on for this nodeid then we drop this + # new one. Handshake will get cleaned up after `handshakeTimeout`. + # If instead overwriting the handshake would be allowed, the handshake timeout + # will need to be canceled each time. + # TODO: could also clean up handshakes in a seperate call, e.g. triggered in + # a loop. + # Use toNode + address to make it more difficult for an attacker to occupy + # the handshake of another node. + + let key = HandShakeKey(nodeId: toNode, address: $address) + if not d.codec.handshakes.hasKeyOrPut(key, challenge): + sleepAsync(handshakeTimeout).addCallback() do(data: pointer): + # TODO: should we still provide cancellation in case handshake completes + # correctly? + d.codec.handshakes.del(key) + + var data = @(whoareyouMagic(toNode)) + data.add(rlp.encode(challenge[])) + d.send(address, data) proc sendNodes(d: Protocol, toNode: Node, reqId: RequestId, nodes: openarray[Node]) = proc sendNodes(d: Protocol, toNode: Node, packet: NodesPacket, reqId: RequestId) {.nimcall.} = let (data, _) = d.codec.encodeEncrypted(toNode, encodePacket(packet, reqId), challenge = nil) d.send(toNode, data) - const maxNodesPerPacket = 3 - var packet: NodesPacket packet.total = ceil(nodes.len / maxNodesPerPacket).uint32 @@ -132,7 +150,7 @@ proc handleFindNode(d: Protocol, fromNode: Node, fn: FindNodePacket, reqId: Requ let distance = min(fn.distance, 256) d.sendNodes(fromNode, reqId, d.routingTable.neighboursAtDistance(distance)) -proc receive(d: Protocol, a: Address, msg: Bytes) {.gcsafe, +proc receive*(d: Protocol, a: Address, msg: Bytes) {.gcsafe, raises: [ Defect, # TODO This is now coming from Chronos's callSoon @@ -144,12 +162,13 @@ proc receive(d: Protocol, a: Address, msg: Bytes) {.gcsafe, EthKeysException, Secp256k1Exception, ].} = - if msg.len < 32: + if msg.len < tagSize: # or magicSize, can be either return # Invalid msg # debug "Packet received: ", length = msg.len if d.isWhoAreYou(msg): + trace "Received whoareyou", localNode = $d.localNode, address = a let whoareyou = d.decodeWhoAreYou(msg) var pr: PendingRequest if d.pendingRequests.take(whoareyou.authTag, pr): @@ -162,20 +181,20 @@ proc receive(d: Protocol, a: Address, msg: Bytes) {.gcsafe, "due to randomness source depletion." else: - var tag: array[32, byte] - tag[0 .. ^1] = msg.toOpenArray(0, 31) + var tag: array[tagSize, byte] + tag[0 .. ^1] = msg.toOpenArray(0, tagSize - 1) let senderData = tag xor d.idHash let sender = readUintBE[256](senderData) - var authTag: array[12, byte] + var authTag: AuthTag var node: Node var packet: Packet - - if d.codec.decodeEncrypted(sender, a, msg, authTag, node, packet): + let decoded = d.codec.decodeEncrypted(sender, a, msg, authTag, node, packet) + if decoded == DecodeStatus.Success: if node.isNil: node = d.routingTable.getNode(sender) else: - debug "Adding new node to routing table" + debug "Adding new node to routing table", node = $node, localNode = $d.localNode discard d.routingTable.addNode(node) doAssert(not node.isNil, "No node in the routing table (internal error?)") @@ -191,16 +210,17 @@ proc receive(d: Protocol, a: Address, msg: Bytes) {.gcsafe, waiter.complete(packet.some) else: debug "TODO: handle packet: ", packet = packet.kind, origin = $node - - else: - debug "Could not decode, respond with whoareyou" + elif decoded == DecodeStatus.PacketError: + debug "Could not decode packet, respond with whoareyou", + localNode = $d.localNode, address = a d.sendWhoareyou(a, sender, authTag) + # No Whoareyou in case it is a Handshake Failure proc waitPacket(d: Protocol, fromNode: Node, reqId: RequestId): Future[Option[Packet]] = result = newFuture[Option[Packet]]("waitPacket") let res = result let key = (fromNode, reqId) - sleepAsync(1000).addCallback() do(data: pointer): + sleepAsync(responseTimeout).addCallback() do(data: pointer): d.awaitedPackets.del(key) if not res.finished: res.complete(none(Packet)) @@ -243,16 +263,17 @@ proc lookupDistances(target, dest: NodeId): seq[uint32] = proc lookupWorker(p: Protocol, destNode: Node, target: NodeId): Future[seq[Node]] {.async.} = let dists = lookupDistances(target, destNode.id) var i = 0 - while i < findNodeAttempts and result.len < findNodeResultLimit: - let r = await p.findNode(destNode, dists[i]) + while i < lookupRequestLimit and result.len < findNodeResultLimit: # TODO: Handle failures + let r = await p.findNode(destNode, dists[i]) + # TODO: I guess it makes sense to limit here also to `findNodeResultLimit`? result.add(r) inc i for n in result: discard p.routingTable.addNode(n) -proc lookup(p: Protocol, target: NodeId): Future[seq[Node]] {.async.} = +proc lookup*(p: Protocol, target: NodeId): Future[seq[Node]] {.async.} = ## Perform a lookup for the given target, return the closest n nodes to the ## target. Maximum value for n is `BUCKET_SIZE`. # TODO: Sort the returned nodes on distance @@ -263,8 +284,6 @@ proc lookup(p: Protocol, target: NodeId): Future[seq[Node]] {.async.} = for node in result: seen.incl(node.id) - const alpha = 3 # Kademlia concurrency factor - var pendingQueries = newSeqOfCap[Future[seq[Node]]](alpha) while true: @@ -275,13 +294,13 @@ proc lookup(p: Protocol, target: NodeId): Future[seq[Node]] {.async.} = pendingQueries.add(p.lookupWorker(n, target)) inc i - debug "discv5 pending queries", total = pendingQueries.len + trace "discv5 pending queries", total = pendingQueries.len if pendingQueries.len == 0: break let idx = await oneIndex(pendingQueries) - debug "Got discv5 lookup response", idx + trace "Got discv5 lookup response", idx let nodes = pendingQueries[idx].read pendingQueries.del(idx) @@ -290,9 +309,11 @@ proc lookup(p: Protocol, target: NodeId): Future[seq[Node]] {.async.} = if result.len < BUCKET_SIZE: result.add(n) -proc lookupRandom*(p: Protocol): Future[seq[Node]] = +proc lookupRandom*(p: Protocol): Future[seq[Node]] + {.raises:[RandomSourceDepleted, Defect, Exception].} = var id: NodeId - discard randomBytes(addr id, sizeof(id)) + if randomBytes(addr id, sizeof(id)) != sizeof(id): + raise newException(RandomSourceDepleted, "Could not randomize bytes") p.lookup(id) proc processClient(transp: DatagramTransport, @@ -312,7 +333,8 @@ proc processClient(transp: DatagramTransport, debug "Receive failed", exception = e.name, msg = e.msg, stacktrace = e.getStackTrace() -proc revalidateNode(p: Protocol, n: Node) {.async.} = +proc revalidateNode(p: Protocol, n: Node) + {.async, raises:[Defect, Exception].} = # TODO: Exception let reqId = newRequestId() var ping: PingPacket ping.enrSeq = p.localNode.record.seqNum @@ -333,17 +355,58 @@ proc revalidateNode(p: Protocol, n: Node) {.async.} = p.routingTable.removeNode(n) proc revalidateLoop(p: Protocol) {.async.} = - while true: - await sleepAsync(rand(10 * 1000).milliseconds) - let n = p.routingTable.nodeToRevalidate() - if not n.isNil: - await p.revalidateNode(n) + try: + # TODO: We need to handle actual errors still, which might just allow to + # continue the loop. However, currently `revalidateNode` raises a general + # `Exception` making this rather hard. + while true: + await sleepAsync(rand(10 * 1000).milliseconds) + let n = p.routingTable.nodeToRevalidate() + if not n.isNil: + # TODO: Should we do these in parallel and/or async to be certain of how + # often nodes are revalidated? + await p.revalidateNode(n) + except CancelledError: + trace "revalidateLoop canceled" + +proc lookupLoop(d: Protocol) {.async.} = + ## TODO: Same story as for `revalidateLoop` + try: + while true: + let nodes = await d.lookupRandom() + trace "Discovered nodes", nodes = $nodes + await sleepAsync(lookupInterval) + except CancelledError: + trace "lookupLoop canceled" proc open*(d: Protocol) = + debug "Starting discovery node", node = $d.localNode # TODO allow binding to specific IP / IPv6 / etc let ta = initTAddress(IPv4_any(), d.localNode.node.address.udpPort) d.transp = newDatagramTransport(processClient, udata = d, local = ta) - asyncCheck d.revalidateLoop() # TODO: This loop has to be terminated on close() + # Might want to move these to a separate proc if this turns out to be needed. + d.lookupLoop = lookupLoop(d) + d.revalidateLoop = revalidateLoop(d) + +proc close*(d: Protocol) = + doAssert(not d.lookupLoop.isNil() or not d.revalidateLoop.isNil()) + doAssert(not d.transp.closed) + + debug "Closing discovery node", node = $d.localNode + d.revalidateLoop.cancel() + d.lookupLoop.cancel() + # TODO: unsure if close can't create issues in the not awaited cancellations + # above + d.transp.close() + +proc closeWait*(d: Protocol) {.async.} = + doAssert(not d.lookupLoop.isNil() or not d.revalidateLoop.isNil()) + doAssert(not d.transp.closed) + + debug "Closing discovery node", node = $d.localNode + await allFutures([d.revalidateLoop.cancelAndWait(), + d.lookupLoop.cancelAndWait()]) + await d.transp.closeWait() proc addNode*(d: Protocol, node: Node) = discard d.routingTable.addNode(node) diff --git a/eth/p2p/discoveryv5/routing_table.nim b/eth/p2p/discoveryv5/routing_table.nim index e4761b0..4a96699 100644 --- a/eth/p2p/discoveryv5/routing_table.nim +++ b/eth/p2p/discoveryv5/routing_table.nim @@ -62,13 +62,13 @@ proc add(k: KBucket, n: Node): Node = k.lastUpdated = epochTime() let nodeIdx = k.nodes.find(n) if nodeIdx != -1: - k.nodes.delete(nodeIdx) - k.nodes.add(n) + k.nodes.delete(nodeIdx) + k.nodes.add(n) elif k.len < BUCKET_SIZE: - k.nodes.add(n) + k.nodes.add(n) else: - k.replacementCache.add(n) - return k.head + k.replacementCache.add(n) + return k.head return nil proc removeNode(k: KBucket, n: Node) = @@ -130,6 +130,7 @@ proc computeSharedPrefixBits(nodes: openarray[Node]): int = proc init*(r: var RoutingTable, thisNode: Node) {.inline.} = r.thisNode = thisNode r.buckets = @[newKBucket(0.u256, high(Uint256))] + randomize() # for later `randomNodes` selection proc splitBucket(r: var RoutingTable, index: int) = let bucket = r.buckets[index] @@ -180,10 +181,10 @@ proc neighbours*(r: RoutingTable, id: NodeId, k: int = BUCKET_SIZE): seq[Node] = result = newSeqOfCap[Node](k * 2) for bucket in r.bucketsByDistanceTo(id): for n in bucket.nodesByDistanceTo(id): - if n.id != id: - result.add(n) - if result.len == k * 2: - break + result.add(n) + if result.len == k * 2: + break + result = sortedByIt(result, it.distanceTo(id)) if result.len > k: result.setLen(k) @@ -215,7 +216,7 @@ proc setJustSeen*(r: RoutingTable, n: Node) = b.nodes[0] = n b.lastUpdated = epochTime() -proc nodeToRevalidate*(r: RoutingTable): Node = +proc nodeToRevalidate*(r: RoutingTable): Node {.raises:[].} = var buckets = r.buckets shuffle(buckets) # TODO: Should we prioritize less-recently-updated buckets instead? @@ -238,6 +239,7 @@ proc randomNodes*(r: RoutingTable, count: int): seq[Node] = # insignificant compared to the time it takes for the network roundtrips when connecting # to nodes. while len(seen) < count: + # TODO: Is it important to get a better random source for these sample calls? let bucket = sample(r.buckets) if bucket.nodes.len != 0: let node = sample(bucket.nodes) diff --git a/eth/p2p/discoveryv5/types.nim b/eth/p2p/discoveryv5/types.nim index 41d0720..a60b01e 100644 --- a/eth/p2p/discoveryv5/types.nim +++ b/eth/p2p/discoveryv5/types.nim @@ -2,12 +2,24 @@ import hashes, stint, ../enode, enr +const + authTagSize* = 12 + idNonceSize* = 32 + aesKeySize* = 128 div 8 + type NodeId* = UInt256 + AuthTag* = array[authTagSize, byte] + IdNonce* = array[idNonceSize, byte] + AesKey* = array[aesKeySize, byte] + + HandshakeKey* = object + nodeId*: NodeId + address*: string # TODO: Replace with Address, need hash WhoareyouObj* = object - authTag*: array[12, byte] - idNonce*: array[32, byte] + authTag*: AuthTag + idNonce*: IdNonce recordSeq*: uint64 Whoareyou* = ref WhoareyouObj @@ -69,12 +81,23 @@ template packetKind*(T: typedesc[SomePacket]): PacketKind = elif T is FindNodePacket: findNode elif T is NodesPacket: nodes -method storeKeys*(db: Database, id: NodeId, address: Address, r, w: array[16, byte]): bool {.base, raises: [Defect].} = discard +method storeKeys*(db: Database, id: NodeId, address: Address, r, w: AesKey): + bool {.base, raises: [Defect].} = discard -method loadKeys*(db: Database, id: NodeId, address: Address, r, w: var array[16, byte]): bool {.base, raises: [Defect].} = discard +method loadKeys*(db: Database, id: NodeId, address: Address, r, w: var AesKey): + bool {.base, raises: [Defect].} = discard proc toBytes*(id: NodeId): array[32, byte] {.inline.} = id.toByteArrayBE() proc hash*(id: NodeId): Hash {.inline.} = - hashData(unsafeAddr id, sizeof(id)) + result = hashData(unsafeAddr id, sizeof(id)) + +# TODO: To make this work I think we also need to implement `==` due to case +# fields in object +proc hash*(address: Address): Hash {.inline.} = + hashData(unsafeAddr address, sizeof(address)) + +proc hash*(key: HandshakeKey): Hash = + result = key.nodeId.hash !& key.address.hash + result = !$result diff --git a/eth/p2p/enode.nim b/eth/p2p/enode.nim index 342e4b0..66e0f5a 100644 --- a/eth/p2p/enode.nim +++ b/eth/p2p/enode.nim @@ -160,3 +160,8 @@ proc `$`*(n: ENode): string = result.add("?") result.add("discport=") result.add($int(n.address.udpPort)) + +proc `$`*(a: Address): string = + result.add($a.ip) + result.add(":" & $a.udpPort) + result.add(":" & $a.tcpPort) diff --git a/eth/p2p/kademlia.nim b/eth/p2p/kademlia.nim index 1d1487a..b92f486 100644 --- a/eth/p2p/kademlia.nim +++ b/eth/p2p/kademlia.nim @@ -175,6 +175,7 @@ proc computeSharedPrefixBits(nodes: openarray[Node]): int = proc init(r: var RoutingTable, thisNode: Node) {.inline.} = r.thisNode = thisNode r.buckets = @[newKBucket(0.u256, high(Uint256))] + randomize() # for later `randomNodes` selection proc splitBucket(r: var RoutingTable, index: int) = let bucket = r.buckets[index] diff --git a/eth/p2p/rlpx_protocols/waku_protocol.nim b/eth/p2p/rlpx_protocols/waku_protocol.nim index 4169619..8269c51 100644 --- a/eth/p2p/rlpx_protocols/waku_protocol.nim +++ b/eth/p2p/rlpx_protocols/waku_protocol.nim @@ -153,11 +153,9 @@ proc read*(rlp: var Rlp, T: typedesc[StatusOptions]): T = let sz = rlp.listLen() # We already know that we are working with a list - discard rlp.enterList() + doAssert rlp.enterList() for i in 0 ..< sz: - if not rlp.enterList(): - raise newException(RlpTypeMismatch, - "List expected, but the source RLP is not a list.") + rlp.tryEnterList() var k: KeyKind try: diff --git a/eth/trie/hexary.nim b/eth/trie/hexary.nim index 60e6576..3585907 100644 --- a/eth/trie/hexary.nim +++ b/eth/trie/hexary.nim @@ -387,7 +387,7 @@ proc replaceValue(data: Rlp, key: NibblesRange, value: BytesRange): Bytes = # XXX: This can be optimized to a direct bitwise copy of the source RLP var iter = data # We already know that we are working with a list - discard iter.enterList() + doAssert iter.enterList() for i in 0 ..< 16: r.append iter iter.skipElem @@ -513,7 +513,7 @@ proc deleteAt(self: var HexaryTrie; var rlpRes = initRlpList(17) var iter = origRlp # We already know that we are working with a list - discard iter.enterList + doAssert iter.enterList for i in 0 ..< 16: rlpRes.append iter iter.skipElem diff --git a/tests/p2p/test_discoveryv5.nim b/tests/p2p/test_discoveryv5.nim index 5159105..96516c2 100644 --- a/tests/p2p/test_discoveryv5.nim +++ b/tests/p2p/test_discoveryv5.nim @@ -1,11 +1,11 @@ import - unittest, chronos, sequtils, chronicles, - eth/keys, eth/p2p/enode, eth/trie/db, - eth/p2p/discoveryv5/[discovery_db, enr, node, types], + random, unittest, chronos, sequtils, chronicles, tables, + eth/[keys, rlp], eth/p2p/enode, eth/trie/db, + eth/p2p/discoveryv5/[discovery_db, enr, node, types, routing_table, encoding], eth/p2p/discoveryv5/protocol as discv5_protocol, ./p2p_test_helper -proc startDiscoveryv5Node*(privKey: PrivateKey, address: Address, +proc initDiscoveryNode*(privKey: PrivateKey, address: Address, bootnodes: seq[Record]): discv5_protocol.Protocol = var db = DiscoveryDB.init(newMemoryDB()) result = newProtocol(privKey, db, @@ -16,20 +16,28 @@ proc startDiscoveryv5Node*(privKey: PrivateKey, address: Address, result.addNode(node) result.open() - result.start() proc nodeIdInNodes(id: NodeId, nodes: openarray[Node]): bool = for n in nodes: if id == n.id: return true +# Creating a random packet with specific nodeid each time +proc randomPacket(tag: PacketTag): seq[byte] = + var + authTag: AuthTag + msg: array[44, byte] + + randomBytes(authTag) + randomBytes(msg) + result.add(tag) + result.add(rlp.encode(authTag)) + result.add(msg) + suite "Discovery v5 Tests": - asyncTest "Discover nodes": + asyncTest "Random nodes": let bootNodeKey = initPrivateKey("a2b50376a79b1a8c8a3296485572bdfbf54708bb46d3c25d73d2723aaaf6a617") - bootNodeAddr = localAddress(20301) - bootNode = startDiscoveryv5Node(bootNodeKey, bootNodeAddr, @[]) - bootNodeRecord = initRecord(1, bootNodeKey, - {"udp": bootNodeAddr.udpPort.uint16, "ip": [byte 127, 0, 0, 1]}) + bootNode = initDiscoveryNode(bootNodeKey, localAddress(20301), @[]) let nodeKeys = [ initPrivateKey("a2b50376a79b1a8c8a3296485572bdfbf54708bb46d3c25d73d2723aaaf6a618"), @@ -40,16 +48,88 @@ suite "Discovery v5 Tests": for i in 0 ..< nodeKeys.len: nodeAddrs.add(localAddress(20302 + i)) var nodes = zip(nodeKeys, nodeAddrs).mapIt( - startDiscoveryv5Node(it.a, it.b, @[bootNodeRecord])) + initDiscoveryNode(it.a, it.b, @[bootNode.localNode.record])) nodes.add(bootNode) for node in nodes: let discovered = await node.lookupRandom() check discovered.len < nodes.len - debug "Lookup from random id", node=node.localNode, discovered + debug "Lookup from random id", node = node.localNode, discovered # Check for each node if the other nodes shows up in the routing table for i in nodes: for j in nodes: if j != i: check(nodeIdInNodes(i.localNode.id, j.randomNodes(nodes.len - 1))) + + for node in nodes: + await node.closeWait() + + asyncTest "Lookup targets": + const + nodeCount = 17 + + let bootNode = initDiscoveryNode(newPrivateKey(), localAddress(20301), @[]) + + var nodes = newSeqOfCap[discv5_protocol.Protocol](nodeCount) + nodes.add(bootNode) + for i in 1 ..< nodeCount: + nodes.add(initDiscoveryNode(newPrivateKey(), localAddress(20301 + i), + @[bootNode.localNode.record])) + + for i in 0..